diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 9a0976d..95f9db5 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -43,7 +43,7 @@ module NATSBridge -using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting +using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting, DataFrames # ---------------------------------------------- 100 --------------------------------------------- # # Constants @@ -302,7 +302,7 @@ function envelope_to_json(env::msg_envelope_v1) ) # Include data based on transport type if payload.transport == "direct" && payload.data !== nothing - if payload.encoding == "base64" || payload.encoding == "json" + if payload.encoding == "base64" || payload.encoding == "json" || payload.encoding == "arrow-ipc" payload_obj["data"] = payload.data else # For other encodings, use base64 @@ -462,6 +462,8 @@ function smartsend( # Process each payload in the list payloads = msg_payload_v1[] for (dataname, payload_data, payload_type) in data + @show dataname typeof(payload_data) + # Serialize data based on type payload_bytes = _serialize_data(payload_data, payload_type) diff --git a/test/large_image.png b/test/large_image.png new file mode 100644 index 0000000..2a89fd8 Binary files /dev/null and b/test/large_image.png differ diff --git a/test/small_image.jpg b/test/small_image.jpg new file mode 100644 index 0000000..98a28be Binary files /dev/null and b/test/small_image.jpg differ diff --git a/test/test_julia_dict_receiver.jl b/test/test_julia_dict_receiver.jl deleted file mode 100644 index 6fb98a8..0000000 --- a/test/test_julia_dict_receiver.jl +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env julia -# Test script for Dictionary transport testing -# Tests receiving 1 large and 1 small Dictionaries via direct and link transport -# Uses NATSBridge.jl smartreceive with "dictionary" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_dict_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - - -# ------------------------------------------------------------------------------------------------ # -# test dictionary transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] $message") -end - - -# Receiver: Listen for messages and verify Dictionary handling -function test_dict_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") - - # Use NATSBridge.smartreceive to handle the data - # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) - result = NATSBridge.smartreceive( - msg; - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - # Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples - for (dataname, data, data_type) in result["payloads"] - if isa(data, JSON.Object{String, Any}) - log_trace("Received Dictionary '$dataname' of type $data_type") - - # Display dictionary contents - println(" Contents:") - for (key, value) in data - println(" $key => $value") - end - - # Save to JSON file - output_path = "./received_$dataname.json" - json_str = JSON.json(data, 2) - write(output_path, json_str) - log_trace("Saved Dictionary to $output_path") - else - log_trace("Received unexpected data type for '$dataname': $(typeof(data))") - end - end - end - - # Keep listening for 10 seconds - sleep(120) - NATS.drain(conn) -end - - -# Run the test -println("Starting Dictionary transport test...") -println("Note: This receiver will wait for messages from the sender.") -println("Run test_julia_to_julia_dict_sender.jl first to send test data.") - -# Run receiver -println("testing smartreceive") -test_dict_receive() - -println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_dict_sender.jl b/test/test_julia_dict_sender.jl deleted file mode 100644 index dd2a847..0000000 --- a/test/test_julia_dict_sender.jl +++ /dev/null @@ -1,137 +0,0 @@ -#!/usr/bin/env julia -# Test script for Dictionary transport testing -# Tests sending 1 large and 1 small Dictionaries via direct and link transport -# Uses NATSBridge.jl smartsend with "dictionary" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_dict_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# test dictionary transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# File upload handler for plik server -function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} - # Get upload ID - url_getUploadID = "$fileserver_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(String(httpResponse.body)) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$fileserver_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict("file" => file_multipart)) - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(String(httpResponse.body)) - - fileid = responseJson["id"] - url = "$fileserver_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end - - -# Sender: Send Dictionaries via smartsend -function test_dict_send() - # Create a small Dictionary (will use direct transport) - small_dict = Dict( - "name" => "Alice", - "age" => 30, - "scores" => [95, 88, 92], - "metadata" => Dict( - "height" => 155, - "weight" => 55 - ) - ) - - # Create a large Dictionary (will use link transport if > 1MB) - # Generate a larger dataset (~2MB to ensure link transport) - large_dict = Dict( - "ids" => collect(1:50000), - "names" => ["User_$i" for i in 1:50000], - "scores" => rand(1:100, 50000), - "categories" => ["Category_$(rand(1:10))" for i in 1:50000], - "metadata" => Dict( - "source" => "test_generator", - "timestamp" => string(Dates.now()) - ) - ) - - # Test data 1: small Dictionary - data1 = ("small_dict", small_dict, "dictionary") - - # Test data 2: large Dictionary - data2 = ("large_dict", large_dict, "dictionary") - - # Use smartsend with dictionary type - # For small Dictionary: will use direct transport (JSON encoded) - # For large Dictionary: will use link transport (uploaded to fileserver) - env, env_json_str = NATSBridge.smartsend( - SUBJECT, - [data1, data2]; # List of (dataname, data, type) tuples - broker_url = NATS_URL, - fileserver_url = FILESERVER_URL, - fileserver_upload_handler = plik_upload_handler, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "dict_sender", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true # Publish the message to NATS - ) - - log_trace("Sent message with $(length(env.payloads)) payloads") - - # Log transport type for each payload - for (i, payload) in enumerate(env.payloads) - log_trace("Payload $i ('$payload.dataname'):") - log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.payload_type)") - log_trace(" Size: $(payload.size) bytes") - log_trace(" Encoding: $(payload.encoding)") - - if payload.transport == "link" - log_trace(" URL: $(payload.data)") - end - end -end - - -# Run the test -println("Starting Dictionary transport test...") -println("Correlation ID: $correlation_id") - -# Run sender -println("start smartsend for dictionaries") -test_dict_send() - -println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_mix_payloads_receiver.jl b/test/test_julia_mix_payloads_receiver.jl index 1c33866..86d4413 100644 --- a/test/test_julia_mix_payloads_receiver.jl +++ b/test/test_julia_mix_payloads_receiver.jl @@ -93,8 +93,62 @@ function test_mix_receive() log_trace(" ERROR: Expected Dict, got $(typeof(data))") end + elseif data_type == "arrowtable" + # Arrow table data - should be Arrow.Table + if isa(data, Arrow.Table) + log_trace(" Type: Arrow.Table") + log_trace(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") + log_trace(" Columns: $(names(data))") + + # Display first few rows + log_trace(" First 5 rows:") + display(data[1:min(5, size(data, 1)), :]) + + # Convert to DataFrame for display and save + df = DataFrame(data) + output_path = "./received_$dataname.arrow" + io = IOBuffer() + Arrow.write(io, data) + write(output_path, take!(io)) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Arrow.Table, got $(typeof(data))") + end + + elseif data_type == "jsontable" + # JSON table data - should be Vector{Dict} or Vector{NamedTuple} + if isa(data, Vector{<:Union{JSON.Object, Dict, NamedTuple}}) + log_trace(" Type: Vector{Dict/NamedTuple}") + log_trace(" Number of rows: $(length(data))") + + # Display first few rows + log_trace(" First 3 rows:") + for i in 1:min(3, length(data)) + log_trace(" Row $i: $(data[i])") + end + + # Convert to DataFrame for display and save + df = DataFrame(data) + log_trace(" Converted to DataFrame: $(size(df, 1)) rows x $(size(df, 2)) columns") + + # Save as JSON file + output_path = "./received_$dataname.json" + json_str = JSON.json(data, 2) + write(output_path, json_str) + log_trace(" Saved to: $output_path") + + # Also save as Arrow file + output_path_arrow = "./received_$dataname.arrow" + io = IOBuffer() + Arrow.write(io, df) + write(output_path_arrow, take!(io)) + log_trace(" Saved to: $output_path_arrow") + else + log_trace(" ERROR: Expected Vector{Dict/NamedTuple}, got $(typeof(data))") + end + elseif data_type == "table" - # Table data - should be a DataFrame + # Table data - should be a DataFrame (backward compatibility) data = DataFrame(data) if isa(data, DataFrame) log_trace(" Type: DataFrame") @@ -180,7 +234,9 @@ function test_mix_receive() println("\n=== Verification Summary ===") text_count = count(x -> x[3] == "text", result["payloads"]) dict_count = count(x -> x[3] == "dictionary", result["payloads"]) - table_count = count(x -> x[3] == "table", result["payloads"]) + arrowtable_count = count(x -> x[3] == "arrowtable", result["payloads"]) + jsontable_count = count(x -> x[3] == "jsontable", result["payloads"]) + table_count = count(x -> x[3] == "table", result["payloads"]) # backward compatibility image_count = count(x -> x[3] == "image", result["payloads"]) audio_count = count(x -> x[3] == "audio", result["payloads"]) video_count = count(x -> x[3] == "video", result["payloads"]) @@ -188,7 +244,9 @@ function test_mix_receive() log_trace("Text payloads: $text_count") log_trace("Dictionary payloads: $dict_count") - log_trace("Table payloads: $table_count") + log_trace("Arrow table payloads: $arrowtable_count") + log_trace("JSON table payloads: $jsontable_count") + log_trace("Table payloads (backward compat): $table_count") log_trace("Image payloads: $image_count") log_trace("Audio payloads: $audio_count") log_trace("Video payloads: $video_count") @@ -199,8 +257,12 @@ function test_mix_receive() for (dataname, data, data_type) in result["payloads"] if data_type in ["image", "audio", "video", "binary"] log_trace("$dataname: $(length(data)) bytes (binary)") + elseif data_type == "arrowtable" + log_trace("$dataname: $(size(data, 1)) rows x $(size(data, 2)) columns (Arrow.Table)") + elseif data_type == "jsontable" + log_trace("$dataname: $(length(data)) rows (Vector{Dict/NamedTuple})") elseif data_type == "table" - data = DataFrame(data) + data = DataFrame(data) log_trace("$dataname: $(size(data, 1)) rows x $(size(data, 2)) columns (DataFrame)") elseif data_type == "dictionary" log_trace("$dataname: $(length(JSON.json(data))) bytes (Dict)") diff --git a/test/test_julia_mix_payloads_sender.jl b/test/test_julia_mix_payloads_sender.jl index 2ce262e..fac92a9 100644 --- a/test/test_julia_mix_payloads_sender.jl +++ b/test/test_julia_mix_payloads_sender.jl @@ -1,10 +1,15 @@ #!/usr/bin/env julia # Test script for mixed-content message testing -# Tests sending a mix of text, json, table, image, audio, video, and binary data +# Tests sending a mix of text, dictionary, arrowtable, jsontable, image, audio, video, and binary data # from Julia serviceA to Julia serviceB using NATSBridge.jl smartsend # # This test demonstrates that any combination and any number of mixed content # can be sent and received correctly. +# +# Key concept: DataFrames are the main table representation in Julia. +# The NATSBridge.jl library handles serialization: +# - For "arrowtable" type: DataFrame is serialized to Arrow IPC format +# - For "jsontable" type: DataFrame is converted to Vector{Dict} and then to JSON using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 @@ -82,50 +87,47 @@ function create_sample_data() ) ) - # Table data (DataFrame - small - direct transport) - table_data_small = DataFrame( + # Arrow table data (DataFrame - small - direct transport) + # Uses Arrow IPC format for efficient binary serialization + # NATSBridge.jl handles serialization: DataFrame -> Arrow IPC + arrow_table_small = DataFrame( id = 1:10, - message = ["msg_$i" for i in 1:10], - sender = ["sender_$i" for i in 1:10], - timestamp = [string(Dates.now()) for _ in 1:10], - priority = rand(1:3, 10) + name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], + score = rand(50:100, 10), + active = rand([true, false], 10) ) - # Table data (DataFrame - large - link transport) - # ~1.5MB of data (150,000 rows) - should trigger link transport - table_data_large = DataFrame( + # Arrow table data (DataFrame - large - link transport) + # ~1.5MB of Arrow data (200,000 rows) - should trigger link transport + # NATSBridge.jl handles serialization: DataFrame -> Arrow IPC + arrow_table_large = DataFrame( + id = 1:200_000, + name = ["user_$i" for i in 1:200_000], + score = rand(50:100, 200_000), + active = rand([true, false], 200_000), + timestamp = [string(Dates.now()) for _ in 1:200_000] + ) + + # Json table data (DataFrame - small - direct transport) + # Uses JSON format for human-readable tabular data + # NATSBridge.jl handles serialization: DataFrame -> Vector{Dict} -> JSON + json_table_small = DataFrame( + id = 1:10, + name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], + score = rand(50:100, 10), + active = rand([true, false], 10) + ) + + # Json table data (DataFrame - large - link transport) + # ~1.5MB of JSON data (150,000 rows) - should trigger link transport + # NATSBridge.jl handles serialization: DataFrame -> Vector{Dict} -> JSON + json_table_large = DataFrame( id = 1:150_000, - message = ["msg_$i" for i in 1:150_000], - sender = ["sender_$i" for i in 1:150_000], - timestamp = [string(Dates.now()) for i in 1:150_000], - priority = rand(1:3, 150_000) + name = ["user_$i" for i in 1:150_000], + score = rand(50:100, 150_000), + active = rand([true, false], 150_000) ) - # Image data (small binary - direct transport) - # Create a simple 10x10 pixel PNG-like data (128 bytes header + 100 pixels = 112 bytes) - # Using simple RGB data (10*10*3 = 300 bytes of pixel data) - image_width = 10 - image_height = 10 - image_data = UInt8[] - # PNG header (simplified) - push!(image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A) - # Simple RGB data (RGBRGBRGB...) - for i in 1:image_width*image_height - push!(image_data, 0xFF, 0x00, 0x00) # Red pixel - end - - # Image data (large - link transport) - # Create a larger image (~1.5MB) to test link transport - large_image_width = 500 - large_image_height = 1000 - large_image_data = UInt8[] - # PNG header (simplified for 500x1000) - push!(large_image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A) - # RGB data (500*1000*3 = 1,500,000 bytes) - for i in 1:large_image_width*large_image_height - push!(large_image_data, rand(1:255), rand(1:255), rand(1:255)) # Random color pixels - end - # Audio data (small binary - direct transport) audio_data = UInt8[rand(1:255) for _ in 1:100] @@ -150,10 +152,10 @@ function create_sample_data() return ( text_data, dict_data, - table_data_small, - table_data_large, - image_data, - large_image_data, + arrow_table_small, + arrow_table_large, + json_table_small, + json_table_large, audio_data, large_audio_data, video_data, @@ -167,19 +169,35 @@ end # Sender: Send mixed content via smartsend function test_mix_send() # Create sample data - (text_data, dict_data, table_data_small, table_data_large, image_data, large_image_data, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data() - + (text_data, dict_data, arrow_table_small, arrow_table_large, json_table_small, json_table_large, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data() + + # Read image files from disk (following test_julia_file_sender.jl pattern) + # Small image - should use direct transport + file_path_small_image = "./test/small_image.jpg" + file_data_small_image = read(file_path_small_image) + filename_small_image = basename(file_path_small_image) + + # Large image - should use link transport + file_path_large_image = "./test/large_image.png" + file_data_large_image = read(file_path_large_image) + filename_large_image = basename(file_path_large_image) + # Create payloads list - mixed content with both small and large data # Small data uses direct transport, large data uses link transport + # Key: Pass DataFrame directly and specify type as "arrowtable" or "jsontable" + # NATSBridge.jl handles the serialization internally payloads = [ - # Small data (direct transport) - text, dictionary, small table + # Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image ("chat_text", text_data, "text"), ("chat_json", dict_data, "dictionary"), - ("chat_table_small", table_data_small, "table"), - - # Large data (link transport) - large table, large image, large audio, large video, large binary - ("chat_table_large", table_data_large, "table"), - ("user_image_large", large_image_data, "image"), + ("arrow_table_small", arrow_table_small, "arrowtable"), + ("json_table_small", json_table_small, "jsontable"), + (filename_small_image, file_data_small_image, "binary"), + + # Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary + ("arrow_table_large", arrow_table_large, "arrowtable"), + ("json_table_large", json_table_large, "jsontable"), + (filename_large_image, file_data_large_image, "binary"), ("audio_clip_large", large_audio_data, "audio"), ("video_clip_large", large_video_data, "video"), ("binary_file_large", large_binary_data, "binary") @@ -237,4 +255,4 @@ println("start smartsend for mixed content") test_mix_send() println("\nTest completed.") -println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.") \ No newline at end of file +println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.") diff --git a/test/test_julia_table_receiver.jl b/test/test_julia_table_receiver.jl deleted file mode 100644 index 2ea9866..0000000 --- a/test/test_julia_table_receiver.jl +++ /dev/null @@ -1,84 +0,0 @@ -#!/usr/bin/env julia -# Test script for DataFrame table transport testing -# Tests receiving 1 large and 1 small DataFrames via direct and link transport -# Uses NATSBridge.jl smartreceive with "table" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_table_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - - -# ------------------------------------------------------------------------------------------------ # -# test table transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] $message") -end - - -# Receiver: Listen for messages and verify DataFrame table handling -function test_table_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") - - # Use NATSBridge.smartreceive to handle the data - # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) - result = NATSBridge.smartreceive( - msg; - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - # Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples - for (dataname, data, data_type) in result["payloads"] - data = DataFrame(data) - if isa(data, DataFrame) - log_trace("Received DataFrame '$dataname' of type $data_type") - log_trace(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") - log_trace(" Column names: $(names(data))") - - # Display first few rows - println(" First 5 rows:") - display(data[1:min(5, size(data, 1)), :]) - - # Save to file - output_path = "./received_$dataname.arrow" - io = IOBuffer() - Arrow.write(io, data) - write(output_path, take!(io)) - log_trace("Saved DataFrame to $output_path") - else - log_trace("Received unexpected data type for '$dataname': $(typeof(data))") - end - end - end - - # Keep listening for 10 seconds - sleep(120) - NATS.drain(conn) -end - - -# Run the test -println("Starting DataFrame table transport test...") -println("Note: This receiver will wait for messages from the sender.") -println("Run test_julia_to_julia_table_sender.jl first to send test data.") - -# Run receiver -println("testing smartreceive") -test_table_receive() - -println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_table_sender.jl b/test/test_julia_table_sender.jl deleted file mode 100644 index 386ad50..0000000 --- a/test/test_julia_table_sender.jl +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env julia -# Test script for DataFrame table transport testing -# Tests sending 1 large and 1 small DataFrames via direct and link transport -# Uses NATSBridge.jl smartsend with "table" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_table_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# test table transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# File upload handler for plik server -function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} - # Get upload ID - url_getUploadID = "$fileserver_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(String(httpResponse.body)) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$fileserver_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict("file" => file_multipart)) - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(String(httpResponse.body)) - - fileid = responseJson["id"] - url = "$fileserver_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end - - -# Sender: Send DataFrame tables via smartsend -function test_table_send() - # Create a small DataFrame (will use direct transport) - small_df = DataFrame( - id = 1:10, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], - score = [95, 88, 92, 85, 90, 78, 95, 88, 92, 85], - category = ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"] - ) - - # Create a large DataFrame (will use link transport if > 1MB) - # Generate a larger dataset (~2MB to ensure link transport) - large_ids = 1:50000 - large_names = ["User_$i" for i in 1:50000] - large_scores = rand(1:100, 50000) - large_categories = ["Category_$(rand(1:10))" for i in 1:50000] - - large_df = DataFrame( - id = large_ids, - name = large_names, - score = large_scores, - category = large_categories - ) - - # Test data 1: small DataFrame - data1 = ("small_table", small_df, "table") - - # Test data 2: large DataFrame - data2 = ("large_table", large_df, "table") - - # Use smartsend with table type - # For small DataFrame: will use direct transport (Base64 encoded Arrow IPC) - # For large DataFrame: will use link transport (uploaded to fileserver) - env, env_json_str = NATSBridge.smartsend( - SUBJECT, - [data1, data2]; # List of (dataname, data, type) tuples - broker_url = NATS_URL, - fileserver_url = FILESERVER_URL, - fileserver_upload_handler = plik_upload_handler, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "table_sender", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true # Publish the message to NATS - ) - - log_trace("Sent message with $(length(env.payloads)) payloads") - - # Log transport type for each payload - for (i, payload) in enumerate(env.payloads) - log_trace("Payload $i ('$payload.dataname'):") - log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.payload_type)") - log_trace(" Size: $(payload.size) bytes") - log_trace(" Encoding: $(payload.encoding)") - - if payload.transport == "link" - log_trace(" URL: $(payload.data)") - end - end -end - - -# Run the test -println("Starting DataFrame table transport test...") -println("Correlation ID: $correlation_id") - -# Run sender -println("start smartsend for tables") -test_table_send() - -println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_text_receiver.jl b/test/test_julia_text_receiver.jl deleted file mode 100644 index c7f7402..0000000 --- a/test/test_julia_text_receiver.jl +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env julia -# Test script for text transport testing -# Tests receiving 1 large and 1 small text from Julia serviceA to Julia serviceB -# Uses NATSBridge.jl smartreceive with "text" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_text_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - - -# ------------------------------------------------------------------------------------------------ # -# test text transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] $message") -end - - -# Receiver: Listen for messages and verify text handling -function test_text_receive() - conn = NATS.connect(NATS_URL) - NATS.subscribe(conn, SUBJECT) do msg - log_trace("Received message on $(msg.subject)") - - # Use NATSBridge.smartreceive to handle the data - # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) - result = NATSBridge.smartreceive( - msg; - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - # Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples - for (dataname, data, data_type) in result["payloads"] - if isa(data, String) - log_trace("Received text '$dataname' of type $data_type") - log_trace(" Length: $(length(data)) characters") - - # Display first 100 characters - if length(data) > 100 - log_trace(" First 100 characters: $(data[1:100])...") - else - log_trace(" Content: $data") - end - - # Save to file - output_path = "./received_$dataname.txt" - write(output_path, data) - log_trace("Saved text to $output_path") - else - log_trace("Received unexpected data type for '$dataname': $(typeof(data))") - end - end - end - - # Keep listening for 10 seconds - sleep(120) - NATS.drain(conn) -end - - -# Run the test -println("Starting text transport test...") -println("Note: This receiver will wait for messages from the sender.") -println("Run test_julia_to_julia_text_sender.jl first to send test data.") - -# Run receiver -println("testing smartreceive for text") -test_text_receive() - -println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_text_sender.jl b/test/test_julia_text_sender.jl deleted file mode 100644 index 29e1839..0000000 --- a/test/test_julia_text_sender.jl +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env julia -# Test script for text transport testing -# Tests sending 1 large and 1 small text from Julia serviceA to Julia serviceB -# Uses NATSBridge.jl smartsend with "text" type - -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP - -# Include the bridge module -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_text_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# test text transfer # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# File upload handler for plik server -function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} - # Get upload ID - url_getUploadID = "$fileserver_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(String(httpResponse.body)) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$fileserver_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict("file" => file_multipart)) - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(String(httpResponse.body)) - - fileid = responseJson["id"] - url = "$fileserver_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end - - -# Sender: Send text via smartsend -function test_text_send() - # Create a small text (will use direct transport) - small_text = "Hello, this is a small text message. Testing direct transport via NATS." - - # Create a large text (will use link transport if > 1MB) - # Generate a larger text (~2MB to ensure link transport) - large_text = join(["Line $i: This is a sample text line with some content to pad the size. " for i in 1:50000], "") - - # Test data 1: small text - data1 = ("small_text", small_text, "text") - - # Test data 2: large text - data2 = ("large_text", large_text, "text") - - # Use smartsend with text type - # For small text: will use direct transport (Base64 encoded UTF-8) - # For large text: will use link transport (uploaded to fileserver) - env, env_json_str = NATSBridge.smartsend( - SUBJECT, - [data1, data2]; # List of (dataname, data, type) tuples - broker_url = NATS_URL, - fileserver_url = FILESERVER_URL, - fileserver_upload_handler = plik_upload_handler, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "text_sender", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true # Publish the message to NATS - ) - - log_trace("Sent message with $(length(env.payloads)) payloads") - - # Log transport type for each payload - for (i, payload) in enumerate(env.payloads) - log_trace("Payload $i ('$payload.dataname'):") - log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.payload_type)") - log_trace(" Size: $(payload.size) bytes") - log_trace(" Encoding: $(payload.encoding)") - - if payload.transport == "link" - log_trace(" URL: $(payload.data)") - end - end -end - - -# Run the test -println("Starting text transport test...") -println("Correlation ID: $correlation_id") - -# Run sender -println("start smartsend for text") -test_text_send() - -println("Test completed.") \ No newline at end of file