Compare commits
2 Commits
7f68d08134
...
61d81bed62
| Author | SHA1 | Date | |
|---|---|---|---|
| 61d81bed62 | |||
| 1a10bc1a5f |
@@ -445,7 +445,7 @@ end
|
|||||||
- Returns a dictionary (key-value map) containing all envelope fields:
|
- Returns a dictionary (key-value map) containing all envelope fields:
|
||||||
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
||||||
- `metadata` - Message-level metadata dictionary
|
- `metadata` - Message-level metadata dictionary
|
||||||
- `payloads` - List of dictionaries, each containing deserialized payload data
|
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
|
||||||
|
|
||||||
**Process Flow:**
|
**Process Flow:**
|
||||||
1. Parse the JSON envelope to extract all fields
|
1. Parse the JSON envelope to extract all fields
|
||||||
@@ -531,16 +531,16 @@ async function smartreceive(msg, options = {})
|
|||||||
- Returns a Promise that resolves to an object containing all envelope fields:
|
- Returns a Promise that resolves to an object containing all envelope fields:
|
||||||
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
||||||
- `metadata` - Message-level metadata dictionary
|
- `metadata` - Message-level metadata dictionary
|
||||||
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type`
|
- `payloads` - List of tuples, each containing `(dataname, data, type)` with deserialized payload data
|
||||||
|
|
||||||
**Process Flow:**
|
**Process Flow:**
|
||||||
1. Parse the JSON envelope to extract all fields
|
1. Parse the JSON envelope to extract all fields
|
||||||
2. Iterate through each payload in `payloads` array
|
2. Iterate through each payload in `payloads` array
|
||||||
3. For each payload:
|
3. For each payload:
|
||||||
- Determine transport type (`direct` or `link`)
|
- Determine transport type (`direct` or `link`)
|
||||||
- If `direct`: Base64 decode the data from the message
|
- If `direct`: Base64 decode the data from the message
|
||||||
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
|
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
|
||||||
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
|
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
|
||||||
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
|
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
|
||||||
|
|
||||||
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
|
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
|
||||||
@@ -824,7 +824,7 @@ async def smartreceive(
|
|||||||
|
|
||||||
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
|
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components across all platforms.
|
||||||
|
|
||||||
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads.
|
**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads.
|
||||||
|
|
||||||
## Performance Considerations
|
## Performance Considerations
|
||||||
|
|
||||||
|
|||||||
@@ -436,7 +436,6 @@ function smartsend(
|
|||||||
|
|
||||||
# Generate correlation ID if not provided
|
# Generate correlation ID if not provided
|
||||||
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
||||||
|
|
||||||
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
||||||
|
|
||||||
# Generate message metadata
|
# Generate message metadata
|
||||||
@@ -706,7 +705,7 @@ A HTTP file server is required along with its download function.
|
|||||||
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
|
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- `Vector{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
|
- JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
@@ -724,22 +723,22 @@ function smartreceive(
|
|||||||
max_delay::Int = 5000
|
max_delay::Int = 5000
|
||||||
)
|
)
|
||||||
# Parse the JSON envelope
|
# Parse the JSON envelope
|
||||||
json_data = JSON.parse(String(msg.payload))
|
env_json_obj = JSON.parse(String(msg.payload))
|
||||||
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start
|
log_trace(env_json_obj["correlation_id"], "Processing received message") # Log message processing start
|
||||||
|
|
||||||
# Process all payloads in the envelope
|
# Process all payloads in the envelope
|
||||||
payloads_list = Tuple{String, Any, String}[]
|
payloads_list = Tuple{String, Any, String}[]
|
||||||
|
|
||||||
# Get number of payloads
|
# Get number of payloads
|
||||||
num_payloads = length(json_data["payloads"])
|
num_payloads = length(env_json_obj["payloads"])
|
||||||
|
|
||||||
for i in 1:num_payloads
|
for i in 1:num_payloads
|
||||||
payload = json_data["payloads"][i]
|
payload = env_json_obj["payloads"][i]
|
||||||
transport = String(payload["transport"])
|
transport = String(payload["transport"])
|
||||||
dataname = String(payload["dataname"])
|
dataname = String(payload["dataname"])
|
||||||
|
|
||||||
if transport == "direct" # Direct transport - payload is in the message
|
if transport == "direct" # Direct transport - payload is in the message
|
||||||
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
||||||
|
|
||||||
# Extract base64 payload from the payload
|
# Extract base64 payload from the payload
|
||||||
payload_b64 = String(payload["data"])
|
payload_b64 = String(payload["data"])
|
||||||
@@ -749,28 +748,28 @@ function smartreceive(
|
|||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data_type = String(payload["payload_type"])
|
data_type = String(payload["payload_type"])
|
||||||
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"])
|
data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
|
||||||
|
|
||||||
push!(payloads_list, (dataname, data, data_type))
|
push!(payloads_list, (dataname, data, data_type))
|
||||||
elseif transport == "link" # Link transport - payload is at URL
|
elseif transport == "link" # Link transport - payload is at URL
|
||||||
# Extract download URL from the payload
|
# Extract download URL from the payload
|
||||||
url = String(payload["data"])
|
url = String(payload["data"])
|
||||||
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
||||||
|
|
||||||
# Fetch with exponential backoff using the download handler
|
# Fetch with exponential backoff using the download handler
|
||||||
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"])
|
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])
|
||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data_type = String(payload["payload_type"])
|
data_type = String(payload["payload_type"])
|
||||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"])
|
data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
|
||||||
|
|
||||||
push!(payloads_list, (dataname, data, data_type))
|
push!(payloads_list, (dataname, data, data_type))
|
||||||
else # Unknown transport type
|
else # Unknown transport type
|
||||||
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
|
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
json_data["payloads"] = payloads_list
|
env_json_obj["payloads"] = payloads_list
|
||||||
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
|
return env_json_obj # JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
@@ -929,7 +928,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
```jldoctest
|
```jldoctest
|
||||||
using HTTP, JSON
|
using HTTP, JSON
|
||||||
|
|
||||||
file_server_url = "http://localhost:8080"
|
fileserver_url = "http://localhost:8080"
|
||||||
filename = "test.txt"
|
filename = "test.txt"
|
||||||
data = Vector{UInt8}("hello world")
|
data = Vector{UInt8}("hello world")
|
||||||
|
|
||||||
@@ -1006,7 +1005,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
```jldoctest
|
```jldoctest
|
||||||
using HTTP, JSON
|
using HTTP, JSON
|
||||||
|
|
||||||
file_server_url = "http://localhost:8080"
|
fileserver_url = "http://localhost:8080"
|
||||||
filepath = "./test.zip"
|
filepath = "./test.zip"
|
||||||
|
|
||||||
# Upload to local plik server
|
# Upload to local plik server
|
||||||
|
|||||||
Reference in New Issue
Block a user