diff --git a/docs/architecture.md b/docs/architecture.md index fee3cb2..31e5c99 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -17,16 +17,16 @@ The system uses **handler functions** to abstract file server operations, allowi ```julia # Upload handler - uploads data to file server and returns URL -# The handler is passed to smartsend as fileserverUploadHandler parameter -# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8}) +# The handler is passed to smartsend as fileserver_upload_handler parameter +# It receives: (file_server_url::String, dataname::String, data::Vector{UInt8}) # Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url" -fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} +fileserver_upload_handler(file_server_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} # Download handler - fetches data from file server URL with exponential backoff -# The handler is passed to smartreceive as fileserverDownloadHandler parameter +# The handler is passed to smartreceive as fileserver_download_handler parameter # It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String) # Returns: Vector{UInt8} (the downloaded data) -fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} +fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} ``` This design allows the system to support multiple file server backends without changing the core messaging logic. @@ -40,21 +40,21 @@ The system uses a **standardized list-of-tuples format** for all payload operati # Input format for smartsend (always a list of tuples with type info) [(dataname1, data1, type1), (dataname2, data2, type2), ...] -# Output format for smartreceive (returns envelope dictionary with payloads field) -# Returns: Dict with envelope metadata and payloads field containing list of tuples +# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) +# Returns: Dict with envelope metadata and payloads field containing Vector{Tuple{String, Any, String}} # { -# "correlationId": "...", -# "msgId": "...", +# "correlation_id": "...", +# "msg_id": "...", # "timestamp": "...", -# "sendTo": "...", -# "msgPurpose": "...", -# "senderName": "...", -# "senderId": "...", -# "receiverName": "...", -# "receiverId": "...", -# "replyTo": "...", -# "replyToMsgId": "...", -# "brokerURL": "...", +# "send_to": "...", +# "msg_purpose": "...", +# "sender_name": "...", +# "sender_id": "...", +# "receiver_name": "...", +# "receiver_id": "...", +# "reply_to": "...", +# "reply_to_msg_id": "...", +# "broker_url": "...", # "metadata": {...}, # "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] # } @@ -78,17 +78,16 @@ This design allows per-payload type specification, enabling **mixed-content mess smartsend( "/test", [("dataname1", data1, "dictionary")], # List with one tuple (data, type) - nats_url="nats://localhost:4222", - fileserverUploadHandler=plik_oneshot_upload, - metadata=user_provided_envelope_level_metadata + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload ) # Multiple payloads in one message with different types smartsend( "/test", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], - nats_url="nats://localhost:4222", - fileserverUploadHandler=plik_oneshot_upload + broker_url="nats://localhost:4222", + fileserver_upload_handler=plik_oneshot_upload ) # Mixed content (e.g., chat with text, image, audio) @@ -99,13 +98,14 @@ smartsend( ("user_image", image_data, "image"), ("audio_clip", audio_data, "audio") ], - nats_url="nats://localhost:4222" + broker_url="nats://localhost:4222" ) # Receive returns a dictionary envelope with all metadata and deserialized payloads -env = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay) +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) # env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...] -# env["correlationId"], env["msgId"], etc. +# env["correlation_id"], env["msg_id"], etc. +# env is a dictionary containing envelope metadata and payloads field ``` ## Architecture Diagram @@ -138,48 +138,48 @@ flowchart TD ## System Components -### 1. msgEnvelope_v1 - Message Envelope +### 1. msg_envelope_v1 - Message Envelope -The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications. +The `msg_envelope_v1` structure provides a comprehensive message format for bidirectional communication between Julia, JavaScript, and Python/Micropython applications. **Julia Structure:** ```julia -struct msgEnvelope_v1 - correlationId::String # Unique identifier to track messages across systems - msgId::String # This message id - timestamp::String # Message published timestamp +struct msg_envelope_v1 + correlation_id::String # Unique identifier to track messages across systems + msg_id::String # This message id + timestamp::String # Message published timestamp - sendTo::String # Topic/subject the sender sends to - msgPurpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...) - senderName::String # Sender name (e.g., "agent-wine-web-frontend") - senderId::String # Sender id (uuid4) - receiverName::String # Message receiver name (e.g., "agent-backend") - receiverId::String # Message receiver id (uuid4 or nothing for broadcast) - replyTo::String # Topic to reply to - replyToMsgId::String # Message id this message is replying to - brokerURL::String # NATS server address + send_to::String # Topic/subject the sender sends to + msg_purpose::String # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...) + sender_name::String # Sender name (e.g., "agent-wine-web-frontend") + sender_id::String # Sender id (uuid4) + receiver_name::String # Message receiver name (e.g., "agent-backend") + receiver_id::String # Message receiver id (uuid4 or nothing for broadcast) + reply_to::String # Topic to reply to + reply_to_msg_id::String # Message id this message is replying to + broker_url::String # NATS server address metadata::Dict{String, Any} - payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here + payloads::Vector{msg_payload_v1} # Multiple payloads stored here end ``` **JSON Schema:** ```json { - "correlationId": "uuid-v4-string", - "msgId": "uuid-v4-string", + "correlation_id": "uuid-v4-string", + "msg_id": "uuid-v4-string", "timestamp": "2024-01-15T10:30:00Z", - "sendTo": "topic/subject", - "msgPurpose": "ACK | NACK | updateStatus | shutdown | chat", - "senderName": "agent-wine-web-frontend", - "senderId": "uuid4", - "receiverName": "agent-backend", - "receiverId": "uuid4", - "replyTo": "topic", - "replyToMsgId": "uuid4", - "brokerURL": "nats://localhost:4222", + "send_to": "topic/subject", + "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", + "sender_name": "agent-wine-web-frontend", + "sender_id": "uuid4", + "receiver_name": "agent-backend", + "receiver_id": "uuid4", + "reply_to": "topic", + "reply_to_msg_id": "uuid4", + "broker_url": "nats://localhost:4222", "metadata": { @@ -189,7 +189,7 @@ end { "id": "uuid4", "dataname": "login_image", - "type": "image", + "payload_type": "image", "transport": "direct", "encoding": "base64", "size": 15433, @@ -201,7 +201,7 @@ end { "id": "uuid4", "dataname": "large_data", - "type": "table", + "payload_type": "table", "transport": "link", "encoding": "none", "size": 524288, @@ -214,16 +214,16 @@ end } ``` -### 2. msgPayload_v1 - Payload Structure +### 2. msg_payload_v1 - Payload Structure -The `msgPayload_v1` structure provides flexible payload handling for various data types across all supported platforms. +The `msg_payload_v1` structure provides flexible payload handling for various data types across all supported platforms. **Julia Structure:** ```julia -struct msgPayload_v1 +struct msg_payload_v1 id::String # Id of this payload (e.g., "uuid4") dataname::String # Name of this payload (e.g., "login_image") - type::String # "text | dictionary | table | image | audio | video | binary" + payload_type::String # "text | dictionary | table | image | audio | video | binary" transport::String # "direct | link" encoding::String # "none | json | base64 | arrow-ipc" size::Integer # Data size in bytes @@ -383,17 +383,25 @@ graph TD ```julia function smartsend( subject::String, - data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter - nats_url::String = "nats://localhost:4222", - fileserverUploadHandler::Function = plik_oneshot_upload, - size_threshold::Int = 1_000_000 # 1MB - is_publish::Bool = true # Whether to automatically publish to NATS + data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples + broker_url::String = DEFAULT_BROKER_URL, # NATS server URL + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler::Function = plik_oneshot_upload, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, + correlation_id::Union{String, Nothing} = nothing, + msg_purpose::String = "chat", + sender_name::String = "NATSBridge", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + is_publish::Bool = true # Whether to automatically publish to NATS ) ``` **Return Value:** - Returns a tuple `(env, env_json_str)` where: - - `env::msgEnvelope_v1` - The envelope object containing all metadata and payloads + - `env::msg_envelope_v1` - The envelope object containing all metadata and payloads - `env_json_str::String` - JSON string representation of the envelope for publishing **Options:** @@ -417,8 +425,8 @@ The envelope object can be accessed directly for programmatic use, while the JSO ```julia function smartreceive( - msg::NATS.Message, - fileserverDownloadHandler::Function; + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000 @@ -427,7 +435,7 @@ function smartreceive( # Iterate through all payloads # For each payload: check transport type # If direct: decode Base64 payload - # If link: fetch from URL with exponential backoff using fileserverDownloadHandler + # If link: fetch from URL with exponential backoff using fileserver_download_handler # Deserialize payload based on type # Return envelope dictionary with all metadata and deserialized payloads end @@ -435,7 +443,7 @@ end **Output Format:** - Returns a dictionary (key-value map) containing all envelope fields: - - `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL` + - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` - `metadata` - Message-level metadata dictionary - `payloads` - List of dictionaries, each containing deserialized payload data @@ -445,11 +453,11 @@ end 3. For each payload: - Determine transport type (`direct` or `link`) - If `direct`: decode Base64 data from the message - - If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`) + - If `link`: fetch data from URL using exponential backoff (via `fileserver_download_handler`) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) 4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples -**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`. +**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`. ### JavaScript Implementation diff --git a/docs/implementation.md b/docs/implementation.md index 9a02d5d..7b68f11 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -31,18 +31,18 @@ The implementation uses a **standardized list-of-tuples format** for all payload # Output format for smartreceive (returns envelope dictionary with payloads field) # Returns: Dict with envelope metadata and payloads field containing list of tuples # { -# "correlationId": "...", -# "msgId": "...", +# "correlation_id": "...", +# "msg_id": "...", # "timestamp": "...", -# "sendTo": "...", -# "msgPurpose": "...", -# "senderName": "...", -# "senderId": "...", -# "receiverName": "...", -# "receiverId": "...", -# "replyTo": "...", -# "replyToMsgId": "...", -# "brokerURL": "...", +# "send_to": "...", +# "msg_purpose": "...", +# "sender_name": "...", +# "sender_id": "...", +# "receiver_name": "...", +# "receiver_id": "...", +# "reply_to": "...", +# "reply_to_msg_id": "...", +# "broker_url": "...", # "metadata": {...}, # "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] # } @@ -53,15 +53,15 @@ Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, **Examples:** ```julia # Single payload - still wrapped in a list (type is required as third element) -smartsend("/test", [(dataname1, data1, "text")], ...) +smartsend("/test", [(dataname1, data1, "text")], broker_url="nats://localhost:4222") # Multiple payloads in one message (each payload has its own type) -smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...) +smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], broker_url="nats://localhost:4222") # Receive returns a dictionary envelope with all metadata and deserialized payloads -env = smartreceive(msg, ...) +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) # env["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...] -# env["correlationId"], env["msgId"], etc. +# env["correlation_id"], env["msg_id"], etc. ``` ## Cross-Platform Interoperability @@ -98,7 +98,7 @@ NATSBridge is designed for seamless communication between Julia, JavaScript, and # Julia sender using NATSBridge data = [("message", "Hello from Julia!", "text")] -smartsend("/cross_platform", data, nats_url="nats://localhost:4222") +smartsend("/cross_platform", data, broker_url="nats://localhost:4222") ``` ```javascript @@ -152,8 +152,8 @@ The `smartsend` function now returns a tuple containing both the envelope object ```julia env, env_json_str = smartsend(...) -# env::msgEnvelope_v1 - The envelope object with all metadata and payloads -# env_json_str::String - JSON string for publishing to NATS +# env::msg_envelope_v1 - The envelope object with all metadata and payloads +# env_json_str::String - JSON string for publishing to NATS ``` **Options:** @@ -167,9 +167,10 @@ This enables two use cases: The Julia implementation provides: -- **[`MessageEnvelope`](src/NATSBridge.jl)**: Struct for the unified JSON envelope -- **[`SmartSend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size -- **[`SmartReceive()`](src/NATSBridge.jl)**: Handles both direct and link transport +- **[`msg_envelope_v1`](src/NATSBridge.jl)**: Struct for the unified JSON envelope +- **[`msg_payload_v1`](src/NATSBridge.jl)**: Struct for individual payload representation +- **[`smartsend()`](src/NATSBridge.jl)**: Handles transport selection based on payload size +- **[`smartreceive()`](src/NATSBridge.jl)**: Handles both direct and link transport ### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js) @@ -277,16 +278,16 @@ smartsend( ) # Even single payload must be wrapped in a list with type -smartsend("/test", [("single_data", mydata, "dictionary")]) +smartsend("/test", [("single_data", mydata, "dictionary")], nats_url="nats://localhost:4222") ``` #### Python/Micropython (Receiver) ```python from nats_bridge import smartreceive -# Receive returns a list of (dataname, data, type) tuples -payloads = smartreceive(msg) -# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...] +# Receive returns a dictionary with envelope metadata and payloads field +env = smartreceive(msg) +# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...] ``` #### JavaScript (Sender) @@ -340,8 +341,8 @@ for await (const msg of sub) { } // Also access envelope metadata - console.log(`Correlation ID: ${env.correlationId}`); - console.log(`Message ID: ${env.msgId}`); + console.log(`Correlation ID: ${env.correlation_id}`); + console.log(`Message ID: ${env.msg_id}`); } ``` @@ -359,9 +360,9 @@ df = DataFrame( category = rand(["A", "B", "C"], 10_000_000) ) -# Send via SmartSend - wrapped in a list (type is part of each tuple) -env, env_json_str = SmartSend("analysis_results", [("table_data", df, "table")]) -# env: msgEnvelope_v1 object with all metadata and payloads +# Send via smartsend - wrapped in a list (type is part of each tuple) +env, env_json_str = smartsend("analysis_results", [("table_data", df, "table")], broker_url="nats://localhost:4222") +# env: msg_envelope_v1 object with all metadata and payloads # env_json_str: JSON string representation of the envelope for publishing ``` @@ -461,7 +462,7 @@ using NATSBridge function publish_health_status(nats_url) # Send status wrapped in a list (type is part of each tuple) status = Dict("cpu" => rand(), "memory" => rand()) - smartsend("health", [("status", status, "dictionary")], nats_url=nats_url) + smartsend("health", [("status", status, "dictionary")], broker_url=nats_url) sleep(5) # Every 5 seconds end ``` @@ -523,7 +524,7 @@ def handle_device_config(msg): "device/response", [("config", config, "dictionary")], nats_url="nats://localhost:4222", - reply_to=env.get("replyTo") + reply_to=env.get("reply_to") ) ``` @@ -636,7 +637,7 @@ chat_message = [ smartsend( "chat.room123", chat_message, - nats_url="nats://localhost:4222", + broker_url="nats://localhost:4222", msg_purpose="chat", reply_to="chat.room123.responses" ) @@ -684,7 +685,7 @@ await smartsend("chat.room123", message); **Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components. -**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads. +**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msg_envelope_v1` supports `Vector{msg_payload_v1}` for multiple payloads. ## Configuration @@ -700,19 +701,19 @@ await smartsend("chat.room123", message); ```json { - "correlationId": "uuid-v4-string", - "msgId": "uuid-v4-string", + "correlation_id": "uuid-v4-string", + "msg_id": "uuid-v4-string", "timestamp": "2024-01-15T10:30:00Z", - "sendTo": "topic/subject", - "msgPurpose": "ACK | NACK | updateStatus | shutdown | chat", - "senderName": "agent-wine-web-frontend", - "senderId": "uuid4", - "receiverName": "agent-backend", - "receiverId": "uuid4", - "replyTo": "topic", - "replyToMsgId": "uuid4", - "BrokerURL": "nats://localhost:4222", + "send_to": "topic/subject", + "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", + "sender_name": "agent-wine-web-frontend", + "sender_id": "uuid4", + "receiver_name": "agent-backend", + "receiver_id": "uuid4", + "reply_to": "topic", + "reply_to_msg_id": "uuid4", + "broker_url": "nats://localhost:4222", "metadata": { "content_type": "application/octet-stream", @@ -723,7 +724,7 @@ await smartsend("chat.room123", message); { "id": "uuid4", "dataname": "login_image", - "type": "image", + "payload_type": "image", "transport": "direct", "encoding": "base64", "size": 15433, diff --git a/examples/tutorial.md b/examples/tutorial.md index 220df84..9560f8a 100644 --- a/examples/tutorial.md +++ b/examples/tutorial.md @@ -182,7 +182,7 @@ for (const payload of env.payloads) { using NATSBridge # Receive and process message -env = smartreceive(msg, fileserverDownloadHandler) +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) for (dataname, data, type) in env["payloads"] println("Received $dataname: $data") end diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 4edc503..01bed02 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -120,15 +120,15 @@ function msg_payload_v1( metadata::Dict{String, T} = Dict{String, Any}() ) where {T<:Any} return msg_payload_v1( - id, - dataname, - payload_type, - transport, - encoding, - size, - data, - metadata - ) + id, + dataname, + payload_type, + transport, + encoding, + size, + data, + metadata + ) end @@ -167,13 +167,13 @@ payload2 = msg_payload_v1("http://example.com/file.zip", "binary"; dataname="fil # Create message envelope env = msg_envelope_v1( - "my.subject", - [payload1, payload2]; - correlation_id = string(uuid4()), - msg_purpose = "chat", - sender_name = "my-app", - receiver_name = "receiver-app", - reply_to = "reply.subject" + "my.subject", + [payload1, payload2]; + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "my-app", + receiver_name = "receiver-app", + reply_to = "reply.subject" ) ``` """ @@ -498,22 +498,22 @@ function smartsend( end end - # Create msg_envelope_v1 with all payloads - env = msg_envelope_v1( - subject, - payloads; - correlation_id = cid, - msg_id = msg_id, - msg_purpose = msg_purpose, - sender_name = sender_name, - sender_id = string(uuid4()), - receiver_name = receiver_name, - receiver_id = receiver_id, - reply_to = reply_to, - reply_to_msg_id = reply_to_msg_id, - broker_url = broker_url, - metadata = Dict{String, Any}(), - ) + # Create msg_envelope_v1 with all payloads + env = msg_envelope_v1( + subject, + payloads; + correlation_id = cid, + msg_id = msg_id, + msg_purpose = msg_purpose, + sender_name = sender_name, + sender_id = string(uuid4()), + receiver_name = receiver_name, + receiver_id = receiver_id, + reply_to = reply_to, + reply_to_msg_id = reply_to_msg_id, + broker_url = broker_url, + metadata = Dict{String, Any}(), + ) env_json_str = envelope_to_json(env) # Convert envelope to JSON if is_publish @@ -602,48 +602,48 @@ function _serialize_data(data::Any, payload_type::String) """ if payload_type == "text" # Text data - convert to UTF-8 bytes - if isa(data, String) - data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes - return data_bytes - else - error("Text data must be a String") - end + if isa(data, String) + data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes + return data_bytes + else + error("Text data must be a String") + end elseif payload_type == "dictionary" # JSON data - serialize directly - json_str = JSON.json(data) # Convert Julia data to JSON string - json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes - return json_str_bytes + json_str = JSON.json(data) # Convert Julia data to JSON string + json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes + return json_str_bytes elseif payload_type == "table" # Table data - convert to Arrow IPC stream - io = IOBuffer() # Create in-memory buffer - Arrow.write(io, data) # Write data as Arrow IPC stream to buffer - return take!(io) # Return the buffer contents as bytes + io = IOBuffer() # Create in-memory buffer + Arrow.write(io, data) # Write data as Arrow IPC stream to buffer + return take!(io) # Return the buffer contents as bytes elseif payload_type == "image" # Image data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Image data must be Vector{UInt8}") - end + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Image data must be Vector{UInt8}") + end elseif payload_type == "audio" # Audio data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Audio data must be Vector{UInt8}") - end + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Audio data must be Vector{UInt8}") + end elseif payload_type == "video" # Video data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Video data must be Vector{UInt8}") - end + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Video data must be Vector{UInt8}") + end elseif payload_type == "binary" # Binary data - treat as binary - if isa(data, IOBuffer) # Check if data is an IOBuffer - return take!(data) # Return buffer contents as bytes - elseif isa(data, Vector{UInt8}) # Check if data is already binary - return data # Return binary data directly - else # Unsupported binary data type - error("Binary data must be binary (Vector{UInt8} or IOBuffer)") - end + if isa(data, IOBuffer) # Check if data is an IOBuffer + return take!(data) # Return buffer contents as bytes + elseif isa(data, Vector{UInt8}) # Check if data is already binary + return data # Return binary data directly + else # Unsupported binary data type + error("Binary data must be binary (Vector{UInt8} or IOBuffer)") + end else # Unknown type - error("Unknown payload_type: $payload_type") + error("Unknown payload_type: $payload_type") end end @@ -673,13 +673,13 @@ connection management and logging. ``` """ function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(broker_url) # Create NATS connection - try - NATS.publish(conn, subject, message) # Publish message to NATS - log_trace(correlation_id, "Message published to $subject") # Log successful publish - finally - NATS.drain(conn) # Ensure connection is closed properly - end + conn = NATS.connect(broker_url) # Create NATS connection + try + NATS.publish(conn, subject, message) # Publish message to NATS + log_trace(correlation_id, "Message published to $subject") # Log successful publish + finally + NATS.drain(conn) # Ensure connection is closed properly + end end @@ -717,60 +717,60 @@ payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, ma ``` """ function smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function=_fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 + msg::NATS.Msg; + fileserver_download_handler::Function = _fetch_with_backoff, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 ) - # Parse the JSON envelope - json_data = JSON.parse(String(msg.payload)) - log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start + # Parse the JSON envelope + json_data = JSON.parse(String(msg.payload)) + log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start + + # Process all payloads in the envelope + payloads_list = Tuple{String, Any, String}[] + + # Get number of payloads + num_payloads = length(json_data["payloads"]) - # Process all payloads in the envelope - payloads_list = Tuple{String, Any, String}[] + for i in 1:num_payloads + payload = json_data["payloads"][i] + transport = String(payload["transport"]) + dataname = String(payload["dataname"]) - # Get number of payloads - num_payloads = length(json_data["payloads"]) - - for i in 1:num_payloads - payload = json_data["payloads"][i] - transport = String(payload["transport"]) - dataname = String(payload["dataname"]) - - if transport == "direct" # Direct transport - payload is in the message - log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling - - # Extract base64 payload from the payload - payload_b64 = String(payload["data"]) - - # Decode Base64 payload - payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes - - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - elseif transport == "link" # Link transport - payload is at URL - # Extract download URL from the payload - url = String(payload["data"]) - log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling - - # Fetch with exponential backoff using the download handler - downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"]) + if transport == "direct" # Direct transport - payload is in the message + log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling + + # Extract base64 payload from the payload + payload_b64 = String(payload["data"]) + + # Decode Base64 payload + payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes + + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + elseif transport == "link" # Link transport - payload is at URL + # Extract download URL from the payload + url = String(payload["data"]) + log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling + + # Fetch with exponential backoff using the download handler + downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"]) - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - else # Unknown transport type - error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport - end + # Deserialize based on type + data_type = String(payload["payload_type"]) + data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"]) + + push!(payloads_list, (dataname, data, data_type)) + else # Unknown transport type + error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport end - json_data["payloads"] = payloads_list - return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field + end + json_data["payloads"] = payloads_list + return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field end @@ -805,33 +805,33 @@ data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correla ``` """ function _fetch_with_backoff( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String ) - delay = base_delay # Initialize delay with base delay value - for attempt in 1:max_retries # Attempt to fetch data up to max_retries times - try - response = HTTP.request("GET", url) # Make HTTP GET request to URL - if response.status == 200 # Check if request was successful - log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success - return response.body # Return response body as bytes - else # Request failed - error("Failed to fetch: $(response.status)") # Throw error for non-200 status - end - catch e # Handle exceptions during fetch - log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure - - if attempt < max_retries # Only sleep if not the last attempt - sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms) - delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay - end - end + delay = base_delay # Initialize delay with base delay value + for attempt in 1:max_retries # Attempt to fetch data up to max_retries times + try + response = HTTP.request("GET", url) # Make HTTP GET request to URL + if response.status == 200 # Check if request was successful + log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success + return response.body # Return response body as bytes + else # Request failed + error("Failed to fetch: $(response.status)") # Throw error for non-200 status + end + catch e # Handle exceptions during fetch + log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure + + if attempt < max_retries # Only sleep if not the last attempt + sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms) + delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay + end end - - error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed + end + + error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed end @@ -875,30 +875,30 @@ table_data = _deserialize_data(arrow_bytes, "table", "correlation123") ``` """ function _deserialize_data( - data::Vector{UInt8}, - payload_type::String, - correlation_id::String + data::Vector{UInt8}, + payload_type::String, + correlation_id::String ) - if payload_type == "text" # Text data - convert to string - return String(data) # Convert bytes to string - elseif payload_type == "dictionary" # JSON data - deserialize - json_str = String(data) # Convert bytes to string - return JSON.parse(json_str) # Parse JSON string to JSON object - elseif payload_type == "table" # Table data - deserialize Arrow IPC stream - io = IOBuffer(data) # Create buffer from bytes - df = Arrow.Table(io) # Read Arrow IPC format from buffer - return df # Return DataFrame - elseif payload_type == "image" # Image data - return binary - return data # Return bytes directly - elseif payload_type == "audio" # Audio data - return binary - return data # Return bytes directly - elseif payload_type == "video" # Video data - return binary - return data # Return bytes directly - elseif payload_type == "binary" # Binary data - return binary - return data # Return bytes directly - else # Unknown type - error("Unknown payload_type: $payload_type") # Throw error for unknown type - end + if payload_type == "text" # Text data - convert to string + return String(data) # Convert bytes to string + elseif payload_type == "dictionary" # JSON data - deserialize + json_str = String(data) # Convert bytes to string + return JSON.parse(json_str) # Parse JSON string to JSON object + elseif payload_type == "table" # Table data - deserialize Arrow IPC stream + io = IOBuffer(data) # Create buffer from bytes + df = Arrow.Table(io) # Read Arrow IPC format from buffer + return df # Return DataFrame + elseif payload_type == "image" # Image data - return binary + return data # Return bytes directly + elseif payload_type == "audio" # Audio data - return binary + return data # Return bytes directly + elseif payload_type == "video" # Video data - return binary + return data # Return bytes directly + elseif payload_type == "binary" # Binary data - return binary + return data # Return bytes directly + else # Unknown type + error("Unknown payload_type: $payload_type") # Throw error for unknown type + end end @@ -1035,10 +1035,10 @@ function plik_oneshot_upload(file_server_url::String, filepath::String) url_upload = "$file_server_url/file/$uploadid" headers = ["X-UploadToken" => uploadtoken] http_response = open(filepath, "r") do file_stream - form = HTTP.Form(Dict("file" => file_stream)) - - # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' - HTTP.post(url_upload, headers, form; status_exception = false) + form = HTTP.Form(Dict("file" => file_stream)) + + # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' + HTTP.post(url_upload, headers, form; status_exception = false) end if !isnothing(http_response) && http_response.status == 200 @@ -1047,7 +1047,6 @@ function plik_oneshot_upload(file_server_url::String, filepath::String) error("Failed to upload file: server returned status $(http_response.status)") end response_json = JSON.parse(http_response.body) - fileid = response_json["id"] # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" diff --git a/test/test_julia_dict_sender.jl b/test/test_julia_dict_sender.jl index c180320..dd2a847 100644 --- a/test/test_julia_dict_sender.jl +++ b/test/test_julia_dict_sender.jl @@ -95,9 +95,9 @@ function test_dict_send() env, env_json_str = NATSBridge.smartsend( SUBJECT, [data1, data2]; # List of (dataname, data, type) tuples - nats_url = NATS_URL, + broker_url = NATS_URL, fileserver_url = FILESERVER_URL, - fileserverUploadHandler = plik_upload_handler, + fileserver_upload_handler = plik_upload_handler, size_threshold = 1_000_000, # 1MB threshold correlation_id = correlation_id, msg_purpose = "chat", @@ -115,7 +115,7 @@ function test_dict_send() for (i, payload) in enumerate(env.payloads) log_trace("Payload $i ('$payload.dataname'):") log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.type)") + log_trace(" Type: $(payload.payload_type)") log_trace(" Size: $(payload.size) bytes") log_trace(" Encoding: $(payload.encoding)") diff --git a/test/test_julia_file_sender.jl b/test/test_julia_file_sender.jl index db1bed1..abce1c2 100644 --- a/test/test_julia_file_sender.jl +++ b/test/test_julia_file_sender.jl @@ -82,9 +82,9 @@ function test_large_binary_send() env, env_json_str = NATSBridge.smartsend( SUBJECT, [data1, data2]; # List of (dataname, data, type) tuples - nats_url = NATS_URL; + broker_url = NATS_URL; fileserver_url = FILESERVER_URL, - fileserverUploadHandler = plik_upload_handler, + fileserver_upload_handler = plik_upload_handler, size_threshold = 1_000_000, correlation_id = correlation_id, msg_purpose = "chat", @@ -97,7 +97,7 @@ function test_large_binary_send() ) log_trace("Sent message with transport: $(env.payloads[1].transport)") - log_trace("Envelope type: $(env.payloads[1].type)") + log_trace("Envelope type: $(env.payloads[1].payload_type)") # Check if link transport was used if env.payloads[1].transport == "link" diff --git a/test/test_julia_mix_payloads_sender.jl b/test/test_julia_mix_payloads_sender.jl index f62e009..541a4d0 100644 --- a/test/test_julia_mix_payloads_sender.jl +++ b/test/test_julia_mix_payloads_sender.jl @@ -189,9 +189,9 @@ function test_mix_send() env, env_json_str = NATSBridge.smartsend( SUBJECT, payloads; # List of (dataname, data, type) tuples - nats_url = NATS_URL, + broker_url = NATS_URL, fileserver_url = FILESERVER_URL, - fileserverUploadHandler = plik_upload_handler, + fileserver_upload_handler = plik_upload_handler, size_threshold = 1_000_000, # 1MB threshold correlation_id = correlation_id, msg_purpose = "chat", @@ -209,7 +209,7 @@ function test_mix_send() for (i, payload) in enumerate(env.payloads) log_trace("Payload $i ('$payload.dataname'):") log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.type)") + log_trace(" Type: $(payload.payload_type)") log_trace(" Size: $(payload.size) bytes") log_trace(" Encoding: $(payload.encoding)") diff --git a/test/test_julia_table_sender.jl b/test/test_julia_table_sender.jl index 75e93f3..386ad50 100644 --- a/test/test_julia_table_sender.jl +++ b/test/test_julia_table_sender.jl @@ -93,9 +93,9 @@ function test_table_send() env, env_json_str = NATSBridge.smartsend( SUBJECT, [data1, data2]; # List of (dataname, data, type) tuples - nats_url = NATS_URL, + broker_url = NATS_URL, fileserver_url = FILESERVER_URL, - fileserverUploadHandler = plik_upload_handler, + fileserver_upload_handler = plik_upload_handler, size_threshold = 1_000_000, # 1MB threshold correlation_id = correlation_id, msg_purpose = "chat", @@ -113,7 +113,7 @@ function test_table_send() for (i, payload) in enumerate(env.payloads) log_trace("Payload $i ('$payload.dataname'):") log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.type)") + log_trace(" Type: $(payload.payload_type)") log_trace(" Size: $(payload.size) bytes") log_trace(" Encoding: $(payload.encoding)") diff --git a/test/test_julia_text_sender.jl b/test/test_julia_text_sender.jl index 6625d16..29e1839 100644 --- a/test/test_julia_text_sender.jl +++ b/test/test_julia_text_sender.jl @@ -78,9 +78,9 @@ function test_text_send() env, env_json_str = NATSBridge.smartsend( SUBJECT, [data1, data2]; # List of (dataname, data, type) tuples - nats_url = NATS_URL, + broker_url = NATS_URL, fileserver_url = FILESERVER_URL, - fileserverUploadHandler = plik_upload_handler, + fileserver_upload_handler = plik_upload_handler, size_threshold = 1_000_000, # 1MB threshold correlation_id = correlation_id, msg_purpose = "chat", @@ -98,7 +98,7 @@ function test_text_send() for (i, payload) in enumerate(env.payloads) log_trace("Payload $i ('$payload.dataname'):") log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.type)") + log_trace(" Type: $(payload.payload_type)") log_trace(" Size: $(payload.size) bytes") log_trace(" Encoding: $(payload.encoding)")