NATSBridge/docs/walkthrough.md
2026-03-13 20:53:35 +07:00


Walkthrough: NATSBridge

Version: 1.0.0
Date: 2026-03-13
Status: Active
Ground Truth: src/NATSBridge.jl


Executive Summary

This document walks through the end-to-end flow of NATSBridge, the cross-platform, bidirectional data bridge that lets Julia, JavaScript, Python, and MicroPython applications communicate using NATS as the message bus.

This walkthrough serves as the primary onboarding guide for new developers and explains:

  • How the system works - Step-by-step flow of data transmission and reception
  • Why steps are sequenced - The rationale behind architectural decisions
  • What could go wrong - Common failure scenarios and recovery strategies

Overview: The Big Picture

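NATSBridge implements the Claim-Check pattern for large payloads (500,000 bytes or more by default): the sender uploads the payload to an HTTP file server and publishes only its download URL over NATS, and the receiver fetches the bytes from that URL.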

┌────────────────────────────────────────────────────────┐
│                NATSBridge Architecture                 │
├────────────────────────────────────────────────────────┤
│                                                        │
│   ┌────────────────┐              ┌────────────────┐   │
│   │     Sender     │              │    Receiver    │   │
│   │                │              │                │   │
│   │ ┌────────────┐ │              │ ┌────────────┐ │   │
│   │ │ smartsend  │ │ ───────────▶ │ │smartreceive│ │   │
│   │ └─────┬──────┘ │              │ └─────┬──────┘ │   │
│   │       │        │              │       ▲        │   │
│   │       ▼        │              │       │        │   │
│   │ ┌─────┴──────┐ │              │ ┌─────┴──────┐ │   │
│   │ │ Serialize  │ │ ───────────▶ │ │Deserialize │ │   │
│   │ └─────┬──────┘ │              │ └─────┬──────┘ │   │
│   │       │        │              │       ▲        │   │
│   │       ▼        │              │       │        │   │
│   │ ┌─────┴──────┐ │              │ ┌─────┴──────┐ │   │
│   │ │ Transport  │ │ ───────────▶ │ │ Transport  │ │   │
│   │ │ Selection  │ │              │ │ Selection  │ │   │
│   │ └─────┬──────┘ │              │ └─────┬──────┘ │   │
│   │       │        │              │       ▲        │   │
│   │       ▼        │              │       │        │   │
│   │ ┌─────┴──────┐ │              │ ┌─────┴──────┐ │   │
│   │ │    NATS    │ │ ───────────▶ │ │    NATS    │ │   │
│   │ │  Publish   │ │              │ │ Subscribe  │ │   │
│   │ └────────────┘ │              │ └────────────┘ │   │
│   │                │              │                │   │
│   │ ┌────────────┐ │              │ ┌────────────┐ │   │
│   │ │File Server │ │ ───────────▶ │ │File Server │ │   │
│   │ │   Upload   │ │              │ │  Download  │ │   │
│   │ └────────────┘ │              │ └────────────┘ │   │
│   └────────────────┘              └────────────────┘   │
│                                                        │
└────────────────────────────────────────────────────────┘

Key Design Principles

| Principle | Description | Rationale |
|---|---|---|
| Claim-Check Pattern | Large payloads are uploaded to an HTTP file server; only the download URL travels over NATS | NATS enforces a maximum message size (max_payload, 1 MB by default), so large payloads cannot be sent inline |
| Automatic Transport Selection | Direct (< threshold) or Link (≥ threshold), chosen by serialized size | Balances memory use against network I/O |
| Cross-Platform API | The same smartsend()/smartreceive() calls on every platform | Simplifies the developer experience |
| Exponential Backoff | Failed downloads are retried with increasing delays | Handles transient failures gracefully |

The Sending Flow: smartsend()

Step-by-Step Journey

flowchart TD
    A[User calls smartsend subject data] --> B[Process each payload]
    B --> C{Parse payload tuple}
    C --> D[Extract: dataname, data, payload_type]
    
    D --> E[_serialize_data]
    E --> F{payload_type}
    
    F -->|"text"| G[UTF-8 encode]
    F -->|"dictionary"| H[JSON serialize]
    F -->|"arrowtable"| I[Arrow IPC serialize]
    F -->|"jsontable"| J[JSON serialize]
    F -->|"image"| K[Raw bytes]
    F -->|"audio"| L[Raw bytes]
    F -->|"video"| M[Raw bytes]
    F -->|"binary"| N[Raw bytes]
    
    G --> O[Return bytes]
    H --> O
    I --> O
    J --> O
    K --> O
    L --> O
    M --> O
    N --> O
    
    O --> P[Calculate serialized size]
    P --> Q{Size < Threshold?}
    
    Q -->|Yes| R[Direct Transport]
    Q -->|No| S[Link Transport]
    
    R --> T[Base64 encode]
    T --> U[Build payload with direct]
    
    S --> V[Upload to file server]
    V --> W[Get download URL]
    W --> U
    
    U --> X[Build envelope]
    X --> Y[Convert to JSON]
    Y --> Z[Publish to NATS]
    
    style A fill:#f9f9f9,stroke:#333
    style Z fill:#e0e7ff,stroke:#3b82f6
    style R fill:#d1fae5,stroke:#10b981
    style S fill:#fef3c7,stroke:#f59e0b

Detailed Walkthrough

Step 1: User Calls smartsend()

# Julia
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)

# Python
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = await smartsend("/chat/user/v1/message", data)

// JavaScript
const data = [
    ["msg", "Hello World", "text"],
    ["img", binaryData, "image"]
];
const [env, msgJson] = await smartsend("/chat/user/v1/message", data);

What happens:

  • The caller passes a list of (dataname, data, payload_type) tuples (arrays in JavaScript)
  • dataname: Identifier for the payload (e.g., "msg", "login_image")
  • data: The actual data to send
  • payload_type: Type string determining serialization method
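These rules can be checked before anything is serialized or uploaded. A minimal Python sketch, assuming the eight payload_type strings from the serialization table are the complete set; `validate_payloads` and `SUPPORTED_TYPES` are hypothetical names, not part of the NATSBridge API:

```python
# Hypothetical pre-flight check for smartsend() input; not part of NATSBridge.
SUPPORTED_TYPES = {
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
}


def validate_payloads(payloads):
    """Raise ValueError unless every item is (dataname, data, payload_type)."""
    for index, item in enumerate(payloads):
        if not isinstance(item, (tuple, list)) or len(item) != 3:
            raise ValueError(f"payload {index}: expected (dataname, data, payload_type)")
        dataname, _data, payload_type = item
        if not isinstance(dataname, str) or not dataname:
            raise ValueError(f"payload {index}: dataname must be a non-empty string")
        if payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"payload {index}: unknown payload_type {payload_type!r}")
    return True
```

Failing here means a malformed tuple is rejected before any bytes reach the file server.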

Step 2: Serialization (_serialize_data)

Each payload is serialized based on its type:

| Payload Type | Julia | Python | JavaScript | Encoding |
|---|---|---|---|---|
| text | UTF-8 bytes | UTF-8 bytes | UTF-8 bytes | Base64 |
| dictionary | JSON string | JSON string | JSON string | Base64 |
| arrowtable | Arrow IPC | Arrow IPC | Arrow IPC | Base64/arrow-ipc |
| jsontable | JSON array | JSON array | JSON array | Base64/json |
| image/audio/video/binary | Raw bytes | Raw bytes | Raw bytes | Base64 |

Example:

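using Arrow, DataFrames

# Text serialization: UTF-8 bytes of the string
text_bytes = Vector{UInt8}("Hello World")  # 11 bytes

# Dictionary serialization: UTF-8 bytes of the JSON text
dict_bytes = Vector{UInt8}("{\"key\":\"value\"}")  # 15 bytes

# Arrow table serialization into an in-memory buffer
data_frame = DataFrame(id = [1, 2, 3], name = ["a", "b", "c"])  # example table
io = IOBuffer()
Arrow.write(io, data_frame)
arrow_bytes = take!(io)  # Binary Arrow IPC stream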
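The same per-type dispatch, sketched in Python with only the standard library. `serialize_data` is a hypothetical stand-in for _serialize_data rather than a copy of src/natsbridge.py; the compact JSON separators are an assumption chosen so that {"key": "value"} serializes to the compact text {"key":"value"}, and arrowtable is left out because it needs pyarrow:

```python
import json


def serialize_data(data, payload_type):
    """Return the bytes that would later be Base64-encoded or uploaded."""
    if payload_type == "text":
        return data.encode("utf-8")
    if payload_type == "dictionary":
        # Compact separators: no spaces after ',' or ':'
        return json.dumps(data, separators=(",", ":")).encode("utf-8")
    if payload_type == "jsontable":
        # A table as a JSON array of row objects
        return json.dumps(list(data), separators=(",", ":")).encode("utf-8")
    if payload_type in ("image", "audio", "video", "binary"):
        return bytes(data)
    if payload_type == "arrowtable":
        raise NotImplementedError("arrowtable needs pyarrow; omitted from this sketch")
    raise ValueError(f"unknown payload_type {payload_type!r}")
```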

Step 3: Transport Selection

The serialized size determines the transport method:

| Platform | Threshold | Notes |
|---|---|---|
| Desktop (Julia/JS/Python) | 500,000 bytes (0.5 MB) | Default threshold |
| MicroPython | 100,000 bytes (100 KB) | Lower threshold for memory constraints |

Decision Logic:

if payload_size < size_threshold
    # Direct transport: send via NATS
else
    # Link transport: upload to file server
end
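In Python the rule is a single comparison. A sketch using the default thresholds from the table above (`select_transport` and `DEFAULT_THRESHOLDS` are hypothetical names):

```python
# Default thresholds from the table above, in bytes.
DEFAULT_THRESHOLDS = {"desktop": 500_000, "micropython": 100_000}


def select_transport(payload_size, size_threshold=DEFAULT_THRESHOLDS["desktop"]):
    """Return "direct" below the threshold and "link" at or above it."""
    if payload_size < 0:
        raise ValueError("payload_size must be non-negative")
    return "direct" if payload_size < size_threshold else "link"
```

Because the comparison is strict, a payload of exactly 500,000 bytes takes the link path.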

Step 4: Direct Transport Path

For payloads < threshold:

  1. Base64 Encode: Convert binary data to ASCII string
  2. Build Payload: Create msg_payload_v1 with transport="direct"

using Base64

# Encode as Base64
payload_b64 = Base64.base64encode(payload_bytes)

# Build payload
payload = msg_payload_v1(
    payload_b64,
    payload_type;
    transport = "direct",
    encoding = "base64",
    size = payload_size
)
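On the wire, a direct payload is a dictionary with the Base64 text in its data field. A Python sketch that builds the shape shown in the receiver's Expected Structure example; `build_direct_payload` is a hypothetical stand-in for msg_payload_v1, and using uuid4 for the payload id is an assumption:

```python
import base64
import uuid


def build_direct_payload(dataname, payload_bytes, payload_type):
    """Build a direct-transport payload dict shaped like the JSON example."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(payload_bytes),  # size of the raw bytes, before Base64
        "data": base64.b64encode(payload_bytes).decode("ascii"),
        "metadata": {"payload_bytes": len(payload_bytes)},
    }
```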

Step 5: Link Transport Path

For payloads ≥ threshold:

  1. Upload to File Server: Call fileserver_upload_handler (plik_oneshot_upload() by default)
  2. Get Download URL: The handler returns the URL of the uploaded file
  3. Build Payload: Create msg_payload_v1 with transport="link"

# Upload via the configured file server handler
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

# Extract URL
url = response["url"]

# Build payload
payload = msg_payload_v1(
    url,
    payload_type;
    transport = "link",
    encoding = "none",
    size = payload_size
)

File Server Handler Contract:

function fileserver_upload_handler(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # Returns: Dict("status" => 200, "uploadid" => "...", "fileid" => "...", "url" => "...")
end
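Because the upload handler is a plain function contract, tests can substitute a fake file server. A hypothetical Python test double that returns the same keys as the contract (status, uploadid, fileid, url), stores bytes in memory, and borrows the Plik URL layout /file/{uploadid}/{fileid}/{filename}:

```python
import uuid


class InMemoryFileServer:
    """Hypothetical test double honouring the upload/download handler contracts."""

    def __init__(self):
        self.files = {}  # url -> stored bytes

    def upload(self, file_server_url, dataname, data):
        upload_id, file_id = uuid.uuid4().hex, uuid.uuid4().hex
        url = f"{file_server_url}/file/{upload_id}/{file_id}/{dataname}"
        self.files[url] = bytes(data)
        return {"status": 200, "uploadid": upload_id, "fileid": file_id, "url": url}

    def download(self, url):
        # No retries needed: the in-memory store never fails transiently
        return self.files[url]
```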

Step 6: Build Envelope

All payloads are wrapped in a message envelope:

env = msg_envelope_v1(
    subject,
    payloads;
    correlation_id = correlation_id,
    msg_id = msg_id,
    msg_purpose = msg_purpose,
    sender_name = sender_name,
    sender_id = sender_id,
    receiver_name = receiver_name,
    receiver_id = receiver_id,
    reply_to = reply_to,
    reply_to_msg_id = reply_to_msg_id,
    broker_url = broker_url
)

Envelope Fields:

| Field | Purpose |
|---|---|
| correlation_id | Tracks the message flow across distributed systems |
| msg_id | Unique identifier for this message |
| timestamp | ISO 8601 UTC timestamp |
| send_to | NATS subject the envelope is published to |
| msg_purpose | One of ACK, NACK, updateStatus, shutdown, chat, command, event |
| sender_name/sender_id | Sender identification |
| receiver_name/receiver_id | Receiver identification (empty = broadcast) |
| reply_to | Subject for reply messages |
| reply_to_msg_id | msg_id of the message being replied to |
| broker_url | NATS server URL |
| metadata | Message-level metadata |
| payloads | Array of payload objects |
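A Python sketch of assembling an envelope with these fields. `build_envelope` is hypothetical; the empty-string defaults, the "chat" default for msg_purpose, and the nats://localhost:4222 broker default are assumptions, and unknown keyword fields are rejected to catch typos:

```python
import uuid
from datetime import datetime, timezone


def utc_timestamp():
    """ISO 8601 UTC timestamp with millisecond precision and a trailing Z."""
    now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"


def build_envelope(subject, payloads, correlation_id=None, msg_purpose="chat", **fields):
    """Assemble an envelope dict with the fields listed in the table."""
    envelope = {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        "timestamp": utc_timestamp(),
        "send_to": subject,
        "msg_purpose": msg_purpose,
        "sender_name": "", "sender_id": "",
        "receiver_name": "", "receiver_id": "",  # empty receiver = broadcast
        "reply_to": "", "reply_to_msg_id": "",
        "broker_url": "nats://localhost:4222",
        "metadata": {},
        "payloads": list(payloads),
    }
    unknown = set(fields) - set(envelope)
    if unknown:
        raise TypeError(f"unknown envelope fields: {sorted(unknown)}")
    envelope.update(fields)
    return envelope
```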

Step 7: Publish to NATS

The envelope is converted to JSON and published to NATS:

env_json_str = envelope_to_json(env)

# Publish with existing connection
publish_message(nats_connection, subject, env_json_str, correlation_id)

# Or let publish_message open a new connection to broker_url
publish_message(broker_url, subject, env_json_str, correlation_id)
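Before publishing, the JSON can be checked against the server's size limit. A Python sketch assuming the nats-py client (`nats.connect`, `publish`, `flush`, `close`) and the stock nats-server max_payload of 1 MB; `envelope_to_json_bytes` and `publish_envelope` are hypothetical names, the latter mirroring the open-a-new-connection form of publish_message:

```python
import json

NATS_DEFAULT_MAX_PAYLOAD = 1_048_576  # nats-server default max_payload (1 MB)


def envelope_to_json_bytes(envelope, max_payload=NATS_DEFAULT_MAX_PAYLOAD):
    """Serialize the envelope and refuse anything the server would reject."""
    data = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
    if len(data) > max_payload:
        raise ValueError(f"envelope is {len(data)} bytes; max_payload is {max_payload}")
    return data


async def publish_envelope(servers, subject, envelope):
    """Open a connection, publish once, and close (nats-py)."""
    import nats  # imported lazily so the size check works without nats-py installed

    nc = await nats.connect(servers)
    try:
        await nc.publish(subject, envelope_to_json_bytes(envelope))
        await nc.flush()
    finally:
        await nc.close()
```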

The Receiving Flow: smartreceive()

Step-by-Step Journey

flowchart TD
    A[NATS message arrives] --> B[Parse JSON envelope]
    B --> C[Extract payloads array]
    C --> D{Iterate through payloads}
    
    D --> E[Get payload transport]
    E --> F{transport == direct?}
    
    F -->|Yes| G[Extract Base64 data]
    G --> H[Decode Base64]
    H --> I[_deserialize_data]
    
    F -->|No| J[Extract download URL]
    J --> K[Fetch with exponential backoff]
    K --> L[_deserialize_data]
    
    I --> M[Build payload tuple]
    L --> M
    
    M --> N{More payloads?}
    N -->|Yes| D
    N -->|No| O[Replace payloads array]
    O --> P[Return envelope]
    
    style A fill:#f9f9f9,stroke:#333
    style P fill:#e0e7ff,stroke:#3b82f6
    style G fill:#d1fae5,stroke:#10b981
    style J fill:#fef3c7,stroke:#f59e0b

Detailed Walkthrough

Step 1: NATS Message Arrives

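The receiver takes the next message from its NATS subscription and passes it to smartreceive(). The next() calls below are placeholders; how the next message is obtained depends on the NATS client library:

# Julia
msg = nats_subscription.next()  # Get next message
env = smartreceive(msg)

# Python
msg = await nats_consumer.next()  # Get next message
env = await smartreceive(msg)

// JavaScript
const msg = await natsSubscription.next();
const env = await smartreceive(msg);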

Step 2: Parse JSON Envelope

The message payload is parsed as JSON:

env_json_obj = JSON.parse(String(msg.payload))

Expected Structure:

{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "msg_purpose": "chat",
  "sender_name": "sender-app",
  "sender_id": "sender-uuid...",
  "receiver_name": "receiver-app",
  "receiver_id": "receiver-uuid...",
  "reply_to": "reply.subject",
  "reply_to_msg_id": "msg-id...",
  "broker_url": "nats://localhost:4222",
  "metadata": {},
  "payloads": [
    {
      "id": "payload-uuid...",
      "dataname": "msg",
      "payload_type": "text",
      "transport": "direct",
      "encoding": "base64",
      "size": 11,
      "data": "SGVsbG8gV29ybGQ=",
      "metadata": {"payload_bytes": 11}
    }
  ]
}

Step 3: Process Each Payload

For each payload in the envelope:

num_payloads = length(env_json_obj["payloads"])

for i in 1:num_payloads
    payload = env_json_obj["payloads"][i]
    transport = String(payload["transport"])
    dataname = String(payload["dataname"])
    
    if transport == "direct"
        # Direct transport path
    elseif transport == "link"
        # Link transport path
    else
        error("Unknown transport type: $transport")
    end
end
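The same dispatch in Python, with the link download passed in as a callable so it can be faked in tests. `process_payloads` is a hypothetical helper that stops at raw bytes, leaving deserialization as a separate step:

```python
import base64


def process_payloads(envelope, download):
    """Resolve every payload to raw bytes as (dataname, bytes, payload_type)."""
    resolved = []
    for payload in envelope["payloads"]:
        transport = payload["transport"]
        if transport == "direct":
            raw = base64.b64decode(payload["data"])
        elif transport == "link":
            raw = download(payload["data"])  # data holds the download URL
        else:
            raise ValueError(f"Unknown transport type: {transport}")
        resolved.append((payload["dataname"], raw, payload["payload_type"]))
    return resolved
```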

Step 4: Direct Transport Path

For payloads with transport == "direct":

  1. Extract Base64 Data: Get the Base64-encoded string
  2. Decode Base64: Convert to binary data
  3. Deserialize: Convert bytes to native data type

using Base64

# Extract Base64 payload
payload_b64 = String(payload["data"])

# Decode Base64
payload_bytes = Base64.base64decode(payload_b64)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])

Deserialization Logic:

| Payload Type | Deserialization |
|---|---|
| text | UTF-8 bytes → String |
| dictionary | UTF-8 bytes → JSON string → Julia object |
| arrowtable | Arrow IPC bytes → DataFrame |
| jsontable | UTF-8 bytes → JSON string → Vector{Dict} → DataFrame |
| image/audio/video/binary | Bytes returned as-is |
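The reverse mapping in Python, sketched with the standard library only. `deserialize_data` is hypothetical; it returns a list of row dictionaries for jsontable instead of a DataFrame and skips arrowtable, which would need pyarrow:

```python
import json


def deserialize_data(raw, payload_type):
    """Turn raw bytes back into a Python value, following the table above."""
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type == "dictionary":
        return json.loads(raw.decode("utf-8"))
    if payload_type == "jsontable":
        rows = json.loads(raw.decode("utf-8"))
        if not isinstance(rows, list):
            raise ValueError("jsontable must decode to a JSON array")
        return rows  # list of row dicts; a DataFrame could be built from these
    if payload_type in ("image", "audio", "video", "binary"):
        return bytes(raw)
    if payload_type == "arrowtable":
        raise NotImplementedError("arrowtable needs pyarrow; omitted from this sketch")
    raise ValueError(f"unknown payload_type {payload_type!r}")
```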

Step 5: Link Transport Path

For payloads with transport == "link":

  1. Extract URL: Get the download URL from the payload
  2. Fetch with Backoff: Download the data with retry logic
  3. Deserialize: Convert bytes to native data type

# Extract download URL
url = String(payload["data"])

# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
    url,
    max_retries,
    base_delay,
    max_delay,
    env_json_obj["correlation_id"]
)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])

Download Handler Contract:

function fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
    # Returns: Vector{UInt8} (downloaded bytes)
end

Step 6: Build Payload List

Each processed payload is added to the result list:

payloads_list = Tuple{String, Any, String}[]

# After processing each payload
push!(payloads_list, (dataname, data, data_type))

Result Format:

[
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]

Step 7: Return Envelope

The envelope is updated with the processed payloads and returned:

env_json_obj["payloads"] = payloads_list
return env_json_obj

File Server Integration

Plik One-Shot Upload

NATSBridge uses Plik as the default HTTP file server for link transport:

# Upload handler
function plik_oneshot_upload(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}

Upload Flow:

  1. Create One-Shot Session: POST /upload with {"OneShot": true}
  2. Get Upload ID: Server returns the upload id and uploadToken
  3. Upload File: POST /file/{uploadid} with multipart form data
  4. Get File ID: Server returns the file id
  5. Return URL: Build the download URL from the upload id, file id, and file name

# Request 1 (steps 1-2): Create one-shot session
POST /upload
Headers: Content-Type: application/json
Body: {"OneShot": true}

Response:
{
  "id": "UPLOAD_ID",
  "uploadToken": "UPLOAD_TOKEN",
  "status": 200
}

# Request 2 (steps 3-4): Upload file
POST /file/UPLOAD_ID
Headers: X-UploadToken: UPLOAD_TOKEN
Body: multipart/form-data (file)

Response:
{
  "id": "FILE_ID",
  "status": 200
}

# Step 5, final URL: http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext
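For a client without a Plik library, the two requests can be made with Python's standard library. This sketch is not src/natsbridge.py: it assumes the response keys shown above (id, uploadToken), assumes Plik reads the multipart field named file, and uses dataname as the file name. `plik_oneshot_upload_sketch`, `build_multipart`, and `plik_download_url` are hypothetical names:

```python
import json
import urllib.parse
import urllib.request
import uuid


def build_multipart(field, filename, data, boundary):
    """Encode a single file as a multipart/form-data body."""
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; filename="{filename}"\r\n'
        "Content-Type: application/octet-stream\r\n\r\n"
    ).encode("utf-8")
    return head + bytes(data) + f"\r\n--{boundary}--\r\n".encode("utf-8")


def plik_download_url(file_server_url, upload_id, file_id, filename):
    base = file_server_url.rstrip("/")
    return f"{base}/file/{upload_id}/{file_id}/{urllib.parse.quote(filename)}"


def _post_json(url, body, headers):
    request = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


def plik_oneshot_upload_sketch(file_server_url, dataname, data):
    """Request 1 creates a one-shot upload; request 2 attaches the file."""
    upload = _post_json(
        f"{file_server_url}/upload",
        json.dumps({"OneShot": True}).encode("utf-8"),
        {"Content-Type": "application/json"},
    )
    boundary = uuid.uuid4().hex
    added = _post_json(
        f"{file_server_url}/file/{upload['id']}",
        build_multipart("file", dataname, data, boundary),
        {
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "X-UploadToken": upload["uploadToken"],
        },
    )
    return {
        "status": 200,
        "uploadid": upload["id"],
        "fileid": added["id"],
        "url": plik_download_url(file_server_url, upload["id"], added["id"], dataname),
    }
```

urlopen raises HTTPError on non-2xx responses, so reaching the return statement implies both requests succeeded.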

Exponential Backoff for Downloads

File downloads use exponential backoff for resilience:

function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}

Retry Policy:

  • Initial delay: base_delay milliseconds (default: 100ms)
  • Multiplier: 2x per retry
  • Maximum delay: max_delay milliseconds (default: 5000ms)
  • Maximum retries: max_retries (default: 5)

Delay Calculation:

using HTTP

delay = base_delay  # Start with base_delay ms (default 100 ms)

for attempt in 1:max_retries
    try
        # HTTP.request throws on connection errors and non-2xx statuses
        response = HTTP.request("GET", url)
        if response.status == 200
            return response.body
        end
    catch e
        # Failed attempt: fall through to the retry delay below
    end
    if attempt < max_retries
        sleep(delay / 1000.0)  # Convert ms to seconds before sleeping
        delay = min(delay * 2, max_delay)  # Double the delay, capped at max_delay
    end
end

error("Failed after $max_retries attempts")

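Example Delays (defaults: base_delay = 100 ms, max_delay = 5000 ms, max_retries = 5):

| Failed attempt | Delay before next attempt |
|---|---|
| 1 | 100 ms |
| 2 | 200 ms |
| 3 | 400 ms |
| 4 | 800 ms |
| 5 | none (retries exhausted, error raised) |

With these defaults the total wait is 1.5 s and the 5000 ms cap is never reached; it only takes effect when max_retries is 8 or more (the seventh delay would otherwise be 6400 ms).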
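Passing the sleep function in as a parameter makes the schedule testable without real waiting. A Python sketch with an injectable `fetch` and `sleep` (both assumptions added for testability; this `fetch_with_backoff` is hypothetical); delays are in milliseconds as in the retry policy above:

```python
import time


def fetch_with_backoff(fetch, url, max_retries=5, base_delay=100, max_delay=5000, sleep=time.sleep):
    """Call fetch(url) up to max_retries times, doubling the delay (ms) between tries."""
    delay = base_delay
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return fetch(url)
        except Exception as exc:  # network error, timeout, non-2xx status, ...
            last_error = exc
        if attempt < max_retries:
            sleep(delay / 1000.0)  # milliseconds -> seconds
            delay = min(delay * 2, max_delay)
    raise RuntimeError(f"Failed after {max_retries} attempts") from last_error
```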

Cross-Platform Compatibility

Platform-Specific Implementations

| Platform | File | Key Features |
|---|---|---|
| Julia | src/NATSBridge.jl | Multiple dispatch, Arrow.jl support |
| Python | src/natsbridge.py | Async/await, pyarrow support |
| Node.js | src/natsbridge_ssr.js | Buffer, nats.js |
| Browser | src/natsbridge_csr.js | Uint8Array, nats.ws, Web Crypto |
| MicroPython | src/natsbridge_mpy.py | Synchronous, limited payload types |

API Parity

All platforms implement the same core API:

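| Function | Julia | Python | JavaScript | MicroPython |
|---|---|---|---|---|
| smartsend() | ✅ | ✅ | ✅ | ✅ |
| smartreceive() | ✅ | ✅ | ✅ | ✅ |
| plik_oneshot_upload() | ✅ | ✅ | ✅ | ⚠️ (placeholder) |
| fetch_with_backoff() | ✅ | ✅ | ✅ | ⚠️ (placeholder) |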

Payload Type Support by Platform

Type Julia Python Node.js Browser MicroPython
text
dictionary
arrowtable
jsontable ⚠️
image
audio
video
binary

Error Handling

Common Error Scenarios

| Scenario | Error Code | Recovery |
|---|---|---|
| Unknown payload_type | INVALID_PAYLOAD_TYPE | Use a supported payload_type |
| Failed to upload | UPLOAD_FAILED | Retry, or use direct transport |
| Failed to fetch | DOWNLOAD_FAILED | Retry with exponential backoff |
| Unknown transport | INVALID_TRANSPORT | Check the payload's transport field |
| NATS connection failed | NATS_CONNECTION_FAILED | Check NATS server availability |
| Deserialization error | DESERIALIZATION_ERROR | Check that payload_type matches the data |

Error Response Format

{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "error": {
    "code": "DOWNLOAD_FAILED",
    "message": "Failed to fetch data after 5 attempts",
    "details": {
      "url": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext",
      "correlation_id": "abc123..."
    }
  }
}
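A Python sketch that produces this shape from a failed envelope. `make_error_response` is hypothetical, and copying msg_id and timestamp from the original envelope is an assumption based on how the example reads; the allowed codes come from the table above:

```python
ERROR_CODES = {
    "INVALID_PAYLOAD_TYPE", "UPLOAD_FAILED", "DOWNLOAD_FAILED",
    "INVALID_TRANSPORT", "NATS_CONNECTION_FAILED", "DESERIALIZATION_ERROR",
}


def make_error_response(envelope, code, message, **details):
    """Wrap an error in the envelope-shaped format shown above."""
    if code not in ERROR_CODES:
        raise ValueError(f"unknown error code {code!r}")
    return {
        "correlation_id": envelope["correlation_id"],
        "msg_id": envelope["msg_id"],
        "timestamp": envelope["timestamp"],
        "send_to": envelope["send_to"],
        "error": {
            "code": code,
            "message": message,
            "details": {**details, "correlation_id": envelope["correlation_id"]},
        },
    }
```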

Exception Handling Examples

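# File server unavailable
try
    env, msg_json = smartsend("/subject", data)
catch e
    @error "smartsend failed" exception = (e, catch_backtrace())
    # Retry with direct transport or use smaller payloads
end

# Deserialization error
try
    env = smartreceive(msg)
catch e
    # env is never assigned when smartreceive throws, so log the raw
    # message (its JSON still carries correlation_id) for inspection
    @error "Deserialization failed" exception = (e, catch_backtrace()) raw = String(msg.payload)
end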

Debugging and Tracing

Correlation ID Tracking

Every message includes a correlation_id for distributed tracing:

using UUIDs

# Generate correlation ID at start of request
correlation_id = string(uuid4())

# Use throughout the request flow
log_trace(correlation_id, "Starting smartsend for subject: $subject")
log_trace(correlation_id, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes")
log_trace(correlation_id, "Using direct transport for $payload_size bytes")

Log Format:

[2026-03-13T07:02:50.443Z] [Correlation: abc123...] Starting smartsend for subject: /chat/user/v1/message
[2026-03-13T07:02:50.445Z] [Correlation: abc123...] Serialized payload 'msg' (type: text) size: 11 bytes
[2026-03-13T07:02:50.446Z] [Correlation: abc123...] Using direct transport for 11 bytes
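Python logs with print() and a timestamp, so a helper producing this line format is short. A sketch only: `log_trace` mirrors the Julia name, the `now` parameter is an assumption added so the output can be tested, and src/natsbridge.py may format the line differently:

```python
from datetime import datetime, timezone


def log_trace(correlation_id, message, now=None):
    """Print one trace line in the format shown above and return it."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    line = f"[{stamp}] [Correlation: {correlation_id}] {message}"
    print(line)
    return line
```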

Logging in All Implementations

| Platform | Logging Method |
|---|---|
| Julia | @info macro |
| Python | print() with timestamp |
| JavaScript | console.log() |
| MicroPython | print() |

Testing the Flow

Example: End-to-End Test

# Sender side
data = [
    ("msg", "Hello", "text"),
    ("img", image_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)

# Receiver side
msg = nats_subscription.next()
env = smartreceive(msg)

# Verify payloads
for (dataname, data, type_) in env["payloads"]
    println("$dataname: $data (type: $type_)")
end

Test Scenarios

| Scenario | Payloads | Transport | Expected Result |
|---|---|---|---|
| Single text (small) | text | direct | Round-trip successful |
| Single dictionary (small) | dictionary | direct | Round-trip successful |
| Single arrow table (small) | arrowtable | direct | Arrow IPC round-trip |
| Single image (large) | image | link | File server upload/download |
| Mixed payloads | text + image | direct + link | All payloads preserved |

Deployment Considerations

Minimum Infrastructure

| Component | Minimum | Notes |
|---|---|---|
| NATS Server | 1 instance | Single node for development |
| File Server | 1 instance | HTTP server for large payloads |
| Client Memory | 50 MB | Desktop platforms |
| Client Memory | 256 KB | MicroPython devices |

Environment Variables

| Variable | Default | Description |
|---|---|---|
| NATS_URL | nats://localhost:4222 | NATS server URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 1000000 | Size threshold in bytes |

Container Deployment

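# docker-compose.yml
version: '3'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"

  plik:
    image: rootfs/plik:latest
    ports:
      - "8080:8080"
    volumes:
      - plik-data:/data

  app:
    image: my-app:latest
    environment:
      # Inside the compose network, services are reached by service name
      NATS_URL: nats://nats:4222
      FILESERVER_URL: http://plik:8080
    depends_on:
      - nats
      - plik

# Named volumes must be declared at the top level
volumes:
  plik-data: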

Common Pitfalls

Pitfall 1: Payload Size Threshold

Issue: Payloads just above threshold may cause unnecessary file server uploads

Solution: Monitor payload sizes and adjust threshold based on:

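  • Network latency to file server
  • Memory constraints
  • File server performance
  • The NATS server's max_payload (1 MB by default): Base64 makes direct payloads about a third larger, so a threshold above roughly 780,000 bytes needs a higher max_payload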
# Adjust threshold based on use case
env, msg_json = smartsend("/subject", data; size_threshold = 1_000_000)  # 1MB
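The NATS limit applies to the Base64 text, not the raw bytes, so it is worth computing the headroom. A Python sketch assuming the default 1 MB max_payload and an arbitrary 2 KB allowance for the envelope's other fields (both `max_direct_raw_bytes` and the overhead figure are assumptions):

```python
NATS_DEFAULT_MAX_PAYLOAD = 1_048_576  # nats-server default max_payload


def base64_len(n_bytes):
    """Length of the padded Base64 text for n raw bytes."""
    return 4 * ((n_bytes + 2) // 3)


def max_direct_raw_bytes(max_payload=NATS_DEFAULT_MAX_PAYLOAD, envelope_overhead=2_048):
    """Largest raw payload whose Base64 text still fits beside the envelope fields."""
    budget = max_payload - envelope_overhead
    return (budget // 4) * 3
```

With these numbers, the 500,000-byte default encodes to 666,668 characters and fits, while a 1,000,000-byte payload encodes to 1,333,336 characters and would be rejected unless max_payload is raised.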

Pitfall 2: File Server Availability

Issue: File server down during upload/download

Solution: Implement fallback strategies:

  • Fall back to direct transport for uploads
  • Use smaller payloads to avoid link transport
  • Implement application-level retries
using Base64

# Fall back to direct transport if the file upload fails
try
    response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
catch e
    # Fall back to direct transport; only viable while the Base64 text
    # (about 4/3 of the raw size) fits within the NATS max_payload
    payload_b64 = Base64.base64encode(payload_bytes)
    # Build payload with direct transport
end
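That fallback only helps while the Base64 text still fits in one NATS message. A Python sketch that makes the check explicit; `upload_or_fallback`, the callable `upload` parameter, and the 1,046,528-character budget (the 1 MB nats-server default minus an assumed 2 KB of envelope overhead) are all assumptions:

```python
import base64


def upload_or_fallback(upload, payload_bytes, max_direct_b64=1_046_528):
    """Try link transport; fall back to direct only if the Base64 text fits."""
    try:
        response = upload(payload_bytes)
        return {"transport": "link", "encoding": "none", "data": response["url"]}
    except Exception as exc:  # file server down, timeout, HTTP error, ...
        encoded = base64.b64encode(payload_bytes).decode("ascii")
        if len(encoded) > max_direct_b64:
            raise RuntimeError("upload failed and payload is too large for direct transport") from exc
        return {"transport": "direct", "encoding": "base64", "data": encoded}
```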

Pitfall 3: Payload Type Mismatch

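Issue: The data does not match the payload_type the sender declared, so the receiver deserializes it the wrong way

Solution: Make sure every declared payload_type matches its data:

  • smartreceive() reads payload_type from the envelope, so the sender's label decides how the receiver deserializes
  • Use consistent payload_type strings across platforms
# Sender declares the payload type
smartsend("/subject", [("data", data, "arrowtable")])

# Receiver: payload_type is taken from the envelope, not chosen here
env = smartreceive(msg)
# env["payloads"][1][3] == "arrowtable"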
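Since the receiver trusts the envelope, the check belongs on the sender. A Python sketch of a pre-send guard; `check_payload_type` and the Python types accepted for each payload_type are assumptions, and arrowtable is not checked because it depends on pyarrow or pandas:

```python
# Python value types accepted for each payload_type (an assumption for this sketch).
_BYTES_LIKE = (bytes, bytearray, memoryview)
EXPECTED_TYPES = {
    "text": (str,),
    "dictionary": (dict,),
    "jsontable": (list,),
    "image": _BYTES_LIKE,
    "audio": _BYTES_LIKE,
    "video": _BYTES_LIKE,
    "binary": _BYTES_LIKE,
}


def check_payload_type(dataname, data, payload_type):
    """Raise TypeError when data cannot match its declared payload_type."""
    expected = EXPECTED_TYPES.get(payload_type)
    if expected is None:
        return  # types not listed (e.g. arrowtable) are not checked here
    if not isinstance(data, expected):
        names = ", ".join(t.__name__ for t in expected)
        raise TypeError(
            f"{dataname!r}: payload_type {payload_type!r} expects {names}, "
            f"got {type(data).__name__}"
        )
```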

Performance Considerations

Optimization Strategies

| Strategy | Description | When to Use |
|---|---|---|
| Pre-create NATS connection | Reuse one connection for multiple sends | High-throughput scenarios |
| Batch small payloads | Combine multiple small payloads in one envelope | Reduce NATS overhead |
| Adjust size threshold | Raise the threshold when the file server is slow | File server bottleneck |
| Use direct transport | Avoid the file server for small payloads | Low-latency requirements |

Benchmarking

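# Benchmark direct vs link transport
using BenchmarkTools, DataFrames

# arrowtable payloads must be tables, not plain vectors
small_df = DataFrame(x = rand(1_000))      # ~8 KB of Float64: direct transport
large_df = DataFrame(x = rand(1_000_000))  # ~8 MB of Float64: link transport

# Direct transport (publishes a message on every benchmark sample)
@btime smartsend("/subject", [("data", $small_df, "arrowtable")])

# Link transport (needs a running file server; uploads on every sample)
@btime smartsend("/subject", [("data", $large_df, "arrowtable")])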

Versioning

Current Version

  • Major: 1 (Breaking changes require major version bump)
  • Minor: 0 (Feature additions)
  • Patch: 0 (Bug fixes)

Version Compatibility

| Version | Supported Platforms |
|---|---|
| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |

Change Log

| Date | Version | Changes |
|---|---|---|
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |


This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.
