28 Commits

Author SHA1 Message Date
61d81bed62 update 2026-02-25 06:04:40 +07:00
1a10bc1a5f update 2026-02-25 05:32:59 +07:00
7f68d08134 update 2026-02-24 21:40:33 +07:00
ab20cd896f update 2026-02-24 21:18:19 +07:00
5a9e93d6e7 update 2026-02-24 20:38:45 +07:00
b51641dc7e update 2026-02-24 20:09:10 +07:00
45f1257896 update 2026-02-24 18:50:28 +07:00
3e2b8b1e3a update 2026-02-24 18:19:03 +07:00
90d81617ef update 2026-02-24 17:58:59 +07:00
64c62e616b update 2026-02-23 22:06:57 +07:00
2c340e37c7 update 2026-02-23 22:00:06 +07:00
7853e94d2e update 2026-02-23 21:54:50 +07:00
99bf57b154 update 2026-02-23 21:43:09 +07:00
0fa6eaf95b update 2026-02-23 21:37:50 +07:00
76f42be740 update 2026-02-23 21:32:22 +07:00
d99dc41be9 update 2026-02-23 21:09:36 +07:00
263508b8f7 update 2026-02-23 20:50:41 +07:00
0c2cca30ed update 2026-02-23 20:34:08 +07:00
46fdf668c6 update 2026-02-23 19:18:12 +07:00
f8a92a45a0 update README.md 2026-02-23 09:39:24 +07:00
cec70e6036 update 2026-02-23 08:11:03 +07:00
f9e08ba628 add Plik fileserver 2026-02-23 07:58:18 +07:00
c12a078149 update README.md 2026-02-23 07:55:10 +07:00
dedd803dc3 fix README.md 2026-02-23 07:24:54 +07:00
e8e927a491 move README.md 2026-02-23 07:17:31 +07:00
ton
d950bbac23 Merge pull request 'smartreceive_return_envelope' (#7) from smartreceive_return_envelope into main
Reviewed-on: #7
2026-02-23 00:11:09 +00:00
fc8da2ebf5 update 2026-02-23 07:08:17 +07:00
f6e50c405f update 2026-02-23 07:06:53 +07:00
35 changed files with 2039 additions and 1022 deletions

View File

@@ -1,6 +1,6 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.2"
version = "0.4.3"
authors = ["narawat <narawat@gmail.com>"]
[deps]

968
README.md Normal file
View File

@@ -0,0 +1,968 @@
# NATSBridge
A high-performance, bi-directional data bridge between **Julia**, **JavaScript**, and **Python/Micropython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io)
---
## Table of Contents
- [Overview](#overview)
- [Features](#features)
- [Architecture](#architecture)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [API Reference](#api-reference)
- [Payload Types](#payload-types)
- [Transport Strategies](#transport-strategies)
- [Examples](#examples)
- [Testing](#testing)
- [License](#license)
---
## Overview
NATSBridge enables seamless communication across Julia, JavaScript, and Python/Micropython applications through NATS, with intelligent transport selection based on payload size:
| Transport | Payload Size | Method |
|-----------|--------------|--------|
| **Direct** | < 1MB | Sent directly via NATS (Base64 encoded) |
| **Link** | >= 1MB | Uploaded to HTTP file server, URL sent via NATS |
### Use Cases
- **Chat Applications**: Text, images, audio, video in a single message
- **File Transfer**: Efficient transfer of large files using claim-check pattern
- **Streaming Data**: Sensor data, telemetry, and analytics pipelines
- **Cross-Platform Communication**: Julia ↔ JavaScript ↔ Python/Micropython
- **IoT Devices**: Micropython devices sending data to cloud services
---
## Features
-**Bi-directional messaging** between Julia, JavaScript, and Python/Micropython
-**Multi-payload support** - send multiple payloads with different types in one message
-**Automatic transport selection** - direct vs link based on payload size
-**Claim-Check pattern** for payloads > 1MB
-**Apache Arrow IPC** support for tabular data (zero-copy reading)
-**Exponential backoff** for reliable file server downloads
-**Correlation ID tracking** for message tracing
-**Reply-to support** for request-response patterns
-**JetStream support** for message replay and durability
-**Lightweight Micropython implementation** for microcontrollers
---
## Architecture
### System Components
```
┌─────────────────────────────────────────────────────────────────────┐
│ NATSBridge Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Julia │ │ JavaScript │ │ Python/ │ │
│ │ (NATS.jl) │◄──►│ (nats.js) │◄──►│ Micropython │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ NATS │ │
│ │ (Message Broker) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ File Server │ │
│ │ (HTTP Upload/Get) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
```
### Message Flow
1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads based on type
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or file server)
6. **NATSBridge** deserializes and decodes payloads
---
## Installation
### Prerequisites
- **NATS Server** (v2.10+ recommended)
- **HTTP File Server** (optional, for payloads > 1MB)
### Julia
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("https://git.yiem.cc/ton/NATSBridge")
```
### JavaScript
```bash
npm install nats.js apache-arrow uuid base64-url
```
For Node.js:
```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
```
For browser:
```html
<script src="./src/NATSBridge.js"></script>
<script>
// NATSBridge is available as window.NATSBridge
</script>
```
### Python/Micropython
1. Copy [`src/nats_bridge.py`](src/nats_bridge.py) to your device
2. Install dependencies:
**For Python (desktop):**
```bash
pip install nats-py
```
**For Micropython:**
- `urequests` for HTTP requests (built-in for ESP32)
- `base64` for base64 encoding (built-in)
- `json` for JSON handling (built-in)
---
## Quick Start
### Step 1: Start NATS Server
```bash
docker run -p 4222:4222 nats:latest
```
### Step 2: Start HTTP File Server (Optional)
```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver
# Use Python's built-in server
python3 -m http.server 8080 --directory /tmp/fileserver
```
### Step 3: Send Your First Message
#### Python/Micropython
```python
from nats_bridge import smartsend
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!");
```
#### Julia
```julia
using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222")
println("Message sent!")
```
### Step 4: Receive Messages
#### Python/Micropython
```python
import nats
import asyncio
from nats_bridge import smartreceive
# Configuration
SUBJECT = "/chat/room1"
NATS_URL = "nats://localhost:4222"
async def main():
# Connect to NATS
nc = await nats.connect(NATS_URL)
# Subscribe to the subject - msg comes from the callback
async def message_handler(msg):
# Receive and process message
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120) # Keep listening
await nc.close()
asyncio.run(main())
```
#### JavaScript
```javascript
const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/chat/room1";
const NATS_URL = "nats://localhost:4222";
async function main() {
// Connect to NATS
const nc = await connect({ servers: [NATS_URL] });
// Subscribe to the subject - msg comes from the async iteration
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
// Receive and process message
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
}
}
main();
```
#### Julia
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/chat/room1"
const NATS_URL = "nats://localhost:4222"
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] $message")
end
# Receiver: Listen for messages - msg comes from the callback
function test_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)")
# Receive and process message
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
end
# Keep listening for 120 seconds
sleep(120)
NATS.drain(conn)
end
test_receive()
```
---
## API Reference
### smartsend
Sends data either directly via NATS or via a fileserver URL, depending on payload size.
#### Python/Micropython
```python
from nats_bridge import smartsend
env, env_json_str = smartsend(
subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL
fileserver_url="http://localhost:8080", # File server URL
fileserver_upload_handler=plik_oneshot_upload, # Upload handler function
size_threshold=1_000_000, # Threshold in bytes (default: 1MB)
correlation_id=None, # Optional correlation ID for tracing
msg_purpose="chat", # Message purpose
sender_name="NATSBridge", # Sender name
receiver_name="", # Receiver name (empty = broadcast)
receiver_id="", # Receiver UUID (empty = broadcast)
reply_to="", # Reply topic
reply_to_msg_id="", # Reply message ID
is_publish=True # Whether to automatically publish to NATS
)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend(
subject, // NATS subject
data, // Array of {dataname, data, type}
{
natsUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080",
fileserverUploadHandler: customUploadHandler,
sizeThreshold: 1_000_000,
correlationId: "custom-id",
msgPurpose: "chat",
senderName: "NATSBridge",
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: "",
isPublish: true // Whether to automatically publish to NATS
}
);
```
#### Julia
```julia
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
```
### smartreceive
Receives and processes messages from NATS, handling both direct and link transport.
#### Python/Micropython
```python
from nats_bridge import smartreceive
# Note: For nats-py, use msg.data to pass the raw message data
env = smartreceive(
msg.data, # NATS message data (msg.data for nats-py)
fileserver_download_handler=_fetch_with_backoff, # Download handler
max_retries=5, # Max retry attempts
base_delay=100, # Initial delay in ms
max_delay=5000 # Max delay in ms
)
# Returns: Dict with envelope metadata and 'payloads' field
```
#### JavaScript
```javascript
const { smartreceive } = require('./src/NATSBridge');
// Note: msg is the NATS message object from subscription
const env = await smartreceive(
msg, // NATS message (raw object from subscription)
{
fileserverDownloadHandler: customDownloadHandler,
maxRetries: 5,
baseDelay: 100,
maxDelay: 5000
}
);
// Returns: Object with envelope metadata and payloads array
```
#### Julia
```julia
using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback
env, env_json_str = NATSBridge.smartreceive(
msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
```
---
## Payload Types
| Type | Description | Serialization |
|------|-------------|---------------|
| `text` | Plain text strings | UTF-8 bytes |
| `dictionary` | JSON-serializable dictionaries | JSON |
| `table` | Tabular data (DataFrames, arrays) | Apache Arrow IPC |
| `image` | Image data (PNG, JPG) | Raw bytes |
| `audio` | Audio data (WAV, MP3) | Raw bytes |
| `video` | Video data (MP4, AVI) | Raw bytes |
| `binary` | Generic binary data | Raw bytes |
---
## Transport Strategies
### Direct Transport (Payloads < 1MB)
Small payloads are sent directly via NATS with Base64 encoding.
#### Python/Micropython
```python
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```
#### JavaScript
```javascript
await smartsend("/topic", [
{ dataname: "message", data: "Hello", type: "text" }
]);
```
#### Julia
```julia
data = [("message", "Hello", "text")]
smartsend("/topic", data)
```
### Link Transport (Payloads >= 1MB)
Large payloads are uploaded to an HTTP file server.
#### Python/Micropython
```python
data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
```
#### JavaScript
```javascript
await smartsend("/topic", [
{ dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });
```
#### Julia
```julia
data = [("file", large_data, "binary")]
smartsend("/topic", data; fileserver_url="http://localhost:8080")
```
---
## Examples
All examples include code for **Julia**, **JavaScript**, and **Python/Micropython** unless otherwise specified.
### Example 1: Chat with Mixed Content
Send text, small image, and large file in one message.
#### Python/Micropython
```python
from nats_bridge import smartsend
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
env, env_json_str = smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message_text", data: "Hello!", type: "text" },
{ dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" }
], {
fileserverUrl: "http://localhost:8080"
});
```
#### Julia
```julia
using NATSBridge
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080")
```
### Example 2: Dictionary Exchange
Send configuration data between platforms.
#### Python/Micropython
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const config = {
wifi_ssid: "MyNetwork",
wifi_password: "password123",
update_interval: 60
};
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
#### Julia
```julia
using NATSBridge
config = Dict(
"wifi_ssid" => "MyNetwork",
"wifi_password" => "password123",
"update_interval" => 60
)
data = [("config", config, "dictionary")]
env, env_json_str = NATSBridge.smartsend("/device/config", data)
```
### Example 3: Table Data (Arrow IPC)
Send tabular data using Apache Arrow IPC format.
#### Python/Micropython
```python
import pandas as pd
from nats_bridge import smartsend
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"score": [95, 88, 92]
})
data = [("students", df, "table")]
env, env_json_str = smartsend("/data/analysis", data)
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
const tableData = [
{ id: 1, name: "Alice", score: 95 },
{ id: 2, name: "Bob", score: 88 },
{ id: 3, name: "Charlie", score: 92 }
];
const { env, env_json_str } = await smartsend("/data/analysis", [
{ dataname: "students", data: tableData, type: "table" }
]);
```
#### Julia
```julia
using NATSBridge
using DataFrames
df = DataFrame(
id = [1, 2, 3],
name = ["Alice", "Bob", "Charlie"],
score = [95, 88, 92]
)
data = [("students", df, "table")]
env, env_json_str = NATSBridge.smartsend("/data/analysis", data)
```
### Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly.
#### Python/Micropython (Requester)
```python
from nats_bridge import smartsend
env, env_json_str = smartsend(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
reply_to="/device/response"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern
# nc.request("/device/command", env_json_str, reply_to="/device/response")
```
#### Python/Micropython (Responder)
```python
import nats
import asyncio
from nats_bridge import smartreceive, smartsend
# Configuration
SUBJECT = "/device/command"
REPLY_SUBJECT = "/device/response"
NATS_URL = "nats://localhost:4222"
async def main():
nc = await nats.connect(NATS_URL)
async def message_handler(msg):
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if data.get("action") == "read_sensor":
response = {"sensor_id": "sensor-001", "value": 42.5}
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120)
await nc.close()
asyncio.run(main())
```
#### JavaScript (Requester)
```javascript
const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], {
replyTo: "/device/response"
});
```
#### JavaScript (Responder)
```javascript
const { smartreceive, smartsend } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/device/command";
const REPLY_SUBJECT = "/device/response";
const NATS_URL = "nats://localhost:4222";
async function main() {
const nc = await connect({ servers: [NATS_URL] });
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "command" && payload.data.action === "read_sensor") {
const response = { sensor_id: "sensor-001", value: 42.5 };
await smartsend(REPLY_SUBJECT, [
{ dataname: "data", data: response, type: "dictionary" }
]);
}
}
}
}
main();
```
#### Julia (Requester)
```julia
using NATSBridge
env, env_json_str = NATSBridge.smartsend(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")];
reply_to="/device/response"
)
```
#### Julia (Responder)
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/command"
const REPLY_SUBJECT = "/device/response"
const NATS_URL = "nats://localhost:4222"
function test_responder()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
end
end
end
sleep(120)
NATS.drain(conn)
end
test_responder()
```
### Example 5: Micropython IoT Device
Lightweight Micropython device sending sensor data.
#### Micropython
```python
import nats
import asyncio
from nats_bridge import smartsend, smartreceive
# Configuration
SUBJECT = "/device/sensors"
NATS_URL = "nats://localhost:4222"
async def main():
nc = await nats.connect(NATS_URL)
# Send sensor data
data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")]
smartsend("/device/sensors", data, nats_url="nats://localhost:4222")
# Receive commands - msg comes from the callback
async def message_handler(msg):
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if type == "dictionary" and data.get("action") == "reboot":
# Execute reboot
pass
sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120)
await nc.close()
asyncio.run(main())
```
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const { connect } = require('nats');
// Configuration
const SUBJECT = "/device/sensors";
const NATS_URL = "nats://localhost:4222";
async function main() {
const nc = await connect({ servers: [NATS_URL] });
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "temperature") {
console.log(`Temperature: ${payload.data}`);
} else if (payload.dataname === "humidity") {
console.log(`Humidity: ${payload.data}`);
}
}
}
}
main();
```
#### Julia (Receiver)
```julia
using NATS, NATSBridge
# Configuration
const SUBJECT = "/device/sensors"
const NATS_URL = "nats://localhost:4222"
function test_receiver()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "temperature"
println("Temperature: $data")
elseif dataname == "humidity"
println("Humidity: $data")
end
end
end
sleep(120)
NATS.drain(conn)
end
test_receiver()
```
---
## Testing
Run the test scripts to verify functionality:
### Python/Micropython
```bash
# Basic functionality test
python test/test_micropython_basic.py
# Text message exchange
python test/test_micropython_text_sender.py
python test/test_micropython_text_receiver.py
# Dictionary exchange
python test/test_micropython_dict_sender.py
python test/test_micropython_dict_receiver.py
# File transfer
python test/test_micropython_file_sender.py
python test/test_micropython_file_receiver.py
# Mixed payload types
python test/test_micropython_mixed_sender.py
python test/test_micropython_mixed_receiver.py
```
### JavaScript
```bash
# Text message exchange
node test/test_js_text_sender.js
node test/test_js_text_receiver.js
# Dictionary exchange
node test/test_js_dict_sender.js
node test/test_js_dict_receiver.js
# File transfer
node test/test_js_file_sender.js
node test/test_js_file_receiver.js
# Mixed payload types
node test/test_js_mix_payload_sender.js
node test/test_js_mix_payloads_receiver.js
# Table exchange
node test/test_js_table_sender.js
node test/test_js_table_receiver.js
```
### Julia
```julia
# Text message exchange
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl
# Dictionary exchange
julia test/test_julia_dict_sender.jl
julia test/test_julia_dict_receiver.jl
# File transfer
julia test/test_julia_file_sender.jl
julia test/test_julia_file_receiver.jl
# Mixed payload types
julia test/test_julia_mix_payloads_sender.jl
julia test/test_julia_mix_payloads_receiver.jl
# Table exchange
julia test/test_julia_table_sender.jl
julia test/test_julia_table_receiver.jl
```
---
## License
MIT License
Copyright (c) 2026 NATSBridge Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserverDownloadHandler parameter
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
@@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload,
metadata=user_provided_envelope_level_metadata
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
@@ -99,13 +98,14 @@ smartsend(
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
nats_url="nats://localhost:4222"
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Architecture Diagram
@@ -138,48 +138,48 @@ flowchart TD
## System Components
### 1. msgEnvelope_v1 - Message Envelope
### 1. msg_envelope_v1 - Message Envelope
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
**Julia Structure:**
```julia
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems
msgId::String # This message id
struct msg_envelope_v1
correlation_id::String # Unique identifier to track messages across systems
msg_id::String # This message id
timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to
msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
senderName::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to
brokerURL::String # NATS server address
send_to::String # Topic/subject the sender sends to
msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
sender_name::String # Sender name (e.g., "agent-wine-web-frontend")
sender_id::String # Sender id (uuid4)
receiver_name::String # Message receiver name (e.g., "agent-backend")
receiver_id::String # Message receiver id (uuid4 or nothing for broadcast)
reply_to::String # Topic to reply to
reply_to_msg_id::String # Message id this message is replying to
broker_url::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here
payloads::Vector{msg_payload_v1} # Multiple payloads stored here
end
```
**JSON Schema:**
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"brokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
@@ -189,7 +189,7 @@ end
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
@@ -201,7 +201,7 @@ end
{
"id": "uuid4",
"dataname": "large_data",
"type": "table",
"payload_type": "table",
"transport": "link",
"encoding": "none",
"size": 524288,
@@ -214,16 +214,16 @@ end
}
```
### 2. msgPayload_v1 - Payload Structure
### 2. msg_payload_v1 - Payload Structure
The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms.
The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms.
**Julia Structure:**
```julia
struct msgPayload_v1
struct msg_payload_v1
id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary"
payload_type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes
@@ -383,13 +383,32 @@ graph TD
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter
nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
@@ -406,8 +425,8 @@ function smartsend(
```julia
function smartreceive(
msg::NATS.Message,
fileserverDownloadHandler::Function;
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
@@ -416,7 +435,7 @@ function smartreceive(
# Iterate through all payloads
# For each payload: check transport type
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# If link: fetch from URL with exponential backoff using fileserver_download_handler
# Deserialize payload based on type
# Return envelope dictionary with all metadata and deserialized payloads
end
@@ -424,9 +443,9 @@ end
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
@@ -434,71 +453,186 @@ end
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
### JavaScript Implementation
#### Dependencies
- `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation
- `uuid` - Correlation ID and message ID generation
- `base64-arraybuffer` - Base64 encoding/decoding
- `node-fetch` or `fetch` - HTTP client for file server
#### smartsend Function
```javascript
async function smartsend(subject, data, options = {})
// data format: [(dataname, data, type), ...]
// options object should include:
// - natsUrl: NATS server URL
// - fileserverUrl: base URL of the file server
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
async function smartsend(
subject,
data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
options = {}
)
```
**Options:**
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (String) - Optional correlation ID for tracing
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (String) - Message receiver name (default: `""`)
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Iterate through the list of (dataname, data, type) tuples
2. For each payload: extract the type from the tuple and serialize accordingly
3. Check payload size
4. If < threshold: publish directly to NATS
5. If >= threshold: upload to HTTP server, publish NATS with URL
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope object and JSON string
#### smartreceive Handler
```javascript
async function smartreceive(msg, options = {})
// options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL
// - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
```
**Options:**
- `fileserver_download_handler` (Function) - Custom download handler function
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (String) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
### Python/Micropython Implementation
#### Dependencies
- `nats-python` - Core NATS functionality
- `pyarrow` - Arrow IPC serialization
- `uuid` - Correlation ID and message ID generation
- `base64` - Base64 encoding/decoding
- `requests` or `aiohttp` - HTTP client for file server
#### smartsend Function
```python
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {}
)
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope dictionary and JSON string
#### smartreceive Handler
```python
async def smartreceive(
msg: NATS.Message,
options: Dict = {}
)
```
**Options:**
- `fileserver_download_handler` (Callable) - Custom download handler function
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` list
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
## Scenario Implementations
### Scenario 1: Command & Control (Small Dictionary)
@@ -690,7 +824,7 @@ async function smartreceive(msg, options = {})
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Performance Considerations

View File

@@ -19,49 +19,102 @@ NATSBridge is implemented in three languages, each providing the same API:
| **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers |
| **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers |
### Multi-Payload Support
### File Server Handler Architecture
The implementation uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
**Handler Function Signatures:**
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
### Multi-Payload Support (Standard API)
The system uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
**API Standard:**
```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:**
```julia
# Single payload - still wrapped in a list (type is required as third element)
smartsend("/test", [(dataname1, data1, "text")], ...)
# Single payload - still wrapped in a list
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message (each payload has its own type)
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Cross-Platform Interoperability
@@ -98,14 +151,14 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
# Julia sender
using NATSBridge
data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
```
```javascript
// JavaScript receiver
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
// envelope.payloads[0].data === "Hello from Julia!"
const env = await smartreceive(msg);
// env.payloads[0].data === "Hello from Julia!"
```
```python
@@ -146,15 +199,31 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘
```
## Files
## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, env_json_str = smartsend(...)
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
The Julia implementation provides:
- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
- **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation
- **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
@@ -247,7 +316,53 @@ node test/scenario3_julia_to_julia.js
## Usage
### Scenario 0: Basic Multi-Payload Example
### Scenario 1: Command & Control (Small Dictionary)
**Focus:** Sending small dictionary configurations across platforms. This is the simplest use case for command and control scenarios.
**Julia (Sender/Receiver):**
```julia
using NATSBridge
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
```
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
// Create small dictionary config
// Send via smartsend with type="dictionary"
const config = {
step_size: 0.01,
iterations: 1000,
threshold: 0.5
};
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
**Python/Micropython (Sender/Receiver):**
```python
from nats_bridge import smartsend
# Create small dictionary config
# Send via smartsend with type="dictionary"
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
smartsend("control", [("config", config, "dictionary")])
```
### Basic Multi-Payload Example
#### Python/Micropython (Sender)
```python
@@ -262,16 +377,16 @@ smartsend(
)
# Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")])
smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222")
```
#### Python/Micropython (Receiver)
```python
from nats_bridge import smartreceive
# Receive returns a list of (dataname, data, type) tuples
payloads = smartreceive(msg)
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary with envelope metadata and payloads field
env = smartreceive(msg)
# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
```
#### JavaScript (Sender)
@@ -315,18 +430,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the payloads from the envelope
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`);
}
// Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`);
console.log(`Message ID: ${envelope.msgId}`);
console.log(`Correlation ID: ${env.correlation_id}`);
console.log(`Message ID: ${env.msg_id}`);
}
```
@@ -344,19 +459,21 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via SmartSend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]);
# Send via smartsend - wrapped in a list (type is part of each tuple)
env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
```
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript
const table = envelope.payloads;
const table = env.payloads;
```
### Scenario 3: Live Binary Processing
@@ -406,10 +523,10 @@ from nats_bridge import smartreceive
# Receive binary data
def process_binary(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process the binary data from envelope.payloads
for dataname, data, type in envelope["payloads"]:
# Process the binary data from env.payloads
for dataname, data, type in env["payloads"]:
if type == "binary":
# data is bytes
print(f"Received binary data: {dataname}, size: {len(data)}")
@@ -422,10 +539,10 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data
function process_binary(msg) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the binary data from envelope.payloads
for (const payload of envelope.payloads) {
// Process the binary data from env.payloads
for (const payload of env.payloads) {
if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
@@ -444,7 +561,7 @@ using NATSBridge
function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple)
status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], nats_url=nats_url)
smartsend("health", [("status", status, "dictionary")], broker_url=nats_url)
sleep(5) # Every 5 seconds
end
```
@@ -466,8 +583,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages
for await (const msg of consumer) {
const envelope = await smartreceive(msg);
// envelope.payloads contains the list of payloads
const env = await smartreceive(msg);
// env.payloads contains the list of payloads
// Each payload has: dataname, data, type
msg.ack();
}
@@ -484,10 +601,10 @@ import json
# Device configuration handler
def handle_device_config(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process configuration from payloads
for dataname, data, type in envelope["payloads"]:
for dataname, data, type in env["payloads"]:
if type == "dictionary":
print(f"Received configuration: {data}")
# Apply configuration to device
@@ -506,7 +623,7 @@ def handle_device_config(msg):
"device/response",
[("config", config, "dictionary")],
nats_url="nats://localhost:4222",
reply_to=envelope.get("replyTo")
reply_to=env.get("reply_to")
)
```
@@ -566,11 +683,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Decode Base64 payload (for direct transport)
// For tables, data is in envelope.payloads
const table = envelope.payloads; // Array of objects
// For tables, data is in env.payloads
const table = env.payloads; // Array of objects
// User makes selection
const selection = uiComponent.getSelectedOption();
@@ -619,7 +736,7 @@ chat_message = [
smartsend(
"chat.room123",
chat_message,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
msg_purpose="chat",
reply_to="chat.room123.responses"
)
@@ -667,7 +784,7 @@ await smartsend("chat.room123", message);
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Configuration
@@ -683,19 +800,19 @@ await smartsend("chat.room123", message);
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"BrokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
@@ -706,7 +823,7 @@ await smartsend("chat.room123", message);
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python
from nats_bridge import smartsend
# Send a text message
# Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
// Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
```
#### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!")
```
@@ -145,8 +159,8 @@ println("Message sent!")
from nats_bridge import smartreceive
# Receive and process message
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
```
@@ -156,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
const { smartreceive } = require('./src/NATSBridge');
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
```
@@ -168,8 +182,8 @@ for (const payload of envelope.payloads) {
using NATSBridge
# Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
```
@@ -194,7 +208,7 @@ config = {
# Send as dictionary type
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -208,7 +222,7 @@ const config = {
update_interval: 60
};
await smartsend("/device/config", [
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
@@ -225,7 +239,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data)
```
### Example 2: Sending Binary Data (Image)
@@ -241,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type
data = [("user_image", image_data, "binary")]
env = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -253,7 +267,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
const image_data = fs.readFileSync('image.png');
await smartsend("/chat/image", [
const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" }
]);
```
@@ -267,7 +281,7 @@ using NATSBridge
image_data = read("image.png")
data = [("user_image", image_data, "binary")]
smartsend("/chat/image", data)
env, env_json_str = smartsend("/chat/image", data)
```
### Example 3: Request-Response Pattern
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
env, env_json_str = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript (Responder)
@@ -297,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
const sub = nc.subscribe("/device/command");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process command
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.dataname === "command") {
const command = payload.data;
@@ -315,8 +331,8 @@ for await (const msg of sub) {
await smartsend("/device/response", [
{ dataname: "sensor_data", data: response, type: "dictionary" }
], {
reply_to: envelope.replyTo,
reply_to_msg_id: envelope.msgId
reply_to: env.replyTo,
reply_to_msg_id: env.msgId
});
}
}
@@ -342,7 +358,7 @@ import os
large_data = os.urandom(2_000_000) # 2MB of random data
# Send with file server URL
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
@@ -364,7 +380,7 @@ const largeData = new ArrayBuffer(2_000_000);
const view = new Uint8Array(largeData);
view.fill(42); // Fill with some data
await smartsend("/data/large", [
const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" }
], {
fileserverUrl: "http://localhost:8080",
@@ -380,7 +396,7 @@ using NATSBridge
# Create large data (> 1MB)
large_data = rand(UInt8, 2_000_000)
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
fileserver_url="http://localhost:8080"
@@ -409,7 +425,7 @@ data = [
("user_avatar", image_data, "image")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -419,7 +435,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
await smartsend("/chat/mixed", [
const { env, env_json_str } = await smartsend("/chat/mixed", [
{
dataname: "message_text",
data: "Hello with image!",
@@ -445,7 +461,7 @@ data = [
("user_avatar", image_data, "image")
]
smartsend("/chat/mixed", data)
env, env_json_str = smartsend("/chat/mixed", data)
```
### Example 6: Table Data (Arrow IPC)
@@ -467,7 +483,7 @@ df = pd.DataFrame({
# Send as table type
data = [("students", df, "table")]
env = smartsend("/data/students", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222")
```
#### Julia
@@ -484,7 +500,7 @@ df = DataFrame(
)
data = [("students", df, "table")]
smartsend("/data/students", data)
env, env_json_str = smartsend("/data/students", data)
```
---
@@ -503,7 +519,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")]
smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript Receiver
@@ -512,8 +528,8 @@ smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
const { smartreceive } = require('./src/NATSBridge');
// Receive dictionary from Julia
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.type === "dictionary") {
console.log("Received config:", payload.data);
// payload.data = { step_size: 0.01, iterations: 1000 }
@@ -528,7 +544,7 @@ for (const payload of envelope.payloads) {
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/data/transfer", [
const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" }
]);
```
@@ -538,8 +554,8 @@ await smartsend("/data/transfer", [
```python
from nats_bridge import smartreceive
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
if type == "text":
print(f"Received from JS: {data}")
```
@@ -552,7 +568,7 @@ for dataname, data, type in envelope["payloads"]:
from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")]
smartsend("/chat/python", data)
env, env_json_str = smartsend("/chat/python", data)
```
#### Julia Receiver
@@ -560,8 +576,8 @@ smartsend("/chat/python", data)
```julia
using NATSBridge
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if type == "text"
println("Received from Python: $data")
end

View File

@@ -132,7 +132,7 @@ class ChatUI {
});
}
await smartsend(
const { env, env_json_str } = await smartsend(
`/chat/${this.currentRoom}`,
data,
{
@@ -216,15 +216,15 @@ class ChatHandler {
}
async handleMessage(msg) {
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.downloadFile.bind(this)
});
// Extract sender info from envelope
const sender = envelope.senderName || 'Anonymous';
const sender = env.senderName || 'Anonymous';
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'text') {
this.ui.addMessage(sender, payload.data);
} else if (payload.type === 'image') {
@@ -304,7 +304,7 @@ class FileUploadService {
type: 'binary'
}];
const envelope = await smartsend(
const { env, env_json_str } = await smartsend(
`/files/${recipient}`,
data,
{
@@ -314,7 +314,7 @@ class FileUploadService {
}
);
return envelope;
return env;
}
async uploadLargeFile(filePath, recipient) {
@@ -356,12 +356,12 @@ class FileDownloadService {
async downloadFile(sender, downloadId) {
// Subscribe to sender's file channel
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'binary') {
const filePath = `/downloads/${payload.dataname}`;
fs.writeFileSync(filePath, payload.data);
@@ -422,9 +422,9 @@ async function uploadFile(config) {
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
try {
const envelope = await fileService.uploadFile(filePath, recipient);
const env = await fileService.uploadFile(filePath, recipient);
console.log('Upload successful!');
console.log(`File ID: ${envelope.payloads[0].id}`);
console.log(`File ID: ${env.payloads[0].id}`);
} catch (error) {
console.error('Upload failed:', error.message);
}
@@ -514,6 +514,7 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend(
f"/sensors/{sensor_id}",
data,
@@ -521,6 +522,31 @@ class SensorSender:
fileserver_url=self.fileserver_url
)
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (env, env_json_str) without publishing
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
return env, env_json_str
def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch()
for reading in readings:
@@ -571,9 +597,9 @@ class SensorReceiver:
self.fileserver_download_handler = fileserver_download_handler
def process_reading(self, msg):
envelope = smartreceive(msg, self.fileserver_download_handler)
env = smartreceive(msg, self.fileserver_download_handler)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
reading = SensorReading(
sensor_id=data["sensor_id"],
@@ -673,10 +699,10 @@ class DeviceBridge:
# Poll for messages
msg = self._poll_for_message()
if msg:
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process payloads
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if dataname == "command":
callback(data)
@@ -772,9 +798,9 @@ class DashboardServer:
def receive_selection(self, callback):
def handler(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
callback(data)
@@ -807,7 +833,7 @@ class DashboardUI {
async refreshData() {
// Request fresh data
await smartsend("/dashboard/request", [
const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], {
fileserverUrl: window.config.fileserver_url
@@ -816,12 +842,12 @@ class DashboardUI {
async fetchData() {
// Subscribe to data updates
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process table data
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'table') {
// Deserialize Arrow IPC
this.data = this.deserializeArrow(payload.data);

View File

@@ -0,0 +1,14 @@
services:
plik:
image: rootgg/plik:latest
container_name: plik-server
restart: unless-stopped
ports:
- "8080:8080"
volumes:
# # Mount the config file (created below)
# - ./plikd.cfg:/home/plik/server/plikd.cfg
# Mount local folder for uploads and database
- ./plik-data:/data
# Set user to match your host UID to avoid permission issues
user: "1000:1000"

View File

@@ -12,10 +12,10 @@
#
# ```jldoctest
# # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
#
# # Download handler - fetches data from file server URL with exponential backoff
# fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# ```
#
# Multi-Payload Support (Standard API):
@@ -35,24 +35,23 @@
module NATSBridge
using Revise
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
# ---------------------------------------------- 100 --------------------------------------------- #
# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
""" msgPayload_v1 - Internal message payload structure
""" msg_payload_v1 - Internal message payload structure
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
# Arguments:
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
- `dataname::String` - Name of the payload (e.g., "login_image")
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `transport::String` - Transport method: "direct" or "link"
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
@@ -68,14 +67,14 @@ It supports both direct transport (base64-encoded data) and link transport (URL-
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
# Return:
- A msgPayload_v1 struct instance
- A msg_payload_v1 struct instance
# Example
```jldoctest
using UUIDs
# Create a direct transport payload
payload = msgPayload_v1(
payload = msg_payload_v1(
"Hello World",
"text";
id = string(uuid4()),
@@ -87,7 +86,7 @@ payload = msgPayload_v1(
)
# Create a link transport payload
payload = msgPayload_v1(
payload = msg_payload_v1(
"http://example.com/file.zip",
"binary";
id = string(uuid4()),
@@ -98,21 +97,21 @@ payload = msgPayload_v1(
)
```
"""
struct msgPayload_v1
struct msg_payload_v1
id::String # id of this payload e.g. "uuid4"
dataname::String # name of this payload e.g. "login_image"
type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
payload_type::String # this payload type. Can be "text", "dictionary", "table", "image", "audio", "video", "binary"
transport::String # transport method: "direct" or "link"
encoding::String # encoding method: "none", "json", "base64", "arrow-ipc"
size::Integer # data size in bytes e.g. 15433
data::Any # payload data in case of direct transport or a URL in case of link
metadata::Dict{String, Any} # Dict("checksum" => "sha256_hash", ...) This metadata is for this payload
end
# constructor
function msgPayload_v1(
function msg_payload_v1(
data::Any,
type::String;
payload_type::String;
id::String = "",
dataname::String = string(uuid4()),
transport::String = "direct",
@@ -120,10 +119,10 @@ function msgPayload_v1(
size::Integer = 0,
metadata::Dict{String, T} = Dict{String, Any}()
) where {T<:Any}
return msgPayload_v1(
return msg_payload_v1(
id,
dataname,
type,
payload_type,
transport,
encoding,
size,
@@ -133,101 +132,101 @@ function msgPayload_v1(
end
""" msgEnvelope_v1 - Internal message envelope structure
""" msg_envelope_v1 - Internal message envelope structure
This structure represents a complete NATS message envelope containing multiple payloads
with metadata for routing, tracing, and message context.
# Arguments:
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
- `send_to::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::Vector{msg_payload_v1}` - List of payloads to include in the message
# Keyword Arguments:
- `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
- `msgId::String = ""` - Unique message identifier; auto-generated if empty
- `correlation_id::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
- `msg_id::String = ""` - Unique message identifier; auto-generated if empty
- `timestamp::String = string(Dates.now())` - Message publication timestamp
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
- `senderId::String = ""` - UUID of the sender; auto-generated if empty
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `replyToMsgId::String = ""` - Message ID this message is replying to
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
- `msg_purpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `sender_name::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
- `sender_id::String = ""` - UUID of the sender; auto-generated if empty
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `broker_url::String = DEFAULT_BROKER_URL` - NATS broker URL
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
# Return:
- A msgEnvelope_v1 struct instance
- A msg_envelope_v1 struct instance
# Example
```jldoctest
using UUIDs, NATSBridge
# Create payloads for the message
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
payload1 = msg_payload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
payload2 = msg_payload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
# Create message envelope
env = msgEnvelope_v1(
env = msg_envelope_v1(
"my.subject",
[payload1, payload2];
correlationId = string(uuid4()),
msgPurpose = "chat",
senderName = "my-app",
receiverName = "receiver-app",
replyTo = "reply.subject"
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "my-app",
receiver_name = "receiver-app",
reply_to = "reply.subject"
)
```
"""
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
msgId::String # this message id
timestamp::String # message published timestamp. string(Dates.now())
struct msg_envelope_v1
correlation_id::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
msg_id::String # this message id
timestamp::String # message published timestamp (string(Dates.now()))
sendTo::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
senderName::String # sender name (String) e.g. "agent-wine-web-frontend"
senderId::String # sender id e.g. uuid4snakecase()
receiverName::String # msg receiver name (String) e.g. "agent-backend"
receiverId::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
send_to::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
msg_purpose::String # purpose of this message e.g. "ACK", "NACK", "updateStatus", "shutdown", ...
sender_name::String # sender name (String) e.g. "agent-wine-web-frontend"
sender_id::String # sender id e.g. uuid4()
receiver_name::String # msg receiver name (String) e.g. "agent-backend"
receiver_id::String # msg receiver id, nothing means everyone in the topic e.g. uuid4()
replyTo::String # sender ask receiver to reply to this topic
replyToMsgId::String # the message id this message is replying to
brokerURL::String # mqtt/NATS server address
reply_to::String # sender ask receiver to reply to this topic
reply_to_msg_id::String # the message id this message is replying to
broker_url::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # multiple payload store here
payloads::Vector{msg_payload_v1} # multiple payload store here
end
# constructor
function msgEnvelope_v1(
sendTo::String,
payloads::AbstractArray{msgPayload_v1};
correlationId::String = "",
msgId::String = "",
function msg_envelope_v1(
send_to::String,
payloads::Vector{msg_payload_v1};
correlation_id::String = "",
msg_id::String = "",
timestamp::String = string(Dates.now()),
msgPurpose::String = "",
senderName::String = "",
senderId::String = "",
receiverName::String = "",
receiverId::String = "",
replyTo::String = "",
replyToMsgId::String = "",
brokerURL::String = DEFAULT_NATS_URL,
msg_purpose::String = "",
sender_name::String = "",
sender_id::String = "",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
broker_url::String = DEFAULT_BROKER_URL,
metadata::Dict{String, Any} = Dict{String, Any}()
)
return msgEnvelope_v1(
correlationId,
msgId,
return msg_envelope_v1(
correlation_id,
msg_id,
timestamp,
sendTo,
msgPurpose,
senderName,
senderId,
receiverName,
receiverId,
replyTo,
replyToMsgId,
brokerURL,
send_to,
msg_purpose,
sender_name,
sender_id,
receiver_name,
receiver_id,
reply_to,
reply_to_msg_id,
broker_url,
metadata,
payloads
)
@@ -235,19 +234,19 @@ end
""" envelope_to_json - Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation,
""" envelope_to_json - Convert msg_envelope_v1 to JSON string
This function converts the msg_envelope_v1 struct to a JSON string representation,
preserving all metadata and payload information for NATS message publishing.
# Function Workflow:
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
1. Creates a dictionary with envelope metadata (correlation_id, msg_id, timestamp, etc.)
2. Conditionally includes metadata dictionary if not empty
3. Iterates through payloads and converts each to JSON-compatible dictionary
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
5. Returns final JSON string representation
# Arguments:
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
- `env::msg_envelope_v1` - The msg_envelope_v1 struct to convert to JSON
# Return:
- `String` - JSON string representation of the envelope
@@ -257,27 +256,27 @@ preserving all metadata and payload information for NATS message publishing.
using UUIDs
# Create an envelope with payloads
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
env = msgEnvelope_v1("my.subject", [payload])
payload = msg_payload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
env = msg_envelope_v1("my.subject", [payload])
# Convert to JSON for publishing
json_msg = envelope_to_json(env)
```
"""
function envelope_to_json(env::msgEnvelope_v1)
function envelope_to_json(env::msg_envelope_v1)
obj = Dict{String, Any}(
"correlationId" => env.correlationId,
"msgId" => env.msgId,
"correlation_id" => env.correlation_id,
"msg_id" => env.msg_id,
"timestamp" => env.timestamp,
"sendTo" => env.sendTo,
"msgPurpose" => env.msgPurpose,
"senderName" => env.senderName,
"senderId" => env.senderId,
"receiverName" => env.receiverName,
"receiverId" => env.receiverId,
"replyTo" => env.replyTo,
"replyToMsgId" => env.replyToMsgId,
"brokerURL" => env.brokerURL
"send_to" => env.send_to,
"msg_purpose" => env.msg_purpose,
"sender_name" => env.sender_name,
"sender_id" => env.sender_id,
"receiver_name" => env.receiver_name,
"receiver_id" => env.receiver_id,
"reply_to" => env.reply_to,
"reply_to_msg_id" => env.reply_to_msg_id,
"broker_url" => env.broker_url
)
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
@@ -291,7 +290,7 @@ function envelope_to_json(env::msgEnvelope_v1)
payload_obj = Dict{String, Any}(
"id" => payload.id,
"dataname" => payload.dataname,
"type" => payload.type,
"payload_type" => payload.payload_type,
"transport" => payload.transport,
"encoding" => payload.encoding,
"size" => payload.size,
@@ -359,20 +358,22 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
1. Iterates through the list of (dataname, data, type) tuples
2. For each payload: extracts the type from the tuple and serializes accordingly
3. Compares the serialized size against `size_threshold`
4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL
4. For small payloads: encodes as Base64, constructs a "direct" msg_payload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msg_payload_v1 with the URL
6. Converts envelope to JSON string and optionally publishes to NATS
# Arguments:
- `subject::String` - NATS subject to publish the message to
- `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send
- `dataname::String` - Name of the payload
- `data::Any` - The actual data to send
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- No standalone `type` parameter - type is specified per payload
# Keyword Arguments:
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
- `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `broker_url::String = DEFAULT_BROKER_URL` - URL of the NATS server
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
@@ -381,9 +382,12 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
# Return:
- A `msgEnvelope_v1` object containing metadata and transport information
- A tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
# Example
```jldoctest
@@ -391,31 +395,34 @@ using UUIDs
# Send a single payload (still wrapped in a list)
data = Dict("key" => "value")
env = smartsend("my.subject", [("dataname1", data, "dictionary")])
env, msg_json = smartsend("my.subject", [("dataname1", data, "dictionary")])
# Send multiple payloads in one message with different types
data1 = Dict("key1" => "value1")
data2 = rand(10_000) # Small array
env = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")])
env, msg_json = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")])
# Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", [("large_table", data, "table")])
env, msg_json = smartsend("large.data", [("large_table", data, "table")])
# Mixed content (e.g., chat with text and image)
env = smartsend("chat.subject", [
env, msg_json = smartsend("chat.subject", [
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
])
# Publish the JSON string directly using NATS request-reply pattern
# reply = NATS.request(broker_url, subject, env_json_str; reply_to=reply_to_topic)
```
"""
function smartsend(
subject::String, # smartreceive's subject
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples
nats_url::String = DEFAULT_NATS_URL,
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
@@ -423,25 +430,25 @@ function smartsend(
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = ""
reply_to_msg_id::String = "",
is_publish::Bool = true # some time the user want to get env and env_json_str from this function without publishing the msg
) where {T1<:Any}
# Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Generate message metadata
msg_id = string(uuid4())
# Process each payload in the list
payloads = msgPayload_v1[]
payloads = msg_payload_v1[]
for (dataname, payload_data, payload_type) in data
# Serialize data based on type
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
@@ -449,8 +456,8 @@ function smartsend(
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
# Create msgPayload_v1 for direct transport
payload = msgPayload_v1(
# Create msg_payload_v1 for direct transport
payload = msg_payload_v1(
payload_b64,
payload_type;
id = string(uuid4()),
@@ -466,7 +473,7 @@ function smartsend(
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes)
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed
@@ -475,8 +482,8 @@ function smartsend(
url = response["url"] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
# Create msgPayload_v1 for link transport
payload = msgPayload_v1(
# Create msg_payload_v1 for link transport
payload = msg_payload_v1(
url,
payload_type;
id = string(uuid4()),
@@ -490,27 +497,29 @@ function smartsend(
end
end
# Create msgEnvelope_v1 with all payloads
env = msgEnvelope_v1(
# Create msg_envelope_v1 with all payloads
env = msg_envelope_v1(
subject,
payloads;
correlationId = cid,
msgId = msg_id,
msgPurpose = msg_purpose,
senderName = sender_name,
senderId = string(uuid4()),
receiverName = receiver_name,
receiverId = receiver_id,
replyTo = reply_to,
replyToMsgId = reply_to_msg_id,
brokerURL = nats_url,
correlation_id = cid,
msg_id = msg_id,
msg_purpose = msg_purpose,
sender_name = sender_name,
sender_id = string(uuid4()),
receiver_name = receiver_name,
receiver_id = receiver_id,
reply_to = reply_to,
reply_to_msg_id = reply_to_msg_id,
broker_url = broker_url,
metadata = Dict{String, Any}(),
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
env_json_str = envelope_to_json(env) # Convert envelope to JSON
if is_publish
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
end
return env # Return the envelope for tracking
return (env, env_json_str)
end
@@ -528,14 +537,14 @@ It supports multiple serialization formats for different data types.
# Arguments:
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `payload_type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
# Return:
- `Vector{UInt8}` - Binary representation of the serialized data
# Throws:
- `Error` if `type` is not one of the supported types
- `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
- `Error` if `payload_type` is not one of the supported types
- `Error` if `payload_type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
# Example
```jldoctest
@@ -574,7 +583,7 @@ binary_bytes = _serialize_data(buf, "binary")
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
```
"""
function _serialize_data(data::Any, type::String)
function _serialize_data(data::Any, payload_type::String)
""" Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object
d = Dict(
"name"=>"ton",
@@ -591,40 +600,40 @@ function _serialize_data(data::Any, type::String)
json_obj = JSON.parse(json_str_2)
"""
if type == "text" # Text data - convert to UTF-8 bytes
if payload_type == "text" # Text data - convert to UTF-8 bytes
if isa(data, String)
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
return data_bytes
else
error("Text data must be a String")
end
elseif type == "dictionary" # JSON data - serialize directly
elseif payload_type == "dictionary" # JSON data - serialize directly
json_str = JSON.json(data) # Convert Julia data to JSON string
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
return json_str_bytes
elseif type == "table" # Table data - convert to Arrow IPC stream
elseif payload_type == "table" # Table data - convert to Arrow IPC stream
io = IOBuffer() # Create in-memory buffer
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
return take!(io) # Return the buffer contents as bytes
elseif type == "image" # Image data - treat as binary
elseif payload_type == "image" # Image data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Image data must be Vector{UInt8}")
end
elseif type == "audio" # Audio data - treat as binary
elseif payload_type == "audio" # Audio data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Audio data must be Vector{UInt8}")
end
elseif type == "video" # Video data - treat as binary
elseif payload_type == "video" # Video data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Video data must be Vector{UInt8}")
end
elseif type == "binary" # Binary data - treat as binary
elseif payload_type == "binary" # Binary data - treat as binary
if isa(data, IOBuffer) # Check if data is an IOBuffer
return take!(data) # Return buffer contents as bytes
elseif isa(data, Vector{UInt8}) # Check if data is already binary
@@ -633,7 +642,7 @@ function _serialize_data(data::Any, type::String)
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
end
else # Unknown type
error("Unknown type: $type")
error("Unknown payload_type: $payload_type")
end
end
@@ -643,7 +652,7 @@ This internal function publishes a message to a NATS subject with proper
connection management and logging.
# Arguments:
- `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
- `broker_url::String` - NATS server URL (e.g., "nats://localhost:4222")
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for tracing and logging
@@ -652,18 +661,18 @@ connection management and logging.
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
# Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(nats_url) # Create NATS connection
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
@@ -690,46 +699,46 @@ A HTTP file server is required along with its download function.
- `msg::NATS.Msg` - NATS message to process
# Keyword Arguments:
- `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
- `fileserver_download_handler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
# Return:
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
- JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
# Example
```jldoctest
# Receive and process message
msg = nats_message # NATS message
payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
```
"""
function smartreceive(
msg::NATS.Msg;
fileserverDownloadHandler::Function=_fetch_with_backoff,
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
# Parse the JSON envelope
json_data = JSON.parse(String(msg.payload))
log_trace(json_data["correlationId"], "Processing received message") # Log message processing start
env_json_obj = JSON.parse(String(msg.payload))
log_trace(env_json_obj["correlation_id"], "Processing received message") # Log message processing start
# Process all payloads in the envelope
payloads_list = Tuple{String, Any, String}[]
# Get number of payloads
num_payloads = length(json_data["payloads"])
num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads
payload = json_data["payloads"][i]
payload = env_json_obj["payloads"][i]
transport = String(payload["transport"])
dataname = String(payload["dataname"])
if transport == "direct" # Direct transport - payload is in the message
log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
# Extract base64 payload from the payload
payload_b64 = String(payload["data"])
@@ -738,29 +747,29 @@ function smartreceive(
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
# Deserialize based on type
data_type = String(payload["type"])
data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"])
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
elseif transport == "link" # Link transport - payload is at URL
# Extract download URL from the payload
url = String(payload["data"])
log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
# Fetch with exponential backoff using the download handler
downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"])
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
# Deserialize based on type
data_type = String(payload["type"])
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
else # Unknown transport type
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
end
end
json_data["payloads"] = payloads_list
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
env_json_obj["payloads"] = payloads_list
return env_json_obj # JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
end
@@ -840,19 +849,19 @@ It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow
# Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `payload_type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `correlation_id::String` - Correlation ID for logging
# Return:
- Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
# Throws:
- `Error` if `type` is not one of the supported types
- `Error` if `payload_type` is not one of the supported types
# Example
```jldoctest
# Text data
text_bytes = UInt8["Hello World"]
text_bytes = Vector{UInt8}("Hello World")
text_data = _deserialize_data(text_bytes, "text", "correlation123")
# JSON data
@@ -860,34 +869,34 @@ json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101
json_data = _deserialize_data(json_bytes, "dictionary", "correlation123")
# Arrow IPC data (table)
arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes
arrow_bytes = Vector{UInt8}([1, 2, 3]) # Arrow IPC bytes
table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
```
"""
function _deserialize_data(
data::Vector{UInt8},
type::String,
payload_type::String,
correlation_id::String
)
if type == "text" # Text data - convert to string
if payload_type == "text" # Text data - convert to string
return String(data) # Convert bytes to string
elseif type == "dictionary" # JSON data - deserialize
elseif payload_type == "dictionary" # JSON data - deserialize
json_str = String(data) # Convert bytes to string
return JSON.parse(json_str) # Parse JSON string to JSON object
elseif type == "table" # Table data - deserialize Arrow IPC stream
elseif payload_type == "table" # Table data - deserialize Arrow IPC stream
io = IOBuffer(data) # Create buffer from bytes
df = Arrow.Table(io) # Read Arrow IPC format from buffer
return df # Return DataFrame
elseif type == "image" # Image data - return binary
elseif payload_type == "image" # Image data - return binary
return data # Return bytes directly
elseif type == "audio" # Audio data - return binary
elseif payload_type == "audio" # Audio data - return binary
return data # Return bytes directly
elseif type == "video" # Video data - return binary
elseif payload_type == "video" # Video data - return binary
return data # Return bytes directly
elseif type == "binary" # Binary data - return binary
elseif payload_type == "binary" # Binary data - return binary
return data # Return bytes directly
else # Unknown type
error("Unknown type: $type") # Throw error for unknown type
error("Unknown payload_type: $payload_type") # Throw error for unknown type
end
end
@@ -904,7 +913,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
4. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filename::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content
@@ -916,36 +925,36 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
- `"url"` - Full URL to download the uploaded file
# Example
```jldoctest
using HTTP, JSON
```jldoctest
using HTTP, JSON
fileServerURL = "http://localhost:8080"
filename = "test.txt"
data = UInt8["hello world"]
fileserver_url = "http://localhost:8080"
filename = "test.txt"
data = Vector{UInt8}("hello world")
# Upload to local plik server
result = plik_oneshot_upload(fileServerURL, filename, data)
# Upload to local plik server
result = plik_oneshot_upload(file_server_url, filename, data)
# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
"""
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
function plik_oneshot_upload(file_server_url::String, filename::String, data::Vector{UInt8})
# ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
url_getUploadID = "$file_server_url/upload" # URL to get upload ID
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(httpResponse.body)
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
response_json = JSON.parse(http_response.body)
uploadid = response_json["id"]
uploadtoken = response_json["uploadToken"]
# ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
url_upload = "$fileServerURL/file/$uploadid"
url_upload = "$file_server_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
# Create the multipart form data
@@ -954,24 +963,23 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect
))
# Execute the POST request
httpResponse = nothing
http_response = nothing
try
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(httpResponse.body)
http_response = HTTP.post(url_upload, headers, form)
catch e
@error "Request failed" exception=e
end
fileid = responseJson["id"]
response_json = JSON.parse(http_response.body)
fileid = response_json["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
url = "$file_server_url/file/$uploadid/$fileid/$filename"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
""" plik_oneshot_upload(file_server_url::String, filepath::String)
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
@@ -983,7 +991,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
4. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filepath::String` - Full path to the local file to upload
# Return:
@@ -997,59 +1005,58 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
```jldoctest
using HTTP, JSON
fileServerURL = "http://localhost:8080"
fileserver_url = "http://localhost:8080"
filepath = "./test.zip"
# Upload to local plik server
result = plik_oneshot_upload(fileServerURL, filepath)
result = plik_oneshot_upload(file_server_url, filepath)
# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
"""
function plik_oneshot_upload(fileServerURL::String, filepath::String)
function plik_oneshot_upload(file_server_url::String, filepath::String)
# ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
filename = basename(filepath)
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
url_getUploadID = "$file_server_url/upload" # URL to get upload ID
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(httpResponse.body)
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
response_json = JSON.parse(http_response.body)
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
uploadid = response_json["id"]
uploadtoken = response_json["uploadToken"]
# ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = open(filepath, "r")
url_upload = "$fileServerURL/file/$uploadid"
url_upload = "$file_server_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
http_response = open(filepath, "r") do file_stream
form = HTTP.Form(Dict("file" => file_stream))
# Create the multipart form data
form = HTTP.Form(Dict(
"file" => file_multipart
))
# Execute the POST request
httpResponse = nothing
try
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(httpResponse.body)
catch e
@error "Request failed" exception=e
# Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
HTTP.post(url_upload, headers, form; status_exception = false)
end
fileid = responseJson["id"]
if !isnothing(http_response) && http_response.status == 200
# Success - response already logged by caller
else
error("Failed to upload file: server returned status $(http_response.status)")
end
response_json = JSON.parse(http_response.body)
fileid = response_json["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
url = "$file_server_url/file/$uploadid/$fileid/$filename"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function _get_payload_bytes(data)
@error "didn't implement yet"
end

View File

@@ -460,8 +460,9 @@ async function smartsend(subject, data, options = {}) {
* @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
*
* @returns {Promise<MessageEnvelope>} - The envelope for tracking
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
*/
const {
natsUrl = DEFAULT_NATS_URL,
@@ -474,7 +475,8 @@ async function smartsend(subject, data, options = {}) {
receiverName = "",
receiverId = "",
replyTo = "",
replyToMsgId = ""
replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
} = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
@@ -556,10 +558,19 @@ async function smartsend(subject, data, options = {}) {
payloads: payloads
});
// Publish message to NATS
await publish_message(natsUrl, subject, env.toString(), correlationId);
// Convert envelope to JSON string
const env_json_str = env.toString();
return env;
// Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, env_json_str, correlationId);
}
// Return both envelope and JSON string (tuple-like structure)
return {
env: env,
env_json_str: env_json_str
};
}
// Helper: Publish message to NATS

View File

@@ -1,295 +0,0 @@
# NATSBridge
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, and **Python/Micropython** using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Overview
NATSBridge enables seamless communication between Julia, JavaScript, and Python/Micropython applications through NATS, with automatic transport selection based on payload size:
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
## Features
- ✅ Bi-directional NATS communication across Julia ↔ JavaScript ↔ Python/Micropython
- ✅ Multi-payload support (mixed content in single message)
- ✅ Automatic transport selection based on payload size
- ✅ File server integration for large payloads
- ✅ Exponential backoff for URL fetching
- ✅ Correlation ID tracking
- ✅ Reply-to support for request-response pattern
## Supported Payload Types
| Type | Description |
|------|-------------|
| `text` | Plain text strings |
| `dictionary` | JSON-serializable dictionaries |
| `table` | Tabular data (Arrow IPC format) |
| `image` | Image data (PNG, JPG bytes) |
| `audio` | Audio data (WAV, MP3 bytes) |
| `video` | Video data (MP4, AVI bytes) |
| `binary` | Generic binary data |
## Implementation Guides
### [Julia Implementation](../tutorial_julia.md)
See the [Julia tutorial](../tutorial_julia.md) for getting started with Julia.
### [JavaScript Implementation](#javascript-implementation)
See [`NATSBridge.js`](NATSBridge.js) for the JavaScript implementation.
### [Python/Micropython Implementation](#pythonmicropython-implementation)
See [`nats_bridge.py`](nats_bridge.py) for the Python/Micropython implementation.
## Installation
### Julia
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```
### JavaScript
```bash
npm install nats.js apache-arrow uuid base64-url
```
### Python/Micropython
1. Copy `nats_bridge.py` to your device
2. Ensure you have the following dependencies:
- `urequests` for HTTP requests (Micropython)
- `requests` for HTTP requests (Python)
- `base64` for base64 encoding
- `json` for JSON handling
- `socket` for networking (Micropython)
## Usage
### Basic Text Message
#### Python/Micropython
```python
from nats_bridge import smartsend, smartreceive
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
payloads = smartreceive(msg)
for dataname, data, type in payloads:
print("Received {}: {}".format(dataname, data))
```
#### Julia
```julia
using NATSBridge
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
envelope = smartreceive(msg, fileserverDownloadHandler)
# envelope["payloads"] = [("message", "Hello World", "text"), ...]
```
#### JavaScript
```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
// Sender
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
// Receiver
const envelope = await smartreceive(msg);
// envelope.payloads = [{ dataname: "message", data: "Hello World", type: "text" }, ...]
```
### Sending JSON Configuration
#### Python/Micropython
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
### Mixed Content (Chat with Text + Image)
#### Python/Micropython
```python
from nats_bridge import smartsend
image_data = b"\x89PNG..." # PNG bytes
data = [
("message_text", "Hello with image!", "text"),
("user_avatar", image_data, "binary")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
### Request-Response Pattern
#### Python/Micropython
```python
from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
```
### Large Payloads (File Server)
#### Python/Micropython
```python
from nats_bridge import smartsend
# Large data (> 1MB)
large_data = b"A" * 2000000 # 2MB
env = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1000000 # 1MB threshold
)
```
## API Reference
### `smartsend(subject, data, ...)`
Send data via NATS with automatic transport selection.
**Arguments:**
- `subject` (str): NATS subject to publish to
- `data` (list): List of `(dataname, data, type)` tuples
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
- `correlation_id` (str): Optional correlation ID for tracing
- `msg_purpose` (str): Message purpose (default: `"chat"`)
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
- `receiver_name` (str): Receiver name (default: `""`)
- `receiver_id` (str): Receiver ID (default: `""`)
- `reply_to` (str): Reply topic (default: `""`)
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
**Returns:** `MessageEnvelope` object
### `smartreceive(msg, ...)`
Receive and process NATS messages.
**Arguments:**
- `msg`: NATS message (dict or JSON string)
- `fileserver_download_handler` (function): Function to fetch data from URLs
- `max_retries` (int): Maximum retry attempts (default: 5)
- `base_delay` (int): Initial delay in ms (default: 100)
- `max_delay` (int): Maximum delay in ms (default: 5000)
**Returns:** List of `(dataname, data, type)` tuples
### `MessageEnvelope`
Represents a complete NATS message envelope.
**Attributes:**
- `correlation_id`: Unique identifier for tracing
- `msg_id`: Unique message identifier
- `timestamp`: Message publication timestamp
- `send_to`: NATS subject
- `msg_purpose`: Message purpose
- `sender_name`: Sender name
- `sender_id`: Sender UUID
- `receiver_name`: Receiver name
- `receiver_id`: Receiver UUID
- `reply_to`: Reply topic
- `reply_to_msg_id`: Reply message ID
- `broker_url`: NATS broker URL
- `metadata`: Message-level metadata
- `payloads`: List of MessagePayload objects
### `MessagePayload`
Represents a single payload within a message envelope.
**Attributes:**
- `id`: Unique payload identifier
- `dataname`: Name of the payload
- `type`: Payload type ("text", "dictionary", etc.)
- `transport`: Transport method ("direct" or "link")
- `encoding`: Encoding method ("none", "base64", etc.)
- `size`: Payload size in bytes
- `data`: Payload data (bytes for direct, URL for link)
- `metadata`: Payload-level metadata
## Examples
See [`examples/micropython_example.py`](../examples/micropython_example.py) for more detailed examples.
## Testing
Run the test suite:
```bash
# Python/Micropython
python test/test_micropython_basic.py
# JavaScript
node test/test_js_to_js_text_sender.js
node test/test_js_to_js_text_receiver.js
# Julia
julia test/test_julia_to_julia_text_sender.jl
julia test/test_julia_to_julia_text_receiver.jl
```
## Requirements
- **Julia**: NATS server (nats.io), HTTP file server (optional)
- **JavaScript**: NATS server (nats.io), HTTP file server (optional)
- **Python/Micropython**: NATS server (nats.io), HTTP file server (optional)
## License
MIT

View File

@@ -437,7 +437,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -459,9 +459,12 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id: UUID of the receiver
reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
Returns:
MessageEnvelope: The envelope object for tracking
tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
@@ -549,13 +552,15 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
msg_json = env.to_json()
# Publish to NATS
# Publish to NATS if is_publish is True
if is_publish:
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
return env
# Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
];
// Use smartsend with mixed content
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
payloads,
{
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -92,12 +92,12 @@ function test_dict_send()
# Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -114,7 +115,7 @@ function test_dict_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -79,12 +79,12 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL;
broker_url = NATS_URL;
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -92,11 +92,12 @@ function test_large_binary_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.payloads[1].type)")
log_trace("Envelope type: $(env.payloads[1].payload_type)")
# Check if link transport was used
if env.payloads[1].transport == "link"

View File

@@ -186,12 +186,12 @@ function test_mix_send()
]
# Use smartsend with mixed content
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -208,7 +209,7 @@ function test_mix_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -90,12 +90,12 @@ function test_table_send()
# Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -112,7 +113,7 @@ function test_table_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -75,12 +75,12 @@ function test_text_send()
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -97,7 +98,7 @@ function test_text_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
data, # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")