41 Commits

Author SHA1 Message Date
bee9f783d9 update 2026-02-25 17:38:50 +07:00
3e1c8d563e update 2026-02-25 15:20:29 +07:00
1299febcdc update 2026-02-25 14:25:08 +07:00
be94c62760 update 2026-02-25 12:24:02 +07:00
6a862ef243 update 2026-02-25 12:09:00 +07:00
ae2de5fc62 update 2026-02-25 10:33:30 +07:00
df0bbc7327 update 2026-02-25 09:58:10 +07:00
d94761c866 update 2026-02-25 09:44:08 +07:00
f8235e1a59 update 2026-02-25 08:54:04 +07:00
647cadf497 update 2026-02-25 08:33:32 +07:00
8c793a81b6 update 2026-02-25 08:02:03 +07:00
6a42ba7e43 update 2026-02-25 07:29:42 +07:00
14b3790251 update 2026-02-25 06:23:24 +07:00
61d81bed62 update 2026-02-25 06:04:40 +07:00
1a10bc1a5f update 2026-02-25 05:32:59 +07:00
7f68d08134 update 2026-02-24 21:40:33 +07:00
ab20cd896f update 2026-02-24 21:18:19 +07:00
5a9e93d6e7 update 2026-02-24 20:38:45 +07:00
b51641dc7e update 2026-02-24 20:09:10 +07:00
45f1257896 update 2026-02-24 18:50:28 +07:00
3e2b8b1e3a update 2026-02-24 18:19:03 +07:00
90d81617ef update 2026-02-24 17:58:59 +07:00
64c62e616b update 2026-02-23 22:06:57 +07:00
2c340e37c7 update 2026-02-23 22:00:06 +07:00
7853e94d2e update 2026-02-23 21:54:50 +07:00
99bf57b154 update 2026-02-23 21:43:09 +07:00
0fa6eaf95b update 2026-02-23 21:37:50 +07:00
76f42be740 update 2026-02-23 21:32:22 +07:00
d99dc41be9 update 2026-02-23 21:09:36 +07:00
263508b8f7 update 2026-02-23 20:50:41 +07:00
0c2cca30ed update 2026-02-23 20:34:08 +07:00
46fdf668c6 update 2026-02-23 19:18:12 +07:00
f8a92a45a0 update README.md 2026-02-23 09:39:24 +07:00
cec70e6036 update 2026-02-23 08:11:03 +07:00
f9e08ba628 add Plik fileserver 2026-02-23 07:58:18 +07:00
c12a078149 update README.md 2026-02-23 07:55:10 +07:00
dedd803dc3 fix README.md 2026-02-23 07:24:54 +07:00
e8e927a491 move README.md 2026-02-23 07:17:31 +07:00
ton
d950bbac23 Merge pull request 'smartreceive_return_envelope' (#7) from smartreceive_return_envelope into main
Reviewed-on: #7
2026-02-23 00:11:09 +00:00
fc8da2ebf5 update 2026-02-23 07:08:17 +07:00
f6e50c405f update 2026-02-23 07:06:53 +07:00
37 changed files with 3101 additions and 1377 deletions

View File

@@ -13,3 +13,41 @@ Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement
Create a walkthrough for Julia service-A service sending a mix-content chat message to Julia service-B. the chat message must includes
I updated the following:
- NATSBridge.jl. Essentially I add NATS_connection keyword and new publish_message function to support the keyword.
Use them and ONLY them as ground truth.
Then update the following files accordingly:
- architecture.md
- implementation.md
All API should be semantically consistent and naming should be consistent across the board.
Task: Update NATSBridge.js to reflect recent changes in NATSBridge.jl and docs
Context: NATSBridge.jl and docs has been updated.
Requirements:
Source of Truth: Treat the updated NATSBridge.jl and docs as the definitive source.
API Consistency: Ensure the Main Package API (e.g., smartsend(), publish_message()) uses consistent naming across all three supported languages.
Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read()) should follow the conventions of the specific language ecosystem and do not require cross-language consistency.

View File

@@ -1,6 +1,6 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.2"
version = "0.4.3"
authors = ["narawat <narawat@gmail.com>"]
[deps]

1019
README.md Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserverDownloadHandler parameter
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary-like object with payloads field containing list of tuples)
# Returns: Dict-like object with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
@@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload,
metadata=user_provided_envelope_level_metadata
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
@@ -99,13 +98,14 @@ smartsend(
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
nats_url="nats://localhost:4222"
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Architecture Diagram
@@ -138,48 +138,48 @@ flowchart TD
## System Components
### 1. msgEnvelope_v1 - Message Envelope
### 1. msg_envelope_v1 - Message Envelope
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications.
**Julia Structure:**
```julia
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems
msgId::String # This message id
struct msg_envelope_v1
correlation_id::String # Unique identifier to track messages across systems
msg_id::String # This message id
timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to
msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
senderName::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to
brokerURL::String # NATS server address
send_to::String # Topic/subject the sender sends to
msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
sender_name::String # Sender name (e.g., "agent-wine-web-frontend")
sender_id::String # Sender id (uuid4)
receiver_name::String # Message receiver name (e.g., "agent-backend")
receiver_id::String # Message receiver id (uuid4 or nothing for broadcast)
reply_to::String # Topic to reply to
reply_to_msg_id::String # Message id this message is replying to
broker_url::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here
payloads::Vector{msg_payload_v1} # Multiple payloads stored here
end
```
**JSON Schema:**
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"brokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
@@ -189,7 +189,7 @@ end
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,
@@ -201,7 +201,7 @@ end
{
"id": "uuid4",
"dataname": "large_data",
"type": "table",
"payload_type": "table",
"transport": "link",
"encoding": "none",
"size": 524288,
@@ -214,16 +214,16 @@ end
}
```
### 2. msgPayload_v1 - Payload Structure
### 2. msg_payload_v1 - Payload Structure
The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms.
The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms.
**Julia Structure:**
```julia
struct msgPayload_v1
struct msg_payload_v1
id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary"
payload_type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes
@@ -369,6 +369,31 @@ graph TD
## Implementation Details
### API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers
**Note on `is_publish`:**
- `is_publish` is simply a switch to control automatic publishing
- When `true` (default): Message is published to NATS automatically
- When `false`: Returns `(env, env_json_str)` without publishing, allowing manual publishing
- Connection reuse is achieved separately by creating NATS client outside the function
### Julia Implementation
#### Dependencies
@@ -383,13 +408,47 @@ graph TD
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter
nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
)
```
**Keyword Parameter - NATS_connection:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios where connection reuse provides performance benefits.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
@@ -406,8 +465,8 @@ function smartsend(
```julia
function smartreceive(
msg::NATS.Message,
fileserverDownloadHandler::Function;
msg::NATS.Msg;
fileserver_download_handler::Function = _fetch_with_backoff,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
@@ -416,17 +475,17 @@ function smartreceive(
# Iterate through all payloads
# For each payload: check transport type
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# If link: fetch from URL with exponential backoff using fileserver_download_handler
# Deserialize payload based on type
# Return envelope dictionary with all metadata and deserialized payloads
end
```
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
@@ -434,71 +493,253 @@ end
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish. This is a Julia-specific optimization that leverages function overloading.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **High-level API (smartsend, smartreceive):** Uses consistent naming across all three languages (Julia, JavaScript, Python/Micropython)
- **Low-level native functions (NATS.connect(), publish_message()):** Follow the conventions of the specific language ecosystem and do not require cross-language consistency
### JavaScript Implementation
#### Dependencies
- `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation
- `uuid` - Correlation ID and message ID generation
- `base64-arraybuffer` - Base64 encoding/decoding
- `node-fetch` or `fetch` - HTTP client for file server
#### smartsend Function
```javascript
async function smartsend(subject, data, options = {})
// data format: [(dataname, data, type), ...]
// options object should include:
// - natsUrl: NATS server URL
// - fileserverUrl: base URL of the file server
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
async function smartsend(
subject,
data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
options = {}
)
```
**Options:**
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (String) - Optional correlation ID for tracing
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (String) - Message receiver name (default: `""`)
- `receiver_id` (String) - Message receiver ID (default: `""`)
- `reply_to` (String) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
- `is_publish` (Boolean) - Whether to automatically publish the message to NATS (default: `true`)
- `fileserver_upload_handler` (Function) - Custom upload handler function
**Note:** JavaScript uses `is_publish` option (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:**
- Returns a Promise that resolves to an object containing:
- `env` - The envelope object containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Iterate through the list of (dataname, data, type) tuples
2. For each payload: extract the type from the tuple and serialize accordingly
3. Check payload size
4. If < threshold: publish directly to NATS
5. If >= threshold: upload to HTTP server, publish NATS with URL
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope object and JSON string
#### smartreceive Handler
```javascript
async function smartreceive(msg, options = {})
// options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL
// - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
```
**Options:**
- `fileserver_download_handler` (Function) - Custom download handler function
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (String) - Optional correlation ID for tracing
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- Returns a Promise that resolves to an object containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
2. Iterate through each payload in `payloads` array
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
### Python/Micropython Implementation
#### Dependencies
- `nats-python` - Core NATS functionality
- `pyarrow` - Arrow IPC serialization
- `uuid` - Correlation ID and message ID generation
- `base64` - Base64 encoding/decoding
- `requests` or `aiohttp` - HTTP client for file server
#### smartsend Function
```python
def smartsend(
subject: str,
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
broker_url: str = DEFAULT_BROKER_URL,
fileserver_url: str = DEFAULT_FILESERVER_URL,
fileserver_upload_handler: Callable = plik_oneshot_upload,
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
correlation_id: Union[str, None] = None,
msg_purpose: str = "chat",
sender_name: str = "NATSBridge",
receiver_name: str = "",
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True
) -> Tuple[MessageEnvelope, str]
```
**Options:**
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
- `correlation_id` (str) - Optional correlation ID for tracing (auto-generated if None)
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
- `receiver_name` (str) - Message receiver name (default: `""`)
- `receiver_id` (str) - Message receiver ID (default: `""`)
- `reply_to` (str) - Topic to reply to (default: `""`)
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
- `is_publish` (bool) - Whether to automatically publish the message to NATS (default: `True`)
- `fileserver_upload_handler` (Callable) - Custom upload handler function
**Note:** Python uses `is_publish` parameter (instead of `NATS_connection` keyword) to control automatic publishing behavior. Connection reuse can be achieved by creating a NATS client outside the function and reusing it in a custom `fileserver_upload_handler` or custom publish implementation.
**Return Value:**
- Returns a tuple `(env, env_json_str)` where:
- `env` - The envelope dictionary containing all metadata and payloads
- `env_json_str` - JSON string representation of the envelope for publishing
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Flow:**
1. Generate correlation ID and message ID if not provided
2. Iterate through the list of `(dataname, data, type)` tuples
3. For each payload:
- Serialize based on payload type
- Check payload size
- If < threshold: Base64 encode and include in envelope
- If >= threshold: Upload to HTTP server, store URL in envelope
4. Publish the JSON envelope to NATS
5. Return envelope dictionary and JSON string
#### smartreceive Handler
```python
async def smartreceive(
msg: NATS.Message,
options: Dict = {}
)
```
**Options:**
- `fileserver_download_handler` (Callable) - Custom download handler function
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
- `correlation_id` (str) - Optional correlation ID for tracing
**Output Format:**
- Returns a JSON object (dictionary) containing all envelope fields:
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads` list
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: Base64 decode the data from the message
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
## Scenario Implementations
### Scenario 1: Command & Control (Small Dictionary)
@@ -690,7 +931,7 @@ async function smartreceive(msg, options = {})
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Performance Considerations

View File

@@ -19,49 +19,102 @@ NATSBridge is implemented in three languages, each providing the same API:
| **JavaScript** | [`src/NATSBridge.js`](../src/NATSBridge.js) | JavaScript implementation for Node.js and browsers |
| **Python/Micropython** | [`src/nats_bridge.py`](../src/nats_bridge.py) | Python implementation for desktop and microcontrollers |
### Multi-Payload Support
### File Server Handler Architecture
The implementation uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
**Handler Function Signatures:**
```julia
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
```
This design allows the system to support multiple file server backends without changing the core messaging logic.
### Multi-Payload Support (Standard API)
The system uses a **standardized list-of-tuples format** for all payload operations. **Even when sending a single payload, the user must wrap it in a list.**
**API Standard:**
```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}}
# {
# "correlationId": "...",
# "msgId": "...",
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
**Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:**
```julia
# Single payload - still wrapped in a list (type is required as third element)
smartsend("/test", [(dataname1, data1, "text")], ...)
# Single payload - still wrapped in a list
smartsend(
"/test",
[("dataname1", data1, "dictionary")], # List with one tuple (data, type)
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Multiple payloads in one message (each payload has its own type)
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
# Multiple payloads in one message with different types
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
broker_url="nats://localhost:4222",
fileserver_upload_handler=plik_oneshot_upload
)
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
```
## Cross-Platform Interoperability
@@ -95,24 +148,26 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and
### Example: Julia ↔ Python ↔ JavaScript
```julia
# Julia sender
# Julia sender - smartsend returns (env, env_json_str)
using NATSBridge
data = [("message", "Hello from Julia!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
```javascript
// JavaScript receiver
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
// envelope.payloads[0].data === "Hello from Julia!"
const env = await smartreceive(msg);
// env.payloads[0].data === "Hello from Julia!"
```
```python
# Python sender
from nats_bridge import smartsend
data = [("response", "Hello from Python!", "text")]
smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
smartsend("/cross_platform", data, broker_url="nats://localhost:4222")
```
All three platforms can communicate seamlessly using the same NATS subjects and data format.
@@ -146,15 +201,31 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘
```
## Files
## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, env_json_str = smartsend(...)
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
The Julia implementation provides:
- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
- **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope
- **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation
- **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size
- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
@@ -245,9 +316,164 @@ julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js
```
## API Consistency Across Languages
**High-Level API (Consistent Across All Languages):**
- `smartsend(subject, data, ...)` - Main publishing function
- `smartreceive(msg, ...)` - Main receiving function
- Message envelope structure (`msg_envelope_v1` / `MessageEnvelope`)
- Payload structure (`msg_payload_v1` / `MessagePayload`)
- Transport strategy (direct vs link based on size threshold)
- Supported payload types: text, dictionary, table, image, audio, video, binary
**Low-Level Native Functions (Language-Specific Conventions):**
- Julia: `NATS.connect()`, `publish_message()`, function overloading
- JavaScript: `nats.js` client, native async/await patterns
- Python: `nats-python` client, native async/await patterns
**Connection Reuse Pattern - Key Differences:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Achieved by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
**Why the Difference?**
- Julia supports function overloading and keyword arguments, allowing `NATS_connection` to be passed as an optional parameter
- JavaScript/Python use a simpler `is_publish` option to control automatic publishing
- `is_publish` is simply a switch: when `true`, publish automatically; when `false`, return `(env, env_json_str)` without publishing
- For connection reuse in JavaScript/Python, create a NATS client once and reuse it in your custom `fileserver_upload_handler` or custom publish logic
## Usage
### Scenario 0: Basic Multi-Payload Example
### Scenario 1: Command & Control (Small Dictionary)
**Focus:** Sending small dictionary configurations across platforms. This is the simplest use case for command and control scenarios.
**Julia (Sender/Receiver):**
```julia
using NATSBridge
# Send small dictionary config (wrapped in list with type)
config = Dict("step_size" => 0.01, "iterations" => 1000, "threshold" => 0.5)
env, env_json_str = smartsend(
"control",
[("config", config, "dictionary")],
broker_url="nats://localhost:4222"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**Julia (Sender/Receiver) with NATS_connection for connection reuse:**
```julia
using NATSBridge
# Create connection once for high-frequency publishing
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection (saves connection overhead)
for i in 1:100
config = Dict("iteration" => i, "data" => rand())
smartsend(
"control",
[("config", config, "dictionary")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
**Use Case:** High-frequency publishing scenarios where connection reuse provides performance benefits by avoiding the overhead of establishing a new NATS connection for each message.
**JavaScript (Sender/Receiver):**
```javascript
const { smartsend } = require('./src/NATSBridge');
// Create small dictionary config
// Send via smartsend with type="dictionary"
const config = {
step_size: 0.01,
iterations: 1000,
threshold: 0.5
};
// Use is_publish option to control automatic publishing
await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
], {
is_publish: true // Automatically publish to NATS
});
```
**Connection Reuse in JavaScript:**
To achieve connection reuse in JavaScript, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```javascript
const { connect } = require('nats');
const { smartsend } = require('./src/NATSBridge');
// Create connection once
const nc = await connect({ servers: ['nats://localhost:4222'] });
// Send multiple messages using the same connection
for (let i = 0; i < 100; i++) {
const config = { iteration: i, data: Math.random() };
// Option 1: Use is_publish=false and publish manually with your connection
const { env, env_json_str } = await smartsend("control", [
{ dataname: "config", data: config, type: "dictionary" }
], { is_publish: false });
// Publish with your existing connection
await nc.publish("control", env_json_str);
}
// Close connection when done
await nc.close();
```
**Python/Micropython (Sender/Receiver):**
```python
from nats_bridge import smartsend
# Create small dictionary config
# Send via smartsend with type="dictionary"
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
# Use is_publish parameter to control automatic publishing
smartsend("control", [("config", config, "dictionary")], is_publish=True)
```
**Connection Reuse in Python:**
To achieve connection reuse in Python, create a NATS client outside the function and use it in a custom `fileserver_upload_handler` or custom publish implementation:
```python
from nats_bridge import smartsend
import nats
# Create connection once
nc = await nats.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in range(100):
config = {"iteration": i, "data": random.random()}
# Option 1: Use is_publish=False and publish manually with your connection
env, env_json_str = smartsend("control", [("config", config, "dictionary")], is_publish=False)
# Publish with your existing connection
await nc.publish("control", env_json_str)
# Close connection when done
await nc.close()
```
### Basic Multi-Payload Example
#### Python/Micropython (Sender)
```python
@@ -257,21 +483,21 @@ from nats_bridge import smartsend
smartsend(
"/test",
[("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
# Even single payload must be wrapped in a list with type
smartsend("/test", [("single_data", mydata, "dictionary")])
smartsend("/test", [("single_data", mydata, "dictionary")], broker_url="nats://localhost:4222")
```
#### Python/Micropython (Receiver)
```python
from nats_bridge import smartreceive
# Receive returns a list of (dataname, data, type) tuples
payloads = smartreceive(msg)
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary with envelope metadata and payloads field
env = smartreceive(msg)
# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
```
#### JavaScript (Sender)
@@ -315,18 +541,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the payloads from the envelope
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`);
}
// Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`);
console.log(`Message ID: ${envelope.msgId}`);
console.log(`Correlation ID: ${env.correlation_id}`);
console.log(`Message ID: ${env.msg_id}`);
}
```
@@ -344,19 +570,136 @@ df = DataFrame(
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via SmartSend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]);
# Send via smartsend - wrapped in list with type
# Large payload will use link transport (HTTP fileserver)
env, env_json_str = smartsend(
"analysis_results",
[("table_data", df, "table")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
#### smartsend Function Signature (Julia)
```julia
function smartsend(
subject::String,
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
)
```
**New Keyword Parameter:**
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
**Connection Handling Logic:**
```julia
if is_publish == false
# skip publish
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
end
```
**Example with pre-existing connection:**
```julia
using NATSBridge
# Create connection once
conn = NATS.connect("nats://localhost:4222")
# Send multiple messages using the same connection
for i in 1:100
data = rand(1000)
smartsend(
"analysis_results",
[("table_data", data, "table")],
NATS_connection=conn, # Reuse connection
is_publish=true
)
end
# Close connection when done
NATS.close(conn)
```
#### publish_message Function
The `publish_message` function provides two overloads for publishing messages to NATS:
**Overload 1 - URL-based publishing (creates new connection):**
```julia
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(broker_url) # Create NATS connection
publish_message(conn, subject, message, correlation_id)
end
```
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
```julia
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject")
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
```
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish.
**Integration with smartsend:**
```julia
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
NATS_connection=my_connection, # Pre-existing connection
is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
"my.subject",
[("data", payload_data, "type")],
broker_url="nats://localhost:4222",
is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)
```
**API Consistency Note:**
- **Julia:** Uses `NATS_connection` keyword parameter with function overloading for automatic connection management
- **JavaScript/Python:** Use `is_publish` option and achieve connection reuse by creating NATS client outside the function and reusing it in custom handlers or custom publish implementations
#### JavaScript (Receiver)
```javascript
const { smartreceive } = require('./src/NATSBridge');
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript
const table = envelope.payloads;
const table = env.payloads;
```
### Scenario 3: Live Binary Processing
@@ -365,19 +708,12 @@ const table = envelope.payloads;
```python
from nats_bridge import smartsend
# Binary data wrapped in a list
binary_data = [
("audio_chunk", binary_buffer, "binary")
]
# Binary data wrapped in list with type
smartsend(
"binary_input",
binary_data,
nats_url="nats://localhost:4222",
metadata={
"sample_rate": 44100,
"channels": 1
}
[("audio_chunk", binary_buffer, "binary")],
broker_url="nats://localhost:4222",
metadata={"sample_rate": 44100, "channels": 1}
)
```
@@ -406,10 +742,10 @@ from nats_bridge import smartreceive
# Receive binary data
def process_binary(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process the binary data from envelope.payloads
for dataname, data, type in envelope["payloads"]:
# Process the binary data from env.payloads
for dataname, data, type in env["payloads"]:
if type == "binary":
# data is bytes
print(f"Received binary data: {dataname}, size: {len(data)}")
@@ -422,10 +758,10 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data
function process_binary(msg) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process the binary data from envelope.payloads
for (const payload of envelope.payloads) {
// Process the binary data from env.payloads
for (const payload of env.payloads) {
if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
@@ -441,10 +777,14 @@ function process_binary(msg) {
```julia
using NATSBridge
function publish_health_status(nats_url)
# Send status wrapped in a list (type is part of each tuple)
function publish_health_status(broker_url)
# Send status wrapped in list with type
status = Dict("cpu" => rand(), "memory" => rand())
smartsend("health", [("status", status, "dictionary")], nats_url=nats_url)
env, env_json_str = smartsend(
"health",
[("status", status, "dictionary")],
broker_url=broker_url
)
sleep(5) # Every 5 seconds
end
```
@@ -466,8 +806,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages
for await (const msg of consumer) {
const envelope = await smartreceive(msg);
// envelope.payloads contains the list of payloads
const env = await smartreceive(msg);
// env.payloads contains the list of payloads
// Each payload has: dataname, data, type
msg.ack();
}
@@ -484,11 +824,11 @@ import json
# Device configuration handler
def handle_device_config(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process configuration from payloads
for dataname, data, type in envelope["payloads"]:
if type == "dictionary":
for dataname, data, payload_type in env["payloads"]:
if payload_type == "dictionary":
print(f"Received configuration: {data}")
# Apply configuration to device
if "wifi_ssid" in data:
@@ -505,8 +845,8 @@ def handle_device_config(msg):
smartsend(
"device/response",
[("config", config, "dictionary")],
nats_url="nats://localhost:4222",
reply_to=envelope.get("replyTo")
broker_url="nats://localhost:4222",
reply_to=env.get("reply_to")
)
```
@@ -553,12 +893,14 @@ options_df = DataFrame(
# Check payload size (< 1MB threshold)
# Publish directly to NATS with Base64-encoded payload
# Include metadata for dashboard selection context
smartsend(
env, env_json_str = smartsend(
"dashboard.selection",
[("options_table", options_df, "table")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
metadata=Dict("context" => "user_selection")
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**JavaScript (Receiver):**
@@ -566,11 +908,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Decode Base64 payload (for direct transport)
// For tables, data is in envelope.payloads
const table = envelope.payloads; // Array of objects
// For tables, data is in env.payloads
const table = env.payloads; // Array of objects
// User makes selection
const selection = uiComponent.getSelectedOption();
@@ -592,7 +934,6 @@ await smartsend("dashboard.response", [
**Julia (Sender/Receiver):**
```julia
using NATSBridge
using DataFrames
# Build chat message with mixed payloads:
# - Text: direct transport (Base64)
@@ -616,13 +957,15 @@ chat_message = [
("large_document", large_file_bytes, "binary") # Large file, link transport
]
smartsend(
env, env_json_str = smartsend(
"chat.room123",
chat_message,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
msg_purpose="chat",
reply_to="chat.room123.responses"
)
# env: msg_envelope_v1 with all metadata and payloads
# env_json_str: JSON string for publishing
```
**JavaScript (Sender/Receiver):**
@@ -667,7 +1010,7 @@ await smartsend("chat.room123", message);
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
## Configuration
@@ -683,19 +1026,19 @@ await smartsend("chat.room123", message);
```json
{
"correlationId": "uuid-v4-string",
"msgId": "uuid-v4-string",
"correlation_id": "uuid-v4-string",
"msg_id": "uuid-v4-string",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "topic/subject",
"msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
"senderName": "agent-wine-web-frontend",
"senderId": "uuid4",
"receiverName": "agent-backend",
"receiverId": "uuid4",
"replyTo": "topic",
"replyToMsgId": "uuid4",
"BrokerURL": "nats://localhost:4222",
"send_to": "topic/subject",
"msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
"sender_name": "agent-wine-web-frontend",
"sender_id": "uuid4",
"receiver_name": "agent-backend",
"receiver_id": "uuid4",
"reply_to": "topic",
"reply_to_msg_id": "uuid4",
"broker_url": "nats://localhost:4222",
"metadata": {
"content_type": "application/octet-stream",
@@ -706,7 +1049,7 @@ await smartsend("chat.room123", message);
{
"id": "uuid4",
"dataname": "login_image",
"type": "image",
"payload_type": "image",
"transport": "direct",
"encoding": "base64",
"size": 15433,

9
etc.jl
View File

@@ -0,0 +1,9 @@
Task: Update README.md to reflect recent changes in NATSbridge package.
Context: the package has been updated with the NATS_connection keyword and the publish_message function.
Requirements:
Source of Truth: Treat the updated NATSbridge code as the definitive source. Update README.md to align exactly with these changes.
API Consistency: Ensure the Main Package API (e.g., smartsend(), publish_message()) uses consistent naming across all three supported languages.
Ecosystem Variance: Low-level native functions (e.g., NATS.connect(), JSON.read()) should follow the conventions of the specific language ecosystem and do not require cross-language consistency.

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python
from nats_bridge import smartsend
# Send a text message
# Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript
const { smartsend } = require('./src/NATSBridge');
// Send a text message
// Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
], { brokerUrl: "nats://localhost:4222" });
console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, env_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { brokerUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// env_json_str: JSON string for publishing to NATS
```
#### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/room1", data, broker_url="nats://localhost:4222")
# env: msg_envelope_v1 object with all metadata and payloads
# env_json_str: JSON string representation of the envelope for publishing
println("Message sent!")
```
@@ -145,8 +159,8 @@ println("Message sent!")
from nats_bridge import smartreceive
# Receive and process message
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
print(f"Received {dataname}: {data}")
```
@@ -156,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
const { smartreceive } = require('./src/NATSBridge');
// Receive and process message
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
console.log(`Received ${payload.dataname}: ${payload.data}`);
}
```
@@ -168,8 +182,8 @@ for (const payload of envelope.payloads) {
using NATSBridge
# Receive and process message
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end
```
@@ -194,7 +208,7 @@ config = {
# Send as dictionary type
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -208,9 +222,9 @@ const config = {
update_interval: 60
};
await smartsend("/device/config", [
const { env, env_json_str } = await smartsend("/device/config", [
{ dataname: "config", data: config, type: "dictionary" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -225,7 +239,7 @@ config = Dict(
)
data = [("config", config, "dictionary")]
smartsend("/device/config", data)
env, env_json_str = smartsend("/device/config", data, broker_url="nats://localhost:4222")
```
### Example 2: Sending Binary Data (Image)
@@ -241,7 +255,7 @@ with open("image.png", "rb") as f:
# Send as binary type
data = [("user_image", image_data, "binary")]
env = smartsend("/chat/image", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -253,9 +267,9 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
const image_data = fs.readFileSync('image.png');
await smartsend("/chat/image", [
const { env, env_json_str } = await smartsend("/chat/image", [
{ dataname: "user_image", data: image_data, type: "binary" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -267,7 +281,7 @@ using NATSBridge
image_data = read("image.png")
data = [("user_image", image_data, "binary")]
smartsend("/chat/image", data)
env, env_json_str = smartsend("/chat/image", data, broker_url="nats://localhost:4222")
```
### Example 3: Request-Response Pattern
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
env, env_json_str = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
# env: MessageEnvelope object
# env_json_str: JSON string for publishing to NATS
```
#### JavaScript (Responder)
@@ -297,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
const sub = nc.subscribe("/device/command");
for await (const msg of sub) {
const envelope = await smartreceive(msg);
const env = await smartreceive(msg);
// Process command
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.dataname === "command") {
const command = payload.data;
@@ -315,8 +331,8 @@ for await (const msg of sub) {
await smartsend("/device/response", [
{ dataname: "sensor_data", data: response, type: "dictionary" }
], {
reply_to: envelope.replyTo,
reply_to_msg_id: envelope.msgId
reply_to: env.replyTo,
reply_to_msg_id: env.msgId
});
}
}
@@ -342,10 +358,10 @@ import os
large_data = os.urandom(2_000_000) # 2MB of random data
# Send with file server URL
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1_000_000
)
@@ -364,9 +380,10 @@ const largeData = new ArrayBuffer(2_000_000);
const view = new Uint8Array(largeData);
view.fill(42); // Fill with some data
await smartsend("/data/large", [
const { env, env_json_str } = await smartsend("/data/large", [
{ dataname: "large_file", data: largeData, type: "binary" }
], {
brokerUrl: "nats://localhost:4222",
fileserverUrl: "http://localhost:8080",
sizeThreshold: 1_000_000
});
@@ -380,9 +397,10 @@ using NATSBridge
# Create large data (> 1MB)
large_data = rand(UInt8, 2_000_000)
env = smartsend(
env, env_json_str = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
@@ -409,7 +427,7 @@ data = [
("user_avatar", image_data, "image")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
```
#### JavaScript
@@ -419,7 +437,7 @@ const { smartsend } = require('./src/NATSBridge');
const fs = require('fs');
await smartsend("/chat/mixed", [
const { env, env_json_str } = await smartsend("/chat/mixed", [
{
dataname: "message_text",
data: "Hello with image!",
@@ -430,7 +448,7 @@ await smartsend("/chat/mixed", [
data: fs.readFileSync("avatar.png"),
type: "image"
}
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Julia
@@ -445,7 +463,7 @@ data = [
("user_avatar", image_data, "image")
]
smartsend("/chat/mixed", data)
env, env_json_str = smartsend("/chat/mixed", data, broker_url="nats://localhost:4222")
```
### Example 6: Table Data (Arrow IPC)
@@ -467,7 +485,7 @@ df = pd.DataFrame({
# Send as table type
data = [("students", df, "table")]
env = smartsend("/data/students", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
```
#### Julia
@@ -484,7 +502,7 @@ df = DataFrame(
)
data = [("students", df, "table")]
smartsend("/data/students", data)
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
```
---
@@ -503,7 +521,7 @@ using NATSBridge
# Send dictionary from Julia to JavaScript
config = Dict("step_size" => 0.01, "iterations" => 1000)
data = [("config", config, "dictionary")]
smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
env, env_json_str = smartsend("/analysis/config", data, broker_url="nats://localhost:4222")
```
#### JavaScript Receiver
@@ -512,8 +530,8 @@ smartsend("/analysis/config", data, nats_url="nats://localhost:4222")
const { smartreceive } = require('./src/NATSBridge');
// Receive dictionary from Julia
const envelope = await smartreceive(msg);
for (const payload of envelope.payloads) {
const env = await smartreceive(msg);
for (const payload of env.payloads) {
if (payload.type === "dictionary") {
console.log("Received config:", payload.data);
// payload.data = { step_size: 0.01, iterations: 1000 }
@@ -528,9 +546,9 @@ for (const payload of envelope.payloads) {
```javascript
const { smartsend } = require('./src/NATSBridge');
await smartsend("/data/transfer", [
const { env, env_json_str } = await smartsend("/data/transfer", [
{ dataname: "message", data: "Hello from JS!", type: "text" }
]);
], { brokerUrl: "nats://localhost:4222" });
```
#### Python Receiver
@@ -538,8 +556,8 @@ await smartsend("/data/transfer", [
```python
from nats_bridge import smartreceive
envelope = smartreceive(msg)
for dataname, data, type in envelope["payloads"]:
env = smartreceive(msg)
for dataname, data, type in env["payloads"]:
if type == "text":
print(f"Received from JS: {data}")
```
@@ -552,7 +570,7 @@ for dataname, data, type in envelope["payloads"]:
from nats_bridge import smartsend
data = [("message", "Hello from Python!", "text")]
smartsend("/chat/python", data)
env, env_json_str = smartsend("/chat/python", data, broker_url="nats://localhost:4222")
```
#### Julia Receiver
@@ -560,8 +578,8 @@ smartsend("/chat/python", data)
```julia
using NATSBridge
envelope = smartreceive(msg, fileserverDownloadHandler)
for (dataname, data, type) in envelope["payloads"]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
for (dataname, data, type) in env["payloads"]
if type == "text"
println("Received from Python: $data")
end

View File

@@ -132,10 +132,11 @@ class ChatUI {
});
}
await smartsend(
const { env, env_json_str } = await smartsend(
`/chat/${this.currentRoom}`,
data,
{
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url,
sizeThreshold: window.config.size_threshold
}
@@ -216,15 +217,15 @@ class ChatHandler {
}
async handleMessage(msg) {
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.downloadFile.bind(this)
});
// Extract sender info from envelope
const sender = envelope.senderName || 'Anonymous';
const sender = env.senderName || 'Anonymous';
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'text') {
this.ui.addMessage(sender, payload.data);
} else if (payload.type === 'image') {
@@ -288,8 +289,8 @@ Let's build a file transfer system that handles large files efficiently.
const { smartsend } = require('./NATSBridge');
class FileUploadService {
constructor(natsUrl, fileserverUrl) {
this.natsUrl = natsUrl;
constructor(brokerUrl, fileserverUrl) {
this.brokerUrl = brokerUrl;
this.fileserverUrl = fileserverUrl;
}
@@ -304,17 +305,17 @@ class FileUploadService {
type: 'binary'
}];
const envelope = await smartsend(
const { env, env_json_str } = await smartsend(
`/files/${recipient}`,
data,
{
natsUrl: this.natsUrl,
brokerUrl: this.brokerUrl,
fileserverUrl: this.fileserverUrl,
sizeThreshold: 1048576
}
);
return envelope;
return env;
}
async uploadLargeFile(filePath, recipient) {
@@ -356,12 +357,12 @@ class FileDownloadService {
async downloadFile(sender, downloadId) {
// Subscribe to sender's file channel
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process each payload
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'binary') {
const filePath = `/downloads/${payload.dataname}`;
fs.writeFileSync(filePath, payload.data);
@@ -419,12 +420,12 @@ async function uploadFile(config) {
const filePath = await rl.question('Enter file path: ');
const recipient = await rl.question('Enter recipient: ');
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
const fileService = new FileUploadService(config.broker_url, config.fileserver_url);
try {
const envelope = await fileService.uploadFile(filePath, recipient);
const env = await fileService.uploadFile(filePath, recipient);
console.log('Upload successful!');
console.log(`File ID: ${envelope.payloads[0].id}`);
console.log(`File ID: ${env.payloads[0].id}`);
} catch (error) {
console.error('Upload failed:', error.message);
}
@@ -500,8 +501,8 @@ import time
import random
class SensorSender:
def __init__(self, nats_url: str, fileserver_url: str):
self.nats_url = nats_url
def __init__(self, broker_url: str, fileserver_url: str):
self.broker_url = broker_url
self.fileserver_url = fileserver_url
def send_reading(self, sensor_id: str, value: float, unit: str):
@@ -514,13 +515,39 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend(
f"/sensors/{sensor_id}",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (env, env_json_str) without publishing
env, env_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
return env, env_json_str
def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch()
for reading in readings:
@@ -546,7 +573,7 @@ class SensorSender:
smartsend(
f"/sensors/batch",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
else:
@@ -571,9 +598,9 @@ class SensorReceiver:
self.fileserver_download_handler = fileserver_download_handler
def process_reading(self, msg):
envelope = smartreceive(msg, self.fileserver_download_handler)
env = smartreceive(msg, self.fileserver_download_handler)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
reading = SensorReading(
sensor_id=data["sensor_id"],
@@ -605,17 +632,17 @@ Let's build an IoT device using Micropython that connects to NATS.
import json
class DeviceConfig:
def __init__(self, ssid, password, nats_url, device_id):
def __init__(self, ssid, password, broker_url, device_id):
self.ssid = ssid
self.password = password
self.nats_url = nats_url
self.broker_url = broker_url
self.device_id = device_id
def to_dict(self):
return {
"ssid": self.ssid,
"password": self.password,
"nats_url": self.nats_url,
"broker_url": self.broker_url,
"device_id": self.device_id
}
```
@@ -630,7 +657,7 @@ import json
class DeviceBridge:
def __init__(self, config):
self.config = config
self.nats_url = config.nats_url
self.broker_url = config.broker_url
def connect(self):
# Connect to WiFi
@@ -650,7 +677,7 @@ class DeviceBridge:
smartsend(
f"/devices/{self.config.device_id}/status",
data,
nats_url=self.nats_url
broker_url=self.broker_url
)
def send_sensor_data(self, sensor_id, value, unit):
@@ -661,7 +688,7 @@ class DeviceBridge:
smartsend(
f"/devices/{self.config.device_id}/sensors/{sensor_id}",
data,
nats_url=self.nats_url
broker_url=self.broker_url
)
def receive_commands(self, callback):
@@ -673,10 +700,10 @@ class DeviceBridge:
# Poll for messages
msg = self._poll_for_message()
if msg:
envelope = smartreceive(msg)
env = smartreceive(msg)
# Process payloads
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if dataname == "command":
callback(data)
@@ -699,7 +726,7 @@ import random
config = DeviceConfig(
ssid="MyNetwork",
password="password123",
nats_url="nats://localhost:4222",
broker_url="nats://localhost:4222",
device_id="device-001"
)
@@ -748,8 +775,8 @@ import pyarrow as pa
import io
class DashboardServer:
def __init__(self, nats_url, fileserver_url):
self.nats_url = nats_url
def __init__(self, broker_url, fileserver_url):
self.broker_url = broker_url
self.fileserver_url = fileserver_url
def broadcast_data(self, df):
@@ -766,15 +793,15 @@ class DashboardServer:
smartsend(
"/dashboard/data",
data,
nats_url=self.nats_url,
broker_url=self.broker_url,
fileserver_url=self.fileserver_url
)
def receive_selection(self, callback):
def handler(msg):
envelope = smartreceive(msg)
env = smartreceive(msg)
for dataname, data, data_type in envelope["payloads"]:
for dataname, data, data_type in env["payloads"]:
if data_type == "dictionary":
callback(data)
@@ -807,21 +834,22 @@ class DashboardUI {
async refreshData() {
// Request fresh data
await smartsend("/dashboard/request", [
const { env, env_json_str } = await smartsend("/dashboard/request", [
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
], {
brokerUrl: window.config.broker_url,
fileserverUrl: window.config.fileserver_url
});
}
async fetchData() {
// Subscribe to data updates
const envelope = await smartreceive(msg, {
const env = await smartreceive(msg, {
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
});
// Process table data
for (const payload of envelope.payloads) {
for (const payload of env.payloads) {
if (payload.type === 'table') {
// Deserialize Arrow IPC
this.data = this.deserializeArrow(payload.data);
@@ -928,7 +956,7 @@ def send_batch_readings(self, readings):
smartsend(
"/sensors/batch",
[("batch", arrow_data, "table")],
nats_url=self.nats_url
broker_url=self.broker_url
)
```

View File

@@ -0,0 +1,14 @@
services:
plik:
image: rootgg/plik:latest
container_name: plik-server
restart: unless-stopped
ports:
- "8080:8080"
volumes:
# # Mount the config file (created below)
# - ./plikd.cfg:/home/plik/server/plikd.cfg
# Mount local folder for uploads and database
- ./plik-data:/data
# Set user to match your host UID to avoid permission issues
user: "1000:1000"

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,7 @@
* // The handler is passed to smartsend as fileserverUploadHandler parameter
* // It receives: (fileserver_url, dataname, data)
* // Returns: { status, uploadid, fileid, url }
* async function fileserverUploadHandler(fileserver_url, dataname, data) { ... }
* async function plik_oneshot_upload(fileserver_url, dataname, data) { ... }
*
* // Download handler - fetches data from file server URL with exponential backoff
* // The handler is passed to smartreceive as fileserverDownloadHandler parameter
@@ -98,6 +98,26 @@ function base64ToArrayBuffer(base64) {
return bytes.buffer;
}
// Helper: Convert Uint8Array to Base64 string
function uint8ArrayToBase64(uint8array) {
let binary = '';
for (let i = 0; i < uint8array.byteLength; i++) {
binary += String.fromCharCode(uint8array[i]);
}
return btoa(binary);
}
// Helper: Convert Base64 string to Uint8Array
function base64ToUint8Array(base64) {
const binaryString = atob(base64);
const len = binaryString.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return bytes;
}
// Helper: Serialize data based on type
function _serialize_data(data, type) {
/**
@@ -114,39 +134,39 @@ function _serialize_data(data, type) {
*/
if (type === "text") {
if (typeof data === 'string') {
return new TextEncoder().encode(data).buffer;
return new TextEncoder().encode(data);
} else {
throw new Error("Text data must be a String");
}
} else if (type === "dictionary") {
// JSON data - serialize directly
const jsonStr = JSON.stringify(data);
return new TextEncoder().encode(jsonStr).buffer;
return new TextEncoder().encode(jsonStr);
} else if (type === "table") {
// Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
// This would require the apache-arrow library
throw new Error("Table serialization requires apache-arrow library");
} else if (type === "image") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Image data must be ArrayBuffer or Uint8Array");
}
} else if (type === "audio") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Audio data must be ArrayBuffer or Uint8Array");
}
} else if (type === "video") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Video data must be ArrayBuffer or Uint8Array");
}
} else if (type === "binary") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer;
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else {
throw new Error("Binary data must be ArrayBuffer or Uint8Array");
}
@@ -171,10 +191,10 @@ function _deserialize_data(data, type, correlation_id) {
*/
if (type === "text") {
const decoder = new TextDecoder();
return decoder.decode(new Uint8Array(data));
return decoder.decode(data);
} else if (type === "dictionary") {
const decoder = new TextDecoder();
const jsonStr = decoder.decode(new Uint8Array(data));
const jsonStr = decoder.decode(data);
return JSON.parse(jsonStr);
} else if (type === "table") {
// Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
@@ -193,73 +213,16 @@ function _deserialize_data(data, type, correlation_id) {
}
// Helper: Upload data to file server
// Internal wrapper that adds correlation_id logging for smartsend
async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) {
/**
* Upload data to HTTP file server (plik-like API)
*
* This function implements the plik one-shot upload mode:
* 1. Creates a one-shot upload session by sending POST request with {"OneShot": true}
* 2. Uploads the file data as multipart form data
* 3. Returns identifiers and download URL for the uploaded file
* Internal upload helper - wraps plik_oneshot_upload to add correlation_id logging
* This allows smartsend to pass correlation_id for tracing without changing the handler signature
*/
log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`);
// Step 1: Get upload ID and token
const url_getUploadID = `${fileserver_url}/upload`;
const headers = {
"Content-Type": "application/json"
};
const body = JSON.stringify({ OneShot: true });
let response = await fetch(url_getUploadID, {
method: "POST",
headers: headers,
body: body
});
if (!response.ok) {
throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
}
const responseJson = await response.json();
const uploadid = responseJson.id;
const uploadtoken = responseJson.uploadToken;
// Step 2: Upload file data
const url_upload = `${fileserver_url}/file/${uploadid}`;
// Create multipart form data
const formData = new FormData();
// Create a Blob from the ArrayBuffer
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
response = await fetch(url_upload, {
method: "POST",
headers: {
"X-UploadToken": uploadtoken
},
body: formData
});
if (!response.ok) {
throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
}
const fileResponseJson = await response.json();
const fileid = fileResponseJson.id;
// Build the download URL
const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
log_trace(correlation_id, `Uploaded to URL: ${url}`);
return {
status: response.status,
uploadid: uploadid,
fileid: fileid,
url: url
};
const result = await plik_oneshot_upload(fileserver_url, dataname, data);
log_trace(correlation_id, `Uploaded to URL: ${result.url}`);
return result;
}
// Helper: Fetch data from URL with exponential backoff
@@ -276,7 +239,7 @@ async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, corr
if (response.status === 200) {
log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
const arrayBuffer = await response.arrayBuffer();
return arrayBuffer;
return new Uint8Array(arrayBuffer);
} else {
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
}
@@ -306,25 +269,26 @@ function _get_payload_bytes(data) {
}
}
// MessagePayload class
// MessagePayload class - matches msg_payload_v1 Julia struct
class MessagePayload {
/**
* Represents a single payload in the message envelope
* Matches Julia's msg_payload_v1 struct
*
* @param {Object} options - Payload options
* @param {string} options.id - ID of this payload (e.g., "uuid4")
* @param {string} options.dataname - Name of this payload (e.g., "login_image")
* @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.payload_type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.transport - "direct" or "link"
* @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
* @param {number} options.size - Data size in bytes
* @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link)
* @param {string|Uint8Array} options.data - Payload data (Uint8Array for direct, URL string for link)
* @param {Object} options.metadata - Metadata for this payload
*/
constructor(options) {
this.id = options.id || uuid4();
this.dataname = options.dataname;
this.type = options.type;
this.payload_type = options.payload_type;
this.transport = options.transport;
this.encoding = options.encoding;
this.size = options.size;
@@ -332,27 +296,27 @@ class MessagePayload {
this.metadata = options.metadata || {};
}
// Convert to JSON object
// Convert to JSON object - uses snake_case to match Julia API
toJSON() {
const obj = {
id: this.id,
dataname: this.dataname,
type: this.type,
payload_type: this.payload_type,
transport: this.transport,
encoding: this.encoding,
size: this.size
};
// Include data based on transport type
if (this.transport === "direct" && this.data !== null) {
if (this.transport === "direct" && this.data !== null && this.data !== undefined) {
if (this.encoding === "base64" || this.encoding === "json") {
obj.data = this.data;
} else {
// For other encodings, use base64
const payloadBytes = _get_payload_bytes(this.data);
obj.data = arrayBufferToBase64(payloadBytes);
obj.data = uint8ArrayToBase64(payloadBytes);
}
} else if (this.transport === "link" && this.data !== null) {
} else if (this.transport === "link" && this.data !== null && this.data !== undefined) {
// For link transport, data is a URL string
obj.data = this.data;
}
@@ -365,59 +329,60 @@ class MessagePayload {
}
}
// MessageEnvelope class
// MessageEnvelope class - matches msg_envelope_v1 Julia struct
class MessageEnvelope {
/**
* Represents the message envelope containing metadata and payloads
* Matches Julia's msg_envelope_v1 struct
*
* @param {Object} options - Envelope options
* @param {string} options.sendTo - Topic/subject the sender sends to
* @param {Array<MessagePayload>} options.payloads - Array of payloads
* @param {string} options.correlationId - Unique identifier to track messages
* @param {string} options.msgId - This message id
* @param {string} options.correlation_id - Unique identifier to track messages
* @param {string} options.msg_id - This message id
* @param {string} options.timestamp - Message published timestamp
* @param {string} options.msgPurpose - Purpose of this message
* @param {string} options.senderName - Name of the sender
* @param {string} options.senderId - UUID of the sender
* @param {string} options.receiverName - Name of the receiver
* @param {string} options.receiverId - UUID of the receiver
* @param {string} options.replyTo - Topic to reply to
* @param {string} options.replyToMsgId - Message id this message is replying to
* @param {string} options.brokerURL - NATS server address
* @param {string} options.send_to - Topic/subject the sender sends to
* @param {string} options.msg_purpose - Purpose of this message
* @param {string} options.sender_name - Name of the sender
* @param {string} options.sender_id - UUID of the sender
* @param {string} options.receiver_name - Name of the receiver
* @param {string} options.receiver_id - UUID of the receiver
* @param {string} options.reply_to - Topic to reply to
* @param {string} options.reply_to_msg_id - Message id this message is replying to
* @param {string} options.broker_url - NATS server address
* @param {Object} options.metadata - Metadata for the envelope
* @param {Array<MessagePayload>} options.payloads - Array of payloads
*/
constructor(options) {
this.correlationId = options.correlationId || uuid4();
this.msgId = options.msgId || uuid4();
this.correlation_id = options.correlation_id || uuid4();
this.msg_id = options.msg_id || uuid4();
this.timestamp = options.timestamp || new Date().toISOString();
this.sendTo = options.sendTo;
this.msgPurpose = options.msgPurpose || "";
this.senderName = options.senderName || "";
this.senderId = options.senderId || uuid4();
this.receiverName = options.receiverName || "";
this.receiverId = options.receiverId || "";
this.replyTo = options.replyTo || "";
this.replyToMsgId = options.replyToMsgId || "";
this.brokerURL = options.brokerURL || DEFAULT_NATS_URL;
this.send_to = options.send_to;
this.msg_purpose = options.msg_purpose || "";
this.sender_name = options.sender_name || "";
this.sender_id = options.sender_id || uuid4();
this.receiver_name = options.receiver_name || "";
this.receiver_id = options.receiver_id || "";
this.reply_to = options.reply_to || "";
this.reply_to_msg_id = options.reply_to_msg_id || "";
this.broker_url = options.broker_url || DEFAULT_NATS_URL;
this.metadata = options.metadata || {};
this.payloads = options.payloads || [];
}
// Convert to JSON string
// Convert to JSON object - uses snake_case to match Julia API
toJSON() {
const obj = {
correlationId: this.correlationId,
msgId: this.msgId,
correlation_id: this.correlation_id,
msg_id: this.msg_id,
timestamp: this.timestamp,
sendTo: this.sendTo,
msgPurpose: this.msgPurpose,
senderName: this.senderName,
senderId: this.senderId,
receiverName: this.receiverName,
receiverId: this.receiverId,
replyTo: this.replyTo,
replyToMsgId: this.replyToMsgId,
brokerURL: this.brokerURL
send_to: this.send_to,
msg_purpose: this.msg_purpose,
sender_name: this.sender_name,
sender_id: this.sender_id,
receiver_name: this.receiver_name,
receiver_id: this.receiver_id,
reply_to: this.reply_to,
reply_to_msg_id: this.reply_to_msg_id,
broker_url: this.broker_url
};
if (Object.keys(this.metadata).length > 0) {
@@ -437,7 +402,7 @@ class MessageEnvelope {
}
}
// SmartSend function
// SmartSend function - matches Julia smartsend signature and behavior
async function smartsend(subject, data, options = {}) {
/**
* Send data either directly via NATS or via a fileserver URL, depending on payload size
@@ -447,40 +412,45 @@ async function smartsend(subject, data, options = {}) {
* Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS.
*
* @param {string} subject - NATS subject to publish the message to
* @param {Array} data - List of {dataname, data, type} objects to send
* @param {Array} data - List of {dataname, data, type} objects to send (must be a list, even for single payload)
* @param {Object} options - Additional options
* @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads
* @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlationId - Optional correlation ID for tracing
* @param {string} options.msgPurpose - Purpose of the message (default: "chat")
* @param {string} options.senderName - Name of the sender (default: "NATSBridge")
* @param {string} options.receiverName - Name of the receiver (default: "")
* @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
*
* @returns {Promise<MessageEnvelope>} - The envelope for tracking
* @param {string} options.broker_url - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserver_url - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserver_upload_handler - Function to handle fileserver uploads
* @param {number} options.size_threshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlation_id - Optional correlation ID for tracing
* @param {string} options.msg_purpose - Purpose of the message (default: "chat")
* @param {string} options.sender_name - Name of the sender (default: "NATSBridge")
* @param {string} options.receiver_name - Name of the receiver (default: "")
* @param {string} options.receiver_id - UUID of the receiver (default: "")
* @param {string} options.reply_to - Topic to reply to (default: "")
* @param {string} options.reply_to_msg_id - Message ID this message is replying to (default: "")
* @param {boolean} options.is_publish - Whether to automatically publish the message to NATS (default: true)
* - When true: Message is published to NATS automatically
* - When false: Returns (env, env_json_str) without publishing, allowing manual publishing
* @returns {Promise<Object>} - A tuple-like object with { env: MessageEnvelope, env_json_str: string }
* - env: MessageEnvelope object with all metadata and payloads
* - env_json_str: JSON string representation of the envelope for manual publishing
*/
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
fileserverUploadHandler = _upload_to_fileserver,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuid4(),
msgPurpose = "chat",
senderName = "NATSBridge",
receiverName = "",
receiverId = "",
replyTo = "",
replyToMsgId = ""
broker_url = DEFAULT_NATS_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = _upload_to_fileserver,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = uuid4(),
msg_purpose = "chat",
sender_name = "NATSBridge",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
is_publish = true // Whether to automatically publish the message to NATS
} = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
log_trace(correlation_id, `Starting smartsend for subject: ${subject}`);
// Generate message metadata
const msgId = uuid4();
const msg_id = uuid4();
// Process each payload in the list
const payloads = [];
@@ -494,18 +464,18 @@ async function smartsend(subject, data, options = {}) {
const payloadBytes = _serialize_data(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
log_trace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
if (payloadSize < size_threshold) {
// Direct path - Base64 encode and send via NATS
const payloadB64 = arrayBufferToBase64(payloadBytes);
log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`);
const payloadB64 = uint8ArrayToBase64(payloadBytes);
log_trace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
// Create MessagePayload for direct transport
const payloadObj = new MessagePayload({
dataname: dataname,
type: payloadType,
payload_type: payloadType,
transport: "direct",
encoding: "base64",
size: payloadSize,
@@ -515,22 +485,22 @@ async function smartsend(subject, data, options = {}) {
payloads.push(payloadObj);
} else {
// Link path - Upload to HTTP server, send URL via NATS
log_trace(correlationId, `Using link transport, uploading to fileserver`);
log_trace(correlation_id, `Using link transport, uploading to fileserver`);
// Upload to HTTP server
const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId);
// Upload to HTTP server using plik_oneshot_upload handler
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
}
const url = response.url;
log_trace(correlationId, `Uploaded to URL: ${url}`);
log_trace(correlation_id, `Uploaded to URL: ${url}`);
// Create MessagePayload for link transport
const payloadObj = new MessagePayload({
dataname: dataname,
type: payloadType,
payload_type: payloadType,
transport: "link",
encoding: "none",
size: payloadSize,
@@ -543,31 +513,40 @@ async function smartsend(subject, data, options = {}) {
// Create MessageEnvelope with all payloads
const env = new MessageEnvelope({
correlationId: correlationId,
msgId: msgId,
sendTo: subject,
msgPurpose: msgPurpose,
senderName: senderName,
receiverName: receiverName,
receiverId: receiverId,
replyTo: replyTo,
replyToMsgId: replyToMsgId,
brokerURL: natsUrl,
correlation_id: correlation_id,
msg_id: msg_id,
send_to: subject,
msg_purpose: msg_purpose,
sender_name: sender_name,
receiver_name: receiver_name,
receiver_id: receiver_id,
reply_to: reply_to,
reply_to_msg_id: reply_to_msg_id,
broker_url: broker_url,
payloads: payloads
});
// Publish message to NATS
await publish_message(natsUrl, subject, env.toString(), correlationId);
// Convert envelope to JSON string
const env_json_str = env.toString();
return env;
// Publish to NATS if isPublish is true
if (is_publish) {
await publish_message(broker_url, subject, env_json_str, correlation_id);
}
// Return both envelope and JSON string (tuple-like structure, matching Julia API)
return {
env: env,
env_json_str: env_json_str
};
}
// Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) {
async function publish_message(broker_url, subject, message, correlation_id) {
/**
* Publish a message to a NATS subject with proper connection management
*
* @param {string} natsUrl - NATS server URL
* @param {string} broker_url - NATS server URL
* @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish
* @param {string} correlation_id - Correlation ID for logging
@@ -580,7 +559,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
// Example with nats.js:
// import { connect } from 'nats';
// const nc = await connect({ servers: [natsUrl] });
// const nc = await connect({ servers: [broker_url] });
// await nc.publish(subject, message);
// nc.close();
@@ -588,7 +567,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`);
}
// SmartReceive function
// SmartReceive function - matches Julia smartreceive signature and behavior
async function smartreceive(msg, options = {}) {
/**
* Receive and process messages from NATS
@@ -598,25 +577,25 @@ async function smartreceive(msg, options = {}) {
*
* @param {Object} msg - NATS message object with payload property
* @param {Object} options - Additional options
* @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs
* @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
* @param {Function} options.fileserver_download_handler - Function to handle downloading data from file server URLs
* @param {number} options.max_retries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.base_delay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.max_delay - Maximum delay for exponential backoff in ms (default: 5000)
*
* @returns {Promise<Object>} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects
* @returns {Promise<Object>} - JSON object of envelope with payloads field containing list of {dataname, data, type} tuples
*/
const {
fileserverDownloadHandler = _fetch_with_backoff,
maxRetries = 5,
baseDelay = 100,
maxDelay = 5000
fileserver_download_handler = _fetch_with_backoff,
max_retries = 5,
base_delay = 100,
max_delay = 5000
} = options;
// Parse the JSON envelope
const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
const json_data = JSON.parse(jsonStr);
log_trace(json_data.correlationId, `Processing received message`);
log_trace(json_data.correlation_id, `Processing received message`);
// Process all payloads in the envelope
const payloads_list = [];
@@ -631,32 +610,32 @@ async function smartreceive(msg, options = {}) {
if (transport === "direct") {
// Direct transport - payload is in the message
log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`);
log_trace(json_data.correlation_id, `Direct transport - decoding payload '${dataname}'`);
// Extract base64 payload from the payload
const payload_b64 = payload.data;
// Decode Base64 payload
const payload_bytes = base64ToArrayBuffer(payload_b64);
const payload_bytes = base64ToUint8Array(payload_b64);
// Deserialize based on type
const data_type = payload.type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId);
const data_type = payload.payload_type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type });
} else if (transport === "link") {
// Link transport - payload is at URL
const url = payload.data;
log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
log_trace(json_data.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
// Fetch with exponential backoff using the download handler
const downloaded_data = await fileserverDownloadHandler(
url, maxRetries, baseDelay, maxDelay, json_data.correlationId
const downloaded_data = await fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.correlation_id
);
// Deserialize based on type
const data_type = payload.type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId);
const data_type = payload.payload_type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type });
} else {
@@ -665,11 +644,74 @@ async function smartreceive(msg, options = {}) {
}
// Replace payloads array with the processed list of {dataname, data, type} tuples
// This matches Julia's smartreceive return format
json_data.payloads = payloads_list;
return json_data;
}
// plik_oneshot_upload - matches Julia plik_oneshot_upload function
// Upload handler signature: plik_oneshot_upload(fileserver_url, dataname, data)
// Returns: { status, uploadid, fileid, url }
async function plik_oneshot_upload(file_server_url, dataname, data) {
/**
* Upload a single file to a plik server using one-shot mode
* This function uploads raw byte array to a plik server in one-shot mode (no upload session).
* It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
* retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
*
* This is the default upload handler used by smartsend.
* Custom handlers can be passed via the fileserver_upload_handler option.
*
* @param {string} file_server_url - Base URL of the plik server (e.g., "http://localhost:8080")
* @param {string} dataname - Name of the file being uploaded
* @param {Uint8Array} data - Raw byte data of the file content
* @returns {Promise<Object>} - Dictionary with keys: status, uploadid, fileid, url
*/
// Step 1: Get upload ID and token
const url_getUploadID = `${file_server_url}/upload`;
const headers = { "Content-Type": "application/json" };
const body = JSON.stringify({ OneShot: true });
let http_response = await fetch(url_getUploadID, {
method: "POST",
headers: headers,
body: body
});
const response_json = await http_response.json();
const uploadid = response_json.id;
const uploadtoken = response_json.uploadToken;
// Step 2: Upload file data
const url_upload = `${file_server_url}/file/${uploadid}`;
// Create multipart form data
const formData = new FormData();
const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname);
http_response = await fetch(url_upload, {
method: "POST",
headers: { "X-UploadToken": uploadtoken },
body: formData
});
const fileResponseJson = await http_response.json();
const fileid = fileResponseJson.id;
// URL of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
const url = `${file_server_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
return {
status: http_response.status,
uploadid: uploadid,
fileid: fileid,
url: url
};
}
// Export for Node.js
if (typeof module !== 'undefined' && module.exports) {
module.exports = {
@@ -681,6 +723,7 @@ if (typeof module !== 'undefined' && module.exports) {
_deserialize_data,
_fetch_with_backoff,
_upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL,
@@ -700,6 +743,7 @@ if (typeof window !== 'undefined') {
_deserialize_data,
_fetch_with_backoff,
_upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL,

View File

@@ -1,295 +0,0 @@
# NATSBridge
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, and **Python/Micropython** using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Overview
NATSBridge enables seamless communication between Julia, JavaScript, and Python/Micropython applications through NATS, with automatic transport selection based on payload size:
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
## Features
- ✅ Bi-directional NATS communication across Julia ↔ JavaScript ↔ Python/Micropython
- ✅ Multi-payload support (mixed content in single message)
- ✅ Automatic transport selection based on payload size
- ✅ File server integration for large payloads
- ✅ Exponential backoff for URL fetching
- ✅ Correlation ID tracking
- ✅ Reply-to support for request-response pattern
## Supported Payload Types
| Type | Description |
|------|-------------|
| `text` | Plain text strings |
| `dictionary` | JSON-serializable dictionaries |
| `table` | Tabular data (Arrow IPC format) |
| `image` | Image data (PNG, JPG bytes) |
| `audio` | Audio data (WAV, MP3 bytes) |
| `video` | Video data (MP4, AVI bytes) |
| `binary` | Generic binary data |
## Implementation Guides
### [Julia Implementation](../tutorial_julia.md)
See the [Julia tutorial](../tutorial_julia.md) for getting started with Julia.
### [JavaScript Implementation](#javascript-implementation)
See [`NATSBridge.js`](NATSBridge.js) for the JavaScript implementation.
### [Python/Micropython Implementation](#pythonmicropython-implementation)
See [`nats_bridge.py`](nats_bridge.py) for the Python/Micropython implementation.
## Installation
### Julia
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```
### JavaScript
```bash
npm install nats.js apache-arrow uuid base64-url
```
### Python/Micropython
1. Copy `nats_bridge.py` to your device
2. Ensure you have the following dependencies:
- `urequests` for HTTP requests (Micropython)
- `requests` for HTTP requests (Python)
- `base64` for base64 encoding
- `json` for JSON handling
- `socket` for networking (Micropython)
## Usage
### Basic Text Message
#### Python/Micropython
```python
from nats_bridge import smartsend, smartreceive
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
payloads = smartreceive(msg)
for dataname, data, type in payloads:
print("Received {}: {}".format(dataname, data))
```
#### Julia
```julia
using NATSBridge
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
envelope = smartreceive(msg, fileserverDownloadHandler)
# envelope["payloads"] = [("message", "Hello World", "text"), ...]
```
#### JavaScript
```javascript
const { smartsend, smartreceive } = require('./src/NATSBridge');
// Sender
await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" });
// Receiver
const envelope = await smartreceive(msg);
// envelope.payloads = [{ dataname: "message", data: "Hello World", type: "text" }, ...]
```
### Sending JSON Configuration
#### Python/Micropython
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
### Mixed Content (Chat with Text + Image)
#### Python/Micropython
```python
from nats_bridge import smartsend
image_data = b"\x89PNG..." # PNG bytes
data = [
("message_text", "Hello with image!", "text"),
("user_avatar", image_data, "binary")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
### Request-Response Pattern
#### Python/Micropython
```python
from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
```
### Large Payloads (File Server)
#### Python/Micropython
```python
from nats_bridge import smartsend
# Large data (> 1MB)
large_data = b"A" * 2000000 # 2MB
env = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1000000 # 1MB threshold
)
```
## API Reference
### `smartsend(subject, data, ...)`
Send data via NATS with automatic transport selection.
**Arguments:**
- `subject` (str): NATS subject to publish to
- `data` (list): List of `(dataname, data, type)` tuples
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
- `correlation_id` (str): Optional correlation ID for tracing
- `msg_purpose` (str): Message purpose (default: `"chat"`)
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
- `receiver_name` (str): Receiver name (default: `""`)
- `receiver_id` (str): Receiver ID (default: `""`)
- `reply_to` (str): Reply topic (default: `""`)
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
**Returns:** `MessageEnvelope` object
### `smartreceive(msg, ...)`
Receive and process NATS messages.
**Arguments:**
- `msg`: NATS message (dict or JSON string)
- `fileserver_download_handler` (function): Function to fetch data from URLs
- `max_retries` (int): Maximum retry attempts (default: 5)
- `base_delay` (int): Initial delay in ms (default: 100)
- `max_delay` (int): Maximum delay in ms (default: 5000)
**Returns:** List of `(dataname, data, type)` tuples
### `MessageEnvelope`
Represents a complete NATS message envelope.
**Attributes:**
- `correlation_id`: Unique identifier for tracing
- `msg_id`: Unique message identifier
- `timestamp`: Message publication timestamp
- `send_to`: NATS subject
- `msg_purpose`: Message purpose
- `sender_name`: Sender name
- `sender_id`: Sender UUID
- `receiver_name`: Receiver name
- `receiver_id`: Receiver UUID
- `reply_to`: Reply topic
- `reply_to_msg_id`: Reply message ID
- `broker_url`: NATS broker URL
- `metadata`: Message-level metadata
- `payloads`: List of MessagePayload objects
### `MessagePayload`
Represents a single payload within a message envelope.
**Attributes:**
- `id`: Unique payload identifier
- `dataname`: Name of the payload
- `type`: Payload type ("text", "dictionary", etc.)
- `transport`: Transport method ("direct" or "link")
- `encoding`: Encoding method ("none", "base64", etc.)
- `size`: Payload size in bytes
- `data`: Payload data (bytes for direct, URL for link)
- `metadata`: Payload-level metadata
## Examples
See [`examples/micropython_example.py`](../examples/micropython_example.py) for more detailed examples.
## Testing
Run the test suite:
```bash
# Python/Micropython
python test/test_micropython_basic.py
# JavaScript
node test/test_js_to_js_text_sender.js
node test/test_js_to_js_text_receiver.js
# Julia
julia test/test_julia_to_julia_text_sender.jl
julia test/test_julia_to_julia_text_receiver.jl
```
## Requirements
- **Julia**: NATS server (nats.io), HTTP file server (optional)
- **JavaScript**: NATS server (nats.io), HTTP file server (optional)
- **Python/Micropython**: NATS server (nats.io), HTTP file server (optional)
## License
MIT

View File

@@ -1,45 +1,76 @@
"""
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython
Python NATS Bridge - Bi-Directional Data Bridge
This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads.
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.
API Standard:
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# {
# "correlation_id": "...",
# "msg_id": "...",
# "timestamp": "...",
# "send_to": "...",
# "msg_purpose": "...",
# "sender_name": "...",
# "sender_id": "...",
# "receiver_name": "...",
# "receiver_id": "...",
# "reply_to": "...",
# "reply_to_msg_id": "...",
# "broker_url": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
"""
import json
import random
import time
import usocket
import uselect
import ustruct
import uuid
try:
import ussl
HAS_SSL = True
except ImportError:
HAS_SSL = False
# Constants
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
DEFAULT_NATS_URL = "nats://localhost:4222"
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
# ============================================= 100 ============================================== #
class MessagePayload:
"""Internal message payload structure representing a single payload within a NATS message envelope."""
"""Internal message payload structure representing a single payload within a NATS message envelope.
def __init__(self, data, msg_type, id="", dataname="", transport="direct",
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
Attributes:
id: Unique identifier for this payload (e.g., "uuid4")
dataname: Name of the payload (e.g., "login_image")
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
transport: Transport method ("direct" or "link")
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
size: Size of the payload in bytes
data: Payload data (bytes for direct, URL for link)
metadata: Optional metadata dictionary
"""
def __init__(self, data, payload_type, id="", dataname="", transport="direct",
encoding="none", size=0, metadata=None):
"""
Initialize a MessagePayload.
Args:
data: Payload data (bytes for direct, URL string for link)
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
data: Payload data (base64 string for direct, URL string for link)
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
id: Unique identifier for this payload (auto-generated if empty)
dataname: Name of the payload (auto-generated UUID if empty)
transport: Transport method ("direct" or "link")
@@ -49,7 +80,7 @@ class MessagePayload:
"""
self.id = id if id else self._generate_uuid()
self.dataname = dataname if dataname else self._generate_uuid()
self.type = msg_type
self.payload_type = payload_type
self.transport = transport
self.encoding = encoding
self.size = size
@@ -65,7 +96,7 @@ class MessagePayload:
payload_dict = {
"id": self.id,
"dataname": self.dataname,
"type": self.type,
"payload_type": self.payload_type,
"transport": self.transport,
"encoding": self.encoding,
"size": self.size,
@@ -106,7 +137,7 @@ class MessageEnvelope:
def __init__(self, send_to, payloads, correlation_id="", msg_id="", timestamp="",
msg_purpose="", sender_name="", sender_id="", receiver_name="",
receiver_id="", reply_to="", reply_to_msg_id="", broker_url=DEFAULT_NATS_URL,
receiver_id="", reply_to="", reply_to_msg_id="", broker_url=DEFAULT_BROKER_URL,
metadata=None):
"""
Initialize a MessageEnvelope.
@@ -152,20 +183,24 @@ class MessageEnvelope:
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
def to_json(self):
"""Convert envelope to JSON string."""
"""Convert envelope to JSON string.
Returns:
str: JSON string representation of the envelope using snake_case field names
"""
obj = {
"correlationId": self.correlation_id,
"msgId": self.msg_id,
"correlation_id": self.correlation_id,
"msg_id": self.msg_id,
"timestamp": self.timestamp,
"sendTo": self.send_to,
"msgPurpose": self.msg_purpose,
"senderName": self.sender_name,
"senderId": self.sender_id,
"receiverName": self.receiver_name,
"receiverId": self.receiver_id,
"replyTo": self.reply_to,
"replyToMsgId": self.reply_to_msg_id,
"brokerURL": self.broker_url
"send_to": self.send_to,
"msg_purpose": self.msg_purpose,
"sender_name": self.sender_name,
"sender_id": self.sender_id,
"receiver_name": self.receiver_name,
"receiver_id": self.receiver_id,
"reply_to": self.reply_to,
"reply_to_msg_id": self.reply_to_msg_id,
"broker_url": self.broker_url
}
# Include metadata if not empty
@@ -188,68 +223,126 @@ def log_trace(correlation_id, message):
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
def _serialize_data(data, msg_type):
def _serialize_data(data, payload_type):
"""Serialize data according to specified format.
This function serializes arbitrary data into a binary representation based on the specified type.
It supports multiple serialization formats for different data types.
Args:
data: Data to serialize
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
- "text": String
- "dictionary": JSON-serializable dict
- "table": Tabular data (pandas DataFrame or list of dicts)
- "image", "audio", "video", "binary": bytes
payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
Returns:
bytes: Binary representation of the serialized data
Example:
>>> text_bytes = _serialize_data("Hello World", "text")
>>> json_bytes = _serialize_data({"key": "value"}, "dictionary")
>>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table")
"""
if msg_type == "text":
if payload_type == "text":
if isinstance(data, str):
return data.encode('utf-8')
else:
raise ValueError("Text data must be a string")
elif msg_type == "dictionary":
elif payload_type == "dictionary":
if isinstance(data, dict):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Dictionary data must be a dict")
elif msg_type in ("image", "audio", "video", "binary"):
elif payload_type == "table":
# Support pandas DataFrame or list of dicts
try:
import pandas as pd
if isinstance(data, pd.DataFrame):
# Convert DataFrame to JSON and then to bytes
json_str = data.to_json(orient='records', force_ascii=False)
return json_str.encode('utf-8')
elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
# List of dicts
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data must be a pandas DataFrame or list of dicts")
except ImportError:
# Fallback: if pandas not available, treat as list of dicts
if isinstance(data, list):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)")
elif payload_type in ("image", "audio", "video", "binary"):
if isinstance(data, bytes):
return data
else:
raise ValueError("{} data must be bytes".format(msg_type.capitalize()))
raise ValueError("{} data must be bytes".format(payload_type.capitalize()))
else:
raise ValueError("Unknown type: {}".format(msg_type))
raise ValueError("Unknown payload_type: {}".format(payload_type))
def _deserialize_data(data_bytes, msg_type, correlation_id):
def _deserialize_data(data_bytes, payload_type, correlation_id):
"""Deserialize bytes to data based on type.
This function converts serialized bytes back to Python data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (JSON deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Args:
data_bytes: Serialized data as bytes
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
correlation_id: Correlation ID for logging
Returns:
Deserialized data
Deserialized data:
- "text": str
- "dictionary": dict
- "table": list of dicts (or pandas DataFrame if available)
- "image", "audio", "video", "binary": bytes
Example:
>>> text_data = _deserialize_data(b"Hello", "text", "corr_id")
>>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id")
>>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id")
"""
if msg_type == "text":
if payload_type == "text":
return data_bytes.decode('utf-8')
elif msg_type == "dictionary":
elif payload_type == "dictionary":
json_str = data_bytes.decode('utf-8')
return json.loads(json_str)
elif msg_type in ("image", "audio", "video", "binary"):
elif payload_type == "table":
# Deserialize table data (JSON format)
json_str = data_bytes.decode('utf-8')
table_data = json.loads(json_str)
# If pandas is available, try to convert to DataFrame
try:
import pandas as pd
return pd.DataFrame(table_data)
except ImportError:
return table_data
elif payload_type in ("image", "audio", "video", "binary"):
return data_bytes
else:
raise ValueError("Unknown type: {}".format(msg_type))
raise ValueError("Unknown payload_type: {}".format(payload_type))
class NATSConnection:
"""Simple NATS connection for Micropython."""
"""Simple NATS connection for Python and Micropython."""
def __init__(self, url=DEFAULT_NATS_URL):
def __init__(self, url=DEFAULT_BROKER_URL):
"""Initialize NATS connection.
Args:
@@ -276,9 +369,19 @@ class NATSConnection:
def connect(self):
"""Connect to NATS server."""
# Use socket for both Python and Micropython
try:
import socket
addr = socket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect(addr)
except NameError:
# Micropython fallback
import usocket
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket()
self.conn.connect(addr)
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
def publish(self, subject, message):
@@ -294,7 +397,15 @@ class NATSConnection:
# Simple NATS protocol implementation
msg = "PUB {} {}\r\n".format(subject, len(message))
msg = msg.encode('utf-8') + message + b"\r\n"
try:
import socket
self.conn.send(msg)
except NameError:
# Micropython fallback
import usocket
self.conn.send(msg)
log_trace("", "Message published to {}".format(subject))
def subscribe(self, subject, callback):
@@ -335,11 +446,14 @@ class NATSConnection:
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
"""Fetch data from URL with exponential backoff.
This function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Args:
url: URL to fetch from
max_retries: Maximum number of retry attempts
base_delay: Initial delay in milliseconds
max_delay: Maximum delay in milliseconds
max_retries: Maximum number of retry attempts (default: 5)
base_delay: Initial delay in milliseconds (default: 100)
max_delay: Maximum delay in milliseconds (default: 5000)
correlation_id: Correlation ID for logging
Returns:
@@ -347,33 +461,54 @@ def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, corr
Raises:
Exception: If all retry attempts fail
Example:
>>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id")
"""
delay = base_delay
for attempt in range(1, max_retries + 1):
try:
# Simple HTTP GET request
# This is a simplified implementation
# For production, you'd want a proper HTTP client
# Try urequests for Micropython first, then requests for Python
try:
import urequests
response = urequests.get(url)
if response.status_code == 200:
status_code = response.status_code
content = response.content
except ImportError:
try:
import requests
response = requests.get(url)
response.raise_for_status()
status_code = response.status_code
content = response.content
except ImportError:
raise Exception("No HTTP library available (urequests or requests)")
if status_code == 200:
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
return response.content
return content
else:
raise Exception("Failed to fetch: {}".format(response.status_code))
raise Exception("Failed to fetch: {}".format(status_code))
except Exception as e:
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
if attempt < max_retries:
time.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
raise Exception("Failed to fetch data after {} attempts".format(max_retries))
def plik_oneshot_upload(file_server_url, filename, data):
def plik_oneshot_upload(fileserver_url, dataname, data):
"""Upload a single file to a plik server using one-shot mode.
This function uploads raw byte data to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
Args:
file_server_url: Base URL of the plik server
filename: Name of the file being uploaded
fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080")
dataname: Name of the file being uploaded
data: Raw byte data of the file content
Returns:
@@ -382,23 +517,31 @@ def plik_oneshot_upload(file_server_url, filename, data):
- "uploadid": ID of the one-shot upload session
- "fileid": ID of the uploaded file within the session
- "url": Full URL to download the uploaded file
Example:
>>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world")
>>> result["status"], result["uploadid"], result["fileid"], result["url"]
"""
import urequests
import json
try:
import urequests
except ImportError:
import requests as urequests
# Get upload ID
url_get_upload_id = "{}/upload".format(file_server_url)
url_get_upload_id = "{}/upload".format(fileserver_url)
headers = {"Content-Type": "application/json"}
body = json.dumps({"OneShot": True})
response = urequests.post(url_get_upload_id, headers=headers, data=body)
response_json = json.loads(response.content)
response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
uploadid = response_json.get("id")
uploadtoken = response_json.get("uploadToken")
# Upload file
url_upload = "{}/file/{}".format(file_server_url, uploadid)
url_upload = "{}/file/{}".format(fileserver_url, uploadid)
headers = {"X-UploadToken": uploadtoken}
# For Micropython, we need to construct the multipart form data manually
@@ -407,7 +550,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
# Create multipart body
part1 = "--{}\r\n".format(boundary)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname)
part1 += "Content-Type: application/octet-stream\r\n\r\n"
part1_bytes = part1.encode('utf-8')
@@ -421,10 +564,10 @@ def plik_oneshot_upload(file_server_url, filename, data):
content_type = "multipart/form-data; boundary={}".format(boundary)
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
response_json = json.loads(response.content)
response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
fileid = response_json.get("id")
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename)
url = "{}/file/{}/{}".format(fileserver_url, uploadid, dataname)
return {
"status": response.status_code,
@@ -434,10 +577,10 @@ def plik_oneshot_upload(file_server_url, filename, data):
}
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -445,26 +588,46 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
publishes directly over NATS. Otherwise, it uploads the data to a fileserver and publishes
only the download URL over NATS.
API Standard:
- Input format: List of (dataname, data, payload_type) tuples
- Even single payloads must be wrapped in a list
- Each payload can have a different type, enabling mixed-content messages
Args:
subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples to send
nats_url: URL of the NATS server
data: List of (dataname, data, payload_type) tuples to send
- dataname: Name of the payload
- data: The actual data to send
- payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- Example: [("message", "Hello World!", "text"), ("config", {"key": "value"}, "dictionary")]
broker_url: URL of the NATS server
fileserver_url: URL of the HTTP file server
fileserver_upload_handler: Function to handle fileserver uploads
size_threshold: Threshold in bytes separating direct vs link transport
correlation_id: Optional correlation ID for tracing
msg_purpose: Purpose of the message
fileserver_upload_handler: Function to handle fileserver uploads (must return dict with "status", "uploadid", "fileid", "url" keys)
size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB)
correlation_id: Optional correlation ID for tracing; if None, a UUID is generated
msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
sender_name: Name of the sender
receiver_name: Name of the receiver
receiver_id: UUID of the receiver
reply_to: Topic to reply to
receiver_name: Name of the receiver (empty string means broadcast)
receiver_id: UUID of the receiver (empty string means broadcast)
reply_to: Topic to reply to (empty string if no reply expected)
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
- When True: message is published to NATS
- When False: returns envelope and JSON string without publishing
Returns:
MessageEnvelope: The envelope object for tracking
tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing
Example:
>>> data = [("message", "Hello World!", "text")]
>>> env, env_json_str = smartsend("/test", data)
>>> # env: MessageEnvelope with all metadata and payloads
>>> # env_json_str: JSON string for publishing
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
cid = correlation_id if correlation_id is not None else str(uuid.uuid4())
log_trace(cid, "Starting smartsend for subject: {}".format(subject))
@@ -479,16 +642,19 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format(
log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format(
dataname, payload_type, payload_size))
# Decision: Direct vs Link
if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
# Convert to base64 string for JSON
try:
import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
except ImportError:
import base64
payload_b64_str = base64.b64encode(payload_bytes).decode('utf-8')
log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
@@ -511,10 +677,10 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response["status"]))
if response.get("status") != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response.get("status")))
url = response["url"]
url = response.get("url")
log_trace(cid, "Uploaded to URL: {}".format(url))
# Create MessagePayload for link transport
@@ -543,19 +709,21 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id=receiver_id,
reply_to=reply_to,
reply_to_msg_id=reply_to_msg_id,
broker_url=nats_url,
broker_url=broker_url,
metadata={}
)
msg_json = env.to_json()
# Publish to NATS
nats_conn = NATSConnection(nats_url)
# Publish to NATS if is_publish is True
if is_publish:
nats_conn = NATSConnection(broker_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
return env
# Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,
@@ -565,19 +733,39 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
This function processes incoming NATS messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
API Standard:
- Returns a dictionary with envelope metadata and 'payloads' field
- payloads field contains list of (dataname, data, payload_type) tuples
- Supports mixed-content messages with different payload types
Args:
msg: NATS message to process (dict with payload data)
msg: NATS message to process (dict or JSON string with envelope data)
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL
base_delay: Initial delay for exponential backoff in ms
max_delay: Maximum delay for exponential backoff in ms
Receives: (url, max_retries, base_delay, max_delay, correlation_id)
Returns: bytes (the downloaded data)
max_retries: Maximum retry attempts for fetching URL (default: 5)
base_delay: Initial delay for exponential backoff in ms (default: 100)
max_delay: Maximum delay for exponential backoff in ms (default: 5000)
Returns:
dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples
dict: Envelope dictionary with metadata and 'payloads' field containing list of
(dataname, data, payload_type) tuples
- Envelope fields: correlation_id, msg_id, timestamp, send_to, msg_purpose,
sender_name, sender_id, receiver_name, receiver_id, reply_to, reply_to_msg_id,
broker_url, metadata
- payloads: List of (dataname, data, payload_type) tuples
Example:
>>> env = smartreceive(msg)
>>> # env contains envelope metadata and payloads field
>>> # env["payloads"] = [(dataname1, data1, payload_type1), ...]
>>> for dataname, data, payload_type in env["payloads"]:
... print("Received {} of type {}: {}".format(dataname, payload_type, data))
"""
# Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg)
log_trace(json_data.get("correlationId", ""), "Processing received message")
correlation_id = json_data.get("correlation_id", "")
log_trace(correlation_id, "Processing received message")
# Process all payloads in the envelope
payloads_list = []
@@ -591,43 +779,47 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
dataname = payload.get("dataname", "")
if transport == "direct":
log_trace(json_data.get("correlationId", ""),
log_trace(correlation_id,
"Direct transport - decoding payload '{}'".format(dataname))
# Extract base64 payload from the payload
payload_b64 = payload.get("data", "")
# Decode Base64 payload
try:
import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
except ImportError:
import base64
payload_bytes = base64.b64decode(payload_b64)
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", ""))
payload_type = payload.get("payload_type", "")
data = _deserialize_data(payload_bytes, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type))
payloads_list.append((dataname, data, payload_type))
elif transport == "link":
# Extract download URL from the payload
url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""),
log_trace(correlation_id,
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "")
url, max_retries, base_delay, max_delay, correlation_id
)
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", ""))
payload_type = payload.get("payload_type", "")
data = _deserialize_data(downloaded_data, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type))
payloads_list.append((dataname, data, payload_type))
else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
# Replace payloads field with the processed list of (dataname, data, type) tuples
# Replace payloads field with the processed list of (dataname, data, payload_type) tuples
json_data["payloads"] = payloads_list
return json_data
@@ -646,11 +838,11 @@ def get_timestamp():
# Example usage
if __name__ == "__main__":
print("NATSBridge for Micropython")
print("=========================")
print("NATSBridge - Bi-Directional Data Bridge")
print("=======================================")
print("This module provides:")
print(" - MessageEnvelope: Message envelope structure")
print(" - MessagePayload: Payload structure")
print(" - MessageEnvelope: Message envelope structure with snake_case fields")
print(" - MessagePayload: Payload structure with payload_type field")
print(" - smartsend: Send data via NATS with automatic transport selection")
print(" - smartreceive: Receive and process messages from NATS")
print(" - plik_oneshot_upload: Upload files to HTTP file server")
@@ -658,10 +850,22 @@ if __name__ == "__main__":
print()
print("Usage:")
print(" from nats_bridge import smartsend, smartreceive")
print(" data = [(\"message\", \"Hello World\", \"text\")]")
print(" env = smartsend(\"my.subject\", data)")
print()
print(" # Send data (list of (dataname, data, payload_type) tuples)")
print(" # Even single payloads must be wrapped in a list")
print(" data = [(\"message\", \"Hello World!\", \"text\")]")
print(" env, env_json_str = smartsend(\"my.subject\", data)")
print()
print(" # On receiver:")
print(" payloads = smartreceive(msg)")
print(" for dataname, data, type in payloads:")
print(" print(f\"Received {dataname} of type {type}: {data}\")")
print(" env = smartreceive(msg)")
print(" # env contains envelope metadata and payloads field")
print(" for dataname, data, payload_type in env[\"payloads\"]:")
print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))")
print()
print(" # Mixed-content message example:")
print(" mixed_data = [")
print(" (\"text\", \"Hello!\", \"text\"),")
print(" (\"config\", {\"key\": \"value\"}, \"dictionary\"),")
print(" (\"table\", [{\"id\": 1}], \"table\")")
print(" ]")
print(" smartsend(\"/chat\", mixed_data)")

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
];
// Use smartsend with mixed content
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
payloads,
{
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver)
const env = await smartsend(
const { env, env_json_str } = await smartsend(
SUBJECT,
[data1, data2],
{
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "",
receiverId: "",
replyTo: "",
replyToMsgId: ""
replyToMsgId: "",
isPublish: true // Publish the message to NATS
}
);

View File

@@ -92,12 +92,12 @@ function test_dict_send()
# Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -114,7 +115,7 @@ function test_dict_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -79,12 +79,12 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL;
broker_url = NATS_URL;
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -92,11 +92,12 @@ function test_large_binary_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.payloads[1].type)")
log_trace("Envelope type: $(env.payloads[1].payload_type)")
# Check if link transport was used
if env.payloads[1].transport == "link"

View File

@@ -186,12 +186,12 @@ function test_mix_send()
]
# Use smartsend with mixed content
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -208,7 +209,7 @@ function test_mix_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -90,12 +90,12 @@ function test_table_send()
# Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -112,7 +113,7 @@ function test_table_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -75,12 +75,12 @@ function test_text_send()
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend(
env, env_json_str = NATSBridge.smartsend(
SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL,
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
log_trace("Sent message with $(length(env.payloads)) payloads")
@@ -97,7 +98,7 @@ function test_text_send()
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.type)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
data, # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver)
env = smartsend(
env, env_json_str = smartsend(
SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="",
receiver_id="",
reply_to="",
reply_to_msg_id=""
reply_to_msg_id="",
is_publish=True # Publish the message to NATS
)
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")