Files
NATSBridge/test/scenario2_large_table.jl
2026-02-10 06:55:29 +07:00

66 lines
1.9 KiB
Julia

#!/usr/bin/env julia
# Scenario 2: Deep Dive Analysis (Large Arrow Table)
# Tests large Arrow tables (> 1MB) sent via HTTP fileserver
using NATS
using Arrow
using DataFrames
using JSON3
using UUIDs
# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge
# Configuration
const ANALYSIS_SUBJECT = "analysis_results"
const RESPONSE_SUBJECT = "analysis_response"
const NATS_URL = "nats://localhost:4222"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# Receiver: Listen for analysis results
function start_analysis_listener()
conn = NATS.Connection(NATS_URL)
try
NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg
log_trace("Received message from $(msg.subject)")
# Parse the envelope
env = MessageEnvelope(String(msg.data))
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Process the data based on type
if result.envelope.type == "table"
df = result.data
log_trace("Received DataFrame with $(nrows(df)) rows")
log_trace("DataFrame columns: $(names(df))")
# Send acknowledgment
response = Dict(
"status" => "Processed",
"correlation_id" => env.correlation_id,
"row_count" => nrows(df)
)
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
end
end
# Keep listening for 10 seconds
sleep(10)
finally
NATS.close(conn)
end
end
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Run the listener
start_analysis_listener()