Tensor Checkpoint

Tensor Checkpoint provides point-in-time snapshots of the database state for recovery. Users can create manual checkpoints before important operations, have checkpoints taken automatically before destructive operations, and roll back to any retained checkpoint. Checkpoints are stored as blob artifacts in tensor_blob, which provides content-addressable storage with automatic deduplication.

The module integrates with the query router to provide SQL-like commands (CHECKPOINT, CHECKPOINTS, ROLLBACK TO). It also supports interactive confirmation prompts for destructive operations and a configurable, count-based retention policy.

Module Structure

tensor_checkpoint/
  src/
    lib.rs          # CheckpointManager, CheckpointConfig
    state.rs        # CheckpointState, DestructiveOp, metadata types
    storage.rs      # Blob storage integration
    retention.rs    # Count-based purge logic
    preview.rs      # Destructive operation previews
    error.rs        # Error types

Key Types

Core Types

| Type | Description |
| --- | --- |
| CheckpointManager | Main API for checkpoint operations |
| CheckpointConfig | Configuration (retention, auto-checkpoint, interactive mode) |
| CheckpointState | Full checkpoint data with snapshot and metadata |
| CheckpointInfo | Lightweight checkpoint listing info |
| CheckpointTrigger | Context for auto-checkpoints (command, operation, preview) |

State Types

| Type | Description |
| --- | --- |
| DestructiveOp | Enum of destructive operations that trigger auto-checkpoints |
| OperationPreview | Summary and sample data for confirmation prompts |
| CheckpointMetadata | Statistics for validation (tables, nodes, embeddings) |
| RelationalMeta | Table and row counts |
| GraphMeta | Node and edge counts |
| VectorMeta | Embedding count |

Error Types

| Variant | Description | Common Cause |
| --- | --- | --- |
| NotFound | Checkpoint not found by ID or name | Typo in the name or ID, or the checkpoint was pruned by retention |
| Storage | Blob storage error | Disk full, permissions issue |
| Serialization | Bincode serialization error | Corrupt in-memory state |
| Deserialization | Bincode deserialization error | Corrupt checkpoint file |
| Blob | Underlying blob store error | BlobStore not initialized |
| Snapshot | TensorStore snapshot error | Store locked or corrupted |
| Cancelled | Operation cancelled by user | User rejected confirmation prompt |
| InvalidId | Invalid checkpoint identifier | Empty or malformed ID string |
| Retention | Retention enforcement error | Failed to delete old checkpoints |
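
As a rough illustration of how callers might branch on these variants, the sketch below defines a stand-in error enum using only the standard library. The variant names come from the table above. The `String` payloads, the message wording, and the `is_user_fixable` helper are assumptions made for the example, not the crate's actual definitions.

```rust
use std::fmt;

/// Illustrative stand-in for the crate's error type. Variant names follow the
/// table above; the payload types are an assumption.
#[derive(Debug, Clone, PartialEq)]
pub enum CheckpointError {
    NotFound(String),
    Storage(String),
    Serialization(String),
    Deserialization(String),
    Blob(String),
    Snapshot(String),
    Cancelled,
    InvalidId(String),
    Retention(String),
}

impl fmt::Display for CheckpointError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotFound(id) => write!(f, "checkpoint not found: {id}"),
            Self::Storage(msg) => write!(f, "storage error: {msg}"),
            Self::Serialization(msg) => write!(f, "serialization error: {msg}"),
            Self::Deserialization(msg) => write!(f, "deserialization error: {msg}"),
            Self::Blob(msg) => write!(f, "blob store error: {msg}"),
            Self::Snapshot(msg) => write!(f, "snapshot error: {msg}"),
            Self::Cancelled => write!(f, "operation cancelled by user"),
            Self::InvalidId(id) => write!(f, "invalid checkpoint id: {id}"),
            Self::Retention(msg) => write!(f, "retention error: {msg}"),
        }
    }
}

impl std::error::Error for CheckpointError {}

/// Hypothetical helper: true when the caller can fix the problem by changing
/// the input (a different name or ID, or confirming the prompt).
pub fn is_user_fixable(err: &CheckpointError) -> bool {
    matches!(
        err,
        CheckpointError::NotFound(_) | CheckpointError::InvalidId(_) | CheckpointError::Cancelled
    )
}
```

Separating input errors from storage and snapshot errors lets a shell re-prompt for the former and surface the latter as operational failures.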

Architecture

flowchart TB
    subgraph Commands
        CP[CHECKPOINT]
        CPS[CHECKPOINTS]
        RB[ROLLBACK TO]
    end

    subgraph CheckpointManager
        Create[create / create_auto]
        List[list]
        Rollback[rollback]
        Delete[delete]
        Confirm[request_confirmation]
        Preview[generate_preview]
    end

    subgraph Storage Layer
        CS[CheckpointStorage]
        RM[RetentionManager]
        PG[PreviewGenerator]
    end

    subgraph Dependencies
        Blob[tensor_blob::BlobStore]
        Store[tensor_store::TensorStore]
    end

    CP --> Create
    CPS --> List
    RB --> Rollback

    Create --> CS
    Create --> RM
    List --> CS
    Rollback --> CS
    Delete --> CS

    Confirm --> PG
    Preview --> PG

    CS --> Blob
    Create --> Store
    Rollback --> Store

Checkpoint Creation Flow

sequenceDiagram
    participant User
    participant Manager as CheckpointManager
    participant Store as TensorStore
    participant Storage as CheckpointStorage
    participant Retention as RetentionManager
    participant Blob as BlobStore

    User->>Manager: create(name, store)
    Manager->>Manager: Generate UUID
    Manager->>Manager: collect_metadata(store)
    Manager->>Store: snapshot_bytes()
    Store-->>Manager: Vec<u8>
    Manager->>Manager: Create CheckpointState
    Manager->>Storage: store(state, blob)
    Storage->>Storage: bitcode::encode(state)
    Storage->>Blob: put(filename, data, options)
    Blob-->>Storage: artifact_id
    Storage-->>Manager: artifact_id
    Manager->>Retention: enforce(blob)
    Retention->>Storage: list(blob)
    Storage-->>Retention: Vec<CheckpointInfo>
    Retention->>Retention: Sort by created_at DESC
    Retention->>Storage: delete(oldest beyond limit)
    Retention-->>Manager: deleted_count
    Manager-->>User: checkpoint_id

Rollback Flow

sequenceDiagram
    participant User
    participant Manager as CheckpointManager
    participant Storage as CheckpointStorage
    participant Blob as BlobStore
    participant Store as TensorStore

    User->>Manager: rollback(id_or_name, store)
    Manager->>Storage: load(id_or_name, blob)
    Storage->>Storage: find_by_id_or_name()
    Storage->>Storage: list() and match
    Storage->>Blob: get(artifact_id)
    Blob-->>Storage: checkpoint_bytes
    Storage->>Storage: bitcode::decode()
    Storage-->>Manager: CheckpointState
    Manager->>Store: restore_from_bytes(state.store_snapshot)
    Store->>Store: SlabRouter::from_bytes()
    Store->>Store: clear() current data
    Store->>Store: copy all entries from new router
    Store-->>Manager: Ok(())
    Manager-->>User: Success

Storage Format

Checkpoints are stored as blob artifacts using content-addressable storage:

| Property | Value |
| --- | --- |
| Tag | _system:checkpoint |
| Content-Type | application/x-neumann-checkpoint |
| Format | bincode-serialized CheckpointState |
| Filename | checkpoint_{id}.ncp |
| Creator | system:checkpoint |

Checkpoint State Structure

The CheckpointState is serialized using bincode for efficient binary encoding:

#[derive(Serialize, Deserialize)]
pub struct CheckpointState {
    pub id: String,           // UUID v4
    pub name: String,         // User-provided or auto-generated
    pub created_at: u64,      // Unix timestamp (seconds)
    pub trigger: Option<CheckpointTrigger>,  // For auto-checkpoints
    pub store_snapshot: Vec<u8>,  // Serialized SlabRouterSnapshot
    pub metadata: CheckpointMetadata,
}

Snapshot Serialization Format

The store_snapshot field contains a V3 format snapshot:

// V3 snapshot structure (bincode serialized)
pub struct V3Snapshot {
    pub header: SnapshotHeader,     // Magic bytes, version, entry count
    pub router: SlabRouterSnapshot, // All slab data
}

pub struct SlabRouterSnapshot {
    pub index: EntityIndexSnapshot,      // Key-to-entity mapping
    pub embeddings: EmbeddingSlabSnapshot,
    pub graph: GraphTensorSnapshot,
    pub relations: RelationalSlabSnapshot,
    pub metadata: MetadataSlabSnapshot,
    pub cache: CacheRingSnapshot<TensorData>,
    pub blobs: BlobLogSnapshot,
}

Custom metadata stored with each artifact:

| Key | Type | Description |
| --- | --- | --- |
| checkpoint_id | String | UUID identifier |
| checkpoint_name | String | User-provided or auto-generated name |
| created_at | String | Unix timestamp (parsed to u64) |
| trigger | String | Operation name (for auto-checkpoints only) |
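
The filename pattern and the string-valued metadata keys above can be sketched with standard-library types. The function names below (`checkpoint_filename`, `artifact_metadata`, `parse_created_at`) are hypothetical and chosen for illustration; only the key names and the `checkpoint_{id}.ncp` pattern come from the tables.

```rust
use std::collections::HashMap;

/// Builds the artifact filename from a checkpoint ID.
pub fn checkpoint_filename(id: &str) -> String {
    format!("checkpoint_{id}.ncp")
}

/// Builds the string-valued metadata map stored next to the artifact.
/// `trigger` is only present for auto-checkpoints.
pub fn artifact_metadata(
    id: &str,
    name: &str,
    created_at: u64,
    trigger: Option<&str>,
) -> HashMap<String, String> {
    let mut meta = HashMap::new();
    meta.insert("checkpoint_id".to_string(), id.to_string());
    meta.insert("checkpoint_name".to_string(), name.to_string());
    meta.insert("created_at".to_string(), created_at.to_string());
    if let Some(op) = trigger {
        meta.insert("trigger".to_string(), op.to_string());
    }
    meta
}

/// Parses `created_at` back into a `u64`; returns `None` when the key is
/// missing or the value is not a valid integer.
pub fn parse_created_at(meta: &HashMap<String, String>) -> Option<u64> {
    meta.get("created_at")?.parse().ok()
}
```

Keeping these fields in artifact metadata means a listing can be built without deserializing each full checkpoint.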

Metadata Collection Algorithm

When creating a checkpoint, metadata is collected by scanning the store:

fn collect_metadata(&self, store: &TensorStore) -> CheckpointMetadata {
    let store_key_count = store.len();

    // Count relational tables by scanning _schema: prefix
    let table_keys: Vec<_> = store.scan("_schema:");
    let table_count = table_keys.len();
    let mut total_rows = 0;
    for key in &table_keys {
        if let Some(table_name) = key.strip_prefix("_schema:") {
            total_rows += store.scan_count(&format!("{table_name}:"));
        }
    }

    // Count graph entities
    let node_count = store.scan_count("node:");
    let edge_count = store.scan_count("edge:");

    // Count embeddings
    let embedding_count = store.scan_count("_embed:");

    CheckpointMetadata::new(
        RelationalMeta::new(table_count, total_rows),
        GraphMeta::new(node_count, edge_count),
        VectorMeta::new(embedding_count),
        store_key_count,
    )
}

Configuration

CheckpointConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_checkpoints | usize | 10 | Maximum checkpoints before pruning |
| auto_checkpoint | bool | true | Enable auto-checkpoints before destructive ops |
| interactive_confirm | bool | true | Require confirmation for destructive ops |
| preview_sample_size | usize | 5 | Number of sample rows in previews |

Builder Pattern

let config = CheckpointConfig::default()
    .with_max_checkpoints(20)
    .with_auto_checkpoint(true)
    .with_interactive_confirm(false)
    .with_preview_sample_size(10);

Configuration Presets

| Preset | max_checkpoints | auto_checkpoint | interactive_confirm | Use Case |
| --- | --- | --- | --- | --- |
| Default | 10 | true | true | Interactive CLI usage |
| Automated | 20 | true | false | Batch processing scripts |
| Minimal | 3 | false | false | Memory-constrained environments |
| Safe | 50 | true | true | Production with high retention |
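
The presets in this table are combinations of builder calls rather than named constructors. The sketch below shows one way to express them, using a stand-in `CheckpointConfig` so that it compiles on its own. The free functions `automated`, `minimal`, and `safe` are hypothetical names introduced for the example.

```rust
/// Stand-in mirroring the documented fields and defaults of `CheckpointConfig`.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointConfig {
    pub max_checkpoints: usize,
    pub auto_checkpoint: bool,
    pub interactive_confirm: bool,
    pub preview_sample_size: usize,
}

impl Default for CheckpointConfig {
    fn default() -> Self {
        Self {
            max_checkpoints: 10,
            auto_checkpoint: true,
            interactive_confirm: true,
            preview_sample_size: 5,
        }
    }
}

impl CheckpointConfig {
    pub fn with_max_checkpoints(mut self, n: usize) -> Self {
        self.max_checkpoints = n;
        self
    }
    pub fn with_auto_checkpoint(mut self, enabled: bool) -> Self {
        self.auto_checkpoint = enabled;
        self
    }
    pub fn with_interactive_confirm(mut self, enabled: bool) -> Self {
        self.interactive_confirm = enabled;
        self
    }
}

/// "Automated" preset: keeps auto-checkpoints, never prompts.
pub fn automated() -> CheckpointConfig {
    CheckpointConfig::default()
        .with_max_checkpoints(20)
        .with_interactive_confirm(false)
}

/// "Minimal" preset: few checkpoints, no auto-checkpoints, no prompts.
pub fn minimal() -> CheckpointConfig {
    CheckpointConfig::default()
        .with_max_checkpoints(3)
        .with_auto_checkpoint(false)
        .with_interactive_confirm(false)
}

/// "Safe" preset: high retention with prompts left enabled.
pub fn safe() -> CheckpointConfig {
    CheckpointConfig::default().with_max_checkpoints(50)
}
```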

Destructive Operations

Operations that trigger auto-checkpoints when auto_checkpoint is enabled:

| Operation | Variant | Fields | Affected Count |
| --- | --- | --- | --- |
| DELETE | Delete | table, row_count | row_count |
| DROP TABLE | DropTable | table, row_count | row_count |
| DROP INDEX | DropIndex | table, column | 1 |
| NODE DELETE | NodeDelete | node_id, edge_count | 1 + edge_count |
| EMBED DELETE | EmbedDelete | key | 1 |
| VAULT DELETE | VaultDelete | key | 1 |
| BLOB DELETE | BlobDelete | artifact_id, size | 1 |
| CACHE CLEAR | CacheClear | entry_count | entry_count |

DestructiveOp Implementation

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DestructiveOp {
    Delete { table: String, row_count: usize },
    DropTable { table: String, row_count: usize },
    DropIndex { table: String, column: String },
    NodeDelete { node_id: u64, edge_count: usize },
    EmbedDelete { key: String },
    VaultDelete { key: String },
    BlobDelete { artifact_id: String, size: usize },
    CacheClear { entry_count: usize },
}

impl DestructiveOp {
    pub fn operation_name(&self) -> &'static str {
        match self {
            DestructiveOp::Delete { .. } => "DELETE",
            DestructiveOp::DropTable { .. } => "DROP TABLE",
            // ... etc
        }
    }

    pub fn affected_count(&self) -> usize {
        match self {
            DestructiveOp::Delete { row_count, .. } => *row_count,
            DestructiveOp::NodeDelete { edge_count, .. } => 1 + edge_count,
            DestructiveOp::DropIndex { .. } => 1,
            // ... etc
        }
    }
}
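
For reference, here is a self-contained sketch that spells out every match arm using the operation names and affected counts from the Destructive Operations table. It omits the serde derives so that it compiles with the standard library alone, and it is a reconstruction from the table rather than a copy of the crate's source.

```rust
#[derive(Debug, Clone)]
pub enum DestructiveOp {
    Delete { table: String, row_count: usize },
    DropTable { table: String, row_count: usize },
    DropIndex { table: String, column: String },
    NodeDelete { node_id: u64, edge_count: usize },
    EmbedDelete { key: String },
    VaultDelete { key: String },
    BlobDelete { artifact_id: String, size: usize },
    CacheClear { entry_count: usize },
}

impl DestructiveOp {
    /// Operation name as listed in the "Operation" column.
    pub fn operation_name(&self) -> &'static str {
        match self {
            Self::Delete { .. } => "DELETE",
            Self::DropTable { .. } => "DROP TABLE",
            Self::DropIndex { .. } => "DROP INDEX",
            Self::NodeDelete { .. } => "NODE DELETE",
            Self::EmbedDelete { .. } => "EMBED DELETE",
            Self::VaultDelete { .. } => "VAULT DELETE",
            Self::BlobDelete { .. } => "BLOB DELETE",
            Self::CacheClear { .. } => "CACHE CLEAR",
        }
    }

    /// Number of affected items as listed in the "Affected Count" column.
    pub fn affected_count(&self) -> usize {
        match self {
            Self::Delete { row_count, .. } | Self::DropTable { row_count, .. } => *row_count,
            // The node itself plus every edge attached to it.
            Self::NodeDelete { edge_count, .. } => 1 + *edge_count,
            Self::CacheClear { entry_count } => *entry_count,
            Self::DropIndex { .. }
            | Self::EmbedDelete { .. }
            | Self::VaultDelete { .. }
            | Self::BlobDelete { .. } => 1,
        }
    }
}
```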

SQL Commands

CHECKPOINT

-- Named checkpoint
CHECKPOINT 'before-migration'

-- Auto-generated name (checkpoint-{timestamp})
CHECKPOINT

CHECKPOINTS

-- List all checkpoints
CHECKPOINTS

-- List last N checkpoints
CHECKPOINTS LIMIT 10

Returns: ID, Name, Created, Type (manual/auto)

ROLLBACK TO

-- By name
ROLLBACK TO 'checkpoint-name'

-- By ID
ROLLBACK TO 'uuid-string'

API Reference

CheckpointManager

impl CheckpointManager {
    /// Create manager with blob storage and configuration
    pub async fn new(
        blob: Arc<Mutex<BlobStore>>,
        config: CheckpointConfig
    ) -> Self;

    /// Create a manual checkpoint
    pub async fn create(
        &self,
        name: Option<&str>,
        store: &TensorStore
    ) -> Result<String>;

    /// Create an auto-checkpoint before destructive operation
    pub async fn create_auto(
        &self,
        command: &str,
        op: DestructiveOp,
        preview: OperationPreview,
        store: &TensorStore
    ) -> Result<String>;

    /// Rollback to a checkpoint by ID or name
    pub async fn rollback(
        &self,
        id_or_name: &str,
        store: &TensorStore
    ) -> Result<()>;

    /// List checkpoints, most recent first
    pub async fn list(
        &self,
        limit: Option<usize>
    ) -> Result<Vec<CheckpointInfo>>;

    /// Delete a checkpoint by ID or name
    pub async fn delete(&self, id_or_name: &str) -> Result<()>;

    /// Generate preview for a destructive operation
    pub fn generate_preview(
        &self,
        op: &DestructiveOp,
        sample_data: Vec<String>
    ) -> OperationPreview;

    /// Request user confirmation for an operation
    pub fn request_confirmation(
        &self,
        op: &DestructiveOp,
        preview: &OperationPreview
    ) -> bool;

    /// Set custom confirmation handler
    pub fn set_confirmation_handler(
        &mut self,
        handler: Arc<dyn ConfirmationHandler>
    );

    /// Check if auto-checkpoint is enabled
    pub fn auto_checkpoint_enabled(&self) -> bool;

    /// Check if interactive confirmation is enabled
    pub fn interactive_confirm_enabled(&self) -> bool;

    /// Access the current configuration
    pub fn config(&self) -> &CheckpointConfig;
}

ConfirmationHandler

pub trait ConfirmationHandler: Send + Sync {
    fn confirm(&self, op: &DestructiveOp, preview: &OperationPreview) -> bool;
}

Built-in implementations:

| Type | Behavior | Use Case |
| --- | --- | --- |
| AutoConfirm | Always returns true | Automated scripts, testing |
| AutoReject | Always returns false | Testing cancellation paths |
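
The two built-in handlers are simple enough to sketch in full. The example below uses simplified stand-ins for `DestructiveOp` and `OperationPreview` so that it compiles without the crate, and `run_if_confirmed` is a hypothetical caller added to show how a handler gates an operation.

```rust
/// Simplified stand-ins for the crate's types, just enough to compile alone.
pub struct DestructiveOp {
    pub name: &'static str,
    pub affected: usize,
}

pub struct OperationPreview {
    pub summary: String,
}

pub trait ConfirmationHandler: Send + Sync {
    fn confirm(&self, op: &DestructiveOp, preview: &OperationPreview) -> bool;
}

/// Accepts every operation.
pub struct AutoConfirm;

/// Rejects every operation.
pub struct AutoReject;

impl ConfirmationHandler for AutoConfirm {
    fn confirm(&self, _op: &DestructiveOp, _preview: &OperationPreview) -> bool {
        true
    }
}

impl ConfirmationHandler for AutoReject {
    fn confirm(&self, _op: &DestructiveOp, _preview: &OperationPreview) -> bool {
        false
    }
}

/// Hypothetical caller: runs `action` only when the handler confirms.
pub fn run_if_confirmed(
    handler: &dyn ConfirmationHandler,
    op: &DestructiveOp,
    preview: &OperationPreview,
    action: impl FnOnce(),
) -> Result<(), String> {
    if handler.confirm(op, preview) {
        action();
        Ok(())
    } else {
        Err(format!("{} cancelled by user", op.name))
    }
}
```

Because the trait requires `Send + Sync`, a handler can be shared behind an `Arc` across threads, which matches the `Arc<dyn ConfirmationHandler>` parameter of `set_confirmation_handler`.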

CheckpointStorage

Internal storage layer for checkpoint persistence:

impl CheckpointStorage {
    /// Store a checkpoint state to blob storage
    pub async fn store(state: &CheckpointState, blob: &BlobStore) -> Result<String>;

    /// Load a checkpoint by ID or name
    pub async fn load(checkpoint_id: &str, blob: &BlobStore) -> Result<CheckpointState>;

    /// List all checkpoints (sorted by created_at descending)
    pub async fn list(blob: &BlobStore) -> Result<Vec<CheckpointInfo>>;

    /// Delete a checkpoint by artifact ID
    pub async fn delete(artifact_id: &str, blob: &BlobStore) -> Result<()>;
}

PreviewGenerator

Generates human-readable previews for destructive operations:

impl PreviewGenerator {
    pub fn new(sample_size: usize) -> Self;

    pub fn generate(&self, op: &DestructiveOp, sample_data: Vec<String>) -> OperationPreview;
}

// Utility functions
pub fn format_warning(op: &DestructiveOp) -> String;
pub fn format_confirmation_prompt(op: &DestructiveOp, preview: &OperationPreview) -> String;

Usage Examples

Basic Usage

use std::sync::Arc;

use tensor_blob::{BlobConfig, BlobStore};
use tensor_checkpoint::{CheckpointConfig, CheckpointManager};
use tensor_store::TensorStore;
// Assumption: the async Mutex is tokio's, since the manager calls `lock().await`.
use tokio::sync::Mutex;

// Initialize (run inside an async function that returns a Result)
let store = TensorStore::new();
let blob = BlobStore::new(store.clone(), BlobConfig::default()).await?;
let blob = Arc::new(Mutex::new(blob));

let config = CheckpointConfig::default();
let manager = CheckpointManager::new(blob, config).await;

// Create checkpoint; the returned ID can also be passed to rollback()
let id = manager.create(Some("before-migration"), &store).await?;

// ... make changes ...

// Rollback if needed
manager.rollback("before-migration", &store).await?;

With Query Router

use query_router::QueryRouter;

let mut router = QueryRouter::new();
router.init_blob()?;
router.init_checkpoint()?;

// Execute checkpoint commands via SQL
router.execute_parsed("CHECKPOINT 'backup'")?;
router.execute_parsed("CHECKPOINTS")?;
router.execute_parsed("ROLLBACK TO 'backup'")?;

Custom Confirmation Handler

use std::io::{self, Write};
use std::sync::Arc;

use tensor_checkpoint::{ConfirmationHandler, DestructiveOp, OperationPreview};

struct InteractiveHandler;

impl ConfirmationHandler for InteractiveHandler {
    fn confirm(&self, op: &DestructiveOp, preview: &OperationPreview) -> bool {
        println!("{}", tensor_checkpoint::format_confirmation_prompt(op, preview));

        // Treat any I/O failure as a rejection instead of panicking.
        if io::stdout().flush().is_err() {
            return false;
        }
        let mut input = String::new();
        if io::stdin().read_line(&mut input).is_err() {
            return false;
        }

        // Only an explicit "yes" (case-insensitive) proceeds.
        input.trim().eq_ignore_ascii_case("yes")
    }
}

// Usage (`manager` must be declared `mut`)
manager.set_confirmation_handler(Arc::new(InteractiveHandler));

Auto-Checkpoint with Rejection

use std::sync::Arc;

use tensor_checkpoint::{AutoReject, CheckpointConfig, CheckpointManager};

// Create config with auto-checkpoint enabled
let config = CheckpointConfig::default()
    .with_auto_checkpoint(true)
    .with_interactive_confirm(true);

// `blob` is set up as in Basic Usage; `router` is assumed to use this manager
let mut manager = CheckpointManager::new(blob, config).await;
manager.set_confirmation_handler(Arc::new(AutoReject));

// AutoReject declines the prompt: the DELETE is cancelled and no checkpoint is created
let result = router.execute("DELETE FROM users WHERE age > 50");
assert!(result.is_err());  // Operation cancelled by user

Retention Management

Checkpoints are automatically pruned when max_checkpoints is exceeded:

Retention Algorithm

pub async fn enforce(&self, blob: &BlobStore) -> Result<usize> {
    let checkpoints = CheckpointStorage::list(blob).await?;

    if checkpoints.len() <= self.max_checkpoints {
        return Ok(0);
    }

    let to_remove = checkpoints.len() - self.max_checkpoints;
    let mut removed = 0;

    // Checkpoints are sorted by created_at descending, oldest are at end
    for checkpoint in checkpoints.iter().rev().take(to_remove) {
        if CheckpointStorage::delete(&checkpoint.artifact_id, blob)
            .await
            .is_ok()
        {
            removed += 1;
        }
    }

    Ok(removed)
}

Retention Timing

Retention is enforced after every checkpoint creation:

  1. Create new checkpoint
  2. Store in blob storage
  3. Call retention.enforce()
  4. Return checkpoint ID

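Because pruning runs only after the new checkpoint has been stored, the count can briefly reach max_checkpoints + 1. It returns to max_checkpoints once pruning succeeds; a failed deletion is tolerated, so the count can stay above the limit until a later pruning pass removes the surplus.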

Retention Edge Cases

| Scenario | Behavior |
| --- | --- |
| Creation fails | Retention not called, count unchanged |
| Retention delete fails | Not fatal; the failed deletion is skipped and pruning continues |
| max_checkpoints = 0 | All checkpoints deleted after creation |
| max_checkpoints = 1 | Only newest checkpoint retained |
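
The count-based policy and its edge cases can be exercised without blob storage. The sketch below applies the same rule (sort newest first, drop everything beyond the limit, oldest first) to an in-memory list. `CheckpointInfo` is reduced to two fields here, and the synchronous `enforce` function is an illustration of the policy rather than the crate's implementation.

```rust
/// Reduced stand-in for the listing type: just an ID and a creation time.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointInfo {
    pub id: String,
    pub created_at: u64,
}

/// Keeps the `max_checkpoints` newest entries in `checkpoints` and returns the
/// IDs of the pruned entries, oldest first.
pub fn enforce(checkpoints: &mut Vec<CheckpointInfo>, max_checkpoints: usize) -> Vec<String> {
    // Newest first, matching the documented listing order.
    checkpoints.sort_by(|a, b| b.created_at.cmp(&a.created_at));

    if checkpoints.len() <= max_checkpoints {
        return Vec::new();
    }

    // Everything from index `max_checkpoints` onward is beyond the limit.
    checkpoints
        .split_off(max_checkpoints)
        .into_iter()
        .rev()
        .map(|cp| cp.id)
        .collect()
}
```

With a limit of 0 the list is emptied on every call, which corresponds to the "all checkpoints deleted after creation" row above.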

Interactive Confirmation

When interactive_confirm is enabled, destructive operations display a preview:

WARNING: About to delete 5 row(s) from table 'users'
Will delete 5 row(s) from table 'users'

Affected data sample:
  1. id=1, name='Alice'
  2. id=2, name='Bob'
  ... and 3 more

Type 'yes' to proceed, anything else to cancel:
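
A prompt with this layout could be assembled roughly as follows. This is a sketch under assumptions: `render_prompt` and `is_confirmed` are hypothetical names, the crate's own `format_confirmation_prompt` may differ in detail, and the "and N more" count is taken here as the affected count minus the rows shown.

```rust
/// Renders a confirmation prompt in the layout shown above.
pub fn render_prompt(
    warning: &str,
    summary: &str,
    samples: &[String],
    sample_size: usize,
    affected: usize,
) -> String {
    let mut out = format!("WARNING: {warning}\n{summary}\n");

    // Show at most `sample_size` sample rows.
    let shown = samples.len().min(sample_size);
    if shown > 0 {
        out.push_str("\nAffected data sample:\n");
        for (i, row) in samples.iter().take(shown).enumerate() {
            out.push_str(&format!("  {}. {row}\n", i + 1));
        }
        // Summarize the rows that were not listed.
        let remaining = affected.saturating_sub(shown);
        if remaining > 0 {
            out.push_str(&format!("  ... and {remaining} more\n"));
        }
    }

    out.push_str("\nType 'yes' to proceed, anything else to cancel:");
    out
}

/// Only an explicit "yes" (ignoring case and surrounding whitespace) confirms.
pub fn is_confirmed(input: &str) -> bool {
    input.trim().eq_ignore_ascii_case("yes")
}
```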

Preview Generation

The preview generator formats human-readable summaries:

fn format_summary(&self, op: &DestructiveOp) -> String {
    match op {
        DestructiveOp::Delete { table, row_count } => {
            format!("Will delete {row_count} row(s) from table '{table}'")
        },
        DestructiveOp::DropTable { table, row_count } => {
            format!("Will drop table '{table}' containing {row_count} row(s)")
        },
        DestructiveOp::BlobDelete { artifact_id, size } => {
            let size_str = format_bytes(*size);
            format!("Will delete blob artifact '{artifact_id}' ({size_str})")
        },
        // ... etc
    }
}

Size Formatting

Blob sizes are formatted for readability:

| Bytes | Display |
| --- | --- |
| < 1024 | "N bytes" |
| >= 1 KB | "N.NN KB" |
| >= 1 MB | "N.NN MB" |
| >= 1 GB | "N.NN GB" |
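
A minimal sketch of a formatter that follows this table is shown below. It assumes binary units (1 KB = 1024 bytes), which is what the first row suggests; the crate's own `format_bytes` may round or label differently.

```rust
/// Formats a byte count using the thresholds from the table above.
/// Assumes binary units: 1 KB = 1024 bytes.
pub fn format_bytes(size: usize) -> String {
    const KB: f64 = 1024.0;
    const MB: f64 = KB * 1024.0;
    const GB: f64 = MB * 1024.0;

    let bytes = size as f64;
    if bytes >= GB {
        format!("{:.2} GB", bytes / GB)
    } else if bytes >= MB {
        format!("{:.2} MB", bytes / MB)
    } else if bytes >= KB {
        format!("{:.2} KB", bytes / KB)
    } else {
        format!("{size} bytes")
    }
}
```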

Rollback Algorithm

The rollback process completely replaces the store contents:

Algorithm Steps

  1. Locate Checkpoint: Scan the checkpoint list (most recent first) for the first entry whose ID or name matches exactly
  2. Load State: Deserialize CheckpointState from blob storage
  3. Deserialize Snapshot: Convert store_snapshot bytes to SlabRouter
  4. Clear Current Data: Remove all entries from current store
  5. Copy Restored Data: Iterate and copy all entries from restored router

Rollback Implementation

pub async fn rollback(&self, id_or_name: &str, store: &TensorStore) -> Result<()> {
    let blob = self.blob.lock().await;
    let state = CheckpointStorage::load(id_or_name, &blob).await?;

    store
        .restore_from_bytes(&state.store_snapshot)
        .map_err(|e| CheckpointError::Snapshot(e.to_string()))?;

    Ok(())
}

// In TensorStore
pub fn restore_from_bytes(&self, bytes: &[u8]) -> SnapshotResult<()> {
    let new_router = SlabRouter::from_bytes(bytes)?;

    // Clear current and copy data from new router
    self.router.clear();
    for key in new_router.scan("") {
        if let Ok(value) = new_router.get(&key) {
            let _ = self.router.put(&key, value);
        }
    }

    Ok(())
}

Rollback Characteristics

| Aspect | Behavior |
| --- | --- |
| Atomicity | Not atomic - partial restore possible on failure |
| Isolation | No locking - concurrent operations may see partial state |
| Duration | O(n) where n = number of entries |
| Memory | Requires 2x memory during restore (old + new) |
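
To make these characteristics concrete, the sketch below mimics the clear-then-copy restore on a tiny in-memory store. `MiniStore` is a hypothetical stand-in built on `RwLock<HashMap>`, not the real `TensorStore`. As in the documented behavior, the lock is taken per call rather than for the whole restore, so a concurrent reader could observe the store between `clear` and the last `put`.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical in-memory stand-in for a key-value store.
pub struct MiniStore {
    entries: RwLock<HashMap<String, Vec<u8>>>,
}

impl MiniStore {
    pub fn new() -> Self {
        Self { entries: RwLock::new(HashMap::new()) }
    }

    pub fn put(&self, key: &str, value: Vec<u8>) {
        self.entries.write().unwrap().insert(key.to_string(), value);
    }

    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.entries.read().unwrap().get(key).cloned()
    }

    pub fn len(&self) -> usize {
        self.entries.read().unwrap().len()
    }

    pub fn clear(&self) {
        self.entries.write().unwrap().clear();
    }

    /// Full copy of the current contents (the "checkpoint").
    pub fn snapshot(&self) -> HashMap<String, Vec<u8>> {
        self.entries.read().unwrap().clone()
    }

    /// Clear-then-copy restore: O(n) in the number of entries, and the
    /// snapshot plus the store contents are both in memory while it runs.
    /// Each call locks separately, so the restore as a whole is not isolated.
    pub fn restore_from(&self, snapshot: &HashMap<String, Vec<u8>>) {
        self.clear();
        for (key, value) in snapshot {
            self.put(key, value.clone());
        }
    }
}
```

If the copy loop stopped partway, the store would hold only part of the snapshot, which is the partial-restore case listed under Atomicity.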

Edge Cases and Gotchas

Name vs ID Lookup

Checkpoints can be referenced by either name or ID:

async fn find_by_id_or_name(id_or_name: &str, blob: &BlobStore) -> Result<String> {
    let checkpoints = Self::list(blob).await?;

    for cp in checkpoints {
        // Exact match on ID or name
        if cp.id == id_or_name || cp.name == id_or_name {
            return Ok(cp.artifact_id);
        }
    }

    Err(CheckpointError::NotFound(id_or_name.to_string()))
}

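Gotcha: The lookup makes a single pass over the list and returns the first checkpoint whose ID or name equals the argument. A checkpoint whose name is identical to another checkpoint's ID can therefore shadow it, and duplicate names resolve to the most recent checkpoint.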
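
The shadowing case is easy to reproduce with an in-memory list. The sketch below uses a reduced `CheckpointInfo` and a synchronous lookup with the same matching rule as the function above; `match_count` is a hypothetical helper for detecting ambiguous identifiers before acting on them.

```rust
/// Reduced stand-in for the listing type.
#[derive(Debug, Clone)]
pub struct CheckpointInfo {
    pub id: String,
    pub name: String,
    pub artifact_id: String,
}

/// Returns the artifact ID of the first entry whose ID or name matches exactly.
pub fn find_by_id_or_name<'a>(
    checkpoints: &'a [CheckpointInfo],
    id_or_name: &str,
) -> Option<&'a str> {
    checkpoints
        .iter()
        .find(|cp| cp.id == id_or_name || cp.name == id_or_name)
        .map(|cp| cp.artifact_id.as_str())
}

/// Hypothetical helper: how many entries match the identifier.
/// A result greater than 1 means the lookup is ambiguous.
pub fn match_count(checkpoints: &[CheckpointInfo], id_or_name: &str) -> usize {
    checkpoints
        .iter()
        .filter(|cp| cp.id == id_or_name || cp.name == id_or_name)
        .count()
}
```

For example, if a newer checkpoint is named with the ID of an older one, looking up that ID returns the newer checkpoint's artifact, because it appears first in the most-recent-first listing.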

Auto-Generated Names

When no name is provided:

let name = name.map(String::from).unwrap_or_else(|| {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format!("checkpoint-{now}")
});

Auto-checkpoint names follow the pattern: auto-before-{operation-name}

Timestamp Edge Cases

| Scenario | Behavior |
| --- | --- |
| System time before epoch | Timestamp becomes 0 |
| Rapid checkpoint creation | May have same second timestamp |
| Clock drift | Checkpoints may be out of order |
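
The first two rows follow directly from the name-generation snippet above: a failed `duration_since` falls back to 0, and second-level resolution means two checkpoints created within the same second get the same default name. The sketch below isolates that logic in two hypothetical helper functions so that it can be tested with fixed times.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Seconds since the Unix epoch, or 0 when `t` lies before the epoch.
pub fn unix_secs(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

/// Default checkpoint name for a given creation time.
pub fn default_name(t: SystemTime) -> String {
    format!("checkpoint-{}", unix_secs(t))
}

/// True when two creation times would produce the same default name.
pub fn names_collide(a: SystemTime, b: SystemTime) -> bool {
    default_name(a) == default_name(b)
}

/// Convenience constructor for tests and examples.
pub fn at_secs(secs: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(secs)
}
```

Passing an explicit name to `create` avoids the collision when several checkpoints are taken in quick succession.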

Blob Store Dependency

pub fn init_checkpoint(&mut self) -> Result<()> {
    self.init_checkpoint_with_config(CheckpointConfig::default())
}

pub fn init_checkpoint_with_config(&mut self, config: CheckpointConfig) -> Result<()> {
    let blob = self
        .blob
        .as_ref()
        .ok_or_else(|| {
            RouterError::CheckpointError(
                "Blob store must be initialized first".to_string()
            )
        })?;
    // ...
}

Gotcha: Always call init_blob() before init_checkpoint(); otherwise init_checkpoint() returns a RouterError::CheckpointError.

Performance Tips

Checkpoint Creation Performance

| Factor | Impact | Recommendation |
| --- | --- | --- |
| Store size | O(n) serialization | Keep hot data separate |
| Retention limit | More deletions on creation | Set appropriate max_checkpoints |
| Blob storage | Network latency for remote | Use local storage for fast checkpoints |

Memory Considerations

  • Full snapshot is held in memory during creation
  • Rollback requires 2x memory temporarily
  • Large embeddings significantly increase checkpoint size

Optimization Strategies

  1. Incremental Checkpoints (not yet supported)
    • Currently full snapshots only
    • Future: delta-based checkpoints
  2. Selective Checkpointing
    • Use separate stores for hot vs. cold data
    • Only checkpoint critical data
  3. Compression
    • TensorStore supports compressed snapshots for file I/O
    • Checkpoint uses bincode (no compression)

Benchmarks

| Store Size | Checkpoint Time | Rollback Time | Memory |
| --- | --- | --- | --- |
| 1K entries | ~5ms | ~3ms | ~100KB |
| 10K entries | ~50ms | ~30ms | ~1MB |
| 100K entries | ~500ms | ~300ms | ~10MB |
| 1M entries | ~5s | ~3s | ~100MB |

Related Modules

| Module | Relationship |
| --- | --- |
| tensor_blob | Storage backend for checkpoint data |
| tensor_store | Source of snapshots and restore target |
| query_router | SQL command integration |
| neumann_shell | Interactive confirmation handling |

Limitations

  • Full snapshots only (no incremental checkpoints)
  • Single-node operation (no distributed checkpoints)
  • In-memory restore (entire snapshot loaded)
  • No automatic scheduling (manual or trigger-based only)
  • Not atomic (partial restore possible on failure)
  • No encryption (checkpoints stored in plaintext)
  • Bloom filter state not preserved (rebuilt on load if needed)