diff --git a/Manifest.toml b/Manifest.toml index 1e89673..8e08852 100644 --- a/Manifest.toml +++ b/Manifest.toml @@ -1,8 +1,8 @@ # This file is machine-generated - editing it directly is not advised -julia_version = "1.12.4" +julia_version = "1.12.5" manifest_format = "2.0" -project_hash = "be1e3c2d8b7f4f0ee7375c94aaf704ce73ba57b9" +project_hash = "24baf0eb17859281acbf0208c4164e7fb92fabbe" [[deps.AliasTables]] deps = ["PtrArrays", "Random"] diff --git a/Project.toml b/Project.toml index e5188aa..aa7a0c2 100644 --- a/Project.toml +++ b/Project.toml @@ -5,4 +5,6 @@ GeneralUtils = "c6c72f09-b708-4ac8-ac7c-2084d70108fe" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a" +PrettyPrinting = "54e16d92-306c-5ea0-a30b-337be88ac337" +Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" diff --git a/etc.jl b/etc.jl index 532d5d1..d4c7daa 100644 --- a/etc.jl +++ b/etc.jl @@ -1,15 +1,172 @@ -the user will provide data in this form: [("dataname1", data1, "datatype1"), ("dataname2", data2, "datatype2"), ...] -For example: -[ - ("name", "ton", "text"), - ("age", 15, "Integer"), - ("school_info", Dict("schoolname"=> "Bodin", "classmates_number"=> 52), "dictionary"), - ("classmate_names", Dataframe_data, "table"), - ("ton_image", image_data, "image"), - ("ton_audio", audio_data, "audio"), - ("ton_birthday_video", video_data, "video"), - ("achievement.zip", file_data, "binary"), -] + + +using Revise +using NATS, JSON, UUIDs, Dates +using HTTP + + +# workdir = + +# Include the bridge module +include("./src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + + +# ------------------------------------------------------------------------------------------------ # +# test file transfer # +# ------------------------------------------------------------------------------------------------ # + +# File path for large binary payload test +const FILE_PATH = "./testFile_small.zip" +const filename = basename(FILE_PATH) + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +function _serialize_data(data::Any, type::String) + if type == "text" # Text data - convert to UTF-8 bytes + if isa(data, String) + return bytes(data) # Convert string to UTF-8 bytes + else + error("Text data must be a String") + end + elseif type == "dictionary" # JSON data - serialize directly + json_str = JSON.json(data) # Convert Julia data to JSON string + return bytes(json_str) # Convert JSON string to bytes + elseif type == "table" # Table data - convert to Arrow IPC stream + io = IOBuffer() # Create in-memory buffer + Arrow.write(io, data) # Write data as Arrow IPC stream to buffer + return take!(io) # Return the buffer contents as bytes + elseif type == "image" # Image data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Image data must be Vector{UInt8}") + end + elseif type == "audio" # Audio data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Audio data must be Vector{UInt8}") + end + elseif type == "video" # Video data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Video data must be Vector{UInt8}") + end + elseif type == "binary" # Binary data - treat as binary + if isa(data, IOBuffer) # Check if data is an IOBuffer + return take!(data) # Return buffer contents as bytes + elseif isa(data, Vector{UInt8}) # Check if data is already binary + return data # Return binary data directly + else # Unsupported binary data type + error("Binary data must be binary (Vector{UInt8} or IOBuffer)") + end + else # Unknown type + error("Unknown type: $type") + end +end + + +# File upload handler for plik server +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + # Get upload ID + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function log_trace(correlation_id::String, message::String) + timestamp = Dates.now() # Get current timestamp + @info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message +end + +file_data = read(FILE_PATH) +file_size = length(file_data) + + +subject::String=SUBJECT +data::AbstractArray{Tuple{String, Array{UInt8, 1}, String}, 1}=[(filename, file_data, "binary")] +nats_url::String=DEFAULT_NATS_URL +fileserver_url::String=DEFAULT_FILESERVER_URL +fileserverUploadHandler::Function=plik_upload_handler +size_threshold::Int=1_000_000 +correlation_id::Union{String, Nothing}=correlation_id +msg_purpose::String="chat" +sender_name::String="sender" +receiver_name::String="receiver_name" +receiver_id::String="receiver_id" +reply_to::String="reply_to" +reply_to_msg_id::String="reply_to_msg_id" + + + +(dataname, payload_data, payload_type) = data[1] +payload_bytes = _serialize_data(payload_data, payload_type) + + payload_size = length(payload_bytes) # Calculate payload size in bytes + log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size + +payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string +log_trace(cid, "Using direct transport for $payload_size bytes") + +payload = msgPayload_v1( + id = string(uuid4()), + dataname = dataname, + type = payload_type, + transport = "direct", + encoding = "base64", + size = payload_size, + data = payload_b64, + metadata = Dict("payload_bytes" => payload_size) + ) + +push!(payloads, payload) + + + + + + + + + + + + + + diff --git a/new_testFile_large.zip b/new_testFile_large.zip new file mode 100644 index 0000000..56dfcd1 Binary files /dev/null and b/new_testFile_large.zip differ diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 5e4786e..a8ea2f1 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -35,13 +35,14 @@ module NATSBridge -using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 +using Revise +using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting # ---------------------------------------------- 100 --------------------------------------------- # # Constants const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL -const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP file server URL for link transport +const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport struct msgPayload_v1 @@ -64,8 +65,8 @@ function msgPayload_v1(; encoding::String = "none", size::Integer = 0, data::Any = nothing, - metadata::Dict{String, Any} = Dict{String, Any}() -) + metadata::Dict{String, T} = Dict{String, Any}() +) where {T<:Any} return msgPayload_v1( id, dataname, @@ -172,15 +173,18 @@ function envelope_to_json(env::msgEnvelope_v1) "size" => payload.size, ) # Include data based on transport type - if payload.transport == "direct" && payload.data !== nothing - if payload.encoding == "base64" || payload.encoding == "json" - payload_obj["data"] = payload.data - else - # For other encodings, use base64 - payload_bytes = _get_payload_bytes(payload.data) - payload_obj["data"] = Base64.base64encode(payload_bytes) - end - end + if payload.transport == "direct" && payload.data !== nothing + if payload.encoding == "base64" || payload.encoding == "json" + payload_obj["data"] = payload.data + else + # For other encodings, use base64 + payload_bytes = _get_payload_bytes(payload.data) + payload_obj["data"] = Base64.base64encode(payload_bytes) + end + elseif payload.transport == "link" && payload.data !== nothing + # For link transport, data is a URL string - include directly + payload_obj["data"] = payload.data + end if !isempty(payload.metadata) payload_obj["metadata"] = Dict(String(k) => v for (k, v) in payload.metadata) end @@ -284,7 +288,7 @@ env = smartsend("chat.subject", [ """ function smartsend( subject::String, # smartreceive's subject - data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples + data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples nats_url::String = DEFAULT_NATS_URL, fileserver_url = DEFAULT_FILESERVER_URL, fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver @@ -296,7 +300,8 @@ function smartsend( receiver_id::String = "", reply_to::String = "", reply_to_msg_id::String = "" -) +) where {T1<:Any} + # Generate correlation ID if not provided cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID @@ -330,7 +335,7 @@ function smartsend( encoding = "base64", size = payload_size, data = payload_b64, - metadata = Dict("payload_bytes" => payload_size) + metadata = Dict{String, Any}("payload_bytes" => payload_size) ) push!(payloads, payload) else @@ -356,11 +361,18 @@ function smartsend( encoding = "none", size = payload_size, data = url, - metadata = Dict() + metadata = Dict{String, Any}() ) + println("1 ----------") + PrettyPrinting.pprintln(payload) + println("1 ==========") push!(payloads, payload) end end + + println("2 ----------") + PrettyPrinting.pprintln(payloads) + println("2 ==========") # Create msgEnvelope_v1 with all payloads env = msgEnvelope_v1( @@ -376,12 +388,16 @@ function smartsend( replyTo = reply_to, replyToMsgId = reply_to_msg_id, brokerURL = nats_url, - metadata = Dict(), + metadata = Dict{String, Any}(), payloads = payloads ) msg_json = envelope_to_json(env) # Convert envelope to JSON publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS + + println("3 ----------") + PrettyPrinting.pprintln(msg_json) + println("3 ==========") return env # Return the envelope for tracking end @@ -527,7 +543,7 @@ It deserializes the data based on the transport type and returns the result. A HTTP file server is required along with its upload function. Arguments: - - `msg::NATS.Message` - NATS message to process + - `msg::NATS.Msg` - NATS message to process - `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs Keyword Arguments: @@ -547,62 +563,67 @@ payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, ``` """ function smartreceive( - msg::NATS.Message, - fileserverDownloadHandler::Function=_fetch_with_backoff; + msg::NATS.Msg; + fileserverDownloadHandler::Function=_fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000 ) # Parse the JSON envelope json_data = JSON.parse(String(msg.payload)) - + println("R1 ----------") + PrettyPrinting.pprintln(json_data) + println("R1 ==========") log_trace(json_data["correlationId"], "Processing received message") # Log message processing start # Process all payloads in the envelope - payloads_list = Tuple{String, Any}[] + payloads_list = Tuple{String, Any, String}[] # Get number of payloads num_payloads = length(json_data["payloads"]) for i in 1:num_payloads - payload_data = json_data["payloads"][i] - transport = String(payload_data["transport"]) - dataname = String(payload_data["dataname"]) + payload = json_data["payloads"][i] + transport = String(payload["transport"]) + dataname = String(payload["dataname"]) if transport == "direct" # Direct transport - payload is in the message log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling # Extract base64 payload from the payload - payload_b64 = String(payload_data["data"]) + payload_b64 = String(payload["data"]) # Decode Base64 payload payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes # Deserialize based on type - data_type = String(payload_data["type"]) + data_type = String(payload["type"]) data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"]) - push!(payloads_list, (dataname, data)) + push!(payloads_list, (dataname, data, data_type)) elseif transport == "link" # Link transport - payload is at URL log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling # Extract download URL from the payload - url = String(payload_data["data"]) + println("R2 ----------") + PrettyPrinting.pprintln(payload) + println("R2 ==========") + url = String(payload["data"]) - #[WORKING] Fetch with exponential backoff using the download handler + # Fetch with exponential backoff using the download handler downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"]) # Deserialize based on type - data_type = String(payload_data["type"]) + data_type = String(payload["type"]) data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"]) - push!(payloads_list, (dataname, data)) + push!(payloads_list, (dataname, data, data_type)) else # Unknown transport type error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport end end - return payloads_list # Return list of (dataname, data) tuples + return payloads_list # Return list of (dataname, data, data_type) tuples end diff --git a/test/test_julia_julia_data_transfer_receive.jl b/test/test_julia_julia_data_transfer_receive.jl new file mode 100644 index 0000000..fa58a37 --- /dev/null +++ b/test/test_julia_julia_data_transfer_receive.jl @@ -0,0 +1,85 @@ +#!/usr/bin/env julia +# Test script for large payload testing using binary transport +# Tests sending a large file (> 1MB) via smartsend with binary type +# Updated to match NATSBridge.jl API + +using NATS, JSON, UUIDs, Dates +using HTTP + + +# workdir = + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + + + +# ------------------------------------------------------------------------------------------------ # +# test file transfer # +# ------------------------------------------------------------------------------------------------ # + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] $message") +end + +# Receiver: Listen for messages and verify large payload handling +function test_large_binary_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + + # Use NATSBridge.smartreceive to handle the data + # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) + result = NATSBridge.smartreceive( + msg; + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + # Result is a list of (dataname, data) tuples + for (dataname, data, data_type) in result + # Check transport type from the envelope + # For link transport, data is the URL string + # For direct transport, data is the actual payload bytes + + if isa(data, Vector{UInt8}) + file_size = length(data) + log_trace("Received $(file_size) bytes of binary data for '$dataname' of type $data_type") + + # Save received data to a test file + output_path = "./new_$dataname" + write(output_path, data) + log_trace("Saved received data to $output_path") + else + log_trace("Received $(file_size) bytes of binary data for '$dataname' of type $data_type") + end + end + end + + # Keep listening for 10 seconds + sleep(120) + NATS.drain(conn) +end + + +# Run the test +println("Starting large binary payload test...") + +# # Run sender first +# println("start smartsend") +# test_large_binary_send() + +# Run receiver +println("testing smartreceive") +test_large_binary_receive() + +println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_julia_data_transfer.jl b/test/test_julia_julia_data_transfer_send.jl similarity index 61% rename from test/test_julia_julia_data_transfer.jl rename to test/test_julia_julia_data_transfer_send.jl index 0fa29af..515c4bd 100644 --- a/test/test_julia_julia_data_transfer.jl +++ b/test/test_julia_julia_data_transfer_send.jl @@ -6,12 +6,15 @@ using NATS, JSON, UUIDs, Dates using HTTP + +# workdir = + # Include the bridge module -include("./src/NATSBridge.jl") +include("../src/NATSBridge.jl") using .NATSBridge # Configuration -const SUBJECT = "/large_binary_test" +const SUBJECT = "/NATSBridge_test" const NATS_URL = "nats.yiem.cc" const FILESERVER_URL = "http://192.168.88.104:8080" @@ -24,7 +27,7 @@ correlation_id = string(uuid4()) # ------------------------------------------------------------------------------------------------ # # File path for large binary payload test -const FILE_PATH = "./testFile_small.zip" +const FILE_PATH = "./testFile_large.zip" const filename = basename(FILE_PATH) # Helper: Log with correlation ID @@ -59,31 +62,10 @@ function plik_upload_handler(fileserver_url::String, dataname::String, data::Vec return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end -# File download handler for plik server -function plik_download_handler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} - delay = base_delay - for attempt in 1:max_retries - try - response = HTTP.request("GET", url) - if response.status == 200 - return response.body - else - error("Failed to fetch: $(response.status)") - end - catch e - if attempt < max_retries - sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - end - end - end - error("Failed to fetch data after $max_retries attempts") -end - # Sender: Send large binary file via smartsend function test_large_binary_send() # Read the large file as binary data - log_trace("Reading large file: $FILE_PATH") + log_trace("Reading file: $FILE_PATH") file_data = read(FILE_PATH) file_size = length(file_data) @@ -95,7 +77,8 @@ function test_large_binary_send() env = NATSBridge.smartsend( SUBJECT, [(filename, file_data, "binary")], # List of (dataname, data, type) tuples - nats_url = NATS_URL, + nats_url = NATS_URL; + fileserver_url = FILESERVER_URL, fileserverUploadHandler = plik_upload_handler, size_threshold = 1_000_000, correlation_id = correlation_id, @@ -119,48 +102,6 @@ function test_large_binary_send() end end -# Receiver: Listen for messages and verify large payload handling -function test_large_binary_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") - - # Use NATSBridge.smartreceive to handle the data - # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) - result = NATSBridge.smartreceive( - msg, - plik_download_handler, - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - # Result is a list of (dataname, data) tuples - for (dataname, data) in result - # Check transport type from the envelope - # For link transport, data is the URL string - # For direct transport, data is the actual payload bytes - - if isa(data, Vector{UInt8}) - file_size = length(data) - log_trace("Received $(file_size) bytes of binary data for '$dataname'") - - # Save received data to a test file - output_path = "./new_$dataname" - write(output_path, data) - log_trace("Saved received data to $output_path") - else - log_trace("Received data for '$dataname' (type: $(typeof(data)))") - end - end - end - - # Keep listening for 10 seconds - sleep(120) - NATS.drain(conn) -end - - # Run the test println("Starting large binary payload test...") println("Correlation ID: $correlation_id")