From 51e494c48b3eecb46eda893437c5cb41182a6349 Mon Sep 17 00:00:00 2001 From: narawat Date: Thu, 19 Feb 2026 11:23:15 +0700 Subject: [PATCH] update --- etc.jl | 176 ++------------ src/NATSBridge.jl | 104 +++++---- test/julia_to_julia_dict_receiver.jl | 97 -------- test/julia_to_julia_dict_sender.jl | 90 ------- test/julia_to_julia_mix_payload_receiver.jl | 131 ----------- test/julia_to_julia_mix_payload_sender.jl | 219 ------------------ test/julia_to_julia_table_receiver.jl | 99 -------- test/julia_to_julia_table_sender.jl | 97 -------- ...l => test_julia_to_julia_file_receiver.jl} | 0 ....jl => test_julia_to_julia_file_sender.jl} | 0 10 files changed, 74 insertions(+), 939 deletions(-) delete mode 100644 test/julia_to_julia_dict_receiver.jl delete mode 100644 test/julia_to_julia_dict_sender.jl delete mode 100644 test/julia_to_julia_mix_payload_receiver.jl delete mode 100644 test/julia_to_julia_mix_payload_sender.jl delete mode 100644 test/julia_to_julia_table_receiver.jl delete mode 100644 test/julia_to_julia_table_sender.jl rename test/{julia_to_julia_file_receiver.jl => test_julia_to_julia_file_receiver.jl} (100%) rename test/{julia_to_julia_file_sender.jl => test_julia_to_julia_file_sender.jl} (100%) diff --git a/etc.jl b/etc.jl index d4c7daa..81fdc52 100644 --- a/etc.jl +++ b/etc.jl @@ -1,168 +1,18 @@ +using JSON +d = Dict( + "name"=>"ton", + "age"=> 20, + "metadata" => Dict( + "height"=> 155, + "wife"=> "jane" + ) +) -using Revise -using NATS, JSON, UUIDs, Dates -using HTTP - - -# workdir = - -# Include the bridge module -include("./src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/NATSBridge_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# test file transfer # -# ------------------------------------------------------------------------------------------------ # - -# File path for large binary payload test -const FILE_PATH = "./testFile_small.zip" -const filename = basename(FILE_PATH) - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -function _serialize_data(data::Any, type::String) - if type == "text" # Text data - convert to UTF-8 bytes - if isa(data, String) - return bytes(data) # Convert string to UTF-8 bytes - else - error("Text data must be a String") - end - elseif type == "dictionary" # JSON data - serialize directly - json_str = JSON.json(data) # Convert Julia data to JSON string - return bytes(json_str) # Convert JSON string to bytes - elseif type == "table" # Table data - convert to Arrow IPC stream - io = IOBuffer() # Create in-memory buffer - Arrow.write(io, data) # Write data as Arrow IPC stream to buffer - return take!(io) # Return the buffer contents as bytes - elseif type == "image" # Image data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Image data must be Vector{UInt8}") - end - elseif type == "audio" # Audio data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Audio data must be Vector{UInt8}") - end - elseif type == "video" # Video data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Video data must be Vector{UInt8}") - end - elseif type == "binary" # Binary data - treat as binary - if isa(data, IOBuffer) # Check if data is an IOBuffer - return take!(data) # Return buffer contents as bytes - elseif isa(data, Vector{UInt8}) # Check if data is already binary - return data # Return binary data directly - else # Unsupported binary data type - error("Binary data must be binary (Vector{UInt8} or IOBuffer)") - end - else # Unknown type - error("Unknown type: $type") - end -end - - -# File upload handler for plik server -function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} - # Get upload ID - url_getUploadID = "$fileserver_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(String(httpResponse.body)) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$fileserver_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict("file" => file_multipart)) - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(String(httpResponse.body)) - - fileid = responseJson["id"] - url = "$fileserver_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end - -function log_trace(correlation_id::String, message::String) - timestamp = Dates.now() # Get current timestamp - @info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message -end - -file_data = read(FILE_PATH) -file_size = length(file_data) - - -subject::String=SUBJECT -data::AbstractArray{Tuple{String, Array{UInt8, 1}, String}, 1}=[(filename, file_data, "binary")] -nats_url::String=DEFAULT_NATS_URL -fileserver_url::String=DEFAULT_FILESERVER_URL -fileserverUploadHandler::Function=plik_upload_handler -size_threshold::Int=1_000_000 -correlation_id::Union{String, Nothing}=correlation_id -msg_purpose::String="chat" -sender_name::String="sender" -receiver_name::String="receiver_name" -receiver_id::String="receiver_id" -reply_to::String="reply_to" -reply_to_msg_id::String="reply_to_msg_id" - - - -(dataname, payload_data, payload_type) = data[1] -payload_bytes = _serialize_data(payload_data, payload_type) - - payload_size = length(payload_bytes) # Calculate payload size in bytes - log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size - -payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string -log_trace(cid, "Using direct transport for $payload_size bytes") - -payload = msgPayload_v1( - id = string(uuid4()), - dataname = dataname, - type = payload_type, - transport = "direct", - encoding = "base64", - size = payload_size, - data = payload_b64, - metadata = Dict("payload_bytes" => payload_size) - ) - -push!(payloads, payload) - - - - - - - - - - +json_str = JSON.json(d) +json_str_bytes = Vector{UInt8}(json_str) +json_str_2 = String(json_str_bytes) +json_obj = JSON.parse(json_str_2) diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 21216fb..438e21b 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -457,48 +457,66 @@ binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary") ``` """ function _serialize_data(data::Any, type::String) - if type == "text" # Text data - convert to UTF-8 bytes - if isa(data, String) - return bytes(data) # Convert string to UTF-8 bytes - else - error("Text data must be a String") - end - elseif type == "dictionary" # JSON data - serialize directly - json_str = JSON.json(data) # Convert Julia data to JSON string - return bytes(json_str) # Convert JSON string to bytes - elseif type == "table" # Table data - convert to Arrow IPC stream - io = IOBuffer() # Create in-memory buffer - Arrow.write(io, data) # Write data as Arrow IPC stream to buffer - return take!(io) # Return the buffer contents as bytes - elseif type == "image" # Image data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Image data must be Vector{UInt8}") - end - elseif type == "audio" # Audio data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Audio data must be Vector{UInt8}") - end - elseif type == "video" # Video data - treat as binary - if isa(data, Vector{UInt8}) - return data # Return binary data directly - else - error("Video data must be Vector{UInt8}") - end - elseif type == "binary" # Binary data - treat as binary - if isa(data, IOBuffer) # Check if data is an IOBuffer - return take!(data) # Return buffer contents as bytes - elseif isa(data, Vector{UInt8}) # Check if data is already binary - return data # Return binary data directly - else # Unsupported binary data type - error("Binary data must be binary (Vector{UInt8} or IOBuffer)") - end - else # Unknown type - error("Unknown type: $type") - end + """ Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object + d = Dict( + "name"=>"ton", + "age"=> 20, + "metadata" => Dict( + "height"=> 155, + "wife"=> "jane" + ) + ) + + json_str = JSON.json(d) + json_str_bytes = Vector{UInt8}(json_str) + json_str_2 = String(json_str_bytes) + json_obj = JSON.parse(json_str_2) + """ + + if type == "text" # Text data - convert to UTF-8 bytes + if isa(data, String) + data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes + return data_bytes + else + error("Text data must be a String") + end + elseif type == "dictionary" # JSON data - serialize directly + json_str = JSON.json(data) # Convert Julia data to JSON string + json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes + return json_str_bytes + elseif type == "table" # Table data - convert to Arrow IPC stream + io = IOBuffer() # Create in-memory buffer + Arrow.write(io, data) # Write data as Arrow IPC stream to buffer + return take!(io) # Return the buffer contents as bytes + elseif type == "image" # Image data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Image data must be Vector{UInt8}") + end + elseif type == "audio" # Audio data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Audio data must be Vector{UInt8}") + end + elseif type == "video" # Video data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Video data must be Vector{UInt8}") + end + elseif type == "binary" # Binary data - treat as binary + if isa(data, IOBuffer) # Check if data is an IOBuffer + return take!(data) # Return buffer contents as bytes + elseif isa(data, Vector{UInt8}) # Check if data is already binary + return data # Return binary data directly + else # Unsupported binary data type + error("Binary data must be binary (Vector{UInt8} or IOBuffer)") + end + else # Unknown type + error("Unknown type: $type") + end end @@ -675,7 +693,7 @@ function _deserialize_data( return String(data) # Convert bytes to string elseif type == "dictionary" # JSON data - deserialize json_str = String(data) # Convert bytes to string - return JSON.parse(json_str) # Parse JSON string to Julia data structure + return JSON.parse(json_str) # Parse JSON string to JSON object elseif type == "table" # Table data - deserialize Arrow IPC stream io = IOBuffer(data) # Create buffer from bytes df = Arrow.Table(io) # Read Arrow IPC format from buffer diff --git a/test/julia_to_julia_dict_receiver.jl b/test/julia_to_julia_dict_receiver.jl deleted file mode 100644 index 864a020..0000000 --- a/test/julia_to_julia_dict_receiver.jl +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env julia -# Test script for dictionary transfer from Julia serviceA to Julia serviceB -# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge -# -# This is serviceB - the receiver that receives a dummy dictionary from serviceA - -using UUIDs -using JSON -using Dates - -# Include the NATSBridge module -include("../src/NATSBridge.jl") - -# Configuration -const SUBJECT = "/NATSBridge_dict_test" -const NATS_URL = "nats.yiem.cc" - -# Helper: Log with correlation ID -function log_trace(correlation_id::String, message::String) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# Receiver: Receive and process dictionary from serviceA -function receive_dictionary() - # Connect to NATS - conn = NATS.connect(NATS_URL) - - # Subscribe to the subject - subscription = NATS.subscribe(conn, SUBJECT) - - println("Listening for dictionary messages on '$SUBJECT'...") - println("Press Ctrl+C to stop listening.") - - # Listen for messages - while true - # Wait for a message with a 1-second timeout - msg = NATS.waitfor(subscription, 1.0) - - if msg !== nothing - # Extract correlation ID for logging - json_data = JSON.parse(String(msg.payload)) - cid = json_data["correlationId"] - - log_trace(cid, "Received message from $(json_data["senderName"])") - - # Process the message using smartreceive - payloads = NATSBridge.smartreceive( - msg; - fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) -> - NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid), - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - log_trace(cid, "Processed $(length(payloads)) payload(s)") - - # Process each payload - for (dataname, data, payload_type) in payloads - log_trace(cid, "Payload '$dataname' type: $payload_type") - - # Handle dictionary type - if payload_type == "dictionary" - println("\nReceived dictionary:") - println(JSON.json(data, 2)) - - # Extract and display specific fields - if isa(data, Dict) - command = get(data, "command", "unknown") - println("\nCommand: $command") - - # Optionally send acknowledgment - reply_to = get(json_data, "replyTo", "") - if !isempty(reply_to) - log_trace(cid, "Reply to: $reply_to") - # Could send ACK here - end - end - else - println("\nReceived non-dictionary payload: $dataname (type: $payload_type)") - end - end - end - end -end - - -# Run the receiver -println("Starting dictionary receiver...") -println("Subject: $SUBJECT") -println("NATS URL: $NATS_URL") -println("="^50) - -# Run receiver (this will block and listen for messages) -receive_dictionary() \ No newline at end of file diff --git a/test/julia_to_julia_dict_sender.jl b/test/julia_to_julia_dict_sender.jl deleted file mode 100644 index 11dfb71..0000000 --- a/test/julia_to_julia_dict_sender.jl +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env julia -# Test script for dictionary transfer from Julia serviceA to Julia serviceB -# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge -# -# This is serviceA - the sender that sends a dummy dictionary to serviceB - -using UUIDs -using JSON -using Dates - -# Include the NATSBridge module -include("../src/NATSBridge.jl") - -# Configuration -const SUBJECT = "/NATSBridge_dict_test" -const NATS_URL = "nats.yiem.cc" -const FILESERVER_URL = "http://192.168.88.104:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# dictionary sender # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# Sender: Send a dummy dictionary to serviceB -function send_dictionary() - # Create a dummy dictionary to send - dummy_dict = Dict( - "command" => "start_simulation", - "simulation_id" => string(uuid4()), - "duration_seconds" => 60, - "parameters" => Dict( - "temperature" => 25.5, - "pressure" => 101.3, - "active" => true, - "tags" => ["test", "simulation", "julia_to_julia"] - ), - "metadata" => Dict( - "sender" => "serviceA", - "timestamp" => string(Dates.now()) - ) - ) - - # Send the dictionary using smartsend with type="dictionary" - # API: smartsend(subject, [(dataname, data, type), ...]; keywords...) - env = NATSBridge.smartsend( - SUBJECT, - [("dummy_dict", dummy_dict, "dictionary")], # List of (dataname, data, type) tuples - nats_url = NATS_URL, - fileserver_url = FILESERVER_URL, - size_threshold = 1_000_000, # 1MB threshold - dictionary will use direct transport - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "serviceA", - receiver_name = "serviceB", - reply_to = "", - reply_to_msg_id = "" - ) - - log_trace("Sent dictionary via $(env.payloads[1].transport) transport") - log_trace("Payload type: $(env.payloads[1].type)") - log_trace("Envelope correlationId: $(env.correlationId)") - - # Display the sent dictionary - println("\nSent dictionary content:") - println(JSON.json(dummy_dict, 2)) -end - - -# Run the test -println("Starting dictionary transfer test...") -println("Correlation ID: $correlation_id") -println("Subject: $SUBJECT") -println("NATS URL: $NATS_URL") - -# Run sender -println("\n--- Sending dictionary ---") -send_dictionary() - -println("\nTest completed.") \ No newline at end of file diff --git a/test/julia_to_julia_mix_payload_receiver.jl b/test/julia_to_julia_mix_payload_receiver.jl deleted file mode 100644 index b92ee1f..0000000 --- a/test/julia_to_julia_mix_payload_receiver.jl +++ /dev/null @@ -1,131 +0,0 @@ -using NATSBridge -using UUIDs -using JSON -using DataFrames -using Dates - -# Include the NATSBridge module -include("src/NATSBridge.jl") - -# Constants -const NATS_URL = "nats://localhost:4222" -const FILESERVER_URL = "http://localhost:8080" - -# Main chat receiver function for scenario 6 -function chat_receiver( - subject::String = "/chat/test"; - nats_url::String = NATS_URL, - fileserver_url::String = FILESERVER_URL, - duration::Int = 60, # Duration in seconds to listen for messages - max_messages::Int = 100 # Maximum number of messages to receive -) - println("\n=== Chat Receiver (ServiceB) ===") - println("Subject: $subject") - println("NATS URL: $nats_url") - println("Fileserver URL: $fileserver_url") - println("Listening duration: $(duration)s") - println("Max messages: $max_messages") - println("="^50) - - # Create a handler for the fileserver download - # This will be passed to smartreceive as fileserverDownloadHandler parameter - fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, correlation_id) -> - NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) - - # Connect to NATS and subscribe to the chat subject - conn = NATS.connect(nats_url) - - # Track received messages - message_count = 0 - total_payloads = 0 - - # Subscribe to the subject - subscription = NATS.subscribe(conn, subject) - - @info "Listening for chat messages on '$subject'..." - - # Listen for messages for the specified duration - timeout = time() + duration - - while time() < timeout && message_count < max_messages - # Wait for a message with a short timeout - msg = NATS.waitfor(subscription, 1.0) # 1 second timeout - - if msg !== nothing - message_count += 1 - println("\n--- Message $(message_count) Received ---") - - # Process the message using smartreceive - payloads = NATSBridge.smartreceive( - msg; - fileserverDownloadHandler = fileserverDownloadHandler, - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - println("Payloads received: $(length(payloads))") - total_payloads += length(payloads) - - # Process each payload - for (dataname, data, payload_type) in payloads - println(" - $dataname (type: $payload_type)") - - # Handle different types differently for display - if payload_type == "text" - println(" Text content: $(String(data))") - elseif payload_type == "dictionary" - println(" Dictionary content: $(JSON.json(data, 2))") - elseif payload_type == "table" - println(" Table content: $(size(data, 1)) rows, $(size(data, 2)) columns") - if size(data, 1) <= 10 - println(" Sample: $(DataFrames.show(data))") - end - elseif payload_type == "image" - println(" Image: $(length(data)) bytes") - elseif payload_type == "audio" - println(" Audio: $(length(data)) bytes") - elseif payload_type == "video" - println(" Video: $(length(data)) bytes") - elseif payload_type == "binary" - println(" Binary: $(length(data)) bytes") - end - end - - # Extract correlation ID from the message - json_data = JSON.parse(String(msg.payload)) - println(" Correlation ID: $(json_data["correlationId"])") - println(" Message ID: $(json_data["msgId"])") - - # Optional: Send ACK reply - reply_to = get(json_data, "replyTo", "") - if !isempty(reply_to) - println(" Reply to: $reply_to") - # Could send an ACK message here - end - end - end - - println("\n=== Chat Receiver Summary ===") - println("Total messages received: $message_count") - println("Total payloads processed: $total_payloads") - println("Average payloads per message: $(round(total_payloads / max(message_count, 1), digits=2))") - println("="^50) - - # Cleanup - NATS.drain(conn) - - return message_count -end - -# Example usage -if abspath(PROGRAM_FILE) == @__FILE__ - # Parse command line arguments - if length(ARGS) >= 1 - subject = ARGS[1] - else - subject = "/chat/test" - end - - chat_receiver(subject) -end \ No newline at end of file diff --git a/test/julia_to_julia_mix_payload_sender.jl b/test/julia_to_julia_mix_payload_sender.jl deleted file mode 100644 index 0a68ff0..0000000 --- a/test/julia_to_julia_mix_payload_sender.jl +++ /dev/null @@ -1,219 +0,0 @@ -using NATSBridge -using UUIDs -using JSON -using DataFrames -using Random - -# Include the NATSBridge module -include("src/NATSBridge.jl") - -# Constants -const NATS_URL = "nats://localhost:4222" -const FILESERVER_URL = "http://localhost:8080" - -# Chat message types for scenario 6 -const CHAT_TYPES = ["text", "dictionary", "table", "image", "audio", "video", "binary"] - -# Helper function to create sample text data -function create_text_payload() - texts = [ - "Hello!", - "How are you doing today?", - "This is a test message.", - "Chat with mixed content is fun!", - "Short text payload." - ] - return (rand(texts), "text") -end - -# Helper function to create sample dictionary data -function create_dictionary_payload() - dictionaries = [ - Dict("greeting" => "Hello", "status" => "active", "count" => 42), - Dict("user" => "alice", "message_id" => string(uuid4()), "timestamp" => Dates.now().iso8601), - Dict("config" => Dict("theme" => "dark", "notifications" => true)) - ] - return (rand(dictionaries), "dictionary") -end - -# Helper function to create sample table data (DataFrame) -function create_table_payload() - # Small DataFrame - df_small = DataFrame( - id = 1:5, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve"], - score = [95, 88, 92, 78, 85], - status = ["active", "active", "inactive", "active", "pending"] - ) - - # Large DataFrame (> 1MB) - df_large = DataFrame( - id = 1:50000, - name = ["User_$i" for i in 1:50000], - value = rand(50000) .* 100, - status = ["active", "inactive", "pending"][rand(1:3, 50000)] - ) - - # Randomly choose small or large - return (rand([df_small, df_large]), "table") -end - -# Helper function to create sample image data (Vector{UInt8}) -function create_image_payload() - # Create random image bytes (small) - small_image = rand(UInt8, 100_000) # ~100KB - # Large image (> 1MB) - large_image = rand(UInt8, 2_000_000) # ~2MB - - return (rand([small_image, large_image]), "image") -end - -# Helper function to create sample audio data (Vector{UInt8}) -function create_audio_payload() - # Create random audio bytes (small) - small_audio = rand(UInt8, 150_000) # ~150KB - # Large audio (> 1MB) - large_audio = rand(UInt8, 3_000_000) # ~3MB - - return (rand([small_audio, large_audio]), "audio") -end - -# Helper function to create sample video data (Vector{UInt8}) -function create_video_payload() - # Create random video bytes (small) - small_video = rand(UInt8, 200_000) # ~200KB - # Large video (> 1MB) - large_video = rand(UInt8, 5_000_000) # ~5MB - - return (rand([small_video, large_video]), "video") -end - -# Helper function to create sample binary data (Vector{UInt8}) -function create_binary_payload() - # Create random binary bytes (small) - small_binary = rand(UInt8, 50_000) # ~50KB - # Large binary (> 1MB) - large_binary = rand(UInt8, 1_500_000) # ~1.5MB - - return (rand([small_binary, large_binary]), "binary") -end - -# Main chat sender function for scenario 6 -function chat_sender( - subject::String = "/chat/test", - num_messages::Int = 10; - nats_url::String = NATS_URL, - fileserver_url::String = FILESERVER_URL -) - println("\n=== Chat Sender (ServiceA) ===") - println("Subject: $subject") - println("Number of messages: $num_messages") - println("NATS URL: $nats_url") - println("Fileserver URL: $fileserver_url") - println("="^50) - - # Create a handler for the fileserver upload - # This will be passed to smartsend as fileserverUploadHandler parameter - fileserverUploadHandler = (url, dataname, data) -> NATSBridge.plik_oneshot_upload(url, dataname, data) - - for i in 1:num_messages - # Generate random chat message with mixed content - # Each message can have 1-5 payloads with different types - num_payloads = rand(1:5) - - # Create payloads list - payloads = Tuple{String, Any, String}[] - - # Track if we need to include text (required for chat) - has_text = false - - # Create random payloads - for j in 1:num_payloads - # Randomly select a payload type - payload_type = rand(CHAT_TYPES) - - # Create the payload based on type - payload_data, payload_type = if payload_type == "text" - create_text_payload() - elseif payload_type == "dictionary" - create_dictionary_payload() - elseif payload_type == "table" - create_table_payload() - elseif payload_type == "image" - create_image_payload() - elseif payload_type == "audio" - create_audio_payload() - elseif payload_type == "video" - create_video_payload() - elseif payload_type == "binary" - create_binary_payload() - end - - # Ensure at least one text payload - if payload_type == "text" - has_text = true - end - - push!(payloads, ("payload_$j", payload_data, payload_type)) - end - - # Ensure at least one text payload exists - if !has_text - text_data, text_type = create_text_payload() - push!(payloads, ("message_text", text_data, text_type)) - end - - # Generate chat message metadata - chat_metadata = Dict( - "message_index" => i, - "timestamp" => Dates.now().iso8601, - "sender" => "serviceA", - "payload_count" => length(payloads) - ) - - # Send the chat message with mixed content - println("\n--- Message $i ---") - println("Payloads: $(length(payloads))") - for (dataname, data, type) in payloads - println(" - $dataname (type: $type)") - end - - env = NATSBridge.smartsend( - subject, - payloads; - nats_url = nats_url, - fileserver_url = fileserver_url, - fileserverUploadHandler = fileserverUploadHandler, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = string(uuid4()), - msg_purpose = "chat", - sender_name = "serviceA", - receiver_name = "serviceB", - reply_to = "/chat/reply", - reply_to_msg_id = "" - ) - - println("Envelope created with correlationId: $(env.correlationId)") - println("Message published successfully!") - - # Wait a bit between messages - sleep(rand(0.1:0.3)) - end - - println("\n=== Chat Sender Complete ===") - return true -end - -# Example usage -if abspath(PROGRAM_FILE) == @__FILE__ - # Parse command line arguments - if length(ARGS) >= 2 - subject = ARGS[1] - num_messages = parse(Int, ARGS[2]) - else - subject = "/chat/test" - num_messages = 5 - end - - chat_sender(subject, num_messages) -end \ No newline at end of file diff --git a/test/julia_to_julia_table_receiver.jl b/test/julia_to_julia_table_receiver.jl deleted file mode 100644 index cf29891..0000000 --- a/test/julia_to_julia_table_receiver.jl +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env julia -# Test script for DataFrame transfer from Julia serviceA to Julia serviceB -# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge -# -# This is serviceB - the receiver that receives a dummy DataFrame from serviceA - -using NATSBridge -using UUIDs -using DataFrames -using JSON -using Dates - -# Include the NATSBridge module -include("src/NATSBridge.jl") - -# Configuration -const SUBJECT = "/NATSBridge_table_test" -const NATS_URL = "nats://localhost:4222" - -# Helper: Log with correlation ID -function log_trace(correlation_id::String, message::String) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# Receiver: Receive and process DataFrame from serviceA -function receive_dataframe() - # Connect to NATS - conn = NATS.connect(NATS_URL) - - # Subscribe to the subject - subscription = NATS.subscribe(conn, SUBJECT) - - println("Listening for DataFrame messages on '$SUBJECT'...") - println("Press Ctrl+C to stop listening.") - - # Listen for messages - while true - # Wait for a message with a 1-second timeout - msg = NATS.waitfor(subscription, 1.0) - - if msg !== nothing - # Extract correlation ID for logging - json_data = JSON.parse(String(msg.payload)) - cid = json_data["correlationId"] - - log_trace(cid, "Received message from $(json_data["senderName"])") - - # Process the message using smartreceive - payloads = NATSBridge.smartreceive( - msg; - fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) -> - NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid), - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - log_trace(cid, "Processed $(length(payloads)) payload(s)") - - # Process each payload - for (dataname, data, payload_type) in payloads - log_trace(cid, "Payload '$dataname' type: $payload_type") - - # Handle table (DataFrame) type - if payload_type == "table" - println("\nReceived DataFrame:") - println(data) - - # Display DataFrame dimensions - println("\nDataFrame dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") - - # Display column names - println("Column names: $(names(data))") - - # Optionally send acknowledgment - reply_to = get(json_data, "replyTo", "") - if !isempty(reply_to) - log_trace(cid, "Reply to: $reply_to") - # Could send ACK here - end - else - println("\nReceived non-table payload: $dataname (type: $payload_type)") - end - end - end - end -end - - -# Run the receiver -println("Starting DataFrame receiver...") -println("Subject: $SUBJECT") -println("NATS URL: $NATS_URL") -println("="^50) - -# Run receiver (this will block and listen for messages) -receive_dataframe() \ No newline at end of file diff --git a/test/julia_to_julia_table_sender.jl b/test/julia_to_julia_table_sender.jl deleted file mode 100644 index 0782c3f..0000000 --- a/test/julia_to_julia_table_sender.jl +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env julia -# Test script for DataFrame transfer from Julia serviceA to Julia serviceB -# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge -# -# This is serviceA - the sender that sends a dummy DataFrame to serviceB - -using NATSBridge -using UUIDs -using DataFrames -using JSON -using Dates - -# Include the NATSBridge module -include("src/NATSBridge.jl") - -# Configuration -const SUBJECT = "/NATSBridge_table_test" -const NATS_URL = "nats://localhost:4222" -const FILESERVER_URL = "http://localhost:8080" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - - -# ------------------------------------------------------------------------------------------------ # -# DataFrame sender # -# ------------------------------------------------------------------------------------------------ # - - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - - -# Sender: Send a dummy DataFrame to serviceB -function send_dataframe() - # Create a dummy DataFrame (table) to send - # This simulates a selection scenario where Julia server generates options for user selection - dummy_df = DataFrame( - id = 1:10, - name = ["Option A", "Option B", "Option C", "Option D", "Option E", - "Option F", "Option G", "Option H", "Option I", "Option J"], - score = [95, 88, 92, 78, 85, 90, 87, 93, 89, 91], - category = ["A", "B", "A", "C", "B", "A", "C", "A", "B", "C"], - active = [true, true, false, true, true, false, true, true, true, false] - ) - - # Calculate approximate size - df_size = sizeof(dummy_df) - log_trace("DataFrame size: $(df_size / 1024) KB") - - # Check if DataFrame is small enough for direct transport (< 1MB) - if df_size < 1_000_000 - log_trace("Using direct transport (size < 1MB)") - else - log_trace("Using link transport (size >= 1MB)") - end - - # Send the DataFrame using smartsend with type="table" - # API: smartsend(subject, [(dataname, data, type), ...]; keywords...) - env = NATSBridge.smartsend( - SUBJECT, - [("selection_table", dummy_df, "table")], # List of (dataname, data, type) tuples - nats_url = NATS_URL, - fileserver_url = FILESERVER_URL, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "serviceA", - receiver_name = "serviceB", - reply_to = "", - reply_to_msg_id = "" - ) - - log_trace("Sent DataFrame via $(env.payloads[1].transport) transport") - log_trace("Payload type: $(env.payloads[1].type)") - log_trace("Envelope correlationId: $(env.correlationId)") - - # Display the sent DataFrame - println("\nSent DataFrame content:") - println(dummy_df) -end - - -# Run the test -println("Starting DataFrame transfer test...") -println("Correlation ID: $correlation_id") -println("Subject: $SUBJECT") -println("NATS URL: $NATS_URL") - -# Run sender -println("\n--- Sending DataFrame ---") -send_dataframe() - -println("\nTest completed.") \ No newline at end of file diff --git a/test/julia_to_julia_file_receiver.jl b/test/test_julia_to_julia_file_receiver.jl similarity index 100% rename from test/julia_to_julia_file_receiver.jl rename to test/test_julia_to_julia_file_receiver.jl diff --git a/test/julia_to_julia_file_sender.jl b/test/test_julia_to_julia_file_sender.jl similarity index 100% rename from test/julia_to_julia_file_sender.jl rename to test/test_julia_to_julia_file_sender.jl