26 Commits

Author SHA1 Message Date
7f68d08134 update 2026-02-24 21:40:33 +07:00
ab20cd896f update 2026-02-24 21:18:19 +07:00
5a9e93d6e7 update 2026-02-24 20:38:45 +07:00
b51641dc7e update 2026-02-24 20:09:10 +07:00
45f1257896 update 2026-02-24 18:50:28 +07:00
3e2b8b1e3a update 2026-02-24 18:19:03 +07:00
90d81617ef update 2026-02-24 17:58:59 +07:00
64c62e616b update 2026-02-23 22:06:57 +07:00
2c340e37c7 update 2026-02-23 22:00:06 +07:00
7853e94d2e update 2026-02-23 21:54:50 +07:00
99bf57b154 update 2026-02-23 21:43:09 +07:00
0fa6eaf95b update 2026-02-23 21:37:50 +07:00
76f42be740 update 2026-02-23 21:32:22 +07:00
d99dc41be9 update 2026-02-23 21:09:36 +07:00
263508b8f7 update 2026-02-23 20:50:41 +07:00
0c2cca30ed update 2026-02-23 20:34:08 +07:00
46fdf668c6 update 2026-02-23 19:18:12 +07:00
f8a92a45a0 update README.md 2026-02-23 09:39:24 +07:00
cec70e6036 update 2026-02-23 08:11:03 +07:00
f9e08ba628 add Plik fileserver 2026-02-23 07:58:18 +07:00
c12a078149 update README.md 2026-02-23 07:55:10 +07:00
dedd803dc3 fix README.md 2026-02-23 07:24:54 +07:00
e8e927a491 move README.md 2026-02-23 07:17:31 +07:00
ton
d950bbac23 Merge pull request 'smartreceive_return_envelope' (#7) from smartreceive_return_envelope into main
Reviewed-on: #7
2026-02-23 00:11:09 +00:00
fc8da2ebf5 update 2026-02-23 07:08:17 +07:00
f6e50c405f update 2026-02-23 07:06:53 +07:00
35 changed files with 2037 additions and 1019 deletions

View File

@@ -1,6 +1,6 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.2"
version = "0.4.3"
authors = ["narawat <narawat@gmail.com>"]
[deps]

968
README.md Normal file
View File

@@ -0,0 +1,968 @@
# NATSBridge
A high-performance, bi-directional data bridge between **Julia**, **JavaScript**, and **Python/Micropython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io)
---
## Table of Contents
- [Overview](#overview)
- [Features](#features)
- [Architecture](#architecture)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [API Reference](#api-reference)
- [Payload Types](#payload-types)
- [Transport Strategies](#transport-strategies)
- [Examples](#examples)
- [Testing](#testing)
- [License](#license)
---
## Overview
NATSBridge enables seamless communication across Julia, JavaScript, and Python/Micropython applications through NATS, with intelligent transport selection based on payload size:
| Transport | Payload Size | Method |
|-----------|--------------|--------|
| **Direct** | < 1MB | Sent directly via NATS (Base64 encoded) |
| **Link** | >= 1MB | Uploaded to HTTP file server, URL sent via NATS |
### Use Cases
- **Chat Applications**: Text, images, audio, video in a single message
- **File Transfer**: Efficient transfer of large files using claim-check pattern
- **Streaming Data**: Sensor data, telemetry, and analytics pipelines
- **Cross-Platform Communication**: Julia ↔ JavaScript ↔ Python/Micropython
- **IoT Devices**: Micropython devices sending data to cloud services
---
## Features
-**Bi-directional messaging** between Julia, JavaScript, and Python/Micropython
-**Multi-payload support** - send multiple payloads with different types in one message
-**Automatic transport selection** - direct vs link based on payload size
-**Claim-Check pattern** for payloads > 1MB
-**Apache Arrow IPC** support for tabular data (zero-copy reading)
-**Exponential backoff** for reliable file server downloads
-**Correlation ID tracking** for message tracing
-**Reply-to support** for request-response patterns
-**JetStream support** for message replay and durability
-**Lightweight Micropython implementation** for microcontrollers
---
## Architecture
### System Components
```
┌─────────────────────────────────────────────────────────────────────┐
│ NATSBridge Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Julia │ │ JavaScript │ │ Python/ │ │
│ │ (NATS.jl) │◄──►│ (nats.js) │◄──►│ Micropython │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ NATS │ │
│ │ (Message Broker) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ File Server │ │
│ │ (HTTP Upload/Get) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
```
### Message Flow
1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads based on type
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or file server)
6. **NATSBridge** deserializes and decodes payloads
---
## Installation
### Prerequisites
- **NATS Server** (v2.10+ recommended)
- **HTTP File Server** (optional, for payloads > 1MB)
### Julia
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("https://git.yiem.cc/ton/NATSBridge")
```
### JavaScript
```bash
npm install nats.js apache-arrow uuid base64-url
```
For Node.js:
```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
```
For browser:
```html
<script src="./src/NATSBridge.js"></script>
<script>
// NATSBridge is available as window.NATSBridge
</script>
```
### Python/Micropython
1. Copy [`src/nats_bridge.py`](src/nats_bridge.py) to your device
2. Install dependencies:
**For Python (desktop):**
```bash
pip install nats-py
```
**For Micropython:**
- `urequests` for HTTP requests (built-in for ESP32)
- `base64` for base64 encoding (built-in)
- `json` for JSON handling (built-in)
---
## Quick Start
### Step 1: Start NATS Server
```bash
docker run -p 4222:4222 nats:latest
```
### Step 2: Start HTTP File Server (Optional)
```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver
# Use Python's built-in server
python3 -m http.server 8080 --directory /tmp/fileserver
```
### Step 3: Send Your First Message
#### Python/Micropython
```python
from nats_bridge import smartsend
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!");
```
#### Julia
```julia
using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222")
println("Message sent!")
```
### Step 4: Receive Messages
#### Python/Micropython
```python
import nats
import asyncio
from nats_bridge import smartreceive
# Configuration
SUBJECT = "/chat/room1"
NATS_URL = "nats://localhost:4222"
async def main():
# Connect to NATS
nc = await nats.connect(NATS_URL)
# Subscribe to the subject - msg comes from the callback
async def message_handler(msg):
# Receive and process message
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120) # Keep listening
await nc.close()
asyncio.run(main())
```
#### JavaScript
```javascript
const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/chat/room1";
const NATS_URL = "nats://localhost:4222";
async function main() {
// Connect to NATS
const nc = await connect({ servers: [NATS_URL] });
// Subscribe to the subject - msg comes from the async iteration
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
// Receive and process message
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
}
}
main();
```
#### Julia
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/chat/room1"
const NATS_URL = "nats://localhost:4222"
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] $message")
end
# Receiver: Listen for messages - msg comes from the callback
function test_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)")
# Receive and process message
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
end
# Keep listening for 120 seconds
sleep(120)
NATS.drain(conn)
end
test_receive()
```
---
## API Reference
### smartsend
Sends data either directly via NATS or via a fileserver URL, depending on payload size.
#### Python/Micropython
```python
from nats_bridge import smartsend
env, env_json_str = smartsend(
subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL
fileserver_url="http://localhost:8080", # File server URL
fileserver_upload_handler=plik_oneshot_upload, # Upload handler function
size_threshold=1_000_000, # Threshold in bytes (default: 1MB)
correlation_id=None, # Optional correlation ID for tracing
msg_purpose="chat", # Message purpose
sender_name="NATSBridge", # Sender name
receiver_name="", # Receiver name (empty = broadcast)
receiver_id="", # Receiver UUID (empty = broadcast)
reply_to="", # Reply topic
reply_to_msg_id="", # Reply message ID
is_publish=True # Whether to automatically publish to NATS
)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend(
subject, // NATS subject
data, // Array of {dataname, data, type}
{
natsUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080",
fileserverUploadHandler: customUploadHandler,
sizeThreshold: 1_000_000,
correlationId: "custom-id",
msgPurpose: "chat",
senderName: "NATSBridge",
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: "",
isPublish: true // Whether to automatically publish to NATS
}
);
```
#### Julia
```julia
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
```
### smartreceive
Receives and processes messages from NATS, handling both direct and link transport.
#### Python/Micropython
```python
from nats_bridge import smartreceive
# Note: For nats-py, use msg.data to pass the raw message data
env = smartreceive(
msg.data, # NATS message data (msg.data for nats-py)
fileserver_download_handler=_fetch_with_backoff, # Download handler
max_retries=5, # Max retry attempts
base_delay=100, # Initial delay in ms
max_delay=5000 # Max delay in ms
)
# Returns: Dict with envelope metadata and 'payloads' field
```
#### JavaScript
```javascript
const { smartreceive } = require('./src/NATSBridge');
// Note: msg is the NATS message object from subscription
const env = await smartreceive(
msg, // NATS message (raw object from subscription)
{
fileserverDownloadHandler: customDownloadHandler,
maxRetries: 5,
baseDelay: 100,
maxDelay: 5000
}
);
// Returns: Object with envelope metadata and payloads array
```
#### Julia
```julia
using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback
env, env_json_str = NATSBridge.smartreceive(
msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
```
---
## Payload Types
| Type | Description | Serialization |
|------|-------------|---------------|
| `text` | Plain text strings | UTF-8 bytes |
| `dictionary` | JSON-serializable dictionaries | JSON |
| `table` | Tabular data (DataFrames, arrays) | Apache Arrow IPC |
| `image` | Image data (PNG, JPG) | Raw bytes |
| `audio` | Audio data (WAV, MP3) | Raw bytes |
| `video` | Video data (MP4, AVI) | Raw bytes |
| `binary` | Generic binary data | Raw bytes |
---
## Transport Strategies
### Direct Transport (Payloads < 1MB)
Small payloads are sent directly via NATS with Base64 encoding.
#### Python/Micropython
```python
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```
#### JavaScript
```javascript
await smartsend("/topic", [
{ dataname: "message", data: "Hello", type: "text" }
]);
```
#### Julia
```julia
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```
### Link Transport (Payloads >= 1MB)
Large payloads are uploaded to an HTTP file server.
#### Python/Micropython
```python
data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
```
#### JavaScript
```javascript
await smartsend("/topic", [
{ dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });
```
#### Julia
```julia
data = [("file", large_data, "binary")]
smartsend("/topic", data; fileserver_url="http://localhost:8080")
```
---
## Examples
All examples include code for **Julia**, **JavaScript**, and **Python/Micropython** unless otherwise specified.
### Example 1: Chat with Mixed Content
Send text, small image, and large file in one message.
#### Python/Micropython
```python
from nats_bridge import smartsend
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
env, env_json_str = smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message_text", data: "Hello!", type: "text" },
{ dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" }
], {
fileserverUrl: "http://localhost:8080"
});
```
#### Julia
```julia
using NATSBridge
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080")
```
### Example 2: Dictionary Exchange
Send configuration data between platforms.
#### Python/Micropython
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const config = {
wifi_ssid: "MyNetwork",
wifi_password: "password123",
update_interval: 60
};
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
#### Julia
```julia
using NATSBridge
config = Dict(
"wifi_ssid" => "MyNetwork",
"wifi_password" => "password123",
"update_interval" => 60
)
data = [("config", config, "dictionary")]
env, env_json_str = NATSBridge.smartsend("/device/config", data)
```
### Example 3: Table Data (Arrow IPC)
Send tabular data using Apache Arrow IPC format.
#### Python/Micropython
```python
import pandas as pd
from nats_bridge import smartsend
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"score": [95, 88, 92]
})
data = [("students", df, "table")]
env, env_json_str = smartsend("/data/analysis", data)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const tableData = [
{ id: 1, name: "Alice", score: 95 },
{ id: 2, name: "Bob", score: 88 },
{ id: 3, name: "Charlie", score: 92 }
];
const { env, env_json_str } = await smartsend("/data/analysis", [
{ dataname: "students", data: tableData, type: "table" }
]);
```
#### Julia
```julia
using NATSBridge
using DataFrames
df = DataFrame(
id = [1, 2, 3],
name = ["Alice", "Bob", "Charlie"],
score = [95, 88, 92]
)
data = [("students", df, "table")]
env, env_json_str = NATSBridge.smartsend("/data/analysis", data)
```
### Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly.
#### Python/Micropython (Requester)
```python
from nats_bridge import smartsend
env, env_json_str = smartsend(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
reply_to="/device/response"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern
# nc.request("/device/command", env_json_str, reply_to="/device/response")
```
#### Python/Micropython (Responder)
```python
import nats
import asyncio
from nats_bridge import smartreceive, smartsend
# Configuration
SUBJECT = "/device/command"
REPLY_SUBJECT = "/device/response"
NATS_URL = "nats://localhost:4222"
async def main():
nc = await nats.connect(NATS_URL)
async def message_handler(msg):
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if data.get("action") == "read_sensor":
response = {"sensor_id": "sensor-001", "value": 42.5}
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120)
await nc.close()
asyncio.run(main())
```
#### JavaScript (Requester)
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], {
replyTo: "/device/response"
});
```
#### JavaScript (Responder)
```javascript
const { smartreceive, smartsend } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/device/command";
const REPLY_SUBJECT = "/device/response";
const NATS_URL = "nats://localhost:4222";
async function main() {
const nc = await connect({ servers: [NATS_URL] });
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "command" && payload.data.action === "read_sensor") {
const response = { sensor_id: "sensor-001", value: 42.5 };
await smartsend(REPLY_SUBJECT, [
{ dataname: "data", data: response, type: "dictionary" }
]);
}
}
}
}
main();
```
#### Julia (Requester)
```julia
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")];
reply_to="/device/response"
)
```
#### Julia (Responder)
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/command"
const REPLY_SUBJECT = "/device/response"
const NATS_URL = "nats://localhost:4222"
function test_responder()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
end
end
end
sleep(120)
NATS.drain(conn)
end
test_responder()
```
### Example 5: Micropython IoT Device
Lightweight Micropython device sending sensor data.
#### Micropython
```python
import nats
import asyncio
from nats_bridge import smartsend, smartreceive
# Configuration
SUBJECT = "/device/sensors"
NATS_URL = "nats://localhost:4222"
async def main():
nc = await nats.connect(NATS_URL)
# Send sensor data
data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")]
smartsend("/device/sensors", data, nats_url="nats://localhost:4222")
# Receive commands - msg comes from the callback
async def message_handler(msg):
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if type == "dictionary" and data.get("action") == "reboot":
# Execute reboot
pass
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120)
await nc.close()
asyncio.run(main())
```
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/device/sensors";
const NATS_URL = "nats://localhost:4222";
async function main() {
const nc = await connect({ servers: [NATS_URL] });
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "temperature") {
console.log(`Temperature: ${payload.data}`);
} else if (payload.dataname === "humidity") {
console.log(`Humidity: ${payload.data}`);
}
}
}
}
main();
```
#### Julia (Receiver)
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/sensors"
const NATS_URL = "nats://localhost:4222"
function test_receiver()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "temperature"
println("Temperature: $data")
elseif dataname == "humidity"
println("Humidity: $data")
end
end
end
sleep(120)
NATS.drain(conn)
end
test_receiver()
```
---
## Testing
Run the test scripts to verify functionality:
### Python/Micropython
```bash
# Basic functionality test
python test/test_micropython_basic.py
# Text message exchange
python test/test_micropython_text_sender.py
python test/test_micropython_text_receiver.py
# Dictionary exchange
python test/test_micropython_dict_sender.py
python test/test_micropython_dict_receiver.py
# File transfer
python test/test_micropython_file_sender.py
python test/test_micropython_file_receiver.py
# Mixed payload types
python test/test_micropython_mixed_sender.py
python test/test_micropython_mixed_receiver.py
```
### JavaScript
```bash
# Text message exchange
node test/test_js_text_sender.js
node test/test_js_text_receiver.js
# Dictionary exchange
node test/test_js_dict_sender.js
node test/test_js_dict_receiver.js
# File transfer
node test/test_js_file_sender.js
node test/test_js_file_receiver.js
# Mixed payload types
node test/test_js_mix_payload_sender.js
node test/test_js_mix_payloads_receiver.js
# Table exchange
node test/test_js_table_sender.js
node test/test_js_table_receiver.js
```
### Julia
```julia
# Text message exchange
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl
# Dictionary exchange
julia test/test_julia_dict_sender.jl
julia test/test_julia_dict_receiver.jl
# File transfer
julia test/test_julia_file_sender.jl
julia test/test_julia_file_receiver.jl
# Mixed payload types
julia test/test_julia_mix_payloads_sender.jl
julia test/test_julia_mix_payloads_receiver.jl
# Table exchange
julia test/test_julia_table_sender.jl
julia test/test_julia_table_receiver.jl
```
---
## License
MIT License
Copyright (c) 2026 NATSBridge Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserverDownloadHandler parameter
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
@@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload,
metadata=user_provided_envelope_level_metadata
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
@@ -99,13 +98,14 @@ smartsend(
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
nats_url="nats://localhost:4222"
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Architecture Diagram
@@ -138,48 +138,48 @@ flowchart TD
## System Components
### 1. msgEnvelope_v1 - Message Envelope
### 1. msg_envelope_v1 - Message Envelope
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
**Julia Structure:**
```julia
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems
msgId::String # This message id
timestamp::String # Message published timestamp
struct msg_envelope_v1
correlation_id::String # Unique identifier to track messages across systems
msg_id::String # This message id
timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to
msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
senderName::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to
brokerURL::String # NATS server address
send_to::String # Topic/subject the sender sends to
msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
sender_name::String # Sender name (e.g., "agent-wine-web-frontend")
sender_id::String # Sender id (uuid4)
receiver_name::String # Message receiver name (e.g., "agent-backend")
receiver_id::String # Message receiver id (uuid4 or nothing for broadcast)
reply_to::String # Topic to reply to
reply_to_msg_id::String # Message id this message is replying to
broker_url::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here
payloads::Vector{msg_payload_v1} # Multiple payloads stored here
end
```
**JSON Schema:**
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"brokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
@@ -189,7 +189,7 @@ end
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
@@ -201,7 +201,7 @@ end
{
"id": "uuid4",
"dataname": "large_data",
"type": "table",
"payload_type": "table",
"transport": "link",
"encoding": "none",
"size": 524288,
@@ -214,16 +214,16 @@ end
}
```
### 2. msgPayload_v1 - Payload Structure
### 2. msg_payload_v1 - Payload Structure
The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms.
The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms.
**Julia Structure:**
```julia
struct msgPayload_v1
struct msg_payload_v1
id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary"
payload_type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes
@@ -383,13 +383,32 @@ graph TD
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter
nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
@@ -406,8 +425,8 @@ function smartsend(
```julia
function smartreceive(
msg::NATS.Message,
fileserverDownloadHandler::Function;
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
@@ -416,7 +435,7 @@ function smartreceive(
# Iterate through all payloads
# For each payload: check transport type
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# If link: fetch from URL with exponential backoff using fileserver_download_handler
# Deserialize payload based on type
# Return envelope dictionary with all metadata and deserialized payloads
end
@@ -424,7 +443,7 @@ end
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
@@ -434,71 +453,186 @@ end
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
### JavaScript Implementation
#### Dependencies
- `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation
- `uuid` - Correlation ID and message ID generation
- `base64-arraybuffer` - Base64 encoding/decoding
- `node-fetch` or `fetch` - HTTP client for file server
#### smartsend Function
```javascript
async function smartsend(subject, data, options = {})
// data format: [(dataname, data, type), ...]
// options object should include:
// - natsUrl: NATS server URL
// - fileserverUrl: base URL of the file server
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
async function smartsend(
subject,
data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
options = {}
)
```
**Options:**
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (String) - Optional correlation ID for tracing
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (String) - Message receiver name (default: `""`)
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Iterate through the list of (dataname, data, type) tuples
2. For each payload: extract the type from the tuple and serialize accordingly
3. Check payload size
4. If < threshold: publish directly to NATS
5. If >= threshold: upload to HTTP server, publish NATS with URL
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope object and JSON string
#### smartreceive Handler
```javascript
async function smartreceive(msg, options = {})
// options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL
// - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
```
**Options:**
- `fileserver_download_handler` (Function) - Custom download handler function
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (String) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type`
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
### Python/Micropython Implementation
#### Dependencies
- `nats-python` - Core NATS functionality
- `pyarrow` - Arrow IPC serialization
- `uuid` - Correlation ID and message ID generation
- `base64` - Base64 encoding/decoding
- `requests` or `aiohttp` - HTTP client for file server
#### smartsend Function
```python
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {}
)
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope dictionary and JSON string
#### smartreceive Handler
```python
async def smartreceive(
msg: NATS.Message,
options: Dict = {}
)
```
**Options:**
- `fileserver_download_handler` (Callable) - Custom download handler function
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` list
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
## Scenario Implementations
### Scenario 1: Command & Control (Small Dictionary)

View File

@@ -19,49 +19,102 @@ NATSBridge is implemented in three languages, each providing the same API:
| **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers |
| **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers |
### Multi-Payload Support
### File Server Handler Architecture
The implementation uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
**Handler Function Signatures:**
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
### Multi-Payload Support (Standard API)
The system uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
**API Standard:**
```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:**
```julia
# Single payload - still wrapped in a list (type is required as third element)
smartsend("/test", [(dataname1, data1, "text")], ...)
# Single payload - still wrapped in a list
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message (each payload has its own type)
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Cross-Platform Interoperability
@@ -98,14 +151,14 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
# Julia sender
using NATSBridge
data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
```
```javascript
// JavaScript receiver
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
// envelope.payloads[0].data === "Hello from Julia!"
const env = await smartreceive(msg);
// env.payloads[0].data === "Hello from Julia!"
```
```python
@@ -146,15 +199,31 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘
```
## Files
## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, env_json_str = smartsend(...)
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
The Julia implementation provides:
- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
- **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation
- **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
@@ -247,7 +316,53 @@ node test/scenario3_julia_to_julia.js
## Usage
### Scenario 0: Basic Multi-Payload Example
### Scenario 1: Command & Control (Small Dictionary)
**Focus:** Sending small dictionary configurations across platforms. This is the simplest use case for command and control scenarios.
**Julia (Sender/Receiver):**
```julia
using NATSBridge
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
```
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
// Create small dictionary config
// Send via smartsend with type="dictionary"
const config = {
step_size: 0.01,
iterations: 1000,
threshold: 0.5
};
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
**Python/Micropython (Sender/Receiver):**
```python
from nats_bridge import smartsend
# Create small dictionary config
# Send via smartsend with type="dictionary"
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
smartsend("control", [("config", config, "dictionary")])
```
### Basic Multi-Payload Example
#### Python/Micropython (Sender)
```python
@@ -262,16 +377,16 @@ smartsend(
)
# Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")])
smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222")
```
#### Python/Micropython (Receiver)
```python
from nats_bridge import smartreceive
# Receive returns a list of (dataname, data, type) tuples
payloads = smartreceive(msg)
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary with envelope metadata and payloads field
env = smartreceive(msg)
# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
```
#### JavaScript (Sender)
@@ -315,18 +430,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the payloads from the envelope
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`);
}
// Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`);
console.log(`Message ID: ${envelope.msgId}`);
console.log(`Correlation ID: ${env.correlation_id}`);
console.log(`Message ID: ${env.msg_id}`);
}
```
@@ -344,19 +459,21 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via SmartSend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]);
# Send via smartsend - wrapped in a list (type is part of each tuple)
env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
```
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript
const table = envelope.payloads;
const table = env.payloads;
```
### Scenario 3: Live Binary Processing
@@ -406,10 +523,10 @@ from nats_bridge import smartreceive
# Receive binary data
def process_binary(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process the binary data from envelope.payloads
for dataname, data, type in envelope["payloads"]:
# Process the binary data from env.payloads
for dataname, data, type in env["payloads"]:
if type == "binary":
# data is bytes
print(f"Received binary data: {dataname}, size: {len(data)}")
@@ -422,10 +539,10 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data
function process_binary(msg) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the binary data from envelope.payloads
for (const payload of envelope.payloads) {
// Process the binary data from env.payloads
for (const payload of env.payloads) {
if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
@@ -444,7 +561,7 @@ using NATSBridge
function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple)
status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], nats_url=nats_url)
smartsend("health", [("status", status, "dictionary")], broker_url=nats_url)
sleep(5) # Every 5 seconds
end
```
@@ -466,8 +583,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages
for await (const msg of consumer) {
const envelope = await smartreceive(msg);
// envelope.payloads contains the list of payloads
const env = await smartreceive(msg);
// env.payloads contains the list of payloads
// Each payload has: dataname, data, type
msg.ack();
}
@@ -484,10 +601,10 @@ import json
# Device configuration handler
def handle_device_config(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process configuration from payloads
for dataname, data, type in envelope["payloads"]:
for dataname, data, type in env["payloads"]:
if type == "dictionary":
print(f"Received configuration: {data}")
# Apply configuration to device
@@ -506,7 +623,7 @@ def handle_device_config(msg):
"device/response",
[("config", config, "dictionary")],
nats_url="nats://localhost:4222",
reply_to=envelope.get("replyTo")
reply_to=env.get("reply_to")
)
```
@@ -566,11 +683,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Decode Base64 payload (for direct transport)
// For tables, data is in envelope.payloads
const table = envelope.payloads; // Array of objects
// For tables, data is in env.payloads
const table = env.payloads; // Array of objects
// User makes selection
const selection = uiComponent.getSelectedOption();
@@ -619,7 +736,7 @@ chat_message = [
smartsend(
"chat.room123",
chat_message,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
msg_purpose="chat",
reply_to="chat.room123.responses"
)
@@ -667,7 +784,7 @@ await smartsend("chat.room123", message);
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Configuration
@@ -683,19 +800,19 @@ await smartsend("chat.room123", message);
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"BrokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
@@ -706,7 +823,7 @@ await smartsend("chat.room123", message);
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python
from nats_bridge import smartsend
# Send a text message
# Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
// Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
```
#### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!")
```
@@ -145,8 +159,8 @@ println("Message sent!")
from nats_bridge import smartreceive
# Receive and process message
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
```
@@ -156,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
const { smartreceive } = require('./src/NATSBridge');
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
```
@@ -168,8 +182,8 @@ for (const payload of envelope.payloads) {
using NATSBridge
# Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
```
@@ -194,7 +208,7 @@ config = {
# Send as dictionary type
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -208,7 +222,7 @@ const config = {
update_interval: 60
};
await smartsend("/device/config", [
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
@@ -225,7 +239,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data)
```
### Example 2: Sending Binary Data (Image)
@@ -241,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type
data = [("user_image", image_data, "binary")]
env = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -253,7 +267,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
const image_data = fs.readFileSync('image.png');
await smartsend("/chat/image", [
const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" }
]);
```
@@ -267,7 +281,7 @@ using NATSBridge
image_data = read("image.png")
data = [("user_image", image_data, "binary")]
smartsend("/chat/image", data)
env, env_json_str = smartsend("/chat/image", data)
```
### Example 3: Request-Response Pattern
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
env, env_json_str = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript (Responder)
@@ -297,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
const sub = nc.subscribe("/device/command");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process command
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.dataname === "command") {
const command = payload.data;
@@ -315,8 +331,8 @@ for await (const msg of sub) {
await smartsend("/device/response", [
{ dataname: "sensor_data", data: response, type: "dictionary" }
], {
reply_to: envelope.replyTo,
reply_to_msg_id: envelope.msgId
reply_to: env.replyTo,
reply_to_msg_id: env.msgId
});
}
}
@@ -342,7 +358,7 @@ import os
large_data = os.urandom(2_000_000) # 2MB of random data
# Send with file server URL
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
@@ -364,7 +380,7 @@ const largeData = new ArrayBuffer(2_000_000);
const view = new Uint8Array(largeData);
view.fill(42); // Fill with some data
await smartsend("/data/large", [
const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" }
], {
fileserverUrl: "http://localhost:8080",
@@ -380,7 +396,7 @@ using NATSBridge
# Create large data (> 1MB)
large_data = rand(UInt8, 2_000_000)
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
fileserver_url="http://localhost:8080"
@@ -409,7 +425,7 @@ data = [
("user_avatar", image_data, "image")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -419,7 +435,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
await smartsend("/chat/mixed", [
const { env, env_json_str } = await smartsend("/chat/mixed", [
{
dataname: "message_text",
data: "Hello with image!",
@@ -445,7 +461,7 @@ data = [
("user_avatar", image_data, "image")
]
smartsend("/chat/mixed", data)
env, env_json_str = smartsend("/chat/mixed", data)
```
### Example 6: Table Data (Arrow IPC)
@@ -467,7 +483,7 @@ df = pd.DataFrame({
# Send as table type
data = [("students", df, "table")]
env = smartsend("/data/students", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222")
```
#### Julia
@@ -484,7 +500,7 @@ df = DataFrame(
)
data = [("students", df, "table")]
smartsend("/data/students", data)
env, env_json_str = smartsend("/data/students", data)
```
---
@@ -503,7 +519,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")]
smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript Receiver
@@ -512,8 +528,8 @@ smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
const { smartreceive } = require('./src/NATSBridge');
// Receive dictionary from Julia
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.type === "dictionary") {
console.log("Received config:", payload.data);
// payload.data = { step_size: 0.01, iterations: 1000 }
@@ -528,7 +544,7 @@ for (const payload of envelope.payloads) {
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/data/transfer", [
const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" }
]);
```
@@ -538,8 +554,8 @@ await smartsend("/data/transfer", [
```python
from nats_bridge import smartreceive
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
if type == "text":
print(f"Received from JS: {data}")
```
@@ -552,7 +568,7 @@ for dataname, data, type in envelope["payloads"]:
from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")]
smartsend("/chat/python", data)
env, env_json_str = smartsend("/chat/python", data)
```
#### Julia Receiver
@@ -560,8 +576,8 @@ smartsend("/chat/python", data)
```julia
using NATSBridge
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if type == "text"
println("Received from Python: $data")
end

View File

@@ -132,7 +132,7 @@ class ChatUI {
});
}
await smartsend(
const { env, env_json_str } = await smartsend(
`/chat/${this.currentRoom}`,
data,
{
@@ -216,15 +216,15 @@ class ChatHandler {
}
async handleMessage(msg) {
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.downloadFile.bind(this)
});
// Extract sender info from envelope
const sender = envelope.senderName || 'Anonymous';
const sender = env.senderName || 'Anonymous';
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'text') {
this.ui.addMessage(sender, payload.data);
} else if (payload.type === 'image') {
@@ -304,7 +304,7 @@ class FileUploadService {
type: 'binary'
}];
const envelope = await smartsend(
const { env, env_json_str } = await smartsend(
`/files/${recipient}`,
data,
{
@@ -314,7 +314,7 @@ class FileUploadService {
}
);
return envelope;
return env;
}
async uploadLargeFile(filePath, recipient) {
@@ -356,12 +356,12 @@ class FileDownloadService {
async downloadFile(sender, downloadId) {
// Subscribe to sender's file channel
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'binary') {
const filePath = `/downloads/${payload.dataname}`;
fs.writeFileSync(filePath, payload.data);
@@ -422,9 +422,9 @@ async function uploadFile(config) {
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
try {
const envelope = await fileService.uploadFile(filePath, recipient);
const env = await fileService.uploadFile(filePath, recipient);
console.log('Upload successful!');
console.log(`File ID: ${envelope.payloads[0].id}`);
console.log(`File ID: ${env.payloads[0].id}`);
} catch (error) {
console.error('Upload failed:', error.message);
}
@@ -514,6 +514,7 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend(
f"/sensors/{sensor_id}",
data,
@@ -521,6 +522,31 @@ class SensorSender:
fileserver_url=self.fileserver_url
)
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (env, env_json_str) without publishing
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
return env, env_json_str
def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch()
for reading in readings:
@@ -571,9 +597,9 @@ class SensorReceiver:
self.fileserver_download_handler = fileserver_download_handler
def process_reading(self, msg):
envelope = smartreceive(msg, self.fileserver_download_handler)
env = smartreceive(msg, self.fileserver_download_handler)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
reading = SensorReading(
sensor_id=data["sensor_id"],
@@ -673,10 +699,10 @@ class DeviceBridge:
# Poll for messages
msg = self._poll_for_message()
if msg:
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process payloads
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if dataname == "command":
callback(data)
@@ -772,9 +798,9 @@ class DashboardServer:
def receive_selection(self, callback):
def handler(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
callback(data)
@@ -807,7 +833,7 @@ class DashboardUI {
async refreshData() {
// Request fresh data
await smartsend("/dashboard/request", [
const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], {
fileserverUrl: window.config.fileserver_url
@@ -816,12 +842,12 @@ class DashboardUI {
async fetchData() {
// Subscribe to data updates
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process table data
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'table') {
// Deserialize Arrow IPC
this.data = this.deserializeArrow(payload.data);

View File

@@ -0,0 +1,14 @@
services:
plik:
image: rootgg/plik:latest
container_name: plik-server
restart: unless-stopped
ports:
- "8080:8080"
volumes:
# # Mount the config file (created below)
# - ./plikd.cfg:/home/plik/server/plikd.cfg
# Mount local folder for uploads and database
- ./plik-data:/data
# Set user to match your host UID to avoid permission issues
user: "1000:1000"

File diff suppressed because it is too large Load Diff

View File

@@ -460,8 +460,9 @@ async function smartsend(subject, data, options = {}) {
* @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
*
* @returns {Promise<MessageEnvelope>} - The envelope for tracking
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
*/
const {
natsUrl = DEFAULT_NATS_URL,
@@ -474,7 +475,8 @@ async function smartsend(subject, data, options = {}) {
receiverName = "",
receiverId = "",
replyTo = "",
replyToMsgId = ""
replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
} = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
@@ -556,10 +558,19 @@ async function smartsend(subject, data, options = {}) {
payloads: payloads
});
// Publish message to NATS
await publish_message(natsUrl, subject, env.toString(), correlationId);
// Convert envelope to JSON string
const env_json_str = env.toString();
return env;
// Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, env_json_str, correlationId);
}
// Return both envelope and JSON string (tuple-like structure)
return {
env: env,
env_json_str: env_json_str
};
}
// Helper: Publish message to NATS

View File

@@ -1,295 +0,0 @@
# NATSBridge
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, and **Python/Micropython** using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Overview
NATSBridge enables seamless communication between Julia, JavaScript, and Python/Micropython applications through NATS, with automatic transport selection based on payload size:
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
## Features
- ✅ Bi-directional NATS communication across Julia ↔ JavaScript ↔ Python/Micropython
- ✅ Multi-payload support (mixed content in single message)
- ✅ Automatic transport selection based on payload size
- ✅ File server integration for large payloads
- ✅ Exponential backoff for URL fetching
- ✅ Correlation ID tracking
- ✅ Reply-to support for request-response pattern
## Supported Payload Types
| Type | Description |
|------|-------------|
| `text` | Plain text strings |
| `dictionary` | JSON-serializable dictionaries |
| `table` | Tabular data (Arrow IPC format) |
| `image` | Image data (PNG, JPG bytes) |
| `audio` | Audio data (WAV, MP3 bytes) |
| `video` | Video data (MP4, AVI bytes) |
| `binary` | Generic binary data |
## Implementation Guides
### [Julia Implementation](../tutorial_julia.md)
See the [Julia tutorial](../tutorial_julia.md) for getting started with Julia.
### [JavaScript Implementation](#javascript-implementation)
See [`NATSBridge.js`](NATSBridge.js) for the JavaScript implementation.
### [Python/Micropython Implementation](#pythonmicropython-implementation)
See [`nats_bridge.py`](nats_bridge.py) for the Python/Micropython implementation.
## Installation
### Julia
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```
### JavaScript
```bash
npm install nats.js apache-arrow uuid base64-url
```
### Python/Micropython
1. Copy `nats_bridge.py` to your device
2. Ensure you have the following dependencies:
- `urequests` for HTTP requests (Micropython)
- `requests` for HTTP requests (Python)
- `base64` for base64 encoding
- `json` for JSON handling
- `socket` for networking (Micropython)
## Usage
### Basic Text Message
#### Python/Micropython
```python
from nats_bridge import smartsend, smartreceive
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
payloads = smartreceive(msg)
for dataname, data, type in payloads:
print("Received {}: {}".format(dataname, data))
```
#### Julia
```julia
using NATSBridge
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
envelope = smartreceive(msg, fileserverDownloadHandler)
# envelope["payloads"] = [("message", "Hello World", "text"), ...]
```
#### JavaScript
```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
// Sender
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
// Receiver
const envelope = await smartreceive(msg);
// envelope.payloads = [{ dataname: "message", data: "Hello World", type: "text" }, ...]
```
### Sending JSON Configuration
#### Python/Micropython
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
### Mixed Content (Chat with Text + Image)
#### Python/Micropython
```python
from nats_bridge import smartsend
image_data = b"\x89PNG..." # PNG bytes
data = [
("message_text", "Hello with image!", "text"),
("user_avatar", image_data, "binary")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
### Request-Response Pattern
#### Python/Micropython
```python
from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
```
### Large Payloads (File Server)
#### Python/Micropython
```python
from nats_bridge import smartsend
# Large data (> 1MB)
large_data = b"A" * 2000000 # 2MB
env = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1000000 # 1MB threshold
)
```
## API Reference
### `smartsend(subject, data, ...)`
Send data via NATS with automatic transport selection.
**Arguments:**
- `subject` (str): NATS subject to publish to
- `data` (list): List of `(dataname, data, type)` tuples
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
- `correlation_id` (str): Optional correlation ID for tracing
- `msg_purpose` (str): Message purpose (default: `"chat"`)
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
- `receiver_name` (str): Receiver name (default: `""`)
- `receiver_id` (str): Receiver ID (default: `""`)
- `reply_to` (str): Reply topic (default: `""`)
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
**Returns:** `MessageEnvelope` object
### `smartreceive(msg, ...)`
Receive and process NATS messages.
**Arguments:**
- `msg`: NATS message (dict or JSON string)
- `fileserver_download_handler` (function): Function to fetch data from URLs
- `max_retries` (int): Maximum retry attempts (default: 5)
- `base_delay` (int): Initial delay in ms (default: 100)
- `max_delay` (int): Maximum delay in ms (default: 5000)
**Returns:** List of `(dataname, data, type)` tuples
### `MessageEnvelope`
Represents a complete NATS message envelope.
**Attributes:**
- `correlation_id`: Unique identifier for tracing
- `msg_id`: Unique message identifier
- `timestamp`: Message publication timestamp
- `send_to`: NATS subject
- `msg_purpose`: Message purpose
- `sender_name`: Sender name
- `sender_id`: Sender UUID
- `receiver_name`: Receiver name
- `receiver_id`: Receiver UUID
- `reply_to`: Reply topic
- `reply_to_msg_id`: Reply message ID
- `broker_url`: NATS broker URL
- `metadata`: Message-level metadata
- `payloads`: List of MessagePayload objects
### `MessagePayload`
Represents a single payload within a message envelope.
**Attributes:**
- `id`: Unique payload identifier
- `dataname`: Name of the payload
- `type`: Payload type ("text", "dictionary", etc.)
- `transport`: Transport method ("direct" or "link")
- `encoding`: Encoding method ("none", "base64", etc.)
- `size`: Payload size in bytes
- `data`: Payload data (bytes for direct, URL for link)
- `metadata`: Payload-level metadata
## Examples
See [`examples/micropython_example.py`](../examples/micropython_example.py) for more detailed examples.
## Testing
Run the test suite:
```bash
# Python/Micropython
python test/test_micropython_basic.py
# JavaScript
node test/test_js_to_js_text_sender.js
node test/test_js_to_js_text_receiver.js
# Julia
julia test/test_julia_to_julia_text_sender.jl
julia test/test_julia_to_julia_text_receiver.jl
```
## Requirements
- **Julia**: NATS server (nats.io), HTTP file server (optional)
- **JavaScript**: NATS server (nats.io), HTTP file server (optional)
- **Python/Micropython**: NATS server (nats.io), HTTP file server (optional)
## License
MIT

View File

@@ -437,7 +437,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -459,9 +459,12 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id: UUID of the receiver
reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
Returns:
MessageEnvelope: The envelope object for tracking
tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
@@ -549,13 +552,15 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
msg_json = env.to_json()
# Publish to NATS
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
# Publish to NATS if is_publish is True
if is_publish:
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
return env
# Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
];
// Use smartsend with mixed content
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
payloads,
{
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -92,12 +92,12 @@ function test_dict_send()
# Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -114,7 +115,7 @@ function test_dict_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -79,12 +79,12 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL;
broker_url = NATS_URL;
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -92,11 +92,12 @@ function test_large_binary_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.payloads[1].type)")
log_trace("Envelope type: $(env.payloads[1].payload_type)")
# Check if link transport was used
if env.payloads[1].transport == "link"

View File

@@ -186,12 +186,12 @@ function test_mix_send()
]
# Use smartsend with mixed content
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -208,7 +209,7 @@ function test_mix_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -90,12 +90,12 @@ function test_table_send()
# Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -112,7 +113,7 @@ function test_table_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -75,12 +75,12 @@ function test_text_send()
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -97,7 +98,7 @@ function test_text_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
data, # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")