#!/usr/bin/env julia
# Test script for DataFrame table transport testing
# Tests sending 1 large and 1 small DataFrames via direct and link transport
# Uses NATSBridge.jl smartsend with "table" type

using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP

# Include the bridge module
include("../src/NATSBridge.jl")
using .NATSBridge

# Configuration
const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats.yiem.cc"
const FILESERVER_URL = "http://192.168.88.104:8080"

# Create correlation ID for tracing
correlation_id = string(uuid4())

# ------------------------------------------------------------------------------------------------ #
#                                        test table transfer                                       #
# ------------------------------------------------------------------------------------------------ #

"""
    log_trace(message)

Print `message` to standard output, prefixed with the current timestamp
(`Dates.now()`) and the script-level `correlation_id`.
"""
function log_trace(message)
    stamp = Dates.now()
    line = string("[", stamp, "] [Correlation: ", correlation_id, "] ", message)
    println(line)
    return nothing
end

"""
    plik_upload_handler(fileserver_url, dataname, data) -> Dict{String, Any}

Upload `data` to the file server at `fileserver_url` in two HTTP requests.

1. `POST <fileserver_url>/upload` with the JSON body `{ "OneShot" : true }`; the
   `"id"` and `"uploadToken"` fields are read from the JSON response.
2. `POST <fileserver_url>/file/<uploadid>` with a multipart form whose `file` part
   carries `data` under the file name `dataname`, authenticated through the
   `X-UploadToken` header; the `"id"` field of the JSON response is the file id.

# Arguments
- `fileserver_url::String`: base URL of the file server, without a trailing slash.
- `dataname::String`: file name used for the multipart part and in the returned URL.
- `data::Vector{UInt8}`: raw bytes to upload.

# Returns
A `Dict{String, Any}` with the keys `"status"` (HTTP status of the second request),
`"uploadid"`, `"fileid"` and `"url"` (`<fileserver_url>/file/<uploadid>/<fileid>/<dataname>`).
"""
function plik_upload_handler(
    fileserver_url::String,
    dataname::String,
    data::Vector{UInt8},
)::Dict{String, Any}
    # Step 1: ask the server for an upload id and its upload token.
    create_response = HTTP.request(
        "POST",
        "$fileserver_url/upload",
        ["Content-Type" => "application/json"],
        """{ "OneShot" : true }""";
        body_is_form=false,
    )
    upload_info = JSON.parse(String(create_response.body))
    uploadid = upload_info["id"]
    uploadtoken = upload_info["uploadToken"]

    # Step 2: send the bytes as a multipart form under the upload just created.
    file_part = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    upload_response = HTTP.post(
        "$fileserver_url/file/$uploadid",
        ["X-UploadToken" => uploadtoken],
        HTTP.Form(Dict("file" => file_part)),
    )
    fileid = JSON.parse(String(upload_response.body))["id"]

    return Dict(
        "status" => upload_response.status,
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => "$fileserver_url/file/$uploadid/$fileid/$dataname",
    )
end

# Sender: Send
# DataFrame tables via smartsend

"""
    test_table_send()

Build one 10-row and one 50,000-row `DataFrame`, send both as `"table"` payloads in a
single `NATSBridge.smartsend` call on `SUBJECT`, then log the transport, type, size and
encoding of every payload in the returned envelope (plus the URL for `"link"` payloads).

The call passes `size_threshold = 1_000_000`; the small table is expected to go out via
direct transport and the large one via link transport (uploaded with
`plik_upload_handler`). The transport actually chosen is decided inside `smartsend` and
is what the log lines report.
"""
function test_table_send()
    # Create a small DataFrame (will use direct transport)
    small_df = DataFrame(
        id = 1:10,
        name = ["Alice", "Bob", "Charlie", "Diana", "Eve",
                "Frank", "Grace", "Henry", "Ivy", "Jack"],
        score = [95, 88, 92, 85, 90, 78, 95, 88, 92, 85],
        category = ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"],
    )

    # Create a large DataFrame (will use link transport if > 1MB)
    # Generate a larger dataset (~2MB to ensure link transport)
    n_large = 50_000
    large_df = DataFrame(
        id = 1:n_large,
        name = ["User_$i" for i in 1:n_large],
        score = rand(1:100, n_large),
        category = ["Category_$(rand(1:10))" for _ in 1:n_large],
    )

    # Each entry is a (dataname, data, type) tuple.
    data1 = ("small_table", small_df, "table")
    data2 = ("large_table", large_df, "table")

    # Use smartsend with table type
    # For small DataFrame: will use direct transport (Base64 encoded Arrow IPC)
    # For large DataFrame: will use link transport (uploaded to fileserver)
    # smartsend returns (envelope, envelope JSON string); only the envelope is used here.
    env, _ = NATSBridge.smartsend(
        SUBJECT,
        [data1, data2];  # List of (dataname, data, type) tuples
        broker_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserver_upload_handler = plik_upload_handler,
        size_threshold = 1_000_000,  # 1MB threshold
        correlation_id = correlation_id,
        msg_purpose = "chat",
        sender_name = "table_sender",
        receiver_name = "",
        receiver_id = "",
        reply_to = "",
        reply_to_msg_id = "",
        is_publish = true,  # Publish the message to NATS
    )

    log_trace("Sent message with $(length(env.payloads)) payloads")

    # Log transport type for each payload
    for (i, payload) in enumerate(env.payloads)
        # `$(payload.dataname)` needs the parentheses: `$payload.dataname` would print the
        # whole payload object followed by the literal text ".dataname".
        # NOTE(review): assumes the payload type exposes a `dataname` field - confirm.
        log_trace("Payload $i ('$(payload.dataname)'):")
        log_trace(" Transport: $(payload.transport)")
        log_trace(" Type: $(payload.payload_type)")
        log_trace(" Size: $(payload.size) bytes")
        log_trace(" Encoding: $(payload.encoding)")
        if payload.transport == "link"
            log_trace(" URL: $(payload.data)")
        end
    end
    return nothing
end

# Run the
# test

# Announce the run and the correlation ID used to trace it.
println("Starting DataFrame table transport test...")
println("Correlation ID: ", correlation_id)

# Run sender
println("start smartsend for tables")
test_table_send()

println("Test completed.")