add test
This commit is contained in:
99
test/julia_to_julia_table_receiver.jl
Normal file
99
test/julia_to_julia_table_receiver.jl
Normal file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for DataFrame transfer from Julia serviceA to Julia serviceB
|
||||
# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge
|
||||
#
|
||||
# This is serviceB - the receiver that receives a dummy DataFrame from serviceA
|
||||
|
||||
using NATSBridge
|
||||
using UUIDs
|
||||
using DataFrames
|
||||
using JSON
|
||||
using Dates
|
||||
|
||||
# Include the NATSBridge module
|
||||
include("src/NATSBridge.jl")
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/NATSBridge_table_test"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(correlation_id::String, message::String)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
|
||||
# Receiver: Receive and process DataFrame from serviceA
|
||||
function receive_dataframe()
|
||||
# Connect to NATS
|
||||
conn = NATS.connect(NATS_URL)
|
||||
|
||||
# Subscribe to the subject
|
||||
subscription = NATS.subscribe(conn, SUBJECT)
|
||||
|
||||
println("Listening for DataFrame messages on '$SUBJECT'...")
|
||||
println("Press Ctrl+C to stop listening.")
|
||||
|
||||
# Listen for messages
|
||||
while true
|
||||
# Wait for a message with a 1-second timeout
|
||||
msg = NATS.waitfor(subscription, 1.0)
|
||||
|
||||
if msg !== nothing
|
||||
# Extract correlation ID for logging
|
||||
json_data = JSON.parse(String(msg.payload))
|
||||
cid = json_data["correlationId"]
|
||||
|
||||
log_trace(cid, "Received message from $(json_data["senderName"])")
|
||||
|
||||
# Process the message using smartreceive
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg;
|
||||
fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) ->
|
||||
NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid),
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
log_trace(cid, "Processed $(length(payloads)) payload(s)")
|
||||
|
||||
# Process each payload
|
||||
for (dataname, data, payload_type) in payloads
|
||||
log_trace(cid, "Payload '$dataname' type: $payload_type")
|
||||
|
||||
# Handle table (DataFrame) type
|
||||
if payload_type == "table"
|
||||
println("\nReceived DataFrame:")
|
||||
println(data)
|
||||
|
||||
# Display DataFrame dimensions
|
||||
println("\nDataFrame dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
|
||||
|
||||
# Display column names
|
||||
println("Column names: $(names(data))")
|
||||
|
||||
# Optionally send acknowledgment
|
||||
reply_to = get(json_data, "replyTo", "")
|
||||
if !isempty(reply_to)
|
||||
log_trace(cid, "Reply to: $reply_to")
|
||||
# Could send ACK here
|
||||
end
|
||||
else
|
||||
println("\nReceived non-table payload: $dataname (type: $payload_type)")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# Run the receiver
|
||||
println("Starting DataFrame receiver...")
|
||||
println("Subject: $SUBJECT")
|
||||
println("NATS URL: $NATS_URL")
|
||||
println("="^50)
|
||||
|
||||
# Run receiver (this will block and listen for messages)
|
||||
receive_dataframe()
|
||||
Reference in New Issue
Block a user