update
This commit is contained in:
@@ -120,15 +120,15 @@ function msg_payload_v1(
|
||||
metadata::Dict{String, T} = Dict{String, Any}()
|
||||
) where {T<:Any}
|
||||
return msg_payload_v1(
|
||||
id,
|
||||
dataname,
|
||||
payload_type,
|
||||
transport,
|
||||
encoding,
|
||||
size,
|
||||
data,
|
||||
metadata
|
||||
)
|
||||
id,
|
||||
dataname,
|
||||
payload_type,
|
||||
transport,
|
||||
encoding,
|
||||
size,
|
||||
data,
|
||||
metadata
|
||||
)
|
||||
end
|
||||
|
||||
|
||||
@@ -167,13 +167,13 @@ payload2 = msg_payload_v1("http://example.com/file.zip", "binary"; dataname="fil
|
||||
|
||||
# Create message envelope
|
||||
env = msg_envelope_v1(
|
||||
"my.subject",
|
||||
[payload1, payload2];
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "my-app",
|
||||
receiver_name = "receiver-app",
|
||||
reply_to = "reply.subject"
|
||||
"my.subject",
|
||||
[payload1, payload2];
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "my-app",
|
||||
receiver_name = "receiver-app",
|
||||
reply_to = "reply.subject"
|
||||
)
|
||||
```
|
||||
"""
|
||||
@@ -498,22 +498,22 @@ function smartsend(
|
||||
end
|
||||
end
|
||||
|
||||
# Create msg_envelope_v1 with all payloads
|
||||
env = msg_envelope_v1(
|
||||
subject,
|
||||
payloads;
|
||||
correlation_id = cid,
|
||||
msg_id = msg_id,
|
||||
msg_purpose = msg_purpose,
|
||||
sender_name = sender_name,
|
||||
sender_id = string(uuid4()),
|
||||
receiver_name = receiver_name,
|
||||
receiver_id = receiver_id,
|
||||
reply_to = reply_to,
|
||||
reply_to_msg_id = reply_to_msg_id,
|
||||
broker_url = broker_url,
|
||||
metadata = Dict{String, Any}(),
|
||||
)
|
||||
# Create msg_envelope_v1 with all payloads
|
||||
env = msg_envelope_v1(
|
||||
subject,
|
||||
payloads;
|
||||
correlation_id = cid,
|
||||
msg_id = msg_id,
|
||||
msg_purpose = msg_purpose,
|
||||
sender_name = sender_name,
|
||||
sender_id = string(uuid4()),
|
||||
receiver_name = receiver_name,
|
||||
receiver_id = receiver_id,
|
||||
reply_to = reply_to,
|
||||
reply_to_msg_id = reply_to_msg_id,
|
||||
broker_url = broker_url,
|
||||
metadata = Dict{String, Any}(),
|
||||
)
|
||||
|
||||
env_json_str = envelope_to_json(env) # Convert envelope to JSON
|
||||
if is_publish
|
||||
@@ -602,48 +602,48 @@ function _serialize_data(data::Any, payload_type::String)
|
||||
"""
|
||||
|
||||
if payload_type == "text" # Text data - convert to UTF-8 bytes
|
||||
if isa(data, String)
|
||||
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
|
||||
return data_bytes
|
||||
else
|
||||
error("Text data must be a String")
|
||||
end
|
||||
if isa(data, String)
|
||||
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
|
||||
return data_bytes
|
||||
else
|
||||
error("Text data must be a String")
|
||||
end
|
||||
elseif payload_type == "dictionary" # JSON data - serialize directly
|
||||
json_str = JSON.json(data) # Convert Julia data to JSON string
|
||||
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
|
||||
return json_str_bytes
|
||||
json_str = JSON.json(data) # Convert Julia data to JSON string
|
||||
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
|
||||
return json_str_bytes
|
||||
elseif payload_type == "table" # Table data - convert to Arrow IPC stream
|
||||
io = IOBuffer() # Create in-memory buffer
|
||||
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
||||
return take!(io) # Return the buffer contents as bytes
|
||||
io = IOBuffer() # Create in-memory buffer
|
||||
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
||||
return take!(io) # Return the buffer contents as bytes
|
||||
elseif payload_type == "image" # Image data - treat as binary
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Image data must be Vector{UInt8}")
|
||||
end
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Image data must be Vector{UInt8}")
|
||||
end
|
||||
elseif payload_type == "audio" # Audio data - treat as binary
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Audio data must be Vector{UInt8}")
|
||||
end
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Audio data must be Vector{UInt8}")
|
||||
end
|
||||
elseif payload_type == "video" # Video data - treat as binary
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Video data must be Vector{UInt8}")
|
||||
end
|
||||
if isa(data, Vector{UInt8})
|
||||
return data # Return binary data directly
|
||||
else
|
||||
error("Video data must be Vector{UInt8}")
|
||||
end
|
||||
elseif payload_type == "binary" # Binary data - treat as binary
|
||||
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
||||
return take!(data) # Return buffer contents as bytes
|
||||
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
||||
return data # Return binary data directly
|
||||
else # Unsupported binary data type
|
||||
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
||||
end
|
||||
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
||||
return take!(data) # Return buffer contents as bytes
|
||||
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
||||
return data # Return binary data directly
|
||||
else # Unsupported binary data type
|
||||
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
||||
end
|
||||
else # Unknown type
|
||||
error("Unknown payload_type: $payload_type")
|
||||
error("Unknown payload_type: $payload_type")
|
||||
end
|
||||
end
|
||||
|
||||
@@ -673,13 +673,13 @@ connection management and logging.
|
||||
```
|
||||
"""
|
||||
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
||||
conn = NATS.connect(broker_url) # Create NATS connection
|
||||
try
|
||||
NATS.publish(conn, subject, message) # Publish message to NATS
|
||||
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
||||
finally
|
||||
NATS.drain(conn) # Ensure connection is closed properly
|
||||
end
|
||||
conn = NATS.connect(broker_url) # Create NATS connection
|
||||
try
|
||||
NATS.publish(conn, subject, message) # Publish message to NATS
|
||||
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
||||
finally
|
||||
NATS.drain(conn) # Ensure connection is closed properly
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -717,60 +717,60 @@ payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, ma
|
||||
```
|
||||
"""
|
||||
function smartreceive(
|
||||
msg::NATS.Msg;
|
||||
fileserver_download_handler::Function=_fetch_with_backoff,
|
||||
max_retries::Int = 5,
|
||||
base_delay::Int = 100,
|
||||
max_delay::Int = 5000
|
||||
msg::NATS.Msg;
|
||||
fileserver_download_handler::Function = _fetch_with_backoff,
|
||||
max_retries::Int = 5,
|
||||
base_delay::Int = 100,
|
||||
max_delay::Int = 5000
|
||||
)
|
||||
# Parse the JSON envelope
|
||||
json_data = JSON.parse(String(msg.payload))
|
||||
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start
|
||||
# Parse the JSON envelope
|
||||
json_data = JSON.parse(String(msg.payload))
|
||||
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = Tuple{String, Any, String}[]
|
||||
|
||||
# Get number of payloads
|
||||
num_payloads = length(json_data["payloads"])
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = Tuple{String, Any, String}[]
|
||||
for i in 1:num_payloads
|
||||
payload = json_data["payloads"][i]
|
||||
transport = String(payload["transport"])
|
||||
dataname = String(payload["dataname"])
|
||||
|
||||
# Get number of payloads
|
||||
num_payloads = length(json_data["payloads"])
|
||||
|
||||
for i in 1:num_payloads
|
||||
payload = json_data["payloads"][i]
|
||||
transport = String(payload["transport"])
|
||||
dataname = String(payload["dataname"])
|
||||
|
||||
if transport == "direct" # Direct transport - payload is in the message
|
||||
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = String(payload["data"])
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = String(payload["payload_type"])
|
||||
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"])
|
||||
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
elseif transport == "link" # Link transport - payload is at URL
|
||||
# Extract download URL from the payload
|
||||
url = String(payload["data"])
|
||||
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
||||
|
||||
# Fetch with exponential backoff using the download handler
|
||||
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"])
|
||||
if transport == "direct" # Direct transport - payload is in the message
|
||||
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = String(payload["data"])
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = String(payload["payload_type"])
|
||||
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"])
|
||||
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
elseif transport == "link" # Link transport - payload is at URL
|
||||
# Extract download URL from the payload
|
||||
url = String(payload["data"])
|
||||
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
||||
|
||||
# Fetch with exponential backoff using the download handler
|
||||
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"])
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = String(payload["payload_type"])
|
||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"])
|
||||
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
else # Unknown transport type
|
||||
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
|
||||
end
|
||||
# Deserialize based on type
|
||||
data_type = String(payload["payload_type"])
|
||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"])
|
||||
|
||||
push!(payloads_list, (dataname, data, data_type))
|
||||
else # Unknown transport type
|
||||
error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
|
||||
end
|
||||
json_data["payloads"] = payloads_list
|
||||
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
|
||||
end
|
||||
json_data["payloads"] = payloads_list
|
||||
return json_data # Return envelope with list of (dataname, data, data_type) tuples in payloads field
|
||||
end
|
||||
|
||||
|
||||
@@ -805,33 +805,33 @@ data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correla
|
||||
```
|
||||
"""
|
||||
function _fetch_with_backoff(
|
||||
url::String,
|
||||
max_retries::Int,
|
||||
base_delay::Int,
|
||||
max_delay::Int,
|
||||
correlation_id::String
|
||||
url::String,
|
||||
max_retries::Int,
|
||||
base_delay::Int,
|
||||
max_delay::Int,
|
||||
correlation_id::String
|
||||
)
|
||||
delay = base_delay # Initialize delay with base delay value
|
||||
for attempt in 1:max_retries # Attempt to fetch data up to max_retries times
|
||||
try
|
||||
response = HTTP.request("GET", url) # Make HTTP GET request to URL
|
||||
if response.status == 200 # Check if request was successful
|
||||
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success
|
||||
return response.body # Return response body as bytes
|
||||
else # Request failed
|
||||
error("Failed to fetch: $(response.status)") # Throw error for non-200 status
|
||||
end
|
||||
catch e # Handle exceptions during fetch
|
||||
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure
|
||||
|
||||
if attempt < max_retries # Only sleep if not the last attempt
|
||||
sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms)
|
||||
delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay
|
||||
end
|
||||
end
|
||||
delay = base_delay # Initialize delay with base delay value
|
||||
for attempt in 1:max_retries # Attempt to fetch data up to max_retries times
|
||||
try
|
||||
response = HTTP.request("GET", url) # Make HTTP GET request to URL
|
||||
if response.status == 200 # Check if request was successful
|
||||
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success
|
||||
return response.body # Return response body as bytes
|
||||
else # Request failed
|
||||
error("Failed to fetch: $(response.status)") # Throw error for non-200 status
|
||||
end
|
||||
catch e # Handle exceptions during fetch
|
||||
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure
|
||||
|
||||
if attempt < max_retries # Only sleep if not the last attempt
|
||||
sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms)
|
||||
delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay
|
||||
end
|
||||
end
|
||||
|
||||
error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed
|
||||
end
|
||||
|
||||
error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed
|
||||
end
|
||||
|
||||
|
||||
@@ -875,30 +875,30 @@ table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
|
||||
```
|
||||
"""
|
||||
function _deserialize_data(
|
||||
data::Vector{UInt8},
|
||||
payload_type::String,
|
||||
correlation_id::String
|
||||
data::Vector{UInt8},
|
||||
payload_type::String,
|
||||
correlation_id::String
|
||||
)
|
||||
if payload_type == "text" # Text data - convert to string
|
||||
return String(data) # Convert bytes to string
|
||||
elseif payload_type == "dictionary" # JSON data - deserialize
|
||||
json_str = String(data) # Convert bytes to string
|
||||
return JSON.parse(json_str) # Parse JSON string to JSON object
|
||||
elseif payload_type == "table" # Table data - deserialize Arrow IPC stream
|
||||
io = IOBuffer(data) # Create buffer from bytes
|
||||
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
||||
return df # Return DataFrame
|
||||
elseif payload_type == "image" # Image data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "audio" # Audio data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "video" # Video data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "binary" # Binary data - return binary
|
||||
return data # Return bytes directly
|
||||
else # Unknown type
|
||||
error("Unknown payload_type: $payload_type") # Throw error for unknown type
|
||||
end
|
||||
if payload_type == "text" # Text data - convert to string
|
||||
return String(data) # Convert bytes to string
|
||||
elseif payload_type == "dictionary" # JSON data - deserialize
|
||||
json_str = String(data) # Convert bytes to string
|
||||
return JSON.parse(json_str) # Parse JSON string to JSON object
|
||||
elseif payload_type == "table" # Table data - deserialize Arrow IPC stream
|
||||
io = IOBuffer(data) # Create buffer from bytes
|
||||
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
||||
return df # Return DataFrame
|
||||
elseif payload_type == "image" # Image data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "audio" # Audio data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "video" # Video data - return binary
|
||||
return data # Return bytes directly
|
||||
elseif payload_type == "binary" # Binary data - return binary
|
||||
return data # Return bytes directly
|
||||
else # Unknown type
|
||||
error("Unknown payload_type: $payload_type") # Throw error for unknown type
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -1035,10 +1035,10 @@ function plik_oneshot_upload(file_server_url::String, filepath::String)
|
||||
url_upload = "$file_server_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
http_response = open(filepath, "r") do file_stream
|
||||
form = HTTP.Form(Dict("file" => file_stream))
|
||||
|
||||
# Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
|
||||
HTTP.post(url_upload, headers, form; status_exception = false)
|
||||
form = HTTP.Form(Dict("file" => file_stream))
|
||||
|
||||
# Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
|
||||
HTTP.post(url_upload, headers, form; status_exception = false)
|
||||
end
|
||||
|
||||
if !isnothing(http_response) && http_response.status == 200
|
||||
@@ -1047,7 +1047,6 @@ function plik_oneshot_upload(file_server_url::String, filepath::String)
|
||||
error("Failed to upload file: server returned status $(http_response.status)")
|
||||
end
|
||||
response_json = JSON.parse(http_response.body)
|
||||
|
||||
fileid = response_json["id"]
|
||||
|
||||
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
||||
|
||||
Reference in New Issue
Block a user