architecture.md rev1

This commit is contained in:
2026-02-14 13:04:28 +07:00
parent d9fd7a61bb
commit f0df169689
3 changed files with 673 additions and 299 deletions

View File

@@ -18,12 +18,12 @@ const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP fi
struct msgPayload_v1
id::String # id of this payload e.g. "uuid4"
dataname::String # name of this payload e.g. "login_image"
type::String # "text | json | table | image | audio | video | binary"
type::String # this payload type. Can be "text | json | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # data size in bytes e.g. 15433
data::Any # payload data in case of direct transport or a URL in case of link
metadata::Dict{String, Any} # Dict("checksum=> "sha256_hash", ...)
metadata::Dict{String, Any} # Dict("checksum=> "sha256_hash", ...) This metadata is for this payload
end
# constructor
@@ -56,15 +56,15 @@ struct msgEnvelope_v1
timestamp::String # message published timestamp. string(Dates.now())
sendTo::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
senderName::String # sender name (String) e.g. "agent-wine-web-frontend"
msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
senderName::String # sender name (String) e.g. "agent-wine-web-frontend"
senderId::String # sender id e.g. uuid4snakecase()
receiverName::String # msg receiver name (String) e.g. "agent-backend"
receiverId::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
replyTo::String # sender ask receiver to reply to this topic
replyToMsgId::String # the message id this message is replying to
BrokerURL::String # mqtt/NATS server address
brokerURL::String # mqtt/NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # multiple payload store here
@@ -83,7 +83,7 @@ function msgEnvelope_v1(
receiverId::String = "",
replyTo::String = "",
replyToMsgId::String = "",
BrokerURL::String = DEFAULT_NATS_URL,
brokerURL::String = DEFAULT_NATS_URL,
metadata::Dict{String, Any} = Dict{String, Any}(),
payloads::AbstractArray{msgPayload_v1} = msgPayload_v1[]
)
@@ -99,7 +99,7 @@ function msgEnvelope_v1(
receiverId,
replyTo,
replyToMsgId,
BrokerURL,
brokerURL,
metadata,
payloads
)
@@ -107,94 +107,74 @@ end
""" Struct for the unified JSON envelope
This struct represents a standardized message format that can carry either
direct payload data or a URL reference, allowing flexible transport strategies
based on payload size and requirements.
""" Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation.
"""
struct MessageEnvelope
correlation_id::String # Unique identifier to track messages across systems
type::String # Data type indicator (e.g., "json", "table", "binary")
transport::String # Transport strategy: "direct" (base64 encoded bytes) or "link" (URL reference)
payload::Union{String, Nothing} # Base64-encoded payload for direct transport
url::Union{String, Nothing} # URL reference for link transport
metadata::Dict{String, Any} # Additional metadata about the payload
end
""" Constructor for MessageEnvelope with keyword arguments and defaults
This constructor provides a convenient way to create an envelope using keyword arguments,
automatically generating a correlation ID if not provided, and defaulting to "json" type
and "direct" transport.
"""
function MessageEnvelope(
; correlation_id::String = string(uuid4()), # Generate unique ID if not provided
type::String = "json", # Default data type
transport::String = "direct", # Default transport method
payload::Union{String, Nothing} = nothing, # No payload by default
url::Union{String, Nothing} = nothing, # No URL by default
metadata::Dict{String, Any} = Dict{String, Any}() # Empty metadata by default
)
MessageEnvelope(correlation_id, type, transport, payload, url, metadata)
end
""" Constructor for MessageEnvelope from JSON string
This constructor parses a JSON string and reconstructs a MessageEnvelope struct.
It handles the metadata field specially by converting the JSON object to a Julia Dict,
extracting values from the JSON structure for all other fields.
"""
function MessageEnvelope(json_str::String)
data = JSON.parse(json_str) # Parse JSON string into Julia data structure
metadata = Dict{String, Any}()
if haskey(data, :metadata) # Check if metadata exists in JSON
metadata = Dict(String(k) => v for (k, v) in data.metadata) # Convert JSON keys to strings and store in Dict
end
MessageEnvelope(
correlation_id = String(data.correlation_id), # Extract correlation_id from JSON data
type = String(data.type), # Extract type from JSON data
transport = String(data.transport), # Extract transport from JSON data
payload = haskey(data, :payload) ? String(data.payload) : nothing, # Extract payload if present
url = haskey(data, :url) ? String(data.url) : nothing, # Extract URL if present
metadata = metadata # Use the parsed metadata
)
end
""" Convert MessageEnvelope to JSON string
This function converts the MessageEnvelope struct to a JSON string representation.
It only includes fields in the JSON output if they have non-nothing values,
making the JSON output cleaner and more efficient.
"""
function envelope_to_json(env::MessageEnvelope)
function envelope_to_json(env::msgEnvelope_v1)
obj = Dict{String, Any}(
"correlation_id" => env.correlation_id, # Always include correlation_id
"type" => env.type, # Always include type
"transport" => env.transport # Always include transport
"correlationId" => env.correlationId,
"msgId" => env.msgId,
"timestamp" => env.timestamp,
"sendTo" => env.sendTo,
"msgPurpose" => env.msgPurpose,
"senderName" => env.senderName,
"senderId" => env.senderId,
"receiverName" => env.receiverName,
"receiverId" => env.receiverId,
"replyTo" => env.replyTo,
"replyToMsgId" => env.replyToMsgId,
"brokerURL" => env.brokerURL
)
if env.payload !== nothing # Only include payload if it exists
obj["payload"] = env.payload
end
if env.url !== nothing # Only include URL if it exists
obj["url"] = env.url
end
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
obj["metadata"] = env.metadata
end
JSON.json(obj) # Convert Dict to JSON string
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
obj["metadata"] = Dict(String(k) => v for (k, v) in env.metadata)
end
# Convert payloads to JSON array
if !isempty(env.payloads)
payloads_json = []
for payload in env.payloads
payload_obj = Dict{String, Any}(
"id" => payload.id,
"dataname" => payload.dataname,
"type" => payload.type,
"transport" => payload.transport,
"encoding" => payload.encoding,
"size" => payload.size
)
# Include data based on transport type
if payload.transport == "direct" && payload.data !== nothing
if payload.encoding == "base64" || payload.encoding == "json"
payload_obj["data"] = payload.data
else
# For other encodings, use base64
payload_bytes = _get_payload_bytes(payload.data)
payload_obj["data"] = Base64.base64encode(payload_bytes)
end
end
if !isempty(payload.metadata)
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
end
push!(payloads_json, payload_obj)
end
obj["payloads"] = payloads_json
end
JSON.json(obj)
end
""" Helper function to get payload bytes from data
"""
function _get_payload_bytes(data::Any)
# This is a placeholder - actual implementation depends on data type
if isa(data, Vector{UInt8})
return data
elseif isa(data, String)
return bytes(data)
else
return String(data)
end
end
""" Log a trace message with correlation ID and timestamp
This function logs information messages with a correlation ID for tracing purposes,
@@ -215,8 +195,8 @@ Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_u
The function workflow:
1. Serializes the provided data according to the specified format (`type`)
2. Compares the serialized size against `size_threshold`
3. For small payloads: encodes as Base64, constructs a "direct" MessageEnvelope, and publishes to NATS
4. For large payloads: uploads to the fileserver, constructs a "link" MessageEnvelope with the URL, and publishes to NATS
3. For small payloads: encodes as Base64, constructs a "direct" msgEnvelope_v1, and publishes to NATS
4. For large payloads: uploads to the fileserver, constructs a "link" msgEnvelope_v1 with the URL, and publishes to NATS
# Arguments:
- `subject::String` - NATS subject to publish the message to
@@ -230,15 +210,15 @@ The function workflow:
- `fileServerUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must match signature of `plik_oneshot_upload`)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `sender_name::String = "NATSBridge"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic to reply to (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
# Return:
- A `MessageEnvelope` object containing metadata and transport information:
- `correlation_id::String` - Unique identifier for this message exchange
- `type::String` - Serialization type used (`"json"` or `"arrow"`)
- `transport::String` - Either `"direct"` or `"link"`
- `payload::Union{String, Nothing}` - Base64-encoded data for direct transport, `nothing` for link transport
- `url::Union{String, Nothing}` - Download URL for link transport, `nothing` for direct transport
- `metadata::Dict` - Additional metadata (e.g., `"content_length"`, `"format"`)
- A `msgEnvelope_v1` object containing metadata and transport information
# Example
```julia
@@ -251,11 +231,6 @@ env = smartsend("my.subject", data, "json")
# Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", data, "arrow")
# In another process, retrieve and deserialize:
# msg = subscribe(nats_url, "my.subject")
# env = json_to_envelope(msg.data)
# data = _deserialize_data(Base64.decode(env.payload), env.type)
```
"""
function smartsend(
@@ -267,7 +242,13 @@ function smartsend(
fileserver_url::String = DEFAULT_FILESERVER_URL,
fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing
correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
receiver_id::String = "",
reply_to::String = "",
reply_to_msg_id::String = ""
)
# Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
@@ -280,51 +261,99 @@ function smartsend(
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
env = MessageEnvelope( # Create envelope for direct transport
correlation_id = cid,
type = type,
transport = "direct",
payload = payload_b64,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
if response[:status] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
end
url = response[:url] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
# Generate unique IDs
msg_id = string(uuid4())
timestamp = string(Dates.now())
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
# Create msgPayload_v1 for direct transport
payload = msgPayload_v1(
id = string(uuid4()),
dataname = dataname,
type = type,
transport = "direct",
encoding = "base64",
size = payload_size,
data = payload_b64,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
)
# Create msgEnvelope_v1 with all fields populated
env = msgEnvelope_v1(
correlationId = cid,
msgId = msg_id,
timestamp = timestamp,
sendTo = subject,
msgPurpose = msg_purpose,
senderName = sender_name,
senderId = string(uuid4()),
receiverName = receiver_name,
receiverId = receiver_id,
replyTo = reply_to,
replyToMsgId = reply_to_msg_id,
brokerURL = nats_url,
metadata = Dict(),
payloads = [payload]
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
if response[:status] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
end
url = response[:url] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
env = MessageEnvelope( # Create envelope for link transport
correlation_id = cid,
type = type,
transport = "link",
url = url,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
end
# Create msgPayload_v1 for link transport
payload = msgPayload_v1(
id = string(uuid4()),
dataname = dataname,
type = type,
transport = "link",
encoding = "none",
size = payload_size,
data = url,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
)
# Create msgEnvelope_v1 with all fields populated
env = msgEnvelope_v1(
correlationId = cid,
msgId = msg_id,
timestamp = timestamp,
sendTo = subject,
msgPurpose = msg_purpose,
senderName = sender_name,
senderId = string(uuid4()),
receiverName = receiver_name,
receiverId = receiver_id,
replyTo = reply_to,
replyToMsgId = reply_to_msg_id,
brokerURL = nats_url,
metadata = Dict(),
payloads = [payload]
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
end
end
@@ -436,7 +465,7 @@ Keyword Arguments:
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
Return:
- Tuple `(data = deserialized_data, envelope = MessageEnvelope)` - Data and envelope
- Tuple `(data = deserialized_data, envelope = msgEnvelope_v1)` - Data and envelope
"""
function smartreceive(
msg::NATS.Msg;
@@ -445,33 +474,79 @@ function smartreceive(
base_delay::Int = 100,
max_delay::Int = 5000
)
# Parse the envelope
env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
log_trace(env.correlation_id, "Processing received message") # Log message processing start
# Parse the JSON envelope
json_data = JSON.parse(String(msg.payload))
# Check transport type
if env.transport == "direct" # Direct transport - payload is in the message
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
# Get transport from the first payload
transport = String(json_data["payloads"][1]["transport"])
log_trace(json_data["correlationId"], "Processing received message") # Log message processing start
if transport == "direct" # Direct transport - payload is in the message
log_trace(json_data["correlationId"], "Direct transport - decoding payload") # Log direct transport handling
# Extract base64 payload from the first payload
payload_b64 = String(json_data["payloads"][1]["data"])
# Decode Base64 payload
payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
# Deserialize based on type
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
data_type = String(json_data["payloads"][1]["type"])
data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"], Dict{String, Any}())
# Create msgEnvelope_v1 from parsed data
env = msgEnvelope_v1(
correlationId = json_data["correlationId"],
msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "",
timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "",
sendTo = json_data["sendTo"],
msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "",
senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "",
senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "",
receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "",
receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "",
replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "",
replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "",
brokerURL = haskey(json_data, "brokerURL") ? String(json_data["brokerURL"]) : DEFAULT_NATS_URL,
metadata = Dict{String, Any}(),
payloads = msgPayload_v1[]
)
return (data = data, envelope = env) # Return data and envelope as tuple
elseif env.transport == "link" # Link transport - payload is at URL
log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling
elseif transport == "link" # Link transport - payload is at URL
log_trace(json_data["correlationId"], "Link transport - fetching from URL") # Log link transport handling
# Extract URL from the first payload
url = String(json_data["payloads"][1]["data"])
# Fetch with exponential backoff
downloaded_data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL
downloaded_data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, json_data["correlationId"]) # Fetch data from URL
# Deserialize based on type
data = _deserialize_data(downloaded_data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
data_type = String(json_data["payloads"][1]["type"])
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"], Dict{String, Any}())
# Create msgEnvelope_v1 from parsed data
env = msgEnvelope_v1(
correlationId = json_data["correlationId"],
msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "",
timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "",
sendTo = json_data["sendTo"],
msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "",
senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "",
senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "",
receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "",
receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "",
replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "",
replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "",
brokerURL = haskey(json_data, "brokerURL") ? String(json_data["brokerURL"]) : DEFAULT_NATS_URL,
metadata = Dict{String, Any}(),
payloads = msgPayload_v1[]
)
return (data = data, envelope = env) # Return data and envelope as tuple
else # Unknown transport type
error("Unknown transport type: $(env.transport)") # Throw error for unknown transport
error("Unknown transport type: $(transport)") # Throw error for unknown transport
end
end
@@ -556,21 +631,6 @@ function _deserialize_data(
end
# """ Decode base64 string to bytes
# This internal function decodes a base64-encoded string back to binary data.
# It's a wrapper around Base64.decode for consistency in the module.
# Arguments:
# - `str::String` - Base64-encoded string to decode
# Return:
# - Vector{UInt8} - Decoded binary data
# """
# function base64decode(str::String)
# return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
# end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
@@ -609,7 +669,7 @@ status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filename, fil
# to download an uploaded file
curl -L -O "url"
```
""" #[x]
"""
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
# ----------------------------------------- get upload id ---------------------------------------- #
@@ -652,10 +712,6 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect
end
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
Upload a single file to a plik server using one-shot mode.
@@ -693,7 +749,7 @@ status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filepath)
# To download the uploaded file later (via curl as example):
curl -L -O "url"
```
""" #[x]
"""
function plik_oneshot_upload(fileServerURL::String, filepath::String)
# ----------------------------------------- get upload id ---------------------------------------- #
@@ -739,30 +795,4 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String)
end
end # module