diff --git a/docs/updated_architecture.md b/docs/updated_architecture.md new file mode 100644 index 0000000..6b0763e --- /dev/null +++ b/docs/updated_architecture.md @@ -0,0 +1,1322 @@ +# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge + +## Overview + +This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. + +**Supported Platforms:** +- **Julia** - Ground truth implementation with full feature set +- **JavaScript** - Node.js and browser-compatible implementation +- **Python/MicroPython** - Desktop and embedded-compatible implementation + +### Cross-Platform Design Principles + +1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with identical signatures and behavior +2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python) +3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms +4. **Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility + +--- + +## High-Level API Standard (Cross-Platform) + +### Unified API Signature + +All three platforms expose the same high-level API: + +**Input Format (smartsend):** +``` +[(dataname1, data1, type1), (dataname2, data2, type2), ...] +``` + +**Output Format (smartreceive):** +``` +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] +} +``` + +### Supported Payload Types + +| Type | Julia | JavaScript | Python/MicroPython | +|------|-------|------------|-------------------| +| `text` | `String` | `string` | `str` | +| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | +| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | +| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | +| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` | + +### Cross-Platform API Examples + +**Julia:** +```julia +using NATSBridge + +# Send +env, env_json_str = smartsend( + "/chat", + [("message", "Hello!", "text"), ("image", image_bytes, "image")], + broker_url="nats://localhost:4222" +) + +# Receive - returns JSON.Object{String, Any} +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} +# Access payloads: for (dataname, data, type) in env["payloads] +``` + +**JavaScript:** +```javascript +const NATSBridge = require('natsbridge'); + +// Send +const [env, env_json_str] = await NATSBridge.smartsend( + "/chat", + [ + ["message", "Hello!", "text"], + ["image", imageBuffer, "image"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Receive - returns Promise +const env = await NATSBridge.smartreceive(msg, { + fileserver_download_handler: fetchWithBackoff +}); +// env is an object with "payloads" field containing Array of arrays +// Access payloads: for (const [dataname, data, type] of env.payloads) +``` + +**Python:** +```python +from natsbridge import NATSBridge + +# Send +env, env_json_str = NATSBridge.smartsend( + "/chat", + [("message", "Hello!", "text"), ("image", image_bytes, "image")], + broker_url="nats://localhost:4222" +) + +# Receive - returns Tuple[Dict, str] +env = NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff +) +# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] +# Access payloads: for dataname, data, type_ in env["payloads"] +``` + +**MicroPython:** +```python +from natsbridge import NATSBridge + +# Send (limited to direct transport due to memory constraints) +env, env_json_str = NATSBridge.smartsend( + "/chat", + [("message", "Hello!", "text")], + broker_url="nats://localhost:4222" +) +``` + +--- + +## Architecture Diagram (Cross-Platform) + +```mermaid +flowchart TB + subgraph JuliaApp["Julia Application"] + JuliaAppCode[App Code] + JuliaBridge[NATSBridge.jl] + JuliaNATS[NATS.jl] + end + + subgraph JSApp["JavaScript Application"] + JSAppCode[App Code] + JSBridge[NATSBridge.js] + JSNATS[nats.js] + end + + subgraph PythonApp["Python/MicroPython Application"] + PythonAppCode[App Code] + PythonBridge[NATSBridge.py] + PythonNATS[nats.py] + end + + subgraph Infrastructure["Infrastructure"] + NATS[NATS Server
Message Broker] + FileServer[HTTP File Server
Upload/Download] + end + + JuliaAppCode --> JuliaBridge + JuliaBridge --> JuliaNATS + JSAppCode --> JSBridge + JSBridge --> JSNATS + PythonAppCode --> PythonBridge + PythonBridge --> PythonNATS + + JuliaNATS --> NATS + JSNATS --> NATS + PythonNATS --> NATS + + NATS --> JuliaNATS + NATS --> JSNATS + NATS --> PythonNATS + + JuliaBridge -.->|HTTP POST upload| FileServer + JSBridge -.->|HTTP POST upload| FileServer + PythonBridge -.->|HTTP POST upload| FileServer + + FileServer -.->|HTTP GET download| JuliaBridge + FileServer -.->|HTTP GET download| JSBridge + FileServer -.->|HTTP GET download| PythonBridge + + style JuliaApp fill:#c5e1a5 + style JSApp fill:#bbdefb + style PythonApp fill:#f8bbd0 + style NATS fill:#fff3e0 + style FileServer fill:#f3e5f5 +``` + +--- + +## System Components + +### 1. msg_envelope_v1 - Message Envelope + +**JSON Schema (Identical Across All Platforms):** +```json +{ + "correlation_id": "uuid-v4-string", + "msg_id": "uuid-v4-string", + "timestamp": "2024-01-15T10:30:00Z", + + "send_to": "topic/subject", + "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", + "sender_name": "agent-wine-web-frontend", + "sender_id": "uuid4", + "receiver_name": "agent-backend", + "receiver_id": "uuid4", + "reply_to": "topic", + "reply_to_msg_id": "uuid4", + "broker_url": "nats://localhost:4222", + + "metadata": { + "content_type": "application/octet-stream", + "content_length": 123456 + }, + + "payloads": [ + { + "id": "uuid4", + "dataname": "login_image", + "payload_type": "image", + "transport": "direct", + "encoding": "base64", + "size": 15433, + "data": "base64-encoded-string", + "metadata": { + "checksum": "sha256_hash" + } + }, + { + "id": "uuid4", + "dataname": "large_arrow_table", + "payload_type": "arrowtable", + "transport": "link", + "encoding": "arrow-ipc", + "size": 524288, + "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", + "metadata": {} + }, + { + "id": "uuid4", + "dataname": "json_table", + "payload_type": "jsontable", + "transport": "direct", + "encoding": "json", + "size": 1024, + "data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]", + "metadata": {} + } + ] +} +``` + +### 2. msg_payload_v1 - Payload Structure + +**JSON Schema (Identical Across All Platforms):** +```json +{ + "id": "uuid4", + "dataname": "login_image", + "payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary", + "transport": "direct | link", + "encoding": "none | json | base64 | arrow-ipc", + "size": 15433, + "data": "base64-encoded-string | http-url | json-string", + "metadata": { + "checksum": "sha256_hash" + } +} +``` + +### 3. Transport Strategy Decision Logic (Cross-Platform) + +``` +┌─────────────────────────────────────────────────────────────┐ +│ smartsend Function (All Platforms) │ +│ Accepts: [(dataname1, data1, type1), ...] │ +│ (Type is per payload, not standalone) │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ For each payload: │ +│ 1. Extract type from tuple/array │ +│ 2. Serialize based on type │ +│ 3. Check payload size │ +└─────────────────────────────────────────────────────────────┘ + │ + ┌───────────┴────────────┐ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (>= 1MB) │ + │ │ │ │ + │ • Serialize │ │ • Serialize │ + │ to buffer │ │ to buffer │ + │ • Base64/JSON│ │ • Upload to │ + │ encode │ │ HTTP Server│ + │ • Publish to │ │ • Publish to │ + │ NATS │ │ NATS with │ + │ (in msg) │ │ URL │ + └──────────────┘ └──────────────┘ +``` + +--- + +## Platform-Specific Implementations + +### Julia Implementation + +#### Architecture Patterns + +**Multiple Dispatch:** Julia's core strength is leveraged through function overloading: + +```julia +# publish_message has two overloads based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + conn = NATS.connect(broker_url) + publish_message(conn, subject, message, correlation_id) +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + try + NATS.publish(conn, subject, message) + log_trace(correlation_id, "Message published to $subject") + finally + NATS.drain(conn) + end +end +``` + +**Struct-Based Data Models:** +```julia +struct msg_payload_v1 + id::String + dataname::String + payload_type::String + transport::String + encoding::String + size::Integer + data::Any + metadata::Dict{String, Any} +end + +struct msg_envelope_v1 + correlation_id::String + msg_id::String + timestamp::String + send_to::String + msg_purpose::String + sender_name::String + sender_id::String + receiver_name::String + receiver_id::String + reply_to::String + reply_to_msg_id::String + broker_url::String + metadata::Dict{String, Any} + payloads::Vector{msg_payload_v1} +end +``` + +#### Dependencies + +| Package | Purpose | +|---------|---------| +| `NATS.jl` | Core NATS functionality | +| `Arrow.jl` | Arrow IPC serialization | +| `JSON.jl` | JSON parsing | +| `HTTP.jl` | HTTP client for file server | +| `UUIDs.jl` | UUID generation | +| `Dates.jl` | Timestamps | +| `Base64` | Base64 encoding | + +#### File Server Handler Signatures + +```julia +# Upload handler +fileserver_upload_handler( + fileserver_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} + +# Download handler +fileserver_download_handler( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} +``` + +#### Key Functions + +```julia +# Main send/receive functions +function smartsend( + subject::String, + data::AbstractArray{Tuple{String, Any, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler::Function = plik_oneshot_upload, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, + correlation_id::String = string(uuid4()), + msg_purpose::String = "chat", + sender_name::String = "NATSBridge", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + is_publish::Bool = true, + NATS_connection::Union{NATS.Connection, Nothing} = nothing, + msg_id::String = string(uuid4()), + sender_id::String = string(uuid4()) +)::Tuple{msg_envelope_v1, String} + +function smartreceive( + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 +)::JSON.Object{String, Any} +``` + +#### Serialization Logic for Tables + +```julia +# Serialize table data based on payload_type +function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8} + if payload_type == "arrowtable" + # Serialize to Apache Arrow IPC format + buffer = IOBuffer() + Arrow.write(buffer, data) + return take!(buffer) + elseif payload_type == "jsontable" + # Serialize to JSON format + json_str = JSON.json(data) + return Vector{UInt8}(json_str) + else + throw(ArgumentError("Unknown payload_type: $payload_type")) + end +end + +# Deserialize table data based on payload_type +function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any + if payload_type == "arrowtable" + # Deserialize from Apache Arrow IPC format + buffer = Buffer(data) + return Arrow.read(buffer) + elseif payload_type == "jsontable" + # Deserialize from JSON format + json_str = String(data) + return JSON.parse(json_str) + else + throw(ArgumentError("Unknown payload_type: $payload_type")) + end +end +``` + +--- + +### JavaScript Implementation + +#### Architecture Patterns + +**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O: + +```javascript +// smartsend is async and returns a Promise +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = generateUUID(), + msg_purpose = "chat", + sender_name = "NATSBridge", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "", + is_publish = true, + nats_connection = null, + msg_id = generateUUID(), + sender_id = generateUUID() + } = options; + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = base64Encode(payloadBytes); + payloads.push({ + id: generateUUID(), + dataname, + payload_type: payloadType, + transport: "direct", + encoding: "base64", + size: payloadSize, + data: payloadB64 + }); + } else { + // Link path + const response = await fileserver_upload_handler( + fileserver_url, dataname, payloadBytes + ); + payloads.push({ + id: generateUUID(), + dataname, + payload_type: payloadType, + transport: "link", + encoding: "none", + size: payloadSize, + data: response.url + }); + } + } + + const env = buildEnvelope(subject, payloads, { + correlation_id, msg_id, msg_purpose, + sender_name, sender_id, receiver_name, + receiver_id, reply_to, reply_to_msg_id, + broker_url + }); + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} +``` + +**Prototype-Based Utilities:** +```javascript +// NATS client wrapper (prototype-based) +class NATSClient { + constructor(url) { + this.url = url; + this.connection = null; + } + + async connect() { + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + async publish(subject, message) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + } + + async close() { + if (this.connection) { + this.connection.close(); + } + } +} +``` + +#### Dependencies (Node.js) + +| Package | Purpose | +|---------|---------| +| `nats` | Core NATS functionality (nats.js) | +| `uuid` | UUID generation | +| `node-fetch` or `axios` | HTTP client for file server | +| `apache-arrow` | Arrow IPC serialization | + +#### Dependencies (Browser) + +| Package | Purpose | +|---------|---------| +| `nats` | Browser-compatible NATS client | +| `uuid` | UUID generation | +| `fetch` (native) | HTTP client for file server | +| `apache-arrow` | Arrow IPC serialization | + +#### Dependencies (MicroPython) + +| Module | Purpose | +|--------|---------| +| `nats` (custom) | MicroPython NATS client | +| `time` | Timestamps | +| `uos` | File operations | +| `base64` | Base64 encoding | + +#### File Server Handler Signatures + +```javascript +// Upload handler - async function returning Promise +async function fileserver_upload_handler( + fileserver_url, + dataname, + data // Uint8Array +) { + // Returns: { status, uploadid, fileid, url } +} + +// Download handler - async function returning Promise +async function fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + correlation_id +) { + // Returns: Uint8Array +} +``` + +#### Key Functions + +```javascript +// Main send/receive functions +async function smartsend(subject, data, options = {}) { + // data: Array of [dataname, data, type] tuples + // Returns: Promise<[env, env_json_str]> +} + +async function smartreceive(msg, options = {}) { + // msg: NATS message object + // Returns: Promise +} + +// Utility functions +async function serializeData(data, payload_type) { + // Returns: Uint8Array +} + +async function deserializeData(data, payload_type) { + // Returns: deserialized data +} + +async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { + // Returns: Uint8Array +} +``` + +#### Serialization Logic for Tables + +```javascript +// Serialize table data based on payload_type +async function serializeTableData(data, payload_type) { + if (payload_type === "arrowtable") { + // Serialize to Apache Arrow IPC format + const schema = new arrow.Schema([...]); // Define schema + const arr = arrow.tableToArrowTable(data, schema); + const buffer = arrow.RecordBatch.from(arr).toBuffer(); + return new Uint8Array(buffer); + } else if (payload_type === "jsontable") { + // Serialize to JSON format + const jsonStr = JSON.stringify(data); + return new TextEncoder().encode(jsonStr); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +// Deserialize table data based on payload_type +async function deserializeTableData(data, payload_type) { + if (payload_type === "arrowtable") { + // Deserialize from Apache Arrow IPC format + const buffer = arrow.arrayBufferToBuffer(data.buffer); + const batch = arrow.RecordBatch.deserialize(buffer); + return arrow.tableFromBatch(batch); + } else if (payload_type === "jsontable") { + // Deserialize from JSON format + const jsonStr = new TextDecoder().decode(data); + return JSON.parse(jsonStr); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} +``` + +--- + +### Python/MicroPython Implementation + +#### Architecture Patterns + +**Class-Based Design:** Python uses classes for stateful operations: + +```python +class NATSBridge: + """Cross-platform NATS bridge implementation.""" + + DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB + DEFAULT_BROKER_URL = "nats://localhost:4222" + DEFAULT_FILESERVER_URL = "http://localhost:8080" + + def __init__(self, broker_url=None, fileserver_url=None): + self.broker_url = broker_url or self.DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL + self._nats_client = None + + async def smartsend(self, subject, data, **kwargs): + """ + Send data via NATS with automatic transport selection. + + Args: + subject: NATS subject to publish to + data: List of (dataname, data, type) tuples + **kwargs: Additional options (broker_url, fileserver_url, etc.) + + Returns: + Tuple of (env, env_json_str) + """ + # Extract options with defaults + options = self._merge_options(kwargs) + + # Process payloads + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = self._serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + if payload_size < options['size_threshold']: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') + payloads.append({ + 'id': uuid.uuid4().hex, + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64 + }) + else: + # Link path + response = await options['fileserver_upload_handler']( + options['fileserver_url'], dataname, payload_bytes + ) + payloads.append({ + 'id': uuid.uuid4().hex, + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'] + }) + + # Build envelope + env = self._build_envelope(subject, payloads, options) + env_json_str = json.dumps(env) + + if options['is_publish']: + await self._publish_message( + subject, env_json_str, options['correlation_id'], + nats_connection=options.get('nats_connection') + ) + + return env, env_json_str + + async def smartreceive(self, msg, **kwargs): + """ + Receive and process NATS message. + + Args: + msg: NATS message object + **kwargs: Additional options (fileserver_download_handler, etc.) + + Returns: + Dict with envelope metadata and payloads + """ + # Parse envelope + env_json_obj = json.loads(msg.payload) + + # Process payloads + payloads_list = [] + for payload in env_json_obj['payloads']: + transport = payload['transport'] + dataname = payload['dataname'] + + if transport == 'direct': + payload_b64 = payload['data'] + payload_bytes = base64.b64decode(payload_b64) + data_type = payload['payload_type'] + data = self._deserialize_data(payload_bytes, data_type) + payloads_list.append((dataname, data, data_type)) + elif transport == 'link': + url = payload['data'] + downloaded_data = await options['fileserver_download_handler']( + url, + options['max_retries'], + options['base_delay'], + options['max_delay'], + env_json_obj['correlation_id'] + ) + data_type = payload['payload_type'] + data = self._deserialize_data(downloaded_data, data_type) + payloads_list.append((dataname, data, data_type)) + + env_json_obj['payloads'] = payloads_list + return env_json_obj +``` + +**Dataclass for Type Safety:** +```python +from dataclasses import dataclass, field +from typing import Any, Dict, List, Tuple, Union + +@dataclass +class MsgPayloadV1: + """Message payload structure.""" + id: str + dataname: str + payload_type: str + transport: str + encoding: str + size: int + data: Union[str, bytes] # URL for link, base64 for direct + metadata: Dict[str, Any] = field(default_factory=dict) + +@dataclass +class MsgEnvelopeV1: + """Message envelope structure.""" + correlation_id: str + msg_id: str + timestamp: str + send_to: str + msg_purpose: str + sender_name: str + sender_id: str + receiver_name: str + receiver_id: str + reply_to: str + reply_to_msg_id: str + broker_url: str + metadata: Dict[str, Any] = field(default_factory=dict) + payloads: List[MsgPayloadV1] = field(default_factory=list) +``` + +#### Dependencies (Desktop Python) + +| Package | Purpose | +|---------|---------| +| `nats-py` | Core NATS functionality | +| `uuid` | UUID generation (stdlib) | +| `aiohttp` or `requests` | HTTP client for file server | +| `pyarrow` | Arrow IPC serialization | +| `pandas` | DataFrame support (optional) | +| `python-dateutil` | Timestamps | +| `base64` | Base64 encoding (stdlib) | + +#### Dependencies (MicroPython) + +| Module | Purpose | +|--------|---------| +| `network` | NATS connection (custom) | +| `time` | Timestamps | +| `uos` | File operations | +| `base64` | Base64 encoding | +| `json` | JSON parsing | +| `struct` | Binary data handling | + +**MicroPython Limitations:** +- No Arrow IPC support (memory constraints) +- Only direct transport (< 1MB threshold enforced) +- Simplified UUID generation +- No async/await (use callbacks or uasyncio) + +#### File Server Handler Signatures + +```python +# Upload handler - async function +async def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """ + Upload data to file server. + + Args: + fileserver_url: Base URL of file server + dataname: Name of the file + data: Binary data + + Returns: + Dict with keys: 'status', 'uploadid', 'fileid', 'url' + """ + pass + +# Download handler - async function +async def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """ + Download data from URL with exponential backoff. + + Args: + url: URL to download from + max_retries: Maximum retry attempts + base_delay: Initial delay in ms + max_delay: Maximum delay in ms + correlation_id: Correlation ID for logging + + Returns: + Downloaded bytes + """ + pass +``` + +#### Key Functions + +```python +# Main send/receive functions (standalone or class methods) +async def smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + broker_url: str = DEFAULT_BROKER_URL, + fileserver_url: str = DEFAULT_FILESERVER_URL, + fileserver_upload_handler: Callable = plik_oneshot_upload, + size_threshold: int = DEFAULT_SIZE_THRESHOLD, + correlation_id: str = None, + msg_purpose: str = "chat", + sender_name: str = "NATSBridge", + receiver_name: str = "", + receiver_id: str = "", + reply_to: str = "", + reply_to_msg_id: str = "", + is_publish: bool = True, + nats_connection: Any = None, + msg_id: str = None, + sender_id: str = None +) -> Tuple[Dict, str]: + """Send data via NATS.""" + pass + +async def smartreceive( + msg: Any, + fileserver_download_handler: Callable = fetch_with_backoff, + max_retries: int = 5, + base_delay: int = 100, + max_delay: int = 5000 +) -> Dict: + """Receive and process NATS message.""" + pass + +# Utility functions +def _serialize_data(data: Any, payload_type: str) -> bytes: + """Serialize data to bytes.""" + pass + +def _deserialize_data(data: bytes, payload_type: str) -> Any: + """Deserialize bytes to data.""" + pass + +async def fetch_with_backoff( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """Fetch URL with exponential backoff.""" + pass +``` + +#### Serialization Logic for Tables + +```python +# Serialize table data based on payload_type +def serialize_table_data(data: Any, payload_type: str) -> bytes: + if payload_type == "arrowtable": + # Serialize to Apache Arrow IPC format + import pyarrow as pa + import pyarrow.feather as feather + import io + + if isinstance(data, pd.DataFrame): + table = pa.Table.from_pandas(data) + buffer = io.BytesIO() + feather.write_feather(table, buffer) + return buffer.getvalue() + else: + raise TypeError("Expected pandas DataFrame for arrowtable") + + elif payload_type == "jsontable": + # Serialize to JSON format + if isinstance(data, list) and all(isinstance(row, dict) for row in data): + return json.dumps(data).encode('utf-8') + else: + raise TypeError("Expected list of dicts for jsontable") + + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + +# Deserialize table data based on payload_type +def deserialize_table_data(data: bytes, payload_type: str) -> Any: + if payload_type == "arrowtable": + # Deserialize from Apache Arrow IPC format + import pyarrow as pa + import pyarrow.feather as feather + import io + + buffer = io.BytesIO(data) + table = feather.read_table(buffer) + return table.to_pandas() + + elif payload_type == "jsontable": + # Deserialize from JSON format + json_str = data.decode('utf-8') + return json.loads(json_str) + + else: + raise ValueError(f"Unknown payload_type: {payload_type}") +``` + +--- + +## Platform Comparison Matrix + +| Feature | Julia | JavaScript | Python | MicroPython | +|---------|-------|------------|--------|-------------| +| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (Overload via `@overload`) | ❌ | +| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) | +| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | +| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | +| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | +| **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) | +| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ | +| **jsontable Support** | ✅ | ✅ | ✅ | ✅ | +| **Direct Transport** | ✅ | ✅ | ✅ | ✅ | +| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | +| **Handler Functions** | ✅ | ✅ | ✅ | ✅ | +| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ | + +--- + +## Implementation Details by Platform + +### Julia: Multiple Dispatch Pattern + +```julia +# Function overloading based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + # Creates new connection +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + # Uses pre-existing connection +end + +# Type-specific serialization +function _serialize_data(data::String, payload_type::String) + # Text handling +end + +function _serialize_data(data::Dict, payload_type::String) + # Dictionary handling +end + +function _serialize_data(data::DataFrame, payload_type::String) + # Table handling - arrowtable +end + +function _serialize_data(data::Vector{NamedTuple}, payload_type::String) + # Table handling - jsontable +end +``` + +### JavaScript: Prototype + Async Pattern + +```javascript +// Class-based NATS client +class NATSClient { + constructor(url) { + this.url = url; + } + + async connect() { + // Connection logic + } + + async publish(subject, message) { + // Publish logic + } +} + +// Module-level utility functions +function generateUUID() { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => { + const r = Math.random() * 16 | 0; + return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16); + }); +} + +async function serializeData(data, payload_type) { + // Serialization logic for arrowtable and jsontable +} +``` + +### Python: Class-Based Pattern + +```python +class NATSBridge: + """Main bridge class.""" + + def __init__(self, broker_url=None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + + async def smartsend(self, subject, data, **kwargs): + """Send data.""" + pass + + async def smartreceive(self, msg, **kwargs): + """Receive message.""" + pass + +# Module-level convenience functions +def smartsend(subject, data, **kwargs): + """Convenience function using default NATSBridge instance.""" + bridge = NATSBridge() + return await bridge.smartsend(subject, data, **kwargs) + +def smartreceive(msg, **kwargs): + """Convenience function using default NATSBridge instance.""" + bridge = NATSBridge() + return await bridge.smartreceive(msg, **kwargs) +``` + +--- + +## Scenario Implementations (Cross-Platform) + +### Scenario 1: Command & Control (Small Dictionary) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
config = Dict("step_size" => 0.01)
env, env_json_str = smartsend("control", [("config", config, "dictionary")])``` | +| **JavaScript** | ```javascript
const config = { step_size: 0.01 };
[env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);``` | +| **Python** | ```python
config = {"step_size": 0.01}
env, env_json_str = await smartsend("control", [("config", config, "dictionary")])``` | + +### Scenario 2: Deep Dive Analysis (Large Arrow Table) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])``` | +| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
[env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);``` | +| **Python** | ```python
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])``` | + +### Scenario 3: Chat System (Multi-Payload) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = smartsend("chat", chat)``` | +| **JavaScript** | ```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
[env, env_json_str] = await smartsend("chat", chat);``` | +| **Python** | ```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)``` | + +### Scenario 4: JSON Table Transfer (Cross-Platform) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
env, env_json_str = smartsend("data", [("users", rows, "jsontable")])``` | +| **JavaScript** | ```javascript
const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];
[env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);``` | +| **Python** | ```python
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
env, env_json_str = await smartsend("data", [("users", users, "jsontable")])``` | + +### Scenario 5: Smart Transport Selection + +The `smartsend` function automatically selects the transport method based on payload size: + +- **Direct Transport (< 1MB)**: Payload is serialized and embedded directly in the NATS message + - `arrowtable`: Serialized to Arrow IPC, base64 encoded + - `jsontable`: Serialized to JSON, base64 encoded + - `dictionary`: Serialized to JSON, base64 encoded + - `text`: Serialized to UTF-8, base64 encoded + - `image/audio/video/binary`: Base64 encoded + +- **Link Transport (>= 1MB)**: Payload is uploaded to HTTP file server, URL embedded in message + - All types supported + - Receiver downloads from URL and deserializes + +--- + +## Performance Considerations (Cross-Platform) + +### Zero-Copy Reading + +| Platform | Strategy | +|----------|----------| +| **Julia** | `Arrow.read()` with memory-mapped files | +| **JavaScript** | `ArrayBuffer` with `DataView` | +| **Python** | `pyarrow` memory mapping | +| **MicroPython** | Not available (streaming only) | + +### Exponential Backoff + +```python +# Python/MicroPython +async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): + delay = base_delay + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + return await response.read() + except Exception as e: + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + raise Exception("Failed to fetch after max retries") +``` + +### Correlation ID Logging + +All platforms use correlation IDs for distributed tracing: + +``` +[timestamp] [Correlation: abc123] Message published to subject +``` + +### Serialization Performance Comparison + +| Format | Use Case | Pros | Cons | +|--------|----------|------|------| +| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | +| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | + +--- + +## Testing Strategy (Cross-Platform) + +### Unit Tests + +| Test Type | Julia | JavaScript | Python | +|-----------|-------|------------|--------| +| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` | +| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` | +| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` | +| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` | +| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` | + +### Integration Tests + +- NATS server communication +- File server upload/download +- Cross-platform message exchange +- Arrow table serialization/deserialization +- JSON table serialization/deserialization + +--- + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | + +### MicroPython-Specific Configuration + +```python +# micropython.conf +NATS_URL = "nats://broker.local:4222" +FILESERVER_URL = "http://fileserver.local:8080" +SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices +MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython +``` + +--- + +## Summary + +This cross-platform NATS bridge provides: + +1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across Julia, JavaScript, and Python/MicroPython +2. **Idiomatic Implementations**: + - Julia: Multiple dispatch and struct-based design + - JavaScript: Async/await and prototype-based utilities + - Python: Class-based design with type hints +3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas +4. **Handler Abstraction**: File server operations abstracted through configurable handlers +5. **Platform-Specific Optimizations**: + - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data + - **JSON** (`jsontable`): Universal human-readable format for smaller tables + - Streaming support in MicroPython + +The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. + +### Datatype Summary + +| Datatype | Serialization | Use Case | Encoding | +|----------|---------------|----------|----------| +| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | diff --git a/docs/updated_implementation.md b/docs/updated_implementation.md new file mode 100644 index 0000000..82b703d --- /dev/null +++ b/docs/updated_implementation.md @@ -0,0 +1,2231 @@ +# Cross-Platform Implementation Guide: Bi-Directional Data Bridge + +## Overview + +This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. + +**Supported Platforms:** +- **Julia** - Ground truth implementation (reference) +- **JavaScript** - Node.js and browser implementation +- **Python/MicroPython** - Desktop and embedded implementation + +--- + +## Implementation Files + +| Language | Implementation File | Description | +|----------|---------------------|-------------| +| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support | +| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation | +| **Python** | `src/natsbridge.py` | Desktop Python implementation | +| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) | + +--- + +## File Server Handler Architecture + +The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). + +### Handler Function Signatures + +#### Julia + +```julia +# Upload handler - uploads data to file server and returns URL +fileserver_upload_handler( + fileserver_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} + +# Download handler - fetches data from file server URL with exponential backoff +fileserver_download_handler( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} +``` + +#### JavaScript + +```javascript +// Upload handler - async function +async function fileserver_upload_handler( + fileserver_url, + dataname, + data // Uint8Array +) { + // Returns: { status, uploadid, fileid, url } +} + +// Download handler - async function +async function fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + correlation_id +) { + // Returns: Uint8Array +} +``` + +#### Python + +```python +# Upload handler - async function +async def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """ + Upload data to file server. + + Returns: + Dict with keys: 'status', 'uploadid', 'fileid', 'url' + """ + pass + +# Download handler - async function +async def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """ + Download data from URL with exponential backoff. + + Returns: + Downloaded bytes + """ + pass +``` + +#### MicroPython + +```python +# Upload handler - synchronous (no async in MicroPython) +def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytearray +) -> Dict: + """ + Upload data to file server (synchronous). + + Returns: + Dict with keys: 'status', 'url' + """ + pass + +# Download handler - synchronous +def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytearray: + """ + Download data from URL with exponential backoff (synchronous). + + Returns: + Downloaded bytes + """ + pass +``` + +--- + +## Multi-Payload Support (Standard API) + +The system uses a **standardized list-of-tuples format** for all payload operations across all platforms. + +### API Standard + +``` +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] + +# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] +} +``` + +### Supported Types + +| Type | Julia | JavaScript | Python | MicroPython | +|------|-------|------------|--------|-------------| +| `text` | `String` | `string` | `str` | `str` | +| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | +| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | +| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | `list` | +| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | + +### Cross-Platform Examples + +#### Julia + +```julia +using NATSBridge + +# Single payload - still wrapped in a list +env, env_json_str = smartsend( + "/test", + [("dataname1", data1, "dictionary")], + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload +) + +# Multiple payloads with different types +env, env_json_str = smartsend( + "/test", + [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], + broker_url="nats://localhost:4222" +) + +# Mixed content (chat with text, image, audio) +env, env_json_str = smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") + ], + broker_url="nats://localhost:4222" +) + +# Receive returns a JSON.Object{String, Any} envelope +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} +# Access payloads: env["payloads"] which is a Vector of tuples +for (dataname, data, type) in env["payloads"] + println("$dataname: $data (type: $type)") +end +``` + +#### JavaScript + +```javascript +const NATSBridge = require('natsbridge'); + +// Single payload +const [env, env_json_str] = await NATSBridge.smartsend( + "/test", + [["dataname1", data1, "dictionary"]], + { + broker_url: "nats://localhost:4222", + fileserver_upload_handler: plikOneshotUpload + } +); + +// Multiple payloads +const [env, env_json_str] = await NATSBridge.smartsend( + "/test", + [ + ["dataname1", data1, "dictionary"], + ["dataname2", data2, "arrowtable"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Mixed content +const [env, env_json_str] = await NATSBridge.smartsend( + "/chat", + [ + ["message_text", "Hello!", "text"], + ["user_image", imageData, "image"], + ["audio_clip", audioData, "audio"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Receive +const env = await NATSBridge.smartreceive(msg, { + fileserver_download_handler: fetchWithBackoff +}); +// env is an object with "payloads" field containing Array of arrays +// Access payloads: env.payloads which is an Array of [dataname, data, type] arrays +for (const [dataname, data, type] of env.payloads) { + console.log(`${dataname}: ${data} (type: ${type})`); +} +``` + +#### Python + +```python +from natsbridge import NATSBridge + +# Single payload +env, env_json_str = await NATSBridge.smartsend( + "/test", + [("dataname1", data1, "dictionary")], + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload +) + +# Multiple payloads +env, env_json_str = await NATSBridge.smartsend( + "/test", + [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], + broker_url="nats://localhost:4222" +) + +# Mixed content +env, env_json_str = await NATSBridge.smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") + ], + broker_url="nats://localhost:4222" +) + +# Receive +env = await NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff +) +# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] +# Access payloads: env["payloads"] which is a list of tuples +for dataname, data, type_ in env["payloads"]: + print(f"{dataname}: {data} (type: {type_})") +``` + +#### MicroPython + +```python +from natsbridge import NATSBridge + +# Limited to text and binary (no tables due to memory constraints) +env, env_json_str = NATSBridge.smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("binary_data", data_bytes, "binary") + ], + broker_url="nats://localhost:4222", + size_threshold=100000 # Lower threshold for memory constraints +) +# Note: MicroPython uses synchronous handlers +``` + +--- + +## Row-Oriented vs Column-Oriented Data Structures + +Different platforms use different internal representations for tabular data. Understanding these differences is crucial for proper serialization/deserialization when using `jsontable` and `arrowtable` datatypes. + +### Data Structure Comparison + +| Platform | Table Structure | Orientation | +|----------|-----------------|-------------| +| **Julia (DataFrame)** | `Dict{String, Vector}` | Column-oriented | +| **Python (pandas)** | `dict[str, list]` | Column-oriented | +| **JavaScript** | `Array` | Row-oriented | +| **MicroPython** | `list[list]` | Row-oriented | + +### Column-Oriented (Julia DataFrame, Python pandas) + +In column-oriented structures, each column is stored as a separate array/vector: + +**Julia Example:** +```julia +# Create dictionary with column vectors +dict = Dict("customer age" => [15, 20, 25], + "first name" => ["Rohit", "Rahul", "Akshat"]) + +# Convert to DataFrame +df = DataFrame(dict) +println(df) +# Output: +# 3×2 DataFrame +# Row ┆ customer age ┆ first name +# ┆ Int64 ┆ String +# ─────┼──────────────┼──────────── +# 1 ┆ 15 ┆ "Rohit" +# 2 ┆ 20 ┆ "Rahul" +# 3 ┆ 25 ┆ "Akshat" +``` + +**Python Example:** +```python +# Create dictionary with column lists +data = { + "Name": ["Alice", "Bob", "Charlie"], + "Age": [25, 30, 35], + "Score": [88.5, 92.0, 79.5] +} + +# Convert to DataFrame +df = pd.DataFrame(data) +print(df) +# Output: +# Name Age Score +# 0 Alice 25 88.5 +# 1 Bob 30 92.0 +# 2 Charlie 35 79.5 +``` + +### Row-Oriented (JavaScript, MicroPython) + +In row-oriented structures, each row is stored as a separate object/array: + +**JavaScript Example:** +```javascript +// Array of objects (row-oriented) +const users = [ + { Name: "Alice", Age: 25, Score: 88.5 }, + { Name: "Bob", Age: 30, Score: 92.0 }, + { Name: "Charlie", Age: 35, Score: 79.5 } +]; +``` + +**MicroPython Example:** +```python +# List of lists (row-oriented) +users = [ + ["Alice", 25, 88.5], + ["Bob", 30, 92.0], + ["Charlie", 35, 79.5] +] +``` + +### Cross-Platform Conversion for jsontable + +When sending `jsontable` across platforms, the system performs automatic conversion between row-oriented and column-oriented formats: + +**Sending from Julia/Python (column-oriented) to JS/MicroPython (row-oriented):** +1. Convert column-oriented dict to row-oriented array of objects +2. Serialize to JSON +3. Send with `payload_type = "jsontable"` + +**Receiving from JS/MicroPython (row-oriented) to Julia/Python (column-oriented):** +1. Deserialize JSON to row-oriented array of objects +2. Convert to column-oriented dict +3. Create DataFrame from column-oriented dict + +**Example: Julia to JavaScript** +```julia +# Julia side - column-oriented DataFrame +df = DataFrame( + "Name" => ["Alice", "Bob", "Charlie"], + "Age" => [25, 30, 35], + "Score" => [88.5, 92.0, 79.5] +) + +# smartsend automatically converts to row-oriented JSON +env, env_json_str = smartsend( + "/data", + [("users", df, "jsontable")] +) +# JSON sent: [{"Name":"Alice","Age":25,"Score":88.5}, ...] +``` + +```javascript +// JavaScript side - receives row-oriented array +const [env, env_json_str] = await NATSBridge.smartsend( + "/data", + [["users", users, "jsontable"]] +); +// users is already row-oriented: [{Name: "Alice", Age: 25, ...}, ...] +``` + +**Example: JavaScript to Julia** +```javascript +// JavaScript side - row-oriented array +const users = [ + { Name: "Alice", Age: 25, Score: 88.5 }, + { Name: "Bob", Age: 30, Score: 92.0 } +]; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/data", + [["users", users, "jsontable"]] +); +``` + +```julia +# Julia side - receives and converts to column-oriented DataFrame +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# The jsontable is automatically converted to DataFrame +for (dataname, data, type) in env["payloads"] + if type == "jsontable" + # data is now a DataFrame with column-oriented structure + println(data) + # Output: + # 2×3 DataFrame + # Row ┆ Name ┆ Age ┆ Score + # ┆ String ┆ Int64 ┆ Float64 + # ─────┼────────┼──────┼─────── + # 1 ┆ Alice ┆ 25 ┆ 88.5 + # 2 ┆ Bob ┆ 30 ┆ 92.0 + end +end +``` + +--- + +## Architecture + +### Cross-Platform Claim-Check Pattern + +```mermaid +flowchart TD + A[SmartSend Function] --> B{Is payload size < 1MB?} + B -->|Yes | C[Direct Path
< 1MB] + B -->|No | D[Link Path
>= 1MB] + + C --> C1[Serialize to Buffer] + C1 --> C2[Base64/JSON encode] + C2 --> C3[Publish to NATS] + + D --> D1[Serialize to Buffer] + D1 --> D2[Upload to HTTP Server] + D2 --> D3[Publish to NATS with URL] + + style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px + style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px + style C fill:#e8f5e9,stroke:#008000,stroke-width:2px + style D fill:#e8f5e9,stroke:#008000,stroke-width:2px + style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px + style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px + style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px +``` + +**Claim-Check Pattern Overview:** +- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS +- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern) + +### smartsend Return Value + +All platforms return a tuple/array containing both the envelope and JSON string: + +#### Julia + +```julia +env, env_json_str = smartsend(...) +# Returns: ::Tuple{msg_envelope_v1, String} +# env::msg_envelope_v1 - The envelope object with all metadata and payloads +# env_json_str::String - JSON string for publishing to NATS +``` + +#### JavaScript + +```javascript +const [env, env_json_str] = await smartsend(...); +// Returns: Promise<[env, env_json_str]> +// env: Object with all metadata and payloads +// env_json_str: String for publishing to NATS +``` + +#### Python + +```python +env, env_json_str = await smartsend(...) +# Returns: Tuple[Dict, str] +# env: Dict with all metadata and payloads +# env_json_str: String for publishing to NATS +``` + +#### MicroPython + +```python +env, env_json_str = NATSBridge.smartsend(...) +# Returns: Tuple[Dict, str] +# Note: MicroPython returns plain dicts (no structured envelope object) +``` + +--- + +## Installation + +### Julia Dependencies + +```julia +using Pkg +Pkg.add("NATS") +Pkg.add("Arrow") +Pkg.add("JSON3") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") +``` + +### JavaScript Dependencies (Node.js) + +```bash +npm install nats uuid apache-arrow node-fetch +# or +yarn add nats uuid apache-arrow node-fetch +``` + +### JavaScript Dependencies (Browser) + +```bash +npm install nats uuid apache-arrow +# or use CDN: +# https://unpkg.com/nats-js/dist/bundle/nats.min.js +# https://unpkg.com/apache-arrow/arrow.min.js +``` + +### Python Dependencies (Desktop) + +```bash +pip install nats-py aiohttp pyarrow pandas python-dateutil +``` + +### MicroPython Dependencies + +MicroPython uses built-in modules: +- `network` - NATS connection (custom implementation) +- `time` - Timestamps +- `uos` - File operations +- `base64` - Base64 encoding +- `json` - JSON parsing +- `struct` - Binary data handling + +--- + +## Usage Tutorial + +### Step 1: Start NATS Server + +```bash +docker run -p 4222:4222 nats:latest +``` + +### Step 2: Start HTTP File Server (optional) + +```bash +# Create a directory for file uploads +mkdir -p /tmp/fileserver + +# Use any HTTP server that supports POST for file uploads +# Example: Python's built-in server +python3 -m http.server 8080 --directory /tmp/fileserver +``` + +### Step 3: Run Test Scenarios + +```bash +# Julia tests +julia test/test_julia_to_julia_text_sender.jl +julia test/test_julia_to_julia_text_receiver.jl + +# JavaScript tests (Node.js) +node test/test_js_text_sender.js +node test/test_js_text_receiver.js + +# Python tests +python3 test/test_py_text_sender.py +python3 test/test_py_text_receiver.py +``` + +--- + +## Platform-Specific Implementations + +### Julia Implementation + +#### Module Structure + +```julia +module NATSBridge + using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 + + # Constants + const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB + const DEFAULT_BROKER_URL = "nats://localhost:4222" + const DEFAULT_FILESERVER_URL = "http://localhost:8080" + + # Structs + struct msg_payload_v1 + id::String + dataname::String + payload_type::String + transport::String + encoding::String + size::Integer + data::Any + metadata::Dict{String, Any} + end + + struct msg_envelope_v1 + correlation_id::String + msg_id::String + timestamp::String + send_to::String + msg_purpose::String + sender_name::String + sender_id::String + receiver_name::String + receiver_id::String + reply_to::String + reply_to_msg_id::String + broker_url::String + metadata::Dict{String, Any} + payloads::Vector{msg_payload_v1} + end + + # Main functions + function smartsend(...) end + function smartreceive(...) end + + # Utility functions + function _serialize_data(...) end + function _deserialize_data(...) end + function envelope_to_json(...) end + function log_trace(...) end + + # File server handlers + function plik_oneshot_upload(...) end + function _fetch_with_backoff(...) end + function publish_message(...) end + + # Internal helpers + function _get_payload_bytes(...) end +end +``` + +#### Multiple Dispatch Pattern + +Julia leverages multiple dispatch for type-specific implementations: + +```julia +# publish_message has two overloads based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + conn = NATS.connect(broker_url) + publish_message(conn, subject, message, correlation_id) +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + try + NATS.publish(conn, subject, message) + log_trace(correlation_id, "Message published to $subject") + finally + NATS.drain(conn) + end +end + +# Type-specific serialization +function _serialize_data(data::String, payload_type::String) + # Text handling + return Vector{UInt8}(data) +end + +function _serialize_data(data::Dict, payload_type::String) + # Dictionary handling + json_str = JSON.json(data) + return Vector{UInt8}(json_str) +end + +function _serialize_data(data::DataFrame, payload_type::String) + # Table handling - arrowtable + io = IOBuffer() + Arrow.write(io, data) + return take!(io) +end +``` + +#### smartsend Implementation + +```julia +function smartsend( + subject::String, + data::AbstractArray{Tuple{String, T1, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler::Function = plik_oneshot_upload, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, + correlation_id::String = string(uuid4()), + msg_purpose::String = "chat", + sender_name::String = "NATSBridge", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + is_publish::Bool = true, + NATS_connection::Union{NATS.Connection, Nothing} = nothing, + msg_id::String = string(uuid4()), + sender_id::String = string(uuid4()) +)::Tuple{msg_envelope_v1, String} where {T1<:Any} + + log_trace(correlation_id, "Starting smartsend for subject: $subject") + + # Process each payload in the list + payloads = msg_payload_v1[] + for (dataname, payload_data, payload_type) in data + # Serialize data based on type + payload_bytes = _serialize_data(payload_data, payload_type) + + payload_size = length(payload_bytes) + log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") + + # Decision: Direct vs Link + if payload_size < size_threshold + # Direct path - Base64 encode and send via NATS + payload_b64 = Base64.base64encode(payload_bytes) + log_trace(correlation_id, "Using direct transport for $payload_size bytes") + + payload = msg_payload_v1( + payload_b64, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "direct", + encoding = "base64", + size = payload_size, + metadata = Dict{String, Any}("payload_bytes" => payload_size) + ) + push!(payloads, payload) + else + # Link path - Upload to HTTP server, send URL via NATS + log_trace(correlation_id, "Using link transport, uploading to fileserver") + + response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) + + if response["status"] != 200 + error("Failed to upload data to fileserver: $(response["status"])") + end + + url = response["url"] + log_trace(correlation_id, "Uploaded to URL: $url") + + payload = msg_payload_v1( + url, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "link", + encoding = "none", + size = payload_size, + metadata = Dict{String, Any}() + ) + push!(payloads, payload) + end + end + + # Create msg_envelope_v1 with all payloads + # Note: First positional argument is "send_to" (the NATS subject), not "subject" + env = msg_envelope_v1( + subject, # send_to: NATS subject to publish to + payloads; + correlation_id = correlation_id, + msg_id = msg_id, + msg_purpose = msg_purpose, + sender_name = sender_name, + sender_id = sender_id, + receiver_name = receiver_name, + receiver_id = receiver_id, + reply_to = reply_to, + reply_to_msg_id = reply_to_msg_id, + broker_url = broker_url, + metadata = Dict{String, Any}(), + ) + + env_json_str = envelope_to_json(env) + + if is_publish == false + # skip publish + elseif is_publish == true && NATS_connection === nothing + publish_message(broker_url, subject, env_json_str, correlation_id) + elseif is_publish == true && NATS_connection !== nothing + publish_message(NATS_connection, subject, env_json_str, correlation_id) + end + + return (env, env_json_str) +end +``` + +#### smartreceive Implementation + +```julia +function smartreceive( + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 +)::JSON.Object{String, Any} + # Parse the JSON envelope + env_json_obj = JSON.parse(String(msg.payload)) + log_trace(env_json_obj["correlation_id"], "Processing received message") + + # Process all payloads in the envelope + payloads_list = Tuple{String, Any, String}[] + + num_payloads = length(env_json_obj["payloads"]) + + for i in 1:num_payloads + payload = env_json_obj["payloads"][i] + transport = String(payload["transport"]) + dataname = String(payload["dataname"]) + + if transport == "direct" + log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") + + # Extract base64 payload from the payload + payload_b64 = String(payload["data"]) + + # Decode Base64 payload + payload_bytes = Base64.base64decode(payload_b64) + + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + elseif transport == "link" + # Extract download URL from the payload + url = String(payload["data"]) + log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") + + # Fetch with exponential backoff using the download handler + downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"]) + + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + else + error("Unknown transport type for payload '$dataname': $(transport)") + end + end + env_json_obj["payloads"] = payloads_list + return env_json_obj +end +``` + +#### _serialize_data Implementation + +```julia +function _serialize_data(data::Any, payload_type::String) + if payload_type == "text" + if isa(data, String) + data_bytes = Vector{UInt8}(data) + return data_bytes + else + error("Text data must be a String") + end + elseif payload_type == "dictionary" + json_str = JSON.json(data) + json_str_bytes = Vector{UInt8}(json_str) + return json_str_bytes + elseif payload_type == "arrowtable" + # Serialize DataFrame to Arrow IPC format + io = IOBuffer() + Arrow.write(io, data) + return take!(io) + elseif payload_type == "jsontable" + # Convert column-oriented to row-oriented JSON + # data is Vector{NamedTuple} or Vector{Dict} + json_str = JSON.json(data) + return Vector{UInt8}(json_str) + elseif payload_type == "image" + if isa(data, Vector{UInt8}) + return data + else + error("Image data must be Vector{UInt8}") + end + elseif payload_type == "audio" + if isa(data, Vector{UInt8}) + return data + else + error("Audio data must be Vector{UInt8}") + end + elseif payload_type == "video" + if isa(data, Vector{UInt8}) + return data + else + error("Video data must be Vector{UInt8}") + end + elseif payload_type == "binary" + if isa(data, IOBuffer) + return take!(data) + elseif isa(data, Vector{UInt8}) + return data + else + error("Binary data must be binary (Vector{UInt8} or IOBuffer)") + end + else + error("Unknown payload_type: $payload_type") + end +end +``` + +#### _deserialize_data Implementation + +```julia +function _deserialize_data( + data::Vector{UInt8}, + payload_type::String, + correlation_id::String +) + if payload_type == "text" + return String(data) + elseif payload_type == "dictionary" + json_str = String(data) + return JSON.parse(json_str) + elseif payload_type == "arrowtable" + # Deserialize from Arrow IPC format + io = IOBuffer(data) + arrow_table = Arrow.Table(io) + return arrow_table + elseif payload_type == "jsontable" + # Deserialize from JSON format + # Returns Vector{NamedTuple} (column-oriented compatible) + json_str = String(data) + parsed = JSON.parse(json_str) + return parsed + elseif payload_type == "image" + return data + elseif payload_type == "audio" + return data + elseif payload_type == "video" + return data + elseif payload_type == "binary" + return data + else + error("Unknown payload_type: $payload_type") + end +end +``` + +#### _fetch_with_backoff Implementation + +```julia +function _fetch_with_backoff( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +) + delay = base_delay + for attempt in 1:max_retries + try + response = HTTP.request("GET", url) + if response.status == 200 + log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") + return response.body + else + error("Failed to fetch: $(response.status)") + end + catch e + log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") + + if attempt < max_retries + sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + end + end + end + + error("Failed to fetch data after $max_retries attempts") +end +``` + +#### plik_oneshot_upload Implementation + +```julia +function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) + # Get upload id + url_getUploadID = "$file_server_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + response_json = JSON.parse(http_response.body) + uploadid = response_json["id"] + uploadtoken = response_json["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$file_server_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict( + "file" => file_multipart + )) + + http_response = nothing + try + http_response = HTTP.post(url_upload, headers, form) + catch e + @error "Request failed" exception=e + end + response_json = JSON.parse(http_response.body) + fileid = response_json["id"] + + url = "$file_server_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end +``` + +--- + +### JavaScript Implementation + +#### Module Structure + +```javascript +// natsbridge.js +const nats = require('nats'); +const { v4: uuidv4 } = require('uuid'); +const fetch = require('node-fetch'); + +const DEFAULT_SIZE_THRESHOLD = 1_000_000; +const DEFAULT_BROKER_URL = 'nats://localhost:4222'; +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; + +class NATSClient { + constructor(url) { + this.url = url; + this.connection = null; + } + + async connect() { + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + async publish(subject, message) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + } + + async close() { + if (this.connection) { + this.connection.close(); + } + } +} + +async function smartsend(subject, data, options = {}) { + // Implementation +} + +async function smartreceive(msg, options = {}) { + // Implementation +} + +module.exports = { + NATSClient, + smartsend, + smartreceive, + plikOneshotUpload, + fetchWithBackoff +}; +``` + +#### smartsend Implementation + +```javascript +const nats = require('nats'); +const { v4: uuidv4 } = require('uuid'); +const fetch = require('node-fetch'); +const arrow = require('apache-arrow'); + +const DEFAULT_SIZE_THRESHOLD = 1_000_000; +const DEFAULT_BROKER_URL = 'nats://localhost:4222'; +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; + +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = uuidv4(), + msg_purpose = 'chat', + sender_name = 'NATSBridge', + receiver_name = '', + receiver_id = '', + reply_to = '', + reply_to_msg_id = '', + is_publish = true, + nats_connection = null, + msg_id = uuidv4(), + sender_id = uuidv4() + } = options; + + console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`); + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = bufferToBase64(payloadBytes); + console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`); + + payloads.push({ + id: uuidv4(), + dataname, + payload_type: payloadType, + transport: 'direct', + encoding: 'base64', + size: payloadSize, + data: payloadB64, + metadata: { payload_bytes: payloadSize } + }); + } else { + // Link path + console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`); + + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`); + + payloads.push({ + id: uuidv4(), + dataname, + payload_type: payloadType, + transport: 'link', + encoding: 'none', + size: payloadSize, + data: response.url, + metadata: {} + }); + } + } + + // Build envelope + const env = { + correlation_id, + msg_id, + timestamp: new Date().toISOString(), + send_to: subject, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url, + metadata: {}, + payloads + }; + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} +``` + +#### serializeData Implementation + +```javascript +const arrow = require('apache-arrow'); + +async function serializeData(data, payload_type) { + if (payload_type === 'text') { + if (typeof data === 'string') { + return Buffer.from(data, 'utf8'); + } else { + throw new Error('Text data must be a string'); + } + } else if (payload_type === 'dictionary') { + const jsonStr = JSON.stringify(data); + return Buffer.from(jsonStr, 'utf8'); + } else if (payload_type === 'arrowtable') { + // Convert Array to Arrow IPC + // data is row-oriented: [{id: 1, name: "Alice"}, ...] + if (!Array.isArray(data) || data.length === 0) { + throw new Error('arrowtable data must be a non-empty array of objects'); + } + + // Create schema from first row + const schemaFields = Object.keys(data[0]).map(key => + new arrow.Field(key, arrow.any()) + ); + const schema = new arrow.Schema(schemaFields); + + // Create writer + const writer = new arrow.RecordBatchWriter([schema]); + + // Write rows + for (const row of data) { + const recordBatch = arrow.recordBatch.fromObjects([row], schema); + writer.write(recordBatch); + } + await writer.close(); + + // Read buffer + return writer.toBuffer(); + } else if (payload_type === 'jsontable') { + // data is already row-oriented Array + // Serialize directly to JSON + const jsonStr = JSON.stringify(data); + return Buffer.from(jsonStr, 'utf8'); + } else if (payload_type === 'image') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Image data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'audio') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Audio data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'video') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Video data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'binary') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Binary data must be Uint8Array or Buffer'); + } + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +function bufferToBase64(buffer) { + return buffer.toString('base64'); +} +``` + +#### deserializeData Implementation + +```javascript +const arrow = require('apache-arrow'); + +async function deserializeData(data, payload_type, correlation_id) { + if (payload_type === 'text') { + return Buffer.from(data).toString('utf8'); + } else if (payload_type === 'dictionary') { + const jsonStr = Buffer.from(data).toString('utf8'); + return JSON.parse(jsonStr); + } else if (payload_type === 'arrowtable') { + // Deserialize from Arrow IPC + const buffer = Buffer.from(data); + const table = arrow.tableFromRawBytes(buffer); + return table; + } else if (payload_type === 'jsontable') { + // Deserialize from JSON - returns Array (row-oriented) + const jsonStr = Buffer.from(data).toString('utf8'); + return JSON.parse(jsonStr); + } else if (payload_type === 'image') { + return Buffer.from(data); + } else if (payload_type === 'audio') { + return Buffer.from(data); + } else if (payload_type === 'video') { + return Buffer.from(data); + } else if (payload_type === 'binary') { + return Buffer.from(data); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} +``` + +#### fetchWithBackoff Implementation + +```javascript +async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { + let delay = base_delay; + + for (let attempt = 1; attempt <= max_retries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`); + return await response.arrayBuffer(); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (e) { + console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`); + + if (attempt < max_retries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, max_delay); + } + } + } + + throw new Error(`Failed to fetch data after ${max_retries} attempts`); +} +``` + +#### plikOneshotUpload Implementation + +```javascript +async function plikOneshotUpload(file_server_url, dataname, data) { + // Get upload id + const url_getUploadID = `${file_server_url}/upload`; + const headers = { 'Content-Type': 'application/json' }; + const body = JSON.stringify({ OneShot: true }); + + const http_response = await fetch(url_getUploadID, { + method: 'POST', + headers, + body + }); + + const response_json = await http_response.json(); + const uploadid = response_json.id; + const uploadtoken = response_json.uploadToken; + + // Upload file + const url_upload = `${file_server_url}/file/${uploadid}`; + const form = new FormData(); + const blob = new Blob([data]); + form.append('file', blob, dataname); + + const upload_headers = { + 'X-UploadToken': uploadtoken + }; + + const upload_response = await fetch(url_upload, { + method: 'POST', + headers: upload_headers, + body: form + }); + + const upload_json = await upload_response.json(); + const fileid = upload_json.id; + + const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; + + return { + status: upload_response.status, + uploadid, + fileid, + url + }; +} +``` + +--- + +### Python Implementation + +#### Module Structure + +```python +# natsbridge.py +import asyncio +import base64 +import json +import uuid +import time +from typing import Any, Dict, List, Tuple, Union, Callable +from dataclasses import dataclass, field +from datetime import datetime + +try: + import pyarrow as arrow + import pyarrow.parquet as pq + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + +try: + import aiohttp + import nats + from nats.aio.client import Client as NATSClient + NATS_AVAILABLE = True +except ImportError: + NATS_AVAILABLE = False + + +DEFAULT_SIZE_THRESHOLD = 1_000_000 +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" + + +@dataclass +class MsgPayloadV1: + """Message payload structure.""" + id: str + dataname: str + payload_type: str + transport: str + encoding: str + size: int + data: Union[str, bytes] + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class MsgEnvelopeV1: + """Message envelope structure.""" + correlation_id: str + msg_id: str + timestamp: str + send_to: str + msg_purpose: str + sender_name: str + sender_id: str + receiver_name: str + receiver_id: str + reply_to: str + reply_to_msg_id: str + broker_url: str + metadata: Dict[str, Any] = field(default_factory=dict) + payloads: List[MsgPayloadV1] = field(default_factory=list) + + +class NATSBridge: + """Cross-platform NATS bridge implementation.""" + + def __init__(self, broker_url: str = None, fileserver_url: str = None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL + self._nats_client: NATSClient = None + + async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]: + """Send data via NATS.""" + pass + + async def smartreceive(self, msg: Any, **kwargs) -> Dict: + """Receive and process NATS message.""" + pass +``` + +#### smartsend Implementation + +```python +import asyncio +import base64 +import json +import uuid +from typing import Any, Dict, List, Tuple, Union, Callable +from datetime import datetime + +DEFAULT_SIZE_THRESHOLD = 1_000_000 +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" + + +async def smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + broker_url: str = DEFAULT_BROKER_URL, + fileserver_url: str = DEFAULT_FILESERVER_URL, + fileserver_upload_handler: Callable = plik_oneshot_upload, + size_threshold: int = DEFAULT_SIZE_THRESHOLD, + correlation_id: str = None, + msg_purpose: str = "chat", + sender_name: str = "NATSBridge", + receiver_name: str = "", + receiver_id: str = "", + reply_to: str = "", + reply_to_msg_id: str = "", + is_publish: bool = True, + nats_connection: Any = None, + msg_id: str = None, + sender_id: str = None +) -> Tuple[Dict, str]: + """ + Send data via NATS with automatic transport selection. + + Args: + subject: NATS subject to publish to + data: List of (dataname, data, type) tuples + **kwargs: Additional options + + Returns: + Tuple of (env, env_json_str) + """ + if correlation_id is None: + correlation_id = str(uuid.uuid4()) + if msg_id is None: + msg_id = str(uuid.uuid4()) + if sender_id is None: + sender_id = str(uuid.uuid4()) + + print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}") + + # Process payloads + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = _serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") + + if payload_size < size_threshold: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') + print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes") + + payloads.append({ + 'id': str(uuid.uuid4()), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64, + 'metadata': {'payload_bytes': payload_size} + }) + else: + # Link path + print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver") + + response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) + + if response['status'] != 200: + raise Exception(f"Failed to upload data to fileserver: {response['status']}") + + print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}") + + payloads.append({ + 'id': str(uuid.uuid4()), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'], + 'metadata': {} + }) + + # Build envelope + env = { + 'correlation_id': correlation_id, + 'msg_id': msg_id, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'send_to': subject, + 'msg_purpose': msg_purpose, + 'sender_name': sender_name, + 'sender_id': sender_id, + 'receiver_name': receiver_name, + 'receiver_id': receiver_id, + 'reply_to': reply_to, + 'reply_to_msg_id': reply_to_msg_id, + 'broker_url': broker_url, + 'metadata': {}, + 'payloads': payloads + } + + env_json_str = json.dumps(env) + + if is_publish: + if nats_connection: + await publish_message(nats_connection, subject, env_json_str, correlation_id) + else: + await publish_message(broker_url, subject, env_json_str, correlation_id) + + return env, env_json_str +``` + +#### serializeData Implementation + +```python +import base64 +import json +from typing import Any + +try: + import pyarrow as arrow + import pyarrow.feather as feather + import pyarrow.ipc as ipc + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + + +def _serialize_data(data: Any, payload_type: str) -> bytes: + """Serialize data to bytes based on type.""" + if payload_type == 'text': + if isinstance(data, str): + return data.encode('utf-8') + else: + raise Error('Text data must be a string') + elif payload_type == 'dictionary': + json_str = json.dumps(data) + return json_str.encode('utf-8') + elif payload_type == 'arrowtable': + if not ARROW_AVAILABLE: + raise Error('pyarrow not available for table serialization') + + import io + buf = io.BytesIO() + import pandas as pd + if isinstance(data, pd.DataFrame): + # Column-oriented DataFrame to Arrow + table = arrow.Table.from_pandas(data) + sink = arrow.ipc.new_file(buf) + arrow.ipc.write_table(table, sink) + sink.close() + return buf.getvalue() + else: + raise Error('arrowtable data must be a pandas DataFrame') + elif payload_type == 'jsontable': + # data is list[dict] or list (row-oriented) + # Serialize directly to JSON + json_str = json.dumps(data) + return json_str.encode('utf-8') + elif payload_type == 'image': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise Error('Image data must be bytes') + elif payload_type == 'audio': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise Error('Audio data must be bytes') + elif payload_type == 'video': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise Error('Video data must be bytes') + elif payload_type == 'binary': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise Error('Binary data must be bytes') + else: + raise Error(f'Unknown payload_type: {payload_type}') +``` + +#### deserializeData Implementation + +```python +import base64 +import json +from typing import Any + +try: + import pyarrow as arrow + import pyarrow.feather as feather + import pyarrow.ipc as ipc + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + + +def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: + """Deserialize bytes to data based on type.""" + if payload_type == 'text': + return data.decode('utf-8') + elif payload_type == 'dictionary': + json_str = data.decode('utf-8') + return json.loads(json_str) + elif payload_type == 'arrowtable': + if not ARROW_AVAILABLE: + raise Error('pyarrow not available for table deserialization') + + import io + buf = io.BytesIO(data) + reader = arrow.ipc.open_file(buf) + return reader.read_all().to_pandas() + elif payload_type == 'jsontable': + # Deserialize from JSON - returns list[dict] (row-oriented) + json_str = data.decode('utf-8') + return json.loads(json_str) + elif payload_type == 'image': + return data + elif payload_type == 'audio': + return data + elif payload_type == 'video': + return data + elif payload_type == 'binary': + return data + else: + raise Error(f'Unknown payload_type: {payload_type}') +``` + +#### fetchWithBackoff Implementation + +```python +import asyncio +import aiohttp +from typing import Callable + + +async def fetch_with_backoff( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """Fetch URL with exponential backoff.""" + delay = base_delay + + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}") + return await response.read() + else: + raise Exception(f"Failed to fetch: {response.status}") + except Exception as e: + print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}") + + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + + raise Exception(f"Failed to fetch data after {max_retries} attempts") +``` + +#### plikOneshotUpload Implementation + +```python +import aiohttp +import json +from typing import Dict, Any + + +async def plik_oneshot_upload( + file_server_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """Upload data to plik server in one-shot mode.""" + + # Get upload id + async with aiohttp.ClientSession() as session: + url_getUploadID = f"{file_server_url}/upload" + headers = {'Content-Type': 'application/json'} + body = json.dumps({"OneShot": True}) + + async with session.post(url_getUploadID, headers=headers, data=body) as response: + response_json = await response.json() + uploadid = response_json['id'] + uploadtoken = response_json['uploadToken'] + + # Upload file + url_upload = f"{file_server_url}/file/{uploadid}" + headers = {'X-UploadToken': uploadtoken} + + form = aiohttp.FormData() + form.add_field('file', data, filename=dataname, content_type='application/octet-stream') + + async with session.post(url_upload, headers=headers, data=form) as upload_response: + upload_json = await upload_response.json() + fileid = upload_json['id'] + + url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}" + + return { + 'status': upload_response.status, + 'uploadid': uploadid, + 'fileid': fileid, + 'url': url + } +``` + +--- + +## MicroPython Implementation + +### Limitations + +MicroPython has significant constraints compared to desktop implementations: + +| Feature | Desktop | MicroPython | +|---------|---------|-------------| +| Memory | Unlimited | ~256KB - 1MB | +| Arrow IPC | ✅ | ❌ (not supported) | +| Async/Await | ✅ | ⚠️ (uasyncio only) | +| Large payloads (>1MB) | ✅ | ❌ (enforced limit) | +| arrowtable | ✅ | ❌ | +| jsontable | ⚠️ (limited) | ⚠️ (limited) | +| Multiple payloads | ✅ | ⚠️ (limited) | + +### MicroPython Module Structure + +```python +# natsbridge_mpy.py (MicroPython) +import network +import time +import json +import base64 +import uos +import struct + +# Constants +DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" +MAX_PAYLOAD_SIZE = 50000 # Hard limit + +# Note: MicroPython uses list[list] for jsontable (row-oriented) +# No DataFrame support - data is always row-oriented + + +class NATSBridge: + """MicroPython NATS bridge implementation.""" + + def __init__(self, broker_url=None, fileserver_url=None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL + self._nats_conn = None + + def smartsend(self, subject, data, **kwargs): + """Send data (synchronous).""" + correlation_id = self._generate_uuid() + msg_id = self._generate_uuid() + sender_id = self._generate_uuid() + + print(f"[Correlation: {correlation_id}] Starting smartsend") + + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = self._serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + if payload_size > MAX_PAYLOAD_SIZE: + raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}") + + if payload_size < DEFAULT_SIZE_THRESHOLD: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('ascii') + payloads.append({ + 'id': self._generate_uuid(), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64 + }) + else: + # Link path (limited support) + response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes) + payloads.append({ + 'id': self._generate_uuid(), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'] + }) + + env = { + 'correlation_id': correlation_id, + 'msg_id': msg_id, + 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), + 'send_to': subject, + 'msg_purpose': kwargs.get('msg_purpose', 'chat'), + 'sender_name': kwargs.get('sender_name', 'NATSBridge'), + 'sender_id': sender_id, + 'receiver_name': kwargs.get('receiver_name', ''), + 'receiver_id': kwargs.get('receiver_id', ''), + 'reply_to': kwargs.get('reply_to', ''), + 'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''), + 'broker_url': self.broker_url, + 'metadata': {}, + 'payloads': payloads + } + + env_json_str = json.dumps(env) + + # Publish + self._publish(subject, env_json_str, correlation_id) + + return env, env_json_str + + def smartreceive(self, msg, **kwargs): + """Receive and process message (synchronous).""" + env_json_obj = json.loads(msg.payload) + correlation_id = env_json_obj['correlation_id'] + + payloads_list = [] + for payload in env_json_obj['payloads']: + transport = payload['transport'] + dataname = payload['dataname'] + + if transport == 'direct': + payload_b64 = payload['data'] + payload_bytes = base64.b64decode(payload_b64) + data_type = payload['payload_type'] + data = self._deserialize_data(payload_bytes, data_type) + payloads_list.append((dataname, data, data_type)) + elif transport == 'link': + url = payload['data'] + downloaded_data = self._sync_fileserver_download( + url, + kwargs.get('max_retries', 3), + kwargs.get('base_delay', 100), + kwargs.get('max_delay', 1000), + correlation_id + ) + data_type = payload['payload_type'] + data = self._deserialize_data(downloaded_data, data_type) + payloads_list.append((dataname, data, data_type)) + + env_json_obj['payloads'] = payloads_list + return env_json_obj + + def _serialize_data(self, data, payload_type): + """Serialize data (MicroPython version - no arrowtable support).""" + if payload_type == 'text': + return data.encode('utf-8') + elif payload_type == 'dictionary': + return json.dumps(data).encode('utf-8') + elif payload_type == 'jsontable': + # data is list[list] (row-oriented) + return json.dumps(data).encode('utf-8') + elif payload_type in ('image', 'audio', 'video', 'binary'): + return bytes(data) + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + + def _deserialize_data(self, data, payload_type): + """Deserialize data (MicroPython version).""" + if payload_type == 'text': + return data.decode('utf-8') + elif payload_type == 'dictionary': + return json.loads(data.decode('utf-8')) + elif payload_type == 'jsontable': + # Returns list[list] (row-oriented) + return json.loads(data.decode('utf-8')) + elif payload_type in ('image', 'audio', 'video', 'binary'): + return data + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + + def _generate_uuid(self): + """Generate simple UUID (MicroPython compatible).""" + return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % ( + time.time_ns() // (10**6) % 0xFFFFFFFF, + time.time_ns() % 0xFFFFFFFF, + time.time_ns() >> 32 & 0xFFFF, + time.time_ns() >> 48 & 0xFFFF, + time.time_ns() >> 64 & 0xFFFF, + time.time_ns() >> 80 & 0xFFFF, + time.time_ns() >> 96 & 0xFFFF, + time.time_ns() >> 112 & 0xFFFF + ) + + def _sync_fileserver_upload(self, url, dataname, data): + """Synchronous file upload (limited).""" + # Simplified implementation for MicroPython + # In practice, would use network.HTTP or similar + raise NotImplementedError("File upload not implemented in MicroPython") + + def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id): + """Synchronous file download with backoff.""" + # Simplified implementation for MicroPython + raise NotImplementedError("File download not implemented in MicroPython") + + def _publish(self, subject, message, correlation_id): + """Publish message to NATS.""" + # Simplified implementation for MicroPython + raise NotImplementedError("NATS publishing not implemented in MicroPython") +``` + +--- + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | + +### MicroPython Configuration + +```python +# micropython.conf +NATS_URL = "nats://broker.local:4222" +FILESERVER_URL = "http://fileserver.local:8080" +SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices +MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython +``` + +--- + +## Performance Considerations + +### Zero-Copy Reading + +| Platform | Strategy | +|----------|----------| +| **Julia** | `Arrow.read()` with memory-mapped files | +| **JavaScript** | `ArrayBuffer` with `DataView` | +| **Python** | `pyarrow` memory mapping | +| **MicroPython** | Not available (streaming only) | + +### Exponential Backoff + +All platforms implement exponential backoff for HTTP downloads: + +```python +# Python +async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): + delay = base_delay + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + return await response.read() + except Exception as e: + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + raise Exception("Failed to fetch after max retries") +``` + +### Correlation ID Logging + +All platforms use correlation IDs for distributed tracing: + +``` +[timestamp] [Correlation: abc123] Message published to subject +``` + +### Serialization Performance + +| Format | Use Case | Pros | Cons | +|--------|----------|------|------| +| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython | +| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement | + +--- + +## Testing + +### Test File Organization + +| Platform | Sender Tests | Receiver Tests | +|----------|--------------|----------------| +| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | +| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | +| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | + +### Run Tests + +```bash +# Julia +julia test/test_julia_text_sender.jl +julia test/test_julia_text_receiver.jl + +# JavaScript (Node.js) +node test/test_js_text_sender.js +node test/test_js_text_receiver.js + +# Python +python3 test/test_py_text_sender.py +python3 test/test_py_text_receiver.py +``` + +--- + +## Troubleshooting + +### Common Issues + +1. **NATS Connection Failed** + - Ensure NATS server is running + - Check `broker_url` configuration + +2. **HTTP Upload Failed** + - Ensure file server is running + - Check `fileserver_url` configuration + - Verify upload permissions + +3. **Arrow IPC Deserialization Error** + - Ensure data is properly serialized to Arrow format + - Check Arrow version compatibility + - MicroPython doesn't support Arrow IPC + +4. **Memory Constraints (MicroPython)** + - Reduce `size_threshold` + - Use direct transport only (< 100KB) + - Avoid large payloads + - Use `jsontable` instead of `arrowtable` (arrowtable not supported) + +5. **Row-Oriented vs Column-Oriented Conversion Issues** + - Julia/Python: DataFrames are column-oriented; when sending `jsontable`, they are converted to row-oriented JSON + - JavaScript/MicroPython: Data is natively row-oriented + - When receiving `jsontable` in Julia/Python, JSON is automatically converted back to column-oriented DataFrame + +--- + +## Summary + +This cross-platform NATS bridge provides: + +1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms +2. **Idiomatic Implementations**: + - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC + - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client + - **Python**: Class-based design with dataclasses, type hints, async/await + - **MicroPython**: Synchronous API, memory-constrained optimizations +3. **Message Format Consistency**: Identical JSON schemas across all platforms +4. **Handler Abstraction**: File server operations abstracted through configurable handlers +5. **Platform-Specific Optimizations**: + - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) + - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms) +6. **Row-Oriented ↔ Column-Oriented Conversion**: Automatic conversion between row-oriented (JS, MicroPython) and column-oriented (Julia DataFrame, Python pandas) formats when using `jsontable` + +The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. + +### Datatype Summary + +| Datatype | Serialization | Use Case | Encoding | Supported Platforms | +|----------|---------------|----------|----------|---------------------| +| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython |