From 060c68cd05c1539ebc3d2e9fb92ad6ddd9326116 Mon Sep 17 00:00:00 2001 From: narawat Date: Thu, 5 Mar 2026 11:00:46 +0700 Subject: [PATCH] update --- docs/expanded_architecture.md | 1136 ++++++++++++++++++ docs/expanded_implementation.md | 1986 +++++++++++++++++++++++++++++++ src/natbridge.js | 719 +++++++++++ src/natbridge.py | 784 ++++++++++++ src/natbridge_mpy.py | 728 +++++++++++ 5 files changed, 5353 insertions(+) create mode 100644 docs/expanded_architecture.md create mode 100644 docs/expanded_implementation.md create mode 100644 src/natbridge.js create mode 100644 src/natbridge.py create mode 100644 src/natbridge_mpy.py diff --git a/docs/expanded_architecture.md b/docs/expanded_architecture.md new file mode 100644 index 0000000..6d12eec --- /dev/null +++ b/docs/expanded_architecture.md @@ -0,0 +1,1136 @@ +# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge + +## Overview + +This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. + +**Supported Platforms:** +- **Julia** - Ground truth implementation with full feature set +- **JavaScript** - Node.js and browser-compatible implementation +- **Python/MicroPython** - Desktop and embedded-compatible implementation + +### Cross-Platform Design Principles + +1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with identical signatures and behavior +2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python) +3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms +4. 
**Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility + +--- + +## High-Level API Standard (Cross-Platform) + +### Unified API Signature + +All three platforms expose the same high-level API: + +**Input Format (smartsend):** +``` +[(dataname1, data1, type1), (dataname2, data2, type2), ...] +``` + +**Output Format (smartreceive):** +``` +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] +} +``` + +### Supported Payload Types + +| Type | Julia | JavaScript | Python/MicroPython | +|------|-------|------------|-------------------| +| `text` | `String` | `string` | `str` | +| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | +| `table` | `DataFrame`, `Arrow.Table` | `Array`, `Buffer` (Arrow) | `pandas.DataFrame`, `bytes` (Arrow) | +| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | +| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` | + +### Cross-Platform API Examples + +**Julia:** +```julia +using NATSBridge + +# Send +env, env_json_str = smartsend( + "/chat", + [("message", "Hello!", "text"), ("image", image_bytes, "image")], + broker_url="nats://localhost:4222" +) + +# Receive +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# env["payloads"] = [("message", "Hello!", "text"), ("image", bytes, "image")] +``` + +**JavaScript:** +```javascript +const NATSBridge = require('natbridge'); + +// Send 
+const [env, env_json_str] = await NATSBridge.smartsend( + "/chat", + [ + ["message", "Hello!", "text"], + ["image", imageBuffer, "image"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Receive +const env = await NATSBridge.smartreceive(msg, { + fileserver_download_handler: fetchWithBackoff +}); +// env.payloads = [["message", "Hello!", "text"], ["image", bytes, "image"]] +``` + +**Python:** +```python +from natbridge import NATSBridge + +# Send +env, env_json_str = NATSBridge.smartsend( + "/chat", + [("message", "Hello!", "text"), ("image", image_bytes, "image")], + broker_url="nats://localhost:4222" +) + +# Receive +env = NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff +) +# env["payloads"] = [("message", "Hello!", "text"), ("image", bytes, "image")] +``` + +**MicroPython:** +```python +from natbridge import NATSBridge + +# Send (limited to direct transport due to memory constraints) +env, env_json_str = NATSBridge.smartsend( + "/chat", + [("message", "Hello!", "text")], + broker_url="nats://localhost:4222" +) +``` + +--- + +## Architecture Diagram (Cross-Platform) + +```mermaid +flowchart TB + subgraph JuliaApp["Julia Application"] + JuliaAppCode[App Code] + JuliaBridge[NATSBridge.jl] + JuliaNATS[NATS.jl] + end + + subgraph JSApp["JavaScript Application"] + JSAppCode[App Code] + JSBridge[NATSBridge.js] + JSNATS[nats.js] + end + + subgraph PythonApp["Python/MicroPython Application"] + PythonAppCode[App Code] + PythonBridge[NATSBridge.py] + PythonNATS[nats.py] + end + + subgraph Infrastructure["Infrastructure"] + NATS[NATS Server
Message Broker] + FileServer[HTTP File Server
Upload/Download] + end + + JuliaAppCode --> JuliaBridge + JuliaBridge --> JuliaNATS + JSAppCode --> JSBridge + JSBridge --> JSNATS + PythonAppCode --> PythonBridge + PythonBridge --> PythonNATS + + JuliaNATS --> NATS + JSNATS --> NATS + PythonNATS --> NATS + + NATS --> JuliaNATS + NATS --> JSNATS + NATS --> PythonNATS + + JuliaBridge -.->|HTTP POST upload| FileServer + JSBridge -.->|HTTP POST upload| FileServer + PythonBridge -.->|HTTP POST upload| FileServer + + FileServer -.->|HTTP GET download| JuliaBridge + FileServer -.->|HTTP GET download| JSBridge + FileServer -.->|HTTP GET download| PythonBridge + + style JuliaApp fill:#c5e1a5 + style JSApp fill:#bbdefb + style PythonApp fill:#f8bbd0 + style NATS fill:#fff3e0 + style FileServer fill:#f3e5f5 +``` + +--- + +## System Components + +### 1. msg_envelope_v1 - Message Envelope + +**JSON Schema (Identical Across All Platforms):** +```json +{ + "correlation_id": "uuid-v4-string", + "msg_id": "uuid-v4-string", + "timestamp": "2024-01-15T10:30:00Z", + + "send_to": "topic/subject", + "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", + "sender_name": "agent-wine-web-frontend", + "sender_id": "uuid4", + "receiver_name": "agent-backend", + "receiver_id": "uuid4", + "reply_to": "topic", + "reply_to_msg_id": "uuid4", + "broker_url": "nats://localhost:4222", + + "metadata": { + "content_type": "application/octet-stream", + "content_length": 123456 + }, + + "payloads": [ + { + "id": "uuid4", + "dataname": "login_image", + "payload_type": "image", + "transport": "direct", + "encoding": "base64", + "size": 15433, + "data": "base64-encoded-string", + "metadata": { + "checksum": "sha256_hash" + } + }, + { + "id": "uuid4", + "dataname": "large_table", + "payload_type": "table", + "transport": "link", + "encoding": "none", + "size": 524288, + "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", + "metadata": {} + } + ] +} +``` + +### 2. 
msg_payload_v1 - Payload Structure + +**JSON Schema (Identical Across All Platforms):** +```json +{ + "id": "uuid4", + "dataname": "login_image", + "payload_type": "image | dictionary | table | text | audio | video | binary", + "transport": "direct | link", + "encoding": "none | json | base64 | arrow-ipc", + "size": 15433, + "data": "base64-encoded-string | http-url", + "metadata": { + "checksum": "sha256_hash" + } +} +``` + +### 3. Transport Strategy Decision Logic (Cross-Platform) + +``` +┌─────────────────────────────────────────────────────────────┐ +│ smartsend Function (All Platforms) │ +│ Accepts: [(dataname1, data1, type1), ...] │ +│ (Type is per payload, not standalone) │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ For each payload: │ +│ 1. Extract type from tuple/array │ +│ 2. Serialize based on type │ +│ 3. Check payload size │ +└─────────────────────────────────────────────────────────────┘ + │ + ┌───────────┴────────────┐ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (>= 1MB) │ + │ │ │ │ + │ • Serialize │ │ • Serialize │ + │ to buffer │ │ to buffer │ + │ • Base64 │ │ • Upload to │ + │ encode │ │ HTTP Server│ + │ • Publish to │ │ • Publish to │ + │ NATS │ │ NATS with │ + │ (in msg) │ │ URL │ + └──────────────┘ └──────────────┘ +``` + +--- + +## Platform-Specific Implementations + +### Julia Implementation + +#### Architecture Patterns + +**Multiple Dispatch:** Julia's core strength is leveraged through function overloading: + +```julia +# publish_message has two overloads based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + conn = NATS.connect(broker_url) + publish_message(conn, subject, message, correlation_id) +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + try + 
NATS.publish(conn, subject, message) + log_trace(correlation_id, "Message published to $subject") + finally + NATS.drain(conn) + end +end +``` + +**Struct-Based Data Models:** +```julia +struct msg_payload_v1 + id::String + dataname::String + payload_type::String + transport::String + encoding::String + size::Integer + data::Any + metadata::Dict{String, Any} +end + +struct msg_envelope_v1 + correlation_id::String + msg_id::String + timestamp::String + send_to::String + msg_purpose::String + sender_name::String + sender_id::String + receiver_name::String + receiver_id::String + reply_to::String + reply_to_msg_id::String + broker_url::String + metadata::Dict{String, Any} + payloads::Vector{msg_payload_v1} +end +``` + +#### Dependencies + +| Package | Purpose | +|---------|---------| +| `NATS.jl` | Core NATS functionality | +| `Arrow.jl` | Arrow IPC serialization | +| `JSON3.jl` | JSON parsing | +| `HTTP.jl` | HTTP client for file server | +| `UUIDs.jl` | UUID generation | +| `Dates.jl` | Timestamps | +| `Base64` | Base64 encoding | + +#### File Server Handler Signatures + +```julia +# Upload handler +fileserver_upload_handler( + fileserver_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} + +# Download handler +fileserver_download_handler( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} +``` + +#### Key Functions + +```julia +# Main send/receive functions +function smartsend( + subject::String, + data::AbstractArray{Tuple{String, Any, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler::Function = plik_oneshot_upload, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, + correlation_id::String = string(uuid4()), + msg_purpose::String = "chat", + sender_name::String = "NATSBridge", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + 
is_publish::Bool = true, + NATS_connection::Union{NATS.Connection, Nothing} = nothing, + msg_id::String = string(uuid4()), + sender_id::String = string(uuid4()) +)::Tuple{msg_envelope_v1, String} + +function smartreceive( + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 +)::JSON.Object{String, Any} +``` + +--- + +### JavaScript Implementation + +#### Architecture Patterns + +**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O: + +```javascript +// smartsend is async and returns a Promise +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = generateUUID(), + msg_purpose = "chat", + sender_name = "NATSBridge", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "", + is_publish = true, + nats_connection = null, + msg_id = generateUUID(), + sender_id = generateUUID() + } = options; + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = base64Encode(payloadBytes); + payloads.push({ + id: generateUUID(), + dataname, + payload_type: payloadType, + transport: "direct", + encoding: "base64", + size: payloadSize, + data: payloadB64 + }); + } else { + // Link path + const response = await fileserver_upload_handler( + fileserver_url, dataname, payloadBytes + ); + payloads.push({ + id: generateUUID(), + dataname, + payload_type: payloadType, + transport: "link", + encoding: "none", + size: payloadSize, + data: response.url + }); + } + } + + const env = buildEnvelope(subject, payloads, 
{ + correlation_id, msg_id, msg_purpose, + sender_name, sender_id, receiver_name, + receiver_id, reply_to, reply_to_msg_id, + broker_url + }); + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} +``` + +**Prototype-Based Utilities:** +```javascript +// NATS client wrapper (prototype-based) +class NATSClient { + constructor(url) { + this.url = url; + this.connection = null; + } + + async connect() { + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + async publish(subject, message) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + } + + async close() { + if (this.connection) { + this.connection.close(); + } + } +} +``` + +#### Dependencies (Node.js) + +| Package | Purpose | +|---------|---------| +| `nats` | Core NATS functionality (nats.js) | +| `uuid` | UUID generation | +| `node-fetch` or `axios` | HTTP client for file server | +| `apache-arrow` | Arrow IPC serialization | + +#### Dependencies (Browser) + +| Package | Purpose | +|---------|---------| +| `nats` | Browser-compatible NATS client | +| `uuid` | UUID generation | +| `fetch` (native) | HTTP client for file server | +| `apache-arrow` | Arrow IPC serialization | + +#### Dependencies (MicroPython) + +| Module | Purpose | +|--------|---------| +| `nats` (custom) | MicroPython NATS client | +| `time` | Timestamps | +| `uos` | File operations | +| `base64` | Base64 encoding | + +#### File Server Handler Signatures + +```javascript +// Upload handler - async function returning Promise +async function fileserver_upload_handler( + fileserver_url, + dataname, + data // Uint8Array +) { + // Returns: { status, uploadid, fileid, url } +} + +// Download handler - async 
function returning Promise +async function fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + correlation_id +) { + // Returns: Uint8Array +} +``` + +#### Key Functions + +```javascript +// Main send/receive functions +async function smartsend(subject, data, options = {}) { + // data: Array of [dataname, data, type] tuples + // Returns: Promise<[env, env_json_str]> +} + +async function smartreceive(msg, options = {}) { + // msg: NATS message object + // Returns: Promise +} + +// Utility functions +async function serializeData(data, payload_type) { + // Returns: Uint8Array +} + +async function deserializeData(data, payload_type) { + // Returns: deserialized data +} + +async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { + // Returns: Uint8Array +} +``` + +--- + +### Python/MicroPython Implementation + +#### Architecture Patterns + +**Class-Based Design:** Python uses classes for stateful operations: + +```python +class NATSBridge: + """Cross-platform NATS bridge implementation.""" + + DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB + DEFAULT_BROKER_URL = "nats://localhost:4222" + DEFAULT_FILESERVER_URL = "http://localhost:8080" + + def __init__(self, broker_url=None, fileserver_url=None): + self.broker_url = broker_url or self.DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL + self._nats_client = None + + async def smartsend(self, subject, data, **kwargs): + """ + Send data via NATS with automatic transport selection. + + Args: + subject: NATS subject to publish to + data: List of (dataname, data, type) tuples + **kwargs: Additional options (broker_url, fileserver_url, etc.) 
+ + Returns: + Tuple of (env, env_json_str) + """ + # Extract options with defaults + options = self._merge_options(kwargs) + + # Process payloads + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = self._serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + if payload_size < options['size_threshold']: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') + payloads.append({ + 'id': uuid.uuid4().hex, + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64 + }) + else: + # Link path + response = await options['fileserver_upload_handler']( + options['fileserver_url'], dataname, payload_bytes + ) + payloads.append({ + 'id': uuid.uuid4().hex, + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'] + }) + + # Build envelope + env = self._build_envelope(subject, payloads, options) + env_json_str = json.dumps(env) + + if options['is_publish']: + await self._publish_message( + subject, env_json_str, options['correlation_id'], + nats_connection=options.get('nats_connection') + ) + + return env, env_json_str + + async def smartreceive(self, msg, **kwargs): + """ + Receive and process NATS message. + + Args: + msg: NATS message object + **kwargs: Additional options (fileserver_download_handler, etc.) 
+ + Returns: + Dict with envelope metadata and payloads + """ + # Parse envelope + env_json_obj = json.loads(msg.payload) + + # Process payloads + payloads_list = [] + for payload in env_json_obj['payloads']: + transport = payload['transport'] + dataname = payload['dataname'] + + if transport == 'direct': + payload_b64 = payload['data'] + payload_bytes = base64.b64decode(payload_b64) + data_type = payload['payload_type'] + data = self._deserialize_data(payload_bytes, data_type) + payloads_list.append((dataname, data, data_type)) + elif transport == 'link': + url = payload['data'] + downloaded_data = await options['fileserver_download_handler']( + url, + options['max_retries'], + options['base_delay'], + options['max_delay'], + env_json_obj['correlation_id'] + ) + data_type = payload['payload_type'] + data = self._deserialize_data(downloaded_data, data_type) + payloads_list.append((dataname, data, data_type)) + + env_json_obj['payloads'] = payloads_list + return env_json_obj +``` + +**Dataclass for Type Safety:** +```python +from dataclasses import dataclass, field +from typing import Any, Dict, List, Tuple, Union + +@dataclass +class MsgPayloadV1: + """Message payload structure.""" + id: str + dataname: str + payload_type: str + transport: str + encoding: str + size: int + data: Union[str, bytes] # URL for link, base64 for direct + metadata: Dict[str, Any] = field(default_factory=dict) + +@dataclass +class MsgEnvelopeV1: + """Message envelope structure.""" + correlation_id: str + msg_id: str + timestamp: str + send_to: str + msg_purpose: str + sender_name: str + sender_id: str + receiver_name: str + receiver_id: str + reply_to: str + reply_to_msg_id: str + broker_url: str + metadata: Dict[str, Any] = field(default_factory=dict) + payloads: List[MsgPayloadV1] = field(default_factory=list) +``` + +#### Dependencies (Desktop Python) + +| Package | Purpose | +|---------|---------| +| `nats-py` | Core NATS functionality | +| `uuid` | UUID generation (stdlib) | +| 
`aiohttp` or `requests` | HTTP client for file server | +| `pyarrow` | Arrow IPC serialization | +| `pandas` | DataFrame support (optional) | +| `python-dateutil` | Timestamps | +| `base64` | Base64 encoding (stdlib) | + +#### Dependencies (MicroPython) + +| Module | Purpose | +|--------|---------| +| `network` | NATS connection (custom) | +| `time` | Timestamps | +| `uos` | File operations | +| `base64` | Base64 encoding | +| `json` | JSON parsing | +| `struct` | Binary data handling | + +**MicroPython Limitations:** +- No Arrow IPC support (memory constraints) +- Only direct transport (< 1MB threshold enforced) +- Simplified UUID generation +- No async/await (use callbacks or uasyncio) + +#### File Server Handler Signatures + +```python +# Upload handler - async function +async def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """ + Upload data to file server. + + Args: + fileserver_url: Base URL of file server + dataname: Name of the file + data: Binary data + + Returns: + Dict with keys: 'status', 'uploadid', 'fileid', 'url' + """ + pass + +# Download handler - async function +async def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """ + Download data from URL with exponential backoff. 
+ + Args: + url: URL to download from + max_retries: Maximum retry attempts + base_delay: Initial delay in ms + max_delay: Maximum delay in ms + correlation_id: Correlation ID for logging + + Returns: + Downloaded bytes + """ + pass +``` + +#### Key Functions + +```python +# Main send/receive functions (standalone or class methods) +async def smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + broker_url: str = DEFAULT_BROKER_URL, + fileserver_url: str = DEFAULT_FILESERVER_URL, + fileserver_upload_handler: Callable = plik_oneshot_upload, + size_threshold: int = DEFAULT_SIZE_THRESHOLD, + correlation_id: str = None, + msg_purpose: str = "chat", + sender_name: str = "NATSBridge", + receiver_name: str = "", + receiver_id: str = "", + reply_to: str = "", + reply_to_msg_id: str = "", + is_publish: bool = True, + nats_connection: Any = None, + msg_id: str = None, + sender_id: str = None +) -> Tuple[Dict, str]: + """Send data via NATS.""" + pass + +async def smartreceive( + msg: Any, + fileserver_download_handler: Callable = fetch_with_backoff, + max_retries: int = 5, + base_delay: int = 100, + max_delay: int = 5000 +) -> Dict: + """Receive and process NATS message.""" + pass + +# Utility functions +def _serialize_data(data: Any, payload_type: str) -> bytes: + """Serialize data to bytes.""" + pass + +def _deserialize_data(data: bytes, payload_type: str) -> Any: + """Deserialize bytes to data.""" + pass + +async def fetch_with_backoff( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """Fetch URL with exponential backoff.""" + pass +``` + +--- + +## Platform Comparison Matrix + +| Feature | Julia | JavaScript | Python | MicroPython | +|---------|-------|------------|--------|-------------| +| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (Overload via `@overload`) | ❌ | +| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) | +| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type 
hints) | ❌ | +| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | +| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | +| **Direct Transport** | ✅ | ✅ | ✅ | ✅ | +| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | +| **Handler Functions** | ✅ | ✅ | ✅ | ✅ | +| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ | + +--- + +## Implementation Details by Platform + +### Julia: Multiple Dispatch Pattern + +```julia +# Function overloading based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + # Creates new connection +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + # Uses pre-existing connection +end + +# Type-specific serialization +function _serialize_data(data::String, payload_type::String) + # Text handling +end + +function _serialize_data(data::Dict, payload_type::String) + # Dictionary handling +end + +function _serialize_data(data::DataFrame, payload_type::String) + # Table handling +end +``` + +### JavaScript: Prototype + Async Pattern + +```javascript +// Class-based NATS client +class NATSClient { + constructor(url) { + this.url = url; + } + + async connect() { + // Connection logic + } + + async publish(subject, message) { + // Publish logic + } +} + +// Module-level utility functions +function generateUUID() { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => { + const r = Math.random() * 16 | 0; + return (c === 'x' ? 
r : (r & 0x3 | 0x8)).toString(16); + }); +} + +async function serializeData(data, payload_type) { + // Serialization logic +} +``` + +### Python: Class-Based Pattern + +```python +class NATSBridge: + """Main bridge class.""" + + def __init__(self, broker_url=None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + + async def smartsend(self, subject, data, **kwargs): + """Send data.""" + pass + + async def smartreceive(self, msg, **kwargs): + """Receive message.""" + pass + +# Module-level convenience functions +async def smartsend(subject, data, **kwargs): + """Convenience function using default NATSBridge instance.""" + bridge = NATSBridge() + return await bridge.smartsend(subject, data, **kwargs) + +async def smartreceive(msg, **kwargs): + """Convenience function using default NATSBridge instance.""" + bridge = NATSBridge() + return await bridge.smartreceive(msg, **kwargs) +``` + +--- + +## Scenario Implementations (Cross-Platform) + +### Scenario 1: Command & Control (Small Dictionary) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
config = Dict("step_size" => 0.01)
env, env_json_str = smartsend("control", [("config", config, "dictionary")])``` | +| **JavaScript** | ```javascript
const config = { step_size: 0.01 };
const [env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);``` | +| **Python** | ```python
config = {"step_size": 0.01}
env, env_json_str = await smartsend("control", [("config", config, "dictionary")])``` | + +### Scenario 2: Deep Dive Analysis (Large Arrow Table) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table", df, "table")])``` | +| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
const [env, env_json_str] = await smartsend("analysis", [["table", df, "table"]]);``` | +| **Python** | ```python
import numpy as np
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table", df, "table")])``` | + +### Scenario 3: Chat System (Multi-Payload) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = smartsend("chat", chat)``` | +| **JavaScript** | ```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
const [env, env_json_str] = await smartsend("chat", chat);``` | +| **Python** | ```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)``` | + +--- + +## Performance Considerations (Cross-Platform) + +### Zero-Copy Reading + +| Platform | Strategy | +|----------|----------| +| **Julia** | `Arrow.read()` with memory-mapped files | +| **JavaScript** | `ArrayBuffer` with `DataView` | +| **Python** | `pyarrow` memory mapping | +| **MicroPython** | Not available (streaming only) | + +### Exponential Backoff + +```python +# Python/MicroPython +async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): + delay = base_delay + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + return await response.read() + except Exception as e: + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + raise Exception("Failed to fetch after max retries") +``` + +### Correlation ID Logging + +All platforms use correlation IDs for distributed tracing: + +``` +[timestamp] [Correlation: abc123] Message published to subject +``` + +--- + +## Testing Strategy (Cross-Platform) + +### Unit Tests + +| Test Type | Julia | JavaScript | Python | +|-----------|-------|------------|--------| +| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` | +| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` | +| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` | +| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` | + +### Integration Tests + +- NATS server communication +- File server upload/download +- Cross-platform message exchange + +--- + +## Configuration + +### Environment Variables + +| Variable | 
Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | + +### MicroPython-Specific Configuration + +```python +# micropython.conf +NATS_URL = "nats://broker.local:4222" +FILESERVER_URL = "http://fileserver.local:8080" +SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices +MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython +``` + +--- + +## Summary + +This cross-platform NATS bridge provides: + +1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across Julia, JavaScript, and Python/MicroPython +2. **Idiomatic Implementations**: + - Julia: Multiple dispatch and struct-based design + - JavaScript: Async/await and prototype-based utilities + - Python: Class-based design with type hints +3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas +4. **Handler Abstraction**: File server operations abstracted through configurable handlers +5. **Platform-Specific Optimizations**: Arrow IPC support in desktop platforms, streaming support in MicroPython + +The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. \ No newline at end of file diff --git a/docs/expanded_implementation.md b/docs/expanded_implementation.md new file mode 100644 index 0000000..417a150 --- /dev/null +++ b/docs/expanded_implementation.md @@ -0,0 +1,1986 @@ +# Cross-Platform Implementation Guide: Bi-Directional Data Bridge + +## Overview + +This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. 
The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. + +**Supported Platforms:** +- **Julia** - Ground truth implementation (reference) +- **JavaScript** - Node.js and browser implementation +- **Python/MicroPython** - Desktop and embedded implementation + +--- + +## Implementation Files + +| Language | Implementation File | Description | +|----------|---------------------|-------------| +| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support | +| **JavaScript** | `src/natbridge.js` | Node.js/browser implementation | +| **Python** | `src/natbridge.py` | Desktop Python implementation | +| **MicroPython** | `src/natbridge_mpy.py` | MicroPython implementation (limited features) | + +--- + +## File Server Handler Architecture + +The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). 
+ +### Handler Function Signatures + +#### Julia + +```julia +# Upload handler - uploads data to file server and returns URL +fileserver_upload_handler( + fileserver_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} + +# Download handler - fetches data from file server URL with exponential backoff +fileserver_download_handler( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} +``` + +#### JavaScript + +```javascript +// Upload handler - async function +async function fileserver_upload_handler( + fileserver_url, + dataname, + data // Uint8Array +) { + // Returns: { status, uploadid, fileid, url } +} + +// Download handler - async function +async function fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + correlation_id +) { + // Returns: Uint8Array +} +``` + +#### Python + +```python +# Upload handler - async function +async def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """ + Upload data to file server. + + Returns: + Dict with keys: 'status', 'uploadid', 'fileid', 'url' + """ + pass + +# Download handler - async function +async def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """ + Download data from URL with exponential backoff. + + Returns: + Downloaded bytes + """ + pass +``` + +#### MicroPython + +```python +# Upload handler - synchronous (the MicroPython port uses synchronous handlers, not asyncio) +def fileserver_upload_handler( + fileserver_url: str, + dataname: str, + data: bytearray +) -> Dict: + """ + Upload data to file server (synchronous).
+ + Returns: + Dict with keys: 'status', 'url' + """ + pass + +# Download handler - synchronous +def fileserver_download_handler( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytearray: + """ + Download data from URL with exponential backoff (synchronous). + + Returns: + Downloaded bytes + """ + pass +``` + +--- + +## Multi-Payload Support (Standard API) + +The system uses a **standardized list-of-tuples format** for all payload operations across all platforms. + +### API Standard + +``` +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] + +# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] 
+} +``` + +### Supported Types + +| Type | Julia | JavaScript | Python | MicroPython | +|------|-------|------------|--------|-------------| +| `text` | `String` | `string` | `str` | `str` | +| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | +| `table` | `DataFrame`, `Arrow.Table` | `Array` | `pandas.DataFrame` | ❌ (not supported) | +| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | +| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | + +### Cross-Platform Examples + +#### Julia + +```julia +using NATSBridge + +# Single payload - still wrapped in a list +env, env_json_str = smartsend( + "/test", + [("dataname1", data1, "dictionary")], + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload +) + +# Multiple payloads with different types +env, env_json_str = smartsend( + "/test", + [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], + broker_url="nats://localhost:4222" +) + +# Mixed content (chat with text, image, audio) +env, env_json_str = smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") + ], + broker_url="nats://localhost:4222" +) + +# Receive returns a JSON object envelope +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# env["payloads"] = [("dataname1", data1, type1), ...] 
+``` + +#### JavaScript + +```javascript +const NATSBridge = require('natbridge'); + +// Single payload +const [env, env_json_str] = await NATSBridge.smartsend( + "/test", + [["dataname1", data1, "dictionary"]], + { + broker_url: "nats://localhost:4222", + fileserver_upload_handler: plikOneshotUpload + } +); + +// Multiple payloads +const [env, env_json_str] = await NATSBridge.smartsend( + "/test", + [ + ["dataname1", data1, "dictionary"], + ["dataname2", data2, "table"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Mixed content +const [env, env_json_str] = await NATSBridge.smartsend( + "/chat", + [ + ["message_text", "Hello!", "text"], + ["user_image", imageData, "image"], + ["audio_clip", audioData, "audio"] + ], + { broker_url: "nats://localhost:4222" } +); + +// Receive +const env = await NATSBridge.smartreceive(msg, { + fileserver_download_handler: fetchWithBackoff +}); +// env.payloads = [["dataname1", data1, type1], ...] +``` + +#### Python + +```python +from natbridge import NATSBridge + +# Single payload +env, env_json_str = await NATSBridge.smartsend( + "/test", + [("dataname1", data1, "dictionary")], + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload +) + +# Multiple payloads +env, env_json_str = await NATSBridge.smartsend( + "/test", + [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], + broker_url="nats://localhost:4222" +) + +# Mixed content +env, env_json_str = await NATSBridge.smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") + ], + broker_url="nats://localhost:4222" +) + +# Receive +env = await NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff +) +# env["payloads"] = [("dataname1", data1, type1), ...] 
+``` + +#### MicroPython + +```python +from natbridge import NATSBridge + +# Limited to text and binary (no tables due to memory constraints) +env, env_json_str = NATSBridge.smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("binary_data", data_bytes, "binary") + ], + broker_url="nats://localhost:4222", + size_threshold=100000 # Lower threshold for memory constraints +) +# Note: MicroPython uses synchronous handlers +``` + +--- + +## Architecture + +### Cross-Platform Claim-Check Pattern + +```mermaid +flowchart TD + A[SmartSend Function] --> B{Is payload size < 1MB?} + B -->|Yes | C[Direct Path
< 1MB] + B -->|No | D[Link Path
>= 1MB] + + C --> C1[Serialize to Buffer] + C1 --> C2[Base64 encode] + C2 --> C3[Publish to NATS] + + D --> D1[Serialize to Buffer] + D1 --> D2[Upload to HTTP Server] + D2 --> D3[Publish to NATS with URL] + + style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px + style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px + style C fill:#e8f5e9,stroke:#008000,stroke-width:2px + style D fill:#e8f5e9,stroke:#008000,stroke-width:2px + style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px + style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px + style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px + style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px +``` + +**Claim-Check Pattern Overview:** +- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS +- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern) + +### smartsend Return Value + +All platforms return a tuple/array containing both the envelope and JSON string: + +#### Julia + +```julia +env, env_json_str = smartsend(...) +# Returns: ::Tuple{msg_envelope_v1, String} +# env::msg_envelope_v1 - The envelope object with all metadata and payloads +# env_json_str::String - JSON string for publishing to NATS +``` + +#### JavaScript + +```javascript +const [env, env_json_str] = await smartsend(...); +// Returns: Promise<[env, env_json_str]> +// env: Object with all metadata and payloads +// env_json_str: String for publishing to NATS +``` + +#### Python + +```python +env, env_json_str = await smartsend(...) +# Returns: Tuple[Dict, str] +# env: Dict with all metadata and payloads +# env_json_str: String for publishing to NATS +``` + +#### MicroPython + +```python +env, env_json_str = NATSBridge.smartsend(...) 
+# Returns: Tuple[Dict, str] +# Note: MicroPython returns plain dicts (no structured envelope object) +``` + +--- + +## Installation + +### Julia Dependencies + +```julia +using Pkg +Pkg.add("NATS") +Pkg.add("Arrow") +Pkg.add("JSON3") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") +``` + +### JavaScript Dependencies (Node.js) + +```bash +npm install nats uuid apache-arrow node-fetch +# or +yarn add nats uuid apache-arrow node-fetch +``` + +### JavaScript Dependencies (Browser) + +```bash +npm install nats uuid apache-arrow +# or use CDN: +# https://unpkg.com/nats-js/dist/bundle/nats.min.js +# https://unpkg.com/apache-arrow/arrow.min.js +``` + +### Python Dependencies (Desktop) + +```bash +pip install nats-py aiohttp pyarrow pandas python-dateutil +``` + +### MicroPython Dependencies + +MicroPython uses built-in modules: +- `network` - NATS connection (custom implementation) +- `time` - Timestamps +- `uos` - File operations +- `base64` - Base64 encoding +- `json` - JSON parsing +- `struct` - Binary data handling + +--- + +## Usage Tutorial + +### Step 1: Start NATS Server + +```bash +docker run -p 4222:4222 nats:latest +``` + +### Step 2: Start HTTP File Server (optional) + +```bash +# Create a directory for file uploads +mkdir -p /tmp/fileserver + +# Use any HTTP server that supports POST for file uploads +# Example: Python's built-in server (GET/HEAD only: serves downloads but cannot accept POST uploads) +python3 -m http.server 8080 --directory /tmp/fileserver +``` + +### Step 3: Run Test Scenarios + +```bash +# Julia tests +julia test/test_julia_to_julia_text_sender.jl +julia test/test_julia_to_julia_text_receiver.jl + +# JavaScript tests (Node.js) +node test/test_js_text_sender.js +node test/test_js_text_receiver.js + +# Python tests +python3 test/test_py_text_sender.py +python3 test/test_py_text_receiver.py +``` + +--- + +## Platform-Specific Implementations + +### Julia Implementation + +#### Module Structure + +```julia +module NATSBridge + using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 + + # Constants + const
DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB + const DEFAULT_BROKER_URL = "nats://localhost:4222" + const DEFAULT_FILESERVER_URL = "http://localhost:8080" + + # Structs + struct msg_payload_v1 + id::String + dataname::String + payload_type::String + transport::String + encoding::String + size::Integer + data::Any + metadata::Dict{String, Any} + end + + struct msg_envelope_v1 + correlation_id::String + msg_id::String + timestamp::String + send_to::String + msg_purpose::String + sender_name::String + sender_id::String + receiver_name::String + receiver_id::String + reply_to::String + reply_to_msg_id::String + broker_url::String + metadata::Dict{String, Any} + payloads::Vector{msg_payload_v1} + end + + # Main functions + function smartsend(...) end + function smartreceive(...) end + + # Utility functions + function _serialize_data(...) end + function _deserialize_data(...) end + function envelope_to_json(...) end + function log_trace(...) end + + # File server handlers + function plik_oneshot_upload(...) end + function _fetch_with_backoff(...) end + function publish_message(...) end + + # Internal helpers + function _get_payload_bytes(...) 
end +end +``` + +#### Multiple Dispatch Pattern + +Julia leverages multiple dispatch for type-specific implementations: + +```julia +# publish_message has two overloads based on argument types +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + conn = NATS.connect(broker_url) + publish_message(conn, subject, message, correlation_id) +end + +function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) + try + NATS.publish(conn, subject, message) + log_trace(correlation_id, "Message published to $subject") + finally + NATS.drain(conn) + end +end + +# Type-specific serialization +function _serialize_data(data::String, payload_type::String) + # Text handling + return Vector{UInt8}(data) +end + +function _serialize_data(data::Dict, payload_type::String) + # Dictionary handling + json_str = JSON.json(data) + return Vector{UInt8}(json_str) +end + +function _serialize_data(data::DataFrame, payload_type::String) + # Table handling + io = IOBuffer() + Arrow.write(io, data) + return take!(io) +end +``` + +#### smartsend Implementation + +```julia +function smartsend( + subject::String, + data::AbstractArray{Tuple{String, T1, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler::Function = plik_oneshot_upload, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, + correlation_id::String = string(uuid4()), + msg_purpose::String = "chat", + sender_name::String = "NATSBridge", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + is_publish::Bool = true, + NATS_connection::Union{NATS.Connection, Nothing} = nothing, + msg_id::String = string(uuid4()), + sender_id::String = string(uuid4()) +)::Tuple{msg_envelope_v1, String} where {T1<:Any} + + log_trace(correlation_id, "Starting smartsend for subject: $subject") + + # Process each payload in the list + 
payloads = msg_payload_v1[] + for (dataname, payload_data, payload_type) in data + # Serialize data based on type + payload_bytes = _serialize_data(payload_data, payload_type) + + payload_size = length(payload_bytes) + log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") + + # Decision: Direct vs Link + if payload_size < size_threshold + # Direct path - Base64 encode and send via NATS + payload_b64 = Base64.base64encode(payload_bytes) + log_trace(correlation_id, "Using direct transport for $payload_size bytes") + + payload = msg_payload_v1( + payload_b64, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "direct", + encoding = "base64", + size = payload_size, + metadata = Dict{String, Any}("payload_bytes" => payload_size) + ) + push!(payloads, payload) + else + # Link path - Upload to HTTP server, send URL via NATS + log_trace(correlation_id, "Using link transport, uploading to fileserver") + + response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) + + if response["status"] != 200 + error("Failed to upload data to fileserver: $(response["status"])") + end + + url = response["url"] + log_trace(correlation_id, "Uploaded to URL: $url") + + payload = msg_payload_v1( + url, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "link", + encoding = "none", + size = payload_size, + metadata = Dict{String, Any}() + ) + push!(payloads, payload) + end + end + + # Create msg_envelope_v1 with all payloads + env = msg_envelope_v1( + subject, + payloads; + correlation_id = correlation_id, + msg_id = msg_id, + msg_purpose = msg_purpose, + sender_name = sender_name, + sender_id = sender_id, + receiver_name = receiver_name, + receiver_id = receiver_id, + reply_to = reply_to, + reply_to_msg_id = reply_to_msg_id, + broker_url = broker_url, + metadata = Dict{String, Any}(), + ) + + env_json_str = envelope_to_json(env) + + if is_publish == false + # skip publish + elseif is_publish 
== true && NATS_connection === nothing + publish_message(broker_url, subject, env_json_str, correlation_id) + elseif is_publish == true && NATS_connection !== nothing + publish_message(NATS_connection, subject, env_json_str, correlation_id) + end + + return (env, env_json_str) +end +``` + +#### smartreceive Implementation + +```julia +function smartreceive( + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 +)::JSON.Object{String, Any} + # Parse the JSON envelope + env_json_obj = JSON.parse(String(msg.payload)) + log_trace(env_json_obj["correlation_id"], "Processing received message") + + # Process all payloads in the envelope + payloads_list = Tuple{String, Any, String}[] + + num_payloads = length(env_json_obj["payloads"]) + + for i in 1:num_payloads + payload = env_json_obj["payloads"][i] + transport = String(payload["transport"]) + dataname = String(payload["dataname"]) + + if transport == "direct" + log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") + + # Extract base64 payload from the payload + payload_b64 = String(payload["data"]) + + # Decode Base64 payload + payload_bytes = Base64.base64decode(payload_b64) + + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + elseif transport == "link" + # Extract download URL from the payload + url = String(payload["data"]) + log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") + + # Fetch with exponential backoff using the download handler + downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"]) + + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = 
_deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + else + error("Unknown transport type for payload '$dataname': $(transport)") + end + end + env_json_obj["payloads"] = payloads_list + return env_json_obj +end +``` + +#### _serialize_data Implementation + +```julia +function _serialize_data(data::Any, payload_type::String) + if payload_type == "text" + if isa(data, String) + data_bytes = Vector{UInt8}(data) + return data_bytes + else + error("Text data must be a String") + end + elseif payload_type == "dictionary" + json_str = JSON.json(data) + json_str_bytes = Vector{UInt8}(json_str) + return json_str_bytes + elseif payload_type == "table" + io = IOBuffer() + Arrow.write(io, data) + return take!(io) + elseif payload_type == "image" + if isa(data, Vector{UInt8}) + return data + else + error("Image data must be Vector{UInt8}") + end + elseif payload_type == "audio" + if isa(data, Vector{UInt8}) + return data + else + error("Audio data must be Vector{UInt8}") + end + elseif payload_type == "video" + if isa(data, Vector{UInt8}) + return data + else + error("Video data must be Vector{UInt8}") + end + elseif payload_type == "binary" + if isa(data, IOBuffer) + return take!(data) + elseif isa(data, Vector{UInt8}) + return data + else + error("Binary data must be binary (Vector{UInt8} or IOBuffer)") + end + else + error("Unknown payload_type: $payload_type") + end +end +``` + +#### _deserialize_data Implementation + +```julia +function _deserialize_data( + data::Vector{UInt8}, + payload_type::String, + correlation_id::String +) + if payload_type == "text" + return String(data) + elseif payload_type == "dictionary" + json_str = String(data) + return JSON.parse(json_str) + elseif payload_type == "table" + io = IOBuffer(data) + df = Arrow.Table(io) + return df + elseif payload_type == "image" + return data + elseif payload_type == "audio" + return data + elseif payload_type == "video" + return 
data + elseif payload_type == "binary" + return data + else + error("Unknown payload_type: $payload_type") + end +end +``` + +#### _fetch_with_backoff Implementation + +```julia +function _fetch_with_backoff( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +) + delay = base_delay + for attempt in 1:max_retries + try + response = HTTP.request("GET", url) + if response.status == 200 + log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") + return response.body + else + error("Failed to fetch: $(response.status)") + end + catch e + log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") + + if attempt < max_retries + sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + end + end + end + + error("Failed to fetch data after $max_retries attempts") +end +``` + +#### plik_oneshot_upload Implementation + +```julia +function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) + # Get upload id + url_getUploadID = "$file_server_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + response_json = JSON.parse(http_response.body) + uploadid = response_json["id"] + uploadtoken = response_json["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$file_server_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict( + "file" => file_multipart + )) + + http_response = nothing + try + http_response = HTTP.post(url_upload, headers, form) + catch e + @error "Request failed" exception=e + end + response_json = JSON.parse(http_response.body) + fileid = response_json["id"] + + url = "$file_server_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => http_response.status, "uploadid" => uploadid, 
"fileid" => fileid, "url" => url) +end +``` + +--- + +### JavaScript Implementation + +#### Module Structure + +```javascript +// natbridge.js +const nats = require('nats'); +const { v4: uuidv4 } = require('uuid'); +const fetch = require('node-fetch'); + +const DEFAULT_SIZE_THRESHOLD = 1_000_000; +const DEFAULT_BROKER_URL = 'nats://localhost:4222'; +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; + +class NATSClient { + constructor(url) { + this.url = url; + this.connection = null; + } + + async connect() { + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + async publish(subject, message) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + } + + async close() { + if (this.connection) { + this.connection.close(); + } + } +} + +async function smartsend(subject, data, options = {}) { + // Implementation +} + +async function smartreceive(msg, options = {}) { + // Implementation +} + +module.exports = { + NATSClient, + smartsend, + smartreceive, + plikOneshotUpload, + fetchWithBackoff +}; +``` + +#### smartsend Implementation + +```javascript +const nats = require('nats'); +const { v4: uuidv4 } = require('uuid'); +const fetch = require('node-fetch'); +const arrow = require('apache-arrow'); + +const DEFAULT_SIZE_THRESHOLD = 1_000_000; +const DEFAULT_BROKER_URL = 'nats://localhost:4222'; +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; + +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = uuidv4(), + msg_purpose = 'chat', + sender_name = 'NATSBridge', + receiver_name = '', + receiver_id = '', + reply_to = '', + reply_to_msg_id = '', + is_publish = true, + nats_connection = null, + msg_id = uuidv4(), + sender_id = uuidv4() + } = options; + + 
console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`); + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = bufferToBase64(payloadBytes); + console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`); + + payloads.push({ + id: uuidv4(), + dataname, + payload_type: payloadType, + transport: 'direct', + encoding: 'base64', + size: payloadSize, + data: payloadB64, + metadata: { payload_bytes: payloadSize } + }); + } else { + // Link path + console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`); + + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`); + + payloads.push({ + id: uuidv4(), + dataname, + payload_type: payloadType, + transport: 'link', + encoding: 'none', + size: payloadSize, + data: response.url, + metadata: {} + }); + } + } + + // Build envelope + const env = { + correlation_id, + msg_id, + timestamp: new Date().toISOString(), + send_to: subject, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url, + metadata: {}, + payloads + }; + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, 
env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} +``` + +#### serializeData Implementation + +```javascript +const arrow = require('apache-arrow'); + +async function serializeData(data, payload_type) { + if (payload_type === 'text') { + if (typeof data === 'string') { + return Buffer.from(data, 'utf8'); + } else { + throw new Error('Text data must be a string'); + } + } else if (payload_type === 'dictionary') { + const jsonStr = JSON.stringify(data); + return Buffer.from(jsonStr, 'utf8'); + } else if (payload_type === 'table') { + // Convert to Arrow IPC + const buffer = Buffer.alloc(1024 * 1024); // Pre-allocate buffer + const writer = new arrow.RecordBatchWriter([ + new arrow.Schema(Object.keys(data[0]).map(key => new arrow.Field(key, arrow.any()))) + ]); + + for (const row of data) { + const recordBatch = arrow.recordBatch.fromObjects([row], writer.schema); + writer.write(recordBatch); + } + await writer.close(); + + // Read from the underlying buffer + return buffer; + } else if (payload_type === 'image') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Image data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'audio') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Audio data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'video') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Video data must be Uint8Array or Buffer'); + } + } else if (payload_type === 'binary') { + if (data instanceof Uint8Array || Buffer.isBuffer(data)) { + return Buffer.from(data); + } else { + throw new Error('Binary data must be Uint8Array or Buffer'); + } + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +function bufferToBase64(buffer) { + return buffer.toString('base64'); +} +``` 
+ +#### deserializeData Implementation + +```javascript +const arrow = require('apache-arrow'); + +async function deserializeData(data, payload_type, correlation_id) { + if (payload_type === 'text') { + return Buffer.from(data).toString('utf8'); + } else if (payload_type === 'dictionary') { + const jsonStr = Buffer.from(data).toString('utf8'); + return JSON.parse(jsonStr); + } else if (payload_type === 'table') { + const buffer = Buffer.from(data); + const table = arrow.tableFromRawBytes(buffer); + return table; + } else if (payload_type === 'image') { + return Buffer.from(data); + } else if (payload_type === 'audio') { + return Buffer.from(data); + } else if (payload_type === 'video') { + return Buffer.from(data); + } else if (payload_type === 'binary') { + return Buffer.from(data); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} +``` + +#### fetchWithBackoff Implementation + +```javascript +async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { + let delay = base_delay; + + for (let attempt = 1; attempt <= max_retries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`); + return await response.arrayBuffer(); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (e) { + console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`); + + if (attempt < max_retries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, max_delay); + } + } + } + + throw new Error(`Failed to fetch data after ${max_retries} attempts`); +} +``` + +#### plikOneshotUpload Implementation + +```javascript +async function plikOneshotUpload(file_server_url, dataname, data) { + // Get upload id + const url_getUploadID = `${file_server_url}/upload`; + const headers = { 
'Content-Type': 'application/json' }; + const body = JSON.stringify({ OneShot: true }); + + const http_response = await fetch(url_getUploadID, { + method: 'POST', + headers, + body + }); + + const response_json = await http_response.json(); + const uploadid = response_json.id; + const uploadtoken = response_json.uploadToken; + + // Upload file + const url_upload = `${file_server_url}/file/${uploadid}`; + const form = new FormData(); + const blob = new Blob([data]); + form.append('file', blob, dataname); + + const upload_headers = { + 'X-UploadToken': uploadtoken + }; + + const upload_response = await fetch(url_upload, { + method: 'POST', + headers: upload_headers, + body: form + }); + + const upload_json = await upload_response.json(); + const fileid = upload_json.id; + + const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; + + return { + status: upload_response.status, + uploadid, + fileid, + url + }; +} +``` + +--- + +### Python Implementation + +#### Module Structure + +```python +# natbridge.py +import asyncio +import base64 +import json +import uuid +import time +from typing import Any, Dict, List, Tuple, Union, Callable +from dataclasses import dataclass, field +from datetime import datetime + +try: + import pyarrow as arrow + import pyarrow.parquet as pq + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + +try: + import aiohttp + import nats + from nats.aio.client import Client as NATSClient + NATS_AVAILABLE = True +except ImportError: + NATS_AVAILABLE = False + + +DEFAULT_SIZE_THRESHOLD = 1_000_000 +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" + + +@dataclass +class MsgPayloadV1: + """Message payload structure.""" + id: str + dataname: str + payload_type: str + transport: str + encoding: str + size: int + data: Union[str, bytes] + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class MsgEnvelopeV1: + """Message envelope structure.""" + 
correlation_id: str + msg_id: str + timestamp: str + send_to: str + msg_purpose: str + sender_name: str + sender_id: str + receiver_name: str + receiver_id: str + reply_to: str + reply_to_msg_id: str + broker_url: str + metadata: Dict[str, Any] = field(default_factory=dict) + payloads: List[MsgPayloadV1] = field(default_factory=list) + + +class NATSBridge: + """Cross-platform NATS bridge implementation.""" + + def __init__(self, broker_url: str = None, fileserver_url: str = None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL + self._nats_client: NATSClient = None + + async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]: + """Send data via NATS.""" + pass + + async def smartreceive(self, msg: Any, **kwargs) -> Dict: + """Receive and process NATS message.""" + pass +``` + +#### smartsend Implementation + +```python +import asyncio +import base64 +import json +import uuid +from typing import Any, Dict, List, Tuple, Union, Callable +from datetime import datetime + +DEFAULT_SIZE_THRESHOLD = 1_000_000 +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" + + +async def smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + broker_url: str = DEFAULT_BROKER_URL, + fileserver_url: str = DEFAULT_FILESERVER_URL, + fileserver_upload_handler: Callable = plik_oneshot_upload, + size_threshold: int = DEFAULT_SIZE_THRESHOLD, + correlation_id: str = None, + msg_purpose: str = "chat", + sender_name: str = "NATSBridge", + receiver_name: str = "", + receiver_id: str = "", + reply_to: str = "", + reply_to_msg_id: str = "", + is_publish: bool = True, + nats_connection: Any = None, + msg_id: str = None, + sender_id: str = None +) -> Tuple[Dict, str]: + """ + Send data via NATS with automatic transport selection. 
+ + Args: + subject: NATS subject to publish to + data: List of (dataname, data, type) tuples + **kwargs: Additional options + + Returns: + Tuple of (env, env_json_str) + """ + if correlation_id is None: + correlation_id = str(uuid.uuid4()) + if msg_id is None: + msg_id = str(uuid.uuid4()) + if sender_id is None: + sender_id = str(uuid.uuid4()) + + print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}") + + # Process payloads + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = _serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") + + if payload_size < size_threshold: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') + print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes") + + payloads.append({ + 'id': str(uuid.uuid4()), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64, + 'metadata': {'payload_bytes': payload_size} + }) + else: + # Link path + print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver") + + response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) + + if response['status'] != 200: + raise Exception(f"Failed to upload data to fileserver: {response['status']}") + + print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}") + + payloads.append({ + 'id': str(uuid.uuid4()), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'], + 'metadata': {} + }) + + # Build envelope + env = { + 'correlation_id': correlation_id, + 'msg_id': msg_id, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'send_to': subject, + 
'msg_purpose': msg_purpose, + 'sender_name': sender_name, + 'sender_id': sender_id, + 'receiver_name': receiver_name, + 'receiver_id': receiver_id, + 'reply_to': reply_to, + 'reply_to_msg_id': reply_to_msg_id, + 'broker_url': broker_url, + 'metadata': {}, + 'payloads': payloads + } + + env_json_str = json.dumps(env) + + if is_publish: + if nats_connection: + await publish_message(nats_connection, subject, env_json_str, correlation_id) + else: + await publish_message(broker_url, subject, env_json_str, correlation_id) + + return env, env_json_str +``` + +#### serializeData Implementation + +```python +import base64 +import json +from typing import Any + +try: + import pyarrow as arrow + import pyarrow.parquet as pq + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + + +def _serialize_data(data: Any, payload_type: str) -> bytes: + """Serialize data to bytes based on type.""" + if payload_type == 'text': + if isinstance(data, str): + return data.encode('utf-8') + else: + raise TypeError('Text data must be a string') + elif payload_type == 'dictionary': + json_str = json.dumps(data) + return json_str.encode('utf-8') + elif payload_type == 'table': + if not ARROW_AVAILABLE: + raise RuntimeError('pyarrow not available for table serialization') + + # Convert DataFrame to Arrow + import io + buf = io.BytesIO() + import pandas as pd + if isinstance(data, pd.DataFrame): + table = arrow.Table.from_pandas(data) + sink = arrow.ipc.new_file(buf, table.schema) + sink.write_table(table) + sink.close() + return buf.getvalue() + else: + raise TypeError('Table data must be a pandas DataFrame') + elif payload_type == 'image': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise TypeError('Image data must be bytes') + elif payload_type == 'audio': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise TypeError('Audio data must be bytes') + elif payload_type == 'video': + if isinstance(data, (bytes, bytearray)): + return bytes(data) +
else: + raise TypeError('Video data must be bytes') + elif payload_type == 'binary': + if isinstance(data, (bytes, bytearray)): + return bytes(data) + else: + raise TypeError('Binary data must be bytes') + else: + raise ValueError(f'Unknown payload_type: {payload_type}') +``` + +#### deserializeData Implementation + +```python +import base64 +import json +from typing import Any + +try: + import pyarrow as arrow + ARROW_AVAILABLE = True +except ImportError: + ARROW_AVAILABLE = False + + +def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: + """Deserialize bytes to data based on type.""" + if payload_type == 'text': + return data.decode('utf-8') + elif payload_type == 'dictionary': + json_str = data.decode('utf-8') + return json.loads(json_str) + elif payload_type == 'table': + if not ARROW_AVAILABLE: + raise RuntimeError('pyarrow not available for table deserialization') + + import io + buf = io.BytesIO(data) + reader = arrow.ipc.open_file(buf) + return reader.read_all().to_pandas() + elif payload_type == 'image': + return data + elif payload_type == 'audio': + return data + elif payload_type == 'video': + return data + elif payload_type == 'binary': + return data + else: + raise ValueError(f'Unknown payload_type: {payload_type}') +``` + +#### fetchWithBackoff Implementation + +```python +import asyncio +import aiohttp +from typing import Callable + + +async def fetch_with_backoff( + url: str, + max_retries: int, + base_delay: int, + max_delay: int, + correlation_id: str +) -> bytes: + """Fetch URL with exponential backoff.""" + delay = base_delay + + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}") + return await response.read() + else: + raise Exception(f"Failed to fetch: {response.status}") + except Exception as e: + print(f"[Correlation:
{correlation_id}] Attempt {attempt} failed: {type(e).__name__}") + + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + + raise Exception(f"Failed to fetch data after {max_retries} attempts") +``` + +#### plikOneshotUpload Implementation + +```python +import aiohttp +import json +from typing import Dict, Any + + +async def plik_oneshot_upload( + file_server_url: str, + dataname: str, + data: bytes +) -> Dict[str, Any]: + """Upload data to plik server in one-shot mode.""" + + # Get upload id + async with aiohttp.ClientSession() as session: + url_getUploadID = f"{file_server_url}/upload" + headers = {'Content-Type': 'application/json'} + body = json.dumps({"OneShot": True}) + + async with session.post(url_getUploadID, headers=headers, data=body) as response: + response_json = await response.json() + uploadid = response_json['id'] + uploadtoken = response_json['uploadToken'] + + # Upload file + url_upload = f"{file_server_url}/file/{uploadid}" + headers = {'X-UploadToken': uploadtoken} + + form = aiohttp.FormData() + form.add_field('file', data, filename=dataname, content_type='application/octet-stream') + + async with session.post(url_upload, headers=headers, data=form) as upload_response: + upload_json = await upload_response.json() + fileid = upload_json['id'] + + url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}" + + return { + 'status': upload_response.status, + 'uploadid': uploadid, + 'fileid': fileid, + 'url': url + } +``` + +--- + +## MicroPython Implementation + +### Limitations + +MicroPython has significant constraints compared to desktop implementations: + +| Feature | Desktop | MicroPython | +|---------|---------|-------------| +| Memory | Unlimited | ~256KB - 1MB | +| Arrow IPC | ✅ | ❌ (not supported) | +| Async/Await | ✅ | ⚠️ (uasyncio only) | +| Large payloads (>1MB) | ✅ | ❌ (enforced limit) | +| Table type | ✅ | ❌ | +| Multiple payloads | ✅ | ⚠️ (limited) | + +### MicroPython Module 
Structure + +```python +# natbridge_mpy.py (MicroPython) +import network +import time +import json +import base64 +import uos +import struct + +# Constants +DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython +DEFAULT_BROKER_URL = "nats://localhost:4222" +DEFAULT_FILESERVER_URL = "http://localhost:8080" +MAX_PAYLOAD_SIZE = 50000 # Hard limit; lower than DEFAULT_SIZE_THRESHOLD, so the link path below is never reached + + +class NATSBridge: + """MicroPython NATS bridge implementation.""" + + def __init__(self, broker_url=None, fileserver_url=None): + self.broker_url = broker_url or DEFAULT_BROKER_URL + self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL + self._nats_conn = None + + def smartsend(self, subject, data, **kwargs): + """Send data (synchronous).""" + correlation_id = self._generate_uuid() + msg_id = self._generate_uuid() + sender_id = self._generate_uuid() + + print(f"[Correlation: {correlation_id}] Starting smartsend") + + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = self._serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + if payload_size > MAX_PAYLOAD_SIZE: + raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}") + + if payload_size < DEFAULT_SIZE_THRESHOLD: + # Direct path + payload_b64 = base64.b64encode(payload_bytes).decode('ascii') + payloads.append({ + 'id': self._generate_uuid(), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'direct', + 'encoding': 'base64', + 'size': payload_size, + 'data': payload_b64 + }) + else: + # Link path (limited support) + response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes) + payloads.append({ + 'id': self._generate_uuid(), + 'dataname': dataname, + 'payload_type': payload_type, + 'transport': 'link', + 'encoding': 'none', + 'size': payload_size, + 'data': response['url'] + }) + + env = { + 'correlation_id': correlation_id, + 'msg_id': msg_id, + 'timestamp': '%04d-%02d-%02dT%02d:%02d:%02dZ' % time.gmtime()[:6], + 'send_to':
subject, + 'msg_purpose': kwargs.get('msg_purpose', 'chat'), + 'sender_name': kwargs.get('sender_name', 'NATSBridge'), + 'sender_id': sender_id, + 'receiver_name': kwargs.get('receiver_name', ''), + 'receiver_id': kwargs.get('receiver_id', ''), + 'reply_to': kwargs.get('reply_to', ''), + 'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''), + 'broker_url': self.broker_url, + 'metadata': {}, + 'payloads': payloads + } + + env_json_str = json.dumps(env) + + # Publish + self._publish(subject, env_json_str, correlation_id) + + return env, env_json_str + + def smartreceive(self, msg, **kwargs): + """Receive and process message (synchronous).""" + env_json_obj = json.loads(msg.payload) + correlation_id = env_json_obj['correlation_id'] + + payloads_list = [] + for payload in env_json_obj['payloads']: + transport = payload['transport'] + dataname = payload['dataname'] + + if transport == 'direct': + payload_b64 = payload['data'] + payload_bytes = base64.b64decode(payload_b64) + data_type = payload['payload_type'] + data = self._deserialize_data(payload_bytes, data_type) + payloads_list.append((dataname, data, data_type)) + elif transport == 'link': + url = payload['data'] + downloaded_data = self._sync_fileserver_download( + url, + kwargs.get('max_retries', 3), + kwargs.get('base_delay', 100), + kwargs.get('max_delay', 1000), + correlation_id + ) + data_type = payload['payload_type'] + data = self._deserialize_data(downloaded_data, data_type) + payloads_list.append((dataname, data, data_type)) + + env_json_obj['payloads'] = payloads_list + return env_json_obj + + def _serialize_data(self, data, payload_type): + """Serialize data (MicroPython version - no table support).""" + if payload_type == 'text': + return data.encode('utf-8') + elif payload_type == 'dictionary': + return json.dumps(data).encode('utf-8') + elif payload_type in ('image', 'audio', 'video', 'binary'): + return bytes(data) + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + + def 
_deserialize_data(self, data, payload_type): + """Deserialize data (MicroPython version).""" + if payload_type == 'text': + return data.decode('utf-8') + elif payload_type == 'dictionary': + return json.loads(data.decode('utf-8')) + elif payload_type in ('image', 'audio', 'video', 'binary'): + return data + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + + def _generate_uuid(self): + """Generate simple UUID (MicroPython compatible).""" + return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % ( + time.time_ns() // (10**6) % 0xFFFFFFFF, + time.time_ns() % 0xFFFFFFFF, + time.time_ns() >> 32 & 0xFFFF, + time.time_ns() >> 48 & 0xFFFF, + time.time_ns() >> 64 & 0xFFFF, + time.time_ns() >> 80 & 0xFFFF, + time.time_ns() >> 96 & 0xFFFF, + time.time_ns() >> 112 & 0xFFFF + ) + + def _sync_fileserver_upload(self, url, dataname, data): + """Synchronous file upload (limited).""" + # Simplified implementation for MicroPython + # In practice, would use network.HTTP or similar + raise NotImplementedError("File upload not implemented in MicroPython") + + def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id): + """Synchronous file download with backoff.""" + # Simplified implementation for MicroPython + raise NotImplementedError("File download not implemented in MicroPython") + + def _publish(self, subject, message, correlation_id): + """Publish message to NATS.""" + # Simplified implementation for MicroPython + raise NotImplementedError("NATS publishing not implemented in MicroPython") +``` + +--- + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | + +### MicroPython Configuration + +```python +# micropython.conf +NATS_URL = "nats://broker.local:4222" 
+FILESERVER_URL = "http://fileserver.local:8080" +SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices +MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython +``` + +--- + +## Performance Considerations + +### Zero-Copy Reading + +| Platform | Strategy | +|----------|----------| +| **Julia** | `Arrow.read()` with memory-mapped files | +| **JavaScript** | `ArrayBuffer` with `DataView` | +| **Python** | `pyarrow` memory mapping | +| **MicroPython** | Not available (streaming only) | + +### Exponential Backoff + +All platforms implement exponential backoff for HTTP downloads: + +```python +# Python +async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): + delay = base_delay + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + return await response.read() + else: + raise Exception(f"Failed to fetch: {response.status}") + except Exception as e: + if attempt < max_retries: + await asyncio.sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + raise Exception("Failed to fetch after max retries") +``` + +### Correlation ID Logging + +All platforms use correlation IDs for distributed tracing: + +``` +[timestamp] [Correlation: abc123] Message published to subject +``` + +--- + +## Testing + +### Test File Organization + +| Platform | Sender Tests | Receiver Tests | +|----------|--------------|----------------| +| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | +| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | +| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | + +### Run Tests + +```bash +# Julia +julia test/test_julia_text_sender.jl +julia test/test_julia_text_receiver.jl + +# JavaScript (Node.js) +node test/test_js_text_sender.js +node test/test_js_text_receiver.js + +# Python +python3 test/test_py_text_sender.py +python3 test/test_py_text_receiver.py +``` + +--- + +##
Troubleshooting + +### Common Issues + +1. **NATS Connection Failed** + - Ensure NATS server is running + - Check `broker_url` configuration + +2. **HTTP Upload Failed** + - Ensure file server is running + - Check `fileserver_url` configuration + - Verify upload permissions + +3. **Arrow IPC Deserialization Error** + - Ensure data is properly serialized to Arrow format + - Check Arrow version compatibility + - MicroPython doesn't support Arrow IPC + +4. **Memory Constraints (MicroPython)** + - Reduce `size_threshold` + - Use direct transport only; payloads above `MAX_PAYLOAD_SIZE` (50 KB) are rejected + - Avoid large payloads + +--- + +## Summary + +This cross-platform NATS bridge provides: + +1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms +2. **Idiomatic Implementations**: + - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC + - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client + - **Python**: Class-based design with dataclasses, type hints, async/await + - **MicroPython**: Synchronous API, memory-constrained optimizations +3. **Message Format Consistency**: Identical JSON schemas across all platforms +4. **Handler Abstraction**: File server operations abstracted through configurable handlers +5. **Platform-Specific Optimizations**: Arrow IPC in desktop platforms, streaming support in MicroPython + +The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior.
\ No newline at end of file diff --git a/src/natbridge.js b/src/natbridge.js new file mode 100644 index 0000000..eef2ab3 --- /dev/null +++ b/src/natbridge.js @@ -0,0 +1,719 @@ +/** + * NATSBridge - Cross-Platform Bi-Directional Data Bridge + * JavaScript Implementation (Node.js and Browser) + * + * This module provides functionality for sending and receiving data across network boundaries + * using NATS as the message bus, with support for both direct payload transport and + * URL-based transport for larger payloads. + * + * File Server Handler Architecture: + * The system uses handler functions to abstract file server operations, allowing support + * for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). + * + * Handler Function Signatures: + * + * ```javascript + * // Upload handler - uploads data to file server and returns URL + * // The handler is passed to smartsend as fileserver_upload_handler parameter + * // It receives: (fileserver_url, dataname, data) + * // Returns: Promise<{ status, uploadid, fileid, url }> + * fileserver_upload_handler(fileserver_url, dataname, data) + * + * // Download handler - fetches data from file server URL with exponential backoff + * // The handler is passed to smartreceive as fileserver_download_handler parameter + * // It receives: (url, max_retries, base_delay, max_delay, correlation_id) + * // Returns: Promise + * fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id) + * ``` + * + * Multi-Payload Support (Standard API): + * The system uses a standardized list-of-tuples format for all payload operations. + * Even when sending a single payload, the user must wrap it in a list. + * + * API Standard: + * + * ```javascript + * // Input format for smartsend (always a list of tuples with type info) + * [[dataname1, data1, type1], [dataname2, data2, type2], ...] 
+ * + * // Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) + * { + * "correlation_id": "...", + * "msg_id": "...", + * "timestamp": "...", + * "send_to": "...", + * "msg_purpose": "...", + * "sender_name": "...", + * "sender_id": "...", + * "receiver_name": "...", + * "receiver_id": "...", + * "reply_to": "...", + * "reply_to_msg_id": "...", + * "broker_url": "...", + * "metadata": {...}, + * "payloads": [[dataname1, data1, type1], [dataname2, data2, type2], ...] + * } + * ``` + * + * Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" + */ + +const nats = typeof require !== 'undefined' ? require('nats') : null; +const { v4: uuidv4 } = typeof require !== 'undefined' ? require('uuid') : {}; +const fetch = typeof require !== 'undefined' ? require('node-fetch') : (typeof globalThis !== 'undefined' ? globalThis.fetch : undefined); +const arrow = typeof require !== 'undefined' ? require('apache-arrow') : null; + +/** + * Default configuration values + */ +const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB - threshold for switching from direct to link transport +const DEFAULT_BROKER_URL = 'nats://localhost:4222'; // Default NATS server URL +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; // Default HTTP file server URL + +/** + * Generate a UUID v4 + * @returns {string} UUID string + */ +function generateUUID() { + if (uuidv4) { + return uuidv4(); + } + // Fallback UUID generation for environments without uuid package + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { + const r = Math.random() * 16 | 0; + const v = c === 'x' ?
r : (r & 0x3 | 0x8); + return v.toString(16); + }); +} + +/** + * Log a trace message with correlation ID and timestamp + * @param {string} correlation_id - Correlation ID for tracing + * @param {string} message - Message content to log + */ +function logTrace(correlation_id, message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +/** + * Serialize data according to specified format + * @param {any} data - Data to serialize + * @param {string} payload_type - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" + * @returns {Promise} Binary representation of the serialized data + */ +async function serializeData(data, payload_type) { + if (payload_type === 'text') { + if (typeof data === 'string') { + return new TextEncoder().encode(data); + } else { + throw new Error('Text data must be a string'); + } + } else if (payload_type === 'dictionary') { + const jsonStr = JSON.stringify(data); + return new TextEncoder().encode(jsonStr); + } else if (payload_type === 'table') { + // Use Apache Arrow for table serialization + if (!arrow) { + throw new Error('apache-arrow not available. 
Install with: npm install apache-arrow'); + } + + // Convert array of objects to Arrow Table + if (!Array.isArray(data)) { + throw new Error('Table data must be an array of objects'); + } + + // Build schema from first row if not provided + const schemaFields = []; + if (data.length > 0) { + for (const key in data[0]) { + const value = data[0][key]; + let arrowType; + if (typeof value === 'number') { + arrowType = arrow.float64; + } else if (typeof value === 'boolean') { + arrowType = arrow.bool; + } else if (value instanceof Date) { + arrowType = arrow.string; // Date as string + } else { + arrowType = arrow.string; + } + schemaFields.push(new arrow.Field(key, arrowType)); + } + } + + const schema = new arrow.Schema(schemaFields); + + // Convert data to Arrow RecordBatch + const arrays = {}; + for (const field of schema.fields) { + const name = field.name; + const type = field.type; + + if (type instanceof arrow.Float64) { + arrays[name] = new arrow.Float64Array(data.length); + for (let i = 0; i < data.length; i++) { + arrays[name][i] = data[i][name] || 0; + } + } else if (type instanceof arrow.Boolean) { + arrays[name] = new arrow.BooleanArray(data.length); + for (let i = 0; i < data.length; i++) { + arrays[name][i] = data[i][name] || false; + } + } else { + // String type + const values = data.map(row => String(row[name] ?? 
'')); + const offsets = new Int32Array(values.length + 1); + let offset = 0; + for (let i = 0; i < values.length; i++) { + offsets[i + 1] = offset += values[i].length; + } + const buffer = new Uint8Array(offsets[values.length]); + for (let i = 0; i < values.length; i++) { + const encoder = new TextEncoder(); + const bytes = encoder.encode(values[i]); + buffer.set(bytes, offsets[i]); + } + arrays[name] = new arrow.StringArray( + new arrow.DataView(new arrow.Buffer(buffer), 0, offsets[values.length]), + new arrow.Buffer(offsets.buffer, 0, offsets.length * 4), + 0, + data.length + ); + } + } + + const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, data.length); + + // Write to IPC format + const buffer = arrow.tableFromBatches([recordBatch]).toBuffer(); + return new Uint8Array(buffer); + } else if (payload_type === 'image') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) { + return data instanceof Uint8Array ? data : new Uint8Array(data); + } else { + throw new Error('Image data must be Uint8Array or ArrayBuffer'); + } + } else if (payload_type === 'audio') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) { + return data instanceof Uint8Array ? data : new Uint8Array(data); + } else { + throw new Error('Audio data must be Uint8Array or ArrayBuffer'); + } + } else if (payload_type === 'video') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) { + return data instanceof Uint8Array ? data : new Uint8Array(data); + } else { + throw new Error('Video data must be Uint8Array or ArrayBuffer'); + } + } else if (payload_type === 'binary') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) { + return data instanceof Uint8Array ? 
data : new Uint8Array(data); + } else { + throw new Error('Binary data must be Uint8Array or ArrayBuffer'); + } + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +/** + * Deserialize bytes to data based on type + * @param {Uint8Array} data - Serialized data as bytes + * @param {string} payload_type - Data type + * @param {string} correlation_id - Correlation ID for logging + * @returns {Promise} Deserialized data + */ +async function deserializeData(data, payload_type, correlation_id) { + if (payload_type === 'text') { + return new TextDecoder().decode(data); + } else if (payload_type === 'dictionary') { + const jsonStr = new TextDecoder().decode(data); + return JSON.parse(jsonStr); + } else if (payload_type === 'table') { + // Use Apache Arrow for table deserialization + if (!arrow) { + throw new Error('apache-arrow not available. Install with: npm install apache-arrow'); + } + + // Read Arrow IPC format + const buffer = arrow.Buffer.wrap(data.buffer, data.byteOffset, data.byteLength); + const table = arrow.tableFromRawBytes(buffer); + + // Convert to array of objects for consistency with the API + const result = []; + const numRows = table.numRows; + + for (let i = 0; i < numRows; i++) { + const row = {}; + for (const colName of table.columnNames) { + const column = table.getColumn(colName); + row[colName] = column.get(i); + } + result.push(row); + } + + return result; + } else if (payload_type === 'image') { + return data; + } else if (payload_type === 'audio') { + return data; + } else if (payload_type === 'video') { + return data; + } else if (payload_type === 'binary') { + return data; + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +/** + * Fetch data from URL with exponential backoff + * @param {string} url - URL to fetch from + * @param {number} max_retries - Maximum retry attempts + * @param {number} base_delay - Initial delay in milliseconds + * @param {number} max_delay - Maximum delay in 
milliseconds + * @param {string} correlation_id - Correlation ID for logging + * @returns {Promise} Fetched data as bytes + */ +async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { + let delay = base_delay; + + for (let attempt = 1; attempt <= max_retries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + logTrace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`); + const arrayBuffer = await response.arrayBuffer(); + return new Uint8Array(arrayBuffer); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (e) { + logTrace(correlation_id, `Attempt ${attempt} failed: ${e.constructor.name}`); + + if (attempt < max_retries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, max_delay); + } + } + } + + throw new Error(`Failed to fetch data after ${max_retries} attempts`); +} + +/** + * Upload a single file to a plik server using one-shot mode + * @param {string} file_server_url - Base URL of the plik server + * @param {string} dataname - Name of the file being uploaded + * @param {Uint8Array} data - Raw byte data of the file content + * @returns {Promise<{ status, uploadid, fileid, url }>} Upload result + */ +async function plikOneshotUpload(file_server_url, dataname, data) { + // Get upload id + const url_getUploadID = `${file_server_url}/upload`; + const headers = { 'Content-Type': 'application/json' }; + const body = JSON.stringify({ OneShot: true }); + + const http_response = await fetch(url_getUploadID, { + method: 'POST', + headers, + body + }); + + const response_json = await http_response.json(); + const uploadid = response_json.id; + const uploadtoken = response_json.uploadToken; + + // Upload file + const url_upload = `${file_server_url}/file/${uploadid}`; + const form = new FormData(); + const blob = new Blob([data]); + form.append('file', blob, dataname); + + const upload_headers 
= { + 'X-UploadToken': uploadtoken + }; + + const upload_response = await fetch(url_upload, { + method: 'POST', + headers: upload_headers, + body: form + }); + + const upload_json = await upload_response.json(); + const fileid = upload_json.id; + + const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; + + return { + status: upload_response.status, + uploadid, + fileid, + url + }; +} + +/** + * Publish message to NATS + * @param {string|object} broker_url_or_conn - NATS server URL or pre-existing connection + * @param {string} subject - NATS subject to publish to + * @param {string} message - JSON message to publish + * @param {string} correlation_id - Correlation ID for tracing and logging + */ +async function publishMessage(broker_url_or_conn, subject, message, correlation_id) { + if (broker_url_or_conn instanceof Object && broker_url_or_conn.publish) { + // Pre-existing connection + try { + await broker_url_or_conn.publish(subject, message); + logTrace(correlation_id, `Message published to ${subject}`); + } finally { + // Note: In a real implementation, you might want to drain/close the connection + } + } else { + // URL-based - create new connection + if (!nats) { + throw new Error('nats package not available. 
Install with: npm install nats'); + } + + const conn = await nats.connect(broker_url_or_conn); + try { + await conn.publish(subject, message); + logTrace(correlation_id, `Message published to ${subject}`); + } finally { + conn.close(); + } + } +} + +/** + * Build message envelope from payloads and metadata + * @param {string} subject - NATS subject + * @param {Array} payloads - Array of payload objects + * @param {Object} options - Envelope options + * @returns {Object} Message envelope + */ +function buildEnvelope(subject, payloads, options) { + return { + correlation_id: options.correlation_id || generateUUID(), + msg_id: options.msg_id || generateUUID(), + timestamp: new Date().toISOString(), + send_to: subject, + msg_purpose: options.msg_purpose || '', + sender_name: options.sender_name || '', + sender_id: options.sender_id || generateUUID(), + receiver_name: options.receiver_name || '', + receiver_id: options.receiver_id || '', + reply_to: options.reply_to || '', + reply_to_msg_id: options.reply_to_msg_id || '', + broker_url: options.broker_url || DEFAULT_BROKER_URL, + metadata: options.metadata || {}, + payloads + }; +} + +/** + * Convert data to base64 string + * @param {Uint8Array} buffer - Data buffer + * @returns {string} Base64 encoded string + */ +function bufferToBase64(buffer) { + if (typeof Buffer !== 'undefined' && Buffer.isBuffer(buffer)) { + return buffer.toString('base64'); + } + + // For browser/Node Uint8Array + let binary = ''; + const bytes = new Uint8Array(buffer); + const len = bytes.byteLength; + for (let i = 0; i < len; i++) { + binary += String.fromCharCode(bytes[i]); + } + return btoa(binary); +} + +/** + * Convert base64 string to Uint8Array + * @param {string} base64 - Base64 encoded string + * @returns {Uint8Array} Decoded bytes + */ +function base64ToBuffer(base64) { + const binary = atob(base64); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + 
return bytes; +} + +/** + * smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size + * + * This function intelligently routes data delivery based on payload size relative to a threshold. + * If the serialized payload is smaller than size_threshold, it encodes the data as Base64 and publishes directly over NATS. + * Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. + * + * @param {string} subject - NATS subject to publish the message to + * @param {Array} data - Array of [dataname, data, type] tuples to send + * @param {Object} options - Optional configuration + * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server + * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server for large payloads + * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads + * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold in bytes separating direct vs link transport + * @param {string} [options.correlation_id] - Correlation ID for tracing (auto-generated if not provided) + * @param {string} [options.msg_purpose="chat"] - Purpose of the message + * @param {string} [options.sender_name="NATSBridge"] - Name of the sender + * @param {string} [options.receiver_name=""] - Name of the receiver + * @param {string} [options.receiver_id=""] - UUID of the receiver + * @param {string} [options.reply_to=""] - Topic to reply to + * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to + * @param {boolean} [options.is_publish=true] - Whether to automatically publish the message to NATS + * @param {object} [options.nats_connection] - Pre-existing NATS connection + * @param {string} [options.msg_id] - Message ID (auto-generated if not provided) + * @param {string} [options.sender_id] - Sender ID (auto-generated if not provided) + * 
@returns {Promise<[Object, string]>} Promise resolving to [envelope, env_json_str] + */ +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = generateUUID(), + msg_purpose = 'chat', + sender_name = 'NATSBridge', + receiver_name = '', + receiver_id = '', + reply_to = '', + reply_to_msg_id = '', + is_publish = true, + nats_connection = null, + msg_id = generateUUID(), + sender_id = generateUUID() + } = options; + + logTrace(correlation_id, `Starting smartsend for subject: ${subject}`); + + // Process each payload in the list + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + logTrace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`); + + // Decision: Direct vs Link + if (payloadSize < size_threshold) { + // Direct path - Base64 encode and send via NATS + const payloadB64 = bufferToBase64(payloadBytes); + logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes`); + + payloads.push({ + id: generateUUID(), + dataname, + payload_type: payloadType, + transport: 'direct', + encoding: 'base64', + size: payloadSize, + data: payloadB64, + metadata: { payload_bytes: payloadSize } + }); + } else { + // Link path - Upload to HTTP server, send URL via NATS + logTrace(correlation_id, 'Using link transport, uploading to fileserver'); + + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + logTrace(correlation_id, `Uploaded to URL: ${response.url}`); + + payloads.push({ + id: generateUUID(), + 
dataname, + payload_type: payloadType, + transport: 'link', + encoding: 'none', + size: payloadSize, + data: response.url, + metadata: {} + }); + } + } + + // Build envelope + const env = buildEnvelope(subject, payloads, { + correlation_id, + msg_id, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url + }); + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} + +/** + * smartreceive - Receive and process messages from NATS + * + * This function processes incoming NATS messages, handling both direct transport + * (base64 decoded payloads) and link transport (URL-based payloads). + * It deserializes the data based on the transport type and returns the result. + * + * @param {Object} msg - NATS message object + * @param {Object} options - Optional configuration + * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle downloading data from file server URLs + * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL + * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms + * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms + * @returns {Promise} Promise resolving to envelope object with deserialized payloads + */ +async function smartreceive(msg, options = {}) { + const { + fileserver_download_handler = fetchWithBackoff, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + } = options; + + // Parse the JSON envelope + const payload = typeof msg.payload === 'string' ? 
msg.payload : new TextDecoder().decode(msg.payload); + const env_json_obj = JSON.parse(payload); + + logTrace(env_json_obj.correlation_id, 'Processing received message'); + + // Process all payloads in the envelope + const payloads_list = []; + + for (const payload of env_json_obj.payloads) { + const transport = payload.transport; + const dataname = payload.dataname; + + if (transport === 'direct') { + logTrace(env_json_obj.correlation_id, `Direct transport - decoding payload '${dataname}'`); + + // Extract base64 payload from the payload + const payload_b64 = payload.data; + + // Decode Base64 payload + const payload_bytes = base64ToBuffer(payload_b64); + + // Deserialize based on type + const data_type = payload.payload_type; + const data = await deserializeData(payload_bytes, data_type, env_json_obj.correlation_id); + + payloads_list.push([dataname, data, data_type]); + } else if (transport === 'link') { + // Extract download URL from the payload + const url = payload.data; + logTrace(env_json_obj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`); + + // Fetch with exponential backoff using the download handler + const downloaded_data = await fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + env_json_obj.correlation_id + ); + + // Deserialize based on type + const data_type = payload.payload_type; + const data = await deserializeData(downloaded_data, data_type, env_json_obj.correlation_id); + + payloads_list.push([dataname, data, data_type]); + } else { + throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`); + } + } + + env_json_obj.payloads = payloads_list; + return env_json_obj; +} + +/** + * NATS Client wrapper for managing connections + */ +class NATSClient { + constructor(url) { + this.url = url; + this.connection = null; + } + + async connect() { + if (!nats) { + throw new Error('nats package not available. 
Install with: npm install nats'); + } + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + async publish(subject, message) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + } + + async close() { + if (this.connection) { + this.connection.close(); + } + } +} + +// Export for Node.js +if (typeof module !== 'undefined' && module.exports) { + module.exports = { + NATSClient, + smartsend, + smartreceive, + plikOneshotUpload, + fetchWithBackoff, + serializeData, + deserializeData, + publishMessage, + logTrace, + generateUUID, + DEFAULT_SIZE_THRESHOLD, + DEFAULT_BROKER_URL, + DEFAULT_FILESERVER_URL + }; +} + +// Export for browser (global scope) +if (typeof window !== 'undefined') { + window.NATSBridge = { + smartsend, + smartreceive, + plikOneshotUpload, + fetchWithBackoff, + serializeData, + deserializeData, + publishMessage, + logTrace, + generateUUID, + NATSClient, + DEFAULT_SIZE_THRESHOLD, + DEFAULT_BROKER_URL, + DEFAULT_FILESERVER_URL + }; +} \ No newline at end of file diff --git a/src/natbridge.py b/src/natbridge.py new file mode 100644 index 0000000..45a7e35 --- /dev/null +++ b/src/natbridge.py @@ -0,0 +1,784 @@ +""" +NATSBridge - Cross-Platform Bi-Directional Data Bridge +Python Implementation (Desktop Python) + +This module provides functionality for sending and receiving data across network boundaries +using NATS as the message bus, with support for both direct payload transport and +URL-based transport for larger payloads. + +File Server Handler Architecture: +The system uses handler functions to abstract file server operations, allowing support +for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). 
"""
NATSBridge - Cross-Platform Bi-Directional Data Bridge
Python Implementation (Desktop Python)

This module provides functionality for sending and receiving data across network boundaries
using NATS as the message bus, with support for both direct payload transport and
URL-based transport for larger payloads.

File Server Handler Architecture:
The system uses handler functions to abstract file server operations, allowing support
for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).

Handler Function Signatures:

```python
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url, dataname, data)
# Returns: Coroutine[Dict[str, Any]]
async def fileserver_upload_handler(fileserver_url, dataname, data)

# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url, max_retries, base_delay, max_delay, correlation_id)
# Returns: Coroutine[bytes]
async def fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id)
```

Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.

API Standard:

```python
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
{
    "correlation_id": "...",
    "msg_id": "...",
    "timestamp": "...",
    "send_to": "...",
    "msg_purpose": "...",
    "sender_name": "...",
    "sender_id": "...",
    "receiver_name": "...",
    "receiver_id": "...",
    "reply_to": "...",
    "reply_to_msg_id": "...",
    "broker_url": "...",
    "metadata": {...},
    "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```

Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
"""

import asyncio
import base64
import io
import json
import uuid
import time
from typing import Any, Callable, Dict, List, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Optional dependencies
try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False

try:
    import pyarrow as arrow
    import pyarrow.parquet as pq
    import pandas as pd
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False

try:
    import nats
    from nats.aio.client import Client as NATSClient
    NATS_AVAILABLE = True
except ImportError:
    NATS_AVAILABLE = False
    # Annotations below are evaluated when each function is defined, so the name
    # must exist even without nats-py or importing this module would fail.
    NATSClient = Any


# ============================================================================
# Constants
# ============================================================================

DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB - threshold for switching from direct to link transport
DEFAULT_BROKER_URL = "nats://localhost:4222"  # Default NATS server URL
DEFAULT_FILESERVER_URL = "http://localhost:8080"  # Default HTTP file server URL

# Payload types whose data is passed through as raw bytes, mapped to the label
# used in their validation error message.
_RAW_BYTES_LABELS = {
    'image': 'Image',
    'audio': 'Audio',
    'video': 'Video',
    'binary': 'Binary',
}


# ============================================================================
# Exceptions
# ============================================================================

class Error(Exception):
    """Base exception raised by NATSBridge for invalid input and failed operations."""


# ============================================================================
# Data Classes
# ============================================================================

@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]  # URL for link, base64 for direct
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)


# ============================================================================
# Utility Functions
# ============================================================================

def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string with millisecond precision."""
    # datetime.utcnow() is deprecated; an aware datetime formats identically here.
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'


def log_trace(correlation_id: str, message: str) -> None:
    """Log a trace message with correlation ID and timestamp."""
    print(f"[{_utc_timestamp()}] [Correlation: {correlation_id}] {message}")


def generate_uuid() -> str:
    """Generate a UUID v4 string."""
    return str(uuid.uuid4())


# ============================================================================
# Serialization Functions
# ============================================================================

def _serialize_data(data: Any, payload_type: str) -> bytes:
    """
    Serialize data according to specified format.

    Args:
        data: Data to serialize
        payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"

    Returns:
        Binary representation of the serialized data

    Raises:
        Error: If data type doesn't match payload_type, the payload_type is unknown,
            or pyarrow/pandas are missing for "table"
    """
    if payload_type == 'text':
        if not isinstance(data, str):
            raise Error('Text data must be a string')
        return data.encode('utf-8')

    if payload_type == 'dictionary':
        return json.dumps(data).encode('utf-8')

    if payload_type == 'table':
        if not ARROW_AVAILABLE:
            raise Error('pyarrow not available for table serialization. Install with: pip install pyarrow pandas')
        if not isinstance(data, pd.DataFrame):
            raise Error('Table data must be a pandas DataFrame')

        # Convert DataFrame to Arrow IPC file format. The writer needs the schema
        # up front and is closed by the context manager before the buffer is read.
        table = arrow.Table.from_pandas(data)
        buf = io.BytesIO()
        with arrow.ipc.new_file(buf, table.schema) as writer:
            writer.write_table(table)
        return buf.getvalue()

    label = _RAW_BYTES_LABELS.get(payload_type)
    if label is None:
        raise Error(f'Unknown payload_type: {payload_type}')
    if isinstance(data, (bytes, bytearray, memoryview)):
        return bytes(data)
    if isinstance(data, io.BytesIO):
        return data.getvalue()
    raise Error(f'{label} data must be bytes')


def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """
    Deserialize bytes to data based on type.

    Args:
        data: Serialized data as bytes
        payload_type: Data type
        correlation_id: Correlation ID for logging (currently unused)

    Returns:
        Deserialized data: str for "text", the parsed JSON value for "dictionary",
        a pandas DataFrame for "table", and the bytes unchanged for the raw types

    Raises:
        Error: If the payload_type is unknown or pyarrow/pandas are missing for "table"
    """
    if payload_type == 'text':
        return data.decode('utf-8')

    if payload_type == 'dictionary':
        return json.loads(data.decode('utf-8'))

    if payload_type == 'table':
        if not ARROW_AVAILABLE:
            raise Error('pyarrow not available for table deserialization')
        reader = arrow.ipc.open_file(io.BytesIO(data))
        return reader.read_all().to_pandas()

    if payload_type in _RAW_BYTES_LABELS:
        return data

    raise Error(f'Unknown payload_type: {payload_type}')


# ============================================================================
# File Server Handlers
# ============================================================================

async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload a single file to a plik server using one-shot mode.

    Args:
        file_server_url: Base URL of the plik server
        dataname: Name of the file being uploaded
        data: Raw byte data of the file content

    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'

    Raises:
        Error: If aiohttp is missing or the upload session cannot be created
    """
    if not AIOHTTP_AVAILABLE:
        raise Error('aiohttp not available. Install with: pip install aiohttp')

    async with aiohttp.ClientSession() as session:
        # Step 1: create a one-shot upload session to obtain its id and token.
        url_getUploadID = f"{file_server_url}/upload"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"OneShot": True})

        async with session.post(url_getUploadID, headers=headers, data=body) as response:
            # Without this check a failed call surfaces as a KeyError on 'id'.
            if response.status != 200:
                raise Error(f"Failed to create plik upload: {response.status}")
            response_json = await response.json()
            uploadid = response_json['id']
            uploadtoken = response_json['uploadToken']

        # Step 2: send the file content as multipart form data.
        url_upload = f"{file_server_url}/file/{uploadid}"
        headers = {'X-UploadToken': uploadtoken}

        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')

        async with session.post(url_upload, headers=headers, data=form) as upload_response:
            upload_status = upload_response.status
            upload_json = await upload_response.json()
            fileid = upload_json['id']

        url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"

        return {
            'status': upload_status,
            'uploadid': uploadid,
            'fileid': fileid,
            'url': url
        }


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Fetch data from URL with exponential backoff.

    Args:
        url: URL to fetch from
        max_retries: Maximum retry attempts
        base_delay: Initial delay in milliseconds
        max_delay: Maximum delay in milliseconds
        correlation_id: Correlation ID for logging

    Returns:
        Fetched data as bytes

    Raises:
        Error: If aiohttp is missing or all retry attempts fail (chained to the
            last underlying failure)
    """
    if not AIOHTTP_AVAILABLE:
        raise Error('aiohttp not available. Install with: pip install aiohttp')

    delay = base_delay
    last_error = None

    # One session serves every attempt instead of opening a new one per retry.
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Error(f"Failed to fetch: {response.status}")
                    log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
                    return await response.read()
            except Exception as e:  # best-effort retry: any failure triggers another attempt
                last_error = e
                log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")

                if attempt < max_retries:
                    await asyncio.sleep(delay / 1000.0)
                    delay = min(delay * 2, max_delay)

    raise Error(f"Failed to fetch data after {max_retries} attempts") from last_error


# ============================================================================
# NATS Publishing
# ============================================================================

async def publish_message(
    broker_url_or_conn: Union[str, NATSClient],
    subject: str,
    message: str,
    correlation_id: str,
    nats_connection: NATSClient = None
) -> None:
    """
    Publish message to NATS.

    Args:
        broker_url_or_conn: NATS server URL or pre-existing connection
        subject: NATS subject to publish to
        message: JSON message to publish (str is UTF-8 encoded; bytes are sent as-is)
        correlation_id: Correlation ID for tracing and logging
        nats_connection: Optional pre-existing NATS connection; takes precedence
            over broker_url_or_conn

    Raises:
        Error: If nats-py is missing for a URL publish, or broker_url_or_conn is
            neither a URL string nor an object with a publish method
    """
    # nats-py publishes bytes; the envelope arrives here as a JSON str.
    payload = message.encode('utf-8') if isinstance(message, str) else message

    connection = nats_connection
    if (
        connection is None
        and not isinstance(broker_url_or_conn, str)
        and hasattr(broker_url_or_conn, 'publish')
    ):
        connection = broker_url_or_conn

    if connection is not None:
        # Caller-owned connection: publish and leave its lifecycle to the caller.
        await connection.publish(subject, payload)
        log_trace(correlation_id, f"Message published to {subject}")
        return

    if not isinstance(broker_url_or_conn, str):
        raise Error('Invalid broker_url_or_conn type')

    # URL-based - create a short-lived connection
    if not NATS_AVAILABLE:
        raise Error('nats-py not available. Install with: pip install nats-py')

    conn = await nats.connect(broker_url_or_conn)
    try:
        await conn.publish(subject, payload)
        log_trace(correlation_id, f"Message published to {subject}")
    finally:
        # drain() is called in place of a plain close so pending data is flushed.
        await conn.drain()


# ============================================================================
# Core Functions
# ============================================================================

async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: str = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: NATSClient = None,
    msg_id: str = None,
    sender_id: str = None,
    metadata: Dict[str, Any] = None
) -> Tuple[Dict, str]:
    """
    Send data via NATS with automatic transport selection.

    This function intelligently routes data delivery based on payload size relative to a threshold.
    If the serialized payload is smaller than size_threshold, it encodes the data as Base64
    and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
    and publishes only the download URL over NATS.

    Args:
        subject: NATS subject to publish the message to
        data: List of (dataname, data, type) tuples
        broker_url: URL of the NATS server
        fileserver_url: URL of the HTTP file server for large payloads
        fileserver_upload_handler: Function to handle fileserver uploads
        size_threshold: Threshold in bytes separating direct vs link transport
        correlation_id: Correlation ID for tracing (auto-generated if not provided)
        msg_purpose: Purpose of the message
        sender_name: Name of the sender
        receiver_name: Name of the receiver
        receiver_id: UUID of the receiver
        reply_to: Topic to reply to
        reply_to_msg_id: Message ID this message is replying to
        is_publish: Whether to automatically publish the message to NATS
        nats_connection: Pre-existing NATS connection (if provided, uses this instead of creating a new one)
        msg_id: Message ID (auto-generated if not provided)
        sender_id: Sender ID (auto-generated if not provided)
        metadata: Envelope-level metadata (defaults to an empty dict)

    Returns:
        Tuple of (env, env_json_str) where:
        - env: Dict with all metadata and payloads
        - env_json_str: JSON string for publishing to NATS

    Raises:
        Error: If a payload cannot be serialized or the fileserver upload does not
            report status 200

    Example:
        ```python
        # Send a single payload (still wrapped in a list)
        config = {"key": "value"}
        env, env_json_str = await smartsend(
            "my.subject",
            [("config", config, "dictionary")],
            broker_url="nats://localhost:4222"
        )

        # Send multiple payloads in one message with different types
        # ("table" payloads must be pandas DataFrames)
        data1 = {"key1": "value1"}
        data2 = pd.DataFrame({"value": [1, 2, 3, 4, 5]})
        env, env_json_str = await smartsend(
            "my.subject",
            [("data1", data1, "dictionary"), ("data2", data2, "table")]
        )

        # Mixed content (e.g., chat with text and image)
        env, env_json_str = await smartsend(
            "chat.subject",
            [
                ("message_text", "Hello!", "text"),
                ("user_image", image_bytes, "image"),
                ("audio_clip", audio_bytes, "audio")
            ]
        )
        ```
    """
    if correlation_id is None:
        correlation_id = generate_uuid()
    if msg_id is None:
        msg_id = generate_uuid()
    if sender_id is None:
        sender_id = generate_uuid()

    log_trace(correlation_id, f"Starting smartsend for subject: {subject}")

    # Process each payload in the list
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)

        log_trace(correlation_id, f"Serialized payload '{dataname}' (payload_type: {payload_type}) size: {payload_size} bytes")

        # Decision: Direct vs Link
        if payload_size < size_threshold:
            # Direct path - Base64 encode and send via NATS
            payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
            log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")

            payloads.append({
                'id': generate_uuid(),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'direct',
                'encoding': 'base64',
                'size': payload_size,
                'data': payload_b64,
                'metadata': {'payload_bytes': payload_size}
            })
        else:
            # Link path - Upload to HTTP server, send URL via NATS
            log_trace(correlation_id, "Using link transport, uploading to fileserver")

            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)

            if response['status'] != 200:
                raise Error(f"Failed to upload data to fileserver: {response['status']}")

            log_trace(correlation_id, f"Uploaded to URL: {response['url']}")

            payloads.append({
                'id': generate_uuid(),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'link',
                'encoding': 'none',
                'size': payload_size,
                'data': response['url'],
                'metadata': {}
            })

    # Build envelope
    env = {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        'timestamp': _utc_timestamp(),
        'send_to': subject,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url,
        'metadata': {} if metadata is None else metadata,
        'payloads': payloads
    }

    env_json_str = json.dumps(env)

    if is_publish:
        # publish_message falls back to broker_url when nats_connection is None.
        await publish_message(broker_url, subject, env_json_str, correlation_id, nats_connection)

    return env, env_json_str


async def smartreceive(
    msg: Any,
    fileserver_download_handler: Callable = fetch_with_backoff,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000
) -> Dict:
    """
    Receive and process messages from NATS.

    This function processes incoming NATS messages, handling both direct transport
    (base64 decoded payloads) and link transport (URL-based payloads).
    It deserializes the data based on the transport type and returns the result.

    Args:
        msg: NATS message object; the JSON envelope is read from ``msg.payload``,
            falling back to ``msg.data`` when ``payload`` is absent
        fileserver_download_handler: Function to handle downloading data from file server URLs
        max_retries: Maximum retry attempts for fetching URL
        base_delay: Initial delay for exponential backoff in ms
        max_delay: Maximum delay for exponential backoff in ms

    Returns:
        Dict with envelope metadata and payloads field containing list of tuples

    Raises:
        Error: If a payload declares an unknown transport or payload_type

    Example:
        ```python
        # Receive and process message
        sub = await nats_connection.subscribe("my.subject")
        msg = await sub.next_msg()
        env = await smartreceive(msg)
        # env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
        ```
    """
    # Parse the JSON envelope
    raw = getattr(msg, 'payload', None)
    if raw is None:
        raw = msg.data
    if isinstance(raw, (bytes, bytearray, memoryview)):
        raw = bytes(raw).decode('utf-8')

    env_json_obj = json.loads(raw)
    correlation_id = env_json_obj['correlation_id']
    log_trace(correlation_id, "Processing received message")

    # Process all payloads in the envelope
    payloads_list = []

    for payload in env_json_obj['payloads']:
        transport = payload['transport']
        dataname = payload['dataname']
        data_type = payload['payload_type']

        if transport == 'direct':
            log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")
            payload_bytes = base64.b64decode(payload['data'])
        elif transport == 'link':
            url = payload['data']
            log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")

            # Fetch with exponential backoff using the download handler
            payload_bytes = await fileserver_download_handler(
                url,
                max_retries,
                base_delay,
                max_delay,
                correlation_id
            )
        else:
            raise Error(f"Unknown transport type for payload '{dataname}': {transport}")

        # Deserialize based on type
        data = _deserialize_data(payload_bytes, data_type, correlation_id)
        payloads_list.append((dataname, data, data_type))

    env_json_obj['payloads'] = payloads_list
    return env_json_obj


# ============================================================================
# NATS Client Wrapper
# ============================================================================

class NATSBridge:
    """
    Cross-platform NATS bridge implementation.

    Provides a class-based interface for NATSBridge functionality.
    """

    def __init__(self, broker_url: str = None, fileserver_url: str = None):
        """
        Initialize the NATSBridge client.

        Args:
            broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
            fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
        """
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_client: NATSClient = None

    async def connect(self, broker_url: str = None) -> NATSClient:
        """
        Connect to NATS server.

        Args:
            broker_url: NATS server URL (optional, uses instance broker_url if not provided)

        Returns:
            NATS connection client

        Raises:
            Error: If nats-py is not installed
        """
        url = broker_url or self.broker_url
        if not NATS_AVAILABLE:
            raise Error('nats-py not available. Install with: pip install nats-py')
        self._nats_client = await nats.connect(url)
        return self._nats_client

    async def smartsend(
        self,
        subject: str,
        data: List[Tuple[str, Any, str]],
        **kwargs
    ) -> Tuple[Dict, str]:
        """
        Send data via NATS using instance configuration.

        Args:
            subject: NATS subject to publish to
            data: List of (dataname, data, type) tuples
            **kwargs: Additional options (broker_url, fileserver_url, etc.)

        Returns:
            Tuple of (env, env_json_str)
        """
        # Instance config supplies defaults; explicit kwargs win.
        options = {
            'broker_url': self.broker_url,
            'fileserver_url': self.fileserver_url,
            **kwargs
        }
        # Reuse the connection opened by connect() unless the caller redirected
        # this send to another broker or supplied a connection of their own.
        if self._nats_client is not None and 'broker_url' not in kwargs:
            options.setdefault('nats_connection', self._nats_client)
        return await smartsend(subject, data, **options)

    async def smartreceive(
        self,
        msg: Any,
        **kwargs
    ) -> Dict:
        """
        Receive and process NATS message using instance configuration.

        Args:
            msg: NATS message object
            **kwargs: Additional options

        Returns:
            Dict with envelope metadata and payloads
        """
        return await smartreceive(msg, **kwargs)

    async def close(self):
        """Close the NATS connection."""
        if self._nats_client:
            await self._nats_client.close()
            self._nats_client = None


# ============================================================================
# Module Exports
# ============================================================================

__all__ = [
    # Core functions
    'smartsend',
    'smartreceive',

    # Utility functions
    'log_trace',
    'generate_uuid',
    '_serialize_data',
    '_deserialize_data',

    # File server handlers
    'plik_oneshot_upload',
    'fetch_with_backoff',

    # NATS publishing
    'publish_message',

    # Data classes
    'MsgPayloadV1',
    'MsgEnvelopeV1',

    # Exceptions
    'Error',

    # Client class
    'NATSBridge',

    # Constants
    'DEFAULT_SIZE_THRESHOLD',
    'DEFAULT_BROKER_URL',
    'DEFAULT_FILESERVER_URL',

    # Availability flags
    'ARROW_AVAILABLE',
    'AIOHTTP_AVAILABLE',
    'NATS_AVAILABLE',
]
+ +MicroPython Limitations: +- No Arrow IPC support (memory constraints) +- Only direct transport (< 100KB threshold enforced) +- Synchronous API (no async/await) +- Limited UUID generation +- Simplified file server handlers + +Handler Function Signatures: + +```python +# Upload handler - uploads data to file server and returns URL +# The handler is passed to smartsend as fileserver_upload_handler parameter +# It receives: (fileserver_url, dataname, data) +# Returns: Dict with keys: 'status', 'url' (MicroPython simplified) +def fileserver_upload_handler(fileserver_url, dataname, data) + +# Download handler - fetches data from file server URL with exponential backoff +# The handler is passed to smartreceive as fileserver_download_handler parameter +# It receives: (url, max_retries, base_delay, max_delay, correlation_id) +# Returns: bytearray +def fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id) +``` + +Multi-Payload Support (Standard API): +The system uses a standardized list-of-tuples format for all payload operations. + +API Standard: + +```python +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] + +# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] 
# Per-process sequence number mixed into every ID so that two IDs created
# within the same millisecond tick still differ (kept in a list so the
# function can update it without a ``global`` statement).
_uuid_sequence = [0]


def generate_uuid():
    """
    Generate a short, printable, locally-unique ID for MicroPython.

    Note: This is not a true UUID v4, but provides uniqueness for tracing.
    The result has the form ``mp-<tick hex>-<sequence hex><random hex>`` and
    contains only lowercase hex digits, ``m``, ``p`` and ``-``.

    Returns:
        str: Identifier that differs on every call within one process.
    """
    # time.ticks_ms() exists only on MicroPython; fall back to wall-clock
    # milliseconds so the function also runs on CPython.
    ticks_ms = getattr(time, 'ticks_ms', None)
    tick = ticks_ms() if ticks_ms is not None else int(time.time() * 1000)

    # The counter guarantees uniqueness when several IDs are requested
    # inside the same millisecond (e.g. correlation_id, msg_id, sender_id).
    seq = (_uuid_sequence[0] + 1) & 0xFFFF
    _uuid_sequence[0] = seq

    # Random bits lower the chance of collisions between devices; they are
    # optional because not every port provides os.urandom.
    try:
        import os
        raw = os.urandom(2)
        entropy = (raw[0] << 8) | raw[1]
    except (ImportError, AttributeError, NotImplementedError, OSError):
        entropy = 0

    return f"mp-{tick:08x}-{seq:04x}{entropy:04x}"
def _http_request(method, url, headers=None, data=None):
    """
    Make a blocking HTTP/1.1 request over a plain TCP socket.

    Simplified for MicroPython: no TLS, no redirects, no chunked decoding.
    NOTE: an ``https://`` prefix is stripped exactly like ``http://`` and the
    request is still sent unencrypted (port 80 unless the URL names a port).

    Args:
        method: HTTP method ('GET' or 'POST')
        url: URL to request, e.g. ``http://host:port/path``
        headers: Optional dict of extra request headers
        data: Optional request body (str or bytes-like)

    Returns:
        tuple: (status_code, response_body, headers_dict). On any failure,
        including an empty or malformed response, status_code is 0, the body
        is empty and headers_dict contains an 'error' entry.
    """
    sock = None
    try:
        # Drop the scheme (only a leading one, so URLs embedded in the
        # path or query string are left alone).
        for scheme in ('http://', 'https://'):
            if url.startswith(scheme):
                url = url[len(scheme):]
                break

        # Split host[:port] from the path
        if '/' in url:
            host_part, path = url.split('/', 1)
            path = '/' + path
        else:
            host_part = url
            path = '/'

        if ':' in host_part:
            host, port = host_part.split(':', 1)
            port = int(port)
        else:
            host = host_part
            port = 80

        # Normalise the body to bytes so binary uploads are sent untouched.
        if not data:
            body_bytes = b''
        elif isinstance(data, str):
            body_bytes = data.encode('utf-8')
        else:
            body_bytes = bytes(data)

        # Build the request head
        request_lines = [
            f"{method} {path} HTTP/1.1",
            f"Host: {host_part}",
            "Connection: close",
        ]
        has_length = False
        if headers:
            for key, value in headers.items():
                request_lines.append(f"{key}: {value}")
                if key.lower() == 'content-length':
                    has_length = True
        if body_bytes and not has_length:
            # Without a length the server cannot tell where the body ends.
            request_lines.append(f"Content-Length: {len(body_bytes)}")

        # The header section must be terminated by an empty line.
        head = '\r\n'.join(request_lines) + '\r\n\r\n'

        # Connect and send
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(10)  # 10 second timeout
        sock.connect((host, port))
        sock.sendall(head.encode('utf-8') + body_bytes)

        # Read until the server closes the connection ("Connection: close").
        chunks = []
        while True:
            try:
                chunk = sock.recv(1024)
            except OSError:
                # Timeout / reset: parse whatever has arrived so far.
                break
            if not chunk:
                break
            chunks.append(chunk)
        response = b''.join(chunks)

        # Split head and body
        if b'\r\n\r\n' in response:
            header_part, body = response.split(b'\r\n\r\n', 1)
        else:
            header_part = response
            body = b''
        header_lines = header_part.decode('utf-8', errors='ignore').split('\r\n')

        # Parse status line; an unreadable one is an error, not a success.
        status_fields = header_lines[0].split(' ')
        if len(status_fields) < 2 or not status_fields[0].startswith('HTTP/'):
            raise ValueError('Malformed or empty HTTP response')
        status_code = int(status_fields[1])

        # Parse headers
        headers_dict = {}
        for line in header_lines[1:]:
            if ':' in line:
                key, value = line.split(':', 1)
                headers_dict[key.strip()] = value.strip()

        return status_code, body, headers_dict

    except Exception as e:
        # Return error indicator
        return 0, b'', {'error': str(e)}
    finally:
        # Always release the socket, on success and on every error path.
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
class NATSConnection:
    """
    Simplified, publish-only NATS connection for MicroPython.

    Speaks the text protocol directly over a TCP socket: it reads the
    server's INFO greeting, sends CONNECT, and uses PING/PONG round trips to
    confirm that the server has processed what was sent.

    Note: This is a basic implementation. For production use, consider
    using a proper MicroPython NATS client or the full Python implementation.
    """

    # Upper bound for a single protocol line, to keep memory use bounded.
    _MAX_LINE = 4096

    def __init__(self, host=NATS_SERVER_HOST, port=NATS_SERVER_PORT):
        """
        Remember the server address; connect() opens the socket.

        Args:
            host: NATS server host name or address
            port: NATS server TCP port
        """
        self.host = host
        self.port = port
        self.sock = None
        self.connected = False

    def _read_line(self):
        """Read one protocol line from the socket and return it without CRLF."""
        line = b''
        while len(line) < self._MAX_LINE:
            ch = self.sock.recv(1)
            if not ch:
                raise RuntimeError("NATS connection closed by server")
            if ch == b'\n':
                return line.rstrip(b'\r')
            line += ch
        raise RuntimeError("NATS protocol line too long")

    def _flush(self):
        """
        Send PING and block until the matching PONG arrives.

        Server PINGs received meanwhile are answered; a ``-ERR`` line is
        raised as RuntimeError; other lines (INFO, +OK) are ignored.
        """
        self.sock.sendall(b'PING\r\n')
        while True:
            line = self._read_line()
            if line.startswith(b'PONG'):
                return
            if line.startswith(b'PING'):
                self.sock.sendall(b'PONG\r\n')
            elif line.startswith(b'-ERR'):
                raise RuntimeError("NATS server error: " + line.decode('utf-8'))

    def connect(self):
        """
        Connect to the NATS server and complete the protocol handshake.

        Returns:
            bool: True once the server has acknowledged the CONNECT.

        Raises:
            RuntimeError: If the TCP connection or the handshake fails.
        """
        # Drop any socket left over from an earlier connect().
        self.close()
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(10)
            self.sock.connect((self.host, self.port))

            # The server speaks first with an INFO line.
            greeting = self._read_line()
            if not greeting.startswith(b'INFO'):
                raise RuntimeError("Unexpected NATS greeting")

            # CONNECT frame: operation name, one space, JSON options, CRLF.
            connect_cmd = b'CONNECT {"verbose":false,"protocol":1,"version":"1.0.2","auth_token":"","lang":"mpy"}\r\n'
            self.sock.sendall(connect_cmd)

            # With verbose off there is no +OK, so a PING/PONG round trip is
            # what confirms the CONNECT was accepted.
            self._flush()
            self.connected = True
            return True
        except Exception as e:
            # Release the half-open socket before reporting the failure.
            self.close()
            raise RuntimeError(f"Failed to connect to NATS: {e}")

    def publish(self, subject, message):
        """
        Publish a message to a subject.

        Args:
            subject: NATS subject
            message: Payload as str (encoded as UTF-8) or bytes-like

        Raises:
            RuntimeError: If not connected, if the server closes the
                connection, or if the server answers with ``-ERR``.
        """
        if not self.connected or not self.sock:
            raise RuntimeError("Not connected to NATS")

        # Publish command: PUB subject size\r\nmessage\r\n
        msg_bytes = message.encode('utf-8') if isinstance(message, str) else message
        pub_cmd = f"PUB {subject} {len(msg_bytes)}\r\n".encode('utf-8')
        pub_cmd += msg_bytes + b'\r\n'

        self.sock.sendall(pub_cmd)

        # Confirm delivery with PING/PONG instead of waiting for a +OK that
        # is never sent when verbose is off.
        try:
            self._flush()
        except OSError:
            # The frame was already handed to the socket; a timeout while
            # waiting for the acknowledgement is tolerated (best effort).
            pass

    def close(self):
        """Close the connection; safe to call when already closed."""
        if self.sock:
            try:
                self.sock.close()
            except Exception:
                pass
        self.sock = None
        self.connected = False
sender_id=None +): + """ + Send data via NATS with automatic transport selection. + + MicroPython version - synchronous, limited features. + + Args: + subject: NATS subject to publish to + data: List of (dataname, data, type) tuples + broker_url: NATS server URL + fileserver_url: HTTP file server URL + fileserver_upload_handler: Function to handle fileserver uploads + size_threshold: Threshold in bytes (enforced MAX_PAYLOAD_SIZE) + correlation_id: Correlation ID for tracing + msg_purpose: Purpose of the message + sender_name: Name of the sender + receiver_name: Name of the receiver + receiver_id: UUID of the receiver + reply_to: Topic to reply to + reply_to_msg_id: Message ID this message is replying to + is_publish: Whether to automatically publish the message + msg_id: Message ID + sender_id: Sender ID + + Returns: + Tuple of (env, env_json_str) + + Example: + ```python + from natbridge_mpy import NATSBridge + + bridge = NATSBridge() + env, env_json_str = bridge.smartsend( + "/chat", + [("message", "Hello!", "text"), ("data", data_bytes, "binary")], + size_threshold=100000 # Lower threshold for MicroPython + ) + ``` + """ + if correlation_id is None: + correlation_id = generate_uuid() + if msg_id is None: + msg_id = generate_uuid() + if sender_id is None: + sender_id = generate_uuid() + + # Enforce MAX_PAYLOAD_SIZE for MicroPython + effective_threshold = min(size_threshold, MAX_PAYLOAD_SIZE) + + log_trace(correlation_id, f"Starting smartsend for subject: {subject}") + + # Process each payload + payloads = [] + for dataname, payload_data, payload_type in data: + payload_bytes = _serialize_data(payload_data, payload_type) + payload_size = len(payload_bytes) + + # Check against hard limit + if payload_size > MAX_PAYLOAD_SIZE: + raise MemoryError(f"Payload '{dataname}' exceeds max size {MAX_PAYLOAD_SIZE} bytes") + + log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") + + # Always use direct transport in 
def smartreceive(msg, fileserver_download_handler=None, max_retries=3, base_delay=100, max_delay=1000):
    """
    Receive and process a NATS message envelope.

    MicroPython version - synchronous.

    Args:
        msg: Message object with 'payload' attribute (str or bytes-like JSON)
        fileserver_download_handler: Function to handle fileserver downloads;
            defaults to _simple_fileserver_download
        max_retries: Maximum retry attempts (passed to the download handler)
        base_delay: Initial delay in milliseconds (passed to the download handler)
        max_delay: Maximum delay in milliseconds (passed to the download handler)

    Returns:
        Dict with envelope metadata; 'payloads' is replaced by a list of
        (dataname, data, type) tuples. Payloads whose transport is neither
        'direct' nor 'link' are logged and left out.

    Raises:
        ValueError: If msg.payload is not str/bytes-like, is not valid JSON,
            or does not decode to a JSON object.
    """
    if fileserver_download_handler is None:
        fileserver_download_handler = _simple_fileserver_download

    # Normalise the raw payload to text exactly once. bytearray and
    # memoryview are decoded too; str() on them would yield their repr,
    # which is never valid JSON.
    raw = msg.payload
    if isinstance(raw, str):
        envelope_text = raw
    elif isinstance(raw, (bytes, bytearray, memoryview)):
        envelope_text = bytes(raw).decode('utf-8')
    else:
        raise ValueError("msg.payload must be str or bytes-like")

    env_json_obj = json.loads(envelope_text)
    if not isinstance(env_json_obj, dict):
        raise ValueError("Message envelope must be a JSON object")

    correlation_id = env_json_obj.get('correlation_id', 'unknown')
    log_trace(correlation_id, "Processing received message")

    # Process payloads
    payloads_list = []

    for entry in env_json_obj.get('payloads', []):
        transport = entry.get('transport', 'direct')
        dataname = entry.get('dataname', 'unknown')

        if transport == 'direct':
            log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")

            # Inline payloads travel Base64-encoded inside the envelope.
            payload_b64 = entry.get('data', '')
            payload_bytes = base64.b64decode(payload_b64)

            data_type = entry.get('payload_type', 'text')
            data = _deserialize_data(payload_bytes, data_type)

            payloads_list.append((dataname, data, data_type))
        elif transport == 'link':
            # For link transport the 'data' field holds the download URL.
            url = entry.get('data', '')
            log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")

            downloaded_data = fileserver_download_handler(
                url, max_retries, base_delay, max_delay, correlation_id
            )

            data_type = entry.get('payload_type', 'binary')
            data = _deserialize_data(downloaded_data, data_type)

            payloads_list.append((dataname, data, data_type))
        else:
            # Unknown transports are skipped (logged only) so that one bad
            # entry does not discard the rest of the message.
            log_trace(correlation_id, f"Unknown transport type: {transport}")

    env_json_obj['payloads'] = payloads_list
    return env_json_obj
# Names exported by ``from natbridge_mpy import *``.
__all__ = [
    # Core functions
    'smartsend',
    'smartreceive',

    # Utility functions
    'log_trace',
    'generate_uuid',
    '_serialize_data',
    '_deserialize_data',

    # File server handlers
    '_simple_fileserver_upload',
    '_simple_fileserver_download',

    # NATS connection
    'NATSConnection',
    'NATSBridge',

    # Constants
    'DEFAULT_SIZE_THRESHOLD',
    'DEFAULT_BROKER_URL',
    'DEFAULT_FILESERVER_URL',
    'MAX_PAYLOAD_SIZE',
    'NATS_SERVER_HOST',
    'NATS_SERVER_PORT',
    'NATS_RECONNECT_DELAY',
]