Cross-Platform Architecture Documentation: Bi-Directional Data Bridge
Overview
This document describes the architecture for a high-performance, bi-directional data bridge using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with high-level API parity while maintaining idiomatic implementations for each language.
Supported Platforms:
- Julia - Ground truth implementation with full feature set
- JavaScript - Node.js and browser-compatible implementation
- Python/MicroPython - Desktop and embedded-compatible implementation
Cross-Platform Design Principles
- High-Level API Parity: All three platforms expose the same smartsend() and smartreceive() functions with identical signatures and behavior
- Idiomatic Implementations: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python)
- Message Format Consistency: The msg_envelope_v1 and msg_payload_v1 JSON schemas are identical across all platforms
- Handler Function Abstraction: File server operations are abstracted through handler functions for backend flexibility
High-Level API Standard (Cross-Platform)
Unified API Signature
All three platforms expose the same high-level API:
Input Format (smartsend):
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
Output Format (smartreceive):
{
"correlation_id": "...",
"msg_id": "...",
"timestamp": "...",
"send_to": "...",
"msg_purpose": "...",
"sender_name": "...",
"sender_id": "...",
"receiver_name": "...",
"receiver_id": "...",
"reply_to": "...",
"reply_to_msg_id": "...",
"broker_url": "...",
"metadata": {...},
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
Supported Payload Types
| Type | Julia | JavaScript | Python/MicroPython |
|---|---|---|---|
| text | String | string | str |
| dictionary | Dict, NamedTuple | Object, Array | dict, list |
| table | DataFrame, Arrow.Table | Array<Object> (input) → Buffer (Arrow IPC) | pandas.DataFrame, bytes (Arrow IPC) |
| image | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| audio | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| video | Vector{UInt8} | Uint8Array, Buffer | bytes, bytearray |
| binary | Vector{UInt8}, IOBuffer | Uint8Array, Buffer | bytes, bytearray, io.BytesIO |
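As a rough illustration of the table above, a sender could check each (dataname, data, type) entry before serializing it. The sketch below is in Python; `SUPPORTED_TYPES` and `validate_payloads` are hypothetical names introduced here, not part of the documented API.

```python
# Hypothetical helper: the type names come from the table above,
# the function itself is an illustration only.
SUPPORTED_TYPES = {"text", "dictionary", "table", "image", "audio", "video", "binary"}

def validate_payloads(payloads):
    """Check that every entry is a (dataname, data, type) triple with a known type."""
    for entry in payloads:
        if len(entry) != 3:
            raise ValueError(f"expected (dataname, data, type), got {entry!r}")
        dataname, _data, payload_type = entry
        if not isinstance(dataname, str):
            raise TypeError("dataname must be a string")
        if payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"unsupported payload type: {payload_type!r}")
    return True
```

Rejecting an unknown type on the sending side keeps a malformed entry from reaching the broker at all.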
Cross-Platform API Examples
Julia:
using NATSBridge
# Send
env, env_json_str = smartsend(
"/chat",
[("message", "Hello!", "text"), ("image", image_bytes, "image")],
broker_url="nats://localhost:4222"
)
# Receive - returns JSON.Object{String, Any}
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
# Access payloads: for (dataname, data, type) in env["payloads"]
JavaScript:
const NATSBridge = require('natsbridge');
// Send
const [env, env_json_str] = await NATSBridge.smartsend(
"/chat",
[
["message", "Hello!", "text"],
["image", imageBuffer, "image"]
],
{ broker_url: "nats://localhost:4222" }
);
// Receive - returns Promise<object>
const env = await NATSBridge.smartreceive(msg, {
fileserver_download_handler: fetchWithBackoff
});
// env is an object with "payloads" field containing Array of arrays
// Access payloads: for (const [dataname, data, type] of env.payloads)
Python:
from natsbridge import NATSBridge
# Send (smartsend is a coroutine, so call it from inside an async function)
env, env_json_str = await NATSBridge.smartsend(
"/chat",
[("message", "Hello!", "text"), ("image", image_bytes, "image")],
broker_url="nats://localhost:4222"
)
# Receive - returns Dict (smartreceive is also a coroutine)
env = await NATSBridge.smartreceive(
msg,
fileserver_download_handler=fetch_with_backoff
)
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
# Access payloads: for dataname, data, type_ in env["payloads"]
MicroPython:
from natsbridge import NATSBridge
# Send (limited to direct transport due to memory constraints)
env, env_json_str = NATSBridge.smartsend(
"/chat",
[("message", "Hello!", "text")],
broker_url="nats://localhost:4222"
)
Architecture Diagram (Cross-Platform)
flowchart TB
subgraph JuliaApp["Julia Application"]
JuliaAppCode[App Code]
JuliaBridge[NATSBridge.jl]
JuliaNATS[<b>NATS.jl</b>]
end
subgraph JSApp["JavaScript Application"]
JSAppCode[App Code]
JSBridge[NATSBridge.js]
JSNATS[<b>nats.js</b>]
end
subgraph PythonApp["Python/MicroPython Application"]
PythonAppCode[App Code]
PythonBridge[NATSBridge.py]
PythonNATS[<b>nats.py</b>]
end
subgraph Infrastructure["Infrastructure"]
NATS[<b>NATS Server</b><br/>Message Broker]
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
end
JuliaAppCode --> JuliaBridge
JuliaBridge --> JuliaNATS
JSAppCode --> JSBridge
JSBridge --> JSNATS
PythonAppCode --> PythonBridge
PythonBridge --> PythonNATS
JuliaNATS --> NATS
JSNATS --> NATS
PythonNATS --> NATS
NATS --> JuliaNATS
NATS --> JSNATS
NATS --> PythonNATS
JuliaBridge -.->|HTTP POST upload| FileServer
JSBridge -.->|HTTP POST upload| FileServer
PythonBridge -.->|HTTP POST upload| FileServer
FileServer -.->|HTTP GET download| JuliaBridge
FileServer -.->|HTTP GET download| JSBridge
FileServer -.->|HTTP GET download| PythonBridge
style JuliaApp fill:#c5e1a5
style JSApp fill:#bbdefb
style PythonApp fill:#f8bbd0
style NATS fill:#fff3e0
style FileServer fill:#f3e5f5
System Components
1. msg_envelope_v1 - Message Envelope
JSON Schema (Identical Across All Platforms):
{
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
"content_length": 123456
},
"payloads": [
{
"id": "uuid4",
"dataname": "login_image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
"data": "base64-encoded-string",
"metadata": {
"checksum": "sha256_hash"
}
},
{
"id": "uuid4",
"dataname": "large_table",
"payload_type": "table",
"transport": "link",
"encoding": "none",
"size": 524288,
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
"metadata": {}
}
]
}
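To make the envelope shape concrete, here is a minimal Python sketch that assembles a dict carrying every msg_envelope_v1 field and serializes it. The function name `build_envelope`, its parameter list, and the empty-string defaults are assumptions made for illustration; the bridge's own envelope builder is not shown in this document.

```python
import json
import uuid
from datetime import datetime, timezone

def build_envelope(send_to, payloads, msg_purpose="chat",
                   sender_name="NATSBridge", broker_url="nats://localhost:4222"):
    """Assemble a dict with the msg_envelope_v1 fields (illustrative defaults)."""
    return {
        "correlation_id": str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        # UTC timestamp in the same shape as the schema example
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "send_to": send_to,
        "msg_purpose": msg_purpose,
        "sender_name": sender_name,
        "sender_id": str(uuid.uuid4()),
        "receiver_name": "",
        "receiver_id": "",
        "reply_to": "",
        "reply_to_msg_id": "",
        "broker_url": broker_url,
        "metadata": {},
        "payloads": payloads,
    }

env = build_envelope("/chat", [])
env_json_str = json.dumps(env)
```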
2. msg_payload_v1 - Payload Structure
JSON Schema (Identical Across All Platforms):
{
"id": "uuid4",
"dataname": "login_image",
"payload_type": "image | dictionary | table | text | audio | video | binary",
"transport": "direct | link",
"encoding": "none | json | base64 | arrow-ipc",
"size": 15433,
"data": "base64-encoded-string | http-url",
"metadata": {
"checksum": "sha256_hash"
}
}
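The following Python sketch builds one direct-transport payload and verifies it on the receiving side. It assumes the checksum is a hex-encoded SHA-256 over the raw bytes (before Base64 encoding); the schema only says "sha256_hash", so that choice, along with the helper names `make_direct_payload` and `verify_payload`, is an assumption.

```python
import base64
import hashlib
import uuid

def make_direct_payload(dataname, payload_type, payload_bytes):
    """Build a direct-transport msg_payload_v1 dict with a SHA-256 checksum."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(payload_bytes),  # size of the raw bytes, not the Base64 text
        "data": base64.b64encode(payload_bytes).decode("ascii"),
        "metadata": {"checksum": hashlib.sha256(payload_bytes).hexdigest()},
    }

def verify_payload(payload):
    """Decode the data field and compare size and checksum with the declared values."""
    raw = base64.b64decode(payload["data"])
    return (len(raw) == payload["size"]
            and hashlib.sha256(raw).hexdigest() == payload["metadata"]["checksum"])
```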
3. Transport Strategy Decision Logic (Cross-Platform)
┌─────────────────────────────────────────────────────────────┐
│  smartsend Function (All Platforms)                         │
│  Accepts: [(dataname1, data1, type1), ...]                  │
│  (Type is per payload, not standalone)                      │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│  For each payload:                                          │
│  1. Extract type from tuple/array                           │
│  2. Serialize based on type                                 │
│  3. Check payload size                                      │
└─────────────────────────────────────────────────────────────┘
                               │
                ┌──────────────┴───────────────┐
                ▼                              ▼
        ┌──────────────┐               ┌──────────────┐
        │ Direct Path  │               │ Link Path    │
        │ (< 1MB)      │               │ (>= 1MB)     │
        │              │               │              │
        │ • Serialize  │               │ • Serialize  │
        │   to buffer  │               │   to buffer  │
        │ • Base64     │               │ • Upload to  │
        │   encode     │               │   HTTP Server│
        │ • Publish to │               │ • Publish to │
        │   NATS       │               │   NATS with  │
        │   (in msg)   │               │   URL        │
        └──────────────┘               └──────────────┘
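The branch in the diagram reduces to a single comparison against the size threshold. The Python sketch below shows that comparison and also computes how large a direct payload becomes once Base64-encoded. `choose_transport` and `base64_encoded_len` are hypothetical helper names; the 1,000,000-byte default matches the documented threshold.

```python
DEFAULT_SIZE_THRESHOLD = 1_000_000  # bytes, the documented 1MB default

def choose_transport(payload_size, size_threshold=DEFAULT_SIZE_THRESHOLD):
    """Return "direct" below the threshold and "link" at or above it."""
    return "direct" if payload_size < size_threshold else "link"

def base64_encoded_len(raw_size):
    """Length of the padded Base64 text for raw_size bytes (4 chars per 3 bytes)."""
    return 4 * ((raw_size + 2) // 3)
```

Because Base64 grows the data by roughly a third, a payload just under the threshold is noticeably larger on the wire, so the threshold is worth checking against the broker's configured maximum message size.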
Platform-Specific Implementations
Julia Implementation
Architecture Patterns
Multiple Dispatch: Julia's core strength is leveraged through function overloading:
# publish_message has two overloads based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url)
publish_message(conn, subject, message, correlation_id)
end
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message)
log_trace(correlation_id, "Message published to $subject")
finally
NATS.drain(conn)
end
end
Struct-Based Data Models:
struct msg_payload_v1
id::String
dataname::String
payload_type::String
transport::String
encoding::String
size::Integer
data::Any
metadata::Dict{String, Any}
end
struct msg_envelope_v1
correlation_id::String
msg_id::String
timestamp::String
send_to::String
msg_purpose::String
sender_name::String
sender_id::String
receiver_name::String
receiver_id::String
reply_to::String
reply_to_msg_id::String
broker_url::String
metadata::Dict{String, Any}
payloads::Vector{msg_payload_v1}
end
Dependencies
| Package | Purpose |
|---|---|
| NATS.jl | Core NATS functionality |
| Arrow.jl | Arrow IPC serialization |
| JSON.jl | JSON parsing |
| HTTP.jl | HTTP client for file server |
| UUIDs.jl | UUID generation |
| Dates.jl | Timestamps |
| Base64 | Base64 encoding |
File Server Handler Signatures
# Upload handler
fileserver_upload_handler(
fileserver_url::String,
dataname::String,
data::Vector{UInt8}
)::Dict{String, Any}
# Download handler
fileserver_download_handler(
url::String,
max_retries::Int,
base_delay::Int,
max_delay::Int,
correlation_id::String
)::Vector{UInt8}
Key Functions
# Main send/receive functions
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}, 1};
broker_url::String = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::String = string(uuid4()),
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing,
msg_id::String = string(uuid4()),
sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String}
function smartreceive(
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)::JSON.Object{String, Any}
JavaScript Implementation
Architecture Patterns
Async/Await Pattern: JavaScript uses async/await for non-blocking I/O:
// smartsend is async and returns a Promise
async function smartsend(subject, data, options = {}) {
const {
broker_url = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = plikOneshotUpload,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = generateUUID(),
msg_purpose = "chat",
sender_name = "NATSBridge",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
is_publish = true,
nats_connection = null,
msg_id = generateUUID(),
sender_id = generateUUID()
} = options;
// Process payloads
const payloads = [];
for (const [dataname, payloadData, payloadType] of data) {
const payloadBytes = await serializeData(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
if (payloadSize < size_threshold) {
// Direct path
const payloadB64 = base64Encode(payloadBytes);
payloads.push({
id: generateUUID(),
dataname,
payload_type: payloadType,
transport: "direct",
encoding: "base64",
size: payloadSize,
data: payloadB64
});
} else {
// Link path
const response = await fileserver_upload_handler(
fileserver_url, dataname, payloadBytes
);
payloads.push({
id: generateUUID(),
dataname,
payload_type: payloadType,
transport: "link",
encoding: "none",
size: payloadSize,
data: response.url
});
}
}
const env = buildEnvelope(subject, payloads, {
correlation_id, msg_id, msg_purpose,
sender_name, sender_id, receiver_name,
receiver_id, reply_to, reply_to_msg_id,
broker_url
});
const env_json_str = JSON.stringify(env);
if (is_publish) {
if (nats_connection) {
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
} else {
await publishMessage(broker_url, subject, env_json_str, correlation_id);
}
}
return [env, env_json_str];
}
Prototype-Based Utilities:
// NATS client wrapper (prototype-based)
class NATSClient {
constructor(url) {
this.url = url;
this.connection = null;
}
async connect() {
this.connection = await nats.connect({ servers: this.url });
return this.connection;
}
async publish(subject, message) {
if (!this.connection) {
await this.connect();
}
await this.connection.publish(subject, message);
}
async close() {
if (this.connection) {
// close() returns a Promise in nats.js, so wait for it to settle
await this.connection.close();
this.connection = null;
}
}
}
Dependencies (Node.js)
| Package | Purpose |
|---|---|
| nats | Core NATS functionality (nats.js) |
| uuid | UUID generation |
| node-fetch or axios | HTTP client for file server |
| apache-arrow | Arrow IPC serialization |
Dependencies (Browser)
| Package | Purpose |
|---|---|
| nats | Browser-compatible NATS client |
| uuid | UUID generation |
| fetch (native) | HTTP client for file server |
| apache-arrow | Arrow IPC serialization |
Dependencies (MicroPython)
| Module | Purpose |
|---|---|
| nats (custom) | MicroPython NATS client |
| time | Timestamps |
| uos | File operations |
| base64 | Base64 encoding |
File Server Handler Signatures
// Upload handler - async function returning Promise
async function fileserver_upload_handler(
fileserver_url,
dataname,
data // Uint8Array
) {
// Returns: { status, uploadid, fileid, url }
}
// Download handler - async function returning Promise
async function fileserver_download_handler(
url,
max_retries,
base_delay,
max_delay,
correlation_id
) {
// Returns: Uint8Array
}
Key Functions
// Main send/receive functions
async function smartsend(subject, data, options = {}) {
// data: Array of [dataname, data, type] tuples
// Returns: Promise<[env, env_json_str]>
}
async function smartreceive(msg, options = {}) {
// msg: NATS message object
// Returns: Promise<env_object>
}
// Utility functions
async function serializeData(data, payload_type) {
// Returns: Uint8Array
}
async function deserializeData(data, payload_type) {
// Returns: deserialized data
}
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
// Returns: Uint8Array
}
Python/MicroPython Implementation
Architecture Patterns
Class-Based Design: Python uses classes for stateful operations:
import base64
import json
import uuid

class NATSBridge:
    """Cross-platform NATS bridge implementation."""
    DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB
    DEFAULT_BROKER_URL = "nats://localhost:4222"
    DEFAULT_FILESERVER_URL = "http://localhost:8080"

    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or self.DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
        self._nats_client = None

    async def smartsend(self, subject, data, **kwargs):
        """
        Send data via NATS with automatic transport selection.
        Args:
            subject: NATS subject to publish to
            data: List of (dataname, data, type) tuples
            **kwargs: Additional options (broker_url, fileserver_url, etc.)
        Returns:
            Tuple of (env, env_json_str)
        """
        # Extract options with defaults
        options = self._merge_options(kwargs)
        # Process payloads
        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)
            if payload_size < options['size_threshold']:
                # Direct path: embed the Base64-encoded bytes in the message
                payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
                payloads.append({
                    'id': uuid.uuid4().hex,
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path: upload the bytes and send only the URL
                response = await options['fileserver_upload_handler'](
                    options['fileserver_url'], dataname, payload_bytes
                )
                payloads.append({
                    'id': uuid.uuid4().hex,
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })
        # Build envelope
        env = self._build_envelope(subject, payloads, options)
        env_json_str = json.dumps(env)
        if options['is_publish']:
            await self._publish_message(
                subject, env_json_str, options['correlation_id'],
                nats_connection=options.get('nats_connection')
            )
        return env, env_json_str

    async def smartreceive(self, msg, **kwargs):
        """
        Receive and process NATS message.
        Args:
            msg: NATS message object
            **kwargs: Additional options (fileserver_download_handler, etc.)
        Returns:
            Dict with envelope metadata and payloads
        """
        # Extract options with defaults (handler, max_retries, base_delay, max_delay)
        options = self._merge_options(kwargs)
        # Parse envelope; nats-py exposes the raw message body as msg.data
        env_json_obj = json.loads(msg.data)
        # Process payloads
        payloads_list = []
        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']
            data_type = payload['payload_type']
            if transport == 'direct':
                payload_bytes = base64.b64decode(payload['data'])
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = await options['fileserver_download_handler'](
                    url,
                    options['max_retries'],
                    options['base_delay'],
                    options['max_delay'],
                    env_json_obj['correlation_id']
                )
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))
            else:
                raise ValueError(f"Unknown transport: {transport}")
        env_json_obj['payloads'] = payloads_list
        return env_json_obj
Dataclass for Type Safety:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Union

@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]  # URL for link, base64 for direct
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)
Dependencies (Desktop Python)
| Package | Purpose |
|---|---|
nats-py |
Core NATS functionality |
uuid |
UUID generation (stdlib) |
aiohttp or requests |
HTTP client for file server |
pyarrow |
Arrow IPC serialization |
pandas |
DataFrame support (optional) |
python-dateutil |
Timestamps |
base64 |
Base64 encoding (stdlib) |
Dependencies (MicroPython)
| Module | Purpose |
|---|---|
network |
NATS connection (custom) |
time |
Timestamps |
uos |
File operations |
base64 |
Base64 encoding |
json |
JSON parsing |
struct |
Binary data handling |
MicroPython Limitations:
- No Arrow IPC support (memory constraints)
- Only direct transport (< 1MB threshold enforced)
- Simplified UUID generation
- No async/await (use callbacks or uasyncio)
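The "simplified UUID generation" limitation can be pictured with the Python sketch below, which formats 16 random bytes as a version-4 UUID string without importing the uuid module. It assumes `os.urandom` is available on the target port, and the name `simple_uuid4` is hypothetical; the actual MicroPython implementation may differ.

```python
import os

def simple_uuid4():
    """Format 16 random bytes as a version-4 UUID string without the uuid module."""
    b = bytearray(os.urandom(16))
    b[6] = (b[6] & 0x0F) | 0x40  # set the version nibble to 4
    b[8] = (b[8] & 0x3F) | 0x80  # set the RFC 4122 variant bits
    h = "".join("{:02x}".format(x) for x in b)
    return "-".join((h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]))
```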
File Server Handler Signatures
# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.
    Args:
        fileserver_url: Base URL of file server
        dataname: Name of the file
        data: Binary data
    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass

# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.
    Args:
        url: URL to download from
        max_retries: Maximum retry attempts
        base_delay: Initial delay in ms
        max_delay: Maximum delay in ms
        correlation_id: Correlation ID for logging
    Returns:
        Downloaded bytes
    """
    pass
Key Functions
# Main send/receive functions (standalone or class methods)
from typing import Any, Callable, Dict, List, Optional, Tuple

async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: Optional[str] = None,  # generated when None
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: Optional[str] = None,  # generated when None
    sender_id: Optional[str] = None  # generated when None
) -> Tuple[Dict, str]:
    """Send data via NATS."""
    pass

async def smartreceive(
    msg: Any,
    fileserver_download_handler: Callable = fetch_with_backoff,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000
) -> Dict:
    """Receive and process NATS message."""
    pass

# Utility functions
def _serialize_data(data: Any, payload_type: str) -> bytes:
    """Serialize data to bytes."""
    pass

def _deserialize_data(data: bytes, payload_type: str) -> Any:
    """Deserialize bytes to data."""
    pass

async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff."""
    pass
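The serialization stubs above can be illustrated for the types that need no third-party library. In this Python sketch, text is encoded as UTF-8, dictionaries as UTF-8 JSON, and the binary-like types are passed through as bytes; those encodings are assumptions, and the table type (Arrow IPC) is left out because it requires pyarrow. The un-prefixed names `serialize_data` and `deserialize_data` mark these as simplified stand-ins.

```python
import json

BINARY_TYPES = ("image", "audio", "video", "binary")

def serialize_data(data, payload_type):
    """Turn a payload into bytes according to its declared type."""
    if payload_type == "text":
        return data.encode("utf-8")
    if payload_type == "dictionary":
        return json.dumps(data).encode("utf-8")
    if payload_type in BINARY_TYPES:
        # io.BytesIO exposes its contents through getvalue()
        return data.getvalue() if hasattr(data, "getvalue") else bytes(data)
    raise ValueError(f"unsupported payload type: {payload_type!r}")

def deserialize_data(raw, payload_type):
    """Inverse of serialize_data for the same subset of types."""
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type == "dictionary":
        return json.loads(raw.decode("utf-8"))
    if payload_type in BINARY_TYPES:
        return bytes(raw)
    raise ValueError(f"unsupported payload type: {payload_type!r}")
```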
Platform Comparison Matrix
| Feature | Julia | JavaScript | Python | MicroPython |
|---|---|---|---|---|
| Multiple Dispatch | ✅ Native | ❌ (Prototypes) | ❌ (Overload via @overload) | ❌ |
| Async/Await | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ |
| Memory Management | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) |
| Arrow IPC | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ |
| Direct Transport | ✅ | ✅ | ✅ | ✅ |
| Link Transport | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Handler Functions | ✅ | ✅ | ✅ | ✅ |
| Cross-Platform API | ✅ | ✅ | ✅ | ✅ |
Implementation Details by Platform
Julia: Multiple Dispatch Pattern
# Function overloading based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
# Creates new connection
end
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
# Uses pre-existing connection
end
# Type-specific serialization
function _serialize_data(data::String, payload_type::String)
# Text handling
end
function _serialize_data(data::Dict, payload_type::String)
# Dictionary handling
end
function _serialize_data(data::DataFrame, payload_type::String)
# Table handling
end
JavaScript: Prototype + Async Pattern
// Class-based NATS client
class NATSClient {
constructor(url) {
this.url = url;
}
async connect() {
// Connection logic
}
async publish(subject, message) {
// Publish logic
}
}
// Module-level utility functions
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
const r = Math.random() * 16 | 0;
return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
});
}
async function serializeData(data, payload_type) {
// Serialization logic
}
Python: Class-Based Pattern
class NATSBridge:
    """Main bridge class."""
    def __init__(self, broker_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL

    async def smartsend(self, subject, data, **kwargs):
        """Send data."""
        pass

    async def smartreceive(self, msg, **kwargs):
        """Receive message."""
        pass

# Module-level convenience functions
# These must be declared async because they await the bridge methods.
async def smartsend(subject, data, **kwargs):
    """Convenience function using default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartsend(subject, data, **kwargs)

async def smartreceive(msg, **kwargs):
    """Convenience function using default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartreceive(msg, **kwargs)
Scenario Implementations (Cross-Platform)
Scenario 1: Command & Control (Small Dictionary)
| Platform | Code |
|---|---|
| Julia | julia<br>config = Dict("step_size" => 0.01)<br>env, env_json_str = smartsend("control", [("config", config, "dictionary")]) |
| JavaScript | javascript<br>const config = { step_size: 0.01 };<br>const [env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]); |
| Python | python<br>config = {"step_size": 0.01}<br>env, env_json_str = await smartsend("control", [("config", config, "dictionary")]) |
Scenario 2: Deep Dive Analysis (Large Arrow Table)
| Platform | Code |
|---|---|
| Julia | julia<br>df = DataFrame(id=1:1000000, value=rand(1000000))<br>env, env_json_str = smartsend("analysis", [("table", df, "table")]) |
| JavaScript | javascript<br>const df = [{ id: 1, value: 0.5 }, ...];<br>const [env, env_json_str] = await smartsend("analysis", [["table", df, "table"]]); |
| Python | python<br>import numpy as np<br>import pandas as pd<br>df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})<br>env, env_json_str = await smartsend("analysis", [("table", df, "table")]) |
Scenario 3: Chat System (Multi-Payload)
| Platform | Code |
|---|---|
| Julia | julia<br>chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = smartsend("chat", chat) |
| JavaScript | javascript<br>const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];<br>const [env, env_json_str] = await smartsend("chat", chat); |
| Python | python<br>chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = await smartsend("chat", chat) |
Performance Considerations (Cross-Platform)
Zero-Copy Reading
| Platform | Strategy |
|---|---|
| Julia | Arrow.read() with memory-mapped files |
| JavaScript | ArrayBuffer with DataView |
| Python | pyarrow memory mapping |
| MicroPython | Not available (streaming only) |
Exponential Backoff
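# Python/MicroPython
import asyncio
import aiohttp

async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    delay = base_delay  # milliseconds
    last_error = None
    # Reuse one session across attempts instead of opening a new one each time
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    last_error = RuntimeError(f"HTTP {response.status}")
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                last_error = e
            if attempt < max_retries:
                # Back off after any failed attempt, including non-200 responses
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise RuntimeError(
        f"[Correlation: {correlation_id}] Failed to fetch {url} after {max_retries} attempts"
    ) from last_error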
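To see what the doubling rule means in practice, the Python sketch below lists the waits (in milliseconds) between attempts for a given configuration. `backoff_delays` is a hypothetical helper that mirrors the delay arithmetic of `fetch_with_backoff` without performing any I/O.

```python
def backoff_delays(max_retries, base_delay, max_delay):
    """Return the sleep durations (ms) between attempts: one fewer than max_retries."""
    delays = []
    delay = base_delay
    for _ in range(max_retries - 1):
        delays.append(delay)
        delay = min(delay * 2, max_delay)  # double, but never exceed the cap
    return delays

# Documented defaults: max_retries=5, base_delay=100, max_delay=5000
default_schedule = backoff_delays(5, 100, 5000)  # [100, 200, 400, 800]
```

With the documented defaults the cap is never reached, and a download that fails every attempt spends 1.5 seconds sleeping in total.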
Correlation ID Logging
All platforms use correlation IDs for distributed tracing:
[timestamp] [Correlation: abc123] Message published to subject
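A log line in that shape could be produced as in the Python sketch below. The function name `format_trace` and the ISO-8601 UTC timestamp format are assumptions; the document only specifies the bracketed layout.

```python
from datetime import datetime, timezone

def format_trace(correlation_id, message, now=None):
    """Format a log line as [timestamp] [Correlation: id] message."""
    now = now or datetime.now(timezone.utc)
    ts = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"[{ts}] [Correlation: {correlation_id}] {message}"

line = format_trace(
    "abc123",
    "Message published to /chat",
    now=datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
)
```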
Testing Strategy (Cross-Platform)
Unit Tests
| Test Type | Julia | JavaScript | Python |
|---|---|---|---|
| Serialization | test/test_julia_text_sender.jl | test/test_js_text_sender.js | test/test_py_text_sender.py |
| Deserialization | test/test_julia_text_receiver.jl | test/test_js_text_receiver.js | test/test_py_text_receiver.py |
| Large Payload | test/test_julia_file_sender.jl | test/test_js_file_sender.js | test/test_py_file_sender.py |
| Multi-Payload | test/test_julia_mix_payloads_sender.jl | test/test_js_mix_payloads_sender.js | test/test_py_mix_payloads_sender.py |
Integration Tests
- NATS server communication
- File server upload/download
- Cross-platform message exchange
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| NATS_URL | nats://localhost:4222 | NATS server URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 1000000 | Size threshold in bytes (1MB) |
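One way these variables might be read at startup is sketched below in Python. The variable names and defaults come from the table above; the function name `load_config` and the returned key names are hypothetical.

```python
import os

def load_config(environ=None):
    """Read the documented environment variables, falling back to their defaults."""
    environ = os.environ if environ is None else environ
    return {
        "broker_url": environ.get("NATS_URL", "nats://localhost:4222"),
        "fileserver_url": environ.get("FILESERVER_URL", "http://localhost:8080"),
        # Environment values are strings, so the threshold is converted to int
        "size_threshold": int(environ.get("SIZE_THRESHOLD", "1000000")),
    }
```

Accepting an optional mapping instead of always reading `os.environ` keeps the function easy to test.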
MicroPython-Specific Configuration
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython
Summary
This cross-platform NATS bridge provides:
- High-Level API Parity: Identical smartsend() and smartreceive() signatures across Julia, JavaScript, and Python/MicroPython
- Idiomatic Implementations:
  - Julia: Multiple dispatch and struct-based design
  - JavaScript: Async/await and prototype-based utilities
  - Python: Class-based design with type hints
- Message Format Consistency: Identical msg_envelope_v1 and msg_payload_v1 JSON schemas
- Handler Abstraction: File server operations abstracted through configurable handlers
- Platform-Specific Optimizations: Arrow IPC support in desktop platforms, streaming support in MicroPython
The Julia implementation serves as the ground truth for API design and behavior; the JavaScript and Python implementations maintain interface parity with it while leveraging their respective language idioms.