diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 7d4549a..4677c53 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -12,10 +12,10 @@ # # ```jldoctest # # Upload handler - uploads data to file server and returns URL -# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} -# +# fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} +# # # Download handler - fetches data from file server URL with exponential backoff -# fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} +# fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} # ``` # # Multi-Payload Support (Standard API): @@ -35,24 +35,23 @@ module NATSBridge -using Revise using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting # ---------------------------------------------- 100 --------------------------------------------- # # Constants const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport -const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL +const DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport -""" msgPayload_v1 - Internal message payload structure +""" msg_payload_v1 - Internal message payload structure This structure represents a single payload within a NATS message envelope. It supports both direct transport (base64-encoded data) and link transport (URL-based). # Arguments: - `id::String` - Unique identifier for this payload (e.g., "uuid4") - `dataname::String` - Name of the payload (e.g., "login_image") - - `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" - `transport::String` - Transport method: "direct" or "link" - `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc" - `size::Integer` - Size of the payload in bytes (e.g., 15433) @@ -68,14 +67,14 @@ It supports both direct transport (base64-encoded data) and link transport (URL- - `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary # Return: - - A msgPayload_v1 struct instance + - A msg_payload_v1 struct instance # Example ```jldoctest using UUIDs # Create a direct transport payload -payload = msgPayload_v1( +payload = msg_payload_v1( "Hello World", "text"; id = string(uuid4()), @@ -87,7 +86,7 @@ payload = msgPayload_v1( ) # Create a link transport payload -payload = msgPayload_v1( +payload = msg_payload_v1( "http://example.com/file.zip", "binary"; id = string(uuid4()), @@ -98,10 +97,10 @@ payload = msgPayload_v1( ) ``` """ -struct msgPayload_v1 +struct msg_payload_v1 id::String # id of this payload e.g. "uuid4" dataname::String # name of this payload e.g. "login_image" - type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary" + payload_type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary" transport::String # "direct | link" encoding::String # "none | json | base64 | arrow-ipc" size::Integer # data size in bytes e.g. 15433 @@ -110,9 +109,9 @@ struct msgPayload_v1 end # constructor -function msgPayload_v1( +function msg_payload_v1( data::Any, - type::String; + payload_type::String; id::String = "", dataname::String = string(uuid4()), transport::String = "direct", @@ -120,10 +119,10 @@ function msgPayload_v1( size::Integer = 0, metadata::Dict{String, T} = Dict{String, Any}() ) where {T<:Any} - return msgPayload_v1( + return msg_payload_v1( id, dataname, - type, + payload_type, transport, encoding, size, @@ -133,101 +132,101 @@ function msgPayload_v1( end -""" msgEnvelope_v1 - Internal message envelope structure +""" msg_envelope_v1 - Internal message envelope structure This structure represents a complete NATS message envelope containing multiple payloads with metadata for routing, tracing, and message context. # Arguments: - - `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt") - - `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message + - `send_to::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt") + - `payloads::AbstractArray{msg_payload_v1}` - List of payloads to include in the message # Keyword Arguments: - - `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty - - `msgId::String = ""` - Unique message identifier; auto-generated if empty + - `correlation_id::String = ""` - Unique identifier to track messages across systems; auto-generated if empty + - `msg_id::String = ""` - Unique message identifier; auto-generated if empty - `timestamp::String = string(Dates.now())` - Message publication timestamp - - `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. - - `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend") - - `senderId::String = ""` - UUID of the sender; auto-generated if empty - - `receiverName::String = ""` - Name of the receiver (empty string means broadcast) - - `receiverId::String = ""` - UUID of the receiver (empty string means broadcast) - - `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected) - - `replyToMsgId::String = ""` - Message ID this message is replying to - - `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL + - `msg_purpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. + - `sender_name::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend") + - `sender_id::String = ""` - UUID of the sender; auto-generated if empty + - `receiver_name::String = ""` - Name of the receiver (empty string means broadcast) + - `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast) + - `reply_to::String = ""` - Topic where receiver should reply (empty string if no reply expected) + - `reply_to_msg_id::String = ""` - Message ID this message is replying to + - `broker_url::String = DEFAULT_BROKER_URL` - NATS broker URL - `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata # Return: - - A msgEnvelope_v1 struct instance + - A msg_envelope_v1 struct instance # Example ```jldoctest using UUIDs, NATSBridge # Create payloads for the message -payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64") -payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link") +payload1 = msg_payload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64") +payload2 = msg_payload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link") # Create message envelope -env = msgEnvelope_v1( +env = msg_envelope_v1( "my.subject", [payload1, payload2]; - correlationId = string(uuid4()), - msgPurpose = "chat", - senderName = "my-app", - receiverName = "receiver-app", - replyTo = "reply.subject" + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "my-app", + receiver_name = "receiver-app", + reply_to = "reply.subject" ) ``` """ -struct msgEnvelope_v1 - correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic. - msgId::String # this message id +struct msg_envelope_v1 + correlation_id::String # Unique identifier to track messages across systems. Many senders can talk about the same topic. + msg_id::String # this message id timestamp::String # message published timestamp. string(Dates.now()) - sendTo::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt" - msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..." - senderName::String # sender name (String) e.g. "agent-wine-web-frontend" - senderId::String # sender id e.g. uuid4snakecase() - receiverName::String # msg receiver name (String) e.g. "agent-backend" - receiverId::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase() + send_to::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt" + msg_purpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..." + sender_name::String # sender name (String) e.g. "agent-wine-web-frontend" + sender_id::String # sender id e.g. uuid4snakecase() + receiver_name::String # msg receiver name (String) e.g. "agent-backend" + receiver_id::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase() - replyTo::String # sender ask receiver to reply to this topic - replyToMsgId::String # the message id this message is replying to - brokerURL::String # mqtt/NATS server address + reply_to::String # sender ask receiver to reply to this topic + reply_to_msg_id::String # the message id this message is replying to + broker_url::String # mqtt/NATS server address metadata::Dict{String, Any} - payloads::AbstractArray{msgPayload_v1} # multiple payload store here + payloads::AbstractArray{msg_payload_v1} # multiple payload store here end # constructor -function msgEnvelope_v1( - sendTo::String, - payloads::AbstractArray{msgPayload_v1}; - correlationId::String = "", - msgId::String = "", +function msg_envelope_v1( + send_to::String, + payloads::AbstractArray{msg_payload_v1}; + correlation_id::String = "", + msg_id::String = "", timestamp::String = string(Dates.now()), - msgPurpose::String = "", - senderName::String = "", - senderId::String = "", - receiverName::String = "", - receiverId::String = "", - replyTo::String = "", - replyToMsgId::String = "", - brokerURL::String = DEFAULT_NATS_URL, + msg_purpose::String = "", + sender_name::String = "", + sender_id::String = "", + receiver_name::String = "", + receiver_id::String = "", + reply_to::String = "", + reply_to_msg_id::String = "", + broker_url::String = DEFAULT_BROKER_URL, metadata::Dict{String, Any} = Dict{String, Any}() ) - return msgEnvelope_v1( - correlationId, - msgId, + return msg_envelope_v1( + correlation_id, + msg_id, timestamp, - sendTo, - msgPurpose, - senderName, - senderId, - receiverName, - receiverId, - replyTo, - replyToMsgId, - brokerURL, + send_to, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url, metadata, payloads ) @@ -235,19 +234,19 @@ end -""" envelope_to_json - Convert msgEnvelope_v1 to JSON string -This function converts the msgEnvelope_v1 struct to a JSON string representation, +""" envelope_to_json - Convert msg_envelope_v1 to JSON string +This function converts the msg_envelope_v1 struct to a JSON string representation, preserving all metadata and payload information for NATS message publishing. # Function Workflow: -1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.) + 1. Creates a dictionary with envelope metadata (correlation_id, msg_id, timestamp, etc.) 2. Conditionally includes metadata dictionary if not empty 3. Iterates through payloads and converts each to JSON-compatible dictionary 4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL) 5. Returns final JSON string representation # Arguments: - - `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON + - `env::msg_envelope_v1` - The msg_envelope_v1 struct to convert to JSON # Return: - `String` - JSON string representation of the envelope @@ -257,27 +256,27 @@ preserving all metadata and payload information for NATS message publishing. using UUIDs # Create an envelope with payloads -payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64") -env = msgEnvelope_v1("my.subject", [payload]) +payload = msg_payload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64") +env = msg_envelope_v1("my.subject", [payload]) # Convert to JSON for publishing json_msg = envelope_to_json(env) ``` """ -function envelope_to_json(env::msgEnvelope_v1) +function envelope_to_json(env::msg_envelope_v1) obj = Dict{String, Any}( - "correlationId" => env.correlationId, - "msgId" => env.msgId, + "correlation_id" => env.correlation_id, + "msg_id" => env.msg_id, "timestamp" => env.timestamp, - "sendTo" => env.sendTo, - "msgPurpose" => env.msgPurpose, - "senderName" => env.senderName, - "senderId" => env.senderId, - "receiverName" => env.receiverName, - "receiverId" => env.receiverId, - "replyTo" => env.replyTo, - "replyToMsgId" => env.replyToMsgId, - "brokerURL" => env.brokerURL + "send_to" => env.send_to, + "msg_purpose" => env.msg_purpose, + "sender_name" => env.sender_name, + "sender_id" => env.sender_id, + "receiver_name" => env.receiver_name, + "receiver_id" => env.receiver_id, + "reply_to" => env.reply_to, + "reply_to_msg_id" => env.reply_to_msg_id, + "broker_url" => env.broker_url ) if !isempty(env.metadata) # Only include metadata if it exists and is not empty @@ -291,7 +290,7 @@ function envelope_to_json(env::msgEnvelope_v1) payload_obj = Dict{String, Any}( "id" => payload.id, "dataname" => payload.dataname, - "type" => payload.type, + "payload_type" => payload.type, "transport" => payload.transport, "encoding" => payload.encoding, "size" => payload.size, @@ -359,8 +358,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c 1. Iterates through the list of (dataname, data, type) tuples 2. For each payload: extracts the type from the tuple and serializes accordingly 3. Compares the serialized size against `size_threshold` -4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1 -5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL +4. For small payloads: encodes as Base64, constructs a "direct" msg_payload_v1 +5. For large payloads: uploads to the fileserver, constructs a "link" msg_payload_v1 with the URL 6. Converts envelope to JSON string and optionally publishes to NATS # Arguments: @@ -368,13 +367,13 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c - `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send - `dataname::String` - Name of the payload - `data::Any` - The actual data to send - - `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" - No standalone `type` parameter - type is specified per payload # Keyword Arguments: - - `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server + - `broker_url::String = DEFAULT_BROKER_URL` - URL of the NATS server - `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads - - `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) + - `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) - `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport - `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated - `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. @@ -387,7 +386,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c # Return: - A tuple `(env, env_json_str)` where: - - `env::msgEnvelope_v1` - The envelope object containing all metadata and payloads + - `env::msg_envelope_v1` - The envelope object containing all metadata and payloads - `env_json_str::String` - JSON string representation of the envelope for publishing # Example @@ -415,15 +414,15 @@ env, msg_json = smartsend("chat.subject", [ ]) # Publish the JSON string directly using NATS request-reply pattern -# reply = NATS.request(nats_url, subject, env_json_str; reply_to=reply_to_topic) +# reply = NATS.request(broker_url, subject, env_json_str; reply_to=reply_to_topic) ``` -""" #[PENDING] +""" function smartsend( subject::String, # smartreceive's subject data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads - nats_url::String = DEFAULT_NATS_URL, + broker_url::String = DEFAULT_BROKER_URL, # NATS server URL fileserver_url = DEFAULT_FILESERVER_URL, - fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver + fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::Union{String, Nothing} = nothing, msg_purpose::String = "chat", @@ -444,13 +443,13 @@ function smartsend( msg_id = string(uuid4()) # Process each payload in the list - payloads = msgPayload_v1[] + payloads = msg_payload_v1[] for (dataname, payload_data, payload_type) in data # Serialize data based on type payload_bytes = _serialize_data(payload_data, payload_type) payload_size = length(payload_bytes) # Calculate payload size in bytes - log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size + log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size # Decision: Direct vs Link if payload_size < size_threshold # Check if payload is small enough for direct transport @@ -458,8 +457,8 @@ function smartsend( payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice - # Create msgPayload_v1 for direct transport - payload = msgPayload_v1( + # Create msg_payload_v1 for direct transport + payload = msg_payload_v1( payload_b64, payload_type; id = string(uuid4()), @@ -474,18 +473,18 @@ function smartsend( # Link path - Upload to HTTP server, send URL via NATS log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice - # Upload to HTTP server - response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes) + # Upload to HTTP server + response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) if response["status"] != 200 # Check if upload was successful - error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed + error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed end url = response["url"] # URL for the uploaded data log_trace(cid, "Uploaded to URL: $url") # Log successful upload - # Create msgPayload_v1 for link transport - payload = msgPayload_v1( + # Create msg_payload_v1 for link transport + payload = msg_payload_v1( url, payload_type; id = string(uuid4()), @@ -499,26 +498,26 @@ function smartsend( end end - # Create msgEnvelope_v1 with all payloads - env = msgEnvelope_v1( - subject, - payloads; - correlationId = cid, - msgId = msg_id, - msgPurpose = msg_purpose, - senderName = sender_name, - senderId = string(uuid4()), - receiverName = receiver_name, - receiverId = receiver_id, - replyTo = reply_to, - replyToMsgId = reply_to_msg_id, - brokerURL = nats_url, - metadata = Dict{String, Any}(), - ) + # Create msg_envelope_v1 with all payloads + env = msg_envelope_v1( + subject, + payloads; + correlation_id = cid, + msg_id = msg_id, + msg_purpose = msg_purpose, + sender_name = sender_name, + sender_id = string(uuid4()), + receiver_name = receiver_name, + receiver_id = receiver_id, + reply_to = reply_to, + reply_to_msg_id = reply_to_msg_id, + broker_url = broker_url, + metadata = Dict{String, Any}(), + ) env_json_str = envelope_to_json(env) # Convert envelope to JSON if is_publish - publish_message(nats_url, subject, env_json_str, cid) # Publish message to NATS + publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS end return (env, env_json_str) @@ -539,7 +538,7 @@ It supports multiple serialization formats for different data types. # Arguments: - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`) - - `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `payload_type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" # Return: - `Vector{UInt8}` - Binary representation of the serialized data @@ -585,7 +584,7 @@ binary_bytes = _serialize_data(buf, "binary") binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary") ``` """ -function _serialize_data(data::Any, type::String) +function _serialize_data(data::Any, payload_type::String) """ Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object d = Dict( "name"=>"ton", @@ -602,40 +601,40 @@ function _serialize_data(data::Any, type::String) json_obj = JSON.parse(json_str_2) """ - if type == "text" # Text data - convert to UTF-8 bytes + if payload_type == "text" # Text data - convert to UTF-8 bytes if isa(data, String) data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes return data_bytes else error("Text data must be a String") end - elseif type == "dictionary" # JSON data - serialize directly + elseif payload_type == "dictionary" # JSON data - serialize directly json_str = JSON.json(data) # Convert Julia data to JSON string json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes return json_str_bytes - elseif type == "table" # Table data - convert to Arrow IPC stream + elseif payload_type == "table" # Table data - convert to Arrow IPC stream io = IOBuffer() # Create in-memory buffer Arrow.write(io, data) # Write data as Arrow IPC stream to buffer return take!(io) # Return the buffer contents as bytes - elseif type == "image" # Image data - treat as binary + elseif payload_type == "image" # Image data - treat as binary if isa(data, Vector{UInt8}) return data # Return binary data directly else error("Image data must be Vector{UInt8}") end - elseif type == "audio" # Audio data - treat as binary + elseif payload_type == "audio" # Audio data - treat as binary if isa(data, Vector{UInt8}) return data # Return binary data directly else error("Audio data must be Vector{UInt8}") end - elseif type == "video" # Video data - treat as binary + elseif payload_type == "video" # Video data - treat as binary if isa(data, Vector{UInt8}) return data # Return binary data directly else error("Video data must be Vector{UInt8}") end - elseif type == "binary" # Binary data - treat as binary + elseif payload_type == "binary" # Binary data - treat as binary if isa(data, IOBuffer) # Check if data is an IOBuffer return take!(data) # Return buffer contents as bytes elseif isa(data, Vector{UInt8}) # Check if data is already binary @@ -644,7 +643,7 @@ function _serialize_data(data::Any, type::String) error("Binary data must be binary (Vector{UInt8} or IOBuffer)") end else # Unknown type - error("Unknown type: $type") + error("Unknown payload_type: $type") end end @@ -654,7 +653,7 @@ This internal function publishes a message to a NATS subject with proper connection management and logging. # Arguments: - - `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222") + - `broker_url::String` - NATS server URL (e.g., "nats://localhost:4222") - `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt") - `message::String` - JSON message to publish - `correlation_id::String` - Correlation ID for tracing and logging @@ -663,18 +662,18 @@ connection management and logging. - `nothing` - This function performs publishing but returns nothing # Example -```jldoctest -using NATS + ```jldoctest + using NATS -# Prepare JSON message -message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}" + # Prepare JSON message + message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}" -# Publish to NATS -publish_message("nats://localhost:4222", "my.subject", message, "abc123") -``` + # Publish to NATS + publish_message("nats://localhost:4222", "my.subject", message, "abc123") + ``` """ -function publish_message(nats_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(nats_url) # Create NATS connection +function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) + conn = NATS.connect(broker_url) # Create NATS connection try NATS.publish(conn, subject, message) # Publish message to NATS log_trace(correlation_id, "Message published to $subject") # Log successful publish @@ -701,7 +700,7 @@ A HTTP file server is required along with its download function. - `msg::NATS.Msg` - NATS message to process # Keyword Arguments: - - `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs + - `fileserver_download_handler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs - `max_retries::Int = 5` - Maximum retry attempts for fetching URL - `base_delay::Int = 100` - Initial delay for exponential backoff in ms - `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms @@ -710,23 +709,23 @@ A HTTP file server is required along with its download function. - `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples # Example -```jldoctest -# Receive and process message -msg = nats_message # NATS message -payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) -# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...] -``` + ```jldoctest + # Receive and process message + msg = nats_message # NATS message + payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) + # payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...] + ``` """ function smartreceive( msg::NATS.Msg; - fileserverDownloadHandler::Function=_fetch_with_backoff, + fileserver_download_handler::Function=_fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000 ) # Parse the JSON envelope - json_data = JSON.parse(String(msg.payload)) - log_trace(json_data["correlationId"], "Processing received message") # Log message processing start + json_data = JSON.parse(String(msg.payload)) + log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start # Process all payloads in the envelope payloads_list = Tuple{String, Any, String}[] @@ -740,7 +739,7 @@ function smartreceive( dataname = String(payload["dataname"]) if transport == "direct" # Direct transport - payload is in the message - log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling + log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling # Extract base64 payload from the payload payload_b64 = String(payload["data"]) @@ -749,21 +748,21 @@ function smartreceive( payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes # Deserialize based on type - data_type = String(payload["type"]) - data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"]) + data_type = String(payload["payload_type"]) + data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"]) push!(payloads_list, (dataname, data, data_type)) elseif transport == "link" # Link transport - payload is at URL # Extract download URL from the payload url = String(payload["data"]) - log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling + log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling # Fetch with exponential backoff using the download handler - downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"]) + downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"]) # Deserialize based on type - data_type = String(payload["type"]) - data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"]) + data_type = String(payload["payload_type"]) + data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"]) push!(payloads_list, (dataname, data, data_type)) else # Unknown transport type @@ -851,7 +850,7 @@ It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow # Arguments: - `data::Vector{UInt8}` - Serialized data as bytes - - `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") + - `payload_type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") - `correlation_id::String` - Correlation ID for logging # Return: @@ -877,28 +876,28 @@ table_data = _deserialize_data(arrow_bytes, "table", "correlation123") """ function _deserialize_data( data::Vector{UInt8}, - type::String, + payload_type::String, correlation_id::String ) - if type == "text" # Text data - convert to string + if payload_type == "text" # Text data - convert to string return String(data) # Convert bytes to string - elseif type == "dictionary" # JSON data - deserialize + elseif payload_type == "dictionary" # JSON data - deserialize json_str = String(data) # Convert bytes to string return JSON.parse(json_str) # Parse JSON string to JSON object - elseif type == "table" # Table data - deserialize Arrow IPC stream + elseif payload_type == "table" # Table data - deserialize Arrow IPC stream io = IOBuffer(data) # Create buffer from bytes df = Arrow.Table(io) # Read Arrow IPC format from buffer return df # Return DataFrame - elseif type == "image" # Image data - return binary + elseif payload_type == "image" # Image data - return binary return data # Return bytes directly - elseif type == "audio" # Audio data - return binary + elseif payload_type == "audio" # Audio data - return binary return data # Return bytes directly - elseif type == "video" # Video data - return binary + elseif payload_type == "video" # Video data - return binary return data # Return bytes directly - elseif type == "binary" # Binary data - return binary + elseif payload_type == "binary" # Binary data - return binary return data # Return bytes directly else # Unknown type - error("Unknown type: $type") # Throw error for unknown type + error("Unknown payload_type: $payload_type") # Throw error for unknown type end end @@ -915,9 +914,9 @@ retrieves an upload ID and token, then uploads the file data as multipart form d 4. Returns identifiers and download URL for the uploaded file # Arguments: - - `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) - - `filename::String` - Name of the file being uploaded - - `data::Vector{UInt8}` - Raw byte data of the file content + - `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) + - `filename::String` - Name of the file being uploaded + - `data::Vector{UInt8}` - Raw byte data of the file content # Return: - `Dict{String, Any}` - Dictionary with keys: @@ -927,36 +926,36 @@ retrieves an upload ID and token, then uploads the file data as multipart form d - `"url"` - Full URL to download the uploaded file # Example -```jldoctest -using HTTP, JSON + ```jldoctest + using HTTP, JSON -fileServerURL = "http://localhost:8080" -filename = "test.txt" -data = UInt8["hello world"] + file_server_url = "http://localhost:8080" + filename = "test.txt" + data = UInt8["hello world"] -# Upload to local plik server -result = plik_oneshot_upload(fileServerURL, filename, data) + # Upload to local plik server + result = plik_oneshot_upload(file_server_url, filename, data) -# Access the result as a Dict -# result["status"], result["uploadid"], result["fileid"], result["url"] -``` + # Access the result as a Dict + # result["status"], result["uploadid"], result["fileid"], result["url"] + ``` """ -function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8}) +function plik_oneshot_upload(file_server_url::String, filename::String, data::Vector{UInt8}) # ----------------------------------------- get upload id ---------------------------------------- # # Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload - url_getUploadID = "$fileServerURL/upload" # URL to get upload ID + url_getUploadID = "$file_server_url/upload" # URL to get upload ID headers = ["Content-Type" => "application/json"] body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(httpResponse.body) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] + http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + response_json = JSON.parse(http_response.body) + uploadid = response_json["id"] + uploadtoken = response_json["uploadToken"] # ------------------------------------------ upload file ----------------------------------------- # # Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload - url_upload = "$fileServerURL/file/$uploadid" + url_upload = "$file_server_url/file/$uploadid" headers = ["X-UploadToken" => uploadtoken] # Create the multipart form data @@ -965,24 +964,24 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect )) # Execute the POST request - httpResponse = nothing + http_response = nothing try - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(httpResponse.body) + http_response = HTTP.post(url_upload, headers, form) + response_json = JSON.parse(http_response.body) catch e @error "Request failed" exception=e end - fileid = responseJson["id"] + fileid = response_json["id"] # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" - url = "$fileServerURL/file/$uploadid/$fileid/$filename" + url = "$file_server_url/file/$uploadid/$fileid/$filename" - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) + return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end -""" plik_oneshot_upload(fileServerURL::String, filepath::String) +""" plik_oneshot_upload(file_server_url::String, filepath::String) This function uploads a file from disk to a plik server in one-shot mode (no upload session). It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`, retrieves an upload ID and token, then uploads the file data as multipart form data using the token. @@ -994,7 +993,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d 4. Returns identifiers and download URL for the uploaded file # Arguments: - - `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) + - `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) - `filepath::String` - Full path to the local file to upload # Return: @@ -1008,56 +1007,54 @@ retrieves an upload ID and token, then uploads the file data as multipart form d ```jldoctest using HTTP, JSON -fileServerURL = "http://localhost:8080" +file_server_url = "http://localhost:8080" filepath = "./test.zip" # Upload to local plik server -result = plik_oneshot_upload(fileServerURL, filepath) +result = plik_oneshot_upload(file_server_url, filepath) # Access the result as a Dict # result["status"], result["uploadid"], result["fileid"], result["url"] ``` """ -function plik_oneshot_upload(fileServerURL::String, filepath::String) +function plik_oneshot_upload(file_server_url::String, filepath::String) # ----------------------------------------- get upload id ---------------------------------------- # # Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload filename = basename(filepath) - url_getUploadID = "$fileServerURL/upload" # URL to get upload ID + url_getUploadID = "$file_server_url/upload" # URL to get upload ID headers = ["Content-Type" => "application/json"] body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(httpResponse.body) + http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + response_json = JSON.parse(http_response.body) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] + uploadid = response_json["id"] + uploadtoken = response_json["uploadToken"] # ------------------------------------------ upload file ----------------------------------------- # # Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID - file_multipart = open(filepath, "r") - url_upload = "$fileServerURL/file/$uploadid" + url_upload = "$file_server_url/file/$uploadid" headers = ["X-UploadToken" => uploadtoken] + response = open(filepath, "r") do file_stream + form = HTTP.Form(Dict("file" => file_stream)) + + # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' + HTTP.post(url_upload, headers, form; status_exception = false) + end - # Create the multipart form data - form = HTTP.Form(Dict( - "file" => file_multipart - )) - - # Execute the POST request - httpResponse = nothing - try - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(httpResponse.body) - catch e - @error "Request failed" exception=e + if !isnothing(response) && response.status == 200 + println("Success!") + else + println("Server returned an error code: ", response.status) end + response_json = JSON.parse(response.body) - fileid = responseJson["id"] + fileid = response_json["id"] # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" - url = "$fileServerURL/file/$uploadid/$fileid/$filename" + url = "$file_server_url/file/$uploadid/$fileid/$filename" - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) + return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end