12 Commits

Author SHA1 Message Date
be94c62760 update 2026-02-25 12:24:02 +07:00
6a862ef243 update 2026-02-25 12:09:00 +07:00
ae2de5fc62 update 2026-02-25 10:33:30 +07:00
df0bbc7327 update 2026-02-25 09:58:10 +07:00
d94761c866 update 2026-02-25 09:44:08 +07:00
f8235e1a59 update 2026-02-25 08:54:04 +07:00
647cadf497 update 2026-02-25 08:33:32 +07:00
8c793a81b6 update 2026-02-25 08:02:03 +07:00
6a42ba7e43 update 2026-02-25 07:29:42 +07:00
14b3790251 update 2026-02-25 06:23:24 +07:00
61d81bed62 update 2026-02-25 06:04:40 +07:00
1a10bc1a5f update 2026-02-25 05:32:59 +07:00
10 changed files with 1081 additions and 410 deletions

View File

@@ -13,3 +13,23 @@ Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement
Create a walkthrough for Julia service-A service sending a mix-content chat message to Julia service-B. the chat message must includes
I updated the following:
- NATSBridge.jl. Essentially I add NATS_connection keyword and new publish_message function to support the keyword.
Use them and ONLY them as ground truth.
Then update the following files accordingly:
- architecture.md
- implementation.md
All API should be semantically consistent and naming should be consistent across the board.

View File

@@ -173,7 +173,7 @@ from nats_bridge import smartsend
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
print("Message sent!")
```
@@ -185,7 +185,7 @@ const { smartsend } = require('./src/NATSBridge');
// Send a text message
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
], { broker_url: "nats://localhost:4222" });
console.log("Message sent!");
```
@@ -197,7 +197,7 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; nats_url="nats://localhost:4222")
env, env_json_str = NATSBridge.smartsend("/chat/room1", data; broker_url="nats://localhost:4222")
println("Message sent!")
```
@@ -313,7 +313,7 @@ from nats_bridge import smartsend
env, env_json_str = smartsend(
subject, # NATS subject to publish to
data, # List of (dataname, data, type) tuples
nats_url="nats://localhost:4222", # NATS server URL
broker_url="nats://localhost:4222", # NATS server URL
fileserver_url="http://localhost:8080", # File server URL
fileserver_upload_handler=plik_oneshot_upload, # Upload handler function
size_threshold=1_000_000, # Threshold in bytes (default: 1MB)
@@ -337,18 +337,18 @@ const { env, env_json_str } = await smartsend(
subject, // NATS subject
data, // Array of {dataname, data, type}
{
natsUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080",
fileserverUploadHandler: customUploadHandler,
sizeThreshold: 1_000_000,
correlationId: "custom-id",
msgPurpose: "chat",
senderName: "NATSBridge",
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: "",
isPublish: true // Whether to automatically publish to NATS
broker_url: "nats://localhost:4222",
fileserver_url: "http://localhost:8080",
fileserver_upload_handler: customUploadHandler,
size_threshold: 1_000_000,
correlation_id: "custom-id",
msg_purpose: "chat",
sender_name: "NATSBridge",
receiver_name: "",
receiver_id: "",
reply_to: "",
reply_to_msg_id: "",
is_publish: true // Whether to automatically publish to NATS
}
);
```
@@ -361,9 +361,9 @@ using NATSBridge
env, env_json_str = NATSBridge.smartsend(
subject, # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222",
broker_url::String = "nats://localhost:4222",
fileserver_url = "http://localhost:8080",
fileserverUploadHandler::Function = plik_oneshot_upload,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
@@ -372,7 +372,8 @@ env, env_json_str = NATSBridge.smartsend(
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
@@ -423,9 +424,9 @@ const env = await smartreceive(
using NATSBridge
# Note: msg is a NATS.Msg object passed from the subscription callback
env, env_json_str = NATSBridge.smartreceive(
env = NATSBridge.smartreceive(
msg::NATS.Msg;
fileserverDownloadHandler::Function = _fetch_with_backoff,
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
@@ -433,6 +434,35 @@ env, env_json_str = NATSBridge.smartreceive(
# Returns: Dict with envelope metadata and payloads array
```
### publish_message
Publish a message to a NATS subject. This function is available in Julia with two overloads:
#### Julia
**Using broker URL (creates new connection):**
```julia
using NATSBridge, NATS
# Publish with URL - creates a new connection
NATSBridge.publish_message(
"nats://localhost:4222", # broker_url
"/chat/room1", # subject
"{\"correlation_id\":\"abc123\"}", # message
"abc123" # correlation_id
)
```
**Using pre-existing connection (saves connection overhead):**
```julia
using NATSBridge, NATS
# Create connection once and reuse
conn = NATS.connect("nats://localhost:4222")
NATSBridge.publish_message(conn, "/chat/room1", "{\"correlation_id\":\"abc123\"}", "abc123")
# Connection is automatically drained after publish
```
---
## Payload Types
@@ -488,7 +518,7 @@ smartsend("/topic", data, fileserver_url="http://localhost:8080")
```javascript
await smartsend("/topic", [
{ dataname: "file", data: largeData, type: "binary" }
], { fileserverUrl: "http://localhost:8080" });
], { fileserver_url: "http://localhost:8080" });
```
#### Julia
@@ -529,7 +559,7 @@ const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "user_avatar", data: image_data, type: "image" },
{ dataname: "large_document", data: large_file_data, type: "binary" }
], {
fileserverUrl: "http://localhost:8080"
fileserver_url: "http://localhost:8080"
});
```
@@ -653,9 +683,10 @@ from nats_bridge import smartsend
env, env_json_str = smartsend(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
broker_url="nats://localhost:4222",
reply_to="/device/response"
)
# env: msgEnvelope_v1 object
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
# The env_json_str can also be published directly using NATS request-reply pattern
@@ -697,7 +728,8 @@ const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/device/command", [
{ dataname: "command", data: { action: "read_sensor" }, type: "dictionary" }
], {
replyTo: "/device/response"
broker_url: "nats://localhost:4222",
reply_to: "/device/response"
});
```
@@ -739,6 +771,7 @@ using NATSBridge
env, env_json_str = NATSBridge.smartsend(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")];
broker_url="nats://localhost:4222",
reply_to="/device/response"
)
```
@@ -755,7 +788,7 @@ const NATS_URL = "nats://localhost:4222"
function test_responder()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
env, env_json_str = NATSBridge.smartreceive(msg, fileserverDownloadHandler)
env = NATSBridge.smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
if dataname == "command" && data["action"] == "read_sensor"
response = Dict("sensor_id" => "sensor-001", "value" => 42.5)
@@ -790,7 +823,7 @@ async def main():
# Send sensor data
data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")]
smartsend("/device/sensors", data, nats_url="nats://localhost:4222")
smartsend("/device/sensors", data, broker_url="nats://localhost:4222")
# Receive commands - msg comes from the callback
async def message_handler(msg):

View File

@@ -18,9 +18,9 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
@@ -40,8 +40,8 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# Output format for smartreceive (returns a dictionary-like object with payloads field containing list of tuples)
# Returns: Dict-like object with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlation_id": "...",
# "msg_id": "...",
@@ -369,6 +369,25 @@ graph TD
## Implementation Details
### API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers
### Julia Implementation
#### Dependencies
@@ -395,10 +414,25 @@ function smartsend(
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # Whether to automatically publish to NATS
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
)
```
**Keyword Parameter - NATS_connection:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios where connection reuse provides performance benefits.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
@@ -442,10 +476,10 @@ end
```
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
@@ -459,6 +493,57 @@ end
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish. This is a Julia-specific optimization that leverages function overloading.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **High-level API (smartsend, smartreceive):** Uses consistent naming across all three languages (Julia, JavaScript, Python/Micropython)
- **Low-level native functions (NATS.connect(), publish_message()):** Follow the conventions of the specific language ecosystem and do not require cross-language consistency
### JavaScript Implementation
#### Dependencies
@@ -489,13 +574,15 @@ async function smartsend(
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `is_publish` (Boolean) - Whether to automatically publish the message to NATS (default: `true`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Note:** JavaScript uses `is_publish` option (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
- `published` - Boolean indicating whether the message was automatically published to NATS
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
@@ -531,16 +618,16 @@ async function smartreceive(msg, options = {})
- Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type`
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
@@ -557,26 +644,40 @@ async function smartreceive(msg, options = {})
#### smartsend Function
```python
async def smartsend(
def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
options: Dict = {}
)
broker_url: str = DEFAULT_BROKER_URL,
fileserver_url: str = DEFAULT_FILESERVER_URL,
fileserver_upload_handler: Callable = plik_oneshot_upload,
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
correlation_id: Union[str, None] = None,
msg_purpose: str = "chat",
sender_name: str = "NATSBridge",
receiver_name: str = "",
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True
) -> Tuple[MessageEnvelope, str]
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing
- `correlation_id` (str) - Optional correlation ID for tracing (auto-generated if None)
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `is_publish` (bool) - Whether to automatically publish the message to NATS (default: `True`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Note:** Python uses `is_publish` parameter (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
@@ -616,7 +717,7 @@ async def smartreceive(
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary containing all envelope fields:
- Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
@@ -824,7 +925,7 @@ async def smartreceive(
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Performance Considerations

View File

@@ -28,9 +28,9 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8})
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
@@ -148,10 +148,12 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
### Example: Julia ↔ Python ↔ JavaScript
```julia
# Julia sender
# Julia sender - smartsend returns (env, env_json_str)
using NATSBridge
data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
env, env_json_str = smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
```javascript
@@ -165,7 +167,7 @@ const env = await smartreceive(msg);
# Python sender
from nats_bridge import smartsend
data = [("response", "Hello from Python!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
```
All three platforms can communicate seamlessly using the same NATS subjects and data format.
@@ -314,6 +316,30 @@ julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js
```
## API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern - Key Differences:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
**Why the Difference?**
- Julia supports function overloading and keyword arguments, allowing `NATS_connection` to be passed as an optional parameter
- JavaScript/Python use a simpler `is_publish` option to control automatic publishing
- For connection reuse in JavaScript/Python, create a NATS client once and reuse it in your custom `fileserver_upload_handler` or custom publish logic
## Usage
### Scenario 1: Command & Control (Small Dictionary)
@@ -324,12 +350,41 @@ node test/scenario3_julia_to_julia.js
```julia
using NATSBridge
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
# Send small dictionary config (wrapped in list with type)
config = Dict("step_size" => 0.01, "iterations" => 1000, "threshold" => 0.5)
env, env_json_str = smartsend(
"control",
[("config", config, "dictionary")],
broker_url="nats://localhost:4222"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**Julia (Sender/Receiver) with NATS_connection for connection reuse:**
```julia
using NATSBridge
# Create connection once for high-frequency publishing
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection (saves connection overhead)
for i in 1:100
config = Dict("iteration" => i, "data" => rand())
smartsend(
"control",
[("config", config, "dictionary")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
**Use Case:** High-frequency publishing scenarios where connection reuse provides performance benefits by avoiding the overhead of establishing a new NATS connection for each message.
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
@@ -342,9 +397,39 @@ const config = {
threshold: 0.5
};
// Use is_publish option to control automatic publishing
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
]);
], {
is_publish: true // Automatically publish to NATS
});
```
**Connection Reuse in JavaScript:**
To achieve connection reuse in JavaScript, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```javascript
const { connect } = require('nats');
const { smartsend } = require('./src/NATSBridge');
// Create connection once
const nc = await connect({ servers: ['nats://localhost:4222'] });
// Send multiple messages using the same connection
for (let i = 0; i < 100; i++) {
const config = { iteration: i, data: Math.random() };
// Option 1: Use is_publish=false and publish manually with your connection
const { env, env_json_str } = await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
], { is_publish: false });
// Publish with your existing connection
await nc.publish("control", env_json_str);
}
// Close connection when done
await nc.close();
```
**Python/Micropython (Sender/Receiver):**
@@ -359,7 +444,32 @@ config = {
"threshold": 0.5
}
smartsend("control", [("config", config, "dictionary")])
# Use is_publish parameter to control automatic publishing
smartsend("control", [("config", config, "dictionary")], is_publish=True)
```
**Connection Reuse in Python:**
To achieve connection reuse in Python, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```python
from nats_bridge import smartsend
import nats
# Create connection once
nc = await nats.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in range(100):
config = {"iteration": i, "data": random.random()}
# Option 1: Use is_publish=False and publish manually with your connection
env, env_json_str = smartsend("control", [("config", config, "dictionary")], is_publish=False)
# Publish with your existing connection
await nc.publish("control", env_json_str)
# Close connection when done
await nc.close()
```
### Basic Multi-Payload Example
@@ -372,12 +482,12 @@ from nats_bridge import smartsend
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
# Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222")
smartsend("/test", [("single_data", mydata, "dictionary")], broker_url="nats://localhost:4222")
```
#### Python/Micropython (Receiver)
@@ -459,12 +569,127 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via smartsend - wrapped in a list (type is part of each tuple)
env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
# Send via smartsend - wrapped in list with type
# Large payload will use link transport (HTTP fileserver)
env, env_json_str = smartsend(
"analysis_results",
[("table_data", df, "table")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
#### smartsend Function Signature (Julia)
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
)
```
**New Keyword Parameter:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Example with pre-existing connection:**
```julia
using NATSBridge
# Create connection once
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in 1:100
data = rand(1000)
smartsend(
"analysis_results",
[("table_data", data, "table")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject")
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Use `is_publish` option and achieve connection reuse by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
@@ -482,19 +707,12 @@ const table = env.payloads;
```python
from nats_bridge import smartsend
# Binary data wrapped in a list
binary_data = [
("audio_chunk", binary_buffer, "binary")
]
# Binary data wrapped in list with type
smartsend(
"binary_input",
binary_data,
nats_url="nats://localhost:4222",
metadata={
"sample_rate": 44100,
"channels": 1
}
[("audio_chunk", binary_buffer, "binary")],
broker_url="nats://localhost:4222",
metadata={"sample_rate": 44100, "channels": 1}
)
```
@@ -558,10 +776,14 @@ function process_binary(msg) {
```julia
using NATSBridge
function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple)
function publish_health_status(broker_url)
# Send status wrapped in list with type
status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], broker_url=nats_url)
env, env_json_str = smartsend(
"health",
[("status", status, "dictionary")],
broker_url=broker_url
)
sleep(5) # Every 5 seconds
end
```
@@ -604,8 +826,8 @@ def handle_device_config(msg):
env = smartreceive(msg)
# Process configuration from payloads
for dataname, data, type in env["payloads"]:
if type == "dictionary":
for dataname, data, payload_type in env["payloads"]:
if payload_type == "dictionary":
print(f"Received configuration: {data}")
# Apply configuration to device
if "wifi_ssid" in data:
@@ -622,7 +844,7 @@ def handle_device_config(msg):
smartsend(
"device/response",
[("config", config, "dictionary")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
reply_to=env.get("reply_to")
)
```
@@ -670,12 +892,14 @@ options_df = DataFrame(
# Check payload size (< 1MB threshold)
# Publish directly to NATS with Base64-encoded payload
# Include metadata for dashboard selection context
smartsend(
env, env_json_str = smartsend(
"dashboard.selection",
[("options_table", options_df, "table")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
metadata=Dict("context" => "user_selection")
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**JavaScript (Receiver):**
@@ -709,7 +933,6 @@ await smartsend("dashboard.response", [
**Julia (Sender/Receiver):**
```julia
using NATSBridge
using DataFrames
# Build chat message with mixed payloads:
# - Text: direct transport (Base64)
@@ -733,13 +956,15 @@ chat_message = [
("large_document", large_file_bytes, "binary") # Large file, link transport
]
smartsend(
env, env_json_str = smartsend(
"chat.room123",
chat_message,
broker_url="nats://localhost:4222",
msg_purpose="chat",
reply_to="chat.room123.responses"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**JavaScript (Sender/Receiver):**

9
etc.jl
View File

@@ -0,0 +1,9 @@
Task: Update README.md to reflect recent changes in NATSbridge package.
Context: the package has been updated with the NATS_connection keyword and the publish_message function.
Requirements:
Source of Truth: Treat the updated NATSbridge code as the definitive source. Update README.md to align exactly with these changes.
API Consistency: Ensure the Main Package API (e.g., smartsend(), publish_message()) uses consistent naming across all three supported languages.
Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read()) should follow the conventions of the specific language ecosystem and do not require cross-language consistency.

View File

@@ -109,11 +109,11 @@ from nats_bridge import smartsend
# Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
@@ -126,14 +126,14 @@ const { smartsend } = require('./src/NATSBridge');
// Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
], { brokerUrl: "nats://localhost:4222" });
console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
], { brokerUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
```
@@ -145,8 +145,8 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env, env_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!")
```
@@ -208,7 +208,7 @@ config = {
# Send as dictionary type
data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -224,7 +224,7 @@ const config = {
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -239,7 +239,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
```
### Example 2: Sending Binary Data (Image)
@@ -255,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type
data = [("user_image", image_data, "binary")]
env, env_json_str = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -269,7 +269,7 @@ const image_data = fs.readFileSync('image.png');
const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -281,7 +281,7 @@ using NATSBridge
image_data = read("image.png")
data = [("user_image", image_data, "binary")]
env, env_json_str = smartsend("/chat/image", data)
env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
```
### Example 3: Request-Response Pattern
@@ -296,11 +296,11 @@ data = [("command", {"action": "read_sensor"}, "dictionary")]
env, env_json_str = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
# env: msgEnvelope_v1 object
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
@@ -361,7 +361,7 @@ large_data = os.urandom(2_000_000) # 2MB of random data
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1_000_000
)
@@ -383,6 +383,7 @@ view.fill(42); // Fill with some data
const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" }
], {
brokerUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080",
sizeThreshold: 1_000_000
});
@@ -399,6 +400,7 @@ large_data = rand(UInt8, 2_000_000)
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
@@ -425,7 +427,7 @@ data = [
("user_avatar", image_data, "image")
]
env, env_json_str = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -446,7 +448,7 @@ const { env, env_json_str } = await smartsend("/chat/mixed", [
data: fs.readFileSync("avatar.png"),
type: "image"
}
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -461,7 +463,7 @@ data = [
("user_avatar", image_data, "image")
]
env, env_json_str = smartsend("/chat/mixed", data)
env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
```
### Example 6: Table Data (Arrow IPC)
@@ -483,7 +485,7 @@ df = pd.DataFrame({
# Send as table type
data = [("students", df, "table")]
env, env_json_str = smartsend("/data/students", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
```
#### Julia
@@ -500,7 +502,7 @@ df = DataFrame(
)
data = [("students", df, "table")]
env, env_json_str = smartsend("/data/students", data)
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
```
---
@@ -519,7 +521,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")]
env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/analysis/config", data, broker_url="nats://localhost:4222")
```
#### JavaScript Receiver
@@ -546,7 +548,7 @@ const { smartsend } = require('./src/NATSBridge');
const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Python Receiver
@@ -568,7 +570,7 @@ for dataname, data, type in env["payloads"]:
from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")]
env, env_json_str = smartsend("/chat/python", data)
env, env_json_str = smartsend("/chat/python", data, broker_url="nats://localhost:4222")
```
#### Julia Receiver
@@ -576,7 +578,7 @@ env, env_json_str = smartsend("/chat/python", data)
```julia
using NATSBridge
env = smartreceive(msg, fileserverDownloadHandler)
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
if type == "text"
println("Received from Python: $data")

View File

@@ -136,6 +136,7 @@ class ChatUI {
`/chat/${this.currentRoom}`,
data,
{
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url,
sizeThreshold: window.config.size_threshold
}
@@ -288,8 +289,8 @@ Let's build a file transfer system that handles large files efficiently.
const { smartsend } = require('./NATSBridge');
class FileUploadService {
constructor(natsUrl, fileserverUrl) {
this.natsUrl = natsUrl;
constructor(brokerUrl, fileserverUrl) {
this.brokerUrl = brokerUrl;
this.fileserverUrl = fileserverUrl;
}
@@ -308,7 +309,7 @@ class FileUploadService {
`/files/${recipient}`,
data,
{
natsUrl: this.natsUrl,
brokerUrl: this.brokerUrl,
fileserverUrl: this.fileserverUrl,
sizeThreshold: 1048576
}
@@ -419,7 +420,7 @@ async function uploadFile(config) {
const filePath = await rl.question('Enter file path: ');
const recipient = await rl.question('Enter recipient: ');
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
const fileService = new FileUploadService(config.broker_url, config.fileserver_url);
try {
const env = await fileService.uploadFile(filePath, recipient);
@@ -500,8 +501,8 @@ import time
import random
class SensorSender:
def __init__(self, nats_url: str, fileserver_url: str):
self.nats_url = nats_url
def __init__(self, broker_url: str, fileserver_url: str):
self.broker_url = broker_url
self.fileserver_url = fileserver_url
def send_reading(self, sensor_id: str, value: float, unit: str):
@@ -518,7 +519,7 @@ class SensorSender:
smartsend(
f"/sensors/{sensor_id}",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
@@ -537,7 +538,7 @@ class SensorSender:
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
@@ -572,7 +573,7 @@ class SensorSender:
smartsend(
f"/sensors/batch",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
else:
@@ -631,17 +632,17 @@ Let's build an IoT device using Micropython that connects to NATS.
import json
class DeviceConfig:
def __init__(self, ssid, password, nats_url, device_id):
def __init__(self, ssid, password, broker_url, device_id):
self.ssid = ssid
self.password = password
self.nats_url = nats_url
self.broker_url = broker_url
self.device_id = device_id
def to_dict(self):
return {
"ssid": self.ssid,
"password": self.password,
"nats_url": self.nats_url,
"broker_url": self.broker_url,
"device_id": self.device_id
}
```
@@ -656,7 +657,7 @@ import json
class DeviceBridge:
def __init__(self, config):
self.config = config
self.nats_url = config.nats_url
self.broker_url = config.broker_url
def connect(self):
# Connect to WiFi
@@ -676,7 +677,7 @@ class DeviceBridge:
smartsend(
f"/devices/{self.config.device_id}/status",
data,
nats_url=self.nats_url
broker_url=self.broker_url
)
def send_sensor_data(self, sensor_id, value, unit):
@@ -687,7 +688,7 @@ class DeviceBridge:
smartsend(
f"/devices/{self.config.device_id}/sensors/{sensor_id}",
data,
nats_url=self.nats_url
broker_url=self.broker_url
)
def receive_commands(self, callback):
@@ -725,7 +726,7 @@ import random
config = DeviceConfig(
ssid="MyNetwork",
password="password123",
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
device_id="device-001"
)
@@ -774,8 +775,8 @@ import pyarrow as pa
import io
class DashboardServer:
def __init__(self, nats_url, fileserver_url):
self.nats_url = nats_url
def __init__(self, broker_url, fileserver_url):
self.broker_url = broker_url
self.fileserver_url = fileserver_url
def broadcast_data(self, df):
@@ -792,7 +793,7 @@ class DashboardServer:
smartsend(
"/dashboard/data",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
@@ -836,6 +837,7 @@ class DashboardUI {
const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], {
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url
});
}
@@ -954,7 +956,7 @@ def send_batch_readings(self, readings):
smartsend(
"/sensors/batch",
[("batch", arrow_data, "table")],
nats_url=self.nats_url
broker_url=self.broker_url
)
```

View File

@@ -380,9 +380,10 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
- `sender_name::String = "NATSBridge"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
# Return:
- A tuple `(env, env_json_str)` where:
@@ -431,12 +432,12 @@ function smartsend(
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true # some time the user want to get env and env_json_str from this function without publishing the msg
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
) where {T1<:Any}
# Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Generate message metadata
@@ -516,8 +517,12 @@ function smartsend(
)
env_json_str = envelope_to_json(env) # Convert envelope to JSON
if is_publish
if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
end
return (env, env_json_str)
@@ -649,7 +654,7 @@ end
""" publish_message - Publish message to NATS
This internal function publishes a message to a NATS subject with proper
This function publishes a message to a NATS subject with proper
connection management and logging.
# Arguments:
@@ -662,18 +667,52 @@ connection management and logging.
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
"""
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
""" publish_message - Publish message to NATS using pre-existing connection
This function publishes a message to a NATS subject using a pre-existing NATS connection,
avoiding the overhead of connection establishment.
# Arguments:
- `conn::NATS.Connection` - Pre-existing NATS connection
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for tracing and logging
# Return:
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# Create connection once and reuse for multiple publishes
conn = NATS.connect("nats://localhost:4222")
publish_message(conn, "my.subject", message, "abc123")
# Connection is automatically drained after publish
```
# Use Case:
Use this version when you already have an established NATS connection and want to publish
multiple messages without the overhead of creating a new connection for each publish.
"""
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
@@ -706,7 +745,7 @@ A HTTP file server is required along with its download function.
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
# Return:
- `Vector{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
- JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
# Example
```jldoctest
@@ -724,22 +763,22 @@ function smartreceive(
max_delay::Int = 5000
)
# Parse the JSON envelope
json_data = JSON.parse(String(msg.payload))
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start
env_json_obj = JSON.parse(String(msg.payload))
log_trace(env_json_obj["correlation_id"], "Processing received message") # Log message processing start
# Process all payloads in the envelope
payloads_list = Tuple{String, Any, String}[]
# Get number of payloads
num_payloads = length(json_data["payloads"])
num_payloads = length(env_json_obj["payloads"])
for i in 1:num_payloads
payload = json_data["payloads"][i]
payload = env_json_obj["payloads"][i]
transport = String(payload["transport"])
dataname = String(payload["dataname"])
if transport == "direct" # Direct transport - payload is in the message
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
# Extract base64 payload from the payload
payload_b64 = String(payload["data"])
@@ -749,28 +788,28 @@ function smartreceive(
# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"])
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
elseif transport == "link" # Link transport - payload is at URL
# Extract download URL from the payload
url = String(payload["data"])
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
# Fetch with exponential backoff using the download handler
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"])
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
# Deserialize based on type
data_type = String(payload["payload_type"])
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"])
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
push!(payloads_list, (dataname, data, data_type))
else # Unknown transport type
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
end
end
json_data["payloads"] = payloads_list
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
env_json_obj["payloads"] = payloads_list
return env_json_obj # JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
end
@@ -915,7 +954,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
# Arguments:
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filename::String` - Name of the file being uploaded
- `dataname::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content
# Return:
@@ -929,18 +968,18 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
```jldoctest
using HTTP, JSON
file_server_url = "http://localhost:8080"
filename = "test.txt"
fileserver_url = "http://localhost:8080"
dataname = "test.txt"
data = Vector{UInt8}("hello world")
# Upload to local plik server
result = plik_oneshot_upload(file_server_url, filename, data)
result = plik_oneshot_upload(file_server_url, dataname, data)
# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
"""
function plik_oneshot_upload(file_server_url::String, filename::String, data::Vector{UInt8})
function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
# ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
@@ -954,7 +993,7 @@ function plik_oneshot_upload(file_server_url::String, filename::String, data::Ve
# ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
url_upload = "$file_server_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
@@ -974,7 +1013,7 @@ function plik_oneshot_upload(file_server_url::String, filename::String, data::Ve
fileid = response_json["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$file_server_url/file/$uploadid/$fileid/$filename"
url = "$file_server_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
@@ -1006,7 +1045,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
```jldoctest
using HTTP, JSON
file_server_url = "http://localhost:8080"
fileserver_url = "http://localhost:8080"
filepath = "./test.zip"
# Upload to local plik server
@@ -1056,7 +1095,7 @@ function plik_oneshot_upload(file_server_url::String, filepath::String)
end
function _get_payload_bytes(data)
@error "didn't implement yet"
@error "Didn't implement yet. The developer will implement this function later."
end

View File

@@ -98,6 +98,26 @@ function base64ToArrayBuffer(base64) {
return bytes.buffer;
}
// Helper: Convert Uint8Array to Base64 string
function uint8ArrayToBase64(uint8array) {
let binary = '';
for (let i = 0; i < uint8array.byteLength; i++) {
binary += String.fromCharCode(uint8array[i]);
}
return btoa(binary);
}
// Helper: Convert Base64 string to Uint8Array
function base64ToUint8Array(base64) {
const binaryString = atob(base64);
const len = binaryString.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return bytes;
}
// Helper: Serialize data based on type
function _serialize_data(data, type) {
/**
@@ -114,39 +134,39 @@ function _serialize_data(data, type) {
*/
if (type === "text") {
if (typeof data === 'string') {
return new TextEncoder().encode(data).buffer;
return new TextEncoder().encode(data);
} else {
throw new Error("Text data must be a String");
}
} else if (type === "dictionary") {
// JSON data - serialize directly
const jsonStr = JSON.stringify(data);
return new TextEncoder().encode(jsonStr).buffer;
return new TextEncoder().encode(jsonStr);
} else if (type === "table") {
// Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
// This would require the apache-arrow library
throw new Error("Table serialization requires apache-arrow library");
} else if (type === "image") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Image data must be ArrayBuffer or Uint8Array");
}
} else if (type === "audio") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Audio data must be ArrayBuffer or Uint8Array");
}
} else if (type === "video") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Video data must be ArrayBuffer or Uint8Array");
}
} else if (type === "binary") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Binary data must be ArrayBuffer or Uint8Array");
}
@@ -171,10 +191,10 @@ function _deserialize_data(data, type, correlation_id) {
*/
if (type === "text") {
const decoder = new TextDecoder();
return decoder.decode(new Uint8Array(data));
return decoder.decode(data);
} else if (type === "dictionary") {
const decoder = new TextDecoder();
const jsonStr = decoder.decode(new Uint8Array(data));
const jsonStr = decoder.decode(data);
return JSON.parse(jsonStr);
} else if (type === "table") {
// Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
@@ -230,7 +250,7 @@ async function _upload_to_fileserver(fileserver_url, dataname, data, correlation
// Create multipart form data
const formData = new FormData();
// Create a Blob from the ArrayBuffer
// Create a Blob from the Uint8Array
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
@@ -276,7 +296,7 @@ async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, corr
if (response.status === 200) {
log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
const arrayBuffer = await response.arrayBuffer();
return arrayBuffer;
return new Uint8Array(arrayBuffer);
} else {
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
}
@@ -306,25 +326,26 @@ function _get_payload_bytes(data) {
}
}
// MessagePayload class
// MessagePayload class - matches msg_payload_v1 Julia struct
class MessagePayload {
/**
* Represents a single payload in the message envelope
* Matches Julia's msg_payload_v1 struct
*
* @param {Object} options - Payload options
* @param {string} options.id - ID of this payload (e.g., "uuid4")
* @param {string} options.dataname - Name of this payload (e.g., "login_image")
* @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.payload_type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.transport - "direct" or "link"
* @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
* @param {number} options.size - Data size in bytes
* @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link)
* @param {string|Uint8Array} options.data - Payload data (Uint8Array for direct, URL string for link)
* @param {Object} options.metadata - Metadata for this payload
*/
constructor(options) {
this.id = options.id || uuid4();
this.dataname = options.dataname;
this.type = options.type;
this.payload_type = options.payload_type;
this.transport = options.transport;
this.encoding = options.encoding;
this.size = options.size;
@@ -332,27 +353,27 @@ class MessagePayload {
this.metadata = options.metadata || {};
}
// Convert to JSON object
// Convert to JSON object - uses snake_case to match Julia API
toJSON() {
const obj = {
id: this.id,
dataname: this.dataname,
type: this.type,
payload_type: this.payload_type,
transport: this.transport,
encoding: this.encoding,
size: this.size
};
// Include data based on transport type
if (this.transport === "direct" && this.data !== null) {
if (this.transport === "direct" && this.data !== null && this.data !== undefined) {
if (this.encoding === "base64" || this.encoding === "json") {
obj.data = this.data;
} else {
// For other encodings, use base64
const payloadBytes = _get_payload_bytes(this.data);
obj.data = arrayBufferToBase64(payloadBytes);
obj.data = uint8ArrayToBase64(payloadBytes);
}
} else if (this.transport === "link" && this.data !== null) {
} else if (this.transport === "link" && this.data !== null && this.data !== undefined) {
// For link transport, data is a URL string
obj.data = this.data;
}
@@ -365,59 +386,60 @@ class MessagePayload {
}
}
// MessageEnvelope class
// MessageEnvelope class - matches msg_envelope_v1 Julia struct
class MessageEnvelope {
/**
* Represents the message envelope containing metadata and payloads
* Matches Julia's msg_envelope_v1 struct
*
* @param {Object} options - Envelope options
* @param {string} options.sendTo - Topic/subject the sender sends to
* @param {Array<MessagePayload>} options.payloads - Array of payloads
* @param {string} options.correlationId - Unique identifier to track messages
* @param {string} options.msgId - This message id
* @param {string} options.correlation_id - Unique identifier to track messages
* @param {string} options.msg_id - This message id
* @param {string} options.timestamp - Message published timestamp
* @param {string} options.msgPurpose - Purpose of this message
* @param {string} options.senderName - Name of the sender
* @param {string} options.senderId - UUID of the sender
* @param {string} options.receiverName - Name of the receiver
* @param {string} options.receiverId - UUID of the receiver
* @param {string} options.replyTo - Topic to reply to
* @param {string} options.replyToMsgId - Message id this message is replying to
* @param {string} options.brokerURL - NATS server address
* @param {string} options.send_to - Topic/subject the sender sends to
* @param {string} options.msg_purpose - Purpose of this message
* @param {string} options.sender_name - Name of the sender
* @param {string} options.sender_id - UUID of the sender
* @param {string} options.receiver_name - Name of the receiver
* @param {string} options.receiver_id - UUID of the receiver
* @param {string} options.reply_to - Topic to reply to
* @param {string} options.reply_to_msg_id - Message id this message is replying to
* @param {string} options.broker_url - NATS server address
* @param {Object} options.metadata - Metadata for the envelope
* @param {Array<MessagePayload>} options.payloads - Array of payloads
*/
constructor(options) {
this.correlationId = options.correlationId || uuid4();
this.msgId = options.msgId || uuid4();
this.correlation_id = options.correlation_id || uuid4();
this.msg_id = options.msg_id || uuid4();
this.timestamp = options.timestamp || new Date().toISOString();
this.sendTo = options.sendTo;
this.msgPurpose = options.msgPurpose || "";
this.senderName = options.senderName || "";
this.senderId = options.senderId || uuid4();
this.receiverName = options.receiverName || "";
this.receiverId = options.receiverId || "";
this.replyTo = options.replyTo || "";
this.replyToMsgId = options.replyToMsgId || "";
this.brokerURL = options.brokerURL || DEFAULT_NATS_URL;
this.send_to = options.send_to;
this.msg_purpose = options.msg_purpose || "";
this.sender_name = options.sender_name || "";
this.sender_id = options.sender_id || uuid4();
this.receiver_name = options.receiver_name || "";
this.receiver_id = options.receiver_id || "";
this.reply_to = options.reply_to || "";
this.reply_to_msg_id = options.reply_to_msg_id || "";
this.broker_url = options.broker_url || DEFAULT_NATS_URL;
this.metadata = options.metadata || {};
this.payloads = options.payloads || [];
}
// Convert to JSON string
// Convert to JSON object - uses snake_case to match Julia API
toJSON() {
const obj = {
correlationId: this.correlationId,
msgId: this.msgId,
correlation_id: this.correlation_id,
msg_id: this.msg_id,
timestamp: this.timestamp,
sendTo: this.sendTo,
msgPurpose: this.msgPurpose,
senderName: this.senderName,
senderId: this.senderId,
receiverName: this.receiverName,
receiverId: this.receiverId,
replyTo: this.replyTo,
replyToMsgId: this.replyToMsgId,
brokerURL: this.brokerURL
send_to: this.send_to,
msg_purpose: this.msg_purpose,
sender_name: this.sender_name,
sender_id: this.sender_id,
receiver_name: this.receiver_name,
receiver_id: this.receiver_id,
reply_to: this.reply_to,
reply_to_msg_id: this.reply_to_msg_id,
broker_url: this.broker_url
};
if (Object.keys(this.metadata).length > 0) {
@@ -437,7 +459,7 @@ class MessageEnvelope {
}
}
// SmartSend function
// SmartSend function - matches Julia smartsend signature and behavior
async function smartsend(subject, data, options = {}) {
/**
* Send data either directly via NATS or via a fileserver URL, depending on payload size
@@ -447,42 +469,42 @@ async function smartsend(subject, data, options = {}) {
* Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS.
*
* @param {string} subject - NATS subject to publish the message to
* @param {Array} data - List of {dataname, data, type} objects to send
* @param {Array} data - List of {dataname, data, type} objects to send (must be a list, even for single payload)
* @param {Object} options - Additional options
* @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads
* @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlationId - Optional correlation ID for tracing
* @param {string} options.msgPurpose - Purpose of the message (default: "chat")
* @param {string} options.senderName - Name of the sender (default: "NATSBridge")
* @param {string} options.receiverName - Name of the receiver (default: "")
* @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
* @param {string} options.broker_url - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserver_url - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserver_upload_handler - Function to handle fileserver uploads
* @param {number} options.size_threshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlation_id - Optional correlation ID for tracing
* @param {string} options.msg_purpose - Purpose of the message (default: "chat")
* @param {string} options.sender_name - Name of the sender (default: "NATSBridge")
* @param {string} options.receiver_name - Name of the receiver (default: "")
* @param {string} options.receiver_id - UUID of the receiver (default: "")
* @param {string} options.reply_to - Topic to reply to (default: "")
* @param {string} options.reply_to_msg_id - Message ID this message is replying to (default: "")
* @param {boolean} options.is_publish - Whether to automatically publish the message to NATS (default: true)
*
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
* @returns {Promise<Object>} - A tuple-like object with { env: MessageEnvelope, env_json_str: string }
*/
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
fileserverUploadHandler = _upload_to_fileserver,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuid4(),
msgPurpose = "chat",
senderName = "NATSBridge",
receiverName = "",
receiverId = "",
replyTo = "",
replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
broker_url = DEFAULT_NATS_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = _upload_to_fileserver,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = uuid4(),
msg_purpose = "chat",
sender_name = "NATSBridge",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
is_publish = true // Whether to automatically publish the message to NATS
} = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
log_trace(correlation_id, `Starting smartsend for subject: ${subject}`);
// Generate message metadata
const msgId = uuid4();
const msg_id = uuid4();
// Process each payload in the list
const payloads = [];
@@ -496,18 +518,18 @@ async function smartsend(subject, data, options = {}) {
const payloadBytes = _serialize_data(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
log_trace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
if (payloadSize < size_threshold) {
// Direct path - Base64 encode and send via NATS
const payloadB64 = arrayBufferToBase64(payloadBytes);
log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`);
const payloadB64 = uint8ArrayToBase64(payloadBytes);
log_trace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
// Create MessagePayload for direct transport
const payloadObj = new MessagePayload({
dataname: dataname,
type: payloadType,
payload_type: payloadType,
transport: "direct",
encoding: "base64",
size: payloadSize,
@@ -517,22 +539,22 @@ async function smartsend(subject, data, options = {}) {
payloads.push(payloadObj);
} else {
// Link path - Upload to HTTP server, send URL via NATS
log_trace(correlationId, `Using link transport, uploading to fileserver`);
log_trace(correlation_id, `Using link transport, uploading to fileserver`);
// Upload to HTTP server
const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId);
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes, correlation_id);
if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
}
const url = response.url;
log_trace(correlationId, `Uploaded to URL: ${url}`);
log_trace(correlation_id, `Uploaded to URL: ${url}`);
// Create MessagePayload for link transport
const payloadObj = new MessagePayload({
dataname: dataname,
type: payloadType,
payload_type: payloadType,
transport: "link",
encoding: "none",
size: payloadSize,
@@ -545,16 +567,16 @@ async function smartsend(subject, data, options = {}) {
// Create MessageEnvelope with all payloads
const env = new MessageEnvelope({
correlationId: correlationId,
msgId: msgId,
sendTo: subject,
msgPurpose: msgPurpose,
senderName: senderName,
receiverName: receiverName,
receiverId: receiverId,
replyTo: replyTo,
replyToMsgId: replyToMsgId,
brokerURL: natsUrl,
correlation_id: correlation_id,
msg_id: msg_id,
send_to: subject,
msg_purpose: msg_purpose,
sender_name: sender_name,
receiver_name: receiver_name,
receiver_id: receiver_id,
reply_to: reply_to,
reply_to_msg_id: reply_to_msg_id,
broker_url: broker_url,
payloads: payloads
});
@@ -562,11 +584,11 @@ async function smartsend(subject, data, options = {}) {
const env_json_str = env.toString();
// Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, env_json_str, correlationId);
if (is_publish) {
await publish_message(broker_url, subject, env_json_str, correlation_id);
}
// Return both envelope and JSON string (tuple-like structure)
// Return both envelope and JSON string (tuple-like structure, matching Julia API)
return {
env: env,
env_json_str: env_json_str
@@ -574,11 +596,11 @@ async function smartsend(subject, data, options = {}) {
}
// Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) {
async function publish_message(broker_url, subject, message, correlation_id) {
/**
* Publish a message to a NATS subject with proper connection management
*
* @param {string} natsUrl - NATS server URL
* @param {string} broker_url - NATS server URL
* @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish
* @param {string} correlation_id - Correlation ID for logging
@@ -591,7 +613,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
// Example with nats.js:
// import { connect } from 'nats';
// const nc = await connect({ servers: [natsUrl] });
// const nc = await connect({ servers: [broker_url] });
// await nc.publish(subject, message);
// nc.close();
@@ -599,7 +621,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`);
}
// SmartReceive function
// SmartReceive function - matches Julia smartreceive signature and behavior
async function smartreceive(msg, options = {}) {
/**
* Receive and process messages from NATS
@@ -609,25 +631,25 @@ async function smartreceive(msg, options = {}) {
*
* @param {Object} msg - NATS message object with payload property
* @param {Object} options - Additional options
* @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs
* @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
* @param {Function} options.fileserver_download_handler - Function to handle downloading data from file server URLs
* @param {number} options.max_retries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.base_delay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.max_delay - Maximum delay for exponential backoff in ms (default: 5000)
*
* @returns {Promise<Object>} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects
* @returns {Promise<Object>} - JSON object of envelope with payloads field containing list of {dataname, data, type} tuples
*/
const {
fileserverDownloadHandler = _fetch_with_backoff,
maxRetries = 5,
baseDelay = 100,
maxDelay = 5000
fileserver_download_handler = _fetch_with_backoff,
max_retries = 5,
base_delay = 100,
max_delay = 5000
} = options;
// Parse the JSON envelope
const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
const json_data = JSON.parse(jsonStr);
log_trace(json_data.correlationId, `Processing received message`);
log_trace(json_data.correlation_id, `Processing received message`);
// Process all payloads in the envelope
const payloads_list = [];
@@ -642,32 +664,32 @@ async function smartreceive(msg, options = {}) {
if (transport === "direct") {
// Direct transport - payload is in the message
log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`);
log_trace(json_data.correlation_id, `Direct transport - decoding payload '${dataname}'`);
// Extract base64 payload from the payload
const payload_b64 = payload.data;
// Decode Base64 payload
const payload_bytes = base64ToArrayBuffer(payload_b64);
const payload_bytes = base64ToUint8Array(payload_b64);
// Deserialize based on type
const data_type = payload.type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId);
const data_type = payload.payload_type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type });
} else if (transport === "link") {
// Link transport - payload is at URL
const url = payload.data;
log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
log_trace(json_data.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
// Fetch with exponential backoff using the download handler
const downloaded_data = await fileserverDownloadHandler(
url, maxRetries, baseDelay, maxDelay, json_data.correlationId
const downloaded_data = await fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.correlation_id
);
// Deserialize based on type
const data_type = payload.type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId);
const data_type = payload.payload_type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type });
} else {
@@ -676,11 +698,69 @@ async function smartreceive(msg, options = {}) {
}
// Replace payloads array with the processed list of {dataname, data, type} tuples
// This matches Julia's smartreceive return format
json_data.payloads = payloads_list;
return json_data;
}
// plik_oneshot_upload - matches Julia plik_oneshot_upload function
async function plik_oneshot_upload(file_server_url, dataname, data) {
/**
* Upload a single file to a plik server using one-shot mode
* This function uploads raw byte array to a plik server in one-shot mode (no upload session).
* It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
* retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
*
* @param {string} file_server_url - Base URL of the plik server (e.g., "http://localhost:8080")
* @param {string} dataname - Name of the file being uploaded
* @param {Uint8Array} data - Raw byte data of the file content
* @returns {Promise<Object>} - Dictionary with keys: status, uploadid, fileid, url
*/
// Step 1: Get upload ID and token
const url_getUploadID = `${file_server_url}/upload`;
const headers = { "Content-Type": "application/json" };
const body = JSON.stringify({ OneShot: true });
let http_response = await fetch(url_getUploadID, {
method: "POST",
headers: headers,
body: body
});
const response_json = await http_response.json();
const uploadid = response_json.id;
const uploadtoken = response_json.uploadToken;
// Step 2: Upload file data
const url_upload = `${file_server_url}/file/${uploadid}`;
// Create multipart form data
const formData = new FormData();
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
http_response = await fetch(url_upload, {
method: "POST",
headers: { "X-UploadToken": uploadtoken },
body: formData
});
const fileResponseJson = await http_response.json();
const fileid = fileResponseJson.id;
// URL of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
const url = `${file_server_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
return {
status: http_response.status,
uploadid: uploadid,
fileid: fileid,
url: url
};
}
// Export for Node.js
if (typeof module !== 'undefined' && module.exports) {
module.exports = {
@@ -692,6 +772,7 @@ if (typeof module !== 'undefined' && module.exports) {
_deserialize_data,
_fetch_with_backoff,
_upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL,
@@ -711,6 +792,7 @@ if (typeof window !== 'undefined') {
_deserialize_data,
_fetch_with_backoff,
_upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL,

View File

@@ -1,45 +1,60 @@
"""
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython
Python NATS Bridge - Bi-Directional Data Bridge
This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads.
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.
API Standard:
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
"""
import json
import random
import time
import usocket
import uselect
import ustruct
import uuid
try:
import ussl
HAS_SSL = True
except ImportError:
HAS_SSL = False
# Constants
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
DEFAULT_NATS_URL = "nats://localhost:4222"
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
# ============================================= 100 ============================================== #
class MessagePayload:
"""Internal message payload structure representing a single payload within a NATS message envelope."""
"""Internal message payload structure representing a single payload within a NATS message envelope.
def __init__(self, data, msg_type, id="", dataname="", transport="direct",
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
Attributes:
id: Unique identifier for this payload (e.g., "uuid4")
dataname: Name of the payload (e.g., "login_image")
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
transport: Transport method ("direct" or "link")
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
size: Size of the payload in bytes
data: Payload data (bytes for direct, URL for link)
metadata: Optional metadata dictionary
"""
def __init__(self, data, payload_type, id="", dataname="", transport="direct",
encoding="none", size=0, metadata=None):
"""
Initialize a MessagePayload.
Args:
data: Payload data (bytes for direct, URL string for link)
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
data: Payload data (base64 string for direct, URL string for link)
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
id: Unique identifier for this payload (auto-generated if empty)
dataname: Name of the payload (auto-generated UUID if empty)
transport: Transport method ("direct" or "link")
@@ -49,7 +64,7 @@ class MessagePayload:
"""
self.id = id if id else self._generate_uuid()
self.dataname = dataname if dataname else self._generate_uuid()
self.type = msg_type
self.payload_type = payload_type
self.transport = transport
self.encoding = encoding
self.size = size
@@ -65,7 +80,7 @@ class MessagePayload:
payload_dict = {
"id": self.id,
"dataname": self.dataname,
"type": self.type,
"payload_type": self.payload_type,
"transport": self.transport,
"encoding": self.encoding,
"size": self.size,
@@ -152,20 +167,24 @@ class MessageEnvelope:
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
def to_json(self):
"""Convert envelope to JSON string."""
"""Convert envelope to JSON string.
Returns:
str: JSON string representation of the envelope using snake_case field names
"""
obj = {
"correlationId": self.correlation_id,
"msgId": self.msg_id,
"correlation_id": self.correlation_id,
"msg_id": self.msg_id,
"timestamp": self.timestamp,
"sendTo": self.send_to,
"msgPurpose": self.msg_purpose,
"senderName": self.sender_name,
"senderId": self.sender_id,
"receiverName": self.receiver_name,
"receiverId": self.receiver_id,
"replyTo": self.reply_to,
"replyToMsgId": self.reply_to_msg_id,
"brokerURL": self.broker_url
"send_to": self.send_to,
"msg_purpose": self.msg_purpose,
"sender_name": self.sender_name,
"sender_id": self.sender_id,
"receiver_name": self.receiver_name,
"receiver_id": self.receiver_id,
"reply_to": self.reply_to,
"reply_to_msg_id": self.reply_to_msg_id,
"broker_url": self.broker_url
}
# Include metadata if not empty
@@ -188,68 +207,126 @@ def log_trace(correlation_id, message):
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
def _serialize_data(data, msg_type):
def _serialize_data(data, payload_type):
"""Serialize data according to specified format.
This function serializes arbitrary data into a binary representation based on the specified type.
It supports multiple serialization formats for different data types.
Args:
data: Data to serialize
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
- "text": String
- "dictionary": JSON-serializable dict
- "table": Tabular data (pandas DataFrame or list of dicts)
- "image", "audio", "video", "binary": bytes
payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
Returns:
bytes: Binary representation of the serialized data
Example:
>>> text_bytes = _serialize_data("Hello World", "text")
>>> json_bytes = _serialize_data({"key": "value"}, "dictionary")
>>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table")
"""
if msg_type == "text":
if payload_type == "text":
if isinstance(data, str):
return data.encode('utf-8')
else:
raise ValueError("Text data must be a string")
elif msg_type == "dictionary":
elif payload_type == "dictionary":
if isinstance(data, dict):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Dictionary data must be a dict")
elif msg_type in ("image", "audio", "video", "binary"):
elif payload_type == "table":
# Support pandas DataFrame or list of dicts
try:
import pandas as pd
if isinstance(data, pd.DataFrame):
# Convert DataFrame to JSON and then to bytes
json_str = data.to_json(orient='records', force_ascii=False)
return json_str.encode('utf-8')
elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
# List of dicts
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data must be a pandas DataFrame or list of dicts")
except ImportError:
# Fallback: if pandas not available, treat as list of dicts
if isinstance(data, list):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)")
elif payload_type in ("image", "audio", "video", "binary"):
if isinstance(data, bytes):
return data
else:
raise ValueError("{} data must be bytes".format(msg_type.capitalize()))
raise ValueError("{} data must be bytes".format(payload_type.capitalize()))
else:
raise ValueError("Unknown type: {}".format(msg_type))
raise ValueError("Unknown payload_type: {}".format(payload_type))
def _deserialize_data(data_bytes, msg_type, correlation_id):
def _deserialize_data(data_bytes, payload_type, correlation_id):
"""Deserialize bytes to data based on type.
This function converts serialized bytes back to Python data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (JSON deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Args:
data_bytes: Serialized data as bytes
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
correlation_id: Correlation ID for logging
Returns:
Deserialized data
Deserialized data:
- "text": str
- "dictionary": dict
- "table": list of dicts (or pandas DataFrame if available)
- "image", "audio", "video", "binary": bytes
Example:
>>> text_data = _deserialize_data(b"Hello", "text", "corr_id")
>>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id")
>>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id")
"""
if msg_type == "text":
if payload_type == "text":
return data_bytes.decode('utf-8')
elif msg_type == "dictionary":
elif payload_type == "dictionary":
json_str = data_bytes.decode('utf-8')
return json.loads(json_str)
elif msg_type in ("image", "audio", "video", "binary"):
elif payload_type == "table":
# Deserialize table data (JSON format)
json_str = data_bytes.decode('utf-8')
table_data = json.loads(json_str)
# If pandas is available, try to convert to DataFrame
try:
import pandas as pd
return pd.DataFrame(table_data)
except ImportError:
return table_data
elif payload_type in ("image", "audio", "video", "binary"):
return data_bytes
else:
raise ValueError("Unknown type: {}".format(msg_type))
raise ValueError("Unknown payload_type: {}".format(payload_type))
class NATSConnection:
"""Simple NATS connection for Micropython."""
"""Simple NATS connection for Python and Micropython."""
def __init__(self, url=DEFAULT_NATS_URL):
def __init__(self, url=DEFAULT_BROKER_URL):
"""Initialize NATS connection.
Args:
@@ -276,9 +353,19 @@ class NATSConnection:
def connect(self):
"""Connect to NATS server."""
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket()
self.conn.connect(addr)
# Use socket for both Python and Micropython
try:
import socket
addr = socket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect(addr)
except NameError:
# Micropython fallback
import usocket
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket()
self.conn.connect(addr)
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
def publish(self, subject, message):
@@ -294,7 +381,15 @@ class NATSConnection:
# Simple NATS protocol implementation
msg = "PUB {} {}\r\n".format(subject, len(message))
msg = msg.encode('utf-8') + message + b"\r\n"
self.conn.send(msg)
try:
import socket
self.conn.send(msg)
except NameError:
# Micropython fallback
import usocket
self.conn.send(msg)
log_trace("", "Message published to {}".format(subject))
def subscribe(self, subject, callback):
@@ -335,11 +430,14 @@ class NATSConnection:
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
"""Fetch data from URL with exponential backoff.
This function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Args:
url: URL to fetch from
max_retries: Maximum number of retry attempts
base_delay: Initial delay in milliseconds
max_delay: Maximum delay in milliseconds
max_retries: Maximum number of retry attempts (default: 5)
base_delay: Initial delay in milliseconds (default: 100)
max_delay: Maximum delay in milliseconds (default: 5000)
correlation_id: Correlation ID for logging
Returns:
@@ -347,33 +445,54 @@ def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, corr
Raises:
Exception: If all retry attempts fail
Example:
>>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id")
"""
delay = base_delay
for attempt in range(1, max_retries + 1):
try:
# Simple HTTP GET request
# This is a simplified implementation
# For production, you'd want a proper HTTP client
import urequests
response = urequests.get(url)
if response.status_code == 200:
# Try urequests for Micropython first, then requests for Python
try:
import urequests
response = urequests.get(url)
status_code = response.status_code
content = response.content
except ImportError:
try:
import requests
response = requests.get(url)
response.raise_for_status()
status_code = response.status_code
content = response.content
except ImportError:
raise Exception("No HTTP library available (urequests or requests)")
if status_code == 200:
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
return response.content
return content
else:
raise Exception("Failed to fetch: {}".format(response.status_code))
raise Exception("Failed to fetch: {}".format(status_code))
except Exception as e:
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
if attempt < max_retries:
time.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
raise Exception("Failed to fetch data after {} attempts".format(max_retries))
def plik_oneshot_upload(file_server_url, filename, data):
def plik_oneshot_upload(fileserver_url, dataname, data):
"""Upload a single file to a plik server using one-shot mode.
This function uploads raw byte data to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
Args:
file_server_url: Base URL of the plik server
filename: Name of the file being uploaded
fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080")
dataname: Name of the file being uploaded
data: Raw byte data of the file content
Returns:
@@ -382,23 +501,31 @@ def plik_oneshot_upload(file_server_url, filename, data):
- "uploadid": ID of the one-shot upload session
- "fileid": ID of the uploaded file within the session
- "url": Full URL to download the uploaded file
Example:
>>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world")
>>> result["status"], result["uploadid"], result["fileid"], result["url"]
"""
import urequests
import json
try:
import urequests
except ImportError:
import requests as urequests
# Get upload ID
url_get_upload_id = "{}/upload".format(file_server_url)
url_get_upload_id = "{}/upload".format(fileserver_url)
headers = {"Content-Type": "application/json"}
body = json.dumps({"OneShot": True})
response = urequests.post(url_get_upload_id, headers=headers, data=body)
response_json = json.loads(response.content)
response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
uploadid = response_json.get("id")
uploadtoken = response_json.get("uploadToken")
# Upload file
url_upload = "{}/file/{}".format(file_server_url, uploadid)
url_upload = "{}/file/{}".format(fileserver_url, uploadid)
headers = {"X-UploadToken": uploadtoken}
# For Micropython, we need to construct the multipart form data manually
@@ -407,7 +534,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
# Create multipart body
part1 = "--{}\r\n".format(boundary)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname)
part1 += "Content-Type: application/octet-stream\r\n\r\n"
part1_bytes = part1.encode('utf-8')
@@ -421,10 +548,10 @@ def plik_oneshot_upload(file_server_url, filename, data):
content_type = "multipart/form-data; boundary={}".format(boundary)
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
response_json = json.loads(response.content)
response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
fileid = response_json.get("id")
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename)
url = "{}/file/{}/{}".format(fileserver_url, uploadid, dataname)
return {
"status": response.status_code,
@@ -434,7 +561,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
}
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
@@ -447,27 +574,38 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
Args:
subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples to send
nats_url: URL of the NATS server
data: List of (dataname, data, payload_type) tuples to send
- dataname: Name of the payload
- data: The actual data to send
- payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
broker_url: URL of the NATS server
fileserver_url: URL of the HTTP file server
fileserver_upload_handler: Function to handle fileserver uploads
size_threshold: Threshold in bytes separating direct vs link transport
correlation_id: Optional correlation ID for tracing
msg_purpose: Purpose of the message
fileserver_upload_handler: Function to handle fileserver uploads (must return dict with "status", "uploadid", "fileid", "url" keys)
size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB)
correlation_id: Optional correlation ID for tracing; if None, a UUID is generated
msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
sender_name: Name of the sender
receiver_name: Name of the receiver
receiver_id: UUID of the receiver
reply_to: Topic to reply to
receiver_name: Name of the receiver (empty string means broadcast)
receiver_id: UUID of the receiver (empty string means broadcast)
reply_to: Topic to reply to (empty string if no reply expected)
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
- When True: message is published to NATS
- When False: returns envelope and JSON string without publishing
Returns:
tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
Example:
>>> data = [("message", "Hello World!", "text")]
>>> env, env_json_str = smartsend("/test", data)
>>> # env: MessageEnvelope with all metadata and payloads
>>> # env_json_str: JSON string for publishing
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
cid = correlation_id if correlation_id is not None else str(uuid.uuid4())
log_trace(cid, "Starting smartsend for subject: {}".format(subject))
@@ -482,16 +620,19 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format(
log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format(
dataname, payload_type, payload_size))
# Decision: Direct vs Link
if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
# Convert to base64 string for JSON
import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
try:
import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
except ImportError:
import base64
payload_b64_str = base64.b64encode(payload_bytes).decode('utf-8')
log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
@@ -514,10 +655,10 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response["status"]))
if response.get("status") != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response.get("status")))
url = response["url"]
url = response.get("url")
log_trace(cid, "Uploaded to URL: {}".format(url))
# Create MessagePayload for link transport
@@ -546,7 +687,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id=receiver_id,
reply_to=reply_to,
reply_to_msg_id=reply_to_msg_id,
broker_url=nats_url,
broker_url=broker_url,
metadata={}
)
@@ -554,7 +695,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Publish to NATS if is_publish is True
if is_publish:
nats_conn = NATSConnection(nats_url)
nats_conn = NATSConnection(broker_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
@@ -571,18 +712,29 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
(base64 decoded payloads) and link transport (URL-based payloads).
Args:
msg: NATS message to process (dict with payload data)
msg: NATS message to process (dict or JSON string with envelope data)
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL
base_delay: Initial delay for exponential backoff in ms
max_delay: Maximum delay for exponential backoff in ms
Receives: (url, max_retries, base_delay, max_delay, correlation_id)
Returns: bytes (the downloaded data)
max_retries: Maximum retry attempts for fetching URL (default: 5)
base_delay: Initial delay for exponential backoff in ms (default: 100)
max_delay: Maximum delay for exponential backoff in ms (default: 5000)
Returns:
dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples
dict: Envelope dictionary with metadata and 'payloads' field containing list of
(dataname, data, payload_type) tuples
Example:
>>> env = smartreceive(msg)
>>> # env contains envelope metadata and payloads field
>>> # env["payloads"] = [(dataname1, data1, payload_type1), ...]
>>> for dataname, data, payload_type in env["payloads"]:
... print("Received {} of type {}: {}".format(dataname, payload_type, data))
"""
# Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg)
log_trace(json_data.get("correlationId", ""), "Processing received message")
correlation_id = json_data.get("correlation_id", "")
log_trace(correlation_id, "Processing received message")
# Process all payloads in the envelope
payloads_list = []
@@ -596,43 +748,47 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
dataname = payload.get("dataname", "")
if transport == "direct":
log_trace(json_data.get("correlationId", ""),
log_trace(correlation_id,
"Direct transport - decoding payload '{}'".format(dataname))
# Extract base64 payload from the payload
payload_b64 = payload.get("data", "")
# Decode Base64 payload
import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
try:
import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
except ImportError:
import base64
payload_bytes = base64.b64decode(payload_b64)
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", ""))
payload_type = payload.get("payload_type", "")
data = _deserialize_data(payload_bytes, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type))
payloads_list.append((dataname, data, payload_type))
elif transport == "link":
# Extract download URL from the payload
url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""),
log_trace(correlation_id,
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "")
url, max_retries, base_delay, max_delay, correlation_id
)
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", ""))
payload_type = payload.get("payload_type", "")
data = _deserialize_data(downloaded_data, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type))
payloads_list.append((dataname, data, payload_type))
else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
# Replace payloads field with the processed list of (dataname, data, type) tuples
# Replace payloads field with the processed list of (dataname, data, payload_type) tuples
json_data["payloads"] = payloads_list
return json_data
@@ -651,11 +807,11 @@ def get_timestamp():
# Example usage
if __name__ == "__main__":
print("NATSBridge for Micropython")
print("=========================")
print("NATSBridge - Bi-Directional Data Bridge")
print("=======================================")
print("This module provides:")
print(" - MessageEnvelope: Message envelope structure")
print(" - MessagePayload: Payload structure")
print(" - MessageEnvelope: Message envelope structure with snake_case fields")
print(" - MessagePayload: Payload structure with payload_type field")
print(" - smartsend: Send data via NATS with automatic transport selection")
print(" - smartreceive: Receive and process messages from NATS")
print(" - plik_oneshot_upload: Upload files to HTTP file server")
@@ -663,10 +819,12 @@ if __name__ == "__main__":
print()
print("Usage:")
print(" from nats_bridge import smartsend, smartreceive")
print(" data = [(\"message\", \"Hello World\", \"text\")]")
print(" env = smartsend(\"my.subject\", data)")
print()
print(" # Send data (list of (dataname, data, payload_type) tuples)")
print(" data = [(\"message\", \"Hello World!\", \"text\")]")
print(" env, env_json_str = smartsend(\"my.subject\", data)")
print()
print(" # On receiver:")
print(" payloads = smartreceive(msg)")
print(" for dataname, data, type in payloads:")
print(" print(f\"Received {dataname} of type {type}: {data}\")")
print(" env = smartreceive(msg)")
print(" for dataname, data, payload_type in env[\"payloads\"]:")
print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))")