# NATSBridge

A high-performance, bi-directional data bridge between **Julia**, **JavaScript**, and **Python/Micropython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.

[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io)

---

## Table of Contents

- [Overview](#overview)
- [Features](#features)
- [Architecture](#architecture)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [API Reference](#api-reference)
- [Payload Types](#payload-types)
- [Transport Strategies](#transport-strategies)
- [Examples](#examples)
- [Testing](#testing)
- [License](#license)

---

## Overview

NATSBridge connects Julia, JavaScript, and Python/Micropython applications through NATS and selects a transport automatically based on payload size:

| Transport | Payload Size | Method |
|-----------|--------------|--------|
| **Direct** | < 1MB | Sent directly via NATS (Base64 encoded) |
| **Link** | >= 1MB | Uploaded to HTTP file server, URL sent via NATS |

### Use Cases

- **Chat Applications**: Text, images, audio, video in a single message
- **File Transfer**: Efficient transfer of large files using the claim-check pattern
- **Streaming Data**: Sensor data, telemetry, and analytics pipelines
- **Cross-Platform Communication**: Julia ↔ JavaScript ↔ Python/Micropython
- **IoT Devices**: Micropython devices sending data to cloud services

---

## Features

- ✅ **Bi-directional messaging** between Julia, JavaScript, and Python/Micropython
- ✅ **Multi-payload support** - send multiple payloads with different types in one message
- ✅ **Automatic transport selection** - direct vs link based on payload size
- ✅ **Claim-Check pattern** for payloads >= 1MB
- ✅ **Apache Arrow IPC** support for tabular data (zero-copy reading)
- ✅ **Exponential backoff** for reliable file server downloads
- ✅ **Correlation ID tracking** for message tracing
- ✅ **Reply-to support** for request-response patterns
- ✅ **JetStream support** for message replay and durability
- ✅ **Lightweight Micropython implementation** for microcontrollers

---

## Architecture

### System Components

```
┌─────────────────────────────────────────────────────────────────────┐
│                       NATSBridge Architecture                       │
├─────────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │    Julia     │    │  JavaScript  │    │   Python/    │           │
│  │  (NATS.jl)   │◄──►│  (nats.js)   │◄──►│ Micropython  │           │
│  └──────────────┘    └──────────────┘    └──────────────┘           │
│         │                   │                   │                   │
│         ▼                   ▼                   ▼                   │
│  ┌──────────────────────────────────────────────────────┐           │
│  │                         NATS                         │           │
│  │                   (Message Broker)                   │           │
│  └──────────────────────────────────────────────────────┘           │
│                             │                                       │
│                             ▼                                       │
│                  ┌──────────────────────┐                           │
│                  │     File Server      │                           │
│                  │  (HTTP Upload/Get)   │                           │
│                  └──────────────────────┘                           │
└─────────────────────────────────────────────────────────────────────┘
```

### Message Flow

1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads based on type
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to the file server
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or the file server)
6. **NATSBridge** deserializes and decodes payloads

---

## Installation

### Prerequisites

- **NATS Server** (v2.10+ recommended)
- **HTTP File Server** (optional, for payloads >= 1MB)

### Julia

```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```

Add to your `Project.toml`:

```toml
[NATSBridge]
deps = ["NATS", "Arrow", "JSON3", "HTTP", "UUIDs", "Dates"]
```

### JavaScript

```bash
npm install nats.js apache-arrow uuid base64-url
```

For Node.js:

```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
```

For browser:

```html
```

### Python/Micropython

1. Copy [`src/nats_bridge.py`](src/nats_bridge.py) to your device
2. Install dependencies:

**For Python (desktop):**

```bash
pip install nats-py
```

**For Micropython:**

- `urequests` for HTTP requests (built-in for ESP32)
- `base64` for base64 encoding (built-in)
- `json` for JSON handling (built-in)

---

## Quick Start

### Step 1: Start NATS Server

```bash
docker run -p 4222:4222 nats:latest
```

### Step 2: Start HTTP File Server (Optional)

```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver

# Use Python's built-in server (it serves downloads only and cannot accept uploads)
python3 -m http.server 8080 --directory /tmp/fileserver
```

### Step 3: Send Your First Message

#### Python/Micropython

```python
from nats_bridge import smartsend

# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
```

#### JavaScript

```javascript
const { smartsend } = require('./src/NATSBridge');

// Send a text message
await smartsend("/chat/room1", [
  { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });

console.log("Message sent!");
```

#### Julia

```julia
using NATSBridge

# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
println("Message sent!")
```
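
To make the direct transport more concrete, here is a minimal sketch of how a small text payload like the one sent above might be prepared for NATS: the text is serialized to UTF-8 bytes and then Base64 encoded, as the Overview and Payload Types tables describe. The helper names `encode_direct_payload` and `decode_direct_payload` and the dictionary field names are hypothetical, chosen for illustration only; they are not NATSBridge's actual wire format.

```python
import base64
import json


def encode_direct_payload(dataname, text):
    """Serialize a text payload to UTF-8 bytes, then Base64 encode it.

    The field names below are illustrative assumptions, not the
    library's real envelope schema.
    """
    raw = text.encode("utf-8")
    return {
        "dataname": dataname,
        "type": "text",
        "transport": "direct",
        "data": base64.b64encode(raw).decode("ascii"),
    }


def decode_direct_payload(payload):
    """Reverse the encoding above and return the original text."""
    return base64.b64decode(payload["data"]).decode("utf-8")


payload = encode_direct_payload("message", "Hello World")
wire_bytes = json.dumps(payload).encode("utf-8")  # bytes that would be published

print(payload["data"])                 # SGVsbG8gV29ybGQ=
print(decode_direct_payload(payload))  # Hello World
```

Base64 inflates data by roughly one third, which is one reason to keep large payloads off the broker and send only a link.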

### Step 4: Receive Messages

#### Python/Micropython

```python
from nats_bridge import smartreceive

# `msg` is the NATS message delivered to your subscription
# Receive and process message
envelope = smartreceive(msg)
for dataname, data, payload_type in envelope["payloads"]:
    print(f"Received {dataname}: {data}")
```

#### JavaScript

```javascript
const { smartreceive } = require('./src/NATSBridge');

// `msg` is the NATS message delivered to your subscription
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
  console.log(`Received ${payload.dataname}: ${payload.data}`);
}
```

#### Julia

```julia
using NATSBridge

# `msg` is the NATS message delivered to your subscription
# Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
    println("Received $dataname: $data")
end
```

---

## API Reference

### smartsend

Sends data either directly via NATS or via a file server URL, depending on payload size.

#### Python/Micropython

```python
from nats_bridge import smartsend

env = smartsend(
    subject,                                        # NATS subject to publish to
    data,                                           # List of (dataname, data, type) tuples
    nats_url="nats://localhost:4222",               # NATS server URL
    fileserver_url="http://localhost:8080",         # File server URL
    fileserver_upload_handler=plik_oneshot_upload,  # Upload handler function
    size_threshold=1_000_000,                       # Threshold in bytes (default: 1MB)
    correlation_id=None,                            # Optional correlation ID for tracing
    msg_purpose="chat",                             # Message purpose
    sender_name="NATSBridge",                       # Sender name
    receiver_name="",                               # Receiver name (empty = broadcast)
    receiver_id="",                                 # Receiver UUID (empty = broadcast)
    reply_to="",                                    # Reply topic
    reply_to_msg_id=""                              # Reply message ID
)
```

#### JavaScript

```javascript
const { smartsend } = require('./src/NATSBridge');

const env = await smartsend(
  subject,  // NATS subject
  data,     // Array of {dataname, data, type}
  {
    natsUrl: "nats://localhost:4222",
    fileserverUrl: "http://localhost:8080",
    fileserverUploadHandler: customUploadHandler,
    sizeThreshold: 1_000_000,
    correlationId: "custom-id",
    msgPurpose: "chat",
    senderName: "NATSBridge",
    receiverName: "",
    receiverId: "",
    replyTo: "",
    replyToMsgId: ""
  }
);
```

#### Julia

```julia
using NATSBridge

env = smartsend(
    subject,                                          # NATS subject
    data::AbstractArray{Tuple{String, Any, String}};  # List of (dataname, data, type)
    nats_url::String = "nats://localhost:4222",
    fileserver_url = "http://localhost:8080",
    fileserverUploadHandler::Function = plik_oneshot_upload,
    size_threshold::Int = 1_000_000,
    correlation_id::Union{String, Nothing} = nothing,
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = ""
)
```

### smartreceive

Receives and processes messages from NATS, handling both direct and link transport.

#### Python/Micropython

```python
from nats_bridge import smartreceive

envelope = smartreceive(
    msg,                                              # NATS message
    fileserver_download_handler=_fetch_with_backoff,  # Download handler
    max_retries=5,                                    # Max retry attempts
    base_delay=100,                                   # Initial delay in ms
    max_delay=5000                                    # Max delay in ms
)
# Returns: Dict with envelope metadata and 'payloads' field
```

#### JavaScript

```javascript
const { smartreceive } = require('./src/NATSBridge');

const envelope = await smartreceive(
  msg,  // NATS message
  {
    fileserverDownloadHandler: customDownloadHandler,
    maxRetries: 5,
    baseDelay: 100,
    maxDelay: 5000
  }
);
// Returns: Object with envelope metadata and payloads array
```

#### Julia

```julia
using NATSBridge

envelope = smartreceive(
    msg::NATS.Msg;
    fileserverDownloadHandler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
```

---

## Payload Types

| Type | Description | Serialization |
|------|-------------|---------------|
| `text` | Plain text strings | UTF-8 bytes |
| `dictionary` | JSON-serializable dictionaries | JSON |
| `table` | Tabular data (DataFrames, arrays) | Apache Arrow IPC |
| `image` | Image data (PNG, JPG) | Raw bytes |
| `audio` | Audio data (WAV, MP3) | Raw bytes |
| `video` | Video data (MP4, AVI) | Raw bytes |
| `binary` | Generic binary data | Raw bytes |

---

## Transport Strategies

### Direct Transport (Payloads < 1MB)

Small payloads are sent directly via NATS with Base64 encoding.

#### Python/Micropython

```python
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```

#### JavaScript

```javascript
await smartsend("/topic", [
  { dataname: "message", data: "Hello", type: "text" }
]);
```

#### Julia

```julia
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```

### Link Transport (Payloads >= 1MB)

Large payloads are uploaded to an HTTP file server, and only the resulting URL is sent via NATS.

#### Python/Micropython

```python
data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
```

#### JavaScript

```javascript
await smartsend("/topic", [
  { dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });
```

#### Julia

```julia
data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
```

---

## Examples

All examples include code for **Julia**, **JavaScript**, and **Python/Micropython** unless otherwise specified.

### Example 1: Chat with Mixed Content

Send text, a small image, and a large file in one message.
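
The three payloads in this example would typically take different routes. As a rough sketch of the size rule from the Overview table (direct below 1MB, link at or above it), the snippet below classifies each payload by its byte length. `choose_transport` and the sample sizes are hypothetical, and the real library may measure size differently (for example after serialization or Base64 encoding).

```python
SIZE_THRESHOLD = 1_000_000  # bytes; mirrors the documented size_threshold default


def choose_transport(payload_bytes, size_threshold=SIZE_THRESHOLD):
    """Return "direct" for payloads under the threshold, otherwise "link"."""
    return "direct" if len(payload_bytes) < size_threshold else "link"


# Stand-in data with illustrative sizes (not real image or file contents).
payloads = [
    ("message_text", "Hello!".encode("utf-8")),
    ("user_avatar", bytes(200_000)),       # roughly a 200 KB image
    ("large_document", bytes(5_000_000)),  # roughly a 5 MB file
]

plan = {name: choose_transport(raw) for name, raw in payloads}
for name, transport in plan.items():
    print(f"{name}: {transport}")
# message_text: direct
# user_avatar: direct
# large_document: link
```

Under this assumption, only `large_document` would need the file server; the other two payloads would travel inside the NATS message itself.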

#### Python/Micropython

```python
from nats_bridge import smartsend

data = [
    ("message_text", "Hello!", "text"),
    ("user_avatar", image_data, "image"),
    ("large_document", large_file_data, "binary")
]

smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
```

#### JavaScript

```javascript
const { smartsend } = require('./src/NATSBridge');

await smartsend("/chat/room1", [
  { dataname: "message_text", data: "Hello!", type: "text" },
  { dataname: "user_avatar", data: image_data, type: "image" },
  { dataname: "large_document", data: large_file_data, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });
```

#### Julia

```julia
using NATSBridge

data = [
    ("message_text", "Hello!", "text"),
    ("user_avatar", image_data, "image"),
    ("large_document", large_file_data, "binary")
]

smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
```

### Example 2: Dictionary Exchange

Send configuration data between platforms.

#### Python/Micropython

```python
from nats_bridge import smartsend

config = {
    "wifi_ssid": "MyNetwork",
    "wifi_password": "password123",
    "update_interval": 60
}

data = [("config", config, "dictionary")]
smartsend("/device/config", data)
```

#### JavaScript

```javascript
const { smartsend } = require('./src/NATSBridge');

const config = {
  wifi_ssid: "MyNetwork",
  wifi_password: "password123",
  update_interval: 60
};

await smartsend("/device/config", [
  { dataname: "config", data: config, type: "dictionary" }
]);
```

#### Julia

```julia
using NATSBridge

config = Dict(
    "wifi_ssid" => "MyNetwork",
    "wifi_password" => "password123",
    "update_interval" => 60
)

data = [("config", config, "dictionary")]
smartsend("/device/config", data)
```

### Example 3: Table Data (Arrow IPC)

Send tabular data using Apache Arrow IPC format.

#### Python/Micropython

```python
import pandas as pd
from nats_bridge import smartsend

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [95, 88, 92]
})

data = [("students", df, "table")]
smartsend("/data/analysis", data)
```

#### JavaScript

```javascript
const { smartsend } = require('./src/NATSBridge');

const tableData = [
  { id: 1, name: "Alice", score: 95 },
  { id: 2, name: "Bob", score: 88 },
  { id: 3, name: "Charlie", score: 92 }
];

await smartsend("/data/analysis", [
  { dataname: "students", data: tableData, type: "table" }
]);
```

#### Julia

```julia
using NATSBridge
using DataFrames

df = DataFrame(
    id = [1, 2, 3],
    name = ["Alice", "Bob", "Charlie"],
    score = [95, 88, 92]
)

data = [("students", df, "table")]
smartsend("/data/analysis", data)
```

### Example 4: Request-Response Pattern

Bi-directional communication with reply-to support.

#### Python/Micropython (Requester)

```python
from nats_bridge import smartsend

env = smartsend(
    "/device/command",
    [("command", {"action": "read_sensor"}, "dictionary")],
    reply_to="/device/response"
)
```

#### Python/Micropython (Responder)

```python
from nats_bridge import smartreceive, smartsend

envelope = smartreceive(msg)
for dataname, data, payload_type in envelope["payloads"]:
    if data.get("action") == "read_sensor":
        response = {"sensor_id": "sensor-001", "value": 42.5}
        smartsend("/device/response", [("data", response, "dictionary")])
```

#### JavaScript (Requester)

```javascript
const { smartsend } = require('./src/NATSBridge');

await smartsend("/device/command", [
  { dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], { replyTo: "/device/response" });
```

#### JavaScript (Responder)

```javascript
const { smartreceive, smartsend } = require('./src/NATSBridge');

const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
  if (payload.dataname === "command" && payload.data.action === "read_sensor") {
    const response = { sensor_id: "sensor-001", value: 42.5 };
    await smartsend("/device/response", [
      { dataname: "data", data: response, type: "dictionary" }
    ]);
  }
}
```

#### Julia (Requester)

```julia
using NATSBridge

env = smartsend(
    "/device/command",
    [("command", Dict("action" => "read_sensor"), "dictionary")],
    reply_to="/device/response"
)
```

#### Julia (Responder)

```julia
using NATSBridge

envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
    if dataname == "command" && data["action"] == "read_sensor"
        response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
        smartsend("/device/response", [("data", response, "dictionary")])
    end
end
```

### Example 5: Micropython IoT Device

Lightweight Micropython device sending sensor data.

#### Micropython

```python
from nats_bridge import smartsend, smartreceive

# Send sensor readings as text payloads
data = [("temperature", "25.5", "text"), ("humidity", "65", "text")]
smartsend("/device/sensors", data, nats_url="nats://localhost:4222")

# Receive commands
envelope = smartreceive(msg)
for dataname, data, payload_type in envelope["payloads"]:
    if payload_type == "dictionary" and data.get("action") == "reboot":
        # Execute reboot
        pass
```

#### JavaScript (Receiver)

```javascript
const { smartreceive } = require('./src/NATSBridge');

const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
  if (payload.dataname === "temperature") {
    console.log(`Temperature: ${payload.data}`);
  } else if (payload.dataname === "humidity") {
    console.log(`Humidity: ${payload.data}`);
  }
}
```

#### Julia (Receiver)

```julia
using NATSBridge

envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
    if dataname == "temperature"
        println("Temperature: $data")
    elseif dataname == "humidity"
        println("Humidity: $data")
    end
end
```

---

## Testing

Run the test scripts to verify functionality:

### Python/Micropython

```bash
# Basic functionality test
python test/test_micropython_basic.py

# Text message exchange
python test/test_micropython_text_sender.py
python test/test_micropython_text_receiver.py

# Dictionary exchange
python test/test_micropython_dict_sender.py
python test/test_micropython_dict_receiver.py

# File transfer
python test/test_micropython_file_sender.py
python test/test_micropython_file_receiver.py

# Mixed payload types
python test/test_micropython_mixed_sender.py
python test/test_micropython_mixed_receiver.py
```

### JavaScript

```bash
# Text message exchange
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Dictionary exchange
node test/test_js_dict_sender.js
node test/test_js_dict_receiver.js

# File transfer
node test/test_js_file_sender.js
node test/test_js_file_receiver.js

# Mixed payload types
node test/test_js_mix_payload_sender.js
node test/test_js_mix_payloads_receiver.js

# Table exchange
node test/test_js_table_sender.js
node test/test_js_table_receiver.js
```

### Julia

```bash
# Text message exchange
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl

# Dictionary exchange
julia test/test_julia_dict_sender.jl
julia test/test_julia_dict_receiver.jl

# File transfer
julia test/test_julia_file_sender.jl
julia test/test_julia_file_receiver.jl

# Mixed payload types
julia test/test_julia_mix_payloads_sender.jl
julia test/test_julia_mix_payloads_receiver.jl

# Table exchange
julia test/test_julia_table_sender.jl
julia test/test_julia_table_receiver.jl
```

---

## License

MIT License

Copyright (c) 2026 NATSBridge Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.