#!/usr/bin/env julia # Scenario 2: Deep Dive Analysis (Large Arrow Table) # Tests large Arrow tables (> 1MB) sent via HTTP fileserver using NATS using Arrow using DataFrames using JSON3 using UUIDs # Include the bridge module include("../src/julia_bridge.jl") using .BiDirectionalBridge # Configuration const ANALYSIS_SUBJECT = "analysis_results" const RESPONSE_SUBJECT = "analysis_response" const NATS_URL = "nats://localhost:4222" # Create correlation ID for tracing correlation_id = string(uuid4()) # Receiver: Listen for analysis results function start_analysis_listener() conn = NATS.Connection(NATS_URL) try NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg log_trace("Received message from $(msg.subject)") # Parse the envelope env = MessageEnvelope(String(msg.data)) # Use SmartReceive to handle the data result = SmartReceive(msg) # Process the data based on type if result.envelope.type == "table" df = result.data log_trace("Received DataFrame with $(nrows(df)) rows") log_trace("DataFrame columns: $(names(df))") # Send acknowledgment response = Dict( "status" => "Processed", "correlation_id" => env.correlation_id, "row_count" => nrows(df) ) NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) end end # Keep listening for 10 seconds sleep(10) finally NATS.close(conn) end end # Helper: Log with correlation ID function log_trace(message) timestamp = Dates.now() println("[$timestamp] [Correlation: $correlation_id] $message") end # Run the listener start_analysis_listener()