4 Commits

Author SHA1 Message Date
e5f4793370 fix output annotation 2026-03-04 11:58:19 +07:00
95fe697501 update diagram 2026-03-04 10:23:40 +07:00
ee2d2c7238 minor fix 2026-03-04 10:02:31 +07:00
ton
1dfa277279 Merge pull request 'split_smartsend' (#8) from split_smartsend into main
Reviewed-on: #8
2026-02-26 09:52:56 +00:00
8 changed files with 133 additions and 65 deletions

View File

@@ -1,6 +1,6 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.3"
version = "0.4.5"
authors = ["narawat <narawat@gmail.com>"]
[deps]

View File

@@ -60,33 +60,74 @@ NATSBridge enables seamless communication for Julia applications through NATS, w
### System Components
```mermaid
flowchart TB
subgraph Sender["Julia Application (Sender)"]
SenderApp[App Code]
NATSBridge_Send[NATSBridge]
NATS_Client[<b>NATS.jl</b>]
end
subgraph Receiver["Julia Application (Receiver)"]
ReceiverApp[App Code]
NATSBridge_Recv[NATSBridge]
NATS_Client_Recv[<b>NATS.jl</b>]
end
subgraph Infrastructure["Infrastructure"]
NATS[<b>NATS Server</b><br/>Message Broker]
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
end
SenderApp --> NATSBridge_Send
NATSBridge_Send --> NATS_Client
NATS_Client --> NATS
NATS --> NATS_Client_Recv
NATS_Client_Recv --> NATSBridge_Recv
NATSBridge_Recv --> ReceiverApp
NATSBridge_Send -.->|HTTP POST upload| FileServer
FileServer -.->|HTTP GET download| NATSBridge_Recv
style SenderApp fill:#e8f5e9
style ReceiverApp fill:#e8f5e9
style NATS fill:#fff3e0
style FileServer fill:#f3e5f5
```
┌─────────────────────────────────────────────────────────────────────┐
│ NATSBridge Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ │ │
│ │ Julia │ ▼ │
│ │ (NATS.jl) │ ┌─────────────────────────┐ │
│ └──────────────┘ │ NATS │ │
│ │ (Message Broker) │ │
│ └─────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ File Server │ │
│ │ (HTTP Upload/Get) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
```
### Key Components
| Component | Description |
|-----------|-------------|
| **Julia Application** | Sender and receiver applications using the NATSBridge module |
| **NATS Server** | Message broker for transporting message envelopes |
| **HTTP File Server** | Independent HTTP server for large payload storage (e.g., Plik) |
### Message Flow
1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads based on type
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or file server)
6. **NATSBridge** deserializes and decodes payloads
1. **Sender** creates a message envelope with payloads using `smartsend()`
2. **NATSBridge** serializes and encodes each payload based on type
3. **Transport Decision**:
- **Direct** (< 1MB): Payload encoded as Base64, published to NATS
- **Link** (≥ 1MB): Payload uploaded to HTTP file server, URL published to NATS
4. **NATS** routes message envelope to subscribers
5. **Receiver** receives message via NATS subscription callback
6. **NATSBridge** processes envelope:
- Decodes Base64 payloads from NATS message
- Fetches URLs from file server with exponential backoff
7. **Receiver** deserializes payloads based on their type
### File Server Handler Abstraction
The system uses handler functions to abstract file server operations:
| Handler | Purpose |
|---------|---------|
| `plik_oneshot_upload()` | Uploads payload bytes to file server, returns URL |
| `_fetch_with_backoff()` | Downloads data from URL with exponential backoff retry |
This abstraction allows support for different file server implementations (Plik, AWS S3, custom HTTP server).
---
@@ -196,7 +237,7 @@ env, env_json_str = NATSBridge.smartsend(
fileserver_url = "http://localhost:8080",
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000,
correlation_id::Union{String, Nothing} = nothing,
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
@@ -204,10 +245,12 @@ env, env_json_str = NATSBridge.smartsend(
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
)
# Returns: (msgEnvelope_v1, JSON string)
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
# Returns: ::Tuple{msg_envelope_v1, String}
# - env: msg_envelope_v1 object with all envelope metadata and payloads
# - env_json_str: JSON string representation of the envelope for publishing
```
@@ -228,7 +271,7 @@ env = NATSBridge.smartreceive(
base_delay::Int = 100,
max_delay::Int = 5000
)
# Returns: Dict with envelope metadata and payloads array
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
```
### publish_message

View File

@@ -99,11 +99,12 @@ smartsend(
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
# Receive returns a JSON object envelope with all metadata and deserialized payloads
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
# env is a JSON object containing envelope metadata and payloads field
```
## Architecture Diagram
@@ -307,7 +308,7 @@ function smartsend(
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
@@ -315,7 +316,9 @@ function smartsend(
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true, # Whether to automatically publish to NATS
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
)
```

View File

@@ -105,11 +105,12 @@ smartsend(
broker_url="nats://localhost:4222"
)
# Receive returns a dictionary envelope with all metadata and deserialized payloads
# Receive returns a JSON object envelope with all metadata and deserialized payloads
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
# env is a JSON object containing envelope metadata and payloads field
```
## Architecture
@@ -147,6 +148,7 @@ The `smartsend` function now returns a tuple containing both the envelope object
```julia
env, env_json_str = smartsend(...)
# Returns: ::Tuple{msg_envelope_v1, String}
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
# env_json_str::String - JSON string for publishing to NATS
```
@@ -280,8 +282,9 @@ smartsend("/test", [("single_data", mydata, "dictionary")], broker_url="nats://l
```julia
using NATSBridge
# Receive returns a dictionary with envelope metadata and payloads field
# Receive returns a JSON object with envelope metadata and payloads field
env = smartreceive(msg)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
# env["payloads"] = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...]
```
@@ -321,7 +324,7 @@ function smartsend(
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload,
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
@@ -329,11 +332,16 @@ function smartsend(
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true,
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional)
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
)
```
**New Keyword Parameter:**
**New Keyword Parameters:**
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID)
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID)
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
**Connection Handling Logic:**

View File

@@ -107,6 +107,7 @@ using NATSBridge
# Receive and process message
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
for (dataname, data, type) in env["payloads"]
println("Received $dataname: $data")
end

View File

@@ -163,6 +163,7 @@ end
function handle_message(handler::ChatHandler, msg::NATS.Msg)
env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
# Extract sender info from envelope
sender = get(env, "sender_name", "Anonymous")
@@ -281,6 +282,7 @@ end
function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String)
# Subscribe to sender's file channel
env = smartreceive(msg, fileserver_download_handler=fetch_from_url)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
# Process each payload
for (dataname, data, type) in env["payloads"]
@@ -518,6 +520,7 @@ end
function process_reading(receiver::SensorReceiver, msg::NATS.Msg)
env = smartreceive(msg, receiver.fileserver_download_handler)
# Returns: ::JSON.Object{String, Any} - key-value structure resemble msg_envelope_v1
for (dataname, data, data_type) in env["payloads"]
if data_type == "dictionary"

View File

@@ -375,7 +375,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `sender_name::String = "NATSBridge"` - Name of the sender
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
@@ -384,9 +384,11 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID if not provided)
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID if not provided)
# Return:
- A tuple `(env, env_json_str)` where:
- `::Tuple{msg_envelope_v1, String}` - A tuple containing:
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
- `env_json_str::String` - JSON string representation of the envelope for publishing
@@ -425,7 +427,16 @@ function smartsend(
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing,
#=
Generate a globally unique identifier (UUID) at the start of the request.
This ID must remain constant and immutable as it propagates through every
stage of the execution pipeline. It serves as the end-to-end ID for
distributed tracing, enabling the correlation of all logs, metrics, and
errors across the system back to this specific request instance.
=#
correlation_id::String = string(uuid4()),
msg_purpose::String = "chat",
sender_name::String = "NATSBridge",
receiver_name::String = "",
@@ -433,15 +444,13 @@ function smartsend(
reply_to::String = "",
reply_to_msg_id::String = "",
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
) where {T1<:Any}
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # a provided connection saves establishing connection overhead.
msg_id::String = string(uuid4()), # Message ID
sender_id::String = string(uuid4()) # Sender ID
)::Tuple{msg_envelope_v1, String} where {T1<:Any}
# Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Generate message metadata
msg_id = string(uuid4())
# Log start of send operation
log_trace(correlation_id, "Starting smartsend for subject: $subject")
# Process each payload in the list
payloads = msg_payload_v1[]
@@ -450,13 +459,13 @@ function smartsend(
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
log_trace(correlation_id, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice
# Create msg_payload_v1 for direct transport
payload = msg_payload_v1(
@@ -472,7 +481,7 @@ function smartsend(
push!(payloads, payload)
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
@@ -482,7 +491,7 @@ function smartsend(
end
url = response["url"] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload
# Create msg_payload_v1 for link transport
payload = msg_payload_v1(
@@ -503,11 +512,11 @@ function smartsend(
env = msg_envelope_v1(
subject,
payloads;
correlation_id = cid,
correlation_id = correlation_id,
msg_id = msg_id,
msg_purpose = msg_purpose,
sender_name = sender_name,
sender_id = string(uuid4()),
sender_id = sender_id,
receiver_name = receiver_name,
receiver_id = receiver_id,
reply_to = reply_to,
@@ -520,9 +529,9 @@ function smartsend(
if is_publish == false
# skip publish a message
elseif is_publish == true && NATS_connection === nothing
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
publish_message(broker_url, subject, env_json_str, correlation_id) # Publish message to NATS
elseif is_publish == true && NATS_connection !== nothing
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
publish_message(NATS_connection, subject, env_json_str, correlation_id) # Publish message to NATS
end
return (env, env_json_str)
@@ -745,14 +754,14 @@ A HTTP file server is required along with its download function.
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
# Return:
- JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
- `::JSON.Object{String, Any}` - key-value structure resemble msg_envelope_v1
# Example
```jldoctest
# Receive and process message
msg = nats_message # NATS message
payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
```
"""
function smartreceive(
@@ -761,7 +770,7 @@ function smartreceive(
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
)::JSON.Object{String, Any}
# Parse the JSON envelope
env_json_obj = JSON.parse(String(msg.payload))
log_trace(env_json_obj["correlation_id"], "Processing received message") # Log message processing start
@@ -809,7 +818,7 @@ function smartreceive(
end
end
env_json_obj["payloads"] = payloads_list
return env_json_obj # JSON object of envelope with list of (dataname, data, data_type) tuples in payloads field
return env_json_obj # key-value structure resemble msg_envelope_v1
end

View File

@@ -186,7 +186,7 @@ function test_mix_send()
]
# Use smartsend with mixed content
env, env_json_str = NATSBridge.smartsend(
sendinfo = NATSBridge.smartsend(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
broker_url = NATS_URL,
@@ -202,7 +202,8 @@ function test_mix_send()
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
env, env_json_str = sendinfo
log_trace("Sent message with $(length(env.payloads)) payloads")
# Log transport type for each payload