#!/usr/bin/env julia
# Test script for mixed-content message testing.
# Sends a mix of text, dictionary, arrowtable, jsontable, image, audio, video, and binary data
# from Julia serviceA to Julia serviceB using msghandler.jl smartpack.
#
# This test demonstrates that any combination and any number of mixed-content payloads
# can be sent and received correctly.
#
# Key concept: DataFrames are the main table representation in Julia.
# The msghandler.jl library handles serialization:
# - For "arrowtable" type: the DataFrame is serialized to Arrow IPC format
# - For "jsontable" type: the DataFrame is converted to Vector{Dict} and then to JSON
|
|
|
|
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
|
|
|
# Include the bridge module
|
|
include("/home/ton/docker-apps/sommpanion/msghandler/src/msghandler.jl")
|
|
using .msghandler
|
|
|
|
# Configuration
|
|
# Subject handed to `msghandler.smartpack` as its first positional argument.
const SUBJECT = "/msghandler"

# Broker address handed to `msghandler.smartpack` as `broker_url`.
const NATS_URL = "nats.yiem.cc"

# Base URL of the file server; handed to `msghandler.smartpack` as `fileserver_url`.
const FILESERVER_URL = "http://192.168.88.104:8080"

# One correlation ID per script run, used to tag every trace line and the outgoing message.
# Kept as a plain (non-const) global so the script can be re-included in the same session.
correlation_id = uuid4() |> string
|
|
|
|
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
# test mixed content transfer #
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
|
|
|
|
# Helper: Log with correlation ID
|
|
"""
    log_trace(message)

Print `message` to standard output on a single line, prefixed with the value of
`Dates.now()` and the script-level `correlation_id`.

Returns `nothing`.
"""
function log_trace(message)
    timestamp = Dates.now()
    # Build the full line first so it is written to stdout in one call.
    line = "[$timestamp] [Correlation: $correlation_id] $message"
    println(stdout, line)
    return nothing
end
|
|
|
|
|
|
# File upload handler for plik server
|
|
"""
    plik_upload_handler(fileserver_url, dataname, data) -> Dict{String, Any}

Upload `data` to the file server at `fileserver_url` under the file name `dataname`,
using two HTTP requests:

1. `POST <fileserver_url>/upload` with the JSON body `{ "OneShot" : true }`; the `id` and
   `uploadToken` fields are read from the JSON response.
2. `POST <fileserver_url>/file/<uploadid>` with a multipart form whose `file` field carries
   `data` as `application/octet-stream`, authenticated with the `X-UploadToken` header; the
   `id` field of the JSON response is taken as the file ID.

# Returns
A `Dict` with the keys `"status"` (HTTP status of the second request), `"uploadid"`,
`"fileid"` and `"url"` (`<fileserver_url>/file/<uploadid>/<fileid>/<dataname>`).

# Throws
Whatever `HTTP.request` / `HTTP.post` / `JSON.parse` raise, and `KeyError` if an expected
field is missing from a response.
"""
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    # Step 1: open an upload session and obtain its ID and token.
    session_response = HTTP.request(
        "POST",
        string(fileserver_url, "/upload"),
        ["Content-Type" => "application/json"],
        """{ "OneShot" : true }""";
        body_is_form=false,
    )
    session = JSON.parse(String(session_response.body))
    uploadid = session["id"]
    uploadtoken = session["uploadToken"]

    # Step 2: send the bytes as a multipart form into that session.
    part = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    upload_response = HTTP.post(
        string(fileserver_url, "/file/", uploadid),
        ["X-UploadToken" => uploadtoken],
        HTTP.Form(Dict("file" => part)),
    )
    uploaded = JSON.parse(String(upload_response.body))
    fileid = uploaded["id"]

    return Dict(
        "status" => upload_response.status,
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => string(fileserver_url, "/file/", uploadid, "/", fileid, "/", dataname),
    )
end
|
|
|
|
|
|
# Helper: Create sample data for each type
|
|
"""
    create_sample_data()

Build the in-memory sample payloads used by the mixed-content sender test.

# Returns
A 12-tuple, in this order:

1. `text_data` – short chat text containing emoji and a newline
2. `dict_data` – nested `Dict` describing a chat message
3. `arrow_table_small` – 10-row `DataFrame` (`id`, `name`, `score`, `active`)
4. `arrow_table_large` – 2,000,000-row `DataFrame` (same columns plus `timestamp`)
5. `json_table_small` – 10-row `DataFrame` (`id`, `name`, `score`, `active`)
6. `json_table_large` – 2,000,000-row `DataFrame` (`id`, `name`, `score`, `active`)
7. `audio_data` – 100 random bytes
8. `large_audio_data` – 1,500,000 random bytes
9. `video_data` – 150 random bytes
10. `large_video_data` – 1,500,000 random bytes
11. `binary_data` – 200 random bytes
12. `large_binary_data` – 1,500,000 random bytes

The "small" items are meant to stay below the sender's size threshold (direct transport)
and the "large" ones to exceed it (link transport). Table scores, flags and all byte
vectors are random, so the values differ between calls.
"""
function create_sample_data()
    small_rows = 10
    large_rows = 2_000_000
    large_blob_bytes = 1_500_000   # 1.5 MB of raw bytes

    # Random byte vector of length `n`; every byte is drawn from 1:255 (never zero).
    random_bytes(n) = UInt8[rand(1:255) for _ in 1:n]

    first_names = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"]

    # Ten-row table; used for both the small arrowtable and the small jsontable sample.
    small_table() = DataFrame(
        id = 1:small_rows,
        name = first_names,
        score = rand(50:100, small_rows),
        active = rand([true, false], small_rows),
    )

    # --- Text -------------------------------------------------------------------------------
    text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊"

    # --- Dictionary -------------------------------------------------------------------------
    metadata = Dict(
        "timestamp" => string(Dates.now()),
        "priority" => "high",
        "tags" => ["urgent", "chat", "test"],
    )
    content = Dict(
        "text" => "This is a JSON-formatted chat message with nested structure.",
        "format" => "markdown",
        "mentions" => ["user1", "user2"],
    )
    dict_data = Dict(
        "type" => "chat",
        "sender" => "serviceA",
        "receiver" => "serviceB",
        "metadata" => metadata,
        "content" => content,
    )

    # --- Tables (kept in the original creation order: arrow small/large, json small/large) ---
    arrow_table_small = small_table()

    # Large table destined for the "arrowtable" type; carries an extra `timestamp` column.
    arrow_table_large = DataFrame(
        id = 1:large_rows,
        name = ["user_$i" for i in 1:large_rows],
        score = rand(50:100, large_rows),
        active = rand([true, false], large_rows),
        timestamp = [string(Dates.now()) for _ in 1:large_rows],
    )

    json_table_small = small_table()

    # Large table destined for the "jsontable" type.
    json_table_large = DataFrame(
        id = 1:large_rows,
        name = ["user_$i" for i in 1:large_rows],
        score = rand(50:100, large_rows),
        active = rand([true, false], large_rows),
    )

    # --- Opaque byte blobs standing in for audio / video / generic binary content -------------
    audio_data = random_bytes(100)
    large_audio_data = random_bytes(large_blob_bytes)

    video_data = random_bytes(150)
    large_video_data = random_bytes(large_blob_bytes)

    binary_data = random_bytes(200)
    large_binary_data = random_bytes(large_blob_bytes)

    return (
        text_data,
        dict_data,
        arrow_table_small,
        arrow_table_large,
        json_table_small,
        json_table_large,
        audio_data,
        large_audio_data,
        video_data,
        large_video_data,
        binary_data,
        large_binary_data,
    )
end
|
|
|
|
|
|
# Sender: Send mixed content via smartpack
|
|
"""
    test_mix_send()

Assemble a mixed batch of small and large payloads, pass it to `msghandler.smartpack`, log
how each resulting payload was transported, and return the envelope JSON string.

Payloads come from `create_sample_data()` plus two image files read from
`./test/small_image.jpg` and `./test/large_image.png` (paths are relative to the current
working directory). `smartpack` is called with a `size_threshold` of 1,000,000 bytes and
`plik_upload_handler` as the upload callback; the expectation is that payloads above the
threshold come back with transport `"link"` and the rest with `"direct"`.
NOTE(review): whether `smartpack` also publishes the message is defined in msghandler.jl —
confirm there.

# Returns
The second value returned by `msghandler.smartpack` (the envelope as a JSON string).

# Throws
An error from `read` if either image file is missing, and whatever `smartpack` or the
upload handler raise.
"""
function test_mix_send()
    # The small audio/video/binary samples are not sent, so they are discarded with `_`.
    # The arrow tables stay bound because the disabled payload entries below refer to them.
    (text_data, dict_data, arrow_table_small, arrow_table_large,
     json_table_small, json_table_large, _, large_audio_data,
     _, large_video_data, _, large_binary_data) = create_sample_data()

    # Read image files from disk (following test_julia_file_sender.jl pattern).
    # Small image - should use direct transport.
    file_path_small_image = "./test/small_image.jpg"
    file_data_small_image = read(file_path_small_image)
    filename_small_image = basename(file_path_small_image)

    # Large image - should use link transport.
    file_path_large_image = "./test/large_image.png"
    file_data_large_image = read(file_path_large_image)
    filename_large_image = basename(file_path_large_image)

    # Each entry is a (dataname, data, type) tuple. DataFrames are passed as-is with the
    # type "arrowtable" or "jsontable"; msghandler.jl handles the serialization internally.
    payloads = [
        # Small data (expected: direct transport).
        ("chat_text", text_data, "text"),
        ("chat_json", dict_data, "dictionary"),
        # ("arrow_table_small", arrow_table_small, "arrowtable"),  # currently disabled
        ("json_table_small", json_table_small, "jsontable"),
        (filename_small_image, file_data_small_image, "binary"),

        # Large data (expected: link transport).
        # ("arrow_table_large", arrow_table_large, "arrowtable"),  # currently disabled
        ("json_table_large", json_table_large, "jsontable"),
        (filename_large_image, file_data_large_image, "binary"),
        ("audio_clip_large", large_audio_data, "audio"),
        ("video_clip_large", large_video_data, "video"),
        ("binary_file_large", large_binary_data, "binary"),
    ]

    # Use smartpack with mixed content.
    env, env_json_str = msghandler.smartpack(
        SUBJECT,
        payloads;  # List of (dataname, data, type) tuples
        broker_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserver_upload_handler = plik_upload_handler,
        size_threshold = 1_000_000,  # 1MB threshold
        correlation_id = correlation_id,
        msg_purpose = "chat",
        sender_name = "mix_sender",
        receiver_name = "",
        receiver_id = "",
        reply_to = "",
        reply_to_msg_id = "",
    )

    log_trace("Sent message with $(length(env.payloads)) payloads")

    # Log transport details for each payload.
    for (i, payload) in enumerate(env.payloads)
        # `$(payload.dataname)` is required here: a bare `$payload.dataname` would
        # interpolate the whole payload object and append the literal text ".dataname".
        log_trace("Payload $i ('$(payload.dataname)'):")
        log_trace("  Transport: $(payload.transport)")
        log_trace("  Type: $(payload.payload_type)")
        log_trace("  Size: $(payload.size) bytes")
        log_trace("  Encoding: $(payload.encoding)")

        # For link transport the payload's `data` field is logged as the URL.
        if payload.transport == "link"
            log_trace("  URL: $(payload.data)")
        end
    end

    # Summary of how many payloads went through each transport.
    println("\n--- Transport Summary ---")
    direct_count = count(p -> p.transport == "direct", env.payloads)
    link_count = count(p -> p.transport == "link", env.payloads)
    log_trace("Direct transport: $direct_count payloads")
    log_trace("Link transport: $link_count payloads")

    return env_json_str
end
|
|
|
|
|
|
# Run the test
|
|
# Script entry point: announce the run, execute the sender once, then point the user at the
# companion receiver script.
foreach(println, (
    "Starting mixed-content transport test...",
    "Correlation ID: $correlation_id",
))

# Run sender; the returned envelope JSON is kept in a global for interactive inspection.
println("start smartpack for mixed content")
env_json_str = test_mix_send()

foreach(println, (
    "\nTest completed.",
    "Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.",
))
|