#!/usr/bin/env julia
# Test script for mixed-content message testing.
# Tests receiving a mix of text, dictionary (JSON), table, image, audio, video, and
# binary data from Julia serviceA to Julia serviceB using NATSBridge.jl smartreceive.
#
# This test demonstrates that any combination and any number of mixed content
# payloads can be sent and received correctly.
|
|
|
|
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
|
|
|
# Include the bridge module
|
|
include("../src/NATSBridge.jl")
|
|
using .NATSBridge
|
|
|
|
# Configuration
# -------------
# NATS subject the receiver subscribes to.
const SUBJECT = "/NATSBridge_mix_test"

# Address handed to `NATS.connect`.
const NATS_URL = "nats.yiem.cc"

# File server base URL.
# NOTE(review): not referenced by the receiver code visible in this file; presumably
# kept in sync with the sender script - confirm before removing.
const FILESERVER_URL = "http://192.168.88.104:8080"
|
|
|
|
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
# test mixed content transfer #
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
|
|
|
|
"""
    log_trace(message)

Print `message` to standard output on its own line, prefixed with the current local
time (`Dates.now()`) in square brackets.
"""
function log_trace(message)
    # Build the whole line first so it is emitted with a single `println` call.
    line = string("[", Dates.now(), "] ", message)
    println(line)
end
|
|
|
|
|
|
# Receiver: Listen for messages and verify mixed content handling

# Payload type tags whose data is expected to arrive as raw bytes.
const _BINARY_PAYLOAD_TYPES = ("image", "audio", "video", "binary")

# (label, type tag) pairs reported in the verification summary, in print order.
const _SUMMARY_TYPES = (
    ("Text", "text"),
    ("Dictionary", "dictionary"),
    ("Table", "table"),
    ("Image", "image"),
    ("Audio", "audio"),
    ("Video", "video"),
    ("Binary", "binary"),
)

"""
    _save_payload(dataname, extension, content)

Write `content` to `./received_<dataname>.<extension>` and log the resulting path.

`dataname` comes from the received message, so path separators in it are replaced
with `_` to keep the output file inside the current directory.
"""
function _save_payload(dataname, extension, content)
    safe_name = replace(string(dataname), r"[/\\]" => "_")
    output_path = "./received_$(safe_name).$(extension)"
    write(output_path, content)
    log_trace(" Saved to: $output_path")
    return output_path
end

"""
    _to_dataframe(data)

Convert `data` to a `DataFrame`. Return `nothing` (after logging the reason) when the
conversion fails, so one malformed table payload does not abort the whole callback.
"""
function _to_dataframe(data)
    try
        return DataFrame(data)
    catch err
        # Never swallow a user interrupt; everything else is a payload problem.
        err isa InterruptException && rethrow()
        reason = sprint(showerror, err)
        log_trace(" ERROR: Expected DataFrame, got $(typeof(data)) ($reason)")
        return nothing
    end
end

# Verify a "text" payload: must be a String; log a preview and save it as .txt.
function _verify_text(dataname, data)
    if !(data isa String)
        log_trace(" ERROR: Expected String, got $(typeof(data))")
        return nothing
    end
    log_trace(" Type: String")
    log_trace(" Length: $(length(data)) characters")
    if length(data) > 200
        # `first` counts characters; `data[1:200]` would index bytes and can throw
        # a StringIndexError on multi-byte UTF-8 text.
        log_trace(" First 200 chars: $(first(data, 200))...")
    else
        log_trace(" Content: $data")
    end
    _save_payload(dataname, "txt", data)
    return nothing
end

# Verify a "dictionary" payload: must be a JSON object; log entries, save as .json.
function _verify_dictionary(dataname, data)
    if !(data isa JSON.Object{String, Any})
        log_trace(" ERROR: Expected Dict, got $(typeof(data))")
        return nothing
    end
    log_trace(" Type: Dict")
    log_trace(" Keys: $(keys(data))")
    for (key, value) in data
        log_trace(" $key => $value")
    end
    _save_payload(dataname, "json", JSON.json(data, 2))
    return nothing
end

# Verify a "table" payload: must convert to a DataFrame; show a preview, save as Arrow.
function _verify_table(dataname, data)
    table = _to_dataframe(data)
    table === nothing && return nothing
    log_trace(" Type: DataFrame")
    log_trace(" Dimensions: $(size(table, 1)) rows x $(size(table, 2)) columns")
    log_trace(" Columns: $(names(table))")
    log_trace(" First 5 rows:")
    display(first(table, 5))
    # Serialise through an in-memory buffer, exactly as the bytes are written to disk.
    buffer = IOBuffer()
    Arrow.write(buffer, table)
    _save_payload(dataname, "arrow", take!(buffer))
    return nothing
end

# Verify an image/audio/video/binary payload: must be raw bytes; save as .bin.
function _verify_binary(dataname, data)
    if !(data isa Vector{UInt8})
        log_trace(" ERROR: Expected Vector{UInt8}, got $(typeof(data))")
        return nothing
    end
    log_trace(" Type: Vector{UInt8} (binary)")
    log_trace(" Size: $(length(data)) bytes")
    _save_payload(dataname, "bin", data)
    return nothing
end

# Route one payload to the verifier matching its declared type tag.
function _verify_payload(dataname, data, data_type)
    if data_type == "text"
        _verify_text(dataname, data)
    elseif data_type == "dictionary"
        _verify_dictionary(dataname, data)
    elseif data_type == "table"
        _verify_table(dataname, data)
    elseif data_type in _BINARY_PAYLOAD_TYPES
        _verify_binary(dataname, data)
    else
        log_trace(" ERROR: Unknown data type '$data_type'")
    end
    return nothing
end

# Log a one-line size description of a payload for the "Payload Details" section.
function _describe_payload(dataname, data, data_type)
    if data_type in _BINARY_PAYLOAD_TYPES
        log_trace("$dataname: $(length(data)) bytes (binary)")
    elseif data_type == "table"
        table = _to_dataframe(data)
        if table !== nothing
            log_trace("$dataname: $(size(table, 1)) rows x $(size(table, 2)) columns (DataFrame)")
        end
    elseif data_type == "dictionary"
        # `sizeof` reports bytes; `length` would count characters.
        log_trace("$dataname: $(sizeof(JSON.json(data))) bytes (Dict)")
    elseif data_type == "text"
        log_trace("$dataname: $(length(data)) characters (String)")
    end
    return nothing
end

# Decode one NATS message with smartreceive, then verify, save and summarise payloads.
function _handle_mix_message(msg)
    log_trace("Received message on $(msg.subject)")

    # Use NATSBridge.smartreceive to handle the data.
    # The result is an envelope dictionary whose "payloads" entry is a list of
    # (dataname, data, data_type) tuples.
    result = NATSBridge.smartreceive(
        msg;
        max_retries = 5,
        base_delay = 100,
        max_delay = 5000,
    )
    payloads = result["payloads"]
    log_trace("Received $(length(payloads)) payloads")

    for (dataname, data, data_type) in payloads
        log_trace("\n=== Payload: $dataname (type: $data_type) ===")
        _verify_payload(dataname, data, data_type)
    end

    println("\n=== Verification Summary ===")
    for (label, tag) in _SUMMARY_TYPES
        matching = count(payload -> payload[3] == tag, payloads)
        log_trace("$label payloads: $matching")
    end

    println("\n=== Payload Details ===")
    for (dataname, data, data_type) in payloads
        _describe_payload(dataname, data, data_type)
    end
    return nothing
end

"""
    test_mix_receive()

Connect to `NATS_URL`, subscribe to `SUBJECT`, and for two minutes verify every
received message: each payload is type-checked against its declared type tag, logged,
and saved to `./received_<dataname>.<ext>`; a per-type count and a size line per
payload are printed afterwards.

The connection is drained even if subscribing or waiting fails or is interrupted.
Returns `nothing`.
"""
function test_mix_receive()
    conn = NATS.connect(NATS_URL)
    try
        NATS.subscribe(conn, SUBJECT) do msg
            _handle_mix_message(msg)
        end

        # Keep listening for 2 minutes
        sleep(120)
    finally
        NATS.drain(conn)
    end
    return nothing
end
|
|
|
|
|
|
# Script entry point: print the banner, then run the receiver until it returns.
foreach(
    println,
    (
        "Starting mixed-content transport test...",
        "Note: This receiver will wait for messages from the sender.",
        "Run test_julia_to_julia_mix_sender.jl first to send test data.",
    ),
)

# Run receiver
println("\ntesting smartreceive for mixed content")
test_mix_receive()

println("\nTest completed.")