add test images
This commit is contained in:
@@ -43,7 +43,7 @@
|
||||
|
||||
module NATSBridge
|
||||
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting, DataFrames
|
||||
# ---------------------------------------------- 100 --------------------------------------------- #
|
||||
|
||||
# Constants
|
||||
@@ -302,7 +302,7 @@ function envelope_to_json(env::msg_envelope_v1)
|
||||
)
|
||||
# Include data based on transport type
|
||||
if payload.transport == "direct" && payload.data !== nothing
|
||||
if payload.encoding == "base64" || payload.encoding == "json"
|
||||
if payload.encoding == "base64" || payload.encoding == "json" || payload.encoding == "arrow-ipc"
|
||||
payload_obj["data"] = payload.data
|
||||
else
|
||||
# For other encodings, use base64
|
||||
@@ -462,6 +462,8 @@ function smartsend(
|
||||
# Process each payload in the list
|
||||
payloads = msg_payload_v1[]
|
||||
for (dataname, payload_data, payload_type) in data
|
||||
@show dataname typeof(payload_data)
|
||||
|
||||
# Serialize data based on type
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
|
||||
|
||||
BIN
test/large_image.png
Normal file
BIN
test/large_image.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.2 MiB |
BIN
test/small_image.jpg
Normal file
BIN
test/small_image.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 76 KiB |
@@ -1,82 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for Dictionary transport testing
|
||||
# Tests receiving 1 large and 1 small Dictionaries via direct and link transport
|
||||
# Uses NATSBridge.jl smartreceive with "dictionary" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_dict_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test dictionary transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] $message")
|
||||
end
|
||||
|
||||
|
||||
# Receiver: Listen for messages and verify Dictionary handling
|
||||
function test_dict_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
|
||||
# Use NATSBridge.smartreceive to handle the data
|
||||
# API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay)
|
||||
result = NATSBridge.smartreceive(
|
||||
msg;
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
|
||||
for (dataname, data, data_type) in result["payloads"]
|
||||
if isa(data, JSON.Object{String, Any})
|
||||
log_trace("Received Dictionary '$dataname' of type $data_type")
|
||||
|
||||
# Display dictionary contents
|
||||
println(" Contents:")
|
||||
for (key, value) in data
|
||||
println(" $key => $value")
|
||||
end
|
||||
|
||||
# Save to JSON file
|
||||
output_path = "./received_$dataname.json"
|
||||
json_str = JSON.json(data, 2)
|
||||
write(output_path, json_str)
|
||||
log_trace("Saved Dictionary to $output_path")
|
||||
else
|
||||
log_trace("Received unexpected data type for '$dataname': $(typeof(data))")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting Dictionary transport test...")
|
||||
println("Note: This receiver will wait for messages from the sender.")
|
||||
println("Run test_julia_to_julia_dict_sender.jl first to send test data.")
|
||||
|
||||
# Run receiver
|
||||
println("testing smartreceive")
|
||||
test_dict_receive()
|
||||
|
||||
println("Test completed.")
|
||||
@@ -1,137 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for Dictionary transport testing
|
||||
# Tests sending 1 large and 1 small Dictionaries via direct and link transport
|
||||
# Uses NATSBridge.jl smartsend with "dictionary" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_dict_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test dictionary transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
# Get upload ID
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
# Upload file
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
|
||||
# Sender: Send Dictionaries via smartsend
|
||||
function test_dict_send()
|
||||
# Create a small Dictionary (will use direct transport)
|
||||
small_dict = Dict(
|
||||
"name" => "Alice",
|
||||
"age" => 30,
|
||||
"scores" => [95, 88, 92],
|
||||
"metadata" => Dict(
|
||||
"height" => 155,
|
||||
"weight" => 55
|
||||
)
|
||||
)
|
||||
|
||||
# Create a large Dictionary (will use link transport if > 1MB)
|
||||
# Generate a larger dataset (~2MB to ensure link transport)
|
||||
large_dict = Dict(
|
||||
"ids" => collect(1:50000),
|
||||
"names" => ["User_$i" for i in 1:50000],
|
||||
"scores" => rand(1:100, 50000),
|
||||
"categories" => ["Category_$(rand(1:10))" for i in 1:50000],
|
||||
"metadata" => Dict(
|
||||
"source" => "test_generator",
|
||||
"timestamp" => string(Dates.now())
|
||||
)
|
||||
)
|
||||
|
||||
# Test data 1: small Dictionary
|
||||
data1 = ("small_dict", small_dict, "dictionary")
|
||||
|
||||
# Test data 2: large Dictionary
|
||||
data2 = ("large_dict", large_dict, "dictionary")
|
||||
|
||||
# Use smartsend with dictionary type
|
||||
# For small Dictionary: will use direct transport (JSON encoded)
|
||||
# For large Dictionary: will use link transport (uploaded to fileserver)
|
||||
env, env_json_str = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[data1, data2]; # List of (dataname, data, type) tuples
|
||||
broker_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserver_upload_handler = plik_upload_handler,
|
||||
size_threshold = 1_000_000, # 1MB threshold
|
||||
correlation_id = correlation_id,
|
||||
msg_purpose = "chat",
|
||||
sender_name = "dict_sender",
|
||||
receiver_name = "",
|
||||
receiver_id = "",
|
||||
reply_to = "",
|
||||
reply_to_msg_id = "",
|
||||
is_publish = true # Publish the message to NATS
|
||||
)
|
||||
|
||||
log_trace("Sent message with $(length(env.payloads)) payloads")
|
||||
|
||||
# Log transport type for each payload
|
||||
for (i, payload) in enumerate(env.payloads)
|
||||
log_trace("Payload $i ('$payload.dataname'):")
|
||||
log_trace(" Transport: $(payload.transport)")
|
||||
log_trace(" Type: $(payload.payload_type)")
|
||||
log_trace(" Size: $(payload.size) bytes")
|
||||
log_trace(" Encoding: $(payload.encoding)")
|
||||
|
||||
if payload.transport == "link"
|
||||
log_trace(" URL: $(payload.data)")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting Dictionary transport test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
|
||||
# Run sender
|
||||
println("start smartsend for dictionaries")
|
||||
test_dict_send()
|
||||
|
||||
println("Test completed.")
|
||||
@@ -93,8 +93,62 @@ function test_mix_receive()
|
||||
log_trace(" ERROR: Expected Dict, got $(typeof(data))")
|
||||
end
|
||||
|
||||
elseif data_type == "arrowtable"
|
||||
# Arrow table data - should be Arrow.Table
|
||||
if isa(data, Arrow.Table)
|
||||
log_trace(" Type: Arrow.Table")
|
||||
log_trace(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
|
||||
log_trace(" Columns: $(names(data))")
|
||||
|
||||
# Display first few rows
|
||||
log_trace(" First 5 rows:")
|
||||
display(data[1:min(5, size(data, 1)), :])
|
||||
|
||||
# Convert to DataFrame for display and save
|
||||
df = DataFrame(data)
|
||||
output_path = "./received_$dataname.arrow"
|
||||
io = IOBuffer()
|
||||
Arrow.write(io, data)
|
||||
write(output_path, take!(io))
|
||||
log_trace(" Saved to: $output_path")
|
||||
else
|
||||
log_trace(" ERROR: Expected Arrow.Table, got $(typeof(data))")
|
||||
end
|
||||
|
||||
elseif data_type == "jsontable"
|
||||
# JSON table data - should be Vector{Dict} or Vector{NamedTuple}
|
||||
if isa(data, Vector{<:Union{JSON.Object, Dict, NamedTuple}})
|
||||
log_trace(" Type: Vector{Dict/NamedTuple}")
|
||||
log_trace(" Number of rows: $(length(data))")
|
||||
|
||||
# Display first few rows
|
||||
log_trace(" First 3 rows:")
|
||||
for i in 1:min(3, length(data))
|
||||
log_trace(" Row $i: $(data[i])")
|
||||
end
|
||||
|
||||
# Convert to DataFrame for display and save
|
||||
df = DataFrame(data)
|
||||
log_trace(" Converted to DataFrame: $(size(df, 1)) rows x $(size(df, 2)) columns")
|
||||
|
||||
# Save as JSON file
|
||||
output_path = "./received_$dataname.json"
|
||||
json_str = JSON.json(data, 2)
|
||||
write(output_path, json_str)
|
||||
log_trace(" Saved to: $output_path")
|
||||
|
||||
# Also save as Arrow file
|
||||
output_path_arrow = "./received_$dataname.arrow"
|
||||
io = IOBuffer()
|
||||
Arrow.write(io, df)
|
||||
write(output_path_arrow, take!(io))
|
||||
log_trace(" Saved to: $output_path_arrow")
|
||||
else
|
||||
log_trace(" ERROR: Expected Vector{Dict/NamedTuple}, got $(typeof(data))")
|
||||
end
|
||||
|
||||
elseif data_type == "table"
|
||||
# Table data - should be a DataFrame
|
||||
# Table data - should be a DataFrame (backward compatibility)
|
||||
data = DataFrame(data)
|
||||
if isa(data, DataFrame)
|
||||
log_trace(" Type: DataFrame")
|
||||
@@ -180,7 +234,9 @@ function test_mix_receive()
|
||||
println("\n=== Verification Summary ===")
|
||||
text_count = count(x -> x[3] == "text", result["payloads"])
|
||||
dict_count = count(x -> x[3] == "dictionary", result["payloads"])
|
||||
table_count = count(x -> x[3] == "table", result["payloads"])
|
||||
arrowtable_count = count(x -> x[3] == "arrowtable", result["payloads"])
|
||||
jsontable_count = count(x -> x[3] == "jsontable", result["payloads"])
|
||||
table_count = count(x -> x[3] == "table", result["payloads"]) # backward compatibility
|
||||
image_count = count(x -> x[3] == "image", result["payloads"])
|
||||
audio_count = count(x -> x[3] == "audio", result["payloads"])
|
||||
video_count = count(x -> x[3] == "video", result["payloads"])
|
||||
@@ -188,7 +244,9 @@ function test_mix_receive()
|
||||
|
||||
log_trace("Text payloads: $text_count")
|
||||
log_trace("Dictionary payloads: $dict_count")
|
||||
log_trace("Table payloads: $table_count")
|
||||
log_trace("Arrow table payloads: $arrowtable_count")
|
||||
log_trace("JSON table payloads: $jsontable_count")
|
||||
log_trace("Table payloads (backward compat): $table_count")
|
||||
log_trace("Image payloads: $image_count")
|
||||
log_trace("Audio payloads: $audio_count")
|
||||
log_trace("Video payloads: $video_count")
|
||||
@@ -199,8 +257,12 @@ function test_mix_receive()
|
||||
for (dataname, data, data_type) in result["payloads"]
|
||||
if data_type in ["image", "audio", "video", "binary"]
|
||||
log_trace("$dataname: $(length(data)) bytes (binary)")
|
||||
elseif data_type == "arrowtable"
|
||||
log_trace("$dataname: $(size(data, 1)) rows x $(size(data, 2)) columns (Arrow.Table)")
|
||||
elseif data_type == "jsontable"
|
||||
log_trace("$dataname: $(length(data)) rows (Vector{Dict/NamedTuple})")
|
||||
elseif data_type == "table"
|
||||
data = DataFrame(data)
|
||||
data = DataFrame(data)
|
||||
log_trace("$dataname: $(size(data, 1)) rows x $(size(data, 2)) columns (DataFrame)")
|
||||
elseif data_type == "dictionary"
|
||||
log_trace("$dataname: $(length(JSON.json(data))) bytes (Dict)")
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for mixed-content message testing
|
||||
# Tests sending a mix of text, json, table, image, audio, video, and binary data
|
||||
# Tests sending a mix of text, dictionary, arrowtable, jsontable, image, audio, video, and binary data
|
||||
# from Julia serviceA to Julia serviceB using NATSBridge.jl smartsend
|
||||
#
|
||||
# This test demonstrates that any combination and any number of mixed content
|
||||
# can be sent and received correctly.
|
||||
#
|
||||
# Key concept: DataFrames are the main table representation in Julia.
|
||||
# The NATSBridge.jl library handles serialization:
|
||||
# - For "arrowtable" type: DataFrame is serialized to Arrow IPC format
|
||||
# - For "jsontable" type: DataFrame is converted to Vector{Dict} and then to JSON
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
||||
|
||||
@@ -82,50 +87,47 @@ function create_sample_data()
|
||||
)
|
||||
)
|
||||
|
||||
# Table data (DataFrame - small - direct transport)
|
||||
table_data_small = DataFrame(
|
||||
# Arrow table data (DataFrame - small - direct transport)
|
||||
# Uses Arrow IPC format for efficient binary serialization
|
||||
# NATSBridge.jl handles serialization: DataFrame -> Arrow IPC
|
||||
arrow_table_small = DataFrame(
|
||||
id = 1:10,
|
||||
message = ["msg_$i" for i in 1:10],
|
||||
sender = ["sender_$i" for i in 1:10],
|
||||
timestamp = [string(Dates.now()) for _ in 1:10],
|
||||
priority = rand(1:3, 10)
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = rand(50:100, 10),
|
||||
active = rand([true, false], 10)
|
||||
)
|
||||
|
||||
# Table data (DataFrame - large - link transport)
|
||||
# ~1.5MB of data (150,000 rows) - should trigger link transport
|
||||
table_data_large = DataFrame(
|
||||
# Arrow table data (DataFrame - large - link transport)
|
||||
# ~1.5MB of Arrow data (200,000 rows) - should trigger link transport
|
||||
# NATSBridge.jl handles serialization: DataFrame -> Arrow IPC
|
||||
arrow_table_large = DataFrame(
|
||||
id = 1:200_000,
|
||||
name = ["user_$i" for i in 1:200_000],
|
||||
score = rand(50:100, 200_000),
|
||||
active = rand([true, false], 200_000),
|
||||
timestamp = [string(Dates.now()) for _ in 1:200_000]
|
||||
)
|
||||
|
||||
# Json table data (DataFrame - small - direct transport)
|
||||
# Uses JSON format for human-readable tabular data
|
||||
# NATSBridge.jl handles serialization: DataFrame -> Vector{Dict} -> JSON
|
||||
json_table_small = DataFrame(
|
||||
id = 1:10,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = rand(50:100, 10),
|
||||
active = rand([true, false], 10)
|
||||
)
|
||||
|
||||
# Json table data (DataFrame - large - link transport)
|
||||
# ~1.5MB of JSON data (150,000 rows) - should trigger link transport
|
||||
# NATSBridge.jl handles serialization: DataFrame -> Vector{Dict} -> JSON
|
||||
json_table_large = DataFrame(
|
||||
id = 1:150_000,
|
||||
message = ["msg_$i" for i in 1:150_000],
|
||||
sender = ["sender_$i" for i in 1:150_000],
|
||||
timestamp = [string(Dates.now()) for i in 1:150_000],
|
||||
priority = rand(1:3, 150_000)
|
||||
name = ["user_$i" for i in 1:150_000],
|
||||
score = rand(50:100, 150_000),
|
||||
active = rand([true, false], 150_000)
|
||||
)
|
||||
|
||||
# Image data (small binary - direct transport)
|
||||
# Create a simple 10x10 pixel PNG-like data (128 bytes header + 100 pixels = 112 bytes)
|
||||
# Using simple RGB data (10*10*3 = 300 bytes of pixel data)
|
||||
image_width = 10
|
||||
image_height = 10
|
||||
image_data = UInt8[]
|
||||
# PNG header (simplified)
|
||||
push!(image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A)
|
||||
# Simple RGB data (RGBRGBRGB...)
|
||||
for i in 1:image_width*image_height
|
||||
push!(image_data, 0xFF, 0x00, 0x00) # Red pixel
|
||||
end
|
||||
|
||||
# Image data (large - link transport)
|
||||
# Create a larger image (~1.5MB) to test link transport
|
||||
large_image_width = 500
|
||||
large_image_height = 1000
|
||||
large_image_data = UInt8[]
|
||||
# PNG header (simplified for 500x1000)
|
||||
push!(large_image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A)
|
||||
# RGB data (500*1000*3 = 1,500,000 bytes)
|
||||
for i in 1:large_image_width*large_image_height
|
||||
push!(large_image_data, rand(1:255), rand(1:255), rand(1:255)) # Random color pixels
|
||||
end
|
||||
|
||||
# Audio data (small binary - direct transport)
|
||||
audio_data = UInt8[rand(1:255) for _ in 1:100]
|
||||
|
||||
@@ -150,10 +152,10 @@ function create_sample_data()
|
||||
return (
|
||||
text_data,
|
||||
dict_data,
|
||||
table_data_small,
|
||||
table_data_large,
|
||||
image_data,
|
||||
large_image_data,
|
||||
arrow_table_small,
|
||||
arrow_table_large,
|
||||
json_table_small,
|
||||
json_table_large,
|
||||
audio_data,
|
||||
large_audio_data,
|
||||
video_data,
|
||||
@@ -167,19 +169,35 @@ end
|
||||
# Sender: Send mixed content via smartsend
|
||||
function test_mix_send()
|
||||
# Create sample data
|
||||
(text_data, dict_data, table_data_small, table_data_large, image_data, large_image_data, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data()
|
||||
|
||||
(text_data, dict_data, arrow_table_small, arrow_table_large, json_table_small, json_table_large, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data()
|
||||
|
||||
# Read image files from disk (following test_julia_file_sender.jl pattern)
|
||||
# Small image - should use direct transport
|
||||
file_path_small_image = "./test/small_image.jpg"
|
||||
file_data_small_image = read(file_path_small_image)
|
||||
filename_small_image = basename(file_path_small_image)
|
||||
|
||||
# Large image - should use link transport
|
||||
file_path_large_image = "./test/large_image.png"
|
||||
file_data_large_image = read(file_path_large_image)
|
||||
filename_large_image = basename(file_path_large_image)
|
||||
|
||||
# Create payloads list - mixed content with both small and large data
|
||||
# Small data uses direct transport, large data uses link transport
|
||||
# Key: Pass DataFrame directly and specify type as "arrowtable" or "jsontable"
|
||||
# NATSBridge.jl handles the serialization internally
|
||||
payloads = [
|
||||
# Small data (direct transport) - text, dictionary, small table
|
||||
# Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image
|
||||
("chat_text", text_data, "text"),
|
||||
("chat_json", dict_data, "dictionary"),
|
||||
("chat_table_small", table_data_small, "table"),
|
||||
|
||||
# Large data (link transport) - large table, large image, large audio, large video, large binary
|
||||
("chat_table_large", table_data_large, "table"),
|
||||
("user_image_large", large_image_data, "image"),
|
||||
("arrow_table_small", arrow_table_small, "arrowtable"),
|
||||
("json_table_small", json_table_small, "jsontable"),
|
||||
(filename_small_image, file_data_small_image, "binary"),
|
||||
|
||||
# Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary
|
||||
("arrow_table_large", arrow_table_large, "arrowtable"),
|
||||
("json_table_large", json_table_large, "jsontable"),
|
||||
(filename_large_image, file_data_large_image, "binary"),
|
||||
("audio_clip_large", large_audio_data, "audio"),
|
||||
("video_clip_large", large_video_data, "video"),
|
||||
("binary_file_large", large_binary_data, "binary")
|
||||
@@ -237,4 +255,4 @@ println("start smartsend for mixed content")
|
||||
test_mix_send()
|
||||
|
||||
println("\nTest completed.")
|
||||
println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.")
|
||||
println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.")
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for DataFrame table transport testing
|
||||
# Tests receiving 1 large and 1 small DataFrames via direct and link transport
|
||||
# Uses NATSBridge.jl smartreceive with "table" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_table_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test table transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] $message")
|
||||
end
|
||||
|
||||
|
||||
# Receiver: Listen for messages and verify DataFrame table handling
|
||||
function test_table_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
|
||||
# Use NATSBridge.smartreceive to handle the data
|
||||
# API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay)
|
||||
result = NATSBridge.smartreceive(
|
||||
msg;
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
|
||||
for (dataname, data, data_type) in result["payloads"]
|
||||
data = DataFrame(data)
|
||||
if isa(data, DataFrame)
|
||||
log_trace("Received DataFrame '$dataname' of type $data_type")
|
||||
log_trace(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
|
||||
log_trace(" Column names: $(names(data))")
|
||||
|
||||
# Display first few rows
|
||||
println(" First 5 rows:")
|
||||
display(data[1:min(5, size(data, 1)), :])
|
||||
|
||||
# Save to file
|
||||
output_path = "./received_$dataname.arrow"
|
||||
io = IOBuffer()
|
||||
Arrow.write(io, data)
|
||||
write(output_path, take!(io))
|
||||
log_trace("Saved DataFrame to $output_path")
|
||||
else
|
||||
log_trace("Received unexpected data type for '$dataname': $(typeof(data))")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting DataFrame table transport test...")
|
||||
println("Note: This receiver will wait for messages from the sender.")
|
||||
println("Run test_julia_to_julia_table_sender.jl first to send test data.")
|
||||
|
||||
# Run receiver
|
||||
println("testing smartreceive")
|
||||
test_table_receive()
|
||||
|
||||
println("Test completed.")
|
||||
@@ -1,135 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for DataFrame table transport testing
|
||||
# Tests sending 1 large and 1 small DataFrames via direct and link transport
|
||||
# Uses NATSBridge.jl smartsend with "table" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_table_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test table transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
# Get upload ID
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
# Upload file
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
|
||||
# Sender: Send DataFrame tables via smartsend
|
||||
function test_table_send()
|
||||
# Create a small DataFrame (will use direct transport)
|
||||
small_df = DataFrame(
|
||||
id = 1:10,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = [95, 88, 92, 85, 90, 78, 95, 88, 92, 85],
|
||||
category = ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
|
||||
)
|
||||
|
||||
# Create a large DataFrame (will use link transport if > 1MB)
|
||||
# Generate a larger dataset (~2MB to ensure link transport)
|
||||
large_ids = 1:50000
|
||||
large_names = ["User_$i" for i in 1:50000]
|
||||
large_scores = rand(1:100, 50000)
|
||||
large_categories = ["Category_$(rand(1:10))" for i in 1:50000]
|
||||
|
||||
large_df = DataFrame(
|
||||
id = large_ids,
|
||||
name = large_names,
|
||||
score = large_scores,
|
||||
category = large_categories
|
||||
)
|
||||
|
||||
# Test data 1: small DataFrame
|
||||
data1 = ("small_table", small_df, "table")
|
||||
|
||||
# Test data 2: large DataFrame
|
||||
data2 = ("large_table", large_df, "table")
|
||||
|
||||
# Use smartsend with table type
|
||||
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
|
||||
# For large DataFrame: will use link transport (uploaded to fileserver)
|
||||
env, env_json_str = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[data1, data2]; # List of (dataname, data, type) tuples
|
||||
broker_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserver_upload_handler = plik_upload_handler,
|
||||
size_threshold = 1_000_000, # 1MB threshold
|
||||
correlation_id = correlation_id,
|
||||
msg_purpose = "chat",
|
||||
sender_name = "table_sender",
|
||||
receiver_name = "",
|
||||
receiver_id = "",
|
||||
reply_to = "",
|
||||
reply_to_msg_id = "",
|
||||
is_publish = true # Publish the message to NATS
|
||||
)
|
||||
|
||||
log_trace("Sent message with $(length(env.payloads)) payloads")
|
||||
|
||||
# Log transport type for each payload
|
||||
for (i, payload) in enumerate(env.payloads)
|
||||
log_trace("Payload $i ('$payload.dataname'):")
|
||||
log_trace(" Transport: $(payload.transport)")
|
||||
log_trace(" Type: $(payload.payload_type)")
|
||||
log_trace(" Size: $(payload.size) bytes")
|
||||
log_trace(" Encoding: $(payload.encoding)")
|
||||
|
||||
if payload.transport == "link"
|
||||
log_trace(" URL: $(payload.data)")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting DataFrame table transport test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
|
||||
# Run sender
|
||||
println("start smartsend for tables")
|
||||
test_table_send()
|
||||
|
||||
println("Test completed.")
|
||||
@@ -1,83 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for text transport testing
|
||||
# Tests receiving 1 large and 1 small text from Julia serviceA to Julia serviceB
|
||||
# Uses NATSBridge.jl smartreceive with "text" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_text_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test text transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] $message")
|
||||
end
|
||||
|
||||
|
||||
# Receiver: Listen for messages and verify text handling
|
||||
function test_text_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
|
||||
# Use NATSBridge.smartreceive to handle the data
|
||||
# API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay)
|
||||
result = NATSBridge.smartreceive(
|
||||
msg;
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
|
||||
for (dataname, data, data_type) in result["payloads"]
|
||||
if isa(data, String)
|
||||
log_trace("Received text '$dataname' of type $data_type")
|
||||
log_trace(" Length: $(length(data)) characters")
|
||||
|
||||
# Display first 100 characters
|
||||
if length(data) > 100
|
||||
log_trace(" First 100 characters: $(data[1:100])...")
|
||||
else
|
||||
log_trace(" Content: $data")
|
||||
end
|
||||
|
||||
# Save to file
|
||||
output_path = "./received_$dataname.txt"
|
||||
write(output_path, data)
|
||||
log_trace("Saved text to $output_path")
|
||||
else
|
||||
log_trace("Received unexpected data type for '$dataname': $(typeof(data))")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting text transport test...")
|
||||
println("Note: This receiver will wait for messages from the sender.")
|
||||
println("Run test_julia_to_julia_text_sender.jl first to send test data.")
|
||||
|
||||
# Run receiver
|
||||
println("testing smartreceive for text")
|
||||
test_text_receive()
|
||||
|
||||
println("Test completed.")
|
||||
@@ -1,120 +0,0 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for text transport testing
|
||||
# Tests sending 1 large and 1 small text from Julia serviceA to Julia serviceB
|
||||
# Uses NATSBridge.jl smartsend with "text" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_text_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test text transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
# Get upload ID
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
# Upload file
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
|
||||
# Sender: Send text via smartsend
|
||||
function test_text_send()
|
||||
# Create a small text (will use direct transport)
|
||||
small_text = "Hello, this is a small text message. Testing direct transport via NATS."
|
||||
|
||||
# Create a large text (will use link transport if > 1MB)
|
||||
# Generate a larger text (~2MB to ensure link transport)
|
||||
large_text = join(["Line $i: This is a sample text line with some content to pad the size. " for i in 1:50000], "")
|
||||
|
||||
# Test data 1: small text
|
||||
data1 = ("small_text", small_text, "text")
|
||||
|
||||
# Test data 2: large text
|
||||
data2 = ("large_text", large_text, "text")
|
||||
|
||||
# Use smartsend with text type
|
||||
# For small text: will use direct transport (Base64 encoded UTF-8)
|
||||
# For large text: will use link transport (uploaded to fileserver)
|
||||
env, env_json_str = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[data1, data2]; # List of (dataname, data, type) tuples
|
||||
broker_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserver_upload_handler = plik_upload_handler,
|
||||
size_threshold = 1_000_000, # 1MB threshold
|
||||
correlation_id = correlation_id,
|
||||
msg_purpose = "chat",
|
||||
sender_name = "text_sender",
|
||||
receiver_name = "",
|
||||
receiver_id = "",
|
||||
reply_to = "",
|
||||
reply_to_msg_id = "",
|
||||
is_publish = true # Publish the message to NATS
|
||||
)
|
||||
|
||||
log_trace("Sent message with $(length(env.payloads)) payloads")
|
||||
|
||||
# Log transport type for each payload
|
||||
for (i, payload) in enumerate(env.payloads)
|
||||
log_trace("Payload $i ('$payload.dataname'):")
|
||||
log_trace(" Transport: $(payload.transport)")
|
||||
log_trace(" Type: $(payload.payload_type)")
|
||||
log_trace(" Size: $(payload.size) bytes")
|
||||
log_trace(" Encoding: $(payload.encoding)")
|
||||
|
||||
if payload.transport == "link"
|
||||
log_trace(" URL: $(payload.data)")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting text transport test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
|
||||
# Run sender
|
||||
println("start smartsend for text")
|
||||
test_text_send()
|
||||
|
||||
println("Test completed.")
|
||||
Reference in New Issue
Block a user