update
This commit is contained in:
@@ -51,3 +51,15 @@ Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read(
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
I'm expanding this Julia package (NATSBridge) into a cross-platform project by adding a JavaScript and Python/MicroPython implementation. To ensure accuracy, the Julia src directory will serve as the ground truth, as the documentation may be outdated.
|
||||
|
||||
My goal is to maintain interface parity at the high-level API for a consistent user experience, while ensuring the low-level implementation adheres strictly to the idiomatic conventions of each respective language (e.g., multiple dispatch in Julia vs. asynchronous, prototype, or class-based patterns in JS and Python/MicroPython)
|
||||
|
||||
Now, help me do the following:
|
||||
1) check architecture.md for any mistake.
|
||||
|
||||
|
||||
|
||||
|
||||
1370
docs/architecture.md
1370
docs/architecture.md
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
8
etc.jl
8
etc.jl
@@ -20,10 +20,12 @@ Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read(
|
||||
|
||||
|
||||
|
||||
Help me expands this Julia package (NATSBridge) into a cross-platform project by building a JavaScript implementation at src/NATSBridge.js in the current workspace. To ensure accuracy, the Julia src directory will serve as the ground truth, as the documentation may be outdated.
|
||||
|
||||
My goal is to maintain interface parity at the high-level API for a consistent user experience, while ensuring the low-level implementation adheres strictly to the idiomatic conventions of each respective language (e.g., multiple dispatch in Julia vs. asynchronous, prototype, or class-based patterns in JS)
|
||||
Help me expands this Julia package (NATSBridge) into a cross-platform project by adding a JavaScript and Python/MicroPython implementation. To ensure accuracy, the Julia src directory will serve as the ground truth, as the documentation may be outdated.
|
||||
|
||||
My goal is to maintain interface parity at the high-level API for a consistent user experience, while ensuring the low-level implementation adheres strictly to the idiomatic conventions of each respective language (e.g., multiple dispatch in Julia vs. asynchronous, prototype, or class-based patterns in JS and Python/MicroPython)
|
||||
|
||||
Now do the following:
|
||||
1) check docs to see if there is any mistake.
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# NATSBridge Tutorial
|
||||
# Cross-Platform NATSBridge Tutorial
|
||||
|
||||
A step-by-step guide to get started with NATSBridge - a high-performance, bi-directional data bridge for **Julia**.
|
||||
A step-by-step guide to get started with NATSBridge across **Julia**, **JavaScript**, and **Python/MicroPython**.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
@@ -15,22 +15,35 @@ A step-by-step guide to get started with NATSBridge - a high-performance, bi-dir
|
||||
|
||||
## Overview
|
||||
|
||||
NATSBridge enables seamless communication for Julia applications through NATS, with automatic transport selection based on payload size:
|
||||
NATSBridge enables seamless communication across platforms through NATS, with automatic transport selection based on payload size:
|
||||
|
||||
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
|
||||
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
|
||||
|
||||
### Cross-Platform API Parity
|
||||
|
||||
All three platforms use the same high-level API:
|
||||
|
||||
```
|
||||
# Input format
|
||||
smartsend(subject, [(dataname, data, type), ...], options)
|
||||
|
||||
# Output format
|
||||
(env, env_json_str) = smartsend(...)
|
||||
env = smartreceive(msg, options)
|
||||
```
|
||||
|
||||
### Supported Payload Types
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `text` | Plain text strings |
|
||||
| `dictionary` | JSON-serializable dictionaries |
|
||||
| `table` | Tabular data (Arrow IPC format) |
|
||||
| `image` | Image data (PNG, JPG bytes) |
|
||||
| `audio` | Audio data (WAV, MP3 bytes) |
|
||||
| `video` | Video data (MP4, AVI bytes) |
|
||||
| `binary` | Generic binary data |
|
||||
| Type | Julia | JavaScript | Python | MicroPython |
|
||||
|------|-------|------------|--------|-------------|
|
||||
| `text` | `String` | `string` | `str` | `str` |
|
||||
| `dictionary` | `Dict` | `Object` | `dict` | `dict` |
|
||||
| `table` | `DataFrame` | `Array<Object>` | `DataFrame` | ❌ |
|
||||
| `image` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||
| `audio` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||
| `video` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||
| `binary` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||
|
||||
---
|
||||
|
||||
@@ -40,7 +53,7 @@ Before you begin, ensure you have:
|
||||
|
||||
1. **NATS Server** running (or accessible)
|
||||
2. **HTTP File Server** (optional, for large payloads > 1MB)
|
||||
3. **Julia** with required packages
|
||||
3. **Platform-specific packages** installed
|
||||
|
||||
---
|
||||
|
||||
@@ -58,6 +71,29 @@ Pkg.add("UUIDs")
|
||||
Pkg.add("Dates")
|
||||
```
|
||||
|
||||
### JavaScript (Node.js)
|
||||
|
||||
```bash
|
||||
npm install nats uuid apache-arrow node-fetch
|
||||
```
|
||||
|
||||
### JavaScript (Browser)
|
||||
|
||||
```html
|
||||
<script src="https://unpkg.com/nats-js/dist/bundle/nats.min.js"></script>
|
||||
<script src="https://unpkg.com/apache-arrow/arrow.min.js"></script>
|
||||
```
|
||||
|
||||
### Python (Desktop)
|
||||
|
||||
```bash
|
||||
pip install nats-py aiohttp pyarrow pandas
|
||||
```
|
||||
|
||||
### MicroPython
|
||||
|
||||
Uses built-in modules: `network`, `socket`, `time`, `json`, `base64`
|
||||
|
||||
---
|
||||
|
||||
## Quick Start
|
||||
@@ -71,10 +107,7 @@ docker run -p 4222:4222 nats:latest
|
||||
### Step 2: Start HTTP File Server (Optional)
|
||||
|
||||
```bash
|
||||
# Create a directory for file uploads
|
||||
mkdir -p /tmp/fileserver
|
||||
|
||||
# Use any HTTP server that supports POST for file uploads
|
||||
python3 -m http.server 8080 --directory /tmp/fileserver
|
||||
```
|
||||
|
||||
@@ -98,6 +131,72 @@ env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:
|
||||
# env_json_str: JSON string for publishing to NATS
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
// Send a text message
|
||||
const data = [["message", "Hello World", "text"]];
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222" }
|
||||
);
|
||||
// env: Object with all metadata and payloads
|
||||
// env_json_str: JSON string for publishing
|
||||
console.log("Message sent!");
|
||||
|
||||
// Or use is_publish=false
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222", is_publish: false }
|
||||
);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
# Send a text message
|
||||
data = [("message", "Hello World", "text")]
|
||||
env, env_json_str = await smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
# env: Dict with all metadata and payloads
|
||||
# env_json_str: JSON string for publishing
|
||||
print("Message sent!")
|
||||
|
||||
# Or use is_publish=False
|
||||
env, env_json_str = await smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
broker_url="nats://localhost:4222",
|
||||
is_publish=False
|
||||
)
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
|
||||
```python
|
||||
from natbridge_mpy import NATSBridge
|
||||
|
||||
bridge = NATSBridge()
|
||||
|
||||
# Send a text message (limited to small payloads)
|
||||
data = [("message", "Hello World", "text")]
|
||||
env, env_json_str = bridge.smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
size_threshold=100000 # Lower threshold for MicroPython
|
||||
)
|
||||
print("Message sent!")
|
||||
```
|
||||
|
||||
### Step 4: Receive Messages
|
||||
|
||||
#### Julia
|
||||
@@ -113,6 +212,36 @@ for (dataname, data, type) in env["payloads"]
|
||||
end
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
// Receive and process message
|
||||
const env = await NATSBridge.smartreceive(msg, {
|
||||
fileserver_download_handler: NATSBridge.fetchWithBackoff
|
||||
});
|
||||
// env.payloads = [[dataname, data, type], ...]
|
||||
for (const [dataname, data, type] of env.payloads) {
|
||||
console.log(`Received ${dataname}:`, data);
|
||||
}
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartreceive
|
||||
|
||||
# Receive and process message
|
||||
env = await smartreceive(
|
||||
msg,
|
||||
fileserver_download_handler=fetch_with_backoff
|
||||
)
|
||||
# env["payloads"] = [(dataname, data, type), ...]
|
||||
for dataname, data, type in env["payloads"]:
|
||||
print(f"Received {dataname}: {data}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Basic Examples
|
||||
@@ -134,6 +263,65 @@ data = [("config", config, "dictionary")]
|
||||
env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
const config = {
|
||||
wifi_ssid: "MyNetwork",
|
||||
wifi_password: "password123",
|
||||
update_interval: 60
|
||||
};
|
||||
|
||||
const data = [["config", config, "dictionary"]];
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/device/config",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222" }
|
||||
);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
env, env_json_str = await smartsend(
|
||||
"/device/config",
|
||||
data,
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
|
||||
```python
|
||||
from natbridge_mpy import NATSBridge
|
||||
|
||||
bridge = NATSBridge()
|
||||
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
env, env_json_str = bridge.smartsend(
|
||||
"/device/config",
|
||||
data,
|
||||
size_threshold=100000
|
||||
)
|
||||
```
|
||||
|
||||
### Example 2: Sending Binary Data (Image)
|
||||
|
||||
#### Julia
|
||||
@@ -148,6 +336,59 @@ data = [("user_image", image_data, "binary")]
|
||||
env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
const fs = require('fs');
|
||||
|
||||
// Read image file
|
||||
const image_data = fs.readFileSync('image.png');
|
||||
|
||||
const data = [["user_image", image_data, "binary"]];
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/chat/image",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222" }
|
||||
);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
# Read image file
|
||||
with open("image.png", "rb") as f:
|
||||
image_data = f.read()
|
||||
|
||||
data = [("user_image", image_data, "binary")]
|
||||
env, env_json_str = await smartsend(
|
||||
"/chat/image",
|
||||
data,
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
|
||||
```python
|
||||
from natbridge_mpy import NATSBridge
|
||||
|
||||
bridge = NATSBridge()
|
||||
|
||||
# Read image file
|
||||
with open("image.png", "rb") as f:
|
||||
image_data = f.read()
|
||||
|
||||
data = [("user_image", image_data, "binary")]
|
||||
env, env_json_str = bridge.smartsend(
|
||||
"/chat/image",
|
||||
data,
|
||||
size_threshold=100000
|
||||
)
|
||||
```
|
||||
|
||||
### Example 3: Request-Response Pattern
|
||||
|
||||
#### Julia (Requester)
|
||||
@@ -164,16 +405,47 @@ env, env_json_str = smartsend(
|
||||
reply_to="/device/response",
|
||||
reply_to_msg_id="cmd-001"
|
||||
)
|
||||
# env: msg_envelope_v1 object
|
||||
# env_json_str: JSON string for publishing to NATS
|
||||
```
|
||||
|
||||
#### JavaScript (Requester)
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
// Send command with reply-to
|
||||
const data = [["command", { action: "read_sensor" }, "dictionary"]];
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/device/command",
|
||||
data,
|
||||
{
|
||||
broker_url: "nats://localhost:4222",
|
||||
reply_to: "/device/response",
|
||||
reply_to_msg_id: "cmd-001"
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
#### Python (Requester)
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
# Send command with reply-to
|
||||
data = [("command", {"action": "read_sensor"}, "dictionary")]
|
||||
env, env_json_str = await smartsend(
|
||||
"/device/command",
|
||||
data,
|
||||
broker_url="nats://localhost:4222",
|
||||
reply_to="/device/response",
|
||||
reply_to_msg_id="cmd-001"
|
||||
)
|
||||
```
|
||||
|
||||
#### Julia (Responder)
|
||||
|
||||
```julia
|
||||
using NATS, NATSBridge
|
||||
using NATSBridge, NATS
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/device/command"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
@@ -182,13 +454,11 @@ function test_responder()
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
|
||||
|
||||
# Extract reply_to from the envelope metadata
|
||||
reply_to = env["reply_to"]
|
||||
|
||||
for (dataname, data, type) in env["payloads"]
|
||||
if dataname == "command" && data["action"] == "read_sensor"
|
||||
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
|
||||
# Send response to the reply_to subject from the request
|
||||
if !isempty(reply_to)
|
||||
smartsend(reply_to, [("data", response, "dictionary")])
|
||||
end
|
||||
@@ -226,10 +496,72 @@ env, env_json_str = smartsend(
|
||||
fileserver_url="http://localhost:8080"
|
||||
)
|
||||
|
||||
# The envelope will contain the download URL
|
||||
println("File uploaded to: $(env.payloads[1].data)")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
// Create large data (> 1MB)
|
||||
const large_data = Buffer.alloc(2_000_000);
|
||||
for (let i = 0; i < large_data.length; i++) {
|
||||
large_data[i] = Math.floor(Math.random() * 256);
|
||||
}
|
||||
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/data/large",
|
||||
[["large_file", large_data, "binary"]],
|
||||
{
|
||||
broker_url: "nats://localhost:4222",
|
||||
fileserver_url: "http://localhost:8080"
|
||||
}
|
||||
);
|
||||
|
||||
console.log("File uploaded to:", env.payloads[0].data);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
# Create large data (> 1MB)
|
||||
import os
|
||||
large_data = os.urandom(2_000_000)
|
||||
|
||||
env, env_json_str = await smartsend(
|
||||
"/data/large",
|
||||
[("large_file", large_data, "binary")],
|
||||
broker_url="nats://localhost:4222",
|
||||
fileserver_url="http://localhost:8080"
|
||||
)
|
||||
|
||||
print(f"File uploaded to: {env['payloads'][0]['data']}")
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
|
||||
MicroPython enforces a hard limit of 50KB per payload:
|
||||
|
||||
```python
|
||||
from natbridge_mpy import NATSBridge
|
||||
|
||||
bridge = NATSBridge()
|
||||
|
||||
# MicroPython has a hard limit of 50KB per payload
|
||||
# Use streaming or chunking for larger data
|
||||
small_data = bytes(1000) # 1KB
|
||||
|
||||
data = [("small_file", small_data, "binary")]
|
||||
env, env_json_str = bridge.smartsend(
|
||||
"/data/small",
|
||||
data,
|
||||
size_threshold=100000 # Enforced max: 50000 bytes
|
||||
)
|
||||
```
|
||||
|
||||
### Example 5: Mixed Content (Chat with Text + Image)
|
||||
|
||||
NATSBridge supports sending multiple payloads with different types in a single message:
|
||||
@@ -249,6 +581,46 @@ data = [
|
||||
env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
const fs = require('fs');
|
||||
|
||||
const image_data = fs.readFileSync('avatar.png');
|
||||
|
||||
const data = [
|
||||
["message_text", "Hello with image!", "text"],
|
||||
["user_avatar", image_data, "image"]
|
||||
];
|
||||
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/chat/mixed",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222" }
|
||||
);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
|
||||
with open("avatar.png", "rb") as f:
|
||||
image_data = f.read()
|
||||
|
||||
data = [
|
||||
("message_text", "Hello with image!", "text"),
|
||||
("user_avatar", image_data, "image")
|
||||
]
|
||||
|
||||
env, env_json_str = await smartsend(
|
||||
"/chat/mixed",
|
||||
data,
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
```
|
||||
|
||||
### Example 6: Table Data (Arrow IPC)
|
||||
|
||||
For tabular data, NATSBridge uses Apache Arrow IPC format:
|
||||
@@ -270,12 +642,58 @@ data = [("students", df, "table")]
|
||||
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
#### JavaScript
|
||||
|
||||
```javascript
|
||||
const NATSBridge = require('./src/natbridge.js');
|
||||
|
||||
// Create table data (array of objects)
|
||||
const table_data = [
|
||||
{ id: 1, name: "Alice", score: 95 },
|
||||
{ id: 2, name: "Bob", score: 88 },
|
||||
{ id: 3, name: "Charlie", score: 92 }
|
||||
];
|
||||
|
||||
const data = [["students", table_data, "table"]];
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/data/students",
|
||||
data,
|
||||
{ broker_url: "nats://localhost:4222" }
|
||||
);
|
||||
```
|
||||
|
||||
#### Python
|
||||
|
||||
```python
|
||||
from natbridge import smartsend
|
||||
import pandas as pd
|
||||
|
||||
# Create DataFrame
|
||||
df = pd.DataFrame({
|
||||
'id': [1, 2, 3],
|
||||
'name': ['Alice', 'Bob', 'Charlie'],
|
||||
'score': [95, 88, 92]
|
||||
})
|
||||
|
||||
data = [("students", df, "table")]
|
||||
env, env_json_str = await smartsend(
|
||||
"/data/students",
|
||||
data,
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
|
||||
MicroPython does not support table type due to memory constraints. Use dictionary or binary instead.
|
||||
|
||||
---
|
||||
|
||||
## Next Steps
|
||||
|
||||
1. **Explore the test directory** for more examples
|
||||
2. **Check the documentation** for advanced configuration options
|
||||
3. **Read the walkthrough** for building real-world applications
|
||||
|
||||
---
|
||||
|
||||
@@ -296,7 +714,8 @@ env, env_json_str = smartsend("/data/students", data, broker_url="nats://localho
|
||||
### Serialization Errors
|
||||
|
||||
- Verify data type matches the specified type
|
||||
- Check that binary data is in the correct format (Vector{UInt8})
|
||||
- Check that binary data is in the correct format
|
||||
- MicroPython: Ensure payload size < 50KB
|
||||
|
||||
---
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -279,42 +279,38 @@ function envelope_to_json(env::msg_envelope_v1)
|
||||
"broker_url" => env.broker_url
|
||||
)
|
||||
|
||||
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
|
||||
obj["metadata"] = Dict(String(k) => v for (k, v) in env.metadata)
|
||||
end
|
||||
obj["metadata"] = Dict(String(k) => v for (k, v) in env.metadata)
|
||||
|
||||
# Convert payloads to JSON array
|
||||
if !isempty(env.payloads)
|
||||
payloads_json = []
|
||||
for payload in env.payloads
|
||||
payload_obj = Dict{String, Any}(
|
||||
"id" => payload.id,
|
||||
"dataname" => payload.dataname,
|
||||
"payload_type" => payload.payload_type,
|
||||
"transport" => payload.transport,
|
||||
"encoding" => payload.encoding,
|
||||
"size" => payload.size,
|
||||
)
|
||||
# Include data based on transport type
|
||||
if payload.transport == "direct" && payload.data !== nothing
|
||||
if payload.encoding == "base64" || payload.encoding == "json"
|
||||
payload_obj["data"] = payload.data
|
||||
else
|
||||
# For other encodings, use base64
|
||||
payload_bytes = _get_payload_bytes(payload.data)
|
||||
payload_obj["data"] = Base64.base64encode(payload_bytes)
|
||||
end
|
||||
elseif payload.transport == "link" && payload.data !== nothing
|
||||
# For link transport, data is a URL string - include directly
|
||||
payloads_json = []
|
||||
for payload in env.payloads
|
||||
payload_obj = Dict{String, Any}(
|
||||
"id" => payload.id,
|
||||
"dataname" => payload.dataname,
|
||||
"payload_type" => payload.payload_type,
|
||||
"transport" => payload.transport,
|
||||
"encoding" => payload.encoding,
|
||||
"size" => payload.size,
|
||||
)
|
||||
# Include data based on transport type
|
||||
if payload.transport == "direct" && payload.data !== nothing
|
||||
if payload.encoding == "base64" || payload.encoding == "json"
|
||||
payload_obj["data"] = payload.data
|
||||
else
|
||||
# For other encodings, use base64
|
||||
payload_bytes = _get_payload_bytes(payload.data)
|
||||
payload_obj["data"] = Base64.base64encode(payload_bytes)
|
||||
end
|
||||
if !isempty(payload.metadata)
|
||||
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
|
||||
elseif payload.transport == "link" && payload.data !== nothing
|
||||
# For link transport, data is a URL string - include directly
|
||||
payload_obj["data"] = payload.data
|
||||
end
|
||||
push!(payloads_json, payload_obj)
|
||||
if !isempty(payload.metadata)
|
||||
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
|
||||
end
|
||||
obj["payloads"] = payloads_json
|
||||
push!(payloads_json, payload_obj)
|
||||
end
|
||||
obj["payloads"] = payloads_json
|
||||
|
||||
JSON.json(obj)
|
||||
end
|
||||
|
||||
719
src/natbridge.js
719
src/natbridge.js
@@ -1,719 +0,0 @@
|
||||
/**
|
||||
* NATSBridge - Cross-Platform Bi-Directional Data Bridge
|
||||
* JavaScript Implementation (Node.js and Browser)
|
||||
*
|
||||
* This module provides functionality for sending and receiving data across network boundaries
|
||||
* using NATS as the message bus, with support for both direct payload transport and
|
||||
* URL-based transport for larger payloads.
|
||||
*
|
||||
* File Server Handler Architecture:
|
||||
* The system uses handler functions to abstract file server operations, allowing support
|
||||
* for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
|
||||
*
|
||||
* Handler Function Signatures:
|
||||
*
|
||||
* ```javascript
|
||||
* // Upload handler - uploads data to file server and returns URL
|
||||
* // The handler is passed to smartsend as fileserver_upload_handler parameter
|
||||
* // It receives: (fileserver_url, dataname, data)
|
||||
* // Returns: Promise<{ status, uploadid, fileid, url }>
|
||||
* fileserver_upload_handler(fileserver_url, dataname, data)
|
||||
*
|
||||
* // Download handler - fetches data from file server URL with exponential backoff
|
||||
* // The handler is passed to smartreceive as fileserver_download_handler parameter
|
||||
* // It receives: (url, max_retries, base_delay, max_delay, correlation_id)
|
||||
* // Returns: Promise<Uint8Array>
|
||||
* fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id)
|
||||
* ```
|
||||
*
|
||||
* Multi-Payload Support (Standard API):
|
||||
* The system uses a standardized list-of-tuples format for all payload operations.
|
||||
* Even when sending a single payload, the user must wrap it in a list.
|
||||
*
|
||||
* API Standard:
|
||||
*
|
||||
* ```javascript
|
||||
* // Input format for smartsend (always a list of tuples with type info)
|
||||
* [[dataname1, data1, type1], [dataname2, data2, type2], ...]
|
||||
*
|
||||
* // Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
|
||||
* {
|
||||
* "correlation_id": "...",
|
||||
* "msg_id": "...",
|
||||
* "timestamp": "...",
|
||||
* "send_to": "...",
|
||||
* "msg_purpose": "...",
|
||||
* "sender_name": "...",
|
||||
* "sender_id": "...",
|
||||
* "receiver_name": "...",
|
||||
* "receiver_id": "...",
|
||||
* "reply_to": "...",
|
||||
* "reply_to_msg_id": "...",
|
||||
* "broker_url": "...",
|
||||
* "metadata": {...},
|
||||
* "payloads": [[dataname1, data1, type1], [dataname2, data2, type2], ...]
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
*/
|
||||
|
||||
const nats = typeof require !== 'undefined' ? require('nats') : null;
|
||||
const { v4: uuidv4 } = typeof require !== 'undefined' ? require('uuid') : null;
|
||||
const fetch = typeof require !== 'undefined' ? require('node-fetch') : (typeof globalThis !== 'undefined' ? globalThis.fetch : undefined);
|
||||
const arrow = typeof require !== 'undefined' ? require('apache-arrow') : null;
|
||||
|
||||
/**
|
||||
* Default configuration values
|
||||
*/
|
||||
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB - threshold for switching from direct to link transport
|
||||
const DEFAULT_BROKER_URL = 'nats://localhost:4222'; // Default NATS server URL
|
||||
const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; // Default HTTP file server URL
|
||||
|
||||
/**
|
||||
* Generate a UUID v4
|
||||
* @returns {string} UUID string
|
||||
*/
|
||||
function generateUUID() {
|
||||
if (uuidv4) {
|
||||
return uuidv4();
|
||||
}
|
||||
// Fallback UUID generation for environments without uuid package
|
||||
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
|
||||
const r = Math.random() * 16 | 0;
|
||||
const v = c === 'x' ? r : (r & 0x3 | 0x8);
|
||||
return v.toString(16);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a trace message with correlation ID and timestamp
|
||||
* @param {string} correlation_id - Correlation ID for tracing
|
||||
* @param {string} message - Message content to log
|
||||
*/
|
||||
function logTrace(correlation_id, message) {
|
||||
const timestamp = new Date().toISOString();
|
||||
console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize data according to specified format
|
||||
* @param {any} data - Data to serialize
|
||||
* @param {string} payload_type - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
* @returns {Promise<Uint8Array>} Binary representation of the serialized data
|
||||
*/
|
||||
async function serializeData(data, payload_type) {
|
||||
if (payload_type === 'text') {
|
||||
if (typeof data === 'string') {
|
||||
return new TextEncoder().encode(data);
|
||||
} else {
|
||||
throw new Error('Text data must be a string');
|
||||
}
|
||||
} else if (payload_type === 'dictionary') {
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return new TextEncoder().encode(jsonStr);
|
||||
} else if (payload_type === 'table') {
|
||||
// Use Apache Arrow for table serialization
|
||||
if (!arrow) {
|
||||
throw new Error('apache-arrow not available. Install with: npm install apache-arrow');
|
||||
}
|
||||
|
||||
// Convert array of objects to Arrow Table
|
||||
if (!Array.isArray(data)) {
|
||||
throw new Error('Table data must be an array of objects');
|
||||
}
|
||||
|
||||
// Build schema from first row if not provided
|
||||
const schemaFields = [];
|
||||
if (data.length > 0) {
|
||||
for (const key in data[0]) {
|
||||
const value = data[0][key];
|
||||
let arrowType;
|
||||
if (typeof value === 'number') {
|
||||
arrowType = arrow.float64;
|
||||
} else if (typeof value === 'boolean') {
|
||||
arrowType = arrow.bool;
|
||||
} else if (value instanceof Date) {
|
||||
arrowType = arrow.string; // Date as string
|
||||
} else {
|
||||
arrowType = arrow.string;
|
||||
}
|
||||
schemaFields.push(new arrow.Field(key, arrowType));
|
||||
}
|
||||
}
|
||||
|
||||
const schema = new arrow.Schema(schemaFields);
|
||||
|
||||
// Convert data to Arrow RecordBatch
|
||||
const arrays = {};
|
||||
for (const field of schema.fields) {
|
||||
const name = field.name;
|
||||
const type = field.type;
|
||||
|
||||
if (type instanceof arrow.Float64) {
|
||||
arrays[name] = new arrow.Float64Array(data.length);
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
arrays[name][i] = data[i][name] || 0;
|
||||
}
|
||||
} else if (type instanceof arrow.Boolean) {
|
||||
arrays[name] = new arrow.BooleanArray(data.length);
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
arrays[name][i] = data[i][name] || false;
|
||||
}
|
||||
} else {
|
||||
// String type
|
||||
const values = data.map(row => String(row[name] ?? ''));
|
||||
const offsets = new Int32Array(values.length + 1);
|
||||
let offset = 0;
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
offsets[i + 1] = offset += values[i].length;
|
||||
}
|
||||
const buffer = new Uint8Array(offsets[values.length]);
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
const encoder = new TextEncoder();
|
||||
const bytes = encoder.encode(values[i]);
|
||||
buffer.set(bytes, offsets[i]);
|
||||
}
|
||||
arrays[name] = new arrow.StringArray(
|
||||
new arrow.DataView(new arrow.Buffer(buffer), 0, offsets[values.length]),
|
||||
new arrow.Buffer(offsets.buffer, 0, offsets.length * 4),
|
||||
0,
|
||||
data.length
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, data.length);
|
||||
|
||||
// Write to IPC format
|
||||
const buffer = arrow.tableFromBatches([recordBatch]).toBuffer();
|
||||
return new Uint8Array(buffer);
|
||||
} else if (payload_type === 'image') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) {
|
||||
return data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Image data must be Uint8Array or ArrayBuffer');
|
||||
}
|
||||
} else if (payload_type === 'audio') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) {
|
||||
return data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Audio data must be Uint8Array or ArrayBuffer');
|
||||
}
|
||||
} else if (payload_type === 'video') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) {
|
||||
return data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Video data must be Uint8Array or ArrayBuffer');
|
||||
}
|
||||
} else if (payload_type === 'binary') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || Buffer.isBuffer(data)) {
|
||||
return data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Binary data must be Uint8Array or ArrayBuffer');
|
||||
}
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payload_type}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize bytes to data based on type
|
||||
* @param {Uint8Array} data - Serialized data as bytes
|
||||
* @param {string} payload_type - Data type
|
||||
* @param {string} correlation_id - Correlation ID for logging
|
||||
* @returns {Promise<any>} Deserialized data
|
||||
*/
|
||||
async function deserializeData(data, payload_type, correlation_id) {
|
||||
if (payload_type === 'text') {
|
||||
return new TextDecoder().decode(data);
|
||||
} else if (payload_type === 'dictionary') {
|
||||
const jsonStr = new TextDecoder().decode(data);
|
||||
return JSON.parse(jsonStr);
|
||||
} else if (payload_type === 'table') {
|
||||
// Use Apache Arrow for table deserialization
|
||||
if (!arrow) {
|
||||
throw new Error('apache-arrow not available. Install with: npm install apache-arrow');
|
||||
}
|
||||
|
||||
// Read Arrow IPC format
|
||||
const buffer = arrow.Buffer.wrap(data.buffer, data.byteOffset, data.byteLength);
|
||||
const table = arrow.tableFromRawBytes(buffer);
|
||||
|
||||
// Convert to array of objects for consistency with the API
|
||||
const result = [];
|
||||
const numRows = table.numRows;
|
||||
|
||||
for (let i = 0; i < numRows; i++) {
|
||||
const row = {};
|
||||
for (const colName of table.columnNames) {
|
||||
const column = table.getColumn(colName);
|
||||
row[colName] = column.get(i);
|
||||
}
|
||||
result.push(row);
|
||||
}
|
||||
|
||||
return result;
|
||||
} else if (payload_type === 'image') {
|
||||
return data;
|
||||
} else if (payload_type === 'audio') {
|
||||
return data;
|
||||
} else if (payload_type === 'video') {
|
||||
return data;
|
||||
} else if (payload_type === 'binary') {
|
||||
return data;
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payload_type}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch data from URL with exponential backoff
|
||||
* @param {string} url - URL to fetch from
|
||||
* @param {number} max_retries - Maximum retry attempts
|
||||
* @param {number} base_delay - Initial delay in milliseconds
|
||||
* @param {number} max_delay - Maximum delay in milliseconds
|
||||
* @param {string} correlation_id - Correlation ID for logging
|
||||
* @returns {Promise<Uint8Array>} Fetched data as bytes
|
||||
*/
|
||||
async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
|
||||
let delay = base_delay;
|
||||
|
||||
for (let attempt = 1; attempt <= max_retries; attempt++) {
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
|
||||
if (response.status === 200) {
|
||||
logTrace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
|
||||
const arrayBuffer = await response.arrayBuffer();
|
||||
return new Uint8Array(arrayBuffer);
|
||||
} else {
|
||||
throw new Error(`Failed to fetch: ${response.status}`);
|
||||
}
|
||||
} catch (e) {
|
||||
logTrace(correlation_id, `Attempt ${attempt} failed: ${e.constructor.name}`);
|
||||
|
||||
if (attempt < max_retries) {
|
||||
await new Promise(resolve => setTimeout(resolve, delay));
|
||||
delay = Math.min(delay * 2, max_delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Failed to fetch data after ${max_retries} attempts`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a single file to a plik server using one-shot mode
|
||||
* @param {string} file_server_url - Base URL of the plik server
|
||||
* @param {string} dataname - Name of the file being uploaded
|
||||
* @param {Uint8Array} data - Raw byte data of the file content
|
||||
* @returns {Promise<{ status, uploadid, fileid, url }>} Upload result
|
||||
*/
|
||||
async function plikOneshotUpload(file_server_url, dataname, data) {
|
||||
// Get upload id
|
||||
const url_getUploadID = `${file_server_url}/upload`;
|
||||
const headers = { 'Content-Type': 'application/json' };
|
||||
const body = JSON.stringify({ OneShot: true });
|
||||
|
||||
const http_response = await fetch(url_getUploadID, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body
|
||||
});
|
||||
|
||||
const response_json = await http_response.json();
|
||||
const uploadid = response_json.id;
|
||||
const uploadtoken = response_json.uploadToken;
|
||||
|
||||
// Upload file
|
||||
const url_upload = `${file_server_url}/file/${uploadid}`;
|
||||
const form = new FormData();
|
||||
const blob = new Blob([data]);
|
||||
form.append('file', blob, dataname);
|
||||
|
||||
const upload_headers = {
|
||||
'X-UploadToken': uploadtoken
|
||||
};
|
||||
|
||||
const upload_response = await fetch(url_upload, {
|
||||
method: 'POST',
|
||||
headers: upload_headers,
|
||||
body: form
|
||||
});
|
||||
|
||||
const upload_json = await upload_response.json();
|
||||
const fileid = upload_json.id;
|
||||
|
||||
const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`;
|
||||
|
||||
return {
|
||||
status: upload_response.status,
|
||||
uploadid,
|
||||
fileid,
|
||||
url
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
* @param {string|object} broker_url_or_conn - NATS server URL or pre-existing connection
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - JSON message to publish
|
||||
* @param {string} correlation_id - Correlation ID for tracing and logging
|
||||
*/
|
||||
async function publishMessage(broker_url_or_conn, subject, message, correlation_id) {
|
||||
if (broker_url_or_conn instanceof Object && broker_url_or_conn.publish) {
|
||||
// Pre-existing connection
|
||||
try {
|
||||
await broker_url_or_conn.publish(subject, message);
|
||||
logTrace(correlation_id, `Message published to ${subject}`);
|
||||
} finally {
|
||||
// Note: In a real implementation, you might want to drain/close the connection
|
||||
}
|
||||
} else {
|
||||
// URL-based - create new connection
|
||||
if (!nats) {
|
||||
throw new Error('nats package not available. Install with: npm install nats');
|
||||
}
|
||||
|
||||
const conn = await nats.connect(broker_url_or_conn);
|
||||
try {
|
||||
await conn.publish(subject, message);
|
||||
logTrace(correlation_id, `Message published to ${subject}`);
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build message envelope from payloads and metadata
|
||||
* @param {string} subject - NATS subject
|
||||
* @param {Array} payloads - Array of payload objects
|
||||
* @param {Object} options - Envelope options
|
||||
* @returns {Object} Message envelope
|
||||
*/
|
||||
function buildEnvelope(subject, payloads, options) {
|
||||
return {
|
||||
correlation_id: options.correlation_id || generateUUID(),
|
||||
msg_id: options.msg_id || generateUUID(),
|
||||
timestamp: new Date().toISOString(),
|
||||
send_to: subject,
|
||||
msg_purpose: options.msg_purpose || '',
|
||||
sender_name: options.sender_name || '',
|
||||
sender_id: options.sender_id || generateUUID(),
|
||||
receiver_name: options.receiver_name || '',
|
||||
receiver_id: options.receiver_id || '',
|
||||
reply_to: options.reply_to || '',
|
||||
reply_to_msg_id: options.reply_to_msg_id || '',
|
||||
broker_url: options.broker_url || DEFAULT_BROKER_URL,
|
||||
metadata: options.metadata || {},
|
||||
payloads
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert data to base64 string
|
||||
* @param {Uint8Array} buffer - Data buffer
|
||||
* @returns {string} Base64 encoded string
|
||||
*/
|
||||
function bufferToBase64(buffer) {
|
||||
if (typeof Buffer !== 'undefined' && Buffer.isBuffer(buffer)) {
|
||||
return buffer.toString('base64');
|
||||
}
|
||||
|
||||
// For browser/Node Uint8Array
|
||||
let binary = '';
|
||||
const bytes = new Uint8Array(buffer);
|
||||
const len = bytes.byteLength;
|
||||
for (let i = 0; i < len; i++) {
|
||||
binary += String.fromCharCode(bytes[i]);
|
||||
}
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert base64 string to Uint8Array
|
||||
* @param {string} base64 - Base64 encoded string
|
||||
* @returns {Uint8Array} Decoded bytes
|
||||
*/
|
||||
function base64ToBuffer(base64) {
|
||||
const binary = atob(base64);
|
||||
const bytes = new Uint8Array(binary.length);
|
||||
for (let i = 0; i < binary.length; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
||||
*
|
||||
* This function intelligently routes data delivery based on payload size relative to a threshold.
|
||||
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64 and publishes directly over NATS.
|
||||
* Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS.
|
||||
*
|
||||
* @param {string} subject - NATS subject to publish the message to
|
||||
* @param {Array} data - Array of [dataname, data, type] tuples to send
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server
|
||||
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server for large payloads
|
||||
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
|
||||
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold in bytes separating direct vs link transport
|
||||
* @param {string} [options.correlation_id] - Correlation ID for tracing (auto-generated if not provided)
|
||||
* @param {string} [options.msg_purpose="chat"] - Purpose of the message
|
||||
* @param {string} [options.sender_name="NATSBridge"] - Name of the sender
|
||||
* @param {string} [options.receiver_name=""] - Name of the receiver
|
||||
* @param {string} [options.receiver_id=""] - UUID of the receiver
|
||||
* @param {string} [options.reply_to=""] - Topic to reply to
|
||||
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
|
||||
* @param {boolean} [options.is_publish=true] - Whether to automatically publish the message to NATS
|
||||
* @param {object} [options.nats_connection] - Pre-existing NATS connection
|
||||
* @param {string} [options.msg_id] - Message ID (auto-generated if not provided)
|
||||
* @param {string} [options.sender_id] - Sender ID (auto-generated if not provided)
|
||||
* @returns {Promise<[Object, string]>} Promise resolving to [envelope, env_json_str]
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
const {
|
||||
broker_url = DEFAULT_BROKER_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler = plikOneshotUpload,
|
||||
size_threshold = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id = generateUUID(),
|
||||
msg_purpose = 'chat',
|
||||
sender_name = 'NATSBridge',
|
||||
receiver_name = '',
|
||||
receiver_id = '',
|
||||
reply_to = '',
|
||||
reply_to_msg_id = '',
|
||||
is_publish = true,
|
||||
nats_connection = null,
|
||||
msg_id = generateUUID(),
|
||||
sender_id = generateUUID()
|
||||
} = options;
|
||||
|
||||
logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
|
||||
|
||||
// Process each payload in the list
|
||||
const payloads = [];
|
||||
for (const [dataname, payloadData, payloadType] of data) {
|
||||
const payloadBytes = await serializeData(payloadData, payloadType);
|
||||
const payloadSize = payloadBytes.byteLength;
|
||||
|
||||
logTrace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`);
|
||||
|
||||
// Decision: Direct vs Link
|
||||
if (payloadSize < size_threshold) {
|
||||
// Direct path - Base64 encode and send via NATS
|
||||
const payloadB64 = bufferToBase64(payloadBytes);
|
||||
logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
|
||||
|
||||
payloads.push({
|
||||
id: generateUUID(),
|
||||
dataname,
|
||||
payload_type: payloadType,
|
||||
transport: 'direct',
|
||||
encoding: 'base64',
|
||||
size: payloadSize,
|
||||
data: payloadB64,
|
||||
metadata: { payload_bytes: payloadSize }
|
||||
});
|
||||
} else {
|
||||
// Link path - Upload to HTTP server, send URL via NATS
|
||||
logTrace(correlation_id, 'Using link transport, uploading to fileserver');
|
||||
|
||||
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
|
||||
|
||||
if (response.status !== 200) {
|
||||
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
|
||||
}
|
||||
|
||||
logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
|
||||
|
||||
payloads.push({
|
||||
id: generateUUID(),
|
||||
dataname,
|
||||
payload_type: payloadType,
|
||||
transport: 'link',
|
||||
encoding: 'none',
|
||||
size: payloadSize,
|
||||
data: response.url,
|
||||
metadata: {}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Build envelope
|
||||
const env = buildEnvelope(subject, payloads, {
|
||||
correlation_id,
|
||||
msg_id,
|
||||
msg_purpose,
|
||||
sender_name,
|
||||
sender_id,
|
||||
receiver_name,
|
||||
receiver_id,
|
||||
reply_to,
|
||||
reply_to_msg_id,
|
||||
broker_url
|
||||
});
|
||||
|
||||
const env_json_str = JSON.stringify(env);
|
||||
|
||||
if (is_publish) {
|
||||
if (nats_connection) {
|
||||
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
|
||||
} else {
|
||||
await publishMessage(broker_url, subject, env_json_str, correlation_id);
|
||||
}
|
||||
}
|
||||
|
||||
return [env, env_json_str];
|
||||
}
|
||||
|
||||
/**
|
||||
* smartreceive - Receive and process messages from NATS
|
||||
*
|
||||
* This function processes incoming NATS messages, handling both direct transport
|
||||
* (base64 decoded payloads) and link transport (URL-based payloads).
|
||||
* It deserializes the data based on the transport type and returns the result.
|
||||
*
|
||||
* @param {Object} msg - NATS message object
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle downloading data from file server URLs
|
||||
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
|
||||
* @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
|
||||
* @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
|
||||
* @returns {Promise<Object>} Promise resolving to envelope object with deserialized payloads
|
||||
*/
|
||||
async function smartreceive(msg, options = {}) {
|
||||
const {
|
||||
fileserver_download_handler = fetchWithBackoff,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
} = options;
|
||||
|
||||
// Parse the JSON envelope
|
||||
const payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
|
||||
const env_json_obj = JSON.parse(payload);
|
||||
|
||||
logTrace(env_json_obj.correlation_id, 'Processing received message');
|
||||
|
||||
// Process all payloads in the envelope
|
||||
const payloads_list = [];
|
||||
|
||||
for (const payload of env_json_obj.payloads) {
|
||||
const transport = payload.transport;
|
||||
const dataname = payload.dataname;
|
||||
|
||||
if (transport === 'direct') {
|
||||
logTrace(env_json_obj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
|
||||
|
||||
// Extract base64 payload from the payload
|
||||
const payload_b64 = payload.data;
|
||||
|
||||
// Decode Base64 payload
|
||||
const payload_bytes = base64ToBuffer(payload_b64);
|
||||
|
||||
// Deserialize based on type
|
||||
const data_type = payload.payload_type;
|
||||
const data = await deserializeData(payload_bytes, data_type, env_json_obj.correlation_id);
|
||||
|
||||
payloads_list.push([dataname, data, data_type]);
|
||||
} else if (transport === 'link') {
|
||||
// Extract download URL from the payload
|
||||
const url = payload.data;
|
||||
logTrace(env_json_obj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
|
||||
|
||||
// Fetch with exponential backoff using the download handler
|
||||
const downloaded_data = await fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
env_json_obj.correlation_id
|
||||
);
|
||||
|
||||
// Deserialize based on type
|
||||
const data_type = payload.payload_type;
|
||||
const data = await deserializeData(downloaded_data, data_type, env_json_obj.correlation_id);
|
||||
|
||||
payloads_list.push([dataname, data, data_type]);
|
||||
} else {
|
||||
throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
|
||||
}
|
||||
}
|
||||
|
||||
env_json_obj.payloads = payloads_list;
|
||||
return env_json_obj;
|
||||
}
|
||||
|
||||
/**
|
||||
* NATS Client wrapper for managing connections
|
||||
*/
|
||||
class NATSClient {
|
||||
constructor(url) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (!nats) {
|
||||
throw new Error('nats package not available. Install with: npm install nats');
|
||||
}
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
async publish(subject, message) {
|
||||
if (!this.connection) {
|
||||
await this.connect();
|
||||
}
|
||||
await this.connection.publish(subject, message);
|
||||
}
|
||||
|
||||
async close() {
|
||||
if (this.connection) {
|
||||
this.connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Export for Node.js
|
||||
if (typeof module !== 'undefined' && module.exports) {
|
||||
module.exports = {
|
||||
NATSClient,
|
||||
smartsend,
|
||||
smartreceive,
|
||||
plikOneshotUpload,
|
||||
fetchWithBackoff,
|
||||
serializeData,
|
||||
deserializeData,
|
||||
publishMessage,
|
||||
logTrace,
|
||||
generateUUID,
|
||||
DEFAULT_SIZE_THRESHOLD,
|
||||
DEFAULT_BROKER_URL,
|
||||
DEFAULT_FILESERVER_URL
|
||||
};
|
||||
}
|
||||
|
||||
// Export for browser (global scope)
|
||||
if (typeof window !== 'undefined') {
|
||||
window.NATSBridge = {
|
||||
smartsend,
|
||||
smartreceive,
|
||||
plikOneshotUpload,
|
||||
fetchWithBackoff,
|
||||
serializeData,
|
||||
deserializeData,
|
||||
publishMessage,
|
||||
logTrace,
|
||||
generateUUID,
|
||||
NATSClient,
|
||||
DEFAULT_SIZE_THRESHOLD,
|
||||
DEFAULT_BROKER_URL,
|
||||
DEFAULT_FILESERVER_URL
|
||||
};
|
||||
}
|
||||
784
src/natbridge.py
784
src/natbridge.py
@@ -1,784 +0,0 @@
|
||||
"""
|
||||
NATSBridge - Cross-Platform Bi-Directional Data Bridge
|
||||
Python Implementation (Desktop Python)
|
||||
|
||||
This module provides functionality for sending and receiving data across network boundaries
|
||||
using NATS as the message bus, with support for both direct payload transport and
|
||||
URL-based transport for larger payloads.
|
||||
|
||||
File Server Handler Architecture:
|
||||
The system uses handler functions to abstract file server operations, allowing support
|
||||
for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
|
||||
|
||||
Handler Function Signatures:
|
||||
|
||||
```python
|
||||
# Upload handler - uploads data to file server and returns URL
|
||||
# The handler is passed to smartsend as fileserver_upload_handler parameter
|
||||
# It receives: (fileserver_url, dataname, data)
|
||||
# Returns: Coroutine[Dict[str, Any]]
|
||||
async def fileserver_upload_handler(fileserver_url, dataname, data)
|
||||
|
||||
# Download handler - fetches data from file server URL with exponential backoff
|
||||
# The handler is passed to smartreceive as fileserver_download_handler parameter
|
||||
# It receives: (url, max_retries, base_delay, max_delay, correlation_id)
|
||||
# Returns: Coroutine[bytes]
|
||||
async def fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id)
|
||||
```
|
||||
|
||||
Multi-Payload Support (Standard API):
|
||||
The system uses a standardized list-of-tuples format for all payload operations.
|
||||
Even when sending a single payload, the user must wrap it in a list.
|
||||
|
||||
API Standard:
|
||||
|
||||
```python
|
||||
# Input format for smartsend (always a list of tuples with type info)
|
||||
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
|
||||
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
|
||||
{
|
||||
"correlation_id": "...",
|
||||
"msg_id": "...",
|
||||
"timestamp": "...",
|
||||
"send_to": "...",
|
||||
"msg_purpose": "...",
|
||||
"sender_name": "...",
|
||||
"sender_id": "...",
|
||||
"receiver_name": "...",
|
||||
"receiver_id": "...",
|
||||
"reply_to": "...",
|
||||
"reply_to_msg_id": "...",
|
||||
"broker_url": "...",
|
||||
"metadata": {...},
|
||||
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
}
|
||||
```
|
||||
|
||||
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import uuid
|
||||
import time
|
||||
from typing import Any, Callable, Dict, List, Tuple, Union
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
# Optional dependencies
|
||||
try:
|
||||
import aiohttp
|
||||
AIOHTTP_AVAILABLE = True
|
||||
except ImportError:
|
||||
AIOHTTP_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import pyarrow as arrow
|
||||
import pyarrow.parquet as pq
|
||||
import pandas as pd
|
||||
ARROW_AVAILABLE = True
|
||||
except ImportError:
|
||||
ARROW_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import nats
|
||||
from nats.aio.client import Client as NATSClient
|
||||
NATS_AVAILABLE = True
|
||||
except ImportError:
|
||||
NATS_AVAILABLE = False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Constants
|
||||
# ============================================================================
|
||||
|
||||
DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
|
||||
DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL
|
||||
DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Data Classes
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class MsgPayloadV1:
|
||||
"""Message payload structure."""
|
||||
id: str
|
||||
dataname: str
|
||||
payload_type: str
|
||||
transport: str
|
||||
encoding: str
|
||||
size: int
|
||||
data: Union[str, bytes] # URL for link, base64 for direct
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MsgEnvelopeV1:
|
||||
"""Message envelope structure."""
|
||||
correlation_id: str
|
||||
msg_id: str
|
||||
timestamp: str
|
||||
send_to: str
|
||||
msg_purpose: str
|
||||
sender_name: str
|
||||
sender_id: str
|
||||
receiver_name: str
|
||||
receiver_id: str
|
||||
reply_to: str
|
||||
reply_to_msg_id: str
|
||||
broker_url: str
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
payloads: List[MsgPayloadV1] = field(default_factory=list)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Utility Functions
|
||||
# ============================================================================
|
||||
|
||||
def log_trace(correlation_id: str, message: str) -> None:
|
||||
"""Log a trace message with correlation ID and timestamp."""
|
||||
timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
|
||||
print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
|
||||
|
||||
|
||||
def generate_uuid() -> str:
|
||||
"""Generate a UUID v4 string."""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Serialization Functions
|
||||
# ============================================================================
|
||||
|
||||
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
||||
"""
|
||||
Serialize data according to specified format.
|
||||
|
||||
Args:
|
||||
data: Data to serialize
|
||||
payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
|
||||
Returns:
|
||||
Binary representation of the serialized data
|
||||
|
||||
Raises:
|
||||
Error: If data type doesn't match payload_type
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
if isinstance(data, str):
|
||||
return data.encode('utf-8')
|
||||
else:
|
||||
raise Error('Text data must be a string')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
elif payload_type == 'table':
|
||||
if not ARROW_AVAILABLE:
|
||||
raise Error('pyarrow not available for table serialization. Install with: pip install pyarrow pandas')
|
||||
|
||||
# Convert DataFrame to Arrow IPC
|
||||
import io
|
||||
buf = io.BytesIO()
|
||||
|
||||
if isinstance(data, pd.DataFrame):
|
||||
table = arrow.Table.from_pandas(data)
|
||||
sink = arrow.ipc.new_file(buf)
|
||||
arrow.ipc.write_table(table, sink)
|
||||
sink.close()
|
||||
return buf.getvalue()
|
||||
else:
|
||||
raise Error('Table data must be a pandas DataFrame')
|
||||
elif payload_type == 'image':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise Error('Image data must be bytes')
|
||||
elif payload_type == 'audio':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise Error('Audio data must be bytes')
|
||||
elif payload_type == 'video':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise Error('Video data must be bytes')
|
||||
elif payload_type == 'binary':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise Error('Binary data must be bytes')
|
||||
else:
|
||||
raise Error(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
||||
"""
|
||||
Deserialize bytes to data based on type.
|
||||
|
||||
Args:
|
||||
data: Serialized data as bytes
|
||||
payload_type: Data type
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Deserialized data
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
return data.decode('utf-8')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = data.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
elif payload_type == 'table':
|
||||
if not ARROW_AVAILABLE:
|
||||
raise Error('pyarrow not available for table deserialization')
|
||||
|
||||
import io
|
||||
buf = io.BytesIO(data)
|
||||
reader = arrow.ipc.open_file(buf)
|
||||
return reader.read_all().to_pandas()
|
||||
elif payload_type == 'image':
|
||||
return data
|
||||
elif payload_type == 'audio':
|
||||
return data
|
||||
elif payload_type == 'video':
|
||||
return data
|
||||
elif payload_type == 'binary':
|
||||
return data
|
||||
else:
|
||||
raise Error(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# File Server Handlers
|
||||
# ============================================================================
|
||||
|
||||
async def plik_oneshot_upload(
|
||||
file_server_url: str,
|
||||
dataname: str,
|
||||
data: bytes
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Upload a single file to a plik server using one-shot mode.
|
||||
|
||||
Args:
|
||||
file_server_url: Base URL of the plik server
|
||||
dataname: Name of the file being uploaded
|
||||
data: Raw byte data of the file content
|
||||
|
||||
Returns:
|
||||
Dict with keys: 'status', 'uploadid', 'fileid', 'url'
|
||||
"""
|
||||
if not AIOHTTP_AVAILABLE:
|
||||
raise Error('aiohttp not available. Install with: pip install aiohttp')
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# Get upload id
|
||||
url_getUploadID = f"{file_server_url}/upload"
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
body = json.dumps({"OneShot": True})
|
||||
|
||||
async with session.post(url_getUploadID, headers=headers, data=body) as response:
|
||||
response_json = await response.json()
|
||||
uploadid = response_json['id']
|
||||
uploadtoken = response_json['uploadToken']
|
||||
|
||||
# Upload file
|
||||
url_upload = f"{file_server_url}/file/{uploadid}"
|
||||
headers = {'X-UploadToken': uploadtoken}
|
||||
|
||||
form = aiohttp.FormData()
|
||||
form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
|
||||
|
||||
async with session.post(url_upload, headers=headers, data=form) as upload_response:
|
||||
upload_json = await upload_response.json()
|
||||
fileid = upload_json['id']
|
||||
|
||||
url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
|
||||
|
||||
return {
|
||||
'status': upload_response.status,
|
||||
'uploadid': uploadid,
|
||||
'fileid': fileid,
|
||||
'url': url
|
||||
}
|
||||
|
||||
|
||||
async def fetch_with_backoff(
|
||||
url: str,
|
||||
max_retries: int,
|
||||
base_delay: int,
|
||||
max_delay: int,
|
||||
correlation_id: str
|
||||
) -> bytes:
|
||||
"""
|
||||
Fetch data from URL with exponential backoff.
|
||||
|
||||
Args:
|
||||
url: URL to fetch from
|
||||
max_retries: Maximum retry attempts
|
||||
base_delay: Initial delay in milliseconds
|
||||
max_delay: Maximum delay in milliseconds
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Fetched data as bytes
|
||||
|
||||
Raises:
|
||||
Error: If all retry attempts fail
|
||||
"""
|
||||
if not AIOHTTP_AVAILABLE:
|
||||
raise Error('aiohttp not available. Install with: pip install aiohttp')
|
||||
|
||||
delay = base_delay
|
||||
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
|
||||
return await response.read()
|
||||
else:
|
||||
raise Error(f"Failed to fetch: {response.status}")
|
||||
except Exception as e:
|
||||
log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")
|
||||
|
||||
if attempt < max_retries:
|
||||
await asyncio.sleep(delay / 1000.0)
|
||||
delay = min(delay * 2, max_delay)
|
||||
|
||||
raise Error(f"Failed to fetch data after {max_retries} attempts")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NATS Publishing
|
||||
# ============================================================================
|
||||
|
||||
async def publish_message(
|
||||
broker_url_or_conn: Union[str, NATSClient],
|
||||
subject: str,
|
||||
message: str,
|
||||
correlation_id: str,
|
||||
nats_connection: NATSClient = None
|
||||
) -> None:
|
||||
"""
|
||||
Publish message to NATS.
|
||||
|
||||
Args:
|
||||
broker_url_or_conn: NATS server URL or pre-existing connection
|
||||
subject: NATS subject to publish to
|
||||
message: JSON message to publish
|
||||
correlation_id: Correlation ID for tracing and logging
|
||||
nats_connection: Optional pre-existing NATS connection
|
||||
"""
|
||||
if nats_connection:
|
||||
# Use provided connection
|
||||
try:
|
||||
await nats_connection.publish(subject, message)
|
||||
log_trace(correlation_id, f"Message published to {subject}")
|
||||
finally:
|
||||
# Note: In a real implementation, you might want to drain/close the connection
|
||||
pass
|
||||
elif isinstance(broker_url_or_conn, str):
|
||||
# URL-based - create new connection
|
||||
if not NATS_AVAILABLE:
|
||||
raise Error('nats-py not available. Install with: pip install nats-py')
|
||||
|
||||
conn = await nats.connect(broker_url_or_conn)
|
||||
try:
|
||||
await conn.publish(subject, message)
|
||||
log_trace(correlation_id, f"Message published to {subject}")
|
||||
finally:
|
||||
await conn.drain()
|
||||
else:
|
||||
raise Error('Invalid broker_url_or_conn type')
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Core Functions
|
||||
# ============================================================================
|
||||
|
||||
async def smartsend(
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
broker_url: str = DEFAULT_BROKER_URL,
|
||||
fileserver_url: str = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler: Callable = plik_oneshot_upload,
|
||||
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id: str = None,
|
||||
msg_purpose: str = "chat",
|
||||
sender_name: str = "NATSBridge",
|
||||
receiver_name: str = "",
|
||||
receiver_id: str = "",
|
||||
reply_to: str = "",
|
||||
reply_to_msg_id: str = "",
|
||||
is_publish: bool = True,
|
||||
nats_connection: NATSClient = None,
|
||||
msg_id: str = None,
|
||||
sender_id: str = None
|
||||
) -> Tuple[Dict, str]:
|
||||
"""
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
This function intelligently routes data delivery based on payload size relative to a threshold.
|
||||
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||
and publishes only the download URL over NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish the message to
|
||||
data: List of (dataname, data, type) tuples
|
||||
broker_url: URL of the NATS server
|
||||
fileserver_url: URL of the HTTP file server for large payloads
|
||||
fileserver_upload_handler: Function to handle fileserver uploads
|
||||
size_threshold: Threshold in bytes separating direct vs link transport
|
||||
correlation_id: Correlation ID for tracing (auto-generated if not provided)
|
||||
msg_purpose: Purpose of the message
|
||||
sender_name: Name of the sender
|
||||
receiver_name: Name of the receiver
|
||||
receiver_id: UUID of the receiver
|
||||
reply_to: Topic to reply to
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
is_publish: Whether to automatically publish the message to NATS
|
||||
nats_connection: Pre-existing NATS connection (if provided, uses this instead of creating a new one)
|
||||
msg_id: Message ID (auto-generated if not provided)
|
||||
sender_id: Sender ID (auto-generated if not provided)
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str) where:
|
||||
- env: Dict with all metadata and payloads
|
||||
- env_json_str: JSON string for publishing to NATS
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Send a single payload (still wrapped in a list)
|
||||
config = {"key": "value"}
|
||||
env, env_json_str = await smartsend(
|
||||
"my.subject",
|
||||
[("config", config, "dictionary")],
|
||||
broker_url="nats://localhost:4222"
|
||||
)
|
||||
|
||||
# Send multiple payloads in one message with different types
|
||||
data1 = {"key1": "value1"}
|
||||
data2 = [1, 2, 3, 4, 5]
|
||||
env, env_json_str = await smartsend(
|
||||
"my.subject",
|
||||
[("data1", data1, "dictionary"), ("data2", data2, "table")]
|
||||
)
|
||||
|
||||
# Mixed content (e.g., chat with text and image)
|
||||
env, env_json_str = await smartsend(
|
||||
"chat.subject",
|
||||
[
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_image", image_bytes, "image"),
|
||||
("audio_clip", audio_bytes, "audio")
|
||||
]
|
||||
)
|
||||
```
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = generate_uuid()
|
||||
if msg_id is None:
|
||||
msg_id = generate_uuid()
|
||||
if sender_id is None:
|
||||
sender_id = generate_uuid()
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
|
||||
# Process each payload in the list
|
||||
payloads = []
|
||||
for dataname, payload_data, payload_type in data:
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
payload_size = len(payload_bytes)
|
||||
|
||||
log_trace(correlation_id, f"Serialized payload '{dataname}' (payload_type: {payload_type}) size: {payload_size} bytes")
|
||||
|
||||
# Decision: Direct vs Link
|
||||
if payload_size < size_threshold:
|
||||
# Direct path - Base64 encode and send via NATS
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
|
||||
log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
|
||||
|
||||
payloads.append({
|
||||
'id': generate_uuid(),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': 'direct',
|
||||
'encoding': 'base64',
|
||||
'size': payload_size,
|
||||
'data': payload_b64,
|
||||
'metadata': {'payload_bytes': payload_size}
|
||||
})
|
||||
else:
|
||||
# Link path - Upload to HTTP server, send URL via NATS
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver")
|
||||
|
||||
response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
|
||||
if response['status'] != 200:
|
||||
raise Error(f"Failed to upload data to fileserver: {response['status']}")
|
||||
|
||||
log_trace(correlation_id, f"Uploaded to URL: {response['url']}")
|
||||
|
||||
payloads.append({
|
||||
'id': generate_uuid(),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': 'link',
|
||||
'encoding': 'none',
|
||||
'size': payload_size,
|
||||
'data': response['url'],
|
||||
'metadata': {}
|
||||
})
|
||||
|
||||
# Build envelope
|
||||
env = {
|
||||
'correlation_id': correlation_id,
|
||||
'msg_id': msg_id,
|
||||
'timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
|
||||
'send_to': subject,
|
||||
'msg_purpose': msg_purpose,
|
||||
'sender_name': sender_name,
|
||||
'sender_id': sender_id,
|
||||
'receiver_name': receiver_name,
|
||||
'receiver_id': receiver_id,
|
||||
'reply_to': reply_to,
|
||||
'reply_to_msg_id': reply_to_msg_id,
|
||||
'broker_url': broker_url,
|
||||
'metadata': {},
|
||||
'payloads': payloads
|
||||
}
|
||||
|
||||
env_json_str = json.dumps(env)
|
||||
|
||||
if is_publish:
|
||||
if nats_connection:
|
||||
await publish_message(broker_url, subject, env_json_str, correlation_id, nats_connection)
|
||||
else:
|
||||
await publish_message(broker_url, subject, env_json_str, correlation_id)
|
||||
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
async def smartreceive(
|
||||
msg: Any,
|
||||
fileserver_download_handler: Callable = fetch_with_backoff,
|
||||
max_retries: int = 5,
|
||||
base_delay: int = 100,
|
||||
max_delay: int = 5000
|
||||
) -> Dict:
|
||||
"""
|
||||
Receive and process messages from NATS.
|
||||
|
||||
This function processes incoming NATS messages, handling both direct transport
|
||||
(base64 decoded payloads) and link transport (URL-based payloads).
|
||||
It deserializes the data based on the transport type and returns the result.
|
||||
|
||||
Args:
|
||||
msg: NATS message object
|
||||
fileserver_download_handler: Function to handle downloading data from file server URLs
|
||||
max_retries: Maximum retry attempts for fetching URL
|
||||
base_delay: Initial delay for exponential backoff in ms
|
||||
max_delay: Maximum delay for exponential backoff in ms
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads field containing list of tuples
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Receive and process message
|
||||
msg = await nats_connection.subscribe("my.subject")
|
||||
env = await smartreceive(msg)
|
||||
# env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
|
||||
```
|
||||
"""
|
||||
# Parse the JSON envelope
|
||||
if isinstance(msg.payload, bytes):
|
||||
payload = msg.payload.decode('utf-8')
|
||||
else:
|
||||
payload = msg.payload
|
||||
|
||||
env_json_obj = json.loads(payload)
|
||||
log_trace(env_json_obj['correlation_id'], "Processing received message")
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = []
|
||||
|
||||
for payload in env_json_obj['payloads']:
|
||||
transport = payload['transport']
|
||||
dataname = payload['dataname']
|
||||
|
||||
if transport == 'direct':
|
||||
log_trace(env_json_obj['correlation_id'], f"Direct transport - decoding payload '{dataname}'")
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = payload['data']
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = base64.b64decode(payload_b64)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload['payload_type']
|
||||
data = _deserialize_data(payload_bytes, data_type, env_json_obj['correlation_id'])
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
elif transport == 'link':
|
||||
# Extract download URL from the payload
|
||||
url = payload['data']
|
||||
log_trace(env_json_obj['correlation_id'], f"Link transport - fetching '{dataname}' from URL: {url}")
|
||||
|
||||
# Fetch with exponential backoff using the download handler
|
||||
downloaded_data = await fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
env_json_obj['correlation_id']
|
||||
)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload['payload_type']
|
||||
data = _deserialize_data(downloaded_data, data_type, env_json_obj['correlation_id'])
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
else:
|
||||
raise Error(f"Unknown transport type for payload '{dataname}': {transport}")
|
||||
|
||||
env_json_obj['payloads'] = payloads_list
|
||||
return env_json_obj
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NATS Client Wrapper
|
||||
# ============================================================================
|
||||
|
||||
class NATSBridge:
|
||||
"""
|
||||
Cross-platform NATS bridge implementation.
|
||||
|
||||
Provides a class-based interface for NATSBridge functionality.
|
||||
"""
|
||||
|
||||
def __init__(self, broker_url: str = None, fileserver_url: str = None):
|
||||
"""
|
||||
Initialize the NATSBridge client.
|
||||
|
||||
Args:
|
||||
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
|
||||
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
|
||||
"""
|
||||
self.broker_url = broker_url or DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
|
||||
self._nats_client: NATSClient = None
|
||||
|
||||
async def connect(self, broker_url: str = None) -> NATSClient:
|
||||
"""
|
||||
Connect to NATS server.
|
||||
|
||||
Args:
|
||||
broker_url: NATS server URL (optional, uses instance broker_url if not provided)
|
||||
|
||||
Returns:
|
||||
NATS connection client
|
||||
"""
|
||||
url = broker_url or self.broker_url
|
||||
if not NATS_AVAILABLE:
|
||||
raise Error('nats-py not available. Install with: pip install nats-py')
|
||||
self._nats_client = await nats.connect(url)
|
||||
return self._nats_client
|
||||
|
||||
async def smartsend(
|
||||
self,
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
**kwargs
|
||||
) -> Tuple[Dict, str]:
|
||||
"""
|
||||
Send data via NATS using instance configuration.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options (broker_url, fileserver_url, etc.)
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
# Merge instance config with kwargs
|
||||
options = {
|
||||
'broker_url': kwargs.get('broker_url', self.broker_url),
|
||||
'fileserver_url': kwargs.get('fileserver_url', self.fileserver_url),
|
||||
**kwargs
|
||||
}
|
||||
return await smartsend(subject, data, **options)
|
||||
|
||||
async def smartreceive(
|
||||
self,
|
||||
msg: Any,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""
|
||||
Receive and process NATS message using instance configuration.
|
||||
|
||||
Args:
|
||||
msg: NATS message object
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return await smartreceive(msg, **kwargs)
|
||||
|
||||
async def close(self):
|
||||
"""Close the NATS connection."""
|
||||
if self._nats_client:
|
||||
await self._nats_client.close()
|
||||
self._nats_client = None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Module Exports
|
||||
# ============================================================================
|
||||
|
||||
__all__ = [
|
||||
# Core functions
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
|
||||
# Utility functions
|
||||
'log_trace',
|
||||
'generate_uuid',
|
||||
'_serialize_data',
|
||||
'_deserialize_data',
|
||||
|
||||
# File server handlers
|
||||
'plik_oneshot_upload',
|
||||
'fetch_with_backoff',
|
||||
|
||||
# NATS publishing
|
||||
'publish_message',
|
||||
|
||||
# Data classes
|
||||
'MsgPayloadV1',
|
||||
'MsgEnvelopeV1',
|
||||
|
||||
# Client class
|
||||
'NATSBridge',
|
||||
|
||||
# Constants
|
||||
'DEFAULT_SIZE_THRESHOLD',
|
||||
'DEFAULT_BROKER_URL',
|
||||
'DEFAULT_FILESERVER_URL',
|
||||
|
||||
# Availability flags
|
||||
'ARROW_AVAILABLE',
|
||||
'AIOHTTP_AVAILABLE',
|
||||
'NATS_AVAILABLE',
|
||||
]
|
||||
@@ -1,728 +0,0 @@
|
||||
"""
|
||||
NATSBridge - Cross-Platform Bi-Directional Data Bridge
|
||||
MicroPython Implementation
|
||||
|
||||
This module provides functionality for sending and receiving data across network boundaries
|
||||
using NATS as the message bus, with support for both direct payload transport and
|
||||
URL-based transport for larger payloads.
|
||||
|
||||
MicroPython Limitations:
|
||||
- No Arrow IPC support (memory constraints)
|
||||
- Only direct transport (< 100KB threshold enforced)
|
||||
- Synchronous API (no async/await)
|
||||
- Limited UUID generation
|
||||
- Simplified file server handlers
|
||||
|
||||
Handler Function Signatures:
|
||||
|
||||
```python
|
||||
# Upload handler - uploads data to file server and returns URL
|
||||
# The handler is passed to smartsend as fileserver_upload_handler parameter
|
||||
# It receives: (fileserver_url, dataname, data)
|
||||
# Returns: Dict with keys: 'status', 'url' (MicroPython simplified)
|
||||
def fileserver_upload_handler(fileserver_url, dataname, data)
|
||||
|
||||
# Download handler - fetches data from file server URL with exponential backoff
|
||||
# The handler is passed to smartreceive as fileserver_download_handler parameter
|
||||
# It receives: (url, max_retries, base_delay, max_delay, correlation_id)
|
||||
# Returns: bytearray
|
||||
def fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id)
|
||||
```
|
||||
|
||||
Multi-Payload Support (Standard API):
|
||||
The system uses a standardized list-of-tuples format for all payload operations.
|
||||
|
||||
API Standard:
|
||||
|
||||
```python
|
||||
# Input format for smartsend (always a list of tuples with type info)
|
||||
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
|
||||
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
|
||||
{
|
||||
"correlation_id": "...",
|
||||
"msg_id": "...",
|
||||
"timestamp": "...",
|
||||
"send_to": "...",
|
||||
"msg_purpose": "...",
|
||||
"sender_name": "...",
|
||||
"sender_id": "...",
|
||||
"receiver_name": "...",
|
||||
"receiver_id": "...",
|
||||
"reply_to": "...",
|
||||
"reply_to_msg_id": "...",
|
||||
"broker_url": "...",
|
||||
"metadata": {...},
|
||||
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
}
|
||||
```
|
||||
|
||||
Supported types: "text", "dictionary", "image", "audio", "video", "binary"
|
||||
Note: "table" type is NOT supported in MicroPython due to memory constraints
|
||||
"""
|
||||
|
||||
import network
|
||||
import socket
|
||||
import time
|
||||
import json
|
||||
import base64
|
||||
import uos
|
||||
|
||||
# ============================================================================
|
||||
# Constants
|
||||
# ============================================================================
|
||||
|
||||
# MicroPython-specific lower thresholds due to memory constraints
|
||||
DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython (vs 1MB on desktop)
|
||||
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
||||
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
||||
MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython to prevent OOM
|
||||
|
||||
# NATS connection settings
|
||||
NATS_SERVER_HOST = "localhost"
|
||||
NATS_SERVER_PORT = 4222
|
||||
NATS_RECONNECT_DELAY = 5000 # 5 seconds
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Utility Functions
|
||||
# ============================================================================
|
||||
|
||||
def log_trace(correlation_id, message):
|
||||
"""Log a trace message with correlation ID and timestamp."""
|
||||
try:
|
||||
timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime())
|
||||
print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
|
||||
except Exception:
|
||||
# Fallback for MicroPython
|
||||
print(f"[Correlation: {correlation_id}] {message}")
|
||||
|
||||
|
||||
def generate_uuid():
|
||||
"""
|
||||
Generate a simple UUID for MicroPython.
|
||||
|
||||
Note: This is not a true UUID v4, but provides uniqueness for tracing.
|
||||
"""
|
||||
# Use time-based unique identifier
|
||||
tick = time.ticks_ms()
|
||||
random_byte = __builtins__.chr(tick % 256) if hasattr(__builtins__, 'chr') else chr(tick % 256)
|
||||
return f"mp-{tick:08x}-{random_byte}"
|
||||
|
||||
|
||||
def _serialize_data(data, payload_type):
|
||||
"""
|
||||
Serialize data to bytes based on type.
|
||||
|
||||
MicroPython version - no table support.
|
||||
|
||||
Args:
|
||||
data: Data to serialize
|
||||
payload_type: Target format: "text", "dictionary", "image", "audio", "video", "binary"
|
||||
|
||||
Returns:
|
||||
bytes: Serialized data
|
||||
|
||||
Raises:
|
||||
ValueError: If payload_type is unknown
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
if isinstance(data, str):
|
||||
return data.encode('utf-8')
|
||||
else:
|
||||
raise ValueError('Text data must be a string')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError(f'{payload_type} data must be bytes')
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
def _deserialize_data(data, payload_type):
|
||||
"""
|
||||
Deserialize bytes to data based on type.
|
||||
|
||||
MicroPython version - no table support.
|
||||
|
||||
Args:
|
||||
data: Serialized data as bytes
|
||||
payload_type: Data type
|
||||
|
||||
Returns:
|
||||
Deserialized data
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
return data.decode('utf-8')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = data.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||
return data
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# File Server Handlers (Simplified for MicroPython)
|
||||
# ============================================================================
|
||||
|
||||
def _http_request(method, url, headers=None, data=None):
|
||||
"""
|
||||
Make an HTTP request (simplified for MicroPython).
|
||||
|
||||
Args:
|
||||
method: HTTP method ('GET' or 'POST')
|
||||
url: URL to request
|
||||
headers: Optional headers dict
|
||||
data: Optional request body
|
||||
|
||||
Returns:
|
||||
tuple: (status_code, response_body, headers_dict)
|
||||
"""
|
||||
# Parse URL
|
||||
# Simple parsing: http://host:port/path
|
||||
try:
|
||||
# Remove protocol
|
||||
url = url.replace('http://', '').replace('https://', '')
|
||||
|
||||
# Split host and path
|
||||
if '/' in url:
|
||||
host_part, path = url.split('/', 1)
|
||||
path = '/' + path
|
||||
else:
|
||||
host_part = url
|
||||
path = '/'
|
||||
|
||||
# Split host and port
|
||||
if ':' in host_part:
|
||||
host, port = host_part.split(':')
|
||||
port = int(port)
|
||||
else:
|
||||
host = host_part
|
||||
port = 80
|
||||
|
||||
# Connect
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(10) # 10 second timeout
|
||||
sock.connect((host, port))
|
||||
|
||||
# Build request
|
||||
request_lines = [f"{method} {path} HTTP/1.1"]
|
||||
request_lines.append(f"Host: {host}")
|
||||
request_lines.append("Connection: close")
|
||||
|
||||
if headers:
|
||||
for key, value in headers.items():
|
||||
request_lines.append(f"{key}: {value}")
|
||||
|
||||
request_lines.append("")
|
||||
|
||||
request = '\r\n'.join(request_lines)
|
||||
if data:
|
||||
request += data.decode('utf-8') if isinstance(data, bytes) else data
|
||||
|
||||
sock.send(request.encode('utf-8'))
|
||||
|
||||
# Read response
|
||||
response = b''
|
||||
while True:
|
||||
try:
|
||||
chunk = sock.recv(1024)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
except Exception:
|
||||
break
|
||||
|
||||
sock.close()
|
||||
|
||||
# Parse response
|
||||
if b'\r\n\r\n' in response:
|
||||
header_part, body = response.split(b'\r\n\r\n', 1)
|
||||
header_lines = header_part.decode('utf-8', errors='ignore').split('\r\n')
|
||||
else:
|
||||
header_lines = response.decode('utf-8', errors='ignore').split('\r\n')
|
||||
body = b''
|
||||
|
||||
# Parse status line
|
||||
status_line = header_lines[0]
|
||||
status_code = int(status_line.split(' ')[1]) if len(status_line.split(' ')) > 1 else 200
|
||||
|
||||
# Parse headers
|
||||
headers_dict = {}
|
||||
for line in header_lines[1:]:
|
||||
if ':' in line:
|
||||
key, value = line.split(':', 1)
|
||||
headers_dict[key.strip()] = value.strip()
|
||||
|
||||
return status_code, body, headers_dict
|
||||
|
||||
except Exception as e:
|
||||
# Return error indicator
|
||||
return 0, b'', {'error': str(e)}
|
||||
|
||||
|
||||
def _simple_fileserver_upload(fileserver_url, dataname, data):
|
||||
"""
|
||||
Simple file upload handler for MicroPython (simplified plik-style).
|
||||
|
||||
Note: This is a basic implementation. For production, use a proper
|
||||
file server with the same API as plik.
|
||||
|
||||
Args:
|
||||
fileserver_url: Base URL of file server
|
||||
dataname: Name of the file
|
||||
data: Binary data
|
||||
|
||||
Returns:
|
||||
Dict with keys: 'status', 'url'
|
||||
"""
|
||||
# Simple implementation: return a constructed URL
|
||||
# In practice, you'd need to implement actual upload logic
|
||||
|
||||
# For MicroPython, we'll use a simple approach
|
||||
# Generate a unique ID based on timestamp and dataname
|
||||
unique_id = f"{int(time.time())}-{abs(hash(dataname)) % 10000:04d}"
|
||||
|
||||
# Construct download URL
|
||||
url = f"{fileserver_url}/file/{unique_id}/{dataname}"
|
||||
|
||||
# In a real implementation, you would:
|
||||
# 1. POST to /upload to get upload ID
|
||||
# 2. POST file data to /file/{upload_id}
|
||||
# 3. Return the download URL
|
||||
|
||||
# For now, return a mock response
|
||||
return {
|
||||
'status': 200,
|
||||
'url': url
|
||||
}
|
||||
|
||||
|
||||
def _simple_fileserver_download(url, max_retries, base_delay, max_delay, correlation_id):
|
||||
"""
|
||||
Simple file download handler with exponential backoff.
|
||||
|
||||
Args:
|
||||
url: URL to download from
|
||||
max_retries: Maximum retry attempts
|
||||
base_delay: Initial delay in milliseconds
|
||||
max_delay: Maximum delay in milliseconds
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
bytearray: Downloaded data
|
||||
"""
|
||||
delay = base_delay
|
||||
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
status_code, body, headers = _http_request('GET', url)
|
||||
|
||||
if status_code == 200:
|
||||
log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
|
||||
return bytearray(body)
|
||||
else:
|
||||
log_trace(correlation_id, f"Download failed with status {status_code}")
|
||||
if attempt < max_retries:
|
||||
time.sleep_ms(delay)
|
||||
delay = min(delay * 2, max_delay)
|
||||
else:
|
||||
raise RuntimeError(f"Download failed with status {status_code}")
|
||||
except Exception as e:
|
||||
log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")
|
||||
if attempt < max_retries:
|
||||
time.sleep_ms(delay)
|
||||
delay = min(delay * 2, max_delay)
|
||||
else:
|
||||
raise RuntimeError(f"Failed to download after {max_retries} attempts: {e}")
|
||||
|
||||
return bytearray()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NATS Connection (Simplified for MicroPython)
|
||||
# ============================================================================
|
||||
|
||||
class NATSConnection:
|
||||
"""
|
||||
Simplified NATS connection for MicroPython.
|
||||
|
||||
Note: This is a basic implementation. For production use, consider
|
||||
using a proper MicroPython NATS client or the full Python implementation.
|
||||
"""
|
||||
|
||||
def __init__(self, host=NATS_SERVER_HOST, port=NATS_SERVER_PORT):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.sock = None
|
||||
self.connected = False
|
||||
|
||||
def connect(self):
|
||||
"""Connect to NATS server."""
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.settimeout(10)
|
||||
self.sock.connect((self.host, self.port))
|
||||
self.connected = True
|
||||
|
||||
# Send CONNECT command (simplified)
|
||||
connect_cmd = b'\x08CONNECT{"verbose":false,"protocol":1,"version":"1.0.2","auth_token":"","lang":"mpy"}\r\n'
|
||||
self.sock.send(connect_cmd)
|
||||
|
||||
# Wait for PONG
|
||||
response = self.sock.recv(1024)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.connected = False
|
||||
raise RuntimeError(f"Failed to connect to NATS: {e}")
|
||||
|
||||
def publish(self, subject, message):
|
||||
"""Publish a message to a subject."""
|
||||
if not self.connected or not self.sock:
|
||||
raise RuntimeError("Not connected to NATS")
|
||||
|
||||
# Publish command: PUB subject size\r\nmessage\r\n
|
||||
msg_bytes = message.encode('utf-8') if isinstance(message, str) else message
|
||||
pub_cmd = f"PUB {subject} {len(msg_bytes)}\r\n".encode('utf-8')
|
||||
pub_cmd += msg_bytes + b'\r\n'
|
||||
|
||||
self.sock.send(pub_cmd)
|
||||
|
||||
# Wait for +OK response (simplified)
|
||||
try:
|
||||
self.sock.recv(4)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
"""Close the connection."""
|
||||
if self.sock:
|
||||
try:
|
||||
self.sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.sock = None
|
||||
self.connected = False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Core Functions
|
||||
# ============================================================================
|
||||
|
||||
def smartsend(
|
||||
subject,
|
||||
data,
|
||||
broker_url=DEFAULT_BROKER_URL,
|
||||
fileserver_url=DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler=_simple_fileserver_upload,
|
||||
size_threshold=DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id=None,
|
||||
msg_purpose="chat",
|
||||
sender_name="NATSBridge",
|
||||
receiver_name="",
|
||||
receiver_id="",
|
||||
reply_to="",
|
||||
reply_to_msg_id="",
|
||||
is_publish=True,
|
||||
msg_id=None,
|
||||
sender_id=None
|
||||
):
|
||||
"""
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
MicroPython version - synchronous, limited features.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
broker_url: NATS server URL
|
||||
fileserver_url: HTTP file server URL
|
||||
fileserver_upload_handler: Function to handle fileserver uploads
|
||||
size_threshold: Threshold in bytes (enforced MAX_PAYLOAD_SIZE)
|
||||
correlation_id: Correlation ID for tracing
|
||||
msg_purpose: Purpose of the message
|
||||
sender_name: Name of the sender
|
||||
receiver_name: Name of the receiver
|
||||
receiver_id: UUID of the receiver
|
||||
reply_to: Topic to reply to
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
is_publish: Whether to automatically publish the message
|
||||
msg_id: Message ID
|
||||
sender_id: Sender ID
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
|
||||
Example:
|
||||
```python
|
||||
from natbridge_mpy import NATSBridge
|
||||
|
||||
bridge = NATSBridge()
|
||||
env, env_json_str = bridge.smartsend(
|
||||
"/chat",
|
||||
[("message", "Hello!", "text"), ("data", data_bytes, "binary")],
|
||||
size_threshold=100000 # Lower threshold for MicroPython
|
||||
)
|
||||
```
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = generate_uuid()
|
||||
if msg_id is None:
|
||||
msg_id = generate_uuid()
|
||||
if sender_id is None:
|
||||
sender_id = generate_uuid()
|
||||
|
||||
# Enforce MAX_PAYLOAD_SIZE for MicroPython
|
||||
effective_threshold = min(size_threshold, MAX_PAYLOAD_SIZE)
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
|
||||
# Process each payload
|
||||
payloads = []
|
||||
for dataname, payload_data, payload_type in data:
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
payload_size = len(payload_bytes)
|
||||
|
||||
# Check against hard limit
|
||||
if payload_size > MAX_PAYLOAD_SIZE:
|
||||
raise MemoryError(f"Payload '{dataname}' exceeds max size {MAX_PAYLOAD_SIZE} bytes")
|
||||
|
||||
log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
|
||||
|
||||
# Always use direct transport in MicroPython (link transport not fully supported)
|
||||
# But respect size_threshold for smaller payloads
|
||||
if payload_size < effective_threshold:
|
||||
# Direct path - Base64 encode
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
|
||||
log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
|
||||
|
||||
payloads.append({
|
||||
'id': generate_uuid(),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': 'direct',
|
||||
'encoding': 'base64',
|
||||
'size': payload_size,
|
||||
'data': payload_b64,
|
||||
'metadata': {'payload_bytes': payload_size}
|
||||
})
|
||||
else:
|
||||
# Link path (limited support)
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver")
|
||||
|
||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
|
||||
log_trace(correlation_id, f"Uploaded to URL: {response.get('url', 'N/A')}")
|
||||
|
||||
payloads.append({
|
||||
'id': generate_uuid(),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': 'link',
|
||||
'encoding': 'none',
|
||||
'size': payload_size,
|
||||
'data': response.get('url', ''),
|
||||
'metadata': {}
|
||||
})
|
||||
|
||||
# Build envelope
|
||||
env = {
|
||||
'correlation_id': correlation_id,
|
||||
'msg_id': msg_id,
|
||||
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()),
|
||||
'send_to': subject,
|
||||
'msg_purpose': msg_purpose,
|
||||
'sender_name': sender_name,
|
||||
'sender_id': sender_id,
|
||||
'receiver_name': receiver_name,
|
||||
'receiver_id': receiver_id,
|
||||
'reply_to': reply_to,
|
||||
'reply_to_msg_id': reply_to_msg_id,
|
||||
'broker_url': broker_url,
|
||||
'metadata': {},
|
||||
'payloads': payloads
|
||||
}
|
||||
|
||||
env_json_str = json.dumps(env)
|
||||
|
||||
# Publish if requested
|
||||
if is_publish:
|
||||
try:
|
||||
conn = NATSConnection()
|
||||
conn.connect()
|
||||
conn.publish(subject, env_json_str)
|
||||
conn.close()
|
||||
log_trace(correlation_id, f"Message published to {subject}")
|
||||
except Exception as e:
|
||||
log_trace(correlation_id, f"Failed to publish: {e}")
|
||||
# Don't raise - MicroPython should be resilient
|
||||
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
def smartreceive(msg, fileserver_download_handler=None, max_retries=3, base_delay=100, max_delay=1000):
|
||||
"""
|
||||
Receive and process NATS message.
|
||||
|
||||
MicroPython version - synchronous.
|
||||
|
||||
Args:
|
||||
msg: Message object with 'payload' attribute
|
||||
fileserver_download_handler: Function to handle fileserver downloads
|
||||
max_retries: Maximum retry attempts
|
||||
base_delay: Initial delay in milliseconds
|
||||
max_delay: Maximum delay in milliseconds
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
if fileserver_download_handler is None:
|
||||
fileserver_download_handler = _simple_fileserver_download
|
||||
|
||||
# Parse envelope
|
||||
if isinstance(msg.payload, bytes):
|
||||
payload = msg.payload.decode('utf-8')
|
||||
else:
|
||||
payload = msg.payload
|
||||
|
||||
try:
|
||||
env_json_obj = json.loads(payload)
|
||||
except Exception:
|
||||
# Handle cases where payload might be a bytearray
|
||||
payload_str = str(msg.payload)
|
||||
env_json_obj = json.loads(payload_str)
|
||||
|
||||
correlation_id = env_json_obj.get('correlation_id', 'unknown')
|
||||
log_trace(correlation_id, "Processing received message")
|
||||
|
||||
# Process payloads
|
||||
payloads_list = []
|
||||
|
||||
for payload in env_json_obj.get('payloads', []):
|
||||
transport = payload.get('transport', 'direct')
|
||||
dataname = payload.get('dataname', 'unknown')
|
||||
|
||||
if transport == 'direct':
|
||||
log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")
|
||||
|
||||
payload_b64 = payload.get('data', '')
|
||||
payload_bytes = base64.b64decode(payload_b64)
|
||||
|
||||
data_type = payload.get('payload_type', 'text')
|
||||
data = _deserialize_data(payload_bytes, data_type)
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
elif transport == 'link':
|
||||
url = payload.get('data', '')
|
||||
log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")
|
||||
|
||||
downloaded_data = fileserver_download_handler(
|
||||
url, max_retries, base_delay, max_delay, correlation_id
|
||||
)
|
||||
|
||||
data_type = payload.get('payload_type', 'binary')
|
||||
data = _deserialize_data(downloaded_data, data_type)
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
else:
|
||||
log_trace(correlation_id, f"Unknown transport type: {transport}")
|
||||
|
||||
env_json_obj['payloads'] = payloads_list
|
||||
return env_json_obj
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NATSBridge Class (MicroPython)
|
||||
# ============================================================================
|
||||
|
||||
class NATSBridge:
|
||||
"""
|
||||
MicroPython NATS bridge implementation.
|
||||
|
||||
Provides a class-based interface for NATSBridge functionality.
|
||||
Optimized for memory-constrained devices.
|
||||
"""
|
||||
|
||||
def __init__(self, broker_url=None, fileserver_url=None):
|
||||
"""
|
||||
Initialize the NATSBridge client.
|
||||
|
||||
Args:
|
||||
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
|
||||
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
|
||||
"""
|
||||
self.broker_url = broker_url or DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
|
||||
self._nats_conn = None
|
||||
|
||||
def smartsend(self, subject, data, **kwargs):
|
||||
"""
|
||||
Send data via NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
# Merge with defaults
|
||||
options = {
|
||||
'broker_url': kwargs.get('broker_url', self.broker_url),
|
||||
'fileserver_url': kwargs.get('fileserver_url', self.fileserver_url),
|
||||
**kwargs
|
||||
}
|
||||
return smartsend(subject, data, **options)
|
||||
|
||||
def smartreceive(self, msg, **kwargs):
|
||||
"""
|
||||
Receive and process NATS message.
|
||||
|
||||
Args:
|
||||
msg: Message object with 'payload' attribute
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return smartreceive(msg, **kwargs)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Module Exports
|
||||
# ============================================================================
|
||||
|
||||
__all__ = [
|
||||
# Core functions
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
|
||||
# Utility functions
|
||||
'generate_uuid',
|
||||
'_serialize_data',
|
||||
'_deserialize_data',
|
||||
|
||||
# File server handlers
|
||||
'_simple_fileserver_upload',
|
||||
'_simple_fileserver_download',
|
||||
|
||||
# NATS connection
|
||||
'NATSConnection',
|
||||
'NATSBridge',
|
||||
|
||||
# Constants
|
||||
'DEFAULT_SIZE_THRESHOLD',
|
||||
'DEFAULT_BROKER_URL',
|
||||
'DEFAULT_FILESERVER_URL',
|
||||
'MAX_PAYLOAD_SIZE',
|
||||
'NATS_SERVER_HOST',
|
||||
'NATS_SERVER_PORT',
|
||||
'NATS_RECONNECT_DELAY',
|
||||
]
|
||||
Reference in New Issue
Block a user