Cross-Platform Implementation Guide: Bi-Directional Data Bridge
Overview
This document describes the implementation of a high-performance, bi-directional data bridge built on NATS (Core & JetStream) that uses the Claim-Check pattern for large payloads. The system is implemented on three platforms with high-level API parity, while each implementation stays idiomatic to its language.
Supported Platforms:
- Julia - Ground truth implementation (reference)
- JavaScript - Node.js and browser implementation
- Python/MicroPython - Desktop and embedded implementation
Implementation Files
| Language | Implementation File | Description |
|---|---|---|
| Julia | src/NATSBridge.jl | Full Julia implementation with Arrow IPC support |
| JavaScript | src/natsbridge.js | Node.js/browser implementation |
| Python | src/natsbridge.py | Desktop Python implementation |
| MicroPython | src/natsbridge_mpy.py | MicroPython implementation (limited features) |
File Server Handler Architecture
The system uses handler functions to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
Handler Function Signatures
Julia
# Upload handler - uploads data to file server and returns URL
fileserver_upload_handler(
fileserver_url::String,
dataname::String,
data::Vector{UInt8}
)::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
fileserver_download_handler(
url::String,
max_retries::Int,
base_delay::Int,
max_delay::Int,
correlation_id::String
)::Vector{UInt8}
JavaScript
// Upload handler - async function
async function fileserver_upload_handler(
fileserver_url,
dataname,
data // Uint8Array
) {
// Returns: { status, uploadid, fileid, url }
}
// Download handler - async function
async function fileserver_download_handler(
url,
max_retries,
base_delay,
max_delay,
correlation_id
) {
// Returns: Uint8Array
}
Python
from typing import Any, Dict
# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.
    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass
# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.
    Returns:
        Downloaded bytes
    """
    pass
MicroPython
# Upload handler - synchronous in the MicroPython port
def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytearray
) -> dict:
    """
    Upload data to file server (synchronous).
    Returns:
        dict with keys: 'status', 'url'
    """
    pass
# Download handler - synchronous
def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytearray:
    """
    Download data from URL with exponential backoff (synchronous).
    Returns:
        Downloaded bytes
    """
    pass
Multi-Payload Support (Standard API)
The system uses a standardized list-of-tuples format for all payload operations across all platforms.
API Standard
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
{
"correlation_id": "...",
"msg_id": "...",
"timestamp": "...",
"send_to": "...",
"msg_purpose": "...",
"sender_name": "...",
"sender_id": "...",
"receiver_name": "...",
"receiver_id": "...",
"reply_to": "...",
"reply_to_msg_id": "...",
"broker_url": "...",
"metadata": {...},
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
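Every call must use this list-of-tuples shape, even for a single payload, so a caller-side check can catch mistakes before anything is serialized. What follows is a minimal sketch assuming the desktop Python API. validate_payloads is a hypothetical helper, not part of NATSBridge, and the allowed type names are copied from the Supported Types table.

```python
from typing import Any, List, Tuple

# Type names copied from the Supported Types table
SUPPORTED_TYPES = {
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
}


def validate_payloads(payloads: List[Tuple[str, Any, str]]) -> None:
    """Raise ValueError unless payloads is a list of (dataname, data, type) tuples."""
    if not isinstance(payloads, list):
        raise ValueError("payloads must be a list, even for a single payload")
    for i, item in enumerate(payloads):
        if not (isinstance(item, tuple) and len(item) == 3):
            raise ValueError(f"payload {i} must be a (dataname, data, type) tuple")
        dataname, _data, payload_type = item
        if not isinstance(dataname, str) or not dataname:
            raise ValueError(f"payload {i}: dataname must be a non-empty string")
        if payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"payload {i}: unsupported type {payload_type!r}")
```

Checking the outer type catches the most common slip: passing a bare tuple instead of a one-element list.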
Supported Types
| Type | Julia | JavaScript | Python | MicroPython |
|---|---|---|---|---|
| text | String | string | str | str |
| dictionary | Dict, NamedTuple | Object, Array | dict, list | dict |
| arrowtable | DataFrame, Arrow.Table | Array<Object> (input) → Buffer (Arrow IPC) | pandas.DataFrame, bytes (Arrow IPC) | ❌ (not supported) |
| jsontable | Vector{NamedTuple}, Vector{Dict} | Array<Object> | list[dict], list | list |
| image | Vector{UInt8} | Uint8Array, Buffer | bytes | bytearray |
| audio | Vector{UInt8} | Uint8Array, Buffer | bytes | bytearray |
| video | Vector{UInt8} | Uint8Array, Buffer | bytes | bytearray |
| binary | Vector{UInt8}, IOBuffer | Uint8Array, Buffer | bytes, bytearray | bytearray |
Cross-Platform Examples
Julia
using NATSBridge
# Single payload - still wrapped in a list
env, env_json_str = smartsend(
"/test",
[("dataname1", data1, "dictionary")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads with different types
env, env_json_str = smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
broker_url="nats://localhost:4222"
)
# Mixed content (chat with text, image, audio)
env, env_json_str = smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a JSON.Object{String, Any} envelope
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# env["payloads"] is a Vector{Tuple{String, Any, String}} of (dataname, data, type) tuples
for (dataname, data, type) in env["payloads"]
println("$dataname: $data (type: $type)")
end
JavaScript
const NATSBridge = require('natsbridge');
// Each call returns a fresh [env, env_json_str] pair; `let` allows reassignment
let env, env_json_str;
// Single payload
[env, env_json_str] = await NATSBridge.smartsend(
"/test",
[["dataname1", data1, "dictionary"]],
{
broker_url: "nats://localhost:4222",
fileserver_upload_handler: NATSBridge.plikOneshotUpload
}
);
// Multiple payloads
[env, env_json_str] = await NATSBridge.smartsend(
"/test",
[
["dataname1", data1, "dictionary"],
["dataname2", data2, "arrowtable"]
],
{ broker_url: "nats://localhost:4222" }
);
// Mixed content
[env, env_json_str] = await NATSBridge.smartsend(
"/chat",
[
["message_text", "Hello!", "text"],
["user_image", imageData, "image"],
["audio_clip", audioData, "audio"]
],
{ broker_url: "nats://localhost:4222" }
);
// Receive
const received = await NATSBridge.smartreceive(msg, {
fileserver_download_handler: NATSBridge.fetchWithBackoff
});
// received.payloads is an Array of [dataname, data, type] arrays
for (const [dataname, data, type] of received.payloads) {
console.log(`${dataname}: ${data} (type: ${type})`);
}
Python
from natsbridge import NATSBridge
# Single payload
env, env_json_str = await NATSBridge.smartsend(
"/test",
[("dataname1", data1, "dictionary")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads
env, env_json_str = await NATSBridge.smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
broker_url="nats://localhost:4222"
)
# Mixed content
env, env_json_str = await NATSBridge.smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive
env = await NATSBridge.smartreceive(
msg,
fileserver_download_handler=fetch_with_backoff
)
# env is a dict whose "payloads" key holds a list of (dataname, data, type) tuples
for dataname, data, type_ in env["payloads"]:
    print(f"{dataname}: {data} (type: {type_})")
MicroPython
from natsbridge import NATSBridge
# This example sends text and binary; arrowtable is not supported on MicroPython (memory constraints)
env, env_json_str = NATSBridge.smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("binary_data", data_bytes, "binary")
],
broker_url="nats://localhost:4222",
size_threshold=100000 # Lower threshold for memory constraints
)
# Note: MicroPython uses synchronous handlers
Row-Oriented vs Column-Oriented Data Structures
Different platforms use different internal representations for tabular data. Understanding these differences is crucial for proper serialization/deserialization when using jsontable and arrowtable datatypes.
Data Structure Comparison
| Platform | Table Structure | Orientation |
|---|---|---|
| Julia (DataFrame) | Dict{String, Vector} | Column-oriented |
| Python (pandas) | dict[str, list] | Column-oriented |
| JavaScript | Array<Object> | Row-oriented |
| MicroPython | list[list] | Row-oriented |
Column-Oriented (Julia DataFrame, Python pandas)
In column-oriented structures, each column is stored as a separate array/vector:
Julia Example:
using DataFrames
# Create dictionary with column vectors
dict = Dict("customer age" => [15, 20, 25],
"first name" => ["Rohit", "Rahul", "Akshat"])
# Convert to DataFrame
df = DataFrame(dict)
println(df)
# Output:
# 3×2 DataFrame
# Row ┆ customer age ┆ first name
# ┆ Int64 ┆ String
# ─────┼──────────────┼────────────
# 1 ┆ 15 ┆ "Rohit"
# 2 ┆ 20 ┆ "Rahul"
# 3 ┆ 25 ┆ "Akshat"
Python Example:
import pandas as pd
# Create dictionary with column lists
data = {
"Name": ["Alice", "Bob", "Charlie"],
"Age": [25, 30, 35],
"Score": [88.5, 92.0, 79.5]
}
# Convert to DataFrame
df = pd.DataFrame(data)
print(df)
# Output:
# Name Age Score
# 0 Alice 25 88.5
# 1 Bob 30 92.0
# 2 Charlie 35 79.5
Row-Oriented (JavaScript, MicroPython)
In row-oriented structures, each row is stored as a separate object/array:
JavaScript Example:
// Array of objects (row-oriented)
const users = [
{ Name: "Alice", Age: 25, Score: 88.5 },
{ Name: "Bob", Age: 30, Score: 92.0 },
{ Name: "Charlie", Age: 35, Score: 79.5 }
];
MicroPython Example:
# List of lists (row-oriented)
users = [
["Alice", 25, 88.5],
["Bob", 30, 92.0],
["Charlie", 35, 79.5]
]
Cross-Platform Conversion for jsontable
When sending jsontable across platforms, the system performs automatic conversion between row-oriented and column-oriented formats:
Sending from Julia/Python (column-oriented) to JS/MicroPython (row-oriented):
- Convert column-oriented dict to row-oriented array of objects
- Serialize to JSON
- Send with payload_type = "jsontable"
Receiving from JS/MicroPython (row-oriented) to Julia/Python (column-oriented):
- Deserialize JSON to row-oriented array of objects
- Convert to column-oriented dict
- Create DataFrame from column-oriented dict
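The two conversions above can be sketched in plain Python without pandas. The helper names are hypothetical. Filling keys that are missing from some rows with None is an assumption of this sketch, not documented NATSBridge behavior.

```python
import json
from typing import Any, Dict, List


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Column-oriented dict (DataFrame style) -> row-oriented list of dicts."""
    names = list(columns)
    lengths = {len(columns[n]) for n in names}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same length")
    n_rows = lengths.pop() if lengths else 0
    return [{name: columns[name][i] for name in names} for i in range(n_rows)]


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Row-oriented list of dicts (JavaScript style) -> column-oriented dict."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:  # first pass: collect column names in first-seen order
        for name in row:
            columns.setdefault(name, [])
    for row in rows:  # second pass: fill every column, None where a key is missing
        for name in columns:
            columns[name].append(row.get(name))
    return columns


def encode_jsontable(columns: Dict[str, List[Any]]) -> bytes:
    # Sending side: convert to rows, then serialize to JSON bytes
    return json.dumps(columns_to_rows(columns)).encode("utf-8")


def decode_jsontable(data: bytes) -> Dict[str, List[Any]]:
    # Receiving side: parse the row array, then pivot back to columns
    return rows_to_columns(json.loads(data.decode("utf-8")))
```

The two-pass pivot keeps every column the same length even when later rows introduce new keys.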
Example: Julia to JavaScript
# Julia side - column-oriented DataFrame
df = DataFrame(
"Name" => ["Alice", "Bob", "Charlie"],
"Age" => [25, 30, 35],
"Score" => [88.5, 92.0, 79.5]
)
# smartsend automatically converts to row-oriented JSON
env, env_json_str = smartsend(
"/data",
[("users", df, "jsontable")]
)
# JSON sent: [{"Name":"Alice","Age":25,"Score":88.5}, ...]
// JavaScript side - receives a row-oriented array
const env = await NATSBridge.smartreceive(msg, {
fileserver_download_handler: NATSBridge.fetchWithBackoff
});
const [, users] = env.payloads.find(([dataname]) => dataname === "users");
// users is already row-oriented: [{Name: "Alice", Age: 25, ...}, ...]
Example: JavaScript to Julia
// JavaScript side - row-oriented array
const users = [
{ Name: "Alice", Age: 25, Score: 88.5 },
{ Name: "Bob", Age: 30, Score: 92.0 }
];
const [env, env_json_str] = await NATSBridge.smartsend(
"/data",
[["users", users, "jsontable"]]
);
# Julia side - receives and converts to column-oriented DataFrame
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# The jsontable is automatically converted to DataFrame
for (dataname, data, type) in env["payloads"]
if type == "jsontable"
# data is now a DataFrame with column-oriented structure
println(data)
# Output:
# 2×3 DataFrame
# Row ┆ Name ┆ Age ┆ Score
# ┆ String ┆ Int64 ┆ Float64
# ─────┼────────┼──────┼───────
# 1 ┆ Alice ┆ 25 ┆ 88.5
# 2 ┆ Bob ┆ 30 ┆ 92.0
end
end
Architecture
Cross-Platform Claim-Check Pattern
flowchart TD
A[SmartSend Function] --> B{Is payload size < 1MB?}
B -->|Yes | C[Direct Path<br/><small>< 1MB</small>]
B -->|No | D[Link Path<br/><small>>= 1MB</small>]
C --> C1[Serialize to Buffer]
C1 --> C2[Base64/JSON encode]
C2 --> C3[Publish to NATS]
D --> D1[Serialize to Buffer]
D1 --> D2[Upload to HTTP Server]
D2 --> D3[Publish to NATS with URL]
style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px
style C fill:#e8f5e9,stroke:#008000,stroke-width:2px
style D fill:#e8f5e9,stroke:#008000,stroke-width:2px
style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px
style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px
style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px
style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px
style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px
style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px
Claim-Check Pattern Overview:
- Direct Path (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS
- Link Path (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern)
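One detail is worth checking: the threshold is compared against the serialized size before Base64 encoding, and Base64 inflates data by about 4/3. Assume the NATS server's default max_payload of 1 MB (1,048,576 bytes). Under that assumption, a payload just under the 1,000,000-byte default threshold produces an envelope larger than the server accepts. The hypothetical helpers below compute the encoded size and the largest raw size that still fits. They ignore the envelope's own JSON overhead unless it is passed in explicitly.

```python
# Assumed nats-server default for max_payload (1 MB); check your server config
NATS_DEFAULT_MAX_PAYLOAD = 1024 * 1024
# Raw-byte threshold used by smartsend (DEFAULT_SIZE_THRESHOLD in this guide)
DEFAULT_SIZE_THRESHOLD = 1_000_000


def base64_len(n_bytes: int) -> int:
    """Length of the padded Base64 encoding of n_bytes bytes (4 chars per 3 bytes)."""
    return 4 * ((n_bytes + 2) // 3)


def max_direct_raw_size(max_payload: int, envelope_overhead: int = 0) -> int:
    """Largest raw size whose Base64 form fits in max_payload minus the overhead."""
    budget = max_payload - envelope_overhead
    return (budget // 4) * 3


def choose_transport(n_bytes: int, size_threshold: int = DEFAULT_SIZE_THRESHOLD) -> str:
    # Same rule as smartsend: strictly below the threshold goes direct
    return "direct" if n_bytes < size_threshold else "link"
```

Under these assumptions, 999,999 raw bytes take the direct path but encode to 1,333,332 characters. Two fixes keep every direct payload deliverable: lower size_threshold to roughly 786,432 bytes, less the envelope overhead, or raise the server's max_payload.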
smartsend Return Value
All platforms return a tuple/array containing both the envelope and JSON string:
Julia
env, env_json_str = smartsend(...)
# Returns: ::Tuple{msg_envelope_v1, String}
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
JavaScript
const [env, env_json_str] = await smartsend(...);
// Returns: Promise<[env, env_json_str]>
// env: Object with all metadata and payloads
// env_json_str: String for publishing to NATS
Python
env, env_json_str = await smartsend(...)
# Returns: Tuple[Dict, str]
# env: Dict with all metadata and payloads
# env_json_str: String for publishing to NATS
MicroPython
env, env_json_str = NATSBridge.smartsend(...)
# Returns: Tuple[Dict, str]
# Note: MicroPython returns plain dicts (no structured envelope object)
Installation
Julia Dependencies
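using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON") # the code below uses JSON.json / JSON.parse / JSON.Object, not JSON3
Pkg.add("HTTP")
Pkg.add("DataFrames") # needed for the DataFrame methods
Pkg.add("UUIDs")
Pkg.add("Dates")
Pkg.add("Base64")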
JavaScript Dependencies (Node.js)
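npm install nats uuid apache-arrow node-fetch@2
# or
yarn add nats uuid apache-arrow node-fetch@2
# node-fetch v3 is ESM-only, so require('node-fetch') needs v2;
# on Node.js 18+ the built-in global fetch can be used instead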
JavaScript Dependencies (Browser)
npm install nats uuid apache-arrow
# or use CDN:
# https://unpkg.com/nats-js/dist/bundle/nats.min.js
# https://unpkg.com/apache-arrow/arrow.min.js
Python Dependencies (Desktop)
pip install nats-py aiohttp pyarrow pandas python-dateutil
MicroPython Dependencies
MicroPython uses built-in modules:
- network - NATS connection (custom implementation)
- time - Timestamps
- uos - File operations
- base64 - Base64 encoding
- json - JSON parsing
- struct - Binary data handling
Usage Tutorial
Step 1: Start NATS Server
docker run -p 4222:4222 nats:latest
Step 2: Start HTTP File Server (optional)
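# Create a directory for file uploads
mkdir -p /tmp/fileserver
# Python's built-in server only handles GET/HEAD, so it can serve files for
# download but cannot accept uploads (POST returns 501 Unsupported method).
# The default upload handlers (plik_oneshot_upload / plikOneshotUpload) expect
# a Plik server; any other server needs a matching fileserver_upload_handler.
python3 -m http.server 8080 --directory /tmp/fileserver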
Step 3: Run Test Scenarios
# Julia tests
julia test/test_julia_to_julia_text_sender.jl
julia test/test_julia_to_julia_text_receiver.jl
# JavaScript tests (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js
# Python tests
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py
Platform-Specific Implementations
Julia Implementation
Module Structure
module NATSBridge
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, DataFrames
# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB
const DEFAULT_BROKER_URL = "nats://localhost:4222"
const DEFAULT_FILESERVER_URL = "http://localhost:8080"
# Structs
struct msg_payload_v1
id::String
dataname::String
payload_type::String
transport::String
encoding::String
size::Integer
data::Any
metadata::Dict{String, Any}
end
struct msg_envelope_v1
correlation_id::String
msg_id::String
timestamp::String
send_to::String
msg_purpose::String
sender_name::String
sender_id::String
receiver_name::String
receiver_id::String
reply_to::String
reply_to_msg_id::String
broker_url::String
metadata::Dict{String, Any}
payloads::Vector{msg_payload_v1}
end
# Main functions
function smartsend(...) end
function smartreceive(...) end
# Utility functions
function _serialize_data(...) end
function _deserialize_data(...) end
function envelope_to_json(...) end
function log_trace(...) end
# File server handlers
function plik_oneshot_upload(...) end
function _fetch_with_backoff(...) end
function publish_message(...) end
# Internal helpers
function _get_payload_bytes(...) end
end
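For readers porting the envelope to Python, here is a hedged sketch of the same two structs as dataclasses, with field names copied from msg_payload_v1 and msg_envelope_v1 above. The class names are assumptions, and so are the defaults (taken from smartsend's keyword defaults) and the UTC ISO-8601 timestamp format. None of this is the desktop Python implementation itself.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


def _new_id() -> str:
    return str(uuid.uuid4())


@dataclass
class MsgPayloadV1:
    dataname: str
    payload_type: str
    transport: str   # "direct" or "link"
    encoding: str    # "base64" (direct) or "none" (link)
    size: int        # serialized size in bytes, before Base64
    data: Any        # Base64 string (direct) or download URL (link)
    id: str = field(default_factory=_new_id)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    send_to: str
    payloads: List[MsgPayloadV1]
    correlation_id: str = field(default_factory=_new_id)
    msg_id: str = field(default_factory=_new_id)
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    msg_purpose: str = "chat"
    sender_name: str = "NATSBridge"
    sender_id: str = field(default_factory=_new_id)
    receiver_name: str = ""
    receiver_id: str = ""
    reply_to: str = ""
    reply_to_msg_id: str = ""
    broker_url: str = "nats://localhost:4222"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        # asdict() recurses into the nested payload dataclasses
        return json.dumps(asdict(self))
```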
Multiple Dispatch Pattern
Julia leverages multiple dispatch for type-specific implementations:
# publish_message has two overloads based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
# This overload opens the connection, so it is also responsible for draining it
conn = NATS.connect(broker_url)
try
publish_message(conn, subject, message, correlation_id)
finally
NATS.drain(conn)
end
end
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
# The caller owns conn (e.g. smartsend's NATS_connection), so leave it open for reuse
NATS.publish(conn, subject, message)
log_trace(correlation_id, "Message published to $subject")
end
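# Type-specific serialization (illustrative). If these methods coexist with the generic
# _serialize_data(data::Any, payload_type::String) shown below, Julia selects them by
# argument type alone, so payload_type is ignored for String, Dict and DataFrame inputs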
function _serialize_data(data::String, payload_type::String)
# Text handling
return Vector{UInt8}(data)
end
function _serialize_data(data::Dict, payload_type::String)
# Dictionary handling
json_str = JSON.json(data)
return Vector{UInt8}(json_str)
end
function _serialize_data(data::DataFrame, payload_type::String)
# Table handling - arrowtable
io = IOBuffer()
Arrow.write(io, data)
return take!(io)
end
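Python has no multiple dispatch. Its functools.singledispatch dispatches on the first argument's type, which is enough to mirror the three _serialize_data methods above. In this sketch the function name is hypothetical, and pandas and Arrow are left out so the example stays dependency-free. Like the Julia methods, it picks behavior from the Python type alone.

```python
import json
from functools import singledispatch
from typing import Any


@singledispatch
def serialize_by_type(data: Any, payload_type: str) -> bytes:
    # Fallback for types without a registered implementation
    raise TypeError(f"no serializer for {type(data).__name__} ({payload_type})")


@serialize_by_type.register
def _(data: str, payload_type: str) -> bytes:
    # Text handling: UTF-8 bytes, like Vector{UInt8}(data) in Julia
    return data.encode("utf-8")


@serialize_by_type.register
def _(data: dict, payload_type: str) -> bytes:
    # Dictionary handling: JSON text, then UTF-8 bytes
    return json.dumps(data).encode("utf-8")


@serialize_by_type.register(bytes)
@serialize_by_type.register(bytearray)
def _(data, payload_type: str) -> bytes:
    # Binary-like data passes through (copied to immutable bytes)
    return bytes(data)
```

singledispatch only inspects the first argument, so it cannot reproduce dispatch on several argument types at once. Where the payload_type string matters, it still has to be checked explicitly.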
smartsend Implementation
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, T1, String}, 1};
broker_url::String = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::String = string(uuid4()),
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing,
msg_id::String = string(uuid4()),
sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String} where {T1<:Any}
log_trace(correlation_id, "Starting smartsend for subject: $subject")
# Process each payload in the list
payloads = msg_payload_v1[]
for (dataname, payload_data, payload_type) in data
# Serialize data based on type
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = length(payload_bytes)
log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes")
# Decision: Direct vs Link
if payload_size < size_threshold
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes)
log_trace(correlation_id, "Using direct transport for $payload_size bytes")
payload = msg_payload_v1(
payload_b64,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "direct",
encoding = "base64",
size = payload_size,
metadata = Dict{String, Any}("payload_bytes" => payload_size)
)
push!(payloads, payload)
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(correlation_id, "Using link transport, uploading to fileserver")
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200
error("Failed to upload data to fileserver: $(response["status"])")
end
url = response["url"]
log_trace(correlation_id, "Uploaded to URL: $url")
payload = msg_payload_v1(
url,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "link",
encoding = "none",
size = payload_size,
metadata = Dict{String, Any}()
)
push!(payloads, payload)
end
end
# Create msg_envelope_v1 with all payloads
# Note: First positional argument is "send_to" (the NATS subject), not "subject"
env = msg_envelope_v1(
subject, # send_to: NATS subject to publish to
payloads;
correlation_id = correlation_id,
msg_id = msg_id,
msg_purpose = msg_purpose,
sender_name = sender_name,
sender_id = sender_id,
receiver_name = receiver_name,
receiver_id = receiver_id,
reply_to = reply_to,
reply_to_msg_id = reply_to_msg_id,
broker_url = broker_url,
metadata = Dict{String, Any}(),
)
env_json_str = envelope_to_json(env)
if is_publish == false
# skip publish
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, correlation_id)
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, correlation_id)
end
return (env, env_json_str)
end
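The direct-versus-link branch in smartsend can be sketched in Python for payloads that are already serialized. This hedged sketch is synchronous, in the style of the MicroPython handlers. build_payloads and the dict layout (copied from the payload fields above) are assumptions. Serialization and NATS publishing are deliberately left out.

```python
import base64
import uuid
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical synchronous upload handler: (fileserver_url, dataname, data) -> response dict
UploadHandler = Callable[[str, str, bytes], Dict[str, Any]]


def build_payloads(
    items: List[Tuple[str, bytes, str]],
    upload: UploadHandler,
    fileserver_url: str = "http://localhost:8080",
    size_threshold: int = 1_000_000,
) -> List[Dict[str, Any]]:
    """Apply smartsend's direct-vs-link rule to already-serialized payloads."""
    payloads = []
    for dataname, payload_bytes, payload_type in items:
        size = len(payload_bytes)
        entry: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "dataname": dataname,
            "payload_type": payload_type,
            "size": size,
        }
        if size < size_threshold:
            # Direct path: inline the bytes as Base64 text inside the envelope
            entry.update(
                transport="direct",
                encoding="base64",
                data=base64.b64encode(payload_bytes).decode("ascii"),
                metadata={"payload_bytes": size},
            )
        else:
            # Link path: upload first, then carry only the URL (the claim check)
            response = upload(fileserver_url, dataname, payload_bytes)
            if response["status"] != 200:
                raise RuntimeError(
                    f"Failed to upload data to fileserver: {response['status']}"
                )
            entry.update(
                transport="link", encoding="none", data=response["url"], metadata={}
            )
        payloads.append(entry)
    return payloads
```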
smartreceive Implementation
function smartreceive(
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)::JSON.Object{String, Any}
# Parse the JSON envelope
env_json_obj = JSON.parse(String(msg.payload))
log_trace(env_json_obj["correlation_id"], "Processing received message")
# Process all payloads in the envelope
payloads_list = Tuple{String, Any, String}[]
num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads
payload = env_json_obj["payloads"][i]
transport = String(payload["transport"])
dataname = String(payload["dataname"])
if transport == "direct"
log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'")
# Extract base64 payload from the payload
payload_b64 = String(payload["data"])
# Decode Base64 payload
payload_bytes = Base64.base64decode(payload_b64)
# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
elseif transport == "link"
# Extract download URL from the payload
url = String(payload["data"])
log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url")
# Fetch with exponential backoff using the download handler
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
else
error("Unknown transport type for payload '$dataname': $(transport)")
end
end
env_json_obj["payloads"] = payloads_list
return env_json_obj
end
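The receive side reverses that branch. This is a minimal Python sketch. It assumes the envelope JSON layout shown in this guide and a synchronous download callable with the documented handler signature. decode_payloads is hypothetical and returns raw bytes, leaving deserialization by payload_type to the caller.

```python
import base64
import json
from typing import Callable, List, Tuple

# (url, max_retries, base_delay, max_delay, correlation_id) -> bytes
DownloadHandler = Callable[[str, int, int, int, str], bytes]


def decode_payloads(
    env_json_str: str,
    download: DownloadHandler,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000,
) -> List[Tuple[str, bytes, str]]:
    """Return (dataname, raw_bytes, payload_type) for every payload in an envelope."""
    env = json.loads(env_json_str)
    result = []
    for payload in env["payloads"]:
        dataname = payload["dataname"]
        transport = payload["transport"]
        if transport == "direct":
            # Direct path: the bytes travel inline as Base64 text
            raw = base64.b64decode(payload["data"])
        elif transport == "link":
            # Link path: redeem the claim check by downloading from the URL
            raw = download(payload["data"], max_retries, base_delay,
                           max_delay, env["correlation_id"])
        else:
            raise ValueError(f"Unknown transport type for payload '{dataname}': {transport}")
        result.append((dataname, raw, payload["payload_type"]))
    return result
```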
_serialize_data Implementation
function _serialize_data(data::Any, payload_type::String)
if payload_type == "text"
if isa(data, String)
data_bytes = Vector{UInt8}(data)
return data_bytes
else
error("Text data must be a String")
end
elseif payload_type == "dictionary"
json_str = JSON.json(data)
json_str_bytes = Vector{UInt8}(json_str)
return json_str_bytes
elseif payload_type == "arrowtable"
# Serialize DataFrame to Arrow IPC format
io = IOBuffer()
Arrow.write(io, data)
return take!(io)
elseif payload_type == "jsontable"
# Convert column-oriented to row-oriented JSON
# data is Vector{NamedTuple} or Vector{Dict}
json_str = JSON.json(data)
return Vector{UInt8}(json_str)
elseif payload_type == "image"
if isa(data, Vector{UInt8})
return data
else
error("Image data must be Vector{UInt8}")
end
elseif payload_type == "audio"
if isa(data, Vector{UInt8})
return data
else
error("Audio data must be Vector{UInt8}")
end
elseif payload_type == "video"
if isa(data, Vector{UInt8})
return data
else
error("Video data must be Vector{UInt8}")
end
elseif payload_type == "binary"
if isa(data, IOBuffer)
return take!(data)
elseif isa(data, Vector{UInt8})
return data
else
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
end
else
error("Unknown payload_type: $payload_type")
end
end
_deserialize_data Implementation
function _deserialize_data(
data::Vector{UInt8},
payload_type::String,
correlation_id::String
)
if payload_type == "text"
return String(data)
elseif payload_type == "dictionary"
json_str = String(data)
return JSON.parse(json_str)
elseif payload_type == "arrowtable"
# Deserialize from Arrow IPC format
io = IOBuffer(data)
arrow_table = Arrow.Table(io)
return arrow_table
elseif payload_type == "jsontable"
# Deserialize from JSON format
# JSON.parse yields a row-oriented vector with one JSON object per row
json_str = String(data)
parsed = JSON.parse(json_str)
return parsed
elseif payload_type == "image"
return data
elseif payload_type == "audio"
return data
elseif payload_type == "video"
return data
elseif payload_type == "binary"
return data
else
error("Unknown payload_type: $payload_type")
end
end
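A Python counterpart of the two if/elseif chains above might look like the following sketch. serialize_data and deserialize_data here are hypothetical standalone functions. arrowtable is omitted because it needs pyarrow, and jsontable data is assumed to arrive already row-oriented.

```python
import json
from typing import Any

# Types that are carried as raw bytes without transformation
BYTES_TYPES = {"image", "audio", "video", "binary"}


def serialize_data(data: Any, payload_type: str) -> bytes:
    if payload_type == "text":
        if not isinstance(data, str):
            raise TypeError("Text data must be a str")
        return data.encode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        # jsontable is assumed to be a row-oriented list of dicts already
        return json.dumps(data).encode("utf-8")
    if payload_type in BYTES_TYPES:
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f"{payload_type} data must be bytes or bytearray")
        return bytes(data)
    # arrowtable needs pyarrow and is left out of this sketch
    raise ValueError(f"Unknown or unsupported payload_type: {payload_type}")


def deserialize_data(data: bytes, payload_type: str) -> Any:
    if payload_type == "text":
        return data.decode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        return json.loads(data.decode("utf-8"))
    if payload_type in BYTES_TYPES:
        return bytes(data)
    raise ValueError(f"Unknown or unsupported payload_type: {payload_type}")
```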
_fetch_with_backoff Implementation
function _fetch_with_backoff(
url::String,
max_retries::Int,
base_delay::Int,
max_delay::Int,
correlation_id::String
)
delay = base_delay
for attempt in 1:max_retries
try
response = HTTP.request("GET", url)
if response.status == 200
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
return response.body
else
error("Failed to fetch: $(response.status)")
end
catch e
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))")
if attempt < max_retries
sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
end
end
end
error("Failed to fetch data after $max_retries attempts")
end
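The same retry loop can be sketched in Python. Two assumptions keep it testable without a network: fetch and sleep are injected callables. Delays are in milliseconds, as in the Julia and JavaScript versions, and any exception counts as a failed attempt.

```python
import time
from typing import Callable


def fetch_with_backoff(
    fetch: Callable[[str], bytes],
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Call fetch(url) up to max_retries times, doubling the delay (ms) after each failure."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return fetch(url)
        except Exception as e:  # any failure is treated as retryable
            print(f"[{correlation_id}] Attempt {attempt} failed: {type(e).__name__}")
            if attempt < max_retries:
                sleep(delay / 1000.0)  # milliseconds -> seconds
                delay = min(delay * 2, max_delay)  # exponential growth, capped
    raise RuntimeError(f"Failed to fetch data after {max_retries} attempts")
```

smartreceive's defaults are max_retries=5, base_delay=100 and max_delay=5000. With those, the waits are 0.1, 0.2, 0.4 and 0.8 seconds, about 1.5 seconds in total before the final error. Adding random jitter is a common refinement, but it is not part of the loop shown here.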
plik_oneshot_upload Implementation
function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
# Get upload id
url_getUploadID = "$file_server_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
response_json = JSON.parse(http_response.body)
uploadid = response_json["id"]
uploadtoken = response_json["uploadToken"]
# Upload file
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$file_server_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict(
"file" => file_multipart
))
http_response = try
HTTP.post(url_upload, headers, form)
catch e
@error "Request failed" exception=e
rethrow() # without a response there is no file id to build the URL from
end
response_json = JSON.parse(http_response.body)
fileid = response_json["id"]
url = "$file_server_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
JavaScript Implementation
Module Structure
// natsbridge.js
const nats = require('nats');
const { v4: uuidv4 } = require('uuid');
const fetch = require('node-fetch');
const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
class NATSClient {
constructor(url) {
this.url = url;
this.connection = null;
}
async connect() {
this.connection = await nats.connect({ servers: this.url });
return this.connection;
}
async publish(subject, message) {
if (!this.connection) {
await this.connect();
}
await this.connection.publish(subject, message);
}
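async close() {
if (this.connection) {
// close() returns a Promise in nats.js; await it before dropping the reference
await this.connection.close();
this.connection = null;
}
}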
}
async function smartsend(subject, data, options = {}) {
// Implementation
}
async function smartreceive(msg, options = {}) {
// Implementation
}
module.exports = {
NATSClient,
smartsend,
smartreceive,
plikOneshotUpload,
fetchWithBackoff
};
smartsend Implementation
const nats = require('nats');
const { v4: uuidv4 } = require('uuid');
const fetch = require('node-fetch');
const arrow = require('apache-arrow');
const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
async function smartsend(subject, data, options = {}) {
const {
broker_url = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = plikOneshotUpload,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = uuidv4(),
msg_purpose = 'chat',
sender_name = 'NATSBridge',
receiver_name = '',
receiver_id = '',
reply_to = '',
reply_to_msg_id = '',
is_publish = true,
nats_connection = null,
msg_id = uuidv4(),
sender_id = uuidv4()
} = options;
console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`);
// Process payloads
const payloads = [];
for (const [dataname, payloadData, payloadType] of data) {
const payloadBytes = await serializeData(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
if (payloadSize < size_threshold) {
// Direct path
const payloadB64 = bufferToBase64(payloadBytes);
console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`);
payloads.push({
id: uuidv4(),
dataname,
payload_type: payloadType,
transport: 'direct',
encoding: 'base64',
size: payloadSize,
data: payloadB64,
metadata: { payload_bytes: payloadSize }
});
} else {
// Link path
console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`);
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
}
console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`);
payloads.push({
id: uuidv4(),
dataname,
payload_type: payloadType,
transport: 'link',
encoding: 'none',
size: payloadSize,
data: response.url,
metadata: {}
});
}
}
// Build envelope
const env = {
correlation_id,
msg_id,
timestamp: new Date().toISOString(),
send_to: subject,
msg_purpose,
sender_name,
sender_id,
receiver_name,
receiver_id,
reply_to,
reply_to_msg_id,
broker_url,
metadata: {},
payloads
};
const env_json_str = JSON.stringify(env);
if (is_publish) {
if (nats_connection) {
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
} else {
await publishMessage(broker_url, subject, env_json_str, correlation_id);
}
}
return [env, env_json_str];
}
serializeData Implementation
const arrow = require('apache-arrow');
async function serializeData(data, payload_type) {
if (payload_type === 'text') {
if (typeof data === 'string') {
return Buffer.from(data, 'utf8');
} else {
throw new Error('Text data must be a string');
}
} else if (payload_type === 'dictionary') {
const jsonStr = JSON.stringify(data);
return Buffer.from(jsonStr, 'utf8');
} else if (payload_type === 'arrowtable') {
// Convert row-oriented Array<Object> to Arrow IPC
// data is row-oriented: [{id: 1, name: "Alice"}, ...]
if (!Array.isArray(data) || data.length === 0) {
throw new Error('arrowtable data must be a non-empty array of objects');
}
// tableFromJSON infers column names and types from the row objects
const table = arrow.tableFromJSON(data);
// Encode as an Arrow IPC stream (Uint8Array) and wrap it in a Buffer
return Buffer.from(arrow.tableToIPC(table, 'stream'));
} else if (payload_type === 'jsontable') {
// data is already row-oriented Array<Object>
// Serialize directly to JSON
const jsonStr = JSON.stringify(data);
return Buffer.from(jsonStr, 'utf8');
} else if (payload_type === 'image') {
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
return Buffer.from(data);
} else {
throw new Error('Image data must be Uint8Array or Buffer');
}
} else if (payload_type === 'audio') {
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
return Buffer.from(data);
} else {
throw new Error('Audio data must be Uint8Array or Buffer');
}
} else if (payload_type === 'video') {
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
return Buffer.from(data);
} else {
throw new Error('Video data must be Uint8Array or Buffer');
}
} else if (payload_type === 'binary') {
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
return Buffer.from(data);
} else {
throw new Error('Binary data must be Uint8Array or Buffer');
}
} else {
throw new Error(`Unknown payload_type: ${payload_type}`);
}
}
function bufferToBase64(buffer) {
return buffer.toString('base64');
}
deserializeData Implementation
const arrow = require('apache-arrow');
async function deserializeData(data, payload_type, correlation_id) {
    if (payload_type === 'text') {
        return Buffer.from(data).toString('utf8');
    } else if (payload_type === 'dictionary' || payload_type === 'jsontable') {
        // jsontable returns Array<Object> (row-oriented)
        return JSON.parse(Buffer.from(data).toString('utf8'));
    } else if (payload_type === 'arrowtable') {
        // Deserialize from Arrow IPC; returns an arrow.Table
        return arrow.tableFromIPC(Buffer.from(data));
    } else if (['image', 'audio', 'video', 'binary'].includes(payload_type)) {
        return Buffer.from(data);
    } else {
        throw new Error(`Unknown payload_type: ${payload_type}`);
    }
}
fetchWithBackoff Implementation
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
    // Uses the global fetch (Node.js 18+ or a browser); delays are in milliseconds
    let delay = base_delay;
    for (let attempt = 1; attempt <= max_retries; attempt++) {
        try {
            const response = await fetch(url);
            if (response.status === 200) {
                console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`);
                // Return a Uint8Array, matching the download handler contract
                return new Uint8Array(await response.arrayBuffer());
            } else {
                throw new Error(`Failed to fetch: ${response.status}`);
            }
        } catch (e) {
            console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`);
            if (attempt < max_retries) {
                await new Promise(resolve => setTimeout(resolve, delay));
                delay = Math.min(delay * 2, max_delay);
            }
        }
    }
    throw new Error(`Failed to fetch data after ${max_retries} attempts`);
}
plikOneshotUpload Implementation
async function plikOneshotUpload(file_server_url, dataname, data) {
    // Uses the global fetch, FormData and Blob (Node.js 18+ or a browser)
    // Get upload id
    const url_getUploadID = `${file_server_url}/upload`;
    const headers = { 'Content-Type': 'application/json' };
    const body = JSON.stringify({ OneShot: true });
    const http_response = await fetch(url_getUploadID, {
        method: 'POST',
        headers,
        body
    });
    const response_json = await http_response.json();
    const uploadid = response_json.id;
    const uploadtoken = response_json.uploadToken;
    // Upload file
    const url_upload = `${file_server_url}/file/${uploadid}`;
    const form = new FormData();
    const blob = new Blob([data]);
    form.append('file', blob, dataname);
    const upload_headers = {
        'X-UploadToken': uploadtoken
    };
    const upload_response = await fetch(url_upload, {
        method: 'POST',
        headers: upload_headers,
        body: form
    });
    const upload_json = await upload_response.json();
    const fileid = upload_json.id;
    const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`;
    return {
        status: upload_response.status,
        uploadid,
        fileid,
        url
    };
}
Python Implementation
Module Structure
# natsbridge.py
import asyncio
import base64
import json
import uuid
import time
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
try:
    import pyarrow as arrow  # pyarrow.ipc is reachable as arrow.ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False
try:
    import aiohttp
    import nats
    from nats.aio.client import Client as NATSClient
    NATS_AVAILABLE = True
except ImportError:
    NATS_AVAILABLE = False
DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)
class NATSBridge:
    """Cross-platform NATS bridge implementation."""
    def __init__(self, broker_url: Optional[str] = None, fileserver_url: Optional[str] = None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_client: Optional["NATSClient"] = None
    async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]:
        """Send data via NATS."""
        pass
    async def smartreceive(self, msg: Any, **kwargs) -> Dict:
        """Receive and process NATS message."""
        pass
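The smartsend shown next builds the envelope as plain dicts rather than MsgPayloadV1/MsgEnvelopeV1 instances. A minimal sketch of mapping between the two, assuming the dataclasses above; envelope_from_dict and envelope_to_dict are hypothetical helper names, not part of the documented API:

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class MsgPayloadV1:
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)


def envelope_from_dict(env: Dict[str, Any]) -> MsgEnvelopeV1:
    """Build a typed envelope from the dict produced by json.loads (hypothetical helper)."""
    values = dict(env)
    # Payload entries without 'metadata' (the MicroPython sender omits it) get the default
    values["payloads"] = [MsgPayloadV1(**p) for p in env.get("payloads", [])]
    return MsgEnvelopeV1(**values)


def envelope_to_dict(env: MsgEnvelopeV1) -> Dict[str, Any]:
    """asdict recurses into nested dataclasses, so payloads become plain dicts again."""
    return asdict(env)
```

Because the constructors take keyword arguments, an envelope carrying an unknown key raises TypeError, which acts as a strict schema check on receipt.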
smartsend Implementation
import asyncio
import base64
import json
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple
from datetime import datetime, timezone
DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
# _serialize_data is shown below; publish_message is not shown in this section.
# plik_oneshot_upload must already be defined because it is used as a default argument value.
async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: Optional[str] = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: Optional[str] = None,
    sender_id: Optional[str] = None
) -> Tuple[Dict, str]:
    """
    Send data via NATS with automatic transport selection.
    Payloads whose serialized size is below size_threshold are embedded as
    base64 ("direct"); larger ones are uploaded with fileserver_upload_handler
    and referenced by URL ("link").
    Args:
        subject: NATS subject to publish to
        data: List of (dataname, data, payload_type) tuples
        The remaining keyword arguments fill the envelope fields of the same
        name or control transport and publishing.
    Returns:
        Tuple of (env, env_json_str)
    """
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())
    if msg_id is None:
        msg_id = str(uuid.uuid4())
    if sender_id is None:
        sender_id = str(uuid.uuid4())
    print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}")
    # Process payloads
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)
        print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
        if payload_size < size_threshold:
            # Direct path: embed the bytes as base64 in the envelope
            payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
            print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes")
            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'direct',
                'encoding': 'base64',
                'size': payload_size,
                'data': payload_b64,
                'metadata': {'payload_bytes': payload_size}
            })
        else:
            # Link path (claim check): upload to the file server and send only the URL
            print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver")
            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            if response['status'] != 200:
                raise Exception(f"Failed to upload data to fileserver: {response['status']}")
            print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}")
            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'link',
                'encoding': 'none',
                'size': payload_size,
                'data': response['url'],
                'metadata': {}
            })
    # Build envelope
    env = {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        # ISO 8601 UTC timestamp ending in 'Z' (datetime.utcnow() is deprecated since Python 3.12)
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'send_to': subject,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url,
        'metadata': {},
        'payloads': payloads
    }
    env_json_str = json.dumps(env)
    if is_publish:
        # Reuse an existing connection if one was given, otherwise pass broker_url
        if nats_connection:
            await publish_message(nats_connection, subject, env_json_str, correlation_id)
        else:
            await publish_message(broker_url, subject, env_json_str, correlation_id)
    return env, env_json_str
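smartsend compares the serialized size (before base64) with size_threshold. Base64 expands data by a factor of 4/3, and the NATS server's default max_payload is 1 MB, so a payload just under the default 1_000_000-byte threshold yields roughly 1.33 MB of base64 plus the envelope. The sketch below is an alternative, not what the code above does: it decides on the encoded size instead. base64_size, choose_transport and direct_payload are hypothetical helpers.

```python
import base64
import uuid
from typing import Any, Dict


def base64_size(n: int) -> int:
    """Length of the padded base64 encoding of n bytes."""
    return 4 * ((n + 2) // 3)


def choose_transport(payload_size: int, size_threshold: int) -> str:
    """Use 'direct' only if the base64 text itself stays under the threshold."""
    return "direct" if base64_size(payload_size) < size_threshold else "link"


def direct_payload(dataname: str, payload_bytes: bytes, payload_type: str) -> Dict[str, Any]:
    """Build a direct-transport payload entry in the same shape smartsend uses."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(payload_bytes),  # size of the raw bytes, not of the base64 text
        "data": base64.b64encode(payload_bytes).decode("ascii"),
        "metadata": {"payload_bytes": len(payload_bytes)},
    }
```

With the default threshold, a 999,999-byte payload would go over the link path here, because its base64 form is 1,333,332 bytes.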
serializeData Implementation
import json
from typing import Any
try:
    import pyarrow as arrow  # pyarrow.ipc is reachable as arrow.ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False
BINARY_TYPES = ('image', 'audio', 'video', 'binary')
def _serialize_data(data: Any, payload_type: str) -> bytes:
    """Serialize data to bytes based on payload_type."""
    if payload_type == 'text':
        if isinstance(data, str):
            return data.encode('utf-8')
        raise TypeError('Text data must be a string')
    elif payload_type == 'dictionary':
        return json.dumps(data).encode('utf-8')
    elif payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise ImportError('pyarrow not available for table serialization')
        import pandas as pd
        if not isinstance(data, pd.DataFrame):
            raise TypeError('arrowtable data must be a pandas DataFrame')
        # Column-oriented DataFrame -> Arrow table -> Arrow IPC file format
        table = arrow.Table.from_pandas(data)
        sink = arrow.BufferOutputStream()
        with arrow.ipc.new_file(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.getvalue().to_pybytes()
    elif payload_type == 'jsontable':
        # data is list[dict] or list (row-oriented); serialize directly to JSON
        return json.dumps(data).encode('utf-8')
    elif payload_type in BINARY_TYPES:
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        raise TypeError(f'{payload_type.capitalize()} data must be bytes')
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
deserializeData Implementation
import json
from typing import Any
try:
    import pyarrow as arrow  # pyarrow.ipc is reachable as arrow.ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """Deserialize bytes to data based on payload_type."""
    if payload_type == 'text':
        return data.decode('utf-8')
    elif payload_type == 'dictionary':
        return json.loads(data.decode('utf-8'))
    elif payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise ImportError('pyarrow not available for table deserialization')
        # open_file accepts bytes directly; to_pandas() requires pandas
        return arrow.ipc.open_file(data).read_all().to_pandas()
    elif payload_type == 'jsontable':
        # Deserialize from JSON; returns list[dict] (row-oriented)
        return json.loads(data.decode('utf-8'))
    elif payload_type in ('image', 'audio', 'video', 'binary'):
        return data
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
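_serialize_data and _deserialize_data must stay exact inverses for every payload_type. One alternative to the parallel if/elif chains is a dispatch table that keeps each encoder next to its decoder and makes a round-trip check trivial. This is a sketch with hypothetical names (CODECS, serialize, deserialize); arrowtable is left out because it needs pyarrow.

```python
import json
from typing import Any, Callable, Dict, Tuple

Codec = Tuple[Callable[[Any], bytes], Callable[[bytes], Any]]


def _to_bytes(data: Any) -> bytes:
    if not isinstance(data, (bytes, bytearray)):
        raise TypeError("binary payloads must be bytes or bytearray")
    return bytes(data)


def _json_encode(data: Any) -> bytes:
    return json.dumps(data).encode("utf-8")


def _json_decode(raw: bytes) -> Any:
    return json.loads(raw.decode("utf-8"))


# payload_type -> (serializer, deserializer); arrowtable omitted (needs pyarrow)
CODECS: Dict[str, Codec] = {
    "text": (lambda s: s.encode("utf-8"), lambda b: b.decode("utf-8")),
    "dictionary": (_json_encode, _json_decode),
    "jsontable": (_json_encode, _json_decode),
    **{t: (_to_bytes, bytes) for t in ("image", "audio", "video", "binary")},
}


def serialize(data: Any, payload_type: str) -> bytes:
    try:
        encode, _ = CODECS[payload_type]
    except KeyError:
        raise ValueError(f"Unknown payload_type: {payload_type}") from None
    return encode(data)


def deserialize(raw: bytes, payload_type: str) -> Any:
    try:
        _, decode = CODECS[payload_type]
    except KeyError:
        raise ValueError(f"Unknown payload_type: {payload_type}") from None
    return decode(raw)
```

Adding a payload type then means adding one dictionary entry, and a loop over CODECS can assert the round trip for all of them.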
fetchWithBackoff Implementation
import asyncio
import aiohttp
async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}")
                        return await response.read()
                    else:
                        raise Exception(f"Failed to fetch: {response.status}")
        except Exception as e:
            print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}")
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise Exception(f"Failed to fetch data after {max_retries} attempts")
plikOneshotUpload Implementation
import aiohttp
import json
from typing import Dict, Any
async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """Upload data to plik server in one-shot mode."""
    async with aiohttp.ClientSession() as session:
        # Get upload id
        url_getUploadID = f"{file_server_url}/upload"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"OneShot": True})
        async with session.post(url_getUploadID, headers=headers, data=body) as response:
            response_json = await response.json()
        uploadid = response_json['id']
        uploadtoken = response_json['uploadToken']
        # Upload file
        url_upload = f"{file_server_url}/file/{uploadid}"
        headers = {'X-UploadToken': uploadtoken}
        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
        async with session.post(url_upload, headers=headers, data=form) as upload_response:
            upload_json = await upload_response.json()
        fileid = upload_json['id']
        url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
        return {
            'status': upload_response.status,
            'uploadid': uploadid,
            'fileid': fileid,
            'url': url
        }
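Any coroutine that takes (fileserver_url, dataname, data) and returns a dict with status, uploadid, fileid and url can stand in for plik_oneshot_upload. A minimal sketch of a hypothetical in-memory file server for tests that should not depend on a running Plik instance; its download_handler mirrors the download handler signature (url, max_retries, base_delay, max_delay, correlation_id) and returns bytes.

```python
import asyncio
import uuid
from typing import Any, Dict


class InMemoryFileServer:
    """Hypothetical stand-in for a file server, for tests without Plik."""

    def __init__(self) -> None:
        self.files: Dict[str, bytes] = {}

    async def upload_handler(self, fileserver_url: str, dataname: str, data: bytes) -> Dict[str, Any]:
        uploadid = uuid.uuid4().hex
        fileid = uuid.uuid4().hex
        # Mirror Plik's URL layout: <server>/file/<uploadid>/<fileid>/<dataname>
        url = f"{fileserver_url}/file/{uploadid}/{fileid}/{dataname}"
        self.files[url] = bytes(data)
        return {"status": 200, "uploadid": uploadid, "fileid": fileid, "url": url}

    async def download_handler(self, url: str, max_retries: int, base_delay: int,
                               max_delay: int, correlation_id: str) -> bytes:
        # Retry parameters are accepted for signature compatibility but unused here
        if url not in self.files:
            raise KeyError(f"[Correlation: {correlation_id}] no such file: {url}")
        return self.files[url]
```

Passing server.upload_handler as fileserver_upload_handler to smartsend (with is_publish=False) exercises the link path end to end without network access.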
MicroPython Implementation
Limitations
MicroPython has significant constraints compared to desktop implementations:
| Feature | Desktop | MicroPython |
|---|---|---|
| Memory | Limited by host RAM (typically GBs) | ~256KB - 1MB |
| Arrow IPC | ✅ | ❌ (not supported) |
| Async/Await | ✅ | ⚠️ (uasyncio only) |
| Large payloads (>1MB) | ✅ (link transport) | ❌ (hard limit: MAX_PAYLOAD_SIZE = 50KB) |
| arrowtable | ✅ | ❌ |
| jsontable | ⚠️ (limited) | ⚠️ (limited) |
| Multiple payloads | ✅ | ⚠️ (limited) |
MicroPython Module Structure
# natsbridge_mpy.py (MicroPython)
import time
import json
import base64  # not built into most firmware; install it from micropython-lib
import uos
# Constants
DEFAULT_SIZE_THRESHOLD = 100000  # 100KB for MicroPython
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
# Hard limit. Because it is below DEFAULT_SIZE_THRESHOLD, every payload that passes
# this check uses direct transport; the link path is reachable only if the limits change.
MAX_PAYLOAD_SIZE = 50000
# Note: MicroPython uses list[list] for jsontable (row-oriented)
# No DataFrame support - data is always row-oriented
class NATSBridge:
    """MicroPython NATS bridge implementation."""
    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_conn = None
    def smartsend(self, subject, data, **kwargs):
        """Send data (synchronous)."""
        correlation_id = kwargs.get('correlation_id') or self._generate_uuid()
        msg_id = self._generate_uuid()
        sender_id = self._generate_uuid()
        print(f"[Correlation: {correlation_id}] Starting smartsend")
        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)
            if payload_size > MAX_PAYLOAD_SIZE:
                raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}")
            if payload_size < DEFAULT_SIZE_THRESHOLD:
                # Direct path
                payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path (limited support)
                response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes)
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })
        # MicroPython's time module has no strftime; gmtime() returns
        # (year, month, mday, hour, minute, second, weekday, yearday).
        # Assumes the RTC holds UTC (for example, set with ntptime.settime()).
        timestamp = '%04d-%02d-%02dT%02d:%02d:%02dZ' % time.gmtime()[:6]
        env = {
            'correlation_id': correlation_id,
            'msg_id': msg_id,
            'timestamp': timestamp,
            'send_to': subject,
            'msg_purpose': kwargs.get('msg_purpose', 'chat'),
            'sender_name': kwargs.get('sender_name', 'NATSBridge'),
            'sender_id': sender_id,
            'receiver_name': kwargs.get('receiver_name', ''),
            'receiver_id': kwargs.get('receiver_id', ''),
            'reply_to': kwargs.get('reply_to', ''),
            'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''),
            'broker_url': self.broker_url,
            'metadata': {},
            'payloads': payloads
        }
        env_json_str = json.dumps(env)
        # Publish
        self._publish(subject, env_json_str, correlation_id)
        return env, env_json_str
    def smartreceive(self, msg, **kwargs):
        """Receive and process message (synchronous)."""
        env_json_obj = json.loads(msg.payload)
        correlation_id = env_json_obj['correlation_id']
        payloads_list = []
        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']
            if transport == 'direct':
                payload_b64 = payload['data']
                payload_bytes = base64.b64decode(payload_b64)
                data_type = payload['payload_type']
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = self._sync_fileserver_download(
                    url,
                    kwargs.get('max_retries', 3),
                    kwargs.get('base_delay', 100),
                    kwargs.get('max_delay', 1000),
                    correlation_id
                )
                data_type = payload['payload_type']
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))
        env_json_obj['payloads'] = payloads_list
        return env_json_obj
    def _serialize_data(self, data, payload_type):
        """Serialize data (MicroPython version - no arrowtable support)."""
        if payload_type == 'text':
            return data.encode('utf-8')
        elif payload_type == 'dictionary':
            return json.dumps(data).encode('utf-8')
        elif payload_type == 'jsontable':
            # data is list[list] (row-oriented)
            return json.dumps(data).encode('utf-8')
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return bytes(data)
        else:
            raise ValueError(f"Unknown payload_type: {payload_type}")
    def _deserialize_data(self, data, payload_type):
        """Deserialize data (MicroPython version)."""
        if payload_type == 'text':
            return data.decode('utf-8')
        elif payload_type == 'dictionary':
            return json.loads(data.decode('utf-8'))
        elif payload_type == 'jsontable':
            # Returns list[list] (row-oriented)
            return json.loads(data.decode('utf-8'))
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return data
        else:
            raise ValueError(f"Unknown payload_type: {payload_type}")
    def _generate_uuid(self):
        """Generate a random UUID4-style string (MicroPython compatible).
        Uses uos.urandom, available on most ports; time-based values are not
        unique when several IDs are generated within the same clock tick.
        """
        b = bytearray(uos.urandom(16))
        b[6] = (b[6] & 0x0F) | 0x40  # version 4
        b[8] = (b[8] & 0x3F) | 0x80  # RFC 4122 variant
        h = ''.join('%02x' % x for x in b)
        return '%s-%s-%s-%s-%s' % (h[0:8], h[8:12], h[12:16], h[16:20], h[20:32])
    def _sync_fileserver_upload(self, url, dataname, data):
        """Synchronous file upload (limited)."""
        # Simplified implementation for MicroPython
        # In practice, would use an HTTP client module available on the port
        raise NotImplementedError("File upload not implemented in MicroPython")
    def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id):
        """Synchronous file download with backoff."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("File download not implemented in MicroPython")
    def _publish(self, subject, message, correlation_id):
        """Publish message to NATS."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("NATS publishing not implemented in MicroPython")
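Most MicroPython firmware does not include the base64 module, while binascii is built in on common ports. A sketch of two hypothetical helpers that could replace the base64 calls in smartsend and smartreceive; they use only binascii.b2a_base64 and binascii.a2b_base64, so they also run unchanged on CPython. b2a_base64 appends a trailing newline, which must be stripped before the string goes into the JSON envelope.

```python
import binascii


def b64encode_str(data):
    """Base64-encode bytes to a str, dropping the newline b2a_base64 appends."""
    return binascii.b2a_base64(data).strip().decode()


def b64decode_bytes(text):
    """Decode the base64 string of a direct-transport payload back to bytes."""
    if isinstance(text, str):
        text = text.encode()
    return binascii.a2b_base64(text)
```

Type annotations are omitted so the same file can be copied onto a device.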
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| NATS_URL | nats://localhost:4222 | NATS server URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 1000000 | Size threshold in bytes (1MB) |
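The Python smartsend takes broker_url, fileserver_url and size_threshold as arguments; where the environment variables are read is not shown. A minimal sketch of reading them with the documented defaults; BridgeConfig and load_config are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class BridgeConfig:
    nats_url: str
    fileserver_url: str
    size_threshold: int


def load_config(env: Optional[Mapping[str, str]] = None) -> BridgeConfig:
    """Read bridge settings from environment variables, falling back to the defaults above."""
    env = os.environ if env is None else env
    return BridgeConfig(
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        fileserver_url=env.get("FILESERVER_URL", "http://localhost:8080"),
        # int() raises ValueError on a malformed value instead of passing a string along
        size_threshold=int(env.get("SIZE_THRESHOLD", "1000000")),
    )
```

The result maps directly onto smartsend's keyword arguments: broker_url=cfg.nats_url, fileserver_url=cfg.fileserver_url, size_threshold=cfg.size_threshold.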
MicroPython Configuration
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython
Performance Considerations
Zero-Copy Reading
| Platform | Strategy |
|---|---|
| Julia | Arrow.read() with memory-mapped files |
| JavaScript | ArrayBuffer with DataView |
| Python | pyarrow memory mapping |
| MicroPython | Not available (streaming only) |
Exponential Backoff
All platforms implement exponential backoff for HTTP downloads:
# Python
import asyncio
import aiohttp
async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    delay = base_delay  # milliseconds
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
        except Exception:
            pass  # network errors fall through to the backoff below
        # Back off after both network errors and non-200 responses
        if attempt < max_retries:
            await asyncio.sleep(delay / 1000.0)
            delay = min(delay * 2, max_delay)
    raise Exception("Failed to fetch after max retries")
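Delays are in milliseconds (the code divides by 1000 before asyncio.sleep). The wait after failed attempt k is min(base_delay · 2^(k-1), max_delay), and no wait follows the final attempt. The hypothetical helper below computes the schedule fetch_with_backoff follows when every attempt fails, which helps when choosing parameters.

```python
from typing import List


def backoff_schedule(max_retries: int, base_delay: int, max_delay: int) -> List[int]:
    """Waits (ms) between attempts when every attempt fails."""
    waits = []
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        if attempt < max_retries:  # no wait after the final attempt
            waits.append(delay)
            delay = min(delay * 2, max_delay)
    return waits
```

For example, max_retries=6, base_delay=100, max_delay=1000 gives waits of 100, 200, 400, 800 and 1000 ms, so at most 2.5 s of sleeping before the final exception.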
Correlation ID Logging
All platforms use correlation IDs for distributed tracing:
[timestamp] [Correlation: abc123] Message published to subject
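The Python code in this guide prints the correlation prefix by hand in each f-string. A sketch of producing the same prefix with the standard logging module instead; CorrelationAdapter and correlation_logger are hypothetical names, and the timestamp comes from the formatter.

```python
import logging


class CorrelationAdapter(logging.LoggerAdapter):
    """Prefix every message with the correlation ID stored in `extra`."""

    def process(self, msg, kwargs):
        return f"[Correlation: {self.extra['correlation_id']}] {msg}", kwargs


def correlation_logger(correlation_id: str, name: str = "natsbridge") -> CorrelationAdapter:
    """Return a logger adapter bound to one correlation ID."""
    return CorrelationAdapter(logging.getLogger(name), {"correlation_id": correlation_id})
```

After logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(message)s"), calling correlation_logger("abc123").info("Message published to subject") emits a line in the format shown above.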
Serialization Performance
| Format | Use Case | Pros | Cons |
|---|---|---|---|
| arrowtable | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython |
| jsontable | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement |
Testing
Test File Organization
| Platform | Sender Tests | Receiver Tests |
|---|---|---|
| Julia | test/test_julia_*_sender.jl | test/test_julia_*_receiver.jl |
| JavaScript | test/test_js_*_sender.js | test/test_js_*_receiver.js |
| Python | test/test_py_*_sender.py | test/test_py_*_receiver.py |
Run Tests
# Julia
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl
# JavaScript (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js
# Python
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py
Troubleshooting
Common Issues
- NATS Connection Failed
  - Ensure the NATS server is running
  - Check the broker_url configuration
- HTTP Upload Failed
  - Ensure the file server is running
  - Check the fileserver_url configuration
  - Verify upload permissions
- Arrow IPC Deserialization Error
  - Ensure the data was serialized to Arrow IPC format
  - Check Arrow version compatibility between sender and receiver
  - MicroPython doesn't support Arrow IPC
- Memory Constraints (MicroPython)
  - Reduce size_threshold
  - Use direct transport only; payloads above MAX_PAYLOAD_SIZE (50KB by default) are rejected
  - Avoid large payloads
  - Use jsontable instead of arrowtable (arrowtable is not supported)
- Row-Oriented vs Column-Oriented Conversion Issues
  - Julia/Python: DataFrames are column-oriented; when sending jsontable, they are converted to row-oriented JSON
  - JavaScript/MicroPython: Data is natively row-oriented
  - When receiving jsontable in Julia/Python, JSON is automatically converted back to a column-oriented DataFrame
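In the Python code shown in this guide, _serialize_data passes jsontable data straight to json.dumps and _deserialize_data returns list[dict], so a pandas DataFrame must be converted explicitly, for example with df.to_dict(orient="records") before sending and pandas.DataFrame(rows) after receiving. The conversion itself is a transpose; a pure-Python sketch with hypothetical helper names:

```python
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Row-oriented jsontable data -> column-oriented dict (one list per column)."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:  # collect every key, in first-seen order
        for key in row:
            columns.setdefault(key, [])
    for row in rows:
        for key, values in columns.items():
            values.append(row.get(key))  # missing fields become None
    return columns


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Column-oriented dict -> row-oriented list of dicts, as sent for jsontable."""
    lengths = {len(v) for v in columns.values()}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same length")
    n = lengths.pop() if lengths else 0
    return [{key: values[i] for key, values in columns.items()} for i in range(n)]
```

Rows with differing keys are padded with None on the way to columns, which is the usual source of "missing column" surprises when a JavaScript or MicroPython sender omits fields.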
Summary
This cross-platform NATS bridge provides:
- High-Level API Parity: Matching smartsend() and smartreceive() APIs across all platforms
- Idiomatic Implementations:
  - Julia: Multiple dispatch, struct-based design, native Arrow IPC
  - JavaScript: Async/await, prototype-based utilities, class-based NATS client
  - Python: Class-based design with dataclasses, type hints, async/await
  - MicroPython: Synchronous API, memory-constrained optimizations
- Message Format Consistency: Identical JSON schemas across all platforms
- Handler Abstraction: File server operations abstracted through configurable handlers
- Platform-Specific Optimizations:
  - Arrow IPC (arrowtable): Efficient binary format for large tabular data (not supported in MicroPython)
  - JSON (jsontable): Universal human-readable format for smaller tables (works on all platforms)
- Row-Oriented ↔ Column-Oriented Conversion: Automatic conversion between row-oriented (JavaScript, MicroPython) and column-oriented (Julia DataFrame, Python pandas) formats when using jsontable
The Julia implementation in src/NATSBridge.jl serves as the ground truth for API design and behavior.
Datatype Summary
| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|---|---|---|---|---|
| arrowtable | Apache Arrow IPC | Large tabular data, schema-preserving | arrow-ipc → base64 | Julia, JavaScript, Python |
| jsontable | JSON | Small/medium tabular data, human-readable | json → base64 | Julia, JavaScript, Python, MicroPython |