# Walkthrough: NATSBridge

**Version**: 1.0.0
**Date**: 2026-03-13
**Status**: Active
**Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl)

---

## Executive Summary

This document provides the **story of flow** for NATSBridge, the cross-platform bi-directional data bridge that enables communication between **Julia**, **JavaScript**, **Python**, and **MicroPython** applications using NATS as the message bus.

This walkthrough serves as the primary onboarding guide for new developers and explains:

- **How the system works** - Step-by-step flow of data transmission and reception
- **Why steps are sequenced** - The rationale behind architectural decisions
- **What could go wrong** - Common failure scenarios and recovery strategies

---

## Overview: The Big Picture

NATSBridge implements the **Claim-Check pattern** for efficient handling of large payloads (≥ 0.5 MB by default):

```mermaid
flowchart TD
    subgraph "Sender Application"
        A1[User Code]
        A2[smartsend]
        A3[_serialize_data]
        A4[Transport Selection]
        A5[Build Envelope]
        A6[Publish to NATS]
    end

    subgraph "NATS Broker"
        B1[NATS Server]
    end

    subgraph "Receiver Application"
        C1[Subscribe to NATS]
        C2[smartreceive]
        C3[Parse Envelope]
        C4[Process Payloads]
        C5[Deserialize Data]
    end

    A1 --> A2
    A2 --> A3
    A3 --> A4
    A4 -->|Direct| A5
    A4 -->|Link| A5
    A5 --> A6
    A6 --> B1
    B1 --> C1
    C1 --> C2
    C2 --> C3
    C3 --> C4
    C4 --> C5

    style A1 fill:#e8f5e9,stroke:#4caf50
    style B1 fill:#fff3e0,stroke:#f57c00
    style C1 fill:#e3f2fd,stroke:#2196f3
    style A4 fill:#fef3c7,stroke:#f59e0b
```

### Key Design Principles

| Principle | Description | Rationale |
|-----------|-------------|-----------|
| **Claim-Check Pattern** | Large payloads are uploaded to an HTTP file server; only the URL is sent via NATS | NATS enforces a maximum message size (1 MB by default), so large payloads cannot travel in-band |
| **Automatic Transport Selection** | Direct (< threshold) vs Link (≥ threshold) based on serialized size | Optimizes memory vs network I/O trade-off |
| **Cross-Platform API** | Consistent `smartsend()`/`smartreceive()` across all platforms | Simplifies developer experience |
| **Exponential Backoff** | Retry downloads with increasing delays | Handles transient failures gracefully |

---

## The Sending Flow: `smartsend()`

### Step-by-Step Journey

```mermaid
flowchart TD
    A[User calls smartsend subject data] --> B[Process each payload]
    B --> C{Parse payload tuple}
    C --> D[Extract: dataname, data, payload_type]
    D --> E[_serialize_data]
    E --> F{payload_type}
    F -->|"text"| G[UTF-8 encode]
    F -->|"dictionary"| H[JSON serialize]
    F -->|"arrowtable"| I[Arrow IPC serialize]
    F -->|"jsontable"| J[JSON serialize]
    F -->|"image"| K[Raw bytes]
    F -->|"audio"| L[Raw bytes]
    F -->|"video"| M[Raw bytes]
    F -->|"binary"| N[Raw bytes]
    G --> O[Return bytes]
    H --> O
    I --> O
    J --> O
    K --> O
    L --> O
    M --> O
    N --> O
    O --> P[Calculate serialized size]
    P --> Q{Size < Threshold?}
    Q -->|Yes| R[Direct Transport]
    Q -->|No| S[Link Transport]
    R --> T[Base64 encode]
    T --> U[Build payload]
    S --> V[Upload to file server]
    V --> W[Get download URL]
    W --> U
    U --> X[Build envelope]
    X --> Y[Convert to JSON]
    Y --> Z[Publish to NATS]

    style A fill:#f9f9f9,stroke:#333
    style Z fill:#e0e7ff,stroke:#3b82f6
    style R fill:#d1fae5,stroke:#10b981
    style S fill:#fef3c7,stroke:#f59e0b
```

### Detailed Walkthrough

#### Step 1: User Calls `smartsend()`

```julia
# Julia
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)
```

```python
# Python
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = await smartsend("/chat/user/v1/message", data)
```

```javascript
// JavaScript
const data = [
    ["msg", "Hello World", "text"],
    ["img", binaryData, "image"]
];
const [env, msgJson] = await smartsend("/chat/user/v1/message", data);
```

**What happens**:

- User provides a list of tuples: `(dataname, data, payload_type)`
  - `dataname`: Identifier for the payload (e.g., "msg", "login_image")
  - `data`: The actual data to send
  - `payload_type`: Type string determining serialization method

#### Step 2: Serialization (`_serialize_data`)

Each payload is serialized based on its type:

| Payload Type | Julia | Python | JavaScript | Encoding |
|--------------|-------|--------|------------|----------|
| `text` | UTF-8 bytes | UTF-8 bytes | UTF-8 bytes | Base64 |
| `dictionary` | JSON string | JSON string | JSON string | Base64 |
| `arrowtable` | Arrow IPC | Arrow IPC | Arrow IPC | Base64/arrow-ipc |
| `jsontable` | JSON array | JSON array | JSON array | Base64/json |
| `image`/`audio`/`video`/`binary` | Raw bytes | Raw bytes | Raw bytes | Base64 |

**Example**:

```julia
# Text serialization
text_bytes = Vector{UInt8}("Hello World")  # 11 bytes

# Dictionary serialization
dict_bytes = Vector{UInt8}("{\"key\":\"value\"}")  # 15 bytes

# Arrow table serialization
io = IOBuffer()
Arrow.write(io, data_frame)
arrow_bytes = take!(io)  # Binary Arrow IPC stream
```

#### Step 3: Transport Selection

The serialized size determines the transport method:

| Platform | Threshold | Notes |
|----------|-----------|-------|
| Desktop (Julia/JS/Python) | 500,000 bytes (0.5MB) | Default threshold |
| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints |

**Decision Logic**:

```julia
if payload_size < size_threshold
    # Direct transport: send via NATS
else
    # Link transport: upload to file server
end
```

#### Step 4: Direct Transport Path

For payloads < threshold:

1. **Base64 Encode**: Convert binary data to ASCII string
2. **Build Payload**: Create `msg_payload_v1` with `transport="direct"`

```julia
# Encode as Base64
payload_b64 = Base64.base64encode(payload_bytes)

# Build payload
payload = msg_payload_v1(
    payload_b64,
    payload_type;
    transport = "direct",
    encoding = "base64",
    size = payload_size
)
```

#### Step 5: Link Transport Path

For payloads ≥ threshold:

1. **Upload to File Server**: Call the upload handler (`plik_oneshot_upload()` for the default Plik server)
2. **Get Download URL**: Server returns URL for the uploaded file
3. **Build Payload**: Create `msg_payload_v1` with `transport="link"`

```julia
# Upload to Plik server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

# Extract URL
url = response["url"]

# Build payload
payload = msg_payload_v1(
    url,
    payload_type;
    transport = "link",
    encoding = "none",
    size = payload_size
)
```

**File Server Handler Contract**:

```julia
function fileserver_upload_handler(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # Returns: Dict("status" => 200, "uploadid" => "...", "fileid" => "...", "url" => "...")
end
```

#### Step 6: Build Envelope

All payloads are wrapped in a message envelope:

```julia
env = msg_envelope_v1(
    subject,
    payloads;
    correlation_id = correlation_id,
    msg_id = msg_id,
    msg_purpose = msg_purpose,
    sender_name = sender_name,
    sender_id = sender_id,
    receiver_name = receiver_name,
    receiver_id = receiver_id,
    reply_to = reply_to,
    reply_to_msg_id = reply_to_msg_id,
    broker_url = broker_url
)
```

**Envelope Fields**:

| Field | Purpose |
|-------|---------|
| `correlation_id` | Track message flow across distributed systems |
| `msg_id` | Unique identifier for this message |
| `timestamp` | ISO 8601 UTC timestamp |
| `send_to` | NATS subject to publish to |
| `msg_purpose` | ACK, NACK, updateStatus, shutdown, chat, command, event |
| `sender_name`/`sender_id` | Sender identification |
| `receiver_name`/`receiver_id` | Receiver identification (empty = broadcast) |
| `reply_to` | Topic for reply messages |
| `broker_url` | NATS server URL |
| `metadata` | Message-level metadata |
| `payloads` | Array of payload objects |

#### Step 7: Publish to NATS

The envelope is converted to JSON and published to NATS:

```julia
env_json_str = envelope_to_json(env)

# Publish with existing connection
publish_message(nats_connection, subject, env_json_str, correlation_id)

# Or publish by creating new connection
publish_message(broker_url, subject, env_json_str, correlation_id)
```

---

## The Receiving Flow: `smartreceive()`

### Step-by-Step Journey

```mermaid
flowchart TD
    A[NATS message arrives] --> B[Parse JSON envelope]
    B --> C[Extract payloads array]
    C --> D{Iterate through payloads}
    D --> E[Get payload transport]
    E --> F{transport == direct?}
    F -->|Yes| G[Extract Base64 data]
    G --> H[Decode Base64]
    H --> I[_deserialize_data]
    F -->|No| J[Extract download URL]
    J --> K[Fetch with exponential backoff]
    K --> L[_deserialize_data]
    I --> M[Build payload tuple]
    L --> M
    M --> N{More payloads?}
    N -->|Yes| D
    N -->|No| O[Replace payloads array]
    O --> P[Return envelope]

    style A fill:#f9f9f9,stroke:#333
    style P fill:#e0e7ff,stroke:#3b82f6
    style G fill:#d1fae5,stroke:#10b981
    style J fill:#fef3c7,stroke:#f59e0b
```

### Detailed Walkthrough

#### Step 1: NATS Message Arrives

The receiver gets a message from NATS:

```julia
# Julia
msg = nats_subscription.next()  # Get next message
env = smartreceive(msg)
```

```python
# Python
msg = await nats_consumer.next()  # Get next message
env = await smartreceive(msg)
```

```javascript
// JavaScript
const msg = await natsSubscription.next();
const env = await smartreceive(msg);
```

#### Step 2: Parse JSON Envelope

The message payload is parsed as JSON:

```julia
env_json_obj = JSON.parse(String(msg.payload))
```

**Expected Structure**:

```json
{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "msg_purpose": "chat",
  "sender_name": "sender-app",
  "sender_id": "sender-uuid...",
  "receiver_name": "receiver-app",
  "receiver_id": "receiver-uuid...",
  "reply_to": "reply.subject",
  "reply_to_msg_id": "msg-id...",
  "broker_url": "nats://localhost:4222",
  "metadata": {},
  "payloads": [
    {
      "id": "payload-uuid...",
      "dataname": "msg",
      "payload_type": "text",
      "transport": "direct",
      "encoding": "base64",
      "size": 11,
      "data": "SGVsbG8gV29ybGQ=",
      "metadata": {"payload_bytes": 11}
    }
  ]
}
```

#### Step 3: Process Each Payload

For each payload in the envelope:

```julia
num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads
    payload = env_json_obj["payloads"][i]
    transport = String(payload["transport"])
    dataname = String(payload["dataname"])

    if transport == "direct"
        # Direct transport path
    elseif transport == "link"
        # Link transport path
    else
        error("Unknown transport type: $transport")
    end
end
```

#### Step 4: Direct Transport Path

For payloads with `transport == "direct"`:

1. **Extract Base64 Data**: Get the Base64-encoded string
2. **Decode Base64**: Convert to binary data
3. **Deserialize**: Convert bytes to native data type

```julia
# Extract Base64 payload
payload_b64 = String(payload["data"])

# Decode Base64
payload_bytes = Base64.base64decode(payload_b64)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
```

**Deserialization Logic**:

| Payload Type | Deserialization |
|--------------|-----------------|
| `text` | UTF-8 bytes → String |
| `dictionary` | UTF-8 bytes → JSON string → Julia object |
| `arrowtable` | Arrow IPC bytes → DataFrame |
| `jsontable` | UTF-8 bytes → JSON string → Vector{Dict} → DataFrame |
| `image`/`audio`/`video`/`binary` | Bytes directly |

#### Step 5: Link Transport Path

For payloads with `transport == "link"`:

1. **Extract URL**: Get the download URL from payload
2. **Fetch with Backoff**: Download data with retry logic
3. **Deserialize**: Convert bytes to native data type

```julia
# Extract download URL
url = String(payload["data"])

# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
    url,
    max_retries,
    base_delay,
    max_delay,
    env_json_obj["correlation_id"]
)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
```

**Download Handler Contract**:

```julia
function fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
    # Returns: Vector{UInt8} (downloaded bytes)
end
```

#### Step 6: Build Payload List

Each processed payload is added to the result list:

```julia
payloads_list = Tuple{String, Any, String}[]

# After processing each payload
push!(payloads_list, (dataname, data, data_type))
```

**Result Format**:

```julia
[
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
```

#### Step 7: Return Envelope

The envelope is updated with the processed payloads and returned:

```julia
env_json_obj["payloads"] = payloads_list
return env_json_obj
```

---

## File Server Integration

### Plik One-Shot Upload

NATSBridge uses **Plik** as the default HTTP file server for link transport:

```julia
# Upload handler (signature only; body omitted)
function plik_oneshot_upload(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # ...
end
```

**Upload Flow**:

1. **Create One-Shot Session**: POST `/upload` with `{"OneShot": true}`
2. **Get Upload ID**: Server returns the upload ID (`id`) and upload token (`uploadToken`)
3. **Upload File**: POST `/file/{uploadid}` with multipart form data
4. **Get File ID**: Server returns the file ID (`id`)
5. **Return URL**: Construct download URL

```
# Step 1: Create one-shot session
POST /upload
Headers: Content-Type: application/json
Body: {"OneShot": true}
Response: { "id": "UPLOAD_ID", "uploadToken": "UPLOAD_TOKEN", "status": 200 }

# Step 2: Upload file
POST /file/UPLOAD_ID
Headers: X-UploadToken: UPLOAD_TOKEN
Body: multipart/form-data (file)
Response: { "id": "FILE_ID", "status": 200 }

# Final URL: http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext
```

### Exponential Backoff for Downloads

File downloads use exponential backoff for resilience:

```julia
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
```

**Retry Policy**:

- Initial delay: `base_delay` milliseconds (default: 100ms)
- Multiplier: 2x per retry
- Maximum delay: `max_delay` milliseconds (default: 5000ms)
- Maximum retries: `max_retries` (default: 5)

**Delay Calculation**:

```julia
delay = base_delay  # Start with 100ms
for attempt in 1:max_retries
    try
        # Try to fetch
        response = HTTP.request("GET", url)
        if response.status == 200
            return response.body
        end
    catch e
        @warn "Fetch attempt $attempt failed" exception = e
    end
    # Reached on an exception or a non-200 status: wait before the next attempt
    if attempt < max_retries
        sleep(delay / 1000.0)              # Sleep before retry
        delay = min(delay * 2, max_delay)  # Double delay, cap at max
    end
end
error("Failed after $max_retries attempts")
```

**Example Delays** (defaults: `base_delay` = 100ms, `max_delay` = 5000ms, `max_retries` = 5):

| Failed Attempt | Delay Before Next Attempt |
|----------------|---------------------------|
| 1 | 100ms |
| 2 | 200ms |
| 3 | 400ms |
| 4 | 800ms |
| 5 | None (final attempt; an error is raised) |

With more retries the delay would keep doubling (1600ms, 3200ms) until it reaches the 5000ms cap.

---

## Cross-Platform Compatibility

### Platform-Specific Implementations

| Platform | File | Key Features |
|----------|------|--------------|
| Julia | `src/NATSBridge.jl` | Multiple dispatch, Arrow.jl support |
| Python | `src/natsbridge.py` | Async/await, pyarrow support |
| Node.js | `src/natsbridge_ssr.js` | Buffer, nats.js |
| Browser | `src/natsbridge_csr.js` | Uint8Array, nats.ws, Web Crypto |
| MicroPython | `src/natsbridge_mpy.py` | Synchronous, limited payload types |

### API Parity

All platforms implement
the same core API:

| Function | Julia | Python | JavaScript | MicroPython |
|----------|-------|--------|------------|-------------|
| `smartsend()` | ✅ | ✅ | ✅ | ✅ |
| `smartreceive()` | ✅ | ✅ | ✅ | ✅ |
| `plik_oneshot_upload()` | ✅ | ✅ | ✅ | ⚠️ (placeholder) |
| `fetch_with_backoff()` | ✅ | ✅ | ✅ | ⚠️ (placeholder) |

### Payload Type Support by Platform

| Type | Julia | Python | Node.js | Browser | MicroPython |
|------|-------|--------|---------|---------|-------------|
| `text` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `dictionary` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `arrowtable` | ✅ | ✅ | ✅ | ✅ | ❌ |
| `jsontable` | ✅ | ✅ | ✅ | ✅ | ⚠️ |
| `image` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `audio` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `video` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `binary` | ✅ | ✅ | ✅ | ✅ | ✅ |

---

## Error Handling

### Common Error Scenarios

| Scenario | Error Code | Recovery |
|----------|------------|----------|
| **Unknown payload_type** | `INVALID_PAYLOAD_TYPE` | Use supported payload_type |
| **Failed to upload** | `UPLOAD_FAILED` | Retry or use direct transport |
| **Failed to fetch** | `DOWNLOAD_FAILED` | Retry with exponential backoff |
| **Unknown transport** | `INVALID_TRANSPORT` | Check payload transport field |
| **NATS connection failed** | `NATS_CONNECTION_FAILED` | Check NATS server availability |
| **Deserialization error** | `DESERIALIZATION_ERROR` | Validate payload_type matches data |

### Error Response Format

```json
{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "error": {
    "code": "DOWNLOAD_FAILED",
    "message": "Failed to fetch data after 5 attempts",
    "details": {
      "url": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext",
      "correlation_id": "abc123..."
    }
  }
}
```

### Exception Handling Examples

```julia
# File server unavailable
try
    env, msg_json = smartsend("/subject", data)
catch e
    # Retry with direct transport or use smaller payloads
end

# Deserialization error
try
    env = smartreceive(msg)
catch e
    # `env` is not assigned when smartreceive throws, so log the exception
    # and backtrace, then inspect the raw message for its correlation_id
    @error "smartreceive failed" exception=(e, catch_backtrace())
end
```

---

## Debugging and Tracing

### Correlation ID Tracking

Every message includes a `correlation_id` for distributed tracing:

```julia
using UUIDs

# Generate correlation ID at start of request
correlation_id = string(uuid4())

# Use throughout the request flow
log_trace(correlation_id, "Starting smartsend for subject: $subject")
log_trace(correlation_id, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes")
log_trace(correlation_id, "Using direct transport for $payload_size bytes")
```

**Log Format**:

```
[2026-03-13T07:02:50.443Z] [Correlation: abc123...] Starting smartsend for subject: /chat/user/v1/message
[2026-03-13T07:02:50.445Z] [Correlation: abc123...] Serialized payload 'msg' (type: text) size: 11 bytes
[2026-03-13T07:02:50.446Z] [Correlation: abc123...] Using direct transport for 11 bytes
```

### Logging in All Implementations

| Platform | Logging Method |
|----------|----------------|
| Julia | `@info` macro |
| Python | `print()` with timestamp |
| JavaScript | `console.log()` |
| MicroPython | `print()` |

---

## Testing the Flow

### Example: End-to-End Test

```julia
# Sender side
data = [
    ("msg", "Hello", "text"),
    ("img", image_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)

# Receiver side
msg = nats_subscription.next()
env = smartreceive(msg)

# Verify payloads
for (dataname, data, type_) in env["payloads"]
    println("$dataname: $data (type: $type_)")
end
```

### Test Scenarios

| Scenario | Payloads | Transport | Expected Result |
|----------|----------|-----------|-----------------|
| Single text (small) | `text` | direct | Round-trip successful |
| Single dictionary (small) | `dictionary` | direct | Round-trip successful |
| Single arrow table (small) | `arrowtable` | direct | Arrow IPC round-trip |
| Single image (large) | `image` | link | File server upload/download |
| Mixed payloads | `text` + `image` | direct + link | All payloads preserved |

---

## Deployment Considerations

### Minimum Infrastructure

| Component | Minimum | Notes |
|-----------|---------|-------|
| NATS Server | 1 instance | Single node for development |
| File Server | 1 instance | HTTP server for large payloads |
| Client Memory | 50MB | Desktop platforms |
| Client Memory | 256KB | MicroPython devices |

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB desktop default) |

### Container Deployment

```yaml
# docker-compose.yml
version: '3'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"

  plik:
    image: rootgg/plik:latest
    ports:
      - "8080:8080"
    volumes:
      - plik-data:/data

  app:
    image: my-app:latest
    depends_on:
      - nats
      - plik

# Named volumes must be declared at the top level
volumes:
  plik-data:
```

---

## Common Pitfalls

### Pitfall 1: Payload Size Threshold

**Issue**: Payloads just above the threshold trigger a file server upload even though they would still fit in a NATS message

**Solution**: Monitor payload sizes and adjust threshold based on:

- Network latency to file server
- Memory constraints
- File server performance

```julia
# Adjust threshold based on use case
env, msg_json = smartsend("/subject", data; size_threshold = 1_000_000)  # 1MB
```

### Pitfall 2: File Server Availability

**Issue**: File server down during upload/download

**Solution**: Implement fallback strategies:

- Fall back to direct transport for uploads
- Use smaller payloads to avoid link transport
- Implement application-level retries

```julia
# Fallback to direct transport if file upload fails
try
    response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
catch e
    # Fall back to direct transport
    payload_b64 = Base64.base64encode(payload_bytes)
    # Build payload with direct transport
end
```

### Pitfall 3: Payload Type Mismatch

**Issue**: Receiver deserializes with wrong payload_type

**Solution**: Always validate payload_type matches data:

- Sender and receiver must agree on payload types
- Use consistent payload_type strings across platforms

```julia
# Sender
smartsend("/subject", [("data", data, "arrowtable")])

# Receiver (must use same payload_type)
env = smartreceive(msg)
# env["payloads"][1][3] == "arrowtable"
```

---

## Performance Considerations

### Optimization Strategies

| Strategy | Description | When to Use |
|----------|-------------|-------------|
| **Pre-create NATS connection** | Reuse connection for multiple sends | High-throughput scenarios |
| **Batch small payloads** | Combine multiple small payloads | Reduce NATS overhead |
| **Adjust size threshold** | Increase threshold if file server slow | File server bottleneck |
| **Use direct transport** | Avoid file server for small payloads | Low latency requirements |

### Benchmarking

```julia
# Benchmark direct vs link transport
using BenchmarkTools, DataFrames

# "arrowtable" payloads must be tables, so wrap the vectors in DataFrames
small_table = DataFrame(x = rand(1_000))      # ~8 KB of Float64 values
large_table = DataFrame(x = rand(1_000_000))  # ~8 MB of Float64 values

# Direct transport
@btime smartsend("/subject", [("data", $small_table, "arrowtable")])

# Link transport (with file server)
@btime smartsend("/subject", [("data", $large_table, "arrowtable")])
```

---

## Versioning

### Current Version

- **Major**: 1 (Breaking changes require major version bump)
- **Minor**: 0 (Feature additions)
- **Patch**: 0 (Bug fixes)

### Version Compatibility

| Version | Supported Platforms |
|---------|---------------------|
| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |

---

## Change Log

| Date | Version | Changes |
|------|---------|---------|
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |

---

## References

- [`docs/requirements.md`](./requirements.md) - Business requirements and user stories
- [`docs/spec.md`](./spec.md) - Technical specification and contracts
- [`docs/architecture.md`](./architecture.md) - System architecture diagrams
- [`src/NATSBridge.jl`](../src/NATSBridge.jl) - Ground truth implementation
- [`README.md`](../README.md) - Project overview

---

*This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.*
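
---

## Appendix: Transport Selection Sketch

As a companion to Steps 3 and 4 of the sending flow, here is a minimal Python sketch of the size-based transport decision and of building a direct-transport payload. The helper names `select_transport` and `build_direct_payload` are hypothetical and are not part of the NATSBridge API. The dictionary keys mirror the payload fields in the expected envelope structure, and the 500,000-byte default is the desktop threshold quoted in this walkthrough.

```python
import base64

# Desktop default quoted in this walkthrough (MicroPython uses 100_000)
DEFAULT_SIZE_THRESHOLD = 500_000  # bytes


def select_transport(payload_size: int, size_threshold: int = DEFAULT_SIZE_THRESHOLD) -> str:
    """Return "direct" below the threshold and "link" at or above it."""
    return "direct" if payload_size < size_threshold else "link"


def build_direct_payload(dataname: str, payload_bytes: bytes, payload_type: str) -> dict:
    """Hypothetical helper: Base64-encode serialized bytes into a direct payload dict."""
    return {
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(payload_bytes),  # size of the serialized bytes, before Base64
        "data": base64.b64encode(payload_bytes).decode("ascii"),
    }


# Usage: an 11-byte text payload stays on the direct path
text_bytes = "Hello World".encode("utf-8")
transport = select_transport(len(text_bytes))
payload = build_direct_payload("msg", text_bytes, "text")
print(transport, payload["data"])
```

Note that the comparison is strict: a payload of exactly `size_threshold` bytes takes the link path, matching the `payload_size < size_threshold` decision logic.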
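
---

## Appendix: Backoff Retry Sketch

To make the retry policy from "Exponential Backoff for Downloads" concrete, here is a minimal Python sketch of the same loop: start at `base_delay`, double after each failed attempt, cap at `max_delay`, and do not sleep after the final attempt. The `fetch` callable and the injectable `sleep` parameter are assumptions added for illustration so the loop can run without a real file server. This is not the library's `fetch_with_backoff` implementation.

```python
import time
from typing import Callable, List


def fetch_with_backoff(
    fetch: Callable[[], bytes],
    max_retries: int = 5,
    base_delay: int = 100,   # milliseconds
    max_delay: int = 5000,   # milliseconds
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Call fetch() until it succeeds, sleeping with doubling delays between attempts."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_retries:
                break  # no sleep after the final attempt
        sleep(delay / 1000.0)
        delay = min(delay * 2, max_delay)  # double the delay, capped at max_delay
    raise RuntimeError(f"Failed after {max_retries} attempts")


# Usage: a stand-in fetch that fails three times, then succeeds.
slept_ms: List[int] = []
attempts = {"count": 0}


def flaky_fetch() -> bytes:
    attempts["count"] += 1
    if attempts["count"] < 4:
        raise ConnectionError("file server not ready")
    return b"payload"


# Record the requested delays instead of actually sleeping
result = fetch_with_backoff(flaky_fetch, sleep=lambda s: slept_ms.append(round(s * 1000)))
print(result, slept_ms)
```

Injecting `sleep` keeps the example instantaneous and lets a test assert on the delay schedule directly.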