This commit is contained in:
2026-02-17 21:04:32 +07:00
parent ad87934abf
commit 1d5c6f3348
3 changed files with 142 additions and 147 deletions

View File

@@ -12,10 +12,16 @@ The system uses **handler functions** to abstract file server operations, allowi
```julia ```julia
# Upload handler - uploads data to file server and returns URL # Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserverUploadHandler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Download handler - fetches data from file server URL # Download handler - fetches data from file server URL with exponential backoff
fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} # The handler is passed to smartreceive as fileserverDownloadHandler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
``` ```
This design allows the system to support multiple file server backends without changing the core messaging logic. This design allows the system to support multiple file server backends without changing the core messaging logic.
@@ -154,8 +160,7 @@ end
"brokerURL": "nats://localhost:4222", "brokerURL": "nats://localhost:4222",
"metadata": { "metadata": {
"content_type": "application/octet-stream",
"content_length": 123456
}, },
"payloads": [ "payloads": [
@@ -168,7 +173,7 @@ end
"size": 15433, "size": 15433,
"data": "base64-encoded-string", "data": "base64-encoded-string",
"metadata": { "metadata": {
"checksum": "sha256_hash"
} }
}, },
{ {
@@ -180,7 +185,7 @@ end
"size": 524288, "size": 524288,
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
"metadata": { "metadata": {
"checksum": "sha256_hash"
} }
} }
] ]
@@ -321,19 +326,19 @@ function smartsend(
```julia ```julia
function smartreceive( function smartreceive(
msg::NATS.Message; msg::NATS.Message,
fileserverDownloadHandler::Function, fileserverDownloadHandler::Function=_fetch_with_backoff;
max_retries::Int = 5, max_retries::Int = 5,
base_delay::Int = 100, base_delay::Int = 100,
max_delay::Int = 5000 max_delay::Int = 5000
) )
# Parse envelope # Parse envelope
# Iterate through all payloads # Iterate through all payloads
# For each payload: check transport type # For each payload: check transport type
# If direct: decode Base64 payload # If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff using fileserverDownloadHandler # If link: fetch from URL with exponential backoff using fileserverDownloadHandler
# Deserialize payload based on type # Deserialize payload based on type
# Return list of (dataname, data) tuples # Return list of (dataname, data) tuples
end end
``` ```
@@ -347,10 +352,12 @@ end
3. For each payload: 3. For each payload:
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message - If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff - If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return list of `(dataname, data)` tuples 4. Return list of `(dataname, data)` tuples
**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`.
### JavaScript Implementation ### JavaScript Implementation
#### Dependencies #### Dependencies
@@ -388,10 +395,10 @@ async function smartsend(subject, data, options = {})
async function smartreceive(msg, options = {}) async function smartreceive(msg, options = {})
// options object should include: // options object should include:
// - fileserverDownloadHandler: function to fetch data from file server URL // - fileserverDownloadHandler: function to fetch data from file server URL
// - fileserver_url: base URL of the file server
// - max_retries: maximum retry attempts for fetching URL // - max_retries: maximum retry attempts for fetching URL
// - base_delay: initial delay for exponential backoff in ms // - base_delay: initial delay for exponential backoff in ms
// - max_delay: maximum delay for exponential backoff in ms // - max_delay: maximum delay for exponential backoff in ms
// - correlationId: optional correlation ID for tracing
``` ```
**Process Flow:** **Process Flow:**

View File

@@ -9,13 +9,13 @@
# for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). # for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
# #
# Handler Function Signatures: # Handler Function Signatures:
# #
# ```julia # ```julia
# # Upload handler - uploads data to file server and returns URL # # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} # fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# #
# # Download handler - fetches data from file server URL # # Download handler - fetches data from file server URL with exponential backoff
# fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} # fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# ``` # ```
# #
# Multi-Payload Support (Standard API): # Multi-Payload Support (Standard API):
@@ -169,7 +169,7 @@ function envelope_to_json(env::msgEnvelope_v1)
"type" => payload.type, "type" => payload.type,
"transport" => payload.transport, "transport" => payload.transport,
"encoding" => payload.encoding, "encoding" => payload.encoding,
"size" => payload.size "size" => payload.size,
) )
# Include data based on transport type # Include data based on transport type
if payload.transport == "direct" && payload.data !== nothing if payload.transport == "direct" && payload.data !== nothing
@@ -286,6 +286,7 @@ function smartsend(
subject::String, # smartreceive's subject subject::String, # smartreceive's subject
data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples
nats_url::String = DEFAULT_NATS_URL, nats_url::String = DEFAULT_NATS_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD, size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing, correlation_id::Union{String, Nothing} = nothing,
@@ -329,7 +330,7 @@ function smartsend(
encoding = "base64", encoding = "base64",
size = payload_size, size = payload_size,
data = payload_b64, data = payload_b64,
metadata = Dict("content_length" => payload_size) metadata = Dict("payload_bytes" => payload_size)
) )
push!(payloads, payload) push!(payloads, payload)
else else
@@ -337,9 +338,9 @@ function smartsend(
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server # Upload to HTTP server
response = fileserverUploadHandler(DEFAULT_FILESERVER_URL, dataname, payload_bytes) response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes)
if response[:status] != 200 # Check if upload was successful if response["status"] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
end end
@@ -355,7 +356,7 @@ function smartsend(
encoding = "none", encoding = "none",
size = payload_size, size = payload_size,
data = url, data = url,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") metadata = Dict()
) )
push!(payloads, payload) push!(payloads, payload)
end end
@@ -585,11 +586,11 @@ function smartreceive(
elseif transport == "link" # Link transport - payload is at URL elseif transport == "link" # Link transport - payload is at URL
log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling
# Extract URL from the payload # Extract download URL from the payload
url = String(payload_data["data"]) url = String(payload_data["data"])
#[WORKING] Fetch with exponential backoff using the download handler #[WORKING] Fetch with exponential backoff using the download handler
downloaded_data = fileserverDownloadHandler(json_data["data"], max_retries, base_delay, max_delay,json_data["correlationId"]) downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"])
# Deserialize based on type # Deserialize based on type
data_type = String(payload_data["type"]) data_type = String(payload_data["type"])

View File

@@ -1,11 +1,13 @@
#!/usr/bin/env julia #!/usr/bin/env julia
# Test script for large payload testing using binary transport # Test script for large payload testing using binary transport
# Tests sending a large file (> 1MB) via smartsend with binary type # Tests sending a large file (> 1MB) via smartsend with binary type
# Updated to match NATSBridge.jl API
using NATS, JSON, UUIDs, Dates using NATS, JSON, UUIDs, Dates
using HTTP
# Include the bridge module # Include the bridge module
include("../src/NATSBridge.jl") include("./src/NATSBridge.jl")
using .NATSBridge using .NATSBridge
# Configuration # Configuration
@@ -31,9 +33,55 @@ function log_trace(message)
println("[$timestamp] [Correlation: $correlation_id] $message") println("[$timestamp] [Correlation: $correlation_id] $message")
end end
# File upload handler for plik server
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Get upload ID
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
# Upload file
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
# File download handler for plik server
function plik_download_handler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8}
delay = base_delay
for attempt in 1:max_retries
try
response = HTTP.request("GET", url)
if response.status == 200
return response.body
else
error("Failed to fetch: $(response.status)")
end
catch e
if attempt < max_retries
sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
end
end
end
error("Failed to fetch data after $max_retries attempts")
end
# Sender: Send large binary file via smartsend # Sender: Send large binary file via smartsend
function test_large_binary_send() function test_large_binary_send()
conn = NATS.connect(NATS_URL)
# Read the large file as binary data # Read the large file as binary data
log_trace("Reading large file: $FILE_PATH") log_trace("Reading large file: $FILE_PATH")
file_data = read(FILE_PATH) file_data = read(FILE_PATH)
@@ -43,74 +91,73 @@ function test_large_binary_send()
# Use smartsend with binary type - will automatically use link transport # Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default) # if file size exceeds the threshold (1MB by default)
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
env = NATSBridge.smartsend( env = NATSBridge.smartsend(
SUBJECT, SUBJECT,
file_data, [(filename, file_data, "binary")], # List of (dataname, data, type) tuples
"binary",
nats_url = NATS_URL, nats_url = NATS_URL,
fileserver_url = FILESERVER_URL; fileserverUploadHandler = plik_upload_handler,
dataname=filename size_threshold = 1_000_000,
correlation_id = correlation_id,
msg_purpose = "chat",
sender_name = "sender",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = ""
) )
log_trace("Sent message with transport: $(env.transport)") log_trace("Sent message with transport: $(env.payloads[1].transport)")
log_trace("Envelope type: $(env.type)") log_trace("Envelope type: $(env.payloads[1].type)")
# Check if link transport was used # Check if link transport was used
if env.transport == "link" if env.payloads[1].transport == "link"
log_trace("Using link transport - file uploaded to HTTP server") log_trace("Using link transport - file uploaded to HTTP server")
log_trace("URL: $(env.url)") log_trace("URL: $(env.payloads[1].data)")
else else
log_trace("Using direct transport - payload sent via NATS") log_trace("Using direct transport - payload sent via NATS")
end end
NATS.drain(conn)
end end
# Receiver: Listen for messages and verify large payload handling # Receiver: Listen for messages and verify large payload handling
function test_large_binary_receive() function test_large_binary_receive()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)") log_trace("Received message on $(msg.subject)")
# Use NATSBridge.smartreceive to handle the data
# API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay)
result = NATSBridge.smartreceive(
msg,
plik_download_handler,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
# Result is a list of (dataname, data) tuples
for (dataname, data) in result
# Check transport type from the envelope
# For link transport, data is the URL string
# For direct transport, data is the actual payload bytes
if isa(data, Vector{UInt8})
file_size = length(data)
log_trace("Received $(file_size) bytes of binary data for '$dataname'")
# Save received data to a test file
output_path = "./new_$dataname"
write(output_path, data)
log_trace("Saved received data to $output_path")
else
log_trace("Received data for '$dataname' (type: $(typeof(data)))")
end
end
end
# Use NATSBridge.smartreceive to handle the data # Keep listening for 10 seconds
result = NATSBridge.smartreceive(msg) sleep(120)
# Check transport type NATS.drain(conn)
if result.envelope.transport == "direct"
log_trace("Received direct transport")
else
# For link transport, result.data is the URL
log_trace("Received link transport")
end
# Verify the received data matches the original
if result.envelope.type == "binary"
if isa(result.data, Vector{UInt8})
file_size = length(result.data)
log_trace("Received $(file_size) bytes of binary data")
# Save received data to a test file
println("metadata ", result.envelope.metadata)
dataname = result.envelope.metadata["dataname"]
if dataname != "NA"
output_path = "./new_$dataname"
write(output_path, result.data)
log_trace("Saved received data to $output_path")
end
# Verify file size
original_size = length(read(FILE_PATH))
if file_size == result.envelope.metadata["content_length"]
log_trace("SUCCESS: File size matches! Original: $(result.envelope.metadata["content_length"]) bytes")
else
log_trace("WARNING: File size mismatch! Original: $(result.envelope.metadata["content_length"]), Received: $file_size")
end
end
end
end
# Keep listening for 10 seconds
sleep(120)
NATS.drain(conn)
end end
@@ -123,68 +170,8 @@ println("File: $FILE_PATH")
println("start smartsend") println("start smartsend")
test_large_binary_send() test_large_binary_send()
# # Run receiver # Run receiver
# println("testing smartreceive") # println("testing smartreceive")
# test_large_binary_receive() # test_large_binary_receive()
println("Test completed.") println("Test completed.")