19 Commits

Author SHA1 Message Date
61d81bed62 update 2026-02-25 06:04:40 +07:00
1a10bc1a5f update 2026-02-25 05:32:59 +07:00
7f68d08134 update 2026-02-24 21:40:33 +07:00
ab20cd896f update 2026-02-24 21:18:19 +07:00
5a9e93d6e7 update 2026-02-24 20:38:45 +07:00
b51641dc7e update 2026-02-24 20:09:10 +07:00
45f1257896 update 2026-02-24 18:50:28 +07:00
3e2b8b1e3a update 2026-02-24 18:19:03 +07:00
90d81617ef update 2026-02-24 17:58:59 +07:00
64c62e616b update 2026-02-23 22:06:57 +07:00
2c340e37c7 update 2026-02-23 22:00:06 +07:00
7853e94d2e update 2026-02-23 21:54:50 +07:00
99bf57b154 update 2026-02-23 21:43:09 +07:00
0fa6eaf95b update 2026-02-23 21:37:50 +07:00
76f42be740 update 2026-02-23 21:32:22 +07:00
d99dc41be9 update 2026-02-23 21:09:36 +07:00
263508b8f7 update 2026-02-23 20:50:41 +07:00
0c2cca30ed update 2026-02-23 20:34:08 +07:00
46fdf668c6 update 2026-02-23 19:18:12 +07:00
23 changed files with 1115 additions and 774 deletions

View File

@@ -1,6 +1,6 @@
name = "NATSBridge" name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10" uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.2" version = "0.4.3"
authors = ["narawat <narawat@gmail.com>"] authors = ["narawat <narawat@gmail.com>"]
[deps] [deps]

105
README.md
View File

@@ -173,7 +173,7 @@ from nats_bridge import smartsend
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!") print("Message sent!")
``` ```
@@ -183,7 +183,7 @@ print("Message sent!")
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
// Send a text message // Send a text message
await smartsend("/chat/room1", [ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" }); ], { natsUrl: "nats://localhost:4222" });
@@ -197,7 +197,7 @@ using NATSBridge
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = NATSBridge.smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222")
println("Message sent!") println("Message sent!")
``` ```
@@ -221,8 +221,8 @@ async def main():
# Subscribe to the subject - msg comes from the callback # Subscribe to the subject - msg comes from the callback
async def message_handler(msg): async def message_handler(msg):
# Receive and process message # Receive and process message
envelope = smartreceive(msg.data) env = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}") print(f"Received {dataname}: {data}")
sid = await nc.subscribe(SUBJECT, cb=message_handler) sid = await nc.subscribe(SUBJECT, cb=message_handler)
@@ -251,8 +251,8 @@ async function main() {
for await (const msg of sub) { for await (const msg of sub) {
// Receive and process message // Receive and process message
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`); console.log(`Received ${payload.dataname}: ${payload.data}`);
} }
} }
@@ -283,8 +283,8 @@ function test_receive()
log_trace("Received message on $(msg.subject)") log_trace("Received message on $(msg.subject)")
# Receive and process message # Receive and process message
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"] for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data") println("Received $dataname: $data")
end end
end end
@@ -310,7 +310,7 @@ Sends data either directly via NATS or via a fileserver URL, depending on payloa
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
env = smartsend( env, env_json_str = smartsend(
subject, # NATS subject to publish to subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL nats_url="nats://localhost:4222", # NATS server URL
@@ -323,7 +323,8 @@ env = smartsend(
receiver_name="", # Receiver name (empty = broadcast) receiver_name="", # Receiver name (empty = broadcast)
receiver_id="", # Receiver UUID (empty = broadcast) receiver_id="", # Receiver UUID (empty = broadcast)
reply_to="", # Reply topic reply_to="", # Reply topic
reply_to_msg_id="" # Reply message ID reply_to_msg_id="", # Reply message ID
is_publish=True # Whether to automatically publish to NATS
) )
``` ```
@@ -332,7 +333,7 @@ env = smartsend(
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
const env = await smartsend( const { env, env_json_str } = await smartsend(
subject, // NATS subject subject, // NATS subject
data, // Array of {dataname, data, type} data, // Array of {dataname, data, type}
{ {
@@ -346,7 +347,8 @@ const env = await smartsend(
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Whether to automatically publish to NATS
} }
); );
``` ```
@@ -356,8 +358,8 @@ const env = await smartsend(
```julia ```julia
using NATSBridge using NATSBridge
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
subject; # NATS subject subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222", nats_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080", fileserver_url = "http://localhost:8080",
@@ -369,8 +371,12 @@ env = NATSBridge.smartsend(
receiver_name::String = "", receiver_name::String = "",
receiver_id::String = "", receiver_id::String = "",
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "" reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
) )
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
``` ```
### smartreceive ### smartreceive
@@ -383,7 +389,7 @@ Receives and processes messages from NATS, handling both direct and link transpo
from nats_bridge import smartreceive from nats_bridge import smartreceive
# Note: For nats-py, use msg.data to pass the raw message data # Note: For nats-py, use msg.data to pass the raw message data
envelope = smartreceive( env = smartreceive(
msg.data, # NATS message data (msg.data for nats-py) msg.data, # NATS message data (msg.data for nats-py)
fileserver_download_handler=_fetch_with_backoff, # Download handler fileserver_download_handler=_fetch_with_backoff, # Download handler
max_retries=5, # Max retry attempts max_retries=5, # Max retry attempts
@@ -399,7 +405,7 @@ envelope = smartreceive(
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
// Note: msg is the NATS message object from subscription // Note: msg is the NATS message object from subscription
const envelope = await smartreceive( const env = await smartreceive(
msg, // NATS message (raw object from subscription) msg, // NATS message (raw object from subscription)
{ {
fileserverDownloadHandler: customDownloadHandler, fileserverDownloadHandler: customDownloadHandler,
@@ -417,7 +423,7 @@ const envelope = await smartreceive(
using NATSBridge using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback # Note: msg is a NATS.Msg object passed from the subscription callback
envelope = NATSBridge.smartreceive( env, env_json_str = NATSBridge.smartreceive(
msg::NATS.Msg; msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff, fileserverDownloadHandler::Function = _fetch_with_backoff,
max_retries::Int = 5, max_retries::Int = 5,
@@ -488,7 +494,7 @@ await smartsend("/topic", [
#### Julia #### Julia
```julia ```julia
data = [("file", large_data, "binary")] data = [("file", large_data, "binary")]
smartsend("/topic", data, fileserver_url="http://localhost:8080") smartsend("/topic", data; fileserver_url="http://localhost:8080")
``` ```
--- ---
@@ -511,14 +517,14 @@ data = [
("large_document", large_file_data, "binary") ("large_document", large_file_data, "binary")
] ]
smartsend("/chat/room1", data, fileserver_url="http://localhost:8080") env, env_json_str = smartsend("/chat/room1", data, fileserver_url="http://localhost:8080")
``` ```
#### JavaScript #### JavaScript
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
await smartsend("/chat/room1", [ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message_text", data: "Hello!", type: "text" }, { dataname: "message_text", data: "Hello!", type: "text" },
{ dataname: "user_avatar", data: image_data, type: "image" }, { dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" } { dataname: "large_document", data: large_file_data, type: "binary" }
@@ -537,7 +543,7 @@ data = [
("large_document", large_file_data, "binary") ("large_document", large_file_data, "binary")
] ]
NATSBridge.smartsend("/chat/room1", data, fileserver_url="http://localhost:8080") env, env_json_str = NATSBridge.smartsend("/chat/room1", data; fileserver_url="http://localhost:8080")
``` ```
### Example 2: Dictionary Exchange ### Example 2: Dictionary Exchange
@@ -555,7 +561,7 @@ config = {
} }
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
smartsend("/device/config", data) env, env_json_str = smartsend("/device/config", data)
``` ```
#### JavaScript #### JavaScript
@@ -568,7 +574,7 @@ const config = {
update_interval: 60 update_interval: 60
}; };
await smartsend("/device/config", [ const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" } { dataname: "config", data: config, type: "dictionary" }
]); ]);
``` ```
@@ -584,7 +590,7 @@ config = Dict(
) )
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
NATSBridge.smartsend("/device/config", data) env, env_json_str = NATSBridge.smartsend("/device/config", data)
``` ```
### Example 3: Table Data (Arrow IPC) ### Example 3: Table Data (Arrow IPC)
@@ -603,7 +609,7 @@ df = pd.DataFrame({
}) })
data = [("students", df, "table")] data = [("students", df, "table")]
smartsend("/data/analysis", data) env, env_json_str = smartsend("/data/analysis", data)
``` ```
#### JavaScript #### JavaScript
@@ -616,7 +622,7 @@ const tableData = [
{ id: 3, name: "Charlie", score: 92 } { id: 3, name: "Charlie", score: 92 }
]; ];
await smartsend("/data/analysis", [ const { env, env_json_str } = await smartsend("/data/analysis", [
{ dataname: "students", data: tableData, type: "table" } { dataname: "students", data: tableData, type: "table" }
]); ]);
``` ```
@@ -633,22 +639,27 @@ df = DataFrame(
) )
data = [("students", df, "table")] data = [("students", df, "table")]
NATSBridge.smartsend("/data/analysis", data) env, env_json_str = NATSBridge.smartsend("/data/analysis", data)
``` ```
### Example 4: Request-Response Pattern ### Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support. Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly.
#### Python/Micropython (Requester) #### Python/Micropython (Requester)
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
env = smartsend( env, env_json_str = smartsend(
"/device/command", "/device/command",
[("command", {"action": "read_sensor"}, "dictionary")], [("command", {"action": "read_sensor"}, "dictionary")],
reply_to="/device/response" reply_to="/device/response"
) )
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern
# nc.request("/device/command", env_json_str, reply_to="/device/response")
``` ```
#### Python/Micropython (Responder) #### Python/Micropython (Responder)
@@ -666,8 +677,8 @@ async def main():
nc = await nats.connect(NATS_URL) nc = await nats.connect(NATS_URL)
async def message_handler(msg): async def message_handler(msg):
envelope = smartreceive(msg.data) env = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
if data.get("action") == "read_sensor": if data.get("action") == "read_sensor":
response = {"sensor_id": "sensor-001", "value": 42.5} response = {"sensor_id": "sensor-001", "value": 42.5}
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
@@ -683,7 +694,7 @@ asyncio.run(main())
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
await smartsend("/device/command", [ const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" } { dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], { ], {
replyTo: "/device/response" replyTo: "/device/response"
@@ -706,8 +717,8 @@ async function main() {
const sub = nc.subscribe(SUBJECT); const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) { for await (const msg of sub) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.dataname === "command" && payload.data.action === "read_sensor") { if (payload.dataname === "command" && payload.data.action === "read_sensor") {
const response = { sensor_id: "sensor-001", value: 42.5 }; const response = { sensor_id: "sensor-001", value: 42.5 };
await smartsend(REPLY_SUBJECT, [ await smartsend(REPLY_SUBJECT, [
@@ -725,9 +736,9 @@ main();
```julia ```julia
using NATSBridge using NATSBridge
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
"/device/command", "/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")], [("command", Dict("action" => "read_sensor"), "dictionary")];
reply_to="/device/response" reply_to="/device/response"
) )
``` ```
@@ -744,8 +755,8 @@ const NATS_URL = "nats://localhost:4222"
function test_responder() function test_responder()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"] for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor" if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5) response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) smartsend(REPLY_SUBJECT, [("data", response, "dictionary")])
@@ -783,8 +794,8 @@ async def main():
# Receive commands - msg comes from the callback # Receive commands - msg comes from the callback
async def message_handler(msg): async def message_handler(msg):
envelope = smartreceive(msg.data) env = smartreceive(msg.data)
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
if type == "dictionary" and data.get("action") == "reboot": if type == "dictionary" and data.get("action") == "reboot":
# Execute reboot # Execute reboot
pass pass
@@ -811,8 +822,8 @@ async function main() {
const sub = nc.subscribe(SUBJECT); const sub = nc.subscribe(SUBJECT);
for await (const msg of sub) { for await (const msg of sub) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.dataname === "temperature") { if (payload.dataname === "temperature") {
console.log(`Temperature: ${payload.data}`); console.log(`Temperature: ${payload.data}`);
} else if (payload.dataname === "humidity") { } else if (payload.dataname === "humidity") {
@@ -836,8 +847,8 @@ const NATS_URL = "nats://localhost:4222"
function test_receiver() function test_receiver()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"] for (dataname, data, type) in env["payloads"]
if dataname == "temperature" if dataname == "temperature"
println("Temperature: $data") println("Temperature: $data")
elseif dataname == "humidity" elseif dataname == "humidity"

View File

@@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia ```julia
# Upload handler - uploads data to file server and returns URL # Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter # The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8}) # It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url" # Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff # Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserverDownloadHandler parameter # The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String) # It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data) # Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
``` ```
This design allows the system to support multiple file server backends without changing the core messaging logic. This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info) # Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...] [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field) # Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing list of tuples # Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# { # {
# "correlationId": "...", # "correlation_id": "...",
# "msgId": "...", # "msg_id": "...",
# "timestamp": "...", # "timestamp": "...",
# "sendTo": "...", # "send_to": "...",
# "msgPurpose": "...", # "msg_purpose": "...",
# "senderName": "...", # "sender_name": "...",
# "senderId": "...", # "sender_id": "...",
# "receiverName": "...", # "receiver_name": "...",
# "receiverId": "...", # "receiver_id": "...",
# "replyTo": "...", # "reply_to": "...",
# "replyToMsgId": "...", # "reply_to_msg_id": "...",
# "brokerURL": "...", # "broker_url": "...",
# "metadata": {...}, # "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] # "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# } # }
@@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess
smartsend( smartsend(
"/test", "/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type) [("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload, fileserver_upload_handler=plik_oneshot_upload
metadata=user_provided_envelope_level_metadata
) )
# Multiple payloads in one message with different types # Multiple payloads in one message with different types
smartsend( smartsend(
"/test", "/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload fileserver_upload_handler=plik_oneshot_upload
) )
# Mixed content (e.g., chat with text, image, audio) # Mixed content (e.g., chat with text, image, audio)
@@ -99,13 +98,14 @@ smartsend(
("user_image", image_data, "image"), ("user_image", image_data, "image"),
("audio_clip", audio_data, "audio") ("audio_clip", audio_data, "audio")
], ],
nats_url="nats://localhost:4222" broker_url="nats://localhost:4222"
) )
# Receive returns a dictionary envelope with all metadata and deserialized payloads # Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay) env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...] # env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc. # env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
``` ```
## Architecture Diagram ## Architecture Diagram
@@ -138,48 +138,48 @@ flowchart TD
## System Components ## System Components
### 1. msgEnvelope_v1 - Message Envelope ### 1. msg_envelope_v1 - Message Envelope
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications. The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
**Julia Structure:** **Julia Structure:**
```julia ```julia
struct msgEnvelope_v1 struct msg_envelope_v1
correlationId::String # Unique identifier to track messages across systems correlation_id::String # Unique identifier to track messages across systems
msgId::String # This message id msg_id::String # This message id
timestamp::String # Message published timestamp timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to send_to::String # Topic/subject the sender sends to
msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...) msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
senderName::String # Sender name (e.g., "agent-wine-web-frontend") sender_name::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4) sender_id::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend") receiver_name::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast) receiver_id::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to reply_to::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to reply_to_msg_id::String # Message id this message is replying to
brokerURL::String # NATS server address broker_url::String # NATS server address
metadata::Dict{String, Any} metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here payloads::Vector{msg_payload_v1} # Multiple payloads stored here
end end
``` ```
**JSON Schema:** **JSON Schema:**
```json ```json
{ {
"correlationId": "uuid-v4-string", "correlation_id": "uuid-v4-string",
"msgId": "uuid-v4-string", "msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z", "timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject", "send_to": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat", "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend", "sender_name": "agent-wine-web-frontend",
"senderId": "uuid4", "sender_id": "uuid4",
"receiverName": "agent-backend", "receiver_name": "agent-backend",
"receiverId": "uuid4", "receiver_id": "uuid4",
"replyTo": "topic", "reply_to": "topic",
"replyToMsgId": "uuid4", "reply_to_msg_id": "uuid4",
"brokerURL": "nats://localhost:4222", "broker_url": "nats://localhost:4222",
"metadata": { "metadata": {
@@ -189,7 +189,7 @@ end
{ {
"id": "uuid4", "id": "uuid4",
"dataname": "login_image", "dataname": "login_image",
"type": "image", "payload_type": "image",
"transport": "direct", "transport": "direct",
"encoding": "base64", "encoding": "base64",
"size": 15433, "size": 15433,
@@ -201,7 +201,7 @@ end
{ {
"id": "uuid4", "id": "uuid4",
"dataname": "large_data", "dataname": "large_data",
"type": "table", "payload_type": "table",
"transport": "link", "transport": "link",
"encoding": "none", "encoding": "none",
"size": 524288, "size": 524288,
@@ -214,16 +214,16 @@ end
} }
``` ```
### 2. msgPayload_v1 - Payload Structure ### 2. msg_payload_v1 - Payload Structure
The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms. The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms.
**Julia Structure:** **Julia Structure:**
```julia ```julia
struct msgPayload_v1 struct msg_payload_v1
id::String # Id of this payload (e.g., "uuid4") id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image") dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary" payload_type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link" transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc" encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes size::Integer # Data size in bytes
@@ -383,13 +383,32 @@ graph TD
```julia ```julia
function smartsend( function smartsend(
subject::String, subject::String,
data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
nats_url::String = "nats://localhost:4222", broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserverUploadHandler::Function = plik_oneshot_upload, fileserver_url = DEFAULT_FILESERVER_URL,
size_threshold::Int = 1_000_000 # 1MB fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
) )
``` ```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:** **Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]` - `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]` - Even for single payloads: `[(dataname1, data1, "type1")]`
@@ -406,8 +425,8 @@ function smartsend(
```julia ```julia
function smartreceive( function smartreceive(
msg::NATS.Message, msg::NATS.Msg;
fileserverDownloadHandler::Function; fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5, max_retries::Int = 5,
base_delay::Int = 100, base_delay::Int = 100,
max_delay::Int = 5000 max_delay::Int = 5000
@@ -416,7 +435,7 @@ function smartreceive(
# Iterate through all payloads # Iterate through all payloads
# For each payload: check transport type # For each payload: check transport type
# If direct: decode Base64 payload # If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler # If link: fetch from URL with exponential backoff using fileserver_download_handler
# Deserialize payload based on type # Deserialize payload based on type
# Return envelope dictionary with all metadata and deserialized payloads # Return envelope dictionary with all metadata and deserialized payloads
end end
@@ -424,9 +443,9 @@ end
**Output Format:** **Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields: - Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL` - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary - `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data - `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:** **Process Flow:**
1. Parse the JSON envelope to extract all fields 1. Parse the JSON envelope to extract all fields
@@ -434,71 +453,186 @@ end
3. For each payload: 3. For each payload:
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message - If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`) - If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples 4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`. **Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
### JavaScript Implementation ### JavaScript Implementation
#### Dependencies #### Dependencies
- `nats.js` - Core NATS functionality - `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization - `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation - `uuid` - Correlation ID and message ID generation
- `base64-arraybuffer` - Base64 encoding/decoding
- `node-fetch` or `fetch` - HTTP client for file server
#### smartsend Function #### smartsend Function
```javascript ```javascript
async function smartsend(subject, data, options = {}) async function smartsend(
// data format: [(dataname, data, type), ...] subject,
// options object should include: data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
// - natsUrl: NATS server URL options = {}
// - fileserverUrl: base URL of the file server )
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
``` ```
**Options:**
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (String) - Optional correlation ID for tracing
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (String) - Message receiver name (default: `""`)
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:** **Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]` - `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]` - Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages - Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:** **Flow:**
1. Iterate through the list of (dataname, data, type) tuples 1. Generate correlation ID and message ID if not provided
2. For each payload: extract the type from the tuple and serialize accordingly 2. Iterate through the list of `(dataname, data, type)` tuples
3. Check payload size 3. For each payload:
4. If < threshold: publish directly to NATS - Serialize based on payload type
5. If >= threshold: upload to HTTP server, publish NATS with URL - Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope object and JSON string
#### smartreceive Handler #### smartreceive Handler
```javascript ```javascript
async function smartreceive(msg, options = {}) async function smartreceive(msg, options = {})
// options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL
// - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
``` ```
**Options:**
- `fileserver_download_handler` (Function) - Custom download handler function
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (String) - Optional correlation ID for tracing
**Output Format:** **Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields: - Returns a Promise that resolves to an object containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL` - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary - `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data - `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:** **Process Flow:**
1. Parse the JSON envelope to extract all fields 1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` 2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
### Python/Micropython Implementation
#### Dependencies
- `nats-python` - Core NATS functionality
- `pyarrow` - Arrow IPC serialization
- `uuid` - Correlation ID and message ID generation
- `base64` - Base64 encoding/decoding
- `requests` or `aiohttp` - HTTP client for file server
#### smartsend Function
```python
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {}
)
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope dictionary and JSON string
#### smartreceive Handler
```python
async def smartreceive(
msg: NATS.Message,
options: Dict = {}
)
```
**Options:**
- `fileserver_download_handler` (Callable) - Custom download handler function
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` list
3. For each payload: 3. For each payload:
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message - If `direct`: Base64 decode the data from the message
- If `link`: fetch data from URL using exponential backoff - If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples 4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
## Scenario Implementations ## Scenario Implementations
### Scenario 1: Command & Control (Small Dictionary) ### Scenario 1: Command & Control (Small Dictionary)
@@ -690,7 +824,7 @@ async function smartreceive(msg, options = {})
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms. **Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads. **Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Performance Considerations ## Performance Considerations

View File

@@ -19,49 +19,102 @@ NATSBridge is implemented in three languages, each providing the same API:
| **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers | | **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers |
| **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers | | **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers |
### Multi-Payload Support ### File Server Handler Architecture
The implementation uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.** The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
**Handler Function Signatures:**
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
### Multi-Payload Support (Standard API)
The system uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
**API Standard:** **API Standard:**
```julia ```julia
# Input format for smartsend (always a list of tuples with type info) # Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...] [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field) # Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing list of tuples # Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# { # {
# "correlationId": "...", # "correlation_id": "...",
# "msgId": "...", # "msg_id": "...",
# "timestamp": "...", # "timestamp": "...",
# "sendTo": "...", # "send_to": "...",
# "msgPurpose": "...", # "msg_purpose": "...",
# "senderName": "...", # "sender_name": "...",
# "senderId": "...", # "sender_id": "...",
# "receiverName": "...", # "receiver_name": "...",
# "receiverId": "...", # "receiver_id": "...",
# "replyTo": "...", # "reply_to": "...",
# "replyToMsgId": "...", # "reply_to_msg_id": "...",
# "brokerURL": "...", # "broker_url": "...",
# "metadata": {...}, # "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] # "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# } # }
``` ```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"` **Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:** **Examples:**
```julia ```julia
# Single payload - still wrapped in a list (type is required as third element) # Single payload - still wrapped in a list
smartsend("/test", [(dataname1, data1, "text")], ...) smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message (each payload has its own type) # Multiple payloads in one message with different types
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...) smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads # Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...) env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...] # env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc. # env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
``` ```
## Cross-Platform Interoperability ## Cross-Platform Interoperability
@@ -98,14 +151,14 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
# Julia sender # Julia sender
using NATSBridge using NATSBridge
data = [("message", "Hello from Julia!", "text")] data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222") smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
``` ```
```javascript ```javascript
// JavaScript receiver // JavaScript receiver
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// envelope.payloads[0].data === "Hello from Julia!" // env.payloads[0].data === "Hello from Julia!"
``` ```
```python ```python
@@ -146,15 +199,31 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
``` ```
## Files ## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, env_json_str = smartsend(...)
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl) ### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
The Julia implementation provides: The Julia implementation provides:
- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope - **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size - **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation
- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport - **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js) ### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
@@ -247,7 +316,53 @@ node test/scenario3_julia_to_julia.js
## Usage ## Usage
### Scenario 0: Basic Multi-Payload Example ### Scenario 1: Command & Control (Small Dictionary)
**Focus:** Sending small dictionary configurations across platforms. This is the simplest use case for command and control scenarios.
**Julia (Sender/Receiver):**
```julia
using NATSBridge
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
```
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
// Create small dictionary config
// Send via smartsend with type="dictionary"
const config = {
step_size: 0.01,
iterations: 1000,
threshold: 0.5
};
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
]);
```
**Python/Micropython (Sender/Receiver):**
```python
from nats_bridge import smartsend
# Create small dictionary config
# Send via smartsend with type="dictionary"
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
smartsend("control", [("config", config, "dictionary")])
```
### Basic Multi-Payload Example
#### Python/Micropython (Sender) #### Python/Micropython (Sender)
```python ```python
@@ -262,16 +377,16 @@ smartsend(
) )
# Even single payload must be wrapped in a list with type # Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")]) smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222")
``` ```
#### Python/Micropython (Receiver) #### Python/Micropython (Receiver)
```python ```python
from nats_bridge import smartreceive from nats_bridge import smartreceive
# Receive returns a list of (dataname, data, type) tuples # Receive returns a dictionary with envelope metadata and payloads field
payloads = smartreceive(msg) env = smartreceive(msg)
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...] # env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
``` ```
#### JavaScript (Sender) #### JavaScript (Sender)
@@ -315,18 +430,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control"); const sub = nc.subscribe("control");
for await (const msg of sub) { for await (const msg of sub) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// Process the payloads from the envelope // Process the payloads from the envelope
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
const { dataname, data, type } = payload; const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`); console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`); console.log(`Data: ${JSON.stringify(data)}`);
} }
// Also access envelope metadata // Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`); console.log(`Correlation ID: ${env.correlation_id}`);
console.log(`Message ID: ${envelope.msgId}`); console.log(`Message ID: ${env.msg_id}`);
} }
``` ```
@@ -344,19 +459,21 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000) category = rand(["A", "B", "C"], 10_000_000)
) )
# Send via SmartSend - wrapped in a list (type is part of each tuple) # Send via smartsend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]); env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
``` ```
#### JavaScript (Receiver) #### JavaScript (Receiver)
```javascript ```javascript
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// Use table data from the payloads field // Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript // Note: Tables are sent as arrays of objects in JavaScript
const table = envelope.payloads; const table = env.payloads;
``` ```
### Scenario 3: Live Binary Processing ### Scenario 3: Live Binary Processing
@@ -406,10 +523,10 @@ from nats_bridge import smartreceive
# Receive binary data # Receive binary data
def process_binary(msg): def process_binary(msg):
envelope = smartreceive(msg) env = smartreceive(msg)
# Process the binary data from envelope.payloads # Process the binary data from env.payloads
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
if type == "binary": if type == "binary":
# data is bytes # data is bytes
print(f"Received binary data: {dataname}, size: {len(data)}") print(f"Received binary data: {dataname}, size: {len(data)}")
@@ -422,10 +539,10 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data // Receive binary data
function process_binary(msg) { function process_binary(msg) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// Process the binary data from envelope.payloads // Process the binary data from env.payloads
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.type === "binary") { if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array // data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`); console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
@@ -444,7 +561,7 @@ using NATSBridge
function publish_health_status(nats_url) function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple) # Send status wrapped in a list (type is part of each tuple)
status = Dict("cpu" => rand(), "memory" => rand()) status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], nats_url=nats_url) smartsend("health", [("status", status, "dictionary")], broker_url=nats_url)
sleep(5) # Every 5 seconds sleep(5) # Every 5 seconds
end end
``` ```
@@ -466,8 +583,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages // Process historical and real-time messages
for await (const msg of consumer) { for await (const msg of consumer) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// envelope.payloads contains the list of payloads // env.payloads contains the list of payloads
// Each payload has: dataname, data, type // Each payload has: dataname, data, type
msg.ack(); msg.ack();
} }
@@ -484,10 +601,10 @@ import json
# Device configuration handler # Device configuration handler
def handle_device_config(msg): def handle_device_config(msg):
envelope = smartreceive(msg) env = smartreceive(msg)
# Process configuration from payloads # Process configuration from payloads
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
if type == "dictionary": if type == "dictionary":
print(f"Received configuration: {data}") print(f"Received configuration: {data}")
# Apply configuration to device # Apply configuration to device
@@ -506,7 +623,7 @@ def handle_device_config(msg):
"device/response", "device/response",
[("config", config, "dictionary")], [("config", config, "dictionary")],
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
reply_to=envelope.get("replyTo") reply_to=env.get("reply_to")
) )
``` ```
@@ -566,11 +683,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge'); const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport // Receive NATS message with direct transport
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// Decode Base64 payload (for direct transport) // Decode Base64 payload (for direct transport)
// For tables, data is in envelope.payloads // For tables, data is in env.payloads
const table = envelope.payloads; // Array of objects const table = env.payloads; // Array of objects
// User makes selection // User makes selection
const selection = uiComponent.getSelectedOption(); const selection = uiComponent.getSelectedOption();
@@ -619,7 +736,7 @@ chat_message = [
smartsend( smartsend(
"chat.room123", "chat.room123",
chat_message, chat_message,
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
msg_purpose="chat", msg_purpose="chat",
reply_to="chat.room123.responses" reply_to="chat.room123.responses"
) )
@@ -667,7 +784,7 @@ await smartsend("chat.room123", message);
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components. **Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads. **Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Configuration ## Configuration
@@ -683,19 +800,19 @@ await smartsend("chat.room123", message);
```json ```json
{ {
"correlationId": "uuid-v4-string", "correlation_id": "uuid-v4-string",
"msgId": "uuid-v4-string", "msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z", "timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject", "send_to": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat", "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend", "sender_name": "agent-wine-web-frontend",
"senderId": "uuid4", "sender_id": "uuid4",
"receiverName": "agent-backend", "receiver_name": "agent-backend",
"receiverId": "uuid4", "receiver_id": "uuid4",
"replyTo": "topic", "reply_to": "topic",
"replyToMsgId": "uuid4", "reply_to_msg_id": "uuid4",
"BrokerURL": "nats://localhost:4222", "broker_url": "nats://localhost:4222",
"metadata": { "metadata": {
"content_type": "application/octet-stream", "content_type": "application/octet-stream",
@@ -706,7 +823,7 @@ await smartsend("chat.room123", message);
{ {
"id": "uuid4", "id": "uuid4",
"dataname": "login_image", "dataname": "login_image",
"type": "image", "payload_type": "image",
"transport": "direct", "transport": "direct",
"encoding": "base64", "encoding": "base64",
"size": 15433, "size": 15433,

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
# Send a text message # Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!") print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
``` ```
#### JavaScript #### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
// Send a text message // Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [ await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" }); ], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!"); console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
``` ```
#### Julia #### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!") println("Message sent!")
``` ```
@@ -145,8 +159,8 @@ println("Message sent!")
from nats_bridge import smartreceive from nats_bridge import smartreceive
# Receive and process message # Receive and process message
envelope = smartreceive(msg) env = smartreceive(msg)
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}") print(f"Received {dataname}: {data}")
``` ```
@@ -156,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
// Receive and process message // Receive and process message
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`); console.log(`Received ${payload.dataname}: ${payload.data}`);
} }
``` ```
@@ -168,8 +182,8 @@ for (const payload of envelope.payloads) {
using NATSBridge using NATSBridge
# Receive and process message # Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler) env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in envelope["payloads"] for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data") println("Received $dataname: $data")
end end
``` ```
@@ -194,7 +208,7 @@ config = {
# Send as dictionary type # Send as dictionary type
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -208,7 +222,7 @@ const config = {
update_interval: 60 update_interval: 60
}; };
await smartsend("/device/config", [ const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" } { dataname: "config", data: config, type: "dictionary" }
]); ]);
``` ```
@@ -225,7 +239,7 @@ config = Dict(
) )
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
smartsend("/device/config", data) env, env_json_str = smartsend("/device/config", data)
``` ```
### Example 2: Sending Binary Data (Image) ### Example 2: Sending Binary Data (Image)
@@ -241,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type # Send as binary type
data = [("user_image", image_data, "binary")] data = [("user_image", image_data, "binary")]
env = smartsend("/chat/image", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -253,7 +267,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs'); const fs = require('fs');
const image_data = fs.readFileSync('image.png'); const image_data = fs.readFileSync('image.png');
await smartsend("/chat/image", [ const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" } { dataname: "user_image", data: image_data, type: "binary" }
]); ]);
``` ```
@@ -267,7 +281,7 @@ using NATSBridge
image_data = read("image.png") image_data = read("image.png")
data = [("user_image", image_data, "binary")] data = [("user_image", image_data, "binary")]
smartsend("/chat/image", data) env, env_json_str = smartsend("/chat/image", data)
``` ```
### Example 3: Request-Response Pattern ### Example 3: Request-Response Pattern
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to # Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")] data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend( env, env_json_str = smartsend(
"/device/command", "/device/command",
data, data,
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
reply_to="/device/response", reply_to="/device/response",
reply_to_msg_id="cmd-001" reply_to_msg_id="cmd-001"
) )
# env: msgEnvelope_v1 object
# env_json_str: JSON string for publishing to NATS
``` ```
#### JavaScript (Responder) #### JavaScript (Responder)
@@ -297,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
const sub = nc.subscribe("/device/command"); const sub = nc.subscribe("/device/command");
for await (const msg of sub) { for await (const msg of sub) {
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
// Process command // Process command
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.dataname === "command") { if (payload.dataname === "command") {
const command = payload.data; const command = payload.data;
@@ -315,8 +331,8 @@ for await (const msg of sub) {
await smartsend("/device/response", [ await smartsend("/device/response", [
{ dataname: "sensor_data", data: response, type: "dictionary" } { dataname: "sensor_data", data: response, type: "dictionary" }
], { ], {
reply_to: envelope.replyTo, reply_to: env.replyTo,
reply_to_msg_id: envelope.msgId reply_to_msg_id: env.msgId
}); });
} }
} }
@@ -342,7 +358,7 @@ import os
large_data = os.urandom(2_000_000) # 2MB of random data large_data = os.urandom(2_000_000) # 2MB of random data
# Send with file server URL # Send with file server URL
env = smartsend( env, env_json_str = smartsend(
"/data/large", "/data/large",
[("large_file", large_data, "binary")], [("large_file", large_data, "binary")],
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
@@ -364,7 +380,7 @@ const largeData = new ArrayBuffer(2_000_000);
const view = new Uint8Array(largeData); const view = new Uint8Array(largeData);
view.fill(42); // Fill with some data view.fill(42); // Fill with some data
await smartsend("/data/large", [ const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" } { dataname: "large_file", data: largeData, type: "binary" }
], { ], {
fileserverUrl: "http://localhost:8080", fileserverUrl: "http://localhost:8080",
@@ -380,7 +396,7 @@ using NATSBridge
# Create large data (> 1MB) # Create large data (> 1MB)
large_data = rand(UInt8, 2_000_000) large_data = rand(UInt8, 2_000_000)
env = smartsend( env, env_json_str = smartsend(
"/data/large", "/data/large",
[("large_file", large_data, "binary")], [("large_file", large_data, "binary")],
fileserver_url="http://localhost:8080" fileserver_url="http://localhost:8080"
@@ -409,7 +425,7 @@ data = [
("user_avatar", image_data, "image") ("user_avatar", image_data, "image")
] ]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -419,7 +435,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs'); const fs = require('fs');
await smartsend("/chat/mixed", [ const { env, env_json_str } = await smartsend("/chat/mixed", [
{ {
dataname: "message_text", dataname: "message_text",
data: "Hello with image!", data: "Hello with image!",
@@ -445,7 +461,7 @@ data = [
("user_avatar", image_data, "image") ("user_avatar", image_data, "image")
] ]
smartsend("/chat/mixed", data) env, env_json_str = smartsend("/chat/mixed", data)
``` ```
### Example 6: Table Data (Arrow IPC) ### Example 6: Table Data (Arrow IPC)
@@ -467,7 +483,7 @@ df = pd.DataFrame({
# Send as table type # Send as table type
data = [("students", df, "table")] data = [("students", df, "table")]
env = smartsend("/data/students", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222")
``` ```
#### Julia #### Julia
@@ -484,7 +500,7 @@ df = DataFrame(
) )
data = [("students", df, "table")] data = [("students", df, "table")]
smartsend("/data/students", data) env, env_json_str = smartsend("/data/students", data)
``` ```
--- ---
@@ -503,7 +519,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript # Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000) config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
smartsend("/analysis/config", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
``` ```
#### JavaScript Receiver #### JavaScript Receiver
@@ -512,8 +528,8 @@ smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
// Receive dictionary from Julia // Receive dictionary from Julia
const envelope = await smartreceive(msg); const env = await smartreceive(msg);
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.type === "dictionary") { if (payload.type === "dictionary") {
console.log("Received config:", payload.data); console.log("Received config:", payload.data);
// payload.data = { step_size: 0.01, iterations: 1000 } // payload.data = { step_size: 0.01, iterations: 1000 }
@@ -528,7 +544,7 @@ for (const payload of envelope.payloads) {
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
await smartsend("/data/transfer", [ const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" } { dataname: "message", data: "Hello from JS!", type: "text" }
]); ]);
``` ```
@@ -538,8 +554,8 @@ await smartsend("/data/transfer", [
```python ```python
from nats_bridge import smartreceive from nats_bridge import smartreceive
envelope = smartreceive(msg) env = smartreceive(msg)
for dataname, data, type in envelope["payloads"]: for dataname, data, type in env["payloads"]:
if type == "text": if type == "text":
print(f"Received from JS: {data}") print(f"Received from JS: {data}")
``` ```
@@ -552,7 +568,7 @@ for dataname, data, type in envelope["payloads"]:
from nats_bridge import smartsend from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")] data = [("message", "Hello from Python!", "text")]
smartsend("/chat/python", data) env, env_json_str = smartsend("/chat/python", data)
``` ```
#### Julia Receiver #### Julia Receiver
@@ -560,8 +576,8 @@ smartsend("/chat/python", data)
```julia ```julia
using NATSBridge using NATSBridge
envelope = smartreceive(msg, fileserverDownloadHandler) env = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"] for (dataname, data, type) in env["payloads"]
if type == "text" if type == "text"
println("Received from Python: $data") println("Received from Python: $data")
end end

View File

@@ -132,7 +132,7 @@ class ChatUI {
}); });
} }
await smartsend( const { env, env_json_str } = await smartsend(
`/chat/${this.currentRoom}`, `/chat/${this.currentRoom}`,
data, data,
{ {
@@ -216,15 +216,15 @@ class ChatHandler {
} }
async handleMessage(msg) { async handleMessage(msg) {
const envelope = await smartreceive(msg, { const env = await smartreceive(msg, {
fileserverDownloadHandler: this.downloadFile.bind(this) fileserverDownloadHandler: this.downloadFile.bind(this)
}); });
// Extract sender info from envelope // Extract sender info from envelope
const sender = envelope.senderName || 'Anonymous'; const sender = env.senderName || 'Anonymous';
// Process each payload // Process each payload
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.type === 'text') { if (payload.type === 'text') {
this.ui.addMessage(sender, payload.data); this.ui.addMessage(sender, payload.data);
} else if (payload.type === 'image') { } else if (payload.type === 'image') {
@@ -304,7 +304,7 @@ class FileUploadService {
type: 'binary' type: 'binary'
}]; }];
const envelope = await smartsend( const { env, env_json_str } = await smartsend(
`/files/${recipient}`, `/files/${recipient}`,
data, data,
{ {
@@ -314,7 +314,7 @@ class FileUploadService {
} }
); );
return envelope; return env;
} }
async uploadLargeFile(filePath, recipient) { async uploadLargeFile(filePath, recipient) {
@@ -356,12 +356,12 @@ class FileDownloadService {
async downloadFile(sender, downloadId) { async downloadFile(sender, downloadId) {
// Subscribe to sender's file channel // Subscribe to sender's file channel
const envelope = await smartreceive(msg, { const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this) fileserverDownloadHandler: this.fetchFromUrl.bind(this)
}); });
// Process each payload // Process each payload
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.type === 'binary') { if (payload.type === 'binary') {
const filePath = `/downloads/${payload.dataname}`; const filePath = `/downloads/${payload.dataname}`;
fs.writeFileSync(filePath, payload.data); fs.writeFileSync(filePath, payload.data);
@@ -422,9 +422,9 @@ async function uploadFile(config) {
const fileService = new FileUploadService(config.nats_url, config.fileserver_url); const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
try { try {
const envelope = await fileService.uploadFile(filePath, recipient); const env = await fileService.uploadFile(filePath, recipient);
console.log('Upload successful!'); console.log('Upload successful!');
console.log(`File ID: ${envelope.payloads[0].id}`); console.log(`File ID: ${env.payloads[0].id}`);
} catch (error) { } catch (error) {
console.error('Upload failed:', error.message); console.error('Upload failed:', error.message);
} }
@@ -514,6 +514,7 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")] data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend( smartsend(
f"/sensors/{sensor_id}", f"/sensors/{sensor_id}",
data, data,
@@ -521,6 +522,31 @@ class SensorSender:
fileserver_url=self.fileserver_url fileserver_url=self.fileserver_url
) )
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (env, env_json_str) without publishing
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
return env, env_json_str
def send_batch(self, readings: List[SensorReading]): def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch() batch = SensorBatch()
for reading in readings: for reading in readings:
@@ -571,9 +597,9 @@ class SensorReceiver:
self.fileserver_download_handler = fileserver_download_handler self.fileserver_download_handler = fileserver_download_handler
def process_reading(self, msg): def process_reading(self, msg):
envelope = smartreceive(msg, self.fileserver_download_handler) env = smartreceive(msg, self.fileserver_download_handler)
for dataname, data, data_type in envelope["payloads"]: for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary": if data_type == "dictionary":
reading = SensorReading( reading = SensorReading(
sensor_id=data["sensor_id"], sensor_id=data["sensor_id"],
@@ -673,10 +699,10 @@ class DeviceBridge:
# Poll for messages # Poll for messages
msg = self._poll_for_message() msg = self._poll_for_message()
if msg: if msg:
envelope = smartreceive(msg) env = smartreceive(msg)
# Process payloads # Process payloads
for dataname, data, data_type in envelope["payloads"]: for dataname, data, data_type in env["payloads"]:
if dataname == "command": if dataname == "command":
callback(data) callback(data)
@@ -772,9 +798,9 @@ class DashboardServer:
def receive_selection(self, callback): def receive_selection(self, callback):
def handler(msg): def handler(msg):
envelope = smartreceive(msg) env = smartreceive(msg)
for dataname, data, data_type in envelope["payloads"]: for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary": if data_type == "dictionary":
callback(data) callback(data)
@@ -807,7 +833,7 @@ class DashboardUI {
async refreshData() { async refreshData() {
// Request fresh data // Request fresh data
await smartsend("/dashboard/request", [ const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" } { dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], { ], {
fileserverUrl: window.config.fileserver_url fileserverUrl: window.config.fileserver_url
@@ -816,12 +842,12 @@ class DashboardUI {
async fetchData() { async fetchData() {
// Subscribe to data updates // Subscribe to data updates
const envelope = await smartreceive(msg, { const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this) fileserverDownloadHandler: this.fetchFromUrl.bind(this)
}); });
// Process table data // Process table data
for (const payload of envelope.payloads) { for (const payload of env.payloads) {
if (payload.type === 'table') { if (payload.type === 'table') {
// Deserialize Arrow IPC // Deserialize Arrow IPC
this.data = this.deserializeArrow(payload.data); this.data = this.deserializeArrow(payload.data);

File diff suppressed because it is too large Load Diff

View File

@@ -460,8 +460,9 @@ async function smartsend(subject, data, options = {}) {
* @param {string} options.receiverId - UUID of the receiver (default: "") * @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "") * @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
* *
* @returns {Promise<MessageEnvelope>} - The envelope for tracking * @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
*/ */
const { const {
natsUrl = DEFAULT_NATS_URL, natsUrl = DEFAULT_NATS_URL,
@@ -474,7 +475,8 @@ async function smartsend(subject, data, options = {}) {
receiverName = "", receiverName = "",
receiverId = "", receiverId = "",
replyTo = "", replyTo = "",
replyToMsgId = "" replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
} = options; } = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`); log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
@@ -556,10 +558,19 @@ async function smartsend(subject, data, options = {}) {
payloads: payloads payloads: payloads
}); });
// Publish message to NATS // Convert envelope to JSON string
await publish_message(natsUrl, subject, env.toString(), correlationId); const env_json_str = env.toString();
return env; // Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, env_json_str, correlationId);
}
// Return both envelope and JSON string (tuple-like structure)
return {
env: env,
env_json_str: env_json_str
};
} }
// Helper: Publish message to NATS // Helper: Publish message to NATS

View File

@@ -437,7 +437,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL, def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD, fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge", correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""): receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size. """Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold. This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -459,9 +459,12 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id: UUID of the receiver receiver_id: UUID of the receiver
reply_to: Topic to reply to reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
Returns: Returns:
MessageEnvelope: The envelope object for tracking tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
""" """
# Generate correlation ID if not provided # Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4()) cid = correlation_id if correlation_id else str(uuid.uuid4())
@@ -549,13 +552,15 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
msg_json = env.to_json() msg_json = env.to_json()
# Publish to NATS # Publish to NATS if is_publish is True
nats_conn = NATSConnection(nats_url) if is_publish:
nats_conn.connect() nats_conn = NATSConnection(nats_url)
nats_conn.publish(subject, msg_json) nats_conn.connect()
nats_conn.close() nats_conn.publish(subject, msg_json)
nats_conn.close()
return env # Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5, def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type // Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded) // For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver) // For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, env_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport // Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default) // if file size exceeds the threshold (1MB by default)
const env = await smartsend( const { env, env_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
]; ];
// Use smartsend with mixed content // Use smartsend with mixed content
const env = await smartsend( const { env, env_json_str } = await smartsend(
SUBJECT, SUBJECT,
payloads, payloads,
{ {
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type // Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded) // For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver) // For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, env_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type // Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8) // For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver) // For large text: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, env_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -92,12 +92,12 @@ function test_dict_send()
# Use smartsend with dictionary type # Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded) # For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver) # For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, broker_url = NATS_URL,
fileserver_url = FILESERVER_URL, fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler, fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id, correlation_id = correlation_id,
msg_purpose = "chat", msg_purpose = "chat",
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -114,7 +115,7 @@ function test_dict_send()
for (i, payload) in enumerate(env.payloads) for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):") log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)") log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)") log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes") log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)") log_trace(" Encoding: $(payload.encoding)")

View File

@@ -79,12 +79,12 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport # Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default) # if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...) # API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL; broker_url = NATS_URL;
fileserver_url = FILESERVER_URL, fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler, fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, size_threshold = 1_000_000,
correlation_id = correlation_id, correlation_id = correlation_id,
msg_purpose = "chat", msg_purpose = "chat",
@@ -92,11 +92,12 @@ function test_large_binary_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with transport: $(env.payloads[1].transport)") log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.payloads[1].type)") log_trace("Envelope type: $(env.payloads[1].payload_type)")
# Check if link transport was used # Check if link transport was used
if env.payloads[1].transport == "link" if env.payloads[1].transport == "link"

View File

@@ -186,12 +186,12 @@ function test_mix_send()
] ]
# Use smartsend with mixed content # Use smartsend with mixed content
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
SUBJECT, SUBJECT,
payloads; # List of (dataname, data, type) tuples payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL, broker_url = NATS_URL,
fileserver_url = FILESERVER_URL, fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler, fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id, correlation_id = correlation_id,
msg_purpose = "chat", msg_purpose = "chat",
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -208,7 +209,7 @@ function test_mix_send()
for (i, payload) in enumerate(env.payloads) for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):") log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)") log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)") log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes") log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)") log_trace(" Encoding: $(payload.encoding)")

View File

@@ -90,12 +90,12 @@ function test_table_send()
# Use smartsend with table type # Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC) # For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver) # For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, broker_url = NATS_URL,
fileserver_url = FILESERVER_URL, fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler, fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id, correlation_id = correlation_id,
msg_purpose = "chat", msg_purpose = "chat",
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -112,7 +113,7 @@ function test_table_send()
for (i, payload) in enumerate(env.payloads) for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):") log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)") log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)") log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes") log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)") log_trace(" Encoding: $(payload.encoding)")

View File

@@ -75,12 +75,12 @@ function test_text_send()
# Use smartsend with text type # Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8) # For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver) # For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, broker_url = NATS_URL,
fileserver_url = FILESERVER_URL, fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler, fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id, correlation_id = correlation_id,
msg_purpose = "chat", msg_purpose = "chat",
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -97,7 +98,7 @@ function test_text_send()
for (i, payload) in enumerate(env.payloads) for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):") log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)") log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)") log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes") log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)") log_trace(" Encoding: $(payload.encoding)")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type # Use smartsend with dictionary type
env = smartsend( env, env_json_str = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type # Use smartsend with binary type
env = smartsend( env, env_json_str = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types # Use smartsend with mixed types
env = smartsend( env, env_json_str = smartsend(
SUBJECT, SUBJECT,
data, # List of (dataname, data, type) tuples data, # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type # Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8) # For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver) # For large text: will use link transport (uploaded to fileserver)
env = smartsend( env, env_json_str = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")