# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge
|
|
|
|
## Overview
|
|
|
|
This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language.
|
|
|
|
**Supported Platforms:**
|
|
- **Julia** - Ground truth implementation with full feature set
|
|
- **JavaScript** - Node.js and browser-compatible implementation
|
|
- **Python/MicroPython** - Desktop and embedded-compatible implementation
|
|
|
|
### Cross-Platform Design Principles
|
|
|
|
1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with equivalent parameters and behavior
2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/await and classes in JavaScript, class-based design in Python)
3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms
4. **Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility
|
|
|
|
---
|
|
|
|
## High-Level API Standard (Cross-Platform)
|
|
|
|
### Unified API Signature
|
|
|
|
All three platforms expose the same high-level API:
|
|
|
|
**Input Format (smartsend):**
```
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
```

**Output Format (smartreceive):**
```
{
    "correlation_id": "...",
    "msg_id": "...",
    "timestamp": "...",
    "send_to": "...",
    "msg_purpose": "...",
    "sender_name": "...",
    "sender_id": "...",
    "receiver_name": "...",
    "receiver_id": "...",
    "reply_to": "...",
    "reply_to_msg_id": "...",
    "broker_url": "...",
    "metadata": {...},
    "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```
|
|
|
|
### Supported Payload Types
|
|
|
|
| Type | Julia | JavaScript | Python/MicroPython |
|------|-------|------------|-------------------|
| `text` | `String` | `string` | `str` |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` |
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) |
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` |
|
|
|
|
### Cross-Platform API Examples
|
|
|
|
**Julia:**
```julia
using NATSBridge

# Send
env, env_json_str = smartsend(
    "/chat",
    [("message", "Hello!", "text"), ("image", image_bytes, "image")],
    broker_url="nats://localhost:4222"
)

# Receive - returns JSON.Object{String, Any}
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
# Access payloads: for (dataname, data, type) in env["payloads"]
```
|
|
|
|
**JavaScript:**
|
|
```javascript
|
|
const NATSBridge = require('natsbridge');
|
|
|
|
// Send
|
|
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
"/chat",
|
|
[
|
|
["message", "Hello!", "text"],
|
|
["image", imageBuffer, "image"]
|
|
],
|
|
{ broker_url: "nats://localhost:4222" }
|
|
);
|
|
|
|
// Receive - returns Promise<object>
|
|
const env = await NATSBridge.smartreceive(msg, {
|
|
fileserver_download_handler: fetchWithBackoff
|
|
});
|
|
// env is an object with "payloads" field containing Array of arrays
|
|
// Access payloads: for (const [dataname, data, type] of env.payloads)
|
|
```
|
|
|
|
**Python:**
```python
from natsbridge import NATSBridge

# Send (smartsend is a coroutine; call it from inside an async function)
env, env_json_str = await NATSBridge.smartsend(
    "/chat",
    [("message", "Hello!", "text"), ("image", image_bytes, "image")],
    broker_url="nats://localhost:4222"
)

# Receive - returns Dict
env = await NATSBridge.smartreceive(
    msg,
    fileserver_download_handler=fetch_with_backoff
)
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
# Access payloads: for dataname, data, type_ in env["payloads"]
```
|
|
|
|
**MicroPython:**
|
|
```python
|
|
from natsbridge import NATSBridge
|
|
|
|
# Send (limited to direct transport due to memory constraints)
|
|
env, env_json_str = NATSBridge.smartsend(
|
|
"/chat",
|
|
[("message", "Hello!", "text")],
|
|
broker_url="nats://localhost:4222"
|
|
)
|
|
```
|
|
|
|
---
|
|
|
|
## Architecture Diagram (Cross-Platform)
|
|
|
|
```mermaid
|
|
flowchart TB
|
|
subgraph JuliaApp["Julia Application"]
|
|
JuliaAppCode[App Code]
|
|
JuliaBridge[NATSBridge.jl]
|
|
JuliaNATS[<b>NATS.jl</b>]
|
|
end
|
|
|
|
subgraph JSApp["JavaScript Application"]
|
|
JSAppCode[App Code]
|
|
JSBridge[NATSBridge.js]
|
|
JSNATS[<b>nats.js</b>]
|
|
end
|
|
|
|
subgraph PythonApp["Python/MicroPython Application"]
|
|
PythonAppCode[App Code]
|
|
PythonBridge[NATSBridge.py]
|
|
PythonNATS[<b>nats.py</b>]
|
|
end
|
|
|
|
subgraph Infrastructure["Infrastructure"]
|
|
NATS[<b>NATS Server</b><br/>Message Broker]
|
|
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
|
|
end
|
|
|
|
JuliaAppCode --> JuliaBridge
|
|
JuliaBridge --> JuliaNATS
|
|
JSAppCode --> JSBridge
|
|
JSBridge --> JSNATS
|
|
PythonAppCode --> PythonBridge
|
|
PythonBridge --> PythonNATS
|
|
|
|
JuliaNATS --> NATS
|
|
JSNATS --> NATS
|
|
PythonNATS --> NATS
|
|
|
|
NATS --> JuliaNATS
|
|
NATS --> JSNATS
|
|
NATS --> PythonNATS
|
|
|
|
JuliaBridge -.->|HTTP POST upload| FileServer
|
|
JSBridge -.->|HTTP POST upload| FileServer
|
|
PythonBridge -.->|HTTP POST upload| FileServer
|
|
|
|
FileServer -.->|HTTP GET download| JuliaBridge
|
|
FileServer -.->|HTTP GET download| JSBridge
|
|
FileServer -.->|HTTP GET download| PythonBridge
|
|
|
|
style JuliaApp fill:#c5e1a5
|
|
style JSApp fill:#bbdefb
|
|
style PythonApp fill:#f8bbd0
|
|
style NATS fill:#fff3e0
|
|
style FileServer fill:#f3e5f5
|
|
```
|
|
|
|
---
|
|
|
|
## System Components
|
|
|
|
### 1. msg_envelope_v1 - Message Envelope
|
|
|
|
**JSON Schema (Identical Across All Platforms):**
|
|
```json
|
|
{
|
|
"correlation_id": "uuid-v4-string",
|
|
"msg_id": "uuid-v4-string",
|
|
"timestamp": "2024-01-15T10:30:00Z",
|
|
|
|
"send_to": "topic/subject",
|
|
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
|
|
"sender_name": "agent-wine-web-frontend",
|
|
"sender_id": "uuid4",
|
|
"receiver_name": "agent-backend",
|
|
"receiver_id": "uuid4",
|
|
"reply_to": "topic",
|
|
"reply_to_msg_id": "uuid4",
|
|
"broker_url": "nats://localhost:4222",
|
|
|
|
"metadata": {
|
|
"content_type": "application/octet-stream",
|
|
"content_length": 123456
|
|
},
|
|
|
|
"payloads": [
|
|
{
|
|
"id": "uuid4",
|
|
"dataname": "login_image",
|
|
"payload_type": "image",
|
|
"transport": "direct",
|
|
"encoding": "base64",
|
|
"size": 15433,
|
|
"data": "base64-encoded-string",
|
|
"metadata": {
|
|
"checksum": "sha256_hash"
|
|
}
|
|
},
|
|
{
|
|
"id": "uuid4",
|
|
"dataname": "large_arrow_table",
|
|
"payload_type": "arrowtable",
|
|
"transport": "link",
|
|
"encoding": "arrow-ipc",
|
|
"size": 524288,
|
|
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
|
|
"metadata": {}
|
|
},
|
|
{
|
|
"id": "uuid4",
|
|
"dataname": "json_table",
|
|
"payload_type": "jsontable",
|
|
"transport": "direct",
|
|
"encoding": "json",
|
|
"size": 1024,
|
|
"data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]",
|
|
"metadata": {}
|
|
}
|
|
]
|
|
}
|
|
```
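
As a rough illustration of how the envelope fields above fit together, the following Python sketch assembles a `msg_envelope_v1` dictionary. The helper name `build_envelope` and its defaults are assumptions made for this example and are not taken from the library; only the field names come from the schema.

```python
import json
import uuid
from datetime import datetime, timezone


def build_envelope(send_to, payloads, *, msg_purpose="chat",
                   sender_name="NATSBridge",
                   broker_url="nats://localhost:4222"):
    """Assemble a msg_envelope_v1 dict (hypothetical helper for illustration)."""
    return {
        "correlation_id": str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        # UTC timestamp in the same shape as the schema example
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "send_to": send_to,
        "msg_purpose": msg_purpose,
        "sender_name": sender_name,
        "sender_id": str(uuid.uuid4()),
        "receiver_name": "",
        "receiver_id": "",
        "reply_to": "",
        "reply_to_msg_id": "",
        "broker_url": broker_url,
        "metadata": {},
        "payloads": list(payloads),
    }


# The envelope is plain data, so it serializes directly to the wire format.
env = build_envelope("chat", [])
env_json_str = json.dumps(env)
```

Because every field is a string, dict, or list, the same structure can be produced on each platform and compared byte for byte after JSON normalization.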
|
|
|
|
### 2. msg_payload_v1 - Payload Structure
|
|
|
|
**JSON Schema (Identical Across All Platforms):**
|
|
```json
|
|
{
|
|
"id": "uuid4",
|
|
"dataname": "login_image",
|
|
"payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary",
|
|
"transport": "direct | link",
|
|
"encoding": "none | json | base64 | arrow-ipc",
|
|
"size": 15433,
|
|
"data": "base64-encoded-string | http-url | json-string",
|
|
"metadata": {
|
|
"checksum": "sha256_hash"
|
|
}
|
|
}
|
|
```
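
A receiver may want to reject malformed payload entries before decoding them. The sketch below checks a payload dict against the allowed values listed in the schema. The function name `validate_payload` is hypothetical, and the checksum rule (hex SHA-256 of the decoded bytes, checked only for direct base64 payloads) is an assumption, since the schema does not say what the checksum covers.

```python
import base64
import hashlib

PAYLOAD_TYPES = {"image", "dictionary", "arrowtable", "jsontable",
                 "text", "audio", "video", "binary"}
TRANSPORTS = {"direct", "link"}
ENCODINGS = {"none", "json", "base64", "arrow-ipc"}
REQUIRED_KEYS = {"id", "dataname", "payload_type", "transport",
                 "encoding", "size", "data"}


def validate_payload(payload):
    """Return a list of problems; an empty list means the entry looks valid."""
    missing = REQUIRED_KEYS - payload.keys()
    if missing:
        return ["missing keys: " + ", ".join(sorted(missing))]

    problems = []
    if payload["payload_type"] not in PAYLOAD_TYPES:
        problems.append("unknown payload_type: " + str(payload["payload_type"]))
    if payload["transport"] not in TRANSPORTS:
        problems.append("unknown transport: " + str(payload["transport"]))
    if payload["encoding"] not in ENCODINGS:
        problems.append("unknown encoding: " + str(payload["encoding"]))

    # Assumed rule: checksum is the hex SHA-256 digest of the raw bytes.
    checksum = payload.get("metadata", {}).get("checksum")
    if checksum and payload["transport"] == "direct" and payload["encoding"] == "base64":
        raw = base64.b64decode(payload["data"])
        if hashlib.sha256(raw).hexdigest() != checksum:
            problems.append("checksum mismatch")
    return problems
```

Returning a list of problems rather than raising on the first one makes it easier to log everything wrong with a message under its `correlation_id`.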
|
|
|
|
### 3. Transport Strategy Decision Logic (Cross-Platform)
|
|
|
|
```
┌─────────────────────────────────────────────────────────────┐
│ smartsend Function (All Platforms)                          │
│ Accepts: [(dataname1, data1, type1), ...]                   │
│ (Type is per payload, not standalone)                       │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│ For each payload:                                           │
│ 1. Extract type from tuple/array                            │
│ 2. Serialize based on type                                  │
│ 3. Check payload size                                       │
└─────────────────────────────────────────────────────────────┘
                               │
                   ┌───────────┴────────────┐
                   ▼                        ▼
            ┌──────────────┐         ┌──────────────┐
            │ Direct Path  │         │ Link Path    │
            │ (< 1MB)      │         │ (>= 1MB)     │
            │              │         │              │
            │ • Serialize  │         │ • Serialize  │
            │   to buffer  │         │   to buffer  │
            │ • Base64/JSON│         │ • Upload to  │
            │   encode     │         │   HTTP Server│
            │ • Publish to │         │ • Publish to │
            │   NATS       │         │   NATS with  │
            │   (in msg)   │         │   URL        │
            └──────────────┘         └──────────────┘
```
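
The decision in the diagram reduces to a single size comparison per payload. The following Python sketch shows one way to express it. The names `choose_transport` and `build_payload_entry`, and the synchronous `upload` callable that returns a URL, are assumptions for illustration; the library's own handlers are asynchronous and return a dict.

```python
import base64
import uuid

DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB, the boundary shown in the diagram


def choose_transport(size, threshold=DEFAULT_SIZE_THRESHOLD):
    """Payloads below the threshold travel inside the message; the rest are linked."""
    return "direct" if size < threshold else "link"


def build_payload_entry(dataname, payload_type, raw, upload,
                        threshold=DEFAULT_SIZE_THRESHOLD):
    """Build one payload entry from already-serialized bytes.

    `upload` is a hypothetical callable (dataname, raw) -> URL string.
    """
    transport = choose_transport(len(raw), threshold)
    if transport == "direct":
        encoding = "base64"
        data = base64.b64encode(raw).decode("ascii")
    else:
        encoding = "none"
        data = upload(dataname, raw)  # claim check: only the URL is published
    return {
        "id": str(uuid.uuid4()),
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": transport,
        "encoding": encoding,
        "size": len(raw),
        "data": data,
    }
```

Note that `size` always records the serialized byte count, not the length of the base64 text or the URL, so a receiver can check a download against it.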
|
|
|
|
---
|
|
|
|
## Platform-Specific Implementations
|
|
|
|
### Julia Implementation
|
|
|
|
#### Architecture Patterns
|
|
|
|
**Multiple Dispatch:** Julia's core strength is leveraged through function overloading:
|
|
|
|
```julia
|
|
# publish_message has two overloads based on argument types
|
|
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
|
conn = NATS.connect(broker_url)
|
|
publish_message(conn, subject, message, correlation_id)
|
|
end
|
|
|
|
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
|
|
try
|
|
NATS.publish(conn, subject, message)
|
|
log_trace(correlation_id, "Message published to $subject")
|
|
finally
|
|
NATS.drain(conn)
|
|
end
|
|
end
|
|
```
|
|
|
|
**Struct-Based Data Models:**
|
|
```julia
|
|
struct msg_payload_v1
|
|
id::String
|
|
dataname::String
|
|
payload_type::String
|
|
transport::String
|
|
encoding::String
|
|
size::Integer
|
|
data::Any
|
|
metadata::Dict{String, Any}
|
|
end
|
|
|
|
struct msg_envelope_v1
|
|
correlation_id::String
|
|
msg_id::String
|
|
timestamp::String
|
|
send_to::String
|
|
msg_purpose::String
|
|
sender_name::String
|
|
sender_id::String
|
|
receiver_name::String
|
|
receiver_id::String
|
|
reply_to::String
|
|
reply_to_msg_id::String
|
|
broker_url::String
|
|
metadata::Dict{String, Any}
|
|
payloads::Vector{msg_payload_v1}
|
|
end
|
|
```
|
|
|
|
#### Dependencies
|
|
|
|
| Package | Purpose |
|
|
|---------|---------|
|
|
| `NATS.jl` | Core NATS functionality |
|
|
| `Arrow.jl` | Arrow IPC serialization |
|
|
| `JSON.jl` | JSON parsing |
|
|
| `HTTP.jl` | HTTP client for file server |
|
|
| `UUIDs.jl` | UUID generation |
|
|
| `Dates.jl` | Timestamps |
|
|
| `Base64` | Base64 encoding |
|
|
|
|
#### File Server Handler Signatures
|
|
|
|
```julia
|
|
# Upload handler
|
|
fileserver_upload_handler(
|
|
fileserver_url::String,
|
|
dataname::String,
|
|
data::Vector{UInt8}
|
|
)::Dict{String, Any}
|
|
|
|
# Download handler
|
|
fileserver_download_handler(
|
|
url::String,
|
|
max_retries::Int,
|
|
base_delay::Int,
|
|
max_delay::Int,
|
|
correlation_id::String
|
|
)::Vector{UInt8}
|
|
```
|
|
|
|
#### Key Functions
|
|
|
|
```julia
|
|
# Main send/receive functions
|
|
function smartsend(
|
|
subject::String,
|
|
data::AbstractArray{Tuple{String, Any, String}, 1};
|
|
broker_url::String = DEFAULT_BROKER_URL,
|
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler::Function = plik_oneshot_upload,
|
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id::String = string(uuid4()),
|
|
msg_purpose::String = "chat",
|
|
sender_name::String = "NATSBridge",
|
|
receiver_name::String = "",
|
|
receiver_id::String = "",
|
|
reply_to::String = "",
|
|
reply_to_msg_id::String = "",
|
|
is_publish::Bool = true,
|
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing,
|
|
msg_id::String = string(uuid4()),
|
|
sender_id::String = string(uuid4())
|
|
)::Tuple{msg_envelope_v1, String}
|
|
|
|
function smartreceive(
|
|
msg::NATS.Msg;
|
|
fileserver_download_handler::Function = _fetch_with_backoff,
|
|
max_retries::Int = 5,
|
|
base_delay::Int = 100,
|
|
max_delay::Int = 5000
|
|
)::JSON.Object{String, Any}
|
|
```
|
|
|
|
#### Serialization Logic for Tables
|
|
|
|
```julia
# Serialize table data based on payload_type
function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8}
    if payload_type == "arrowtable"
        # Serialize to Apache Arrow IPC format
        buffer = IOBuffer()
        Arrow.write(buffer, data)
        return take!(buffer)
    elseif payload_type == "jsontable"
        # Serialize to JSON format
        json_str = JSON.json(data)
        return Vector{UInt8}(json_str)
    else
        throw(ArgumentError("Unknown payload_type: $payload_type"))
    end
end

# Deserialize table data based on payload_type
function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any
    if payload_type == "arrowtable"
        # Deserialize from Apache Arrow IPC format
        buffer = IOBuffer(data)
        return Arrow.Table(buffer)
    elseif payload_type == "jsontable"
        # Deserialize from JSON format
        json_str = String(data)
        return JSON.parse(json_str)
    else
        throw(ArgumentError("Unknown payload_type: $payload_type"))
    end
end
```
|
|
|
|
---
|
|
|
|
### JavaScript Implementation
|
|
|
|
#### Architecture Patterns
|
|
|
|
**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O:
|
|
|
|
```javascript
|
|
// smartsend is async and returns a Promise
|
|
async function smartsend(subject, data, options = {}) {
|
|
const {
|
|
broker_url = DEFAULT_BROKER_URL,
|
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler = plikOneshotUpload,
|
|
size_threshold = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id = generateUUID(),
|
|
msg_purpose = "chat",
|
|
sender_name = "NATSBridge",
|
|
receiver_name = "",
|
|
receiver_id = "",
|
|
reply_to = "",
|
|
reply_to_msg_id = "",
|
|
is_publish = true,
|
|
nats_connection = null,
|
|
msg_id = generateUUID(),
|
|
sender_id = generateUUID()
|
|
} = options;
|
|
|
|
// Process payloads
|
|
const payloads = [];
|
|
for (const [dataname, payloadData, payloadType] of data) {
|
|
const payloadBytes = await serializeData(payloadData, payloadType);
|
|
const payloadSize = payloadBytes.byteLength;
|
|
|
|
if (payloadSize < size_threshold) {
|
|
// Direct path
|
|
const payloadB64 = base64Encode(payloadBytes);
|
|
payloads.push({
|
|
id: generateUUID(),
|
|
dataname,
|
|
payload_type: payloadType,
|
|
transport: "direct",
|
|
encoding: "base64",
|
|
size: payloadSize,
|
|
data: payloadB64
|
|
});
|
|
} else {
|
|
// Link path
|
|
const response = await fileserver_upload_handler(
|
|
fileserver_url, dataname, payloadBytes
|
|
);
|
|
payloads.push({
|
|
id: generateUUID(),
|
|
dataname,
|
|
payload_type: payloadType,
|
|
transport: "link",
|
|
encoding: "none",
|
|
size: payloadSize,
|
|
data: response.url
|
|
});
|
|
}
|
|
}
|
|
|
|
const env = buildEnvelope(subject, payloads, {
|
|
correlation_id, msg_id, msg_purpose,
|
|
sender_name, sender_id, receiver_name,
|
|
receiver_id, reply_to, reply_to_msg_id,
|
|
broker_url
|
|
});
|
|
|
|
const env_json_str = JSON.stringify(env);
|
|
|
|
if (is_publish) {
|
|
if (nats_connection) {
|
|
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
|
|
} else {
|
|
await publishMessage(broker_url, subject, env_json_str, correlation_id);
|
|
}
|
|
}
|
|
|
|
return [env, env_json_str];
|
|
}
|
|
```
|
|
|
|
**Prototype-Based Utilities:**
|
|
```javascript
|
|
// NATS client wrapper (prototype-based)
|
|
class NATSClient {
|
|
constructor(url) {
|
|
this.url = url;
|
|
this.connection = null;
|
|
}
|
|
|
|
async connect() {
|
|
this.connection = await nats.connect({ servers: this.url });
|
|
return this.connection;
|
|
}
|
|
|
|
async publish(subject, message) {
|
|
if (!this.connection) {
|
|
await this.connect();
|
|
}
|
|
await this.connection.publish(subject, message);
|
|
}
|
|
|
|
async close() {
|
|
if (this.connection) {
|
|
this.connection.close();
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
#### Dependencies (Node.js)
|
|
|
|
| Package | Purpose |
|
|
|---------|---------|
|
|
| `nats` | Core NATS functionality (nats.js) |
|
|
| `crypto` (built-in) | UUID generation (Node.js) |
|
|
| `node-fetch` or `axios` | HTTP client for file server |
|
|
| `apache-arrow` | Arrow IPC serialization |
|
|
|
|
#### Dependencies (Browser)
|
|
|
|
| Package | Purpose |
|
|
|---------|---------|
|
|
| `nats` | Browser-compatible NATS client |
|
|
| `crypto` (built-in) | UUID generation (browser) |
|
|
| `fetch` (native) | HTTP client for file server |
|
|
| `apache-arrow` | Arrow IPC serialization |
|
|
|
|
#### Dependencies (MicroPython)
|
|
|
|
| Module | Purpose |
|
|
|--------|---------|
|
|
| `nats` (custom) | MicroPython NATS client |
|
|
| `time` | Timestamps |
|
|
| `uos` | File operations |
|
|
| `base64` | Base64 encoding |
|
|
|
|
#### File Server Handler Signatures
|
|
|
|
```javascript
|
|
// Upload handler - async function returning Promise
|
|
async function fileserver_upload_handler(
|
|
fileserver_url,
|
|
dataname,
|
|
data // Uint8Array
|
|
) {
|
|
// Returns: { status, uploadid, fileid, url }
|
|
}
|
|
|
|
// Download handler - async function returning Promise
|
|
async function fileserver_download_handler(
|
|
url,
|
|
max_retries,
|
|
base_delay,
|
|
max_delay,
|
|
correlation_id
|
|
) {
|
|
// Returns: Uint8Array
|
|
}
|
|
```
|
|
|
|
#### Key Functions
|
|
|
|
```javascript
|
|
// Main send/receive functions
|
|
async function smartsend(subject, data, options = {}) {
|
|
// data: Array of [dataname, data, type] tuples
|
|
// Returns: Promise<[env, env_json_str]>
|
|
}
|
|
|
|
async function smartreceive(msg, options = {}) {
|
|
// msg: NATS message object
|
|
// Returns: Promise<env_object>
|
|
}
|
|
|
|
// Utility functions
|
|
async function serializeData(data, payload_type) {
|
|
// Returns: Uint8Array
|
|
}
|
|
|
|
async function deserializeData(data, payload_type) {
|
|
// Returns: deserialized data
|
|
}
|
|
|
|
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
|
|
// Returns: Uint8Array
|
|
}
|
|
```
|
|
|
|
#### Serialization Logic for Tables
|
|
|
|
```javascript
const arrow = require('apache-arrow');

// Serialize table data based on payload_type
async function serializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // Serialize to Apache Arrow IPC (stream) format.
    // tableFromJSON infers the schema from an array of row objects.
    const table = arrow.tableFromJSON(data);
    return arrow.tableToIPC(table, "stream"); // Uint8Array
  } else if (payload_type === "jsontable") {
    // Serialize to JSON format
    const jsonStr = JSON.stringify(data);
    return new TextEncoder().encode(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}

// Deserialize table data based on payload_type
async function deserializeTableData(data, payload_type) {
  if (payload_type === "arrowtable") {
    // Deserialize from Apache Arrow IPC format (data is a Uint8Array)
    return arrow.tableFromIPC(data);
  } else if (payload_type === "jsontable") {
    // Deserialize from JSON format
    const jsonStr = new TextDecoder().decode(data);
    return JSON.parse(jsonStr);
  } else {
    throw new Error(`Unknown payload_type: ${payload_type}`);
  }
}
```
|
|
|
|
---
|
|
|
|
### Python/MicroPython Implementation
|
|
|
|
#### Architecture Patterns
|
|
|
|
**Class-Based Design:** Python uses classes for stateful operations:
|
|
|
|
```python
import base64
import json
import uuid


class NATSBridge:
    """Cross-platform NATS bridge implementation."""

    DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB
    DEFAULT_BROKER_URL = "nats://localhost:4222"
    DEFAULT_FILESERVER_URL = "http://localhost:8080"

    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or self.DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
        self._nats_client = None

    async def smartsend(self, subject, data, **kwargs):
        """
        Send data via NATS with automatic transport selection.

        Args:
            subject: NATS subject to publish to
            data: List of (dataname, data, type) tuples
            **kwargs: Additional options (broker_url, fileserver_url, etc.)

        Returns:
            Tuple of (env, env_json_str)
        """
        # Extract options with defaults
        options = self._merge_options(kwargs)

        # Process payloads
        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)

            if payload_size < options['size_threshold']:
                # Direct path
                payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
                payloads.append({
                    'id': str(uuid.uuid4()),  # dashed UUID string, as in the schema
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path
                response = await options['fileserver_upload_handler'](
                    options['fileserver_url'], dataname, payload_bytes
                )
                payloads.append({
                    'id': str(uuid.uuid4()),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })

        # Build envelope
        env = self._build_envelope(subject, payloads, options)
        env_json_str = json.dumps(env)

        if options['is_publish']:
            await self._publish_message(
                subject, env_json_str, options['correlation_id'],
                nats_connection=options.get('nats_connection')
            )

        return env, env_json_str

    async def smartreceive(self, msg, **kwargs):
        """
        Receive and process NATS message.

        Args:
            msg: NATS message object
            **kwargs: Additional options (fileserver_download_handler, etc.)

        Returns:
            Dict with envelope metadata and payloads
        """
        # Extract options with defaults (handler, retry settings)
        options = self._merge_options(kwargs)

        # Parse envelope (nats-py exposes the message body as msg.data)
        env_json_obj = json.loads(msg.data)

        # Process payloads
        payloads_list = []
        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']
            data_type = payload['payload_type']

            if transport == 'direct':
                payload_bytes = base64.b64decode(payload['data'])
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = await options['fileserver_download_handler'](
                    url,
                    options['max_retries'],
                    options['base_delay'],
                    options['max_delay'],
                    env_json_obj['correlation_id']
                )
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))
            else:
                raise ValueError(f"Unknown transport: {transport}")

        env_json_obj['payloads'] = payloads_list
        return env_json_obj
```
|
|
|
|
**Dataclass for Type Safety:**
|
|
```python
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Tuple, Union
|
|
|
|
@dataclass
|
|
class MsgPayloadV1:
|
|
"""Message payload structure."""
|
|
id: str
|
|
dataname: str
|
|
payload_type: str
|
|
transport: str
|
|
encoding: str
|
|
size: int
|
|
data: Union[str, bytes] # URL for link, base64 for direct
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
@dataclass
|
|
class MsgEnvelopeV1:
|
|
"""Message envelope structure."""
|
|
correlation_id: str
|
|
msg_id: str
|
|
timestamp: str
|
|
send_to: str
|
|
msg_purpose: str
|
|
sender_name: str
|
|
sender_id: str
|
|
receiver_name: str
|
|
receiver_id: str
|
|
reply_to: str
|
|
reply_to_msg_id: str
|
|
broker_url: str
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
payloads: List[MsgPayloadV1] = field(default_factory=list)
|
|
```
|
|
|
|
#### Dependencies (Desktop Python)
|
|
|
|
| Package | Purpose |
|
|
|---------|---------|
|
|
| `nats-py` | Core NATS functionality |
|
|
| `uuid` | UUID generation (stdlib) |
|
|
| `aiohttp` or `requests` | HTTP client for file server |
|
|
| `pyarrow` | Arrow IPC serialization |
|
|
| `pandas` | DataFrame support (optional) |
|
|
| `python-dateutil` | Timestamps |
|
|
| `base64` | Base64 encoding (stdlib) |
|
|
|
|
#### Dependencies (MicroPython)
|
|
|
|
| Module | Purpose |
|
|
|--------|---------|
|
|
| `network` | NATS connection (custom) |
|
|
| `time` | Timestamps |
|
|
| `uos` | File operations |
|
|
| `base64` | Base64 encoding |
|
|
| `json` | JSON parsing |
|
|
| `struct` | Binary data handling |
|
|
|
|
**MicroPython Limitations:**
- No Arrow IPC support (memory constraints)
- Only direct transport (< 1MB threshold enforced)
- Simplified UUID generation
- `async`/`await` is available only through `uasyncio`; otherwise use callbacks
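
The "simplified UUID generation" item can be illustrated without the `uuid` module, which MicroPython does not ship. The sketch below builds a version 4 UUID string from random bytes. It assumes the target port provides `os.urandom` and `binascii`, which is common but not universal, and the function name `simple_uuid4` is hypothetical.

```python
import binascii
import os


def simple_uuid4():
    """Return a random RFC 4122 version 4 UUID string without the uuid module."""
    b = bytearray(os.urandom(16))
    b[6] = (b[6] & 0x0F) | 0x40  # set the version nibble to 4
    b[8] = (b[8] & 0x3F) | 0x80  # set the variant bits to 10xx
    h = binascii.hexlify(bytes(b)).decode()
    return "-".join((h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]))
```

The result has the same dashed 36-character shape as the identifiers produced on the other platforms, so `msg_id` and `correlation_id` values stay interchangeable.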
|
|
|
|
#### File Server Handler Signatures
|
|
|
|
```python
|
|
# Upload handler - async function
|
|
async def fileserver_upload_handler(
|
|
fileserver_url: str,
|
|
dataname: str,
|
|
data: bytes
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Upload data to file server.
|
|
|
|
Args:
|
|
fileserver_url: Base URL of file server
|
|
dataname: Name of the file
|
|
data: Binary data
|
|
|
|
Returns:
|
|
Dict with keys: 'status', 'uploadid', 'fileid', 'url'
|
|
"""
|
|
pass
|
|
|
|
# Download handler - async function
|
|
async def fileserver_download_handler(
|
|
url: str,
|
|
max_retries: int,
|
|
base_delay: int,
|
|
max_delay: int,
|
|
correlation_id: str
|
|
) -> bytes:
|
|
"""
|
|
Download data from URL with exponential backoff.
|
|
|
|
Args:
|
|
url: URL to download from
|
|
max_retries: Maximum retry attempts
|
|
base_delay: Initial delay in ms
|
|
max_delay: Maximum delay in ms
|
|
correlation_id: Correlation ID for logging
|
|
|
|
Returns:
|
|
Downloaded bytes
|
|
"""
|
|
pass
|
|
```
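
A download handler with this signature might implement its retry loop roughly as follows. This is a sketch under stated assumptions: the keyword-only `fetch` and `sleep` parameters are hypothetical additions so the example stays free of any HTTP library, delays are treated as milliseconds per the docstring above, and `max_retries` is interpreted as the number of retries after the first attempt.

```python
import asyncio


async def fetch_with_backoff(url, max_retries, base_delay, max_delay,
                             correlation_id, *, fetch, sleep=asyncio.sleep):
    """Call `fetch(url)` until it succeeds, doubling the wait after each failure.

    `fetch` is a hypothetical async callable returning bytes; a real handler
    would issue an HTTP GET here.
    """
    delay_ms = base_delay
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return await fetch(url)
        except Exception as exc:  # a real handler would catch narrower errors
            last_error = exc
            if attempt == max_retries:
                break
            await sleep(delay_ms / 1000)  # delays are given in milliseconds
            delay_ms = min(delay_ms * 2, max_delay)  # exponential growth, capped
    raise RuntimeError(
        f"[{correlation_id}] download failed after {max_retries + 1} attempts: {url}"
    ) from last_error
```

Injecting `sleep` keeps the loop testable without real waiting, and including the `correlation_id` in the error message ties a failed download back to the message that referenced it.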
|
|
|
|
#### Key Functions
|
|
|
|
```python
|
|
# Main send/receive functions (standalone or class methods)
|
|
async def smartsend(
|
|
subject: str,
|
|
data: List[Tuple[str, Any, str]],
|
|
broker_url: str = DEFAULT_BROKER_URL,
|
|
fileserver_url: str = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler: Callable = plik_oneshot_upload,
|
|
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id: str = None,
|
|
msg_purpose: str = "chat",
|
|
sender_name: str = "NATSBridge",
|
|
receiver_name: str = "",
|
|
receiver_id: str = "",
|
|
reply_to: str = "",
|
|
reply_to_msg_id: str = "",
|
|
is_publish: bool = True,
|
|
nats_connection: Any = None,
|
|
msg_id: str = None,
|
|
sender_id: str = None
|
|
) -> Tuple[Dict, str]:
|
|
"""Send data via NATS."""
|
|
pass
|
|
|
|
async def smartreceive(
|
|
msg: Any,
|
|
fileserver_download_handler: Callable = fetch_with_backoff,
|
|
max_retries: int = 5,
|
|
base_delay: int = 100,
|
|
max_delay: int = 5000
|
|
) -> Dict:
|
|
"""Receive and process NATS message."""
|
|
pass
|
|
|
|
# Utility functions
|
|
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|
"""Serialize data to bytes."""
|
|
pass
|
|
|
|
def _deserialize_data(data: bytes, payload_type: str) -> Any:
|
|
"""Deserialize bytes to data."""
|
|
pass
|
|
|
|
async def fetch_with_backoff(
|
|
url: str,
|
|
max_retries: int,
|
|
base_delay: int,
|
|
max_delay: int,
|
|
correlation_id: str
|
|
) -> bytes:
|
|
"""Fetch URL with exponential backoff."""
|
|
pass
|
|
```
|
|
|
|
#### Serialization Logic for Tables
|
|
|
|
```python
import io
import json
from typing import Any


# Serialize table data based on payload_type
def serialize_table_data(data: Any, payload_type: str) -> bytes:
    if payload_type == "arrowtable":
        # Serialize to Apache Arrow IPC format (Feather V2 is the Arrow IPC file format)
        import pandas as pd
        import pyarrow as pa
        import pyarrow.feather as feather

        if isinstance(data, pd.DataFrame):
            table = pa.Table.from_pandas(data)
            buffer = io.BytesIO()
            # Compression is disabled as a precaution so that readers
            # without LZ4 support can still decode the buffer.
            feather.write_feather(table, buffer, compression="uncompressed")
            return buffer.getvalue()
        else:
            raise TypeError("Expected pandas DataFrame for arrowtable")

    elif payload_type == "jsontable":
        # Serialize to JSON format
        if isinstance(data, list) and all(isinstance(row, dict) for row in data):
            return json.dumps(data).encode('utf-8')
        else:
            raise TypeError("Expected list of dicts for jsontable")

    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")


# Deserialize table data based on payload_type
def deserialize_table_data(data: bytes, payload_type: str) -> Any:
    if payload_type == "arrowtable":
        # Deserialize from Apache Arrow IPC format
        import pyarrow.feather as feather

        buffer = io.BytesIO(data)
        table = feather.read_table(buffer)
        return table.to_pandas()

    elif payload_type == "jsontable":
        # Deserialize from JSON format
        json_str = data.decode('utf-8')
        return json.loads(json_str)

    else:
        raise ValueError(f"Unknown payload_type: {payload_type}")
```
|
|
|
|
---
|
|
|
|
## Platform Comparison Matrix
|
|
|
|
| Feature | Julia | JavaScript | Python | MicroPython |
|---------|-------|------------|--------|-------------|
| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (Overload via `@overload`) | ❌ |
| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ |
| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (GC, small heap) |
| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ |
| **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) |
| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ |
| **jsontable Support** | ✅ | ✅ | ✅ | ✅ |
| **Direct Transport** | ✅ | ✅ | ✅ | ✅ |
| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| **Handler Functions** | ✅ | ✅ | ✅ | ✅ |
| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ |
|
|
|
|
---
|
|
|
|
## Implementation Details by Platform
|
|
|
|
### Julia: Multiple Dispatch Pattern
|
|
|
|
```julia
|
|
# Function overloading based on argument types
|
|
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
|
# Creates new connection
|
|
end
|
|
|
|
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
|
|
# Uses pre-existing connection
|
|
end
|
|
|
|
# Type-specific serialization
|
|
function _serialize_data(data::String, payload_type::String)
|
|
# Text handling
|
|
end
|
|
|
|
function _serialize_data(data::Dict, payload_type::String)
|
|
# Dictionary handling
|
|
end
|
|
|
|
function _serialize_data(data::DataFrame, payload_type::String)
|
|
# Table handling - arrowtable
|
|
end
|
|
|
|
function _serialize_data(data::Vector{NamedTuple}, payload_type::String)
|
|
# Table handling - jsontable
|
|
end
|
|
```
|
|
|
|
### JavaScript: Prototype + Async Pattern

```javascript
// Class-based NATS client
class NATSClient {
  constructor(url) {
    this.url = url;
  }

  async connect() {
    // Connection logic
  }

  async publish(subject, message) {
    // Publish logic
  }
}

// Module-level utility functions

// Builds a version 4 style UUID from Math.random(), which is not
// cryptographically secure; crypto.randomUUID() is preferable where available.
function generateUUID() {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
    const r = Math.random() * 16 | 0;
    return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
  });
}

async function serializeData(data, payload_type) {
  // Serialization logic for arrowtable and jsontable
}
```

### Python: Class-Based Pattern

```python
# Matches the NATS_URL default listed in the Configuration section
DEFAULT_BROKER_URL = "nats://localhost:4222"


class NATSBridge:
    """Main bridge class."""

    def __init__(self, broker_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL

    async def smartsend(self, subject, data, **kwargs):
        """Send data."""
        pass

    async def smartreceive(self, msg, **kwargs):
        """Receive message."""
        pass


# Module-level convenience functions
# (declared async because they await the bridge methods)
async def smartsend(subject, data, **kwargs):
    """Convenience function using default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartsend(subject, data, **kwargs)


async def smartreceive(msg, **kwargs):
    """Convenience function using default NATSBridge instance."""
    bridge = NATSBridge()
    return await bridge.smartreceive(msg, **kwargs)
```

---

## Scenario Implementations (Cross-Platform)

### Scenario 1: Command & Control (Small Dictionary)

| Platform | Code |
|----------|------|
| **Julia** | ```julia<br>config = Dict("step_size" => 0.01)<br>env, env_json_str = smartsend("control", [("config", config, "dictionary")])``` |
| **JavaScript** | ```javascript<br>const config = { step_size: 0.01 };<br>const [env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);``` |
| **Python** | ```python<br>config = {"step_size": 0.01}<br>env, env_json_str = await smartsend("control", [("config", config, "dictionary")])``` |

### Scenario 2: Deep Dive Analysis (Large Arrow Table)

| Platform | Code |
|----------|------|
| **Julia** | ```julia<br>df = DataFrame(id=1:1000000, value=rand(1000000))<br>env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])``` |
| **JavaScript** | ```javascript<br>const df = [{ id: 1, value: 0.5 } /* , ...more rows */];<br>const [env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);``` |
| **Python** | ```python<br>import numpy as np<br>import pandas as pd<br>df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})<br>env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])``` |

### Scenario 3: Chat System (Multi-Payload)

| Platform | Code |
|----------|------|
| **Julia** | ```julia<br>chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = smartsend("chat", chat)``` |
| **JavaScript** | ```javascript<br>const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];<br>const [env, env_json_str] = await smartsend("chat", chat);``` |
| **Python** | ```python<br>chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]<br>env, env_json_str = await smartsend("chat", chat)``` |

### Scenario 4: JSON Table Transfer (Cross-Platform)

| Platform | Code |
|----------|------|
| **Julia** | ```julia<br>rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]<br>env, env_json_str = smartsend("data", [("users", rows, "jsontable")])``` |
| **JavaScript** | ```javascript<br>const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];<br>const [env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);``` |
| **Python** | ```python<br>users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]<br>env, env_json_str = await smartsend("data", [("users", users, "jsontable")])``` |

### Scenario 5: Smart Transport Selection

The `smartsend` function automatically selects the transport method based on payload size:

- **Direct Transport (< 1 MB)**: Payload is serialized and embedded directly in the NATS message
  - `arrowtable`: Serialized to Arrow IPC, base64 encoded
  - `jsontable`: Serialized to JSON, base64 encoded
  - `dictionary`: Serialized to JSON, base64 encoded
  - `text`: Serialized to UTF-8, base64 encoded
  - `image/audio/video/binary`: Base64 encoded
- **Link Transport (>= 1 MB)**: Payload is uploaded to the HTTP file server and its URL is embedded in the message
  - All types supported
  - Receiver downloads from URL and deserializes

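
The selection rule above can be sketched in a few lines of Python. This is a minimal illustration, not the bridge's actual code: the entry's field names, the `build_entry` helper, and the `upload(dataname, raw) -> url` handler signature are all assumptions made for the example. Only the 1,000,000-byte threshold and the JSON-then-base64 encoding for `dictionary` payloads come from this document.

```python
import base64
import json

SIZE_THRESHOLD = 1_000_000  # bytes; mirrors the documented default


def choose_transport(size: int, threshold: int = SIZE_THRESHOLD) -> str:
    """Return "direct" below the threshold and "link" at or above it."""
    return "direct" if size < threshold else "link"


def build_entry(dataname, data, upload=None, threshold=SIZE_THRESHOLD):
    """Serialize a dictionary payload and attach it by the selected transport.

    Field names and the upload handler signature are hypothetical.
    """
    raw = json.dumps(data).encode("utf-8")
    transport = choose_transport(len(raw), threshold)
    if transport == "direct":
        # Small payload: embed the base64-encoded bytes in the message
        body = base64.b64encode(raw).decode("ascii")
    else:
        # Large payload: hand the bytes to the file server handler, keep the URL
        if upload is None:
            raise ValueError("link transport requires an upload handler")
        body = upload(dataname, raw)
    return {"dataname": dataname, "type": "dictionary",
            "transport": transport, "data": body}


small = build_entry("config", {"step_size": 0.01})
print(small["transport"])  # direct
```
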
---

## Performance Considerations (Cross-Platform)

### Zero-Copy Reading

| Platform | Strategy |
|----------|----------|
| **Julia** | `Arrow.read()` with memory-mapped files |
| **JavaScript** | `ArrayBuffer` with `DataView` |
| **Python** | `pyarrow` memory mapping |
| **MicroPython** | Not available (streaming only) |

### Exponential Backoff

```python
# Python/MicroPython
import asyncio

import aiohttp


async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    """Fetch url, retrying with exponential backoff (delays in milliseconds)."""
    delay = base_delay
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    # Any other status falls through to the backoff below
            except Exception:
                pass  # network errors are retried like bad statuses
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise Exception(f"[Correlation: {correlation_id}] Failed to fetch after max retries")
```

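
To make the retry timing concrete, the helper below lists the delays that a loop of this shape sleeps between attempts. The function name and the example values (5 attempts, 100 ms base delay, 500 ms cap) are assumptions chosen for illustration; this document does not state default values.

```python
def backoff_delays(max_retries, base_delay, max_delay):
    """Delays (ms) slept between attempts: doubled each time, capped at max_delay."""
    delays = []
    delay = base_delay
    for _ in range(max_retries - 1):  # no sleep after the final attempt
        delays.append(delay)
        delay = min(delay * 2, max_delay)
    return delays


print(backoff_delays(5, 100, 500))  # [100, 200, 400, 500]
```
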
### Correlation ID Logging

All platforms use correlation IDs for distributed tracing:

```
[timestamp] [Correlation: abc123] Message published to subject
```

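
A small helper that produces lines in this shape might look like the sketch below. The function name `log_trace` and the ISO 8601 UTC timestamp are assumptions; the document specifies only the overall layout of the line.

```python
from datetime import datetime, timezone
from typing import Optional


def log_trace(correlation_id: str, message: str, now: Optional[datetime] = None) -> str:
    """Format one log line in the layout shown above, print it, and return it."""
    # `now` can be injected for tests; otherwise the current UTC time is used
    stamp = (now or datetime.now(timezone.utc)).isoformat()
    line = f"[{stamp}] [Correlation: {correlation_id}] {message}"
    print(line)
    return line


log_trace("abc123", "Message published to subject")
```
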
### Serialization Performance Comparison

| Format | Use Case | Pros | Cons |
|--------|----------|------|------|
| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library |
| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema |

---

## Testing Strategy (Cross-Platform)

### Unit Tests

| Test Type | Julia | JavaScript | Python |
|-----------|-------|------------|--------|
| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` |
| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` |
| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` |
| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` |
| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` |

### Integration Tests

- NATS server communication
- File server upload/download
- Cross-platform message exchange
- Arrow table serialization/deserialization
- JSON table serialization/deserialization

---

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1000000` | Payload size in bytes (1 MB) at or above which link transport is used |

### MicroPython-Specific Configuration

```python
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000  # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000  # Hard limit for MicroPython
```

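
On desktop Python, the same settings could instead be read from the environment variables in the table above. The sketch below assumes a `BridgeConfig` dataclass and a `load_config` helper, neither of which is part of the documented API; only the variable names and defaults come from this document.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class BridgeConfig:
    nats_url: str
    fileserver_url: str
    size_threshold: int


def load_config(env=None) -> BridgeConfig:
    """Read settings from a mapping (os.environ by default), falling back to the documented defaults."""
    env = os.environ if env is None else env
    return BridgeConfig(
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        fileserver_url=env.get("FILESERVER_URL", "http://localhost:8080"),
        # Environment values are strings, so the threshold is converted to int
        size_threshold=int(env.get("SIZE_THRESHOLD", "1000000")),
    )


# Passing a plain dict makes the loader easy to exercise without touching os.environ
print(load_config({"SIZE_THRESHOLD": "100000"}))
```
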
---

## Summary

This cross-platform NATS bridge provides:

1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across Julia, JavaScript, and Python/MicroPython
2. **Idiomatic Implementations**:
   - Julia: Multiple dispatch and struct-based design
   - JavaScript: Async/await and prototype-based utilities
   - Python: Class-based design with type hints
3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
5. **Platform-Specific Optimizations**:
   - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data
   - **JSON** (`jsontable`): Universal human-readable format for smaller tables
   - Streaming support in MicroPython

The Julia implementation serves as the **ground truth** for API design and behavior. The JavaScript and Python implementations maintain interface parity with it while following their own language idioms.

### Datatype Summary

| Datatype | Serialization | Use Case | Encoding |
|----------|---------------|----------|----------|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` |
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` |