Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 95fe697501 | |||
| ee2d2c7238 |
@@ -1,6 +1,6 @@
|
|||||||
name = "NATSBridge"
|
name = "NATSBridge"
|
||||||
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
|
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
|
||||||
version = "0.4.3"
|
version = "0.4.4"
|
||||||
authors = ["narawat <narawat@gmail.com>"]
|
authors = ["narawat <narawat@gmail.com>"]
|
||||||
|
|
||||||
[deps]
|
[deps]
|
||||||
|
|||||||
93
README.md
93
README.md
@@ -60,33 +60,74 @@ NATSBridge enables seamless communication for Julia applications through NATS, w
|
|||||||
|
|
||||||
### System Components
|
### System Components
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
flowchart TB
|
||||||
|
subgraph Sender["Julia Application (Sender)"]
|
||||||
|
SenderApp[App Code]
|
||||||
|
NATSBridge_Send[NATSBridge]
|
||||||
|
NATS_Client[<b>NATS.jl</b>]
|
||||||
|
end
|
||||||
|
|
||||||
|
subgraph Receiver["Julia Application (Receiver)"]
|
||||||
|
ReceiverApp[App Code]
|
||||||
|
NATSBridge_Recv[NATSBridge]
|
||||||
|
NATS_Client_Recv[<b>NATS.jl</b>]
|
||||||
|
end
|
||||||
|
|
||||||
|
subgraph Infrastructure["Infrastructure"]
|
||||||
|
NATS[<b>NATS Server</b><br/>Message Broker]
|
||||||
|
FileServer[<b>HTTP File Server</b><br/>Upload/Download]
|
||||||
|
end
|
||||||
|
|
||||||
|
SenderApp --> NATSBridge_Send
|
||||||
|
NATSBridge_Send --> NATS_Client
|
||||||
|
NATS_Client --> NATS
|
||||||
|
|
||||||
|
NATS --> NATS_Client_Recv
|
||||||
|
NATS_Client_Recv --> NATSBridge_Recv
|
||||||
|
NATSBridge_Recv --> ReceiverApp
|
||||||
|
|
||||||
|
NATSBridge_Send -.->|HTTP POST upload| FileServer
|
||||||
|
FileServer -.->|HTTP GET download| NATSBridge_Recv
|
||||||
|
|
||||||
|
style SenderApp fill:#e8f5e9
|
||||||
|
style ReceiverApp fill:#e8f5e9
|
||||||
|
style NATS fill:#fff3e0
|
||||||
|
style FileServer fill:#f3e5f5
|
||||||
```
|
```
|
||||||
┌─────────────────────────────────────────────────────────────────────┐
|
|
||||||
│ NATSBridge Architecture │
|
### Key Components
|
||||||
├─────────────────────────────────────────────────────────────────────┤
|
|
||||||
│ ┌──────────────┐ │ │
|
| Component | Description |
|
||||||
│ │ Julia │ ▼ │
|
|-----------|-------------|
|
||||||
│ │ (NATS.jl) │ ┌─────────────────────────┐ │
|
| **Julia Application** | Sender and receiver applications using the NATSBridge module |
|
||||||
│ └──────────────┘ │ NATS │ │
|
| **NATS Server** | Message broker for transporting message envelopes |
|
||||||
│ │ (Message Broker) │ │
|
| **HTTP File Server** | Independent HTTP server for large payload storage (e.g., Plik) |
|
||||||
│ └─────────────────────────┘ │
|
|
||||||
│ │ │
|
|
||||||
│ ▼ │
|
|
||||||
│ ┌──────────────────────┐ │
|
|
||||||
│ │ File Server │ │
|
|
||||||
│ │ (HTTP Upload/Get) │ │
|
|
||||||
│ └──────────────────────┘ │
|
|
||||||
└─────────────────────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
### Message Flow
|
### Message Flow
|
||||||
|
|
||||||
1. **Sender** creates a message envelope with payloads
|
1. **Sender** creates a message envelope with payloads using `smartsend()`
|
||||||
2. **NATSBridge** serializes and encodes payloads based on type
|
2. **NATSBridge** serializes and encodes each payload based on type
|
||||||
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
|
3. **Transport Decision**:
|
||||||
4. **NATS** routes messages to subscribers
|
- **Direct** (< 1MB): Payload encoded as Base64, published to NATS
|
||||||
5. **Receiver** fetches payloads (from NATS or file server)
|
- **Link** (≥ 1MB): Payload uploaded to HTTP file server, URL published to NATS
|
||||||
6. **NATSBridge** deserializes and decodes payloads
|
4. **NATS** routes message envelope to subscribers
|
||||||
|
5. **Receiver** receives message via NATS subscription callback
|
||||||
|
6. **NATSBridge** processes envelope:
|
||||||
|
- Decodes Base64 payloads from NATS message
|
||||||
|
- Fetches URLs from file server with exponential backoff
|
||||||
|
7. **Receiver** deserializes payloads based on their type
|
||||||
|
|
||||||
|
### File Server Handler Abstraction
|
||||||
|
|
||||||
|
The system uses handler functions to abstract file server operations:
|
||||||
|
|
||||||
|
| Handler | Purpose |
|
||||||
|
|---------|---------|
|
||||||
|
| `plik_oneshot_upload()` | Uploads payload bytes to file server, returns URL |
|
||||||
|
| `_fetch_with_backoff()` | Downloads data from URL with exponential backoff retry |
|
||||||
|
|
||||||
|
This abstraction allows support for different file server implementations (Plik, AWS S3, custom HTTP server).
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -196,7 +237,7 @@ env, env_json_str = NATSBridge.smartsend(
|
|||||||
fileserver_url = "http://localhost:8080",
|
fileserver_url = "http://localhost:8080",
|
||||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||||
size_threshold::Int = 1_000_000,
|
size_threshold::Int = 1_000_000,
|
||||||
correlation_id::Union{String, Nothing} = nothing,
|
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||||
msg_purpose::String = "chat",
|
msg_purpose::String = "chat",
|
||||||
sender_name::String = "NATSBridge",
|
sender_name::String = "NATSBridge",
|
||||||
receiver_name::String = "",
|
receiver_name::String = "",
|
||||||
@@ -204,7 +245,9 @@ env, env_json_str = NATSBridge.smartsend(
|
|||||||
reply_to::String = "",
|
reply_to::String = "",
|
||||||
reply_to_msg_id::String = "",
|
reply_to_msg_id::String = "",
|
||||||
is_publish::Bool = true, # Whether to automatically publish to NATS
|
is_publish::Bool = true, # Whether to automatically publish to NATS
|
||||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
|
||||||
|
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||||
|
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||||
)
|
)
|
||||||
# Returns: (msgEnvelope_v1, JSON string)
|
# Returns: (msgEnvelope_v1, JSON string)
|
||||||
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
|
# - env: msgEnvelope_v1 object with all envelope metadata and payloads
|
||||||
|
|||||||
@@ -307,7 +307,7 @@ function smartsend(
|
|||||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||||
correlation_id::Union{String, Nothing} = nothing,
|
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||||
msg_purpose::String = "chat",
|
msg_purpose::String = "chat",
|
||||||
sender_name::String = "NATSBridge",
|
sender_name::String = "NATSBridge",
|
||||||
receiver_name::String = "",
|
receiver_name::String = "",
|
||||||
@@ -315,7 +315,9 @@ function smartsend(
|
|||||||
reply_to::String = "",
|
reply_to::String = "",
|
||||||
reply_to_msg_id::String = "",
|
reply_to_msg_id::String = "",
|
||||||
is_publish::Bool = true, # Whether to automatically publish to NATS
|
is_publish::Bool = true, # Whether to automatically publish to NATS
|
||||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional, saves connection overhead)
|
||||||
|
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||||
|
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -321,7 +321,7 @@ function smartsend(
|
|||||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||||
correlation_id::Union{String, Nothing} = nothing,
|
correlation_id::String = string(uuid4()), # Correlation ID for tracing (auto-generated UUID)
|
||||||
msg_purpose::String = "chat",
|
msg_purpose::String = "chat",
|
||||||
sender_name::String = "NATSBridge",
|
sender_name::String = "NATSBridge",
|
||||||
receiver_name::String = "",
|
receiver_name::String = "",
|
||||||
@@ -329,11 +329,16 @@ function smartsend(
|
|||||||
reply_to::String = "",
|
reply_to::String = "",
|
||||||
reply_to_msg_id::String = "",
|
reply_to_msg_id::String = "",
|
||||||
is_publish::Bool = true,
|
is_publish::Bool = true,
|
||||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # Pre-existing NATS connection (optional)
|
||||||
|
msg_id::String = string(uuid4()), # Message ID (auto-generated UUID)
|
||||||
|
sender_id::String = string(uuid4()) # Sender ID (auto-generated UUID)
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
|
|
||||||
**New Keyword Parameter:**
|
**New Keyword Parameters:**
|
||||||
|
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
|
||||||
|
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID)
|
||||||
|
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID)
|
||||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
|
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
|
||||||
|
|
||||||
**Connection Handling Logic:**
|
**Connection Handling Logic:**
|
||||||
|
|||||||
@@ -375,7 +375,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
|||||||
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
||||||
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
||||||
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
||||||
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
- `correlation_id::String = string(uuid4())` - Correlation ID for tracing (auto-generated UUID)
|
||||||
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||||
- `sender_name::String = "NATSBridge"` - Name of the sender
|
- `sender_name::String = "NATSBridge"` - Name of the sender
|
||||||
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
|
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
|
||||||
@@ -384,6 +384,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
|||||||
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
|
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
|
||||||
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
|
- `is_publish::Bool = true` - Whether to automatically publish the message to NATS
|
||||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
|
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead)
|
||||||
|
- `msg_id::String = string(uuid4())` - Message ID (auto-generated UUID if not provided)
|
||||||
|
- `sender_id::String = string(uuid4())` - Sender ID (auto-generated UUID if not provided)
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- A tuple `(env, env_json_str)` where:
|
- A tuple `(env, env_json_str)` where:
|
||||||
@@ -425,7 +427,16 @@ function smartsend(
|
|||||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||||
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
||||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||||
correlation_id::Union{String, Nothing} = nothing,
|
|
||||||
|
#=
|
||||||
|
Generate a globally unique identifier (UUID) at the start of the request.
|
||||||
|
This ID must remain constant and immutable as it propagates through every
|
||||||
|
stage of the execution pipeline. It serves as the end-to-end ID for
|
||||||
|
distributed tracing, enabling the correlation of all logs, metrics, and
|
||||||
|
errors across the system back to this specific request instance.
|
||||||
|
=#
|
||||||
|
correlation_id::String = string(uuid4()),
|
||||||
|
|
||||||
msg_purpose::String = "chat",
|
msg_purpose::String = "chat",
|
||||||
sender_name::String = "NATSBridge",
|
sender_name::String = "NATSBridge",
|
||||||
receiver_name::String = "",
|
receiver_name::String = "",
|
||||||
@@ -433,15 +444,13 @@ function smartsend(
|
|||||||
reply_to::String = "",
|
reply_to::String = "",
|
||||||
reply_to_msg_id::String = "",
|
reply_to_msg_id::String = "",
|
||||||
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
|
is_publish::Bool = true, # some time the user want to get env and env_json_str from this function without publishing the msg
|
||||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # a provided connection saves establishing connection overhead.
|
NATS_connection::Union{NATS.Connection, Nothing} = nothing, # a provided connection saves establishing connection overhead.
|
||||||
|
msg_id::String = string(uuid4()), # Message ID
|
||||||
|
sender_id::String = string(uuid4()) # Sender ID
|
||||||
) where {T1<:Any}
|
) where {T1<:Any}
|
||||||
|
|
||||||
# Generate correlation ID if not provided
|
# Log start of send operation
|
||||||
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
log_trace(correlation_id, "Starting smartsend for subject: $subject")
|
||||||
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
|
||||||
|
|
||||||
# Generate message metadata
|
|
||||||
msg_id = string(uuid4())
|
|
||||||
|
|
||||||
# Process each payload in the list
|
# Process each payload in the list
|
||||||
payloads = msg_payload_v1[]
|
payloads = msg_payload_v1[]
|
||||||
@@ -450,13 +459,13 @@ function smartsend(
|
|||||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||||
|
|
||||||
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
||||||
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
log_trace(correlation_id, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||||
|
|
||||||
# Decision: Direct vs Link
|
# Decision: Direct vs Link
|
||||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||||
# Direct path - Base64 encode and send via NATS
|
# Direct path - Base64 encode and send via NATS
|
||||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||||
|
|
||||||
# Create msg_payload_v1 for direct transport
|
# Create msg_payload_v1 for direct transport
|
||||||
payload = msg_payload_v1(
|
payload = msg_payload_v1(
|
||||||
@@ -472,7 +481,7 @@ function smartsend(
|
|||||||
push!(payloads, payload)
|
push!(payloads, payload)
|
||||||
else
|
else
|
||||||
# Link path - Upload to HTTP server, send URL via NATS
|
# Link path - Upload to HTTP server, send URL via NATS
|
||||||
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||||
|
|
||||||
# Upload to HTTP server
|
# Upload to HTTP server
|
||||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||||
@@ -482,7 +491,7 @@ function smartsend(
|
|||||||
end
|
end
|
||||||
|
|
||||||
url = response["url"] # URL for the uploaded data
|
url = response["url"] # URL for the uploaded data
|
||||||
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload
|
||||||
|
|
||||||
# Create msg_payload_v1 for link transport
|
# Create msg_payload_v1 for link transport
|
||||||
payload = msg_payload_v1(
|
payload = msg_payload_v1(
|
||||||
@@ -503,11 +512,11 @@ function smartsend(
|
|||||||
env = msg_envelope_v1(
|
env = msg_envelope_v1(
|
||||||
subject,
|
subject,
|
||||||
payloads;
|
payloads;
|
||||||
correlation_id = cid,
|
correlation_id = correlation_id,
|
||||||
msg_id = msg_id,
|
msg_id = msg_id,
|
||||||
msg_purpose = msg_purpose,
|
msg_purpose = msg_purpose,
|
||||||
sender_name = sender_name,
|
sender_name = sender_name,
|
||||||
sender_id = string(uuid4()),
|
sender_id = sender_id,
|
||||||
receiver_name = receiver_name,
|
receiver_name = receiver_name,
|
||||||
receiver_id = receiver_id,
|
receiver_id = receiver_id,
|
||||||
reply_to = reply_to,
|
reply_to = reply_to,
|
||||||
@@ -520,9 +529,9 @@ function smartsend(
|
|||||||
if is_publish == false
|
if is_publish == false
|
||||||
# skip publish a message
|
# skip publish a message
|
||||||
elseif is_publish == true && NATS_connection === nothing
|
elseif is_publish == true && NATS_connection === nothing
|
||||||
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
|
publish_message(broker_url, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||||
elseif is_publish == true && NATS_connection !== nothing
|
elseif is_publish == true && NATS_connection !== nothing
|
||||||
publish_message(NATS_connection, subject, env_json_str, cid) # Publish message to NATS
|
publish_message(NATS_connection, subject, env_json_str, correlation_id) # Publish message to NATS
|
||||||
end
|
end
|
||||||
|
|
||||||
return (env, env_json_str)
|
return (env, env_json_str)
|
||||||
|
|||||||
Reference in New Issue
Block a user