#!/usr/bin/env julia
# Scenario 2: Deep Dive Analysis (Large Arrow Table)
# Tests large Arrow tables (> 1 MB) sent via the HTTP fileserver

# Standard library
using Dates
using UUIDs

# Third-party packages
using Arrow
using DataFrames
using JSON3
using NATS

# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge

# Configuration
# NATS subject the listener subscribes to for incoming analysis results.
const ANALYSIS_SUBJECT = "analysis_results"
# NATS subject on which processing acknowledgments are published.
const RESPONSE_SUBJECT = "analysis_response"
# Address of the NATS server this scenario connects to.
const NATS_URL = "nats://localhost:4222"

# Create correlation ID for tracing: generated once when the script is loaded and
# included in every log line. Declared `const` (like the settings above) so functions
# that read this global see a concretely typed String rather than an untyped global.
const correlation_id = string(uuid4())

"""
    start_analysis_listener()

Receiver: connect to `NATS_URL`, subscribe to `ANALYSIS_SUBJECT`, and handle incoming
messages for 10 seconds before closing the connection.

For each message the raw payload is parsed into a `MessageEnvelope` and the message is
handed to `SmartReceive`. When the received envelope's `type` is `"table"`, the row
count and column names of the received table are logged and a JSON acknowledgment with
the keys `status`, `correlation_id`, and `row_count` is published on
`RESPONSE_SUBJECT`. Messages of any other type are only logged on arrival.

The connection is closed in a `finally` block, so it is released even when an error
is raised while subscribing or waiting.
"""
function start_analysis_listener()
    conn = NATS.Connection(NATS_URL)
    try
        NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg
            log_trace("Received message from $(msg.subject)")

            # Parse the envelope; its correlation ID is echoed back in the acknowledgment.
            # NOTE(review): `SmartReceive` also yields `result.envelope`; presumably it
            # carries the same correlation ID, which would make this parse redundant —
            # confirm against the bridge module.
            env = MessageEnvelope(String(msg.data))

            # Use SmartReceive to handle the data
            result = SmartReceive(msg)

            # Process the data based on type
            if result.envelope.type == "table"
                df = result.data
                # DataFrames exports `nrow` (singular); `nrows` is not defined and
                # would raise an UndefVarError on the first table message.
                row_count = nrow(df)
                log_trace("Received DataFrame with $(row_count) rows")
                log_trace("DataFrame columns: $(names(df))")

                # Send acknowledgment
                response = Dict(
                    "status" => "Processed",
                    "correlation_id" => env.correlation_id,
                    "row_count" => row_count,
                )
                NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
            end
        end

        # Keep listening for 10 seconds
        sleep(10)
    finally
        NATS.close(conn)
    end
end

"""
    log_trace([io::IO], message)

Helper: log with correlation ID. Print `message` to `io` (default `stdout`) as one line
of the form `[timestamp] [Correlation: id] message`, where the timestamp comes from
`Dates.now()` and the id is the script-level `correlation_id`.

Returns `nothing`.
"""
function log_trace(io::IO, message)
    # `Dates` has to be loaded at file level (`using Dates`) for this call to resolve.
    timestamp = Dates.now()
    println(io, "[$timestamp] [Correlation: $correlation_id] $message")
    return nothing
end

# Original single-argument entry point: unchanged behavior, writes to stdout.
log_trace(message) = log_trace(stdout, message)

# Run the listener: connects, handles messages for the listening window, then returns.
start_analysis_listener()