From ee2d2c72388af01d95a3f5fa551896cd7c321d87 Mon Sep 17 00:00:00 2001 From: narawat Date: Wed, 4 Mar 2026 10:02:31 +0700 Subject: [PATCH] minor fix --- Project.toml | 2 +- README.md | 6 ++++-- docs/architecture.md | 6 ++++-- docs/implementation.md | 11 ++++++++--- src/NATSBridge.jl | 43 +++++++++++++++++++++++++----------------- 5 files changed, 43 insertions(+), 25 deletions(-) diff --git a/Project.toml b/Project.toml index 7dcfae6..e26314e 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "NATSBridge" uuid = "f2724d33-f338-4a57-b9f8-1be882570d10" -version = "0.4.3" +version = "0.4.4" authors = ["narawat "] [deps] diff --git a/README.md b/README.md index e5885a7..3525db2 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,7 @@ env, env_json_str = NATSBridge.smartsend( fileserver_url = "http://localhost:8080", fileserver_upload_handler::Function = plik_oneshot_upload, size_threshold::Int = 1_000_000, - correlation_id::Union{String, Nothing} = nothing, + correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID) msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", @@ -204,7 +204,9 @@ env, env_json_str = NATSBridge.smartsend( reply_to::String = "", reply_to_msg_id::String = "", is_publish::Bool = true, # Whether to automatically publish to NATS - NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead) + NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead) + msg_id::String = string(uuid4()), # Message ID (auto-generated UUID) + sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID) ) # Returns: (msgEnvelope_v1, JSON string) # - env: msgEnvelope_v1 object with all envelope metadata and payloads diff --git a/docs/architecture.md b/docs/architecture.md index 47891e3..0c77786 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -307,7 +307,7 @@ function smartsend( fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::Union{String, Nothing} = nothing, + correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID) msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", @@ -315,7 +315,9 @@ function smartsend( reply_to::String = "", reply_to_msg_id::String = "", is_publish::Bool = true, # Whether to automatically publish to NATS - NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead) + NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead) + msg_id::String = string(uuid4()), # Message ID (auto-generated UUID) + sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID) ) ``` diff --git a/docs/implementation.md b/docs/implementation.md index 9a14090..2f817f8 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -321,7 +321,7 @@ function smartsend( fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::Union{String, Nothing} = nothing, + correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID) msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", @@ -329,11 +329,16 @@ function smartsend( reply_to::String = "", reply_to_msg_id::String = "", is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional) + NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional) + msg_id::String = string(uuid4()), # Message ID (auto-generated UUID) + sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID) ) ``` -**New Keyword Parameter:** +**New Keyword Parameters:** +- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID) +- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID) +- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID) - `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios. **Connection Handling Logic:** diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 66dd9d9..e409089 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -375,7 +375,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c - `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads - `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) - `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport - - `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated + - `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID) - `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. - `sender_name::String = "NATSBridge"` - Name of the sender - `receiver_name::String = ""` - Name of the receiver (empty string means broadcast) @@ -384,6 +384,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c - `reply_to_msg_id::String = ""` - Message ID this message is replying to - `is_publish::Bool = true` - Whether to automatically publish the message to NATS - `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead) + - `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID if not provided) + - `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID if not provided) # Return: - A tuple `(env, env_json_str)` where: @@ -425,7 +427,16 @@ function smartsend( fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::Union{String, Nothing} = nothing, + + #= + Generate a globally unique identifier (UUID) at the start of the request. + This ID must remain constant and immutable as it propagates through every + stage of the execution pipeline. It serves as the end-to-end ID for + distributed tracing, enabling the correlation of all logs, metrics, and + errors across the system back to this specific request instance. + =# + correlation_id::String = string(uuid4()), + msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", @@ -433,15 +444,13 @@ function smartsend( reply_to::String = "", reply_to_msg_id::String = "", is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg - NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead. + NATS_connection::Union{NATS.Connection, Nothing} = nothing, # a provided connection saves establishing connection overhead. + msg_id::String = string(uuid4()), # Message ID + sender_id::String = string(uuid4()) # Sender ID ) where {T1<:Any} - # Generate correlation ID if not provided - cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID - log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation - - # Generate message metadata - msg_id = string(uuid4()) + # Log start of send operation + log_trace(correlation_id, "Starting smartsend for subject: $subject") # Process each payload in the list payloads = msg_payload_v1[] @@ -450,13 +459,13 @@ function smartsend( payload_bytes = _serialize_data(payload_data, payload_type) payload_size = length(payload_bytes) # Calculate payload size in bytes - log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size + log_trace(correlation_id, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size # Decision: Direct vs Link if payload_size < size_threshold # Check if payload is small enough for direct transport # Direct path - Base64 encode and send via NATS payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string - log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice + log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice # Create msg_payload_v1 for direct transport payload = msg_payload_v1( @@ -472,7 +481,7 @@ function smartsend( push!(payloads, payload) else # Link path - Upload to HTTP server, send URL via NATS - log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice + log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice # Upload to HTTP server response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) @@ -482,7 +491,7 @@ function smartsend( end url = response["url"] # URL for the uploaded data - log_trace(cid, "Uploaded to URL: $url") # Log successful upload + log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload # Create msg_payload_v1 for link transport payload = msg_payload_v1( @@ -503,11 +512,11 @@ function smartsend( env = msg_envelope_v1( subject, payloads; - correlation_id = cid, + correlation_id = correlation_id, msg_id = msg_id, msg_purpose = msg_purpose, sender_name = sender_name, - sender_id = string(uuid4()), + sender_id = sender_id, receiver_name = receiver_name, receiver_id = receiver_id, reply_to = reply_to, @@ -520,9 +529,9 @@ function smartsend( if is_publish == false # skip publish a message elseif is_publish == true && NATS_connection === nothing - publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS + publish_message(broker_url, subject, env_json_str, correlation_id) # Publish message to NATS elseif is_publish == true && NATS_connection !== nothing - publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS + publish_message(NATS_connection, subject, env_json_str, correlation_id) # Publish message to NATS end return (env, env_json_str)