14 Commits

Author SHA1 Message Date
3e1c8d563e update 2026-02-25 15:20:29 +07:00
1299febcdc update 2026-02-25 14:25:08 +07:00
be94c62760 update 2026-02-25 12:24:02 +07:00
6a862ef243 update 2026-02-25 12:09:00 +07:00
ae2de5fc62 update 2026-02-25 10:33:30 +07:00
df0bbc7327 update 2026-02-25 09:58:10 +07:00
d94761c866 update 2026-02-25 09:44:08 +07:00
f8235e1a59 update 2026-02-25 08:54:04 +07:00
647cadf497 update 2026-02-25 08:33:32 +07:00
8c793a81b6 update 2026-02-25 08:02:03 +07:00
6a42ba7e43 update 2026-02-25 07:29:42 +07:00
14b3790251 update 2026-02-25 06:23:24 +07:00
61d81bed62 update 2026-02-25 06:04:40 +07:00
1a10bc1a5f update 2026-02-25 05:32:59 +07:00
10 changed files with 1152 additions and 487 deletions

View File

@@ -12,4 +12,42 @@ Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement
Create a walkthrough for Julia service-A service sending a mix-content chat message to Julia service-B. the chat message must includes Create a walkthrough for Julia service-A service sending a mix-content chat message to Julia service-B. the chat message must includes
I updated the following:
- NATSBridge.jl. Essentially I add NATS_connection keyword and new publish_message function to support the keyword.
Use them and ONLY them as ground truth.
Then update the following files accordingly:
- architecture.md
- implementation.md
All API should be semantically consistent and naming should be consistent across the board.
Task: Update NATSBridge.js to reflect recent changes in NATSBridge.jl and docs
Context: NATSBridge.jl and docs has been updated.
Requirements:
Source of Truth: Treat the updated NATSBridge.jl and docs as the definitive source.
API Consistency: Ensure the Main Package API (e.g., smartsend(), publish_message()) uses consistent naming across all three supported languages.
Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read()) should follow the conventions of the specific language ecosystem and do not require cross-language consistency.

121
README.md
View File

@@ -173,7 +173,7 @@ from nats_bridge import smartsend
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
print("Message sent!") print("Message sent!")
``` ```
@@ -185,7 +185,7 @@ const { smartsend } = require('./src/NATSBridge');
// Send a text message // Send a text message
const { env, env_json_str } = await smartsend("/chat/room1", [ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" }); ], { broker_url: "nats://localhost:4222" });
console.log("Message sent!"); console.log("Message sent!");
``` ```
@@ -197,7 +197,7 @@ using NATSBridge
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222") env, env_json_str = NATSBridge.smartsend("/chat/room1", data; broker_url="nats://localhost:4222")
println("Message sent!") println("Message sent!")
``` ```
@@ -313,7 +313,7 @@ from nats_bridge import smartsend
env, env_json_str = smartsend( env, env_json_str = smartsend(
subject, # NATS subject to publish to subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL broker_url="nats://localhost:4222", # NATS server URL
fileserver_url="http://localhost:8080", # File server URL fileserver_url="http://localhost:8080", # File server URL
fileserver_upload_handler=plik_oneshot_upload, # Upload handler function fileserver_upload_handler=plik_oneshot_upload, # Upload handler function
size_threshold=1_000_000, # Threshold in bytes (default: 1MB) size_threshold=1_000_000, # Threshold in bytes (default: 1MB)
@@ -337,18 +337,18 @@ const { env, env_json_str } = await smartsend(
subject, // NATS subject subject, // NATS subject
data, // Array of {dataname, data, type} data, // Array of {dataname, data, type}
{ {
natsUrl: "nats://localhost:4222", broker_url: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080", fileserver_url: "http://localhost:8080",
fileserverUploadHandler: customUploadHandler, fileserver_upload_handler: customUploadHandler,
sizeThreshold: 1_000_000, size_threshold: 1_000_000,
correlationId: "custom-id", correlation_id: "custom-id",
msgPurpose: "chat", msg_purpose: "chat",
senderName: "NATSBridge", sender_name: "NATSBridge",
receiverName: "", receiver_name: "",
receiverId: "", receiver_id: "",
replyTo: "", reply_to: "",
replyToMsgId: "", reply_to_msg_id: "",
isPublish: true // Whether to automatically publish to NATS is_publish: true // Whether to automatically publish to NATS
} }
); );
``` ```
@@ -361,9 +361,9 @@ using NATSBridge
env, env_json_str = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222", broker_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080", fileserver_url = "http://localhost:8080",
fileserverUploadHandler::Function = plik_oneshot_upload, fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000, size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing, correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat", msg_purpose::String = "chat",
@@ -372,7 +372,8 @@ env, env_json_str = NATSBridge.smartsend(
receiver_id::String = "", receiver_id::String = "",
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "", reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
) )
# Returns: (msgEnvelope_v1, JSON string) # Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads # - env: msgEnvelope_v1 object with all envelope metadata and payloads
@@ -423,9 +424,9 @@ const env = await smartreceive(
using NATSBridge using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback # Note: msg is a NATS.Msg object passed from the subscription callback
env, env_json_str = NATSBridge.smartreceive( env = NATSBridge.smartreceive(
msg::NATS.Msg; msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff, fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5, max_retries::Int = 5,
base_delay::Int = 100, base_delay::Int = 100,
max_delay::Int = 5000 max_delay::Int = 5000
@@ -433,6 +434,35 @@ env, env_json_str = NATSBridge.smartreceive(
# Returns: Dict with envelope metadata and payloads array # Returns: Dict with envelope metadata and payloads array
``` ```
### publish_message
Publish a message to a NATS subject. This function is available in Julia with two overloads:
#### Julia
**Using broker URL (creates new connection):**
```julia
using NATSBridge, NATS
# Publish with URL - creates a new connection
NATSBridge.publish_message(
"nats://localhost:4222", # broker_url
"/chat/room1", # subject
"{\"correlation_id\":\"abc123\"}", # message
"abc123" # correlation_id
)
```
**Using pre-existing connection (saves connection overhead):**
```julia
using NATSBridge, NATS
# Create connection once and reuse
conn = NATS.connect("nats://localhost:4222")
NATSBridge.publish_message(conn, "/chat/room1", "{\"correlation_id\":\"abc123\"}", "abc123")
# Connection is automatically drained after publish
```
--- ---
## Payload Types ## Payload Types
@@ -488,7 +518,7 @@ smartsend("/topic", data, fileserver_url="http://localhost:8080")
```javascript ```javascript
await smartsend("/topic", [ await smartsend("/topic", [
{ dataname: "file", data: largeData, type: "binary" } { dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" }); ], { fileserver_url: "http://localhost:8080" });
``` ```
#### Julia #### Julia
@@ -529,7 +559,7 @@ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "user_avatar", data: image_data, type: "image" }, { dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" } { dataname: "large_document", data: large_file_data, type: "binary" }
], { ], {
fileserverUrl: "http://localhost:8080" fileserver_url: "http://localhost:8080"
}); });
``` ```
@@ -653,9 +683,10 @@ from nats_bridge import smartsend
env, env_json_str = smartsend( env, env_json_str = smartsend(
"/device/command", "/device/command",
[("command", {"action": "read_sensor"}, "dictionary")], [("command", {"action": "read_sensor"}, "dictionary")],
broker_url="nats://localhost:4222",
reply_to="/device/response" reply_to="/device/response"
) )
# env: msgEnvelope_v1 object # env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS # env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern # The env_json_str can also be published directly using NATS request-reply pattern
@@ -670,18 +701,24 @@ from nats_bridge import smartreceive, smartsend
# Configuration # Configuration
SUBJECT = "/device/command" SUBJECT = "/device/command"
REPLY_SUBJECT = "/device/response"
NATS_URL = "nats://localhost:4222" NATS_URL = "nats://localhost:4222"
async def main(): async def main():
nc = await nats.connect(NATS_URL) nc = await nats.connect(NATS_URL)
async def message_handler(msg): async def message_handler(msg):
# Receive and parse the incoming message envelope
env = smartreceive(msg.data) env = smartreceive(msg.data)
# Extract reply_to from the envelope metadata
reply_to = env["reply_to"]
for dataname, data, type in env["payloads"]: for dataname, data, type in env["payloads"]:
if data.get("action") == "read_sensor": if data.get("action") == "read_sensor":
response = {"sensor_id": "sensor-001", "value": 42.5} response = {"sensor_id": "sensor-001", "value": 42.5}
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) # Send response to the reply_to subject from the request
if reply_to:
smartsend(reply_to, [("data", response, "dictionary")])
sid = await nc.subscribe(SUBJECT, cb=message_handler) sid = await nc.subscribe(SUBJECT, cb=message_handler)
await asyncio.sleep(120) await asyncio.sleep(120)
@@ -697,7 +734,8 @@ const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/device/command", [ const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" } { dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], { ], {
replyTo: "/device/response" broker_url: "nats://localhost:4222",
reply_to: "/device/response"
}); });
``` ```
@@ -708,7 +746,6 @@ const { connect } = require('nats');
// Configuration // Configuration
const SUBJECT = "/device/command"; const SUBJECT = "/device/command";
const REPLY_SUBJECT = "/device/response";
const NATS_URL = "nats://localhost:4222"; const NATS_URL = "nats://localhost:4222";
async function main() { async function main() {
@@ -718,12 +755,19 @@ async function main() {
for await (const msg of sub) { for await (const msg of sub) {
const env = await smartreceive(msg); const env = await smartreceive(msg);
// Extract reply_to from the envelope metadata
const replyTo = env["reply_to"];
for (const payload of env.payloads) { for (const payload of env.payloads) {
if (payload.dataname === "command" && payload.data.action === "read_sensor") { if (payload.dataname === "command" && payload.data.action === "read_sensor") {
const response = { sensor_id: "sensor-001", value: 42.5 }; const response = { sensor_id: "sensor-001", value: 42.5 };
await smartsend(REPLY_SUBJECT, [ // Send response to the reply_to subject from the request
{ dataname: "data", data: response, type: "dictionary" } if (replyTo) {
]); await smartsend(replyTo, [
{ dataname: "data", data: response, type: "dictionary" }
]);
}
} }
} }
} }
@@ -739,6 +783,7 @@ using NATSBridge
env, env_json_str = NATSBridge.smartsend( env, env_json_str = NATSBridge.smartsend(
"/device/command", "/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")]; [("command", Dict("action" => "read_sensor"), "dictionary")];
broker_url="nats://localhost:4222",
reply_to="/device/response" reply_to="/device/response"
) )
``` ```
@@ -749,17 +794,23 @@ using NATS, NATSBridge
# Configuration # Configuration
const SUBJECT = "/device/command" const SUBJECT = "/device/command"
const REPLY_SUBJECT = "/device/response"
const NATS_URL = "nats://localhost:4222" const NATS_URL = "nats://localhost:4222"
function test_responder() function test_responder()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler) env = NATSBridge.smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
# Extract reply_to from the envelope metadata
reply_to = env["reply_to"]
for (dataname, data, type) in env["payloads"] for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor" if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5) response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) # Send response to the reply_to subject from the request
if !isempty(reply_to)
smartsend(reply_to, [("data", response, "dictionary")])
end
end end
end end
end end
@@ -790,7 +841,7 @@ async def main():
# Send sensor data # Send sensor data
data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")] data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")]
smartsend("/device/sensors", data, nats_url="nats://localhost:4222") smartsend("/device/sensors", data, broker_url="nats://localhost:4222")
# Receive commands - msg comes from the callback # Receive commands - msg comes from the callback
async def message_handler(msg): async def message_handler(msg):

View File

@@ -18,9 +18,9 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia ```julia
# Upload handler - uploads data to file server and returns URL # Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter # The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8}) # It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url" # Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff # Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter # The handler is passed to smartreceive as fileserver_download_handler parameter
@@ -40,8 +40,8 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info) # Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...] [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) # Output format for smartreceive (returns a dictionary-like object with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}} # Returns: Dict-like object with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# { # {
# "correlation_id": "...", # "correlation_id": "...",
# "msg_id": "...", # "msg_id": "...",
@@ -369,6 +369,31 @@ graph TD
## Implementation Details ## Implementation Details
### API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers
**Note on `is_publish`:**
- `is_publish` is simply a switch to control automatic publishing
- When `true` (default): Message is published to NATS automatically
- When `false`: Returns `(env, env_json_str)` without publishing, allowing manual publishing
- Connection reuse is achieved separately by creating NATS client outside the function
### Julia Implementation ### Julia Implementation
#### Dependencies #### Dependencies
@@ -395,10 +420,25 @@ function smartsend(
receiver_id::String = "", receiver_id::String = "",
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "", reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
) )
``` ```
**Keyword Parameter - NATS_connection:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios where connection reuse provides performance benefits.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Return Value:** **Return Value:**
- Returns a tuple `(env, env_json_str)` where: - Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads - `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
@@ -442,10 +482,10 @@ end
``` ```
**Output Format:** **Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields: - Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary - `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data - `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:** **Process Flow:**
1. Parse the JSON envelope to extract all fields 1. Parse the JSON envelope to extract all fields
@@ -459,6 +499,57 @@ end
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`. **Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish. This is a Julia-specific optimization that leverages function overloading.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **High-level API (smartsend, smartreceive):** Uses consistent naming across all three languages (Julia, JavaScript, Python/Micropython)
- **Low-level native functions (NATS.connect(), publish_message()):** Follow the conventions of the specific language ecosystem and do not require cross-language consistency
### JavaScript Implementation ### JavaScript Implementation
#### Dependencies #### Dependencies
@@ -489,13 +580,15 @@ async function smartsend(
- `receiver_id` (String) - Message receiver ID (default: `""`) - `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`) - `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`) - `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `is_publish` (Boolean) - Whether to automatically publish the message to NATS (default: `true`)
- `fileserver_upload_handler` (Function) - Custom upload handler function - `fileserver_upload_handler` (Function) - Custom upload handler function
**Note:** JavaScript uses `is_publish` option (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:** **Return Value:**
- Returns a Promise that resolves to an object containing: - Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads - `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing - `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:** **Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]` - `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
@@ -531,16 +624,16 @@ async function smartreceive(msg, options = {})
- Returns a Promise that resolves to an object containing all envelope fields: - Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary - `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type` - `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:** **Process Flow:**
1. Parse the JSON envelope to extract all fields 1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` array 2. Iterate through each payload in `payloads` array
3. For each payload: 3. For each payload:
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message - If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`) - If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples 4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`. **Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
@@ -557,26 +650,40 @@ async function smartreceive(msg, options = {})
#### smartsend Function #### smartsend Function
```python ```python
async def smartsend( def smartsend(
subject: str, subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {} broker_url: str = DEFAULT_BROKER_URL,
) fileserver_url: str = DEFAULT_FILESERVER_URL,
fileserver_upload_handler: Callable = plik_oneshot_upload,
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
correlation_id: Union[str, None] = None,
msg_purpose: str = "chat",
sender_name: str = "NATSBridge",
receiver_name: str = "",
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True
) -> Tuple[MessageEnvelope, str]
``` ```
**Options:** **Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`) - `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`) - `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB) - `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing - `correlation_id` (str) - Optional correlation ID for tracing (auto-generated if None)
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`) - `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`) - `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`) - `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`) - `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`) - `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`) - `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `is_publish` (bool) - Whether to automatically publish the message to NATS (default: `True`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function - `fileserver_upload_handler` (Callable) - Custom upload handler function
**Note:** Python uses `is_publish` parameter (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:** **Return Value:**
- Returns a tuple `(env, env_json_str)` where: - Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads - `env` - The envelope dictionary containing all metadata and payloads
@@ -616,7 +723,7 @@ async def smartreceive(
- `correlation_id` (str) - Optional correlation ID for tracing - `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:** **Output Format:**
- Returns a dictionary containing all envelope fields: - Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary - `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data - `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
@@ -824,7 +931,7 @@ async def smartreceive(
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms. **Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads. **Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Performance Considerations ## Performance Considerations

View File

@@ -28,9 +28,9 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia ```julia
# Upload handler - uploads data to file server and returns URL # Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter # The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8}) # It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url" # Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff # Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter # The handler is passed to smartreceive as fileserver_download_handler parameter
@@ -148,10 +148,12 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
### Example: Julia ↔ Python ↔ JavaScript ### Example: Julia ↔ Python ↔ JavaScript
```julia ```julia
# Julia sender # Julia sender - smartsend returns (env, env_json_str)
using NATSBridge using NATSBridge
data = [("message", "Hello from Julia!", "text")] data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, broker_url="nats://localhost:4222") env, env_json_str = smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
``` ```
```javascript ```javascript
@@ -165,7 +167,7 @@ const env = await smartreceive(msg);
# Python sender # Python sender
from nats_bridge import smartsend from nats_bridge import smartsend
data = [("response", "Hello from Python!", "text")] data = [("response", "Hello from Python!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222") smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
``` ```
All three platforms can communicate seamlessly using the same NATS subjects and data format. All three platforms can communicate seamlessly using the same NATS subjects and data format.
@@ -314,6 +316,31 @@ julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js node test/scenario3_julia_to_julia.js
``` ```
## API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern - Key Differences:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
**Why the Difference?**
- Julia supports function overloading and keyword arguments, allowing `NATS_connection` to be passed as an optional parameter
- JavaScript/Python use a simpler `is_publish` option to control automatic publishing
- `is_publish` is simply a switch: when `true`, publish automatically; when `false`, return `(env, env_json_str)` without publishing
- For connection reuse in JavaScript/Python, create a NATS client once and reuse it in your custom `fileserver_upload_handler` or custom publish logic
## Usage ## Usage
### Scenario 1: Command & Control (Small Dictionary) ### Scenario 1: Command & Control (Small Dictionary)
@@ -324,12 +351,41 @@ node test/scenario3_julia_to_julia.js
```julia ```julia
using NATSBridge using NATSBridge
# Subscribe to control subject # Send small dictionary config (wrapped in list with type)
# Parse JSON envelope config = Dict("step_size" => 0.01, "iterations" => 1000, "threshold" => 0.5)
# Execute simulation with parameters env, env_json_str = smartsend(
# Send acknowledgment "control",
[("config", config, "dictionary")],
broker_url="nats://localhost:4222"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
``` ```
**Julia (Sender/Receiver) with NATS_connection for connection reuse:**
```julia
using NATSBridge
# Create connection once for high-frequency publishing
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection (saves connection overhead)
for i in 1:100
config = Dict("iteration" => i, "data" => rand())
smartsend(
"control",
[("config", config, "dictionary")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
**Use Case:** High-frequency publishing scenarios where connection reuse provides performance benefits by avoiding the overhead of establishing a new NATS connection for each message.
**JavaScript (Sender/Receiver):** **JavaScript (Sender/Receiver):**
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
@@ -342,9 +398,39 @@ const config = {
threshold: 0.5 threshold: 0.5
}; };
// Use is_publish option to control automatic publishing
await smartsend("control", [ await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" } { dataname: "config", data: config, type: "dictionary" }
]); ], {
is_publish: true // Automatically publish to NATS
});
```
**Connection Reuse in JavaScript:**
To achieve connection reuse in JavaScript, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```javascript
const { connect } = require('nats');
const { smartsend } = require('./src/NATSBridge');
// Create connection once
const nc = await connect({ servers: ['nats://localhost:4222'] });
// Send multiple messages using the same connection
for (let i = 0; i < 100; i++) {
const config = { iteration: i, data: Math.random() };
// Option 1: Use is_publish=false and publish manually with your connection
const { env, env_json_str } = await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
], { is_publish: false });
// Publish with your existing connection
await nc.publish("control", env_json_str);
}
// Close connection when done
await nc.close();
``` ```
**Python/Micropython (Sender/Receiver):** **Python/Micropython (Sender/Receiver):**
@@ -359,7 +445,32 @@ config = {
"threshold": 0.5 "threshold": 0.5
} }
smartsend("control", [("config", config, "dictionary")]) # Use is_publish parameter to control automatic publishing
smartsend("control", [("config", config, "dictionary")], is_publish=True)
```
**Connection Reuse in Python:**
To achieve connection reuse in Python, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```python
from nats_bridge import smartsend
import nats
# Create connection once
nc = await nats.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in range(100):
config = {"iteration": i, "data": random.random()}
# Option 1: Use is_publish=False and publish manually with your connection
env, env_json_str = smartsend("control", [("config", config, "dictionary")], is_publish=False)
# Publish with your existing connection
await nc.publish("control", env_json_str)
# Close connection when done
await nc.close()
``` ```
### Basic Multi-Payload Example ### Basic Multi-Payload Example
@@ -372,12 +483,12 @@ from nats_bridge import smartsend
smartsend( smartsend(
"/test", "/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080" fileserver_url="http://localhost:8080"
) )
# Even single payload must be wrapped in a list with type # Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222") smartsend("/test", [("single_data", mydata, "dictionary")], broker_url="nats://localhost:4222")
``` ```
#### Python/Micropython (Receiver) #### Python/Micropython (Receiver)
@@ -459,12 +570,127 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000) category = rand(["A", "B", "C"], 10_000_000)
) )
# Send via smartsend - wrapped in a list (type is part of each tuple) # Send via smartsend - wrapped in list with type
env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222") # Large payload will use link transport (HTTP fileserver)
# env: msg_envelope_v1 object with all metadata and payloads env, env_json_str = smartsend(
# env_json_str: JSON string representation of the envelope for publishing "analysis_results",
[("table_data", df, "table")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
``` ```
#### smartsend Function Signature (Julia)
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
)
```
**New Keyword Parameter:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Example with pre-existing connection:**
```julia
using NATSBridge
# Create connection once
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in 1:100
data = rand(1000)
smartsend(
"analysis_results",
[("table_data", data, "table")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject")
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Use `is_publish` option and achieve connection reuse by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
#### JavaScript (Receiver) #### JavaScript (Receiver)
```javascript ```javascript
const { smartreceive } = require('./src/NATSBridge'); const { smartreceive } = require('./src/NATSBridge');
@@ -482,19 +708,12 @@ const table = env.payloads;
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
# Binary data wrapped in a list # Binary data wrapped in list with type
binary_data = [
("audio_chunk", binary_buffer, "binary")
]
smartsend( smartsend(
"binary_input", "binary_input",
binary_data, [("audio_chunk", binary_buffer, "binary")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
metadata={ metadata={"sample_rate": 44100, "channels": 1}
"sample_rate": 44100,
"channels": 1
}
) )
``` ```
@@ -558,10 +777,14 @@ function process_binary(msg) {
```julia ```julia
using NATSBridge using NATSBridge
function publish_health_status(nats_url) function publish_health_status(broker_url)
# Send status wrapped in a list (type is part of each tuple) # Send status wrapped in list with type
status = Dict("cpu" => rand(), "memory" => rand()) status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], broker_url=nats_url) env, env_json_str = smartsend(
"health",
[("status", status, "dictionary")],
broker_url=broker_url
)
sleep(5) # Every 5 seconds sleep(5) # Every 5 seconds
end end
``` ```
@@ -604,8 +827,8 @@ def handle_device_config(msg):
env = smartreceive(msg) env = smartreceive(msg)
# Process configuration from payloads # Process configuration from payloads
for dataname, data, type in env["payloads"]: for dataname, data, payload_type in env["payloads"]:
if type == "dictionary": if payload_type == "dictionary":
print(f"Received configuration: {data}") print(f"Received configuration: {data}")
# Apply configuration to device # Apply configuration to device
if "wifi_ssid" in data: if "wifi_ssid" in data:
@@ -622,7 +845,7 @@ def handle_device_config(msg):
smartsend( smartsend(
"device/response", "device/response",
[("config", config, "dictionary")], [("config", config, "dictionary")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
reply_to=env.get("reply_to") reply_to=env.get("reply_to")
) )
``` ```
@@ -670,12 +893,14 @@ options_df = DataFrame(
# Check payload size (< 1MB threshold) # Check payload size (< 1MB threshold)
# Publish directly to NATS with Base64-encoded payload # Publish directly to NATS with Base64-encoded payload
# Include metadata for dashboard selection context # Include metadata for dashboard selection context
smartsend( env, env_json_str = smartsend(
"dashboard.selection", "dashboard.selection",
[("options_table", options_df, "table")], [("options_table", options_df, "table")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
metadata=Dict("context" => "user_selection") metadata=Dict("context" => "user_selection")
) )
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
``` ```
**JavaScript (Receiver):** **JavaScript (Receiver):**
@@ -709,7 +934,6 @@ await smartsend("dashboard.response", [
**Julia (Sender/Receiver):** **Julia (Sender/Receiver):**
```julia ```julia
using NATSBridge using NATSBridge
using DataFrames
# Build chat message with mixed payloads: # Build chat message with mixed payloads:
# - Text: direct transport (Base64) # - Text: direct transport (Base64)
@@ -733,13 +957,15 @@ chat_message = [
("large_document", large_file_bytes, "binary") # Large file, link transport ("large_document", large_file_bytes, "binary") # Large file, link transport
] ]
smartsend( env, env_json_str = smartsend(
"chat.room123", "chat.room123",
chat_message, chat_message,
broker_url="nats://localhost:4222", broker_url="nats://localhost:4222",
msg_purpose="chat", msg_purpose="chat",
reply_to="chat.room123.responses" reply_to="chat.room123.responses"
) )
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
``` ```
**JavaScript (Sender/Receiver):** **JavaScript (Sender/Receiver):**

9
etc.jl
View File

@@ -0,0 +1,9 @@
Task: Update README.md to reflect recent changes in NATSbridge package.
Context: the package has been updated with the NATS_connection keyword and the publish_message function.
Requirements:
Source of Truth: Treat the updated NATSbridge code as the definitive source. Update README.md to align exactly with these changes.
API Consistency: Ensure the Main Package API (e.g., smartsend(), publish_message()) uses consistent naming across all three supported languages.
Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read()) should follow the conventions of the specific language ecosystem and do not require cross-language consistency.

View File

@@ -109,11 +109,11 @@ from nats_bridge import smartsend
# Send a text message (is_publish=True by default) # Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
print("Message sent!") print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing # Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False) env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object # env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS # env_json_str: JSON string for publishing to NATS
``` ```
@@ -126,14 +126,14 @@ const { smartsend } = require('./src/NATSBridge');
// Send a text message (isPublish=true by default) // Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [ await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" }); ], { brokerUrl: "nats://localhost:4222" });
console.log("Message sent!"); console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing // Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false }); ], { brokerUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object // env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS // env_json_str: JSON string for publishing to NATS
``` ```
@@ -145,8 +145,8 @@ using NATSBridge
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads # env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing # env_json_str: JSON string representation of the envelope for publishing
println("Message sent!") println("Message sent!")
``` ```
@@ -208,7 +208,7 @@ config = {
# Send as dictionary type # Send as dictionary type
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -224,7 +224,7 @@ const config = {
const { env, env_json_str } = await smartsend("/device/config", [ const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" } { dataname: "config", data: config, type: "dictionary" }
]); ], { brokerUrl: "nats://localhost:4222" });
``` ```
#### Julia #### Julia
@@ -239,7 +239,7 @@ config = Dict(
) )
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data) env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
``` ```
### Example 2: Sending Binary Data (Image) ### Example 2: Sending Binary Data (Image)
@@ -255,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type # Send as binary type
data = [("user_image", image_data, "binary")] data = [("user_image", image_data, "binary")]
env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -269,7 +269,7 @@ const image_data = fs.readFileSync('image.png');
const { env, env_json_str } = await smartsend("/chat/image", [ const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" } { dataname: "user_image", data: image_data, type: "binary" }
]); ], { brokerUrl: "nats://localhost:4222" });
``` ```
#### Julia #### Julia
@@ -281,7 +281,7 @@ using NATSBridge
image_data = read("image.png") image_data = read("image.png")
data = [("user_image", image_data, "binary")] data = [("user_image", image_data, "binary")]
env, env_json_str = smartsend("/chat/image", data) env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
``` ```
### Example 3: Request-Response Pattern ### Example 3: Request-Response Pattern
@@ -296,11 +296,11 @@ data = [("command", {"action": "read_sensor"}, "dictionary")]
env, env_json_str = smartsend( env, env_json_str = smartsend(
"/device/command", "/device/command",
data, data,
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
reply_to="/device/response", reply_to="/device/response",
reply_to_msg_id="cmd-001" reply_to_msg_id="cmd-001"
) )
# env: msgEnvelope_v1 object # env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS # env_json_str: JSON string for publishing to NATS
``` ```
@@ -361,7 +361,7 @@ large_data = os.urandom(2_000_000) # 2MB of random data
env, env_json_str = smartsend( env, env_json_str = smartsend(
"/data/large", "/data/large",
[("large_file", large_data, "binary")], [("large_file", large_data, "binary")],
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080", fileserver_url="http://localhost:8080",
size_threshold=1_000_000 size_threshold=1_000_000
) )
@@ -383,6 +383,7 @@ view.fill(42); // Fill with some data
const { env, env_json_str } = await smartsend("/data/large", [ const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" } { dataname: "large_file", data: largeData, type: "binary" }
], { ], {
brokerUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080", fileserverUrl: "http://localhost:8080",
sizeThreshold: 1_000_000 sizeThreshold: 1_000_000
}); });
@@ -399,6 +400,7 @@ large_data = rand(UInt8, 2_000_000)
env, env_json_str = smartsend( env, env_json_str = smartsend(
"/data/large", "/data/large",
[("large_file", large_data, "binary")], [("large_file", large_data, "binary")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080" fileserver_url="http://localhost:8080"
) )
@@ -425,7 +427,7 @@ data = [
("user_avatar", image_data, "image") ("user_avatar", image_data, "image")
] ]
env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
``` ```
#### JavaScript #### JavaScript
@@ -446,7 +448,7 @@ const { env, env_json_str } = await smartsend("/chat/mixed", [
data: fs.readFileSync("avatar.png"), data: fs.readFileSync("avatar.png"),
type: "image" type: "image"
} }
]); ], { brokerUrl: "nats://localhost:4222" });
``` ```
#### Julia #### Julia
@@ -461,7 +463,7 @@ data = [
("user_avatar", image_data, "image") ("user_avatar", image_data, "image")
] ]
env, env_json_str = smartsend("/chat/mixed", data) env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
``` ```
### Example 6: Table Data (Arrow IPC) ### Example 6: Table Data (Arrow IPC)
@@ -483,7 +485,7 @@ df = pd.DataFrame({
# Send as table type # Send as table type
data = [("students", df, "table")] data = [("students", df, "table")]
env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
``` ```
#### Julia #### Julia
@@ -500,7 +502,7 @@ df = DataFrame(
) )
data = [("students", df, "table")] data = [("students", df, "table")]
env, env_json_str = smartsend("/data/students", data) env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
``` ```
--- ---
@@ -519,7 +521,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript # Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000) config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")] data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222") env, env_json_str = smartsend("/analysis/config", data, broker_url="nats://localhost:4222")
``` ```
#### JavaScript Receiver #### JavaScript Receiver
@@ -546,7 +548,7 @@ const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/data/transfer", [ const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" } { dataname: "message", data: "Hello from JS!", type: "text" }
]); ], { brokerUrl: "nats://localhost:4222" });
``` ```
#### Python Receiver #### Python Receiver
@@ -568,7 +570,7 @@ for dataname, data, type in env["payloads"]:
from nats_bridge import smartsend from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")] data = [("message", "Hello from Python!", "text")]
env, env_json_str = smartsend("/chat/python", data) env, env_json_str = smartsend("/chat/python", data, broker_url="nats://localhost:4222")
``` ```
#### Julia Receiver #### Julia Receiver
@@ -576,7 +578,7 @@ env, env_json_str = smartsend("/chat/python", data)
```julia ```julia
using NATSBridge using NATSBridge
env = smartreceive(msg, fileserverDownloadHandler) env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"] for (dataname, data, type) in env["payloads"]
if type == "text" if type == "text"
println("Received from Python: $data") println("Received from Python: $data")

View File

@@ -136,6 +136,7 @@ class ChatUI {
`/chat/${this.currentRoom}`, `/chat/${this.currentRoom}`,
data, data,
{ {
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url, fileserverUrl: window.config.fileserver_url,
sizeThreshold: window.config.size_threshold sizeThreshold: window.config.size_threshold
} }
@@ -288,8 +289,8 @@ Let's build a file transfer system that handles large files efficiently.
const { smartsend } = require('./NATSBridge'); const { smartsend } = require('./NATSBridge');
class FileUploadService { class FileUploadService {
constructor(natsUrl, fileserverUrl) { constructor(brokerUrl, fileserverUrl) {
this.natsUrl = natsUrl; this.brokerUrl = brokerUrl;
this.fileserverUrl = fileserverUrl; this.fileserverUrl = fileserverUrl;
} }
@@ -308,7 +309,7 @@ class FileUploadService {
`/files/${recipient}`, `/files/${recipient}`,
data, data,
{ {
natsUrl: this.natsUrl, brokerUrl: this.brokerUrl,
fileserverUrl: this.fileserverUrl, fileserverUrl: this.fileserverUrl,
sizeThreshold: 1048576 sizeThreshold: 1048576
} }
@@ -419,7 +420,7 @@ async function uploadFile(config) {
const filePath = await rl.question('Enter file path: '); const filePath = await rl.question('Enter file path: ');
const recipient = await rl.question('Enter recipient: '); const recipient = await rl.question('Enter recipient: ');
const fileService = new FileUploadService(config.nats_url, config.fileserver_url); const fileService = new FileUploadService(config.broker_url, config.fileserver_url);
try { try {
const env = await fileService.uploadFile(filePath, recipient); const env = await fileService.uploadFile(filePath, recipient);
@@ -500,8 +501,8 @@ import time
import random import random
class SensorSender: class SensorSender:
def __init__(self, nats_url: str, fileserver_url: str): def __init__(self, broker_url: str, fileserver_url: str):
self.nats_url = nats_url self.broker_url = broker_url
self.fileserver_url = fileserver_url self.fileserver_url = fileserver_url
def send_reading(self, sensor_id: str, value: float, unit: str): def send_reading(self, sensor_id: str, value: float, unit: str):
@@ -518,7 +519,7 @@ class SensorSender:
smartsend( smartsend(
f"/sensors/{sensor_id}", f"/sensors/{sensor_id}",
data, data,
nats_url=self.nats_url, broker_url=self.broker_url,
fileserver_url=self.fileserver_url fileserver_url=self.fileserver_url
) )
@@ -537,7 +538,7 @@ class SensorSender:
env, env_json_str = smartsend( env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare", f"/sensors/{sensor_id}/prepare",
data, data,
nats_url=self.nats_url, broker_url=self.broker_url,
fileserver_url=self.fileserver_url, fileserver_url=self.fileserver_url,
is_publish=False is_publish=False
) )
@@ -572,7 +573,7 @@ class SensorSender:
smartsend( smartsend(
f"/sensors/batch", f"/sensors/batch",
data, data,
nats_url=self.nats_url, broker_url=self.broker_url,
fileserver_url=self.fileserver_url fileserver_url=self.fileserver_url
) )
else: else:
@@ -631,17 +632,17 @@ Let's build an IoT device using Micropython that connects to NATS.
import json import json
class DeviceConfig: class DeviceConfig:
def __init__(self, ssid, password, nats_url, device_id): def __init__(self, ssid, password, broker_url, device_id):
self.ssid = ssid self.ssid = ssid
self.password = password self.password = password
self.nats_url = nats_url self.broker_url = broker_url
self.device_id = device_id self.device_id = device_id
def to_dict(self): def to_dict(self):
return { return {
"ssid": self.ssid, "ssid": self.ssid,
"password": self.password, "password": self.password,
"nats_url": self.nats_url, "broker_url": self.broker_url,
"device_id": self.device_id "device_id": self.device_id
} }
``` ```
@@ -656,7 +657,7 @@ import json
class DeviceBridge: class DeviceBridge:
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config
self.nats_url = config.nats_url self.broker_url = config.broker_url
def connect(self): def connect(self):
# Connect to WiFi # Connect to WiFi
@@ -676,7 +677,7 @@ class DeviceBridge:
smartsend( smartsend(
f"/devices/{self.config.device_id}/status", f"/devices/{self.config.device_id}/status",
data, data,
nats_url=self.nats_url broker_url=self.broker_url
) )
def send_sensor_data(self, sensor_id, value, unit): def send_sensor_data(self, sensor_id, value, unit):
@@ -687,7 +688,7 @@ class DeviceBridge:
smartsend( smartsend(
f"/devices/{self.config.device_id}/sensors/{sensor_id}", f"/devices/{self.config.device_id}/sensors/{sensor_id}",
data, data,
nats_url=self.nats_url broker_url=self.broker_url
) )
def receive_commands(self, callback): def receive_commands(self, callback):
@@ -725,7 +726,7 @@ import random
config = DeviceConfig( config = DeviceConfig(
ssid="MyNetwork", ssid="MyNetwork",
password="password123", password="password123",
nats_url="nats://localhost:4222", broker_url="nats://localhost:4222",
device_id="device-001" device_id="device-001"
) )
@@ -774,8 +775,8 @@ import pyarrow as pa
import io import io
class DashboardServer: class DashboardServer:
def __init__(self, nats_url, fileserver_url): def __init__(self, broker_url, fileserver_url):
self.nats_url = nats_url self.broker_url = broker_url
self.fileserver_url = fileserver_url self.fileserver_url = fileserver_url
def broadcast_data(self, df): def broadcast_data(self, df):
@@ -792,7 +793,7 @@ class DashboardServer:
smartsend( smartsend(
"/dashboard/data", "/dashboard/data",
data, data,
nats_url=self.nats_url, broker_url=self.broker_url,
fileserver_url=self.fileserver_url fileserver_url=self.fileserver_url
) )
@@ -836,6 +837,7 @@ class DashboardUI {
const { env, env_json_str } = await smartsend("/dashboard/request", [ const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" } { dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], { ], {
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url fileserverUrl: window.config.fileserver_url
}); });
} }
@@ -954,7 +956,7 @@ def send_batch_readings(self, readings):
smartsend( smartsend(
"/sensors/batch", "/sensors/batch",
[("batch", arrow_data, "table")], [("batch", arrow_data, "table")],
nats_url=self.nats_url broker_url=self.broker_url
) )
``` ```

View File

@@ -380,9 +380,10 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
- `sender_name::String = "NATSBridge"` - Name of the sender - `sender_name::String = "NATSBridge"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast) - `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast) - `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected) - `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to - `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS - `is_publish::Bool = true` - Whether to automatically publish the message to NATS
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
# Return: # Return:
- A tuple `(env, env_json_str)` where: - A tuple `(env, env_json_str)` where:
@@ -431,12 +432,12 @@ function smartsend(
receiver_id::String = "", receiver_id::String = "",
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "", reply_to_msg_id::String = "",
is_publish::Bool = true # some time the user want to get env and env_json_str from this function without publishing the msg is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
) where {T1<:Any} ) where {T1<:Any}
# Generate correlation ID if not provided # Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Generate message metadata # Generate message metadata
@@ -516,8 +517,12 @@ function smartsend(
) )
env_json_str = envelope_to_json(env) # Convert envelope to JSON env_json_str = envelope_to_json(env) # Convert envelope to JSON
if is_publish if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
end end
return (env, env_json_str) return (env, env_json_str)
@@ -649,7 +654,7 @@ end
""" publish_message - Publish message to NATS """ publish_message - Publish message to NATS
This internal function publishes a message to a NATS subject with proper This function publishes a message to a NATS subject with proper
connection management and logging. connection management and logging.
# Arguments: # Arguments:
@@ -662,18 +667,52 @@ connection management and logging.
- `nothing` - This function performs publishing but returns nothing - `nothing` - This function performs publishing but returns nothing
# Example # Example
```jldoctest ```jldoctest
using NATS using NATS
# Prepare JSON message # Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}" message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS # Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123") publish_message("nats://localhost:4222", "my.subject", message, "abc123")
``` ```
""" """
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
""" publish_message - Publish message to NATS using pre-existing connection
This function publishes a message to a NATS subject using a pre-existing NATS connection,
avoiding the overhead of connection establishment.
# Arguments:
- `conn::NATS.Connection` - Pre-existing NATS connection
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for tracing and logging
# Return:
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Create connection once and reuse for multiple publishes
conn = NATS.connect("nats://localhost:4222")
publish_message(conn, "my.subject", message, "abc123")
# Connection is automatically drained after publish
```
# Use Case:
Use this version when you already have an established NATS connection and want to publish
multiple messages without the overhead of creating a new connection for each publish.
"""
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try try
NATS.publish(conn, subject, message) # Publish message to NATS NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish log_trace(correlation_id, "Message published to $subject") # Log successful publish
@@ -706,7 +745,7 @@ A HTTP file server is required along with its download function.
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms - `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
# Return: # Return:
- `Vector{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples - JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
# Example # Example
```jldoctest ```jldoctest
@@ -724,22 +763,22 @@ function smartreceive(
max_delay::Int = 5000 max_delay::Int = 5000
) )
# Parse the JSON envelope # Parse the JSON envelope
json_data = JSON.parse(String(msg.payload)) env_json_obj = JSON.parse(String(msg.payload))
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start log_trace(env_json_obj["correlation_id"], "Processing received message") # Log message processing start
# Process all payloads in the envelope # Process all payloads in the envelope
payloads_list = Tuple{String, Any, String}[] payloads_list = Tuple{String, Any, String}[]
# Get number of payloads # Get number of payloads
num_payloads = length(json_data["payloads"]) num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads for i in 1:num_payloads
payload = json_data["payloads"][i] payload = env_json_obj["payloads"][i]
transport = String(payload["transport"]) transport = String(payload["transport"])
dataname = String(payload["dataname"]) dataname = String(payload["dataname"])
if transport == "direct" # Direct transport - payload is in the message if transport == "direct" # Direct transport - payload is in the message
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
# Extract base64 payload from the payload # Extract base64 payload from the payload
payload_b64 = String(payload["data"]) payload_b64 = String(payload["data"])
@@ -749,28 +788,28 @@ function smartreceive(
# Deserialize based on type # Deserialize based on type
data_type = String(payload["payload_type"]) data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"]) data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type)) push!(payloads_list, (dataname, data, data_type))
elseif transport == "link" # Link transport - payload is at URL elseif transport == "link" # Link transport - payload is at URL
# Extract download URL from the payload # Extract download URL from the payload
url = String(payload["data"]) url = String(payload["data"])
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
# Fetch with exponential backoff using the download handler # Fetch with exponential backoff using the download handler
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"]) downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
# Deserialize based on type # Deserialize based on type
data_type = String(payload["payload_type"]) data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"]) data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type)) push!(payloads_list, (dataname, data, data_type))
else # Unknown transport type else # Unknown transport type
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
end end
end end
json_data["payloads"] = payloads_list env_json_obj["payloads"] = payloads_list
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field return env_json_obj # JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
end end
@@ -915,7 +954,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
# Arguments: # Arguments:
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) - `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filename::String` - Name of the file being uploaded - `dataname::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content - `data::Vector{UInt8}` - Raw byte data of the file content
# Return: # Return:
@@ -929,18 +968,18 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
```jldoctest ```jldoctest
using HTTP, JSON using HTTP, JSON
file_server_url = "http://localhost:8080" fileserver_url = "http://localhost:8080"
filename = "test.txt" dataname = "test.txt"
data = Vector{UInt8}("hello world") data = Vector{UInt8}("hello world")
# Upload to local plik server # Upload to local plik server
result = plik_oneshot_upload(file_server_url, filename, data) result = plik_oneshot_upload(file_server_url, dataname, data)
# Access the result as a Dict # Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"] # result["status"], result["uploadid"], result["fileid"], result["url"]
``` ```
""" """
function plik_oneshot_upload(file_server_url::String, filename::String, data::Vector{UInt8}) function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
# ----------------------------------------- get upload id ---------------------------------------- # # ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload # Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
@@ -954,7 +993,7 @@ function plik_oneshot_upload(file_server_url::String, filename::String, data::Ve
# ------------------------------------------ upload file ----------------------------------------- # # ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID # Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
url_upload = "$file_server_url/file/$uploadid" url_upload = "$file_server_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken] headers = ["X-UploadToken" => uploadtoken]
@@ -974,7 +1013,7 @@ function plik_oneshot_upload(file_server_url::String, filename::String, data::Ve
fileid = response_json["id"] fileid = response_json["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$file_server_url/file/$uploadid/$fileid/$filename" url = "$file_server_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end end
@@ -1006,7 +1045,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
```jldoctest ```jldoctest
using HTTP, JSON using HTTP, JSON
file_server_url = "http://localhost:8080" fileserver_url = "http://localhost:8080"
filepath = "./test.zip" filepath = "./test.zip"
# Upload to local plik server # Upload to local plik server
@@ -1056,7 +1095,7 @@ function plik_oneshot_upload(file_server_url::String, filepath::String)
end end
function _get_payload_bytes(data) function _get_payload_bytes(data)
@error "didn't implement yet" @error "Didn't implement yet. The developer will implement this function later."
end end

View File

@@ -9,16 +9,16 @@
* File Server Handler Architecture: * File Server Handler Architecture:
* The system uses handler functions to abstract file server operations, allowing support * The system uses handler functions to abstract file server operations, allowing support
* for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). * for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
* *
* Handler Function Signatures: * Handler Function Signatures:
* *
* ```javascript * ```javascript
* // Upload handler - uploads data to file server and returns URL * // Upload handler - uploads data to file server and returns URL
* // The handler is passed to smartsend as fileserverUploadHandler parameter * // The handler is passed to smartsend as fileserverUploadHandler parameter
* // It receives: (fileserver_url, dataname, data) * // It receives: (fileserver_url, dataname, data)
* // Returns: { status, uploadid, fileid, url } * // Returns: { status, uploadid, fileid, url }
* async function fileserverUploadHandler(fileserver_url, dataname, data) { ... } * async function plik_oneshot_upload(fileserver_url, dataname, data) { ... }
* *
* // Download handler - fetches data from file server URL with exponential backoff * // Download handler - fetches data from file server URL with exponential backoff
* // The handler is passed to smartreceive as fileserverDownloadHandler parameter * // The handler is passed to smartreceive as fileserverDownloadHandler parameter
* // It receives: (url, max_retries, base_delay, max_delay, correlation_id) * // It receives: (url, max_retries, base_delay, max_delay, correlation_id)
@@ -98,6 +98,26 @@ function base64ToArrayBuffer(base64) {
return bytes.buffer; return bytes.buffer;
} }
// Helper: Convert Uint8Array to Base64 string
function uint8ArrayToBase64(uint8array) {
let binary = '';
for (let i = 0; i < uint8array.byteLength; i++) {
binary += String.fromCharCode(uint8array[i]);
}
return btoa(binary);
}
// Helper: Convert Base64 string to Uint8Array
function base64ToUint8Array(base64) {
const binaryString = atob(base64);
const len = binaryString.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return bytes;
}
// Helper: Serialize data based on type // Helper: Serialize data based on type
function _serialize_data(data, type) { function _serialize_data(data, type) {
/** /**
@@ -114,39 +134,39 @@ function _serialize_data(data, type) {
*/ */
if (type === "text") { if (type === "text") {
if (typeof data === 'string') { if (typeof data === 'string') {
return new TextEncoder().encode(data).buffer; return new TextEncoder().encode(data);
} else { } else {
throw new Error("Text data must be a String"); throw new Error("Text data must be a String");
} }
} else if (type === "dictionary") { } else if (type === "dictionary") {
// JSON data - serialize directly // JSON data - serialize directly
const jsonStr = JSON.stringify(data); const jsonStr = JSON.stringify(data);
return new TextEncoder().encode(jsonStr).buffer; return new TextEncoder().encode(jsonStr);
} else if (type === "table") { } else if (type === "table") {
// Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) // Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
// This would require the apache-arrow library // This would require the apache-arrow library
throw new Error("Table serialization requires apache-arrow library"); throw new Error("Table serialization requires apache-arrow library");
} else if (type === "image") { } else if (type === "image") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Image data must be ArrayBuffer or Uint8Array"); throw new Error("Image data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "audio") { } else if (type === "audio") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Audio data must be ArrayBuffer or Uint8Array"); throw new Error("Audio data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "video") { } else if (type === "video") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Video data must be ArrayBuffer or Uint8Array"); throw new Error("Video data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "binary") { } else if (type === "binary") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Binary data must be ArrayBuffer or Uint8Array"); throw new Error("Binary data must be ArrayBuffer or Uint8Array");
} }
@@ -171,10 +191,10 @@ function _deserialize_data(data, type, correlation_id) {
*/ */
if (type === "text") { if (type === "text") {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
return decoder.decode(new Uint8Array(data)); return decoder.decode(data);
} else if (type === "dictionary") { } else if (type === "dictionary") {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const jsonStr = decoder.decode(new Uint8Array(data)); const jsonStr = decoder.decode(data);
return JSON.parse(jsonStr); return JSON.parse(jsonStr);
} else if (type === "table") { } else if (type === "table") {
// Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) // Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
@@ -193,73 +213,16 @@ function _deserialize_data(data, type, correlation_id) {
} }
// Helper: Upload data to file server // Helper: Upload data to file server
// Internal wrapper that adds correlation_id logging for smartsend
async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) { async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) {
/** /**
* Upload data to HTTP file server (plik-like API) * Internal upload helper - wraps plik_oneshot_upload to add correlation_id logging
* * This allows smartsend to pass correlation_id for tracing without changing the handler signature
* This function implements the plik one-shot upload mode:
* 1. Creates a one-shot upload session by sending POST request with {"OneShot": true}
* 2. Uploads the file data as multipart form data
* 3. Returns identifiers and download URL for the uploaded file
*/ */
log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`); log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`);
const result = await plik_oneshot_upload(fileserver_url, dataname, data);
// Step 1: Get upload ID and token log_trace(correlation_id, `Uploaded to URL: ${result.url}`);
const url_getUploadID = `${fileserver_url}/upload`; return result;
const headers = {
"Content-Type": "application/json"
};
const body = JSON.stringify({ OneShot: true });
let response = await fetch(url_getUploadID, {
method: "POST",
headers: headers,
body: body
});
if (!response.ok) {
throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
}
const responseJson = await response.json();
const uploadid = responseJson.id;
const uploadtoken = responseJson.uploadToken;
// Step 2: Upload file data
const url_upload = `${fileserver_url}/file/${uploadid}`;
// Create multipart form data
const formData = new FormData();
// Create a Blob from the ArrayBuffer
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
response = await fetch(url_upload, {
method: "POST",
headers: {
"X-UploadToken": uploadtoken
},
body: formData
});
if (!response.ok) {
throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
}
const fileResponseJson = await response.json();
const fileid = fileResponseJson.id;
// Build the download URL
const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
log_trace(correlation_id, `Uploaded to URL: ${url}`);
return {
status: response.status,
uploadid: uploadid,
fileid: fileid,
url: url
};
} }
// Helper: Fetch data from URL with exponential backoff // Helper: Fetch data from URL with exponential backoff
@@ -276,7 +239,7 @@ async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, corr
if (response.status === 200) { if (response.status === 200) {
log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`); log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
const arrayBuffer = await response.arrayBuffer(); const arrayBuffer = await response.arrayBuffer();
return arrayBuffer; return new Uint8Array(arrayBuffer);
} else { } else {
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
} }
@@ -306,25 +269,26 @@ function _get_payload_bytes(data) {
} }
} }
// MessagePayload class // MessagePayload class - matches msg_payload_v1 Julia struct
class MessagePayload { class MessagePayload {
/** /**
* Represents a single payload in the message envelope * Represents a single payload in the message envelope
* Matches Julia's msg_payload_v1 struct
* *
* @param {Object} options - Payload options * @param {Object} options - Payload options
* @param {string} options.id - ID of this payload (e.g., "uuid4") * @param {string} options.id - ID of this payload (e.g., "uuid4")
* @param {string} options.dataname - Name of this payload (e.g., "login_image") * @param {string} options.dataname - Name of this payload (e.g., "login_image")
* @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" * @param {string} options.payload_type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.transport - "direct" or "link" * @param {string} options.transport - "direct" or "link"
* @param {string} options.encoding - "none", "json", "base64", "arrow-ipc" * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
* @param {number} options.size - Data size in bytes * @param {number} options.size - Data size in bytes
* @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link) * @param {string|Uint8Array} options.data - Payload data (Uint8Array for direct, URL string for link)
* @param {Object} options.metadata - Metadata for this payload * @param {Object} options.metadata - Metadata for this payload
*/ */
constructor(options) { constructor(options) {
this.id = options.id || uuid4(); this.id = options.id || uuid4();
this.dataname = options.dataname; this.dataname = options.dataname;
this.type = options.type; this.payload_type = options.payload_type;
this.transport = options.transport; this.transport = options.transport;
this.encoding = options.encoding; this.encoding = options.encoding;
this.size = options.size; this.size = options.size;
@@ -332,27 +296,27 @@ class MessagePayload {
this.metadata = options.metadata || {}; this.metadata = options.metadata || {};
} }
// Convert to JSON object // Convert to JSON object - uses snake_case to match Julia API
toJSON() { toJSON() {
const obj = { const obj = {
id: this.id, id: this.id,
dataname: this.dataname, dataname: this.dataname,
type: this.type, payload_type: this.payload_type,
transport: this.transport, transport: this.transport,
encoding: this.encoding, encoding: this.encoding,
size: this.size size: this.size
}; };
// Include data based on transport type // Include data based on transport type
if (this.transport === "direct" && this.data !== null) { if (this.transport === "direct" && this.data !== null && this.data !== undefined) {
if (this.encoding === "base64" || this.encoding === "json") { if (this.encoding === "base64" || this.encoding === "json") {
obj.data = this.data; obj.data = this.data;
} else { } else {
// For other encodings, use base64 // For other encodings, use base64
const payloadBytes = _get_payload_bytes(this.data); const payloadBytes = _get_payload_bytes(this.data);
obj.data = arrayBufferToBase64(payloadBytes); obj.data = uint8ArrayToBase64(payloadBytes);
} }
} else if (this.transport === "link" && this.data !== null) { } else if (this.transport === "link" && this.data !== null && this.data !== undefined) {
// For link transport, data is a URL string // For link transport, data is a URL string
obj.data = this.data; obj.data = this.data;
} }
@@ -365,59 +329,60 @@ class MessagePayload {
} }
} }
// MessageEnvelope class // MessageEnvelope class - matches msg_envelope_v1 Julia struct
class MessageEnvelope { class MessageEnvelope {
/** /**
* Represents the message envelope containing metadata and payloads * Represents the message envelope containing metadata and payloads
* Matches Julia's msg_envelope_v1 struct
* *
* @param {Object} options - Envelope options * @param {Object} options - Envelope options
* @param {string} options.sendTo - Topic/subject the sender sends to * @param {string} options.correlation_id - Unique identifier to track messages
* @param {Array<MessagePayload>} options.payloads - Array of payloads * @param {string} options.msg_id - This message id
* @param {string} options.correlationId - Unique identifier to track messages
* @param {string} options.msgId - This message id
* @param {string} options.timestamp - Message published timestamp * @param {string} options.timestamp - Message published timestamp
* @param {string} options.msgPurpose - Purpose of this message * @param {string} options.send_to - Topic/subject the sender sends to
* @param {string} options.senderName - Name of the sender * @param {string} options.msg_purpose - Purpose of this message
* @param {string} options.senderId - UUID of the sender * @param {string} options.sender_name - Name of the sender
* @param {string} options.receiverName - Name of the receiver * @param {string} options.sender_id - UUID of the sender
* @param {string} options.receiverId - UUID of the receiver * @param {string} options.receiver_name - Name of the receiver
* @param {string} options.replyTo - Topic to reply to * @param {string} options.receiver_id - UUID of the receiver
* @param {string} options.replyToMsgId - Message id this message is replying to * @param {string} options.reply_to - Topic to reply to
* @param {string} options.brokerURL - NATS server address * @param {string} options.reply_to_msg_id - Message id this message is replying to
* @param {string} options.broker_url - NATS server address
* @param {Object} options.metadata - Metadata for the envelope * @param {Object} options.metadata - Metadata for the envelope
* @param {Array<MessagePayload>} options.payloads - Array of payloads
*/ */
constructor(options) { constructor(options) {
this.correlationId = options.correlationId || uuid4(); this.correlation_id = options.correlation_id || uuid4();
this.msgId = options.msgId || uuid4(); this.msg_id = options.msg_id || uuid4();
this.timestamp = options.timestamp || new Date().toISOString(); this.timestamp = options.timestamp || new Date().toISOString();
this.sendTo = options.sendTo; this.send_to = options.send_to;
this.msgPurpose = options.msgPurpose || ""; this.msg_purpose = options.msg_purpose || "";
this.senderName = options.senderName || ""; this.sender_name = options.sender_name || "";
this.senderId = options.senderId || uuid4(); this.sender_id = options.sender_id || uuid4();
this.receiverName = options.receiverName || ""; this.receiver_name = options.receiver_name || "";
this.receiverId = options.receiverId || ""; this.receiver_id = options.receiver_id || "";
this.replyTo = options.replyTo || ""; this.reply_to = options.reply_to || "";
this.replyToMsgId = options.replyToMsgId || ""; this.reply_to_msg_id = options.reply_to_msg_id || "";
this.brokerURL = options.brokerURL || DEFAULT_NATS_URL; this.broker_url = options.broker_url || DEFAULT_NATS_URL;
this.metadata = options.metadata || {}; this.metadata = options.metadata || {};
this.payloads = options.payloads || []; this.payloads = options.payloads || [];
} }
// Convert to JSON string // Convert to JSON object - uses snake_case to match Julia API
toJSON() { toJSON() {
const obj = { const obj = {
correlationId: this.correlationId, correlation_id: this.correlation_id,
msgId: this.msgId, msg_id: this.msg_id,
timestamp: this.timestamp, timestamp: this.timestamp,
sendTo: this.sendTo, send_to: this.send_to,
msgPurpose: this.msgPurpose, msg_purpose: this.msg_purpose,
senderName: this.senderName, sender_name: this.sender_name,
senderId: this.senderId, sender_id: this.sender_id,
receiverName: this.receiverName, receiver_name: this.receiver_name,
receiverId: this.receiverId, receiver_id: this.receiver_id,
replyTo: this.replyTo, reply_to: this.reply_to,
replyToMsgId: this.replyToMsgId, reply_to_msg_id: this.reply_to_msg_id,
brokerURL: this.brokerURL broker_url: this.broker_url
}; };
if (Object.keys(this.metadata).length > 0) { if (Object.keys(this.metadata).length > 0) {
@@ -437,7 +402,7 @@ class MessageEnvelope {
} }
} }
// SmartSend function // SmartSend function - matches Julia smartsend signature and behavior
async function smartsend(subject, data, options = {}) { async function smartsend(subject, data, options = {}) {
/** /**
* Send data either directly via NATS or via a fileserver URL, depending on payload size * Send data either directly via NATS or via a fileserver URL, depending on payload size
@@ -447,42 +412,45 @@ async function smartsend(subject, data, options = {}) {
* Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. * Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS.
* *
* @param {string} subject - NATS subject to publish the message to * @param {string} subject - NATS subject to publish the message to
* @param {Array} data - List of {dataname, data, type} objects to send * @param {Array} data - List of {dataname, data, type} objects to send (must be a list, even for single payload)
* @param {Object} options - Additional options * @param {Object} options - Additional options
* @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222") * @param {string} options.broker_url - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080") * @param {string} options.fileserver_url - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads * @param {Function} options.fileserver_upload_handler - Function to handle fileserver uploads
* @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB) * @param {number} options.size_threshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlationId - Optional correlation ID for tracing * @param {string} options.correlation_id - Optional correlation ID for tracing
* @param {string} options.msgPurpose - Purpose of the message (default: "chat") * @param {string} options.msg_purpose - Purpose of the message (default: "chat")
* @param {string} options.senderName - Name of the sender (default: "NATSBridge") * @param {string} options.sender_name - Name of the sender (default: "NATSBridge")
* @param {string} options.receiverName - Name of the receiver (default: "") * @param {string} options.receiver_name - Name of the receiver (default: "")
* @param {string} options.receiverId - UUID of the receiver (default: "") * @param {string} options.receiver_id - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "") * @param {string} options.reply_to - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") * @param {string} options.reply_to_msg_id - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true) * @param {boolean} options.is_publish - Whether to automatically publish the message to NATS (default: true)
* * - When true: Message is published to NATS automatically
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string } * - When false: Returns (env, env_json_str) without publishing, allowing manual publishing
*/ * @returns {Promise<Object>} - A tuple-like object with { env: MessageEnvelope, env_json_str: string }
* - env: MessageEnvelope object with all metadata and payloads
* - env_json_str: JSON string representation of the envelope for manual publishing
*/
const { const {
natsUrl = DEFAULT_NATS_URL, broker_url = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL, fileserver_url = DEFAULT_FILESERVER_URL,
fileserverUploadHandler = _upload_to_fileserver, fileserver_upload_handler = _upload_to_fileserver,
sizeThreshold = DEFAULT_SIZE_THRESHOLD, size_threshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuid4(), correlation_id = uuid4(),
msgPurpose = "chat", msg_purpose = "chat",
senderName = "NATSBridge", sender_name = "NATSBridge",
receiverName = "", receiver_name = "",
receiverId = "", receiver_id = "",
replyTo = "", reply_to = "",
replyToMsgId = "", reply_to_msg_id = "",
isPublish = true // Whether to automatically publish the message to NATS is_publish = true // Whether to automatically publish the message to NATS
} = options; } = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`); log_trace(correlation_id, `Starting smartsend for subject: ${subject}`);
// Generate message metadata // Generate message metadata
const msgId = uuid4(); const msg_id = uuid4();
// Process each payload in the list // Process each payload in the list
const payloads = []; const payloads = [];
@@ -496,18 +464,18 @@ async function smartsend(subject, data, options = {}) {
const payloadBytes = _serialize_data(payloadData, payloadType); const payloadBytes = _serialize_data(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength; const payloadSize = payloadBytes.byteLength;
log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); log_trace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link // Decision: Direct vs Link
if (payloadSize < sizeThreshold) { if (payloadSize < size_threshold) {
// Direct path - Base64 encode and send via NATS // Direct path - Base64 encode and send via NATS
const payloadB64 = arrayBufferToBase64(payloadBytes); const payloadB64 = uint8ArrayToBase64(payloadBytes);
log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`); log_trace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
// Create MessagePayload for direct transport // Create MessagePayload for direct transport
const payloadObj = new MessagePayload({ const payloadObj = new MessagePayload({
dataname: dataname, dataname: dataname,
type: payloadType, payload_type: payloadType,
transport: "direct", transport: "direct",
encoding: "base64", encoding: "base64",
size: payloadSize, size: payloadSize,
@@ -517,22 +485,22 @@ async function smartsend(subject, data, options = {}) {
payloads.push(payloadObj); payloads.push(payloadObj);
} else { } else {
// Link path - Upload to HTTP server, send URL via NATS // Link path - Upload to HTTP server, send URL via NATS
log_trace(correlationId, `Using link transport, uploading to fileserver`); log_trace(correlation_id, `Using link transport, uploading to fileserver`);
// Upload to HTTP server // Upload to HTTP server using plik_oneshot_upload handler
const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId); const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
if (response.status !== 200) { if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`); throw new Error(`Failed to upload data to fileserver: ${response.status}`);
} }
const url = response.url; const url = response.url;
log_trace(correlationId, `Uploaded to URL: ${url}`); log_trace(correlation_id, `Uploaded to URL: ${url}`);
// Create MessagePayload for link transport // Create MessagePayload for link transport
const payloadObj = new MessagePayload({ const payloadObj = new MessagePayload({
dataname: dataname, dataname: dataname,
type: payloadType, payload_type: payloadType,
transport: "link", transport: "link",
encoding: "none", encoding: "none",
size: payloadSize, size: payloadSize,
@@ -545,16 +513,16 @@ async function smartsend(subject, data, options = {}) {
// Create MessageEnvelope with all payloads // Create MessageEnvelope with all payloads
const env = new MessageEnvelope({ const env = new MessageEnvelope({
correlationId: correlationId, correlation_id: correlation_id,
msgId: msgId, msg_id: msg_id,
sendTo: subject, send_to: subject,
msgPurpose: msgPurpose, msg_purpose: msg_purpose,
senderName: senderName, sender_name: sender_name,
receiverName: receiverName, receiver_name: receiver_name,
receiverId: receiverId, receiver_id: receiver_id,
replyTo: replyTo, reply_to: reply_to,
replyToMsgId: replyToMsgId, reply_to_msg_id: reply_to_msg_id,
brokerURL: natsUrl, broker_url: broker_url,
payloads: payloads payloads: payloads
}); });
@@ -562,11 +530,11 @@ async function smartsend(subject, data, options = {}) {
const env_json_str = env.toString(); const env_json_str = env.toString();
// Publish to NATS if isPublish is true // Publish to NATS if isPublish is true
if (isPublish) { if (is_publish) {
await publish_message(natsUrl, subject, env_json_str, correlationId); await publish_message(broker_url, subject, env_json_str, correlation_id);
} }
// Return both envelope and JSON string (tuple-like structure) // Return both envelope and JSON string (tuple-like structure, matching Julia API)
return { return {
env: env, env: env,
env_json_str: env_json_str env_json_str: env_json_str
@@ -574,11 +542,11 @@ async function smartsend(subject, data, options = {}) {
} }
// Helper: Publish message to NATS // Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) { async function publish_message(broker_url, subject, message, correlation_id) {
/** /**
* Publish a message to a NATS subject with proper connection management * Publish a message to a NATS subject with proper connection management
* *
* @param {string} natsUrl - NATS server URL * @param {string} broker_url - NATS server URL
* @param {string} subject - NATS subject to publish to * @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish * @param {string} message - JSON message to publish
* @param {string} correlation_id - Correlation ID for logging * @param {string} correlation_id - Correlation ID for logging
@@ -591,7 +559,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
// Example with nats.js: // Example with nats.js:
// import { connect } from 'nats'; // import { connect } from 'nats';
// const nc = await connect({ servers: [natsUrl] }); // const nc = await connect({ servers: [broker_url] });
// await nc.publish(subject, message); // await nc.publish(subject, message);
// nc.close(); // nc.close();
@@ -599,7 +567,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`); console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`);
} }
// SmartReceive function // SmartReceive function - matches Julia smartreceive signature and behavior
async function smartreceive(msg, options = {}) { async function smartreceive(msg, options = {}) {
/** /**
* Receive and process messages from NATS * Receive and process messages from NATS
@@ -609,25 +577,25 @@ async function smartreceive(msg, options = {}) {
* *
* @param {Object} msg - NATS message object with payload property * @param {Object} msg - NATS message object with payload property
* @param {Object} options - Additional options * @param {Object} options - Additional options
* @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs * @param {Function} options.fileserver_download_handler - Function to handle downloading data from file server URLs
* @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5) * @param {number} options.max_retries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100) * @param {number} options.base_delay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000) * @param {number} options.max_delay - Maximum delay for exponential backoff in ms (default: 5000)
* *
* @returns {Promise<Object>} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects * @returns {Promise<Object>} - JSON object of envelope with payloads field containing list of {dataname, data, type} tuples
*/ */
const { const {
fileserverDownloadHandler = _fetch_with_backoff, fileserver_download_handler = _fetch_with_backoff,
maxRetries = 5, max_retries = 5,
baseDelay = 100, base_delay = 100,
maxDelay = 5000 max_delay = 5000
} = options; } = options;
// Parse the JSON envelope // Parse the JSON envelope
const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
const json_data = JSON.parse(jsonStr); const json_data = JSON.parse(jsonStr);
log_trace(json_data.correlationId, `Processing received message`); log_trace(json_data.correlation_id, `Processing received message`);
// Process all payloads in the envelope // Process all payloads in the envelope
const payloads_list = []; const payloads_list = [];
@@ -642,32 +610,32 @@ async function smartreceive(msg, options = {}) {
if (transport === "direct") { if (transport === "direct") {
// Direct transport - payload is in the message // Direct transport - payload is in the message
log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`); log_trace(json_data.correlation_id, `Direct transport - decoding payload '${dataname}'`);
// Extract base64 payload from the payload // Extract base64 payload from the payload
const payload_b64 = payload.data; const payload_b64 = payload.data;
// Decode Base64 payload // Decode Base64 payload
const payload_bytes = base64ToArrayBuffer(payload_b64); const payload_bytes = base64ToUint8Array(payload_b64);
// Deserialize based on type // Deserialize based on type
const data_type = payload.type; const data_type = payload.payload_type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId); const data = _deserialize_data(payload_bytes, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type }); payloads_list.push({ dataname, data, type: data_type });
} else if (transport === "link") { } else if (transport === "link") {
// Link transport - payload is at URL // Link transport - payload is at URL
const url = payload.data; const url = payload.data;
log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`); log_trace(json_data.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
// Fetch with exponential backoff using the download handler // Fetch with exponential backoff using the download handler
const downloaded_data = await fileserverDownloadHandler( const downloaded_data = await fileserver_download_handler(
url, maxRetries, baseDelay, maxDelay, json_data.correlationId url, max_retries, base_delay, max_delay, json_data.correlation_id
); );
// Deserialize based on type // Deserialize based on type
const data_type = payload.type; const data_type = payload.payload_type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId); const data = _deserialize_data(downloaded_data, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type }); payloads_list.push({ dataname, data, type: data_type });
} else { } else {
@@ -676,11 +644,74 @@ async function smartreceive(msg, options = {}) {
} }
// Replace payloads array with the processed list of {dataname, data, type} tuples // Replace payloads array with the processed list of {dataname, data, type} tuples
// This matches Julia's smartreceive return format
json_data.payloads = payloads_list; json_data.payloads = payloads_list;
return json_data; return json_data;
} }
// plik_oneshot_upload - matches Julia plik_oneshot_upload function
// Upload handler signature: plik_oneshot_upload(fileserver_url, dataname, data)
// Returns: { status, uploadid, fileid, url }
async function plik_oneshot_upload(file_server_url, dataname, data) {
/**
* Upload a single file to a plik server using one-shot mode
* This function uploads raw byte array to a plik server in one-shot mode (no upload session).
* It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
* retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
*
* This is the default upload handler used by smartsend.
* Custom handlers can be passed via the fileserver_upload_handler option.
*
* @param {string} file_server_url - Base URL of the plik server (e.g., "http://localhost:8080")
* @param {string} dataname - Name of the file being uploaded
* @param {Uint8Array} data - Raw byte data of the file content
* @returns {Promise<Object>} - Dictionary with keys: status, uploadid, fileid, url
*/
// Step 1: Get upload ID and token
const url_getUploadID = `${file_server_url}/upload`;
const headers = { "Content-Type": "application/json" };
const body = JSON.stringify({ OneShot: true });
let http_response = await fetch(url_getUploadID, {
method: "POST",
headers: headers,
body: body
});
const response_json = await http_response.json();
const uploadid = response_json.id;
const uploadtoken = response_json.uploadToken;
// Step 2: Upload file data
const url_upload = `${file_server_url}/file/${uploadid}`;
// Create multipart form data
const formData = new FormData();
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
http_response = await fetch(url_upload, {
method: "POST",
headers: { "X-UploadToken": uploadtoken },
body: formData
});
const fileResponseJson = await http_response.json();
const fileid = fileResponseJson.id;
// URL of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
const url = `${file_server_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
return {
status: http_response.status,
uploadid: uploadid,
fileid: fileid,
url: url
};
}
// Export for Node.js // Export for Node.js
if (typeof module !== 'undefined' && module.exports) { if (typeof module !== 'undefined' && module.exports) {
module.exports = { module.exports = {
@@ -692,6 +723,7 @@ if (typeof module !== 'undefined' && module.exports) {
_deserialize_data, _deserialize_data,
_fetch_with_backoff, _fetch_with_backoff,
_upload_to_fileserver, _upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD, DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL, DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL, DEFAULT_FILESERVER_URL,
@@ -711,6 +743,7 @@ if (typeof window !== 'undefined') {
_deserialize_data, _deserialize_data,
_fetch_with_backoff, _fetch_with_backoff,
_upload_to_fileserver, _upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD, DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL, DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL, DEFAULT_FILESERVER_URL,

View File

@@ -1,45 +1,60 @@
""" """
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython Python NATS Bridge - Bi-Directional Data Bridge
This module provides functionality for sending and receiving data over NATS This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads. using the Claim-Check pattern for large payloads.
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.
API Standard:
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
""" """
import json import json
import random
import time import time
import usocket
import uselect
import ustruct
import uuid import uuid
try:
import ussl
HAS_SSL = True
except ImportError:
HAS_SSL = False
# Constants # Constants
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
DEFAULT_NATS_URL = "nats://localhost:4222" DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080" DEFAULT_FILESERVER_URL = "http://localhost:8080"
# ============================================= 100 ============================================== # # ============================================= 100 ============================================== #
class MessagePayload: class MessagePayload:
"""Internal message payload structure representing a single payload within a NATS message envelope.""" """Internal message payload structure representing a single payload within a NATS message envelope.
def __init__(self, data, msg_type, id="", dataname="", transport="direct", This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
Attributes:
id: Unique identifier for this payload (e.g., "uuid4")
dataname: Name of the payload (e.g., "login_image")
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
transport: Transport method ("direct" or "link")
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
size: Size of the payload in bytes
data: Payload data (bytes for direct, URL for link)
metadata: Optional metadata dictionary
"""
def __init__(self, data, payload_type, id="", dataname="", transport="direct",
encoding="none", size=0, metadata=None): encoding="none", size=0, metadata=None):
""" """
Initialize a MessagePayload. Initialize a MessagePayload.
Args: Args:
data: Payload data (bytes for direct, URL string for link) data: Payload data (base64 string for direct, URL string for link)
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
id: Unique identifier for this payload (auto-generated if empty) id: Unique identifier for this payload (auto-generated if empty)
dataname: Name of the payload (auto-generated UUID if empty) dataname: Name of the payload (auto-generated UUID if empty)
transport: Transport method ("direct" or "link") transport: Transport method ("direct" or "link")
@@ -49,7 +64,7 @@ class MessagePayload:
""" """
self.id = id if id else self._generate_uuid() self.id = id if id else self._generate_uuid()
self.dataname = dataname if dataname else self._generate_uuid() self.dataname = dataname if dataname else self._generate_uuid()
self.type = msg_type self.payload_type = payload_type
self.transport = transport self.transport = transport
self.encoding = encoding self.encoding = encoding
self.size = size self.size = size
@@ -65,7 +80,7 @@ class MessagePayload:
payload_dict = { payload_dict = {
"id": self.id, "id": self.id,
"dataname": self.dataname, "dataname": self.dataname,
"type": self.type, "payload_type": self.payload_type,
"transport": self.transport, "transport": self.transport,
"encoding": self.encoding, "encoding": self.encoding,
"size": self.size, "size": self.size,
@@ -152,20 +167,24 @@ class MessageEnvelope:
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime()) return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
def to_json(self): def to_json(self):
"""Convert envelope to JSON string.""" """Convert envelope to JSON string.
Returns:
str: JSON string representation of the envelope using snake_case field names
"""
obj = { obj = {
"correlationId": self.correlation_id, "correlation_id": self.correlation_id,
"msgId": self.msg_id, "msg_id": self.msg_id,
"timestamp": self.timestamp, "timestamp": self.timestamp,
"sendTo": self.send_to, "send_to": self.send_to,
"msgPurpose": self.msg_purpose, "msg_purpose": self.msg_purpose,
"senderName": self.sender_name, "sender_name": self.sender_name,
"senderId": self.sender_id, "sender_id": self.sender_id,
"receiverName": self.receiver_name, "receiver_name": self.receiver_name,
"receiverId": self.receiver_id, "receiver_id": self.receiver_id,
"replyTo": self.reply_to, "reply_to": self.reply_to,
"replyToMsgId": self.reply_to_msg_id, "reply_to_msg_id": self.reply_to_msg_id,
"brokerURL": self.broker_url "broker_url": self.broker_url
} }
# Include metadata if not empty # Include metadata if not empty
@@ -188,68 +207,126 @@ def log_trace(correlation_id, message):
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message)) print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
def _serialize_data(data, msg_type): def _serialize_data(data, payload_type):
"""Serialize data according to specified format. """Serialize data according to specified format.
This function serializes arbitrary data into a binary representation based on the specified type.
It supports multiple serialization formats for different data types.
Args: Args:
data: Data to serialize data: Data to serialize
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary") - "text": String
- "dictionary": JSON-serializable dict
- "table": Tabular data (pandas DataFrame or list of dicts)
- "image", "audio", "video", "binary": bytes
payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
Returns: Returns:
bytes: Binary representation of the serialized data bytes: Binary representation of the serialized data
Example:
>>> text_bytes = _serialize_data("Hello World", "text")
>>> json_bytes = _serialize_data({"key": "value"}, "dictionary")
>>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table")
""" """
if msg_type == "text": if payload_type == "text":
if isinstance(data, str): if isinstance(data, str):
return data.encode('utf-8') return data.encode('utf-8')
else: else:
raise ValueError("Text data must be a string") raise ValueError("Text data must be a string")
elif msg_type == "dictionary": elif payload_type == "dictionary":
if isinstance(data, dict): if isinstance(data, dict):
json_str = json.dumps(data) json_str = json.dumps(data)
return json_str.encode('utf-8') return json_str.encode('utf-8')
else: else:
raise ValueError("Dictionary data must be a dict") raise ValueError("Dictionary data must be a dict")
elif msg_type in ("image", "audio", "video", "binary"): elif payload_type == "table":
# Support pandas DataFrame or list of dicts
try:
import pandas as pd
if isinstance(data, pd.DataFrame):
# Convert DataFrame to JSON and then to bytes
json_str = data.to_json(orient='records', force_ascii=False)
return json_str.encode('utf-8')
elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
# List of dicts
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data must be a pandas DataFrame or list of dicts")
except ImportError:
# Fallback: if pandas not available, treat as list of dicts
if isinstance(data, list):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)")
elif payload_type in ("image", "audio", "video", "binary"):
if isinstance(data, bytes): if isinstance(data, bytes):
return data return data
else: else:
raise ValueError("{} data must be bytes".format(msg_type.capitalize())) raise ValueError("{} data must be bytes".format(payload_type.capitalize()))
else: else:
raise ValueError("Unknown type: {}".format(msg_type)) raise ValueError("Unknown payload_type: {}".format(payload_type))
def _deserialize_data(data_bytes, msg_type, correlation_id): def _deserialize_data(data_bytes, payload_type, correlation_id):
"""Deserialize bytes to data based on type. """Deserialize bytes to data based on type.
This function converts serialized bytes back to Python data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (JSON deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Args: Args:
data_bytes: Serialized data as bytes data_bytes: Serialized data as bytes
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
correlation_id: Correlation ID for logging correlation_id: Correlation ID for logging
Returns: Returns:
Deserialized data Deserialized data:
- "text": str
- "dictionary": dict
- "table": list of dicts (or pandas DataFrame if available)
- "image", "audio", "video", "binary": bytes
Example:
>>> text_data = _deserialize_data(b"Hello", "text", "corr_id")
>>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id")
>>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id")
""" """
if msg_type == "text": if payload_type == "text":
return data_bytes.decode('utf-8') return data_bytes.decode('utf-8')
elif msg_type == "dictionary": elif payload_type == "dictionary":
json_str = data_bytes.decode('utf-8') json_str = data_bytes.decode('utf-8')
return json.loads(json_str) return json.loads(json_str)
elif msg_type in ("image", "audio", "video", "binary"): elif payload_type == "table":
# Deserialize table data (JSON format)
json_str = data_bytes.decode('utf-8')
table_data = json.loads(json_str)
# If pandas is available, try to convert to DataFrame
try:
import pandas as pd
return pd.DataFrame(table_data)
except ImportError:
return table_data
elif payload_type in ("image", "audio", "video", "binary"):
return data_bytes return data_bytes
else: else:
raise ValueError("Unknown type: {}".format(msg_type)) raise ValueError("Unknown payload_type: {}".format(payload_type))
class NATSConnection: class NATSConnection:
"""Simple NATS connection for Micropython.""" """Simple NATS connection for Python and Micropython."""
def __init__(self, url=DEFAULT_NATS_URL): def __init__(self, url=DEFAULT_BROKER_URL):
"""Initialize NATS connection. """Initialize NATS connection.
Args: Args:
@@ -276,9 +353,19 @@ class NATSConnection:
def connect(self): def connect(self):
"""Connect to NATS server.""" """Connect to NATS server."""
addr = usocket.getaddrinfo(self.host, self.port)[0][-1] # Use socket for both Python and Micropython
self.conn = usocket.socket() try:
self.conn.connect(addr) import socket
addr = socket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect(addr)
except NameError:
# Micropython fallback
import usocket
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket()
self.conn.connect(addr)
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port)) log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
def publish(self, subject, message): def publish(self, subject, message):
@@ -294,7 +381,15 @@ class NATSConnection:
# Simple NATS protocol implementation # Simple NATS protocol implementation
msg = "PUB {} {}\r\n".format(subject, len(message)) msg = "PUB {} {}\r\n".format(subject, len(message))
msg = msg.encode('utf-8') + message + b"\r\n" msg = msg.encode('utf-8') + message + b"\r\n"
self.conn.send(msg)
try:
import socket
self.conn.send(msg)
except NameError:
# Micropython fallback
import usocket
self.conn.send(msg)
log_trace("", "Message published to {}".format(subject)) log_trace("", "Message published to {}".format(subject))
def subscribe(self, subject, callback): def subscribe(self, subject, callback):
@@ -335,11 +430,14 @@ class NATSConnection:
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""): def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
"""Fetch data from URL with exponential backoff. """Fetch data from URL with exponential backoff.
This function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Args: Args:
url: URL to fetch from url: URL to fetch from
max_retries: Maximum number of retry attempts max_retries: Maximum number of retry attempts (default: 5)
base_delay: Initial delay in milliseconds base_delay: Initial delay in milliseconds (default: 100)
max_delay: Maximum delay in milliseconds max_delay: Maximum delay in milliseconds (default: 5000)
correlation_id: Correlation ID for logging correlation_id: Correlation ID for logging
Returns: Returns:
@@ -347,33 +445,54 @@ def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, corr
Raises: Raises:
Exception: If all retry attempts fail Exception: If all retry attempts fail
Example:
>>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id")
""" """
delay = base_delay delay = base_delay
for attempt in range(1, max_retries + 1): for attempt in range(1, max_retries + 1):
try: try:
# Simple HTTP GET request # Simple HTTP GET request
# This is a simplified implementation # Try urequests for Micropython first, then requests for Python
# For production, you'd want a proper HTTP client try:
import urequests import urequests
response = urequests.get(url) response = urequests.get(url)
if response.status_code == 200: status_code = response.status_code
content = response.content
except ImportError:
try:
import requests
response = requests.get(url)
response.raise_for_status()
status_code = response.status_code
content = response.content
except ImportError:
raise Exception("No HTTP library available (urequests or requests)")
if status_code == 200:
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt)) log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
return response.content return content
else: else:
raise Exception("Failed to fetch: {}".format(response.status_code)) raise Exception("Failed to fetch: {}".format(status_code))
except Exception as e: except Exception as e:
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e))) log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
if attempt < max_retries: if attempt < max_retries:
time.sleep(delay / 1000.0) time.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay) delay = min(delay * 2, max_delay)
raise Exception("Failed to fetch data after {} attempts".format(max_retries))
def plik_oneshot_upload(file_server_url, filename, data): def plik_oneshot_upload(fileserver_url, dataname, data):
"""Upload a single file to a plik server using one-shot mode. """Upload a single file to a plik server using one-shot mode.
This function uploads raw byte data to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
Args: Args:
file_server_url: Base URL of the plik server fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080")
filename: Name of the file being uploaded dataname: Name of the file being uploaded
data: Raw byte data of the file content data: Raw byte data of the file content
Returns: Returns:
@@ -382,23 +501,31 @@ def plik_oneshot_upload(file_server_url, filename, data):
- "uploadid": ID of the one-shot upload session - "uploadid": ID of the one-shot upload session
- "fileid": ID of the uploaded file within the session - "fileid": ID of the uploaded file within the session
- "url": Full URL to download the uploaded file - "url": Full URL to download the uploaded file
Example:
>>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world")
>>> result["status"], result["uploadid"], result["fileid"], result["url"]
""" """
import urequests
import json import json
try:
import urequests
except ImportError:
import requests as urequests
# Get upload ID # Get upload ID
url_get_upload_id = "{}/upload".format(file_server_url) url_get_upload_id = "{}/upload".format(fileserver_url)
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
body = json.dumps({"OneShot": True}) body = json.dumps({"OneShot": True})
response = urequests.post(url_get_upload_id, headers=headers, data=body) response = urequests.post(url_get_upload_id, headers=headers, data=body)
response_json = json.loads(response.content) response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
uploadid = response_json.get("id") uploadid = response_json.get("id")
uploadtoken = response_json.get("uploadToken") uploadtoken = response_json.get("uploadToken")
# Upload file # Upload file
url_upload = "{}/file/{}".format(file_server_url, uploadid) url_upload = "{}/file/{}".format(fileserver_url, uploadid)
headers = {"X-UploadToken": uploadtoken} headers = {"X-UploadToken": uploadtoken}
# For Micropython, we need to construct the multipart form data manually # For Micropython, we need to construct the multipart form data manually
@@ -407,7 +534,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
# Create multipart body # Create multipart body
part1 = "--{}\r\n".format(boundary) part1 = "--{}\r\n".format(boundary)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename) part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname)
part1 += "Content-Type: application/octet-stream\r\n\r\n" part1 += "Content-Type: application/octet-stream\r\n\r\n"
part1_bytes = part1.encode('utf-8') part1_bytes = part1.encode('utf-8')
@@ -421,10 +548,10 @@ def plik_oneshot_upload(file_server_url, filename, data):
content_type = "multipart/form-data; boundary={}".format(boundary) content_type = "multipart/form-data; boundary={}".format(boundary)
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body) response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
response_json = json.loads(response.content) response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
fileid = response_json.get("id") fileid = response_json.get("id")
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename) url = "{}/file/{}/{}".format(fileserver_url, uploadid, dataname)
return { return {
"status": response.status_code, "status": response.status_code,
@@ -434,7 +561,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
} }
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL, def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD, fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge", correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True): receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
@@ -447,27 +574,38 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
Args: Args:
subject: NATS subject to publish the message to subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples to send data: List of (dataname, data, payload_type) tuples to send
nats_url: URL of the NATS server - dataname: Name of the payload
- data: The actual data to send
- payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
broker_url: URL of the NATS server
fileserver_url: URL of the HTTP file server fileserver_url: URL of the HTTP file server
fileserver_upload_handler: Function to handle fileserver uploads fileserver_upload_handler: Function to handle fileserver uploads (must return dict with "status", "uploadid", "fileid", "url" keys)
size_threshold: Threshold in bytes separating direct vs link transport size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB)
correlation_id: Optional correlation ID for tracing correlation_id: Optional correlation ID for tracing; if None, a UUID is generated
msg_purpose: Purpose of the message msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
sender_name: Name of the sender sender_name: Name of the sender
receiver_name: Name of the receiver receiver_name: Name of the receiver (empty string means broadcast)
receiver_id: UUID of the receiver receiver_id: UUID of the receiver (empty string means broadcast)
reply_to: Topic to reply to reply_to: Topic to reply to (empty string if no reply expected)
reply_to_msg_id: Message ID this message is replying to reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True) is_publish: Whether to automatically publish the message to NATS (default: True)
- When True: message is published to NATS
- When False: returns envelope and JSON string without publishing
Returns: Returns:
tuple: (env, env_json_str) where: tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads - env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing - env_json_str: JSON string representation of the envelope for publishing
Example:
>>> data = [("message", "Hello World!", "text")]
>>> env, env_json_str = smartsend("/test", data)
>>> # env: MessageEnvelope with all metadata and payloads
>>> # env_json_str: JSON string for publishing
""" """
# Generate correlation ID if not provided # Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4()) cid = correlation_id if correlation_id is not None else str(uuid.uuid4())
log_trace(cid, "Starting smartsend for subject: {}".format(subject)) log_trace(cid, "Starting smartsend for subject: {}".format(subject))
@@ -482,16 +620,19 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
payload_bytes = _serialize_data(payload_data, payload_type) payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes) payload_size = len(payload_bytes)
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format( log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format(
dataname, payload_type, payload_size)) dataname, payload_type, payload_size))
# Decision: Direct vs Link # Decision: Direct vs Link
if payload_size < size_threshold: if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS # Direct path - Base64 encode and send via NATS
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
# Convert to base64 string for JSON # Convert to base64 string for JSON
import ubinascii try:
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip() import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
except ImportError:
import base64
payload_b64_str = base64.b64encode(payload_bytes).decode('utf-8')
log_trace(cid, "Using direct transport for {} bytes".format(payload_size)) log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
@@ -514,10 +655,10 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Upload to HTTP server # Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200: if response.get("status") != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response["status"])) raise Exception("Failed to upload data to fileserver: {}".format(response.get("status")))
url = response["url"] url = response.get("url")
log_trace(cid, "Uploaded to URL: {}".format(url)) log_trace(cid, "Uploaded to URL: {}".format(url))
# Create MessagePayload for link transport # Create MessagePayload for link transport
@@ -546,7 +687,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id=receiver_id, receiver_id=receiver_id,
reply_to=reply_to, reply_to=reply_to,
reply_to_msg_id=reply_to_msg_id, reply_to_msg_id=reply_to_msg_id,
broker_url=nats_url, broker_url=broker_url,
metadata={} metadata={}
) )
@@ -554,7 +695,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Publish to NATS if is_publish is True # Publish to NATS if is_publish is True
if is_publish: if is_publish:
nats_conn = NATSConnection(nats_url) nats_conn = NATSConnection(broker_url)
nats_conn.connect() nats_conn.connect()
nats_conn.publish(subject, msg_json) nats_conn.publish(subject, msg_json)
nats_conn.close() nats_conn.close()
@@ -571,18 +712,29 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
(base64 decoded payloads) and link transport (URL-based payloads). (base64 decoded payloads) and link transport (URL-based payloads).
Args: Args:
msg: NATS message to process (dict with payload data) msg: NATS message to process (dict or JSON string with envelope data)
fileserver_download_handler: Function to handle downloading data from file server URLs fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL Receives: (url, max_retries, base_delay, max_delay, correlation_id)
base_delay: Initial delay for exponential backoff in ms Returns: bytes (the downloaded data)
max_delay: Maximum delay for exponential backoff in ms max_retries: Maximum retry attempts for fetching URL (default: 5)
base_delay: Initial delay for exponential backoff in ms (default: 100)
max_delay: Maximum delay for exponential backoff in ms (default: 5000)
Returns: Returns:
dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples dict: Envelope dictionary with metadata and 'payloads' field containing list of
(dataname, data, payload_type) tuples
Example:
>>> env = smartreceive(msg)
>>> # env contains envelope metadata and payloads field
>>> # env["payloads"] = [(dataname1, data1, payload_type1), ...]
>>> for dataname, data, payload_type in env["payloads"]:
... print("Received {} of type {}: {}".format(dataname, payload_type, data))
""" """
# Parse the JSON envelope # Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg) json_data = msg if isinstance(msg, dict) else json.loads(msg)
log_trace(json_data.get("correlationId", ""), "Processing received message") correlation_id = json_data.get("correlation_id", "")
log_trace(correlation_id, "Processing received message")
# Process all payloads in the envelope # Process all payloads in the envelope
payloads_list = [] payloads_list = []
@@ -596,43 +748,47 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
dataname = payload.get("dataname", "") dataname = payload.get("dataname", "")
if transport == "direct": if transport == "direct":
log_trace(json_data.get("correlationId", ""), log_trace(correlation_id,
"Direct transport - decoding payload '{}'".format(dataname)) "Direct transport - decoding payload '{}'".format(dataname))
# Extract base64 payload from the payload # Extract base64 payload from the payload
payload_b64 = payload.get("data", "") payload_b64 = payload.get("data", "")
# Decode Base64 payload # Decode Base64 payload
import ubinascii try:
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8')) import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
except ImportError:
import base64
payload_bytes = base64.b64decode(payload_b64)
# Deserialize based on type # Deserialize based on type
data_type = payload.get("type", "") payload_type = payload.get("payload_type", "")
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", "")) data = _deserialize_data(payload_bytes, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type)) payloads_list.append((dataname, data, payload_type))
elif transport == "link": elif transport == "link":
# Extract download URL from the payload # Extract download URL from the payload
url = payload.get("data", "") url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""), log_trace(correlation_id,
"Link transport - fetching '{}' from URL: {}".format(dataname, url)) "Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff # Fetch with exponential backoff
downloaded_data = fileserver_download_handler( downloaded_data = fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "") url, max_retries, base_delay, max_delay, correlation_id
) )
# Deserialize based on type # Deserialize based on type
data_type = payload.get("type", "") payload_type = payload.get("payload_type", "")
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", "")) data = _deserialize_data(downloaded_data, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type)) payloads_list.append((dataname, data, payload_type))
else: else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport)) raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
# Replace payloads field with the processed list of (dataname, data, type) tuples # Replace payloads field with the processed list of (dataname, data, payload_type) tuples
json_data["payloads"] = payloads_list json_data["payloads"] = payloads_list
return json_data return json_data
@@ -651,11 +807,11 @@ def get_timestamp():
# Example usage # Example usage
if __name__ == "__main__": if __name__ == "__main__":
print("NATSBridge for Micropython") print("NATSBridge - Bi-Directional Data Bridge")
print("=========================") print("=======================================")
print("This module provides:") print("This module provides:")
print(" - MessageEnvelope: Message envelope structure") print(" - MessageEnvelope: Message envelope structure with snake_case fields")
print(" - MessagePayload: Payload structure") print(" - MessagePayload: Payload structure with payload_type field")
print(" - smartsend: Send data via NATS with automatic transport selection") print(" - smartsend: Send data via NATS with automatic transport selection")
print(" - smartreceive: Receive and process messages from NATS") print(" - smartreceive: Receive and process messages from NATS")
print(" - plik_oneshot_upload: Upload files to HTTP file server") print(" - plik_oneshot_upload: Upload files to HTTP file server")
@@ -663,10 +819,12 @@ if __name__ == "__main__":
print() print()
print("Usage:") print("Usage:")
print(" from nats_bridge import smartsend, smartreceive") print(" from nats_bridge import smartsend, smartreceive")
print(" data = [(\"message\", \"Hello World\", \"text\")]") print()
print(" env = smartsend(\"my.subject\", data)") print(" # Send data (list of (dataname, data, payload_type) tuples)")
print(" data = [(\"message\", \"Hello World!\", \"text\")]")
print(" env, env_json_str = smartsend(\"my.subject\", data)")
print() print()
print(" # On receiver:") print(" # On receiver:")
print(" payloads = smartreceive(msg)") print(" env = smartreceive(msg)")
print(" for dataname, data, type in payloads:") print(" for dataname, data, payload_type in env[\"payloads\"]:")
print(" print(f\"Received {dataname} of type {type}: {data}\")") print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))")