diff --git a/new_testFile_large.zip b/new_testFile_large.zip deleted file mode 100644 index 56dfcd1..0000000 Binary files a/new_testFile_large.zip and /dev/null differ diff --git a/new_testFile_small.zip b/new_testFile_small.zip deleted file mode 100644 index 75f1413..0000000 Binary files a/new_testFile_small.zip and /dev/null differ diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 3de00d4..21216fb 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -57,14 +57,14 @@ struct msgPayload_v1 end # constructor -function msgPayload_v1(; +function msgPayload_v1( + data::Any, + type::String; id::String = "", - dataname::String = "", - type::String = "text", + dataname::String = string(uuid4()), transport::String = "direct", encoding::String = "none", size::Integer = 0, - data::Any = nothing, metadata::Dict{String, T} = Dict{String, Any}() ) where {T<:Any} return msgPayload_v1( @@ -101,11 +101,12 @@ struct msgEnvelope_v1 end # constructor -function msgEnvelope_v1(; +function msgEnvelope_v1( + sendTo::String, + payloads::AbstractArray{msgPayload_v1}; correlationId::String = "", msgId::String = "", - timestamp::String = "", - sendTo::String = "", + timestamp::String = string(Dates.now()), msgPurpose::String = "", senderName::String = "", senderId::String = "", @@ -114,8 +115,7 @@ function msgEnvelope_v1(; replyTo::String = "", replyToMsgId::String = "", brokerURL::String = DEFAULT_NATS_URL, - metadata::Dict{String, Any} = Dict{String, Any}(), - payloads::AbstractArray{msgPayload_v1} = msgPayload_v1[] + metadata::Dict{String, Any} = Dict{String, Any}() ) return msgEnvelope_v1( correlationId, @@ -309,7 +309,6 @@ function smartsend( # Generate message metadata msg_id = string(uuid4()) - timestamp = string(Dates.now()) # Process each payload in the list payloads = msgPayload_v1[] @@ -328,14 +327,14 @@ function smartsend( # Create msgPayload_v1 for direct transport payload = msgPayload_v1( - id = string(uuid4()), - dataname = dataname, - type = payload_type, - transport = "direct", - encoding = "base64", - size = payload_size, - data = payload_b64, - metadata = Dict{String, Any}("payload_bytes" => payload_size) + payload_b64, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "direct", + encoding = "base64", + size = payload_size, + metadata = Dict{String, Any}("payload_bytes" => payload_size) ) push!(payloads, payload) else @@ -354,14 +353,14 @@ function smartsend( # Create msgPayload_v1 for link transport payload = msgPayload_v1( - id = string(uuid4()), - dataname = dataname, - type = payload_type, - transport = "link", - encoding = "none", - size = payload_size, - data = url, - metadata = Dict{String, Any}() + url, + payload_type; + id = string(uuid4()), + dataname = dataname, + transport = "link", + encoding = "none", + size = payload_size, + metadata = Dict{String, Any}() ) push!(payloads, payload) end @@ -369,20 +368,19 @@ function smartsend( # Create msgEnvelope_v1 with all payloads env = msgEnvelope_v1( - correlationId = cid, - msgId = msg_id, - timestamp = timestamp, - sendTo = subject, - msgPurpose = msg_purpose, - senderName = sender_name, - senderId = string(uuid4()), - receiverName = receiver_name, - receiverId = receiver_id, - replyTo = reply_to, - replyToMsgId = reply_to_msg_id, - brokerURL = nats_url, - metadata = Dict{String, Any}(), - payloads = payloads + subject, + payloads; + correlationId = cid, + msgId = msg_id, + msgPurpose = msg_purpose, + senderName = sender_name, + senderId = string(uuid4()), + receiverName = receiver_name, + receiverId = receiver_id, + replyTo = reply_to, + replyToMsgId = reply_to_msg_id, + brokerURL = nats_url, + metadata = Dict{String, Any}(), ) msg_json = envelope_to_json(env) # Convert envelope to JSON diff --git a/test/test_julia_julia_data_transfer_receive.jl b/test/julia_to_julia_file_transfer_receiver.jl similarity index 100% rename from test/test_julia_julia_data_transfer_receive.jl rename to test/julia_to_julia_file_transfer_receiver.jl diff --git a/test/test_julia_julia_data_transfer_send.jl b/test/julia_to_julia_file_transfer_sender.jl similarity index 100% rename from test/test_julia_julia_data_transfer_send.jl rename to test/julia_to_julia_file_transfer_sender.jl diff --git a/test/scenario1_command_control.jl b/test/scenario1_command_control.jl deleted file mode 100644 index d81da80..0000000 --- a/test/scenario1_command_control.jl +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env julia -# Scenario 1: Command & Control (Small JSON) -# Tests small JSON payloads (< 1MB) sent directly via NATS - -using NATS -using JSON3 -using UUIDs - -# Include the bridge module -include("../src/julia_bridge.jl") -using .BiDirectionalBridge - -# Configuration -const CONTROL_SUBJECT = "control" -const RESPONSE_SUBJECT = "control_response" -const NATS_URL = "nats://localhost:4222" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - -# Receiver: Listen for control commands -function start_control_listener() - conn = NATS.Connection(NATS_URL) - try - NATS.subscribe(conn, CONTROL_SUBJECT) do msg - log_trace(msg.data) - - # Parse the envelope - env = MessageEnvelope(String(msg.data)) - - # Parse JSON payload - config = JSON3.read(env.payload) - - # Execute simulation with parameters - step_size = config.step_size - iterations = config.iterations - - # Simulate processing - sleep(0.1) # Simulate some work - - # Send acknowledgment - response = Dict( - "status" => "Running", - "correlation_id" => env.correlation_id, - "step_size" => step_size, - "iterations" => iterations - ) - - NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) - log_trace("Sent response: $(JSON3.stringify(response))") - end - - # Keep listening for 5 seconds - sleep(5) - finally - NATS.close(conn) - end -end - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - -# Run the listener -start_control_listener() \ No newline at end of file diff --git a/test/scenario1_command_control.js b/test/scenario1_command_control.js deleted file mode 100644 index 742c155..0000000 --- a/test/scenario1_command_control.js +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env node -// Scenario 1: Command & Control (Small JSON) -// Tests small JSON payloads (< 1MB) sent directly via NATS - -const { SmartSend } = require('../js_bridge'); - -// Configuration -const CONTROL_SUBJECT = "control"; -const NATS_URL = "nats://localhost:4222"; - -// Create correlation ID for tracing -const correlationId = require('uuid').v4(); - -// Sender: Send control command to Julia -async function sendControlCommand() { - const config = { - step_size: 0.01, - iterations: 1000 - }; - - // Send via SmartSend with type="json" - const env = await SmartSend( - CONTROL_SUBJECT, - config, - "json", - { correlationId } - ); - - console.log(`Sent control command with correlation_id: ${correlationId}`); - console.log(`Envelope: ${JSON.stringify(env, null, 2)}`); -} - -// Run the sender -sendControlCommand().catch(console.error); \ No newline at end of file diff --git a/test/scenario2_large_table.jl b/test/scenario2_large_table.jl deleted file mode 100644 index 0c2d328..0000000 --- a/test/scenario2_large_table.jl +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env julia -# Scenario 2: Deep Dive Analysis (Large Arrow Table) -# Tests large Arrow tables (> 1MB) sent via HTTP fileserver - -using NATS -using Arrow -using DataFrames -using JSON3 -using UUIDs - -# Include the bridge module -include("../src/julia_bridge.jl") -using .BiDirectionalBridge - -# Configuration -const ANALYSIS_SUBJECT = "analysis_results" -const RESPONSE_SUBJECT = "analysis_response" -const NATS_URL = "nats://localhost:4222" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - -# Receiver: Listen for analysis results -function start_analysis_listener() - conn = NATS.Connection(NATS_URL) - try - NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg - log_trace("Received message from $(msg.subject)") - - # Parse the envelope - env = MessageEnvelope(String(msg.data)) - - # Use SmartReceive to handle the data - result = SmartReceive(msg) - - # Process the data based on type - if result.envelope.type == "table" - df = result.data - log_trace("Received DataFrame with $(nrows(df)) rows") - log_trace("DataFrame columns: $(names(df))") - - # Send acknowledgment - response = Dict( - "status" => "Processed", - "correlation_id" => env.correlation_id, - "row_count" => nrows(df) - ) - NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) - end - end - - # Keep listening for 10 seconds - sleep(10) - finally - NATS.close(conn) - end -end - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - -# Run the listener -start_analysis_listener() \ No newline at end of file diff --git a/test/scenario2_large_table.js b/test/scenario2_large_table.js deleted file mode 100644 index dd0f41c..0000000 --- a/test/scenario2_large_table.js +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env node -// Scenario 2: Deep Dive Analysis (Large Arrow Table) -// Tests large Arrow tables (> 1MB) sent via HTTP fileserver - -const { SmartSend } = require('../js_bridge'); - -// Configuration -const ANALYSIS_SUBJECT = "analysis_results"; -const NATS_URL = "nats://localhost:4222"; - -// Create correlation ID for tracing -const correlationId = require('uuid').v4(); - -// Sender: Send large Arrow table to Julia -async function sendLargeTable() { - // Create a large DataFrame-like structure (10 million rows) - // For testing, we'll create a smaller but still large table - const numRows = 1000000; // 1 million rows - - const data = { - id: Array.from({ length: numRows }, (_, i) => i + 1), - value: Array.from({ length: numRows }, () => Math.random()), - category: Array.from({ length: numRows }, () => ['A', 'B', 'C'][Math.floor(Math.random() * 3)]) - }; - - // Convert to Arrow Table - const { Table, Vector, RecordBatch } = require('apache-arrow'); - - const idVector = Vector.from(data.id); - const valueVector = Vector.from(data.value); - const categoryVector = Vector.from(data.category); - - const table = Table.from({ - id: idVector, - value: valueVector, - category: categoryVector - }); - - // Send via SmartSend with type="table" - const env = await SmartSend( - ANALYSIS_SUBJECT, - table, - "table", - { correlationId } - ); - - console.log(`Sent large table with ${numRows} rows`); - console.log(`Correlation ID: ${correlationId}`); - console.log(`Transport: ${env.transport}`); - console.log(`URL: ${env.url || 'N/A'}`); -} - -// Run the sender -sendLargeTable().catch(console.error); \ No newline at end of file