1st commit
This commit is contained in:
67
test/scenario1_command_control.jl
Normal file
67
test/scenario1_command_control.jl
Normal file
@@ -0,0 +1,67 @@
|
||||
#!/usr/bin/env julia
|
||||
# Scenario 1: Command & Control (Small JSON)
|
||||
# Tests small JSON payloads (< 1MB) sent directly via NATS
|
||||
|
||||
using NATS
|
||||
using JSON3
|
||||
using UUIDs
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/julia_bridge.jl")
|
||||
using .BiDirectionalBridge
|
||||
|
||||
# Configuration
|
||||
const CONTROL_SUBJECT = "control"
|
||||
const RESPONSE_SUBJECT = "control_response"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
# Receiver: Listen for control commands
|
||||
function start_control_listener()
|
||||
conn = NATS.Connection(NATS_URL)
|
||||
try
|
||||
NATS.subscribe(conn, CONTROL_SUBJECT) do msg
|
||||
log_trace(msg.data)
|
||||
|
||||
# Parse the envelope
|
||||
env = MessageEnvelope(String(msg.data))
|
||||
|
||||
# Parse JSON payload
|
||||
config = JSON3.read(env.payload)
|
||||
|
||||
# Execute simulation with parameters
|
||||
step_size = config.step_size
|
||||
iterations = config.iterations
|
||||
|
||||
# Simulate processing
|
||||
sleep(0.1) # Simulate some work
|
||||
|
||||
# Send acknowledgment
|
||||
response = Dict(
|
||||
"status" => "Running",
|
||||
"correlation_id" => env.correlation_id,
|
||||
"step_size" => step_size,
|
||||
"iterations" => iterations
|
||||
)
|
||||
|
||||
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
|
||||
log_trace("Sent response: $(JSON3.stringify(response))")
|
||||
end
|
||||
|
||||
# Keep listening for 5 seconds
|
||||
sleep(5)
|
||||
finally
|
||||
NATS.close(conn)
|
||||
end
|
||||
end
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
# Run the listener
|
||||
start_control_listener()
|
||||
34
test/scenario1_command_control.js
Normal file
34
test/scenario1_command_control.js
Normal file
@@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env node
|
||||
// Scenario 1: Command & Control (Small JSON)
|
||||
// Tests small JSON payloads (< 1MB) sent directly via NATS
|
||||
|
||||
const { SmartSend } = require('../js_bridge');
|
||||
|
||||
// Configuration
|
||||
const CONTROL_SUBJECT = "control";
|
||||
const NATS_URL = "nats://localhost:4222";
|
||||
|
||||
// Create correlation ID for tracing
|
||||
const correlationId = require('uuid').v4();
|
||||
|
||||
// Sender: Send control command to Julia
|
||||
async function sendControlCommand() {
|
||||
const config = {
|
||||
step_size: 0.01,
|
||||
iterations: 1000
|
||||
};
|
||||
|
||||
// Send via SmartSend with type="json"
|
||||
const env = await SmartSend(
|
||||
CONTROL_SUBJECT,
|
||||
config,
|
||||
"json",
|
||||
{ correlationId }
|
||||
);
|
||||
|
||||
console.log(`Sent control command with correlation_id: ${correlationId}`);
|
||||
console.log(`Envelope: ${JSON.stringify(env, null, 2)}`);
|
||||
}
|
||||
|
||||
// Run the sender
|
||||
sendControlCommand().catch(console.error);
|
||||
66
test/scenario2_large_table.jl
Normal file
66
test/scenario2_large_table.jl
Normal file
@@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env julia
|
||||
# Scenario 2: Deep Dive Analysis (Large Arrow Table)
|
||||
# Tests large Arrow tables (> 1MB) sent via HTTP fileserver
|
||||
|
||||
using NATS
|
||||
using Arrow
|
||||
using DataFrames
|
||||
using JSON3
|
||||
using UUIDs
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/julia_bridge.jl")
|
||||
using .BiDirectionalBridge
|
||||
|
||||
# Configuration
|
||||
const ANALYSIS_SUBJECT = "analysis_results"
|
||||
const RESPONSE_SUBJECT = "analysis_response"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
# Receiver: Listen for analysis results
|
||||
function start_analysis_listener()
|
||||
conn = NATS.Connection(NATS_URL)
|
||||
try
|
||||
NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg
|
||||
log_trace("Received message from $(msg.subject)")
|
||||
|
||||
# Parse the envelope
|
||||
env = MessageEnvelope(String(msg.data))
|
||||
|
||||
# Use SmartReceive to handle the data
|
||||
result = SmartReceive(msg)
|
||||
|
||||
# Process the data based on type
|
||||
if result.envelope.type == "table"
|
||||
df = result.data
|
||||
log_trace("Received DataFrame with $(nrows(df)) rows")
|
||||
log_trace("DataFrame columns: $(names(df))")
|
||||
|
||||
# Send acknowledgment
|
||||
response = Dict(
|
||||
"status" => "Processed",
|
||||
"correlation_id" => env.correlation_id,
|
||||
"row_count" => nrows(df)
|
||||
)
|
||||
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(10)
|
||||
finally
|
||||
NATS.close(conn)
|
||||
end
|
||||
end
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
# Run the listener
|
||||
start_analysis_listener()
|
||||
54
test/scenario2_large_table.js
Normal file
54
test/scenario2_large_table.js
Normal file
@@ -0,0 +1,54 @@
|
||||
#!/usr/bin/env node
|
||||
// Scenario 2: Deep Dive Analysis (Large Arrow Table)
|
||||
// Tests large Arrow tables (> 1MB) sent via HTTP fileserver
|
||||
|
||||
const { SmartSend } = require('../js_bridge');
|
||||
|
||||
// Configuration
|
||||
const ANALYSIS_SUBJECT = "analysis_results";
|
||||
const NATS_URL = "nats://localhost:4222";
|
||||
|
||||
// Create correlation ID for tracing
|
||||
const correlationId = require('uuid').v4();
|
||||
|
||||
// Sender: Send large Arrow table to Julia
|
||||
async function sendLargeTable() {
|
||||
// Create a large DataFrame-like structure (10 million rows)
|
||||
// For testing, we'll create a smaller but still large table
|
||||
const numRows = 1000000; // 1 million rows
|
||||
|
||||
const data = {
|
||||
id: Array.from({ length: numRows }, (_, i) => i + 1),
|
||||
value: Array.from({ length: numRows }, () => Math.random()),
|
||||
category: Array.from({ length: numRows }, () => ['A', 'B', 'C'][Math.floor(Math.random() * 3)])
|
||||
};
|
||||
|
||||
// Convert to Arrow Table
|
||||
const { Table, Vector, RecordBatch } = require('apache-arrow');
|
||||
|
||||
const idVector = Vector.from(data.id);
|
||||
const valueVector = Vector.from(data.value);
|
||||
const categoryVector = Vector.from(data.category);
|
||||
|
||||
const table = Table.from({
|
||||
id: idVector,
|
||||
value: valueVector,
|
||||
category: categoryVector
|
||||
});
|
||||
|
||||
// Send via SmartSend with type="table"
|
||||
const env = await SmartSend(
|
||||
ANALYSIS_SUBJECT,
|
||||
table,
|
||||
"table",
|
||||
{ correlationId }
|
||||
);
|
||||
|
||||
console.log(`Sent large table with ${numRows} rows`);
|
||||
console.log(`Correlation ID: ${correlationId}`);
|
||||
console.log(`Transport: ${env.transport}`);
|
||||
console.log(`URL: ${env.url || 'N/A'}`);
|
||||
}
|
||||
|
||||
// Run the sender
|
||||
sendLargeTable().catch(console.error);
|
||||
66
test/scenario3_julia_to_julia.jl
Normal file
66
test/scenario3_julia_to_julia.jl
Normal file
@@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env julia
|
||||
# Scenario 3: Julia-to-Julia Service Communication
|
||||
# Tests bi-directional communication between two Julia services
|
||||
|
||||
using NATS
|
||||
using Arrow
|
||||
using DataFrames
|
||||
using JSON3
|
||||
using UUIDs
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/julia_bridge.jl")
|
||||
using .BiDirectionalBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT1 = "julia_to_js"
|
||||
const SUBJECT2 = "js_to_julia"
|
||||
const RESPONSE_SUBJECT = "response"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
# Julia-to-Julia Test: Large Arrow Table
|
||||
function test_julia_to_julia_large_table()
|
||||
conn = NATS.Connection(NATS_URL)
|
||||
try
|
||||
# Subscriber on SUBJECT2 to receive data from Julia sender
|
||||
NATS.subscribe(conn, SUBJECT2) do msg
|
||||
log_trace("[$(Dates.now())] Received on $SUBJECT2")
|
||||
|
||||
# Use SmartReceive to handle the data
|
||||
result = SmartReceive(msg)
|
||||
|
||||
# Check transport type
|
||||
if result.envelope.transport == "direct"
|
||||
log_trace("Received direct transport with $(length(result.data)) bytes")
|
||||
else
|
||||
# For link transport, result.data is the URL
|
||||
log_trace("Received link transport at $(result.data)")
|
||||
end
|
||||
|
||||
# Send response back
|
||||
response = Dict(
|
||||
"status" => "Processed",
|
||||
"correlation_id" => result.envelope.correlation_id,
|
||||
"timestamp" => Dates.now()
|
||||
)
|
||||
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
|
||||
end
|
||||
|
||||
# Keep listening
|
||||
sleep(5)
|
||||
finally
|
||||
NATS.close(conn)
|
||||
end
|
||||
end
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
# Run the test
|
||||
test_julia_to_julia_large_table()
|
||||
148
test/scenario_tests.md
Normal file
148
test/scenario_tests.md
Normal file
@@ -0,0 +1,148 @@
|
||||
# Test Scenarios for Bi-Directional Data Bridge
|
||||
|
||||
## Scenario 1: Command & Control (Small JSON)
|
||||
Tests small JSON payloads (< 1MB) sent directly via NATS.
|
||||
|
||||
### Julia (Receiver)
|
||||
```julia
|
||||
using NATS
|
||||
using JSON3
|
||||
|
||||
# Subscribe to control subject
|
||||
subscribe(nats, "control") do msg
|
||||
env = MessageEnvelope(String(msg.data))
|
||||
|
||||
# Parse JSON payload
|
||||
config = JSON3.read(env.payload)
|
||||
|
||||
# Execute simulation with parameters
|
||||
step_size = config.step_size
|
||||
iterations = config.iterations
|
||||
|
||||
# Send acknowledgment
|
||||
response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
|
||||
publish(nats, "control_response", JSON3.stringify(response))
|
||||
end
|
||||
```
|
||||
|
||||
### JavaScript (Sender)
|
||||
```javascript
|
||||
const { SmartSend } = require('./js_bridge');
|
||||
|
||||
// Create small JSON config
|
||||
const config = {
|
||||
step_size: 0.01,
|
||||
iterations: 1000
|
||||
};
|
||||
|
||||
// Send via SmartSend with type="json"
|
||||
await SmartSend("control", config, "json");
|
||||
```
|
||||
|
||||
## Scenario 2: Deep Dive Analysis (Large Arrow Table)
|
||||
Tests large Arrow tables (> 1MB) sent via HTTP fileserver.
|
||||
|
||||
### Julia (Sender)
|
||||
```julia
|
||||
using Arrow
|
||||
using DataFrames
|
||||
|
||||
# Create large DataFrame (500MB, 10 million rows)
|
||||
df = DataFrame(
|
||||
id = 1:10_000_000,
|
||||
value = rand(10_000_000),
|
||||
category = rand(["A", "B", "C"], 10_000_000)
|
||||
)
|
||||
|
||||
# Convert to Arrow IPC stream and send
|
||||
await SmartSend("analysis_results", df, "table");
|
||||
```
|
||||
|
||||
### JavaScript (Receiver)
|
||||
```javascript
|
||||
const { SmartReceive } = require('./js_bridge');
|
||||
|
||||
// Receive message with URL
|
||||
const result = await SmartReceive(msg);
|
||||
|
||||
// Fetch data from HTTP server
|
||||
const table = result.data;
|
||||
|
||||
// Load into Perspective.js or D3
|
||||
// Use table data for visualization
|
||||
```
|
||||
|
||||
## Scenario 3: Live Binary Processing
|
||||
Tests binary data (binary) sent from JS to Julia for FFT/transcription.
|
||||
|
||||
### JavaScript (Sender)
|
||||
```javascript
|
||||
const { SmartSend } = require('./js_bridge');
|
||||
|
||||
// Capture binary chunk (2 seconds, 44.1kHz, 1 channel)
|
||||
const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true });
|
||||
|
||||
// Send as binary with metadata headers
|
||||
await SmartSend("binary_input", binaryData, "binary", {
|
||||
metadata: {
|
||||
sample_rate: 44100,
|
||||
channels: 1
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
### Julia (Receiver)
|
||||
```julia
|
||||
using WAV
|
||||
using DSP
|
||||
|
||||
# Receive binary data
|
||||
function process_binary(data)
|
||||
# Perform FFT or AI transcription
|
||||
spectrum = fft(data)
|
||||
|
||||
# Send results back (JSON + Arrow table)
|
||||
results = Dict("transcription" => "sample text", "spectrum" => spectrum)
|
||||
await SmartSend("binary_output", results, "json")
|
||||
end
|
||||
```
|
||||
|
||||
## Scenario 4: Catch-Up (JetStream)
|
||||
Tests temporal decoupling with NATS JetStream.
|
||||
|
||||
### Julia (Producer)
|
||||
```julia
|
||||
# Publish to JetStream
|
||||
using NATS
|
||||
|
||||
function publish_health_status(nats)
|
||||
jetstream = JetStream(nats, "health_updates")
|
||||
|
||||
while true
|
||||
status = Dict("cpu" => rand(), "memory" => rand())
|
||||
publish(jetstream, "health", status)
|
||||
sleep(5) # Every 5 seconds
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
### JavaScript (Consumer)
|
||||
```javascript
|
||||
const { connect } = require('nats');
|
||||
|
||||
const nc = await connect({ servers: ['nats://localhost:4222'] });
|
||||
const js = nc.jetstream();
|
||||
|
||||
// Request replay from last 10 minutes
|
||||
const consumer = await js.pullSubscribe("health", {
|
||||
durable_name: "catchup",
|
||||
max_batch: 100,
|
||||
max_ack_wait: 30000
|
||||
});
|
||||
|
||||
// Process historical and real-time messages
|
||||
for await (const msg of consumer) {
|
||||
const result = await SmartReceive(msg);
|
||||
// Process the data
|
||||
msg.ack();
|
||||
}
|
||||
121
test/test_large_payload.jl
Normal file
121
test/test_large_payload.jl
Normal file
@@ -0,0 +1,121 @@
|
||||
#!/usr/bin/env julia
|
||||
# Test script for large payload testing using binary transport
|
||||
# Tests sending a large file (> 1MB) via smartsend with binary type
|
||||
|
||||
using NATS, JSON, UUIDs, Dates
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/large_binary_test"
|
||||
const NATS_URL = "nats.yiem.cc"
|
||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
|
||||
# Create correlation ID for tracing
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
# File path for large binary payload test
|
||||
const LARGE_FILE_PATH = "./test.zip"
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
timestamp = Dates.now()
|
||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
||||
end
|
||||
|
||||
# Sender: Send large binary file via smartsend
|
||||
function test_large_binary_send()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
# Read the large file as binary data
|
||||
log_trace("Reading large file: $LARGE_FILE_PATH")
|
||||
file_data = read(LARGE_FILE_PATH)
|
||||
|
||||
file_size = length(file_data)
|
||||
log_trace("File size: $file_size bytes")
|
||||
|
||||
# Use smartsend with binary type - will automatically use link transport
|
||||
# if file size exceeds the threshold (1MB by default)
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
file_data,
|
||||
"binary",
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL;
|
||||
dataname="test.zip"
|
||||
)
|
||||
|
||||
log_trace("Sent message with transport: $(env.transport)")
|
||||
log_trace("Envelope type: $(env.type)")
|
||||
|
||||
# Check if link transport was used
|
||||
if env.transport == "link"
|
||||
log_trace("Using link transport - file uploaded to HTTP server")
|
||||
log_trace("URL: $(env.url)")
|
||||
else
|
||||
log_trace("Using direct transport - payload sent via NATS")
|
||||
end
|
||||
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
# Receiver: Listen for messages and verify large payload handling
|
||||
function test_large_binary_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
log_trace("Received message:\n$msg")
|
||||
|
||||
# Use SmartReceive to handle the data
|
||||
result = SmartReceive(msg)
|
||||
|
||||
# Check transport type
|
||||
if result.envelope.transport == "direct"
|
||||
log_trace("Received direct transport with $(length(result.data)) bytes")
|
||||
else
|
||||
# For link transport, result.data is the URL
|
||||
log_trace("Received link transport at $(result.data)")
|
||||
end
|
||||
|
||||
# Verify the received data matches the original
|
||||
if result.envelope.type == "binary"
|
||||
if isa(result.data, Vector{UInt8})
|
||||
file_size = length(result.data)
|
||||
log_trace("Received $(file_size) bytes of binary data")
|
||||
|
||||
# Save received data to a test file
|
||||
output_path = "test_output.bin"
|
||||
write(output_path, result.data)
|
||||
log_trace("Saved received data to $output_path")
|
||||
|
||||
# Verify file size
|
||||
original_size = length(read(LARGE_FILE_PATH))
|
||||
if file_size == original_size
|
||||
log_trace("SUCCESS: File size matches! Original: $original_size bytes")
|
||||
else
|
||||
log_trace("WARNING: File size mismatch! Original: $original_size, Received: $file_size")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(10)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
# Run the test
|
||||
println("Starting large binary payload test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
println("Large file: $LARGE_FILE_PATH")
|
||||
|
||||
# Run sender first
|
||||
println("start smartsend")
|
||||
test_large_binary_send()
|
||||
|
||||
# Run receiver
|
||||
println("testing smartreceive")
|
||||
test_large_binary_receive()
|
||||
|
||||
println("Test completed.")
|
||||
Reference in New Issue
Block a user