From 8c106464fdd031f0993c44ad9e88df34cd2b70d3 Mon Sep 17 00:00:00 2001 From: narawat Date: Thu, 19 Feb 2026 07:08:57 +0700 Subject: [PATCH] add test --- test/julia_to_julia_dict_receiver.jl | 98 +++++++++ test/julia_to_julia_dict_sender.jl | 90 ++++++++ test/julia_to_julia_mix_payload_receiver.jl | 131 ++++++++++++ test/julia_to_julia_mix_payload_sender.jl | 219 ++++++++++++++++++++ test/julia_to_julia_table_receiver.jl | 99 +++++++++ test/julia_to_julia_table_sender.jl | 97 +++++++++ 6 files changed, 734 insertions(+) create mode 100644 test/julia_to_julia_dict_receiver.jl create mode 100644 test/julia_to_julia_dict_sender.jl create mode 100644 test/julia_to_julia_mix_payload_receiver.jl create mode 100644 test/julia_to_julia_mix_payload_sender.jl create mode 100644 test/julia_to_julia_table_receiver.jl create mode 100644 test/julia_to_julia_table_sender.jl diff --git a/test/julia_to_julia_dict_receiver.jl b/test/julia_to_julia_dict_receiver.jl new file mode 100644 index 0000000..5230901 --- /dev/null +++ b/test/julia_to_julia_dict_receiver.jl @@ -0,0 +1,98 @@ +#!/usr/bin/env julia +# Test script for dictionary transfer from Julia serviceA to Julia serviceB +# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge +# +# This is serviceB - the receiver that receives a dummy dictionary from serviceA + +using NATSBridge +using UUIDs +using JSON +using Dates + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Configuration +const SUBJECT = "/NATSBridge_dict_test" +const NATS_URL = "nats://localhost:4222" + +# Helper: Log with correlation ID +function log_trace(correlation_id::String, message::String) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# Receiver: Receive and process dictionary from serviceA +function receive_dictionary() + # Connect to NATS + conn = NATS.connect(NATS_URL) + + # Subscribe to the subject + subscription = NATS.subscribe(conn, SUBJECT) + + println("Listening for dictionary messages on '$SUBJECT'...") + println("Press Ctrl+C to stop listening.") + + # Listen for messages + while true + # Wait for a message with a 1-second timeout + msg = NATS.waitfor(subscription, 1.0) + + if msg !== nothing + # Extract correlation ID for logging + json_data = JSON.parse(String(msg.payload)) + cid = json_data["correlationId"] + + log_trace(cid, "Received message from $(json_data["senderName"])") + + # Process the message using smartreceive + payloads = NATSBridge.smartreceive( + msg; + fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) -> + NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid), + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + log_trace(cid, "Processed $(length(payloads)) payload(s)") + + # Process each payload + for (dataname, data, payload_type) in payloads + log_trace(cid, "Payload '$dataname' type: $payload_type") + + # Handle dictionary type + if payload_type == "dictionary" + println("\nReceived dictionary:") + println(JSON.json(data, 2)) + + # Extract and display specific fields + if isa(data, Dict) + command = get(data, "command", "unknown") + println("\nCommand: $command") + + # Optionally send acknowledgment + reply_to = get(json_data, "replyTo", "") + if !isempty(reply_to) + log_trace(cid, "Reply to: $reply_to") + # Could send ACK here + end + end + else + println("\nReceived non-dictionary payload: $dataname (type: $payload_type)") + end + end + end + end +end + + +# Run the receiver +println("Starting dictionary receiver...") +println("Subject: $SUBJECT") +println("NATS URL: $NATS_URL") +println("="^50) + +# Run receiver (this will block and listen for messages) +receive_dictionary() \ No newline at end of file diff --git a/test/julia_to_julia_dict_sender.jl b/test/julia_to_julia_dict_sender.jl new file mode 100644 index 0000000..c9defb6 --- /dev/null +++ b/test/julia_to_julia_dict_sender.jl @@ -0,0 +1,90 @@ +#!/usr/bin/env julia +# Test script for dictionary transfer from Julia serviceA to Julia serviceB +# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge +# +# This is serviceA - the sender that sends a dummy dictionary to serviceB + +using NATSBridge +using UUIDs +using JSON + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Configuration +const SUBJECT = "/NATSBridge_dict_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + + +# ------------------------------------------------------------------------------------------------ # +# dictionary sender # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# Sender: Send a dummy dictionary to serviceB +function send_dictionary() + # Create a dummy dictionary to send + dummy_dict = Dict( + "command" => "start_simulation", + "simulation_id" => string(uuid4()), + "duration_seconds" => 60, + "parameters" => Dict( + "temperature" => 25.5, + "pressure" => 101.3, + "active" => true, + "tags" => ["test", "simulation", "julia_to_julia"] + ), + "metadata" => Dict( + "sender" => "serviceA", + "timestamp" => Dates.now().iso8601 + ) + ) + + # Send the dictionary using smartsend with type="dictionary" + # API: smartsend(subject, [(dataname, data, type), ...]; keywords...) + env = NATSBridge.smartsend( + SUBJECT, + [("dummy_dict", dummy_dict, "dictionary")], # List of (dataname, data, type) tuples + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + size_threshold = 1_000_000, # 1MB threshold - dictionary will use direct transport + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "serviceA", + receiver_name = "serviceB", + reply_to = "", + reply_to_msg_id = "" + ) + + log_trace("Sent dictionary via $(env.payloads[1].transport) transport") + log_trace("Payload type: $(env.payloads[1].type)") + log_trace("Envelope correlationId: $(env.correlationId)") + + # Display the sent dictionary + println("\nSent dictionary content:") + println(JSON.json(dummy_dict, 2)) +end + + +# Run the test +println("Starting dictionary transfer test...") +println("Correlation ID: $correlation_id") +println("Subject: $SUBJECT") +println("NATS URL: $NATS_URL") + +# Run sender +println("\n--- Sending dictionary ---") +send_dictionary() + +println("\nTest completed.") \ No newline at end of file diff --git a/test/julia_to_julia_mix_payload_receiver.jl b/test/julia_to_julia_mix_payload_receiver.jl new file mode 100644 index 0000000..b92ee1f --- /dev/null +++ b/test/julia_to_julia_mix_payload_receiver.jl @@ -0,0 +1,131 @@ +using NATSBridge +using UUIDs +using JSON +using DataFrames +using Dates + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Constants +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +# Main chat receiver function for scenario 6 +function chat_receiver( + subject::String = "/chat/test"; + nats_url::String = NATS_URL, + fileserver_url::String = FILESERVER_URL, + duration::Int = 60, # Duration in seconds to listen for messages + max_messages::Int = 100 # Maximum number of messages to receive +) + println("\n=== Chat Receiver (ServiceB) ===") + println("Subject: $subject") + println("NATS URL: $nats_url") + println("Fileserver URL: $fileserver_url") + println("Listening duration: $(duration)s") + println("Max messages: $max_messages") + println("="^50) + + # Create a handler for the fileserver download + # This will be passed to smartreceive as fileserverDownloadHandler parameter + fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, correlation_id) -> + NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) + + # Connect to NATS and subscribe to the chat subject + conn = NATS.connect(nats_url) + + # Track received messages + message_count = 0 + total_payloads = 0 + + # Subscribe to the subject + subscription = NATS.subscribe(conn, subject) + + @info "Listening for chat messages on '$subject'..." + + # Listen for messages for the specified duration + timeout = time() + duration + + while time() < timeout && message_count < max_messages + # Wait for a message with a short timeout + msg = NATS.waitfor(subscription, 1.0) # 1 second timeout + + if msg !== nothing + message_count += 1 + println("\n--- Message $(message_count) Received ---") + + # Process the message using smartreceive + payloads = NATSBridge.smartreceive( + msg; + fileserverDownloadHandler = fileserverDownloadHandler, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + println("Payloads received: $(length(payloads))") + total_payloads += length(payloads) + + # Process each payload + for (dataname, data, payload_type) in payloads + println(" - $dataname (type: $payload_type)") + + # Handle different types differently for display + if payload_type == "text" + println(" Text content: $(String(data))") + elseif payload_type == "dictionary" + println(" Dictionary content: $(JSON.json(data, 2))") + elseif payload_type == "table" + println(" Table content: $(size(data, 1)) rows, $(size(data, 2)) columns") + if size(data, 1) <= 10 + println(" Sample: $(DataFrames.show(data))") + end + elseif payload_type == "image" + println(" Image: $(length(data)) bytes") + elseif payload_type == "audio" + println(" Audio: $(length(data)) bytes") + elseif payload_type == "video" + println(" Video: $(length(data)) bytes") + elseif payload_type == "binary" + println(" Binary: $(length(data)) bytes") + end + end + + # Extract correlation ID from the message + json_data = JSON.parse(String(msg.payload)) + println(" Correlation ID: $(json_data["correlationId"])") + println(" Message ID: $(json_data["msgId"])") + + # Optional: Send ACK reply + reply_to = get(json_data, "replyTo", "") + if !isempty(reply_to) + println(" Reply to: $reply_to") + # Could send an ACK message here + end + end + end + + println("\n=== Chat Receiver Summary ===") + println("Total messages received: $message_count") + println("Total payloads processed: $total_payloads") + println("Average payloads per message: $(round(total_payloads / max(message_count, 1), digits=2))") + println("="^50) + + # Cleanup + NATS.drain(conn) + + return message_count +end + +# Example usage +if abspath(PROGRAM_FILE) == @__FILE__ + # Parse command line arguments + if length(ARGS) >= 1 + subject = ARGS[1] + else + subject = "/chat/test" + end + + chat_receiver(subject) +end \ No newline at end of file diff --git a/test/julia_to_julia_mix_payload_sender.jl b/test/julia_to_julia_mix_payload_sender.jl new file mode 100644 index 0000000..0a68ff0 --- /dev/null +++ b/test/julia_to_julia_mix_payload_sender.jl @@ -0,0 +1,219 @@ +using NATSBridge +using UUIDs +using JSON +using DataFrames +using Random + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Constants +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +# Chat message types for scenario 6 +const CHAT_TYPES = ["text", "dictionary", "table", "image", "audio", "video", "binary"] + +# Helper function to create sample text data +function create_text_payload() + texts = [ + "Hello!", + "How are you doing today?", + "This is a test message.", + "Chat with mixed content is fun!", + "Short text payload." + ] + return (rand(texts), "text") +end + +# Helper function to create sample dictionary data +function create_dictionary_payload() + dictionaries = [ + Dict("greeting" => "Hello", "status" => "active", "count" => 42), + Dict("user" => "alice", "message_id" => string(uuid4()), "timestamp" => Dates.now().iso8601), + Dict("config" => Dict("theme" => "dark", "notifications" => true)) + ] + return (rand(dictionaries), "dictionary") +end + +# Helper function to create sample table data (DataFrame) +function create_table_payload() + # Small DataFrame + df_small = DataFrame( + id = 1:5, + name = ["Alice", "Bob", "Charlie", "Diana", "Eve"], + score = [95, 88, 92, 78, 85], + status = ["active", "active", "inactive", "active", "pending"] + ) + + # Large DataFrame (> 1MB) + df_large = DataFrame( + id = 1:50000, + name = ["User_$i" for i in 1:50000], + value = rand(50000) .* 100, + status = ["active", "inactive", "pending"][rand(1:3, 50000)] + ) + + # Randomly choose small or large + return (rand([df_small, df_large]), "table") +end + +# Helper function to create sample image data (Vector{UInt8}) +function create_image_payload() + # Create random image bytes (small) + small_image = rand(UInt8, 100_000) # ~100KB + # Large image (> 1MB) + large_image = rand(UInt8, 2_000_000) # ~2MB + + return (rand([small_image, large_image]), "image") +end + +# Helper function to create sample audio data (Vector{UInt8}) +function create_audio_payload() + # Create random audio bytes (small) + small_audio = rand(UInt8, 150_000) # ~150KB + # Large audio (> 1MB) + large_audio = rand(UInt8, 3_000_000) # ~3MB + + return (rand([small_audio, large_audio]), "audio") +end + +# Helper function to create sample video data (Vector{UInt8}) +function create_video_payload() + # Create random video bytes (small) + small_video = rand(UInt8, 200_000) # ~200KB + # Large video (> 1MB) + large_video = rand(UInt8, 5_000_000) # ~5MB + + return (rand([small_video, large_video]), "video") +end + +# Helper function to create sample binary data (Vector{UInt8}) +function create_binary_payload() + # Create random binary bytes (small) + small_binary = rand(UInt8, 50_000) # ~50KB + # Large binary (> 1MB) + large_binary = rand(UInt8, 1_500_000) # ~1.5MB + + return (rand([small_binary, large_binary]), "binary") +end + +# Main chat sender function for scenario 6 +function chat_sender( + subject::String = "/chat/test", + num_messages::Int = 10; + nats_url::String = NATS_URL, + fileserver_url::String = FILESERVER_URL +) + println("\n=== Chat Sender (ServiceA) ===") + println("Subject: $subject") + println("Number of messages: $num_messages") + println("NATS URL: $nats_url") + println("Fileserver URL: $fileserver_url") + println("="^50) + + # Create a handler for the fileserver upload + # This will be passed to smartsend as fileserverUploadHandler parameter + fileserverUploadHandler = (url, dataname, data) -> NATSBridge.plik_oneshot_upload(url, dataname, data) + + for i in 1:num_messages + # Generate random chat message with mixed content + # Each message can have 1-5 payloads with different types + num_payloads = rand(1:5) + + # Create payloads list + payloads = Tuple{String, Any, String}[] + + # Track if we need to include text (required for chat) + has_text = false + + # Create random payloads + for j in 1:num_payloads + # Randomly select a payload type + payload_type = rand(CHAT_TYPES) + + # Create the payload based on type + payload_data, payload_type = if payload_type == "text" + create_text_payload() + elseif payload_type == "dictionary" + create_dictionary_payload() + elseif payload_type == "table" + create_table_payload() + elseif payload_type == "image" + create_image_payload() + elseif payload_type == "audio" + create_audio_payload() + elseif payload_type == "video" + create_video_payload() + elseif payload_type == "binary" + create_binary_payload() + end + + # Ensure at least one text payload + if payload_type == "text" + has_text = true + end + + push!(payloads, ("payload_$j", payload_data, payload_type)) + end + + # Ensure at least one text payload exists + if !has_text + text_data, text_type = create_text_payload() + push!(payloads, ("message_text", text_data, text_type)) + end + + # Generate chat message metadata + chat_metadata = Dict( + "message_index" => i, + "timestamp" => Dates.now().iso8601, + "sender" => "serviceA", + "payload_count" => length(payloads) + ) + + # Send the chat message with mixed content + println("\n--- Message $i ---") + println("Payloads: $(length(payloads))") + for (dataname, data, type) in payloads + println(" - $dataname (type: $type)") + end + + env = NATSBridge.smartsend( + subject, + payloads; + nats_url = nats_url, + fileserver_url = fileserver_url, + fileserverUploadHandler = fileserverUploadHandler, + size_threshold = 1_000_000, # 1MB threshold + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "serviceA", + receiver_name = "serviceB", + reply_to = "/chat/reply", + reply_to_msg_id = "" + ) + + println("Envelope created with correlationId: $(env.correlationId)") + println("Message published successfully!") + + # Wait a bit between messages + sleep(rand(0.1:0.3)) + end + + println("\n=== Chat Sender Complete ===") + return true +end + +# Example usage +if abspath(PROGRAM_FILE) == @__FILE__ + # Parse command line arguments + if length(ARGS) >= 2 + subject = ARGS[1] + num_messages = parse(Int, ARGS[2]) + else + subject = "/chat/test" + num_messages = 5 + end + + chat_sender(subject, num_messages) +end \ No newline at end of file diff --git a/test/julia_to_julia_table_receiver.jl b/test/julia_to_julia_table_receiver.jl new file mode 100644 index 0000000..cf29891 --- /dev/null +++ b/test/julia_to_julia_table_receiver.jl @@ -0,0 +1,99 @@ +#!/usr/bin/env julia +# Test script for DataFrame transfer from Julia serviceA to Julia serviceB +# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge +# +# This is serviceB - the receiver that receives a dummy DataFrame from serviceA + +using NATSBridge +using UUIDs +using DataFrames +using JSON +using Dates + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Configuration +const SUBJECT = "/NATSBridge_table_test" +const NATS_URL = "nats://localhost:4222" + +# Helper: Log with correlation ID +function log_trace(correlation_id::String, message::String) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# Receiver: Receive and process DataFrame from serviceA +function receive_dataframe() + # Connect to NATS + conn = NATS.connect(NATS_URL) + + # Subscribe to the subject + subscription = NATS.subscribe(conn, SUBJECT) + + println("Listening for DataFrame messages on '$SUBJECT'...") + println("Press Ctrl+C to stop listening.") + + # Listen for messages + while true + # Wait for a message with a 1-second timeout + msg = NATS.waitfor(subscription, 1.0) + + if msg !== nothing + # Extract correlation ID for logging + json_data = JSON.parse(String(msg.payload)) + cid = json_data["correlationId"] + + log_trace(cid, "Received message from $(json_data["senderName"])") + + # Process the message using smartreceive + payloads = NATSBridge.smartreceive( + msg; + fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) -> + NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid), + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + log_trace(cid, "Processed $(length(payloads)) payload(s)") + + # Process each payload + for (dataname, data, payload_type) in payloads + log_trace(cid, "Payload '$dataname' type: $payload_type") + + # Handle table (DataFrame) type + if payload_type == "table" + println("\nReceived DataFrame:") + println(data) + + # Display DataFrame dimensions + println("\nDataFrame dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") + + # Display column names + println("Column names: $(names(data))") + + # Optionally send acknowledgment + reply_to = get(json_data, "replyTo", "") + if !isempty(reply_to) + log_trace(cid, "Reply to: $reply_to") + # Could send ACK here + end + else + println("\nReceived non-table payload: $dataname (type: $payload_type)") + end + end + end + end +end + + +# Run the receiver +println("Starting DataFrame receiver...") +println("Subject: $SUBJECT") +println("NATS URL: $NATS_URL") +println("="^50) + +# Run receiver (this will block and listen for messages) +receive_dataframe() \ No newline at end of file diff --git a/test/julia_to_julia_table_sender.jl b/test/julia_to_julia_table_sender.jl new file mode 100644 index 0000000..0782c3f --- /dev/null +++ b/test/julia_to_julia_table_sender.jl @@ -0,0 +1,97 @@ +#!/usr/bin/env julia +# Test script for DataFrame transfer from Julia serviceA to Julia serviceB +# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge +# +# This is serviceA - the sender that sends a dummy DataFrame to serviceB + +using NATSBridge +using UUIDs +using DataFrames +using JSON +using Dates + +# Include the NATSBridge module +include("src/NATSBridge.jl") + +# Configuration +const SUBJECT = "/NATSBridge_table_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + + +# ------------------------------------------------------------------------------------------------ # +# DataFrame sender # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# Sender: Send a dummy DataFrame to serviceB +function send_dataframe() + # Create a dummy DataFrame (table) to send + # This simulates a selection scenario where Julia server generates options for user selection + dummy_df = DataFrame( + id = 1:10, + name = ["Option A", "Option B", "Option C", "Option D", "Option E", + "Option F", "Option G", "Option H", "Option I", "Option J"], + score = [95, 88, 92, 78, 85, 90, 87, 93, 89, 91], + category = ["A", "B", "A", "C", "B", "A", "C", "A", "B", "C"], + active = [true, true, false, true, true, false, true, true, true, false] + ) + + # Calculate approximate size + df_size = sizeof(dummy_df) + log_trace("DataFrame size: $(df_size / 1024) KB") + + # Check if DataFrame is small enough for direct transport (< 1MB) + if df_size < 1_000_000 + log_trace("Using direct transport (size < 1MB)") + else + log_trace("Using link transport (size >= 1MB)") + end + + # Send the DataFrame using smartsend with type="table" + # API: smartsend(subject, [(dataname, data, type), ...]; keywords...) + env = NATSBridge.smartsend( + SUBJECT, + [("selection_table", dummy_df, "table")], # List of (dataname, data, type) tuples + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + size_threshold = 1_000_000, # 1MB threshold + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "serviceA", + receiver_name = "serviceB", + reply_to = "", + reply_to_msg_id = "" + ) + + log_trace("Sent DataFrame via $(env.payloads[1].transport) transport") + log_trace("Payload type: $(env.payloads[1].type)") + log_trace("Envelope correlationId: $(env.correlationId)") + + # Display the sent DataFrame + println("\nSent DataFrame content:") + println(dummy_df) +end + + +# Run the test +println("Starting DataFrame transfer test...") +println("Correlation ID: $correlation_id") +println("Subject: $SUBJECT") +println("NATS URL: $NATS_URL") + +# Run sender +println("\n--- Sending DataFrame ---") +send_dataframe() + +println("\nTest completed.") \ No newline at end of file