Files
NATSBridge/test/julia_to_julia_table_receiver.jl
2026-02-19 07:08:57 +07:00

99 lines
3.1 KiB
Julia

#!/usr/bin/env julia
# Test script for DataFrame transfer from Julia serviceA to Julia serviceB
# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge
#
# This is serviceB - the receiver that receives a dummy DataFrame from serviceA
using NATSBridge
using UUIDs
using DataFrames
using JSON
using Dates
# Include the NATSBridge module
include("src/NATSBridge.jl")
# Configuration
# NATS subject that receive_dataframe() subscribes to.
const SUBJECT = "/NATSBridge_table_test"
# Server URL handed to NATS.connect when the receiver starts.
const NATS_URL = "nats://localhost:4222"
# Helper: Print one trace line to stdout, prefixed with the current local time
# and the correlation ID, in the form
#   [<timestamp>] [Correlation: <correlation_id>] <message>
function log_trace(correlation_id::String, message::String)
    stamp = Dates.now()
    line = string("[", stamp, "] [Correlation: ", correlation_id, "] ", message)
    println(line)
    return nothing
end
# Helper: Decode a message payload to text without consuming the caller's buffer.
# `String(v)` on a `Vector{UInt8}` takes ownership of `v` and truncates it to zero
# length, which would leave the message empty for any later reader (here: the
# subsequent `smartreceive` call). Byte vectors are therefore copied first; any
# other payload type is converted directly.
_payload_text(payload::AbstractVector{UInt8}) = String(copy(payload))
_payload_text(payload) = String(payload)

# Helper: Print a received table, its dimensions and column names, and log the
# reply subject when one was supplied.
function _report_table(cid::String, table, reply_to)
    println("\nReceived DataFrame:")
    println(table)
    println("\nDataFrame dimensions: $(size(table, 1)) rows x $(size(table, 2)) columns")
    println("Column names: $(names(table))")
    # Optionally send acknowledgment
    if !isempty(reply_to)
        log_trace(cid, "Reply to: $reply_to")
        # Could send ACK here
    end
    return nothing
end

# Helper: Decode one message envelope, run it through NATSBridge.smartreceive and
# report every payload it yields.
function _process_message(msg)
    envelope = JSON.parse(_payload_text(msg.payload))

    # Envelope fields may be absent or JSON null; fall back to placeholders so a
    # sparse envelope is still logged instead of raising KeyError/MethodError.
    cid = string(something(get(envelope, "correlationId", nothing), "unknown"))
    sender = something(get(envelope, "senderName", nothing), "unknown")
    reply_to = something(get(envelope, "replyTo", nothing), "")
    log_trace(cid, "Received message from $sender")

    # Process the message using smartreceive. The download handler simply
    # forwards its arguments to NATSBridge's backoff fetcher.
    payloads = NATSBridge.smartreceive(
        msg;
        fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, download_cid) ->
            NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, download_cid),
        max_retries = 5,
        base_delay = 100,
        max_delay = 5000,
    )
    log_trace(cid, "Processed $(length(payloads)) payload(s)")

    for (dataname, data, payload_type) in payloads
        log_trace(cid, "Payload '$dataname' type: $payload_type")
        if payload_type == "table"
            _report_table(cid, data, reply_to)
        else
            println("\nReceived non-table payload: $dataname (type: $payload_type)")
        end
    end
    return nothing
end

# Receiver: Receive and process DataFrame from serviceA
#
# Connects to NATS_URL, subscribes to SUBJECT and then loops forever, handling
# each message as it arrives. The loop has no exit condition of its own; it runs
# until the process is interrupted or a message handler throws.
function receive_dataframe()
    conn = NATS.connect(NATS_URL)
    subscription = NATS.subscribe(conn, SUBJECT)
    println("Listening for DataFrame messages on '$SUBJECT'...")
    println("Press Ctrl+C to stop listening.")

    while true
        # Wait for a message with a 1-second timeout; `nothing` means no
        # message arrived, so just poll again.
        msg = NATS.waitfor(subscription, 1.0)
        msg === nothing && continue
        _process_message(msg)
    end
end
# Run the receiver: announce the configuration, then start listening.
for banner_line in (
    "Starting DataFrame receiver...",
    "Subject: $SUBJECT",
    "NATS URL: $NATS_URL",
    repeat("=", 50),
)
    println(banner_line)
end
# This call blocks: receive_dataframe() loops waiting for messages.
receive_dataframe()