This commit is contained in:
2026-02-22 13:26:44 +07:00
parent 4b5b5d6ed8
commit 9e5ee61785
5 changed files with 1579 additions and 68 deletions

View File

@@ -10,7 +10,7 @@
#
# Handler Function Signatures:
#
# ```julia
# ```jldoctest
# # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
#
@@ -23,7 +23,7 @@
# Even when sending a single payload, the user must wrap it in a list.
#
# API Standard:
# ```julia
# ```jldoctest
# # Input format for smartsend (always a list of tuples with type info)
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
#
@@ -45,6 +45,59 @@ const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
""" msgPayload_v1 - Internal message payload structure
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
# Arguments:
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
- `dataname::String` - Name of the payload (e.g., "login_image")
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `transport::String` - Transport method: "direct" or "link"
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
- `data::Any` - Payload data (bytes for direct, URL for link)
- `metadata::Dict{String, Any}` - Optional metadata dictionary
# Keyword Arguments:
- `id::String = ""` - Payload ID, auto-generated if empty
- `dataname::String = string(uuid4())` - Payload name, auto-generated UUID if empty
- `transport::String = "direct"` - Transport method
- `encoding::String = "none"` - Encoding method
- `size::Integer = 0` - Payload size
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
# Return:
- A msgPayload_v1 struct instance
# Example
```jldoctest
using UUIDs
# Create a direct transport payload
payload = msgPayload_v1(
"Hello World",
"text";
id = string(uuid4()),
dataname = "message",
transport = "direct",
encoding = "base64",
size = 11,
metadata = Dict{String, Any}()
)
# Create a link transport payload
payload = msgPayload_v1(
"http://example.com/file.zip",
"binary";
id = string(uuid4()),
dataname = "file",
transport = "link",
encoding = "none",
size = 1000000
)
```
"""
struct msgPayload_v1
id::String # id of this payload e.g. "uuid4"
dataname::String # name of this payload e.g. "login_image"
@@ -68,18 +121,63 @@ function msgPayload_v1(
metadata::Dict{String, T} = Dict{String, Any}()
) where {T<:Any}
return msgPayload_v1(
id,
dataname,
type,
transport,
encoding,
size,
data,
metadata
)
id,
dataname,
type,
transport,
encoding,
size,
data,
metadata
)
end
""" msgEnvelope_v1 - Internal message envelope structure
This structure represents a complete NATS message envelope containing multiple payloads
with metadata for routing, tracing, and message context.
# Arguments:
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
# Keyword Arguments:
- `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
- `msgId::String = ""` - Unique message identifier; auto-generated if empty
- `timestamp::String = string(Dates.now())` - Message publication timestamp
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
- `senderId::String = ""` - UUID of the sender; auto-generated if empty
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `replyToMsgId::String = ""` - Message ID this message is replying to
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
# Return:
- A msgEnvelope_v1 struct instance
# Example
```jldoctest
using UUIDs, NATSBridge
# Create payloads for the message
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
# Create message envelope
env = msgEnvelope_v1(
"my.subject",
[payload1, payload2];
correlationId = string(uuid4()),
msgPurpose = "chat",
senderName = "my-app",
receiverName = "receiver-app",
replyTo = "reply.subject"
)
```
"""
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
msgId::String # this message id
@@ -137,8 +235,34 @@ end
""" Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation.
""" envelope_to_json - Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation,
preserving all metadata and payload information for NATS message publishing.
# Function Workflow:
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
2. Conditionally includes metadata dictionary if not empty
3. Iterates through payloads and converts each to JSON-compatible dictionary
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
5. Returns final JSON string representation
# Arguments:
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
# Return:
- `String` - JSON string representation of the envelope
# Example
```jldoctest
using UUIDs
# Create an envelope with payloads
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
env = msgEnvelope_v1("my.subject", [payload])
# Convert to JSON for publishing
json_msg = envelope_to_json(env)
```
"""
function envelope_to_json(env::msgEnvelope_v1)
obj = Dict{String, Any}(
@@ -197,9 +321,24 @@ function envelope_to_json(env::msgEnvelope_v1)
end
""" Log a trace message with correlation ID and timestamp
""" log_trace - Log a trace message with correlation ID and timestamp
This function logs information messages with a correlation ID for tracing purposes,
making it easier to track message flow across distributed systems.
# Arguments:
- `correlation_id::String` - Correlation ID to identify the message flow
- `message::String` - The message content to log
# Return:
- `nothing` - This function performs logging but returns nothing
# Example
```jldoctest
using Dates
log_trace("abc123", "Starting message processing")
# Logs: [2026-02-21T05:39:00] [Correlation: abc123] Starting message processing
```
"""
function log_trace(correlation_id::String, message::String)
timestamp = Dates.now() # Get current timestamp
@@ -216,7 +355,7 @@ Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_u
The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually.
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
The function workflow:
# Function Workflow:
1. Iterates through the list of (dataname, data, type) tuples
2. For each payload: extracts the type from the tuple and serializes accordingly
3. Compares the serialized size against `size_threshold`
@@ -247,7 +386,7 @@ The function workflow:
- A `msgEnvelope_v1` object containing metadata and transport information
# Example
```julia
```jldoctest
using UUIDs
# Send a single payload (still wrapped in a list)
@@ -376,26 +515,20 @@ end
""" _serialize_data - Serialize data according to specified format
This function serializes arbitrary Julia data into a binary representation based on the specified format.
It supports multiple serialization formats:
- `"text"`: Treats data as text and converts to UTF-8 bytes
- `"dictionary"`: Serializes data as JSON and returns the UTF-8 byte representation
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
- `"image"`: Expects binary image data (Vector{UInt8}) and returns it as bytes
- `"audio"`: Expects binary audio data (Vector{UInt8}) and returns it as bytes
- `"video"`: Expects binary video data (Vector{UInt8}) and returns it as bytes
- `"binary"`: Generic binary data (Vector{UInt8} or IOBuffer) and returns bytes
It supports multiple serialization formats for different data types.
The function handles format-specific serialization logic:
1. For `"text"`: Converts string to UTF-8 bytes
2. For `"dictionary"`: Converts Julia data to JSON string, then encodes to bytes
3. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer
4. For `"image"`, `"audio"`, `"video"`: Treats data as binary (Vector{UInt8})
5. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly
# Function Workflow:
1. Validates the data type against the specified format
2. Converts data to binary representation according to format rules
3. For text: converts string to UTF-8 bytes
4. For dictionary: serializes as JSON then converts to bytes
5. For table: uses Arrow.jl to write as IPC stream
6. For image/audio/video/binary: returns binary data directly
# Arguments:
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
# Return:
- `Vector{UInt8}` - Binary representation of the serialized data
@@ -405,7 +538,7 @@ The function handles format-specific serialization logic:
- `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
# Example
```julia
```jldoctest
using JSON, Arrow, DataFrames
# Text serialization
@@ -505,15 +638,29 @@ function _serialize_data(data::Any, type::String)
end
""" Publish message to NATS
""" publish_message - Publish message to NATS
This internal function publishes a message to a NATS subject with proper
connection management and logging.
Arguments:
- `nats_url::String` - NATS server URL
- `subject::String` - NATS subject to publish to
# Arguments:
- `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for logging
- `correlation_id::String` - Correlation ID for tracing and logging
# Return:
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(nats_url) # Create NATS connection
@@ -532,20 +679,27 @@ This function processes incoming NATS messages, handling both direct transport
It deserializes the data based on the transport type and returns the result.
A HTTP file server is required along with its download function.
Arguments:
# Function Workflow:
1. Parses the JSON envelope from the NATS message
2. Iterates through each payload in the envelope
3. For each payload: determines the transport type (direct or link)
4. For direct transport: decodes Base64 payload and deserializes based on type
5. For link transport: fetches data from URL with exponential backoff, then deserializes
# Arguments:
- `msg::NATS.Msg` - NATS message to process
- `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs
Keyword Arguments:
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5)
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100)
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
# Keyword Arguments:
- `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
Return:
# Return:
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
# Example
```julia
```jldoctest
# Receive and process message
msg = nats_message # NATS message
payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
@@ -610,19 +764,35 @@ function smartreceive(
end
""" Fetch data from URL with exponential backoff
""" _fetch_with_backoff - Fetch data from URL with exponential backoff
This internal function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Arguments:
# Function Workflow:
1. Initializes delay with base_delay value
2. Attempts to fetch data from URL in a retry loop
3. On success: logs success and returns response body as bytes
4. On failure: sleeps using exponential backoff and retries
5. After max_retries: throws error indicating failure
# Arguments:
- `url::String` - URL to fetch from
- `max_retries::Int` - Maximum number of retry attempts
- `base_delay::Int` - Initial delay in milliseconds
- `max_delay::Int` - Maximum delay in milliseconds
- `correlation_id::String` - Correlation ID for logging
Return:
- Vector{UInt8} - Fetched data as bytes
# Return:
- `Vector{UInt8}` - Fetched data as bytes
# Throws:
- `Error` if all retry attempts fail
# Example
```jldoctest
# Fetch data with exponential backoff
data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
```
"""
function _fetch_with_backoff(
url::String,
@@ -655,18 +825,44 @@ function _fetch_with_backoff(
end
""" Deserialize bytes to data based on type
""" _deserialize_data - Deserialize bytes to data based on type
This internal function converts serialized bytes back to Julia data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Arguments:
# Function Workflow:
1. Validates the data type against supported formats
2. Converts bytes to appropriate Julia data type based on format
3. For text: converts bytes to string
4. For dictionary: converts bytes to JSON string then parses to Julia object
5. For table: reads Arrow IPC format and returns DataFrame
6. For image/audio/video/binary: returns bytes directly
# Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `correlation_id::String` - Correlation ID for logging
Return:
# Return:
- Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
# Throws:
- `Error` if `type` is not one of the supported types
# Example
```jldoctest
# Text data
text_bytes = UInt8["Hello World"]
text_data = _deserialize_data(text_bytes, "text", "correlation123")
# JSON data
json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 125] # {"name":"Alice"}
json_data = _deserialize_data(json_bytes, "dictionary", "correlation123")
# Arrow IPC data (table)
arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes
table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
```
"""
function _deserialize_data(
data::Vector{UInt8},
@@ -697,15 +893,15 @@ end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow:
1. Obtains an upload ID and token from the server
2. Uploads the provided binary data as a file using the `X-UploadToken` header
3. Returns identifiers and download URL for the uploaded file
# Function Workflow:
1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
2. Retrieves upload ID and token from server response
3. Uploads binary data as multipart form data using the token
4. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
@@ -713,14 +909,14 @@ The function workflow:
- `data::Vector{UInt8}` - Raw byte data of the file content
# Return:
- A Dict with keys:
- `Dict{String, Any}` - Dictionary with keys:
- `"status"` - HTTP server response status
- `"uploadid"` - ID of the one-shot upload session
- `"fileid"` - ID of the uploaded file within the session
- `"url"` - Full URL to download the uploaded file
# Example
```julia
```jldoctest
using HTTP, JSON
fileServerURL = "http://localhost:8080"
@@ -776,31 +972,29 @@ end
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
Upload a single file to a plik server using one-shot mode.
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow:
1. Obtains an upload ID and token from the server
2. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
3. Returns identifiers and download URL for the uploaded file
# Function Workflow:
1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
2. Retrieves upload ID and token from server response
3. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
4. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filepath::String` - Full path to the local file to upload
# Return:
- A Dict with keys:
- `Dict{String, Any}` - Dictionary with keys:
- `"status"` - HTTP server response status
- `"uploadid"` - ID of the one-shot upload session
- `"fileid"` - ID of the uploaded file within the session
- `"url"` - Full URL to download the uploaded file
# Example
```julia
```jldoctest
using HTTP, JSON
fileServerURL = "http://localhost:8080"