Compare commits
6 Commits
d950bbac23
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| f8a92a45a0 | |||
| cec70e6036 | |||
| f9e08ba628 | |||
| c12a078149 | |||
| dedd803dc3 | |||
| e8e927a491 |
957
README.md
Normal file
957
README.md
Normal file
@@ -0,0 +1,957 @@
|
||||
# NATSBridge
|
||||
|
||||
A high-performance, bi-directional data bridge between **Julia**, **JavaScript**, and **Python/Micropython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
|
||||
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](https://nats.io)
|
||||
|
||||
---
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [Overview](#overview)
|
||||
- [Features](#features)
|
||||
- [Architecture](#architecture)
|
||||
- [Installation](#installation)
|
||||
- [Quick Start](#quick-start)
|
||||
- [API Reference](#api-reference)
|
||||
- [Payload Types](#payload-types)
|
||||
- [Transport Strategies](#transport-strategies)
|
||||
- [Examples](#examples)
|
||||
- [Testing](#testing)
|
||||
- [License](#license)
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
NATSBridge enables seamless communication across Julia, JavaScript, and Python/Micropython applications through NATS, with intelligent transport selection based on payload size:
|
||||
|
||||
| Transport | Payload Size | Method |
|
||||
|-----------|--------------|--------|
|
||||
| **Direct** | < 1MB | Sent directly via NATS (Base64 encoded) |
|
||||
| **Link** | >= 1MB | Uploaded to HTTP file server, URL sent via NATS |
|
||||
|
||||
### Use Cases
|
||||
|
||||
- **Chat Applications**: Text, images, audio, video in a single message
|
||||
- **File Transfer**: Efficient transfer of large files using claim-check pattern
|
||||
- **Streaming Data**: Sensor data, telemetry, and analytics pipelines
|
||||
- **Cross-Platform Communication**: Julia ↔ JavaScript ↔ Python/Micropython
|
||||
- **IoT Devices**: Micropython devices sending data to cloud services
|
||||
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
- ✅ **Bi-directional messaging** between Julia, JavaScript, and Python/Micropython
|
||||
- ✅ **Multi-payload support** - send multiple payloads with different types in one message
|
||||
- ✅ **Automatic transport selection** - direct vs link based on payload size
|
||||
- ✅ **Claim-Check pattern** for payloads > 1MB
|
||||
- ✅ **Apache Arrow IPC** support for tabular data (zero-copy reading)
|
||||
- ✅ **Exponential backoff** for reliable file server downloads
|
||||
- ✅ **Correlation ID tracking** for message tracing
|
||||
- ✅ **Reply-to support** for request-response patterns
|
||||
- ✅ **JetStream support** for message replay and durability
|
||||
- ✅ **Lightweight Micropython implementation** for microcontrollers
|
||||
|
||||
---
|
||||
|
||||
## Architecture
|
||||
|
||||
### System Components
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ NATSBridge Architecture │
|
||||
├─────────────────────────────────────────────────────────────────────┤
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||||
│ │ Julia │ │ JavaScript │ │ Python/ │ │
|
||||
│ │ (NATS.jl) │◄──►│ (nats.js) │◄──►│ Micropython │ │
|
||||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||||
│ │ │ │ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ NATS │ │
|
||||
│ │ (Message Broker) │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌──────────────────────┐ │
|
||||
│ │ File Server │ │
|
||||
│ │ (HTTP Upload/Get) │ │
|
||||
│ └──────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Message Flow
|
||||
|
||||
1. **Sender** creates a message envelope with payloads
|
||||
2. **NATSBridge** serializes and encodes payloads based on type
|
||||
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
|
||||
4. **NATS** routes messages to subscribers
|
||||
5. **Receiver** fetches payloads (from NATS or file server)
|
||||
6. **NATSBridge** deserializes and decodes payloads
|
||||
|
||||
---
|
||||
|
||||
## Installation
|
||||
|
||||
### Prerequisites
|
||||
|
||||
- **NATS Server** (v2.10+ recommended)
|
||||
- **HTTP File Server** (optional, for payloads > 1MB)
|
||||
|
||||
### Julia
|
||||
|
||||
```julia
|
||||
using Pkg
|
||||
Pkg.add("NATS")
|
||||
Pkg.add("https://git.yiem.cc/ton/NATSBridge")
|
||||
```
|
||||
|
||||
### JavaScript
|
||||
|
||||
```bash
|
||||
npm install nats.js apache-arrow uuid base64-url
|
||||
```
|
||||
|
||||
For Node.js:
|
||||
```javascript
|
||||
const { smartsend, smartreceive } = require('./src/NATSBridge');
|
||||
```
|
||||
|
||||
For browser:
|
||||
```html
|
||||
<script src="./src/NATSBridge.js"></script>
|
||||
<script>
|
||||
// NATSBridge is available as window.NATSBridge
|
||||
</script>
|
||||
```
|
||||
|
||||
### Python/Micropython
|
||||
|
||||
1. Copy [`src/nats_bridge.py`](src/nats_bridge.py) to your device
|
||||
2. Install dependencies:
|
||||
|
||||
**For Python (desktop):**
|
||||
```bash
|
||||
pip install nats-py
|
||||
```
|
||||
|
||||
**For Micropython:**
|
||||
- `urequests` for HTTP requests (built-in for ESP32)
|
||||
- `base64` for base64 encoding (built-in)
|
||||
- `json` for JSON handling (built-in)
|
||||
|
||||
---
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Step 1: Start NATS Server
|
||||
|
||||
```bash
|
||||
docker run -p 4222:4222 nats:latest
|
||||
```
|
||||
|
||||
### Step 2: Start HTTP File Server (Optional)
|
||||
|
||||
```bash
|
||||
# Create a directory for file uploads
|
||||
mkdir -p /tmp/fileserver
|
||||
|
||||
# Use Python's built-in server
|
||||
python3 -m http.server 8080 --directory /tmp/fileserver
|
||||
```
|
||||
|
||||
### Step 3: Send Your First Message
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
# Send a text message
|
||||
data = [("message", "Hello World", "text")]
|
||||
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
|
||||
print("Message sent!")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
// Send a text message
|
||||
await smartsend("/chat/room1", [
|
||||
{ dataname: "message", data: "Hello World", type: "text" }
|
||||
], { natsUrl: "nats://localhost:4222" });
|
||||
|
||||
console.log("Message sent!");
|
||||
```
|
||||
|
||||
#### Julia
|
||||
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
# Send a text message
|
||||
data = [("message", "Hello World", "text")]
|
||||
env = NATSBridge.smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
|
||||
println("Message sent!")
|
||||
```
|
||||
|
||||
### Step 4: Receive Messages
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
import nats
|
||||
import asyncio
|
||||
from nats_bridge import smartreceive
|
||||
|
||||
# Configuration
|
||||
SUBJECT = "/chat/room1"
|
||||
NATS_URL = "nats://localhost:4222"
|
||||
|
||||
async def main():
|
||||
# Connect to NATS
|
||||
nc = await nats.connect(NATS_URL)
|
||||
|
||||
# Subscribe to the subject - msg comes from the callback
|
||||
async def message_handler(msg):
|
||||
# Receive and process message
|
||||
envelope = smartreceive(msg.data)
|
||||
for dataname, data, type in envelope["payloads"]:
|
||||
print(f"Received {dataname}: {data}")
|
||||
|
||||
sid = await nc.subscribe(SUBJECT, cb=message_handler)
|
||||
await asyncio.sleep(120) # Keep listening
|
||||
await nc.close()
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const { smartreceive } = require('./src/NATSBridge');
|
||||
const { connect } = require('nats');
|
||||
|
||||
// Configuration
|
||||
const SUBJECT = "/chat/room1";
|
||||
const NATS_URL = "nats://localhost:4222";
|
||||
|
||||
async function main() {
|
||||
// Connect to NATS
|
||||
const nc = await connect({ servers: [NATS_URL] });
|
||||
|
||||
// Subscribe to the subject - msg comes from the async iteration
|
||||
const sub = nc.subscribe(SUBJECT);
|
||||
|
||||
for await (const msg of sub) {
|
||||
// Receive and process message
|
||||
const envelope = await smartreceive(msg);
|
||||
for (const payload of envelope.payloads) {
|
||||
console.log(`Received ${payload.dataname}: ${payload.data}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
```
|
||||
|
||||
#### Julia
|
||||
|
||||
```julia
|
||||
using NATS, NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/chat/room1"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] $message")
|
||||
end
|
||||
|
||||
# Receiver: Listen for messages - msg comes from the callback
|
||||
function test_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
|
||||
# Receive and process message
|
||||
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
|
||||
for (dataname, data, type) in envelope["payloads"]
|
||||
println("Received $dataname: $data")
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 120 seconds
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
test_receive()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## API Reference
|
||||
|
||||
### smartsend
|
||||
|
||||
Sends data either directly via NATS or via a fileserver URL, depending on payload size.
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
env = smartsend(
|
||||
subject, # NATS subject to publish to
|
||||
data, # List of (dataname, data, type) tuples
|
||||
nats_url="nats://localhost:4222", # NATS server URL
|
||||
fileserver_url="http://localhost:8080", # File server URL
|
||||
fileserver_upload_handler=plik_oneshot_upload, # Upload handler function
|
||||
size_threshold=1_000_000, # Threshold in bytes (default: 1MB)
|
||||
correlation_id=None, # Optional correlation ID for tracing
|
||||
msg_purpose="chat", # Message purpose
|
||||
sender_name="NATSBridge", # Sender name
|
||||
receiver_name="", # Receiver name (empty = broadcast)
|
||||
receiver_id="", # Receiver UUID (empty = broadcast)
|
||||
reply_to="", # Reply topic
|
||||
reply_to_msg_id="" # Reply message ID
|
||||
)
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
const env = await smartsend(
|
||||
subject, // NATS subject
|
||||
data, // Array of {dataname, data, type}
|
||||
{
|
||||
natsUrl: "nats://localhost:4222",
|
||||
fileserverUrl: "http://localhost:8080",
|
||||
fileserverUploadHandler: customUploadHandler,
|
||||
sizeThreshold: 1_000_000,
|
||||
correlationId: "custom-id",
|
||||
msgPurpose: "chat",
|
||||
senderName: "NATSBridge",
|
||||
receiverName: "",
|
||||
receiverId: "",
|
||||
replyTo: "",
|
||||
replyToMsgId: ""
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
#### Julia
|
||||
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
subject; # NATS subject
|
||||
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
|
||||
nats_url::String = "nats://localhost:4222",
|
||||
fileserver_url = "http://localhost:8080",
|
||||
fileserverUploadHandler::Function = plik_oneshot_upload,
|
||||
size_threshold::Int = 1_000_000,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
receiver_id::String = "",
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = ""
|
||||
)
|
||||
```
|
||||
|
||||
### smartreceive
|
||||
|
||||
Receives and processes messages from NATS, handling both direct and link transport.
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartreceive
|
||||
|
||||
# Note: For nats-py, use msg.data to pass the raw message data
|
||||
envelope = smartreceive(
|
||||
msg.data, # NATS message data (msg.data for nats-py)
|
||||
fileserver_download_handler=_fetch_with_backoff, # Download handler
|
||||
max_retries=5, # Max retry attempts
|
||||
base_delay=100, # Initial delay in ms
|
||||
max_delay=5000 # Max delay in ms
|
||||
)
|
||||
# Returns: Dict with envelope metadata and 'payloads' field
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const { smartreceive } = require('./src/NATSBridge');
|
||||
|
||||
// Note: msg is the NATS message object from subscription
|
||||
const envelope = await smartreceive(
|
||||
msg, // NATS message (raw object from subscription)
|
||||
{
|
||||
fileserverDownloadHandler: customDownloadHandler,
|
||||
maxRetries: 5,
|
||||
baseDelay: 100,
|
||||
maxDelay: 5000
|
||||
}
|
||||
);
|
||||
// Returns: Object with envelope metadata and payloads array
|
||||
```
|
||||
|
||||
#### Julia
|
||||
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
# Note: msg is a NATS.Msg object passed from the subscription callback
|
||||
envelope = NATSBridge.smartreceive(
|
||||
msg::NATS.Msg;
|
||||
fileserverDownloadHandler::Function = _fetch_with_backoff,
|
||||
max_retries::Int = 5,
|
||||
base_delay::Int = 100,
|
||||
max_delay::Int = 5000
|
||||
)
|
||||
# Returns: Dict with envelope metadata and payloads array
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Payload Types
|
||||
|
||||
| Type | Description | Serialization |
|
||||
|------|-------------|---------------|
|
||||
| `text` | Plain text strings | UTF-8 bytes |
|
||||
| `dictionary` | JSON-serializable dictionaries | JSON |
|
||||
| `table` | Tabular data (DataFrames, arrays) | Apache Arrow IPC |
|
||||
| `image` | Image data (PNG, JPG) | Raw bytes |
|
||||
| `audio` | Audio data (WAV, MP3) | Raw bytes |
|
||||
| `video` | Video data (MP4, AVI) | Raw bytes |
|
||||
| `binary` | Generic binary data | Raw bytes |
|
||||
|
||||
---
|
||||
|
||||
## Transport Strategies
|
||||
|
||||
### Direct Transport (Payloads < 1MB)
|
||||
|
||||
Small payloads are sent directly via NATS with Base64 encoding.
|
||||
|
||||
#### Python/Micropython
|
||||
```python
|
||||
data = [("message", "Hello", "text")]
|
||||
smartsend("/topic", data)
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
```javascript
|
||||
await smartsend("/topic", [
|
||||
{ dataname: "message", data: "Hello", type: "text" }
|
||||
]);
|
||||
```
|
||||
|
||||
#### Julia
|
||||
```julia
|
||||
data = [("message", "Hello", "text")]
|
||||
smartsend("/topic", data)
|
||||
```
|
||||
|
||||
### Link Transport (Payloads >= 1MB)
|
||||
|
||||
Large payloads are uploaded to an HTTP file server.
|
||||
|
||||
#### Python/Micropython
|
||||
```python
|
||||
data = [("file", large_data, "binary")]
|
||||
smartsend("/topic", data, fileserver_url="http://localhost:8080")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
```javascript
|
||||
await smartsend("/topic", [
|
||||
{ dataname: "file", data: largeData, type: "binary" }
|
||||
], { fileserverUrl: "http://localhost:8080" });
|
||||
```
|
||||
|
||||
#### Julia
|
||||
```julia
|
||||
data = [("file", large_data, "binary")]
|
||||
smartsend("/topic", data, fileserver_url="http://localhost:8080")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Examples
|
||||
|
||||
All examples include code for **Julia**, **JavaScript**, and **Python/Micropython** unless otherwise specified.
|
||||
|
||||
### Example 1: Chat with Mixed Content
|
||||
|
||||
Send text, small image, and large file in one message.
|
||||
|
||||
#### Python/Micropython
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
data = [
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_avatar", image_data, "image"),
|
||||
("large_document", large_file_data, "binary")
|
||||
]
|
||||
|
||||
smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
await smartsend("/chat/room1", [
|
||||
{ dataname: "message_text", data: "Hello!", type: "text" },
|
||||
{ dataname: "user_avatar", data: image_data, type: "image" },
|
||||
{ dataname: "large_document", data: large_file_data, type: "binary" }
|
||||
], {
|
||||
fileserverUrl: "http://localhost:8080"
|
||||
});
|
||||
```
|
||||
|
||||
#### Julia
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
data = [
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_avatar", image_data, "image"),
|
||||
("large_document", large_file_data, "binary")
|
||||
]
|
||||
|
||||
NATSBridge.smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
|
||||
```
|
||||
|
||||
### Example 2: Dictionary Exchange
|
||||
|
||||
Send configuration data between platforms.
|
||||
|
||||
#### Python/Micropython
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
smartsend("/device/config", data)
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
const config = {
|
||||
wifi_ssid: "MyNetwork",
|
||||
wifi_password: "password123",
|
||||
update_interval: 60
|
||||
};
|
||||
|
||||
await smartsend("/device/config", [
|
||||
{ dataname: "config", data: config, type: "dictionary" }
|
||||
]);
|
||||
```
|
||||
|
||||
#### Julia
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
config = Dict(
|
||||
"wifi_ssid" => "MyNetwork",
|
||||
"wifi_password" => "password123",
|
||||
"update_interval" => 60
|
||||
)
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
NATSBridge.smartsend("/device/config", data)
|
||||
```
|
||||
|
||||
### Example 3: Table Data (Arrow IPC)
|
||||
|
||||
Send tabular data using Apache Arrow IPC format.
|
||||
|
||||
#### Python/Micropython
|
||||
```python
|
||||
import pandas as pd
|
||||
from nats_bridge import smartsend
|
||||
|
||||
df = pd.DataFrame({
|
||||
"id": [1, 2, 3],
|
||||
"name": ["Alice", "Bob", "Charlie"],
|
||||
"score": [95, 88, 92]
|
||||
})
|
||||
|
||||
data = [("students", df, "table")]
|
||||
smartsend("/data/analysis", data)
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
const tableData = [
|
||||
{ id: 1, name: "Alice", score: 95 },
|
||||
{ id: 2, name: "Bob", score: 88 },
|
||||
{ id: 3, name: "Charlie", score: 92 }
|
||||
];
|
||||
|
||||
await smartsend("/data/analysis", [
|
||||
{ dataname: "students", data: tableData, type: "table" }
|
||||
]);
|
||||
```
|
||||
|
||||
#### Julia
|
||||
```julia
|
||||
using NATSBridge
|
||||
using DataFrames
|
||||
|
||||
df = DataFrame(
|
||||
id = [1, 2, 3],
|
||||
name = ["Alice", "Bob", "Charlie"],
|
||||
score = [95, 88, 92]
|
||||
)
|
||||
|
||||
data = [("students", df, "table")]
|
||||
NATSBridge.smartsend("/data/analysis", data)
|
||||
```
|
||||
|
||||
### Example 4: Request-Response Pattern
|
||||
|
||||
Bi-directional communication with reply-to support.
|
||||
|
||||
#### Python/Micropython (Requester)
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
env = smartsend(
|
||||
"/device/command",
|
||||
[("command", {"action": "read_sensor"}, "dictionary")],
|
||||
reply_to="/device/response"
|
||||
)
|
||||
```
|
||||
|
||||
#### Python/Micropython (Responder)
|
||||
```python
|
||||
import nats
|
||||
import asyncio
|
||||
from nats_bridge import smartreceive, smartsend
|
||||
|
||||
# Configuration
|
||||
SUBJECT = "/device/command"
|
||||
REPLY_SUBJECT = "/device/response"
|
||||
NATS_URL = "nats://localhost:4222"
|
||||
|
||||
async def main():
|
||||
nc = await nats.connect(NATS_URL)
|
||||
|
||||
async def message_handler(msg):
|
||||
envelope = smartreceive(msg.data)
|
||||
for dataname, data, type in envelope["payloads"]:
|
||||
if data.get("action") == "read_sensor":
|
||||
response = {"sensor_id": "sensor-001", "value": 42.5}
|
||||
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
|
||||
|
||||
sid = await nc.subscribe(SUBJECT, cb=message_handler)
|
||||
await asyncio.sleep(120)
|
||||
await nc.close()
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
#### JavaScript (Requester)
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
|
||||
await smartsend("/device/command", [
|
||||
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
|
||||
], {
|
||||
replyTo: "/device/response"
|
||||
});
|
||||
```
|
||||
|
||||
#### JavaScript (Responder)
|
||||
```javascript
|
||||
const { smartreceive, smartsend } = require('./src/NATSBridge');
|
||||
const { connect } = require('nats');
|
||||
|
||||
// Configuration
|
||||
const SUBJECT = "/device/command";
|
||||
const REPLY_SUBJECT = "/device/response";
|
||||
const NATS_URL = "nats://localhost:4222";
|
||||
|
||||
async function main() {
|
||||
const nc = await connect({ servers: [NATS_URL] });
|
||||
|
||||
const sub = nc.subscribe(SUBJECT);
|
||||
|
||||
for await (const msg of sub) {
|
||||
const envelope = await smartreceive(msg);
|
||||
for (const payload of envelope.payloads) {
|
||||
if (payload.dataname === "command" && payload.data.action === "read_sensor") {
|
||||
const response = { sensor_id: "sensor-001", value: 42.5 };
|
||||
await smartsend(REPLY_SUBJECT, [
|
||||
{ dataname: "data", data: response, type: "dictionary" }
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
```
|
||||
|
||||
#### Julia (Requester)
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
"/device/command",
|
||||
[("command", Dict("action" => "read_sensor"), "dictionary")],
|
||||
reply_to="/device/response"
|
||||
)
|
||||
```
|
||||
|
||||
#### Julia (Responder)
|
||||
```julia
|
||||
using NATS, NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/device/command"
|
||||
const REPLY_SUBJECT = "/device/response"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
function test_responder()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
|
||||
for (dataname, data, type) in envelope["payloads"]
|
||||
if dataname == "command" && data["action"] == "read_sensor"
|
||||
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
|
||||
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
test_responder()
|
||||
```
|
||||
|
||||
### Example 5: Micropython IoT Device
|
||||
|
||||
Lightweight Micropython device sending sensor data.
|
||||
|
||||
#### Micropython
|
||||
```python
|
||||
import nats
|
||||
import asyncio
|
||||
from nats_bridge import smartsend, smartreceive
|
||||
|
||||
# Configuration
|
||||
SUBJECT = "/device/sensors"
|
||||
NATS_URL = "nats://localhost:4222"
|
||||
|
||||
async def main():
|
||||
nc = await nats.connect(NATS_URL)
|
||||
|
||||
# Send sensor data
|
||||
data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")]
|
||||
smartsend("/device/sensors", data, nats_url="nats://localhost:4222")
|
||||
|
||||
# Receive commands - msg comes from the callback
|
||||
async def message_handler(msg):
|
||||
envelope = smartreceive(msg.data)
|
||||
for dataname, data, type in envelope["payloads"]:
|
||||
if type == "dictionary" and data.get("action") == "reboot":
|
||||
# Execute reboot
|
||||
pass
|
||||
|
||||
sid = await nc.subscribe(SUBJECT, cb=message_handler)
|
||||
await asyncio.sleep(120)
|
||||
await nc.close()
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
#### JavaScript (Receiver)
|
||||
```javascript
|
||||
const { smartreceive } = require('./src/NATSBridge');
|
||||
const { connect } = require('nats');
|
||||
|
||||
// Configuration
|
||||
const SUBJECT = "/device/sensors";
|
||||
const NATS_URL = "nats://localhost:4222";
|
||||
|
||||
async function main() {
|
||||
const nc = await connect({ servers: [NATS_URL] });
|
||||
|
||||
const sub = nc.subscribe(SUBJECT);
|
||||
|
||||
for await (const msg of sub) {
|
||||
const envelope = await smartreceive(msg);
|
||||
for (const payload of envelope.payloads) {
|
||||
if (payload.dataname === "temperature") {
|
||||
console.log(`Temperature: ${payload.data}`);
|
||||
} else if (payload.dataname === "humidity") {
|
||||
console.log(`Humidity: ${payload.data}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
```
|
||||
|
||||
#### Julia (Receiver)
|
||||
```julia
|
||||
using NATS, NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/device/sensors"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
function test_receiver()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
|
||||
for (dataname, data, type) in envelope["payloads"]
|
||||
if dataname == "temperature"
|
||||
println("Temperature: $data")
|
||||
elseif dataname == "humidity"
|
||||
println("Humidity: $data")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
test_receiver()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Testing
|
||||
|
||||
Run the test scripts to verify functionality:
|
||||
|
||||
### Python/Micropython
|
||||
|
||||
```bash
|
||||
# Basic functionality test
|
||||
python test/test_micropython_basic.py
|
||||
|
||||
# Text message exchange
|
||||
python test/test_micropython_text_sender.py
|
||||
python test/test_micropython_text_receiver.py
|
||||
|
||||
# Dictionary exchange
|
||||
python test/test_micropython_dict_sender.py
|
||||
python test/test_micropython_dict_receiver.py
|
||||
|
||||
# File transfer
|
||||
python test/test_micropython_file_sender.py
|
||||
python test/test_micropython_file_receiver.py
|
||||
|
||||
# Mixed payload types
|
||||
python test/test_micropython_mixed_sender.py
|
||||
python test/test_micropython_mixed_receiver.py
|
||||
```
|
||||
|
||||
### JavaScript
|
||||
|
||||
```bash
|
||||
# Text message exchange
|
||||
node test/test_js_text_sender.js
|
||||
node test/test_js_text_receiver.js
|
||||
|
||||
# Dictionary exchange
|
||||
node test/test_js_dict_sender.js
|
||||
node test/test_js_dict_receiver.js
|
||||
|
||||
# File transfer
|
||||
node test/test_js_file_sender.js
|
||||
node test/test_js_file_receiver.js
|
||||
|
||||
# Mixed payload types
|
||||
node test/test_js_mix_payload_sender.js
|
||||
node test/test_js_mix_payloads_receiver.js
|
||||
|
||||
# Table exchange
|
||||
node test/test_js_table_sender.js
|
||||
node test/test_js_table_receiver.js
|
||||
```
|
||||
|
||||
### Julia
|
||||
|
||||
```julia
|
||||
# Text message exchange
|
||||
julia test/test_julia_text_sender.jl
|
||||
julia test/test_julia_text_receiver.jl
|
||||
|
||||
# Dictionary exchange
|
||||
julia test/test_julia_dict_sender.jl
|
||||
julia test/test_julia_dict_receiver.jl
|
||||
|
||||
# File transfer
|
||||
julia test/test_julia_file_sender.jl
|
||||
julia test/test_julia_file_receiver.jl
|
||||
|
||||
# Mixed payload types
|
||||
julia test/test_julia_mix_payloads_sender.jl
|
||||
julia test/test_julia_mix_payloads_receiver.jl
|
||||
|
||||
# Table exchange
|
||||
julia test/test_julia_table_sender.jl
|
||||
julia test/test_julia_table_receiver.jl
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## License
|
||||
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2026 NATSBridge Contributors
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
14
plik_fileserver/docker-compose.yml
Normal file
14
plik_fileserver/docker-compose.yml
Normal file
@@ -0,0 +1,14 @@
|
||||
services:
|
||||
plik:
|
||||
image: rootgg/plik:latest
|
||||
container_name: plik-server
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8080:8080"
|
||||
volumes:
|
||||
# # Mount the config file (created below)
|
||||
# - ./plikd.cfg:/home/plik/server/plikd.cfg
|
||||
# Mount local folder for uploads and database
|
||||
- ./plik-data:/data
|
||||
# Set user to match your host UID to avoid permission issues
|
||||
user: "1000:1000"
|
||||
295
src/README.md
295
src/README.md
@@ -1,295 +0,0 @@
|
||||
# NATSBridge
|
||||
|
||||
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, and **Python/Micropython** using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
|
||||
|
||||
## Overview
|
||||
|
||||
NATSBridge enables seamless communication between Julia, JavaScript, and Python/Micropython applications through NATS, with automatic transport selection based on payload size:
|
||||
|
||||
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
|
||||
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
|
||||
|
||||
## Features
|
||||
|
||||
- ✅ Bi-directional NATS communication across Julia ↔ JavaScript ↔ Python/Micropython
|
||||
- ✅ Multi-payload support (mixed content in single message)
|
||||
- ✅ Automatic transport selection based on payload size
|
||||
- ✅ File server integration for large payloads
|
||||
- ✅ Exponential backoff for URL fetching
|
||||
- ✅ Correlation ID tracking
|
||||
- ✅ Reply-to support for request-response pattern
|
||||
|
||||
## Supported Payload Types
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `text` | Plain text strings |
|
||||
| `dictionary` | JSON-serializable dictionaries |
|
||||
| `table` | Tabular data (Arrow IPC format) |
|
||||
| `image` | Image data (PNG, JPG bytes) |
|
||||
| `audio` | Audio data (WAV, MP3 bytes) |
|
||||
| `video` | Video data (MP4, AVI bytes) |
|
||||
| `binary` | Generic binary data |
|
||||
|
||||
## Implementation Guides
|
||||
|
||||
### [Julia Implementation](../tutorial_julia.md)
|
||||
|
||||
See the [Julia tutorial](../tutorial_julia.md) for getting started with Julia.
|
||||
|
||||
### [JavaScript Implementation](#javascript-implementation)
|
||||
|
||||
See [`NATSBridge.js`](NATSBridge.js) for the JavaScript implementation.
|
||||
|
||||
### [Python/Micropython Implementation](#pythonmicropython-implementation)
|
||||
|
||||
See [`nats_bridge.py`](nats_bridge.py) for the Python/Micropython implementation.
|
||||
|
||||
## Installation
|
||||
|
||||
### Julia
|
||||
|
||||
```julia
|
||||
using Pkg
|
||||
Pkg.add("NATS")
|
||||
Pkg.add("Arrow")
|
||||
Pkg.add("JSON3")
|
||||
Pkg.add("HTTP")
|
||||
Pkg.add("UUIDs")
|
||||
Pkg.add("Dates")
|
||||
```
|
||||
|
||||
### JavaScript
|
||||
|
||||
```bash
|
||||
npm install nats.js apache-arrow uuid base64-url
|
||||
```
|
||||
|
||||
### Python/Micropython
|
||||
|
||||
1. Copy `nats_bridge.py` to your device
|
||||
2. Ensure you have the following dependencies:
|
||||
- `urequests` for HTTP requests (Micropython)
|
||||
- `requests` for HTTP requests (Python)
|
||||
- `base64` for base64 encoding
|
||||
- `json` for JSON handling
|
||||
- `socket` for networking (Micropython)
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Text Message
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend, smartreceive
|
||||
|
||||
# Sender
|
||||
data = [("message", "Hello World", "text")]
|
||||
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
|
||||
|
||||
# Receiver
|
||||
payloads = smartreceive(msg)
|
||||
for dataname, data, type in payloads:
|
||||
print("Received {}: {}".format(dataname, data))
|
||||
```
|
||||
|
||||
#### Julia
|
||||
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
# Sender
|
||||
data = [("message", "Hello World", "text")]
|
||||
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
|
||||
|
||||
# Receiver
|
||||
envelope = smartreceive(msg, fileserverDownloadHandler)
|
||||
# envelope["payloads"] = [("message", "Hello World", "text"), ...]
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const { smartsend, smartreceive } = require('./src/NATSBridge');
|
||||
|
||||
// Sender
|
||||
await smartsend("/chat/room1", [
|
||||
{ dataname: "message", data: "Hello World", type: "text" }
|
||||
], { natsUrl: "nats://localhost:4222" });
|
||||
|
||||
// Receiver
|
||||
const envelope = await smartreceive(msg);
|
||||
// envelope.payloads = [{ dataname: "message", data: "Hello World", type: "text" }, ...]
|
||||
```
|
||||
|
||||
### Sending JSON Configuration
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
### Mixed Content (Chat with Text + Image)
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
image_data = b"\x89PNG..." # PNG bytes
|
||||
|
||||
data = [
|
||||
("message_text", "Hello with image!", "text"),
|
||||
("user_avatar", image_data, "binary")
|
||||
]
|
||||
|
||||
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
### Request-Response Pattern
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
# Send command with reply-to
|
||||
data = [("command", {"action": "read_sensor"}, "dictionary")]
|
||||
env = smartsend(
|
||||
"/device/command",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
reply_to="/device/response",
|
||||
reply_to_msg_id="cmd-001"
|
||||
)
|
||||
```
|
||||
|
||||
### Large Payloads (File Server)
|
||||
|
||||
#### Python/Micropython
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
# Large data (> 1MB)
|
||||
large_data = b"A" * 2000000 # 2MB
|
||||
|
||||
env = smartsend(
|
||||
"/data/large",
|
||||
[("large_file", large_data, "binary")],
|
||||
nats_url="nats://localhost:4222",
|
||||
fileserver_url="http://localhost:8080",
|
||||
size_threshold=1000000 # 1MB threshold
|
||||
)
|
||||
```
|
||||
|
||||
## API Reference
|
||||
|
||||
### `smartsend(subject, data, ...)`
|
||||
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
**Arguments:**
|
||||
- `subject` (str): NATS subject to publish to
|
||||
- `data` (list): List of `(dataname, data, type)` tuples
|
||||
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
|
||||
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
|
||||
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
|
||||
- `correlation_id` (str): Optional correlation ID for tracing
|
||||
- `msg_purpose` (str): Message purpose (default: `"chat"`)
|
||||
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
|
||||
- `receiver_name` (str): Receiver name (default: `""`)
|
||||
- `receiver_id` (str): Receiver ID (default: `""`)
|
||||
- `reply_to` (str): Reply topic (default: `""`)
|
||||
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
|
||||
|
||||
**Returns:** `MessageEnvelope` object
|
||||
|
||||
### `smartreceive(msg, ...)`
|
||||
|
||||
Receive and process NATS messages.
|
||||
|
||||
**Arguments:**
|
||||
- `msg`: NATS message (dict or JSON string)
|
||||
- `fileserver_download_handler` (function): Function to fetch data from URLs
|
||||
- `max_retries` (int): Maximum retry attempts (default: 5)
|
||||
- `base_delay` (int): Initial delay in ms (default: 100)
|
||||
- `max_delay` (int): Maximum delay in ms (default: 5000)
|
||||
|
||||
**Returns:** List of `(dataname, data, type)` tuples
|
||||
|
||||
### `MessageEnvelope`
|
||||
|
||||
Represents a complete NATS message envelope.
|
||||
|
||||
**Attributes:**
|
||||
- `correlation_id`: Unique identifier for tracing
|
||||
- `msg_id`: Unique message identifier
|
||||
- `timestamp`: Message publication timestamp
|
||||
- `send_to`: NATS subject
|
||||
- `msg_purpose`: Message purpose
|
||||
- `sender_name`: Sender name
|
||||
- `sender_id`: Sender UUID
|
||||
- `receiver_name`: Receiver name
|
||||
- `receiver_id`: Receiver UUID
|
||||
- `reply_to`: Reply topic
|
||||
- `reply_to_msg_id`: Reply message ID
|
||||
- `broker_url`: NATS broker URL
|
||||
- `metadata`: Message-level metadata
|
||||
- `payloads`: List of MessagePayload objects
|
||||
|
||||
### `MessagePayload`
|
||||
|
||||
Represents a single payload within a message envelope.
|
||||
|
||||
**Attributes:**
|
||||
- `id`: Unique payload identifier
|
||||
- `dataname`: Name of the payload
|
||||
- `type`: Payload type ("text", "dictionary", etc.)
|
||||
- `transport`: Transport method ("direct" or "link")
|
||||
- `encoding`: Encoding method ("none", "base64", etc.)
|
||||
- `size`: Payload size in bytes
|
||||
- `data`: Payload data (bytes for direct, URL for link)
|
||||
- `metadata`: Payload-level metadata
|
||||
|
||||
## Examples
|
||||
|
||||
See [`examples/micropython_example.py`](../examples/micropython_example.py) for more detailed examples.
|
||||
|
||||
## Testing
|
||||
|
||||
Run the test suite:
|
||||
|
||||
```bash
|
||||
# Python/Micropython
|
||||
python test/test_micropython_basic.py
|
||||
|
||||
# JavaScript
|
||||
node test/test_js_to_js_text_sender.js
|
||||
node test/test_js_to_js_text_receiver.js
|
||||
|
||||
# Julia
|
||||
julia test/test_julia_to_julia_text_sender.jl
|
||||
julia test/test_julia_to_julia_text_receiver.jl
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- **Julia**: NATS server (nats.io), HTTP file server (optional)
|
||||
- **JavaScript**: NATS server (nats.io), HTTP file server (optional)
|
||||
- **Python/Micropython**: NATS server (nats.io), HTTP file server (optional)
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
Reference in New Issue
Block a user