Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 95fe697501 | |||
| ee2d2c7238 |
@@ -1,6 +1,6 @@
|
||||
name = "NATSBridge"
|
||||
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
|
||||
version = "0.4.3"
|
||||
version = "0.4.4"
|
||||
authors = ["narawat <narawat@gmail.com>"]
|
||||
|
||||
[deps]
|
||||
|
||||
93
README.md
93
README.md
@@ -60,33 +60,74 @@ NATSBridge enables seamless communication for Julia applications through NATS, w
|
||||
|
||||
### System Components
|
||||
|
||||
```mermaid
|
||||
flowchart TB
|
||||
subgraph Sender["Julia Application (Sender)"]
|
||||
SenderApp[App Code]
|
||||
NATSBridge_Send[NATSBridge]
|
||||
NATS_Client[<b>NATS.jl</b>]
|
||||
end
|
||||
|
||||
subgraph Receiver["Julia Application (Receiver)"]
|
||||
ReceiverApp[App Code]
|
||||
NATSBridge_Recv[NATSBridge]
|
||||
NATS_Client_Recv[<b>NATS.jl</b>]
|
||||
end
|
||||
|
||||
subgraph Infrastructure["Infrastructure"]
|
||||
NATS[<b>NATS Server</b><br/>Message Broker]
|
||||
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
|
||||
end
|
||||
|
||||
SenderApp --> NATSBridge_Send
|
||||
NATSBridge_Send --> NATS_Client
|
||||
NATS_Client --> NATS
|
||||
|
||||
NATS --> NATS_Client_Recv
|
||||
NATS_Client_Recv --> NATSBridge_Recv
|
||||
NATSBridge_Recv --> ReceiverApp
|
||||
|
||||
NATSBridge_Send -.->|HTTP POST upload| FileServer
|
||||
FileServer -.->|HTTP GET download| NATSBridge_Recv
|
||||
|
||||
style SenderApp fill:#e8f5e9
|
||||
style ReceiverApp fill:#e8f5e9
|
||||
style NATS fill:#fff3e0
|
||||
style FileServer fill:#f3e5f5
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ NATSBridge Architecture │
|
||||
├─────────────────────────────────────────────────────────────────────┤
|
||||
│ ┌──────────────┐ │ │
|
||||
│ │ Julia │ ▼ │
|
||||
│ │ (NATS.jl) │ ┌─────────────────────────┐ │
|
||||
│ └──────────────┘ │ NATS │ │
|
||||
│ │ (Message Broker) │ │
|
||||
│ └─────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌──────────────────────┐ │
|
||||
│ │ File Server │ │
|
||||
│ │ (HTTP Upload/Get) │ │
|
||||
│ └──────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Key Components
|
||||
|
||||
| Component | Description |
|
||||
|-----------|-------------|
|
||||
| **Julia Application** | Sender and receiver applications using the NATSBridge module |
|
||||
| **NATS Server** | Message broker for transporting message envelopes |
|
||||
| **HTTP File Server** | Independent HTTP server for large payload storage (e.g., Plik) |
|
||||
|
||||
### Message Flow
|
||||
|
||||
1. **Sender** creates a message envelope with payloads
|
||||
2. **NATSBridge** serializes and encodes payloads based on type
|
||||
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
|
||||
4. **NATS** routes messages to subscribers
|
||||
5. **Receiver** fetches payloads (from NATS or file server)
|
||||
6. **NATSBridge** deserializes and decodes payloads
|
||||
1. **Sender** creates a message envelope with payloads using `smartsend()`
|
||||
2. **NATSBridge** serializes and encodes each payload based on type
|
||||
3. **Transport Decision**:
|
||||
- **Direct** (< 1MB): Payload encoded as Base64, published to NATS
|
||||
- **Link** (≥ 1MB): Payload uploaded to HTTP file server, URL published to NATS
|
||||
4. **NATS** routes message envelope to subscribers
|
||||
5. **Receiver** receives message via NATS subscription callback
|
||||
6. **NATSBridge** processes envelope:
|
||||
- Decodes Base64 payloads from NATS message
|
||||
- Fetches URLs from file server with exponential backoff
|
||||
7. **Receiver** deserializes payloads based on their type
|
||||
|
||||
### File Server Handler Abstraction
|
||||
|
||||
The system uses handler functions to abstract file server operations:
|
||||
|
||||
| Handler | Purpose |
|
||||
|---------|---------|
|
||||
| `plik_oneshot_upload()` | Uploads payload bytes to file server, returns URL |
|
||||
| `_fetch_with_backoff()` | Downloads data from URL with exponential backoff retry |
|
||||
|
||||
This abstraction allows support for different file server implementations (Plik, AWS S3, custom HTTP server).
|
||||
|
||||
---
|
||||
|
||||
@@ -196,7 +237,7 @@ env, env_json_str = NATSBridge.smartsend(
|
||||
fileserver_url = "http://localhost:8080",
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||
size_threshold::Int = 1_000_000,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
@@ -204,7 +245,9 @@ env, env_json_str = NATSBridge.smartsend(
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true, # Whether to automatically publish to NATS
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
|
||||
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||
)
|
||||
# Returns: (msgEnvelope_v1, JSON string)
|
||||
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
|
||||
|
||||
@@ -307,7 +307,7 @@ function smartsend(
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
@@ -315,7 +315,9 @@ function smartsend(
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true, # Whether to automatically publish to NATS
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
|
||||
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||
)
|
||||
```
|
||||
|
||||
|
||||
@@ -321,7 +321,7 @@ function smartsend(
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
@@ -329,11 +329,16 @@ function smartsend(
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true,
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional)
|
||||
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||
)
|
||||
```
|
||||
|
||||
**New Keyword Parameter:**
|
||||
**New Keyword Parameters:**
|
||||
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
|
||||
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID)
|
||||
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID)
|
||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
|
||||
|
||||
**Connection Handling Logic:**
|
||||
|
||||
@@ -375,7 +375,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
||||
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
||||
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
||||
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
||||
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
||||
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
|
||||
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||
- `sender_name::String = "NATSBridge"` - Name of the sender
|
||||
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
|
||||
@@ -384,6 +384,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
||||
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
|
||||
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
|
||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
|
||||
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID if not provided)
|
||||
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID if not provided)
|
||||
|
||||
# Return:
|
||||
- A tuple `(env, env_json_str)` where:
|
||||
@@ -425,7 +427,16 @@ function smartsend(
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
|
||||
#=
|
||||
Generate a globally unique identifier (UUID) at the start of the request.
|
||||
This ID must remain constant and immutable as it propagates through every
|
||||
stage of the execution pipeline. It serves as the end-to-end ID for
|
||||
distributed tracing, enabling the correlation of all logs, metrics, and
|
||||
errors across the system back to this specific request instance.
|
||||
=#
|
||||
correlation_id::String = string(uuid4()),
|
||||
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
@@ -433,15 +444,13 @@ function smartsend(
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # a provided connection saves establishing connection overhead.
|
||||
msg_id::String = string(uuid4()), # Message ID
|
||||
sender_id::String = string(uuid4()) # Sender ID
|
||||
) where {T1<:Any}
|
||||
|
||||
# Generate correlation ID if not provided
|
||||
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
||||
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
||||
|
||||
# Generate message metadata
|
||||
msg_id = string(uuid4())
|
||||
# Log start of send operation
|
||||
log_trace(correlation_id, "Starting smartsend for subject: $subject")
|
||||
|
||||
# Process each payload in the list
|
||||
payloads = msg_payload_v1[]
|
||||
@@ -450,13 +459,13 @@ function smartsend(
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
|
||||
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
||||
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||
log_trace(correlation_id, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||
|
||||
# Decision: Direct vs Link
|
||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||
# Direct path - Base64 encode and send via NATS
|
||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||
log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||
|
||||
# Create msg_payload_v1 for direct transport
|
||||
payload = msg_payload_v1(
|
||||
@@ -472,7 +481,7 @@ function smartsend(
|
||||
push!(payloads, payload)
|
||||
else
|
||||
# Link path - Upload to HTTP server, send URL via NATS
|
||||
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||
|
||||
# Upload to HTTP server
|
||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
@@ -482,7 +491,7 @@ function smartsend(
|
||||
end
|
||||
|
||||
url = response["url"] # URL for the uploaded data
|
||||
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
||||
log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload
|
||||
|
||||
# Create msg_payload_v1 for link transport
|
||||
payload = msg_payload_v1(
|
||||
@@ -503,11 +512,11 @@ function smartsend(
|
||||
env = msg_envelope_v1(
|
||||
subject,
|
||||
payloads;
|
||||
correlation_id = cid,
|
||||
correlation_id = correlation_id,
|
||||
msg_id = msg_id,
|
||||
msg_purpose = msg_purpose,
|
||||
sender_name = sender_name,
|
||||
sender_id = string(uuid4()),
|
||||
sender_id = sender_id,
|
||||
receiver_name = receiver_name,
|
||||
receiver_id = receiver_id,
|
||||
reply_to = reply_to,
|
||||
@@ -520,9 +529,9 @@ function smartsend(
|
||||
if is_publish == false
|
||||
# skip publish a message
|
||||
elseif is_publish == true && NATS_connection === nothing
|
||||
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
|
||||
publish_message(broker_url, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||
elseif is_publish == true && NATS_connection !== nothing
|
||||
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
|
||||
publish_message(NATS_connection, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||
end
|
||||
|
||||
return (env, env_json_str)
|
||||
|
||||
Reference in New Issue
Block a user