NATSBridge
A high-performance, bi-directional data bridge for Julia applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
Table of Contents
- Overview
- Features
- Architecture
- Installation
- Quick Start
- API Reference
- Payload Types
- Transport Strategies
- Examples
- Testing
- License
Overview
NATSBridge enables seamless communication for Julia applications through NATS, with intelligent transport selection based on payload size:
| Transport | Payload Size | Method |
|---|---|---|
| Direct | < 1MB | Sent directly via NATS (Base64 encoded) |
| Link | >= 1MB | Uploaded to HTTP file server, URL sent via NATS |
Use Cases
- Chat Applications: Text, images, audio, video in a single message
- File Transfer: Efficient transfer of large files using claim-check pattern
- Streaming Data: Sensor data, telemetry, and analytics pipelines
Features
- ✅ Bi-directional messaging for Julia applications
- ✅ Multi-payload support - send multiple payloads with different types in one message
- ✅ Automatic transport selection - direct vs link based on payload size
- ✅ Claim-Check pattern for payloads > 1MB
- ✅ Apache Arrow IPC support for tabular data (zero-copy reading)
- ✅ Exponential backoff for reliable file server downloads
- ✅ Correlation ID tracking for message tracing
- ✅ Reply-to support for request-response patterns
- ✅ JetStream support for message replay and durability
Architecture
System Components
┌─────────────────────────────────────────────────────────────────────┐
│ NATSBridge Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ │ │
│ │ Julia │ ▼ │
│ │ (NATS.jl) │ ┌─────────────────────────┐ │
│ └──────────────┘ │ NATS │ │
│ │ (Message Broker) │ │
│ └─────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ File Server │ │
│ │ (HTTP Upload/Get) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Message Flow
- Sender creates a message envelope with payloads
- NATSBridge serializes and encodes payloads based on type
- Transport Decision: Small payloads go directly to NATS, large payloads are uploaded to file server
- NATS routes messages to subscribers
- Receiver fetches payloads (from NATS or file server)
- NATSBridge deserializes and decodes payloads
Installation
Prerequisites
- NATS Server (v2.10+ recommended)
- HTTP File Server (optional, for payloads > 1MB)
Julia
using Pkg
Pkg.add("NATS")
Pkg.add("https://git.yiem.cc/ton/NATSBridge")
Quick Start
Step 1: Start NATS Server
docker run -p 4222:4222 nats:latest
Step 2: Start HTTP File Server (Optional)
# Create a directory for file uploads
mkdir -p /tmp/fileserver
# Start HTTP file server
python3 -m http.server 8080 --directory /tmp/fileserver
Step 3: Send Your First Message
Julia
using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; broker_url="nats://localhost:4222")
println("Message sent!")
Step 4: Receive Messages
Julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/chat/room1"
const NATS_URL = "nats://localhost:4222"
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] $message")
end
# Receiver: Listen for messages - msg comes from the callback
function test_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)")
# Receive and process message
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
end
# Keep listening for 120 seconds
sleep(120)
NATS.drain(conn)
end
test_receive()
API Reference
smartsend
Sends data either directly via NATS or via a fileserver URL, depending on payload size.
Julia
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
broker_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080",
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
smartreceive
Receives and processes messages from NATS, handling both direct and link transport.
Julia
using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback
env = NATSBridge.smartreceive(
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
publish_message
Publish a message to a NATS subject. This function is available in Julia with two overloads:
Julia
Using broker URL (creates new connection):
using NATSBridge, NATS
# Publish with URL - creates a new connection
NATSBridge.publish_message(
"nats://localhost:4222", # broker_url
"/chat/room1", # subject
"{\"correlation_id\":\"abc123\"}", # message
"abc123" # correlation_id
)
Using pre-existing connection (saves connection overhead):
using NATSBridge, NATS
# Create connection once and reuse
conn = NATS.connect("nats://localhost:4222")
NATSBridge.publish_message(conn, "/chat/room1", "{\"correlation_id\":\"abc123\"}", "abc123")
# Connection is automatically drained after publish
Payload Types
| Type | Description | Serialization |
|---|---|---|
text |
Plain text strings | UTF-8 bytes |
dictionary |
JSON-serializable dictionaries | JSON |
table |
Tabular data (DataFrames, arrays) | Apache Arrow IPC |
image |
Image data (PNG, JPG) | Raw bytes |
audio |
Audio data (WAV, MP3) | Raw bytes |
video |
Video data (MP4, AVI) | Raw bytes |
binary |
Generic binary data | Raw bytes |
Transport Strategies
Direct Transport (Payloads < 1MB)
Small payloads are sent directly via NATS with Base64 encoding.
Julia
data = [("message", "Hello", "text")]
smartsend("/topic", data)
Link Transport (Payloads >= 1MB)
Large payloads are uploaded to an HTTP file server.
Julia
data = [("file", large_data, "binary")]
smartsend("/topic", data; fileserver_url="http://localhost:8080")
Examples
Example 1: Chat with Mixed Content
Send text, small image, and large file in one message.
Julia
using NATSBridge
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080")
Example 2: Dictionary Exchange
Send configuration data between platforms.
Julia
using NATSBridge
config = Dict(
"wifi_ssid" => "MyNetwork",
"wifi_password" => "password123",
"update_interval" => 60
)
data = [("config", config, "dictionary")]
env, env_json_str = NATSBridge.smartsend("/device/config", data)
Example 3: Table Data (Arrow IPC)
Send tabular data using Apache Arrow IPC format.
Julia
using NATSBridge
using DataFrames
df = DataFrame(
id = [1, 2, 3],
name = ["Alice", "Bob", "Charlie"],
score = [95, 88, 92]
)
data = [("students", df, "table")]
env, env_json_str = NATSBridge.smartsend("/data/analysis", data)
Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support. The smartsend function now returns both the envelope object and a JSON string that can be published directly.
Julia (Requester)
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")];
broker_url="nats://localhost:4222",
reply_to="/device/response"
)
Julia (Responder)
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/command"
const NATS_URL = "nats://localhost:4222"
function test_responder()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env = NATSBridge.smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
# Extract reply_to from the envelope metadata
reply_to = env["reply_to"]
for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
# Send response to the reply_to subject from the request
if !isempty(reply_to)
smartsend(reply_to, [("data", response, "dictionary")])
end
end
end
end
sleep(120)
NATS.drain(conn)
end
test_responder()
Example 5: IoT Device Sensor Data
IoT device sending sensor data.
Julia (Receiver)
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/sensors"
const NATS_URL = "nats://localhost:4222"
function test_receiver()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "temperature"
println("Temperature: $data")
elseif dataname == "humidity"
println("Humidity: $data")
end
end
end
sleep(120)
NATS.drain(conn)
end
test_receiver()
Testing
Run the test scripts to verify functionality:
Julia
# Text message exchange
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl
# Dictionary exchange
julia test/test_julia_dict_sender.jl
julia test/test_julia_dict_receiver.jl
# File transfer
julia test/test_julia_file_sender.jl
julia test/test_julia_file_receiver.jl
# Mixed payload types
julia test/test_julia_mix_payloads_sender.jl
julia test/test_julia_mix_payloads_receiver.jl
# Table exchange
julia test/test_julia_table_sender.jl
julia test/test_julia_table_receiver.jl
License
MIT License
Copyright (c) 2026 NATSBridge Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.