diff --git a/architecture.md b/docs/architecture.md similarity index 100% rename from architecture.md rename to docs/architecture.md diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 902613e..027016d 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -6,7 +6,7 @@ module NATSBridge -using NATS, JSON, Arrow, HTTP, UUIDs, Dates +using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 # ---------------------------------------------- 100 --------------------------------------------- # # Constants @@ -155,14 +155,14 @@ env = smartsend("large.data", data, "arrow") # In another process, retrieve and deserialize: # msg = subscribe(nats_url, "my.subject") # env = json_to_envelope(msg.data) -# data = _deserialize_data(base64decode(env.payload), env.type) +# data = _deserialize_data(Base64.decode(env.payload), env.type) ``` """ function smartsend( subject::String, # smartreceive's subject data::Any, type::String = "json"; - dataname=string(UUIDs.uuid4()), + dataname="NA", nats_url::String = DEFAULT_NATS_URL, fileserver_url::String = DEFAULT_FILESERVER_URL, fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver @@ -183,7 +183,7 @@ function smartsend( # Decision: Direct vs Link if payload_size < size_threshold # Check if payload is small enough for direct transport # Direct path - Base64 encode and send via NATS - payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string + payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice env = MessageEnvelope( # Create envelope for direct transport @@ -191,7 +191,7 @@ function smartsend( type = type, transport = "direct", payload = payload_b64, - metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream") + metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") ) msg_json = envelope_to_json(env) # Convert envelope to JSON @@ -217,7 +217,7 @@ function smartsend( type = type, transport = "link", url = url, - metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream") + metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") ) msg_json = envelope_to_json(env) # Convert envelope to JSON @@ -346,8 +346,7 @@ function smartreceive( max_delay::Int = 5000 ) # Parse the envelope - env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope - + env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope log_trace(env.correlation_id, "Processing received message") # Log message processing start # Check transport type @@ -355,22 +354,22 @@ function smartreceive( log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling # Decode Base64 payload - payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes + payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes # Deserialize based on type data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data - + return (data = data, envelope = env) # Return data and envelope as tuple elseif env.transport == "link" # Link transport - payload is at URL log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling # Fetch with exponential backoff - data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL + downloaded_data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL # Deserialize based on type - result = _deserialize_data(data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data + data = _deserialize_data(downloaded_data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data - return (data = result, envelope = env) # Return data and envelope as tuple + return (data = data, envelope = env) # Return data and envelope as tuple else # Unknown transport type error("Unknown transport type: $(env.transport)") # Throw error for unknown transport end @@ -457,19 +456,19 @@ function _deserialize_data( end -""" Decode base64 string to bytes -This internal function decodes a base64-encoded string back to binary data. -It's a wrapper around Base64.decode for consistency in the module. +# """ Decode base64 string to bytes +# This internal function decodes a base64-encoded string back to binary data. +# It's a wrapper around Base64.decode for consistency in the module. -Arguments: - - `str::String` - Base64-encoded string to decode +# Arguments: +# - `str::String` - Base64-encoded string to decode -Return: - - Vector{UInt8} - Decoded binary data -""" -function base64decode(str::String) - return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module -end +# Return: +# - Vector{UInt8} - Decoded binary data +# """ +# function base64decode(str::String) +# return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module +# end """ plik_oneshot_upload - Upload a single file to a plik server using one-shot mode diff --git a/test/scenario3_julia_to_julia.jl b/test/scenario3_julia_to_julia.jl deleted file mode 100644 index 42c21d3..0000000 --- a/test/scenario3_julia_to_julia.jl +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env julia -# Scenario 3: Julia-to-Julia Service Communication -# Tests bi-directional communication between two Julia services - -using NATS -using Arrow -using DataFrames -using JSON3 -using UUIDs - -# Include the bridge module -include("../src/julia_bridge.jl") -using .BiDirectionalBridge - -# Configuration -const SUBJECT1 = "julia_to_js" -const SUBJECT2 = "js_to_julia" -const RESPONSE_SUBJECT = "response" -const NATS_URL = "nats://localhost:4222" - -# Create correlation ID for tracing -correlation_id = string(uuid4()) - -# Julia-to-Julia Test: Large Arrow Table -function test_julia_to_julia_large_table() - conn = NATS.Connection(NATS_URL) - try - # Subscriber on SUBJECT2 to receive data from Julia sender - NATS.subscribe(conn, SUBJECT2) do msg - log_trace("[$(Dates.now())] Received on $SUBJECT2") - - # Use SmartReceive to handle the data - result = SmartReceive(msg) - - # Check transport type - if result.envelope.transport == "direct" - log_trace("Received direct transport with $(length(result.data)) bytes") - else - # For link transport, result.data is the URL - log_trace("Received link transport at $(result.data)") - end - - # Send response back - response = Dict( - "status" => "Processed", - "correlation_id" => result.envelope.correlation_id, - "timestamp" => Dates.now() - ) - NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) - end - - # Keep listening - sleep(5) - finally - NATS.close(conn) - end -end - -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end - -# Run the test -test_julia_to_julia_large_table() \ No newline at end of file diff --git a/test/scenario_tests.md b/test/scenario_tests.md deleted file mode 100644 index 7b2df9a..0000000 --- a/test/scenario_tests.md +++ /dev/null @@ -1,148 +0,0 @@ -# Test Scenarios for Bi-Directional Data Bridge - -## Scenario 1: Command & Control (Small JSON) -Tests small JSON payloads (< 1MB) sent directly via NATS. - -### Julia (Receiver) -```julia -using NATS -using JSON3 - -# Subscribe to control subject -subscribe(nats, "control") do msg - env = MessageEnvelope(String(msg.data)) - - # Parse JSON payload - config = JSON3.read(env.payload) - - # Execute simulation with parameters - step_size = config.step_size - iterations = config.iterations - - # Send acknowledgment - response = Dict("status" => "Running", "correlation_id" => env.correlation_id) - publish(nats, "control_response", JSON3.stringify(response)) -end -``` - -### JavaScript (Sender) -```javascript -const { SmartSend } = require('./js_bridge'); - -// Create small JSON config -const config = { - step_size: 0.01, - iterations: 1000 -}; - -// Send via SmartSend with type="json" -await SmartSend("control", config, "json"); -``` - -## Scenario 2: Deep Dive Analysis (Large Arrow Table) -Tests large Arrow tables (> 1MB) sent via HTTP fileserver. - -### Julia (Sender) -```julia -using Arrow -using DataFrames - -# Create large DataFrame (500MB, 10 million rows) -df = DataFrame( - id = 1:10_000_000, - value = rand(10_000_000), - category = rand(["A", "B", "C"], 10_000_000) -) - -# Convert to Arrow IPC stream and send -await SmartSend("analysis_results", df, "table"); -``` - -### JavaScript (Receiver) -```javascript -const { SmartReceive } = require('./js_bridge'); - -// Receive message with URL -const result = await SmartReceive(msg); - -// Fetch data from HTTP server -const table = result.data; - -// Load into Perspective.js or D3 -// Use table data for visualization -``` - -## Scenario 3: Live Binary Processing -Tests binary data (binary) sent from JS to Julia for FFT/transcription. - -### JavaScript (Sender) -```javascript -const { SmartSend } = require('./js_bridge'); - -// Capture binary chunk (2 seconds, 44.1kHz, 1 channel) -const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true }); - -// Send as binary with metadata headers -await SmartSend("binary_input", binaryData, "binary", { - metadata: { - sample_rate: 44100, - channels: 1 - } -}); -``` - -### Julia (Receiver) -```julia -using WAV -using DSP - -# Receive binary data -function process_binary(data) - # Perform FFT or AI transcription - spectrum = fft(data) - - # Send results back (JSON + Arrow table) - results = Dict("transcription" => "sample text", "spectrum" => spectrum) - await SmartSend("binary_output", results, "json") -end -``` - -## Scenario 4: Catch-Up (JetStream) -Tests temporal decoupling with NATS JetStream. - -### Julia (Producer) -```julia -# Publish to JetStream -using NATS - -function publish_health_status(nats) - jetstream = JetStream(nats, "health_updates") - - while true - status = Dict("cpu" => rand(), "memory" => rand()) - publish(jetstream, "health", status) - sleep(5) # Every 5 seconds - end -end -``` - -### JavaScript (Consumer) -```javascript -const { connect } = require('nats'); - -const nc = await connect({ servers: ['nats://localhost:4222'] }); -const js = nc.jetstream(); - -// Request replay from last 10 minutes -const consumer = await js.pullSubscribe("health", { - durable_name: "catchup", - max_batch: 100, - max_ack_wait: 30000 -}); - -// Process historical and real-time messages -for await (const msg of consumer) { - const result = await SmartReceive(msg); - // Process the data - msg.ack(); -} \ No newline at end of file diff --git a/test/test_large_payload.jl b/test/test_julia_julia_data_transfer.jl similarity index 63% rename from test/test_large_payload.jl rename to test/test_julia_julia_data_transfer.jl index af4350e..4e65d6f 100644 --- a/test/test_large_payload.jl +++ b/test/test_julia_julia_data_transfer.jl @@ -16,8 +16,14 @@ const FILESERVER_URL = "http://192.168.88.104:8080" # Create correlation ID for tracing correlation_id = string(uuid4()) + +# ------------------------------------------------------------------------------------------------ # +# test file transfer # +# ------------------------------------------------------------------------------------------------ # + # File path for large binary payload test -const LARGE_FILE_PATH = "./test.zip" +const FILE_PATH = "./testFile_small.zip" +const filename = basename(FILE_PATH) # Helper: Log with correlation ID function log_trace(message) @@ -29,8 +35,8 @@ end function test_large_binary_send() conn = NATS.connect(NATS_URL) # Read the large file as binary data - log_trace("Reading large file: $LARGE_FILE_PATH") - file_data = read(LARGE_FILE_PATH) + log_trace("Reading large file: $FILE_PATH") + file_data = read(FILE_PATH) file_size = length(file_data) log_trace("File size: $file_size bytes") @@ -43,7 +49,7 @@ function test_large_binary_send() "binary", nats_url = NATS_URL, fileserver_url = FILESERVER_URL; - dataname="test.zip" + dataname=filename ) log_trace("Sent message with transport: $(env.transport)") @@ -65,17 +71,15 @@ function test_large_binary_receive() conn = NATS.connect(NATS_URL) NATS.subscribe(conn, SUBJECT) do msg log_trace("Received message on $(msg.subject)") - log_trace("Received message:\n$msg") - - # Use SmartReceive to handle the data - result = SmartReceive(msg) + # Use NATSBridge.smartreceive to handle the data + result = NATSBridge.smartreceive(msg) # Check transport type if result.envelope.transport == "direct" - log_trace("Received direct transport with $(length(result.data)) bytes") + log_trace("Received direct transport") else # For link transport, result.data is the URL - log_trace("Received link transport at $(result.data)") + log_trace("Received link transport") end # Verify the received data matches the original @@ -85,37 +89,102 @@ function test_large_binary_receive() log_trace("Received $(file_size) bytes of binary data") # Save received data to a test file - output_path = "test_output.bin" - write(output_path, result.data) - log_trace("Saved received data to $output_path") + println("metadata ", result.envelope.metadata) + dataname = result.envelope.metadata["dataname"] + if dataname != "NA" + output_path = "./new_$dataname" + write(output_path, result.data) + log_trace("Saved received data to $output_path") + end # Verify file size - original_size = length(read(LARGE_FILE_PATH)) - if file_size == original_size - log_trace("SUCCESS: File size matches! Original: $original_size bytes") + original_size = length(read(FILE_PATH)) + if file_size == result.envelope.metadata["content_length"] + log_trace("SUCCESS: File size matches! Original: $(result.envelope.metadata["content_length"]) bytes") else - log_trace("WARNING: File size mismatch! Original: $original_size, Received: $file_size") + log_trace("WARNING: File size mismatch! Original: $(result.envelope.metadata["content_length"]), Received: $file_size") end end end end - + # Keep listening for 10 seconds - sleep(10) + sleep(120) NATS.drain(conn) end + # Run the test println("Starting large binary payload test...") println("Correlation ID: $correlation_id") -println("Large file: $LARGE_FILE_PATH") +println("File: $FILE_PATH") # Run sender first println("start smartsend") test_large_binary_send() -# Run receiver -println("testing smartreceive") -test_large_binary_receive() +# # Run receiver +# println("testing smartreceive") +# test_large_binary_receive() + +println("Test completed.") + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + -println("Test completed.") \ No newline at end of file diff --git a/largefile.zip b/testFile_large.zip similarity index 100% rename from largefile.zip rename to testFile_large.zip diff --git a/testFile_small.zip b/testFile_small.zip new file mode 100644 index 0000000..75f1413 Binary files /dev/null and b/testFile_small.zip differ