Relational Engine

The Relational Engine (Module 2) provides SQL-like table operations on top of the Tensor Store. It implements schema enforcement, composable condition predicates, SIMD-accelerated columnar filtering, and both hash and B-tree indexes for query acceleration.

Tables, rows, and indexes are stored as tensor data in the underlying Tensor Store, inheriting its thread safety from DashMap. The engine supports all standard CRUD operations, six SQL join types, aggregate functions, and batch operations for bulk inserts.

Architecture

flowchart TD
    subgraph RelationalEngine
        API[Public API]
        Schema[Schema Validation]
        Cond[Condition Evaluation]
        Hash[Hash Index]
        BTree[B-Tree Index]
        Columnar[Columnar SIMD]
    end

    API --> Schema
    API --> Cond
    Cond --> Hash
    Cond --> BTree
    Cond --> Columnar

    subgraph TensorStore
        Store[(DashMap Storage)]
        Meta[Table Metadata]
        Rows[Row Data]
        Idx[Index Entries]
    end

    Schema --> Meta
    API --> Rows
    Hash --> Idx
    BTree --> Idx

Query Execution Flow

flowchart TD
    Query[SELECT Query] --> ParseCond[Parse Condition]
    ParseCond --> CheckIdx{Has Index?}

    CheckIdx -->|Hash Index + Eq| HashLookup["O(1) Hash Lookup"]
    CheckIdx -->|BTree + Range| BTreeRange["O(log n) Range Scan"]
    CheckIdx -->|No Index| FullScan[Full Table Scan]

    HashLookup --> FilterRows[Apply Remaining Conditions]
    BTreeRange --> FilterRows
    FullScan --> SIMDFilter{Columnar Data?}

    SIMDFilter -->|Yes| VectorFilter[SIMD Vectorized Filter]
    SIMDFilter -->|No| RowFilter[Row-by-Row Filter]

    VectorFilter --> Results[Build Result Set]
    RowFilter --> Results
    FilterRows --> Results

Key Types

| Type | Description |
|------|-------------|
| `RelationalEngine` | Main engine struct with TensorStore backend |
| `RelationalConfig` | Configuration for limits, timeouts, thresholds |
| `Schema` | Table schema with column definitions and constraints |
| `Column` | Column name, type, and nullability |
| `ColumnType` | `Int`, `Float`, `String`, `Bool`, `Bytes`, `Json` |
| `Value` | Typed value: `Null`, `Int(i64)`, `Float(f64)`, `String(String)`, `Bool(bool)`, `Bytes(Vec<u8>)`, `Json(Value)` |
| `Row` | Row with ID and ordered column values |
| `Condition` | Composable filter predicate tree |
| `Constraint` | Table constraint: `PrimaryKey`, `Unique`, `ForeignKey`, `NotNull` |
| `ForeignKeyConstraint` | Foreign key definition with referential actions |
| `ReferentialAction` | `Restrict`, `Cascade`, `SetNull`, `SetDefault`, `NoAction` |
| `RelationalError` | Error variants for table/column/index/constraint operations |
| `ColumnData` | Columnar storage for a single column with null bitmap |
| `SelectionVector` | Bitmap-based row selection for SIMD operations |
| `OrderedKey` | B-tree index key with total ordering semantics |
| `StreamingCursor` | Iterator for batch-based query result streaming |
| `CursorBuilder` | Builder for customizing streaming cursor options |
| `QueryMetrics` | Query execution metrics for observability |
| `IndexTracker` | Tracks index hits/misses to detect missing indexes |

Column Types

| Type | Rust Type | Storage Format | Description |
|------|-----------|----------------|-------------|
| `Int` | `i64` | 8-byte little-endian | 64-bit signed integer |
| `Float` | `f64` | 8-byte IEEE 754 | 64-bit floating point |
| `String` | `String` | Dictionary-encoded | UTF-8 string with deduplication |
| `Bool` | `bool` | Packed bitmap (64 values per `u64`) | Boolean |
| `Bytes` | `Vec<u8>` | Raw bytes | Binary data |
| `Json` | `serde_json::Value` | JSON string | JSON value |
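
The packed `Bool` format above can be sketched in plain Rust; `pack_bools` is an illustrative helper, not part of the engine's API:

```rust
/// Pack a slice of bools into u64 words, 64 values per word,
/// matching the "packed bitmap" storage format described above.
fn pack_bools(values: &[bool]) -> Vec<u64> {
    let mut words = vec![0u64; (values.len() + 63) / 64];
    for (i, &v) in values.iter().enumerate() {
        if v {
            words[i / 64] |= 1u64 << (i % 64);
        }
    }
    words
}

fn main() {
    // Bits 0, 2, and 3 set -> 0b1101
    assert_eq!(pack_bools(&[true, false, true, true]), vec![0b1101]);
    // 65 values need two u64 words
    assert_eq!(pack_bools(&[false; 65]).len(), 2);
}
```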

Conditions

| Condition | Description | Index Support |
|-----------|-------------|---------------|
| `Condition::True` | Matches all rows | N/A |
| `Condition::Eq(col, val)` | Column equals value | Hash Index |
| `Condition::Ne(col, val)` | Column does not equal value | None |
| `Condition::Lt(col, val)` | Column less than value | B-Tree Index |
| `Condition::Le(col, val)` | Column less than or equal | B-Tree Index |
| `Condition::Gt(col, val)` | Column greater than value | B-Tree Index |
| `Condition::Ge(col, val)` | Column greater than or equal | B-Tree Index |
| `Condition::And(a, b)` | Logical AND of two conditions | Partial (first indexable) |
| `Condition::Or(a, b)` | Logical OR of two conditions | None |

Conditions can be combined using .and() and .or() methods:

#![allow(unused)]
fn main() {
// age >= 18 AND age < 65
let condition = Condition::Ge("age".into(), Value::Int(18))
    .and(Condition::Lt("age".into(), Value::Int(65)));

// status = 'active' OR priority > 5
let condition = Condition::Eq("status".into(), Value::String("active".into()))
    .or(Condition::Gt("priority".into(), Value::Int(5)));
}

The special column _id filters by row ID and can be indexed.

Error Types

| Error | Cause |
|-------|-------|
| `TableNotFound` | Table does not exist |
| `TableAlreadyExists` | Creating duplicate table |
| `ColumnNotFound` | Update references unknown column |
| `ColumnAlreadyExists` | Column already exists in table |
| `TypeMismatch` | Value type does not match column type |
| `NullNotAllowed` | NULL in non-nullable column |
| `IndexAlreadyExists` | Creating duplicate index |
| `IndexNotFound` | Dropping non-existent index |
| `IndexCorrupted` | Index data is corrupted |
| `StorageError` | Underlying Tensor Store error |
| `InvalidName` | Invalid table or column name |
| `SchemaCorrupted` | Schema metadata is corrupted |
| `TransactionNotFound` | Transaction ID not found |
| `TransactionInactive` | Transaction already committed/aborted |
| `LockConflict` | Lock conflict with another transaction |
| `LockTimeout` | Lock acquisition timed out |
| `RollbackFailed` | Rollback operation failed |
| `ResultTooLarge` | Result set exceeds maximum size |
| `TooManyTables` | Maximum table count exceeded |
| `TooManyIndexes` | Maximum index count exceeded |
| `QueryTimeout` | Query execution timed out |
| `PrimaryKeyViolation` | Primary key constraint violated |
| `UniqueViolation` | Unique constraint violated |
| `ForeignKeyViolation` | Foreign key constraint violated on insert/update |
| `ForeignKeyRestrict` | Foreign key prevents delete/update |
| `ConstraintNotFound` | Constraint does not exist |
| `ConstraintAlreadyExists` | Constraint already exists |
| `ColumnHasConstraint` | Column has constraint preventing operation |
| `CannotAddColumn` | Cannot add column due to constraint |

Storage Model

Tables, rows, and indexes are stored in Tensor Store with specific key patterns:

| Key Pattern | Content |
|-------------|---------|
| `_meta:table:{name}` | Schema metadata |
| `{table}:{row_id}` | Row data |
| `_idx:{table}:{column}` | Hash index metadata |
| `_idx:{table}:{column}:{hash}` | Hash index entries (list of row IDs) |
| `_btree:{table}:{column}` | B-tree index metadata |
| `_btree:{table}:{column}:{sortable_key}` | B-tree index entries |
| `_col:{table}:{column}:data` | Columnar data storage |
| `_col:{table}:{column}:ids` | Columnar row ID mapping |
| `_col:{table}:{column}:nulls` | Columnar null bitmap |
| `_col:{table}:{column}:meta` | Columnar metadata |

Schema metadata encodes:

  • _columns: Comma-separated column names
  • _col:{name}: Type and nullability for each column
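
The key patterns above are plain string templates. A minimal sketch of how such keys could be formatted (the helper names here are illustrative, not the engine's internal functions):

```rust
// Illustrative key builders for the storage layout described above.
fn meta_key(table: &str) -> String {
    format!("_meta:table:{}", table)
}

fn row_key(table: &str, row_id: u64) -> String {
    format!("{}:{}", table, row_id)
}

fn hash_index_entry_key(table: &str, column: &str, hash: &str) -> String {
    format!("_idx:{}:{}:{}", table, column, hash)
}

fn main() {
    assert_eq!(meta_key("users"), "_meta:table:users");
    assert_eq!(row_key("users", 42), "users:42");
    assert_eq!(hash_index_entry_key("users", "age", "i:30"), "_idx:users:age:i:30");
}
```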

Row Storage Format

Each row is stored as a TensorData object:

#![allow(unused)]
fn main() {
// Internal row structure
{
    "_id": Scalar(Int(row_id)),
    "name": Scalar(String("Alice")),
    "age": Scalar(Int(30)),
    "email": Scalar(String("alice@example.com"))
}
}

Usage Examples

Table Operations

#![allow(unused)]
fn main() {
let engine = RelationalEngine::new();

// Create table with schema
let schema = Schema::new(vec![
    Column::new("name", ColumnType::String),
    Column::new("age", ColumnType::Int),
    Column::new("email", ColumnType::String).nullable(),
]);
engine.create_table("users", schema)?;

// Check existence
engine.table_exists("users")?;  // -> bool

// List all tables
let tables = engine.list_tables();  // -> Vec<String>

// Get schema
let schema = engine.get_schema("users")?;

// Drop table (deletes all rows and indexes)
engine.drop_table("users")?;

// Row count
engine.row_count("users")?;  // -> usize
}

CRUD Operations

#![allow(unused)]
fn main() {
// INSERT
let mut values = HashMap::new();
values.insert("name".to_string(), Value::String("Alice".into()));
values.insert("age".to_string(), Value::Int(30));
let row_id = engine.insert("users", values)?;

// BATCH INSERT (59x faster for bulk inserts)
let rows: Vec<HashMap<String, Value>> = (0..1000)
    .map(|i| {
        let mut values = HashMap::new();
        values.insert("name".to_string(), Value::String(format!("User{}", i)));
        values.insert("age".to_string(), Value::Int(20 + i));
        values
    })
    .collect();
let row_ids = engine.batch_insert("users", rows)?;

// SELECT
let rows = engine.select("users", Condition::Eq("age".into(), Value::Int(30)))?;

// UPDATE
let mut updates = HashMap::new();
updates.insert("age".to_string(), Value::Int(31));
let count = engine.update(
    "users",
    Condition::Eq("name".into(), Value::String("Alice".into())),
    updates
)?;

// DELETE
let count = engine.delete_rows("users", Condition::Lt("age".into(), Value::Int(18)))?;
}

Constraints

The engine supports four constraint types for data integrity:

| Constraint | Description |
|------------|-------------|
| `PrimaryKey` | Unique + not null, identifies rows uniquely |
| `Unique` | Values must be unique (NULLs allowed) |
| `ForeignKey` | References rows in another table |
| `NotNull` | Column cannot contain NULL values |

#![allow(unused)]
fn main() {
use relational_engine::{Constraint, ForeignKeyConstraint, ReferentialAction};

// Create table with constraints
let schema = Schema::with_constraints(
    vec![
        Column::new("id", ColumnType::Int),
        Column::new("email", ColumnType::String),
        Column::new("dept_id", ColumnType::Int).nullable(),
    ],
    vec![
        Constraint::primary_key("pk_users", vec!["id".to_string()]),
        Constraint::unique("uq_email", vec!["email".to_string()]),
    ],
);
engine.create_table("users", schema)?;

// Add constraint after table creation
engine.add_constraint("users", Constraint::not_null("nn_email", "email"))?;

// Add foreign key with referential actions
let fk = ForeignKeyConstraint::new(
    "fk_users_dept",
    vec!["dept_id".to_string()],
    "departments",
    vec!["id".to_string()],
)
.on_delete(ReferentialAction::SetNull)
.on_update(ReferentialAction::Cascade);
engine.add_constraint("users", Constraint::foreign_key(fk))?;

// Get constraints
let constraints = engine.get_constraints("users")?;

// Drop constraint
engine.drop_constraint("users", "uq_email")?;
}

Referential Actions

Foreign keys support these actions on delete/update of referenced rows:

| Action | Description |
|--------|-------------|
| `Restrict` (default) | Prevent the operation |
| `Cascade` | Cascade to referencing rows |
| `SetNull` | Set referencing columns to NULL |
| `SetDefault` | Set referencing columns to default |
| `NoAction` | Same as `Restrict`, checked at commit |

ALTER TABLE Operations

#![allow(unused)]
fn main() {
// Add a new column (nullable or with default)
engine.add_column("users", Column::new("phone", ColumnType::String).nullable())?;

// Drop a column (fails if column has constraints)
engine.drop_column("users", "phone")?;

// Rename a column (updates constraints automatically)
engine.rename_column("users", "email", "email_address")?;
}

Joins

All six SQL join types are supported; equality joins use a hash join algorithm with O(n+m) complexity:

#![allow(unused)]
fn main() {
// INNER JOIN - Only matching rows from both tables
let joined = engine.join("users", "posts", "_id", "user_id")?;
// Returns: Vec<(Row, Row)>

// LEFT JOIN - All rows from left, matching from right (or None)
let joined = engine.left_join("users", "posts", "_id", "user_id")?;
// Returns: Vec<(Row, Option<Row>)>

// RIGHT JOIN - All rows from right, matching from left (or None)
let joined = engine.right_join("users", "posts", "_id", "user_id")?;
// Returns: Vec<(Option<Row>, Row)>

// FULL JOIN - All rows from both tables
let joined = engine.full_join("users", "posts", "_id", "user_id")?;
// Returns: Vec<(Option<Row>, Option<Row>)>

// CROSS JOIN (Cartesian product)
let joined = engine.cross_join("users", "posts")?;
// Returns: Vec<(Row, Row)> with n*m rows

// NATURAL JOIN (on common column names)
let joined = engine.natural_join("users", "user_profiles")?;
// Returns: Vec<(Row, Row)> matching on all common columns
}

Aggregate Functions

#![allow(unused)]
fn main() {
// COUNT(*) - count all rows
let count = engine.count("users", Condition::True)?;

// COUNT(column) - count non-null values
let count = engine.count_column("users", "email", Condition::True)?;

// SUM - returns f64
let total = engine.sum("orders", "amount", Condition::True)?;

// AVG - returns Option<f64> (None if no matching rows)
let avg = engine.avg("orders", "amount", Condition::True)?;

// MIN/MAX - returns Option<Value>
let min = engine.min("products", "price", Condition::True)?;
let max = engine.max("products", "price", Condition::True)?;
}

Indexes

Hash Indexes

Hash indexes provide O(1) equality lookups for Condition::Eq queries:

#![allow(unused)]
fn main() {
// Create hash index
engine.create_index("users", "age")?;

// Check existence
engine.has_index("users", "age");  // -> bool

// Get indexed columns
engine.get_indexed_columns("users");  // -> Vec<String>

// Drop index
engine.drop_index("users", "age")?;
}

Hash Index Implementation Details:

graph LR
    subgraph "Hash Index Structure"
        Value[Column Value] --> Hash["hash_key()"]
        Hash --> Bucket["_idx:table:col:hash"]
        Bucket --> IDs["Vec<row_id>"]
    end

The hash index uses value-specific hashing:

| Value Type | Hash Format | Example |
|------------|-------------|---------|
| `Null` | `"null"` | `"null"` |
| `Int(i)` | `"i:{value}"` | `"i:42"` |
| `Float(f)` | `"f:{bits}"` | `"f:4614253070214989087"` |
| `String(s)` | `"s:{hash}"` | `"s:a1b2c3d4"` |
| `Bool(b)` | `"b:{value}"` | `"b:true"` |
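
A self-contained sketch of value-specific hashing consistent with the formats above. The `Value` enum here is a local stand-in, and the string case uses `DefaultHasher`, so only the shape of the output (not the exact digest) matches the engine:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

enum Value {
    Null,
    Int(i64),
    Float(f64),
    String(String),
    Bool(bool),
}

// Produce a hash-bucket key in the formats listed above.
fn hash_key(v: &Value) -> String {
    match v {
        Value::Null => "null".to_string(),
        Value::Int(i) => format!("i:{}", i),
        // Floats are keyed by bit pattern, so identical bit patterns collide.
        Value::Float(f) => format!("f:{}", f.to_bits()),
        Value::String(s) => {
            let mut h = DefaultHasher::new();
            s.hash(&mut h);
            format!("s:{:x}", h.finish())
        }
        Value::Bool(b) => format!("b:{}", b),
    }
}

fn main() {
    assert_eq!(hash_key(&Value::Int(42)), "i:42");
    assert_eq!(hash_key(&Value::Bool(true)), "b:true");
    assert!(hash_key(&Value::String("Alice".into())).starts_with("s:"));
}
```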

Hash index performance:

| Query Type | Without Index | With Index | Speedup |
|------------|---------------|------------|---------|
| Equality (2% match on 5K rows) | 5.96 ms | 126 µs | 47x |
| Single row by `_id` (5K rows) | 5.59 ms | 3.5 µs | 1,597x |

B-Tree Indexes

B-tree indexes accelerate range queries (Lt, Le, Gt, Ge) with O(log n + m) complexity:

#![allow(unused)]
fn main() {
// Create B-tree index
engine.create_btree_index("users", "age")?;

// Check existence
engine.has_btree_index("users", "age");  // -> bool

// Get B-tree indexed columns
engine.get_btree_indexed_columns("users");  // -> Vec<String>

// Drop index
engine.drop_btree_index("users", "age")?;

// Range queries now use the index
engine.select("users", Condition::Ge("age".into(), Value::Int(18)))?;
}

B-Tree Index Implementation Details:

The B-tree index uses a dual-storage approach:

  1. In-memory BTreeMap: For O(log n) range operations
  2. Persistent TensorStore: For durability and recovery
#![allow(unused)]
fn main() {
// Internal B-tree index structure
btree_indexes: RwLock<HashMap<
    (String, String),           // (table, column)
    BTreeMap<OrderedKey, Vec<u64>>  // value -> row_ids
>>
}

OrderedKey for Total Ordering:

The OrderedKey enum provides correct ordering semantics:

#![allow(unused)]
fn main() {
pub enum OrderedKey {
    Null,                    // Sorts first
    Bool(bool),              // false < true
    Int(i64),                // Standard integer ordering
    Float(OrderedFloat),     // NaN < all other values
    String(String),          // Lexicographic ordering
}
}

Sortable Key Encoding:

For persistent storage, values are encoded to maintain lexicographic ordering:

| Type | Encoding | Example |
|------|----------|---------|
| `Null` | `"0"` | `"0"` |
| `Int(i)` | `"i{hex(i + 2^63)}"` | `"i8000000000000000"` for 0 |
| `Float(f)` | `"f{sortable_bits}"` | IEEE 754 with sign handling |
| `String(s)` | `"s{s}"` | `"sAlice"` |
| `Bool(b)` | `"b0"` or `"b1"` | `"b1"` for true |

Integer encoding shifts the range from [-2^63, 2^63-1] to [0, 2^64-1] for correct lexicographic ordering of negative numbers.
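
Adding 2^63 is equivalent to flipping the sign bit. A sketch of the biased encoding (the function name is illustrative):

```rust
// Encode an i64 so that fixed-width hex strings sort in numeric order.
// XOR with the sign bit maps [-2^63, 2^63-1] onto [0, 2^64-1].
fn sortable_int_key(i: i64) -> String {
    format!("i{:016x}", (i as u64) ^ (1u64 << 63))
}

fn main() {
    // "i8000000000000000" for 0, as in the table above.
    assert_eq!(sortable_int_key(0), "i8000000000000000");

    // Lexicographic order of the encoded keys matches numeric order.
    let keys: Vec<String> = [-5i64, -1, 0, 1, 100]
        .iter()
        .map(|&i| sortable_int_key(i))
        .collect();
    let mut sorted = keys.clone();
    sorted.sort();
    assert_eq!(keys, sorted);
}
```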

B-Tree Range Operations:

#![allow(unused)]
fn main() {
// Internal range lookup
fn btree_range_lookup(&self, table: &str, column: &str,
                      value: &Value, op: RangeOp) -> Option<Vec<u64>> {
    match op {
        RangeOp::Lt => btree.range(..target),
        RangeOp::Le => btree.range(..=target),
        RangeOp::Gt => btree.range((Excluded(target), Unbounded)),
        RangeOp::Ge => btree.range(target..),
    }
}
}

Columnar Architecture

The engine uses columnar storage with SIMD-accelerated filtering:

Columnar Data Structures

graph TD
    subgraph "ColumnData"
        Name[name: String]
        RowIDs[row_ids: Vec<u64>]
        Nulls[nulls: NullBitmap]
        Values[values: ColumnValues]
    end

    subgraph "ColumnValues Variants"
        Int["Int(Vec<i64>)"]
        Float["Float(Vec<f64>)"]
        String["String { dict, indices }"]
        Bool["Bool(Vec<u64>)"]
    end

    subgraph "NullBitmap Variants"
        None["None (no nulls)"]
        Dense["Dense(Vec<u64>)"]
        Sparse["Sparse(Vec<u64>)"]
    end

    Values --> Int
    Values --> Float
    Values --> String
    Values --> Bool
    Nulls --> None
    Nulls --> Dense
    Nulls --> Sparse

Null Bitmap Selection:

  • None: When column has no null values
  • Sparse: When nulls are < 10% of rows (stores positions)
  • Dense: When nulls are >= 10% of rows (stores bitmap)
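
The selection rule above can be sketched as a simple constructor choice (the type and names here are illustrative mirrors of the variants described above):

```rust
// Illustrative mirror of the NullBitmap variants described above.
enum NullBitmap {
    None,              // column has no nulls
    Sparse(Vec<u64>),  // positions of null rows (< 10% null)
    Dense(Vec<u64>),   // packed bitmap, one bit per row (>= 10% null)
}

fn build_null_bitmap(null_positions: &[u64], row_count: usize) -> NullBitmap {
    if null_positions.is_empty() {
        NullBitmap::None
    } else if (null_positions.len() as f64) < 0.10 * row_count as f64 {
        NullBitmap::Sparse(null_positions.to_vec())
    } else {
        let mut bits = vec![0u64; (row_count + 63) / 64];
        for &p in null_positions {
            bits[(p / 64) as usize] |= 1u64 << (p % 64);
        }
        NullBitmap::Dense(bits)
    }
}

fn main() {
    assert!(matches!(build_null_bitmap(&[], 1000), NullBitmap::None));
    // 1 null out of 1000 rows (0.1%) -> sparse positions
    assert!(matches!(build_null_bitmap(&[5], 1000), NullBitmap::Sparse(_)));
    // 3 nulls out of 10 rows (30%) -> dense bitmap
    assert!(matches!(build_null_bitmap(&[0, 1, 2], 10), NullBitmap::Dense(_)));
}
```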

SIMD Filtering

Column data is stored in contiguous arrays, enabling 4-wide SIMD vectorized comparisons via the wide crate:

#![allow(unused)]
fn main() {
// SIMD filter implementation using wide::i64x4
pub fn filter_lt_i64(values: &[i64], threshold: i64, result: &mut [u64]) {
    let chunks = values.len() / 4;
    let threshold_vec = i64x4::splat(threshold);

    for i in 0..chunks {
        let offset = i * 4;
        let v = i64x4::new([
            values[offset],
            values[offset + 1],
            values[offset + 2],
            values[offset + 3],
        ]);
        let cmp = v.cmp_lt(threshold_vec);
        let mask_arr: [i64; 4] = cmp.into();

        for (j, &m) in mask_arr.iter().enumerate() {
            if m != 0 {
                let bit_pos = offset + j;
                result[bit_pos / 64] |= 1u64 << (bit_pos % 64);
            }
        }
    }

    // Handle remainder with scalar fallback
    let start = chunks * 4;
    for i in start..values.len() {
        if values[i] < threshold {
            result[i / 64] |= 1u64 << (i % 64);
        }
    }
}
}

Available SIMD Filter Functions:

| Function | Operation | Types |
|----------|-----------|-------|
| `filter_lt_i64` | Less than | `i64` |
| `filter_le_i64` | Less than or equal | `i64` |
| `filter_gt_i64` | Greater than | `i64` |
| `filter_ge_i64` | Greater than or equal | `i64` |
| `filter_eq_i64` | Equal | `i64` |
| `filter_ne_i64` | Not equal | `i64` |
| `filter_lt_f64` | Less than | `f64` |
| `filter_gt_f64` | Greater than | `f64` |
| `filter_eq_f64` | Equal (with epsilon) | `f64` |

Bitmap Operations:

#![allow(unused)]
fn main() {
// AND two selection bitmaps
pub fn bitmap_and(a: &[u64], b: &[u64], result: &mut [u64])

// OR two selection bitmaps
pub fn bitmap_or(a: &[u64], b: &[u64], result: &mut [u64])

// Count set bits
pub fn popcount(bitmap: &[u64]) -> usize

// Extract selected indices
pub fn selected_indices(bitmap: &[u64], max_count: usize) -> Vec<usize>
}

Selection Vectors

Query results use bitmap-based selection vectors to avoid copying data:

#![allow(unused)]
fn main() {
pub struct SelectionVector {
    bitmap: Vec<u64>,  // Packed bits indicating selected rows
    row_count: usize,
}

impl SelectionVector {
    // Create selection of all rows
    pub fn all(row_count: usize) -> Self;

    // Create empty selection
    pub fn none(row_count: usize) -> Self;

    // Check if row is selected
    pub fn is_selected(&self, idx: usize) -> bool;

    // Count selected rows
    pub fn count(&self) -> usize;

    // AND two selections (intersection)
    pub fn intersect(&self, other: &SelectionVector) -> SelectionVector;

    // OR two selections (union)
    pub fn union(&self, other: &SelectionVector) -> SelectionVector;
}
}

Columnar Select API

#![allow(unused)]
fn main() {
// Materialize columns for SIMD filtering
engine.materialize_columns("users", &["age", "name"])?;

// Check if columnar data exists
engine.has_columnar_data("users", "age");  // -> bool

// Select with columnar scan options
let options = ColumnarScanOptions {
    projection: Some(vec!["name".into()]),  // Only return these columns
    prefer_columnar: true,                   // Use SIMD when available
};

let rows = engine.select_columnar(
    "users",
    Condition::Gt("age".into(), Value::Int(50)),
    options
)?;

// Drop columnar data
engine.drop_columnar_data("users", "age")?;
}

Condition Evaluation

Two evaluation methods are available:

| Method | Input | Performance | Use Case |
|--------|-------|-------------|----------|
| `evaluate(&row)` | `Row` struct | Legacy, creates intermediate objects | Row-by-row filtering |
| `evaluate_tensor(&tensor)` | `TensorData` | 31% faster, no intermediate allocation | Direct tensor filtering |

The engine automatically chooses the optimal evaluation path:

flowchart TD
    Cond[Condition] --> CheckColumnar{Columnar Data Available?}
    CheckColumnar -->|Yes| CheckType{Int Column?}
    CheckColumnar -->|No| RowEval[evaluate_tensor per row]

    CheckType -->|Yes| SIMDEval[SIMD Vectorized Filter]
    CheckType -->|No| RowEval

    SIMDEval --> Bitmap[Selection Bitmap]
    RowEval --> Filter[Filter Matching Rows]

    Bitmap --> Materialize[Materialize Results]
    Filter --> Materialize

Join Algorithm Implementations

Hash Join (INNER, LEFT, RIGHT, FULL)

All equality joins use the hash join algorithm with O(n+m) complexity:

flowchart LR
    subgraph "Build Phase"
        RightTable[Right Table] --> BuildHash[Build Hash Index]
        BuildHash --> HashIndex["HashMap<hash, Vec<idx>>"]
    end

    subgraph "Probe Phase"
        LeftTable[Left Table] --> Probe[Probe Hash Index]
        Probe --> HashIndex
        HashIndex --> Match[Find Matching Rows]
    end

    Match --> Results[Join Results]

Hash Join Implementation:

#![allow(unused)]
fn main() {
pub fn join(&self, table_a: &str, table_b: &str,
            on_a: &str, on_b: &str) -> Result<Vec<(Row, Row)>> {
    let rows_a = self.select(table_a, Condition::True)?;
    let rows_b = self.select(table_b, Condition::True)?;

    // Build phase: index the right table
    let mut index: HashMap<String, Vec<usize>> = HashMap::with_capacity(rows_b.len());
    for (i, row) in rows_b.iter().enumerate() {
        if let Some(val) = row.get_with_id(on_b) {
            let hash = val.hash_key();
            index.entry(hash).or_default().push(i);
        }
    }

    // Probe phase: scan left table and probe index
    let mut results = Vec::with_capacity(min(rows_a.len(), rows_b.len()));
    for row_a in &rows_a {
        if let Some(val) = row_a.get_with_id(on_a) {
            let hash = val.hash_key();
            if let Some(indices) = index.get(&hash) {
                for &i in indices {
                    let row_b = &rows_b[i];
                    // Verify actual equality (handles hash collisions)
                    if row_b.get_with_id(on_b).as_ref() == Some(&val) {
                        results.push((row_a.clone(), row_b.clone()));
                    }
                }
            }
        }
    }
    Ok(results)
}
}

Parallel Join Optimization:

When the left table exceeds PARALLEL_THRESHOLD (1000 rows), joins use Rayon for parallel probing:

#![allow(unused)]
fn main() {
if rows_a.len() >= Self::PARALLEL_THRESHOLD {
    rows_a.par_iter()
        .flat_map(|row_a| {
            // Parallel probe of hash index
        })
        .collect()
}
}

Natural Join

Natural join finds all common column names and joins on their equality:

#![allow(unused)]
fn main() {
pub fn natural_join(&self, table_a: &str, table_b: &str) -> Result<Vec<(Row, Row)>> {
    let schema_a = self.get_schema(table_a)?;
    let schema_b = self.get_schema(table_b)?;

    // Find common columns
    let cols_a: HashSet<_> = schema_a.columns.iter().map(|c| c.name.as_str()).collect();
    let cols_b: HashSet<_> = schema_b.columns.iter().map(|c| c.name.as_str()).collect();
    let common_cols: Vec<_> = cols_a.intersection(&cols_b).copied().collect();

    // No common columns = cross join
    if common_cols.is_empty() {
        return self.cross_join(table_a, table_b);
    }

    // Build composite hash key from all common columns
    // ...
}
}

Aggregate Function Internals

Parallel Aggregation

For tables exceeding PARALLEL_THRESHOLD (1000 rows), aggregates use parallel reduction:

#![allow(unused)]
fn main() {
pub fn avg(&self, table: &str, column: &str, condition: Condition) -> Result<Option<f64>> {
    let rows = self.select(table, condition)?;

    let (total, count) = if rows.len() >= Self::PARALLEL_THRESHOLD {
        // Parallel map-reduce
        rows.par_iter()
            .map(|row| extract_numeric(row, column))
            .reduce(|| (0.0, 0u64), |(s1, c1), (s2, c2)| (s1 + s2, c1 + c2))
    } else {
        // Sequential accumulation
        let mut total = 0.0;
        let mut count = 0u64;
        for row in &rows {
            // accumulate...
        }
        (total, count)
    };

    if count == 0 { Ok(None) } else { Ok(Some(total / count as f64)) }
}
}

MIN/MAX with Parallel Reduction

#![allow(unused)]
fn main() {
pub fn min(&self, table: &str, column: &str, condition: Condition) -> Result<Option<Value>> {
    let rows = self.select(table, condition)?;

    if rows.len() >= Self::PARALLEL_THRESHOLD {
        rows.par_iter()
            .filter_map(|row| row.get(column).filter(|v| !matches!(v, Value::Null)))
            .reduce_with(|a, b| {
                if a.partial_cmp_value(&b) == Some(Ordering::Less) { a } else { b }
            })
    } else {
        // Sequential scan
    }
}
}

Performance Characteristics

| Operation | Complexity | Notes |
|-----------|------------|-------|
| `insert` | O(1) + O(k) | Schema validation + store put + k index updates |
| `batch_insert` | O(n) + O(n*k) | Single schema lookup, 59x faster than n inserts |
| `select` (no index) | O(n) | Full table scan with SIMD filter |
| `select` (hash index) | O(1) | Direct lookup via hash index |
| `select` (btree range) | O(log n + m) | B-tree lookup + m matching rows |
| `update` | O(n) + O(k) | Scan + conditional update + index maintenance |
| `delete_rows` | O(n) + O(k) | Scan + conditional delete + index removal |
| `join` | O(n+m) | Hash join for all 6 join types |
| `cross_join` | O(n*m) | Cartesian product |
| `count`/`sum`/`avg`/`min`/`max` | O(n) | Single pass over matching rows |
| `create_index` | O(n) | Scan all rows to build index |
| `materialize_columns` | O(n) | Extract column to contiguous array |

Where k = number of indexes on the table, n = rows in left table, m = rows in right table.

Parallel Threshold

Operations automatically switch to parallel execution when row count exceeds PARALLEL_THRESHOLD:

#![allow(unused)]
fn main() {
impl RelationalEngine {
    const PARALLEL_THRESHOLD: usize = 1000;
}
}

Parallel Operations:

  • delete_rows (parallel deletion via Rayon)
  • join (parallel probe phase)
  • sum, avg, min, max (parallel reduction)

Configuration

RelationalConfig

The engine can be configured with RelationalConfig:

#![allow(unused)]
fn main() {
let config = RelationalConfig {
    max_tables: Some(1000),              // Maximum tables allowed
    max_indexes_per_table: Some(10),     // Maximum indexes per table
    max_btree_entries: 10_000_000,       // Maximum B-tree index entries
    default_query_timeout_ms: Some(5000),// Default query timeout
    max_query_timeout_ms: Some(300_000), // Maximum allowed timeout (5 min)
    slow_query_threshold_ms: 100,        // Slow query warning threshold
    max_query_result_rows: Some(10_000), // Maximum rows per query
    transaction_timeout_secs: 60,        // Transaction timeout
    lock_timeout_secs: 30,               // Lock acquisition timeout
};
let engine = RelationalEngine::with_config(config);
}

| Option | Default | Description |
|--------|---------|-------------|
| `max_tables` | `None` (unlimited) | Maximum number of tables |
| `max_indexes_per_table` | `None` (unlimited) | Maximum indexes per table |
| `max_btree_entries` | 10,000,000 | Maximum B-tree index entries total |
| `default_query_timeout_ms` | `None` | Default timeout for queries |
| `max_query_timeout_ms` | 300,000 (5 min) | Maximum allowed query timeout |
| `slow_query_threshold_ms` | 100 | Threshold for slow query warnings |
| `max_query_result_rows` | `None` (unlimited) | Maximum rows returned per query |
| `transaction_timeout_secs` | 60 | Transaction timeout |
| `lock_timeout_secs` | 30 | Lock acquisition timeout |

Internal Constants

| Constant | Value | Description |
|----------|-------|-------------|
| `PARALLEL_THRESHOLD` | 1000 | Minimum rows for parallel operations |
| Null bitmap sparse threshold | 10% | Use sparse bitmap when nulls < 10% |
| SIMD vector width | 4 | `i64x4`/`f64x4` operations |

Observability

The observability module provides query metrics, slow query detection, and index usage tracking.

Query Metrics

#![allow(unused)]
fn main() {
use relational_engine::observability::{QueryMetrics, check_slow_query};
use std::time::Duration;

let metrics = QueryMetrics::new("users", "select")
    .with_rows_scanned(10000)
    .with_rows_returned(50)
    .with_index("idx_user_id")
    .with_duration(Duration::from_millis(25));

// Log warning if query exceeds threshold
check_slow_query(&metrics, 100); // threshold in ms
}

Index Tracking

Track index usage to identify missing indexes:

#![allow(unused)]
fn main() {
use relational_engine::observability::IndexTracker;

let tracker = IndexTracker::new();

// Record when index is used
tracker.record_hit("users", "id");

// Record when index could have been used but wasn't
tracker.record_miss("users", "email");

// Get reports of columns needing indexes
let reports = tracker.report_misses();
for report in reports {
    println!(
        "Table {}, column {}: {} misses, {} hits",
        report.table, report.column, report.miss_count, report.hit_count
    );
}

// Aggregate statistics
let total_hits = tracker.total_hits();
let total_misses = tracker.total_misses();
}

Slow Query Warnings

The check_slow_query function logs a tracing::warn! when queries exceed the threshold:

#![allow(unused)]
fn main() {
use relational_engine::observability::{check_slow_query, warn_full_table_scan};

// Warn if query took > 100ms
check_slow_query(&metrics, 100);

// Warn about full table scans on large tables (> 1000 rows)
warn_full_table_scan("users", "select", 5000);
}

Streaming Cursor API

For large result sets, use streaming cursors to avoid loading all rows into memory at once. The cursor fetches rows in configurable batches.

Basic Usage

#![allow(unused)]
fn main() {
use relational_engine::{StreamingCursor, Condition};

// Create streaming cursor with default batch size (1000)
let cursor = engine.select_streaming("users", Condition::True);

// Iterate over results
for row_result in cursor {
    let row = row_result?;
    println!("User: {:?}", row);
}
}

Custom Options

#![allow(unused)]
fn main() {
// With custom batch size
let cursor = engine.select_streaming("users", Condition::True)
    .with_batch_size(100)
    .with_max_rows(5000);

// Using the builder
let cursor = engine.select_streaming_builder("users", Condition::True)
    .batch_size(100)
    .max_rows(5000)
    .build();

// Check cursor state
let mut cursor = engine.select_streaming("users", Condition::True);
while let Some(row) = cursor.next() {
    println!("Yielded so far: {}", cursor.rows_yielded());
}
println!("Exhausted: {}", cursor.is_exhausted());
}

Cursor Methods

| Method | Description |
|--------|-------------|
| `with_batch_size(n)` | Set rows fetched per batch (default: 1000) |
| `with_max_rows(n)` | Limit total rows returned |
| `rows_yielded()` | Number of rows returned so far |
| `is_exhausted()` | Whether the cursor has no more rows |

Edge Cases and Gotchas

NULL Handling

  1. NULL in conditions: Comparisons with NULL columns return false:

     #![allow(unused)]
     fn main() {
     // If email is NULL, this returns false (not true!)
     Condition::Lt("email".into(), Value::String("z".into()))
     }

  2. NULL in joins: NULL values never match in join conditions:

     #![allow(unused)]
     fn main() {
     // A post with user_id = NULL will not join with any user
     engine.join("users", "posts", "_id", "user_id")
     }

  3. COUNT vs COUNT(column):
     • count() counts all rows
     • count_column() counts non-null values only
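
The distinction between counting all rows and counting non-null values can be illustrated on plain optional values (this sketch models the semantics only, not the engine API):

```rust
fn main() {
    // A nullable "email" column modeled as Option<&str>.
    let emails: Vec<Option<&str>> = vec![Some("a@x.com"), None, Some("b@x.com"), None];

    // COUNT(*): every row counts, including NULLs.
    let count_all = emails.len();

    // COUNT(email): only non-null values count.
    let count_email = emails.iter().filter(|e| e.is_some()).count();

    assert_eq!(count_all, 4);
    assert_eq!(count_email, 2);
}
```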

Type Mismatches

Comparisons between incompatible types return false rather than error:

#![allow(unused)]
fn main() {
// Age is Int, comparing with String returns 0 matches (not error)
engine.select("users", Condition::Lt("age".into(), Value::String("30".into())));
}

Index Maintenance

Indexes are automatically maintained on INSERT, UPDATE, and DELETE:

#![allow(unused)]
fn main() {
// Creating index AFTER data exists
engine.insert("users", values)?;  // No index update
engine.create_index("users", "age")?;  // Scans all rows

// Creating index BEFORE data exists
engine.create_index("users", "age")?;  // Empty index
engine.insert("users", values)?;  // Updates index
}

Batch Insert Atomicity

batch_insert validates ALL rows upfront before inserting any:

#![allow(unused)]
fn main() {
let rows = vec![valid_row, invalid_row];
// Fails on validation - NO rows inserted (not partial insert)
engine.batch_insert("users", rows);
}

B-Tree Index Recovery

B-tree indexes maintain both in-memory and persistent state. The in-memory BTreeMap is rebuilt lazily on first access after restart.
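
The lazy-rebuild pattern can be sketched with `OnceLock`; this models the general idea only (the type, field names, and key format here are illustrative, not the engine's internals):

```rust
use std::collections::BTreeMap;
use std::sync::OnceLock;

// Persistent entries as (sortable_key, row_ids) pairs, as they would be
// read back from the store after a restart.
struct LazyBTreeIndex {
    persisted: Vec<(String, Vec<u64>)>,
    in_memory: OnceLock<BTreeMap<String, Vec<u64>>>,
}

impl LazyBTreeIndex {
    // Rebuild the in-memory BTreeMap on first access only.
    fn map(&self) -> &BTreeMap<String, Vec<u64>> {
        self.in_memory
            .get_or_init(|| self.persisted.iter().cloned().collect())
    }
}

fn main() {
    let idx = LazyBTreeIndex {
        persisted: vec![("i8000000000000012".into(), vec![7])],
        in_memory: OnceLock::new(),
    };
    assert!(idx.in_memory.get().is_none()); // not rebuilt yet
    assert_eq!(idx.map().len(), 1);         // first access rebuilds
    assert!(idx.in_memory.get().is_some()); // now cached
}
```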

Best Practices

Index Selection

| Query Pattern | Recommended Index |
|---------------|-------------------|
| `WHERE col = value` | Hash Index |
| `WHERE col > value` | B-Tree Index |
| `WHERE col BETWEEN a AND b` | B-Tree Index |
| `WHERE col IN (...)` | Hash Index |
| Unique lookups by ID | Hash Index on `_id` |

Columnar Materialization

Materialize columns when:

  • Performing many range scans on large tables
  • Query selectivity is low (scanning most rows)
  • Column data fits in memory
#![allow(unused)]
fn main() {
// Good: Materialize frequently-filtered columns
engine.materialize_columns("events", &["timestamp", "user_id"])?;

// Query uses SIMD acceleration
engine.select_columnar("events",
    Condition::Gt("timestamp".into(), Value::Int(cutoff)),
    ColumnarScanOptions { projection: None, prefer_columnar: true }
)?;
}

Batch Operations

Use batch_insert for bulk loading:

#![allow(unused)]
fn main() {
// Bad: 1000 individual inserts
for row in rows {
    engine.insert("table", row)?;  // 1000 schema lookups
}

// Good: Single batch insert
engine.batch_insert("table", rows)?;  // 1 schema lookup, 59x faster
}

SQL Features via Query Router

When using the relational engine through query_router, additional SQL features are available:

ORDER BY, LIMIT, and OFFSET

SELECT * FROM users ORDER BY age ASC;
SELECT * FROM users ORDER BY department DESC, name ASC;
SELECT * FROM users ORDER BY email NULLS FIRST;
SELECT * FROM users ORDER BY created_at DESC LIMIT 10 OFFSET 20;

GROUP BY and HAVING

SELECT department, COUNT(*), AVG(salary) FROM employees GROUP BY department;

SELECT product, SUM(quantity) as total
FROM orders
GROUP BY product
HAVING SUM(quantity) > 100;

Related Modules

| Module | Relationship |
|--------|--------------|
| `tensor_store` | Storage backend for tables, rows, and indexes |
| `query_router` | Executes SQL queries using RelationalEngine |
| `neumann_parser` | Parses SQL statements into AST |
| `tensor_unified` | Multi-engine unified storage layer |

Feature Summary

Implemented

| Feature | Description |
|---------|-------------|
| Hash indexes | O(1) equality lookups |
| B-tree indexes | O(log n) range query acceleration |
| All 6 JOIN types | INNER, LEFT, RIGHT, FULL, CROSS, NATURAL |
| Aggregate functions | COUNT, SUM, AVG, MIN, MAX |
| ORDER BY | Multi-column sorting with ASC/DESC, NULLS FIRST/LAST |
| LIMIT/OFFSET | Pagination support |
| GROUP BY + HAVING | Row grouping with aggregate filtering |
| Columnar storage | SIMD-accelerated filtering with selection vectors |
| Batch operations | 59x faster bulk inserts |
| Parallel operations | Rayon-based parallelism for large tables |
| Dictionary encoding | String column compression |
| Transactions | Row-level ACID with undo log - see Transactions |
| Constraints | PRIMARY KEY, UNIQUE, FOREIGN KEY, NOT NULL |
| Foreign Keys | Full referential integrity with CASCADE/SET NULL/RESTRICT |
| ALTER TABLE | `add_column`, `drop_column`, `rename_column` |
| Streaming cursors | Memory-efficient iteration over large result sets |
| Observability | Query metrics, slow query detection, index tracking |

Future Considerations

| Feature | Status |
|---------|--------|
| Query Optimization | Not implemented |
| Subqueries | Not implemented |
| Window Functions | Not implemented |
| Composite Indexes | Not implemented |