smartreceive_return_envelope #6

Merged
ton merged 6 commits from smartreceive_return_envelope into main 2026-02-22 23:59:14 +00:00
17 changed files with 147 additions and 109 deletions
Showing only changes of commit 075d355c58 - Show all commits

View File

@@ -35,8 +35,24 @@ The system uses a **standardized list-of-tuples format** for all payload operati
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# {
# "correlationId": "...",
# "msgId": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
**Supported Types:**
@@ -81,9 +97,10 @@ smartsend(
nats_url="nats://localhost:4222"
)
# Receive always returns a list
payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# payloads = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# envelope["correlationId"], envelope["msgId"], etc.
```
## Architecture Diagram
@@ -338,23 +355,25 @@ function smartreceive(
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# Deserialize payload based on type
# Return list of (dataname, data, type) tuples
# Return envelope dictionary with all metadata and deserialized payloads
end
```
**Output Format:**
- Always returns a list of tuples: `[(dataname1, data1, type1), (dataname2, data2, type2), ...]`
- Even for single payloads: `[(dataname1, data1, type1)]`
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract the `payloads` array
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return list of `(dataname, data, type)` tuples
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
@@ -401,15 +420,21 @@ async function smartreceive(msg, options = {})
// - correlationId: optional correlation ID for tracing
```
**Output Format:**
- Returns a dictionary (key-value map) containing all envelope fields:
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
- `metadata` - Message-level metadata dictionary
- `payloads` - List of dictionaries, each containing deserialized payload data
**Process Flow:**
1. Parse the JSON envelope to extract the `payloads` array
1. Parse the JSON envelope to extract all fields
2. Iterate through each payload in `payloads`
3. For each payload:
- Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return list of `(dataname, data, type)` tuples
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
## Scenario Implementations

View File

@@ -13,8 +13,24 @@ The implementation uses a **standardized list-of-tuples format** for all payload
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns envelope dictionary with payloads field)
# Returns: Dict with envelope metadata and payloads field containing list of tuples
# {
# "correlationId": "...",
# "msgId": "...",
# "timestamp": "...",
# "sendTo": "...",
# "msgPurpose": "...",
# "senderName": "...",
# "senderId": "...",
# "receiverName": "...",
# "receiverId": "...",
# "replyTo": "...",
# "replyToMsgId": "...",
# "brokerURL": "...",
# "metadata": {...},
# "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }
```
Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
@@ -27,9 +43,10 @@ smartsend("/test", [(dataname1, data1, "text")], ...)
# Multiple payloads in one message (each payload has its own type)
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
# Receive always returns a list with type info
payloads = smartreceive(msg, ...)
# payloads = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, ...)
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
```
## Architecture
@@ -159,9 +176,10 @@ smartsend("/test", [("single_data", mydata, "dictionary")])
```julia
using NATSBridge
# Receive returns a list of payloads with type info
payloads = smartreceive(msg, "http://localhost:8080")
# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# Receive returns a dictionary envelope with all metadata and deserialized payloads
envelope = smartreceive(msg, "http://localhost:8080")
# envelope["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
# envelope["correlationId"], envelope["msgId"], etc.
```
### Scenario 1: Command & Control (Small JSON)
@@ -227,13 +245,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
const sub = nc.subscribe("control");
for await (const msg of sub) {
const result = await smartreceive(msg);
const envelope = await smartreceive(msg);
// Process the result
for (const { dataname, data, type } of result) {
// Process the payloads from the envelope
for (const payload of envelope.payloads) {
const { dataname, data, type } = payload;
console.log(`Received ${dataname} of type ${type}`);
console.log(`Data: ${JSON.stringify(data)}`);
}
// Also access envelope metadata
console.log(`Correlation ID: ${envelope.correlationId}`);
console.log(`Message ID: ${envelope.msgId}`);
}
```
@@ -259,11 +282,11 @@ await SmartSend("analysis_results", [("table_data", df, "table")]);
```javascript
const { smartreceive } = require('./src/NATSBridge');
const result = await smartreceive(msg);
const envelope = await smartreceive(msg);
// Use table data for visualization with Perspective.js or D3
// Use table data from the payloads field
// Note: Tables are sent as arrays of objects in JavaScript
const table = result;
const table = envelope.payloads;
```
### Scenario 3: Live Binary Processing
@@ -309,13 +332,13 @@ const { smartreceive } = require('./src/NATSBridge');
// Receive binary data
function process_binary(msg) {
const result = await smartreceive(msg);
const envelope = await smartreceive(msg);
// Process the binary data
for (const { dataname, data, type } of result) {
if (type === "binary") {
// Process the binary data from envelope.payloads
for (const payload of envelope.payloads) {
if (payload.type === "binary") {
// data is an ArrayBuffer or Uint8Array
console.log(`Received binary data: ${dataname}, size: ${data.length}`);
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
// Perform FFT or AI transcription here
}
}
@@ -353,8 +376,8 @@ const consumer = await js.pullSubscribe("health", {
// Process historical and real-time messages
for await (const msg of consumer) {
const result = await smartreceive(msg);
// result contains the list of payloads
const envelope = await smartreceive(msg);
// envelope.payloads contains the list of payloads
// Each payload has: dataname, data, type
msg.ack();
}
@@ -395,11 +418,11 @@ smartsend(
const { smartreceive, smartsend } = require('./src/NATSBridge');
// Receive NATS message with direct transport
const result = await smartreceive(msg);
const envelope = await smartreceive(msg);
// Decode Base64 payload (for direct transport)
// For tables, data is an array of objects
const table = result; // Array of objects
// For tables, data is in envelope.payloads
const table = envelope.payloads; // Array of objects
// User makes selection
const selection = uiComponent.getSelectedOption();

21
etc.jl
View File

@@ -1,21 +0,0 @@
Check architecture.jl, NATSBridge.jl and its test files:
- test_julia_to_julia_table_receiver.jl
- test_julia_to_julia_table_sender.jl.
Now I want to test sending a mix-content message from Julia serviceA to Julia serviceB, for example, a chat system.
The test message must show that any combination and any number and any data size of text | json | table | image | audio | video | binary can be send and receive.
Can you write me the following test files:
- test_julia_to_julia_mix_receiver.jl
- test_julia_to_julia_mix_sender.jl
1. create a tutorial file "tutorial_julia.md" for NATSBridge.jl
2. create a walkthrough file "walkthrough_julia.md" for NATSBridge.jl
You may consult architecture.md for more info.

View File

@@ -759,8 +759,8 @@ function smartreceive(
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
end
end
return payloads_list # Return list of (dataname, data, data_type) tuples
json_data["payloads"] = payloads_list
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
end

View File

@@ -603,7 +603,7 @@ async function smartreceive(msg, options = {}) {
* @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
*
* @returns {Promise<Array>} - List of {dataname, data, type} objects
* @returns {Promise<Object>} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects
*/
const {
fileserverDownloadHandler = _fetch_with_backoff,
@@ -664,7 +664,10 @@ async function smartreceive(msg, options = {}) {
}
}
return payloads_list;
// Replace payloads array with the processed list of {dataname, data, type} tuples
json_data.payloads = payloads_list;
return json_data;
}
// Export for Node.js

View File

@@ -559,7 +559,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,
base_delay=100, max_delay=5000):
base_delay=100, max_delay=5000):
"""Receive and process messages from NATS.
This function processes incoming NATS messages, handling both direct transport
@@ -573,7 +573,7 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
max_delay: Maximum delay for exponential backoff in ms
Returns:
list: List of (dataname, data, type) tuples
dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples
"""
# Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg)
@@ -611,7 +611,7 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
# Extract download URL from the payload
url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""),
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
@@ -627,7 +627,10 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
return payloads_list
# Replace payloads field with the processed list of (dataname, data, type) tuples
json_data["payloads"] = payloads_list
return json_data
# Utility functions

View File

@@ -37,8 +37,9 @@ async function test_dict_receive() {
}
);
// Result is a list of {dataname, data, type} objects
for (const { dataname, data, type } of result) {
// Result is an envelope dictionary with payloads field
// Access payloads with result.payloads
for (const { dataname, data, type } of result.payloads) {
if (typeof data === 'object' && data !== null && !Array.isArray(data)) {
log_trace(`Received Dictionary '${dataname}' of type ${type}`);

View File

@@ -36,8 +36,9 @@ async function test_large_binary_receive() {
}
);
// Result is a list of {dataname, data, type} objects
for (const { dataname, data, type } of result) {
// Result is an envelope dictionary with payloads field
// Access payloads with result.payloads
for (const { dataname, data, type } of result.payloads) {
if (data instanceof Uint8Array || Array.isArray(data)) {
const file_size = data.length;
log_trace(`Received ${file_size} bytes of binary data for '${dataname}' of type ${type}`);

View File

@@ -40,10 +40,11 @@ async function test_mix_receive() {
}
);
log_trace(`Received ${result.length} payloads`);
log_trace(`Received ${result.payloads.length} payloads`);
// Result is a list of {dataname, data, type} objects
for (const { dataname, data, type } of result) {
// Result is an envelope dictionary with payloads field
// Access payloads with result.payloads
for (const { dataname, data, type } of result.payloads) {
log_trace(`\n=== Payload: ${dataname} (type: ${type}) ===`);
// Handle different data types
@@ -122,13 +123,13 @@ async function test_mix_receive() {
// Summary
console.log("\n=== Verification Summary ===");
const text_count = result.filter(x => x.type === "text").length;
const dict_count = result.filter(x => x.type === "dictionary").length;
const table_count = result.filter(x => x.type === "table").length;
const image_count = result.filter(x => x.type === "image").length;
const audio_count = result.filter(x => x.type === "audio").length;
const video_count = result.filter(x => x.type === "video").length;
const binary_count = result.filter(x => x.type === "binary").length;
const text_count = result.payloads.filter(x => x.type === "text").length;
const dict_count = result.payloads.filter(x => x.type === "dictionary").length;
const table_count = result.payloads.filter(x => x.type === "table").length;
const image_count = result.payloads.filter(x => x.type === "image").length;
const audio_count = result.payloads.filter(x => x.type === "audio").length;
const video_count = result.payloads.filter(x => x.type === "video").length;
const binary_count = result.payloads.filter(x => x.type === "binary").length;
log_trace(`Text payloads: ${text_count}`);
log_trace(`Dictionary payloads: ${dict_count}`);
@@ -140,7 +141,7 @@ async function test_mix_receive() {
// Print transport type info for each payload if available
console.log("\n=== Payload Details ===");
for (const { dataname, data, type } of result) {
for (const { dataname, data, type } of result.payloads) {
if (["image", "audio", "video", "binary"].includes(type)) {
log_trace(`${dataname}: ${data.length} bytes (binary)`);
} else if (type === "table") {

View File

@@ -40,8 +40,9 @@ async function test_table_receive() {
}
);
// Result is a list of {dataname, data, type} objects
for (const { dataname, data, type } of result) {
// Result is an envelope dictionary with payloads field
// Access payloads with result.payloads
for (const { dataname, data, type } of result.payloads) {
if (Array.isArray(data)) {
log_trace(`Received Table '${dataname}' of type ${type}`);

View File

@@ -37,8 +37,9 @@ async function test_text_receive() {
}
);
// Result is a list of {dataname, data, type} objects
for (const { dataname, data, type } of result) {
// Result is an envelope dictionary with payloads field
// Access payloads with result.payloads
for (const { dataname, data, type } of result.payloads) {
if (typeof data === 'string') {
log_trace(`Received text '${dataname}' of type ${type}`);
log_trace(` Length: ${data.length} characters`);

View File

@@ -42,8 +42,8 @@ function test_dict_receive()
max_delay = 5000
)
# Result is a list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result["payloads"]
if isa(data, JSON.Object{String, Any})
log_trace("Received Dictionary '$dataname' of type $data_type")

View File

@@ -44,8 +44,8 @@ function test_large_binary_receive()
max_delay = 5000
)
# Result is a list of (dataname, data) tuples
for (dataname, data, data_type) in result
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result["payloads"]
# Check transport type from the envelope
# For link transport, data is the URL string
# For direct transport, data is the actual payload bytes

View File

@@ -45,10 +45,10 @@ function test_mix_receive()
max_delay = 5000
)
log_trace("Received $(length(result)) payloads")
log_trace("Received $(length(result["payloads"])) payloads")
# Result is a list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result["payloads"]
log_trace("\n=== Payload: $dataname (type: $data_type) ===")
# Handle different data types
@@ -178,13 +178,13 @@ function test_mix_receive()
# Summary
println("\n=== Verification Summary ===")
text_count = count(x -> x[3] == "text", result)
dict_count = count(x -> x[3] == "dictionary", result)
table_count = count(x -> x[3] == "table", result)
image_count = count(x -> x[3] == "image", result)
audio_count = count(x -> x[3] == "audio", result)
video_count = count(x -> x[3] == "video", result)
binary_count = count(x -> x[3] == "binary", result)
text_count = count(x -> x[3] == "text", result["payloads"])
dict_count = count(x -> x[3] == "dictionary", result["payloads"])
table_count = count(x -> x[3] == "table", result["payloads"])
image_count = count(x -> x[3] == "image", result["payloads"])
audio_count = count(x -> x[3] == "audio", result["payloads"])
video_count = count(x -> x[3] == "video", result["payloads"])
binary_count = count(x -> x[3] == "binary", result["payloads"])
log_trace("Text payloads: $text_count")
log_trace("Dictionary payloads: $dict_count")
@@ -196,7 +196,7 @@ function test_mix_receive()
# Print transport type info for each payload if available
println("\n=== Payload Details ===")
for (dataname, data, data_type) in result
for (dataname, data, data_type) in result["payloads"]
if data_type in ["image", "audio", "video", "binary"]
log_trace("$dataname: $(length(data)) bytes (binary)")
elseif data_type == "table"

View File

@@ -42,8 +42,8 @@ function test_table_receive()
max_delay = 5000
)
# Result is a list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result["payloads"]
data = DataFrame(data)
if isa(data, DataFrame)
log_trace("Received DataFrame '$dataname' of type $data_type")

View File

@@ -42,8 +42,8 @@ function test_text_receive()
max_delay = 5000
)
# Result is a list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
for (dataname, data, data_type) in result["payloads"]
if isa(data, String)
log_trace("Received text '$dataname' of type $data_type")
log_trace(" Length: $(length(data)) characters")

View File

@@ -36,8 +36,8 @@ def test_text_message():
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# envelope = smartreceive(msg)
# for dataname, data, type in envelope["payloads"]:
# print("Received {}: {}".format(dataname, data))
@@ -68,8 +68,8 @@ def test_dictionary_message():
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# envelope = smartreceive(msg)
# for dataname, data, type in envelope["payloads"]:
# if type == "dictionary":
# print("Config: {}".format(data))
@@ -99,8 +99,8 @@ def test_mixed_payloads():
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# envelope = smartreceive(msg)
# for dataname, data, type in envelope["payloads"]:
# print("Received {}: {} (type: {})".format(dataname, data if type != "binary" else len(data), type))