NATSBridge/docs/updated_architecture.md
2026-03-08 11:34:10 +07:00


# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge
## Overview
This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language.
**Supported Platforms:**
- **Julia** - Ground truth implementation with full feature set
- **JavaScript** - Node.js and browser-compatible implementation
- **Python/MicroPython** - Desktop and embedded-compatible implementation
### Cross-Platform Design Principles
1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with equivalent signatures (keyword arguments in Julia and Python, an options object in JavaScript) and the same behavior
2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python)
3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms
4. **Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility
---
## High-Level API Standard (Cross-Platform)
### Unified API Signature
All three platforms expose the same high-level API:
**Input Format (smartsend):**
```
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
```
**Output Format (smartreceive):**
```
{
  "correlation_id": "...",
  "msg_id": "...",
  "timestamp": "...",
  "send_to": "...",
  "msg_purpose": "...",
  "sender_name": "...",
  "sender_id": "...",
  "receiver_name": "...",
  "receiver_id": "...",
  "reply_to": "...",
  "reply_to_msg_id": "...",
  "broker_url": "...",
  "metadata": {...},
  "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```
### Supported Payload Types
| Type | Julia | JavaScript | Python/MicroPython |
|------|-------|------------|-------------------|
| `text` | `String` | `string` | `str` |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` |
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) |
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` |
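Because `smartsend` takes a list of `(dataname, data, type)` triples whose third element must be one of the type names above, a caller can check the list before anything is serialized. The following is a minimal Python sketch, not part of NATSBridge: `validate_payload_specs` and `SUPPORTED_TYPES` are hypothetical names, and only the type names are taken from the table.

```python
from typing import Any, List, Tuple

# Type names from the "Supported Payload Types" table
SUPPORTED_TYPES = frozenset({
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
})


def validate_payload_specs(specs: List[Tuple[str, Any, str]]) -> None:
    """Raise ValueError unless every entry is a (dataname, data, type) triple
    with a non-empty string dataname and a supported type name (hypothetical helper)."""
    for index, spec in enumerate(specs):
        if not isinstance(spec, (tuple, list)) or len(spec) != 3:
            raise ValueError(f"entry {index}: expected (dataname, data, type)")
        dataname, _data, payload_type = spec
        if not isinstance(dataname, str) or not dataname:
            raise ValueError(f"entry {index}: dataname must be a non-empty string")
        if not isinstance(payload_type, str) or payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"entry {index}: unsupported type {payload_type!r}")
```

For example, `validate_payload_specs([("message", "Hello!", "text")])` returns silently, while a triple typed `"csv"` raises `ValueError`.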
### Cross-Platform API Examples
**Julia:**
```julia
using NATSBridge
# Send
env, env_json_str = smartsend(
    "/chat",
    [("message", "Hello!", "text"), ("image", image_bytes, "image")],
    broker_url="nats://localhost:4222"
)
# Receive - returns JSON.Object{String, Any}
# (_fetch_with_backoff is not exported, so it is qualified with the module name)
env = smartreceive(msg; fileserver_download_handler=NATSBridge._fetch_with_backoff)
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
# Access payloads: for (dataname, data, type) in env["payloads"]
```
**JavaScript:**
```javascript
const NATSBridge = require('natsbridge');
// Send (run inside an async function; CommonJS modules have no top-level await)
const [env, env_json_str] = await NATSBridge.smartsend(
  "/chat",
  [
    ["message", "Hello!", "text"],
    ["image", imageBuffer, "image"]
  ],
  { broker_url: "nats://localhost:4222" }
);
// Receive - returns Promise<object>
const received = await NATSBridge.smartreceive(msg, {
  fileserver_download_handler: fetchWithBackoff
});
// received is an object with a "payloads" field containing an array of [dataname, data, type] arrays
// Access payloads: for (const [dataname, data, type] of received.payloads)
```
**Python:**
```python
from natsbridge import NATSBridge

bridge = NATSBridge()
# smartsend/smartreceive are coroutines: call them from inside an async function
# Send
env, env_json_str = await bridge.smartsend(
    "/chat",
    [("message", "Hello!", "text"), ("image", image_bytes, "image")],
    broker_url="nats://localhost:4222"
)
# Receive - returns Dict
env = await bridge.smartreceive(
    msg,
    fileserver_download_handler=fetch_with_backoff
)
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
# Access payloads: for dataname, data, type_ in env["payloads"]
```
**MicroPython:**
```python
from natsbridge import NATSBridge
# Send (limited to direct transport due to memory constraints)
env, env_json_str = NATSBridge.smartsend(
"/chat",
[("message", "Hello!", "text")],
broker_url="nats://localhost:4222"
)
```
---
## Architecture Diagram (Cross-Platform)
```mermaid
flowchart TB
subgraph JuliaApp["Julia Application"]
JuliaAppCode[App Code]
JuliaBridge[NATSBridge.jl]
JuliaNATS[<b>NATS.jl</b>]
end
subgraph JSApp["JavaScript Application"]
JSAppCode[App Code]
JSBridge[NATSBridge.js]
JSNATS[<b>nats.js</b>]
end
subgraph PythonApp["Python/MicroPython Application"]
PythonAppCode[App Code]
PythonBridge[NATSBridge.py]
PythonNATS[<b>nats.py</b>]
end
subgraph Infrastructure["Infrastructure"]
NATS[<b>NATS Server</b><br/>Message Broker]
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
end
JuliaAppCode --> JuliaBridge
JuliaBridge --> JuliaNATS
JSAppCode --> JSBridge
JSBridge --> JSNATS
PythonAppCode --> PythonBridge
PythonBridge --> PythonNATS
JuliaNATS --> NATS
JSNATS --> NATS
PythonNATS --> NATS
NATS --> JuliaNATS
NATS --> JSNATS
NATS --> PythonNATS
JuliaBridge -.->|HTTP POST upload| FileServer
JSBridge -.->|HTTP POST upload| FileServer
PythonBridge -.->|HTTP POST upload| FileServer
FileServer -.->|HTTP GET download| JuliaBridge
FileServer -.->|HTTP GET download| JSBridge
FileServer -.->|HTTP GET download| PythonBridge
style JuliaApp fill:#c5e1a5
style JSApp fill:#bbdefb
style PythonApp fill:#f8bbd0
style NATS fill:#fff3e0
style FileServer fill:#f3e5f5
```
---
## System Components
### 1. msg_envelope_v1 - Message Envelope
**JSON Schema (Identical Across All Platforms):**
```json
{
  "correlation_id": "uuid-v4-string",
  "msg_id": "uuid-v4-string",
  "timestamp": "2024-01-15T10:30:00Z",
  "send_to": "topic/subject",
  "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
  "sender_name": "agent-wine-web-frontend",
  "sender_id": "uuid4",
  "receiver_name": "agent-backend",
  "receiver_id": "uuid4",
  "reply_to": "topic",
  "reply_to_msg_id": "uuid4",
  "broker_url": "nats://localhost:4222",
  "metadata": {
    "content_type": "application/octet-stream",
    "content_length": 123456
  },
  "payloads": [
    {
      "id": "uuid4",
      "dataname": "login_image",
      "payload_type": "image",
      "transport": "direct",
      "encoding": "base64",
      "size": 15433,
      "data": "base64-encoded-string",
      "metadata": {
        "checksum": "sha256_hash"
      }
    },
    {
      "id": "uuid4",
      "dataname": "large_arrow_table",
      "payload_type": "arrowtable",
      "transport": "link",
      "encoding": "arrow-ipc",
      "size": 524288,
      "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
      "metadata": {}
    },
    {
      "id": "uuid4",
      "dataname": "json_table",
      "payload_type": "jsontable",
      "transport": "direct",
      "encoding": "json",
      "size": 1024,
      "data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]",
      "metadata": {}
    }
  ]
}
```
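Since every platform parses the same envelope, a receiver can reject a malformed message before touching its payloads. A minimal Python sketch, assuming the required top-level keys are exactly those shown in the example above; `missing_envelope_fields` is a hypothetical helper, not a NATSBridge function. Per-payload `metadata` is treated as optional because the JavaScript and Python `smartsend` sketches in this document do not emit it.

```python
import json
from typing import List

# Top-level keys shown in the msg_envelope_v1 example
ENVELOPE_FIELDS = (
    "correlation_id", "msg_id", "timestamp", "send_to", "msg_purpose",
    "sender_name", "sender_id", "receiver_name", "receiver_id",
    "reply_to", "reply_to_msg_id", "broker_url", "metadata", "payloads",
)

# Keys shown in each payload entry (metadata deliberately left optional)
PAYLOAD_FIELDS = ("id", "dataname", "payload_type", "transport",
                  "encoding", "size", "data")


def missing_envelope_fields(env_json_str: str) -> List[str]:
    """Return the paths of fields absent from a serialized envelope."""
    env = json.loads(env_json_str)
    missing = [key for key in ENVELOPE_FIELDS if key not in env]
    for i, payload in enumerate(env.get("payloads", [])):
        missing += [f"payloads[{i}].{key}" for key in PAYLOAD_FIELDS
                    if key not in payload]
    return missing
```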
### 2. msg_payload_v1 - Payload Structure
**JSON Schema (Identical Across All Platforms):**
```json
{
  "id": "uuid4",
  "dataname": "login_image",
  "payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary",
  "transport": "direct | link",
  "encoding": "none | json | base64 | arrow-ipc",
  "size": 15433,
  "data": "base64-encoded-string | http-url | json-string",
  "metadata": {
    "checksum": "sha256_hash"
  }
}
```
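For the direct transport, the fields fit together like this: the serialized bytes are base64-encoded into `data`, `size` records their length, and `metadata.checksum` can carry a SHA-256 digest. A minimal Python sketch, assuming `size` counts the bytes before base64 encoding (as the `smartsend` sketches in this document do) and that the checksum is the hex SHA-256 of those same bytes; `make_direct_payload` and `verify_direct_payload` are hypothetical names.

```python
import base64
import hashlib
import uuid
from typing import Any, Dict


def make_direct_payload(dataname: str, payload_type: str, raw: bytes) -> Dict[str, Any]:
    """Build a msg_payload_v1-shaped dict for the direct (in-message) path."""
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(raw),  # byte count before base64 encoding (assumption)
        "data": base64.b64encode(raw).decode("ascii"),
        "metadata": {"checksum": hashlib.sha256(raw).hexdigest()},
    }


def verify_direct_payload(payload: Dict[str, Any]) -> bytes:
    """Decode a direct payload and check its size and, if present, its checksum."""
    raw = base64.b64decode(payload["data"])
    if len(raw) != payload["size"]:
        raise ValueError("size mismatch")
    expected = payload.get("metadata", {}).get("checksum")
    if expected is not None and hashlib.sha256(raw).hexdigest() != expected:
        raise ValueError("checksum mismatch")
    return raw
```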
### 3. Transport Strategy Decision Logic (Cross-Platform)
```
┌─────────────────────────────────────────────────────────────┐
│ smartsend Function (All Platforms)                          │
│ Accepts: [(dataname1, data1, type1), ...]                   │
│ (Type is per payload, not standalone)                       │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│ For each payload:                                           │
│ 1. Extract type from tuple/array                            │
│ 2. Serialize based on type                                  │
│ 3. Check payload size                                       │
└─────────────────────────────────────────────────────────────┘
                               │
                  ┌────────────┴────────────┐
                  ▼                         ▼
           ┌──────────────┐          ┌──────────────┐
           │ Direct Path  │          │ Link Path    │
           │ (< 1MB)      │          │ (>= 1MB)     │
           │              │          │              │
           │ • Serialize  │          │ • Serialize  │
           │   to buffer  │          │   to buffer  │
           │ • Base64/JSON│          │ • Upload to  │
           │   encode     │          │   HTTP Server│
           │ • Publish to │          │ • Publish to │
           │   NATS       │          │   NATS with  │
           │   (in msg)   │          │   URL        │
           └──────────────┘          └──────────────┘
```
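The branch in the diagram reduces to a single comparison: a payload strictly smaller than the threshold goes direct, so a payload of exactly the threshold is uploaded. A minimal Python sketch; `choose_transport` is a hypothetical helper, and the 1,000,000-byte default mirrors `DEFAULT_SIZE_THRESHOLD` in the Python class later in this document.

```python
DEFAULT_SIZE_THRESHOLD = 1_000_000  # bytes ("1MB" in the diagram)


def choose_transport(payload_size: int, size_threshold: int = DEFAULT_SIZE_THRESHOLD) -> str:
    """Return "direct" to embed the payload in the NATS message, or "link"
    to upload it to the HTTP file server and send only its URL."""
    if payload_size < 0:
        raise ValueError("payload_size must be non-negative")
    return "direct" if payload_size < size_threshold else "link"
```

On a memory-constrained device the same rule simply runs with a smaller `size_threshold`, such as the `SIZE_THRESHOLD = 100000` shown in the MicroPython configuration.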
---
## Platform-Specific Implementations
### Julia Implementation
#### Architecture Patterns
**Multiple Dispatch:** Julia's core strength is leveraged through function overloading:
```julia
# publish_message has two methods, selected by the type of the first argument
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    # Open a connection for this call only, and drain it when done
    conn = NATS.connect(broker_url)
    try
        publish_message(conn, subject, message, correlation_id)
    finally
        NATS.drain(conn)
    end
end

function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
    # Reuse a caller-owned connection; the caller decides when to drain it
    NATS.publish(conn, subject, message)
    log_trace(correlation_id, "Message published to $subject")
end
```
**Struct-Based Data Models:**
```julia
struct msg_payload_v1
    id::String
    dataname::String
    payload_type::String
    transport::String
    encoding::String
    size::Integer
    data::Any
    metadata::Dict{String, Any}
end

struct msg_envelope_v1
    correlation_id::String
    msg_id::String
    timestamp::String
    send_to::String
    msg_purpose::String
    sender_name::String
    sender_id::String
    receiver_name::String
    receiver_id::String
    reply_to::String
    reply_to_msg_id::String
    broker_url::String
    metadata::Dict{String, Any}
    payloads::Vector{msg_payload_v1}
end
```
#### Dependencies
| Package | Purpose |
|---------|---------|
| `NATS.jl` | Core NATS functionality |
| `Arrow.jl` | Arrow IPC serialization |
| `JSON.jl` | JSON parsing |
| `HTTP.jl` | HTTP client for file server |
| `UUIDs.jl` | UUID generation |
| `Dates.jl` | Timestamps |
| `Base64` | Base64 encoding |
#### File Server Handler Signatures
```julia
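# Upload handler: returns a Dict with "status", "uploadid", "fileid" and "url"
function fileserver_upload_handler(
    fileserver_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}
    # ...
end

# Download handler: returns the downloaded bytes
function fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,    # milliseconds
    max_delay::Int,     # milliseconds
    correlation_id::String
)::Vector{UInt8}
    # ...
end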
```
#### Key Functions
```julia
# Main send/receive functions (signatures only; bodies omitted)
# `AbstractVector{<:Tuple{String, Any, String}}` also accepts vectors with a narrower element
# type, e.g. Vector{Tuple{String, Dict{String, Float64}, String}}. Julia's parametric types are
# invariant, so AbstractArray{Tuple{String, Any, String}, 1} would reject such vectors.
function smartsend(
    subject::String,
    data::AbstractVector{<:Tuple{String, Any, String}};
    broker_url::String = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler::Function = plik_oneshot_upload,
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::String = string(uuid4()),
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = "",
    is_publish::Bool = true,
    NATS_connection::Union{NATS.Connection, Nothing} = nothing,
    msg_id::String = string(uuid4()),
    sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String}
    # ...
end

function smartreceive(
    msg::NATS.Msg;
    fileserver_download_handler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)::JSON.Object{String, Any}
    # ...
end
```
#### Serialization Logic for Tables
```julia
using Arrow, JSON

# Serialize table data based on payload_type
function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8}
    if payload_type == "arrowtable"
        # Serialize any Tables.jl-compatible source (e.g. a DataFrame) to Arrow IPC
        buffer = IOBuffer()
        Arrow.write(buffer, data)
        return take!(buffer)
    elseif payload_type == "jsontable"
        # Serialize to JSON format
        json_str = JSON.json(data)
        return Vector{UInt8}(json_str)
    else
        throw(ArgumentError("Unknown payload_type: $payload_type"))
    end
end

# Deserialize table data based on payload_type
function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any
    if payload_type == "arrowtable"
        # Arrow.Table reads Arrow IPC bytes directly
        return Arrow.Table(data)
    elseif payload_type == "jsontable"
        # Deserialize from JSON format
        json_str = String(data)
        return JSON.parse(json_str)
    else
        throw(ArgumentError("Unknown payload_type: $payload_type"))
    end
end
```
---
### JavaScript Implementation
#### Architecture Patterns
**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O:
```javascript
// smartsend is async and returns a Promise
async function smartsend(subject, data, options = {}) {
  const {
    broker_url = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler = plikOneshotUpload,
    size_threshold = DEFAULT_SIZE_THRESHOLD,
    correlation_id = generateUUID(),
    msg_purpose = "chat",
    sender_name = "NATSBridge",
    receiver_name = "",
    receiver_id = "",
    reply_to = "",
    reply_to_msg_id = "",
    is_publish = true,
    nats_connection = null,
    msg_id = generateUUID(),
    sender_id = generateUUID()
  } = options;

  // Process payloads
  const payloads = [];
  for (const [dataname, payloadData, payloadType] of data) {
    const payloadBytes = await serializeData(payloadData, payloadType);
    const payloadSize = payloadBytes.byteLength;
    if (payloadSize < size_threshold) {
      // Direct path
      const payloadB64 = base64Encode(payloadBytes);
      payloads.push({
        id: generateUUID(),
        dataname,
        payload_type: payloadType,
        transport: "direct",
        encoding: "base64",
        size: payloadSize,
        data: payloadB64
      });
    } else {
      // Link path
      const response = await fileserver_upload_handler(
        fileserver_url, dataname, payloadBytes
      );
      payloads.push({
        id: generateUUID(),
        dataname,
        payload_type: payloadType,
        transport: "link",
        encoding: "none",
        size: payloadSize,
        data: response.url
      });
    }
  }

  const env = buildEnvelope(subject, payloads, {
    correlation_id, msg_id, msg_purpose,
    sender_name, sender_id, receiver_name,
    receiver_id, reply_to, reply_to_msg_id,
    broker_url
  });
  const env_json_str = JSON.stringify(env);

  if (is_publish) {
    if (nats_connection) {
      await publishMessage(nats_connection, subject, env_json_str, correlation_id);
    } else {
      await publishMessage(broker_url, subject, env_json_str, correlation_id);
    }
  }
  return [env, env_json_str];
}
```
**Prototype-Based Utilities:**
```javascript
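const nats = require('nats');

// NATS client wrapper (an ES class, i.e. syntax over JavaScript prototypes)
class NATSClient {
  constructor(url) {
    this.url = url;
    this.connection = null;
  }

  async connect() {
    this.connection = await nats.connect({ servers: this.url });
    return this.connection;
  }

  async publish(subject, message) {
    if (!this.connection) {
      await this.connect();
    }
    // publish() is synchronous; encode strings to a Uint8Array payload
    const payload = typeof message === 'string' ? new TextEncoder().encode(message) : message;
    this.connection.publish(subject, payload);
  }

  async close() {
    if (this.connection) {
      // close() returns a Promise that resolves once the connection has closed
      await this.connection.close();
      this.connection = null;
    }
  }
}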
```
#### Dependencies (Node.js)
| Package | Purpose |
|---------|---------|
| `nats` | Core NATS functionality (nats.js) |
| `crypto` (built-in) | UUID generation (Node.js) |
| `node-fetch` or `axios` | HTTP client for file server |
| `apache-arrow` | Arrow IPC serialization |
#### Dependencies (Browser)
| Package | Purpose |
|---------|---------|
| `nats.ws` | Browser NATS client (WebSocket transport; the NATS server must have WebSocket enabled) |
| `crypto` (built-in) | UUID generation (browser) |
| `fetch` (native) | HTTP client for file server |
| `apache-arrow` | Arrow IPC serialization |
#### Dependencies (MicroPython)
| Module | Purpose |
|--------|---------|
| `nats` (custom) | MicroPython NATS client |
| `time` | Timestamps |
| `uos` | File operations |
| `base64` | Base64 encoding |
#### File Server Handler Signatures
```javascript
// Upload handler - async function returning Promise
async function fileserver_upload_handler(
  fileserver_url,
  dataname,
  data // Uint8Array
) {
  // Returns: { status, uploadid, fileid, url }
}

// Download handler - async function returning Promise
async function fileserver_download_handler(
  url,
  max_retries,
  base_delay,
  max_delay,
  correlation_id
) {
  // Returns: Uint8Array
}
```
#### Key Functions
```javascript
// Main send/receive functions
async function smartsend(subject, data, options = {}) {
  // data: Array of [dataname, data, type] tuples
  // Returns: Promise<[env, env_json_str]>
}

async function smartreceive(msg, options = {}) {
  // msg: NATS message object
  // Returns: Promise<env_object>
}

// Utility functions
async function serializeData(data, payload_type) {
  // Returns: Uint8Array
}

async function deserializeData(data, payload_type) {
  // Returns: deserialized data
}

async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
  // Returns: Uint8Array
}
```
#### Serialization Logic for Tables
```javascript
// apache-arrow's table/IPC helpers
const { tableFromJSON, tableToIPC, tableFromIPC } = require('apache-arrow');

// Serialize table data based on payload_type
async function serializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // Array of row objects -> Arrow Table (schema inferred) -> Arrow IPC stream bytes
    const table = tableFromJSON(data);
    return tableToIPC(table, "stream"); // Uint8Array
  } else if (payload_type === "jsontable") {
    // Serialize to JSON format
    const jsonStr = JSON.stringify(data);
    return new TextEncoder().encode(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}

// Deserialize table data based on payload_type
async function deserializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // Arrow IPC bytes (Uint8Array) -> Arrow Table
    return tableFromIPC(data);
  } else if (payload_type === "jsontable") {
    // Deserialize from JSON format
    const jsonStr = new TextDecoder().decode(data);
    return JSON.parse(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}
```
---
### Python/MicroPython Implementation
#### Architecture Patterns
**Class-Based Design:** Python uses classes for stateful operations:
```python
class NATSBridge:
"""Cross-platform NATS bridge implementation."""
DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
def __init__(self, broker_url=None, fileserver_url=None):
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
self._nats_client = None
async def smartsend(self, subject, data, **kwargs):
"""
Send data via NATS with automatic transport selection.
Args:
subject: NATS subject to publish to
data: List of (dataname, data, type) tuples
**kwargs: Additional options (broker_url, fileserver_url, etc.)
Returns:
Tuple of (env, env_json_str)
"""
# Extract options with defaults
options = self._merge_options(kwargs)
# Process payloads
payloads = []
for dataname, payload_data, payload_type in data:
payload_bytes = self._serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
if payload_size < options['size_threshold']:
# Direct path
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
payloads.append({
'id': uuid.uuid4().hex,
'dataname': dataname,
'payload_type': payload_type,
'transport': 'direct',
'encoding': 'base64',
'size': payload_size,
'data': payload_b64
})
else:
# Link path
response = await options['fileserver_upload_handler'](
options['fileserver_url'], dataname, payload_bytes
)
payloads.append({
'id': uuid.uuid4().hex,
'dataname': dataname,
'payload_type': payload_type,
'transport': 'link',
'encoding': 'none',
'size': payload_size,
'data': response['url']
})
# Build envelope
env = self._build_envelope(subject, payloads, options)
env_json_str = json.dumps(env)
if options['is_publish']:
await self._publish_message(
subject, env_json_str, options['correlation_id'],
nats_connection=options.get('nats_connection')
)
return env, env_json_str
async def smartreceive(self, msg, **kwargs):
"""
Receive and process NATS message.
Args:
msg: NATS message object
**kwargs: Additional options (fileserver_download_handler, etc.)
Returns:
Dict with envelope metadata and payloads
"""
# Parse envelope
env_json_obj = json.loads(msg.payload)
# Process payloads
payloads_list = []
for payload in env_json_obj['payloads']:
transport = payload['transport']
dataname = payload['dataname']
if transport == 'direct':
payload_b64 = payload['data']
payload_bytes = base64.b64decode(payload_b64)
data_type = payload['payload_type']
data = self._deserialize_data(payload_bytes, data_type)
payloads_list.append((dataname, data, data_type))
elif transport == 'link':
url = payload['data']
downloaded_data = await options['fileserver_download_handler'](
url,
options['max_retries'],
options['base_delay'],
options['max_delay'],
env_json_obj['correlation_id']
)
data_type = payload['payload_type']
data = self._deserialize_data(downloaded_data, data_type)
payloads_list.append((dataname, data, data_type))
env_json_obj['payloads'] = payloads_list
return env_json_obj
```
**Dataclass for Type Safety:**
```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Union


@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]  # URL for link, base64 for direct
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)
```
#### Dependencies (Desktop Python)
| Package | Purpose |
|---------|---------|
| `nats-py` | Core NATS functionality |
| `uuid` | UUID generation (stdlib) |
| `aiohttp` or `requests` | HTTP client for file server |
| `pyarrow` | Arrow IPC serialization |
| `pandas` | DataFrame support (optional) |
| `python-dateutil` | Timestamps |
| `base64` | Base64 encoding (stdlib) |
#### Dependencies (MicroPython)
| Module | Purpose |
|--------|---------|
| `network` | NATS connection (custom) |
| `time` | Timestamps |
| `uos` | File operations |
| `base64` | Base64 encoding |
| `json` | JSON parsing |
| `struct` | Binary data handling |
**MicroPython Limitations:**
- No Arrow IPC support (memory constraints)
- Only direct transport (< 1MB threshold enforced)
- Simplified UUID generation
- Async support through `uasyncio` (MicroPython's reduced `asyncio`) rather than CPython's `asyncio`; callbacks are an alternative
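"Simplified UUID generation" can mean building a version-4 UUID straight from random bytes when the `uuid` module is unavailable. A minimal sketch, assuming the port provides `os.urandom` (some MicroPython ports expose it as `uos.urandom`, and not every port has it); `uuid4_str` is a hypothetical name. It sets the version and variant bits that RFC 4122 defines for random UUIDs, and uses `%` formatting so it also runs on ports without f-string support.

```python
import os


def uuid4_str() -> str:
    """Return a random RFC 4122 version-4 UUID string without the uuid module."""
    b = bytearray(os.urandom(16))
    b[6] = (b[6] & 0x0F) | 0x40  # version nibble = 4
    b[8] = (b[8] & 0x3F) | 0x80  # variant bits = 10xx
    h = "".join("%02x" % x for x in b)
    return "%s-%s-%s-%s-%s" % (h[0:8], h[8:12], h[12:16], h[16:20], h[20:32])
```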
#### File Server Handler Signatures
```python
from typing import Any, Dict


# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.

    Args:
        fileserver_url: Base URL of file server
        dataname: Name of the file
        data: Binary data

    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass


# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.

    Args:
        url: URL to download from
        max_retries: Maximum retry attempts
        base_delay: Initial delay in ms
        max_delay: Maximum delay in ms
        correlation_id: Correlation ID for logging

    Returns:
        Downloaded bytes
    """
    pass
```
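Because the bridge reaches the file server only through these two signatures, a test can swap in handlers that never touch the network. A minimal sketch of an in-memory pair; the class `InMemoryFileServer`, its method names, the `status` value of 200, and the URL layout (modelled on the `link` payload example in the envelope schema) are illustrative assumptions, not part of NATSBridge. The download side accepts the retry arguments only for signature compatibility, since nothing here can fail transiently.

```python
import uuid
from typing import Any, Dict


class InMemoryFileServer:
    """Test double: stores uploads in a dict and serves them back by URL."""

    def __init__(self) -> None:
        self._files: Dict[str, bytes] = {}

    async def upload(self, fileserver_url: str, dataname: str, data: bytes) -> Dict[str, Any]:
        upload_id, file_id = uuid.uuid4().hex, uuid.uuid4().hex
        url = f"{fileserver_url.rstrip('/')}/file/{upload_id}/{file_id}/{dataname}"
        self._files[url] = bytes(data)
        return {"status": 200, "uploadid": upload_id, "fileid": file_id, "url": url}

    async def download(self, url: str, max_retries: int, base_delay: int,
                       max_delay: int, correlation_id: str) -> bytes:
        # Retry parameters are unused: an in-memory lookup either succeeds or not.
        try:
            return self._files[url]
        except KeyError:
            raise FileNotFoundError(f"[Correlation: {correlation_id}] {url}") from None
```

With one `server = InMemoryFileServer()`, pass `fileserver_upload_handler=server.upload` to `smartsend` and `fileserver_download_handler=server.download` to `smartreceive`.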
#### Key Functions
```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Signatures only. DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL,
# DEFAULT_SIZE_THRESHOLD and plik_oneshot_upload must be defined earlier in the module.


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff (defined first: smartreceive uses it as a default)."""
    pass


# Main send/receive functions (standalone or class methods)
async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: Optional[str] = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: Optional[str] = None,
    sender_id: Optional[str] = None
) -> Tuple[Dict, str]:
    """Send data via NATS."""
    pass


async def smartreceive(
    msg: Any,
    fileserver_download_handler: Callable = fetch_with_backoff,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000
) -> Dict:
    """Receive and process NATS message."""
    pass


# Utility functions
def _serialize_data(data: Any, payload_type: str) -> bytes:
    """Serialize data to bytes."""
    pass


def _deserialize_data(data: bytes, payload_type: str) -> Any:
    """Deserialize bytes to data."""
    pass
```
#### Serialization Logic for Tables
```python
import json
from typing import Any

import pandas as pd
import pyarrow as pa


# Serialize table data based on payload_type
def serialize_table_data(data: Any, payload_type: str) -> bytes:
    if payload_type == "arrowtable":
        if isinstance(data, pd.DataFrame):
            table = pa.Table.from_pandas(data)
        elif isinstance(data, pa.Table):
            table = data
        elif isinstance(data, (bytes, bytearray)):
            return bytes(data)  # already Arrow IPC bytes
        else:
            raise TypeError("Expected pandas DataFrame, pyarrow Table or Arrow IPC bytes for arrowtable")
        # Write an uncompressed Arrow IPC stream. (feather.write_feather would produce
        # the IPC file format, LZ4-compressed by default, which not every Arrow reader supports.)
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.getvalue().to_pybytes()
    elif payload_type == "jsontable":
        # Serialize to JSON format
        if isinstance(data, list) and all(isinstance(row, dict) for row in data):
            return json.dumps(data).encode('utf-8')
        else:
            raise TypeError("Expected list of dicts for jsontable")
    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")


# Deserialize table data based on payload_type
def deserialize_table_data(data: bytes, payload_type: str) -> Any:
    if payload_type == "arrowtable":
        # Read an Arrow IPC stream back into a pandas DataFrame
        table = pa.ipc.open_stream(data).read_all()
        return table.to_pandas()
    elif payload_type == "jsontable":
        # Deserialize from JSON format
        json_str = data.decode('utf-8')
        return json.loads(json_str)
    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")
```
---
## Platform Comparison Matrix
| Feature | Julia | JavaScript | Python | MicroPython |
|---------|-------|------------|--------|-------------|
| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (`functools.singledispatch`, first argument only) | ❌ |
| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (Type hints, not enforced at runtime) | ❌ |
| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (GC on a small heap; manual `gc.collect()` tuning) |
| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ |
| **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) |
| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ |
| **jsontable Support** | ✅ | ✅ | ✅ | ✅ |
| **Direct Transport** | ✅ | ✅ | ✅ | ✅ |
| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| **Handler Functions** | ✅ | ✅ | ✅ | ✅ |
| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ |
---
## Implementation Details by Platform
### Julia: Multiple Dispatch Pattern
```julia
using NATS, DataFrames

# Function overloading based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    # Creates new connection
end

function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
    # Uses pre-existing connection
end

# Type-specific serialization
function _serialize_data(data::String, payload_type::String)
    # Text handling
end

function _serialize_data(data::Dict, payload_type::String)
    # Dictionary handling
end

function _serialize_data(data::DataFrame, payload_type::String)
    # Table handling - arrowtable
end

# AbstractVector{<:NamedTuple} also matches concrete element types such as
# NamedTuple{(:id, :name), Tuple{Int, String}}; Vector{NamedTuple} would not
function _serialize_data(data::AbstractVector{<:NamedTuple}, payload_type::String)
    # Table handling - jsontable
end
```
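Python has no multiple dispatch, but `functools.singledispatch` chooses an implementation from the type of the first argument, which is enough to mirror the two `publish_message` methods above. A minimal sketch; `FakeConnection` stands in for a real NATS client and, like the function bodies here, is hypothetical. In this sketch only the URL variant closes the connection, because it is the one that opened it.

```python
from functools import singledispatch
from typing import List, Tuple


class FakeConnection:
    """Stand-in for a NATS connection that records published messages."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.published: List[Tuple[str, str]] = []
        self.closed = False

    def publish(self, subject: str, message: str) -> None:
        self.published.append((subject, message))

    def close(self) -> None:
        self.closed = True


@singledispatch
def publish_message(target, subject: str, message: str, correlation_id: str):
    raise TypeError(f"unsupported target type: {type(target).__name__}")


@publish_message.register
def _(target: FakeConnection, subject: str, message: str, correlation_id: str) -> None:
    # Reuse a caller-owned connection and leave it open for the caller.
    target.publish(subject, message)


@publish_message.register
def _(target: str, subject: str, message: str, correlation_id: str) -> FakeConnection:
    # A broker URL: open a connection, publish, then close what was opened here.
    conn = FakeConnection(target)
    try:
        publish_message(conn, subject, message, correlation_id)
    finally:
        conn.close()
    return conn  # returned only so the sketch can be inspected
```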
### JavaScript: Prototype + Async Pattern
```javascript
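// Class-based NATS client (skeleton)
class NATSClient {
  constructor(url) {
    this.url = url;
  }

  async connect() {
    // Connection logic
  }

  async publish(subject, message) {
    // Publish logic
  }
}

// Module-level utility functions
function generateUUID() {
  // Prefer the built-in crypto.randomUUID() listed in the dependency tables
  if (globalThis.crypto && typeof globalThis.crypto.randomUUID === 'function') {
    return globalThis.crypto.randomUUID();
  }
  // Fallback: v4 layout from Math.random (not cryptographically secure)
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
    const r = Math.random() * 16 | 0;
    return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
  });
}

async function serializeData(data, payload_type) {
  // Serialization logic for arrowtable and jsontable
}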
```
### Python: Class-Based Pattern
```python
DEFAULT_BROKER_URL = "nats://localhost:4222"


class NATSBridge:
    """Main bridge class."""

    def __init__(self, broker_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL

    async def smartsend(self, subject, data, **kwargs):
        """Send data."""
        pass

    async def smartreceive(self, msg, **kwargs):
        """Receive message."""
        pass


# Module-level convenience functions (async def, since they await the bridge methods)
async def smartsend(subject, data, **kwargs):
    """Convenience function using a default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartsend(subject, data, **kwargs)


async def smartreceive(msg, **kwargs):
    """Convenience function using a default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartreceive(msg, **kwargs)
```
---
## Scenario Implementations (Cross-Platform)
### Scenario 1: Command & Control (Small Dictionary)
**Julia:**
```julia
config = Dict("step_size" => 0.01)
env, env_json_str = smartsend("control", [("config", config, "dictionary")])
```
**JavaScript:**
```javascript
const config = { step_size: 0.01 };
const [env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);
```
**Python:**
```python
config = {"step_size": 0.01}
env, env_json_str = await smartsend("control", [("config", config, "dictionary")])
```
### Scenario 2: Deep Dive Analysis (Large Arrow Table)
**Julia:**
```julia
using DataFrames
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])
```
**JavaScript:**
```javascript
const df = [{ id: 1, value: 0.5 } /* , ... */];
const [env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);
```
**Python:**
```python
import numpy as np
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])
```
### Scenario 3: Chat System (Multi-Payload)
**Julia:**
```julia
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = smartsend("chat", chat)
```
**JavaScript:**
```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
const [env, env_json_str] = await smartsend("chat", chat);
```
**Python:**
```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)
```
### Scenario 4: JSON Table Transfer (Cross-Platform)
**Julia:**
```julia
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
env, env_json_str = smartsend("data", [("users", rows, "jsontable")])
```
**JavaScript:**
```javascript
const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];
const [env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);
```
**Python:**
```python
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
env, env_json_str = await smartsend("data", [("users", users, "jsontable")])
```
### Scenario 5: Smart Transport Selection
The `smartsend` function automatically selects the transport method based on payload size:
- **Direct Transport (< 1MB)**: Payload is serialized and embedded directly in the NATS message
  - `arrowtable`: Serialized to Arrow IPC, base64 encoded
  - `jsontable`: Serialized to JSON, base64 encoded
  - `dictionary`: Serialized to JSON, base64 encoded
  - `text`: Serialized to UTF-8, base64 encoded
  - `image/audio/video/binary`: Base64 encoded
- **Link Transport (>= 1MB)**: Payload is uploaded to HTTP file server, URL embedded in message
  - All types supported
  - Receiver downloads from URL and deserializes
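The per-type rules for the direct path can be expressed as one function that turns a value into bytes and then into base64 text, plus its inverse. A minimal Python sketch covering only the types that need nothing beyond the standard library (`arrowtable` is left out because it requires `pyarrow`); `encode_direct` and `decode_direct` are hypothetical names, and encoding `dictionary`/`jsontable` with `json.dumps` is an assumption consistent with the serialization sketches in this document.

```python
import base64
import json
from typing import Any

BINARY_TYPES = ("image", "audio", "video", "binary")


def _to_bytes(data: Any, payload_type: str) -> bytes:
    if payload_type == "text":
        return data.encode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        return json.dumps(data).encode("utf-8")
    if payload_type in BINARY_TYPES:
        return bytes(data)  # bytes, bytearray or memoryview
    raise ValueError(f"not handled in this sketch: {payload_type}")


def _from_bytes(raw: bytes, payload_type: str) -> Any:
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        return json.loads(raw.decode("utf-8"))
    if payload_type in BINARY_TYPES:
        return raw
    raise ValueError(f"not handled in this sketch: {payload_type}")


def encode_direct(data: Any, payload_type: str) -> str:
    """Serialize by type, then base64-encode for embedding in the message."""
    return base64.b64encode(_to_bytes(data, payload_type)).decode("ascii")


def decode_direct(b64: str, payload_type: str) -> Any:
    """Reverse of encode_direct."""
    return _from_bytes(base64.b64decode(b64), payload_type)
```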
---
## Performance Considerations (Cross-Platform)
### Zero-Copy Reading
| Platform | Strategy |
|----------|----------|
| **Julia** | `Arrow.Table` on a file path (memory-mapped) |
| **JavaScript** | `ArrayBuffer` with `DataView` |
| **Python** | `pyarrow` memory mapping |
| **MicroPython** | Not available (streaming only) |
### Exponential Backoff
```python
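# Python (desktop, using aiohttp)
import asyncio

import aiohttp


async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    """Download url, retrying with exponential backoff (delays in milliseconds)."""
    delay = base_delay
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    last_error = RuntimeError(f"HTTP {response.status}")
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            last_error = e
        # Back off after every failed attempt (error or non-200 status) except the last
        if attempt < max_retries:
            await asyncio.sleep(delay / 1000.0)
            delay = min(delay * 2, max_delay)
    raise RuntimeError(
        f"[Correlation: {correlation_id}] Failed to fetch {url} after {max_retries} attempts"
    ) from last_error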
```
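The sleeps of the retry loop can be computed up front, which makes the worst-case wait easy to check. A small pure-arithmetic sketch, assuming every attempt fails and a sleep follows each failed attempt except the last, with the delay doubling up to `max_delay`; `backoff_schedule` is a hypothetical helper. With the `smartreceive` defaults (`max_retries=5`, `base_delay=100`, `max_delay=5000`) it gives four sleeps of 100, 200, 400 and 800 ms, 1.5 s in total.

```python
from typing import List


def backoff_schedule(max_retries: int, base_delay: int, max_delay: int) -> List[int]:
    """Return the sleep in ms after each failed attempt except the last."""
    delays = []
    delay = base_delay
    for _attempt in range(1, max_retries):
        delays.append(delay)
        delay = min(delay * 2, max_delay)
    return delays
```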
### Correlation ID Logging
All platforms use correlation IDs for distributed tracing:
```
[timestamp] [Correlation: abc123] Message published to subject
```
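In Python, one way to put that prefix on every line is a `logging.LoggerAdapter` that injects the correlation ID into each message. A minimal sketch; `CorrelationAdapter`, `get_trace_logger` and the logger name `natsbridge.trace` are hypothetical, and only the line format comes from the example above. The timestamp part would come from the handler's formatter, e.g. `logging.Formatter("[%(asctime)s] %(message)s")`.

```python
import logging


class CorrelationAdapter(logging.LoggerAdapter):
    """Prefix each log message with the message's correlation ID."""

    def process(self, msg, kwargs):
        return f"[Correlation: {self.extra['correlation_id']}] {msg}", kwargs


def get_trace_logger(correlation_id: str) -> logging.LoggerAdapter:
    logger = logging.getLogger("natsbridge.trace")
    return CorrelationAdapter(logger, {"correlation_id": correlation_id})
```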
### Serialization Performance Comparison
| Format | Use Case | Pros | Cons |
|--------|----------|------|------|
| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library |
| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema |
---
## Testing Strategy (Cross-Platform)
### Unit Tests
| Test Type | Julia | JavaScript | Python |
|-----------|-------|------------|--------|
| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` |
| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` |
| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` |
| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` |
| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` |
### Integration Tests
- NATS server communication
- File server upload/download
- Cross-platform message exchange
- Arrow table serialization/deserialization
- JSON table serialization/deserialization
---
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) |
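A desktop Python process could read these variables and fall back to the documented defaults. A minimal sketch; `BridgeConfig` and `load_config` are hypothetical names, and parsing `SIZE_THRESHOLD` as a positive base-10 integer is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class BridgeConfig:
    nats_url: str
    fileserver_url: str
    size_threshold: int  # bytes


def load_config(environ: Optional[Mapping[str, str]] = None) -> BridgeConfig:
    """Read NATS_URL, FILESERVER_URL and SIZE_THRESHOLD with the table's defaults."""
    env = os.environ if environ is None else environ
    threshold = int(env.get("SIZE_THRESHOLD", "1000000"))
    if threshold <= 0:
        raise ValueError("SIZE_THRESHOLD must be a positive number of bytes")
    return BridgeConfig(
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        fileserver_url=env.get("FILESERVER_URL", "http://localhost:8080"),
        size_threshold=threshold,
    )
```

Passing an explicit mapping, as in `load_config({"SIZE_THRESHOLD": "100000"})`, keeps tests independent of the real process environment.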
### MicroPython-Specific Configuration
```python
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython
```
---
## Summary
This cross-platform NATS bridge provides:
1. **High-Level API Parity**: Equivalent `smartsend()` and `smartreceive()` signatures and behavior across Julia, JavaScript, and Python/MicroPython
2. **Idiomatic Implementations**:
   - Julia: Multiple dispatch and struct-based design
   - JavaScript: Async/await and prototype-based utilities
   - Python: Class-based design with type hints
3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
5. **Platform-Specific Optimizations**:
   - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data
   - **JSON** (`jsontable`): Universal human-readable format for smaller tables
   - Streaming support in MicroPython
The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms.
### Datatype Summary
| Datatype | Serialization | Use Case | Encoding |
|----------|---------------|----------|----------|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` |
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` |