17 Commits

Author SHA1 Message Date
7f68d08134 update 2026-02-24 21:40:33 +07:00
ab20cd896f update 2026-02-24 21:18:19 +07:00
5a9e93d6e7 update 2026-02-24 20:38:45 +07:00
b51641dc7e update 2026-02-24 20:09:10 +07:00
45f1257896 update 2026-02-24 18:50:28 +07:00
3e2b8b1e3a update 2026-02-24 18:19:03 +07:00
90d81617ef update 2026-02-24 17:58:59 +07:00
64c62e616b update 2026-02-23 22:06:57 +07:00
2c340e37c7 update 2026-02-23 22:00:06 +07:00
7853e94d2e update 2026-02-23 21:54:50 +07:00
99bf57b154 update 2026-02-23 21:43:09 +07:00
0fa6eaf95b update 2026-02-23 21:37:50 +07:00
76f42be740 update 2026-02-23 21:32:22 +07:00
d99dc41be9 update 2026-02-23 21:09:36 +07:00
263508b8f7 update 2026-02-23 20:50:41 +07:00
0c2cca30ed update 2026-02-23 20:34:08 +07:00
46fdf668c6 update 2026-02-23 19:18:12 +07:00
23 changed files with 1113 additions and 771 deletions

View File

@@ -1,6 +1,6 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.2"
version = "0.4.3"
authors = ["narawat <narawat@gmail.com>"]
[deps]

105
README.md
View File

@@ -173,7 +173,7 @@ from nats_bridge import smartsend
# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
```
@@ -183,7 +183,7 @@ print("Message sent!")
const { smartsend } = require('./src/NATSBridge');
// Send a text message
await smartsend("/chat/room1", [
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
@@ -197,7 +197,7 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env = NATSBridge.smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222")
println("Message sent!")
```
@@ -221,8 +221,8 @@ async def main():
# Subscribe to the subject - msg comes from the callback
async def message_handler(msg):
# Receive and process message
envelope = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
sid = await nc.subscribe(SUBJECT, cb=message_handler)
@@ -251,8 +251,8 @@ async function main() {
for await (const msg of sub) {
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
}
@@ -283,8 +283,8 @@ function test_receive()
log_trace("Received message on $(msg.subject)")
# Receive and process message
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
end
@@ -310,7 +310,7 @@ Sends data either directly via NATS or via a fileserver URL, depending on payloa
```python
from nats_bridge import smartsend
env = smartsend(
env, env_json_str = smartsend(
subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL
@@ -323,7 +323,8 @@ env = smartsend(
receiver_name="", # Receiver name (empty = broadcast)
receiver_id="", # Receiver UUID (empty = broadcast)
reply_to="", # Reply topic
reply_to_msg_id="" # Reply message ID
reply_to_msg_id="", # Reply message ID
is_publish=True # Whether to automatically publish to NATS
)
```
@@ -332,7 +333,7 @@ env = smartsend(
```javascript
const { smartsend } = require('./src/NATSBridge');
const env = await smartsend(
const { env, env_json_str } = await smartsend(
subject, // NATS subject
data, // Array of {dataname, data, type}
{
@@ -346,7 +347,8 @@ const env = await smartsend(
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Whether to automatically publish to NATS
}
);
```
@@ -356,8 +358,8 @@ const env = await smartsend(
```julia
using NATSBridge
env = NATSBridge.smartsend(
subject; # NATS subject
env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080",
@@ -369,8 +371,12 @@ env = NATSBridge.smartsend(
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = ""
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
```
### smartreceive
@@ -383,7 +389,7 @@ Receives and processes messages from NATS, handling both direct and link transpo
from nats_bridge import smartreceive
# Note: For nats-py, use msg.data to pass the raw message data
envelope = smartreceive(
env = smartreceive(
msg.data, # NATS message data (msg.data for nats-py)
fileserver_download_handler=_fetch_with_backoff, # Download handler
max_retries=5, # Max retry attempts
@@ -399,7 +405,7 @@ envelope = smartreceive(
const { smartreceive } = require('./src/NATSBridge');
// Note: msg is the NATS message object from subscription
const envelope = await smartreceive(
const env = await smartreceive(
msg, // NATS message (raw object from subscription)
{
fileserverDownloadHandler: customDownloadHandler,
@@ -417,7 +423,7 @@ const envelope = await smartreceive(
using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback
envelope = NATSBridge.smartreceive(
env, env_json_str = NATSBridge.smartreceive(
msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff,
max_retries::Int = 5,
@@ -488,7 +494,7 @@ await smartsend("/topic", [
#### Julia
```julia
data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080")
smartsend("/topic", data; fileserver_url="http://localhost:8080")
```
---
@@ -511,14 +517,14 @@ data = [
("large_document", large_file_data, "binary")
]
smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
env, env_json_str = smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
```
#### JavaScript
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/chat/room1", [
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message_text", data: "Hello!", type: "text" },
{ dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" }
@@ -537,7 +543,7 @@ data = [
("large_document", large_file_data, "binary")
]
NATSBridge.smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080")
```
### Example 2: Dictionary Exchange
@@ -555,7 +561,7 @@ config = {
}
data = [("config", config, "dictionary")]
smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data)
```
#### JavaScript
@@ -568,7 +574,7 @@ const config = {
update_interval: 60
};
await smartsend("/device/config", [
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
@@ -584,7 +590,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
NATSBridge.smartsend("/device/config", data)
env, env_json_str = NATSBridge.smartsend("/device/config", data)
```
### Example 3: Table Data (Arrow IPC)
@@ -603,7 +609,7 @@ df = pd.DataFrame({
})
data = [("students", df, "table")]
smartsend("/data/analysis", data)
env, env_json_str = smartsend("/data/analysis", data)
```
#### JavaScript
@@ -616,7 +622,7 @@ const tableData = [
{ id: 3, name: "Charlie", score: 92 }
];
await smartsend("/data/analysis", [
const { env, env_json_str } = await smartsend("/data/analysis", [
{ dataname: "students", data: tableData, type: "table" }
]);
```
@@ -633,22 +639,27 @@ df = DataFrame(
)
data = [("students", df, "table")]
NATSBridge.smartsend("/data/analysis", data)
env, env_json_str = NATSBridge.smartsend("/data/analysis", data)
```
### Example 4: Request-Response Pattern
### Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support.
Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly.
#### Python/Micropython (Requester)
```python
from nats_bridge import smartsend
env = smartsend(
env, env_json_str = smartsend(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
reply_to="/device/response"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern
# nc.request("/device/command", env_json_str, reply_to="/device/response")
```
#### Python/Micropython (Responder)
@@ -666,8 +677,8 @@ async def main():
nc = await nats.connect(NATS_URL)
async def message_handler(msg):
envelope = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if data.get("action") == "read_sensor":
response = {"sensor_id": "sensor-001", "value": 42.5}
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
@@ -683,7 +694,7 @@ asyncio.run(main())
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/device/command", [
const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], {
replyTo: "/device/response"
@@ -706,8 +717,8 @@ async function main() {
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "command" && payload.data.action === "read_sensor") {
const response = { sensor_id: "sensor-001", value: 42.5 };
await smartsend(REPLY_SUBJECT, [
@@ -725,9 +736,9 @@ main();
```julia
using NATSBridge
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")],
[("command", Dict("action" => "read_sensor"), "dictionary")];
reply_to="/device/response"
)
```
@@ -744,8 +755,8 @@ const NATS_URL = "nats://localhost:4222"
function test_responder()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
@@ -783,8 +794,8 @@ async def main():
# Receive commands - msg comes from the callback
async def message_handler(msg):
envelope = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg.data)
for dataname, data, type in env["payloads"]:
if type == "dictionary" and data.get("action") == "reboot":
# Execute reboot
pass
@@ -811,8 +822,8 @@ async function main() {
const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) {
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.dataname === "temperature") {
console.log(`Temperature: ${payload.data}`);
} else if (payload.dataname === "humidity") {
@@ -836,8 +847,8 @@ const NATS_URL = "nats://localhost:4222"
function test_receiver()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if dataname == "temperature"
println("Temperature: $data")
elseif dataname == "humidity"

View File

@@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserverDownloadHandler parameter
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
@@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload,
metadata=user_provided_envelope_level_metadata
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
@@ -99,13 +98,14 @@ smartsend(
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
nats_url="nats://localhost:4222"
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Architecture Diagram
@@ -138,48 +138,48 @@ flowchart TD
## System Components
### 1. msgEnvelope_v1 - Message Envelope
### 1. msg_envelope_v1 - Message Envelope
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
**Julia Structure:**
```julia
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems
msgId::String # This message id
timestamp::String # Message published timestamp
struct msg_envelope_v1
correlation_id::String # Unique identifier to track messages across systems
msg_id::String # This message id
timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to
msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
senderName::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to
brokerURL::String # NATS server address
send_to::String # Topic/subject the sender sends to
msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
sender_name::String # Sender name (e.g., "agent-wine-web-frontend")
sender_id::String # Sender id (uuid4)
receiver_name::String # Message receiver name (e.g., "agent-backend")
receiver_id::String # Message receiver id (uuid4 or nothing for broadcast)
reply_to::String # Topic to reply to
reply_to_msg_id::String # Message id this message is replying to
broker_url::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here
payloads::Vector{msg_payload_v1} # Multiple payloads stored here
end
```
**JSON Schema:**
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"brokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
@@ -189,7 +189,7 @@ end
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
@@ -201,7 +201,7 @@ end
{
"id": "uuid4",
"dataname": "large_data",
"type": "table",
"payload_type": "table",
"transport": "link",
"encoding": "none",
"size": 524288,
@@ -214,16 +214,16 @@ end
}
```
### 2. msgPayload_v1 - Payload Structure
### 2. msg_payload_v1 - Payload Structure
The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms.
The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms.
**Julia Structure:**
```julia
struct msgPayload_v1
struct msg_payload_v1
id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary"
payload_type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes
@@ -383,13 +383,32 @@ graph TD
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter
nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
)
```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
@@ -406,8 +425,8 @@ function smartsend(
```julia
function smartreceive(
msg::NATS.Message,
fileserverDownloadHandler::Function;
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
@@ -416,7 +435,7 @@ function smartreceive(
# Iterate through all payloads
# For each payload: check transport type
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# If link: fetch from URL with exponential backoff using fileserver_download_handler
# Deserialize payload based on type
# Return envelope dictionary with all metadata and deserialized payloads
end
@@ -424,7 +443,7 @@ end
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
@@ -434,71 +453,186 @@ end
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
### JavaScript Implementation
#### Dependencies
- `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation
- `uuid` - Correlation ID and message ID generation
- `base64-arraybuffer` - Base64 encoding/decoding
- `node-fetch` or `fetch` - HTTP client for file server
#### smartsend Function
```javascript
async function smartsend(subject, data, options = {})
// data format: [(dataname, data, type), ...]
// options object should include:
// - natsUrl: NATS server URL
// - fileserverUrl: base URL of the file server
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
async function smartsend(
subject,
data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
options = {}
)
```
**Options:**
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (String) - Optional correlation ID for tracing
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (String) - Message receiver name (default: `""`)
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Iterate through the list of (dataname, data, type) tuples
2. For each payload: extract the type from the tuple and serialize accordingly
3. Check payload size
4. If < threshold: publish directly to NATS
5. If >= threshold: upload to HTTP server, publish NATS with URL
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope object and JSON string
#### smartreceive Handler
```javascript
async function smartreceive(msg, options = {})
// options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL
// - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
```
**Options:**
- `fileserver_download_handler` (Function) - Custom download handler function
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (String) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type`
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
### Python/Micropython Implementation
#### Dependencies
- `nats-python` - Core NATS functionality
- `pyarrow` - Arrow IPC serialization
- `uuid` - Correlation ID and message ID generation
- `base64` - Base64 encoding/decoding
- `requests` or `aiohttp` - HTTP client for file server
#### smartsend Function
```python
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {}
)
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope dictionary and JSON string
#### smartreceive Handler
```python
async def smartreceive(
msg: NATS.Message,
options: Dict = {}
)
```
**Options:**
- `fileserver_download_handler` (Callable) - Custom download handler function
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` list
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
## Scenario Implementations
### Scenario 1: Command & Control (Small Dictionary)

View File

@@ -19,49 +19,102 @@ NATSBridge is implemented in three languages, each providing the same API:
| **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers |
| **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers |
### Multi-Payload Support
### File Server Handler Architecture
The implementation uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
**Handler Function Signatures:**
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
### Multi-Payload Support (Standard API)
The system uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
**API Standard:**
```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:**
```julia
# Single payload - still wrapped in a list (type is required as third element)
smartsend("/test", [(dataname1, data1, "text")], ...)
# Single payload - still wrapped in a list
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message (each payload has its own type)
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Cross-Platform Interoperability
@@ -98,14 +151,14 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
# Julia sender
using NATSBridge
data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
```
```javascript
// JavaScript receiver
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
// envelope.payloads[0].data === "Hello from Julia!"
const env = await smartreceive(msg);
// env.payloads[0].data === "Hello from Julia!"
```
```python
@@ -146,15 +199,31 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘
```
## Files
## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, env_json_str = smartsend(...)
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
The Julia implementation provides:
- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
- **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation
- **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
@@ -247,7 +316,53 @@ node test/scenario3_julia_to_julia.js
## Usage
### Scenario 0: Basic Multi-Payload Example
### Scenario 1: Command & Control (Small Dictionary)
**Focus:** Sending small dictionary configurations across platforms. This is the simplest use case for command and control scenarios.
**Julia (Sender/Receiver):**
```julia
using NATSBridge
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
```
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
// Create small dictionary config
// Send via smartsend with type="dictionary"
const config = {
step_size: 0.01,
iterations: 1000,
threshold: 0.5
};
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
**Python/Micropython (Sender/Receiver):**
```python
from nats_bridge import smartsend
# Create small dictionary config
# Send via smartsend with type="dictionary"
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
smartsend("control", [("config", config, "dictionary")])
```
### Basic Multi-Payload Example
#### Python/Micropython (Sender)
```python
@@ -262,16 +377,16 @@ smartsend(
)
# Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")])
smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222")
```
#### Python/Micropython (Receiver)
```python
from nats_bridge import smartreceive
# Receive returns a list of (dataname, data, type) tuples
payloads = smartreceive(msg)
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary with envelope metadata and payloads field
env = smartreceive(msg)
# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
```
#### JavaScript (Sender)
@@ -315,18 +430,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the payloads from the envelope
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`);
}
// Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`);
console.log(`Message ID: ${envelope.msgId}`);
console.log(`Correlation ID: ${env.correlation_id}`);
console.log(`Message ID: ${env.msg_id}`);
}
```
@@ -344,19 +459,21 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via SmartSend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]);
# Send via smartsend - wrapped in a list (type is part of each tuple)
env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
```
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript
const table = envelope.payloads;
const table = env.payloads;
```
### Scenario 3: Live Binary Processing
@@ -406,10 +523,10 @@ from nats_bridge import smartreceive
# Receive binary data
def process_binary(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process the binary data from envelope.payloads
for dataname, data, type in envelope["payloads"]:
# Process the binary data from env.payloads
for dataname, data, type in env["payloads"]:
if type == "binary":
# data is bytes
print(f"Received binary data: {dataname}, size: {len(data)}")
@@ -422,10 +539,10 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data
function process_binary(msg) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the binary data from envelope.payloads
for (const payload of envelope.payloads) {
// Process the binary data from env.payloads
for (const payload of env.payloads) {
if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
@@ -444,7 +561,7 @@ using NATSBridge
function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple)
status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], nats_url=nats_url)
smartsend("health", [("status", status, "dictionary")], broker_url=nats_url)
sleep(5) # Every 5 seconds
end
```
@@ -466,8 +583,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages
for await (const msg of consumer) {
const envelope = await smartreceive(msg);
// envelope.payloads contains the list of payloads
const env = await smartreceive(msg);
// env.payloads contains the list of payloads
// Each payload has: dataname, data, type
msg.ack();
}
@@ -484,10 +601,10 @@ import json
# Device configuration handler
def handle_device_config(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process configuration from payloads
for dataname, data, type in envelope["payloads"]:
for dataname, data, type in env["payloads"]:
if type == "dictionary":
print(f"Received configuration: {data}")
# Apply configuration to device
@@ -506,7 +623,7 @@ def handle_device_config(msg):
"device/response",
[("config", config, "dictionary")],
nats_url="nats://localhost:4222",
reply_to=envelope.get("replyTo")
reply_to=env.get("reply_to")
)
```
@@ -566,11 +683,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Decode Base64 payload (for direct transport)
// For tables, data is in envelope.payloads
const table = envelope.payloads; // Array of objects
// For tables, data is in env.payloads
const table = env.payloads; // Array of objects
// User makes selection
const selection = uiComponent.getSelectedOption();
@@ -619,7 +736,7 @@ chat_message = [
smartsend(
"chat.room123",
chat_message,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
msg_purpose="chat",
reply_to="chat.room123.responses"
)
@@ -667,7 +784,7 @@ await smartsend("chat.room123", message);
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Configuration
@@ -683,19 +800,19 @@ await smartsend("chat.room123", message);
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"BrokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
@@ -706,7 +823,7 @@ await smartsend("chat.room123", message);
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python
from nats_bridge import smartsend
# Send a text message
# Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
// Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
```
#### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!")
```
@@ -145,8 +159,8 @@ println("Message sent!")
from nats_bridge import smartreceive
# Receive and process message
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
```
@@ -156,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
const { smartreceive } = require('./src/NATSBridge');
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
```
@@ -168,8 +182,8 @@ for (const payload of envelope.payloads) {
using NATSBridge
# Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
```
@@ -194,7 +208,7 @@ config = {
# Send as dictionary type
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -208,7 +222,7 @@ const config = {
update_interval: 60
};
await smartsend("/device/config", [
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
@@ -225,7 +239,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data)
```
### Example 2: Sending Binary Data (Image)
@@ -241,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type
data = [("user_image", image_data, "binary")]
env = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -253,7 +267,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
const image_data = fs.readFileSync('image.png');
await smartsend("/chat/image", [
const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" }
]);
```
@@ -267,7 +281,7 @@ using NATSBridge
image_data = read("image.png")
data = [("user_image", image_data, "binary")]
smartsend("/chat/image", data)
env, env_json_str = smartsend("/chat/image", data)
```
### Example 3: Request-Response Pattern
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
env, env_json_str = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript (Responder)
@@ -297,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
const sub = nc.subscribe("/device/command");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process command
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.dataname === "command") {
const command = payload.data;
@@ -315,8 +331,8 @@ for await (const msg of sub) {
await smartsend("/device/response", [
{ dataname: "sensor_data", data: response, type: "dictionary" }
], {
reply_to: envelope.replyTo,
reply_to_msg_id: envelope.msgId
reply_to: env.replyTo,
reply_to_msg_id: env.msgId
});
}
}
@@ -342,7 +358,7 @@ import os
large_data = os.urandom(2_000_000) # 2MB of random data
# Send with file server URL
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
@@ -364,7 +380,7 @@ const largeData = new ArrayBuffer(2_000_000);
const view = new Uint8Array(largeData);
view.fill(42); // Fill with some data
await smartsend("/data/large", [
const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" }
], {
fileserverUrl: "http://localhost:8080",
@@ -380,7 +396,7 @@ using NATSBridge
# Create large data (> 1MB)
large_data = rand(UInt8, 2_000_000)
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
fileserver_url="http://localhost:8080"
@@ -409,7 +425,7 @@ data = [
("user_avatar", image_data, "image")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
#### JavaScript
@@ -419,7 +435,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
await smartsend("/chat/mixed", [
const { env, env_json_str } = await smartsend("/chat/mixed", [
{
dataname: "message_text",
data: "Hello with image!",
@@ -445,7 +461,7 @@ data = [
("user_avatar", image_data, "image")
]
smartsend("/chat/mixed", data)
env, env_json_str = smartsend("/chat/mixed", data)
```
### Example 6: Table Data (Arrow IPC)
@@ -467,7 +483,7 @@ df = pd.DataFrame({
# Send as table type
data = [("students", df, "table")]
env = smartsend("/data/students", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222")
```
#### Julia
@@ -484,7 +500,7 @@ df = DataFrame(
)
data = [("students", df, "table")]
smartsend("/data/students", data)
env, env_json_str = smartsend("/data/students", data)
```
---
@@ -503,7 +519,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")]
smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
```
#### JavaScript Receiver
@@ -512,8 +528,8 @@ smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
const { smartreceive } = require('./src/NATSBridge');
// Receive dictionary from Julia
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.type === "dictionary") {
console.log("Received config:", payload.data);
// payload.data = { step_size: 0.01, iterations: 1000 }
@@ -528,7 +544,7 @@ for (const payload of envelope.payloads) {
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/data/transfer", [
const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" }
]);
```
@@ -538,8 +554,8 @@ await smartsend("/data/transfer", [
```python
from nats_bridge import smartreceive
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
if type == "text":
print(f"Received from JS: {data}")
```
@@ -552,7 +568,7 @@ for dataname, data, type in envelope["payloads"]:
from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")]
smartsend("/chat/python", data)
env, env_json_str = smartsend("/chat/python", data)
```
#### Julia Receiver
@@ -560,8 +576,8 @@ smartsend("/chat/python", data)
```julia
using NATSBridge
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in env["payloads"]
if type == "text"
println("Received from Python: $data")
end

View File

@@ -132,7 +132,7 @@ class ChatUI {
});
}
await smartsend(
const { env, env_json_str } = await smartsend(
`/chat/${this.currentRoom}`,
data,
{
@@ -216,15 +216,15 @@ class ChatHandler {
}
async handleMessage(msg) {
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.downloadFile.bind(this)
});
// Extract sender info from envelope
const sender = envelope.senderName || 'Anonymous';
const sender = env.senderName || 'Anonymous';
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'text') {
this.ui.addMessage(sender, payload.data);
} else if (payload.type === 'image') {
@@ -304,7 +304,7 @@ class FileUploadService {
type: 'binary'
}];
const envelope = await smartsend(
const { env, env_json_str } = await smartsend(
`/files/${recipient}`,
data,
{
@@ -314,7 +314,7 @@ class FileUploadService {
}
);
return envelope;
return env;
}
async uploadLargeFile(filePath, recipient) {
@@ -356,12 +356,12 @@ class FileDownloadService {
async downloadFile(sender, downloadId) {
// Subscribe to sender's file channel
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'binary') {
const filePath = `/downloads/${payload.dataname}`;
fs.writeFileSync(filePath, payload.data);
@@ -422,9 +422,9 @@ async function uploadFile(config) {
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
try {
const envelope = await fileService.uploadFile(filePath, recipient);
const env = await fileService.uploadFile(filePath, recipient);
console.log('Upload successful!');
console.log(`File ID: ${envelope.payloads[0].id}`);
console.log(`File ID: ${env.payloads[0].id}`);
} catch (error) {
console.error('Upload failed:', error.message);
}
@@ -514,6 +514,7 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend(
f"/sensors/{sensor_id}",
data,
@@ -521,6 +522,31 @@ class SensorSender:
fileserver_url=self.fileserver_url
)
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (env, env_json_str) without publishing
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
return env, env_json_str
def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch()
for reading in readings:
@@ -571,9 +597,9 @@ class SensorReceiver:
self.fileserver_download_handler = fileserver_download_handler
def process_reading(self, msg):
envelope = smartreceive(msg, self.fileserver_download_handler)
env = smartreceive(msg, self.fileserver_download_handler)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
reading = SensorReading(
sensor_id=data["sensor_id"],
@@ -673,10 +699,10 @@ class DeviceBridge:
# Poll for messages
msg = self._poll_for_message()
if msg:
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process payloads
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if dataname == "command":
callback(data)
@@ -772,9 +798,9 @@ class DashboardServer:
def receive_selection(self, callback):
def handler(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
callback(data)
@@ -807,7 +833,7 @@ class DashboardUI {
async refreshData() {
// Request fresh data
await smartsend("/dashboard/request", [
const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], {
fileserverUrl: window.config.fileserver_url
@@ -816,12 +842,12 @@ class DashboardUI {
async fetchData() {
// Subscribe to data updates
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process table data
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'table') {
// Deserialize Arrow IPC
this.data = this.deserializeArrow(payload.data);

File diff suppressed because it is too large Load Diff

View File

@@ -460,8 +460,9 @@ async function smartsend(subject, data, options = {}) {
* @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
*
* @returns {Promise<MessageEnvelope>} - The envelope for tracking
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
*/
const {
natsUrl = DEFAULT_NATS_URL,
@@ -474,7 +475,8 @@ async function smartsend(subject, data, options = {}) {
receiverName = "",
receiverId = "",
replyTo = "",
replyToMsgId = ""
replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
} = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
@@ -556,10 +558,19 @@ async function smartsend(subject, data, options = {}) {
payloads: payloads
});
// Publish message to NATS
await publish_message(natsUrl, subject, env.toString(), correlationId);
// Convert envelope to JSON string
const env_json_str = env.toString();
return env;
// Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, env_json_str, correlationId);
}
// Return both envelope and JSON string (tuple-like structure)
return {
env: env,
env_json_str: env_json_str
};
}
// Helper: Publish message to NATS

View File

@@ -437,7 +437,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -459,9 +459,12 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id: UUID of the receiver
reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
Returns:
MessageEnvelope: The envelope object for tracking
tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
@@ -549,13 +552,15 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
msg_json = env.to_json()
# Publish to NATS
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
# Publish to NATS if is_publish is True
if is_publish:
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
return env
# Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
];
// Use smartsend with mixed content
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
payloads,
{
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -92,12 +92,12 @@ function test_dict_send()
# Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -114,7 +115,7 @@ function test_dict_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -79,12 +79,12 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL;
broker_url = NATS_URL;
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -92,11 +92,12 @@ function test_large_binary_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.payloads[1].type)")
log_trace("Envelope type: $(env.payloads[1].payload_type)")
# Check if link transport was used
if env.payloads[1].transport == "link"

View File

@@ -186,12 +186,12 @@ function test_mix_send()
]
# Use smartsend with mixed content
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -208,7 +209,7 @@ function test_mix_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -90,12 +90,12 @@ function test_table_send()
# Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -112,7 +113,7 @@ function test_table_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -75,12 +75,12 @@ function test_text_send()
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -97,7 +98,7 @@ function test_text_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
data, # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")