TCP Transport

The TCP transport layer provides reliable, secure node-to-node communication for the tensor_chain distributed system. It implements connection pooling, TLS security, rate limiting, compression, and automatic reconnection.

Overview

The TcpTransport implements the Transport trait, providing:

  • Connection pooling for efficient peer communication
  • TLS encryption with mutual authentication support
  • Rate limiting using token bucket algorithm
  • LZ4 compression for bandwidth efficiency
  • Automatic reconnection with exponential backoff
flowchart TD
    A[Application] --> B[TcpTransport]
    B --> C[ConnectionManager]
    C --> D1[ConnectionPool Node A]
    C --> D2[ConnectionPool Node B]
    C --> D3[ConnectionPool Node C]
    D1 --> E1[TLS Stream]
    D2 --> E2[TLS Stream]
    D3 --> E3[TLS Stream]

Connection Architecture

Connection Manager

The ConnectionManager maintains connection pools for each peer. Each pool can hold multiple connections for load distribution.

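use tensor_chain::tcp::{TcpTransport, TcpTransportConfig};

// Runs inside an async fn that returns a Result, so `?` and `.await` apply.
// Listen on all interfaces, port 9100, as node "node1".
let config = TcpTransportConfig::new("node1", "0.0.0.0:9100".parse()?);
let transport = TcpTransport::new(config);
// Start the transport.
transport.start().await?;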
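As a rough model of the per-peer pooling described above, the sketch below keeps a map from node ID to a small pool and hands out connections round-robin. It is a simplified illustration, not the crate's ConnectionManager: `Conn`, `Pool`, and `Manager` are hypothetical names, and round-robin selection is an assumed strategy for "load distribution".

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an established (TLS) connection.
#[derive(Debug, Clone, PartialEq)]
pub struct Conn {
    pub id: usize,
}

/// One pool per peer; `next` is the round-robin cursor.
#[derive(Default)]
pub struct Pool {
    conns: Vec<Conn>,
    next: usize,
}

impl Pool {
    /// Return the next connection in round-robin order, if any exist.
    pub fn pick(&mut self) -> Option<&Conn> {
        if self.conns.is_empty() {
            return None;
        }
        let idx = self.next % self.conns.len();
        self.next = self.next.wrapping_add(1);
        self.conns.get(idx)
    }
}

/// Simplified model of a connection manager keyed by node ID.
#[derive(Default)]
pub struct Manager {
    pools: HashMap<String, Pool>,
}

impl Manager {
    /// Add `pool_size` connections for `peer` (IDs are assigned sequentially here).
    pub fn connect(&mut self, peer: &str, pool_size: usize) {
        let pool = self.pools.entry(peer.to_string()).or_default();
        let start = pool.conns.len();
        pool.conns.extend((start..start + pool_size).map(|id| Conn { id }));
    }

    /// Pick a connection for `peer`; `None` corresponds to the PeerNotFound case.
    pub fn pick(&mut self, peer: &str) -> Option<&Conn> {
        self.pools.get_mut(peer)?.pick()
    }
}
```

After `connect("node2", 2)`, successive `pick("node2")` calls return connections 0, 1, 0, and so on; a peer with no pool yields `None`, which corresponds to the PeerNotFound error.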

Connection Lifecycle

stateDiagram-v2
    [*] --> Connecting
    Connecting --> Handshaking: TCP Connected
    Handshaking --> Active: Handshake Success
    Handshaking --> Failed: Handshake Failed
    Active --> Reading: Message Available
    Active --> Writing: Send Request
    Reading --> Active: Message Processed
    Writing --> Active: Message Sent
    Active --> Reconnecting: Connection Lost
    Reconnecting --> Connecting: Backoff Complete
    Reconnecting --> Failed: Max Retries
    Failed --> [*]
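The lifecycle diagram can be encoded as a transition function. This sketch mirrors the diagram's states and edge labels one-to-one; `State`, `Event`, and `next` are hypothetical names, and the actual transport may track these states differently (for example, implicitly inside async tasks).

```rust
/// Connection states from the lifecycle diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Connecting,
    Handshaking,
    Active,
    Reading,
    Writing,
    Reconnecting,
    Failed,
}

/// Events labelling the diagram's transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    TcpConnected,
    HandshakeSuccess,
    HandshakeFailed,
    MessageAvailable,
    SendRequest,
    MessageProcessed,
    MessageSent,
    ConnectionLost,
    BackoffComplete,
    MaxRetries,
}

/// Returns the next state, or `None` if the diagram has no such transition.
pub fn next(state: State, event: Event) -> Option<State> {
    match (state, event) {
        (State::Connecting, Event::TcpConnected) => Some(State::Handshaking),
        (State::Handshaking, Event::HandshakeSuccess) => Some(State::Active),
        (State::Handshaking, Event::HandshakeFailed) => Some(State::Failed),
        (State::Active, Event::MessageAvailable) => Some(State::Reading),
        (State::Active, Event::SendRequest) => Some(State::Writing),
        (State::Reading, Event::MessageProcessed) => Some(State::Active),
        (State::Writing, Event::MessageSent) => Some(State::Active),
        (State::Active, Event::ConnectionLost) => Some(State::Reconnecting),
        (State::Reconnecting, Event::BackoffComplete) => Some(State::Connecting),
        (State::Reconnecting, Event::MaxRetries) => Some(State::Failed),
        // Failed is terminal; any other pairing is not in the diagram.
        _ => None,
    }
}
```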

Configuration

| Parameter | Default | Description |
|---|---|---|
| pool_size | 2 | Connections per peer |
| connect_timeout_ms | 5000 | Connection timeout in milliseconds |
| io_timeout_ms | 30000 | Read/write timeout in milliseconds |
| max_message_size | 16 MB | Maximum message size in bytes |
| keepalive | true | Enable TCP keepalive |
| keepalive_interval_secs | 30 | Keepalive probe interval in seconds |
| max_pending_messages | 1000 | Outbound queue size per peer (messages) |
| recv_buffer_size | 1000 | Incoming message channel capacity (messages) |
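For reference, the defaults above can be collected in one struct with the millisecond fields converted to `Duration`. `TcpDefaults` is a hypothetical mirror of the table, not the crate's `TcpTransportConfig`, and reading "16 MB" as 16 MiB is an assumption.

```rust
use std::time::Duration;

/// Hypothetical mirror of the configuration table (not the crate's type).
#[derive(Debug, Clone)]
pub struct TcpDefaults {
    pub pool_size: usize,
    pub connect_timeout_ms: u64,
    pub io_timeout_ms: u64,
    pub max_message_size: usize,
    pub keepalive: bool,
    pub keepalive_interval_secs: u64,
    pub max_pending_messages: usize,
    pub recv_buffer_size: usize,
}

impl Default for TcpDefaults {
    fn default() -> Self {
        Self {
            pool_size: 2,
            connect_timeout_ms: 5_000,
            io_timeout_ms: 30_000,
            // Assumes "16 MB" means 16 MiB (16 * 1024 * 1024 bytes).
            max_message_size: 16 * 1024 * 1024,
            keepalive: true,
            keepalive_interval_secs: 30,
            max_pending_messages: 1_000,
            recv_buffer_size: 1_000,
        }
    }
}

impl TcpDefaults {
    /// Connect timeout as a `Duration`.
    pub fn connect_timeout(&self) -> Duration {
        Duration::from_millis(self.connect_timeout_ms)
    }

    /// Read/write timeout as a `Duration`.
    pub fn io_timeout(&self) -> Duration {
        Duration::from_millis(self.io_timeout_ms)
    }

    /// True if a message of `len` bytes is within the size limit.
    pub fn len_ok(&self, len: usize) -> bool {
        len <= self.max_message_size
    }
}
```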

TLS Security

The transport supports four security modes to accommodate different deployment scenarios.

Security Modes

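| Mode | TLS | mTLS | NodeId verify | Use case |
|---|---|---|---|---|
| Strict | Yes | Yes | Yes | Production deployments |
| Permissive | Yes | No | No | Gradual TLS rollout |
| Development | No | No | No | Local testing only |
| Legacy | No | No | No | Migration from older versions |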
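One way to read the mode table is as three predicates plus a pre-start check, similar in spirit to `validate_security()` in the usage example. This sketch assumes the TLS column means "TLS must be configured" and the mTLS column means "client authentication must be required"; `Mode`, `SecurityError`, and `validate` are hypothetical names that reuse the TlsRequired and MtlsRequired error names from the error table.

```rust
/// Local model of the four security modes (hypothetical, not the crate's SecurityMode).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Strict,
    Permissive,
    Development,
    Legacy,
}

impl Mode {
    /// "TLS" column of the table.
    pub fn requires_tls(self) -> bool {
        matches!(self, Mode::Strict | Mode::Permissive)
    }
    /// "mTLS" column of the table.
    pub fn requires_mtls(self) -> bool {
        matches!(self, Mode::Strict)
    }
    /// "NodeId verify" column of the table.
    pub fn verifies_node_id(self) -> bool {
        matches!(self, Mode::Strict)
    }
}

/// Error names borrowed from the error table.
#[derive(Debug, PartialEq, Eq)]
pub enum SecurityError {
    TlsRequired,
    MtlsRequired,
}

/// Reject configurations that do not satisfy the chosen mode.
pub fn validate(mode: Mode, tls_configured: bool, client_auth: bool) -> Result<(), SecurityError> {
    if mode.requires_tls() && !tls_configured {
        return Err(SecurityError::TlsRequired);
    }
    if mode.requires_mtls() && !client_auth {
        return Err(SecurityError::MtlsRequired);
    }
    Ok(())
}
```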

NodeId Verification

NodeId verification ensures the peer’s identity matches their TLS certificate:

| Mode | Description |
|---|---|
| None | Trust NodeId from handshake (testing only) |
| CommonName | NodeId must match certificate CN |
| SubjectAltName | NodeId must match a SAN DNS entry |
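Once the TLS layer has extracted the certificate's CN and SAN DNS entries, the three verification modes reduce to a simple comparison against the NodeId claimed in the handshake. In this sketch `Verify`, `PeerCert`, and `node_id_matches` are hypothetical names, and certificate parsing is assumed to happen elsewhere.

```rust
/// Verification modes from the table (local model).
#[derive(Debug, Clone, Copy)]
pub enum Verify {
    None,
    CommonName,
    SubjectAltName,
}

/// Fields assumed to be extracted from the peer certificate by the TLS layer.
pub struct PeerCert<'a> {
    pub common_name: Option<&'a str>,
    pub san_dns: &'a [&'a str],
}

/// Does the NodeId claimed in the handshake match the certificate?
pub fn node_id_matches(mode: Verify, claimed: &str, cert: &PeerCert) -> bool {
    match mode {
        // Testing only: accept whatever the handshake says.
        Verify::None => true,
        Verify::CommonName => cert.common_name == Some(claimed),
        Verify::SubjectAltName => cert.san_dns.iter().any(|dns| *dns == claimed),
    }
}
```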

TLS Configuration

[tls]
cert_path = "/etc/neumann/node.crt"
key_path = "/etc/neumann/node.key"
ca_cert_path = "/etc/neumann/ca.crt"
require_client_auth = true
node_id_verification = "CommonName"

mTLS Handshake

sequenceDiagram
    participant C as Client Node
    participant S as Server Node

    C->>S: TCP Connect
    C->>S: TLS ClientHello
    S->>C: TLS ServerHello + Certificate
    S->>C: CertificateRequest
    C->>S: Client Certificate
    C->>S: CertificateVerify
    C->>S: Finished
    S->>C: Finished
    Note over C,S: TLS Established
    C->>S: Handshake(node_id, capabilities)
    S->>C: Handshake(node_id, capabilities)
    Note over C,S: Connection Ready
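After TLS is established, each side sends Handshake(node_id, capabilities). A minimal sketch of what the receiving side might do with it, assuming capabilities are plain strings and a feature is enabled only when both sides advertise it. The `Handshake` struct, capability names such as `"lz4"`, and both functions are illustrative assumptions.

```rust
use std::collections::BTreeSet;

/// Hypothetical shape of the application-level Handshake message.
pub struct Handshake {
    pub node_id: String,
    pub capabilities: BTreeSet<String>,
}

/// Features both sides advertised; anything else stays off for the session.
pub fn negotiate(local: &Handshake, remote: &Handshake) -> BTreeSet<String> {
    local.capabilities.intersection(&remote.capabilities).cloned().collect()
}

/// When dialing a known peer, reject a handshake from an unexpected node.
pub fn check_peer(expected: &str, remote: &Handshake) -> Result<(), String> {
    if remote.node_id == expected {
        Ok(())
    } else {
        Err(format!("HandshakeFailed: expected {expected}, got {}", remote.node_id))
    }
}
```

If one node advertises `lz4` and its peer does not, `negotiate` leaves `lz4` out of the shared set, so frames on that connection would travel uncompressed.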

Rate Limiting

Per-peer rate limiting uses the token bucket algorithm to prevent any single peer from overwhelming the system.

Token Bucket Algorithm

flowchart LR
    A[Refill Timer] -->|tokens/sec| B[Token Bucket]
    B -->|check| C{Tokens > 0?}
    C -->|Yes| D[Allow Message]
    C -->|No| E[Reject Message]
    D --> F[Consume Token]
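A minimal token bucket matching the flowchart above. Assumptions: the bucket starts full, refill is computed lazily from elapsed time instead of by a separate timer, and a message needs one whole token while fractional tokens accumulate between checks. Time is passed in explicitly (seconds) to keep the sketch deterministic; `TokenBucket` is a hypothetical name.

```rust
/// Token bucket with explicit timestamps in seconds; a real limiter
/// would read std::time::Instant instead.
pub struct TokenBucket {
    capacity: f64,
    refill_per_sec: f64,
    tokens: f64,
    last: f64,
}

impl TokenBucket {
    /// Starts full, so a peer may burst up to `capacity` messages.
    pub fn new(capacity: u32, refill_per_sec: f64, now: f64) -> Self {
        let capacity = capacity as f64;
        Self { capacity, refill_per_sec, tokens: capacity, last: now }
    }

    /// Refill for the elapsed time (capped at capacity), then try to consume one token.
    pub fn try_acquire(&mut self, now: f64) -> bool {
        let elapsed = (now - self.last).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true // allow message
        } else {
            false // reject message (RateLimited)
        }
    }
}
```

With the Default preset (bucket size 100, refill 50/sec), a burst of 150 messages at one instant admits 100; one second later, up to 50 more are admitted.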

Configuration Presets

| Preset | Bucket size | Refill rate | Description |
|---|---|---|---|
| Default | 100 | 50/sec | Balanced throughput |
| Aggressive | 50 | 25/sec | Lower burst, tighter limit |
| Permissive | 200 | 100/sec | Higher throughput allowed |
| Disabled | - | - | No rate limiting |

Configuration Example

[rate_limit]
enabled = true
bucket_size = 100
refill_rate = 50.0

Compression

Frame-level LZ4 compression reduces bandwidth usage for larger messages. Compression is negotiated during the handshake.

Frame Format

+--------+--------+------------+
| Length | Flags  | Payload    |
| 4 bytes| 1 byte | N bytes    |
+--------+--------+------------+

Flags byte:
  bit 0: 1 = LZ4 compressed, 0 = uncompressed
  bits 1-7: reserved (must be 0)
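The frame layout can be encoded and decoded with a few lines of byte handling. This sketch assumes the 4-byte length is big-endian and counts only payload bytes (not the flags byte), and it checks the declared length against a limit such as max_message_size before reading the payload; neither detail is specified on this page, and `encode_frame`, `decode_frame`, and `FrameError` are hypothetical names.

```rust
/// Flags bit 0: payload is LZ4-compressed.
pub const FLAG_LZ4: u8 = 0b0000_0001;

/// Build a frame: 4-byte length, 1-byte flags, payload.
pub fn encode_frame(payload: &[u8], compressed: bool) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    // Assumed: big-endian length of the payload only.
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.push(if compressed { FLAG_LZ4 } else { 0 });
    out.extend_from_slice(payload);
    out
}

#[derive(Debug, PartialEq, Eq)]
pub enum FrameError {
    Incomplete,
    TooLarge,
    ReservedBits,
}

/// Parse one frame from `buf`, returning (compressed, payload, bytes consumed).
pub fn decode_frame(buf: &[u8], max_len: usize) -> Result<(bool, &[u8], usize), FrameError> {
    if buf.len() < 5 {
        return Err(FrameError::Incomplete);
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len > max_len {
        return Err(FrameError::TooLarge);
    }
    let flags = buf[4];
    if (flags & !FLAG_LZ4) != 0 {
        return Err(FrameError::ReservedBits); // bits 1-7 must be 0
    }
    let end = 5 + len;
    if buf.len() < end {
        return Err(FrameError::Incomplete); // wait for more bytes
    }
    Ok(((flags & FLAG_LZ4) != 0, &buf[5..end], end))
}
```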

Configuration

| Parameter | Default | Description |
|---|---|---|
| enabled | true | Enable compression |
| method | Lz4 | Compression algorithm |
| min_size | 256 | Minimum payload size to compress (bytes) |

Payloads smaller than min_size bytes are sent uncompressed, since small inputs rarely compress enough to justify the CPU cost and can even grow slightly.

[compression]
enabled = true
method = "Lz4"
min_size = 256

Reconnection

Automatic reconnection uses exponential backoff with jitter to recover from transient failures without overwhelming the network.

Backoff Calculation

backoff = min(initial * multiplier^attempt, max_backoff)
jitter = backoff * random(-jitter_factor, +jitter_factor)
final_delay = backoff + jitter
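The formula translates directly into code. In this sketch the caller supplies a uniform random value in [0, 1] (mapped onto [-jitter, +jitter]) so the function stays deterministic and needs no RNG crate; `Backoff` and `delay` are hypothetical names, and rounding to whole milliseconds is an assumption.

```rust
use std::time::Duration;

/// Reconnection settings (hypothetical struct; defaults are in the Configuration table).
pub struct Backoff {
    pub initial_ms: f64,
    pub max_ms: f64,
    pub multiplier: f64,
    pub jitter: f64,
}

impl Backoff {
    /// Delay before retry `attempt` (0-based); `unit_random` must be in [0, 1].
    pub fn delay(&self, attempt: u32, unit_random: f64) -> Duration {
        // backoff = min(initial * multiplier^attempt, max_backoff)
        let base = (self.initial_ms * self.multiplier.powi(attempt as i32)).min(self.max_ms);
        // random(-jitter_factor, +jitter_factor), mapped from [0, 1]
        let r = (unit_random * 2.0 - 1.0) * self.jitter;
        // final_delay = backoff + jitter
        let ms = base + base * r;
        Duration::from_millis(ms.max(0.0).round() as u64)
    }
}
```

With the defaults, attempt 3 gives 800 ms before jitter and 720 to 880 ms after it. Because jitter is applied after the cap, delays at the cap range from 27000 to 33000 ms.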

Configuration

| Parameter | Default | Description |
|---|---|---|
| enabled | true | Enable automatic reconnection |
| initial_backoff_ms | 100 | Initial backoff delay (ms) |
| max_backoff_ms | 30000 | Maximum backoff delay (ms) |
| multiplier | 2.0 | Exponential multiplier |
| max_attempts | None | Maximum retries (None = unlimited) |
| jitter | 0.1 | Jitter factor (0.0 to 1.0) |

Backoff Example

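| Attempt | Base delay | With 10% jitter |
|---|---|---|
| 0 | 100 ms | 90-110 ms |
| 1 | 200 ms | 180-220 ms |
| 2 | 400 ms | 360-440 ms |
| 3 | 800 ms | 720-880 ms |
| 8 | 25600 ms | 23040-28160 ms |
| 9+ | 30000 ms (capped) | 27000-33000 ms |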

Metrics

The transport exposes statistics through TransportStats:

| Metric | Description |
|---|---|
| messages_sent | Total messages sent |
| messages_received | Total messages received |
| bytes_sent | Total bytes sent |
| bytes_received | Total bytes received |
| peer_count | Number of connected peers |
| connection_count | Total active connections |

// Read a snapshot of the transport's counters.
let stats = transport.stats();
println!("Messages sent: {}", stats.messages_sent);
println!("Connected peers: {}", stats.peer_count);
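The message and byte totals are monotonically increasing counters, which are commonly kept in atomics so that send and receive paths can update them without a lock. A minimal sketch of that pattern; `Counters` and `StatsSnapshot` are hypothetical names, and peer_count and connection_count are omitted because they would presumably be read from the connection pools rather than counted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free counters behind a TransportStats-like snapshot.
#[derive(Default)]
pub struct Counters {
    messages_sent: AtomicU64,
    bytes_sent: AtomicU64,
    messages_received: AtomicU64,
    bytes_received: AtomicU64,
}

/// Plain-value copy handed to callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StatsSnapshot {
    pub messages_sent: u64,
    pub bytes_sent: u64,
    pub messages_received: u64,
    pub bytes_received: u64,
}

impl Counters {
    /// Record one outbound message; Relaxed suffices for independent counters.
    pub fn record_send(&self, bytes: usize) {
        self.messages_sent.fetch_add(1, Ordering::Relaxed);
        self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Record one inbound message.
    pub fn record_recv(&self, bytes: usize) {
        self.messages_received.fetch_add(1, Ordering::Relaxed);
        self.bytes_received.fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Fields are loaded independently, so the snapshot is not atomic as a whole.
    pub fn snapshot(&self) -> StatsSnapshot {
        StatsSnapshot {
            messages_sent: self.messages_sent.load(Ordering::Relaxed),
            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
            messages_received: self.messages_received.load(Ordering::Relaxed),
            bytes_received: self.bytes_received.load(Ordering::Relaxed),
        }
    }
}
```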

Error Handling

| Error | Cause | Recovery |
|---|---|---|
| Timeout | Operation exceeded timeout | Retry with backoff |
| PeerNotFound | No pool for peer | Establish connection first |
| HandshakeFailed | Protocol mismatch or bad certificate | Check configuration |
| TlsRequired | TLS needed but not configured | Configure TLS |
| MtlsRequired | mTLS needed but not enabled | Enable client auth |
| RateLimited | Token bucket exhausted | Wait for refill |
| Compression | Decompression failed | Check for data corruption |
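Callers typically branch on the Recovery column: wait and retry for transient errors, fix configuration or connect first for the rest. The enum below is a hypothetical local model of the table (not the crate's error type), with `is_transient` encoding the assumption that only Timeout and RateLimited are worth retrying automatically.

```rust
use std::fmt;

/// Local model of the error kinds in the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportError {
    Timeout,
    PeerNotFound,
    HandshakeFailed,
    TlsRequired,
    MtlsRequired,
    RateLimited,
    Compression,
}

impl TransportError {
    /// Errors whose recovery is to wait and try again.
    pub fn is_transient(self) -> bool {
        matches!(self, TransportError::Timeout | TransportError::RateLimited)
    }
}

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Messages follow the "Cause" column.
        let msg = match self {
            TransportError::Timeout => "operation exceeded timeout",
            TransportError::PeerNotFound => "no pool for peer",
            TransportError::HandshakeFailed => "protocol mismatch or bad certificate",
            TransportError::TlsRequired => "TLS needed but not configured",
            TransportError::MtlsRequired => "mTLS needed but not enabled",
            TransportError::RateLimited => "token bucket exhausted",
            TransportError::Compression => "decompression failed",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for TransportError {}
```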

Usage Example

use tensor_chain::tcp::{
    TcpTransport, TcpTransportConfig, TlsConfig, SecurityMode,
    RateLimitConfig, CompressionConfig,
};
// PeerConfig and Message must also be in scope; their module paths
// are not listed on this page.

// The statements below run inside an async fn that returns a Result,
// so `?` and `.await` are available.

// Create secure production configuration
let tls = TlsConfig::new_secure(
    "/etc/neumann/node.crt",
    "/etc/neumann/node.key",
    "/etc/neumann/ca.crt",
);

let config = TcpTransportConfig::new("node1", "0.0.0.0:9100".parse()?)
    .with_tls(tls)
    .with_security_mode(SecurityMode::Strict)
    .with_rate_limit(RateLimitConfig::default())
    .with_compression(CompressionConfig::default())
    .with_pool_size(4);

// Validate security settings against the chosen mode before starting
config.validate_security()?;

// Start transport
let transport = TcpTransport::new(config);
transport.start().await?;

// Connect to peer
transport.connect(&PeerConfig {
    node_id: "node2".to_string(),
    address: "10.0.1.2:9100".to_string(),
}).await?;

// Send a Ping to node2
transport.send(&"node2".to_string(), Message::Ping { term: 1 }).await?;

// Wait for the next incoming message and its sender's node ID
let (from, msg) = transport.recv().await?;

Source Reference

  • tensor_chain/src/tcp/config.rs - Configuration types
  • tensor_chain/src/tcp/transport.rs - Transport implementation
  • tensor_chain/src/tcp/tls.rs - TLS wrapper
  • tensor_chain/src/tcp/rate_limit.rs - Token bucket rate limiter
  • tensor_chain/src/tcp/compression.rs - LZ4 compression
  • tensor_chain/src/tcp/framing.rs - Wire protocol codec
  • tensor_chain/src/tcp/connection.rs - Connection pool