Walkthrough: NATSBridge
Version: 1.0.0
Date: 2026-03-13
Status: Active
Ground Truth: src/NATSBridge.jl
Executive Summary
This document walks through the end-to-end data flow of NATSBridge, the cross-platform, bi-directional data bridge that lets Julia, JavaScript, Python, and MicroPython applications communicate using NATS as the message bus.
This walkthrough is the primary onboarding guide for new developers. It explains:
- How the system works: the step-by-step flow of data transmission and reception
- Why the steps are sequenced as they are: the rationale behind the architectural decisions
- What could go wrong: common failure scenarios and recovery strategies
Overview: The Big Picture
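NATSBridge implements the Claim-Check pattern to handle large payloads (at or above the 0.5 MB default threshold) efficiently: the payload is uploaded to an HTTP file server and only its URL travels through NATS.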
```mermaid
flowchart TD
    subgraph "Sender Application"
        A1[User Code]
        A2[smartsend Function]
        A3[Serialize Data]
        A4[Transport Selection]
        A5[Publish to NATS]
        A6[File Server Upload]
    end
    subgraph "NATS Broker"
        B1[NATS Server]
    end
    subgraph "Receiver Application"
        C1[Subscribe to NATS]
        C2[smartreceive Function]
        C3[Deserialize Data]
        C4[Transport Selection]
        C5[Fetch from File Server]
    end
    A1 --> A2
    A2 --> A3
    A3 --> A4
    A4 -->|Direct| A5
    A4 -->|Link| A6
    A6 --> A5
    A5 --> B1
    B1 --> C1
    C1 --> C2
    C2 --> C4
    C4 -->|Direct| C3
    C4 -->|Link| C5
    C5 --> C3
    style A1 fill:#e8f5e9,stroke:#4caf50
    style B1 fill:#fff3e0,stroke:#f57c00
    style C1 fill:#e3f2fd,stroke:#2196f3
    style A6 fill:#fef3c7,stroke:#f59e0b
    style C5 fill:#fef3c7,stroke:#f59e0b
```
Key Design Principles
| Principle | Description | Rationale |
|---|---|---|
| Claim-Check Pattern | Large payloads are uploaded to an HTTP file server; only the URL is sent via NATS | NATS enforces a maximum message size (`max_payload`, 1 MB by default), so large payloads cannot travel inline |
| Automatic Transport Selection | Direct (< threshold) vs. Link (≥ threshold), based on serialized size | Optimizes the memory vs. network I/O trade-off |
| Cross-Platform API | Consistent `smartsend()`/`smartreceive()` across all platforms | Simplifies the developer experience |
| Exponential Backoff | Downloads are retried with increasing delays | Handles transient failures gracefully |
The Sending Flow: smartsend()
Step-by-Step Journey
```mermaid
flowchart TD
    A[User calls smartsend subject data] --> B[Process each payload]
    B --> C{Parse payload tuple}
    C --> D[Extract: dataname, data, payload_type]
    D --> E[_serialize_data]
    E --> F{payload_type}
    F -->|"text"| G[UTF-8 encode]
    F -->|"dictionary"| H[JSON serialize]
    F -->|"arrowtable"| I[Arrow IPC serialize]
    F -->|"jsontable"| J[JSON serialize]
    F -->|"image"| K[Raw bytes]
    F -->|"audio"| L[Raw bytes]
    F -->|"video"| M[Raw bytes]
    F -->|"binary"| N[Raw bytes]
    G --> O[Return bytes]
    H --> O
    I --> O
    J --> O
    K --> O
    L --> O
    M --> O
    N --> O
    O --> P[Calculate serialized size]
    P --> Q{Size < Threshold?}
    Q -->|Yes| R[Direct Transport]
    Q -->|No| S[Link Transport]
    R --> T[Base64 encode]
    T --> U[Build payload with direct]
    S --> V[Upload to file server]
    V --> W[Get download URL]
    W --> U
    U --> X[Build envelope]
    X --> Y[Convert to JSON]
    Y --> Z[Publish to NATS]
    style A fill:#f9f9f9,stroke:#333
    style Z fill:#e0e7ff,stroke:#3b82f6
    style R fill:#d1fae5,stroke:#10b981
    style S fill:#fef3c7,stroke:#f59e0b
```
Detailed Walkthrough
Step 1: User Calls smartsend()
```julia
# Julia
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)
```

```python
# Python
data = [
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
env, msg_json = await smartsend("/chat/user/v1/message", data)
```

```javascript
// JavaScript
const data = [
  ["msg", "Hello World", "text"],
  ["img", binaryData, "image"]
];
const [env, msgJson] = await smartsend("/chat/user/v1/message", data);
```
What happens:
- The user provides a list of `(dataname, data, payload_type)` tuples:
  - `dataname`: identifier for the payload (e.g., "msg", "login_image")
  - `data`: the actual data to send
  - `payload_type`: type string that determines the serialization method
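The tuple contract can be checked before any serialization happens. Below is a minimal Python sketch, assuming the eight payload_type strings from Step 2 are the complete set; `payload_errors` and `SUPPORTED_TYPES` are hypothetical names, not part of the NATSBridge API.

```python
# Hypothetical pre-flight check for (dataname, data, payload_type) tuples.
SUPPORTED_TYPES = frozenset({
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
})


def payload_errors(payloads):
    """Return a list of problems; an empty list means every tuple is well-formed."""
    errors = []
    for i, entry in enumerate(payloads):
        if not isinstance(entry, (tuple, list)) or len(entry) != 3:
            errors.append(f"payload {i}: expected (dataname, data, payload_type)")
            continue
        dataname, _data, payload_type = entry
        if not isinstance(dataname, str) or not dataname:
            errors.append(f"payload {i}: dataname must be a non-empty string")
        if not isinstance(payload_type, str) or payload_type not in SUPPORTED_TYPES:
            errors.append(f"payload {i}: INVALID_PAYLOAD_TYPE {payload_type!r}")
    return errors


print(payload_errors([("msg", "Hello World", "text"), ("img", b"\x89PNG", "image")]))  # []
```

Returning a list instead of raising on the first problem lets a caller report every bad tuple at once; the `INVALID_PAYLOAD_TYPE` tag mirrors the error code in the Error Handling section.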
Step 2: Serialization (_serialize_data)
Each payload is serialized based on its type:
| Payload Type | Julia | Python | JavaScript | Encoding |
|---|---|---|---|---|
| text | UTF-8 bytes | UTF-8 bytes | UTF-8 bytes | Base64 |
| dictionary | JSON string | JSON string | JSON string | Base64 |
| arrowtable | Arrow IPC | Arrow IPC | Arrow IPC | Base64/arrow-ipc |
| jsontable | JSON array | JSON array | JSON array | Base64/json |
| image/audio/video/binary | Raw bytes | Raw bytes | Raw bytes | Base64 |
Example:
```julia
using Arrow, DataFrames

# Text serialization
text_bytes = Vector{UInt8}("Hello World")          # 11 bytes

# Dictionary serialization
dict_bytes = Vector{UInt8}("{\"key\":\"value\"}")  # 15 bytes

# Arrow table serialization
data_frame = DataFrame(id = [1, 2, 3], value = [0.5, 1.5, 2.5])  # example table
io = IOBuffer()
Arrow.write(io, data_frame)
arrow_bytes = take!(io)  # binary Arrow IPC stream
```
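A stdlib-only Python sketch of the same dispatch, for readers comparing platforms. It is an illustration rather than the code in src/natsbridge.py: it assumes `jsontable` data arrives as a list of row dicts, uses compact JSON separators, and leaves out `arrowtable`, which would need an Arrow library such as pyarrow.

```python
import json

RAW_TYPES = {"image", "audio", "video", "binary"}


def serialize_data(data, payload_type: str) -> bytes:
    """Turn one payload into bytes according to its payload_type (sketch)."""
    if payload_type == "text":
        return data.encode("utf-8")
    if payload_type == "dictionary":
        # Compact separators: no spaces after ',' or ':'
        return json.dumps(data, separators=(",", ":")).encode("utf-8")
    if payload_type == "jsontable":
        # Assumption: a table is a list of row dicts, serialized as a JSON array
        return json.dumps(list(data), separators=(",", ":")).encode("utf-8")
    if payload_type in RAW_TYPES:
        return bytes(data)  # already binary; copy into an immutable bytes object
    raise ValueError(f"INVALID_PAYLOAD_TYPE: {payload_type!r}")
```

For example, `serialize_data({"key": "value"}, "dictionary")` yields the 15 bytes `{"key":"value"}`.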
Step 3: Transport Selection
The serialized size determines the transport method:
| Platform | Threshold | Notes |
|---|---|---|
| Desktop (Julia/JS/Python) | 500,000 bytes (0.5MB) | Default threshold |
| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints |
Decision Logic:
```julia
if payload_size < size_threshold
    # Direct transport: send the data inline in the NATS message
else
    # Link transport: upload to the file server and send the URL
end
```
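The same rule as a Python sketch; `select_transport` and the two constants are hypothetical names carrying the defaults from the table above. A payload exactly at the threshold takes the link path.

```python
DESKTOP_THRESHOLD = 500_000      # bytes: desktop default (Julia/JS/Python)
MICROPYTHON_THRESHOLD = 100_000  # bytes: lower default for memory-constrained boards


def select_transport(payload_size: int, size_threshold: int = DESKTOP_THRESHOLD) -> str:
    """Return "direct" below the threshold and "link" at or above it."""
    if payload_size < 0:
        raise ValueError("payload_size must be non-negative")
    return "direct" if payload_size < size_threshold else "link"
```

Because the comparison uses the serialized size, a direct payload occupies roughly 4/3 of that size on the wire once Base64-encoded, which matters when raising the threshold toward the NATS message size limit.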
Step 4: Direct Transport Path
For payloads < threshold:
- Base64 encode: convert the binary data to an ASCII string
- Build payload: create a `msg_payload_v1` with `transport="direct"`
```julia
using Base64

# Encode as Base64
payload_b64 = Base64.base64encode(payload_bytes)

# Build payload
payload = msg_payload_v1(
    payload_b64,
    payload_type;
    transport = "direct",
    encoding = "base64",
    size = payload_size
)
```
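In JSON terms, a direct payload ends up looking like the `payloads` entries in the Expected Structure example of the receiving flow. A Python sketch that builds one, assuming those field names; `build_direct_payload` is hypothetical and the payload `id` is simply a random UUID here.

```python
import base64
import uuid


def build_direct_payload(dataname: str, payload_bytes: bytes, payload_type: str) -> dict:
    """Wrap already-serialized bytes as a direct-transport payload (sketch)."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(payload_bytes),  # size of the serialized bytes, before Base64
        "data": base64.b64encode(payload_bytes).decode("ascii"),
        "metadata": {"payload_bytes": len(payload_bytes)},
    }
```

For `b"Hello World"` the `data` field is `SGVsbG8gV29ybGQ=`, the same value shown in the expected envelope.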
Step 5: Link Transport Path
For payloads ≥ threshold:
- Upload to file server: pass the bytes to the upload handler (`plik_oneshot_upload()` for the default Plik server)
- Get download URL: the server returns a URL for the uploaded file
- Build payload: create a `msg_payload_v1` with `transport="link"`
```julia
# Upload to the Plik server through the configured handler
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

# Extract URL
url = response["url"]

# Build payload: the URL replaces the data, so nothing is Base64-encoded
payload = msg_payload_v1(
    url,
    payload_type;
    transport = "link",
    encoding = "none",
    size = payload_size
)
```
File Server Handler Contract:
```julia
function fileserver_upload_handler(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # Returns: Dict("status" => 200, "uploadid" => "...", "fileid" => "...", "url" => "...")
end
```
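Because the upload handler is passed in rather than hard-coded, the link path can be tested without a running file server. The Python sketch below assumes the contract above (a callable taking the server URL, dataname, and bytes, and returning a dict with `status` and `url`); `build_link_payload`, `fake_upload_handler`, and the URL it returns are hypothetical.

```python
from typing import Callable, Dict

UploadHandler = Callable[[str, str, bytes], Dict[str, object]]


def build_link_payload(fileserver_url: str, dataname: str, payload_bytes: bytes,
                       payload_type: str, upload_handler: UploadHandler) -> dict:
    """Upload the bytes and return a link-transport payload pointing at them (sketch)."""
    response = upload_handler(fileserver_url, dataname, payload_bytes)
    if response.get("status") != 200 or "url" not in response:
        raise RuntimeError(f"UPLOAD_FAILED: {response!r}")
    return {
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "link",
        "encoding": "none",
        "size": len(payload_bytes),
        "data": response["url"],  # the receiver downloads from this URL
    }


def fake_upload_handler(server: str, dataname: str, data: bytes) -> dict:
    """Test double standing in for plik_oneshot_upload: pretends the upload succeeded."""
    return {"status": 200, "uploadid": "UPLOAD_ID", "fileid": "FILE_ID",
            "url": f"{server}/file/UPLOAD_ID/FILE_ID/{dataname}"}
```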
Step 6: Build Envelope
All payloads are wrapped in a message envelope:
```julia
env = msg_envelope_v1(
    subject,
    payloads;
    correlation_id = correlation_id,
    msg_id = msg_id,
    msg_purpose = msg_purpose,
    sender_name = sender_name,
    sender_id = sender_id,
    receiver_name = receiver_name,
    receiver_id = receiver_id,
    reply_to = reply_to,
    reply_to_msg_id = reply_to_msg_id,
    broker_url = broker_url
)
```
Envelope Fields:
| Field | Purpose |
|---|---|
| correlation_id | Track message flow across distributed systems |
| msg_id | Unique identifier for this message |
| timestamp | ISO 8601 UTC timestamp |
| send_to | NATS subject to publish to |
| msg_purpose | One of ACK, NACK, updateStatus, shutdown, chat, command, event |
| sender_name/sender_id | Sender identification |
| receiver_name/receiver_id | Receiver identification (empty = broadcast) |
| reply_to | Topic for reply messages |
| reply_to_msg_id | ID of the message being replied to |
| broker_url | NATS server URL |
| metadata | Message-level metadata |
| payloads | Array of payload objects |
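A Python sketch of an envelope with these fields, serialized with `json.dumps` in the role that `envelope_to_json` plays in Julia. `build_envelope` and `iso8601_utc_ms` are hypothetical; the default `msg_purpose`, the `broker_url` value, and leaving the optional identity fields empty are assumptions.

```python
import json
import uuid
from datetime import datetime, timezone


def iso8601_utc_ms(now: datetime) -> str:
    """Format an aware UTC datetime like 2026-03-13T07:02:50.443Z."""
    return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"


def build_envelope(subject, payloads, msg_purpose="chat", sender_name="",
                   correlation_id="", broker_url="nats://localhost:4222"):
    """Assemble a v1-style envelope dict (sketch; optional fields left empty)."""
    return {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        "timestamp": iso8601_utc_ms(datetime.now(timezone.utc)),
        "send_to": subject,        # NATS subject to publish to
        "msg_purpose": msg_purpose,
        "sender_name": sender_name,
        "sender_id": "",
        "receiver_name": "",       # empty receiver = broadcast
        "receiver_id": "",
        "reply_to": "",
        "reply_to_msg_id": "",
        "broker_url": broker_url,
        "metadata": {},
        "payloads": payloads,
    }


env_json_str = json.dumps(build_envelope("/chat/user/v1/message", [], sender_name="sender-app"))
```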
Step 7: Publish to NATS
The envelope is converted to JSON and published to NATS:
```julia
env_json_str = envelope_to_json(env)

# Publish with an existing connection
publish_message(nats_connection, subject, env_json_str, correlation_id)

# Or publish by creating a new connection
publish_message(broker_url, subject, env_json_str, correlation_id)
```
The Receiving Flow: smartreceive()
Step-by-Step Journey
```mermaid
flowchart TD
    A[NATS message arrives] --> B[Parse JSON envelope]
    B --> C[Extract payloads array]
    C --> D{Iterate through payloads}
    D --> E[Get payload transport]
    E --> F{transport == direct?}
    F -->|Yes| G[Extract Base64 data]
    G --> H[Decode Base64]
    H --> I[_deserialize_data]
    F -->|No| J[Extract download URL]
    J --> K[Fetch with exponential backoff]
    K --> L[_deserialize_data]
    I --> M[Build payload tuple]
    L --> M
    M --> N{More payloads?}
    N -->|Yes| D
    N -->|No| O[Replace payloads array]
    O --> P[Return envelope]
    style A fill:#f9f9f9,stroke:#333
    style P fill:#e0e7ff,stroke:#3b82f6
    style G fill:#d1fae5,stroke:#10b981
    style J fill:#fef3c7,stroke:#f59e0b
```
Detailed Walkthrough
Step 1: NATS Message Arrives
The receiver gets a message from NATS:
```julia
# Julia
msg = nats_subscription.next()  # get the next message
env = smartreceive(msg)
```

```python
# Python (nats-py)
msg = await nats_subscription.next_msg()  # get the next message
env = await smartreceive(msg)
```

```javascript
// JavaScript (nats.js): subscriptions are async iterables
for await (const msg of natsSubscription) {
  const env = await smartreceive(msg);
}
```
Step 2: Parse JSON Envelope
The message payload is parsed as JSON:
```julia
env_json_obj = JSON.parse(String(msg.payload))
```
Expected Structure:
```json
{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "msg_purpose": "chat",
  "sender_name": "sender-app",
  "sender_id": "sender-uuid...",
  "receiver_name": "receiver-app",
  "receiver_id": "receiver-uuid...",
  "reply_to": "reply.subject",
  "reply_to_msg_id": "msg-id...",
  "broker_url": "nats://localhost:4222",
  "metadata": {},
  "payloads": [
    {
      "id": "payload-uuid...",
      "dataname": "msg",
      "payload_type": "text",
      "transport": "direct",
      "encoding": "base64",
      "size": 11,
      "data": "SGVsbG8gV29ybGQ=",
      "metadata": {"payload_bytes": 11}
    }
  ]
}
```
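A receiver can reject malformed envelopes before touching any payload. The Python sketch below parses the message body and checks a set of fields; which fields count as required is an assumption drawn from what the later steps read, and `parse_envelope` is a hypothetical helper.

```python
import json

# Assumption: the minimum set of fields the receiving steps depend on
REQUIRED_ENVELOPE_FIELDS = ("correlation_id", "msg_id", "timestamp", "send_to", "payloads")
REQUIRED_PAYLOAD_FIELDS = ("dataname", "payload_type", "transport", "data")


def parse_envelope(raw: bytes) -> dict:
    """Decode a NATS message body and check the fields the receiver relies on (sketch)."""
    env = json.loads(raw.decode("utf-8"))
    missing = [f for f in REQUIRED_ENVELOPE_FIELDS if f not in env]
    if missing:
        raise ValueError(f"envelope missing fields: {missing}")
    for i, payload in enumerate(env["payloads"]):
        absent = [f for f in REQUIRED_PAYLOAD_FIELDS if f not in payload]
        if absent:
            raise ValueError(f"payload {i} missing fields: {absent}")
        if payload["transport"] not in ("direct", "link"):
            raise ValueError(f"INVALID_TRANSPORT: {payload['transport']!r}")
    return env
```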
Step 3: Process Each Payload
For each payload in the envelope:
```julia
num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads
    payload = env_json_obj["payloads"][i]
    transport = String(payload["transport"])
    dataname = String(payload["dataname"])

    if transport == "direct"
        # Direct transport path
    elseif transport == "link"
        # Link transport path
    else
        error("Unknown transport type: $transport")
    end
end
```
Step 4: Direct Transport Path
For payloads with transport == "direct":
- Extract Base64 Data: Get the Base64-encoded string
- Decode Base64: Convert to binary data
- Deserialize: Convert bytes to native data type
```julia
using Base64

# Extract Base64 payload
payload_b64 = String(payload["data"])

# Decode Base64
payload_bytes = Base64.base64decode(payload_b64)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
```
Deserialization Logic:
| Payload Type | Deserialization |
|---|---|
| text | UTF-8 bytes → String |
| dictionary | UTF-8 bytes → JSON string → Julia object |
| arrowtable | Arrow IPC bytes → Arrow table → DataFrame |
| jsontable | UTF-8 bytes → JSON string → Vector{Dict} → DataFrame |
| image/audio/video/binary | Returned as raw bytes (Vector{UInt8}) |
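A Python counterpart for the types that need only the standard library, as a sketch rather than the natsbridge.py implementation. It assumes `jsontable` is returned as a list of row dicts (the Julia side goes further and builds a DataFrame) and deliberately refuses `arrowtable`, which needs an Arrow IPC reader.

```python
import json

RAW_TYPES = {"image", "audio", "video", "binary"}


def deserialize_data(payload_bytes: bytes, payload_type: str):
    """Inverse of the sender-side dispatch for stdlib-friendly types (sketch)."""
    if payload_type == "text":
        return payload_bytes.decode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        # jsontable comes back as a list of row dicts; building a DataFrame
        # (as the Julia side does) is left to the caller here
        return json.loads(payload_bytes.decode("utf-8"))
    if payload_type in RAW_TYPES:
        return bytes(payload_bytes)
    if payload_type == "arrowtable":
        raise NotImplementedError("arrowtable needs an Arrow IPC reader such as pyarrow")
    raise ValueError(f"INVALID_PAYLOAD_TYPE: {payload_type!r}")
```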
Step 5: Link Transport Path
For payloads with transport == "link":
- Extract URL: Get the download URL from payload
- Fetch with Backoff: Download data with retry logic
- Deserialize: Convert bytes to native data type
```julia
# Extract download URL
url = String(payload["data"])

# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
    url,
    max_retries,
    base_delay,
    max_delay,
    env_json_obj["correlation_id"]
)

# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
```
Download Handler Contract:
```julia
function fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
    # Returns: Vector{UInt8} (downloaded bytes)
end
```
Step 6: Build Payload List
Each processed payload is added to the result list:
```julia
payloads_list = Tuple{String, Any, String}[]

# After processing each payload
push!(payloads_list, (dataname, data, data_type))
```
Result Format:
```julia
[
    ("msg", "Hello World", "text"),
    ("img", binary_data, "image")
]
```
Step 7: Return Envelope
The envelope is updated with the processed payloads and returned:
```julia
env_json_obj["payloads"] = payloads_list
return env_json_obj
```
File Server Integration
Plik One-Shot Upload
NATSBridge uses Plik as the default HTTP file server for link transport:
```julia
# Upload handler
function plik_oneshot_upload(
    file_server_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # ... (body omitted)
end
```
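Upload Flow:
- Create a one-shot session: POST `/upload` with `{"OneShot": true}`
- Get the upload ID: the server returns the upload `id` and its `uploadToken`
- Upload the file: POST `/file/{uploadid}` with multipart form data, sending the token in the `X-UploadToken` header
- Get the file ID: the server returns the file `id`
- Return the URL: construct the download URL from the upload ID, file ID, and file name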
```text
# Step 1: Create one-shot session
POST /upload
Headers: Content-Type: application/json
Body: {"OneShot": true}

Response:
{
  "id": "UPLOAD_ID",
  "uploadToken": "UPLOAD_TOKEN",
  "status": 200
}

# Step 2: Upload file
POST /file/UPLOAD_ID
Headers: X-UploadToken: UPLOAD_TOKEN
Body: multipart/form-data (file)

Response:
{
  "id": "FILE_ID",
  "status": 200
}

# Final URL: http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext
```
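The first request and the final URL from this exchange can be built with Python's standard library alone. The sketch below constructs, but does not send, the session request, and assembles the download URL following the shapes shown above; the helper names are hypothetical, and the multipart file upload in step 2 is omitted.

```python
import json
import urllib.request
from urllib.parse import quote


def plik_session_request(server: str) -> urllib.request.Request:
    """Step 1: request that opens a one-shot upload session (built, not sent)."""
    body = json.dumps({"OneShot": True}).encode("utf-8")
    return urllib.request.Request(
        f"{server}/upload", data=body, method="POST",
        headers={"Content-Type": "application/json"},
    )


def plik_download_url(server: str, upload_id: str, file_id: str, filename: str) -> str:
    """Final step: the URL placed in the link payload's `data` field."""
    return f"{server}/file/{upload_id}/{file_id}/{quote(filename)}"
```

Sending the request is then a matter of `urllib.request.urlopen(plik_session_request(server))` against a live Plik instance.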
Exponential Backoff for Downloads
File downloads use exponential backoff for resilience:
```julia
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
    # ... (body omitted)
end
```
Retry Policy:
- Initial delay: `base_delay` milliseconds (default: 100 ms)
- Multiplier: 2x per retry
- Maximum delay: `max_delay` milliseconds (default: 5000 ms)
- Maximum attempts: `max_retries` (default: 5), including the first request
Delay Calculation:
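```julia
using HTTP

function _fetch_with_backoff(url::String, max_retries::Int, base_delay::Int,
                             max_delay::Int, correlation_id::String)::Vector{UInt8}
    delay = base_delay  # milliseconds; starts at 100 ms by default
    for attempt in 1:max_retries
        try
            response = HTTP.request("GET", url)  # throws on non-2xx by default
            if response.status == 200
                return response.body
            end
        catch e
            @warn "Download attempt $attempt/$max_retries failed" correlation_id exception = e
        end
        # Back off after any failed attempt except the last
        if attempt < max_retries
            sleep(delay / 1000.0)               # sleep before retrying
            delay = min(delay * 2, max_delay)   # double the delay, capped at max_delay
        end
    end
    error("Failed to fetch $url after $max_retries attempts (correlation_id: $correlation_id)")
end
```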
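Example Delays (defaults):
| Attempt | Delay before the next attempt |
|---|---|
| 1 | 100 ms |
| 2 | 200 ms |
| 3 | 400 ms |
| 4 | 800 ms |
| 5 | None: the download fails with an error |

With the defaults the 5000 ms cap is never reached; it first applies to the delay after the 7th attempt, when `max_retries` is 8 or more.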
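The schedule can be reproduced, and the retry loop exercised without a network, with a small Python sketch. The fetch and sleep functions are injected so tests can substitute fakes; `backoff_delays`, `fetch_with_backoff`, and `make_flaky_fetch` are hypothetical names, and treating only `OSError` (which covers `ConnectionError` and urllib's `URLError`) as retryable is an assumption.

```python
import time
from typing import Callable, List


def backoff_delays(max_retries: int = 5, base_delay: int = 100, max_delay: int = 5000) -> List[int]:
    """Milliseconds slept after each failed attempt except the last."""
    delays, delay = [], base_delay
    for _ in range(max_retries - 1):
        delays.append(delay)
        delay = min(delay * 2, max_delay)
    return delays


def fetch_with_backoff(fetch: Callable[[str], bytes], url: str, max_retries: int = 5,
                       base_delay: int = 100, max_delay: int = 5000,
                       sleep: Callable[[float], None] = time.sleep) -> bytes:
    """Call fetch(url) until it succeeds or max_retries attempts have failed."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return fetch(url)
        except OSError as exc:
            if attempt == max_retries:
                raise RuntimeError(f"DOWNLOAD_FAILED after {max_retries} attempts") from exc
            sleep(delay / 1000.0)              # seconds, like Julia's sleep
            delay = min(delay * 2, max_delay)  # double, capped at max_delay
    raise AssertionError("unreachable")


def make_flaky_fetch(failures: int, body: bytes = b"payload") -> Callable[[str], bytes]:
    """Test double: raises ConnectionError `failures` times, then returns `body`."""
    state = {"calls": 0}

    def fetch(url: str) -> bytes:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError(f"transient failure {state['calls']} for {url}")
        return body

    return fetch
```

With the defaults, `backoff_delays()` returns `[100, 200, 400, 800]`, a total wait of 1.5 s before the download is declared failed.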
Cross-Platform Compatibility
Platform-Specific Implementations
| Platform | File | Key Features |
|---|---|---|
| Julia | src/NATSBridge.jl | Multiple dispatch, Arrow.jl support |
| Python | src/natsbridge.py | Async/await, pyarrow support |
| Node.js | src/natsbridge_ssr.js | Buffer, nats.js |
| Browser | src/natsbridge_csr.js | Uint8Array, nats.ws, Web Crypto |
| MicroPython | src/natsbridge_mpy.py | Synchronous, limited payload types |
API Parity
All platforms implement the same core API:
| Function | Julia | Python | JavaScript | MicroPython |
|---|---|---|---|---|
| smartsend() | ✅ | ✅ | ✅ | ✅ |
| smartreceive() | ✅ | ✅ | ✅ | ✅ |
| plik_oneshot_upload() | ✅ | ✅ | ✅ | ⚠️ (placeholder) |
| fetch_with_backoff() | ✅ | ✅ | ✅ | ⚠️ (placeholder) |
Payload Type Support by Platform
| Type | Julia | Python | Node.js | Browser | MicroPython |
|---|---|---|---|---|---|
| text | ✅ | ✅ | ✅ | ✅ | ✅ |
| dictionary | ✅ | ✅ | ✅ | ✅ | ✅ |
| arrowtable | ✅ | ✅ | ✅ | ✅ | ❌ |
| jsontable | ✅ | ✅ | ✅ | ✅ | ⚠️ |
| image | ✅ | ✅ | ✅ | ✅ | ✅ |
| audio | ✅ | ✅ | ✅ | ✅ | ✅ |
| video | ✅ | ✅ | ✅ | ✅ | ✅ |
| binary | ✅ | ✅ | ✅ | ✅ | ✅ |
Error Handling
Common Error Scenarios
| Scenario | Error Code | Recovery |
|---|---|---|
| Unknown payload_type | INVALID_PAYLOAD_TYPE | Use a supported payload_type |
| Failed to upload | UPLOAD_FAILED | Retry or use direct transport |
| Failed to fetch | DOWNLOAD_FAILED | Retry with exponential backoff |
| Unknown transport | INVALID_TRANSPORT | Check the payload transport field |
| NATS connection failed | NATS_CONNECTION_FAILED | Check NATS server availability |
| Deserialization error | DESERIALIZATION_ERROR | Validate that payload_type matches the data |
Error Response Format
```json
{
  "correlation_id": "abc123...",
  "msg_id": "def456...",
  "timestamp": "2026-03-13T07:02:50.443Z",
  "send_to": "/chat/user/v1/message",
  "error": {
    "code": "DOWNLOAD_FAILED",
    "message": "Failed to fetch data after 5 attempts",
    "details": {
      "url": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext",
      "correlation_id": "abc123..."
    }
  }
}
```
Exception Handling Examples
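```julia
# File server unavailable
try
    env, msg_json = smartsend("/subject", data)
catch e
    @warn "smartsend failed" exception = (e, catch_backtrace())
    # Retry with direct transport or use smaller payloads
end

# Deserialization error
try
    env = smartreceive(msg)
catch e
    # `env` is never assigned when smartreceive throws, so log the raw message
    # (which still carries the correlation_id) and inspect its payload structure
    @error "Deserialization failed" exception = (e, catch_backtrace()) raw = String(msg.payload)
end
```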
Debugging and Tracing
Correlation ID Tracking
Every message includes a correlation_id for distributed tracing:
```julia
using UUIDs

# Generate a correlation ID at the start of the request
correlation_id = string(uuid4())

# Use it throughout the request flow
log_trace(correlation_id, "Starting smartsend for subject: $subject")
log_trace(correlation_id, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes")
log_trace(correlation_id, "Using direct transport for $payload_size bytes")
```
Log Format:
```text
[2026-03-13T07:02:50.443Z] [Correlation: abc123...] Starting smartsend for subject: /chat/user/v1/message
[2026-03-13T07:02:50.445Z] [Correlation: abc123...] Serialized payload 'msg' (type: text) size: 11 bytes
[2026-03-13T07:02:50.446Z] [Correlation: abc123...] Using direct transport for 11 bytes
```
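A Python sketch that produces lines in this format, assuming timestamps are rendered in UTC with millisecond precision; `format_trace` is a hypothetical helper, and `log_trace` here simply prints its result, in line with the print-based logging listed for Python in the table that follows.

```python
from datetime import datetime, timezone
from typing import Optional


def format_trace(correlation_id: str, message: str, now: Optional[datetime] = None) -> str:
    """Render one line as [timestamp] [Correlation: id] message (sketch)."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    return f"[{stamp}] [Correlation: {correlation_id}] {message}"


def log_trace(correlation_id: str, message: str) -> None:
    print(format_trace(correlation_id, message))
```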
Logging in All Implementations
| Platform | Logging Method |
|---|---|
| Julia | @info macro |
| Python | print() with timestamp |
| JavaScript | console.log() |
| MicroPython | print() |
Testing the Flow
Example: End-to-End Test
```julia
# Sender side
data = [
    ("msg", "Hello", "text"),
    ("img", image_data, "image")
]
env, msg_json = smartsend("/chat/user/v1/message", data)

# Receiver side
msg = nats_subscription.next()
env = smartreceive(msg)

# Verify payloads
for (dataname, data, type_) in env["payloads"]
    println("$dataname: $data (type: $type_)")
end
```
Test Scenarios
| Scenario | Payloads | Transport | Expected Result |
|---|---|---|---|
| Single text (small) | text | direct | Round-trip successful |
| Single dictionary (small) | dictionary | direct | Round-trip successful |
| Single arrow table (small) | arrowtable | direct | Arrow IPC round-trip |
| Single image (large) | image | link | File server upload/download |
| Mixed payloads | text + image | direct + link | All payloads preserved |
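The serialization contract can be unit-tested without NATS or a file server by running the sender and receiver halves back to back. The Python sketch below exercises the direct path only, for `text` and raw-bytes types; `encode_envelope` and `decode_envelope` are hypothetical helpers, and the envelope carries just the fields they need.

```python
import base64
import json


def encode_envelope(subject: str, payloads: list) -> bytes:
    """Sender half: text/raw-bytes payloads, direct transport only (sketch)."""
    items = []
    for dataname, data, payload_type in payloads:
        raw = data.encode("utf-8") if payload_type == "text" else bytes(data)
        items.append({"dataname": dataname, "payload_type": payload_type,
                      "transport": "direct", "encoding": "base64", "size": len(raw),
                      "data": base64.b64encode(raw).decode("ascii")})
    return json.dumps({"send_to": subject, "payloads": items}).encode("utf-8")


def decode_envelope(body: bytes) -> dict:
    """Receiver half: replace the payloads array with (dataname, data, type) tuples."""
    env = json.loads(body)
    decoded = []
    for p in env["payloads"]:
        raw = base64.b64decode(p["data"])
        value = raw.decode("utf-8") if p["payload_type"] == "text" else raw
        decoded.append((p["dataname"], value, p["payload_type"]))
    env["payloads"] = decoded
    return env
```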
Deployment Considerations
Minimum Infrastructure
| Component | Minimum | Notes |
|---|---|---|
| NATS Server | 1 instance | Single node for development |
| File Server | 1 instance | HTTP server for large payloads |
| Client Memory | 50MB | Desktop platforms |
| Client Memory | 256KB | MicroPython devices |
Environment Variables
| Variable | Default | Description |
|---|---|---|
| NATS_URL | nats://localhost:4222 | NATS server URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 500000 | Size threshold in bytes (desktop default) |
Container Deployment
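```yaml
# docker-compose.yml
version: '3'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"
  plik:
    image: rootfs/plik:latest
    ports:
      - "8080:8080"
    volumes:
      - plik-data:/data
  app:
    image: my-app:latest
    environment:
      # Inside the compose network, services are reached by service name, not localhost
      NATS_URL: nats://nats:4222
      FILESERVER_URL: http://plik:8080
    depends_on:
      - nats
      - plik
volumes:
  plik-data:
```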
Common Pitfalls
Pitfall 1: Payload Size Threshold
Issue: Payloads just above threshold may cause unnecessary file server uploads
Solution: Monitor payload sizes and adjust threshold based on:
- Network latency to file server
- Memory constraints
- File server performance
```julia
# Adjust the threshold for your use case
env, msg_json = smartsend("/subject", data; size_threshold = 1_000_000)  # 1 MB
```
Pitfall 2: File Server Availability
Issue: File server down during upload/download
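Solution: Implement fallback strategies:
- Fall back to direct transport for uploads, but only when the Base64-encoded payload (about 4/3 of the raw size) still fits within the NATS server's `max_payload`
- Use smaller payloads to avoid link transport
- Implement application-level retries
```julia
using Base64

# Fall back to direct transport if the file upload fails
try
    response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
catch e
    @warn "Upload failed; falling back to direct transport" exception = e
    payload_b64 = Base64.base64encode(payload_bytes)
    # Build the payload with transport = "direct" (as in Step 4 of the sending flow)
end
```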
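A guarded version of this fallback, sketched in Python: before inlining, it checks that the Base64 text plus some envelope overhead still fits under the broker's limit, since nats-server rejects messages larger than `max_payload` (1 MB by default). The 4096-byte overhead allowance and the helper names are assumptions, and `unavailable_upload` is a test double.

```python
import base64
from typing import Callable

NATS_DEFAULT_MAX_PAYLOAD = 1_048_576  # bytes: nats-server's default max_payload (1 MB)


def base64_size(n: int) -> int:
    """Length of the Base64 text for n input bytes, padding included."""
    return 4 * ((n + 2) // 3)


def upload_or_inline(payload_bytes: bytes, upload: Callable[[bytes], str],
                     max_payload: int = NATS_DEFAULT_MAX_PAYLOAD,
                     envelope_overhead: int = 4096) -> dict:
    """Try link transport; fall back to direct only if the inlined data still fits."""
    try:
        return {"transport": "link", "encoding": "none", "data": upload(payload_bytes)}
    except OSError:
        if base64_size(len(payload_bytes)) + envelope_overhead > max_payload:
            raise  # inlining would exceed the broker limit: surface the upload failure
        return {"transport": "direct", "encoding": "base64",
                "data": base64.b64encode(payload_bytes).decode("ascii")}


def unavailable_upload(_data: bytes) -> str:
    """Test double standing in for a file server that is down."""
    raise ConnectionError("file server unavailable")
```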
Pitfall 3: Payload Type Mismatch
Issue: Receiver deserializes with wrong payload_type
Solution: Always make sure the declared payload_type matches the data:
- Sender and receiver must agree on payload types
- Use consistent payload_type strings across platforms
```julia
# Sender: declares the payload type
smartsend("/subject", [("data", data, "arrowtable")])

# Receiver: payload_type is read from the envelope, so it is whatever the sender declared
env = smartreceive(msg)
# env["payloads"][1][3] == "arrowtable"
```
Performance Considerations
Optimization Strategies
| Strategy | Description | When to Use |
|---|---|---|
| Pre-create NATS connection | Reuse connection for multiple sends | High-throughput scenarios |
| Batch small payloads | Combine multiple small payloads | Reduce NATS overhead |
| Adjust size threshold | Increase threshold if file server slow | File server bottleneck |
| Use direct transport | Avoid file server for small payloads | Low latency requirements |
Benchmarking
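```julia
# Benchmark direct vs link transport
using BenchmarkTools, DataFrames

small_df = DataFrame(x = rand(1_000))      # ~8 KB of Float64 data: direct transport
large_df = DataFrame(x = rand(1_000_000))  # ~8 MB of Float64 data: link transport

# Direct transport
@btime smartsend("/subject", [("data", $small_df, "arrowtable")])

# Link transport (requires a running file server)
@btime smartsend("/subject", [("data", $large_df, "arrowtable")])
```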
Versioning
Current Version
- Major: 1 (Breaking changes require major version bump)
- Minor: 0 (Feature additions)
- Patch: 0 (Bug fixes)
Version Compatibility
| Version | Supported Platforms |
|---|---|
| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |
Change Log
| Date | Version | Changes |
|---|---|---|
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |
References
- docs/requirements.md - Business requirements and user stories
- docs/spec.md - Technical specification and contracts
- docs/architecture.md - System architecture diagrams
- src/NATSBridge.jl - Ground truth implementation
- README.md - Project overview
This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.