# Cross-Platform Implementation Guide: Bi-Directional Data Bridge

## Overview

This document describes the implementation of a high-performance, bi-directional data bridge built on **NATS (Core & JetStream)**. The bridge uses the Claim-Check pattern for large payloads. It is implemented across three platforms with **high-level API parity**, while each implementation stays **idiomatic** for its language.

**Supported Platforms:**

- **Julia** - Ground truth implementation (reference)
- **JavaScript** - Node.js and browser implementation
- **Python/MicroPython** - Desktop and embedded implementation

---

## Implementation Files

| Language | Implementation File | Description |
|----------|---------------------|-------------|
| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support |
| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation |
| **Python** | `src/natsbridge.py` | Desktop Python implementation |
| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) |

---

## File Server Handler Architecture

The system uses **handler functions** to abstract file server operations, so different file server implementations (e.g., Plik, AWS S3, or a custom HTTP server) can be plugged in without changing the bridge.

### Handler Function Signatures

#### Julia

```julia
# Upload handler - uploads data to file server and returns URL
fileserver_upload_handler(
    fileserver_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}

# Download handler - fetches data from file server URL with exponential backoff
fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,   # milliseconds
    max_delay::Int,    # milliseconds
    correlation_id::String
)::Vector{UInt8}
```

#### JavaScript

```javascript
// Upload handler - async function
async function fileserver_upload_handler(
    fileserver_url,
    dataname,
    data // Uint8Array
) {
    // Returns: { status, uploadid, fileid, url }
}

// Download handler - async function
async function fileserver_download_handler(
    url,
    max_retries,
    base_delay,
    max_delay,
    correlation_id
) {
    // Returns: Uint8Array
}
```

#### Python

```python
from typing import Any, Dict


# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.

    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass


# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.

    Returns:
        Downloaded bytes
    """
    pass
```

#### MicroPython

```python
# Upload handler - synchronous (the MicroPython port does not use async handlers)
def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytearray
) -> dict:
    """
    Upload data to file server (synchronous).

    Returns:
        Dict with keys: 'status', 'url'
    """
    pass


# Download handler - synchronous
def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytearray:
    """
    Download data from URL with exponential backoff (synchronous).

    Returns:
        Downloaded bytes
    """
    pass
```


---

## Multi-Payload Support (Standard API)

The system uses a **standardized list-of-tuples format** for all payload operations across all platforms. In JavaScript, which has no tuple type, each tuple is a three-element array.

### API Standard

```
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
{
    "correlation_id": "...",
    "msg_id": "...",
    "timestamp": "...",
    "send_to": "...",
    "msg_purpose": "...",
    "sender_name": "...",
    "sender_id": "...",
    "receiver_name": "...",
    "receiver_id": "...",
    "reply_to": "...",
    "reply_to_msg_id": "...",
    "broker_url": "...",
    "metadata": {...},
    "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```

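
As a rough illustration of the input format, the sketch below checks that a payload list really is a list of `(dataname, data, type)` triples before it would be handed to `smartsend`. The helper `validate_payloads` and the `SUPPORTED_TYPES` set are hypothetical and only mirror the type names used in this guide; the actual libraries may validate differently or not at all.

```python
from typing import Any, List, Tuple

# Type names used throughout this guide (assumed to be the complete set).
SUPPORTED_TYPES = {
    "text", "dictionary", "arrowtable", "jsontable",
    "image", "audio", "video", "binary",
}

Payload = Tuple[str, Any, str]


def validate_payloads(payloads: List[Payload]) -> List[Payload]:
    """Return the payloads as tuples, or raise if an entry is malformed."""
    checked: List[Payload] = []
    for index, entry in enumerate(payloads):
        if not isinstance(entry, (tuple, list)) or len(entry) != 3:
            raise ValueError(f"payload {index} must be a (dataname, data, type) triple")
        dataname, data, payload_type = entry
        if not isinstance(dataname, str):
            raise TypeError(f"payload {index}: dataname must be a string")
        if payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"payload {index}: unknown type {payload_type!r}")
        checked.append((dataname, data, payload_type))
    return checked


# A single payload is still wrapped in a list.
single = validate_payloads([("message_text", "Hello!", "text")])
```

Running a check like this on the sending side surfaces a malformed entry before any serialization or upload has happened.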
### Supported Types

| Type | Julia | JavaScript | Python | MicroPython |
|------|-------|------------|--------|-------------|
| `text` | `String` | `string` | `str` | `str` |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` |
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) |
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` | `list` |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` |

### Cross-Platform Examples

#### Julia

```julia
using NATSBridge

# Single payload - still wrapped in a list
env, env_json_str = smartsend(
    "/test",
    [("dataname1", data1, "dictionary")],
    broker_url="nats://localhost:4222",
    fileserver_upload_handler=plik_oneshot_upload
)

# Multiple payloads with different types
env, env_json_str = smartsend(
    "/test",
    [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
    broker_url="nats://localhost:4222"
)

# Mixed content (chat with text, image, audio)
env, env_json_str = smartsend(
    "/chat",
    [
        ("message_text", "Hello!", "text"),
        ("user_image", image_data, "image"),
        ("audio_clip", audio_data, "audio")
    ],
    broker_url="nats://localhost:4222"
)

# Receive returns a JSON.Object{String, Any} envelope
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
# Access payloads: env["payloads"] which is a Vector of tuples
for (dataname, data, type) in env["payloads"]
    println("$dataname: $data (type: $type)")
end
```

#### JavaScript

```javascript
const NATSBridge = require('natsbridge');

// Declared once so the three examples below can share one scope
let env, env_json_str;

// Single payload
[env, env_json_str] = await NATSBridge.smartsend(
    "/test",
    [["dataname1", data1, "dictionary"]],
    {
        broker_url: "nats://localhost:4222",
        fileserver_upload_handler: plikOneshotUpload
    }
);

// Multiple payloads
[env, env_json_str] = await NATSBridge.smartsend(
    "/test",
    [
        ["dataname1", data1, "dictionary"],
        ["dataname2", data2, "arrowtable"]
    ],
    { broker_url: "nats://localhost:4222" }
);

// Mixed content
[env, env_json_str] = await NATSBridge.smartsend(
    "/chat",
    [
        ["message_text", "Hello!", "text"],
        ["user_image", imageData, "image"],
        ["audio_clip", audioData, "audio"]
    ],
    { broker_url: "nats://localhost:4222" }
);

// Receive
const received = await NATSBridge.smartreceive(msg, {
    fileserver_download_handler: fetchWithBackoff
});
// received is an object with "payloads" field containing Array of arrays
// Access payloads: received.payloads which is an Array of [dataname, data, type] arrays
for (const [dataname, data, type] of received.payloads) {
    console.log(`${dataname}: ${data} (type: ${type})`);
}
```

#### Python

```python
from natsbridge import NATSBridge

# Single payload
env, env_json_str = await NATSBridge.smartsend(
    "/test",
    [("dataname1", data1, "dictionary")],
    broker_url="nats://localhost:4222",
    fileserver_upload_handler=plik_oneshot_upload
)

# Multiple payloads
env, env_json_str = await NATSBridge.smartsend(
    "/test",
    [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
    broker_url="nats://localhost:4222"
)

# Mixed content
env, env_json_str = await NATSBridge.smartsend(
    "/chat",
    [
        ("message_text", "Hello!", "text"),
        ("user_image", image_data, "image"),
        ("audio_clip", audio_data, "audio")
    ],
    broker_url="nats://localhost:4222"
)

# Receive
env = await NATSBridge.smartreceive(
    msg,
    fileserver_download_handler=fetch_with_backoff
)
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
# Access payloads: env["payloads"] which is a list of tuples
for dataname, data, type_ in env["payloads"]:
    print(f"{dataname}: {data} (type: {type_})")
```

#### MicroPython

```python
from natsbridge import NATSBridge

# Limited to text and binary (no tables due to memory constraints)
env, env_json_str = NATSBridge.smartsend(
    "/chat",
    [
        ("message_text", "Hello!", "text"),
        ("binary_data", data_bytes, "binary")
    ],
    broker_url="nats://localhost:4222",
    size_threshold=100000  # Lower threshold for memory constraints
)
# Note: MicroPython uses synchronous handlers
```

---

## Row-Oriented vs Column-Oriented Data Structures

The platforms represent tabular data differently. These differences matter for serialization and deserialization of the `jsontable` and `arrowtable` types.

### Data Structure Comparison

| Platform | Table Structure | Orientation |
|----------|-----------------|-------------|
| **Julia (DataFrame)** | `Dict{String, Vector}` | Column-oriented |
| **Python (pandas)** | `dict[str, list]` | Column-oriented |
| **JavaScript** | `Array<Object>` | Row-oriented |
| **MicroPython** | `list[list]` | Row-oriented |

### Column-Oriented (Julia DataFrame, Python pandas)

In column-oriented structures, each column is stored as a separate array/vector:

**Julia Example:**
```julia
using DataFrames

# Create dictionary with column vectors
dict = Dict("customer age" => [15, 20, 25],
            "first name" => ["Rohit", "Rahul", "Akshat"])

# Convert to DataFrame
df = DataFrame(dict)
println(df)
# Output:
# 3×2 DataFrame
#  Row ┆ customer age ┆ first name
#      ┆ Int64        ┆ String
# ─────┼──────────────┼────────────
#    1 ┆ 15           ┆ "Rohit"
#    2 ┆ 20           ┆ "Rahul"
#    3 ┆ 25           ┆ "Akshat"
```

**Python Example:**
```python
import pandas as pd

# Create dictionary with column lists
data = {
    "Name": ["Alice", "Bob", "Charlie"],
    "Age": [25, 30, 35],
    "Score": [88.5, 92.0, 79.5]
}

# Convert to DataFrame
df = pd.DataFrame(data)
print(df)
# Output:
#       Name  Age  Score
# 0    Alice   25   88.5
# 1      Bob   30   92.0
# 2  Charlie   35   79.5
```

### Row-Oriented (JavaScript, MicroPython)

In row-oriented structures, each row is stored as a separate object/array:

**JavaScript Example:**
```javascript
// Array of objects (row-oriented)
const users = [
    { Name: "Alice", Age: 25, Score: 88.5 },
    { Name: "Bob", Age: 30, Score: 92.0 },
    { Name: "Charlie", Age: 35, Score: 79.5 }
];
```

**MicroPython Example:**
```python
# List of lists (row-oriented)
users = [
    ["Alice", 25, 88.5],
    ["Bob", 30, 92.0],
    ["Charlie", 35, 79.5]
]
```

### Cross-Platform Conversion for jsontable

When a `jsontable` payload crosses platforms, the system converts automatically between row-oriented and column-oriented formats:

**Sending from Julia/Python (column-oriented) to JS/MicroPython (row-oriented):**

1. Convert column-oriented dict to row-oriented array of objects
2. Serialize to JSON
3. Send with `payload_type = "jsontable"`

**Receiving from JS/MicroPython (row-oriented) to Julia/Python (column-oriented):**

1. Deserialize JSON to row-oriented array of objects
2. Convert to column-oriented dict
3. Create DataFrame from column-oriented dict

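
The two conversion steps above can be sketched in plain Python. The helper names `columns_to_rows` and `rows_to_columns` are hypothetical and are not taken from the NATSBridge source; the sketch also assumes that a key missing from a row should become `None` in its column.

```python
import json
from typing import Any, Dict, List


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Column-oriented dict -> row-oriented list of objects."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same length")
    n_rows = lengths.pop() if lengths else 0
    return [{name: values[i] for name, values in columns.items()} for i in range(n_rows)]


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Row-oriented list of objects -> column-oriented dict."""
    columns: Dict[str, List[Any]] = {}
    # First pass: collect every column name in order of first appearance.
    for row in rows:
        for name in row:
            columns.setdefault(name, [])
    # Second pass: fill the columns; keys missing from a row become None.
    for row in rows:
        for name, values in columns.items():
            values.append(row.get(name))
    return columns


table = {"Name": ["Alice", "Bob"], "Age": [25, 30]}
wire_json = json.dumps(columns_to_rows(table))      # what travels as "jsontable"
restored = rows_to_columns(json.loads(wire_json))   # back to column-oriented
```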

**Example: Julia to JavaScript**
```julia
# Julia side - column-oriented DataFrame
df = DataFrame(
    "Name" => ["Alice", "Bob", "Charlie"],
    "Age" => [25, 30, 35],
    "Score" => [88.5, 92.0, 79.5]
)

# smartsend automatically converts to row-oriented JSON
env, env_json_str = smartsend(
    "/data",
    [("users", df, "jsontable")]
)
# JSON sent: [{"Name":"Alice","Age":25,"Score":88.5}, ...]
```

```javascript
// JavaScript side - receives row-oriented array
const env = await NATSBridge.smartreceive(msg, {
    fileserver_download_handler: fetchWithBackoff
});
for (const [dataname, data, type] of env.payloads) {
    if (type === "jsontable") {
        // data is already row-oriented: [{Name: "Alice", Age: 25, ...}, ...]
        console.log(data);
    }
}
```

**Example: JavaScript to Julia**
```javascript
// JavaScript side - row-oriented array
const users = [
    { Name: "Alice", Age: 25, Score: 88.5 },
    { Name: "Bob", Age: 30, Score: 92.0 }
];

const [env, env_json_str] = await NATSBridge.smartsend(
    "/data",
    [["users", users, "jsontable"]]
);
```

```julia
# Julia side - receives and converts to column-oriented DataFrame
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# The jsontable is automatically converted to DataFrame
for (dataname, data, type) in env["payloads"]
    if type == "jsontable"
        # data is now a DataFrame with column-oriented structure
        println(data)
        # Output:
        # 2×3 DataFrame
        #  Row ┆ Name   ┆ Age   ┆ Score
        #      ┆ String ┆ Int64 ┆ Float64
        # ─────┼────────┼───────┼────────
        #    1 ┆ Alice  ┆ 25    ┆ 88.5
        #    2 ┆ Bob    ┆ 30    ┆ 92.0
    end
end
```

---

## Architecture

### Cross-Platform Claim-Check Pattern

```mermaid
flowchart TD
    A[SmartSend Function] --> B{Is payload size < 1MB?}
    B -->|Yes| C[Direct Path<br/><small>< 1MB</small>]
    B -->|No| D[Link Path<br/><small>>= 1MB</small>]

    C --> C1[Serialize to Buffer]
    C1 --> C2[Base64/JSON encode]
    C2 --> C3[Publish to NATS]

    D --> D1[Serialize to Buffer]
    D1 --> D2[Upload to HTTP Server]
    D2 --> D3[Publish to NATS with URL]

    style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
    style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px
    style C fill:#e8f5e9,stroke:#008000,stroke-width:2px
    style D fill:#e8f5e9,stroke:#008000,stroke-width:2px
    style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px
    style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px
    style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px
    style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px
    style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px
    style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px
```

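**Claim-Check Pattern Overview:**

- **Direct Path** (< 1 MB): The payload is serialized, Base64-encoded, and published directly to NATS.
- **Link Path** (≥ 1 MB): The payload is serialized and uploaded to an HTTP file server; only the URL is published to NATS (the claim check).

The cutoff is the `size_threshold` argument of `smartsend`, which defaults to 1,000,000 bytes and is compared against the serialized size before Base64 encoding.
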
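
The direct/link decision can be sketched as follows. This is a simplified illustration rather than the library's code: `build_payload_entry` and the `upload` callable are hypothetical, and the returned field names simply mirror the `msg_payload_v1` fields shown in the Julia implementation.

```python
import base64
from typing import Any, Callable, Dict

DEFAULT_SIZE_THRESHOLD = 1_000_000  # bytes, same default as the guide


def build_payload_entry(
    dataname: str,
    payload_bytes: bytes,
    payload_type: str,
    upload: Callable[[str, bytes], str],  # hypothetical: stores bytes, returns a URL
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
) -> Dict[str, Any]:
    """Pick the direct or link transport from the serialized size."""
    size = len(payload_bytes)
    entry: Dict[str, Any] = {
        "dataname": dataname,
        "payload_type": payload_type,
        "size": size,
    }
    if size < size_threshold:
        # Direct path: the bytes travel inside the NATS message as Base64 text.
        entry.update(
            transport="direct",
            encoding="base64",
            data=base64.b64encode(payload_bytes).decode("ascii"),
        )
    else:
        # Link path: only the claim check (the URL) travels over NATS.
        entry.update(transport="link", encoding="none", data=upload(dataname, payload_bytes))
    return entry


def fake_upload(name: str, data: bytes) -> str:
    # Stand-in for a real upload handler; nothing is actually stored.
    return f"http://localhost:8080/file/{name}"


small = build_payload_entry("note", b"hi", "text", fake_upload)
large = build_payload_entry("blob", b"12345678", "binary", fake_upload, size_threshold=4)
```

Because Base64 grows the data by roughly a third, a payload just under the threshold still produces a message larger than the threshold, which is worth keeping in mind when tuning `size_threshold` against the NATS server's maximum message size.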
### smartsend Return Value

All platforms return a tuple/array containing both the envelope and JSON string:

#### Julia

```julia
env, env_json_str = smartsend(...)
# Returns: ::Tuple{msg_envelope_v1, String}
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```

#### JavaScript

```javascript
const [env, env_json_str] = await smartsend(...);
// Returns: Promise<[env, env_json_str]>
// env: Object with all metadata and payloads
// env_json_str: String for publishing to NATS
```

#### Python

```python
env, env_json_str = await smartsend(...)
# Returns: Tuple[Dict, str]
# env: Dict with all metadata and payloads
# env_json_str: String for publishing to NATS
```

#### MicroPython

```python
env, env_json_str = NATSBridge.smartsend(...)
# Returns: Tuple[Dict, str]
# Note: MicroPython returns plain dicts (no structured envelope object)
```

---

## Installation

### Julia Dependencies

```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON")        # the code in this guide calls JSON.parse / JSON.json
Pkg.add("HTTP")
Pkg.add("DataFrames")  # needed for the DataFrame examples
Pkg.add("UUIDs")       # standard library
Pkg.add("Dates")       # standard library
```

### JavaScript Dependencies (Node.js)

```bash
npm install nats apache-arrow node-fetch
# or
yarn add nats apache-arrow node-fetch
```

**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed.

### JavaScript Dependencies (Browser)

```bash
npm install nats apache-arrow
# or use CDN:
# https://unpkg.com/nats-js/dist/bundle/nats.min.js
# https://unpkg.com/apache-arrow/arrow.min.js
```

**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative such as the `uuidv4` package.

### Python Dependencies (Desktop)

```bash
pip install nats-py aiohttp pyarrow pandas python-dateutil
```

### MicroPython Dependencies

MicroPython uses built-in modules:

- `network` - NATS connection (custom implementation)
- `time` - Timestamps
- `uos` - File operations
- `base64` - Base64 encoding
- `json` - JSON parsing
- `struct` - Binary data handling

---

## Usage Tutorial

### Step 1: Start NATS Server

```bash
docker run -p 4222:4222 nats:latest
```

### Step 2: Start HTTP File Server (optional)

```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver

# The link path needs a file server that accepts uploads (for example, Plik).
# Python's built-in server only serves files over GET and rejects POST uploads,
# so it is only enough for testing the download side:
python3 -m http.server 8080 --directory /tmp/fileserver
```

### Step 3: Run Test Scenarios

```bash
# Julia tests
julia test/test_julia_to_julia_text_sender.jl
julia test/test_julia_to_julia_text_receiver.jl

# JavaScript tests (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Python tests
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py
```

---

## Platform-Specific Implementations

### Julia Implementation

#### Module Structure

```julia
module NATSBridge
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64

# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB
const DEFAULT_BROKER_URL = "nats://localhost:4222"
const DEFAULT_FILESERVER_URL = "http://localhost:8080"

# Structs
struct msg_payload_v1
    id::String
    dataname::String
    payload_type::String
    transport::String
    encoding::String
    size::Integer
    data::Any
    metadata::Dict{String, Any}
end

struct msg_envelope_v1
    correlation_id::String
    msg_id::String
    timestamp::String
    send_to::String
    msg_purpose::String
    sender_name::String
    sender_id::String
    receiver_name::String
    receiver_id::String
    reply_to::String
    reply_to_msg_id::String
    broker_url::String
    metadata::Dict{String, Any}
    payloads::Vector{msg_payload_v1}
end

# Main functions
function smartsend(...) end
function smartreceive(...) end

# Utility functions
function _serialize_data(...) end
function _deserialize_data(...) end
function envelope_to_json(...) end
function log_trace(...) end

# File server handlers
function plik_oneshot_upload(...) end
function _fetch_with_backoff(...) end
function publish_message(...) end

# Internal helpers
function _get_payload_bytes(...) end
end
```

#### Multiple Dispatch Pattern

Julia uses multiple dispatch to select type-specific implementations:

```julia
# publish_message has two methods, selected by the type of the first argument
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    conn = NATS.connect(broker_url)
    publish_message(conn, subject, message, correlation_id)
end

function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
    try
        NATS.publish(conn, subject, message)
        log_trace(correlation_id, "Message published to $subject")
    finally
        # Always drain the connection, even if publish throws
        NATS.drain(conn)
    end
end

# Type-specific serialization
function _serialize_data(data::String, payload_type::String)
    # Text handling
    return Vector{UInt8}(data)
end

function _serialize_data(data::Dict, payload_type::String)
    # Dictionary handling
    json_str = JSON.json(data)
    return Vector{UInt8}(json_str)
end

function _serialize_data(data::DataFrame, payload_type::String)
    # Table handling - arrowtable
    io = IOBuffer()
    Arrow.write(io, data)
    return take!(io)
end
```

#### smartsend Implementation

```julia
function smartsend(
    subject::String,
    data::AbstractArray{Tuple{String, T1, String}, 1};
    broker_url::String = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler::Function = plik_oneshot_upload,
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::String = string(uuid4()),
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = "",
    is_publish::Bool = true,
    NATS_connection::Union{NATS.Connection, Nothing} = nothing,
    msg_id::String = string(uuid4()),
    sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String} where {T1<:Any}

    log_trace(correlation_id, "Starting smartsend for subject: $subject")

    # Process each payload in the list
    payloads = msg_payload_v1[]
    for (dataname, payload_data, payload_type) in data
        # Serialize data based on type
        payload_bytes = _serialize_data(payload_data, payload_type)

        payload_size = length(payload_bytes)
        log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes")

        # Decision: Direct vs Link
        if payload_size < size_threshold
            # Direct path - Base64 encode and send via NATS
            payload_b64 = Base64.base64encode(payload_bytes)
            log_trace(correlation_id, "Using direct transport for $payload_size bytes")

            payload = msg_payload_v1(
                payload_b64,
                payload_type;
                id = string(uuid4()),
                dataname = dataname,
                transport = "direct",
                encoding = "base64",
                size = payload_size,
                metadata = Dict{String, Any}("payload_bytes" => payload_size)
            )
            push!(payloads, payload)
        else
            # Link path - Upload to HTTP server, send URL via NATS
            log_trace(correlation_id, "Using link transport, uploading to fileserver")

            response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

            if response["status"] != 200
                error("Failed to upload data to fileserver: $(response["status"])")
            end

            url = response["url"]
            log_trace(correlation_id, "Uploaded to URL: $url")

            payload = msg_payload_v1(
                url,
                payload_type;
                id = string(uuid4()),
                dataname = dataname,
                transport = "link",
                encoding = "none",
                size = payload_size,
                metadata = Dict{String, Any}()
            )
            push!(payloads, payload)
        end
    end

    # Create msg_envelope_v1 with all payloads
    # Note: First positional argument is "send_to" (the NATS subject), not "subject"
    env = msg_envelope_v1(
        subject,  # send_to: NATS subject to publish to
        payloads;
        correlation_id = correlation_id,
        msg_id = msg_id,
        msg_purpose = msg_purpose,
        sender_name = sender_name,
        sender_id = sender_id,
        receiver_name = receiver_name,
        receiver_id = receiver_id,
        reply_to = reply_to,
        reply_to_msg_id = reply_to_msg_id,
        broker_url = broker_url,
        metadata = Dict{String, Any}(),
    )

    env_json_str = envelope_to_json(env)

    # Publish unless the caller only wants the envelope (is_publish = false)
    if is_publish
        if NATS_connection === nothing
            publish_message(broker_url, subject, env_json_str, correlation_id)
        else
            publish_message(NATS_connection, subject, env_json_str, correlation_id)
        end
    end

    return (env, env_json_str)
end
```

#### smartreceive Implementation

```julia
function smartreceive(
    msg::NATS.Msg;
    fileserver_download_handler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)::JSON.Object{String, Any}
    # Parse the JSON envelope
    env_json_obj = JSON.parse(String(msg.payload))
    log_trace(env_json_obj["correlation_id"], "Processing received message")

    # Process all payloads in the envelope
    payloads_list = Tuple{String, Any, String}[]

    num_payloads = length(env_json_obj["payloads"])

    for i in 1:num_payloads
        payload = env_json_obj["payloads"][i]
        transport = String(payload["transport"])
        dataname = String(payload["dataname"])

        if transport == "direct"
            log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'")

            # Extract base64 payload from the payload
            payload_b64 = String(payload["data"])

            # Decode Base64 payload
            payload_bytes = Base64.base64decode(payload_b64)

            # Deserialize based on type
            data_type = String(payload["payload_type"])
            data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])

            push!(payloads_list, (dataname, data, data_type))
        elseif transport == "link"
            # Extract download URL from the payload
            url = String(payload["data"])
            log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url")

            # Fetch with exponential backoff using the download handler
            downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])

            # Deserialize based on type
            data_type = String(payload["payload_type"])
            data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])

            push!(payloads_list, (dataname, data, data_type))
        else
            error("Unknown transport type for payload '$dataname': $(transport)")
        end
    end
    env_json_obj["payloads"] = payloads_list
    return env_json_obj
end
```

#### _serialize_data Implementation

```julia
function _serialize_data(data::Any, payload_type::String)
    if payload_type == "text"
        if isa(data, String)
            data_bytes = Vector{UInt8}(data)
            return data_bytes
        else
            error("Text data must be a String")
        end
    elseif payload_type == "dictionary"
        json_str = JSON.json(data)
        json_str_bytes = Vector{UInt8}(json_str)
        return json_str_bytes
    elseif payload_type == "arrowtable"
        # Serialize DataFrame to Arrow IPC format
        io = IOBuffer()
        Arrow.write(io, data)
        return take!(io)
    elseif payload_type == "jsontable"
        # Convert column-oriented to row-oriented JSON
        # data is Vector{NamedTuple} or Vector{Dict}
        json_str = JSON.json(data)
        return Vector{UInt8}(json_str)
    elseif payload_type == "image"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Image data must be Vector{UInt8}")
        end
    elseif payload_type == "audio"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Audio data must be Vector{UInt8}")
        end
    elseif payload_type == "video"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Video data must be Vector{UInt8}")
        end
    elseif payload_type == "binary"
        if isa(data, IOBuffer)
            return take!(data)
        elseif isa(data, Vector{UInt8})
            return data
        else
            error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
        end
    else
        error("Unknown payload_type: $payload_type")
    end
end
```

#### _deserialize_data Implementation

```julia
function _deserialize_data(
    data::Vector{UInt8},
    payload_type::String,
    correlation_id::String
)
    if payload_type == "text"
        return String(data)
    elseif payload_type == "dictionary"
        json_str = String(data)
        return JSON.parse(json_str)
    elseif payload_type == "arrowtable"
        # Deserialize from Arrow IPC format
        io = IOBuffer(data)
        arrow_table = Arrow.Table(io)
        return arrow_table
    elseif payload_type == "jsontable"
        # Deserialize from JSON format
        # JSON.parse yields the row-oriented array as parsed JSON values (one object per row)
        json_str = String(data)
        parsed = JSON.parse(json_str)
        return parsed
    elseif payload_type == "image"
        return data
    elseif payload_type == "audio"
        return data
    elseif payload_type == "video"
        return data
    elseif payload_type == "binary"
        return data
    else
        error("Unknown payload_type: $payload_type")
    end
end
```

#### _fetch_with_backoff Implementation

```julia
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)
    delay = base_delay
    for attempt in 1:max_retries
        try
            response = HTTP.request("GET", url)
            if response.status == 200
                log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
                return response.body
            else
                error("Failed to fetch: $(response.status)")
            end
        catch e
            log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))")

            if attempt < max_retries
                sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
            end
        end
    end

    error("Failed to fetch data after $max_retries attempts")
end
```

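
To see the retry schedule this loop produces, the sketch below reproduces only the delay arithmetic in Python. The helper `backoff_delays` is hypothetical and exists purely for illustration; delays are in milliseconds, matching the `sleep(delay / 1000.0)` call above.

```python
from typing import List


def backoff_delays(max_retries: int, base_delay: int, max_delay: int) -> List[int]:
    """Delays (ms) slept between attempts: doubled each time, capped at max_delay."""
    delays: List[int] = []
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        if attempt < max_retries:  # no sleep after the final attempt
            delays.append(delay)
            delay = min(delay * 2, max_delay)
    return delays


# With the smartreceive defaults (5 retries, 100 ms base, 5000 ms cap):
print(backoff_delays(5, 100, 5000))  # [100, 200, 400, 800]
```

With the defaults, a download that fails on every attempt therefore spends 1.5 seconds sleeping in total before the final error is raised; the 5000 ms cap only comes into play with more retries or a larger base delay.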
#### plik_oneshot_upload Implementation
|
||
|
||
```julia
|
||
function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
|
||
# Get upload id
|
||
url_getUploadID = "$file_server_url/upload"
|
||
headers = ["Content-Type" => "application/json"]
|
||
body = """{ "OneShot" : true }"""
|
||
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||
response_json = JSON.parse(http_response.body)
|
||
uploadid = response_json["id"]
|
||
uploadtoken = response_json["uploadToken"]
|
||
|
||
# Upload file
|
||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||
url_upload = "$file_server_url/file/$uploadid"
|
||
headers = ["X-UploadToken" => uploadtoken]
|
||
|
||
form = HTTP.Form(Dict(
|
||
"file" => file_multipart
|
||
))
|
||
|
||
http_response = nothing
|
||
try
|
||
http_response = HTTP.post(url_upload, headers, form)
|
||
catch e
|
||
@error "Request failed" exception=e
|
||
end
|
||
response_json = JSON.parse(http_response.body)
|
||
fileid = response_json["id"]
|
||
|
||
url = "$file_server_url/file/$uploadid/$fileid/$dataname"
|
||
|
||
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||
end
|
||
```
|
||

---

### JavaScript Implementation

#### Module Structure

```javascript
// natsbridge.js
const nats = require('nats');
const crypto = require('crypto');
const fetch = require('node-fetch');

// UUID generation using built-in crypto module
const uuidv4 = () => crypto.randomUUID();

const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

class NATSClient {
    constructor(url) {
        this.url = url;
        this.connection = null;
    }

    async connect() {
        this.connection = await nats.connect({ servers: this.url });
        return this.connection;
    }

    async publish(subject, message) {
        // Connect lazily on first publish
        if (!this.connection) {
            await this.connect();
        }
        await this.connection.publish(subject, message);
    }

    async close() {
        if (this.connection) {
            await this.connection.close();
        }
    }
}

async function smartsend(subject, data, options = {}) {
    // Implementation
}

async function smartreceive(msg, options = {}) {
    // Implementation
}

// plikOneshotUpload and fetchWithBackoff are defined in the sections below
module.exports = {
    NATSClient,
    smartsend,
    smartreceive,
    plikOneshotUpload,
    fetchWithBackoff
};
```

#### smartsend Implementation

```javascript
const nats = require('nats');
const crypto = require('crypto');
const fetch = require('node-fetch');
const arrow = require('apache-arrow');

// UUID generation using built-in crypto module
const uuidv4 = () => crypto.randomUUID();

const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

async function smartsend(subject, data, options = {}) {
    const {
        broker_url = DEFAULT_BROKER_URL,
        fileserver_url = DEFAULT_FILESERVER_URL,
        fileserver_upload_handler = plikOneshotUpload,
        size_threshold = DEFAULT_SIZE_THRESHOLD,
        correlation_id = uuidv4(),
        msg_purpose = 'chat',
        sender_name = 'NATSBridge',
        receiver_name = '',
        receiver_id = '',
        reply_to = '',
        reply_to_msg_id = '',
        is_publish = true,
        nats_connection = null,
        msg_id = uuidv4(),
        sender_id = uuidv4()
    } = options;

    console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`);

    // Process payloads
    const payloads = [];
    for (const [dataname, payloadData, payloadType] of data) {
        const payloadBytes = await serializeData(payloadData, payloadType);
        const payloadSize = payloadBytes.byteLength;

        console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

        if (payloadSize < size_threshold) {
            // Direct path
            const payloadB64 = bufferToBase64(payloadBytes);
            console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`);

            payloads.push({
                id: uuidv4(),
                dataname,
                payload_type: payloadType,
                transport: 'direct',
                encoding: 'base64',
                size: payloadSize,
                data: payloadB64,
                metadata: { payload_bytes: payloadSize }
            });
        } else {
            // Link path
            console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`);

            const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);

            if (response.status !== 200) {
                throw new Error(`Failed to upload data to fileserver: ${response.status}`);
            }

            console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`);

            payloads.push({
                id: uuidv4(),
                dataname,
                payload_type: payloadType,
                transport: 'link',
                encoding: 'none',
                size: payloadSize,
                data: response.url,
                metadata: {}
            });
        }
    }

    // Build envelope
    const env = {
        correlation_id,
        msg_id,
        timestamp: new Date().toISOString(),
        send_to: subject,
        msg_purpose,
        sender_name,
        sender_id,
        receiver_name,
        receiver_id,
        reply_to,
        reply_to_msg_id,
        broker_url,
        metadata: {},
        payloads
    };

    const env_json_str = JSON.stringify(env);

    if (is_publish) {
        if (nats_connection) {
            await publishMessage(nats_connection, subject, env_json_str, correlation_id);
        } else {
            await publishMessage(broker_url, subject, env_json_str, correlation_id);
        }
    }

    return [env, env_json_str];
}
```

#### serializeData Implementation

```javascript
const arrow = require('apache-arrow');

async function serializeData(data, payload_type) {
    if (payload_type === 'text') {
        if (typeof data === 'string') {
            return Buffer.from(data, 'utf8');
        } else {
            throw new Error('Text data must be a string');
        }
    } else if (payload_type === 'dictionary') {
        const jsonStr = JSON.stringify(data);
        return Buffer.from(jsonStr, 'utf8');
    } else if (payload_type === 'arrowtable') {
        // Convert Array<Object> to Arrow IPC
        // data is row-oriented: [{id: 1, name: "Alice"}, ...]
        if (!Array.isArray(data) || data.length === 0) {
            throw new Error('arrowtable data must be a non-empty array of objects');
        }

        // Infer the schema from the row objects and build a table
        const table = arrow.tableFromJSON(data);

        // Encode as Arrow IPC (file format) and return the bytes as a Buffer
        return Buffer.from(arrow.tableToIPC(table, 'file'));
    } else if (payload_type === 'jsontable') {
        // data is already row-oriented Array<Object>
        // Serialize directly to JSON
        const jsonStr = JSON.stringify(data);
        return Buffer.from(jsonStr, 'utf8');
    } else if (payload_type === 'image') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Image data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'audio') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Audio data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'video') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Video data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'binary') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Binary data must be Uint8Array or Buffer');
        }
    } else {
        throw new Error(`Unknown payload_type: ${payload_type}`);
    }
}

function bufferToBase64(buffer) {
    return buffer.toString('base64');
}
```

#### deserializeData Implementation

```javascript
const arrow = require('apache-arrow');

async function deserializeData(data, payload_type, correlation_id) {
    if (payload_type === 'text') {
        return Buffer.from(data).toString('utf8');
    } else if (payload_type === 'dictionary') {
        const jsonStr = Buffer.from(data).toString('utf8');
        return JSON.parse(jsonStr);
    } else if (payload_type === 'arrowtable') {
        // Deserialize from Arrow IPC
        const buffer = Buffer.from(data);
        const table = arrow.tableFromIPC(buffer);
        return table;
    } else if (payload_type === 'jsontable') {
        // Deserialize from JSON - returns Array<Object> (row-oriented)
        const jsonStr = Buffer.from(data).toString('utf8');
        return JSON.parse(jsonStr);
    } else if (payload_type === 'image') {
        return Buffer.from(data);
    } else if (payload_type === 'audio') {
        return Buffer.from(data);
    } else if (payload_type === 'video') {
        return Buffer.from(data);
    } else if (payload_type === 'binary') {
        return Buffer.from(data);
    } else {
        throw new Error(`Unknown payload_type: ${payload_type}`);
    }
}
```

#### fetchWithBackoff Implementation

```javascript
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
    let delay = base_delay;

    for (let attempt = 1; attempt <= max_retries; attempt++) {
        try {
            const response = await fetch(url);

            if (response.status === 200) {
                console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`);
                return await response.arrayBuffer();
            } else {
                throw new Error(`Failed to fetch: ${response.status}`);
            }
        } catch (e) {
            console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`);

            if (attempt < max_retries) {
                await new Promise(resolve => setTimeout(resolve, delay));
                delay = Math.min(delay * 2, max_delay);
            }
        }
    }

    throw new Error(`Failed to fetch data after ${max_retries} attempts`);
}
```

#### plikOneshotUpload Implementation

```javascript
async function plikOneshotUpload(file_server_url, dataname, data) {
    // Get upload id
    const url_getUploadID = `${file_server_url}/upload`;
    const headers = { 'Content-Type': 'application/json' };
    const body = JSON.stringify({ OneShot: true });

    const http_response = await fetch(url_getUploadID, {
        method: 'POST',
        headers,
        body
    });

    const response_json = await http_response.json();
    const uploadid = response_json.id;
    const uploadtoken = response_json.uploadToken;

    // Upload file
    const url_upload = `${file_server_url}/file/${uploadid}`;
    const form = new FormData();
    const blob = new Blob([data]);
    form.append('file', blob, dataname);

    const upload_headers = {
        'X-UploadToken': uploadtoken
    };

    const upload_response = await fetch(url_upload, {
        method: 'POST',
        headers: upload_headers,
        body: form
    });

    const upload_json = await upload_response.json();
    const fileid = upload_json.id;

    const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`;

    return {
        status: upload_response.status,
        uploadid,
        fileid,
        url
    };
}
```

---

### Python Implementation

#### Module Structure

```python
# natsbridge.py
import asyncio
import base64
import json
import uuid
import time
from typing import Any, Dict, List, Tuple, Union, Callable
from dataclasses import dataclass, field
from datetime import datetime

try:
    import pyarrow as arrow
    import pyarrow.parquet as pq
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False

try:
    import aiohttp
    import nats
    from nats.aio.client import Client as NATSClient
    NATS_AVAILABLE = True
except ImportError:
    NATS_AVAILABLE = False


DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"


@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)


class NATSBridge:
    """Cross-platform NATS bridge implementation."""

    def __init__(self, broker_url: str = None, fileserver_url: str = None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_client: NATSClient = None

    async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]:
        """Send data via NATS."""
        pass

    async def smartreceive(self, msg: Any, **kwargs) -> Dict:
        """Receive and process NATS message."""
        pass
```

#### smartsend Implementation

```python
import asyncio
import base64
import json
import uuid
from typing import Any, Dict, List, Tuple, Union, Callable
from datetime import datetime

DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"


async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: str = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: str = None,
    sender_id: str = None
) -> Tuple[Dict, str]:
    """
    Send data via NATS with automatic transport selection.

    Args:
        subject: NATS subject to publish to
        data: List of (dataname, data, type) tuples
        **kwargs: Additional options

    Returns:
        Tuple of (env, env_json_str)
    """
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())
    if msg_id is None:
        msg_id = str(uuid.uuid4())
    if sender_id is None:
        sender_id = str(uuid.uuid4())

    print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}")

    # Process payloads
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)

        print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")

        if payload_size < size_threshold:
            # Direct path
            payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
            print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes")

            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'direct',
                'encoding': 'base64',
                'size': payload_size,
                'data': payload_b64,
                'metadata': {'payload_bytes': payload_size}
            })
        else:
            # Link path
            print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver")

            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

            if response['status'] != 200:
                raise Exception(f"Failed to upload data to fileserver: {response['status']}")

            print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}")

            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'link',
                'encoding': 'none',
                'size': payload_size,
                'data': response['url'],
                'metadata': {}
            })

    # Build envelope
    env = {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'send_to': subject,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url,
        'metadata': {},
        'payloads': payloads
    }

    env_json_str = json.dumps(env)

    if is_publish:
        if nats_connection:
            await publish_message(nats_connection, subject, env_json_str, correlation_id)
        else:
            await publish_message(broker_url, subject, env_json_str, correlation_id)

    return env, env_json_str
```
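
On the receiving side, a direct payload is recovered by reversing the encoding step shown in `smartsend`. The following is a minimal sketch of that step only, not the library's `smartreceive`. The helper name `decode_direct_payload` and the size check are assumptions made for illustration; the field names follow the payload dictionaries built above.

```python
import base64
import json


def decode_direct_payload(payload: dict) -> bytes:
    """Return the raw bytes of a payload entry sent over the direct path.

    Hypothetical helper for illustration; not part of NATSBridge.
    """
    if payload["transport"] != "direct":
        raise ValueError("not a direct payload")
    if payload["encoding"] != "base64":
        raise ValueError("unsupported encoding: " + payload["encoding"])
    raw = base64.b64decode(payload["data"])
    # 'size' records the serialized length before base64 encoding
    if len(raw) != payload["size"]:
        raise ValueError("size mismatch")
    return raw


# Build a payload entry the same way the direct path does, then decode it
entry = {
    "dataname": "greeting",
    "payload_type": "text",
    "transport": "direct",
    "encoding": "base64",
    "size": 5,
    "data": base64.b64encode(b"hello").decode("utf-8"),
}
envelope_json = json.dumps({"payloads": [entry]})
decoded = decode_direct_payload(json.loads(envelope_json)["payloads"][0])
print(decoded.decode("utf-8"))
```

The decoded bytes would then be passed to the deserializer for the entry's `payload_type`.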

#### serializeData Implementation

```python
import base64
import io
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.feather as feather
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _serialize_data(data: Any, payload_type: str) -> bytes:
    """Serialize data to bytes based on type."""
    if payload_type == 'text':
        if isinstance(data, str):
            return data.encode('utf-8')
        else:
            raise TypeError('Text data must be a string')
    elif payload_type == 'dictionary':
        json_str = json.dumps(data)
        return json_str.encode('utf-8')
    elif payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table serialization')

        import pandas as pd
        if isinstance(data, pd.DataFrame):
            # Column-oriented DataFrame to Arrow IPC (file format)
            table = arrow.Table.from_pandas(data)
            buf = io.BytesIO()
            # new_file needs the schema; the writer is closed on exit
            with ipc.new_file(buf, table.schema) as writer:
                writer.write_table(table)
            return buf.getvalue()
        else:
            raise TypeError('arrowtable data must be a pandas DataFrame')
    elif payload_type == 'jsontable':
        # data is list[dict] or list (row-oriented)
        # Serialize directly to JSON
        json_str = json.dumps(data)
        return json_str.encode('utf-8')
    elif payload_type == 'image':
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise TypeError('Image data must be bytes')
    elif payload_type == 'audio':
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise TypeError('Audio data must be bytes')
    elif payload_type == 'video':
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise TypeError('Video data must be bytes')
    elif payload_type == 'binary':
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise TypeError('Binary data must be bytes')
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
```

#### deserializeData Implementation

```python
import base64
import io
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.feather as feather
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """Deserialize bytes to data based on type."""
    if payload_type == 'text':
        return data.decode('utf-8')
    elif payload_type == 'dictionary':
        json_str = data.decode('utf-8')
        return json.loads(json_str)
    elif payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table deserialization')

        # Read the Arrow IPC file format and convert to a pandas DataFrame
        buf = io.BytesIO(data)
        reader = ipc.open_file(buf)
        return reader.read_all().to_pandas()
    elif payload_type == 'jsontable':
        # Deserialize from JSON - returns list[dict] (row-oriented)
        json_str = data.decode('utf-8')
        return json.loads(json_str)
    elif payload_type == 'image':
        return data
    elif payload_type == 'audio':
        return data
    elif payload_type == 'video':
        return data
    elif payload_type == 'binary':
        return data
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
```

#### fetchWithBackoff Implementation

```python
import asyncio
import aiohttp
from typing import Callable


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff."""
    delay = base_delay

    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}")
                        return await response.read()
                    else:
                        raise Exception(f"Failed to fetch: {response.status}")
        except Exception as e:
            print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}")

            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)

    raise Exception(f"Failed to fetch data after {max_retries} attempts")
```

#### plikOneshotUpload Implementation

```python
import aiohttp
import json
from typing import Dict, Any


async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """Upload data to plik server in one-shot mode."""

    # Get upload id
    async with aiohttp.ClientSession() as session:
        url_getUploadID = f"{file_server_url}/upload"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"OneShot": True})

        async with session.post(url_getUploadID, headers=headers, data=body) as response:
            response_json = await response.json()
            uploadid = response_json['id']
            uploadtoken = response_json['uploadToken']

        # Upload file
        url_upload = f"{file_server_url}/file/{uploadid}"
        headers = {'X-UploadToken': uploadtoken}

        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')

        async with session.post(url_upload, headers=headers, data=form) as upload_response:
            upload_json = await upload_response.json()
            fileid = upload_json['id']

        url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"

        return {
            'status': upload_response.status,
            'uploadid': uploadid,
            'fileid': fileid,
            'url': url
        }
```

---

## MicroPython Implementation

### Limitations

MicroPython has significant constraints compared to desktop implementations:

| Feature | Desktop | MicroPython |
|---------|---------|-------------|
| Memory | Limited only by host RAM | ~256KB - 1MB |
| Arrow IPC | ✅ | ❌ (not supported) |
| Async/Await | ✅ | ⚠️ (uasyncio only) |
| Large payloads (>1MB) | ✅ | ❌ (enforced limit) |
| arrowtable | ✅ | ❌ |
| jsontable | ⚠️ (limited) | ⚠️ (limited) |
| Multiple payloads | ✅ | ⚠️ (limited) |
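
The constraints in the table can be checked before a payload is serialized for a constrained device. The sketch below is one possible guard, written in plain Python so it also runs on the desktop. The function name `check_micropython_payload` and the `UNSUPPORTED_TYPES` tuple are hypothetical, and the 50000-byte limit is an assumption taken from the `MAX_PAYLOAD_SIZE` value this guide uses for the MicroPython module.

```python
# Assumed to match MAX_PAYLOAD_SIZE in natsbridge_mpy.py
MAX_PAYLOAD_SIZE = 50000
# Payload types the MicroPython port does not support (hypothetical constant)
UNSUPPORTED_TYPES = ("arrowtable",)


def check_micropython_payload(payload_type, payload_bytes):
    """Raise if a payload cannot be handled on a MicroPython device."""
    if payload_type in UNSUPPORTED_TYPES:
        raise ValueError("payload_type not supported on MicroPython: " + payload_type)
    if len(payload_bytes) > MAX_PAYLOAD_SIZE:
        raise MemoryError("payload exceeds max size %d" % MAX_PAYLOAD_SIZE)
    return True


ok = check_micropython_payload("text", b"hello")
print(ok)
```

Raising `MemoryError` for oversized payloads mirrors the behaviour of the MicroPython `smartsend` in this guide.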

### MicroPython Module Structure

```python
# natsbridge_mpy.py (MicroPython)
import network
import time
import json
import base64  # not built in; available from micropython-lib
import binascii
import uos
import struct

# Constants
DEFAULT_SIZE_THRESHOLD = 100000  # 100KB for MicroPython
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
MAX_PAYLOAD_SIZE = 50000  # Hard limit
# With these defaults MAX_PAYLOAD_SIZE is below DEFAULT_SIZE_THRESHOLD,
# so every accepted payload takes the direct path.

# Note: MicroPython uses list[list] for jsontable (row-oriented)
# No DataFrame support - data is always row-oriented


class NATSBridge:
    """MicroPython NATS bridge implementation."""

    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_conn = None

    def smartsend(self, subject, data, **kwargs):
        """Send data (synchronous)."""
        correlation_id = self._generate_uuid()
        msg_id = self._generate_uuid()
        sender_id = self._generate_uuid()

        print(f"[Correlation: {correlation_id}] Starting smartsend")

        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)

            if payload_size > MAX_PAYLOAD_SIZE:
                raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}")

            if payload_size < DEFAULT_SIZE_THRESHOLD:
                # Direct path
                payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path (limited support)
                response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes)
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })

        env = {
            'correlation_id': correlation_id,
            'msg_id': msg_id,
            # MicroPython's time module has no strftime; format the UTC tuple by hand
            'timestamp': '%04d-%02d-%02dT%02d:%02d:%02dZ' % time.gmtime()[:6],
            'send_to': subject,
            'msg_purpose': kwargs.get('msg_purpose', 'chat'),
            'sender_name': kwargs.get('sender_name', 'NATSBridge'),
            'sender_id': sender_id,
            'receiver_name': kwargs.get('receiver_name', ''),
            'receiver_id': kwargs.get('receiver_id', ''),
            'reply_to': kwargs.get('reply_to', ''),
            'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''),
            'broker_url': self.broker_url,
            'metadata': {},
            'payloads': payloads
        }

        env_json_str = json.dumps(env)

        # Publish
        self._publish(subject, env_json_str, correlation_id)

        return env, env_json_str

    def smartreceive(self, msg, **kwargs):
        """Receive and process message (synchronous)."""
        env_json_obj = json.loads(msg.payload)
        correlation_id = env_json_obj['correlation_id']

        payloads_list = []
        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']

            if transport == 'direct':
                payload_b64 = payload['data']
                payload_bytes = base64.b64decode(payload_b64)
                data_type = payload['payload_type']
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = self._sync_fileserver_download(
                    url,
                    kwargs.get('max_retries', 3),
                    kwargs.get('base_delay', 100),
                    kwargs.get('max_delay', 1000),
                    correlation_id
                )
                data_type = payload['payload_type']
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))

        env_json_obj['payloads'] = payloads_list
        return env_json_obj

    def _serialize_data(self, data, payload_type):
        """Serialize data (MicroPython version - no arrowtable support)."""
        if payload_type == 'text':
            return data.encode('utf-8')
        elif payload_type == 'dictionary':
            return json.dumps(data).encode('utf-8')
        elif payload_type == 'jsontable':
            # data is list[list] (row-oriented)
            return json.dumps(data).encode('utf-8')
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return bytes(data)
        else:
            raise ValueError(f"Unknown payload_type: {payload_type}")

    def _deserialize_data(self, data, payload_type):
        """Deserialize data (MicroPython version)."""
        if payload_type == 'text':
            return data.decode('utf-8')
        elif payload_type == 'dictionary':
            return json.loads(data.decode('utf-8'))
        elif payload_type == 'jsontable':
            # Returns list[list] (row-oriented)
            return json.loads(data.decode('utf-8'))
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return data
        else:
            raise ValueError(f"Unknown payload_type: {payload_type}")

    def _generate_uuid(self):
        """Generate a simple random ID (MicroPython compatible)."""
        # Shifting time_ns() by 64 bits or more always yields 0, so use random
        # bytes instead. Assumes the port provides uos.urandom.
        h = binascii.hexlify(uos.urandom(16)).decode('ascii')
        return 'mp-%s-%s-%s-%s-%s' % (h[:8], h[8:12], h[12:16], h[16:20], h[20:])

    def _sync_fileserver_upload(self, url, dataname, data):
        """Synchronous file upload (limited)."""
        # Simplified implementation for MicroPython
        # In practice, would use network.HTTP or similar
        raise NotImplementedError("File upload not implemented in MicroPython")

    def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id):
        """Synchronous file download with backoff."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("File download not implemented in MicroPython")

    def _publish(self, subject, message, correlation_id):
        """Publish message to NATS."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("NATS publishing not implemented in MicroPython")
```

---

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) |
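
One way to read these variables in Python is sketched below. This is an illustration under stated assumptions rather than the bridge's own configuration code: the function name `load_config` and the returned dictionary keys are hypothetical, while the variable names and defaults come from the table above.

```python
import os


def load_config(environ=None):
    """Read bridge settings from environment variables, applying the defaults.

    Hypothetical helper; pass a dict as `environ` to make it easy to test.
    """
    env = os.environ if environ is None else environ
    return {
        "broker_url": env.get("NATS_URL", "nats://localhost:4222"),
        "fileserver_url": env.get("FILESERVER_URL", "http://localhost:8080"),
        # Environment values are strings, so convert the threshold to int
        "size_threshold": int(env.get("SIZE_THRESHOLD", "1000000")),
    }


defaults = load_config({})
custom = load_config({"NATS_URL": "nats://broker.local:4222", "SIZE_THRESHOLD": "100000"})
print(defaults["size_threshold"], custom["broker_url"])
```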

### MicroPython Configuration

```python
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000  # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000  # Hard limit for MicroPython
```

---

## Performance Considerations

### Zero-Copy Reading

| Platform | Strategy |
|----------|----------|
| **Julia** | `Arrow.read()` with memory-mapped files |
| **JavaScript** | `ArrayBuffer` with `DataView` |
| **Python** | `pyarrow` memory mapping |
| **MicroPython** | Not available (streaming only) |
|
||
### Exponential Backoff

All platforms retry failed HTTP downloads with exponential backoff: the delay starts at `base_delay` milliseconds, doubles after each failed attempt, and is capped at `max_delay`:

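```python
# Python
import asyncio

import aiohttp


async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    """Fetch url, retrying with exponential backoff (delays in milliseconds)."""
    delay = base_delay
    last_error = None
    # Reuse one session across attempts instead of opening one per retry
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    last_error = f"HTTP {response.status}"
            except Exception as e:
                last_error = e
            # Back off after any failed attempt, including non-200 responses
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise Exception(f"[Correlation: {correlation_id}] Failed to fetch after max retries: {last_error}")
```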
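
To make the timing concrete, the following sketch computes the sequence of delays that a doubling-and-capping loop of this kind sleeps between attempts. `backoff_schedule` is a hypothetical helper for illustration, and the values 100 ms and 1000 ms are assumed, not defaults taken from the bridge.

```python
def backoff_schedule(max_retries, base_delay, max_delay):
    """Return the delays (in ms) slept between attempts.

    There is no sleep after the final attempt, so the list has
    max_retries - 1 entries.
    """
    delays = []
    delay = base_delay
    for _ in range(max_retries - 1):
        delays.append(delay)
        delay = min(delay * 2, max_delay)  # double, but never exceed the cap
    return delays


print(backoff_schedule(5, 100, 1000))  # [100, 200, 400, 800]
print(backoff_schedule(7, 100, 1000))  # [100, 200, 400, 800, 1000, 1000]
```

With these assumed values, five attempts wait at most 1.5 seconds in total before the final failure is raised.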

### Correlation ID Logging

All platforms use correlation IDs for distributed tracing:

```
[timestamp] [Correlation: abc123] Message published to subject
```
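
A minimal sketch of a helper that produces lines in this shape is shown below. `log_trace` is a hypothetical name, and the ISO 8601 UTC timestamp is an assumption, since the guide does not specify the timestamp format.

```python
from datetime import datetime, timezone


def log_trace(correlation_id, message, now=None):
    """Print and return a log line tagged with the correlation ID."""
    # Timestamp format is assumed: ISO 8601 in UTC
    now = now or datetime.now(timezone.utc)
    line = "[%s] [Correlation: %s] %s" % (now.isoformat(), correlation_id, message)
    print(line)
    return line


log_trace("abc123", "Message published to subject")
```

Returning the formatted line as well as printing it makes the helper straightforward to check in tests.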

### Serialization Performance

| Format | Use Case | Pros | Cons |
|--------|----------|------|------|
| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython |
| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement |

---

## Testing

### Test File Organization

| Platform | Sender Tests | Receiver Tests |
|----------|--------------|----------------|
| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` |
| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` |
| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` |

### Run Tests

```bash
# Julia
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl

# JavaScript (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Python
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py
```

---

## Troubleshooting

### Common Issues

1. **NATS Connection Failed**
   - Ensure NATS server is running
   - Check `broker_url` configuration

2. **HTTP Upload Failed**
   - Ensure file server is running
   - Check `fileserver_url` configuration
   - Verify upload permissions

3. **Arrow IPC Deserialization Error**
   - Ensure data is properly serialized to Arrow format
   - Check Arrow version compatibility
   - MicroPython doesn't support Arrow IPC

4. **Memory Constraints (MicroPython)**
   - Reduce `size_threshold`
   - Use direct transport only (< 100KB)
   - Avoid large payloads
   - Use `jsontable` instead of `arrowtable` (arrowtable not supported)

5. **Row-Oriented vs Column-Oriented Conversion Issues**
   - Julia/Python: DataFrames are column-oriented; when sending `jsontable`, they are converted to row-oriented JSON
   - JavaScript/MicroPython: Data is natively row-oriented
   - When receiving `jsontable` in Julia/Python, JSON is automatically converted back to column-oriented DataFrame
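
The row/column conversion described in item 5 can be sketched in plain Python as follows. `rows_to_columns` and `columns_to_rows` are hypothetical helpers, not the bridge's own functions, and filling missing keys with `None` is an assumption about how ragged rows should be handled.

```python
def rows_to_columns(rows):
    """Convert a list of row dicts into a dict of equal-length column lists."""
    names = []
    for row in rows:
        for key in row:
            if key not in names:
                names.append(key)  # keep first-seen column order
    # Missing keys become None so every column has one entry per row
    return {name: [row.get(name) for row in rows] for name in names}


def columns_to_rows(columns):
    """Convert a dict of equal-length column lists into a list of row dicts."""
    names = list(columns)
    length = len(columns[names[0]]) if names else 0
    return [{name: columns[name][i] for name in names} for i in range(length)]


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
columns = rows_to_columns(rows)
print(columns)
print(columns_to_rows(columns) == rows)
```

When debugging a conversion problem, comparing the round-tripped rows with the originals in this way is a quick check that no column was dropped or reordered.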

---

## Summary

This cross-platform NATS bridge provides:

1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms
2. **Idiomatic Implementations**:
   - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC
   - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client
   - **Python**: Class-based design with dataclasses, type hints, async/await
   - **MicroPython**: Synchronous API, memory-constrained optimizations
3. **Message Format Consistency**: Identical JSON schemas across all platforms
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
5. **Platform-Specific Optimizations**:
   - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython)
   - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works on all platforms)
6. **Row-Oriented ↔ Column-Oriented Conversion**: Automatic conversion between row-oriented (JS, MicroPython) and column-oriented (Julia DataFrame, Python pandas) formats when using `jsontable`

The Julia implementation in [`src/NATSBridge.jl`](../src/NATSBridge.jl) serves as the ground truth for API design and behavior.

### Datatype Summary

| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|----------|---------------|----------|----------|---------------------|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python |
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython |
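
The `json` → `base64` chain in the Encoding column can be illustrated with the standard library alone. This is a sketch under assumptions: `encode_jsontable` and `decode_jsontable` are hypothetical names, and the message envelope that carries the encoded string is not shown.

```python
import base64
import json


def encode_jsontable(rows):
    """Serialize row-oriented data to JSON, then base64-encode the UTF-8 bytes."""
    raw = json.dumps(rows, separators=(",", ":")).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_jsontable(payload):
    """Reverse the chain: base64-decode, then parse the JSON text."""
    return json.loads(base64.b64decode(payload).decode("utf-8"))


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
payload = encode_jsontable(rows)
print(payload)
print(decode_jsontable(payload) == rows)
```

Base64 inflates the payload by roughly one third, which is worth keeping in mind when comparing an encoded size against `SIZE_THRESHOLD`.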