diff --git a/docs/architecture.md b/docs/architecture.md index 4fa65fc..82a8dea 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -12,10 +12,16 @@ The system uses **handler functions** to abstract file server operations, allowi ```julia # Upload handler - uploads data to file server and returns URL +# The handler is passed to smartsend as fileserverUploadHandler parameter +# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8}) +# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url" fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} -# Download handler - fetches data from file server URL -fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} +# Download handler - fetches data from file server URL with exponential backoff +# The handler is passed to smartreceive as fileserverDownloadHandler parameter +# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String) +# Returns: Vector{UInt8} (the downloaded data) +fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} ``` This design allows the system to support multiple file server backends without changing the core messaging logic. 
@@ -154,8 +160,7 @@ end "brokerURL": "nats://localhost:4222", "metadata": { - "content_type": "application/octet-stream", - "content_length": 123456 + }, "payloads": [ @@ -168,7 +173,7 @@ end "size": 15433, "data": "base64-encoded-string", "metadata": { - "checksum": "sha256_hash" + } }, { @@ -180,7 +185,7 @@ end "size": 524288, "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", "metadata": { - "checksum": "sha256_hash" + } } ] @@ -321,19 +326,19 @@ function smartsend( ```julia function smartreceive( - msg::NATS.Message; - fileserverDownloadHandler::Function, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 + msg::NATS.Message, + fileserverDownloadHandler::Function=_fetch_with_backoff; + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 ) - # Parse envelope - # Iterate through all payloads - # For each payload: check transport type - # If direct: decode Base64 payload - # If link: fetch from URL with exponential backoff using fileserverDownloadHandler - # Deserialize payload based on type - # Return list of (dataname, data) tuples + # Parse envelope + # Iterate through all payloads + # For each payload: check transport type + # If direct: decode Base64 payload + # If link: fetch from URL with exponential backoff using fileserverDownloadHandler + # Deserialize payload based on type + # Return list of (dataname, data) tuples end ``` @@ -347,10 +352,12 @@ end 3. For each payload: - Determine transport type (`direct` or `link`) - If `direct`: decode Base64 data from the message - - If `link`: fetch data from URL using exponential backoff + - If `link`: fetch data from URL using exponential backoff (via `fileserverDownloadHandler`) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) 4. 
Return list of `(dataname, data)` tuples +**Note:** The `fileserverDownloadHandler` receives `(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)` and returns `Vector{UInt8}`. + ### JavaScript Implementation #### Dependencies @@ -388,10 +395,10 @@ async function smartsend(subject, data, options = {}) async function smartreceive(msg, options = {}) // options object should include: // - fileserverDownloadHandler: function to fetch data from file server URL - // - fileserver_url: base URL of the file server // - max_retries: maximum retry attempts for fetching URL // - base_delay: initial delay for exponential backoff in ms // - max_delay: maximum delay for exponential backoff in ms + // - correlationId: optional correlation ID for tracing ``` **Process Flow:** diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 7a5ae51..010a3e4 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -9,13 +9,13 @@ # for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). 
# # Handler Function Signatures: -# +# # ```julia # # Upload handler - uploads data to file server and returns URL # fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} -# -# # Download handler - fetches data from file server URL -# fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} +# +# # Download handler - fetches data from file server URL with exponential backoff +# fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} # ``` # # Multi-Payload Support (Standard API): @@ -169,7 +169,7 @@ function envelope_to_json(env::msgEnvelope_v1) "type" => payload.type, "transport" => payload.transport, "encoding" => payload.encoding, - "size" => payload.size + "size" => payload.size, ) # Include data based on transport type if payload.transport == "direct" && payload.data !== nothing @@ -286,6 +286,7 @@ function smartsend( subject::String, # smartreceive's subject data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples nats_url::String = DEFAULT_NATS_URL, + fileserver_url = DEFAULT_FILESERVER_URL, fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::Union{String, Nothing} = nothing, @@ -329,7 +330,7 @@ function smartsend( encoding = "base64", size = payload_size, data = payload_b64, - metadata = Dict("content_length" => payload_size) + metadata = Dict("payload_bytes" => payload_size) ) push!(payloads, payload) else @@ -337,9 +338,9 @@ function smartsend( log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice # Upload to HTTP server - response = fileserverUploadHandler(DEFAULT_FILESERVER_URL, dataname, payload_bytes) + response = fileserverUploadHandler(fileserver_url, 
dataname, payload_bytes) - if response[:status] != 200 # Check if upload was successful + if response["status"] != 200 # Check if upload was successful - error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed + error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed end @@ -355,7 +356,7 @@ function smartsend( encoding = "none", size = payload_size, data = url, - metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") + metadata = Dict() ) push!(payloads, payload) end @@ -585,11 +586,11 @@ function smartreceive( elseif transport == "link" # Link transport - payload is at URL log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling - # Extract URL from the payload + # Extract download URL from the payload url = String(payload_data["data"]) #[WORKING] Fetch with exponential backoff using the download handler - downloaded_data = fileserverDownloadHandler(json_data["data"], max_retries, base_delay, max_delay,json_data["correlationId"]) + downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"]) # Deserialize based on type data_type = String(payload_data["type"]) diff --git a/test/test_julia_julia_data_transfer.jl b/test/test_julia_julia_data_transfer.jl index 464a493..0fa29af 100644 --- a/test/test_julia_julia_data_transfer.jl +++ b/test/test_julia_julia_data_transfer.jl @@ -1,11 +1,13 @@ #!/usr/bin/env julia # Test script for large payload testing using binary transport # Tests sending a large file (> 1MB) via smartsend with binary type +# Updated to match NATSBridge.jl API using NATS, JSON, UUIDs, Dates +using HTTP # Include the bridge module -include("../src/NATSBridge.jl") +include("./src/NATSBridge.jl") using .NATSBridge # Configuration @@ -31,9 +33,55 @@ function log_trace(message) println("[$timestamp] [Correlation: $correlation_id] $message") end +# File upload handler for plik server +function 
plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + # Get upload ID + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +# File download handler for plik server; smartreceive calls it as (url, max_retries, base_delay, max_delay, correlation_id), delays are in milliseconds +function plik_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} + delay = base_delay + for attempt in 1:max_retries + try + response = HTTP.request("GET", url) + if response.status == 200 + return response.body + else + error("Failed to fetch: $(response.status)") + end + catch e + if attempt < max_retries + sleep(delay / 1000.0) + delay = min(delay * 2, max_delay) + end + end + end + error("Failed to fetch data after $max_retries attempts") +end + # Sender: Send large binary file via smartsend function test_large_binary_send() - conn = NATS.connect(NATS_URL) # Read the large file as binary data log_trace("Reading large file: $FILE_PATH") file_data = read(FILE_PATH) @@ -43,74 +91,73 @@ function test_large_binary_send() # Use smartsend with binary type - will automatically use link transport # if file size exceeds the threshold (1MB by 
default) + # API: smartsend(subject, [(dataname, data, type), ...]; keywords...) env = NATSBridge.smartsend( SUBJECT, - file_data, - "binary", + [(filename, file_data, "binary")], # List of (dataname, data, type) tuples nats_url = NATS_URL, - fileserver_url = FILESERVER_URL; - dataname=filename + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "sender", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "" ) - log_trace("Sent message with transport: $(env.transport)") - log_trace("Envelope type: $(env.type)") + log_trace("Sent message with transport: $(env.payloads[1].transport)") + log_trace("Envelope type: $(env.payloads[1].type)") # Check if link transport was used - if env.transport == "link" + if env.payloads[1].transport == "link" log_trace("Using link transport - file uploaded to HTTP server") - log_trace("URL: $(env.url)") + log_trace("URL: $(env.payloads[1].data)") else log_trace("Using direct transport - payload sent via NATS") end - - NATS.drain(conn) end # Receiver: Listen for messages and verify large payload handling function test_large_binary_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + + # Use NATSBridge.smartreceive to handle the data + # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) + result = NATSBridge.smartreceive( + msg, + plik_download_handler, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + # Result is a list of (dataname, data) tuples + for (dataname, data) in result + # smartreceive has already resolved the transport for each payload: + # for link transport it fetched the bytes through the download handler, + # for direct transport it decoded the Base64 payload carried in the message + + if isa(data, Vector{UInt8}) + file_size = 
length(data) + log_trace("Received $(file_size) bytes of binary data for '$dataname'") + + # Save received data to a test file + output_path = "./new_$dataname" + write(output_path, data) + log_trace("Saved received data to $output_path") + else + log_trace("Received data for '$dataname' (type: $(typeof(data)))") + end + end + end - # Use NATSBridge.smartreceive to handle the data - result = NATSBridge.smartreceive(msg) - # Check transport type - if result.envelope.transport == "direct" - log_trace("Received direct transport") - else - # For link transport, result.data is the URL - log_trace("Received link transport") - end - - # Verify the received data matches the original - if result.envelope.type == "binary" - if isa(result.data, Vector{UInt8}) - file_size = length(result.data) - log_trace("Received $(file_size) bytes of binary data") - - # Save received data to a test file - println("metadata ", result.envelope.metadata) - dataname = result.envelope.metadata["dataname"] - if dataname != "NA" - output_path = "./new_$dataname" - write(output_path, result.data) - log_trace("Saved received data to $output_path") - end - - # Verify file size - original_size = length(read(FILE_PATH)) - if file_size == result.envelope.metadata["content_length"] - log_trace("SUCCESS: File size matches! Original: $(result.envelope.metadata["content_length"]) bytes") - else - log_trace("WARNING: File size mismatch! 
Original: $(result.envelope.metadata["content_length"]), Received: $file_size") - end - end - end - end - - # Keep listening for 10 seconds - sleep(120) - NATS.drain(conn) + # Keep listening for 120 seconds + sleep(120) + NATS.drain(conn) end @@ -123,68 +170,8 @@ println("File: $FILE_PATH") println("start smartsend") test_large_binary_send() -# # Run receiver +# Run receiver # println("testing smartreceive") # test_large_binary_receive() -println("Test completed.") - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +println("Test completed.") \ No newline at end of file