update
This commit is contained in:
@@ -395,10 +395,25 @@ function smartsend(
|
||||
receiver_id::String = "",
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true # Whether to automatically publish to NATS
|
||||
is_publish::Bool = true, # Whether to automatically publish to NATS
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional, saves connection overhead)
|
||||
)
|
||||
```
|
||||
|
||||
**New Keyword Parameter:**
|
||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios where connection reuse provides performance benefits.
|
||||
|
||||
**Connection Handling Logic:**
|
||||
```julia
|
||||
if is_publish == false
|
||||
# skip publish a message
|
||||
elseif is_publish == true && NATS_connection === nothing
|
||||
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
|
||||
elseif is_publish == true && NATS_connection !== nothing
|
||||
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
|
||||
end
|
||||
```
|
||||
|
||||
**Return Value:**
|
||||
- Returns a tuple `(env, env_json_str)` where:
|
||||
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
|
||||
@@ -459,6 +474,53 @@ end
|
||||
|
||||
**Note:** The `fileserver_download_handler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
|
||||
|
||||
#### publish_message Function
|
||||
|
||||
The `publish_message` function provides two overloads for publishing messages to NATS:
|
||||
|
||||
**Overload 1 - URL-based publishing (creates new connection):**
|
||||
```julia
|
||||
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
||||
conn = NATS.connect(broker_url) # Create NATS connection
|
||||
publish_message(conn, subject, message, correlation_id)
|
||||
end
|
||||
```
|
||||
|
||||
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
|
||||
```julia
|
||||
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
|
||||
try
|
||||
NATS.publish(conn, subject, message) # Publish message to NATS
|
||||
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
||||
finally
|
||||
NATS.drain(conn) # Ensure connection is closed properly
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish.
|
||||
|
||||
**Integration with smartsend:**
|
||||
```julia
|
||||
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
|
||||
env, env_json_str = smartsend(
|
||||
"my.subject",
|
||||
[("data", payload_data, "type")],
|
||||
NATS_connection=my_connection, # Pre-existing connection
|
||||
is_publish=true
|
||||
)
|
||||
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
|
||||
|
||||
# When NATS_connection is not provided, it uses the URL-based publish_message
|
||||
env, env_json_str = smartsend(
|
||||
"my.subject",
|
||||
[("data", payload_data, "type")],
|
||||
broker_url="nats://localhost:4222",
|
||||
is_publish=true
|
||||
)
|
||||
# Uses: publish_message(broker_url, subject, env_json_str, cid)
|
||||
```
|
||||
|
||||
### JavaScript Implementation
|
||||
|
||||
#### Dependencies
|
||||
|
||||
@@ -337,6 +337,30 @@ env, env_json_str = smartsend(
|
||||
# env_json_str: JSON string for publishing
|
||||
```
|
||||
|
||||
**Julia (Sender/Receiver) with NATS_connection for connection reuse:**
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
# Create connection once for high-frequency publishing
|
||||
conn = NATS.connect("nats://localhost:4222")
|
||||
|
||||
# Send multiple messages using the same connection (saves connection overhead)
|
||||
for i in 1:100
|
||||
config = Dict("iteration" => i, "data" => rand())
|
||||
smartsend(
|
||||
"control",
|
||||
[("config", config, "dictionary")],
|
||||
NATS_connection=conn, # Reuse connection
|
||||
is_publish=true
|
||||
)
|
||||
end
|
||||
|
||||
# Close connection when done
|
||||
NATS.close(conn)
|
||||
```
|
||||
|
||||
**Use Case:** High-frequency publishing scenarios where connection reuse provides performance benefits by avoiding the overhead of establishing a new NATS connection for each message.
|
||||
|
||||
**JavaScript (Sender/Receiver):**
|
||||
```javascript
|
||||
const { smartsend } = require('./src/NATSBridge');
|
||||
@@ -478,6 +502,111 @@ env, env_json_str = smartsend(
|
||||
# env_json_str: JSON string for publishing
|
||||
```
|
||||
|
||||
#### smartsend Function Signature (Julia)
|
||||
|
||||
```julia
|
||||
function smartsend(
|
||||
subject::String,
|
||||
data::AbstractArray{Tuple{String, Any, String}, 1}; # List of (dataname, data, type) tuples
|
||||
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler::Function = plik_oneshot_upload,
|
||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id::Union{String, Nothing} = nothing,
|
||||
msg_purpose::String = "chat",
|
||||
sender_name::String = "NATSBridge",
|
||||
receiver_name::String = "",
|
||||
receiver_id::String = "",
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = "",
|
||||
is_publish::Bool = true,
|
||||
NATS_connection::Union{NATS.Connection, Nothing} = nothing # Pre-existing NATS connection (optional)
|
||||
)
|
||||
```
|
||||
|
||||
**New Keyword Parameter:**
|
||||
- `NATS_connection::Union{NATS.Connection, Nothing} = nothing` - Pre-existing NATS connection. When provided, `smartsend` uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios.
|
||||
|
||||
**Connection Handling Logic:**
|
||||
```julia
|
||||
if is_publish == false
|
||||
# skip publish
|
||||
elseif is_publish == true && NATS_connection === nothing
|
||||
publish_message(broker_url, subject, env_json_str, cid) # Creates new connection
|
||||
elseif is_publish == true && NATS_connection !== nothing
|
||||
publish_message(NATS_connection, subject, env_json_str, cid) # Uses provided connection
|
||||
end
|
||||
```
|
||||
|
||||
**Example with pre-existing connection:**
|
||||
```julia
|
||||
using NATSBridge
|
||||
|
||||
# Create connection once
|
||||
conn = NATS.connect("nats://localhost:4222")
|
||||
|
||||
# Send multiple messages using the same connection
|
||||
for i in 1:100
|
||||
data = rand(1000)
|
||||
smartsend(
|
||||
"analysis_results",
|
||||
[("table_data", data, "table")],
|
||||
NATS_connection=conn, # Reuse connection
|
||||
is_publish=true
|
||||
)
|
||||
end
|
||||
|
||||
# Close connection when done
|
||||
NATS.close(conn)
|
||||
```
|
||||
|
||||
#### publish_message Function
|
||||
|
||||
The `publish_message` function provides two overloads for publishing messages to NATS:
|
||||
|
||||
**Overload 1 - URL-based publishing (creates new connection):**
|
||||
```julia
|
||||
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
||||
conn = NATS.connect(broker_url) # Create NATS connection
|
||||
publish_message(conn, subject, message, correlation_id)
|
||||
end
|
||||
```
|
||||
|
||||
**Overload 2 - Connection-based publishing (uses pre-existing connection):**
|
||||
```julia
|
||||
function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
|
||||
try
|
||||
NATS.publish(conn, subject, message) # Publish message to NATS
|
||||
log_trace(correlation_id, "Message published to $subject")
|
||||
finally
|
||||
NATS.drain(conn) # Ensure connection is closed properly
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
**Use Case:** Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without the overhead of creating a new connection for each publish.
|
||||
|
||||
**Integration with smartsend:**
|
||||
```julia
|
||||
# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
|
||||
env, env_json_str = smartsend(
|
||||
"my.subject",
|
||||
[("data", payload_data, "type")],
|
||||
NATS_connection=my_connection, # Pre-existing connection
|
||||
is_publish=true
|
||||
)
|
||||
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)
|
||||
|
||||
# When NATS_connection is not provided, it uses the URL-based publish_message
|
||||
env, env_json_str = smartsend(
|
||||
"my.subject",
|
||||
[("data", payload_data, "type")],
|
||||
broker_url="nats://localhost:4222",
|
||||
is_publish=true
|
||||
)
|
||||
# Uses: publish_message(broker_url, subject, env_json_str, cid)
|
||||
```
|
||||
|
||||
#### JavaScript (Receiver)
|
||||
```javascript
|
||||
const { smartreceive } = require('./src/NATSBridge');
|
||||
|
||||
Reference in New Issue
Block a user