diff --git a/README.md b/README.md index be3f1ce..be22435 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# NATSBridge +# NATSBridge - Cross-Platform Bi-Directional Data Bridge -A high-performance, bi-directional data bridge for **Julia** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. +A high-performance, bi-directional data bridge for **Julia, JavaScript, Python, and MicroPython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io) @@ -10,6 +10,7 @@ A high-performance, bi-directional data bridge for **Julia** applications using ## Table of Contents - [Overview](#overview) +- [Cross-Platform Support](#cross-platform-support) - [Features](#features) - [Architecture](#architecture) - [Installation](#installation) @@ -17,7 +18,7 @@ A high-performance, bi-directional data bridge for **Julia** applications using - [API Reference](#api-reference) - [Payload Types](#payload-types) - [Transport Strategies](#transport-strategies) -- [Examples](#examples) +- [Cross-Platform Examples](#cross-platform-examples) - [Testing](#testing) - [License](#license) @@ -25,7 +26,7 @@ A high-performance, bi-directional data bridge for **Julia** applications using ## Overview -NATSBridge enables seamless communication for Julia applications through NATS, with intelligent transport selection based on payload size: +NATSBridge enables seamless communication across multiple platforms through NATS, with intelligent transport selection based on payload size: | Transport | Payload Size | Method | |-----------|--------------|--------| @@ -36,14 +37,40 @@ NATSBridge enables seamless communication for Julia applications through NATS, w - **Chat Applications**: Text, images, audio, video in a single message - **File Transfer**: Efficient transfer of large files using claim-check pattern -- **Streaming Data**: Sensor data, telemetry, and analytics pipelines +- **IoT/Embedded**: Sensor data, telemetry, and analytics pipelines (MicroPython) +- **Cross-Platform Communication**: Interoperability between Julia, JavaScript, Python, and MicroPython systems +--- + +## Cross-Platform Support + +| Platform | Implementation | Features | +|----------|----------------|----------| +| **Julia** | [`src/NATSBridge.jl`](src/NATSBridge.jl) | Full feature set, Arrow IPC, multiple dispatch | +| **JavaScript** | [`src/natbridge.js`](src/natbridge.js) | Node.js & browser, async/await | +| **Python** | [`src/natbridge.py`](src/natbridge.py) | Desktop Python, asyncio, type hints | +| **MicroPython** | [`src/natbridge_mpy.py`](src/natbridge_mpy.py) | Memory-constrained, synchronous API | + +### Platform Comparison + +| Feature | Julia | JavaScript | Python | MicroPython | +|---------|-------|------------|--------|-------------| +| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | +| Async/Await | ❌ | ✅ Native | ✅ Native | ⚠️ (uasyncio) | +| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | +| Memory Management | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | +| Arrow IPC | ✅ Native | ✅ | ✅ | ❌ | +| Direct Transport | ✅ | ✅ | ✅ | ✅ | +| Link Transport | ✅ | ✅ | ✅ | ⚠️ (Limited) | +| Handler Functions | ✅ | ✅ | ✅ | ✅ | +| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | --- ## Features -- ✅ **Bi-directional messaging** for Julia applications +- ✅ **Cross-platform messaging** for Julia, JavaScript, Python, and MicroPython applications +- ✅ **Bi-directional messaging** with request-reply patterns - ✅ **Multi-payload support** - send multiple payloads with different types in one message - ✅ **Automatic transport selection** - direct vs link based on payload size - ✅ **Claim-Check pattern** for payloads > 1MB @@ -51,8 +78,7 @@ NATSBridge enables seamless communication for Julia applications through NATS, w - ✅ **Exponential backoff** for reliable file server downloads - ✅ **Correlation ID tracking** for message tracing - ✅ **Reply-to support** for request-response patterns -- ✅ **JetStream support** for message replay and durability - +- ✅ **Handler function abstraction** - pluggable file server implementations (Plik, AWS S3, custom) --- @@ -62,13 +88,13 @@ NATSBridge enables seamless communication for Julia applications through NATS, w ```mermaid flowchart TB - subgraph Sender["Julia Application (Sender)"] + subgraph Sender["Application (Sender)"] SenderApp[App Code] NATSBridge_Send[NATSBridge] NATS_Client[NATS.jl] end - subgraph Receiver["Julia Application (Receiver)"] + subgraph Receiver["Application (Receiver)"] ReceiverApp[App Code] NATSBridge_Recv[NATSBridge] NATS_Client_Recv[NATS.jl] @@ -96,14 +122,6 @@ flowchart TB style FileServer fill:#f3e5f5 ``` -### Key Components - -| Component | Description | -|-----------|-------------| -| **Julia Application** | Sender and receiver applications using the NATSBridge module | -| **NATS Server** | Message broker for transporting message envelopes | -| **HTTP File Server** | Independent HTTP server for large payload storage (e.g., Plik) | - ### Message Flow 1. **Sender** creates a message envelope with payloads using `smartsend()` @@ -124,11 +142,53 @@ The system uses handler functions to abstract file server operations: | Handler | Purpose | |---------|---------| -| `plik_oneshot_upload()` | Uploads payload bytes to file server, returns URL | -| `_fetch_with_backoff()` | Downloads data from URL with exponential backoff retry | +| `plik_oneshot_upload()` / `plikOneshotUpload()` | Uploads payload bytes to file server, returns URL | +| `_fetch_with_backoff()` / `fetchWithBackoff()` | Downloads data from URL with exponential backoff retry | This abstraction allows support for different file server implementations (Plik, AWS S3, custom HTTP server). +### Message Envelope Schema + +All platforms use identical JSON schemas for message envelopes: + +```json +{ + "correlation_id": "uuid-v4-string", + "msg_id": "uuid-v4-string", + "timestamp": "2024-01-15T10:30:00Z", + "send_to": "topic/subject", + "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", + "sender_name": "agent-wine-web-frontend", + "sender_id": "uuid4", + "receiver_name": "agent-backend", + "receiver_id": "uuid4", + "reply_to": "topic", + "reply_to_msg_id": "uuid4", + "broker_url": "nats://localhost:4222", + "metadata": {}, + "payloads": [ + { + "id": "uuid4", + "dataname": "login_image", + "payload_type": "image", + "transport": "direct", + "encoding": "base64", + "size": 15433, + "data": "base64-encoded-string" + }, + { + "id": "uuid4", + "dataname": "large_table", + "payload_type": "table", + "transport": "link", + "encoding": "none", + "size": 524288, + "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow" + } + ] +} +``` + --- ## Installation @@ -138,14 +198,53 @@ This abstraction allows support for different file server implementations (Plik, - **NATS Server** (v2.10+ recommended) - **HTTP File Server** (optional, for payloads > 1MB) -### Julia +### Platform-Specific Dependencies + +#### Julia ```julia using Pkg Pkg.add("NATS") -Pkg.add("https://git.yiem.cc/ton/NATSBridge") +Pkg.add("Arrow") +Pkg.add("JSON3") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") ``` +#### JavaScript (Node.js) + +```bash +npm install nats uuid apache-arrow node-fetch +# or +yarn add nats uuid apache-arrow node-fetch +``` + +#### JavaScript (Browser) + +```bash +npm install nats uuid apache-arrow +# or use CDN: +# https://unpkg.com/nats-js/dist/bundle/nats.min.js +# https://unpkg.com/apache-arrow/arrow.min.js +``` + +#### Python (Desktop) + +```bash +pip install nats-py aiohttp pyarrow pandas python-dateutil +``` + +#### MicroPython + +MicroPython uses built-in modules: +- `network` - NATS connection (custom implementation) +- `time` - Timestamps +- `uos` - File operations +- `base64` - Base64 encoding +- `json` - JSON parsing +- `struct` - Binary data handling + --- ## Quick Start @@ -166,61 +265,39 @@ mkdir -p /tmp/fileserver python3 -m http.server 8080 --directory /tmp/fileserver ``` -### Step 3: Send Your First Message - -#### Julia - -```julia -using NATSBridge - -# Send a text message -data = [("message", "Hello World", "text")] -env, env_json_str = NATSBridge.smartsend("/chat/room1", data; broker_url="nats://localhost:4222") -println("Message sent!") -``` - -### Step 4: Receive Messages - -#### Julia - -```julia -using NATS, NATSBridge - -# Configuration -const SUBJECT = "/chat/room1" -const NATS_URL = "nats://localhost:4222" - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] $message") -end - -# Receiver: Listen for messages - msg comes from the callback -function test_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") - - # Receive and process message - env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler) - for (dataname, data, type) in env["payloads"] - println("Received $dataname: $data") - end - end - - # Keep listening for 120 seconds - sleep(120) - NATS.drain(conn) -end - -test_receive() -``` - --- ## API Reference +### Unified API Standard + +All platforms use the same input/output format for payloads: + +**Input format for smartsend:** +``` +[(dataname1, data1, type1), (dataname2, data2, type2), ...] +``` + +**Output format for smartreceive:** +``` +{ + "correlation_id": "...", + "msg_id": "...", + "timestamp": "...", + "send_to": "...", + "msg_purpose": "...", + "sender_name": "...", + "sender_id": "...", + "receiver_name": "...", + "receiver_id": "...", + "reply_to": "...", + "reply_to_msg_id": "...", + "broker_url": "...", + "metadata": {...}, + "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] +} +``` + ### smartsend Sends data either directly via NATS or via a fileserver URL, depending on payload size. @@ -231,27 +308,96 @@ Sends data either directly via NATS or via a fileserver URL, depending on payloa using NATSBridge env, env_json_str = NATSBridge.smartsend( - subject, # NATS subject + subject::String, # NATS subject data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) broker_url::String = "nats://localhost:4222", fileserver_url = "http://localhost:8080", fileserver_upload_handler::Function = plik_oneshot_upload, size_threshold::Int = 1_000_000, - correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID) + correlation_id::String = string(uuid4()), msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", receiver_id::String = "", reply_to::String = "", reply_to_msg_id::String = "", - is_publish::Bool = true, # Whether to automatically publish to NATS - NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead) - msg_id::String = string(uuid4()), # Message ID (auto-generated UUID) - sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID) + is_publish::Bool = true, + NATS_connection::Union{NATS.Connection, Nothing} = nothing, + msg_id::String = string(uuid4()), + sender_id::String = string(uuid4()) ) # Returns: ::Tuple{msg_envelope_v1, String} -# - env: msg_envelope_v1 object with all envelope metadata and payloads -# - env_json_str: JSON string representation of the envelope for publishing +``` + +#### JavaScript + +```javascript +const NATSBridge = require('natbridge'); + +const [env, env_json_str] = await NATSBridge.smartsend( + subject, + data, // Array of [dataname, data, type] tuples + { + broker_url: 'nats://localhost:4222', + fileserver_url: 'http://localhost:8080', + fileserver_upload_handler: NATSBridge.plikOneshotUpload, + size_threshold: 1_000_000, + correlation_id: uuidv4(), + msg_purpose: 'chat', + sender_name: 'NATSBridge', + receiver_name: '', + receiver_id: '', + reply_to: '', + reply_to_msg_id: '', + is_publish: true, + nats_connection: null, + msg_id: uuidv4(), + sender_id: uuidv4() + } +); +// Returns: Promise<[env, env_json_str]> +``` + +#### Python + +```python +from natbridge import NATSBridge + +env, env_json_str = await NATSBridge.smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + broker_url: str = "nats://localhost:4222", + fileserver_url: str = "http://localhost:8080", + fileserver_upload_handler: Callable = plik_oneshot_upload, + size_threshold: int = 1_000_000, + correlation_id: str = None, + msg_purpose: str = "chat", + sender_name: str = "NATSBridge", + receiver_name: str = "", + receiver_id: str = "", + reply_to: str = "", + reply_to_msg_id: str = "", + is_publish: bool = True, + nats_connection: Any = None, + msg_id: str = None, + sender_id: str = None +) +# Returns: Tuple[Dict, str] +``` + +#### MicroPython + +```python +from natbridge import NATSBridge + +# Limited to direct transport (< 100KB threshold) +env, env_json_str = NATSBridge.smartsend( + subject, + data, # List of (dataname, data, type) tuples + broker_url="nats://localhost:4222", + size_threshold=100000 # Lower threshold for memory constraints +) +# Returns: Tuple[Dict, str] ``` ### smartreceive @@ -263,7 +409,6 @@ Receives and processes messages from NATS, handling both direct and link transpo ```julia using NATSBridge -# Note: msg is a NATS.Msg object passed from the subscription callback env = NATSBridge.smartreceive( msg::NATS.Msg; fileserver_download_handler::Function = _fetch_with_backoff, @@ -271,51 +416,63 @@ env = NATSBridge.smartreceive( base_delay::Int = 100, max_delay::Int = 5000 ) -# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1 +# Returns: ::JSON.Object{String, Any} ``` -### publish_message +#### JavaScript -Publish a message to a NATS subject. This function is available in Julia with two overloads: +```javascript +const env = await NATSBridge.smartreceive( + msg, + { + fileserver_download_handler: NATSBridge.fetchWithBackoff, + max_retries: 5, + base_delay: 100, + max_delay: 5000 + } +); +// Returns: Promise +``` -#### Julia +#### Python -**Using broker URL (creates new connection):** -```julia -using NATSBridge, NATS - -# Publish with URL - creates a new connection -NATSBridge.publish_message( - "nats://localhost:4222", # broker_url - "/chat/room1", # subject - "{\"correlation_id\":\"abc123\"}", # message - "abc123" # correlation_id +```python +env = await NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff, + max_retries=5, + base_delay=100, + max_delay=5000 ) +# Returns: Dict with "payloads" key ``` -**Using pre-existing connection (saves connection overhead):** -```julia -using NATSBridge, NATS +#### MicroPython -# Create connection once and reuse -conn = NATS.connect("nats://localhost:4222") -NATSBridge.publish_message(conn, "/chat/room1", "{\"correlation_id\":\"abc123\"}", "abc123") -# Connection is automatically drained after publish +```python +env = NATSBridge.smartreceive( + msg, + fileserver_download_handler=_sync_fileserver_download, + max_retries=3, + base_delay=100, + max_delay=1000 +) +# Returns: Dict with "payloads" key ``` --- ## Payload Types -| Type | Description | Serialization | -|------|-------------|---------------| -| `text` | Plain text strings | UTF-8 bytes | -| `dictionary` | JSON-serializable dictionaries | JSON | -| `table` | Tabular data (DataFrames, arrays) | Apache Arrow IPC | -| `image` | Image data (PNG, JPG) | Raw bytes | -| `audio` | Audio data (WAV, MP3) | Raw bytes | -| `video` | Video data (MP4, AVI) | Raw bytes | -| `binary` | Generic binary data | Raw bytes | +| Type | Julia | JavaScript | Python | MicroPython | Description | +|------|-------|------------|--------|-------------|-------------| +| `text` | `String` | `string` | `str` | `str` | Plain text strings | +| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries | +| `table` | `DataFrame`, `Arrow.Table` | `Array` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | +| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) | +| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) | +| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) | +| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | Generic binary data | --- @@ -325,31 +482,60 @@ NATSBridge.publish_message(conn, "/chat/room1", "{\"correlation_id\":\"abc123\"} Small payloads are sent directly via NATS with Base64 encoding. -#### Julia +#### Cross-Platform + ```julia +# Julia data = [("message", "Hello", "text")] smartsend("/topic", data) ``` +```javascript +// JavaScript +const data = [["message", "Hello", "text"]]; +smartsend("/topic", data); +``` + +```python +# Python +data = [("message", "Hello", "text")] +await smartsend("/topic", data) +``` + ### Link Transport (Payloads >= 1MB) Large payloads are uploaded to an HTTP file server. -#### Julia +#### Cross-Platform + ```julia +# Julia data = [("file", large_data, "binary")] smartsend("/topic", data; fileserver_url="http://localhost:8080") ``` +```javascript +// JavaScript +const data = [["file", largeData, "binary"]]; +smartsend("/topic", data, { fileserver_url: 'http://localhost:8080' }); +``` + +```python +# Python +data = [("file", large_data, "binary")] +await smartsend("/topic", data, fileserver_url="http://localhost:8080") +``` + --- -## Examples +## Cross-Platform Examples ### Example 1: Chat with Mixed Content -Send text, small image, and large file in one message. +Send text, image, and large file in one message. #### Julia + ```julia using NATSBridge @@ -362,11 +548,48 @@ data = [ env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080") ``` +#### JavaScript + +```javascript +const NATSBridge = require('natbridge'); + +const data = [ + ["message_text", "Hello!", "text"], + ["user_avatar", imageData, "image"], + ["large_document", largeFileData, "binary"] +]; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/chat/room1", + data, + { fileserver_url: 'http://localhost:8080' } +); +``` + +#### Python + +```python +from natbridge import NATSBridge + +data = [ + ("message_text", "Hello!", "text"), + ("user_avatar", image_data, "image"), + ("large_document", large_file_data, "binary") +] + +env, env_json_str = await NATSBridge.smartsend( + "/chat/room1", + data, + fileserver_url="http://localhost:8080" +) +``` + ### Example 2: Dictionary Exchange Send configuration data between platforms. #### Julia + ```julia using NATSBridge @@ -380,11 +603,44 @@ data = [("config", config, "dictionary")] env, env_json_str = NATSBridge.smartsend("/device/config", data) ``` +#### JavaScript + +```javascript +const NATSBridge = require('natbridge'); + +const config = { + wifi_ssid: "MyNetwork", + wifi_password: "password123", + update_interval: 60 +}; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/device/config", + [["config", config, "dictionary"]] +); +``` + +#### Python + +```python +from natbridge import NATSBridge + +config = { + "wifi_ssid": "MyNetwork", + "wifi_password": "password123", + "update_interval": 60 +} + +data = [("config", config, "dictionary")] +env, env_json_str = await NATSBridge.smartsend("/device/config", data) +``` + ### Example 3: Table Data (Arrow IPC) Send tabular data using Apache Arrow IPC format. #### Julia + ```julia using NATSBridge using DataFrames @@ -399,14 +655,49 @@ data = [("students", df, "table")] env, env_json_str = NATSBridge.smartsend("/data/analysis", data) ``` -### Example 4: Request-Response Pattern with Envelope JSON +#### JavaScript -Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly. +```javascript +const NATSBridge = require('natbridge'); + +const df = [ + { id: 1, name: "Alice", score: 95 }, + { id: 2, name: "Bob", score: 88 }, + { id: 3, name: "Charlie", score: 92 } +]; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/data/analysis", + [["students", df, "table"]] +); +``` + +#### Python + +```python +from natbridge import NATSBridge +import pandas as pd + +df = pd.DataFrame({ + "id": [1, 2, 3], + "name": ["Alice", "Bob", "Charlie"], + "score": [95, 88, 92] +}) + +data = [("students", df, "table")] +env, env_json_str = await NATSBridge.smartsend("/data/analysis", data) +``` + +### Example 4: Request-Response Pattern + +Bi-directional communication with reply-to support. + +#### Julia -#### Julia (Requester) ```julia using NATSBridge +# Requester env, env_json_str = NATSBridge.smartsend( "/device/command", [("command", Dict("action" => "read_sensor"), "dictionary")]; @@ -415,26 +706,20 @@ env, env_json_str = NATSBridge.smartsend( ) ``` -#### Julia (Responder) ```julia +# Responder using NATS, NATSBridge -# Configuration -const SUBJECT = "/device/command" -const NATS_URL = "nats://localhost:4222" - function test_responder() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg + conn = NATS.connect("nats://localhost:4222") + NATS.subscribe(conn, "/device/command") do msg env = NATSBridge.smartreceive(msg, fileserver_download_handler=_fetch_with_backoff) - # Extract reply_to from the envelope metadata reply_to = env["reply_to"] for (dataname, data, type) in env["payloads"] if dataname == "command" && data["action"] == "read_sensor" response = Dict("sensor_id" => "sensor-001", "value" => 42.5) - # Send response to the reply_to subject from the request if !isempty(reply_to) smartsend(reply_to, [("data", response, "dictionary")]) end @@ -445,51 +730,118 @@ function test_responder() sleep(120) NATS.drain(conn) end - -test_responder() ``` -### Example 5: IoT Device Sensor Data +#### JavaScript -IoT device sending sensor data. +```javascript +const NATSBridge = require('natbridge'); -#### Julia (Receiver) -```julia -using NATS, NATSBridge +// Requester +const [env, env_json_str] = await NATSBridge.smartsend( + "/device/command", + [["command", { action: "read_sensor" }, "dictionary"]], + { broker_url: 'nats://localhost:4222', reply_to: '/device/response' } +); +``` -# Configuration -const SUBJECT = "/device/sensors" -const NATS_URL = "nats://localhost:4222" +```javascript +// Responder +const nats = require('nats'); +const NATSBridge = require('natbridge'); -function test_receiver() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler) - for (dataname, data, type) in env["payloads"] - if dataname == "temperature" - println("Temperature: $data") - elseif dataname == "humidity" - println("Humidity: $data") - end - end - end +async function testResponder() { + const conn = await nats.connect('nats://localhost:4222'); - sleep(120) - NATS.drain(conn) -end + const subscription = await conn.subscribe('/device/command'); + + for await (const msg of subscription) { + const env = await NATSBridge.smartreceive(msg, { + fileserver_download_handler: NATSBridge.fetchWithBackoff + }); + + const replyTo = env.reply_to; + + for (const [dataname, data, type] of env.payloads) { + if (dataname === 'command' && data.action === 'read_sensor') { + const response = { sensor_id: 'sensor-001', value: 42.5 }; + if (replyTo) { + await NATSBridge.smartsend( + replyTo, + [["data", response, "dictionary"]] + ); + } + } + } + } + + setTimeout(() => conn.close(), 120000); +} +``` -test_receiver() +#### Python + +```python +from natbridge import NATSBridge + +# Requester +env, env_json_str = await NATSBridge.smartsend( + "/device/command", + [("command", {"action": "read_sensor"}, "dictionary")], + broker_url="nats://localhost:4222", + reply_to="/device/response" +) +``` + +```python +# Responder +from natbridge import NATSBridge +import asyncio +import nats + +async def test_responder(): + nc = await nats.connect('nats://localhost:4222') + + async def msg_handler(msg): + env = await NATSBridge.smartreceive( + msg, + fileserver_download_handler=fetch_with_backoff + ) + + reply_to = env["reply_to"] + + for dataname, data, type_ in env["payloads"]: + if dataname == "command" and data["action"] == "read_sensor": + response = {"sensor_id": "sensor-001", "value": 42.5} + if reply_to: + await NATSBridge.smartsend( + reply_to, + [("data", response, "dictionary")] + ) + + await nc.subscribe('/device/command', cb=msg_handler) + + await asyncio.sleep(120) + await nc.drain() ``` --- ## Testing -Run the test scripts to verify functionality: +### Test File Organization -### Julia +| Platform | Sender Tests | Receiver Tests | +|----------|--------------|----------------| +| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | +| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | +| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | -```julia +### Run Tests + +#### Julia + +```bash # Text message exchange julia test/test_julia_text_sender.jl julia test/test_julia_text_receiver.jl @@ -511,6 +863,55 @@ julia test/test_julia_table_sender.jl julia test/test_julia_table_receiver.jl ``` +#### JavaScript (Node.js) + +```bash +# Text message exchange +node test/test_js_text_sender.js +node test/test_js_text_receiver.js + +# Dictionary exchange +node test/test_js_dictionary_sender.js +node test/test_js_dictionary_receiver.js + +# Binary transfer +node test/test_js_binary_sender.js +node test/test_js_binary_receiver.js + +# Table exchange +node test/test_js_table_sender.js +node test/test_js_table_receiver.js +``` + +#### Python + +```bash +# Text message exchange +python3 test/test_py_text_sender.py +python3 test/test_py_text_receiver.py + +# Dictionary exchange +python3 test/test_py_dictionary_sender.py +python3 test/test_py_dictionary_receiver.py + +# Binary transfer +python3 test/test_py_binary_sender.py +python3 test/test_py_binary_receiver.py + +# Table exchange +python3 test/test_py_table_sender.py +python3 test/test_py_table_receiver.py +``` + +--- + +## Documentation + +For detailed architecture and implementation information, see: + +- [Architecture Documentation](docs/architecture.md) - Cross-platform architecture, API parity, platform-specific patterns +- [Implementation Guide](docs/implementation.md) - Detailed implementation for each platform, handler functions, testing + --- ## License diff --git a/image/tutorial.md b/examples/tutorial.md similarity index 95% rename from image/tutorial.md rename to examples/tutorial.md index e19e6d7..0969849 100644 --- a/image/tutorial.md +++ b/examples/tutorial.md @@ -121,13 +121,13 @@ using NATSBridge # Send a text message data = [("message", "Hello World", "text")] env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222") -# env: msg_envelope_v1 object with all metadata and payloads +# env: msg_envelope_v1 struct with all metadata and payloads # env_json_str: JSON string representation of the envelope for publishing println("Message sent!") # Or use is_publish=false to get envelope and JSON without publishing env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222", is_publish=false) -# env: msg_envelope_v1 object +# env: msg_envelope_v1 struct # env_json_str: JSON string for publishing to NATS ``` @@ -178,6 +178,8 @@ env, env_json_str = await smartsend( broker_url="nats://localhost:4222", is_publish=False ) +# env: Dict with all metadata and payloads +# env_json_str: JSON string for publishing to NATS ``` #### MicroPython @@ -206,7 +208,8 @@ using NATSBridge # Receive and process message env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1 +# Returns: ::JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} +# Access payloads: for (dataname, data, type) in env["payloads"] for (dataname, data, type) in env["payloads"] println("Received $dataname: $data") end @@ -230,7 +233,7 @@ for (const [dataname, data, type] of env.payloads) { #### Python ```python -from natbridge import smartreceive +from natbridge import smartreceive, fetch_with_backoff # Receive and process message env = await smartreceive( @@ -238,7 +241,7 @@ env = await smartreceive( fileserver_download_handler=fetch_with_backoff ) # env["payloads"] = [(dataname, data, type), ...] -for dataname, data, type in env["payloads"]: +for dataname, data, type_ in env["payloads"]: print(f"Received {dataname}: {data}") ``` @@ -497,6 +500,7 @@ env, env_json_str = smartsend( ) println("File uploaded to: $(env.payloads[1].data)") +# Note: For link transport, data field contains the URL string ``` #### JavaScript @@ -520,6 +524,7 @@ const [env, env_json_str] = await NATSBridge.smartsend( ); console.log("File uploaded to:", env.payloads[0].data); +// Note: For link transport, data field contains the URL string ``` #### Python @@ -539,6 +544,7 @@ env, env_json_str = await smartsend( ) print(f"File uploaded to: {env['payloads'][0]['data']}") +# Note: For link transport, data field contains the URL string ``` #### MicroPython @@ -619,6 +625,7 @@ env, env_json_str = await smartsend( data, broker_url="nats://localhost:4222" ) +# env: Dict with all metadata and payloads ``` ### Example 6: Table Data (Arrow IPC) diff --git a/image/walkthrough.md b/examples/walkthrough.md similarity index 96% rename from image/walkthrough.md rename to examples/walkthrough.md index decd77e..a8d8efc 100644 --- a/image/walkthrough.md +++ b/examples/walkthrough.md @@ -435,7 +435,7 @@ class ChatHandler: } self.ui.add_message(sender, '', attachment) else: - # For other types, use file server URL + # For other types, use file server URL or data attachment = {'type': type_, 'data': data} self.ui.add_message(sender, '', attachment) ``` @@ -588,7 +588,7 @@ class FileUploadService: self.broker_url = broker_url self.fileserver_url = fileserver_url - async def upload_file(self, file_path: str, recipient: str) -> dict: + async def upload_file(self, file_path: str, recipient: str) -> tuple: with open(file_path, "rb") as f: file_data = f.read() file_name = os.path.basename(file_path) @@ -602,9 +602,9 @@ class FileUploadService: fileserver_url=self.fileserver_url ) - return env + return env, env_json_str - async def upload_large_file(self, file_path: str, recipient: str) -> dict: + async def upload_large_file(self, file_path: str, recipient: str) -> tuple: file_size = os.path.getsize(file_path) if file_size > 100 * 1024 * 1024: # > 100MB @@ -613,7 +613,7 @@ class FileUploadService: return await self.upload_file(file_path, recipient) - async def stream_upload(self, file_path: str, recipient: str) -> dict: + async def stream_upload(self, file_path: str, recipient: str) -> tuple: # Implement streaming upload to file server # This would require a more sophisticated file server # For now, we'll use the standard upload @@ -1098,33 +1098,24 @@ class SensorSender: # Convert to Arrow IPC import pyarrow as arrow - import pyarrow.feather as feather + import pyarrow.ipc as ipc import io table = arrow.Table.from_pandas(df) buf = io.BytesIO() - with feather.FeatherWriter(buf, table) as writer: - pass - buf.seek(0) - arrow_data = buf.read() + sink = ipc.new_file(buf, table.schema) + ipc.write_table(table, sink) + sink.close() + arrow_data = buf.getvalue() - # Send based on size - if len(arrow_data) < 1048576: - data = [("batch", arrow_data, "table")] - await smartsend( - "/sensors/batch", - data, - broker_url=self.broker_url, - fileserver_url=self.fileserver_url - ) - else: - data = [("batch", arrow_data, "table")] - await smartsend( - "/sensors/batch", - data, - broker_url=self.broker_url, - fileserver_url=self.fileserver_url - ) + # Send based on size (auto-selected by smartsend) + data = [("batch", arrow_data, "table")] + await smartsend( + "/sensors/batch", + data, + broker_url=self.broker_url, + fileserver_url=self.fileserver_url + ) ``` --- @@ -1242,13 +1233,16 @@ end ```python # Reuse NATS connections +import asyncio +import nats + class ConnectionPool: def __init__(self): self.connections = {} - def get_connection(self, nats_url: str): + async def get_connection(self, nats_url: str): if nats_url not in self.connections: - self.connections[nats_url] = asyncio.run(nats.connect(nats_url)) + self.connections[nats_url] = await nats.connect(nats_url) return self.connections[nats_url] async def close_all(self): @@ -1286,18 +1280,20 @@ end ```python # Cache file server responses +import asyncio import threading +from natbridge import fetch_with_backoff file_cache = {} cache_lock = threading.Lock() -def fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id): +async def fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id): with cache_lock: if url in file_cache: return file_cache[url] # Fetch from file server - data = asyncio.run(fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)) + data = await fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) # Cache the result with cache_lock: @@ -1341,7 +1337,13 @@ async function safeSmartSend(subject, data, options = {}) { #### Python ```python -async def safe_smartsend(subject: str, data: List[tuple], **kwargs): +from typing import List, Tuple, Optional, Union + +async def safe_smartsend( + subject: str, + data: List[Tuple[str, Any, str]], + **kwargs +) -> Optional[Tuple[dict, str]]: try: return await smartsend(subject, data, **kwargs) except Exception as error: @@ -1369,10 +1371,11 @@ end ```python import logging +from typing import List, Tuple, Any logger = logging.getLogger(__name__) -def log_send(subject: str, data: List[tuple], correlation_id: str): +def log_send(subject: str, data: List[Tuple[str, Any, str]], correlation_id: str): logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}") def log_receive(correlation_id: str, num_payloads: int): diff --git a/updated_README.md b/updated_README.md deleted file mode 100644 index be22435..0000000 --- a/updated_README.md +++ /dev/null @@ -1,939 +0,0 @@ -# NATSBridge - Cross-Platform Bi-Directional Data Bridge - -A high-performance, bi-directional data bridge for **Julia, JavaScript, Python, and MicroPython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. - -[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) -[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io) - ---- - -## Table of Contents - -- [Overview](#overview) -- [Cross-Platform Support](#cross-platform-support) -- [Features](#features) -- [Architecture](#architecture) -- [Installation](#installation) -- [Quick Start](#quick-start) -- [API Reference](#api-reference) -- [Payload Types](#payload-types) -- [Transport Strategies](#transport-strategies) -- [Cross-Platform Examples](#cross-platform-examples) -- [Testing](#testing) -- [License](#license) - ---- - -## Overview - -NATSBridge enables seamless communication across multiple platforms through NATS, with intelligent transport selection based on payload size: - -| Transport | Payload Size | Method | -|-----------|--------------|--------| -| **Direct** | < 1MB | Sent directly via NATS (Base64 encoded) | -| **Link** | >= 1MB | Uploaded to HTTP file server, URL sent via NATS | - -### Use Cases - -- **Chat Applications**: Text, images, audio, video in a single message -- **File Transfer**: Efficient transfer of large files using claim-check pattern -- **IoT/Embedded**: Sensor data, telemetry, and analytics pipelines (MicroPython) -- **Cross-Platform Communication**: Interoperability between Julia, JavaScript, Python, and MicroPython systems - ---- - -## Cross-Platform Support - -| Platform | Implementation | Features | -|----------|----------------|----------| -| **Julia** | [`src/NATSBridge.jl`](src/NATSBridge.jl) | Full feature set, Arrow IPC, multiple dispatch | -| **JavaScript** | [`src/natbridge.js`](src/natbridge.js) | Node.js & browser, async/await | -| **Python** | [`src/natbridge.py`](src/natbridge.py) | Desktop Python, asyncio, type hints | -| **MicroPython** | [`src/natbridge_mpy.py`](src/natbridge_mpy.py) | Memory-constrained, synchronous API | - -### Platform Comparison - -| Feature | Julia | JavaScript | Python | MicroPython | -|---------|-------|------------|--------|-------------| -| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | -| Async/Await | ❌ | ✅ Native | ✅ Native | ⚠️ (uasyncio) | -| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | -| Memory Management | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | -| Arrow IPC | ✅ Native | ✅ | ✅ | ❌ | -| Direct Transport | ✅ | ✅ | ✅ | ✅ | -| Link Transport | ✅ | ✅ | ✅ | ⚠️ (Limited) | -| Handler Functions | ✅ | ✅ | ✅ | ✅ | -| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | - ---- - -## Features - -- ✅ **Cross-platform messaging** for Julia, JavaScript, Python, and MicroPython applications -- ✅ **Bi-directional messaging** with request-reply patterns -- ✅ **Multi-payload support** - send multiple payloads with different types in one message -- ✅ **Automatic transport selection** - direct vs link based on payload size -- ✅ **Claim-Check pattern** for payloads > 1MB -- ✅ **Apache Arrow IPC** support for tabular data (zero-copy reading) -- ✅ **Exponential backoff** for reliable file server downloads -- ✅ **Correlation ID tracking** for message tracing -- ✅ **Reply-to support** for request-response patterns -- ✅ **Handler function abstraction** - pluggable file server implementations (Plik, AWS S3, custom) - ---- - -## Architecture - -### System Components - -```mermaid -flowchart TB - subgraph Sender["Application (Sender)"] - SenderApp[App Code] - NATSBridge_Send[NATSBridge] - NATS_Client[NATS.jl] - end - - subgraph Receiver["Application (Receiver)"] - ReceiverApp[App Code] - NATSBridge_Recv[NATSBridge] - NATS_Client_Recv[NATS.jl] - end - - subgraph Infrastructure["Infrastructure"] - NATS[NATS Server
Message Broker] - FileServer[HTTP File Server
Upload/Download] - end - - SenderApp --> NATSBridge_Send - NATSBridge_Send --> NATS_Client - NATS_Client --> NATS - - NATS --> NATS_Client_Recv - NATS_Client_Recv --> NATSBridge_Recv - NATSBridge_Recv --> ReceiverApp - - NATSBridge_Send -.->|HTTP POST upload| FileServer - FileServer -.->|HTTP GET download| NATSBridge_Recv - - style SenderApp fill:#e8f5e9 - style ReceiverApp fill:#e8f5e9 - style NATS fill:#fff3e0 - style FileServer fill:#f3e5f5 -``` - -### Message Flow - -1. **Sender** creates a message envelope with payloads using `smartsend()` -2. **NATSBridge** serializes and encodes each payload based on type -3. **Transport Decision**: - - **Direct** (< 1MB): Payload encoded as Base64, published to NATS - - **Link** (≥ 1MB): Payload uploaded to HTTP file server, URL published to NATS -4. **NATS** routes message envelope to subscribers -5. **Receiver** receives message via NATS subscription callback -6. **NATSBridge** processes envelope: - - Decodes Base64 payloads from NATS message - - Fetches URLs from file server with exponential backoff -7. **Receiver** deserializes payloads based on their type - -### File Server Handler Abstraction - -The system uses handler functions to abstract file server operations: - -| Handler | Purpose | -|---------|---------| -| `plik_oneshot_upload()` / `plikOneshotUpload()` | Uploads payload bytes to file server, returns URL | -| `_fetch_with_backoff()` / `fetchWithBackoff()` | Downloads data from URL with exponential backoff retry | - -This abstraction allows support for different file server implementations (Plik, AWS S3, custom HTTP server). - -### Message Envelope Schema - -All platforms use identical JSON schemas for message envelopes: - -```json -{ - "correlation_id": "uuid-v4-string", - "msg_id": "uuid-v4-string", - "timestamp": "2024-01-15T10:30:00Z", - "send_to": "topic/subject", - "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", - "sender_name": "agent-wine-web-frontend", - "sender_id": "uuid4", - "receiver_name": "agent-backend", - "receiver_id": "uuid4", - "reply_to": "topic", - "reply_to_msg_id": "uuid4", - "broker_url": "nats://localhost:4222", - "metadata": {}, - "payloads": [ - { - "id": "uuid4", - "dataname": "login_image", - "payload_type": "image", - "transport": "direct", - "encoding": "base64", - "size": 15433, - "data": "base64-encoded-string" - }, - { - "id": "uuid4", - "dataname": "large_table", - "payload_type": "table", - "transport": "link", - "encoding": "none", - "size": 524288, - "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow" - } - ] -} -``` - ---- - -## Installation - -### Prerequisites - -- **NATS Server** (v2.10+ recommended) -- **HTTP File Server** (optional, for payloads > 1MB) - -### Platform-Specific Dependencies - -#### Julia - -```julia -using Pkg -Pkg.add("NATS") -Pkg.add("Arrow") -Pkg.add("JSON3") -Pkg.add("HTTP") -Pkg.add("UUIDs") -Pkg.add("Dates") -``` - -#### JavaScript (Node.js) - -```bash -npm install nats uuid apache-arrow node-fetch -# or -yarn add nats uuid apache-arrow node-fetch -``` - -#### JavaScript (Browser) - -```bash -npm install nats uuid apache-arrow -# or use CDN: -# https://unpkg.com/nats-js/dist/bundle/nats.min.js -# https://unpkg.com/apache-arrow/arrow.min.js -``` - -#### Python (Desktop) - -```bash -pip install nats-py aiohttp pyarrow pandas python-dateutil -``` - -#### MicroPython - -MicroPython uses built-in modules: -- `network` - NATS connection (custom implementation) -- `time` - Timestamps -- `uos` - File operations -- `base64` - Base64 encoding -- `json` - JSON parsing -- `struct` - Binary data handling - ---- - -## Quick Start - -### Step 1: Start NATS Server - -```bash -docker run -p 4222:4222 nats:latest -``` - -### Step 2: Start HTTP File Server (Optional) - -```bash -# Create a directory for file uploads -mkdir -p /tmp/fileserver - -# Start HTTP file server -python3 -m http.server 8080 --directory /tmp/fileserver -``` - ---- - -## API Reference - -### Unified API Standard - -All platforms use the same input/output format for payloads: - -**Input format for smartsend:** -``` -[(dataname1, data1, type1), (dataname2, data2, type2), ...] -``` - -**Output format for smartreceive:** -``` -{ - "correlation_id": "...", - "msg_id": "...", - "timestamp": "...", - "send_to": "...", - "msg_purpose": "...", - "sender_name": "...", - "sender_id": "...", - "receiver_name": "...", - "receiver_id": "...", - "reply_to": "...", - "reply_to_msg_id": "...", - "broker_url": "...", - "metadata": {...}, - "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] -} -``` - -### smartsend - -Sends data either directly via NATS or via a fileserver URL, depending on payload size. - -#### Julia - -```julia -using NATSBridge - -env, env_json_str = NATSBridge.smartsend( - subject::String, # NATS subject - data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) - broker_url::String = "nats://localhost:4222", - fileserver_url = "http://localhost:8080", - fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = 1_000_000, - correlation_id::String = string(uuid4()), - msg_purpose::String = "chat", - sender_name::String = "NATSBridge", - receiver_name::String = "", - receiver_id::String = "", - reply_to::String = "", - reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, - msg_id::String = string(uuid4()), - sender_id::String = string(uuid4()) -) -# Returns: ::Tuple{msg_envelope_v1, String} -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natbridge'); - -const [env, env_json_str] = await NATSBridge.smartsend( - subject, - data, // Array of [dataname, data, type] tuples - { - broker_url: 'nats://localhost:4222', - fileserver_url: 'http://localhost:8080', - fileserver_upload_handler: NATSBridge.plikOneshotUpload, - size_threshold: 1_000_000, - correlation_id: uuidv4(), - msg_purpose: 'chat', - sender_name: 'NATSBridge', - receiver_name: '', - receiver_id: '', - reply_to: '', - reply_to_msg_id: '', - is_publish: true, - nats_connection: null, - msg_id: uuidv4(), - sender_id: uuidv4() - } -); -// Returns: Promise<[env, env_json_str]> -``` - -#### Python - -```python -from natbridge import NATSBridge - -env, env_json_str = await NATSBridge.smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - broker_url: str = "nats://localhost:4222", - fileserver_url: str = "http://localhost:8080", - fileserver_upload_handler: Callable = plik_oneshot_upload, - size_threshold: int = 1_000_000, - correlation_id: str = None, - msg_purpose: str = "chat", - sender_name: str = "NATSBridge", - receiver_name: str = "", - receiver_id: str = "", - reply_to: str = "", - reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, - msg_id: str = None, - sender_id: str = None -) -# Returns: Tuple[Dict, str] -``` - -#### MicroPython - -```python -from natbridge import NATSBridge - -# Limited to direct transport (< 100KB threshold) -env, env_json_str = NATSBridge.smartsend( - subject, - data, # List of (dataname, data, type) tuples - broker_url="nats://localhost:4222", - size_threshold=100000 # Lower threshold for memory constraints -) -# Returns: Tuple[Dict, str] -``` - -### smartreceive - -Receives and processes messages from NATS, handling both direct and link transport. - -#### Julia - -```julia -using NATSBridge - -env = NATSBridge.smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function = _fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 -) -# Returns: ::JSON.Object{String, Any} -``` - -#### JavaScript - -```javascript -const env = await NATSBridge.smartreceive( - msg, - { - fileserver_download_handler: NATSBridge.fetchWithBackoff, - max_retries: 5, - base_delay: 100, - max_delay: 5000 - } -); -// Returns: Promise -``` - -#### Python - -```python -env = await NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff, - max_retries=5, - base_delay=100, - max_delay=5000 -) -# Returns: Dict with "payloads" key -``` - -#### MicroPython - -```python -env = NATSBridge.smartreceive( - msg, - fileserver_download_handler=_sync_fileserver_download, - max_retries=3, - base_delay=100, - max_delay=1000 -) -# Returns: Dict with "payloads" key -``` - ---- - -## Payload Types - -| Type | Julia | JavaScript | Python | MicroPython | Description | -|------|-------|------------|--------|-------------|-------------| -| `text` | `String` | `string` | `str` | `str` | Plain text strings | -| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries | -| `table` | `DataFrame`, `Arrow.Table` | `Array` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | -| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) | -| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) | -| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) | -| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | Generic binary data | - ---- - -## Transport Strategies - -### Direct Transport (Payloads < 1MB) - -Small payloads are sent directly via NATS with Base64 encoding. - -#### Cross-Platform - -```julia -# Julia -data = [("message", "Hello", "text")] -smartsend("/topic", data) -``` - -```javascript -// JavaScript -const data = [["message", "Hello", "text"]]; -smartsend("/topic", data); -``` - -```python -# Python -data = [("message", "Hello", "text")] -await smartsend("/topic", data) -``` - -### Link Transport (Payloads >= 1MB) - -Large payloads are uploaded to an HTTP file server. - -#### Cross-Platform - -```julia -# Julia -data = [("file", large_data, "binary")] -smartsend("/topic", data; fileserver_url="http://localhost:8080") -``` - -```javascript -// JavaScript -const data = [["file", largeData, "binary"]]; -smartsend("/topic", data, { fileserver_url: 'http://localhost:8080' }); -``` - -```python -# Python -data = [("file", large_data, "binary")] -await smartsend("/topic", data, fileserver_url="http://localhost:8080") -``` - ---- - -## Cross-Platform Examples - -### Example 1: Chat with Mixed Content - -Send text, image, and large file in one message. - -#### Julia - -```julia -using NATSBridge - -data = [ - ("message_text", "Hello!", "text"), - ("user_avatar", image_data, "image"), - ("large_document", large_file_data, "binary") -] - -env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080") -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natbridge'); - -const data = [ - ["message_text", "Hello!", "text"], - ["user_avatar", imageData, "image"], - ["large_document", largeFileData, "binary"] -]; - -const [env, env_json_str] = await NATSBridge.smartsend( - "/chat/room1", - data, - { fileserver_url: 'http://localhost:8080' } -); -``` - -#### Python - -```python -from natbridge import NATSBridge - -data = [ - ("message_text", "Hello!", "text"), - ("user_avatar", image_data, "image"), - ("large_document", large_file_data, "binary") -] - -env, env_json_str = await NATSBridge.smartsend( - "/chat/room1", - data, - fileserver_url="http://localhost:8080" -) -``` - -### Example 2: Dictionary Exchange - -Send configuration data between platforms. - -#### Julia - -```julia -using NATSBridge - -config = Dict( - "wifi_ssid" => "MyNetwork", - "wifi_password" => "password123", - "update_interval" => 60 -) - -data = [("config", config, "dictionary")] -env, env_json_str = NATSBridge.smartsend("/device/config", data) -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natbridge'); - -const config = { - wifi_ssid: "MyNetwork", - wifi_password: "password123", - update_interval: 60 -}; - -const [env, env_json_str] = await NATSBridge.smartsend( - "/device/config", - [["config", config, "dictionary"]] -); -``` - -#### Python - -```python -from natbridge import NATSBridge - -config = { - "wifi_ssid": "MyNetwork", - "wifi_password": "password123", - "update_interval": 60 -} - -data = [("config", config, "dictionary")] -env, env_json_str = await NATSBridge.smartsend("/device/config", data) -``` - -### Example 3: Table Data (Arrow IPC) - -Send tabular data using Apache Arrow IPC format. - -#### Julia - -```julia -using NATSBridge -using DataFrames - -df = DataFrame( - id = [1, 2, 3], - name = ["Alice", "Bob", "Charlie"], - score = [95, 88, 92] -) - -data = [("students", df, "table")] -env, env_json_str = NATSBridge.smartsend("/data/analysis", data) -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natbridge'); - -const df = [ - { id: 1, name: "Alice", score: 95 }, - { id: 2, name: "Bob", score: 88 }, - { id: 3, name: "Charlie", score: 92 } -]; - -const [env, env_json_str] = await NATSBridge.smartsend( - "/data/analysis", - [["students", df, "table"]] -); -``` - -#### Python - -```python -from natbridge import NATSBridge -import pandas as pd - -df = pd.DataFrame({ - "id": [1, 2, 3], - "name": ["Alice", "Bob", "Charlie"], - "score": [95, 88, 92] -}) - -data = [("students", df, "table")] -env, env_json_str = await NATSBridge.smartsend("/data/analysis", data) -``` - -### Example 4: Request-Response Pattern - -Bi-directional communication with reply-to support. - -#### Julia - -```julia -using NATSBridge - -# Requester -env, env_json_str = NATSBridge.smartsend( - "/device/command", - [("command", Dict("action" => "read_sensor"), "dictionary")]; - broker_url="nats://localhost:4222", - reply_to="/device/response" -) -``` - -```julia -# Responder -using NATS, NATSBridge - -function test_responder() - conn = NATS.connect("nats://localhost:4222") - NATS.subscribe(conn, "/device/command") do msg - env = NATSBridge.smartreceive(msg, fileserver_download_handler=_fetch_with_backoff) - - reply_to = env["reply_to"] - - for (dataname, data, type) in env["payloads"] - if dataname == "command" && data["action"] == "read_sensor" - response = Dict("sensor_id" => "sensor-001", "value" => 42.5) - if !isempty(reply_to) - smartsend(reply_to, [("data", response, "dictionary")]) - end - end - end - end - - sleep(120) - NATS.drain(conn) -end -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natbridge'); - -// Requester -const [env, env_json_str] = await NATSBridge.smartsend( - "/device/command", - [["command", { action: "read_sensor" }, "dictionary"]], - { broker_url: 'nats://localhost:4222', reply_to: '/device/response' } -); -``` - -```javascript -// Responder -const nats = require('nats'); -const NATSBridge = require('natbridge'); - -async function testResponder() { - const conn = await nats.connect('nats://localhost:4222'); - - const subscription = await conn.subscribe('/device/command'); - - for await (const msg of subscription) { - const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: NATSBridge.fetchWithBackoff - }); - - const replyTo = env.reply_to; - - for (const [dataname, data, type] of env.payloads) { - if (dataname === 'command' && data.action === 'read_sensor') { - const response = { sensor_id: 'sensor-001', value: 42.5 }; - if (replyTo) { - await NATSBridge.smartsend( - replyTo, - [["data", response, "dictionary"]] - ); - } - } - } - } - - setTimeout(() => conn.close(), 120000); -} -``` - -#### Python - -```python -from natbridge import NATSBridge - -# Requester -env, env_json_str = await NATSBridge.smartsend( - "/device/command", - [("command", {"action": "read_sensor"}, "dictionary")], - broker_url="nats://localhost:4222", - reply_to="/device/response" -) -``` - -```python -# Responder -from natbridge import NATSBridge -import asyncio -import nats - -async def test_responder(): - nc = await nats.connect('nats://localhost:4222') - - async def msg_handler(msg): - env = await NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff - ) - - reply_to = env["reply_to"] - - for dataname, data, type_ in env["payloads"]: - if dataname == "command" and data["action"] == "read_sensor": - response = {"sensor_id": "sensor-001", "value": 42.5} - if reply_to: - await NATSBridge.smartsend( - reply_to, - [("data", response, "dictionary")] - ) - - await nc.subscribe('/device/command', cb=msg_handler) - - await asyncio.sleep(120) - await nc.drain() -``` - ---- - -## Testing - -### Test File Organization - -| Platform | Sender Tests | Receiver Tests | -|----------|--------------|----------------| -| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | -| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | -| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | - -### Run Tests - -#### Julia - -```bash -# Text message exchange -julia test/test_julia_text_sender.jl -julia test/test_julia_text_receiver.jl - -# Dictionary exchange -julia test/test_julia_dict_sender.jl -julia test/test_julia_dict_receiver.jl - -# File transfer -julia test/test_julia_file_sender.jl -julia test/test_julia_file_receiver.jl - -# Mixed payload types -julia test/test_julia_mix_payloads_sender.jl -julia test/test_julia_mix_payloads_receiver.jl - -# Table exchange -julia test/test_julia_table_sender.jl -julia test/test_julia_table_receiver.jl -``` - -#### JavaScript (Node.js) - -```bash -# Text message exchange -node test/test_js_text_sender.js -node test/test_js_text_receiver.js - -# Dictionary exchange -node test/test_js_dictionary_sender.js -node test/test_js_dictionary_receiver.js - -# Binary transfer -node test/test_js_binary_sender.js -node test/test_js_binary_receiver.js - -# Table exchange -node test/test_js_table_sender.js -node test/test_js_table_receiver.js -``` - -#### Python - -```bash -# Text message exchange -python3 test/test_py_text_sender.py -python3 test/test_py_text_receiver.py - -# Dictionary exchange -python3 test/test_py_dictionary_sender.py -python3 test/test_py_dictionary_receiver.py - -# Binary transfer -python3 test/test_py_binary_sender.py -python3 test/test_py_binary_receiver.py - -# Table exchange -python3 test/test_py_table_sender.py -python3 test/test_py_table_receiver.py -``` - ---- - -## Documentation - -For detailed architecture and implementation information, see: - -- [Architecture Documentation](docs/architecture.md) - Cross-platform architecture, API parity, platform-specific patterns -- [Implementation Guide](docs/implementation.md) - Detailed implementation for each platform, handler functions, testing - ---- - -## License - -MIT License - -Copyright (c) 2026 NATSBridge Contributors - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file