#!/usr/bin/env julia
# Test script for DataFrame table transport testing
# Tests receiving 1 large and 1 small DataFrames via direct and link transport
# Uses NATSBridge.jl smartreceive with "table" type

using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP

# Include the bridge module
include("../src/NATSBridge.jl")
using .NATSBridge

# Configuration
const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats.yiem.cc"
const FILESERVER_URL = "http://192.168.88.104:8080"

# ------------------------------------------------------------------------------------------------ #
#                                       test table transfer                                        #
# ------------------------------------------------------------------------------------------------ #

"""
    log_trace(message)

Print `message` to stdout, prefixed with the current timestamp from `Dates.now()`.
"""
function log_trace(message)
    timestamp = Dates.now()
    println("[$timestamp] $message")
    return nothing
end

# Report and persist a single `(dataname, data, data_type)` item from `smartreceive`.
#
# The payload is converted to a `DataFrame`. When the conversion fails, the item is
# logged as an unexpected data type and skipped, so one bad payload does not abort
# the remaining items of the same message.
function _handle_table_item(dataname, data, data_type)
    table = try
        data isa DataFrame ? data : DataFrame(data)
    catch err
        # Never swallow a user interrupt (Ctrl-C) as a "bad payload".
        err isa InterruptException && rethrow()
        log_trace("Received unexpected data type for '$dataname': $(typeof(data))")
        log_trace("  Conversion to DataFrame failed: $(sprint(showerror, err))")
        return nothing
    end

    log_trace("Received DataFrame '$dataname' of type $data_type")
    log_trace("  Dimensions: $(size(table, 1)) rows x $(size(table, 2)) columns")
    log_trace("  Column names: $(names(table))")

    # Display first few rows (`first` clamps to the number of available rows)
    println("  First 5 rows:")
    display(first(table, 5))

    # `dataname` comes from the network: strip path separators so a crafted name
    # cannot make the file land outside the current directory.
    safe_name = replace(string(dataname), r"[/\\]" => "_")
    output_path = "./received_$(safe_name).arrow"

    # Stream straight to the file rather than staging the whole payload in an
    # in-memory buffer first; the `Arrow.write(io, ...)` method is kept so the
    # on-disk format is the same as before.
    open(output_path, "w") do io
        Arrow.write(io, table)
    end
    log_trace("Saved DataFrame to $output_path")
    return nothing
end

"""
    test_table_receive(; listen_seconds = 120)

Receiver: subscribe to `SUBJECT` on the NATS server at `NATS_URL`, decode every
incoming message with `NATSBridge.smartreceive`, and log, display and save each
received table as `./received_<dataname>.arrow`.

# Keywords
- `listen_seconds`: how long to keep the subscription open before draining the
  connection (default `120`).

The connection is drained even if the wait is interrupted or subscribing fails.
"""
function test_table_receive(; listen_seconds = 120)
    conn = NATS.connect(NATS_URL)
    try
        NATS.subscribe(conn, SUBJECT) do msg
            log_trace("Received message on $(msg.subject)")

            # Use NATSBridge.smartreceive to handle the data
            # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay)
            # NOTE(review): no download_handler is passed here — presumably the
            # bridge provides a default; confirm against NATSBridge.jl.
            result = NATSBridge.smartreceive(
                msg;
                max_retries = 5,
                base_delay = 100,
                max_delay = 5000,
            )

            # Result is a list of (dataname, data, data_type) tuples
            for (dataname, data, data_type) in result
                _handle_table_item(dataname, data, data_type)
            end
        end

        # Keep listening for `listen_seconds` seconds
        sleep(listen_seconds)
    finally
        NATS.drain(conn)
    end
    return nothing
end

# Run the test
println("Starting DataFrame table transport test...")
println("Note: This receiver will wait for messages from the sender.")
println("Run test_julia_to_julia_table_sender.jl first to send test data.")

# Run receiver
println("testing smartreceive")
test_table_receive()

println("Test completed.")