update
This commit is contained in:
134
test/test_julia_to_julia_table_sender.jl
Normal file
134
test/test_julia_to_julia_table_sender.jl
Normal file
@@ -0,0 +1,134 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for DataFrame table transport testing
|
||||
# Tests sending 1 large and 1 small DataFrames via direct and link transport
|
||||
# Uses NATSBridge.jl smartsend with "table" type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_table_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
# test table transfer #
|
||||
# ------------------------------------------------------------------------------------------------ #
|
||||
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
# Get upload ID
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
# Upload file
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
|
||||
# Sender: Send DataFrame tables via smartsend
|
||||
function test_table_send()
|
||||
# Create a small DataFrame (will use direct transport)
|
||||
small_df = DataFrame(
|
||||
id = 1:10,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = [95, 88, 92, 85, 90, 78, 95, 88, 92, 85],
|
||||
category = ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
|
||||
)
|
||||
|
||||
# Create a large DataFrame (will use link transport if > 1MB)
|
||||
# Generate a larger dataset (~2MB to ensure link transport)
|
||||
large_ids = 1:50000
|
||||
large_names = ["User_$i" for i in 1:50000]
|
||||
large_scores = rand(1:100, 50000)
|
||||
large_categories = ["Category_$(rand(1:10))" for i in 1:50000]
|
||||
|
||||
large_df = DataFrame(
|
||||
id = large_ids,
|
||||
name = large_names,
|
||||
score = large_scores,
|
||||
category = large_categories
|
||||
)
|
||||
|
||||
# Test data 1: small DataFrame
|
||||
data1 = ("small_table", small_df, "table")
|
||||
|
||||
# Test data 2: large DataFrame
|
||||
data2 = ("large_table", large_df, "table")
|
||||
|
||||
# Use smartsend with table type
|
||||
# For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
|
||||
# For large DataFrame: will use link transport (uploaded to fileserver)
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[data1, data2], # List of (dataname, data, type) tuples
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000, # 1MB threshold
|
||||
correlation_id = correlation_id,
|
||||
msg_purpose = "chat",
|
||||
sender_name = "table_sender",
|
||||
receiver_name = "",
|
||||
receiver_id = "",
|
||||
reply_to = "",
|
||||
reply_to_msg_id = ""
|
||||
)
|
||||
|
||||
log_trace("Sent message with $(length(env.payloads)) payloads")
|
||||
|
||||
# Log transport type for each payload
|
||||
for (i, payload) in enumerate(env.payloads)
|
||||
log_trace("Payload $i ('$payload.dataname'):")
|
||||
log_trace(" Transport: $(payload.transport)")
|
||||
log_trace(" Type: $(payload.type)")
|
||||
log_trace(" Size: $(payload.size) bytes")
|
||||
log_trace(" Encoding: $(payload.encoding)")
|
||||
|
||||
if payload.transport == "link"
|
||||
log_trace(" URL: $(payload.data)")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# Run the test
|
||||
println("Starting DataFrame table transport test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
|
||||
# Run sender
|
||||
println("start smartsend for tables")
|
||||
test_table_send()
|
||||
|
||||
println("Test completed.")
|
||||
Reference in New Issue
Block a user