NATSBridge

A high-performance, bi-directional data bridge between Julia, JavaScript, and Python/Micropython applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.

License: MIT



Overview

NATSBridge enables seamless communication across Julia, JavaScript, and Python/Micropython applications through NATS, with intelligent transport selection based on payload size:

Transport   Payload Size   Method
Direct      < 1MB          Sent directly via NATS (Base64 encoded)
Link        >= 1MB         Uploaded to HTTP file server, URL sent via NATS

Use Cases

  • Chat Applications: Text, images, audio, video in a single message
  • File Transfer: Efficient transfer of large files using claim-check pattern
  • Streaming Data: Sensor data, telemetry, and analytics pipelines
  • Cross-Platform Communication: Julia ↔ JavaScript ↔ Python/Micropython
  • IoT Devices: Micropython devices sending data to cloud services

Features

  • Bi-directional messaging between Julia, JavaScript, and Python/Micropython
  • Multi-payload support - send multiple payloads with different types in one message
  • Automatic transport selection - direct vs link based on payload size
  • Claim-Check pattern for payloads >= 1MB
  • Apache Arrow IPC support for tabular data (zero-copy reading)
  • Exponential backoff for reliable file server downloads
  • Correlation ID tracking for message tracing
  • Reply-to support for request-response patterns
  • JetStream support for message replay and durability
  • Lightweight Micropython implementation for microcontrollers

Architecture

System Components

┌─────────────────────────────────────────────────────────────────────┐
│                     NATSBridge Architecture                         │
├─────────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐           │
│  │   Julia      │    │  JavaScript  │    │  Python/     │           │
│  │  (NATS.jl)   │◄──►│  (nats.js)   │◄──►│  Micropython │           │
│  └──────────────┘    └──────────────┘    └──────────────┘           │
│       │                    │                    │                   │
│       ▼                    ▼                    ▼                   │
│    ┌─────────────────────────────────────────────────────┐          │
│    │                    NATS                             │          │
│    │               (Message Broker)                      │          │
│    └─────────────────────────────────────────────────────┘          │
│                              │                                      │
│                              ▼                                      │
│                    ┌──────────────────────┐                         │
│                    │  File Server         │                         │
│                    │  (HTTP Upload/Get)   │                         │
│                    └──────────────────────┘                         │
└─────────────────────────────────────────────────────────────────────┘

Message Flow

  1. Sender creates a message envelope with payloads
  2. NATSBridge serializes and encodes payloads based on type
  3. Transport Decision: Small payloads go directly to NATS; large payloads are uploaded to the file server and only their URL is sent
  4. NATS routes messages to subscribers
  5. Receiver fetches payloads (from NATS or file server)
  6. NATSBridge deserializes and decodes payloads
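
The flow above can be sketched for the direct-transport case in plain Python. This is an illustration under stated assumptions, not the NATSBridge wire format: the envelope field names (`correlation_id`, `payloads`, `transport`, `data`) and the helpers `build_envelope` and `read_envelope` are hypothetical, and only `text` payloads are handled.

```python
import base64
import json
import uuid


def build_envelope(payloads, correlation_id=None):
    """Steps 1-2: wrap (dataname, data, type) tuples in a JSON envelope.

    Hypothetical layout; only "text" payloads are handled in this sketch.
    """
    entries = []
    for dataname, data, payload_type in payloads:
        raw = data.encode("utf-8")
        entries.append({
            "dataname": dataname,
            "type": payload_type,
            "transport": "direct",
            "data": base64.b64encode(raw).decode("ascii"),
        })
    envelope = {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "payloads": entries,
    }
    return json.dumps(envelope).encode("utf-8")


def read_envelope(message_bytes):
    """Steps 5-6: parse the envelope and decode each direct payload."""
    envelope = json.loads(message_bytes)
    decoded = []
    for entry in envelope["payloads"]:
        raw = base64.b64decode(entry["data"])
        decoded.append((entry["dataname"], raw.decode("utf-8"), entry["type"]))
    envelope["payloads"] = decoded
    return envelope


# Steps 3-4 (transport decision and NATS routing) are skipped here:
# the bytes are handed straight from the sender to the receiver.
wire = build_envelope([("message", "Hello World", "text")], correlation_id="demo-1")
received = read_envelope(wire)
print(received["correlation_id"], received["payloads"])
```

The receiver gets back the same (dataname, data, type) tuples the sender passed in, which is the shape the receive examples later in this README iterate over.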

Installation

Prerequisites

  • NATS Server (v2.10+ recommended)
  • HTTP File Server (optional, for payloads >= 1MB)

Julia

using Pkg
Pkg.add("NATS")
Pkg.add(url="https://git.yiem.cc/ton/NATSBridge")

JavaScript

npm install nats apache-arrow uuid base64-url

For Node.js:

const { smartsend, smartreceive } = require('./src/NATSBridge');

For browser:

<script src="./src/NATSBridge.js"></script>
<script>
  // NATSBridge is available as window.NATSBridge
</script>

Python/Micropython

  1. Copy src/nats_bridge.py to your device
  2. Install dependencies:

For Python (desktop):

pip install nats-py

For Micropython:

  • urequests for HTTP requests (built-in for ESP32)
  • base64 for base64 encoding (built-in)
  • json for JSON handling (built-in)

Quick Start

Step 1: Start NATS Server

docker run -p 4222:4222 nats:latest

Step 2: Start HTTP File Server (Optional)

# Create a directory for file uploads
mkdir -p /tmp/fileserver

# Use Python's built-in server (it only serves downloads; it does not accept uploads)
python3 -m http.server 8080 --directory /tmp/fileserver

Step 3: Send Your First Message

Python/Micropython

from nats_bridge import smartsend

# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")

JavaScript

const { smartsend } = require('./src/NATSBridge');

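async function main() {
    // Send a text message (await must run inside an async function in CommonJS)
    await smartsend("/chat/room1", [
        { dataname: "message", data: "Hello World", type: "text" }
    ], { natsUrl: "nats://localhost:4222" });

    console.log("Message sent!");
}

main().catch(console.error);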

Julia

using NATSBridge

# Send a text message
data = [("message", "Hello World", "text")]
env = NATSBridge.smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
println("Message sent!")

Step 4: Receive Messages

Python/Micropython

import nats
import asyncio
from nats_bridge import smartreceive

# Configuration
SUBJECT = "/chat/room1"
NATS_URL = "nats://localhost:4222"

async def main():
    # Connect to NATS
    nc = await nats.connect(NATS_URL)
    
    # Subscribe to the subject - msg comes from the callback
    async def message_handler(msg):
        # Receive and process message
        envelope = smartreceive(msg.data)
        # payload_type avoids shadowing the built-in type()
        for dataname, data, payload_type in envelope["payloads"]:
            print(f"Received {dataname} ({payload_type}): {data}")
    
    sub = await nc.subscribe(SUBJECT, cb=message_handler)
    await asyncio.sleep(120)  # Keep listening
    await nc.close()

asyncio.run(main())

JavaScript

const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');

// Configuration
const SUBJECT = "/chat/room1";
const NATS_URL = "nats://localhost:4222";

async function main() {
    // Connect to NATS
    const nc = await connect({ servers: [NATS_URL] });
    
    // Subscribe to the subject - msg comes from the async iteration
    const sub = nc.subscribe(SUBJECT);
    
    for await (const msg of sub) {
        // Receive and process message
        const envelope = await smartreceive(msg);
        for (const payload of envelope.payloads) {
            console.log(`Received ${payload.dataname}: ${payload.data}`);
        }
    }
}

main();

Julia

using Dates
using NATS, NATSBridge

# Configuration
const SUBJECT = "/chat/room1"
const NATS_URL = "nats://localhost:4222"

# Helper: Log with a timestamp
function log_trace(message)
    timestamp = Dates.now()
    println("[$timestamp] $message")
end

# Receiver: Listen for messages - msg comes from the callback
function test_receive()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        log_trace("Received message on $(msg.subject)")
        
        # Receive and process message (uses the default download handler)
        envelope = NATSBridge.smartreceive(msg)
        for (dataname, data, type) in envelope["payloads"]
            println("Received $dataname: $data")
        end
    end
    
    # Keep listening for 120 seconds
    sleep(120)
    NATS.drain(conn)
end

test_receive()

API Reference

smartsend

Sends data either directly via NATS or via a fileserver URL, depending on payload size.

Python/Micropython

from nats_bridge import smartsend

env = smartsend(
    subject,                           # NATS subject to publish to
    data,                              # List of (dataname, data, type) tuples
    nats_url="nats://localhost:4222",  # NATS server URL
    fileserver_url="http://localhost:8080",  # File server URL
    fileserver_upload_handler=plik_oneshot_upload,  # Upload handler function
    size_threshold=1_000_000,          # Threshold in bytes (default: 1MB)
    correlation_id=None,               # Optional correlation ID for tracing
    msg_purpose="chat",                # Message purpose
    sender_name="NATSBridge",          # Sender name
    receiver_name="",                  # Receiver name (empty = broadcast)
    receiver_id="",                    # Receiver UUID (empty = broadcast)
    reply_to="",                       # Reply topic
    reply_to_msg_id=""                 # Reply message ID
)

JavaScript

const { smartsend } = require('./src/NATSBridge');

const env = await smartsend(
    subject,                           // NATS subject
    data,                              // Array of {dataname, data, type}
    {
        natsUrl: "nats://localhost:4222",
        fileserverUrl: "http://localhost:8080",
        fileserverUploadHandler: customUploadHandler,
        sizeThreshold: 1_000_000,
        correlationId: "custom-id",
        msgPurpose: "chat",
        senderName: "NATSBridge",
        receiverName: "",
        receiverId: "",
        replyTo: "",
        replyToMsgId: ""
    }
);

Julia

using NATSBridge

env = NATSBridge.smartsend(
    subject,                           # NATS subject (positional)
    data::AbstractArray{Tuple{String, Any, String}};  # List of (dataname, data, type)
    nats_url::String = "nats://localhost:4222",
    fileserver_url = "http://localhost:8080",
    fileserverUploadHandler::Function = plik_oneshot_upload,
    size_threshold::Int = 1_000_000,
    correlation_id::Union{String, Nothing} = nothing,
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = ""
)

smartreceive

Receives and processes messages from NATS, handling both direct and link transport.

Python/Micropython

from nats_bridge import smartreceive

# Note: For nats-py, use msg.data to pass the raw message data
envelope = smartreceive(
    msg.data,                          # NATS message data (msg.data for nats-py)
    fileserver_download_handler=_fetch_with_backoff,  # Download handler
    max_retries=5,                     # Max retry attempts
    base_delay=100,                    # Initial delay in ms
    max_delay=5000                     # Max delay in ms
)
# Returns: Dict with envelope metadata and 'payloads' field

JavaScript

const { smartreceive } = require('./src/NATSBridge');

// Note: msg is the NATS message object from subscription
const envelope = await smartreceive(
    msg,                               // NATS message (raw object from subscription)
    {
        fileserverDownloadHandler: customDownloadHandler,
        maxRetries: 5,
        baseDelay: 100,
        maxDelay: 5000
    }
);
// Returns: Object with envelope metadata and payloads array

Julia

using NATSBridge

# Note: msg is a NATS.Msg object passed from the subscription callback
envelope = NATSBridge.smartreceive(
    msg::NATS.Msg;
    fileserverDownloadHandler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
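
The `max_retries`, `base_delay`, and `max_delay` parameters above suggest an exponential backoff schedule for file server downloads. The following is a minimal sketch of such a schedule, assuming the delay doubles after each failed attempt and is capped at `max_delay`. The doubling factor, the absence of jitter, and the helper names `backoff_delays` and `fetch_with_retries` are assumptions for illustration; the behavior of the library's own `_fetch_with_backoff` may differ.

```python
import time


def backoff_delays(max_retries=5, base_delay=100, max_delay=5000):
    """Delay in ms before each retry: base_delay doubled per attempt, capped."""
    return [min(base_delay * 2 ** attempt, max_delay) for attempt in range(max_retries)]


def fetch_with_retries(fetch, url, max_retries=5, base_delay=100, max_delay=5000,
                       sleep=time.sleep):
    """Call fetch(url); on OSError wait and retry, up to max_retries retries."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            return fetch(url)
        except OSError:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(delays[attempt] / 1000)  # delays are in milliseconds


print(backoff_delays())  # [100, 200, 400, 800, 1600]
```

With the documented defaults the cap of 5000 ms is never reached; it only takes effect when `max_retries` is raised to 7 or more. Catching `OSError` covers `urllib.error.URLError` and socket errors on desktop Python.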

Payload Types

Type         Description                        Serialization
text         Plain text strings                 UTF-8 bytes
dictionary   JSON-serializable dictionaries     JSON
table        Tabular data (DataFrames, arrays)  Apache Arrow IPC
image        Image data (PNG, JPG)              Raw bytes
audio        Audio data (WAV, MP3)              Raw bytes
video        Video data (MP4, AVI)              Raw bytes
binary       Generic binary data                Raw bytes
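
The mapping in the table can be sketched as a pair of functions. This is an illustration only: `serialize` and `deserialize` are hypothetical names, not the NATSBridge internals, and the `table` type is left out because Arrow IPC needs a third-party library.

```python
import json

RAW_TYPES = ("image", "audio", "video", "binary")


def serialize(data, payload_type):
    """Turn a payload into bytes according to its declared type."""
    if payload_type == "text":
        return data.encode("utf-8")
    if payload_type == "dictionary":
        return json.dumps(data).encode("utf-8")
    if payload_type in RAW_TYPES:
        return bytes(data)  # already raw bytes; passed through unchanged
    raise ValueError(f"type not covered by this sketch: {payload_type}")


def deserialize(raw, payload_type):
    """Inverse of serialize for the same payload type."""
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type == "dictionary":
        return json.loads(raw)
    if payload_type in RAW_TYPES:
        return raw
    raise ValueError(f"type not covered by this sketch: {payload_type}")


config = {"update_interval": 60}
print(deserialize(serialize(config, "dictionary"), "dictionary"))
```

Because the type tag travels with each payload, the receiver can pick the matching decoder without inspecting the bytes.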

Transport Strategies

Direct Transport (Payloads < 1MB)

Small payloads are sent directly via NATS with Base64 encoding.

Python/Micropython

data = [("message", "Hello", "text")]
smartsend("/topic", data)

JavaScript

await smartsend("/topic", [
    { dataname: "message", data: "Hello", type: "text" }
]);

Julia

data = [("message", "Hello", "text")]
smartsend("/topic", data)
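
Base64 encodes every 3 input bytes as 4 output characters, so a directly sent payload grows by about one third on the wire. The short check below shows the effect for a payload just under the documented threshold. Whether NATSBridge compares the threshold against the size before or after encoding is not stated here, so treat this as a sizing aid rather than a description of the library; note also that a NATS server limits message size (1 MB by default).

```python
import base64

payload = b"x" * 900_000               # below the 1,000,000-byte threshold
encoded = base64.b64encode(payload)

print(len(payload))   # 900000
print(len(encoded))   # 1200000
```

If payloads near the threshold are common, lowering `size_threshold` leaves headroom for the encoding overhead and the envelope metadata.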

Link Transport (Payloads >= 1MB)

Large payloads are uploaded to an HTTP file server, and only the download URL is sent via NATS.

Python/Micropython

data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")

JavaScript

await smartsend("/topic", [
    { dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });

Julia

data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
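
The claim-check idea behind link transport can be sketched without any network access. In this illustration a dictionary stands in for the HTTP file server, and the message layout (`transport`, `url`, `data`) and the helpers `upload`, `download`, `to_message`, and `from_message` are hypothetical, not the NATSBridge implementation.

```python
import uuid

FAKE_FILE_SERVER = {}   # stands in for the HTTP file server: URL -> bytes


def upload(raw):
    """Store the bytes and return the URL a receiver can fetch them from."""
    url = f"http://localhost:8080/{uuid.uuid4()}"
    FAKE_FILE_SERVER[url] = raw
    return url


def download(url):
    """Fetch previously uploaded bytes by URL."""
    return FAKE_FILE_SERVER[url]


def to_message(raw, size_threshold=1_000_000):
    """Sender side: keep small payloads inline, replace large ones by a URL."""
    if len(raw) < size_threshold:
        return {"transport": "direct", "data": raw}
    return {"transport": "link", "url": upload(raw)}


def from_message(message):
    """Receiver side: redeem the claim check if the payload was uploaded."""
    if message["transport"] == "link":
        return download(message["url"])
    return message["data"]


large_data = bytes(1_500_000)
message = to_message(large_data)
print(message["transport"], from_message(message) == large_data)
```

The message that travels through NATS stays small regardless of payload size; only the receiver that actually needs the data pays for the download.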

Examples

All examples include code for Julia, JavaScript, and Python/Micropython unless otherwise specified.

Example 1: Chat with Mixed Content

Send text, a small image, and a large file in one message.

Python/Micropython

from nats_bridge import smartsend

data = [
    ("message_text", "Hello!", "text"),
    ("user_avatar", image_data, "image"),
    ("large_document", large_file_data, "binary")
]

smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")

JavaScript

const { smartsend } = require('./src/NATSBridge');

async function main() {
    // await must run inside an async function in CommonJS
    await smartsend("/chat/room1", [
        { dataname: "message_text", data: "Hello!", type: "text" },
        { dataname: "user_avatar", data: image_data, type: "image" },
        { dataname: "large_document", data: large_file_data, type: "binary" }
    ], {
        fileserverUrl: "http://localhost:8080"
    });
}

main().catch(console.error);

Julia

using NATSBridge

data = [
    ("message_text", "Hello!", "text"),
    ("user_avatar", image_data, "image"),
    ("large_document", large_file_data, "binary")
]

NATSBridge.smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
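
Example 1 suggests that the transport is chosen per payload rather than per message, so one message can mix inline and uploaded payloads. The sketch below shows that reading with a hypothetical helper, `plan_transports`; the placeholder byte sizes are made up, and the per-payload behavior is an assumption drawn from the example, not confirmed from the source code.

```python
def plan_transports(payloads, size_threshold=1_000_000):
    """Map each (dataname, raw_bytes, type) tuple to "direct" or "link"."""
    plan = {}
    for dataname, raw, _payload_type in payloads:
        plan[dataname] = "direct" if len(raw) < size_threshold else "link"
    return plan


payloads = [
    ("message_text", "Hello!".encode("utf-8"), "text"),
    ("user_avatar", bytes(20_000), "image"),          # 20 KB placeholder
    ("large_document", bytes(2_000_000), "binary"),   # 2 MB placeholder
]
print(plan_transports(payloads))
```

Under this reading only `large_document` needs the file server; the text and the avatar travel inside the NATS message.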

Example 2: Dictionary Exchange

Send configuration data between platforms.

Python/Micropython

from nats_bridge import smartsend

config = {
    "wifi_ssid": "MyNetwork",
    "wifi_password": "password123",
    "update_interval": 60
}

data = [("config", config, "dictionary")]
smartsend("/device/config", data)

JavaScript

const { smartsend } = require('./src/NATSBridge');

const config = {
    wifi_ssid: "MyNetwork",
    wifi_password: "password123",
    update_interval: 60
};

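async function main() {
    // await must run inside an async function in CommonJS
    await smartsend("/device/config", [
        { dataname: "config", data: config, type: "dictionary" }
    ]);
}

main().catch(console.error);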

Julia

using NATSBridge

config = Dict(
    "wifi_ssid" => "MyNetwork",
    "wifi_password" => "password123",
    "update_interval" => 60
)

data = [("config", config, "dictionary")]
NATSBridge.smartsend("/device/config", data)

Example 3: Table Data (Arrow IPC)

Send tabular data using Apache Arrow IPC format.

Python (desktop only: pandas is not available on Micropython)

import pandas as pd
from nats_bridge import smartsend

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [95, 88, 92]
})

data = [("students", df, "table")]
smartsend("/data/analysis", data)

JavaScript

const { smartsend } = require('./src/NATSBridge');

const tableData = [
    { id: 1, name: "Alice", score: 95 },
    { id: 2, name: "Bob", score: 88 },
    { id: 3, name: "Charlie", score: 92 }
];

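async function main() {
    // await must run inside an async function in CommonJS
    await smartsend("/data/analysis", [
        { dataname: "students", data: tableData, type: "table" }
    ]);
}

main().catch(console.error);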

Julia

using NATSBridge
using DataFrames

df = DataFrame(
    id = [1, 2, 3],
    name = ["Alice", "Bob", "Charlie"],
    score = [95, 88, 92]
)

data = [("students", df, "table")]
NATSBridge.smartsend("/data/analysis", data)

Example 4: Request-Response Pattern

Bi-directional communication with reply-to support.

Python/Micropython (Requester)

from nats_bridge import smartsend

env = smartsend(
    "/device/command",
    [("command", {"action": "read_sensor"}, "dictionary")],
    reply_to="/device/response"
)

Python/Micropython (Responder)

import nats
import asyncio
from nats_bridge import smartreceive, smartsend

# Configuration
SUBJECT = "/device/command"
REPLY_SUBJECT = "/device/response"
NATS_URL = "nats://localhost:4222"

async def main():
    nc = await nats.connect(NATS_URL)
    
    async def message_handler(msg):
        envelope = smartreceive(msg.data)
        for dataname, data, payload_type in envelope["payloads"]:
            # Only dictionary payloads support .get(); skip everything else
            if payload_type == "dictionary" and data.get("action") == "read_sensor":
                response = {"sensor_id": "sensor-001", "value": 42.5}
                smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
    
    sub = await nc.subscribe(SUBJECT, cb=message_handler)
    await asyncio.sleep(120)
    await nc.close()

asyncio.run(main())

JavaScript (Requester)

const { smartsend } = require('./src/NATSBridge');

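async function main() {
    // await must run inside an async function in CommonJS
    await smartsend("/device/command", [
        { dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
    ], {
        replyTo: "/device/response"
    });
}

main().catch(console.error);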

JavaScript (Responder)

const { smartreceive, smartsend } = require('./src/NATSBridge');
const { connect } = require('nats');

// Configuration
const SUBJECT = "/device/command";
const REPLY_SUBJECT = "/device/response";
const NATS_URL = "nats://localhost:4222";

async function main() {
    const nc = await connect({ servers: [NATS_URL] });
    
    const sub = nc.subscribe(SUBJECT);
    
    for await (const msg of sub) {
        const envelope = await smartreceive(msg);
        for (const payload of envelope.payloads) {
            if (payload.dataname === "command" && payload.data.action === "read_sensor") {
                const response = { sensor_id: "sensor-001", value: 42.5 };
                await smartsend(REPLY_SUBJECT, [
                    { dataname: "data", data: response, type: "dictionary" }
                ]);
            }
        }
    }
}

main();

Julia (Requester)

using NATSBridge

env = NATSBridge.smartsend(
    "/device/command",
    [("command", Dict("action" => "read_sensor"), "dictionary")],
    reply_to="/device/response"
)

Julia (Responder)

using NATS, NATSBridge

# Configuration
const SUBJECT = "/device/command"
const REPLY_SUBJECT = "/device/response"
const NATS_URL = "nats://localhost:4222"

function test_responder()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        # Uses the default download handler (a keyword argument in the API reference)
        envelope = NATSBridge.smartreceive(msg)
        for (dataname, data, type) in envelope["payloads"]
            if dataname == "command" && data["action"] == "read_sensor"
                response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
                NATSBridge.smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
            end
        end
    end
    
    sleep(120)
    NATS.drain(conn)
end

test_responder()
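
In the example above every reply arrives on the same subject, so a requester with several commands in flight needs a way to tell the replies apart. One common approach is to remember each request under its correlation ID and look the ID up when a reply arrives. The class below is a hypothetical helper sketching that bookkeeping; how the ID travels (for instance through the `correlation_id` or `reply_to_msg_id` parameters of smartsend) is an assumption and depends on how the responder fills in its reply.

```python
import uuid


class PendingRequests:
    """Track outstanding requests by correlation ID."""

    def __init__(self):
        self._pending = {}

    def register(self, command):
        """Remember a command and return the ID to send along with it."""
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = command
        return correlation_id

    def resolve(self, correlation_id):
        """Return the original command for a reply, or None if unknown."""
        return self._pending.pop(correlation_id, None)


pending = PendingRequests()
request_id = pending.register({"action": "read_sensor"})

# Later, when a reply carrying the same ID arrives:
original = pending.resolve(request_id)
print(original)
```

Resolving an ID removes it from the table, so a duplicated or late reply returns `None` and can be ignored.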

Example 5: Micropython IoT Device

Lightweight Micropython device sending sensor data.

Micropython

import nats
import asyncio
from nats_bridge import smartsend, smartreceive

# Configuration
SUBJECT = "/device/sensors"
NATS_URL = "nats://localhost:4222"

async def main():
    nc = await nats.connect(NATS_URL)
    
    # Send sensor data (both readings as text; a bare number is not a dictionary)
    data = [("temperature", "25.5", "text"), ("humidity", "65", "text")]
    smartsend(SUBJECT, data, nats_url=NATS_URL)
    
    # Receive commands - msg comes from the callback
    async def message_handler(msg):
        envelope = smartreceive(msg.data)
        for dataname, data, payload_type in envelope["payloads"]:
            if payload_type == "dictionary" and data.get("action") == "reboot":
                # Execute reboot
                pass
    
    sub = await nc.subscribe(SUBJECT, cb=message_handler)
    await asyncio.sleep(120)
    await nc.close()

asyncio.run(main())

JavaScript (Receiver)

const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');

// Configuration
const SUBJECT = "/device/sensors";
const NATS_URL = "nats://localhost:4222";

async function main() {
    const nc = await connect({ servers: [NATS_URL] });
    
    const sub = nc.subscribe(SUBJECT);
    
    for await (const msg of sub) {
        const envelope = await smartreceive(msg);
        for (const payload of envelope.payloads) {
            if (payload.dataname === "temperature") {
                console.log(`Temperature: ${payload.data}`);
            } else if (payload.dataname === "humidity") {
                console.log(`Humidity: ${payload.data}`);
            }
        }
    }
}

main();

Julia (Receiver)

using NATS, NATSBridge

# Configuration
const SUBJECT = "/device/sensors"
const NATS_URL = "nats://localhost:4222"

function test_receiver()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        envelope = NATSBridge.smartreceive(msg)  # default download handler
        for (dataname, data, type) in envelope["payloads"]
            if dataname == "temperature"
                println("Temperature: $data")
            elseif dataname == "humidity"
                println("Humidity: $data")
            end
        end
    end
    
    sleep(120)
    NATS.drain(conn)
end

test_receiver()

Testing

Run the test scripts to verify functionality:

Python/Micropython

# Basic functionality test
python test/test_micropython_basic.py

# Text message exchange
python test/test_micropython_text_sender.py
python test/test_micropython_text_receiver.py

# Dictionary exchange
python test/test_micropython_dict_sender.py
python test/test_micropython_dict_receiver.py

# File transfer
python test/test_micropython_file_sender.py
python test/test_micropython_file_receiver.py

# Mixed payload types
python test/test_micropython_mixed_sender.py
python test/test_micropython_mixed_receiver.py

JavaScript

# Text message exchange
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Dictionary exchange
node test/test_js_dict_sender.js
node test/test_js_dict_receiver.js

# File transfer
node test/test_js_file_sender.js
node test/test_js_file_receiver.js

# Mixed payload types
node test/test_js_mix_payload_sender.js
node test/test_js_mix_payloads_receiver.js

# Table exchange
node test/test_js_table_sender.js
node test/test_js_table_receiver.js

Julia

# Text message exchange
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl

# Dictionary exchange
julia test/test_julia_dict_sender.jl
julia test/test_julia_dict_receiver.jl

# File transfer
julia test/test_julia_file_sender.jl
julia test/test_julia_file_receiver.jl

# Mixed payload types
julia test/test_julia_mix_payloads_sender.jl
julia test/test_julia_mix_payloads_receiver.jl

# Table exchange
julia test/test_julia_table_sender.jl
julia test/test_julia_table_receiver.jl

License

MIT License

Copyright (c) 2026 NATSBridge Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
