update
This commit is contained in:
@@ -104,7 +104,7 @@ function log_trace(correlation_id::String, message::String)
|
|||||||
timestamp = Dates.now() # Get current timestamp
|
timestamp = Dates.now() # Get current timestamp
|
||||||
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
||||||
end
|
end
|
||||||
|
2112
|
||||||
|
|
||||||
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
||||||
|
|
||||||
@@ -346,8 +346,7 @@ function smartreceive(
|
|||||||
max_delay::Int = 5000
|
max_delay::Int = 5000
|
||||||
)
|
)
|
||||||
# Parse the envelope
|
# Parse the envelope
|
||||||
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope
|
env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
|
||||||
|
|
||||||
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
||||||
|
|
||||||
# Check transport type
|
# Check transport type
|
||||||
|
|||||||
@@ -66,17 +66,16 @@ function test_large_binary_receive()
|
|||||||
conn = NATS.connect(NATS_URL)
|
conn = NATS.connect(NATS_URL)
|
||||||
NATS.subscribe(conn, SUBJECT) do msg
|
NATS.subscribe(conn, SUBJECT) do msg
|
||||||
log_trace("Received message on $(msg.subject)")
|
log_trace("Received message on $(msg.subject)")
|
||||||
log_trace("Received message:\n$msg")
|
|
||||||
|
|
||||||
# Use SmartReceive to handle the data
|
|
||||||
result = SmartReceive(msg)
|
|
||||||
|
|
||||||
|
# Use NATSBridge.smartreceive to handle the data
|
||||||
|
result = NATSBridge.smartreceive(msg)
|
||||||
|
println("envelope----- ", result.envelope)
|
||||||
# Check transport type
|
# Check transport type
|
||||||
if result.envelope.transport == "direct"
|
if result.envelope.transport == "direct"
|
||||||
log_trace("Received direct transport with $(length(result.data)) bytes")
|
log_trace("Received direct transport with ---- bytes")
|
||||||
else
|
else
|
||||||
# For link transport, result.data is the URL
|
# For link transport, result.data is the URL
|
||||||
log_trace("Received link transport at $(result.data)")
|
log_trace("Received link transport at ---")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Verify the received data matches the original
|
# Verify the received data matches the original
|
||||||
@@ -102,7 +101,7 @@ function test_large_binary_receive()
|
|||||||
end
|
end
|
||||||
|
|
||||||
# Keep listening for 10 seconds
|
# Keep listening for 10 seconds
|
||||||
sleep(10)
|
sleep(60)
|
||||||
NATS.drain(conn)
|
NATS.drain(conn)
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -111,12 +110,28 @@ println("Starting large binary payload test...")
|
|||||||
println("Correlation ID: $correlation_id")
|
println("Correlation ID: $correlation_id")
|
||||||
println("Large file: $LARGE_FILE_PATH")
|
println("Large file: $LARGE_FILE_PATH")
|
||||||
|
|
||||||
# Run sender first
|
# # Run sender first
|
||||||
println("start smartsend")
|
# println("start smartsend")
|
||||||
test_large_binary_send()
|
# test_large_binary_send()
|
||||||
|
|
||||||
# Run receiver
|
# Run receiver
|
||||||
println("testing smartreceive")
|
println("testing smartreceive")
|
||||||
test_large_binary_receive()
|
test_large_binary_receive()
|
||||||
|
|
||||||
println("Test completed.")
|
println("Test completed.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
BIN
test_output.bin
Normal file
BIN
test_output.bin
Normal file
Binary file not shown.
Reference in New Issue
Block a user