update
This commit is contained in:
@@ -56,8 +56,8 @@ struct msgPayload_v1
|
|||||||
end
|
end
|
||||||
|
|
||||||
# constructor
|
# constructor
|
||||||
function msgPayload_v1(
|
function msgPayload_v1(;
|
||||||
id::String = "";
|
id::String = "",
|
||||||
dataname::String = "",
|
dataname::String = "",
|
||||||
type::String = "text",
|
type::String = "text",
|
||||||
transport::String = "direct",
|
transport::String = "direct",
|
||||||
@@ -100,8 +100,8 @@ struct msgEnvelope_v1
|
|||||||
end
|
end
|
||||||
|
|
||||||
# constructor
|
# constructor
|
||||||
function msgEnvelope_v1(
|
function msgEnvelope_v1(;
|
||||||
correlationId::String = "";
|
correlationId::String = "",
|
||||||
msgId::String = "",
|
msgId::String = "",
|
||||||
timestamp::String = "",
|
timestamp::String = "",
|
||||||
sendTo::String = "",
|
sendTo::String = "",
|
||||||
@@ -547,7 +547,7 @@ payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay,
|
|||||||
"""
|
"""
|
||||||
function smartreceive(
|
function smartreceive(
|
||||||
msg::NATS.Message,
|
msg::NATS.Message,
|
||||||
fileserverDownloadHandler::Function;
|
fileserverDownloadHandler::Function=_fetch_with_backoff;
|
||||||
max_retries::Int = 5,
|
max_retries::Int = 5,
|
||||||
base_delay::Int = 100,
|
base_delay::Int = 100,
|
||||||
max_delay::Int = 5000
|
max_delay::Int = 5000
|
||||||
@@ -588,9 +588,9 @@ function smartreceive(
|
|||||||
# Extract URL from the payload
|
# Extract URL from the payload
|
||||||
url = String(payload_data["data"])
|
url = String(payload_data["data"])
|
||||||
|
|
||||||
# Fetch with exponential backoff using the download handler
|
#[WORKING] Fetch with exponential backoff using the download handler
|
||||||
downloaded_data = fileserverDownloadHandler(DEFAULT_FILESERVER_URL, url, max_retries, base_delay, max_delay)
|
downloaded_data = fileserverDownloadHandler(json_data["data"], max_retries, base_delay, max_delay,json_data["correlationId"])
|
||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data_type = String(payload_data["type"])
|
data_type = String(payload_data["type"])
|
||||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
|
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
|
||||||
@@ -851,4 +851,22 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String)
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
end # module
|
end # module
|
||||||
|
|||||||
@@ -81,13 +81,13 @@ function test_large_binary_receive()
|
|||||||
# For link transport, result.data is the URL
|
# For link transport, result.data is the URL
|
||||||
log_trace("Received link transport")
|
log_trace("Received link transport")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Verify the received data matches the original
|
# Verify the received data matches the original
|
||||||
if result.envelope.type == "binary"
|
if result.envelope.type == "binary"
|
||||||
if isa(result.data, Vector{UInt8})
|
if isa(result.data, Vector{UInt8})
|
||||||
file_size = length(result.data)
|
file_size = length(result.data)
|
||||||
log_trace("Received $(file_size) bytes of binary data")
|
log_trace("Received $(file_size) bytes of binary data")
|
||||||
|
|
||||||
# Save received data to a test file
|
# Save received data to a test file
|
||||||
println("metadata ", result.envelope.metadata)
|
println("metadata ", result.envelope.metadata)
|
||||||
dataname = result.envelope.metadata["dataname"]
|
dataname = result.envelope.metadata["dataname"]
|
||||||
@@ -96,7 +96,7 @@ function test_large_binary_receive()
|
|||||||
write(output_path, result.data)
|
write(output_path, result.data)
|
||||||
log_trace("Saved received data to $output_path")
|
log_trace("Saved received data to $output_path")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Verify file size
|
# Verify file size
|
||||||
original_size = length(read(FILE_PATH))
|
original_size = length(read(FILE_PATH))
|
||||||
if file_size == result.envelope.metadata["content_length"]
|
if file_size == result.envelope.metadata["content_length"]
|
||||||
|
|||||||
Reference in New Issue
Block a user