Neumann Server Architecture

The Neumann Server (neumann_server) provides a gRPC server that exposes the Neumann database over the network. It serves as the network gateway for remote clients, wrapping the Query Router with authentication, TLS encryption, and streaming support for large result sets and blob storage.

The server follows four design principles: zero-configuration startup (works out of the box with sensible defaults), security-first (API key authentication with constant-time comparison, TLS support), streaming-native (all large operations use gRPC streaming), and health monitoring (automatic failure tracking with configurable thresholds).

Architecture Overview

flowchart TD
    subgraph Clients
        CLI[neumann_client]
        gRPC[gRPC Clients]
        Web[gRPC-Web Browsers]
    end

    subgraph NeumannServer
        QS[QueryService]
        BS[BlobService]
        HS[HealthService]
        RS[ReflectionService]
        Auth[Auth Middleware]
        TLS[TLS Layer]
    end

    subgraph Backend
        QR[QueryRouter]
        Blob[BlobStore]
    end

    CLI --> TLS
    gRPC --> TLS
    Web --> TLS
    TLS --> Auth
    Auth --> QS
    Auth --> BS
    Auth --> HS
    QS --> QR
    BS --> Blob
    RS --> |Service Discovery| gRPC

Key Types

| Type | Description |
|---|---|
| NeumannServer | Main server struct with router, blob store, and configuration |
| ServerConfig | Configuration for bind address, TLS, auth, and limits |
| TlsConfig | TLS certificate paths and client certificate settings |
| AuthConfig | API key list, header name, and anonymous access control |
| ApiKey | Individual API key with identity and optional description |
| QueryServiceImpl | gRPC service for query execution with streaming |
| BlobServiceImpl | gRPC service for artifact storage with streaming |
| HealthServiceImpl | gRPC service for health checks |
| HealthState | Shared health state across services |
| ServerError | Error type for server operations |

Server Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| bind_addr | SocketAddr | 127.0.0.1:9200 | Server bind address |
| tls | Option<TlsConfig> | None | TLS configuration |
| auth | Option<AuthConfig> | None | Authentication configuration |
| max_message_size | usize | 64 MB | Maximum gRPC message size |
| max_upload_size | usize | 512 MB | Maximum blob upload size |
| enable_grpc_web | bool | true | Enable gRPC-web for browsers |
| enable_reflection | bool | true | Enable service reflection |
| blob_chunk_size | usize | 64 KB | Chunk size for blob streaming |
| stream_channel_capacity | usize | 32 | Bounded channel capacity for backpressure |

Configuration Builder

use neumann_server::{ApiKey, AuthConfig, ServerConfig, TlsConfig};
use std::path::PathBuf;

// Wrapper function so that `?` can propagate the address parse error
fn build_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    let config = ServerConfig::new()
        .with_bind_addr("0.0.0.0:9443".parse()?)
        .with_tls(TlsConfig::new(
            PathBuf::from("server.crt"),
            PathBuf::from("server.key"),
        ))
        .with_auth(
            AuthConfig::new()
                .with_api_key(ApiKey::new(
                    "sk-prod-key-12345678".to_string(),
                    "service:backend".to_string(),
                ))
                .with_anonymous(false),
        )
        .with_max_message_size(128 * 1024 * 1024) // 128 MB
        .with_grpc_web(true)
        .with_reflection(true);
    Ok(config)
}

TLS Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| cert_path | PathBuf | Required | Path to certificate file (PEM) |
| key_path | PathBuf | Required | Path to private key file (PEM) |
| ca_cert_path | Option<PathBuf> | None | CA certificate for client auth |
| require_client_cert | bool | false | Require client certificates |

TLS Setup Example

use neumann_server::TlsConfig;
use std::path::PathBuf;

fn main() {
    // Basic TLS: server certificate and private key (PEM)
    let tls = TlsConfig::new(
        PathBuf::from("/etc/neumann/server.crt"),
        PathBuf::from("/etc/neumann/server.key"),
    );

    // Mutual TLS (mTLS): additionally verify client certificates against a CA
    let mtls = TlsConfig::new(
        PathBuf::from("/etc/neumann/server.crt"),
        PathBuf::from("/etc/neumann/server.key"),
    )
    .with_ca_cert(PathBuf::from("/etc/neumann/ca.crt"))
    .with_required_client_cert(true);
}

Authentication

AuthConfig Options

| Field | Type | Default | Description |
|---|---|---|---|
| api_keys | Vec<ApiKey> | Empty | List of valid API keys |
| api_key_header | String | x-api-key | Header name for API key |
| allow_anonymous | bool | false | Allow unauthenticated access |

API Key Validation

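The server compares API keys in constant time to resist timing attacks. Each stored key is compared at every byte position up to the longer of the two lengths, and the loop visits every configured key even after one matches, so an early mismatch never makes the check return sooner and the timing does not reveal how many leading bytes of a guess were correct: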

// Internal validation logic (excerpt; `self.api_keys` holds the configured keys)
fn validate_key(&self, key: &str) -> Option<&str> {
    let key_bytes = key.as_bytes();
    let mut found_identity: Option<&str> = None;

    for api_key in &self.api_keys {
        let stored_bytes = api_key.key.as_bytes();
        let max_len = stored_bytes.len().max(key_bytes.len());

        // Compare every position up to the longer length; a missing byte reads as 0
        let mut matches: u8 = 1;
        for i in 0..max_len {
            let stored_byte = stored_bytes.get(i).copied().unwrap_or(0);
            let key_byte = key_bytes.get(i).copied().unwrap_or(0);
            matches &= u8::from(stored_byte == key_byte);
        }

        // Zero padding alone would equate "abc" with "abc\0", so lengths must match too
        let lengths_match = u8::from(stored_bytes.len() == key_bytes.len());
        matches &= lengths_match;

        // No early return: the remaining keys are still compared after a match
        if matches == 1 {
            found_identity = Some(api_key.identity.as_str());
        }
    }

    found_identity
}
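To experiment with the comparison outside the server, the same loop can be written as a free function. This is a minimal sketch: `ApiKeyEntry` and this free-standing `validate_key` are hypothetical stand-ins, not the crate's API.

```rust
/// Hypothetical stand-in for a configured API key.
pub struct ApiKeyEntry {
    pub key: String,
    pub identity: String,
}

/// Checks `candidate` against every stored key without returning early,
/// following the same structure as the server's loop.
pub fn validate_key<'a>(keys: &'a [ApiKeyEntry], candidate: &str) -> Option<&'a str> {
    let candidate = candidate.as_bytes();
    let mut found: Option<&'a str> = None;

    for entry in keys {
        let stored = entry.key.as_bytes();
        let max_len = stored.len().max(candidate.len());

        // Accumulate equality with bitwise AND so every byte position is visited
        let mut matches: u8 = 1;
        for i in 0..max_len {
            let a = stored.get(i).copied().unwrap_or(0);
            let b = candidate.get(i).copied().unwrap_or(0);
            matches &= u8::from(a == b);
        }
        // Zero padding alone would let "sk-a" match "sk-a\0"; the length check rules that out
        matches &= u8::from(stored.len() == candidate.len());

        if matches == 1 {
            found = Some(entry.identity.as_str());
        }
    }
    found
}
```

Source-level care does not guarantee branch-free machine code; the `subtle` crate's `ConstantTimeEq` trait is a common choice when stronger guarantees are needed.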

Authentication Flow

flowchart TD
    A[Request arrives] --> B{Auth configured?}
    B -->|No| C[Allow with no identity]
    B -->|Yes| D{API key header present?}
    D -->|No| E{Anonymous allowed?}
    E -->|Yes| C
    E -->|No| F[Return UNAUTHENTICATED]
    D -->|Yes| G{Key valid?}
    G -->|Yes| H[Allow with identity from key]
    G -->|No| F
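The decision flow above can be sketched as a single function. `AuthSettings` and `AuthOutcome` are hypothetical simplifications (keys stored as `(key, identity)` pairs), and the lookup uses plain equality for brevity, unlike the server's constant-time comparison. As in the diagram, an invalid key is rejected even when anonymous access is allowed.

```rust
/// Hypothetical, simplified view of the auth settings used by the decision.
pub struct AuthSettings {
    pub keys: Vec<(String, String)>, // (api key, identity)
    pub allow_anonymous: bool,
}

#[derive(Debug, PartialEq)]
pub enum AuthOutcome {
    /// Request proceeds; `None` means no identity is attached.
    Allow(Option<String>),
    /// Corresponds to the gRPC UNAUTHENTICATED status.
    Unauthenticated,
}

pub fn authorize(auth: Option<&AuthSettings>, header: Option<&str>) -> AuthOutcome {
    // No auth configured: allow with no identity
    let Some(auth) = auth else {
        return AuthOutcome::Allow(None);
    };
    match header {
        // Header missing: the anonymous setting decides
        None if auth.allow_anonymous => AuthOutcome::Allow(None),
        None => AuthOutcome::Unauthenticated,
        // Header present: the key must be valid, regardless of the anonymous setting
        Some(key) => auth
            .keys
            .iter()
            .find(|(k, _)| k == key)
            .map(|(_, id)| AuthOutcome::Allow(Some(id.clone())))
            .unwrap_or(AuthOutcome::Unauthenticated),
    }
}
```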

gRPC Services

QueryService

The QueryService provides query execution with three RPC methods:

| Method | Type | Description |
|---|---|---|
| Execute | Unary | Execute a single query and return the full result |
| ExecuteStream | Server streaming | Execute a query and stream results chunk by chunk |
| ExecuteBatch | Unary | Execute multiple queries and return all results |

Execute RPC

rpc Execute(QueryRequest) returns (QueryResponse);

message QueryRequest {
    string query = 1;
    optional string identity = 2;
}

message QueryResponse {
    oneof result {
        EmptyResult empty = 1;
        CountResult count = 2;
        RowsResult rows = 3;
        NodesResult nodes = 4;
        EdgesResult edges = 5;
        PathResult path = 6;
        SimilarResult similar = 7;
        TableListResult table_list = 8;
        BlobResult blob = 9;
        IdsResult ids = 10;
    }
    optional ErrorInfo error = 15;
}

ExecuteStream RPC

For large result sets (rows, nodes, edges, similar items, blobs), the streaming RPC sends results one item at a time:

rpc ExecuteStream(QueryRequest) returns (stream QueryResponseChunk);

message QueryResponseChunk {
    oneof chunk {
        RowChunk row = 1;
        NodeChunk node = 2;
        EdgeChunk edge = 3;
        SimilarChunk similar_item = 4;
        bytes blob_data = 5;
        ErrorInfo error = 15;
    }
    bool is_final = 16;
}

ExecuteBatch RPC

rpc ExecuteBatch(BatchQueryRequest) returns (BatchQueryResponse);

message BatchQueryRequest {
    repeated QueryRequest queries = 1;
}

message BatchQueryResponse {
    repeated QueryResponse results = 1;
}

Security Note: In batch execution, the authenticated request identity is always used. Per-query identity fields are ignored to prevent privilege escalation attacks.
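A sketch of the rule, assuming a hypothetical Rust mirror of QueryRequest: each batched query is paired with the identity taken from the authenticated request, and the per-query `identity` field is never read.

```rust
/// Hypothetical mirror of the QueryRequest message.
pub struct QueryRequest {
    pub query: String,
    pub identity: Option<String>,
}

/// Pairs each batched query with the identity it runs under.
/// `req.identity` is deliberately ignored to prevent privilege escalation.
pub fn effective_identities<'a>(
    authenticated: Option<&'a str>,
    batch: &'a [QueryRequest],
) -> Vec<(&'a str, Option<&'a str>)> {
    batch
        .iter()
        .map(|req| (req.query.as_str(), authenticated))
        .collect()
}
```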

BlobService

The BlobService provides artifact storage with streaming upload/download:

| Method | Type | Description |
|---|---|---|
| Upload | Client streaming | Upload artifact with metadata |
| Download | Server streaming | Download artifact in chunks |
| Delete | Unary | Delete artifact |
| GetMetadata | Unary | Get artifact metadata |

Upload Protocol

sequenceDiagram
    participant C as Client
    participant S as BlobService

    C->>S: UploadMetadata (filename, content_type, tags)
    C->>S: Chunk 1
    C->>S: Chunk 2
    C->>S: ...
    C->>S: Chunk N (end stream)
    S->>C: UploadResponse (artifact_id, size, checksum)

The first message must be metadata, followed by data chunks:

rpc Upload(stream BlobUploadRequest) returns (BlobUploadResponse);

message BlobUploadRequest {
    oneof request {
        UploadMetadata metadata = 1;
        bytes chunk = 2;
    }
}

message UploadMetadata {
    string filename = 1;
    optional string content_type = 2;
    repeated string tags = 3;
}

message BlobUploadResponse {
    string artifact_id = 1;
    uint64 size = 2;
    string checksum = 3;
}
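A sketch of the client side of this protocol, assuming a hypothetical `UploadMessage` enum that mirrors the `BlobUploadRequest` oneof (tags left empty for brevity). The chunk size is a parameter here; 64 * 1024 would match the server's default blob_chunk_size.

```rust
/// Hypothetical Rust mirror of the BlobUploadRequest oneof.
#[derive(Debug, PartialEq)]
pub enum UploadMessage {
    Metadata { filename: String, content_type: Option<String>, tags: Vec<String> },
    Chunk(Vec<u8>),
}

/// Builds the message sequence for one upload: metadata first, then the
/// payload split into pieces of at most `chunk_size` bytes.
pub fn upload_messages(
    filename: &str,
    content_type: Option<&str>,
    data: &[u8],
    chunk_size: usize,
) -> Vec<UploadMessage> {
    // slice::chunks panics on a zero size, so reject it explicitly
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut msgs = vec![UploadMessage::Metadata {
        filename: filename.to_string(),
        content_type: content_type.map(str::to_string),
        tags: Vec::new(),
    }];
    msgs.extend(data.chunks(chunk_size).map(|c| UploadMessage::Chunk(c.to_vec())));
    msgs
}
```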

Download Protocol

rpc Download(BlobDownloadRequest) returns (stream BlobDownloadChunk);

message BlobDownloadRequest {
    string artifact_id = 1;
}

message BlobDownloadChunk {
    bytes data = 1;
    bool is_final = 2;
}
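A sketch of how a blob could be split into `BlobDownloadChunk` messages and reassembled by the client. It assumes a hypothetical `DownloadChunk` struct, assumes the last data chunk carries `is_final = true`, and assumes an empty blob is sent as one empty final chunk; the server's actual framing may differ.

```rust
/// Hypothetical mirror of the BlobDownloadChunk message.
#[derive(Debug, PartialEq)]
pub struct DownloadChunk {
    pub data: Vec<u8>,
    pub is_final: bool,
}

/// Server side: split a stored blob into chunks of at most `chunk_size` bytes
/// and flag the last one.
pub fn download_chunks(blob: &[u8], chunk_size: usize) -> Vec<DownloadChunk> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    if blob.is_empty() {
        return vec![DownloadChunk { data: Vec::new(), is_final: true }];
    }
    // Number of chunks, rounding up
    let count = (blob.len() + chunk_size - 1) / chunk_size;
    blob.chunks(chunk_size)
        .enumerate()
        .map(|(i, c)| DownloadChunk { data: c.to_vec(), is_final: i + 1 == count })
        .collect()
}

/// Client side: concatenate chunk data until a chunk is marked final.
pub fn reassemble(chunks: &[DownloadChunk]) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in chunks {
        out.extend_from_slice(&chunk.data);
        if chunk.is_final {
            break;
        }
    }
    out
}
```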

HealthService

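The HealthService is modeled on the standard gRPC health checking protocol (grpc.health.v1), but it is exposed as neumann.v1.Health with its own ServingStatus enum: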

rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

message HealthCheckRequest {
    optional string service = 1;
}

message HealthCheckResponse {
    ServingStatus status = 1;
}

enum ServingStatus {
    UNSPECIFIED = 0;
    SERVING = 1;
    NOT_SERVING = 2;
}

Health Check Targets

| Service Name | Checks |
|---|---|
| Empty ("") or unset | Overall server health (all services) |
| neumann.v1.QueryService | Query service health |
| neumann.v1.BlobService | Blob service health |
| Unknown service | Returns UNSPECIFIED |
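The target table can be read as a lookup. In this sketch `HealthFlags` is a hypothetical stand-in for HealthState, and the overall status is assumed to be SERVING only when both services are healthy.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ServingStatus {
    Unspecified,
    Serving,
    NotServing,
}

/// Hypothetical stand-in for the shared HealthState.
pub struct HealthFlags {
    pub query_healthy: bool,
    pub blob_healthy: bool,
}

fn to_status(healthy: bool) -> ServingStatus {
    if healthy { ServingStatus::Serving } else { ServingStatus::NotServing }
}

/// Resolves the optional `service` field of a HealthCheckRequest.
pub fn check(flags: &HealthFlags, service: Option<&str>) -> ServingStatus {
    match service.unwrap_or("") {
        // Overall health (assumption: every service must be healthy)
        "" => to_status(flags.query_healthy && flags.blob_healthy),
        "neumann.v1.QueryService" => to_status(flags.query_healthy),
        "neumann.v1.BlobService" => to_status(flags.blob_healthy),
        _ => ServingStatus::Unspecified,
    }
}
```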

Automatic Health Tracking

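The QueryService counts consecutive failed queries and marks itself unhealthy once the count reaches FAILURE_THRESHOLD (default: 5 failures). A single successful query resets the counter and marks the service healthy again: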

use std::sync::atomic::Ordering;

// Excerpt from the query service implementation
const FAILURE_THRESHOLD: u32 = 5;

// Called after a failed query: fetch_add returns the previous value, so add 1
fn record_failure(&self) {
    let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
    if failures >= FAILURE_THRESHOLD {
        if let Some(ref health) = self.health_state {
            health.set_query_service_healthy(false);
        }
    }
}

// Called after a successful query: reset the streak and report healthy again
fn record_success(&self) {
    self.consecutive_failures.store(0, Ordering::SeqCst);
    if let Some(ref health) = self.health_state {
        health.set_query_service_healthy(true);
    }
}
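A self-contained version of the same counter logic that runs without the server. `Health` and `FailureTracker` are hypothetical names standing in for HealthState and the query service.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

/// Consecutive failures before the service is flagged unhealthy.
const FAILURE_THRESHOLD: u32 = 5;

/// Hypothetical stand-in for the query-service flag held in HealthState.
pub struct Health {
    query_healthy: AtomicBool,
}

impl Health {
    pub fn new() -> Self {
        Self { query_healthy: AtomicBool::new(true) }
    }
    pub fn is_query_healthy(&self) -> bool {
        self.query_healthy.load(Ordering::SeqCst)
    }
}

/// Hypothetical tracker using the same logic as record_failure/record_success.
pub struct FailureTracker {
    consecutive_failures: AtomicU32,
    health: Arc<Health>,
}

impl FailureTracker {
    pub fn new(health: Arc<Health>) -> Self {
        Self { consecutive_failures: AtomicU32::new(0), health }
    }

    pub fn record_failure(&self) {
        // fetch_add returns the previous value, so add 1 to get the new count
        let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
        if failures >= FAILURE_THRESHOLD {
            self.health.query_healthy.store(false, Ordering::SeqCst);
        }
    }

    pub fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::SeqCst);
        self.health.query_healthy.store(true, Ordering::SeqCst);
    }
}
```

Because a success resets the counter, only an unbroken run of failures trips the flag; an intermittent error rate never does.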

Server Lifecycle

Startup Sequence

flowchart TD
    A[Create NeumannServer] --> B[Validate configuration]
    B --> C{TLS configured?}
    C -->|Yes| D[Load certificates]
    C -->|No| E[Plain TCP]
    D --> F[Build TLS config]
    F --> G[Create services]
    E --> G
    G --> H{gRPC-web enabled?}
    H -->|Yes| I[Add gRPC-web layer]
    H -->|No| J[Standard gRPC]
    I --> K{Reflection enabled?}
    J --> K
    K -->|Yes| L[Add reflection service]
    K -->|No| M[Start serving]
    L --> M
    M --> N[Accept connections]

Basic Server Setup

use neumann_server::{NeumannServer, ServerConfig};
use query_router::QueryRouter;
use std::sync::Arc;
use parking_lot::RwLock;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create router
    let router = Arc::new(RwLock::new(QueryRouter::new()));

    // Create server with default config
    let server = NeumannServer::new(router, ServerConfig::default());

    // Start serving (blocks until shutdown)
    server.serve().await?;

    Ok(())
}

Server with Shared Storage

For applications that need both query and blob services sharing the same storage:

use neumann_server::{NeumannServer, ServerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ServerConfig::default();

    // Creates QueryRouter and BlobStore sharing the same TensorStore
    let server = NeumannServer::with_shared_storage(config).await?;

    server.serve().await?;

    Ok(())
}

Graceful Shutdown

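use neumann_server::{NeumannServer, ServerConfig};
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = NeumannServer::with_shared_storage(ServerConfig::default()).await?;

    // Resolves on Ctrl+C; an error installing the handler is ignored,
    // which also resolves the future and triggers shutdown
    let shutdown = async {
        let _ = signal::ctrl_c().await;
    };

    // Serve until the shutdown future completes
    server.serve_with_shutdown(shutdown).await?;

    Ok(())
}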

Error Handling

Server Errors

| Error | Cause | gRPC Status |
|---|---|---|
| Config | Invalid configuration | INVALID_ARGUMENT |
| Transport | Network/TLS failure | UNAVAILABLE |
| Query | Query execution failed | INVALID_ARGUMENT |
| Auth | Authentication failed | UNAUTHENTICATED |
| Blob | Blob operation failed | INTERNAL |
| Internal | Unexpected server error | INTERNAL |
| InvalidArgument | Bad request data | INVALID_ARGUMENT |
| NotFound | Resource not found | NOT_FOUND |
| PermissionDenied | Access denied | PERMISSION_DENIED |
| Io | I/O error | INTERNAL |

Error Conversion

use tonic::Status;

// Maps each ServerError variant to the gRPC status returned to clients
impl From<ServerError> for Status {
    fn from(err: ServerError) -> Self {
        match &err {
            ServerError::Config(msg) => Status::invalid_argument(msg),
            ServerError::Transport(e) => Status::unavailable(e.to_string()),
            ServerError::Query(msg) => Status::invalid_argument(msg),
            ServerError::Auth(msg) => Status::unauthenticated(msg),
            ServerError::Blob(msg) => Status::internal(msg),
            ServerError::Internal(msg) => Status::internal(msg),
            ServerError::InvalidArgument(msg) => Status::invalid_argument(msg),
            ServerError::NotFound(msg) => Status::not_found(msg),
            ServerError::PermissionDenied(msg) => Status::permission_denied(msg),
            ServerError::Io(e) => Status::internal(e.to_string()),
        }
    }
}

Backpressure and Flow Control

Streaming Backpressure

The server uses bounded channels for streaming responses to prevent memory exhaustion:

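use tokio::sync::mpsc;

// Bounded channel: holds at most `stream_channel_capacity` items (default 32)
let (tx, rx) = mpsc::channel(self.stream_channel_capacity);

tokio::spawn(async move {
    for item in results {
        // When the channel is full, `send` waits (suspending this task, not
        // blocking the thread) until the client consumes an item: this is the backpressure
        if tx.send(Ok(item)).await.is_err() {
            // Receiver dropped (client went away): stop sending
            return;
        }
    }
});
// `rx` feeds the gRPC response stream returned to the client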
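The same backpressure pattern can be demonstrated with only the standard library. This sketch swaps in `std::sync::mpsc::sync_channel`, whose `send` blocks the producer thread while the buffer is full (tokio's version suspends the task instead); `stream_bounded` is a hypothetical helper.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Streams `items` through a bounded channel with `capacity` slots and
/// returns what the consumer received, in order.
pub fn stream_bounded(items: Vec<u32>, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for item in items {
            // Blocks while `capacity` items are already buffered;
            // Err means the receiver was dropped, so stop producing
            if tx.send(item).is_err() {
                return;
            }
        }
        // `tx` is dropped when this closure returns, which ends `rx.iter()`
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The producer can never be more than `capacity` items ahead of the consumer, so memory use stays bounded no matter how slowly the consumer reads.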

Upload Size Limits

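The BlobService checks each incoming chunk against the upload limit and rejects the upload with RESOURCE_EXHAUSTED as soon as the accumulated size would exceed it: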

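// `data` holds the bytes received so far and `max_size` is the upload limit;
// saturating_add keeps the comparison correct even if the sum would overflow usize
if data.len().saturating_add(chunk.len()) > max_size {
    return Err(Status::resource_exhausted(format!(
        "upload exceeds maximum size of {max_size} bytes"
    )));
}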
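A runnable sketch of the same check applied across a whole upload. `collect_upload` and `UploadTooLarge` are hypothetical names; the error type stands in for the RESOURCE_EXHAUSTED status.

```rust
/// Hypothetical error standing in for `Status::resource_exhausted`.
#[derive(Debug, PartialEq)]
pub struct UploadTooLarge {
    pub max_size: usize,
}

/// Appends each chunk to a buffer, rejecting the upload as soon as the
/// running total would exceed `max_size`.
pub fn collect_upload<I>(chunks: I, max_size: usize) -> Result<Vec<u8>, UploadTooLarge>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut data: Vec<u8> = Vec::new();
    for chunk in chunks {
        // Check before appending, so the buffer never grows past the limit
        if data.len().saturating_add(chunk.len()) > max_size {
            return Err(UploadTooLarge { max_size });
        }
        data.extend_from_slice(&chunk);
    }
    Ok(data)
}
```

Checking before each append means an oversized upload is rejected without ever buffering more than `max_size` bytes.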

Production Deployment

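A hardened configuration enables TLS and API-key authentication, reads the key from the environment, and disables reflection:

use neumann_server::{ApiKey, AuthConfig, ServerConfig, TlsConfig};
use std::path::PathBuf;

fn production_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    let config = ServerConfig::new()
        .with_bind_addr("0.0.0.0:9443".parse()?)
        .with_tls(TlsConfig::new(
            PathBuf::from("/etc/neumann/tls/server.crt"),
            PathBuf::from("/etc/neumann/tls/server.key"),
        ))
        .with_auth(
            AuthConfig::new()
                .with_api_key(ApiKey::new(
                    // Keep the key out of source control
                    std::env::var("NEUMANN_API_KEY")?,
                    "service:default".to_string(),
                ))
                .with_anonymous(false),
        )
        .with_max_message_size(64 * 1024 * 1024) // 64 MB
        .with_max_upload_size(1024 * 1024 * 1024) // 1 GB
        .with_stream_channel_capacity(64)
        .with_grpc_web(true)
        .with_reflection(false); // Disable in production
    Ok(config)
}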

Health Check Integration

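Load balancers can poll the Health service to decide whether to route traffic to an instance. The grpcurl commands below use -plaintext, which only applies to a server running without TLS, and they rely on server reflection; when reflection is disabled, supply the service definition with grpcurl's -proto flag: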

# grpcurl health check
grpcurl -plaintext localhost:9200 neumann.v1.Health/Check

# With service name
grpcurl -plaintext -d '{"service":"neumann.v1.QueryService"}' \
    localhost:9200 neumann.v1.Health/Check

Logging

The server uses the tracing crate for structured logging:

use tracing_subscriber::FmtSubscriber;

fn init_logging() -> Result<(), Box<dyn std::error::Error>> {
    // Emit INFO and above
    let subscriber = FmtSubscriber::builder()
        .with_max_level(tracing::Level::INFO)
        .finish();
    tracing::subscriber::set_global_default(subscriber)?;
    Ok(())
}

// The server logs connection info and errors, for example:
// INFO: Starting Neumann gRPC server with TLS on 0.0.0.0:9443
// ERROR: Query execution error: table 'users' not found

Dependencies

| Crate | Purpose |
|---|---|
| query_router | Query execution backend |
| tensor_blob | Blob storage backend |
| tensor_store | Shared storage for both query and blob |
| tonic | gRPC server framework |
| tonic-web | gRPC-web layer for browser support |
| tonic-reflection | Service reflection for debugging |
| tokio | Async runtime |
| parking_lot | Thread-safe router access |
| tracing | Structured logging |
| thiserror | Error type derivation |

Related Modules

| Module | Relationship |
|---|---|
| neumann_client | Client SDK for connecting to this server |
| query_router | Query execution backend |
| tensor_blob | Blob storage backend |
| neumann_shell | Interactive CLI (alternative interface) |