Cross-Platform Architecture Documentation: Bi-Directional Data Bridge
Overview
This document describes the architecture for a high-performance, bi-directional data bridge using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with high-level API parity while maintaining idiomatic implementations for each language.
Supported Platforms:
- Julia - Ground truth implementation with full feature set
- JavaScript - Node.js and browser-compatible implementation
- Python/MicroPython - Desktop and embedded-compatible implementation
Cross-Platform Design Principles
- High-Level API Parity: All three platforms expose the same smartsend() and smartreceive() functions with equivalent signatures and behavior (keyword arguments in Julia and Python, an options object in JavaScript)
- Idiomatic Implementations: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python)
- Message Format Consistency: The msg_envelope_v1 and msg_payload_v1 JSON schemas are identical across all platforms
- Handler Function Abstraction: File server operations are abstracted through handler functions for backend flexibility
High-Level API Standard (Cross-Platform)
Unified API Signature
All three platforms expose the same high-level API:
Input Format (smartsend):
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
Output Format (smartreceive):
{
"correlation_id": "...",
"msg_id": "...",
"timestamp": "...",
"send_to": "...",
"msg_purpose": "...",
"sender_name": "...",
"sender_id": "...",
"receiver_name": "...",
"receiver_id": "...",
"reply_to": "...",
"reply_to_msg_id": "...",
"broker_url": "...",
"metadata": {...},
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
Supported Payload Types
| Type | Julia | JavaScript | Python/MicroPython |
|---|---|---|---|
| text | String | string | str |
| dictionary | Dict, NamedTuple | Object, Array | dict, list |
| arrowtable | DataFrame, Arrow.Table | `Array<Object>` (input) → Buffer (Arrow IPC) | pandas.DataFrame, bytes (Arrow IPC) |
| jsontable | Vector{NamedTuple}, Vector{Dict} | `Array<Object>` | list[dict], list |
| image | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| audio | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| video | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| binary | Vector{UInt8}, IOBuffer | Uint8Array, Buffer | bytes, bytearray, io.BytesIO |
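The following minimal Python sketch makes the input contract concrete. It checks that a smartsend() argument is a list of (dataname, data, type) entries and that each entry uses one of the eight types in the table. The name validate_payloads and its error messages are hypothetical and are not part of NATSBridge.

```python
from typing import Any, List, Tuple

# The eight payload types listed in the table above.
SUPPORTED_TYPES = {
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
}


def validate_payloads(data: List[Tuple[str, Any, str]]) -> None:
    """Hypothetical pre-flight check for smartsend() input (not a NATSBridge API)."""
    if not isinstance(data, list):
        raise TypeError("payloads must be a list of (dataname, data, type) entries")
    for i, entry in enumerate(data):
        # Python callers use tuples; lists are tolerated for JS-style input
        if not isinstance(entry, (tuple, list)) or len(entry) != 3:
            raise TypeError(f"payload {i} must have exactly 3 items (dataname, data, type)")
        dataname, _, payload_type = entry
        if not isinstance(dataname, str) or not dataname:
            raise ValueError(f"payload {i} needs a non-empty string dataname")
        if payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"payload {i} has unsupported type {payload_type!r}")


validate_payloads([("message", "Hello!", "text"), ("image", b"\x89PNG", "image")])
```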
Cross-Platform API Examples
Julia:
using NATSBridge
# Send
env, env_json_str = smartsend(
"/chat",
[("message", "Hello!", "text"), ("image", image_bytes, "image")],
broker_url="nats://localhost:4222"
)
# Receive - returns JSON.Object{String, Any}
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
# Access payloads: for (dataname, data, type) in env["payloads"]
JavaScript:
const NATSBridge = require('natsbridge');
// Send
const [env, env_json_str] = await NATSBridge.smartsend(
"/chat",
[
["message", "Hello!", "text"],
["image", imageBuffer, "image"]
],
{ broker_url: "nats://localhost:4222" }
);
// Receive - returns Promise<object>
const env = await NATSBridge.smartreceive(msg, {
fileserver_download_handler: fetchWithBackoff
});
// env is an object with "payloads" field containing Array of arrays
// Access payloads: for (const [dataname, data, type] of env.payloads)
Python:
from natsbridge import NATSBridge
bridge = NATSBridge()
# Send (smartsend is a coroutine; call it from async code)
env, env_json_str = await bridge.smartsend(
    "/chat",
    [("message", "Hello!", "text"), ("image", image_bytes, "image")],
    broker_url="nats://localhost:4222"
)
# Receive - returns Dict
env = await bridge.smartreceive(
    msg,
    fileserver_download_handler=fetch_with_backoff
)
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
# Access payloads: for dataname, data, type_ in env["payloads"]
MicroPython:
from natsbridge import NATSBridge
bridge = NATSBridge()
# Send (limited to direct transport due to memory constraints; run inside a uasyncio task)
env, env_json_str = await bridge.smartsend(
    "/chat",
    [("message", "Hello!", "text")],
    broker_url="nats://localhost:4222"
)
Architecture Diagram (Cross-Platform)
flowchart TB
subgraph JuliaApp["Julia Application"]
JuliaAppCode[App Code]
JuliaBridge[NATSBridge.jl]
JuliaNATS[<b>NATS.jl</b>]
end
subgraph JSApp["JavaScript Application"]
JSAppCode[App Code]
JSBridge[NATSBridge.js]
JSNATS[<b>nats.js</b>]
end
subgraph PythonApp["Python/MicroPython Application"]
PythonAppCode[App Code]
PythonBridge[NATSBridge.py]
PythonNATS[<b>nats.py</b>]
end
subgraph Infrastructure["Infrastructure"]
NATS[<b>NATS Server</b><br/>Message Broker]
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
end
JuliaAppCode --> JuliaBridge
JuliaBridge --> JuliaNATS
JSAppCode --> JSBridge
JSBridge --> JSNATS
PythonAppCode --> PythonBridge
PythonBridge --> PythonNATS
JuliaNATS --> NATS
JSNATS --> NATS
PythonNATS --> NATS
NATS --> JuliaNATS
NATS --> JSNATS
NATS --> PythonNATS
JuliaBridge -.->|HTTP POST upload| FileServer
JSBridge -.->|HTTP POST upload| FileServer
PythonBridge -.->|HTTP POST upload| FileServer
FileServer -.->|HTTP GET download| JuliaBridge
FileServer -.->|HTTP GET download| JSBridge
FileServer -.->|HTTP GET download| PythonBridge
style JuliaApp fill:#c5e1a5
style JSApp fill:#bbdefb
style PythonApp fill:#f8bbd0
style NATS fill:#fff3e0
style FileServer fill:#f3e5f5
System Components
1. msg_envelope_v1 - Message Envelope
JSON Schema (Identical Across All Platforms):
{
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
"content_length": 123456
},
"payloads": [
{
"id": "uuid4",
"dataname": "login_image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
"data": "base64-encoded-string",
"metadata": {
"checksum": "sha256_hash"
}
},
{
"id": "uuid4",
"dataname": "large_arrow_table",
"payload_type": "arrowtable",
"transport": "link",
"encoding": "arrow-ipc",
"size": 524288,
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
"metadata": {}
},
{
"id": "uuid4",
"dataname": "json_table",
"payload_type": "jsontable",
"transport": "direct",
"encoding": "json",
"size": 1024,
"data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]",
"metadata": {}
}
]
}
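The envelope is plain JSON, so the standard library is enough to build one. The Python sketch below fills every msg_envelope_v1 field using the formats from the example: UUID v4 strings and an ISO-8601 UTC timestamp ending in Z. The helper build_envelope and its defaults are hypothetical and only loosely modeled on the smartsend() keyword defaults.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def build_envelope(
    send_to: str,
    payloads: List[Dict[str, Any]],
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    correlation_id: Optional[str] = None,
    broker_url: str = "nats://localhost:4222",
) -> Dict[str, Any]:
    """Hypothetical helper that fills every msg_envelope_v1 field (not the NATSBridge API)."""
    return {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        # ISO-8601 UTC with a trailing "Z", matching the example timestamp
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "send_to": send_to,
        "msg_purpose": msg_purpose,
        "sender_name": sender_name,
        "sender_id": str(uuid.uuid4()),
        "receiver_name": "",
        "receiver_id": "",
        "reply_to": "",
        "reply_to_msg_id": "",
        "broker_url": broker_url,
        "metadata": {},
        "payloads": payloads,
    }


env_json_str = json.dumps(build_envelope("topic/subject", []))
```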
2. msg_payload_v1 - Payload Structure
JSON Schema (Identical Across All Platforms):
{
"id": "uuid4",
"dataname": "login_image",
"payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary",
"transport": "direct | link",
"encoding": "none | json | base64 | arrow-ipc",
"size": 15433,
"data": "base64-encoded-string | http-url | json-string",
"metadata": {
"checksum": "sha256_hash"
}
}
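The sketch below shows one way to produce a direct-transport msg_payload_v1 entry and verify it on receipt. It makes two assumptions. First, size counts the raw bytes before base64 encoding, as in the JavaScript and Python smartsend() code. Second, metadata.checksum is a lowercase hex SHA-256 digest; the document does not specify the checksum format. Both helper names are hypothetical.

```python
import base64
import hashlib
import uuid
from typing import Any, Dict


def make_direct_payload(dataname: str, payload_type: str, raw: bytes) -> Dict[str, Any]:
    """Hypothetical builder for a direct-transport msg_payload_v1 entry."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(raw),  # size of the raw bytes, before base64
        "data": base64.b64encode(raw).decode("ascii"),
        "metadata": {"checksum": hashlib.sha256(raw).hexdigest()},
    }


def verify_direct_payload(payload: Dict[str, Any]) -> bytes:
    """Decode a direct payload and check its size and optional SHA-256 checksum."""
    raw = base64.b64decode(payload["data"])
    if len(raw) != payload["size"]:
        raise ValueError("size mismatch")
    expected = payload.get("metadata", {}).get("checksum")
    if expected is not None and hashlib.sha256(raw).hexdigest() != expected:
        raise ValueError("checksum mismatch")
    return raw
```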
3. Transport Strategy Decision Logic (Cross-Platform)
┌─────────────────────────────────────────────────────────────┐
│ smartsend Function (All Platforms) │
│ Accepts: [(dataname1, data1, type1), ...] │
│ (Type is per payload, not standalone) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ For each payload: │
│ 1. Extract type from tuple/array │
│ 2. Serialize based on type │
│ 3. Check payload size │
└─────────────────────────────────────────────────────────────┘
│
┌───────────┴────────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Direct Path │ │ Link Path │
│ (< 1MB) │ │ (>= 1MB) │
│ │ │ │
│ • Serialize │ │ • Serialize │
│ to buffer │ │ to buffer │
│ • Base64/JSON│ │ • Upload to │
│ encode │ │ HTTP Server│
│ • Publish to │ │ • Publish to │
│ NATS │ │ NATS with │
│ (in msg) │ │ URL │
└──────────────┘ └──────────────┘
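The branch in the diagram comes down to one comparison on the serialized size. Below is a minimal Python sketch that assumes the default size_threshold of 1,000,000 bytes used elsewhere in this document. choose_transport and base64_length are illustrative names.

```python
DEFAULT_SIZE_THRESHOLD = 1_000_000  # bytes, the default used in this document


def choose_transport(payload_size: int, size_threshold: int = DEFAULT_SIZE_THRESHOLD) -> str:
    """Return "direct" when size < threshold, else "link", as in the diagram."""
    if payload_size < 0:
        raise ValueError("payload_size must be non-negative")
    return "direct" if payload_size < size_threshold else "link"


def base64_length(n: int) -> int:
    """Length of the padded base64 text for n raw bytes."""
    return 4 * ((n + 2) // 3)
```

The threshold is applied before base64 encoding, and base64 adds about one third to the size. A 999,999-byte payload therefore travels as 1,333,332 characters of text plus the envelope. The NATS server's default max_payload is 1 MB, so payloads near the limit may need a lower size_threshold or a raised max_payload.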
Platform-Specific Implementations
Julia Implementation
Architecture Patterns
Multiple Dispatch: Julia's core strength is leveraged through function overloading:
# publish_message has two methods, selected by the type of the first argument
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    # Short-lived connection owned by this call: drain (flush, then close) it when done
    conn = NATS.connect(broker_url)
    try
        publish_message(conn, subject, message, correlation_id)
    finally
        NATS.drain(conn)
    end
end
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
    # Caller-owned connection: publish only and leave its lifecycle to the caller
    NATS.publish(conn, subject, message)
    log_trace(correlation_id, "Message published to $subject")
end
Struct-Based Data Models:
struct msg_payload_v1
id::String
dataname::String
payload_type::String
transport::String
encoding::String
size::Integer
data::Any
metadata::Dict{String, Any}
end
struct msg_envelope_v1
correlation_id::String
msg_id::String
timestamp::String
send_to::String
msg_purpose::String
sender_name::String
sender_id::String
receiver_name::String
receiver_id::String
reply_to::String
reply_to_msg_id::String
broker_url::String
metadata::Dict{String, Any}
payloads::Vector{msg_payload_v1}
end
Dependencies
| Package | Purpose |
|---|---|
| NATS.jl | Core NATS functionality |
| Arrow.jl | Arrow IPC serialization |
| JSON.jl | JSON parsing |
| HTTP.jl | HTTP client for file server |
| UUIDs.jl | UUID generation |
| Dates.jl | Timestamps |
| Base64 | Base64 encoding |
File Server Handler Signatures
# Upload handler
fileserver_upload_handler(
fileserver_url::String,
dataname::String,
data::Vector{UInt8}
)::Dict{String, Any}
# Download handler
fileserver_download_handler(
url::String,
max_retries::Int,
base_delay::Int,
max_delay::Int,
correlation_id::String
)::Vector{UInt8}
Key Functions
# Main send/receive functions
function smartsend(
subject::String,
data::AbstractVector{<:Tuple{String, Any, String}};
broker_url::String = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::String = string(uuid4()),
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing,
msg_id::String = string(uuid4()),
sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String}
function smartreceive(
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)::JSON.Object{String, Any}
Serialization Logic for Tables
# Serialize table data based on payload_type
function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8}
if payload_type == "arrowtable"
# Serialize to Apache Arrow IPC format
buffer = IOBuffer()
Arrow.write(buffer, data)
return take!(buffer)
elseif payload_type == "jsontable"
# Serialize to JSON format
json_str = JSON.json(data)
return Vector{UInt8}(json_str)
else
throw(ArgumentError("Unknown payload_type: $payload_type"))
end
end
# Deserialize table data based on payload_type
function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any
if payload_type == "arrowtable"
# Deserialize from Apache Arrow IPC format
# Arrow.Table reads Arrow IPC bytes directly
return Arrow.Table(data)
elseif payload_type == "jsontable"
# Deserialize from JSON format
json_str = String(data)
return JSON.parse(json_str)
else
throw(ArgumentError("Unknown payload_type: $payload_type"))
end
end
JavaScript Implementation
Architecture Patterns
Async/Await Pattern: JavaScript uses async/await for non-blocking I/O:
// smartsend is async and returns a Promise
async function smartsend(subject, data, options = {}) {
const {
broker_url = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = plikOneshotUpload,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = generateUUID(),
msg_purpose = "chat",
sender_name = "NATSBridge",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
is_publish = true,
nats_connection = null,
msg_id = generateUUID(),
sender_id = generateUUID()
} = options;
// Process payloads
const payloads = [];
for (const [dataname, payloadData, payloadType] of data) {
const payloadBytes = await serializeData(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
if (payloadSize < size_threshold) {
// Direct path
const payloadB64 = base64Encode(payloadBytes);
payloads.push({
id: generateUUID(),
dataname,
payload_type: payloadType,
transport: "direct",
encoding: "base64",
size: payloadSize,
data: payloadB64
});
} else {
// Link path
const response = await fileserver_upload_handler(
fileserver_url, dataname, payloadBytes
);
payloads.push({
id: generateUUID(),
dataname,
payload_type: payloadType,
transport: "link",
encoding: "none",
size: payloadSize,
data: response.url
});
}
}
const env = buildEnvelope(subject, payloads, {
correlation_id, msg_id, msg_purpose,
sender_name, sender_id, receiver_name,
receiver_id, reply_to, reply_to_msg_id,
broker_url
});
const env_json_str = JSON.stringify(env);
if (is_publish) {
if (nats_connection) {
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
} else {
await publishMessage(broker_url, subject, env_json_str, correlation_id);
}
}
return [env, env_json_str];
}
Prototype-Based Utilities:
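// NATS client wrapper (ES6 class syntax over JavaScript's prototype model)
const nats = require('nats');
class NATSClient {
  constructor(url) {
    this.url = url;
    this.connection = null;
  }
  async connect() {
    this.connection = await nats.connect({ servers: this.url });
    return this.connection;
  }
  async publish(subject, message) {
    if (!this.connection) {
      await this.connect();
    }
    // publish() is synchronous in nats.js; it buffers the message for sending
    this.connection.publish(subject, message);
  }
  async close() {
    if (this.connection) {
      // drain() flushes pending messages before closing the connection
      await this.connection.drain();
      this.connection = null;
    }
  }
}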
Dependencies (Node.js)
| Package | Purpose |
|---|---|
| nats | Core NATS functionality (nats.js) |
| crypto (built-in) | UUID generation (Node.js) |
| node-fetch or axios | HTTP client for file server |
| apache-arrow | Arrow IPC serialization |
Dependencies (Browser)
| Package | Purpose |
|---|---|
| nats.ws | Browser NATS client over WebSockets (requires WebSocket support enabled on the NATS server) |
| crypto (built-in) | UUID generation (browser) |
| fetch (native) | HTTP client for file server |
| apache-arrow | Arrow IPC serialization |
Dependencies (MicroPython)
| Module | Purpose |
|---|---|
| nats (custom) | MicroPython NATS client |
| time | Timestamps |
| uos | File operations |
| base64 | Base64 encoding |
File Server Handler Signatures
// Upload handler - async function returning Promise
async function fileserver_upload_handler(
fileserver_url,
dataname,
data // Uint8Array
) {
// Returns: { status, uploadid, fileid, url }
}
// Download handler - async function returning Promise
async function fileserver_download_handler(
url,
max_retries,
base_delay,
max_delay,
correlation_id
) {
// Returns: Uint8Array
}
Key Functions
// Main send/receive functions
async function smartsend(subject, data, options = {}) {
// data: Array of [dataname, data, type] tuples
// Returns: Promise<[env, env_json_str]>
}
async function smartreceive(msg, options = {}) {
// msg: NATS message object
// Returns: Promise<env_object>
}
// Utility functions
async function serializeData(data, payload_type) {
// Returns: Uint8Array
}
async function deserializeData(data, payload_type) {
// Returns: deserialized data
}
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
// Returns: Uint8Array
}
Serialization Logic for Tables
// Serialize table data based on payload_type
const { tableFromJSON, tableToIPC, tableFromIPC } = require('apache-arrow');
async function serializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // Array<Object> -> Arrow Table; column types are inferred from the values
    const table = tableFromJSON(data);
    // Encode in the Arrow IPC streaming format (returns a Uint8Array)
    return tableToIPC(table, "stream");
  } else if (payload_type === "jsontable") {
    // Serialize to JSON format
    const jsonStr = JSON.stringify(data);
    return new TextEncoder().encode(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}
// Deserialize table data based on payload_type
async function deserializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // tableFromIPC accepts a Uint8Array in either the IPC stream or file format
    return tableFromIPC(data);
  } else if (payload_type === "jsontable") {
    // Deserialize from JSON format
    const jsonStr = new TextDecoder().decode(data);
    return JSON.parse(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}
Python/MicroPython Implementation
Architecture Patterns
Class-Based Design: Python uses classes for stateful operations:
import base64
import json
import uuid
class NATSBridge:
"""Cross-platform NATS bridge implementation."""
DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
def __init__(self, broker_url=None, fileserver_url=None):
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
self._nats_client = None
async def smartsend(self, subject, data, **kwargs):
"""
Send data via NATS with automatic transport selection.
Args:
subject: NATS subject to publish to
data: List of (dataname, data, type) tuples
**kwargs: Additional options (broker_url, fileserver_url, etc.)
Returns:
Tuple of (env, env_json_str)
"""
# Extract options with defaults
options = self._merge_options(kwargs)
# Process payloads
payloads = []
for dataname, payload_data, payload_type in data:
payload_bytes = self._serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
if payload_size < options['size_threshold']:
# Direct path
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
payloads.append({
'id': str(uuid.uuid4()),
'dataname': dataname,
'payload_type': payload_type,
'transport': 'direct',
'encoding': 'base64',
'size': payload_size,
'data': payload_b64
})
else:
# Link path
response = await options['fileserver_upload_handler'](
options['fileserver_url'], dataname, payload_bytes
)
payloads.append({
'id': str(uuid.uuid4()),
'dataname': dataname,
'payload_type': payload_type,
'transport': 'link',
'encoding': 'none',
'size': payload_size,
'data': response['url']
})
# Build envelope
env = self._build_envelope(subject, payloads, options)
env_json_str = json.dumps(env)
if options['is_publish']:
await self._publish_message(
subject, env_json_str, options['correlation_id'],
nats_connection=options.get('nats_connection')
)
return env, env_json_str
async def smartreceive(self, msg, **kwargs):
"""
Receive and process NATS message.
Args:
msg: NATS message object
**kwargs: Additional options (fileserver_download_handler, etc.)
Returns:
Dict with envelope metadata and payloads
"""
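# Merge options with defaults (download handler, retry settings)
options = self._merge_options(kwargs)
# Parse envelope (nats-py exposes the message body as msg.data)
env_json_obj = json.loads(msg.data)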
# Process payloads
payloads_list = []
for payload in env_json_obj['payloads']:
transport = payload['transport']
dataname = payload['dataname']
if transport == 'direct':
payload_b64 = payload['data']
payload_bytes = base64.b64decode(payload_b64)
data_type = payload['payload_type']
data = self._deserialize_data(payload_bytes, data_type)
payloads_list.append((dataname, data, data_type))
elif transport == 'link':
url = payload['data']
downloaded_data = await options['fileserver_download_handler'](
url,
options['max_retries'],
options['base_delay'],
options['max_delay'],
env_json_obj['correlation_id']
)
data_type = payload['payload_type']
data = self._deserialize_data(downloaded_data, data_type)
payloads_list.append((dataname, data, data_type))
env_json_obj['payloads'] = payloads_list
return env_json_obj
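Without the async plumbing, the receive path above does two things for each payload: it resolves the payload to raw bytes according to its transport, then deserializes those bytes by payload_type. The synchronous Python sketch below captures that core. The downloader and deserializer are passed in as plain callables, so the function can be exercised without NATS or an HTTP server. unpack_payloads is a hypothetical name. Unlike the method above, it raises on an unknown transport instead of silently dropping the payload.

```python
import base64
from typing import Any, Callable, Dict, List, Tuple


def unpack_payloads(
    env: Dict[str, Any],
    download: Callable[[str], bytes],
    deserialize: Callable[[bytes, str], Any],
) -> List[Tuple[str, Any, str]]:
    """Hypothetical synchronous core of smartreceive(): bytes first, then typed data."""
    out = []
    for p in env["payloads"]:
        if p["transport"] == "direct":
            raw = base64.b64decode(p["data"])  # data embedded in the message
        elif p["transport"] == "link":
            raw = download(p["data"])  # data is a URL on the file server
        else:
            raise ValueError(f"unknown transport: {p['transport']!r}")
        out.append((p["dataname"], deserialize(raw, p["payload_type"]), p["payload_type"]))
    return out
```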
Dataclass for Type Safety:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Union
@dataclass
class MsgPayloadV1:
"""Message payload structure."""
id: str
dataname: str
payload_type: str
transport: str
encoding: str
size: int
data: Union[str, bytes] # URL for link, base64 for direct
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MsgEnvelopeV1:
"""Message envelope structure."""
correlation_id: str
msg_id: str
timestamp: str
send_to: str
msg_purpose: str
sender_name: str
sender_id: str
receiver_name: str
receiver_id: str
reply_to: str
reply_to_msg_id: str
broker_url: str
metadata: Dict[str, Any] = field(default_factory=dict)
payloads: List[MsgPayloadV1] = field(default_factory=list)
Dependencies (Desktop Python)
| Package | Purpose |
|---|---|
| nats-py | Core NATS functionality |
| uuid | UUID generation (stdlib) |
| aiohttp or requests | HTTP client for file server |
| pyarrow | Arrow IPC serialization |
| pandas | DataFrame support (optional) |
| python-dateutil | Timestamps |
| base64 | Base64 encoding (stdlib) |
Dependencies (MicroPython)
| Module | Purpose |
|---|---|
| network | NATS connection (custom) |
| time | Timestamps |
| uos | File operations |
| base64 | Base64 encoding |
| json | JSON parsing |
| struct | Binary data handling |
MicroPython Limitations:
- No Arrow IPC support (memory constraints)
- Only direct transport (< 1MB threshold enforced)
- Simplified UUID generation
- Limited async support: async/await runs only on uasyncio, a subset of CPython's asyncio (callbacks are the alternative)
File Server Handler Signatures
# Upload handler - async function
async def fileserver_upload_handler(
fileserver_url: str,
dataname: str,
data: bytes
) -> Dict[str, Any]:
"""
Upload data to file server.
Args:
fileserver_url: Base URL of file server
dataname: Name of the file
data: Binary data
Returns:
Dict with keys: 'status', 'uploadid', 'fileid', 'url'
"""
pass
# Download handler - async function
async def fileserver_download_handler(
url: str,
max_retries: int,
base_delay: int,
max_delay: int,
correlation_id: str
) -> bytes:
"""
Download data from URL with exponential backoff.
Args:
url: URL to download from
max_retries: Maximum retry attempts
base_delay: Initial delay in ms
max_delay: Maximum delay in ms
correlation_id: Correlation ID for logging
Returns:
Downloaded bytes
"""
pass
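Unit tests that should not depend on a running file server can use a pair of handlers backed by an in-memory dict. The handlers below satisfy the two contracts above and are a hypothetical test double. Their URL layout imitates the example envelope: base URL, /file/, upload ID, file ID, then a name. Using dataname as the file name and returning an integer status of 200 are assumptions.

```python
import asyncio
import uuid
from typing import Any, Dict

# Hypothetical in-memory store standing in for the HTTP file server in tests.
_FAKE_STORE: Dict[str, bytes] = {}


async def fake_upload_handler(fileserver_url: str, dataname: str, data: bytes) -> Dict[str, Any]:
    """Follows the upload-handler contract: returns status, uploadid, fileid, url."""
    uploadid = uuid.uuid4().hex
    fileid = uuid.uuid4().hex
    # URL layout modeled on the envelope example: <base>/file/<uploadid>/<fileid>/<name>
    url = f"{fileserver_url.rstrip('/')}/file/{uploadid}/{fileid}/{dataname}"
    _FAKE_STORE[url] = bytes(data)
    return {"status": 200, "uploadid": uploadid, "fileid": fileid, "url": url}


async def fake_download_handler(url: str, max_retries: int, base_delay: int,
                                max_delay: int, correlation_id: str) -> bytes:
    """Follows the download-handler contract; retries are moot for an in-memory store."""
    try:
        return _FAKE_STORE[url]
    except KeyError:
        raise RuntimeError(f"[{correlation_id}] no such file: {url}") from None
```

To drive the link path without network access, pass these handlers as fileserver_upload_handler and fileserver_download_handler and set a small size_threshold.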
Key Functions
# Main send/receive functions (standalone or class methods)
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]],
broker_url: str = DEFAULT_BROKER_URL,
fileserver_url: str = DEFAULT_FILESERVER_URL,
fileserver_upload_handler: Callable = plik_oneshot_upload,
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
correlation_id: str = None,
msg_purpose: str = "chat",
sender_name: str = "NATSBridge",
receiver_name: str = "",
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True,
nats_connection: Any = None,
msg_id: str = None,
sender_id: str = None
) -> Tuple[Dict, str]:
"""Send data via NATS."""
pass
async def smartreceive(
msg: Any,
fileserver_download_handler: Callable = fetch_with_backoff,
max_retries: int = 5,
base_delay: int = 100,
max_delay: int = 5000
) -> Dict:
"""Receive and process NATS message."""
pass
# Utility functions
def _serialize_data(data: Any, payload_type: str) -> bytes:
"""Serialize data to bytes."""
pass
def _deserialize_data(data: bytes, payload_type: str) -> Any:
"""Deserialize bytes to data."""
pass
async def fetch_with_backoff(
url: str,
max_retries: int,
base_delay: int,
max_delay: int,
correlation_id: str
) -> bytes:
"""Fetch URL with exponential backoff."""
pass
Serialization Logic for Tables
# Serialize table data based on payload_type
import json
from typing import Any

def serialize_table_data(data: Any, payload_type: str) -> bytes:
    if payload_type == "arrowtable":
        # Optional dependencies, imported only when an Arrow table is requested
        import pandas as pd
        import pyarrow as pa
        # Already-encoded Arrow IPC bytes pass through unchanged
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        if not isinstance(data, pd.DataFrame):
            raise TypeError("Expected pandas DataFrame or Arrow IPC bytes for arrowtable")
        table = pa.Table.from_pandas(data)
        # Arrow IPC streaming format, readable by Arrow.Table (Julia) and tableFromIPC (JavaScript)
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.getvalue().to_pybytes()
    elif payload_type == "jsontable":
        # Serialize to JSON format
        if isinstance(data, list) and all(isinstance(row, dict) for row in data):
            return json.dumps(data).encode('utf-8')
        raise TypeError("Expected list of dicts for jsontable")
    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")

# Deserialize table data based on payload_type
def deserialize_table_data(data: bytes, payload_type: str) -> Any:
    if payload_type == "arrowtable":
        import pyarrow as pa
        # Read an Arrow IPC stream and convert it to a pandas DataFrame
        reader = pa.ipc.open_stream(data)
        return reader.read_all().to_pandas()
    elif payload_type == "jsontable":
        # Deserialize from JSON format
        return json.loads(data.decode('utf-8'))
    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")
Platform Comparison Matrix
| Feature | Julia | JavaScript | Python | MicroPython |
|---|---|---|---|---|
| Multiple Dispatch | ✅ Native | ❌ (Prototypes) | ❌ (single dispatch via functools.singledispatch) | ❌ |
| Async/Await | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ |
| Memory Management | ✅ GC | ✅ GC | ✅ GC | ⚠️ GC (small heap) |
| Arrow IPC | ✅ Native | ✅ (apache-arrow) | ✅ (pyarrow) | ❌ |
| JSON Serialization | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) |
| arrowtable Support | ✅ | ✅ | ✅ | ❌ |
| jsontable Support | ✅ | ✅ | ✅ | ✅ |
| Direct Transport | ✅ | ✅ | ✅ | ✅ |
| Link Transport | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Handler Functions | ✅ | ✅ | ✅ | ✅ |
| Cross-Platform API | ✅ | ✅ | ✅ | ✅ |
Implementation Details by Platform
Julia: Multiple Dispatch Pattern
# Function overloading based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
# Creates new connection
end
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
# Uses pre-existing connection
end
# Type-specific serialization
function _serialize_data(data::String, payload_type::String)
# Text handling
end
function _serialize_data(data::Dict, payload_type::String)
# Dictionary handling
end
function _serialize_data(data::DataFrame, payload_type::String)
# Table handling - arrowtable
end
function _serialize_data(data::Vector{<:NamedTuple}, payload_type::String)
# Table handling - jsontable
end
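Python has no multiple dispatch. Its closest runtime analogue is functools.singledispatch, which picks an implementation from the type of the first argument only. The sketch below mirrors the Julia methods above for the types that need no third-party library; arrowtable is omitted to avoid a pyarrow dependency. serialize_data is an illustrative name and is not the Python module's _serialize_data.

```python
import json
from functools import singledispatch
from typing import Any


@singledispatch
def serialize_data(data: Any, payload_type: str) -> bytes:
    # Fallback when no implementation is registered for type(data)
    raise TypeError(f"no serializer for {type(data).__name__} ({payload_type})")


@serialize_data.register
def _(data: str, payload_type: str) -> bytes:
    return data.encode("utf-8")  # text


@serialize_data.register
def _(data: dict, payload_type: str) -> bytes:
    return json.dumps(data).encode("utf-8")  # dictionary


@serialize_data.register
def _(data: list, payload_type: str) -> bytes:
    return json.dumps(data).encode("utf-8")  # jsontable (list of dicts)


@serialize_data.register(bytes)
@serialize_data.register(bytearray)
def _(data, payload_type: str) -> bytes:
    return bytes(data)  # image/audio/video/binary
```

Only the first argument participates in dispatch, so payload_type cannot refine the choice the way an extra typed argument can in Julia. In this sketch a list always maps to JSON.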
JavaScript: Prototype + Async Pattern
// Class-based NATS client
class NATSClient {
constructor(url) {
this.url = url;
}
async connect() {
// Connection logic
}
async publish(subject, message) {
// Publish logic
}
}
// Module-level utility functions
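function generateUUID() {
  // Prefer the platform CSPRNG: crypto.randomUUID() is global in modern browsers (secure contexts) and Node.js >= 19
  if (globalThis.crypto && typeof globalThis.crypto.randomUUID === "function") {
    return globalThis.crypto.randomUUID();
  }
  // Fallback: RFC 4122 version-4 layout filled from Math.random (not cryptographically secure)
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
    const r = Math.random() * 16 | 0;
    return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
  });
}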
async function serializeData(data, payload_type) {
// Serialization logic for arrowtable and jsontable
}
Python: Class-Based Pattern
class NATSBridge:
"""Main bridge class."""
def __init__(self, broker_url=None):
self.broker_url = broker_url or DEFAULT_BROKER_URL
async def smartsend(self, subject, data, **kwargs):
"""Send data."""
pass
async def smartreceive(self, msg, **kwargs):
"""Receive message."""
pass
# Module-level convenience functions
async def smartsend(subject, data, **kwargs):
"""Convenience coroutine using a default NATSBridge instance."""
bridge = NATSBridge()
return await bridge.smartsend(subject, data, **kwargs)
async def smartreceive(msg, **kwargs):
"""Convenience coroutine using a default NATSBridge instance."""
bridge = NATSBridge()
return await bridge.smartreceive(msg, **kwargs)
Scenario Implementations (Cross-Platform)
Scenario 1: Command & Control (Small Dictionary)
| Platform | Code |
|---|---|
| Julia | config = Dict("step_size" => 0.01)<br>env, env_json_str = smartsend("control", [("config", config, "dictionary")]) |
| JavaScript | const config = { step_size: 0.01 };<br>const [env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]); |
| Python | config = {"step_size": 0.01}<br>env, env_json_str = await smartsend("control", [("config", config, "dictionary")]) |
Scenario 2: Deep Dive Analysis (Large Arrow Table)
| Platform | Code |
|---|---|
| Julia | using DataFrames<br>df = DataFrame(id=1:1000000, value=rand(1000000))<br>env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")]) |
| JavaScript | const df = [{ id: 1, value: 0.5 }, ...];<br>const [env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]); |
| Python | import numpy as np<br>import pandas as pd<br>df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})<br>env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")]) |
Scenario 3: Chat System (Multi-Payload)
| Platform | Code |
|---|---|
| Julia | chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = smartsend("chat", chat) |
| JavaScript | const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];<br>const [env, env_json_str] = await smartsend("chat", chat); |
| Python | chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = await smartsend("chat", chat) |
Scenario 4: JSON Table Transfer (Cross-Platform)
| Platform | Code |
|---|---|
| Julia | rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]<br>env, env_json_str = smartsend("data", [("users", rows, "jsontable")]) |
| JavaScript | const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];<br>const [env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]); |
| Python | users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]<br>env, env_json_str = await smartsend("data", [("users", users, "jsontable")]) |
Scenario 5: Smart Transport Selection
The smartsend function automatically selects the transport method based on payload size:
- Direct Transport (serialized size below size_threshold, 1,000,000 bytes by default): Payload is serialized and embedded directly in the NATS message
  - arrowtable: Serialized to Arrow IPC, base64 encoded
  - jsontable: Serialized to JSON, base64 encoded
  - dictionary: Serialized to JSON, base64 encoded
  - text: Serialized to UTF-8, base64 encoded
  - image/audio/video/binary: Base64 encoded
- Link Transport (serialized size at or above size_threshold): Payload is uploaded to the HTTP file server and its URL is embedded in the message
  - All types supported
  - Receiver downloads from the URL and deserializes
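The per-type rules for the direct path can be sketched in Python as a serialize-then-base64 step plus its inverse. arrowtable is left out so the example needs only the standard library. encode_direct and decode_direct are hypothetical names.

```python
import base64
import json
from typing import Any


def encode_direct(data: Any, payload_type: str) -> str:
    """Serialize by payload_type, then base64-encode (arrowtable omitted in this sketch)."""
    if payload_type == "text":
        raw = data.encode("utf-8")
    elif payload_type in ("dictionary", "jsontable"):
        raw = json.dumps(data).encode("utf-8")
    elif payload_type in ("image", "audio", "video", "binary"):
        raw = bytes(data)  # already binary; only base64 is applied
    else:
        raise ValueError(f"unsupported payload_type for this sketch: {payload_type}")
    return base64.b64encode(raw).decode("ascii")


def decode_direct(data_b64: str, payload_type: str) -> Any:
    """Inverse of encode_direct: base64-decode, then deserialize by payload_type."""
    raw = base64.b64decode(data_b64)
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        return json.loads(raw)
    return raw
```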
Performance Considerations (Cross-Platform)
Zero-Copy Reading
| Platform | Strategy |
|---|---|
| Julia | Arrow.Table() with memory-mapped files |
| JavaScript | ArrayBuffer with DataView |
| Python | pyarrow memory mapping |
| MicroPython | Not available (streaming only) |
Exponential Backoff
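# Python (aiohttp); MicroPython would need a different HTTP client
import asyncio
import aiohttp

async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    delay = base_delay  # milliseconds
    last_error = None
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    # Non-2xx responses raise ClientResponseError and count as a failed attempt
                    response.raise_for_status()
                    return await response.read()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                last_error = e
                if attempt < max_retries:
                    await asyncio.sleep(delay / 1000.0)
                    delay = min(delay * 2, max_delay)
    raise RuntimeError(
        f"[{correlation_id}] Failed to fetch {url} after {max_retries} attempts: {last_error}"
    )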
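The retry loop sleeps only between attempts, so the sequence of waits can be computed ahead of time. The small Python helper below (hypothetical name backoff_schedule) reproduces the delays, in milliseconds, that fetch_with_backoff sleeps.

```python
from typing import List


def backoff_schedule(max_retries: int, base_delay: int, max_delay: int) -> List[int]:
    """Delays (ms) slept between attempts; there is no sleep after the last attempt."""
    delays, delay = [], base_delay
    for _ in range(max_retries - 1):
        delays.append(delay)
        delay = min(delay * 2, max_delay)  # double, capped at max_delay
    return delays
```

With the smartreceive() defaults (max_retries=5, base_delay=100, max_delay=5000), the worst-case total sleep is 100 + 200 + 400 + 800 = 1500 ms, not counting request time. Many backoff implementations also add random jitter so that concurrent receivers do not retry in lockstep; the loop above does not.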
Correlation ID Logging
All platforms use correlation IDs for distributed tracing:
[timestamp] [Correlation: abc123] Message published to subject
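Below is a Python sketch of a formatter for the log line layout above. It assumes an ISO-8601 UTC timestamp, since the document does not fix the timestamp format. format_trace is a hypothetical counterpart to the log_trace call in the Julia code, not an existing API.

```python
from datetime import datetime, timezone
from typing import Optional


def format_trace(correlation_id: str, message: str, now: Optional[datetime] = None) -> str:
    """Render "[timestamp] [Correlation: id] message" with a UTC ISO-8601 timestamp."""
    now = now or datetime.now(timezone.utc)
    ts = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"[{ts}] [Correlation: {correlation_id}] {message}"
```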
Serialization Performance Comparison
| Format | Use Case | Pros | Cons |
|---|---|---|---|
| arrowtable | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library |
| jsontable | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema |
Testing Strategy (Cross-Platform)
Unit Tests
| Test Type | Julia | JavaScript | Python |
|---|---|---|---|
| Serialization | test/test_julia_text_sender.jl | test/test_js_text_sender.js | test/test_py_text_sender.py |
| Deserialization | test/test_julia_text_receiver.jl | test/test_js_text_receiver.js | test/test_py_text_receiver.py |
| Large Payload | test/test_julia_file_sender.jl | test/test_js_file_sender.js | test/test_py_file_sender.py |
| Multi-Payload | test/test_julia_mix_payloads_sender.jl | test/test_js_mix_payloads_sender.js | test/test_py_mix_payloads_sender.py |
| Arrow Table | test/test_julia_table_sender.jl | test/test_js_table_sender.js | test/test_py_table_sender.py |
Integration Tests
- NATS server communication
- File server upload/download
- Cross-platform message exchange
- Arrow table serialization/deserialization
- JSON table serialization/deserialization
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| NATS_URL | nats://localhost:4222 | NATS server URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 1000000 | Size threshold in bytes (1MB) |
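Below is a Python sketch of one way to read the three variables, with the documented defaults. It assumes an application passes them into smartsend() as the broker_url, fileserver_url and size_threshold keyword arguments; that mapping is an assumption. load_config is a hypothetical helper.

```python
import os
from typing import Dict, Mapping, Optional

DEFAULTS = {
    "NATS_URL": "nats://localhost:4222",
    "FILESERVER_URL": "http://localhost:8080",
    "SIZE_THRESHOLD": "1000000",
}


def load_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Read the variables above, falling back to the documented defaults."""
    env = os.environ if env is None else env
    raw = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    threshold = int(raw["SIZE_THRESHOLD"])
    if threshold <= 0:
        raise ValueError("SIZE_THRESHOLD must be a positive number of bytes")
    return {
        "broker_url": raw["NATS_URL"],
        "fileserver_url": raw["FILESERVER_URL"],
        "size_threshold": threshold,
    }
```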
MicroPython-Specific Configuration
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython
Summary
This cross-platform NATS bridge provides:
- High-Level API Parity: Equivalent smartsend() and smartreceive() signatures across Julia, JavaScript, and Python/MicroPython
- Idiomatic Implementations:
  - Julia: Multiple dispatch and struct-based design
  - JavaScript: Async/await and prototype-based utilities
  - Python: Class-based design with type hints
- Message Format Consistency: Identical msg_envelope_v1 and msg_payload_v1 JSON schemas
- Handler Abstraction: File server operations abstracted through configurable handlers
- Platform-Specific Optimizations:
  - Arrow IPC (arrowtable): Efficient binary format for large tabular data
  - JSON (jsontable): Universal human-readable format for smaller tables
  - Streaming support in MicroPython
The Julia implementation serves as the ground truth for API design and behavior. The JavaScript and Python implementations maintain interface parity with it and follow their own language idioms.
Datatype Summary
| Datatype | Serialization | Use Case | Encoding (direct transport) |
|---|---|---|---|
| text | UTF-8 bytes | Text messages, chat content | utf-8 → base64 |
| dictionary | JSON | Structured key-value data, config | json → base64 |
| arrowtable | Apache Arrow IPC | Large tabular data, schema-preserving | arrow-ipc → base64 |
| jsontable | JSON | Small/medium tabular data, human-readable | json → base64 |
| image | Binary | Image files (JPEG, PNG, etc.) | binary → base64 |
| audio | Binary | Audio files (WAV, MP3, etc.) | binary → base64 |
| video | Binary | Video files (MP4, AVI, etc.) | binary → base64 |
| binary | Binary | Generic binary data, files | binary → base64 |