minor fix
This commit is contained in:
@@ -375,7 +375,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
||||
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
||||
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
||||
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
||||
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
||||
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
|
||||
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||
- `sender_name::String = "NATSBridge"` - Name of the sender
|
||||
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
|
||||
@@ -384,6 +384,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
||||
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
|
||||
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
|
||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
|
||||
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID if not provided)
|
||||
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID if not provided)
|
||||
|
||||
# Return:
|
||||
- A tuple `(env, env_json_str)` where:
|
||||
@@ -425,7 +427,16 @@ function smartsend(
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
|
||||
#=
|
||||
Generate a globally unique identifier (UUID) at the start of the request.
|
||||
This ID must remain constant and immutable as it propagates through every
|
||||
stage of the execution pipeline. It serves as the end-to-end ID for
|
||||
distributed tracing, enabling the correlation of all logs, metrics, and
|
||||
errors across the system back to this specific request instance.
|
||||
=#
|
||||
correlation_id::String = string(uuid4()),
|
||||
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
@@ -433,15 +444,13 @@ function smartsend(
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # a provided connection saves establishing connection overhead.
|
||||
msg_id::String = string(uuid4()), # Message ID
|
||||
sender_id::String = string(uuid4()) # Sender ID
|
||||
) where {T1<:Any}
|
||||
|
||||
# Generate correlation ID if not provided
|
||||
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
||||
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
||||
|
||||
# Generate message metadata
|
||||
msg_id = string(uuid4())
|
||||
# Log start of send operation
|
||||
log_trace(correlation_id, "Starting smartsend for subject: $subject")
|
||||
|
||||
# Process each payload in the list
|
||||
payloads = msg_payload_v1[]
|
||||
@@ -450,13 +459,13 @@ function smartsend(
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
|
||||
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
||||
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||
log_trace(correlation_id, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||
|
||||
# Decision: Direct vs Link
|
||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||
# Direct path - Base64 encode and send via NATS
|
||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||
log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||
|
||||
# Create msg_payload_v1 for direct transport
|
||||
payload = msg_payload_v1(
|
||||
@@ -472,7 +481,7 @@ function smartsend(
|
||||
push!(payloads, payload)
|
||||
else
|
||||
# Link path - Upload to HTTP server, send URL via NATS
|
||||
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||
|
||||
# Upload to HTTP server
|
||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
@@ -482,7 +491,7 @@ function smartsend(
|
||||
end
|
||||
|
||||
url = response["url"] # URL for the uploaded data
|
||||
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
||||
log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload
|
||||
|
||||
# Create msg_payload_v1 for link transport
|
||||
payload = msg_payload_v1(
|
||||
@@ -503,11 +512,11 @@ function smartsend(
|
||||
env = msg_envelope_v1(
|
||||
subject,
|
||||
payloads;
|
||||
correlation_id = cid,
|
||||
correlation_id = correlation_id,
|
||||
msg_id = msg_id,
|
||||
msg_purpose = msg_purpose,
|
||||
sender_name = sender_name,
|
||||
sender_id = string(uuid4()),
|
||||
sender_id = sender_id,
|
||||
receiver_name = receiver_name,
|
||||
receiver_id = receiver_id,
|
||||
reply_to = reply_to,
|
||||
@@ -520,9 +529,9 @@ function smartsend(
|
||||
if is_publish == false
|
||||
# skip publish a message
|
||||
elseif is_publish == true && NATS_connection === nothing
|
||||
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
|
||||
publish_message(broker_url, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||
elseif is_publish == true && NATS_connection !== nothing
|
||||
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
|
||||
publish_message(NATS_connection, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||
end
|
||||
|
||||
return (env, env_json_str)
|
||||
|
||||
Reference in New Issue
Block a user