Files
NATSBridge/src/NATSBridge.jl
2026-02-22 20:43:28 +07:00

1072 lines
41 KiB
Julia

# Bi-Directional Data Bridge - Julia Module
# Implements smartsend and smartreceive for NATS communication
# This module provides functionality for sending and receiving data across network boundaries
# using NATS as the message bus, with support for both direct payload transport and
# URL-based transport for larger payloads.
#
# File Server Handler Architecture:
# The system uses handler functions to abstract file server operations, allowing support
# for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
#
# Handler Function Signatures:
#
# ```jldoctest
# # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
#
# # Download handler - fetches data from file server URL with exponential backoff
# fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# ```
#
# Multi-Payload Support (Standard API):
# The system uses a standardized list-of-tuples format for all payload operations.
# Even when sending a single payload, the user must wrap it in a list.
#
# API Standard:
# ```jldoctest
# # Input format for smartsend (always a list of tuples with type info)
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
#
# # Output format for smartreceive (always returns a list of tuples)
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# ```
#
# Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
module NATSBridge
using Revise
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
# ---------------------------------------------- 100 --------------------------------------------- #
# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
""" msgPayload_v1 - Internal message payload structure
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
# Arguments:
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
- `dataname::String` - Name of the payload (e.g., "login_image")
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `transport::String` - Transport method: "direct" or "link"
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
- `data::Any` - Payload data (bytes for direct, URL for link)
- `metadata::Dict{String, Any}` - Optional metadata dictionary
# Keyword Arguments:
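- `id::String = ""` - Payload ID; defaults to an empty string (this constructor does not generate one; `smartsend` passes `string(uuid4())`)
- `dataname::String = string(uuid4())` - Payload name; defaults to a freshly generated UUID string when omitted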
- `transport::String = "direct"` - Transport method
- `encoding::String = "none"` - Encoding method
- `size::Integer = 0` - Payload size
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
# Return:
- A msgPayload_v1 struct instance
# Example
```jldoctest
using UUIDs
# Create a direct transport payload
payload = msgPayload_v1(
"Hello World",
"text";
id = string(uuid4()),
dataname = "message",
transport = "direct",
encoding = "base64",
size = 11,
metadata = Dict{String, Any}()
)
# Create a link transport payload
payload = msgPayload_v1(
"http://example.com/file.zip",
"binary";
id = string(uuid4()),
dataname = "file",
transport = "link",
encoding = "none",
size = 1000000
)
```
"""
struct msgPayload_v1
# One payload carried inside a msgEnvelope_v1. Field order matters: the keyword
# constructor forwards its arguments positionally in exactly this order.
id::String # identifier of this payload; smartsend fills it with string(uuid4())
dataname::String # name of this payload e.g. "login_image"
type::String # payload type, one of "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # data size in bytes e.g. 15433; smartsend stores the serialized size before Base64 encoding
data::Any # direct transport: the payload itself (smartsend stores a Base64 string); link transport: the download URL
metadata::Dict{String, Any} # per-payload metadata, e.g. Dict("checksum" => "sha256_hash", ...)
end
# constructor
function msgPayload_v1(
    data::Any,
    type::String;
    id::String = "",
    dataname::String = string(uuid4()),
    transport::String = "direct",
    encoding::String = "none",
    size::Integer = 0,
    metadata::Dict{String, T} = Dict{String, Any}()
) where {T<:Any}
    # Keyword-friendly front end for the default positional constructor.
    # The struct stores its fields in the order
    #   id, dataname, type, transport, encoding, size, data, metadata
    # so the two positional inputs (`data`, `type`) are slotted into place here.
    return msgPayload_v1(id, dataname, type, transport, encoding, size, data, metadata)
end
""" msgEnvelope_v1 - Internal message envelope structure
This structure represents a complete NATS message envelope containing multiple payloads
with metadata for routing, tracing, and message context.
# Arguments:
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
# Keyword Arguments:
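- `correlationId::String = ""` - Unique identifier to track messages across systems; defaults to an empty string (not generated by this constructor; `smartsend` supplies one)
- `msgId::String = ""` - Unique message identifier; defaults to an empty string (not generated by this constructor; `smartsend` supplies `string(uuid4())`)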
- `timestamp::String = string(Dates.now())` - Message publication timestamp
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
- `senderId::String = ""` - UUID of the sender; defaults to an empty string (not generated by this constructor; `smartsend` supplies `string(uuid4())`)
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `replyToMsgId::String = ""` - Message ID this message is replying to
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
# Return:
- A msgEnvelope_v1 struct instance
# Example
```jldoctest
using UUIDs, NATSBridge
# Create payloads for the message
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
# Create message envelope
env = msgEnvelope_v1(
"my.subject",
[payload1, payload2];
correlationId = string(uuid4()),
msgPurpose = "chat",
senderName = "my-app",
receiverName = "receiver-app",
replyTo = "reply.subject"
)
```
"""
struct msgEnvelope_v1
# Complete message envelope published on NATS. Field order matters: the keyword
# constructor forwards its arguments positionally in exactly this order.
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
msgId::String # identifier of this message
timestamp::String # message published timestamp; the keyword constructor defaults it to string(Dates.now())
sendTo::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
senderName::String # sender name (String) e.g. "agent-wine-web-frontend"
senderId::String # sender id; smartsend fills it with string(uuid4())
receiverName::String # msg receiver name (String) e.g. "agent-backend"
receiverId::String # msg receiver id; an empty string means everyone on the topic
replyTo::String # topic the sender asks the receiver to reply to
replyToMsgId::String # the message id this message is replying to
brokerURL::String # NATS broker URL; the keyword constructor defaults it to DEFAULT_NATS_URL
metadata::Dict{String, Any} # message-level metadata; omitted from the JSON when empty
payloads::AbstractArray{msgPayload_v1} # all payloads carried by this message
end
# constructor
function msgEnvelope_v1(
    sendTo::String,
    payloads::AbstractArray{msgPayload_v1};
    correlationId::String = "",
    msgId::String = "",
    timestamp::String = string(Dates.now()),
    msgPurpose::String = "",
    senderName::String = "",
    senderId::String = "",
    receiverName::String = "",
    receiverId::String = "",
    replyTo::String = "",
    replyToMsgId::String = "",
    brokerURL::String = DEFAULT_NATS_URL,
    metadata::Dict{String, Any} = Dict{String, Any}()
)
    # Keyword-friendly front end for the default positional constructor.
    # Arguments are grouped below in the struct's declared field order.
    return msgEnvelope_v1(
        correlationId, msgId, timestamp,        # identity and time
        sendTo, msgPurpose,                     # routing
        senderName, senderId,                   # who sent it
        receiverName, receiverId,               # who should read it
        replyTo, replyToMsgId,                  # reply routing
        brokerURL, metadata, payloads           # transport, extras, content
    )
end
""" envelope_to_json - Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation,
preserving all metadata and payload information for NATS message publishing.
# Function Workflow:
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
2. Conditionally includes metadata dictionary if not empty
3. Iterates through payloads and converts each to JSON-compatible dictionary
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
5. Returns final JSON string representation
# Arguments:
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
# Return:
- `String` - JSON string representation of the envelope
# Example
```jldoctest
using UUIDs
# Create an envelope with payloads
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
env = msgEnvelope_v1("my.subject", [payload])
# Convert to JSON for publishing
json_msg = envelope_to_json(env)
```
"""
function envelope_to_json(env::msgEnvelope_v1)
    # Render a single payload as a JSON-ready dictionary.
    function payload_dict(p)
        entry = Dict{String, Any}(
            "id" => p.id,
            "dataname" => p.dataname,
            "type" => p.type,
            "transport" => p.transport,
            "encoding" => p.encoding,
            "size" => p.size,
        )
        if p.data !== nothing
            if p.transport == "link"
                # Link transport: data is the download URL, emitted verbatim.
                entry["data"] = p.data
            elseif p.transport == "direct"
                # "base64" and "json" payloads are already JSON-safe; anything
                # else is converted to bytes and Base64 encoded.
                already_encoded = p.encoding == "base64" || p.encoding == "json"
                entry["data"] = if already_encoded
                    p.data
                else
                    Base64.base64encode(_get_payload_bytes(p.data))
                end
            end
        end
        if !isempty(p.metadata)
            entry["metadata"] = Dict(String(k) => v for (k, v) in p.metadata)
        end
        return entry
    end

    # Envelope-level fields are always present.
    doc = Dict{String, Any}(
        "correlationId" => env.correlationId,
        "msgId" => env.msgId,
        "timestamp" => env.timestamp,
        "sendTo" => env.sendTo,
        "msgPurpose" => env.msgPurpose,
        "senderName" => env.senderName,
        "senderId" => env.senderId,
        "receiverName" => env.receiverName,
        "receiverId" => env.receiverId,
        "replyTo" => env.replyTo,
        "replyToMsgId" => env.replyToMsgId,
        "brokerURL" => env.brokerURL
    )

    # Optional sections are only emitted when they carry content.
    if !isempty(env.metadata)
        doc["metadata"] = Dict(String(k) => v for (k, v) in env.metadata)
    end
    if !isempty(env.payloads)
        rendered = Any[]
        foreach(p -> push!(rendered, payload_dict(p)), env.payloads)
        doc["payloads"] = rendered
    end

    return JSON.json(doc)
end
""" log_trace - Log a trace message with correlation ID and timestamp
This function logs information messages with a correlation ID for tracing purposes,
making it easier to track message flow across distributed systems.
# Arguments:
- `correlation_id::String` - Correlation ID to identify the message flow
- `message::String` - The message content to log
# Return:
- `nothing` - This function performs logging but returns nothing
# Example
```jldoctest
using Dates
log_trace("abc123", "Starting message processing")
# Logs: [2026-02-21T05:39:00] [Correlation: abc123] Starting message processing
```
"""
function log_trace(correlation_id::String, message::String)
    # Stamp the entry with the current local time so related log lines can be
    # ordered, and tag it with the correlation ID so one flow can be filtered out.
    logged_at = Dates.now()
    @info "[$logged_at] [Correlation: $correlation_id] $message"
end
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
This function intelligently routes data delivery based on payload size relative to a threshold.
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS.
Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS.
The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually.
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
# Function Workflow:
1. Iterates through the list of (dataname, data, type) tuples
2. For each payload: extracts the type from the tuple and serializes accordingly
3. Compares the serialized size against `size_threshold`
4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL
# Arguments:
- `subject::String` - NATS subject to publish the message to
- `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send
- `dataname::String` - Name of the payload
- `data::Any` - The actual data to send
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- No standalone `type` parameter - type is specified per payload
# Keyword Arguments:
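- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
- `fileserver_url = DEFAULT_FILESERVER_URL` - Base URL of the file server; passed as the first argument to `fileserverUploadHandler` when link transport is used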
- `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `sender_name::String = "NATSBridge"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
# Return:
- A `msgEnvelope_v1` object containing metadata and transport information
# Example
```jldoctest
using UUIDs
# Send a single payload (still wrapped in a list)
data = Dict("key" => "value")
env = smartsend("my.subject", [("dataname1", data, "dictionary")])
# Send multiple payloads in one message with different types
data1 = Dict("key1" => "value1")
data2 = rand(10_000) # Small array
env = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")])
# Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", [("large_table", data, "table")])
# Mixed content (e.g., chat with text and image)
env = smartsend("chat.subject", [
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
])
```
"""
function smartsend(
    subject::String, # smartreceive's subject
    data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples
    nats_url::String = DEFAULT_NATS_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::Union{String, Nothing} = nothing,
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = ""
) where {T1<:Any}
    # Use the caller's correlation ID when given, otherwise mint a fresh one.
    trace_id = correlation_id === nothing ? string(uuid4()) : correlation_id
    log_trace(trace_id, "Starting smartsend for subject: $subject")
    message_id = string(uuid4())

    outgoing = msgPayload_v1[]
    for (name, raw, kind) in data
        bytes = _serialize_data(raw, kind)
        nbytes = length(bytes)
        log_trace(trace_id, "Serialized payload '$name' (type: $kind) size: $nbytes bytes")

        if nbytes >= size_threshold
            # Too large for the message itself: upload the bytes and ship the URL.
            log_trace(trace_id, "Using link transport, uploading to fileserver")
            upload = fileserverUploadHandler(fileserver_url, name, bytes)
            upload["status"] == 200 ||
                error("Failed to upload data to fileserver: $(upload["status"])")
            link = upload["url"]
            log_trace(trace_id, "Uploaded to URL: $link")
            push!(
                outgoing,
                msgPayload_v1(
                    link,
                    kind;
                    id = string(uuid4()),
                    dataname = name,
                    transport = "link",
                    encoding = "none",
                    size = nbytes,
                    metadata = Dict{String, Any}()
                )
            )
        else
            # Small enough to travel inline as a Base64 string.
            encoded = Base64.base64encode(bytes)
            log_trace(trace_id, "Using direct transport for $nbytes bytes")
            push!(
                outgoing,
                msgPayload_v1(
                    encoded,
                    kind;
                    id = string(uuid4()),
                    dataname = name,
                    transport = "direct",
                    encoding = "base64",
                    size = nbytes,
                    metadata = Dict{String, Any}("payload_bytes" => nbytes)
                )
            )
        end
    end

    # Wrap every payload in one envelope, publish it, and hand it back for tracking.
    envelope = msgEnvelope_v1(
        subject,
        outgoing;
        correlationId = trace_id,
        msgId = message_id,
        msgPurpose = msg_purpose,
        senderName = sender_name,
        senderId = string(uuid4()),
        receiverName = receiver_name,
        receiverId = receiver_id,
        replyTo = reply_to,
        replyToMsgId = reply_to_msg_id,
        brokerURL = nats_url,
        metadata = Dict{String, Any}(),
    )
    publish_message(nats_url, subject, envelope_to_json(envelope), trace_id)
    return envelope
end
""" _serialize_data - Serialize data according to specified format
This function serializes arbitrary Julia data into a binary representation based on the specified format.
It supports multiple serialization formats for different data types.
# Function Workflow:
1. Validates the data type against the specified format
2. Converts data to binary representation according to format rules
3. For text: converts string to UTF-8 bytes
4. For dictionary: serializes as JSON then converts to bytes
5. For table: uses Arrow.jl to write as IPC stream
6. For image/audio/video/binary: returns binary data directly
# Arguments:
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
# Return:
- `Vector{UInt8}` - Binary representation of the serialized data
# Throws:
- `Error` if `type` is not one of the supported types
- `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
# Example
```jldoctest
using JSON, Arrow, DataFrames
# Text serialization
text_data = "Hello, World!"
text_bytes = _serialize_data(text_data, "text")
# JSON serialization
json_data = Dict("name" => "Alice", "age" => 30)
json_bytes = _serialize_data(json_data, "dictionary")
# Table serialization with a DataFrame (recommended for tabular data)
df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
table_bytes = _serialize_data(df, "table")
# Image data (Vector{UInt8})
image_bytes = UInt8[1, 2, 3] # Image bytes
image_serialized = _serialize_data(image_bytes, "image")
# Audio data (Vector{UInt8})
audio_bytes = UInt8[1, 2, 3] # Audio bytes
audio_serialized = _serialize_data(audio_bytes, "audio")
# Video data (Vector{UInt8})
video_bytes = UInt8[1, 2, 3] # Video bytes
video_serialized = _serialize_data(video_bytes, "video")
# Binary data (IOBuffer)
buf = IOBuffer()
write(buf, "hello")
binary_bytes = _serialize_data(buf, "binary")
# Binary data (already bytes)
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
```
"""
function _serialize_data(data::Any, type::String)
    # Turn `data` into the raw bytes that go on the wire for payload kind `type`.
    #
    # Round trip used for "dictionary" payloads:
    #   dictionary -> JSON string -> bytes          (here)
    #   bytes -> JSON string -> parsed JSON object  (receiving side)
    #
    # Returns a byte vector. Throws an ErrorException for an unknown `type` or when
    # `data` does not have the shape that `type` requires.
    if type == "text"
        # Any AbstractString is accepted (e.g. SubString); it is materialised as a
        # String first so the bytes are always its UTF-8 encoding.
        data isa AbstractString || error("Text data must be a String")
        return Vector{UInt8}(String(data))
    elseif type == "dictionary"
        return Vector{UInt8}(JSON.json(data))
    elseif type == "table"
        io = IOBuffer()
        Arrow.write(io, data) # Arrow IPC stream
        return take!(io)
    elseif type in ("image", "audio", "video")
        # Media is passed through untouched; the caller's vector is returned as is.
        data isa Vector{UInt8} ||
            error("$(uppercasefirst(type)) data must be Vector{UInt8}")
        return data
    elseif type == "binary"
        # NOTE: take! is called on the caller's IOBuffer itself, so a writable
        # buffer is left empty after this call.
        data isa IOBuffer && return take!(data)
        data isa Vector{UInt8} && return data
        error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
    else
        error("Unknown type: $type")
    end
end
""" publish_message - Publish message to NATS
This internal function publishes a message to a NATS subject with proper
connection management and logging.
# Arguments:
- `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for tracing and logging
# Return:
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
    # A connection is opened for this one publish and always drained afterwards,
    # whether or not publishing succeeded.
    connection = NATS.connect(nats_url)
    try
        NATS.publish(connection, subject, message)
        return log_trace(correlation_id, "Message published to $subject")
    finally
        NATS.drain(connection)
    end
end
""" smartreceive - Receive and process messages from NATS
This function processes incoming NATS messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
A HTTP file server is required along with its download function.
# Function Workflow:
1. Parses the JSON envelope from the NATS message
2. Iterates through each payload in the envelope
3. For each payload: determines the transport type (direct or link)
4. For direct transport: decodes Base64 payload and deserializes based on type
5. For link transport: fetches data from URL with exponential backoff, then deserializes
# Arguments:
- `msg::NATS.Msg` - NATS message to process
# Keyword Arguments:
- `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
# Return:
- The parsed envelope (the dictionary produced by `JSON.parse`) whose `"payloads"` entry has been replaced by a `Vector{Tuple{String, Any, String}}` of (dataname, data, type) tuples
# Example
```jldoctest
# Receive and process message
msg = nats_message # NATS message
envelope = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# envelope["payloads"] == [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
```
"""
function smartreceive(
    msg::NATS.Msg;
    fileserverDownloadHandler::Function=_fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)
    # Decode a received NATS message into its envelope plus deserialized payloads.
    #
    # Returns the parsed envelope dictionary; its "payloads" entry is replaced by a
    # Vector{Tuple{String, Any, String}} of (dataname, data, type) tuples.
    # Throws an ErrorException when a payload has an unknown transport.
    json_data = JSON.parse(String(msg.payload))
    cid = json_data["correlationId"]
    log_trace(cid, "Processing received message")

    # envelope_to_json leaves out "payloads" when the envelope carries none, so a
    # missing (or null) entry is treated as an empty list instead of a KeyError.
    raw_payloads = something(get(json_data, "payloads", nothing), Any[])

    payloads_list = Tuple{String, Any, String}[]
    for payload in raw_payloads
        transport = String(payload["transport"])
        dataname = String(payload["dataname"])

        # Step 1: obtain the serialized bytes according to the transport.
        payload_bytes = if transport == "direct"
            log_trace(cid, "Direct transport - decoding payload '$dataname'")
            # Inline data is treated as Base64 text, whatever "encoding" says.
            Base64.base64decode(String(payload["data"]))
        elseif transport == "link"
            url = String(payload["data"])
            log_trace(cid, "Link transport - fetching '$dataname' from URL: $url")
            fileserverDownloadHandler(url, max_retries, base_delay, max_delay, cid)
        else
            error("Unknown transport type for payload '$dataname': $(transport)")
        end

        # Step 2: turn the bytes back into a Julia value for the declared type.
        data_type = String(payload["type"])
        data = _deserialize_data(payload_bytes, data_type, cid)
        push!(payloads_list, (dataname, data, data_type))
    end

    json_data["payloads"] = payloads_list
    return json_data
end
""" _fetch_with_backoff - Fetch data from URL with exponential backoff
This internal function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
# Function Workflow:
1. Initializes delay with base_delay value
2. Attempts to fetch data from URL in a retry loop
3. On success: logs success and returns response body as bytes
4. On failure: sleeps using exponential backoff and retries
5. After max_retries: throws error indicating failure
# Arguments:
- `url::String` - URL to fetch from
- `max_retries::Int` - Maximum number of retry attempts
- `base_delay::Int` - Initial delay in milliseconds
- `max_delay::Int` - Maximum delay in milliseconds
- `correlation_id::String` - Correlation ID for logging
# Return:
- `Vector{UInt8}` - Fetched data as bytes
# Throws:
- `Error` if all retry attempts fail
# Example
```jldoctest
# Fetch data with exponential backoff
data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
```
"""
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)
    # GET `url` up to `max_retries` times. Any exception (including a non-200
    # status, which is turned into one) counts as a failed attempt. Between
    # attempts the wait starts at `base_delay` ms and doubles up to `max_delay` ms.
    wait_ms = base_delay
    attempt = 1
    while attempt <= max_retries
        try
            reply = HTTP.request("GET", url)
            reply.status == 200 || error("Failed to fetch: $(reply.status)")
            log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
            return reply.body
        catch failure
            log_trace(correlation_id, "Attempt $attempt failed: $(typeof(failure))")
            # No point in waiting after the final attempt.
            if attempt < max_retries
                sleep(wait_ms / 1000.0) # sleep takes seconds
                wait_ms = min(wait_ms * 2, max_delay)
            end
        end
        attempt += 1
    end
    error("Failed to fetch data after $max_retries attempts")
end
""" _deserialize_data - Deserialize bytes to data based on type
This internal function converts serialized bytes back to Julia data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
# Function Workflow:
1. Validates the data type against supported formats
2. Converts bytes to appropriate Julia data type based on format
3. For text: converts bytes to string
4. For dictionary: converts bytes to JSON string then parses to Julia object
5. For table: reads Arrow IPC format and returns an `Arrow.Table`
6. For image/audio/video/binary: returns bytes directly
# Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `correlation_id::String` - Correlation ID for logging
# Return:
- Deserialized data (String for "text", `Arrow.Table` for "table", parsed JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
# Throws:
- `Error` if `type` is not one of the supported types
# Example
```jldoctest
# Text data
text_bytes = Vector{UInt8}("Hello World")
text_data = _deserialize_data(text_bytes, "text", "correlation123")
# JSON data
json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 34, 125] # {"name":"Alice"}
json_data = _deserialize_data(json_bytes, "dictionary", "correlation123")
# Arrow IPC data (table)
arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes
table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
```
"""
function _deserialize_data(
    data::Vector{UInt8},
    type::String,
    correlation_id::String
)
    # Inverse of the send-side serialization: pick the decoder from `type`.
    # `correlation_id` is accepted for call-site symmetry but not used here.
    type == "text" && return String(data)                   # bytes -> string
    type == "dictionary" && return JSON.parse(String(data)) # bytes -> JSON text -> parsed object
    type == "table" && return Arrow.Table(IOBuffer(data))   # Arrow IPC stream -> Arrow.Table
    # Media and opaque binary payloads are handed back untouched.
    type in ("image", "audio", "video", "binary") && return data
    error("Unknown type: $type")
end
"""
    plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})

Upload an in-memory byte vector to a plik server using one-shot mode.

# Function Workflow:
1. Creates a one-shot upload session by sending a POST request with `{"OneShot": true}`
   to `<fileServerURL>/upload`
2. Reads the upload ID (`"id"`) and token (`"uploadToken"`) from the JSON response
3. Uploads `data` as multipart form data to `<fileServerURL>/file/<uploadid>`, sending the
   token in the `X-UploadToken` header
4. Returns identifiers and the download URL for the uploaded file

# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filename::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content

# Return:
- `Dict{String, Any}` - Dictionary with keys:
    - `"status"` - HTTP status of the file upload response
    - `"uploadid"` - ID of the one-shot upload session
    - `"fileid"` - ID of the uploaded file within the session
    - `"url"` - Full URL to download the uploaded file

# Throws:
Any exception raised by the HTTP requests or by JSON parsing is propagated to the caller.
A failure during the file upload step is logged with `@error` before being rethrown.

# Example
```julia
using HTTP, JSON

fileServerURL = "http://localhost:8080"
filename = "test.txt"
data = Vector{UInt8}("hello world")

# Upload to local plik server
result = plik_oneshot_upload(fileServerURL, filename, data)

# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
"""
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
    # ----------------------------------------- get upload id ---------------------------------------- #
    # Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
    url_getUploadID = "$fileServerURL/upload"
    sessionHeaders = ["Content-Type" => "application/json"]
    sessionBody = """{ "OneShot" : true }"""
    sessionResponse = HTTP.request(
        "POST", url_getUploadID, sessionHeaders, sessionBody; body_is_form=false
    )
    # Wrap the body bytes in a String so parsing does not depend on JSON.parse accepting
    # a raw byte vector.
    sessionJson = JSON.parse(String(sessionResponse.body))
    uploadid = sessionJson["id"]
    uploadtoken = sessionJson["uploadToken"]

    # ------------------------------------------ upload file ----------------------------------------- #
    # Equivalent curl command:
    # curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
    # Plik won't accept a raw bytes upload, so the data is wrapped as a multipart file part.
    file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileServerURL/file/$uploadid"
    uploadHeaders = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))

    # Log and rethrow on failure. Swallowing the exception here would let the code below
    # read the session response as if it were the file response and report the upload ID
    # as the file ID.
    uploadResponse, uploadJson = try
        response = HTTP.post(url_upload, uploadHeaders, form)
        (response, JSON.parse(String(response.body)))
    catch e
        @error "Request failed" exception=(e, catch_backtrace())
        rethrow()
    end

    fileid = uploadJson["id"]
    # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"
    return Dict{String, Any}(
        "status" => uploadResponse.status,
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => url,
    )
end
"""
    plik_oneshot_upload(fileServerURL::String, filepath::String)

Upload a file from disk to a plik server using one-shot mode.

# Function Workflow:
1. Creates a one-shot upload session by sending a POST request with `{"OneShot": true}`
   to `<fileServerURL>/upload`
2. Reads the upload ID (`"id"`) and token (`"uploadToken"`) from the JSON response
3. Uploads the file at `filepath` as multipart form data to `<fileServerURL>/file/<uploadid>`,
   sending the token in the `X-UploadToken` header
4. Returns identifiers and the download URL for the uploaded file

# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filepath::String` - Full path to the local file to upload

# Return:
- `Dict{String, Any}` - Dictionary with keys:
    - `"status"` - HTTP status of the file upload response
    - `"uploadid"` - ID of the one-shot upload session
    - `"fileid"` - ID of the uploaded file within the session
    - `"url"` - Full URL to download the uploaded file (ends with `basename(filepath)`)

# Throws:
Any exception raised while opening the file, by the HTTP requests, or by JSON parsing is
propagated to the caller. A failure during the file upload step is logged with `@error`
before being rethrown.

# Example
```julia
using HTTP, JSON

fileServerURL = "http://localhost:8080"
filepath = "./test.zip"

# Upload to local plik server
result = plik_oneshot_upload(fileServerURL, filepath)

# Access the result as a Dict
# result["status"], result["uploadid"], result["fileid"], result["url"]
```
"""
function plik_oneshot_upload(fileServerURL::String, filepath::String)
    # ----------------------------------------- get upload id ---------------------------------------- #
    # Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
    filename = basename(filepath)
    url_getUploadID = "$fileServerURL/upload"
    sessionHeaders = ["Content-Type" => "application/json"]
    sessionBody = """{ "OneShot" : true }"""
    sessionResponse = HTTP.request(
        "POST", url_getUploadID, sessionHeaders, sessionBody; body_is_form=false
    )
    # Wrap the body bytes in a String so parsing does not depend on JSON.parse accepting
    # a raw byte vector.
    sessionJson = JSON.parse(String(sessionResponse.body))
    uploadid = sessionJson["id"]
    uploadtoken = sessionJson["uploadToken"]

    # ------------------------------------------ upload file ----------------------------------------- #
    # Equivalent curl command:
    # curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
    url_upload = "$fileServerURL/file/$uploadid"
    uploadHeaders = ["X-UploadToken" => uploadtoken]

    # Log and rethrow on failure. Swallowing the exception here would let the code below
    # read the session response as if it were the file response and report the upload ID
    # as the file ID.
    uploadResponse, uploadJson = try
        # The do-block closes the file handle whether the request succeeds or throws.
        open(filepath, "r") do file_io
            form = HTTP.Form(Dict("file" => file_io))
            response = HTTP.post(url_upload, uploadHeaders, form)
            (response, JSON.parse(String(response.body)))
        end
    catch e
        @error "Request failed" exception=(e, catch_backtrace())
        rethrow()
    end

    fileid = uploadJson["id"]
    # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"
    return Dict{String, Any}(
        "status" => uploadResponse.status,
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => url,
    )
end
end # module