update
This commit is contained in:
@@ -35,13 +35,14 @@
|
||||
|
||||
module NATSBridge
|
||||
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
|
||||
using Revise
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
|
||||
# ---------------------------------------------- 100 --------------------------------------------- #
|
||||
|
||||
# Constants
|
||||
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
|
||||
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
|
||||
const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP file server URL for link transport
|
||||
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
|
||||
|
||||
|
||||
struct msgPayload_v1
|
||||
@@ -64,8 +65,8 @@ function msgPayload_v1(;
|
||||
encoding::String = "none",
|
||||
size::Integer = 0,
|
||||
data::Any = nothing,
|
||||
metadata::Dict{String, Any} = Dict{String, Any}()
|
||||
)
|
||||
metadata::Dict{String, T} = Dict{String, Any}()
|
||||
) where {T<:Any}
|
||||
return msgPayload_v1(
|
||||
id,
|
||||
dataname,
|
||||
@@ -172,15 +173,18 @@ function envelope_to_json(env::msgEnvelope_v1)
|
||||
"size" => payload.size,
|
||||
)
|
||||
# Include data based on transport type
|
||||
if payload.transport == "direct" && payload.data !== nothing
|
||||
if payload.encoding == "base64" || payload.encoding == "json"
|
||||
payload_obj["data"] = payload.data
|
||||
else
|
||||
# For other encodings, use base64
|
||||
payload_bytes = _get_payload_bytes(payload.data)
|
||||
payload_obj["data"] = Base64.base64encode(payload_bytes)
|
||||
end
|
||||
end
|
||||
if payload.transport == "direct" && payload.data !== nothing
|
||||
if payload.encoding == "base64" || payload.encoding == "json"
|
||||
payload_obj["data"] = payload.data
|
||||
else
|
||||
# For other encodings, use base64
|
||||
payload_bytes = _get_payload_bytes(payload.data)
|
||||
payload_obj["data"] = Base64.base64encode(payload_bytes)
|
||||
end
|
||||
elseif payload.transport == "link" && payload.data !== nothing
|
||||
# For link transport, data is a URL string - include directly
|
||||
payload_obj["data"] = payload.data
|
||||
end
|
||||
if !isempty(payload.metadata)
|
||||
payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata)
|
||||
end
|
||||
@@ -284,7 +288,7 @@ env = smartsend("chat.subject", [
|
||||
"""
|
||||
function smartsend(
|
||||
subject::String, # smartreceive's subject
|
||||
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples
|
||||
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples
|
||||
nats_url::String = DEFAULT_NATS_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
||||
@@ -296,7 +300,8 @@ function smartsend(
|
||||
receiver_id::String = "",
|
||||
reply_to::String = "",
|
||||
reply_to_msg_id::String = ""
|
||||
)
|
||||
) where {T1<:Any}
|
||||
|
||||
# Generate correlation ID if not provided
|
||||
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
||||
|
||||
@@ -330,7 +335,7 @@ function smartsend(
|
||||
encoding = "base64",
|
||||
size = payload_size,
|
||||
data = payload_b64,
|
||||
metadata = Dict("payload_bytes" => payload_size)
|
||||
metadata = Dict{String, Any}("payload_bytes" => payload_size)
|
||||
)
|
||||
push!(payloads, payload)
|
||||
else
|
||||
@@ -356,11 +361,18 @@ function smartsend(
|
||||
encoding = "none",
|
||||
size = payload_size,
|
||||
data = url,
|
||||
metadata = Dict()
|
||||
metadata = Dict{String, Any}()
|
||||
)
|
||||
println("1 ----------")
|
||||
PrettyPrinting.pprintln(payload)
|
||||
println("1 ==========")
|
||||
push!(payloads, payload)
|
||||
end
|
||||
end
|
||||
|
||||
println("2 ----------")
|
||||
PrettyPrinting.pprintln(payloads)
|
||||
println("2 ==========")
|
||||
|
||||
# Create msgEnvelope_v1 with all payloads
|
||||
env = msgEnvelope_v1(
|
||||
@@ -376,12 +388,16 @@ function smartsend(
|
||||
replyTo = reply_to,
|
||||
replyToMsgId = reply_to_msg_id,
|
||||
brokerURL = nats_url,
|
||||
metadata = Dict(),
|
||||
metadata = Dict{String, Any}(),
|
||||
payloads = payloads
|
||||
)
|
||||
|
||||
msg_json = envelope_to_json(env) # Convert envelope to JSON
|
||||
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
|
||||
|
||||
println("3 ----------")
|
||||
PrettyPrinting.pprintln(msg_json)
|
||||
println("3 ==========")
|
||||
|
||||
return env # Return the envelope for tracking
|
||||
end
|
||||
@@ -527,7 +543,7 @@ It deserializes the data based on the transport type and returns the result.
|
||||
A HTTP file server is required along with its upload function.
|
||||
|
||||
Arguments:
|
||||
- `msg::NATS.Message` - NATS message to process
|
||||
- `msg::NATS.Msg` - NATS message to process
|
||||
- `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs
|
||||
|
||||
Keyword Arguments:
|
||||
@@ -547,62 +563,67 @@ payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay,
|
||||
```
|
||||
"""
|
||||
function smartreceive(
|
||||
msg::NATS.Message,
|
||||
fileserverDownloadHandler::Function=_fetch_with_backoff;
|
||||
msg::NATS.Msg;
|
||||
fileserverDownloadHandler::Function=_fetch_with_backoff,
|
||||
max_retries::Int = 5,
|
||||
base_delay::Int = 100,
|
||||
max_delay::Int = 5000
|
||||
)
|
||||
# Parse the JSON envelope
|
||||
json_data = JSON.parse(String(msg.payload))
|
||||
|
||||
println("R1 ----------")
|
||||
PrettyPrinting.pprintln(json_data)
|
||||
println("R1 ==========")
|
||||
log_trace(json_data["correlationId"], "Processing received message") # Log message processing start
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = Tuple{String, Any}[]
|
||||
payloads_list = Tuple{String, Any, String}[]
|
||||
|
||||
# Get number of payloads
|
||||
num_payloads = length(json_data["payloads"])
|
||||
|
||||
for i in 1:num_payloads
|
||||
payload_data = json_data["payloads"][i]
|
||||
transport = String(payload_data["transport"])
|
||||
dataname = String(payload_data["dataname"])
|
||||
payload = json_data["payloads"][i]
|
||||
transport = String(payload["transport"])
|
||||
dataname = String(payload["dataname"])
|
||||
|
||||
if transport == "direct" # Direct transport - payload is in the message
|
||||
log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = String(payload_data["data"])
|
||||
payload_b64 = String(payload["data"])
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = String(payload_data["type"])
|
||||
data_type = String(payload["type"])
|
||||
data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"])
|
||||
|
||||
push!(payloads_list, (dataname, data))
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
elseif transport == "link" # Link transport - payload is at URL
|
||||
log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling
|
||||
|
||||
# Extract download URL from the payload
|
||||
url = String(payload_data["data"])
|
||||
println("R2 ----------")
|
||||
PrettyPrinting.pprintln(payload)
|
||||
println("R2 ==========")
|
||||
url = String(payload["data"])
|
||||
|
||||
#[WORKING] Fetch with exponential backoff using the download handler
|
||||
# Fetch with exponential backoff using the download handler
|
||||
downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"])
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = String(payload_data["type"])
|
||||
data_type = String(payload["type"])
|
||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
|
||||
|
||||
push!(payloads_list, (dataname, data))
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
else # Unknown transport type
|
||||
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
|
||||
end
|
||||
end
|
||||
|
||||
return payloads_list # Return list of (dataname, data) tuples
|
||||
return payloads_list # Return list of (dataname, data, data_type) tuples
|
||||
end
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user