2079 lines
62 KiB
Markdown
2079 lines
62 KiB
Markdown
# Cross-Platform Implementation Guide: Bi-Directional Data Bridge
|
|
|
|
## Overview
|
|
|
|
This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language.
|
|
|
|
**Supported Platforms:**
|
|
- **Julia** - Ground truth implementation (reference)
|
|
- **JavaScript** - Node.js and browser implementation
|
|
- **Python/MicroPython** - Desktop and embedded implementation
|
|
|
|
---
|
|
|
|
## Implementation Files
|
|
|
|
| Language | Implementation File | Description |
|
|
|----------|---------------------|-------------|
|
|
| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support |
|
|
| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation |
|
|
| **Python** | `src/natsbridge.py` | Desktop Python implementation |
|
|
| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) |
|
|
|
|
---
|
|
|
|
## File Server Handler Architecture
|
|
|
|
The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
|
|
|
|
### Handler Function Signatures
|
|
|
|
#### Julia
|
|
|
|
```julia
|
|
# Upload handler - uploads data to file server and returns URL
|
|
fileserver_upload_handler(
|
|
fileserver_url::String,
|
|
dataname::String,
|
|
data::Vector{UInt8}
|
|
)::Dict{String, Any}
|
|
|
|
# Download handler - fetches data from file server URL with exponential backoff
|
|
fileserver_download_handler(
|
|
url::String,
|
|
max_retries::Int,
|
|
base_delay::Int,
|
|
max_delay::Int,
|
|
correlation_id::String
|
|
)::Vector{UInt8}
|
|
```
|
|
|
|
#### JavaScript
|
|
|
|
```javascript
|
|
// Upload handler - async function
|
|
async function fileserver_upload_handler(
|
|
fileserver_url,
|
|
dataname,
|
|
data // Uint8Array
|
|
) {
|
|
// Returns: { status, uploadid, fileid, url }
|
|
}
|
|
|
|
// Download handler - async function
|
|
async function fileserver_download_handler(
|
|
url,
|
|
max_retries,
|
|
base_delay,
|
|
max_delay,
|
|
correlation_id
|
|
) {
|
|
// Returns: Uint8Array
|
|
}
|
|
```
|
|
|
|
#### Python
|
|
|
|
```python
|
|
# Upload handler - async function
|
|
async def fileserver_upload_handler(
|
|
fileserver_url: str,
|
|
dataname: str,
|
|
data: bytes
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Upload data to file server.
|
|
|
|
Returns:
|
|
Dict with keys: 'status', 'uploadid', 'fileid', 'url'
|
|
"""
|
|
pass
|
|
|
|
# Download handler - async function
|
|
async def fileserver_download_handler(
|
|
url: str,
|
|
max_retries: int,
|
|
base_delay: int,
|
|
max_delay: int,
|
|
correlation_id: str
|
|
) -> bytes:
|
|
"""
|
|
Download data from URL with exponential backoff.
|
|
|
|
Returns:
|
|
Downloaded bytes
|
|
"""
|
|
pass
|
|
```
|
|
|
|
#### MicroPython
|
|
|
|
```python
|
|
# Upload handler - synchronous (no async in MicroPython)
|
|
def fileserver_upload_handler(
|
|
fileserver_url: str,
|
|
dataname: str,
|
|
data: bytearray
|
|
) -> Dict:
|
|
"""
|
|
Upload data to file server (synchronous).
|
|
|
|
Returns:
|
|
Dict with keys: 'status', 'url'
|
|
"""
|
|
pass
|
|
|
|
# Download handler - synchronous
|
|
def fileserver_download_handler(
|
|
url: str,
|
|
max_retries: int,
|
|
base_delay: int,
|
|
max_delay: int,
|
|
correlation_id: str
|
|
) -> bytearray:
|
|
"""
|
|
Download data from URL with exponential backoff (synchronous).
|
|
|
|
Returns:
|
|
Downloaded bytes
|
|
"""
|
|
pass
|
|
```
|
|
|
|
---
|
|
|
|
## Multi-Payload Support (Standard API)
|
|
|
|
The system uses a **standardized list-of-tuples format** for all payload operations across all platforms.
|
|
|
|
### API Standard
|
|
|
|
```
|
|
# Input format for smartsend (always a list of tuples with type info)
|
|
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
|
|
|
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
|
|
{
|
|
"correlation_id": "...",
|
|
"msg_id": "...",
|
|
"timestamp": "...",
|
|
"send_to": "...",
|
|
"msg_purpose": "...",
|
|
"sender_name": "...",
|
|
"sender_id": "...",
|
|
"receiver_name": "...",
|
|
"receiver_id": "...",
|
|
"reply_to": "...",
|
|
"reply_to_msg_id": "...",
|
|
"broker_url": "...",
|
|
"metadata": {...},
|
|
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
|
}
|
|
```
|
|
|
|
### Supported Types
|
|
|
|
| Type | Julia | JavaScript | Python | MicroPython |
|
|
|------|-------|------------|--------|-------------|
|
|
| `text` | `String` | `string` | `str` | `str` |
|
|
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` |
|
|
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) |
|
|
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` | `list` |
|
|
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
|
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
|
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
|
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` |
|
|
|
|
### Cross-Platform Examples
|
|
|
|
#### Julia
|
|
|
|
```julia
|
|
using NATSBridge
|
|
|
|
# Single payload - still wrapped in a list
|
|
env, env_json_str = smartsend(
|
|
"/test",
|
|
[("dataname1", data1, "dictionary")],
|
|
broker_url="nats://localhost:4222",
|
|
fileserver_upload_handler=plik_oneshot_upload
|
|
)
|
|
|
|
# Multiple payloads with different types
|
|
env, env_json_str = smartsend(
|
|
"/test",
|
|
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
|
|
broker_url="nats://localhost:4222"
|
|
)
|
|
|
|
# Mixed content (chat with text, image, audio)
|
|
env, env_json_str = smartsend(
|
|
"/chat",
|
|
[
|
|
("message_text", "Hello!", "text"),
|
|
("user_image", image_data, "image"),
|
|
("audio_clip", audio_data, "audio")
|
|
],
|
|
broker_url="nats://localhost:4222"
|
|
)
|
|
|
|
# Receive returns a JSON.Object{String, Any} envelope
|
|
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
|
|
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
|
|
# Access payloads: env["payloads"] which is a Vector of tuples
|
|
for (dataname, data, type) in env["payloads"]
|
|
println("$dataname: $data (type: $type)")
|
|
end
|
|
```
|
|
|
|
#### JavaScript
|
|
|
|
```javascript
|
|
const NATSBridge = require('natsbridge');
|
|
|
|
// Single payload
|
|
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
"/test",
|
|
[["dataname1", data1, "dictionary"]],
|
|
{
|
|
broker_url: "nats://localhost:4222",
|
|
fileserver_upload_handler: plikOneshotUpload
|
|
}
|
|
);
|
|
|
|
// Multiple payloads
|
|
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
"/test",
|
|
[
|
|
["dataname1", data1, "dictionary"],
|
|
["dataname2", data2, "arrowtable"]
|
|
],
|
|
{ broker_url: "nats://localhost:4222" }
|
|
);
|
|
|
|
// Mixed content
|
|
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
"/chat",
|
|
[
|
|
["message_text", "Hello!", "text"],
|
|
["user_image", imageData, "image"],
|
|
["audio_clip", audioData, "audio"]
|
|
],
|
|
{ broker_url: "nats://localhost:4222" }
|
|
);
|
|
|
|
// Receive
|
|
const env = await NATSBridge.smartreceive(msg, {
|
|
fileserver_download_handler: fetchWithBackoff
|
|
});
|
|
// env is an object with "payloads" field containing Array of arrays
|
|
// Access payloads: env.payloads which is an Array of [dataname, data, type] arrays
|
|
for (const [dataname, data, type] of env.payloads) {
|
|
console.log(`${dataname}: ${data} (type: ${type})`);
|
|
}
|
|
```
|
|
|
|
#### Python
|
|
|
|
```python
|
|
from natsbridge import NATSBridge
|
|
|
|
# Single payload
|
|
env, env_json_str = await NATSBridge.smartsend(
|
|
"/test",
|
|
[("dataname1", data1, "dictionary")],
|
|
broker_url="nats://localhost:4222",
|
|
fileserver_upload_handler=plik_oneshot_upload
|
|
)
|
|
|
|
# Multiple payloads
|
|
env, env_json_str = await NATSBridge.smartsend(
|
|
"/test",
|
|
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
|
|
broker_url="nats://localhost:4222"
|
|
)
|
|
|
|
# Mixed content
|
|
env, env_json_str = await NATSBridge.smartsend(
|
|
"/chat",
|
|
[
|
|
("message_text", "Hello!", "text"),
|
|
("user_image", image_data, "image"),
|
|
("audio_clip", audio_data, "audio")
|
|
],
|
|
broker_url="nats://localhost:4222"
|
|
)
|
|
|
|
# Receive
|
|
env = await NATSBridge.smartreceive(
|
|
msg,
|
|
fileserver_download_handler=fetch_with_backoff
|
|
)
|
|
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
|
# Access payloads: env["payloads"] which is a list of tuples
|
|
for dataname, data, type_ in env["payloads"]:
|
|
print(f"{dataname}: {data} (type: {type_})")
|
|
```
|
|
|
|
#### MicroPython
|
|
|
|
```python
|
|
from natsbridge import NATSBridge
|
|
|
|
# Limited to text and binary (no tables due to memory constraints)
|
|
env, env_json_str = NATSBridge.smartsend(
|
|
"/chat",
|
|
[
|
|
("message_text", "Hello!", "text"),
|
|
("binary_data", data_bytes, "binary")
|
|
],
|
|
broker_url="nats://localhost:4222",
|
|
size_threshold=100000 # Lower threshold for memory constraints
|
|
)
|
|
# Note: MicroPython uses synchronous handlers
|
|
```
|
|
|
|
---
|
|
|
|
## Architecture
|
|
|
|
### Cross-Platform Claim-Check Pattern
|
|
|
|
```mermaid
|
|
flowchart TD
|
|
A[SmartSend Function] --> B{Is payload size < 1MB?}
|
|
B -->|Yes | C[Direct Path<br/><small>< 1MB</small>]
|
|
B -->|No | D[Link Path<br/><small>>= 1MB</small>]
|
|
|
|
C --> C1[Serialize to Buffer]
|
|
C1 --> C2[Base64/JSON encode]
|
|
C2 --> C3[Publish to NATS]
|
|
|
|
D --> D1[Serialize to Buffer]
|
|
D1 --> D2[Upload to HTTP Server]
|
|
D2 --> D3[Publish to NATS with URL]
|
|
|
|
style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
|
|
style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px
|
|
style C fill:#e8f5e9,stroke:#008000,stroke-width:2px
|
|
style D fill:#e8f5e9,stroke:#008000,stroke-width:2px
|
|
style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
```
|
|
|
|
**Claim-Check Pattern Overview:**
|
|
- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS
|
|
- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern)
|
|
|
|
### smartsend Return Value
|
|
|
|
All platforms return a tuple/array containing both the envelope and JSON string:
|
|
|
|
#### Julia
|
|
|
|
```julia
|
|
env, env_json_str = smartsend(...)
|
|
# Returns: ::Tuple{msg_envelope_v1, String}
|
|
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
|
|
# env_json_str::String - JSON string for publishing to NATS
|
|
```
|
|
|
|
#### JavaScript
|
|
|
|
```javascript
|
|
const [env, env_json_str] = await smartsend(...);
|
|
// Returns: Promise<[env, env_json_str]>
|
|
// env: Object with all metadata and payloads
|
|
// env_json_str: String for publishing to NATS
|
|
```
|
|
|
|
#### Python
|
|
|
|
```python
|
|
env, env_json_str = await smartsend(...)
|
|
# Returns: Tuple[Dict, str]
|
|
# env: Dict with all metadata and payloads
|
|
# env_json_str: String for publishing to NATS
|
|
```
|
|
|
|
#### MicroPython
|
|
|
|
```python
|
|
env, env_json_str = NATSBridge.smartsend(...)
|
|
# Returns: Tuple[Dict, str]
|
|
# Note: MicroPython returns plain dicts (no structured envelope object)
|
|
```
|
|
|
|
---
|
|
|
|
## Installation
|
|
|
|
### Julia Dependencies
|
|
|
|
```julia
|
|
using Pkg
|
|
Pkg.add("NATS")
|
|
Pkg.add("Arrow")
|
|
Pkg.add("JSON3")
|
|
Pkg.add("HTTP")
|
|
Pkg.add("UUIDs")
|
|
Pkg.add("Dates")
|
|
```
|
|
|
|
### JavaScript Dependencies (Node.js)
|
|
|
|
```bash
|
|
npm install nats apache-arrow node-fetch
|
|
# or
|
|
yarn add nats apache-arrow node-fetch
|
|
```
|
|
|
|
**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed.
|
|
|
|
### JavaScript Dependencies (Browser)
|
|
|
|
```bash
|
|
npm install nats apache-arrow
|
|
# or use CDN:
|
|
# https://unpkg.com/nats-js/dist/bundle/nats.min.js
|
|
# https://unpkg.com/apache-arrow/arrow.min.js
|
|
```
|
|
|
|
**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative like `uuidv4` package.
|
|
|
|
### Python Dependencies (Desktop)
|
|
|
|
```bash
|
|
pip install nats-py aiohttp pyarrow pandas python-dateutil
|
|
```
|
|
|
|
### MicroPython Dependencies
|
|
|
|
MicroPython uses built-in modules:
|
|
- `network` - NATS connection (custom implementation)
|
|
- `time` - Timestamps
|
|
- `uos` - File operations
|
|
- `base64` - Base64 encoding
|
|
- `json` - JSON parsing
|
|
- `struct` - Binary data handling
|
|
|
|
---
|
|
|
|
## Usage Tutorial
|
|
|
|
### Step 1: Start NATS Server
|
|
|
|
```bash
|
|
docker run -p 4222:4222 nats:latest
|
|
```
|
|
|
|
### Step 2: Start HTTP File Server (optional)
|
|
|
|
```bash
|
|
# Create a directory for file uploads
|
|
mkdir -p /tmp/fileserver
|
|
|
|
# Use any HTTP server that supports POST for file uploads
|
|
# Example: Python's built-in server
|
|
python3 -m http.server 8080 --directory /tmp/fileserver
|
|
```
|
|
|
|
### Step 3: Run Test Scenarios
|
|
|
|
```bash
|
|
# Julia tests
|
|
julia test/test_julia_to_julia_text_sender.jl
|
|
julia test/test_julia_to_julia_text_receiver.jl
|
|
|
|
# JavaScript tests (Node.js)
|
|
node test/test_js_text_sender.js
|
|
node test/test_js_text_receiver.js
|
|
|
|
# Python tests
|
|
python3 test/test_py_text_sender.py
|
|
python3 test/test_py_text_receiver.py
|
|
```
|
|
|
|
---
|
|
|
|
## Platform-Specific Implementations
|
|
|
|
### Julia Implementation
|
|
|
|
#### Module Structure
|
|
|
|
```julia
|
|
module NATSBridge
|
|
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
|
|
|
|
# Constants
|
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB
|
|
const DEFAULT_BROKER_URL = "nats://localhost:4222"
|
|
const DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
|
|
|
# Structs
|
|
struct msg_payload_v1
|
|
id::String
|
|
dataname::String
|
|
payload_type::String
|
|
transport::String
|
|
encoding::String
|
|
size::Integer
|
|
data::Any
|
|
metadata::Dict{String, Any}
|
|
end
|
|
|
|
struct msg_envelope_v1
|
|
correlation_id::String
|
|
msg_id::String
|
|
timestamp::String
|
|
send_to::String
|
|
msg_purpose::String
|
|
sender_name::String
|
|
sender_id::String
|
|
receiver_name::String
|
|
receiver_id::String
|
|
reply_to::String
|
|
reply_to_msg_id::String
|
|
broker_url::String
|
|
metadata::Dict{String, Any}
|
|
payloads::Vector{msg_payload_v1}
|
|
end
|
|
|
|
# Main functions
|
|
function smartsend(...) end
|
|
function smartreceive(...) end
|
|
|
|
# Utility functions
|
|
function _serialize_data(...) end
|
|
function _deserialize_data(...) end
|
|
function envelope_to_json(...) end
|
|
function log_trace(...) end
|
|
|
|
# File server handlers
|
|
function plik_oneshot_upload(...) end
|
|
function _fetch_with_backoff(...) end
|
|
function publish_message(...) end
|
|
|
|
# Internal helpers
|
|
function _get_payload_bytes(...) end
|
|
end
|
|
```
|
|
|
|
#### Multiple Dispatch Pattern
|
|
|
|
Julia leverages multiple dispatch for type-specific implementations:
|
|
|
|
```julia
|
|
# publish_message has two overloads based on argument types
|
|
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
|
conn = NATS.connect(broker_url)
|
|
publish_message(conn, subject, message, correlation_id)
|
|
end
|
|
|
|
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
|
|
try
|
|
NATS.publish(conn, subject, message)
|
|
log_trace(correlation_id, "Message published to $subject")
|
|
finally
|
|
NATS.drain(conn)
|
|
end
|
|
end
|
|
|
|
# Type-specific serialization
|
|
function _serialize_data(data::String, payload_type::String)
|
|
# Text handling
|
|
return Vector{UInt8}(data)
|
|
end
|
|
|
|
function _serialize_data(data::Dict, payload_type::String)
|
|
# Dictionary handling
|
|
json_str = JSON.json(data)
|
|
return Vector{UInt8}(json_str)
|
|
end
|
|
|
|
function _serialize_data(data::DataFrame, payload_type::String)
|
|
# Table handling - arrowtable
|
|
io = IOBuffer()
|
|
Arrow.write(io, data)
|
|
return take!(io)
|
|
end
|
|
```
|
|
|
|
#### smartsend Implementation
|
|
|
|
```julia
|
|
function smartsend(
|
|
subject::String,
|
|
data::AbstractArray{Tuple{String, T1, String}, 1};
|
|
broker_url::String = DEFAULT_BROKER_URL,
|
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler::Function = plik_oneshot_upload,
|
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id::String = string(uuid4()),
|
|
msg_purpose::String = "chat",
|
|
sender_name::String = "NATSBridge",
|
|
receiver_name::String = "",
|
|
receiver_id::String = "",
|
|
reply_to::String = "",
|
|
reply_to_msg_id::String = "",
|
|
is_publish::Bool = true,
|
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing,
|
|
msg_id::String = string(uuid4()),
|
|
sender_id::String = string(uuid4())
|
|
)::Tuple{msg_envelope_v1, String} where {T1<:Any}
|
|
|
|
log_trace(correlation_id, "Starting smartsend for subject: $subject")
|
|
|
|
# Process each payload in the list
|
|
payloads = msg_payload_v1[]
|
|
for (dataname, payload_data, payload_type) in data
|
|
# Serialize data based on type
|
|
payload_bytes = _serialize_data(payload_data, payload_type)
|
|
|
|
payload_size = length(payload_bytes)
|
|
log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes")
|
|
|
|
# Decision: Direct vs Link
|
|
if payload_size < size_threshold
|
|
# Direct path - Base64 encode and send via NATS
|
|
payload_b64 = Base64.base64encode(payload_bytes)
|
|
log_trace(correlation_id, "Using direct transport for $payload_size bytes")
|
|
|
|
payload = msg_payload_v1(
|
|
payload_b64,
|
|
payload_type;
|
|
id = string(uuid4()),
|
|
dataname = dataname,
|
|
transport = "direct",
|
|
encoding = "base64",
|
|
size = payload_size,
|
|
metadata = Dict{String, Any}("payload_bytes" => payload_size)
|
|
)
|
|
push!(payloads, payload)
|
|
else
|
|
# Link path - Upload to HTTP server, send URL via NATS
|
|
log_trace(correlation_id, "Using link transport, uploading to fileserver")
|
|
|
|
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
|
|
|
if response["status"] != 200
|
|
error("Failed to upload data to fileserver: $(response["status"])")
|
|
end
|
|
|
|
url = response["url"]
|
|
log_trace(correlation_id, "Uploaded to URL: $url")
|
|
|
|
payload = msg_payload_v1(
|
|
url,
|
|
payload_type;
|
|
id = string(uuid4()),
|
|
dataname = dataname,
|
|
transport = "link",
|
|
encoding = "none",
|
|
size = payload_size,
|
|
metadata = Dict{String, Any}()
|
|
)
|
|
push!(payloads, payload)
|
|
end
|
|
end
|
|
|
|
# Create msg_envelope_v1 with all payloads
|
|
# Note: First positional argument is "send_to" (the NATS subject), not "subject"
|
|
env = msg_envelope_v1(
|
|
subject, # send_to: NATS subject to publish to
|
|
payloads;
|
|
correlation_id = correlation_id,
|
|
msg_id = msg_id,
|
|
msg_purpose = msg_purpose,
|
|
sender_name = sender_name,
|
|
sender_id = sender_id,
|
|
receiver_name = receiver_name,
|
|
receiver_id = receiver_id,
|
|
reply_to = reply_to,
|
|
reply_to_msg_id = reply_to_msg_id,
|
|
broker_url = broker_url,
|
|
metadata = Dict{String, Any}(),
|
|
)
|
|
|
|
env_json_str = envelope_to_json(env)
|
|
|
|
if is_publish == false
|
|
# skip publish
|
|
elseif is_publish == true && NATS_connection === nothing
|
|
publish_message(broker_url, subject, env_json_str, correlation_id)
|
|
elseif is_publish == true && NATS_connection !== nothing
|
|
publish_message(NATS_connection, subject, env_json_str, correlation_id)
|
|
end
|
|
|
|
return (env, env_json_str)
|
|
end
|
|
```
|
|
|
|
#### smartreceive Implementation
|
|
|
|
```julia
|
|
function smartreceive(
|
|
msg::NATS.Msg;
|
|
fileserver_download_handler::Function = _fetch_with_backoff,
|
|
max_retries::Int = 5,
|
|
base_delay::Int = 100,
|
|
max_delay::Int = 5000
|
|
)::JSON.Object{String, Any}
|
|
# Parse the JSON envelope
|
|
env_json_obj = JSON.parse(String(msg.payload))
|
|
log_trace(env_json_obj["correlation_id"], "Processing received message")
|
|
|
|
# Process all payloads in the envelope
|
|
payloads_list = Tuple{String, Any, String}[]
|
|
|
|
num_payloads = length(env_json_obj["payloads"])
|
|
|
|
for i in 1:num_payloads
|
|
payload = env_json_obj["payloads"][i]
|
|
transport = String(payload["transport"])
|
|
dataname = String(payload["dataname"])
|
|
|
|
if transport == "direct"
|
|
log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'")
|
|
|
|
# Extract base64 payload from the payload
|
|
payload_b64 = String(payload["data"])
|
|
|
|
# Decode Base64 payload
|
|
payload_bytes = Base64.base64decode(payload_b64)
|
|
|
|
# Deserialize based on type
|
|
data_type = String(payload["payload_type"])
|
|
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
|
|
|
|
push!(payloads_list, (dataname, data, data_type))
|
|
elseif transport == "link"
|
|
# Extract download URL from the payload
|
|
url = String(payload["data"])
|
|
log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url")
|
|
|
|
# Fetch with exponential backoff using the download handler
|
|
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
|
|
|
|
# Deserialize based on type
|
|
data_type = String(payload["payload_type"])
|
|
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
|
|
|
|
push!(payloads_list, (dataname, data, data_type))
|
|
else
|
|
error("Unknown transport type for payload '$dataname': $(transport)")
|
|
end
|
|
end
|
|
env_json_obj["payloads"] = payloads_list
|
|
return env_json_obj
|
|
end
|
|
```
|
|
|
|
#### _serialize_data Implementation
|
|
|
|
```julia
|
|
function _serialize_data(data::Any, payload_type::String)
|
|
if payload_type == "text"
|
|
if isa(data, String)
|
|
data_bytes = Vector{UInt8}(data)
|
|
return data_bytes
|
|
else
|
|
error("Text data must be a String")
|
|
end
|
|
elseif payload_type == "dictionary"
|
|
json_str = JSON.json(data)
|
|
json_str_bytes = Vector{UInt8}(json_str)
|
|
return json_str_bytes
|
|
elseif payload_type == "arrowtable"
|
|
# Serialize DataFrame to Arrow IPC format
|
|
io = IOBuffer()
|
|
Arrow.write(io, data)
|
|
return take!(io)
|
|
elseif payload_type == "jsontable"
|
|
# Serialize to JSON
|
|
# data is Vector{NamedTuple} or Vector{Dict}
|
|
json_str = JSON.json(data)
|
|
return Vector{UInt8}(json_str)
|
|
elseif payload_type == "image"
|
|
if isa(data, Vector{UInt8})
|
|
return data
|
|
else
|
|
error("Image data must be Vector{UInt8}")
|
|
end
|
|
elseif payload_type == "audio"
|
|
if isa(data, Vector{UInt8})
|
|
return data
|
|
else
|
|
error("Audio data must be Vector{UInt8}")
|
|
end
|
|
elseif payload_type == "video"
|
|
if isa(data, Vector{UInt8})
|
|
return data
|
|
else
|
|
error("Video data must be Vector{UInt8}")
|
|
end
|
|
elseif payload_type == "binary"
|
|
if isa(data, IOBuffer)
|
|
return take!(data)
|
|
elseif isa(data, Vector{UInt8})
|
|
return data
|
|
else
|
|
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
|
end
|
|
else
|
|
error("Unknown payload_type: $payload_type")
|
|
end
|
|
end
|
|
```
|
|
|
|
#### _deserialize_data Implementation
|
|
|
|
```julia
|
|
function _deserialize_data(
|
|
data::Vector{UInt8},
|
|
payload_type::String,
|
|
correlation_id::String
|
|
)
|
|
if payload_type == "text"
|
|
return String(data)
|
|
elseif payload_type == "dictionary"
|
|
json_str = String(data)
|
|
return JSON.parse(json_str)
|
|
elseif payload_type == "arrowtable"
|
|
# Deserialize from Arrow IPC format
|
|
io = IOBuffer(data)
|
|
arrow_table = Arrow.Table(io)
|
|
return arrow_table
|
|
elseif payload_type == "jsontable"
|
|
# Deserialize from JSON format
|
|
# Returns Vector{NamedTuple} or Vector{Dict}
|
|
json_str = String(data)
|
|
parsed = JSON.parse(json_str)
|
|
return parsed
|
|
elseif payload_type == "image"
|
|
return data
|
|
elseif payload_type == "audio"
|
|
return data
|
|
elseif payload_type == "video"
|
|
return data
|
|
elseif payload_type == "binary"
|
|
return data
|
|
else
|
|
error("Unknown payload_type: $payload_type")
|
|
end
|
|
end
|
|
```
|
|
|
|
#### _fetch_with_backoff Implementation
|
|
|
|
```julia
|
|
function _fetch_with_backoff(
|
|
url::String,
|
|
max_retries::Int,
|
|
base_delay::Int,
|
|
max_delay::Int,
|
|
correlation_id::String
|
|
)
|
|
delay = base_delay
|
|
for attempt in 1:max_retries
|
|
try
|
|
response = HTTP.request("GET", url)
|
|
if response.status == 200
|
|
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
|
|
return response.body
|
|
else
|
|
error("Failed to fetch: $(response.status)")
|
|
end
|
|
catch e
|
|
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))")
|
|
|
|
if attempt < max_retries
|
|
sleep(delay / 1000.0)
|
|
delay = min(delay * 2, max_delay)
|
|
end
|
|
end
|
|
end
|
|
|
|
error("Failed to fetch data after $max_retries attempts")
|
|
end
|
|
```
|
|
|
|
#### plik_oneshot_upload Implementation
|
|
|
|
```julia
|
|
function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
|
|
# Get upload id
|
|
url_getUploadID = "$file_server_url/upload"
|
|
headers = ["Content-Type" => "application/json"]
|
|
body = """{ "OneShot" : true }"""
|
|
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
|
response_json = JSON.parse(http_response.body)
|
|
uploadid = response_json["id"]
|
|
uploadtoken = response_json["uploadToken"]
|
|
|
|
# Upload file
|
|
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
|
url_upload = "$file_server_url/file/$uploadid"
|
|
headers = ["X-UploadToken" => uploadtoken]
|
|
|
|
form = HTTP.Form(Dict(
|
|
"file" => file_multipart
|
|
))
|
|
|
|
http_response = nothing
|
|
try
|
|
http_response = HTTP.post(url_upload, headers, form)
|
|
catch e
|
|
@error "Request failed" exception=e
|
|
end
|
|
response_json = JSON.parse(http_response.body)
|
|
fileid = response_json["id"]
|
|
|
|
url = "$file_server_url/file/$uploadid/$fileid/$dataname"
|
|
|
|
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
|
end
|
|
```
|
|
|
|
---
|
|
|
|
### JavaScript Implementation
|
|
|
|
#### Module Structure
|
|
|
|
```javascript
|
|
// natsbridge.js
|
|
const nats = require('nats');
|
|
const crypto = require('crypto');
|
|
const fetch = require('node-fetch');
|
|
|
|
// UUID generation using built-in crypto module
|
|
const uuidv4 = () => crypto.randomUUID();
|
|
|
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000;
|
|
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
|
|
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
|
|
|
class NATSClient {
|
|
constructor(url) {
|
|
this.url = url;
|
|
this.connection = null;
|
|
}
|
|
|
|
async connect() {
|
|
this.connection = await nats.connect({ servers: this.url });
|
|
return this.connection;
|
|
}
|
|
|
|
async publish(subject, message) {
|
|
if (!this.connection) {
|
|
await this.connect();
|
|
}
|
|
await this.connection.publish(subject, message);
|
|
}
|
|
|
|
async close() {
|
|
if (this.connection) {
|
|
this.connection.close();
|
|
}
|
|
}
|
|
}
|
|
|
|
async function smartsend(subject, data, options = {}) {
|
|
// Implementation
|
|
}
|
|
|
|
async function smartreceive(msg, options = {}) {
|
|
// Implementation
|
|
}
|
|
|
|
module.exports = {
|
|
NATSClient,
|
|
smartsend,
|
|
smartreceive,
|
|
plikOneshotUpload,
|
|
fetchWithBackoff
|
|
};
|
|
```
|
|
|
|
#### smartsend Implementation
|
|
|
|
```javascript
|
|
const nats = require('nats');
|
|
const crypto = require('crypto');
|
|
const fetch = require('node-fetch');
|
|
const arrow = require('apache-arrow');
|
|
|
|
// UUID generation using built-in crypto module
|
|
const uuidv4 = () => crypto.randomUUID();
|
|
|
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000;
|
|
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
|
|
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
|
|
|
async function smartsend(subject, data, options = {}) {
|
|
const {
|
|
broker_url = DEFAULT_BROKER_URL,
|
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler = plikOneshotUpload,
|
|
size_threshold = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id = uuidv4(),
|
|
msg_purpose = 'chat',
|
|
sender_name = 'NATSBridge',
|
|
receiver_name = '',
|
|
receiver_id = '',
|
|
reply_to = '',
|
|
reply_to_msg_id = '',
|
|
is_publish = true,
|
|
nats_connection = null,
|
|
msg_id = uuidv4(),
|
|
sender_id = uuidv4()
|
|
} = options;
|
|
|
|
console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`);
|
|
|
|
// Process payloads
|
|
const payloads = [];
|
|
for (const [dataname, payloadData, payloadType] of data) {
|
|
const payloadBytes = await serializeData(payloadData, payloadType);
|
|
const payloadSize = payloadBytes.byteLength;
|
|
|
|
console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
|
|
|
|
if (payloadSize < size_threshold) {
|
|
// Direct path
|
|
const payloadB64 = bufferToBase64(payloadBytes);
|
|
console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`);
|
|
|
|
payloads.push({
|
|
id: uuidv4(),
|
|
dataname,
|
|
payload_type: payloadType,
|
|
transport: 'direct',
|
|
encoding: 'base64',
|
|
size: payloadSize,
|
|
data: payloadB64,
|
|
metadata: { payload_bytes: payloadSize }
|
|
});
|
|
} else {
|
|
// Link path
|
|
console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`);
|
|
|
|
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
|
|
|
|
if (response.status !== 200) {
|
|
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
|
|
}
|
|
|
|
console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`);
|
|
|
|
payloads.push({
|
|
id: uuidv4(),
|
|
dataname,
|
|
payload_type: payloadType,
|
|
transport: 'link',
|
|
encoding: 'none',
|
|
size: payloadSize,
|
|
data: response.url,
|
|
metadata: {}
|
|
});
|
|
}
|
|
}
|
|
|
|
// Build envelope
|
|
const env = {
|
|
correlation_id,
|
|
msg_id,
|
|
timestamp: new Date().toISOString(),
|
|
send_to: subject,
|
|
msg_purpose,
|
|
sender_name,
|
|
sender_id,
|
|
receiver_name,
|
|
receiver_id,
|
|
reply_to,
|
|
reply_to_msg_id,
|
|
broker_url,
|
|
metadata: {},
|
|
payloads
|
|
};
|
|
|
|
const env_json_str = JSON.stringify(env);
|
|
|
|
if (is_publish) {
|
|
if (nats_connection) {
|
|
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
|
|
} else {
|
|
await publishMessage(broker_url, subject, env_json_str, correlation_id);
|
|
}
|
|
}
|
|
|
|
return [env, env_json_str];
|
|
}
|
|
```
|
|
|
|
#### serializeData Implementation
|
|
|
|
```javascript
|
|
const arrow = require('apache-arrow');
|
|
|
|
async function serializeData(data, payload_type) {
|
|
if (payload_type === 'text') {
|
|
if (typeof data === 'string') {
|
|
return Buffer.from(data, 'utf8');
|
|
} else {
|
|
throw new Error('Text data must be a string');
|
|
}
|
|
} else if (payload_type === 'dictionary') {
|
|
const jsonStr = JSON.stringify(data);
|
|
return Buffer.from(jsonStr, 'utf8');
|
|
} else if (payload_type === 'arrowtable') {
|
|
// Convert Array<Object> to Arrow IPC
|
|
if (!Array.isArray(data) || data.length === 0) {
|
|
throw new Error('arrowtable data must be a non-empty array of objects');
|
|
}
|
|
|
|
// Create schema from first row
|
|
const schemaFields = Object.keys(data[0]).map(key =>
|
|
new arrow.Field(key, arrow.any())
|
|
);
|
|
const schema = new arrow.Schema(schemaFields);
|
|
|
|
// Create writer
|
|
const writer = new arrow.RecordBatchWriter([schema]);
|
|
|
|
// Write rows
|
|
for (const row of data) {
|
|
const recordBatch = arrow.recordBatch.fromObjects([row], schema);
|
|
writer.write(recordBatch);
|
|
}
|
|
await writer.close();
|
|
|
|
// Read buffer
|
|
return writer.toBuffer();
|
|
} else if (payload_type === 'jsontable') {
|
|
// Serialize directly to JSON
|
|
const jsonStr = JSON.stringify(data);
|
|
return Buffer.from(jsonStr, 'utf8');
|
|
} else if (payload_type === 'image') {
|
|
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
|
return Buffer.from(data);
|
|
} else {
|
|
throw new Error('Image data must be Uint8Array or Buffer');
|
|
}
|
|
} else if (payload_type === 'audio') {
|
|
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
|
return Buffer.from(data);
|
|
} else {
|
|
throw new Error('Audio data must be Uint8Array or Buffer');
|
|
}
|
|
} else if (payload_type === 'video') {
|
|
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
|
return Buffer.from(data);
|
|
} else {
|
|
throw new Error('Video data must be Uint8Array or Buffer');
|
|
}
|
|
} else if (payload_type === 'binary') {
|
|
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
|
return Buffer.from(data);
|
|
} else {
|
|
throw new Error('Binary data must be Uint8Array or Buffer');
|
|
}
|
|
} else {
|
|
throw new Error(`Unknown payload_type: ${payload_type}`);
|
|
}
|
|
}
|
|
|
|
function bufferToBase64(buffer) {
|
|
return buffer.toString('base64');
|
|
}
|
|
```
|
|
|
|
#### deserializeData Implementation
|
|
|
|
```javascript
|
|
const arrow = require('apache-arrow');
|
|
|
|
async function deserializeData(data, payload_type, correlation_id) {
|
|
if (payload_type === 'text') {
|
|
return Buffer.from(data).toString('utf8');
|
|
} else if (payload_type === 'dictionary') {
|
|
const jsonStr = Buffer.from(data).toString('utf8');
|
|
return JSON.parse(jsonStr);
|
|
} else if (payload_type === 'arrowtable') {
|
|
// Deserialize from Arrow IPC
|
|
const buffer = Buffer.from(data);
|
|
const table = arrow.tableFromRawBytes(buffer);
|
|
return table;
|
|
} else if (payload_type === 'jsontable') {
|
|
// Deserialize from JSON - returns Array<Object>
|
|
const jsonStr = Buffer.from(data).toString('utf8');
|
|
return JSON.parse(jsonStr);
|
|
} else if (payload_type === 'image') {
|
|
return Buffer.from(data);
|
|
} else if (payload_type === 'audio') {
|
|
return Buffer.from(data);
|
|
} else if (payload_type === 'video') {
|
|
return Buffer.from(data);
|
|
} else if (payload_type === 'binary') {
|
|
return Buffer.from(data);
|
|
} else {
|
|
throw new Error(`Unknown payload_type: ${payload_type}`);
|
|
}
|
|
}
|
|
```
|
|
|
|
#### fetchWithBackoff Implementation
|
|
|
|
```javascript
|
|
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
|
|
let delay = base_delay;
|
|
|
|
for (let attempt = 1; attempt <= max_retries; attempt++) {
|
|
try {
|
|
const response = await fetch(url);
|
|
|
|
if (response.status === 200) {
|
|
console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`);
|
|
return await response.arrayBuffer();
|
|
} else {
|
|
throw new Error(`Failed to fetch: ${response.status}`);
|
|
}
|
|
} catch (e) {
|
|
console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`);
|
|
|
|
if (attempt < max_retries) {
|
|
await new Promise(resolve => setTimeout(resolve, delay));
|
|
delay = Math.min(delay * 2, max_delay);
|
|
}
|
|
}
|
|
}
|
|
|
|
throw new Error(`Failed to fetch data after ${max_retries} attempts`);
|
|
}
|
|
```
|
|
|
|
#### plikOneshotUpload Implementation
|
|
|
|
```javascript
|
|
async function plikOneshotUpload(file_server_url, dataname, data) {
|
|
// Get upload id
|
|
const url_getUploadID = `${file_server_url}/upload`;
|
|
const headers = { 'Content-Type': 'application/json' };
|
|
const body = JSON.stringify({ OneShot: true });
|
|
|
|
const http_response = await fetch(url_getUploadID, {
|
|
method: 'POST',
|
|
headers,
|
|
body
|
|
});
|
|
|
|
const response_json = await http_response.json();
|
|
const uploadid = response_json.id;
|
|
const uploadtoken = response_json.uploadToken;
|
|
|
|
// Upload file
|
|
const url_upload = `${file_server_url}/file/${uploadid}`;
|
|
const form = new FormData();
|
|
const blob = new Blob([data]);
|
|
form.append('file', blob, dataname);
|
|
|
|
const upload_headers = {
|
|
'X-UploadToken': uploadtoken
|
|
};
|
|
|
|
const upload_response = await fetch(url_upload, {
|
|
method: 'POST',
|
|
headers: upload_headers,
|
|
body: form
|
|
});
|
|
|
|
const upload_json = await upload_response.json();
|
|
const fileid = upload_json.id;
|
|
|
|
const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`;
|
|
|
|
return {
|
|
status: upload_response.status,
|
|
uploadid,
|
|
fileid,
|
|
url
|
|
};
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
### Python Implementation
|
|
|
|
#### Module Structure
|
|
|
|
```python
|
|
# natsbridge.py
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import uuid
|
|
import time
|
|
from typing import Any, Dict, List, Tuple, Union, Callable
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
|
|
try:
|
|
import pyarrow as arrow
|
|
import pyarrow.parquet as pq
|
|
ARROW_AVAILABLE = True
|
|
except ImportError:
|
|
ARROW_AVAILABLE = False
|
|
|
|
try:
|
|
import aiohttp
|
|
import nats
|
|
from nats.aio.client import Client as NATSClient
|
|
NATS_AVAILABLE = True
|
|
except ImportError:
|
|
NATS_AVAILABLE = False
|
|
|
|
|
|
DEFAULT_SIZE_THRESHOLD = 1_000_000
|
|
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
|
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
|
|
|
|
|
@dataclass
|
|
class MsgPayloadV1:
|
|
"""Message payload structure."""
|
|
id: str
|
|
dataname: str
|
|
payload_type: str
|
|
transport: str
|
|
encoding: str
|
|
size: int
|
|
data: Union[str, bytes]
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class MsgEnvelopeV1:
|
|
"""Message envelope structure."""
|
|
correlation_id: str
|
|
msg_id: str
|
|
timestamp: str
|
|
send_to: str
|
|
msg_purpose: str
|
|
sender_name: str
|
|
sender_id: str
|
|
receiver_name: str
|
|
receiver_id: str
|
|
reply_to: str
|
|
reply_to_msg_id: str
|
|
broker_url: str
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
payloads: List[MsgPayloadV1] = field(default_factory=list)
|
|
|
|
|
|
class NATSBridge:
|
|
"""Cross-platform NATS bridge implementation."""
|
|
|
|
def __init__(self, broker_url: str = None, fileserver_url: str = None):
|
|
self.broker_url = broker_url or DEFAULT_BROKER_URL
|
|
self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
|
|
self._nats_client: NATSClient = None
|
|
|
|
async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]:
|
|
"""Send data via NATS."""
|
|
pass
|
|
|
|
async def smartreceive(self, msg: Any, **kwargs) -> Dict:
|
|
"""Receive and process NATS message."""
|
|
pass
|
|
```
|
|
|
|
#### smartsend Implementation
|
|
|
|
```python
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import uuid
|
|
from typing import Any, Dict, List, Tuple, Union, Callable
|
|
from datetime import datetime
|
|
|
|
DEFAULT_SIZE_THRESHOLD = 1_000_000
|
|
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
|
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
|
|
|
|
|
async def smartsend(
|
|
subject: str,
|
|
data: List[Tuple[str, Any, str]],
|
|
broker_url: str = DEFAULT_BROKER_URL,
|
|
fileserver_url: str = DEFAULT_FILESERVER_URL,
|
|
fileserver_upload_handler: Callable = plik_oneshot_upload,
|
|
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id: str = None,
|
|
msg_purpose: str = "chat",
|
|
sender_name: str = "NATSBridge",
|
|
receiver_name: str = "",
|
|
receiver_id: str = "",
|
|
reply_to: str = "",
|
|
reply_to_msg_id: str = "",
|
|
is_publish: bool = True,
|
|
nats_connection: Any = None,
|
|
msg_id: str = None,
|
|
sender_id: str = None
|
|
) -> Tuple[Dict, str]:
|
|
"""
|
|
Send data via NATS with automatic transport selection.
|
|
|
|
Args:
|
|
subject: NATS subject to publish to
|
|
data: List of (dataname, data, type) tuples
|
|
**kwargs: Additional options
|
|
|
|
Returns:
|
|
Tuple of (env, env_json_str)
|
|
"""
|
|
if correlation_id is None:
|
|
correlation_id = str(uuid.uuid4())
|
|
if msg_id is None:
|
|
msg_id = str(uuid.uuid4())
|
|
if sender_id is None:
|
|
sender_id = str(uuid.uuid4())
|
|
|
|
print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}")
|
|
|
|
# Process payloads
|
|
payloads = []
|
|
for dataname, payload_data, payload_type in data:
|
|
payload_bytes = _serialize_data(payload_data, payload_type)
|
|
payload_size = len(payload_bytes)
|
|
|
|
print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
|
|
|
|
if payload_size < size_threshold:
|
|
# Direct path
|
|
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
|
|
print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes")
|
|
|
|
payloads.append({
|
|
'id': str(uuid.uuid4()),
|
|
'dataname': dataname,
|
|
'payload_type': payload_type,
|
|
'transport': 'direct',
|
|
'encoding': 'base64',
|
|
'size': payload_size,
|
|
'data': payload_b64,
|
|
'metadata': {'payload_bytes': payload_size}
|
|
})
|
|
else:
|
|
# Link path
|
|
print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver")
|
|
|
|
response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
|
|
|
if response['status'] != 200:
|
|
raise Exception(f"Failed to upload data to fileserver: {response['status']}")
|
|
|
|
print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}")
|
|
|
|
payloads.append({
|
|
'id': str(uuid.uuid4()),
|
|
'dataname': dataname,
|
|
'payload_type': payload_type,
|
|
'transport': 'link',
|
|
'encoding': 'none',
|
|
'size': payload_size,
|
|
'data': response['url'],
|
|
'metadata': {}
|
|
})
|
|
|
|
# Build envelope
|
|
env = {
|
|
'correlation_id': correlation_id,
|
|
'msg_id': msg_id,
|
|
'timestamp': datetime.utcnow().isoformat() + 'Z',
|
|
'send_to': subject,
|
|
'msg_purpose': msg_purpose,
|
|
'sender_name': sender_name,
|
|
'sender_id': sender_id,
|
|
'receiver_name': receiver_name,
|
|
'receiver_id': receiver_id,
|
|
'reply_to': reply_to,
|
|
'reply_to_msg_id': reply_to_msg_id,
|
|
'broker_url': broker_url,
|
|
'metadata': {},
|
|
'payloads': payloads
|
|
}
|
|
|
|
env_json_str = json.dumps(env)
|
|
|
|
if is_publish:
|
|
if nats_connection:
|
|
await publish_message(nats_connection, subject, env_json_str, correlation_id)
|
|
else:
|
|
await publish_message(broker_url, subject, env_json_str, correlation_id)
|
|
|
|
return env, env_json_str
|
|
```
|
|
|
|
#### serializeData Implementation
|
|
|
|
```python
|
|
import base64
|
|
import json
|
|
from typing import Any
|
|
|
|
try:
|
|
import pyarrow as arrow
|
|
import pyarrow.feather as feather
|
|
import pyarrow.ipc as ipc
|
|
ARROW_AVAILABLE = True
|
|
except ImportError:
|
|
ARROW_AVAILABLE = False
|
|
|
|
|
|
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|
"""Serialize data to bytes based on type."""
|
|
if payload_type == 'text':
|
|
if isinstance(data, str):
|
|
return data.encode('utf-8')
|
|
else:
|
|
raise Error('Text data must be a string')
|
|
elif payload_type == 'dictionary':
|
|
json_str = json.dumps(data)
|
|
return json_str.encode('utf-8')
|
|
elif payload_type == 'arrowtable':
|
|
if not ARROW_AVAILABLE:
|
|
raise Error('pyarrow not available for table serialization')
|
|
|
|
import io
|
|
buf = io.BytesIO()
|
|
import pandas as pd
|
|
if isinstance(data, pd.DataFrame):
|
|
# Serialize DataFrame to Arrow
|
|
table = arrow.Table.from_pandas(data)
|
|
sink = arrow.ipc.new_file(buf)
|
|
arrow.ipc.write_table(table, sink)
|
|
sink.close()
|
|
return buf.getvalue()
|
|
else:
|
|
raise Error('arrowtable data must be a pandas DataFrame')
|
|
elif payload_type == 'jsontable':
|
|
# Serialize directly to JSON
|
|
json_str = json.dumps(data)
|
|
return json_str.encode('utf-8')
|
|
elif payload_type == 'image':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise Error('Image data must be bytes')
|
|
elif payload_type == 'audio':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise Error('Audio data must be bytes')
|
|
elif payload_type == 'video':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise Error('Video data must be bytes')
|
|
elif payload_type == 'binary':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise Error('Binary data must be bytes')
|
|
else:
|
|
raise Error(f'Unknown payload_type: {payload_type}')
|
|
```
|
|
|
|
#### deserializeData Implementation
|
|
|
|
```python
|
|
import base64
|
|
import json
|
|
from typing import Any
|
|
|
|
try:
|
|
import pyarrow as arrow
|
|
import pyarrow.feather as feather
|
|
import pyarrow.ipc as ipc
|
|
ARROW_AVAILABLE = True
|
|
except ImportError:
|
|
ARROW_AVAILABLE = False
|
|
|
|
|
|
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
|
"""Deserialize bytes to data based on type."""
|
|
if payload_type == 'text':
|
|
return data.decode('utf-8')
|
|
elif payload_type == 'dictionary':
|
|
json_str = data.decode('utf-8')
|
|
return json.loads(json_str)
|
|
elif payload_type == 'arrowtable':
|
|
if not ARROW_AVAILABLE:
|
|
raise Error('pyarrow not available for table deserialization')
|
|
|
|
import io
|
|
buf = io.BytesIO(data)
|
|
reader = arrow.ipc.open_file(buf)
|
|
return reader.read_all().to_pandas()
|
|
elif payload_type == 'jsontable':
|
|
# Deserialize from JSON - returns list[dict]
|
|
json_str = data.decode('utf-8')
|
|
return json.loads(json_str)
|
|
elif payload_type == 'image':
|
|
return data
|
|
elif payload_type == 'audio':
|
|
return data
|
|
elif payload_type == 'video':
|
|
return data
|
|
elif payload_type == 'binary':
|
|
return data
|
|
else:
|
|
raise Error(f'Unknown payload_type: {payload_type}')
|
|
```
|
|
|
|
#### fetchWithBackoff Implementation
|
|
|
|
```python
|
|
import asyncio
|
|
import aiohttp
|
|
from typing import Callable
|
|
|
|
|
|
async def fetch_with_backoff(
|
|
url: str,
|
|
max_retries: int,
|
|
base_delay: int,
|
|
max_delay: int,
|
|
correlation_id: str
|
|
) -> bytes:
|
|
"""Fetch URL with exponential backoff."""
|
|
delay = base_delay
|
|
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}")
|
|
return await response.read()
|
|
else:
|
|
raise Exception(f"Failed to fetch: {response.status}")
|
|
except Exception as e:
|
|
print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}")
|
|
|
|
if attempt < max_retries:
|
|
await asyncio.sleep(delay / 1000.0)
|
|
delay = min(delay * 2, max_delay)
|
|
|
|
raise Exception(f"Failed to fetch data after {max_retries} attempts")
|
|
```
|
|
|
|
#### plikOneshotUpload Implementation
|
|
|
|
```python
|
|
import aiohttp
|
|
import json
|
|
from typing import Dict, Any
|
|
|
|
|
|
async def plik_oneshot_upload(
|
|
file_server_url: str,
|
|
dataname: str,
|
|
data: bytes
|
|
) -> Dict[str, Any]:
|
|
"""Upload data to plik server in one-shot mode."""
|
|
|
|
# Get upload id
|
|
async with aiohttp.ClientSession() as session:
|
|
url_getUploadID = f"{file_server_url}/upload"
|
|
headers = {'Content-Type': 'application/json'}
|
|
body = json.dumps({"OneShot": True})
|
|
|
|
async with session.post(url_getUploadID, headers=headers, data=body) as response:
|
|
response_json = await response.json()
|
|
uploadid = response_json['id']
|
|
uploadtoken = response_json['uploadToken']
|
|
|
|
# Upload file
|
|
url_upload = f"{file_server_url}/file/{uploadid}"
|
|
headers = {'X-UploadToken': uploadtoken}
|
|
|
|
form = aiohttp.FormData()
|
|
form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
|
|
|
|
async with session.post(url_upload, headers=headers, data=form) as upload_response:
|
|
upload_json = await upload_response.json()
|
|
fileid = upload_json['id']
|
|
|
|
url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
|
|
|
|
return {
|
|
'status': upload_response.status,
|
|
'uploadid': uploadid,
|
|
'fileid': fileid,
|
|
'url': url
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## MicroPython Implementation
|
|
|
|
### Limitations
|
|
|
|
MicroPython has significant constraints compared to desktop implementations:
|
|
|
|
| Feature | Desktop | MicroPython |
|
|
|---------|---------|-------------|
|
|
| Memory | Unlimited | ~256KB - 1MB |
|
|
| Arrow IPC | ✅ | ❌ (not supported) |
|
|
| Async/Await | ✅ | ⚠️ (uasyncio only) |
|
|
| Large payloads (>1MB) | ✅ | ❌ (enforced limit) |
|
|
| arrowtable | ✅ | ❌ |
|
|
| jsontable | ⚠️ (limited) | ⚠️ (limited) |
|
|
| Multiple payloads | ✅ | ⚠️ (limited) |
|
|
|
|
### MicroPython Module Structure
|
|
|
|
```python
|
|
# natsbridge_mpy.py (MicroPython)
|
|
import network
|
|
import time
|
|
import json
|
|
import base64
|
|
import uos
|
|
import struct
|
|
|
|
# Constants
|
|
DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython
|
|
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
|
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
|
MAX_PAYLOAD_SIZE = 50000 # Hard limit
|
|
|
|
# Note: MicroPython uses list[list] for jsontable
|
|
# No DataFrame support
|
|
|
|
|
|
class NATSBridge:
|
|
"""MicroPython NATS bridge implementation."""
|
|
|
|
def __init__(self, broker_url=None, fileserver_url=None):
|
|
self.broker_url = broker_url or DEFAULT_BROKER_URL
|
|
self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
|
|
self._nats_conn = None
|
|
|
|
def smartsend(self, subject, data, **kwargs):
|
|
"""Send data (synchronous)."""
|
|
correlation_id = self._generate_uuid()
|
|
msg_id = self._generate_uuid()
|
|
sender_id = self._generate_uuid()
|
|
|
|
print(f"[Correlation: {correlation_id}] Starting smartsend")
|
|
|
|
payloads = []
|
|
for dataname, payload_data, payload_type in data:
|
|
payload_bytes = self._serialize_data(payload_data, payload_type)
|
|
payload_size = len(payload_bytes)
|
|
|
|
if payload_size > MAX_PAYLOAD_SIZE:
|
|
raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}")
|
|
|
|
if payload_size < DEFAULT_SIZE_THRESHOLD:
|
|
# Direct path
|
|
payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
|
|
payloads.append({
|
|
'id': self._generate_uuid(),
|
|
'dataname': dataname,
|
|
'payload_type': payload_type,
|
|
'transport': 'direct',
|
|
'encoding': 'base64',
|
|
'size': payload_size,
|
|
'data': payload_b64
|
|
})
|
|
else:
|
|
# Link path (limited support)
|
|
response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes)
|
|
payloads.append({
|
|
'id': self._generate_uuid(),
|
|
'dataname': dataname,
|
|
'payload_type': payload_type,
|
|
'transport': 'link',
|
|
'encoding': 'none',
|
|
'size': payload_size,
|
|
'data': response['url']
|
|
})
|
|
|
|
env = {
|
|
'correlation_id': correlation_id,
|
|
'msg_id': msg_id,
|
|
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()),
|
|
'send_to': subject,
|
|
'msg_purpose': kwargs.get('msg_purpose', 'chat'),
|
|
'sender_name': kwargs.get('sender_name', 'NATSBridge'),
|
|
'sender_id': sender_id,
|
|
'receiver_name': kwargs.get('receiver_name', ''),
|
|
'receiver_id': kwargs.get('receiver_id', ''),
|
|
'reply_to': kwargs.get('reply_to', ''),
|
|
'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''),
|
|
'broker_url': self.broker_url,
|
|
'metadata': {},
|
|
'payloads': payloads
|
|
}
|
|
|
|
env_json_str = json.dumps(env)
|
|
|
|
# Publish
|
|
self._publish(subject, env_json_str, correlation_id)
|
|
|
|
return env, env_json_str
|
|
|
|
def smartreceive(self, msg, **kwargs):
|
|
"""Receive and process message (synchronous)."""
|
|
env_json_obj = json.loads(msg.payload)
|
|
correlation_id = env_json_obj['correlation_id']
|
|
|
|
payloads_list = []
|
|
for payload in env_json_obj['payloads']:
|
|
transport = payload['transport']
|
|
dataname = payload['dataname']
|
|
|
|
if transport == 'direct':
|
|
payload_b64 = payload['data']
|
|
payload_bytes = base64.b64decode(payload_b64)
|
|
data_type = payload['payload_type']
|
|
data = self._deserialize_data(payload_bytes, data_type)
|
|
payloads_list.append((dataname, data, data_type))
|
|
elif transport == 'link':
|
|
url = payload['data']
|
|
downloaded_data = self._sync_fileserver_download(
|
|
url,
|
|
kwargs.get('max_retries', 3),
|
|
kwargs.get('base_delay', 100),
|
|
kwargs.get('max_delay', 1000),
|
|
correlation_id
|
|
)
|
|
data_type = payload['payload_type']
|
|
data = self._deserialize_data(downloaded_data, data_type)
|
|
payloads_list.append((dataname, data, data_type))
|
|
|
|
env_json_obj['payloads'] = payloads_list
|
|
return env_json_obj
|
|
|
|
def _serialize_data(self, data, payload_type):
|
|
"""Serialize data (MicroPython version - no arrowtable support)."""
|
|
if payload_type == 'text':
|
|
return data.encode('utf-8')
|
|
elif payload_type == 'dictionary':
|
|
return json.dumps(data).encode('utf-8')
|
|
elif payload_type == 'jsontable':
|
|
# Serialize list of lists to JSON
|
|
return json.dumps(data).encode('utf-8')
|
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
|
return bytes(data)
|
|
else:
|
|
raise ValueError(f"Unknown payload_type: {payload_type}")
|
|
|
|
def _deserialize_data(self, data, payload_type):
|
|
"""Deserialize data (MicroPython version)."""
|
|
if payload_type == 'text':
|
|
return data.decode('utf-8')
|
|
elif payload_type == 'dictionary':
|
|
return json.loads(data.decode('utf-8'))
|
|
elif payload_type == 'jsontable':
|
|
# Returns list of lists
|
|
return json.loads(data.decode('utf-8'))
|
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
|
return data
|
|
else:
|
|
raise ValueError(f"Unknown payload_type: {payload_type}")
|
|
|
|
def _generate_uuid(self):
|
|
"""Generate simple UUID (MicroPython compatible)."""
|
|
return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % (
|
|
time.time_ns() // (10**6) % 0xFFFFFFFF,
|
|
time.time_ns() % 0xFFFFFFFF,
|
|
time.time_ns() >> 32 & 0xFFFF,
|
|
time.time_ns() >> 48 & 0xFFFF,
|
|
time.time_ns() >> 64 & 0xFFFF,
|
|
time.time_ns() >> 80 & 0xFFFF,
|
|
time.time_ns() >> 96 & 0xFFFF,
|
|
time.time_ns() >> 112 & 0xFFFF
|
|
)
|
|
|
|
def _sync_fileserver_upload(self, url, dataname, data):
|
|
"""Synchronous file upload (limited)."""
|
|
# Simplified implementation for MicroPython
|
|
# In practice, would use network.HTTP or similar
|
|
raise NotImplementedError("File upload not implemented in MicroPython")
|
|
|
|
def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id):
|
|
"""Synchronous file download with backoff."""
|
|
# Simplified implementation for MicroPython
|
|
raise NotImplementedError("File download not implemented in MicroPython")
|
|
|
|
def _publish(self, subject, message, correlation_id):
|
|
"""Publish message to NATS."""
|
|
# Simplified implementation for MicroPython
|
|
raise NotImplementedError("NATS publishing not implemented in MicroPython")
|
|
```
|
|
|
|
---
|
|
|
|
## Configuration
|
|
|
|
### Environment Variables
|
|
|
|
| Variable | Default | Description |
|
|
|----------|---------|-------------|
|
|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
|
|
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
|
|
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) |
|
|
|
|
### MicroPython Configuration
|
|
|
|
```python
|
|
# micropython.conf
|
|
NATS_URL = "nats://broker.local:4222"
|
|
FILESERVER_URL = "http://fileserver.local:8080"
|
|
SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices
|
|
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython
|
|
```
|
|
|
|
---
|
|
|
|
## Performance Considerations
|
|
|
|
### Zero-Copy Reading
|
|
|
|
| Platform | Strategy |
|
|
|----------|----------|
|
|
| **Julia** | `Arrow.read()` with memory-mapped files |
|
|
| **JavaScript** | `ArrayBuffer` with `DataView` |
|
|
| **Python** | `pyarrow` memory mapping |
|
|
| **MicroPython** | Not available (streaming only) |
|
|
|
|
### Exponential Backoff
|
|
|
|
All platforms implement exponential backoff for HTTP downloads:
|
|
|
|
```python
|
|
# Python
|
|
async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
|
|
delay = base_delay
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
return await response.read()
|
|
except Exception as e:
|
|
if attempt < max_retries:
|
|
await asyncio.sleep(delay / 1000.0)
|
|
delay = min(delay * 2, max_delay)
|
|
raise Exception("Failed to fetch after max retries")
|
|
```
|
|
|
|
### Correlation ID Logging
|
|
|
|
All platforms use correlation IDs for distributed tracing:
|
|
|
|
```
|
|
[timestamp] [Correlation: abc123] Message published to subject
|
|
```
|
|
|
|
### Serialization Performance
|
|
|
|
| Format | Use Case | Pros | Cons |
|
|
|--------|----------|------|------|
|
|
| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython |
|
|
| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement |
|
|
|
|
---
|
|
|
|
## Testing
|
|
|
|
### Test File Organization
|
|
|
|
| Platform | Sender Tests | Receiver Tests |
|
|
|----------|--------------|----------------|
|
|
| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` |
|
|
| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` |
|
|
| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` |
|
|
|
|
### Run Tests
|
|
|
|
```bash
|
|
# Julia
|
|
julia test/test_julia_text_sender.jl
|
|
julia test/test_julia_text_receiver.jl
|
|
|
|
# JavaScript (Node.js)
|
|
node test/test_js_text_sender.js
|
|
node test/test_js_text_receiver.js
|
|
|
|
# Python
|
|
python3 test/test_py_text_sender.py
|
|
python3 test/test_py_text_receiver.py
|
|
```
|
|
|
|
---
|
|
|
|
## Troubleshooting
|
|
|
|
### Common Issues
|
|
|
|
1. **NATS Connection Failed**
|
|
- Ensure NATS server is running
|
|
- Check `broker_url` configuration
|
|
|
|
2. **HTTP Upload Failed**
|
|
- Ensure file server is running
|
|
- Check `fileserver_url` configuration
|
|
- Verify upload permissions
|
|
|
|
3. **Arrow IPC Deserialization Error**
|
|
- Ensure data is properly serialized to Arrow format
|
|
- Check Arrow version compatibility
|
|
- MicroPython doesn't support Arrow IPC
|
|
|
|
4. **Memory Constraints (MicroPython)**
|
|
- Reduce `size_threshold`
|
|
- Use direct transport only (< 100KB)
|
|
- Avoid large payloads
|
|
- Use `jsontable` instead of `arrowtable` (arrowtable not supported)
|
|
|
|
---
|
|
|
|
## Summary
|
|
|
|
This cross-platform NATS bridge provides:
|
|
|
|
1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms
|
|
2. **Idiomatic Implementations**:
|
|
- **Julia**: Multiple dispatch, struct-based design, native Arrow IPC
|
|
- **JavaScript**: Async/await, prototype-based utilities, class-based NATS client
|
|
- **Python**: Class-based design with dataclasses, type hints, async/await
|
|
- **MicroPython**: Synchronous API, memory-constrained optimizations
|
|
3. **Message Format Consistency**: Identical JSON schemas across all platforms
|
|
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
|
|
5. **Platform-Specific Optimizations**:
|
|
- **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython)
|
|
- **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms)
|
|
|
|
The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior.
|
|
|
|
### Datatype Summary
|
|
|
|
| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|
|
|----------|---------------|----------|----------|---------------------|
|
|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python |
|
|
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython |
|