Python SDK Architecture

The Python SDK (neumann-db) provides a Python client for the Neumann database with support for both embedded mode (via PyO3 bindings) and remote mode (via gRPC). It includes async support and integrations for pandas and numpy.

The SDK follows four design principles: Pythonic API (context managers, type hints, dataclasses), dual-mode (same API for embedded and remote), async-first (native asyncio support), and ecosystem integration (pandas DataFrame and numpy array support).

Architecture Overview

flowchart TD
    subgraph Application
        App[Python Application]
    end

    subgraph SDK[neumann-db]
        Client[NeumannClient]
        AsyncClient[AsyncNeumannClient]
        Tx[Transaction]
        Types[Data Types]
        Errors[Error Classes]
    end

    subgraph Integrations
        Pandas[pandas Integration]
        Numpy[numpy Integration]
    end

    subgraph EmbeddedMode[Embedded Mode]
        PyO3[_native PyO3 Module]
        Router[QueryRouter]
    end

    subgraph RemoteMode[Remote Mode]
        gRPC[grpcio]
        Proto[Proto Stubs]
    end

    App --> Client
    App --> AsyncClient
    Client --> Tx
    Client --> Types
    Client --> Errors
    Client --> Pandas
    Client --> Numpy
    Client -->|embedded| PyO3
    PyO3 --> Router
    Client -->|remote| gRPC
    AsyncClient --> gRPC
    gRPC --> Proto
    Proto --> Server[NeumannServer]

Installation

# Basic installation (remote mode only)
pip install neumann-db

# With native module for embedded mode
# (extras are quoted so shells such as zsh do not treat the brackets as glob patterns)
pip install "neumann-db[native]"

# With pandas integration
pip install "neumann-db[pandas]"

# With numpy integration
pip install "neumann-db[numpy]"

# Full installation
pip install "neumann-db[full]"

Key Types

| Type | Description |
| --- | --- |
| NeumannClient | Synchronous client supporting both modes |
| AsyncNeumannClient | Async client for remote mode |
| Transaction | Transaction context manager |
| QueryResult | Query result with typed accessors |
| QueryResultType | Enum of result types |
| Value | Typed scalar value |
| ScalarType | Enum of scalar types |
| Row | Relational row with typed column accessors |
| Node | Graph node with properties |
| Edge | Graph edge with properties |
| Path | Graph path as list of segments |
| PathSegment | Path segment (node + optional edge) |
| SimilarItem | Vector similarity result |
| ArtifactInfo | Blob artifact metadata |
| NeumannError | Base exception class |

Client Modes

| Mode | Class Method | Requirements | Use Case |
| --- | --- | --- | --- |
| Embedded | NeumannClient.embedded() | neumann-db[native] | Testing, CLI tools |
| Remote | NeumannClient.connect() | grpcio | Production |
| Async Remote | AsyncNeumannClient.connect() | grpcio | Async applications |
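
Because both synchronous modes share one API, the choice of mode can be pushed into configuration. The following is a minimal sketch of that idea, not part of the SDK: the helper `make_client`, the environment variable names (`NEUMANN_ADDRESS`, `NEUMANN_API_KEY`, `NEUMANN_TLS`, `NEUMANN_DATA_PATH`), and the `FakeClient` stand-in are all hypothetical. Only the `connect(address, api_key=..., tls=...)` and `embedded(path=...)` call shapes come from this document.

```python
import os
from typing import Any, Mapping


def make_client(client_cls: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Pick a client mode from settings. The variable names are hypothetical."""
    address = env.get("NEUMANN_ADDRESS")
    if address:
        # Remote mode: pass only the options that are actually configured.
        options: dict[str, Any] = {}
        if "NEUMANN_API_KEY" in env:
            options["api_key"] = env["NEUMANN_API_KEY"]
        if env.get("NEUMANN_TLS") == "1":
            options["tls"] = True
        return client_cls.connect(address, **options)
    # Embedded mode: in-memory unless a data path is configured.
    path = env.get("NEUMANN_DATA_PATH")
    return client_cls.embedded(path=path) if path else client_cls.embedded()


class FakeClient:
    """Stand-in for NeumannClient so the sketch runs without the SDK installed."""

    def __init__(self, mode: str, **options: Any) -> None:
        self.mode = mode
        self.options = options

    @classmethod
    def connect(cls, address: str, **options: Any) -> "FakeClient":
        return cls("remote", address=address, **options)

    @classmethod
    def embedded(cls, **options: Any) -> "FakeClient":
        return cls("embedded", **options)


local = make_client(FakeClient, {})
remote = make_client(FakeClient, {"NEUMANN_ADDRESS": "localhost:9200", "NEUMANN_TLS": "1"})
```

In an application, `NeumannClient` would presumably be passed in place of `FakeClient`, so tests can run embedded while production connects to a server.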

Synchronous Client

Embedded Mode

from neumann import NeumannClient

# In-memory database
client = NeumannClient.embedded()

# Persistent storage
client = NeumannClient.embedded(path="/path/to/data")

# Use as context manager
with NeumannClient.embedded() as client:
    client.execute("CREATE TABLE users (name:string)")

Remote Mode

from neumann import NeumannClient

# Basic connection
client = NeumannClient.connect("localhost:9200")

# With authentication and TLS
client = NeumannClient.connect(
    "db.example.com:9443",
    api_key="your-api-key",
    tls=True,
)

# Context manager
with NeumannClient.connect("localhost:9200") as client:
    result = client.execute("SELECT users")

Query Execution

# Single query
result = client.execute("SELECT users")

# With identity for vault access
result = client.execute(
    "VAULT GET 'secret'",
    identity="service:backend",
)

# Streaming query
for chunk in client.execute_stream("SELECT large_table"):
    for row in chunk.rows:
        print(row.to_dict())

# Batch execution
results = client.execute_batch([
    "CREATE TABLE orders (id:int, total:float)",
    "INSERT orders id=1, total=99.99",
    "SELECT orders",
])

Async Client

The async client supports remote mode only, because the PyO3 bindings used for embedded mode have threading limitations:

from neumann.aio import AsyncNeumannClient

# Connect
client = await AsyncNeumannClient.connect(
    "localhost:9200",
    api_key="your-api-key",
)

# Execute query
result = await client.execute("SELECT users")

# Streaming
async for chunk in client.execute_stream("SELECT large_table"):
    for row in chunk.rows:
        print(row.to_dict())

# Batch
queries = ["SELECT users", "SELECT orders"]
results = await client.execute_batch(queries)

# Close
await client.close()

Async Context Manager

async with await AsyncNeumannClient.connect("localhost:9200") as client:
    result = await client.execute("SELECT users")
    for row in result.rows:
        print(row.to_dict())

Run Embedded in Async Context

To use embedded mode from async code, call run_in_executor, which runs the query in a thread pool:

async def query_embedded():
    client = await AsyncNeumannClient.connect("localhost:9200")
    # This runs the embedded client in a thread pool
    result = await client.run_in_executor("SELECT users")
    return result
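
For context, the general asyncio technique for calling blocking code without stalling the event loop is `loop.run_in_executor`. The sketch below shows that standard-library mechanism only. `blocking_query` is a hypothetical stand-in for a blocking call such as an embedded client's `execute()`, and whether the SDK's own `run_in_executor` method is built this way is an assumption.

```python
import asyncio
import time


def blocking_query(query: str) -> str:
    """Stand-in for a blocking call such as an embedded client's execute()."""
    time.sleep(0.05)  # simulate work that would otherwise block the event loop
    return f"result of {query}"


async def run_blocking(query: str) -> str:
    loop = asyncio.get_running_loop()
    # Passing None selects the event loop's default thread pool executor.
    return await loop.run_in_executor(None, blocking_query, query)


async def main() -> list[str]:
    # Each call runs in a worker thread, so the event loop stays responsive.
    return await asyncio.gather(
        run_blocking("SELECT users"),
        run_blocking("SELECT orders"),
    )


results = asyncio.run(main())
```

Given the threading limitations mentioned above, a real embedded client may need an executor restricted to a single worker thread. That detail is not stated in this document.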

Transaction Support

Transactions provide automatic commit/rollback with context managers:

from neumann import NeumannClient, Transaction

client = NeumannClient.connect("localhost:9200")

# Using Transaction directly
tx = Transaction(client)
tx.begin()
try:
    tx.execute("INSERT users name='Alice'")
    tx.execute("INSERT users name='Bob'")
    tx.commit()
except Exception:
    tx.rollback()
    raise

# Using context manager (preferred)
with Transaction(client) as tx:
    tx.execute("INSERT users name='Alice'")
    tx.execute("INSERT users name='Bob'")
    # Auto-commits on success, auto-rollbacks on exception

Transaction Properties

| Property | Type | Description |
| --- | --- | --- |
| is_active | bool | True if transaction is active |

Transaction Methods

| Method | Description |
| --- | --- |
| begin() | Start the transaction |
| commit() | Commit the transaction |
| rollback() | Roll back the transaction |
| execute(query) | Execute query within transaction |
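
To make the commit-or-rollback behavior concrete, here is a minimal sketch of how a context manager with these methods could be structured. It is illustrative only and not the SDK's `Transaction` class: `SketchTransaction` and `RecordingClient` are hypothetical names, and the `BEGIN`, `COMMIT`, and `ROLLBACK` statement strings are assumptions about what gets sent.

```python
class RecordingClient:
    """Stand-in client that records every statement it receives."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def execute(self, query: str) -> None:
        self.log.append(query)


class SketchTransaction:
    """Illustrative only; not the SDK's Transaction class."""

    def __init__(self, client: RecordingClient) -> None:
        self._client = client
        self.is_active = False

    def begin(self) -> None:
        self._client.execute("BEGIN")  # hypothetical statement text
        self.is_active = True

    def commit(self) -> None:
        self._client.execute("COMMIT")  # hypothetical statement text
        self.is_active = False

    def rollback(self) -> None:
        self._client.execute("ROLLBACK")  # hypothetical statement text
        self.is_active = False

    def execute(self, query: str) -> None:
        if not self.is_active:
            raise RuntimeError("transaction is not active")
        self._client.execute(query)

    def __enter__(self) -> "SketchTransaction":
        self.begin()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Commit when the block finished normally, roll back otherwise.
        if exc_type is None:
            self.commit()
        else:
            self.rollback()
        return False  # let the original exception propagate


ok_client = RecordingClient()
with SketchTransaction(ok_client) as tx:
    tx.execute("INSERT users name='Alice'")

failed_client = RecordingClient()
try:
    with SketchTransaction(failed_client) as tx:
        tx.execute("INSERT users name='Bob'")
        raise ValueError("simulated failure")
except ValueError:
    pass
```

Returning `False` from `__exit__` matters: the rollback happens, but the caller still sees the exception that caused it.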

Query Result Types

The QueryResult class provides typed access to query results:

| Property | Return Type | Description |
| --- | --- | --- |
| type | QueryResultType | Result type enum |
| is_empty | bool | True if empty result |
| is_error | bool | True if error result |
| value | str or None | Single value result |
| count | int or None | Row count |
| rows | list[Row] | Relational rows |
| nodes | list[Node] | Graph nodes |
| edges | list[Edge] | Graph edges |
| paths | list[Path] | Graph paths |
| similar_items | list[SimilarItem] | Similarity results |
| ids | list[str] | ID list |
| table_names | list[str] | Table names |
| blob_data | bytes or None | Binary data |
| blob_info | ArtifactInfo or None | Blob metadata |
| error_message | str or None | Error message |

Result Type Enum

from neumann import QueryResultType

result = client.execute(query)

match result.type:
    case QueryResultType.EMPTY:
        print("OK")
    case QueryResultType.COUNT:
        print(f"{result.count} rows affected")
    case QueryResultType.ROWS:
        for row in result.rows:
            print(row.to_dict())
    case QueryResultType.NODES:
        for node in result.nodes:
            print(f"[{node.id}] {node.label}")
    case QueryResultType.SIMILAR:
        for item in result.similar_items:
            print(f"{item.key}: {item.score:.4f}")
    case QueryResultType.ERROR:
        raise Exception(result.error_message)

Data Types

Value

Immutable typed scalar value:

from neumann import Value, ScalarType

# Create values
v1 = Value.null()
v2 = Value.int_(42)
v3 = Value.float_(3.14)
v4 = Value.string("hello")
v5 = Value.bool_(True)
v6 = Value.bytes_(b"data")

# Access type and data
print(v2.type)  # ScalarType.INT
print(v2.data)  # 42

# Convert to Python native type
native = v2.as_python()  # 42
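
One common way to get an immutable, typed scalar in Python is a frozen dataclass with named constructors. The sketch below illustrates that pattern under stated assumptions: `SketchValue` and `SketchScalarType` are hypothetical names, only three scalar kinds are modeled, and nothing here is claimed to match the SDK's internals.

```python
from dataclasses import FrozenInstanceError, dataclass
from enum import Enum
from typing import Union


class SketchScalarType(Enum):
    NULL = "null"
    INT = "int"
    STRING = "string"


@dataclass(frozen=True)
class SketchValue:
    """Illustrative only; not the SDK's Value class."""

    type: SketchScalarType
    data: Union[None, int, str]

    @classmethod
    def null(cls) -> "SketchValue":
        return cls(SketchScalarType.NULL, None)

    @classmethod
    def int_(cls, value: int) -> "SketchValue":
        return cls(SketchScalarType.INT, value)

    @classmethod
    def string(cls, value: str) -> "SketchValue":
        return cls(SketchScalarType.STRING, value)

    def as_python(self) -> Union[None, int, str]:
        # The stored data is already a native Python object in this sketch.
        return self.data


v = SketchValue.int_(42)
try:
    v.data = 43  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Frozen dataclasses also compare by value and are hashable, so two values built from the same type and data are equal and can be used as dictionary keys.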

Row

Relational row with typed accessors:

from neumann import Row

row = result.rows[0]

# Get raw Value
val = row.get("name")

# Get typed values
name: str | None = row.get_string("name")
age: int | None = row.get_int("age")
score: float | None = row.get_float("score")
active: bool | None = row.get_bool("active")

# Convert to dict
data = row.to_dict()  # {"name": "Alice", "age": 30}

Node

Graph node with properties:

from neumann import Node

node = result.nodes[0]

print(node.id)      # "1"
print(node.label)   # "person"

# Get property
name = node.get_property("name")

# Convert to dict
data = node.to_dict()
# {"id": "1", "label": "person", "properties": {"name": "Alice"}}

Edge

Graph edge with properties:

from neumann import Edge

edge = result.edges[0]

print(edge.id)         # "1"
print(edge.edge_type)  # "knows"
print(edge.source)     # "1"
print(edge.target)     # "2"

# Get property
since = edge.get_property("since")

# Convert to dict
data = edge.to_dict()
# {"id": "1", "type": "knows", "source": "1", "target": "2", "properties": {}}

Path

Graph path as segments:

from neumann import Path

path = result.paths[0]

# Get all nodes in path
nodes = path.nodes  # [Node, Node, ...]

# Get all edges in path
edges = path.edges  # [Edge, Edge, ...]

# Path length
length = len(path)

# Iterate segments
for segment in path.segments:
    print(f"Node: {segment.node.id}")
    if segment.edge:
        print(f"  -> via edge {segment.edge.id}")
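
The relationship between `segments`, `nodes`, and `edges` can be sketched as two derived views over the segment list. This is an assumption about the data model rather than the SDK's implementation, and all `Sketch*` names are hypothetical. The meaning of `len(path)` is deliberately left out because this document does not define it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchNode:
    id: str


@dataclass(frozen=True)
class SketchEdge:
    id: str


@dataclass(frozen=True)
class SketchSegment:
    node: SketchNode
    edge: Optional[SketchEdge] = None  # a segment may carry no edge


@dataclass(frozen=True)
class SketchPath:
    segments: tuple[SketchSegment, ...]

    @property
    def nodes(self) -> list[SketchNode]:
        # Every segment contributes exactly one node.
        return [segment.node for segment in self.segments]

    @property
    def edges(self) -> list[SketchEdge]:
        # Segments without an edge are skipped.
        return [s.edge for s in self.segments if s.edge is not None]


path = SketchPath(
    segments=(
        SketchSegment(SketchNode("a"), SketchEdge("e1")),
        SketchSegment(SketchNode("b"), SketchEdge("e2")),
        SketchSegment(SketchNode("c")),
    )
)
node_ids = [n.id for n in path.nodes]
edge_ids = [e.id for e in path.edges]
```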

SimilarItem

Vector similarity result:

from neumann import SimilarItem

for item in result.similar_items:
    print(f"Key: {item.key}")
    print(f"Score: {item.score:.4f}")
    if item.metadata:
        print(f"Metadata: {item.metadata}")

ArtifactInfo

Blob artifact metadata:

from neumann import ArtifactInfo

info = result.blob_info
if info is not None:  # blob_info may be None (see the QueryResult properties)
    print(f"ID: {info.artifact_id}")
    print(f"Filename: {info.filename}")
    print(f"Size: {info.size} bytes")
    print(f"Checksum: {info.checksum}")
    print(f"Content-Type: {info.content_type}")
    print(f"Created: {info.created_at}")
    print(f"Tags: {info.tags}")

Error Handling

Error Codes

| Code | Name | Description |
| --- | --- | --- |
| 0 | UNKNOWN | Unknown error |
| 1 | INVALID_ARGUMENT | Bad request data |
| 2 | NOT_FOUND | Resource not found |
| 3 | PERMISSION_DENIED | Access denied |
| 4 | ALREADY_EXISTS | Resource exists |
| 5 | UNAUTHENTICATED | Auth failed |
| 6 | UNAVAILABLE | Server unavailable |
| 7 | INTERNAL | Internal error |
| 8 | PARSE_ERROR | Query parse error |
| 9 | QUERY_ERROR | Query execution error |

Error Classes

from neumann import (
    NeumannError,
    ConnectionError,
    AuthenticationError,
    PermissionError,
    NotFoundError,
    InvalidArgumentError,
    ParseError,
    QueryError,
    InternalError,
    ErrorCode,
)

try:
    result = client.execute("SELECT nonexistent")
except ConnectionError as e:
    print(f"Connection failed: {e.message}")
except AuthenticationError:
    print("Check your API key")
except ParseError as e:
    print(f"Query syntax error: {e.message}")
except NeumannError as e:
    print(f"[{e.code.name}] {e.message}")

Error Factory

from neumann.errors import error_from_code, ErrorCode

# Create error from code
error = error_from_code(ErrorCode.NOT_FOUND, "Table 'users' not found")
# Returns NotFoundError instance
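
A factory like this is typically a lookup from error code to exception class with a fallback to the base class. The sketch below shows that pattern using the codes from the Error Codes table. The `Sketch*` names are hypothetical. Only the NOT_FOUND to NotFoundError pairing is confirmed by this document, and the other pairing and the fallback behavior are assumptions.

```python
from enum import IntEnum


class SketchErrorCode(IntEnum):
    UNKNOWN = 0
    INVALID_ARGUMENT = 1
    NOT_FOUND = 2
    PERMISSION_DENIED = 3
    ALREADY_EXISTS = 4
    UNAUTHENTICATED = 5
    UNAVAILABLE = 6
    INTERNAL = 7
    PARSE_ERROR = 8
    QUERY_ERROR = 9


class SketchError(Exception):
    """Base class carrying a code and a message."""

    def __init__(self, message: str, code: SketchErrorCode = SketchErrorCode.UNKNOWN) -> None:
        super().__init__(message)
        self.message = message
        self.code = code


class SketchNotFoundError(SketchError):
    pass


class SketchParseError(SketchError):
    pass


# Codes without a dedicated class fall back to the base class (assumed behavior).
_CLASS_BY_CODE: dict[SketchErrorCode, type[SketchError]] = {
    SketchErrorCode.NOT_FOUND: SketchNotFoundError,
    SketchErrorCode.PARSE_ERROR: SketchParseError,
}


def sketch_error_from_code(code: SketchErrorCode, message: str) -> SketchError:
    error_cls = _CLASS_BY_CODE.get(code, SketchError)
    return error_cls(message, code)


not_found = sketch_error_from_code(SketchErrorCode.NOT_FOUND, "Table 'users' not found")
fallback = sketch_error_from_code(SketchErrorCode.UNAVAILABLE, "server down")
```

Because every class derives from one base, a final `except` clause on the base class catches anything the more specific clauses miss, as in the Error Classes example.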

Pandas Integration

Convert query results to pandas DataFrames:

from neumann.integrations.pandas import (
    result_to_dataframe,
    rows_to_dataframe,
    dataframe_to_inserts,
)

# Result to DataFrame
result = client.execute("SELECT users")
df = result_to_dataframe(result)

# Rows to DataFrame
df = rows_to_dataframe(result.rows)

# DataFrame to INSERT statements
inserts = dataframe_to_inserts(
    df,
    table="users",
    column_mapping={"user_name": "name"},  # Optional column rename
)

# Execute inserts
for query in inserts:
    client.execute(query)
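
To show what generating INSERT statements from tabular data involves, here is a dependency-free sketch that works on a list of dictionaries instead of a DataFrame. It is not the SDK's `dataframe_to_inserts`: the names `records_to_inserts` and `format_literal` are hypothetical, the statement shape follows the INSERT examples in this document, and the string escaping rules are an assumption.

```python
from typing import Any, Iterable, Mapping, Optional


def format_literal(value: Any) -> str:
    """Render a Python value as a query literal (escaping rules are assumed)."""
    # bool is rejected because this document shows no boolean literal syntax.
    if isinstance(value, bool) or not isinstance(value, (str, int, float)):
        raise TypeError(f"unsupported value type: {type(value).__name__}")
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    return repr(value)


def records_to_inserts(
    records: Iterable[Mapping[str, Any]],
    table: str,
    column_mapping: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Build one INSERT statement per record, renaming columns if requested."""
    mapping = column_mapping or {}
    statements = []
    for record in records:
        assignments = ", ".join(
            f"{mapping.get(column, column)}={format_literal(value)}"
            for column, value in record.items()
        )
        statements.append(f"INSERT {table} {assignments}")
    return statements


inserts = records_to_inserts(
    [{"user_name": "Alice", "age": 30}],
    table="users",
    column_mapping={"user_name": "name"},
)
```

With pandas, `df.to_dict(orient="records")` produces records of this shape, so the same loop applies to a DataFrame.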

NumPy Integration

Work with vectors using numpy arrays:

from neumann.integrations.numpy import (
    vector_to_insert,
    vectors_to_inserts,
    parse_embedding,
    cosine_similarity,
    euclidean_distance,
    normalize_vectors,
)
import numpy as np

# Single vector to INSERT
query = vector_to_insert("doc1", np.array([0.1, 0.2, 0.3]))
client.execute(query)

# Multiple vectors
vectors = {
    "doc1": np.array([0.1, 0.2, 0.3]),
    "doc2": np.array([0.4, 0.5, 0.6]),
}
queries = vectors_to_inserts(vectors, normalize=True)
for q in queries:
    client.execute(q)

# Parse embedding from result
embedding = parse_embedding("[0.1, 0.2, 0.3]")

# Distance calculations
vec1 = np.array([1.0, 0.0, 0.0])
vec2 = np.array([0.0, 1.0, 0.0])
vec3 = np.array([1.0, 1.0, 0.0])
sim = cosine_similarity(vec1, vec2)
dist = euclidean_distance(vec1, vec2)

# Batch normalization
normalized = normalize_vectors(np.array([vec1, vec2, vec3]))
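
For reference, the quantities named above have standard definitions, sketched here in plain Python so the arithmetic is visible. These are textbook formulas, not the SDK's code. The function names are hypothetical, and how the SDK helpers treat zero vectors or mismatched lengths is not stated in this document.

```python
import math
from typing import Sequence


def l2_norm(v: Sequence[float]) -> float:
    """Euclidean length of a vector."""
    return math.sqrt(sum(x * x for x in v))


def normalize(v: Sequence[float]) -> list[float]:
    """Scale a vector to unit length."""
    norm = l2_norm(v)
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in v]


def cosine_similarity_ref(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product divided by the product of the two lengths."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (l2_norm(a) * l2_norm(b))


def euclidean_distance_ref(a: Sequence[float], b: Sequence[float]) -> float:
    """Length of the difference vector."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


sim_orthogonal = cosine_similarity_ref([1.0, 0.0], [0.0, 1.0])
dist_3_4 = euclidean_distance_ref([0.0, 0.0], [3.0, 4.0])
unit = normalize([3.0, 4.0])
```

For unit-length vectors the cosine similarity reduces to a plain dot product, which is one reason to normalize embeddings before storing them.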

Usage Examples

Complete CRUD Example

from neumann import NeumannClient

with NeumannClient.connect("localhost:9200") as client:
    # Create table
    client.execute("CREATE TABLE products (name:string, price:float)")

    # Insert data
    client.execute('INSERT products name="Widget", price=9.99')
    client.execute('INSERT products name="Gadget", price=19.99')

    # Query data
    result = client.execute("SELECT products WHERE price > 10")
    for row in result.rows:
        print(row.to_dict())

    # Update
    client.execute('UPDATE products SET price=24.99 WHERE name="Gadget"')

    # Delete
    client.execute("DELETE products WHERE price < 15")

    # Drop table
    client.execute("DROP TABLE products")

Graph Operations

from neumann import NeumannClient

with NeumannClient.connect("localhost:9200") as client:
    # Create nodes
    client.execute('NODE CREATE person {name: "Alice", age: 30}')
    client.execute('NODE CREATE person {name: "Bob", age: 25}')

    # Create edge
    client.execute("EDGE CREATE 1 -> 2 : knows {since: 2020}")

    # List nodes
    result = client.execute("NODE LIST person")
    for node in result.nodes:
        print(f"[{node.id}] {node.label}: {node.to_dict()['properties']}")

    # Find neighbors
    result = client.execute("NEIGHBORS 1 OUTGOING")

    # Find path
    result = client.execute("PATH 1 -> 2")
    if result.paths:
        path = result.paths[0]
        print(" -> ".join(n.id for n in path.nodes))

Vector Search with NumPy

from neumann import NeumannClient
from neumann.integrations.numpy import vector_to_insert, normalize_vectors
import numpy as np

with NeumannClient.connect("localhost:9200") as client:
    # Generate and store embeddings
    embeddings = np.random.randn(100, 768).astype(np.float32)
    embeddings = normalize_vectors(embeddings)

    for i, emb in enumerate(embeddings):
        query = vector_to_insert(f"doc{i}", emb)
        client.execute(query)

    # Query vector
    query_vec = np.random.randn(768).astype(np.float32)
    query_str = vector_to_insert("query", query_vec)
    client.execute(query_str)

    # Find similar
    result = client.execute('SIMILAR "query" COSINE LIMIT 10')
    for item in result.similar_items:
        print(f"{item.key}: {item.score:.4f}")

Async Web Application

from fastapi import FastAPI
from neumann.aio import AsyncNeumannClient

app = FastAPI()
client: AsyncNeumannClient | None = None

@app.on_event("startup")
async def startup():
    global client
    client = await AsyncNeumannClient.connect(
        "localhost:9200",
        api_key="your-api-key",
    )

@app.on_event("shutdown")
async def shutdown():
    if client:
        await client.close()

@app.get("/users")
async def get_users():
    result = await client.execute("SELECT users")
    return [row.to_dict() for row in result.rows]

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    result = await client.execute(f"SELECT users WHERE id = {user_id}")
    if result.rows:
        return result.rows[0].to_dict()
    return {"error": "Not found"}

Dependencies

| Package | Purpose | Extra |
| --- | --- | --- |
| grpcio | gRPC client | Default |
| protobuf | Protocol buffers | Default |
| neumann-native | PyO3 bindings | [native] |
| pandas | DataFrame support | [pandas] |
| numpy | Array support | [numpy] |

| Module | Relationship |
| --- | --- |
| neumann_server | Server that this SDK connects to |
| neumann_client | Rust SDK with same capabilities |
| @neumann/client | TypeScript SDK with same API design |