This commit is contained in:
2026-02-23 19:18:12 +07:00
parent f8a92a45a0
commit 46fdf668c6
22 changed files with 260 additions and 141 deletions

View File

@@ -356,7 +356,7 @@ const env = await smartsend(
```julia ```julia
using NATSBridge using NATSBridge
env = NATSBridge.smartsend( env, msg_json_str = NATSBridge.smartsend(
subject; # NATS subject subject; # NATS subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type)
nats_url::String = "nats://localhost:4222", nats_url::String = "nats://localhost:4222",
@@ -371,6 +371,9 @@ env = NATSBridge.smartsend(
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "" reply_to_msg_id::String = ""
) )
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# - msg_json_str: JSON string representation of the envelope for publishing
``` ```
### smartreceive ### smartreceive
@@ -636,19 +639,24 @@ data = [("students", df, "table")]
NATSBridge.smartsend("/data/analysis", data) NATSBridge.smartsend("/data/analysis", data)
``` ```
### Example 4: Request-Response Pattern ### Example 4: Request-Response Pattern with Envelope JSON
Bi-directional communication with reply-to support. Bi-directional communication with reply-to support. The `smartsend` function now returns both the envelope object and a JSON string that can be published directly.
#### Python/Micropython (Requester) #### Python/Micropython (Requester)
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
env = smartsend( env, msg_json_str = smartsend(
"/device/command", "/device/command",
[("command", {"action": "read_sensor"}, "dictionary")], [("command", {"action": "read_sensor"}, "dictionary")],
reply_to="/device/response" reply_to="/device/response"
) )
# env: msgEnvelope_v1 object
# msg_json_str: JSON string for publishing to NATS
# The msg_json_str can also be published directly using NATS request-reply pattern
# nc.request("/device/command", msg_json_str, reply_to="/device/response")
``` ```
#### Python/Micropython (Responder) #### Python/Micropython (Responder)

View File

@@ -387,9 +387,20 @@ function smartsend(
nats_url::String = "nats://localhost:4222", nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload, fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB size_threshold::Int = 1_000_000 # 1MB
is_publish::Bool = true # Whether to automatically publish to NATS
) )
``` ```
**Return Value:**
- Returns a tuple `(env, msg_json_str)` where:
- `env::msgEnvelope_v1` - The envelope object containing all metadata and payloads
- `msg_json_str::String` - JSON string representation of the envelope for publishing
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.
**Input Format:** **Input Format:**
- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]` - `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]` - Even for single payloads: `[(dataname1, data1, "type1")]`

View File

@@ -146,7 +146,22 @@ All three implementations (Julia, JavaScript, Python/Micropython) follow the sam
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
``` ```
## Files ## smartsend Return Value
The `smartsend` function now returns a tuple containing both the envelope object and the JSON string representation:
```julia
env, msg_json_str = smartsend(...)
# env::msgEnvelope_v1 - The envelope object with all metadata and payloads
# msg_json_str::String - JSON string for publishing to NATS
```
**Options:**
- `is_publish::Bool = true` - When `true` (default), the message is automatically published to NATS. When `false`, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.
This enables two use cases:
1. **Programmatic envelope access**: Access envelope fields directly via the `env` object
2. **Direct JSON publishing**: Publish the JSON string directly using NATS request-reply pattern
### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl) ### Julia Module: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
@@ -345,7 +360,9 @@ df = DataFrame(
) )
# Send via SmartSend - wrapped in a list (type is part of each tuple) # Send via SmartSend - wrapped in a list (type is part of each tuple)
await SmartSend("analysis_results", [("table_data", df, "table")]); env, msg_json_str = SmartSend("analysis_results", [("table_data", df, "table")])
# env: msgEnvelope_v1 object with all metadata and payloads
# msg_json_str: JSON string representation of the envelope for publishing
``` ```
#### JavaScript (Receiver) #### JavaScript (Receiver)

View File

@@ -107,10 +107,15 @@ python3 -m http.server 8080 --directory /tmp/fileserver
```python ```python
from nats_bridge import smartsend from nats_bridge import smartsend
# Send a text message # Send a text message (is_publish=True by default)
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, msg_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
print("Message sent!") print("Message sent!")
# Or use is_publish=False to get envelope and JSON without publishing
env, msg_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222", is_publish=False)
# env: MessageEnvelope object
# msg_json_str: JSON string for publishing to NATS
``` ```
#### JavaScript #### JavaScript
@@ -118,12 +123,19 @@ print("Message sent!")
```javascript ```javascript
const { smartsend } = require('./src/NATSBridge'); const { smartsend } = require('./src/NATSBridge');
// Send a text message // Send a text message (isPublish=true by default)
await smartsend("/chat/room1", [ await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" } { dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222" }); ], { natsUrl: "nats://localhost:4222" });
console.log("Message sent!"); console.log("Message sent!");
// Or use isPublish=false to get envelope and JSON without publishing
const { env, msg_json_str } = await smartsend("/chat/room1", [
{ dataname: "message", data: "Hello World", type: "text" }
], { natsUrl: "nats://localhost:4222", isPublish: false });
// env: MessageEnvelope object
// msg_json_str: JSON string for publishing to NATS
``` ```
#### Julia #### Julia
@@ -133,7 +145,9 @@ using NATSBridge
# Send a text message # Send a text message
data = [("message", "Hello World", "text")] data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222") env, msg_json_str = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# env: msgEnvelope_v1 object with all metadata and payloads
# msg_json_str: JSON string representation of the envelope for publishing
println("Message sent!") println("Message sent!")
``` ```
@@ -279,13 +293,15 @@ from nats_bridge import smartsend
# Send command with reply-to # Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")] data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend( env, msg_json_str = smartsend(
"/device/command", "/device/command",
data, data,
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
reply_to="/device/response", reply_to="/device/response",
reply_to_msg_id="cmd-001" reply_to_msg_id="cmd-001"
) )
# env: msgEnvelope_v1 object
# msg_json_str: JSON string for publishing to NATS
``` ```
#### JavaScript (Responder) #### JavaScript (Responder)

View File

@@ -514,6 +514,7 @@ class SensorSender:
data = [("reading", reading.to_dict(), "dictionary")] data = [("reading", reading.to_dict(), "dictionary")]
# Default: is_publish=True (automatically publishes to NATS)
smartsend( smartsend(
f"/sensors/{sensor_id}", f"/sensors/{sensor_id}",
data, data,
@@ -521,6 +522,31 @@ class SensorSender:
fileserver_url=self.fileserver_url fileserver_url=self.fileserver_url
) )
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
"""Prepare a message without publishing (is_publish=False)."""
reading = SensorReading(
sensor_id=sensor_id,
timestamp=datetime.now().isoformat(),
value=value,
unit=unit
)
data = [("reading", reading.to_dict(), "dictionary")]
# With is_publish=False, returns (envelope, json_str) without publishing
env, msg_json_str = smartsend(
f"/sensors/{sensor_id}/prepare",
data,
nats_url=self.nats_url,
fileserver_url=self.fileserver_url,
is_publish=False
)
# Now you can publish manually using NATS request-reply pattern
# nc.request(subject, msg_json_str, reply_to=reply_to_topic)
return env, msg_json_str
def send_batch(self, readings: List[SensorReading]): def send_batch(self, readings: List[SensorReading]):
batch = SensorBatch() batch = SensorBatch()
for reading in readings: for reading in readings:

View File

@@ -286,35 +286,35 @@ function envelope_to_json(env::msgEnvelope_v1)
# Convert payloads to JSON array # Convert payloads to JSON array
if !isempty(env.payloads) if !isempty(env.payloads)
payloads_json = [] payloads_json = []
for payload in env.payloads for payload in env.payloads
payload_obj = Dict{String, Any}( payload_obj = Dict{String, Any}(
"id" => payload.id, "id" => payload.id,
"dataname" => payload.dataname, "dataname" => payload.dataname,
"type" => payload.type, "type" => payload.type,
"transport" => payload.transport, "transport" => payload.transport,
"encoding" => payload.encoding, "encoding" => payload.encoding,
"size" => payload.size, "size" => payload.size,
) )
# Include data based on transport type # Include data based on transport type
if payload.transport == "direct" && payload.data !== nothing if payload.transport == "direct" && payload.data !== nothing
if payload.encoding == "base64" || payload.encoding == "json" if payload.encoding == "base64" || payload.encoding == "json"
payload_obj["data"] = payload.data payload_obj["data"] = payload.data
else else
# For other encodings, use base64 # For other encodings, use base64
payload_bytes = _get_payload_bytes(payload.data) payload_bytes = _get_payload_bytes(payload.data)
payload_obj["data"] = Base64.base64encode(payload_bytes) payload_obj["data"] = Base64.base64encode(payload_bytes)
end
elseif payload.transport == "link" && payload.data !== nothing
# For link transport, data is a URL string - include directly
payload_obj["data"] = payload.data
end
if !isempty(payload.metadata)
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
end end
push!(payloads_json, payload_obj) elseif payload.transport == "link" && payload.data !== nothing
# For link transport, data is a URL string - include directly
payload_obj["data"] = payload.data
end
if !isempty(payload.metadata)
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
end end
obj["payloads"] = payloads_json push!(payloads_json, payload_obj)
end
obj["payloads"] = payloads_json
end end
JSON.json(obj) JSON.json(obj)
@@ -361,6 +361,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
3. Compares the serialized size against `size_threshold` 3. Compares the serialized size against `size_threshold`
4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1 4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL 5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL
6. Converts envelope to JSON string and optionally publishes to NATS
# Arguments: # Arguments:
- `subject::String` - NATS subject to publish the message to - `subject::String` - NATS subject to publish the message to
@@ -372,18 +373,22 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
# Keyword Arguments: # Keyword Arguments:
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server - `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
- `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) - `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport - `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated - `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. - `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `sender_name::String = "NATSBridge"` - Name of the sender - `sender_name::String = "default"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast) - `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast) - `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected) - `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to - `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
# Return: # Return:
- A `msgEnvelope_v1` object containing metadata and transport information - A tuple `(env, msg_json_str)` where:
- `env::msgEnvelope_v1` - The envelope object containing all metadata and payloads
- `msg_json_str::String` - JSON string representation of the envelope for publishing
# Example # Example
```jldoctest ```jldoctest
@@ -391,39 +396,43 @@ using UUIDs
# Send a single payload (still wrapped in a list) # Send a single payload (still wrapped in a list)
data = Dict("key" => "value") data = Dict("key" => "value")
env = smartsend("my.subject", [("dataname1", data, "dictionary")]) env, msg_json = smartsend("my.subject", [("dataname1", data, "dictionary")])
# Send multiple payloads in one message with different types # Send multiple payloads in one message with different types
data1 = Dict("key1" => "value1") data1 = Dict("key1" => "value1")
data2 = rand(10_000) # Small array data2 = rand(10_000) # Small array
env = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")]) env, msg_json = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")])
# Send a large array using fileserver upload # Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", [("large_table", data, "table")]) env, msg_json = smartsend("large.data", [("large_table", data, "table")])
# Mixed content (e.g., chat with text and image) # Mixed content (e.g., chat with text and image)
env = smartsend("chat.subject", [ env, msg_json = smartsend("chat.subject", [
("message_text", "Hello!", "text"), ("message_text", "Hello!", "text"),
("user_image", image_data, "image"), ("user_image", image_data, "image"),
("audio_clip", audio_data, "audio") ("audio_clip", audio_data, "audio")
]) ])
# Publish the JSON string directly using NATS request-reply pattern
# reply = NATS.request(nats_url, subject, msg_json_str; reply_to=reply_to_topic)
``` ```
""" """ #[PENDING]
function smartsend( function smartsend(
subject::String, # smartreceive's subject subject::String, # smartreceive's subject
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
nats_url::String = DEFAULT_NATS_URL, nats_url::String = DEFAULT_NATS_URL,
fileserver_url = DEFAULT_FILESERVER_URL, fileserver_url = DEFAULT_FILESERVER_URL,
fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD, size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing, correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat", msg_purpose::String = "chat",
sender_name::String = "NATSBridge", sender_name::String = "default",
receiver_name::String = "", receiver_name::String = "",
receiver_id::String = "", receiver_id::String = "",
reply_to::String = "", reply_to::String = "",
reply_to_msg_id::String = "" reply_to_msg_id::String = "",
is_publish::Bool = true # some time the user want to get env and msg_json_str from this function without publishing the msg
) where {T1<:Any} ) where {T1<:Any}
# Generate correlation ID if not provided # Generate correlation ID if not provided
@@ -437,59 +446,59 @@ function smartsend(
# Process each payload in the list # Process each payload in the list
payloads = msgPayload_v1[] payloads = msgPayload_v1[]
for (dataname, payload_data, payload_type) in data for (dataname, payload_data, payload_type) in data
# Serialize data based on type # Serialize data based on type
payload_bytes = _serialize_data(payload_data, payload_type) payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
payload_size = length(payload_bytes) # Calculate payload size in bytes # Create msgPayload_v1 for direct transport
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size payload = msgPayload_v1(
payload_b64,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "direct",
encoding = "base64",
size = payload_size,
metadata = Dict{String, Any}("payload_bytes" => payload_size)
)
push!(payloads, payload)
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Decision: Direct vs Link # Upload to HTTP server
if payload_size < size_threshold # Check if payload is small enough for direct transport response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes)
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string if response["status"] != 200 # Check if upload was successful
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed
# Create msgPayload_v1 for direct transport
payload = msgPayload_v1(
payload_b64,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "direct",
encoding = "base64",
size = payload_size,
metadata = Dict{String, Any}("payload_bytes" => payload_size)
)
push!(payloads, payload)
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed
end
url = response["url"] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
# Create msgPayload_v1 for link transport
payload = msgPayload_v1(
url,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "link",
encoding = "none",
size = payload_size,
metadata = Dict{String, Any}()
)
push!(payloads, payload)
end end
url = response["url"] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
# Create msgPayload_v1 for link transport
payload = msgPayload_v1(
url,
payload_type;
id = string(uuid4()),
dataname = dataname,
transport = "link",
encoding = "none",
size = payload_size,
metadata = Dict{String, Any}()
)
push!(payloads, payload)
end
end end
# Create msgEnvelope_v1 with all payloads # Create msgEnvelope_v1 with all payloads
env = msgEnvelope_v1( env = msgEnvelope_v1(
subject, subject,
@@ -507,10 +516,12 @@ function smartsend(
metadata = Dict{String, Any}(), metadata = Dict{String, Any}(),
) )
msg_json = envelope_to_json(env) # Convert envelope to JSON msg_json_str = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS if is_publish
publish_message(nats_url, subject, msg_json_str, cid) # Publish message to NATS
end
return env # Return the envelope for tracking return (env, msg_json_str)
end end

View File

@@ -460,8 +460,9 @@ async function smartsend(subject, data, options = {}) {
* @param {string} options.receiverId - UUID of the receiver (default: "") * @param {string} options.receiverId - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "") * @param {string} options.replyTo - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
* *
* @returns {Promise<MessageEnvelope>} - The envelope for tracking * @returns {Promise<Object>} - An object with { env: MessageEnvelope, msg_json_str: string }
*/ */
const { const {
natsUrl = DEFAULT_NATS_URL, natsUrl = DEFAULT_NATS_URL,
@@ -474,7 +475,8 @@ async function smartsend(subject, data, options = {}) {
receiverName = "", receiverName = "",
receiverId = "", receiverId = "",
replyTo = "", replyTo = "",
replyToMsgId = "" replyToMsgId = "",
isPublish = true // Whether to automatically publish the message to NATS
} = options; } = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`); log_trace(correlationId, `Starting smartsend for subject: ${subject}`);
@@ -556,10 +558,19 @@ async function smartsend(subject, data, options = {}) {
payloads: payloads payloads: payloads
}); });
// Publish message to NATS // Convert envelope to JSON string
await publish_message(natsUrl, subject, env.toString(), correlationId); const msg_json_str = env.toString();
return env; // Publish to NATS if isPublish is true
if (isPublish) {
await publish_message(natsUrl, subject, msg_json_str, correlationId);
}
// Return both envelope and JSON string (tuple-like structure)
return {
env: env,
msg_json_str: msg_json_str
};
} }
// Helper: Publish message to NATS // Helper: Publish message to NATS

View File

@@ -437,7 +437,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL, def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD, fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge", correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""): receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size. """Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold. This function intelligently routes data delivery based on payload size relative to a threshold.
@@ -459,9 +459,12 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id: UUID of the receiver receiver_id: UUID of the receiver
reply_to: Topic to reply to reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True)
Returns: Returns:
MessageEnvelope: The envelope object for tracking tuple: (env, msg_json_str) where:
- env: MessageEnvelope object with all metadata and payloads
- msg_json_str: JSON string representation of the envelope for publishing
""" """
# Generate correlation ID if not provided # Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4()) cid = correlation_id if correlation_id else str(uuid.uuid4())
@@ -549,13 +552,15 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
msg_json = env.to_json() msg_json = env.to_json()
# Publish to NATS # Publish to NATS if is_publish is True
nats_conn = NATSConnection(nats_url) if is_publish:
nats_conn.connect() nats_conn = NATSConnection(nats_url)
nats_conn.publish(subject, msg_json) nats_conn.connect()
nats_conn.close() nats_conn.publish(subject, msg_json)
nats_conn.close()
return env # Return tuple of (envelope, json_string) for both direct and link transport
return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5, def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,

View File

@@ -118,7 +118,7 @@ async function test_dict_send() {
// Use smartsend with dictionary type // Use smartsend with dictionary type
// For small Dictionary: will use direct transport (JSON encoded) // For small Dictionary: will use direct transport (JSON encoded)
// For large Dictionary: will use link transport (uploaded to fileserver) // For large Dictionary: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, msg_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -132,7 +132,8 @@ async function test_dict_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -98,7 +98,7 @@ async function test_large_binary_send() {
// Use smartsend with binary type - will automatically use link transport // Use smartsend with binary type - will automatically use link transport
// if file size exceeds the threshold (1MB by default) // if file size exceeds the threshold (1MB by default)
const env = await smartsend( const { env, msg_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -112,7 +112,8 @@ async function test_large_binary_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -222,7 +222,7 @@ async function test_mix_send() {
]; ];
// Use smartsend with mixed content // Use smartsend with mixed content
const env = await smartsend( const { env, msg_json_str } = await smartsend(
SUBJECT, SUBJECT,
payloads, payloads,
{ {
@@ -236,7 +236,8 @@ async function test_mix_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -118,7 +118,7 @@ async function test_table_send() {
// Use smartsend with table type // Use smartsend with table type
// For small Table: will use direct transport (Arrow IPC encoded) // For small Table: will use direct transport (Arrow IPC encoded)
// For large Table: will use link transport (uploaded to fileserver) // For large Table: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, msg_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -132,7 +132,8 @@ async function test_table_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -94,7 +94,7 @@ async function test_text_send() {
// Use smartsend with text type // Use smartsend with text type
// For small text: will use direct transport (Base64 encoded UTF-8) // For small text: will use direct transport (Base64 encoded UTF-8)
// For large text: will use link transport (uploaded to fileserver) // For large text: will use link transport (uploaded to fileserver)
const env = await smartsend( const { env, msg_json_str } = await smartsend(
SUBJECT, SUBJECT,
[data1, data2], [data1, data2],
{ {
@@ -108,7 +108,8 @@ async function test_text_send() {
receiverName: "", receiverName: "",
receiverId: "", receiverId: "",
replyTo: "", replyTo: "",
replyToMsgId: "" replyToMsgId: "",
isPublish: true // Publish the message to NATS
} }
); );

View File

@@ -92,7 +92,7 @@ function test_dict_send()
# Use smartsend with dictionary type # Use smartsend with dictionary type
# For small Dictionary: will use direct transport (JSON encoded) # For small Dictionary: will use direct transport (JSON encoded)
# For large Dictionary: will use link transport (uploaded to fileserver) # For large Dictionary: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, msg_json = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, nats_url = NATS_URL,
@@ -105,7 +105,8 @@ function test_dict_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")

View File

@@ -79,7 +79,7 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport # Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default) # if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...) # API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend( env, msg_json = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL; nats_url = NATS_URL;
@@ -92,7 +92,8 @@ function test_large_binary_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with transport: $(env.payloads[1].transport)") log_trace("Sent message with transport: $(env.payloads[1].transport)")

View File

@@ -186,7 +186,7 @@ function test_mix_send()
] ]
# Use smartsend with mixed content # Use smartsend with mixed content
env = NATSBridge.smartsend( env, msg_json = NATSBridge.smartsend(
SUBJECT, SUBJECT,
payloads; # List of (dataname, data, type) tuples payloads; # List of (dataname, data, type) tuples
nats_url = NATS_URL, nats_url = NATS_URL,
@@ -199,7 +199,8 @@ function test_mix_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")

View File

@@ -90,7 +90,7 @@ function test_table_send()
# Use smartsend with table type # Use smartsend with table type
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC) # For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
# For large DataFrame: will use link transport (uploaded to fileserver) # For large DataFrame: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, msg_json = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, nats_url = NATS_URL,
@@ -103,7 +103,8 @@ function test_table_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")

View File

@@ -75,7 +75,7 @@ function test_text_send()
# Use smartsend with text type # Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8) # For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver) # For large text: will use link transport (uploaded to fileserver)
env = NATSBridge.smartsend( env, msg_json = NATSBridge.smartsend(
SUBJECT, SUBJECT,
[data1, data2]; # List of (dataname, data, type) tuples [data1, data2]; # List of (dataname, data, type) tuples
nats_url = NATS_URL, nats_url = NATS_URL,
@@ -88,7 +88,8 @@ function test_text_send()
receiver_name = "", receiver_name = "",
receiver_id = "", receiver_id = "",
reply_to = "", reply_to = "",
reply_to_msg_id = "" reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
) )
log_trace("Sent message with $(length(env.payloads)) payloads") log_trace("Sent message with $(length(env.payloads)) payloads")

View File

@@ -64,7 +64,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with dictionary type # Use smartsend with dictionary type
env = smartsend( env, msg_json = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -76,7 +76,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -44,7 +44,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with binary type # Use smartsend with binary type
env = smartsend( env, msg_json = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -56,7 +56,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -58,7 +58,7 @@ def main():
log_trace(correlation_id, f"Correlation ID: {correlation_id}") log_trace(correlation_id, f"Correlation ID: {correlation_id}")
# Use smartsend with mixed types # Use smartsend with mixed types
env = smartsend( env, msg_json = smartsend(
SUBJECT, SUBJECT,
data, # List of (dataname, data, type) tuples data, # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -70,7 +70,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")

View File

@@ -46,7 +46,7 @@ def main():
# Use smartsend with text type # Use smartsend with text type
# For small text: will use direct transport (Base64 encoded UTF-8) # For small text: will use direct transport (Base64 encoded UTF-8)
# For large text: will use link transport (uploaded to fileserver) # For large text: will use link transport (uploaded to fileserver)
env = smartsend( env, msg_json = smartsend(
SUBJECT, SUBJECT,
[data1, data2], # List of (dataname, data, type) tuples [data1, data2], # List of (dataname, data, type) tuples
nats_url=NATS_URL, nats_url=NATS_URL,
@@ -58,7 +58,8 @@ def main():
receiver_name="", receiver_name="",
receiver_id="", receiver_id="",
reply_to="", reply_to="",
reply_to_msg_id="" reply_to_msg_id="",
is_publish=True # Publish the message to NATS
) )
log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads") log_trace(correlation_id, f"Sent message with {len(env.payloads)} payloads")