From 45f125789674b9daf523674ca189fa582fce91a0 Mon Sep 17 00:00:00 2001 From: narawat Date: Tue, 24 Feb 2026 18:50:28 +0700 Subject: [PATCH] update --- src/NATSBridge.jl | 64 +++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index a5a7faf..4edc503 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -100,9 +100,9 @@ payload = msg_payload_v1( struct msg_payload_v1 id::String # id of this payload e.g. "uuid4" dataname::String # name of this payload e.g. "login_image" - payload_type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary" - transport::String # "direct | link" - encoding::String # "none | json | base64 | arrow-ipc" + payload_type::String # this payload type. Can be "text", "dictionary", "table", "image", "audio", "video", "binary" + transport::String # transport method: "direct" or "link" + encoding::String # encoding method: "none", "json", "base64", "arrow-ipc" size::Integer # data size in bytes e.g. 15433 data::Any # payload data in case of direct transport or a URL in case of link metadata::Dict{String, Any} # Dict("checksum" => "sha256_hash", ...) This metadata is for this payload @@ -138,7 +138,7 @@ with metadata for routing, tracing, and message context. # Arguments: - `send_to::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt") - - `payloads::AbstractArray{msg_payload_v1}` - List of payloads to include in the message + - `payloads::Vector{msg_payload_v1}` - List of payloads to include in the message # Keyword Arguments: - `correlation_id::String = ""` - Unique identifier to track messages across systems; auto-generated if empty @@ -180,27 +180,27 @@ env = msg_envelope_v1( struct msg_envelope_v1 correlation_id::String # Unique identifier to track messages across systems. Many senders can talk about the same topic. msg_id::String # this message id - timestamp::String # message published timestamp. string(Dates.now()) - + timestamp::String # message published timestamp (string(Dates.now())) + send_to::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt" - msg_purpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..." + msg_purpose::String # purpose of this message e.g. "ACK", "NACK", "updateStatus", "shutdown", ... sender_name::String # sender name (String) e.g. "agent-wine-web-frontend" - sender_id::String # sender id e.g. uuid4snakecase() + sender_id::String # sender id e.g. uuid4() receiver_name::String # msg receiver name (String) e.g. "agent-backend" - receiver_id::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase() - + receiver_id::String # msg receiver id, nothing means everyone in the topic e.g. uuid4() + reply_to::String # sender ask receiver to reply to this topic - reply_to_msg_id::String # the message id this message is replying to - broker_url::String # mqtt/NATS server address - + reply_to_msg_id::String # the message id this message is replying to + broker_url::String # NATS server address + metadata::Dict{String, Any} - payloads::AbstractArray{msg_payload_v1} # multiple payload store here + payloads::Vector{msg_payload_v1} # multiple payload store here end # constructor function msg_envelope_v1( send_to::String, - payloads::AbstractArray{msg_payload_v1}; + payloads::Vector{msg_payload_v1}; correlation_id::String = "", msg_id::String = "", timestamp::String = string(Dates.now()), @@ -422,7 +422,7 @@ function smartsend( data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads broker_url::String = DEFAULT_BROKER_URL, # NATS server URL fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver + fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::Union{String, Nothing} = nothing, msg_purpose::String = "chat", @@ -544,8 +544,8 @@ It supports multiple serialization formats for different data types. - `Vector{UInt8}` - Binary representation of the serialized data # Throws: - - `Error` if `type` is not one of the supported types - - `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}` + - `Error` if `payload_type` is not one of the supported types + - `Error` if `payload_type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}` # Example ```jldoctest @@ -706,15 +706,15 @@ A HTTP file server is required along with its download function. - `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms # Return: - - `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples + - `Vector{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples # Example - ```jldoctest - # Receive and process message - msg = nats_message # NATS message - payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) - # payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...] - ``` +```jldoctest +# Receive and process message +msg = nats_message # NATS message +payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) +# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...] +``` """ function smartreceive( msg::NATS.Msg; @@ -857,12 +857,12 @@ It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow - Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") # Throws: - - `Error` if `type` is not one of the supported types + - `Error` if `payload_type` is not one of the supported types # Example ```jldoctest # Text data -text_bytes = UInt8["Hello World"] +text_bytes = Vector{UInt8}("Hello World") text_data = _deserialize_data(text_bytes, "text", "correlation123") # JSON data @@ -870,7 +870,7 @@ json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101 json_data = _deserialize_data(json_bytes, "dictionary", "correlation123") # Arrow IPC data (table) -arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes +arrow_bytes = Vector{UInt8}([1, 2, 3]) # Arrow IPC bytes table_data = _deserialize_data(arrow_bytes, "table", "correlation123") ``` """ @@ -931,7 +931,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d file_server_url = "http://localhost:8080" filename = "test.txt" - data = UInt8["hello world"] + data = Vector{UInt8}("hello world") # Upload to local plik server result = plik_oneshot_upload(file_server_url, filename, data) @@ -1042,9 +1042,9 @@ function plik_oneshot_upload(file_server_url::String, filepath::String) end if !isnothing(http_response) && http_response.status == 200 - println("Success!") + # Success - response already logged by caller else - println("Server returned an error code: ", http_response.status) + error("Failed to upload file: server returned status $(http_response.status)") end response_json = JSON.parse(http_response.body) @@ -1057,7 +1057,7 @@ function plik_oneshot_upload(file_server_url::String, filepath::String) end function _get_payload_bytes(data) - @error "did't implement yet" + @error "didn't implement yet" end