Neumann Client Architecture

The Neumann Client (neumann_client) provides a Rust SDK for interacting with the Neumann database. It supports two modes: embedded mode for in-process database access via the Query Router, and remote mode for network access via gRPC to a Neumann Server.

The client follows four design principles: dual-mode flexibility (same API for embedded and remote), security-first (API keys are zeroized on drop), async-native (built on tokio for remote operations), and zero-copy where possible (streaming results for large datasets).

Architecture Overview

flowchart TD
    subgraph Application
        App[User Application]
    end

    subgraph NeumannClient
        Client[NeumannClient]
        Builder[ClientBuilder]
        Config[ClientConfig]
    end

    subgraph EmbeddedMode
        Router[QueryRouter]
        Store[TensorStore]
    end

    subgraph RemoteMode
        gRPC[gRPC Channel]
        TLS[TLS Layer]
    end

    subgraph NeumannServer
        Server[NeumannServer]
    end

    App --> Builder
    Builder --> Client
    Client -->|embedded| Router
    Router --> Store
    Client -->|remote| gRPC
    gRPC --> TLS
    TLS --> Server
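
The dispatch shown in the diagram can be sketched with the standard library alone. Every name below (`Mode`, `Backend`, `InProcess`, `Networked`, `ModeClient`) is a hypothetical stand-in chosen for illustration, not the actual neumann_client API, and the "remote" backend performs no network I/O. The sketch only shows how one `execute` call can be routed to either backend:

```rust
/// Hypothetical stand-in for the two client modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Embedded,
    Remote,
}

/// Hypothetical backend abstraction: anything that can run a query string.
pub trait Backend {
    fn run(&mut self, query: &str) -> Result<String, String>;
}

/// In-process backend; a counter stands in for the router and store.
pub struct InProcess {
    executed: usize,
}

impl Backend for InProcess {
    fn run(&mut self, query: &str) -> Result<String, String> {
        self.executed += 1;
        Ok(format!("embedded #{}: {}", self.executed, query))
    }
}

/// Network backend stand-in; it only remembers the address (no I/O).
pub struct Networked {
    address: String,
}

impl Backend for Networked {
    fn run(&mut self, query: &str) -> Result<String, String> {
        Ok(format!("remote {}: {}", self.address, query))
    }
}

/// One client type, two interchangeable backends behind the same method.
pub struct ModeClient {
    mode: Mode,
    backend: Box<dyn Backend>,
}

impl ModeClient {
    pub fn embedded() -> Self {
        Self { mode: Mode::Embedded, backend: Box::new(InProcess { executed: 0 }) }
    }

    pub fn remote(address: &str) -> Self {
        Self {
            mode: Mode::Remote,
            backend: Box::new(Networked { address: address.to_string() }),
        }
    }

    pub fn mode(&self) -> Mode {
        self.mode
    }

    /// Same call regardless of mode; the boxed backend decides what happens.
    pub fn execute(&mut self, query: &str) -> Result<String, String> {
        self.backend.run(query)
    }
}
```

Calling `ModeClient::embedded().execute("SELECT users")` and `ModeClient::remote("localhost:9200").execute("SELECT users")` goes through the same method. This is the property the dual-mode principle describes.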

Key Types

| Type | Description |
| --- | --- |
| NeumannClient | Main client struct supporting both embedded and remote modes |
| ClientBuilder | Fluent builder for remote client connections |
| ClientConfig | Configuration for remote connections (address, API key, TLS) |
| ClientMode | Enum: Embedded or Remote |
| ClientError | Error type for client operations |
| RemoteQueryResult | Wrapper for proto query response with typed accessors |
| QueryResult | Re-export of query_router result type (embedded mode) |

Client Modes

| Mode | Feature Flag | Use Case |
| --- | --- | --- |
| Embedded | embedded | In-process database, unit testing, CLI tools |
| Remote | remote (default) | Production gRPC connections to server |
| Full | full | Both modes available |

Feature Flags

[dependencies]
# Pick exactly one of the entries below; a key may appear only once per table.

# Remote only (default)
neumann_client = "0.1"

# Embedded only
# neumann_client = { version = "0.1", default-features = false, features = ["embedded"] }

# Both modes
# neumann_client = { version = "0.1", features = ["full"] }

Client Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| address | String | localhost:9200 | Server address (host:port) |
| api_key | Option<String> | None | API key for authentication |
| tls | bool | false | Enable TLS encryption |
| timeout_ms | u64 | 30000 | Request timeout in milliseconds |

Security: API Key Zeroization

API keys are automatically overwritten with zeros when the configuration is dropped, so the credential does not linger in freed memory:

use zeroize::Zeroize;  // Trait that provides `zeroize()`

impl Drop for ClientConfig {
    fn drop(&mut self) {
        if let Some(ref mut key) = self.api_key {
            key.zeroize();  // Overwrites memory with zeros
        }
    }
}
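
To illustrate what zeroize-on-drop means without pulling in the zeroize crate, here is a minimal standard-library sketch. `wipe` and `SecretKey` are hypothetical names, and the volatile-write approach is a simplified stand-in for what a dedicated crate does, not the client's actual implementation:

```rust
use std::sync::atomic::{compiler_fence, Ordering};

/// Overwrite every byte with zero. Volatile writes keep the compiler from
/// treating the stores as dead and optimizing them away.
pub fn wipe(bytes: &mut [u8]) {
    for b in bytes.iter_mut() {
        // SAFETY: `b` is a valid, exclusive reference to a single u8.
        unsafe { std::ptr::write_volatile(b, 0) };
    }
    // Prevent the compiler from reordering later operations before the wipe.
    compiler_fence(Ordering::SeqCst);
}

/// Hypothetical secret holder whose buffer is cleared when it goes out of scope.
pub struct SecretKey(Vec<u8>);

impl SecretKey {
    pub fn new(key: &str) -> Self {
        Self(key.as_bytes().to_vec())
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }
}

impl Drop for SecretKey {
    fn drop(&mut self) {
        // Clear the bytes before the allocation is returned to the allocator.
        wipe(&mut self.0);
    }
}
```

This sketch only clears the final buffer. Copies made earlier (for example by a reallocating `String`) are outside its reach, which is one reason to rely on a dedicated crate in real code.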

Remote Mode

Connection Builder

The ClientBuilder provides a fluent API for configuring remote connections:

use neumann_client::NeumannClient;

// Minimal connection
let client = NeumannClient::connect("localhost:9200")
    .build()
    .await?;

// Full configuration
let client = NeumannClient::connect("db.example.com:9443")
    .api_key("sk-production-key")
    .with_tls()
    .timeout_ms(60_000)
    .build()
    .await?;
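
The fluent style above follows the consuming-builder pattern. The sketch below shows that pattern with the defaults from the Client Configuration table. `SketchConfig` and `SketchBuilder` are hypothetical names. The `host:port` check in `build` is an assumption added for illustration, and no connection is opened:

```rust
/// Hypothetical configuration mirroring the documented fields and defaults.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub address: String,
    pub api_key: Option<String>,
    pub tls: bool,
    pub timeout_ms: u64,
}

/// Hypothetical consuming builder: each setter takes `self` and returns it.
pub struct SketchBuilder {
    config: SketchConfig,
}

impl SketchBuilder {
    /// Start from the documented defaults (no API key, no TLS, 30 s timeout).
    pub fn connect(address: impl Into<String>) -> Self {
        Self {
            config: SketchConfig {
                address: address.into(),
                api_key: None,
                tls: false,
                timeout_ms: 30_000,
            },
        }
    }

    pub fn api_key(mut self, key: impl Into<String>) -> Self {
        self.config.api_key = Some(key.into());
        self
    }

    pub fn with_tls(mut self) -> Self {
        self.config.tls = true;
        self
    }

    pub fn timeout_ms(mut self, ms: u64) -> Self {
        self.config.timeout_ms = ms;
        self
    }

    /// Assumed validation: the address must look like `host:port`.
    pub fn build(self) -> Result<SketchConfig, String> {
        let valid = matches!(
            self.config.address.rsplit_once(':'),
            Some((host, port)) if !host.is_empty() && port.parse::<u16>().is_ok()
        );
        if valid {
            Ok(self.config)
        } else {
            Err(format!("invalid address: {}", self.config.address))
        }
    }
}
```

Accepting `impl Into<String>` lets callers pass either a string literal or an owned `String`, such as the result of `std::env::var`.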

Connection Flow

sequenceDiagram
    participant App as Application
    participant Builder as ClientBuilder
    participant Client as NeumannClient
    participant Channel as gRPC Channel
    participant Server as NeumannServer

    App->>Builder: connect("address")
    App->>Builder: api_key("key")
    App->>Builder: with_tls()
    App->>Builder: build().await
    Builder->>Channel: Create endpoint
    Channel->>Server: TCP/TLS handshake
    Server-->>Channel: Connection established
    Channel-->>Builder: Channel ready
    Builder-->>Client: NeumannClient created
    Client-->>App: Ready for queries

Query Execution

// Single query
let result = client.execute("SELECT * FROM users").await?;

// With identity (for vault access)
let result = client
    .execute_with_identity("VAULT GET 'secret'", Some("service:backend"))
    .await?;

// Batch queries
let results = client
    .execute_batch(&[
        "CREATE TABLE orders (id:int, total:float)",
        "INSERT orders id=1, total=99.99",
        "SELECT orders",
    ])
    .await?;

RemoteQueryResult Accessors

The RemoteQueryResult wrapper provides typed access to query results:

let result = client.execute("SELECT * FROM users").await?;

// Check for errors
if result.has_error() {
    eprintln!("Error: {}", result.error_message().unwrap());
    return Err(...);
}

// Check result type
if result.is_empty() {
    println!("No results");
}

// Access typed data
if let Some(count) = result.count() {
    println!("Count: {}", count);
}

if let Some(rows) = result.rows() {
    for row in rows {
        println!("Row ID: {}", row.id);
    }
}

if let Some(nodes) = result.nodes() {
    for node in nodes {
        println!("Node: {} ({})", node.id, node.label);
    }
}

if let Some(edges) = result.edges() {
    for edge in edges {
        println!("Edge: {} -> {}", edge.from, edge.to);
    }
}

if let Some(similar) = result.similar() {
    for item in similar {
        println!("{}: {:.4}", item.key, item.score);
    }
}

// Access raw proto response
let proto = result.into_inner();
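
The accessor style above, where each method returns `Some` only when the response holds that kind of data, can be sketched over a plain enum. `SketchResult` and `SketchRow` are hypothetical types for illustration. The real wrapper sits on top of a proto response whose layout is not shown here:

```rust
/// Hypothetical row type with just an id and a list of named values.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchRow {
    pub id: u64,
    pub values: Vec<(String, String)>,
}

/// Hypothetical response: exactly one variant is present at a time.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchResult {
    Empty,
    Count(u64),
    Rows(Vec<SketchRow>),
    Error(String),
}

impl SketchResult {
    pub fn has_error(&self) -> bool {
        matches!(self, Self::Error(_))
    }

    /// `Some` only for the error variant, so callers need not unwrap blindly.
    pub fn error_message(&self) -> Option<&str> {
        match self {
            Self::Error(msg) => Some(msg.as_str()),
            _ => None,
        }
    }

    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }

    pub fn count(&self) -> Option<u64> {
        match self {
            Self::Count(n) => Some(*n),
            _ => None,
        }
    }

    /// Borrow the rows as a slice without copying them.
    pub fn rows(&self) -> Option<&[SketchRow]> {
        match self {
            Self::Rows(rows) => Some(rows.as_slice()),
            _ => None,
        }
    }
}
```

Because `error_message` returns an `Option`, `if let Some(msg) = result.error_message()` checks for an error and reads it in one step, so no separate `has_error()` check followed by `unwrap()` is needed.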

Blocking Connection

For synchronous contexts, use the blocking builder:

let client = NeumannClient::connect("localhost:9200")
    .api_key("test-key")
    .build_blocking()?;  // Creates temporary tokio runtime

Embedded Mode

Creating an Embedded Client

use neumann_client::NeumannClient;
use parking_lot::RwLock;
use query_router::QueryRouter;
use std::sync::Arc;

// New embedded database
let client = NeumannClient::embedded()?;

// With custom router (for shared state)
let router = Arc::new(RwLock::new(QueryRouter::new()));
let client = NeumannClient::with_router(router);

Synchronous Query Execution

Embedded mode provides synchronous execution for simpler code flow:

use neumann_client::QueryResult;

// Create table
let result = client.execute_sync("CREATE TABLE users (name:string, age:int)")?;
assert!(matches!(result, QueryResult::Empty));

// Insert data
let result = client.execute_sync("INSERT users name=\"Alice\", age=30")?;

// Query data
let result = client.execute_sync("SELECT users")?;
if let QueryResult::Rows(rows) = result {
    for row in rows {
        println!("{:?}", row);
    }
}

With Identity

// Set identity for vault access control
let result = client.execute_sync_with_identity(
    "VAULT GET 'api_secret'",
    Some("service:backend"),
)?;

Error Handling

Error Types

| Error | Code | Retryable | Description |
| --- | --- | --- | --- |
| Connection | 6 | Yes | Failed to connect to server |
| Query | 9 | No | Query execution failed |
| Authentication | 5 | No | Invalid API key |
| PermissionDenied | 3 | No | Access denied |
| NotFound | 2 | No | Resource not found |
| InvalidArgument | 1 | No | Bad request data |
| Parse | 8 | No | Query parse error |
| Internal | 7 | No | Server internal error |
| Timeout | 6 | Yes | Request timed out |
| Unavailable | 6 | Yes | Server unavailable |

Error Methods

let err = ClientError::Connection("connection refused".to_string());

// Get error code
let code = err.code();  // 6

// Check if retryable
if err.is_retryable() {
    // Retry with exponential backoff
}

// Display error
eprintln!("Error: {}", err);  // "connection error: connection refused"
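
The following sketch shows how an error enum could expose `code()` and `is_retryable()` consistently with the Error Types table. `SketchError` is a hypothetical stand-in written against the standard library (the real crate derives its error type with thiserror). Apart from the "connection error" prefix shown above, the display prefixes are assumptions:

```rust
use std::fmt;

/// Hypothetical error enum with one variant per row of the Error Types table.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchError {
    Connection(String),
    Query(String),
    Authentication(String),
    PermissionDenied(String),
    NotFound(String),
    InvalidArgument(String),
    Parse(String),
    Internal(String),
    Timeout(String),
    Unavailable(String),
}

impl SketchError {
    /// Numeric codes as listed in the table; three variants share code 6.
    pub fn code(&self) -> i32 {
        match self {
            Self::InvalidArgument(_) => 1,
            Self::NotFound(_) => 2,
            Self::PermissionDenied(_) => 3,
            Self::Authentication(_) => 5,
            Self::Connection(_) | Self::Timeout(_) | Self::Unavailable(_) => 6,
            Self::Internal(_) => 7,
            Self::Parse(_) => 8,
            Self::Query(_) => 9,
        }
    }

    /// Only transient failures are worth retrying.
    pub fn is_retryable(&self) -> bool {
        matches!(
            self,
            Self::Connection(_) | Self::Timeout(_) | Self::Unavailable(_)
        )
    }
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Prefixes other than "connection" are assumed for illustration.
        let (kind, msg) = match self {
            Self::Connection(m) => ("connection", m),
            Self::Query(m) => ("query", m),
            Self::Authentication(m) => ("authentication", m),
            Self::PermissionDenied(m) => ("permission denied", m),
            Self::NotFound(m) => ("not found", m),
            Self::InvalidArgument(m) => ("invalid argument", m),
            Self::Parse(m) => ("parse", m),
            Self::Internal(m) => ("internal", m),
            Self::Timeout(m) => ("timeout", m),
            Self::Unavailable(m) => ("unavailable", m),
        };
        write!(f, "{} error: {}", kind, msg)
    }
}

impl std::error::Error for SketchError {}
```

Since Connection, Timeout, and Unavailable all map to code 6 in the table, callers that need to tell them apart should match on the variant rather than on the numeric code.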

Error Handling Pattern

use neumann_client::ClientError;

match client.execute("SELECT * FROM users").await {
    Ok(result) => {
        if result.has_error() {
            // Query-level error (e.g., table not found)
            eprintln!("Query error: {}", result.error_message().unwrap());
        } else {
            // Process results
        }
    }
    Err(ClientError::Connection(msg)) => {
        // Network error - maybe retry
        eprintln!("Connection failed: {}", msg);
    }
    Err(ClientError::Authentication(msg)) => {
        // Bad credentials - check API key
        eprintln!("Auth failed: {}", msg);
    }
    Err(ClientError::Timeout(msg)) => {
        // Request too slow - maybe retry with longer timeout
        eprintln!("Timeout: {}", msg);
    }
    Err(e) => {
        eprintln!("Unexpected error: {}", e);
    }
}
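
The "maybe retry" branches above are usually paired with exponential backoff. The sketch below is a synchronous, standard-library version with hypothetical helper names (`backoff_delay`, `retry`). The sleep function is passed in so the logic can be exercised without waiting. An async caller would presumably await a timer such as `tokio::time::sleep` instead:

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // `checked_shl` returns None once the shift would exceed the bit width.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    // On overflow fall back to the cap; otherwise take the smaller value.
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Run `op` until it succeeds, fails with a non-retryable error, or
/// `max_attempts` calls have been made. `op` always runs at least once.
pub fn retry<T, E>(
    max_attempts: u32,
    base: Duration,
    max_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
    is_retryable: impl Fn(&E) -> bool,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Retry only if attempts remain and the error is transient.
            Err(e) if attempt + 1 < max_attempts && is_retryable(&e) => {
                sleep(backoff_delay(attempt, base, max_delay));
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
```

In production code `sleep` could be `std::thread::sleep`, and `is_retryable` could call the error's own `is_retryable()` method. Adding random jitter to the delay is a common refinement that is left out here.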

Conversion from gRPC Status

Remote errors are automatically converted from tonic Status:

use tonic::Code;

impl From<tonic::Status> for ClientError {
    fn from(status: tonic::Status) -> Self {
        match status.code() {
            Code::InvalidArgument => Self::InvalidArgument(status.message().to_string()),
            Code::NotFound => Self::NotFound(status.message().to_string()),
            Code::PermissionDenied => Self::PermissionDenied(status.message().to_string()),
            Code::Unauthenticated => Self::Authentication(status.message().to_string()),
            Code::Unavailable => Self::Unavailable(status.message().to_string()),
            Code::DeadlineExceeded => Self::Timeout(status.message().to_string()),
            // Every other gRPC status code is reported as an internal error
            _ => Self::Internal(status.message().to_string()),
        }
    }
}

Connection Management

Connection State

let client = NeumannClient::connect("localhost:9200")
    .build()
    .await?;

// Check mode
match client.mode() {
    ClientMode::Embedded => println!("In-process"),
    ClientMode::Remote => println!("Connected to server"),
}

// Check connection status
if client.is_connected() {
    // Ready for queries
}

Closing Connections

let mut client = NeumannClient::connect("localhost:9200")
    .build()
    .await?;

// Explicit close
client.close();

// Or automatic on drop
drop(client);  // Connection closed, API key zeroized

Usage Examples

Complete Remote Example

use neumann_client::NeumannClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to server
    let client = NeumannClient::connect("localhost:9200")
        .api_key(std::env::var("NEUMANN_API_KEY")?)
        .with_tls()
        .timeout_ms(30_000)
        .build()
        .await?;

    // Create schema
    client.execute("CREATE TABLE products (name:string, price:float)").await?;

    // Insert data
    client.execute("INSERT products name=\"Widget\", price=9.99").await?;
    client.execute("INSERT products name=\"Gadget\", price=19.99").await?;

    // Query data
    let result = client.execute("SELECT products WHERE price > 10").await?;

    if let Some(rows) = result.rows() {
        for row in rows {
            println!("Product: {:?}", row);
        }
    }

    Ok(())
}

Complete Embedded Example

use neumann_client::{NeumannClient, QueryResult};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create embedded client
    let client = NeumannClient::embedded()?;

    // Create schema
    client.execute_sync("CREATE TABLE events (name:string, timestamp:int)")?;

    // Insert data
    client.execute_sync("INSERT events name=\"login\", timestamp=1700000000")?;

    // Query data
    match client.execute_sync("SELECT events")? {
        QueryResult::Rows(rows) => {
            println!("Found {} events", rows.len());
            for row in rows {
                println!("  {:?}", row);
            }
        }
        _ => println!("Unexpected result type"),
    }

    Ok(())
}

Testing with Embedded Mode

#[cfg(test)]
mod tests {
    use neumann_client::{NeumannClient, QueryResult};

    #[test]
    fn test_user_creation() {
        let client = NeumannClient::embedded().unwrap();

        // Setup
        client
            .execute_sync("CREATE TABLE users (email:string, active:bool)")
            .unwrap();

        // Test
        client
            .execute_sync("INSERT users email=\"test@example.com\", active=true")
            .unwrap();

        // Verify
        let result = client.execute_sync("SELECT users").unwrap();
        match result {
            QueryResult::Rows(rows) => {
                assert_eq!(rows.len(), 1);
            }
            _ => panic!("Expected rows"),
        }
    }
}

Shared Router Between Clients

use neumann_client::NeumannClient;
use query_router::QueryRouter;
use std::sync::Arc;
use parking_lot::RwLock;

// Create shared router
let router = Arc::new(RwLock::new(QueryRouter::new()));

// Create multiple clients sharing same state
let client1 = NeumannClient::with_router(Arc::clone(&router));
let client2 = NeumannClient::with_router(Arc::clone(&router));

// Changes from client1 visible to client2
client1.execute_sync("CREATE TABLE shared (x:int)")?;
let result = client2.execute_sync("SELECT shared")?;  // Works!
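
Sharing works because both clients hold a reference-counted handle to the same lock-protected state. The sketch below reproduces that mechanism with hypothetical types (`SketchRouter`, `SharedClient`). It uses `std::sync::RwLock` in place of `parking_lot::RwLock` so that it needs no external crates, and a `HashMap` stands in for the real router and store:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical router: table name -> stored integer values.
#[derive(Default)]
pub struct SketchRouter {
    tables: HashMap<String, Vec<i64>>,
}

/// Hypothetical client holding a shared handle to the router.
#[derive(Clone)]
pub struct SharedClient {
    router: Arc<RwLock<SketchRouter>>,
}

impl SharedClient {
    pub fn with_router(router: Arc<RwLock<SketchRouter>>) -> Self {
        Self { router }
    }

    /// Takes the write lock; fails if the table already exists.
    pub fn create_table(&self, name: &str) -> Result<(), String> {
        let mut router = self.router.write().map_err(|_| "lock poisoned".to_string())?;
        if router.tables.contains_key(name) {
            return Err(format!("table already exists: {}", name));
        }
        router.tables.insert(name.to_string(), Vec::new());
        Ok(())
    }

    /// Takes the write lock and appends a value to an existing table.
    pub fn insert(&self, table: &str, value: i64) -> Result<(), String> {
        let mut router = self.router.write().map_err(|_| "lock poisoned".to_string())?;
        match router.tables.get_mut(table) {
            Some(rows) => {
                rows.push(value);
                Ok(())
            }
            None => Err(format!("table not found: {}", table)),
        }
    }

    /// Takes the read lock, so several readers can proceed in parallel.
    pub fn select(&self, table: &str) -> Result<Vec<i64>, String> {
        let router = self.router.read().map_err(|_| "lock poisoned".to_string())?;
        let rows = router
            .tables
            .get(table)
            .cloned()
            .ok_or_else(|| format!("table not found: {}", table))?;
        Ok(rows)
    }
}
```

Note that `std::sync::RwLock` can be poisoned when a writer panics, which is why each method maps the lock error. `parking_lot::RwLock` has no poisoning and returns the guard directly.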

Best Practices

Connection Reuse

Create one client and reuse it for multiple queries:

// Good: Reuse client
let client = NeumannClient::connect("localhost:9200").build().await?;
for query in queries {
    client.execute(&query).await?;
}

// Bad: New connection per query
for query in queries {
    let client = NeumannClient::connect("localhost:9200").build().await?;
    client.execute(&query).await?;
}  // Connection overhead for each query

Timeout Configuration

Set appropriate timeouts based on query complexity:

// Quick queries
let client = NeumannClient::connect("localhost:9200")
    .timeout_ms(5_000)  // 5 seconds
    .build()
    .await?;

// Complex analytics
let client = NeumannClient::connect("localhost:9200")
    .timeout_ms(300_000)  // 5 minutes
    .build()
    .await?;

API Key Security

Never hardcode API keys:

// Good: Environment variable
let api_key = std::env::var("NEUMANN_API_KEY")?;
let client = NeumannClient::connect("localhost:9200")
    .api_key(api_key)
    .build()
    .await?;

// Bad: Hardcoded key
let client = NeumannClient::connect("localhost:9200")
    .api_key("sk-secret-key-12345")  // Will be in binary!
    .build()
    .await?;

Dependencies

| Crate | Purpose | Feature |
| --- | --- | --- |
| query_router | Embedded mode query execution | embedded |
| tonic | gRPC client | remote |
| tokio | Async runtime | remote |
| parking_lot | Thread-safe router access | embedded |
| zeroize | Secure memory clearing | Always |
| thiserror | Error type derivation | Always |
| tracing | Structured logging | Always |

Related Modules

| Module | Relationship |
| --- | --- |
| neumann_server | Server counterpart for remote mode |
| query_router | Query execution backend for embedded mode |
| neumann_shell | Alternative interactive interface |