test_smartreceive_function #1

Merged
ton merged 15 commits from test_smartreceive_function into main 2026-02-12 23:34:18 +00:00
7 changed files with 116 additions and 262 deletions

View File

@@ -6,7 +6,7 @@
module NATSBridge module NATSBridge
using NATS, JSON, Arrow, HTTP, UUIDs, Dates using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
# ---------------------------------------------- 100 --------------------------------------------- # # ---------------------------------------------- 100 --------------------------------------------- #
# Constants # Constants
@@ -155,14 +155,14 @@ env = smartsend("large.data", data, "arrow")
# In another process, retrieve and deserialize: # In another process, retrieve and deserialize:
# msg = subscribe(nats_url, "my.subject") # msg = subscribe(nats_url, "my.subject")
# env = json_to_envelope(msg.data) # env = json_to_envelope(msg.data)
# data = _deserialize_data(base64decode(env.payload), env.type) # data = _deserialize_data(Base64.decode(env.payload), env.type)
``` ```
""" """
function smartsend( function smartsend(
subject::String, # smartreceive's subject subject::String, # smartreceive's subject
data::Any, data::Any,
type::String = "json"; type::String = "json";
dataname=string(UUIDs.uuid4()), dataname="NA",
nats_url::String = DEFAULT_NATS_URL, nats_url::String = DEFAULT_NATS_URL,
fileserver_url::String = DEFAULT_FILESERVER_URL, fileserver_url::String = DEFAULT_FILESERVER_URL,
fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
@@ -183,7 +183,7 @@ function smartsend(
# Decision: Direct vs Link # Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS # Direct path - Base64 encode and send via NATS
payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
env = MessageEnvelope( # Create envelope for direct transport env = MessageEnvelope( # Create envelope for direct transport
@@ -191,7 +191,7 @@ function smartsend(
type = type, type = type,
transport = "direct", transport = "direct",
payload = payload_b64, payload = payload_b64,
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream") metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
) )
msg_json = envelope_to_json(env) # Convert envelope to JSON msg_json = envelope_to_json(env) # Convert envelope to JSON
@@ -217,7 +217,7 @@ function smartsend(
type = type, type = type,
transport = "link", transport = "link",
url = url, url = url,
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream") metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
) )
msg_json = envelope_to_json(env) # Convert envelope to JSON msg_json = envelope_to_json(env) # Convert envelope to JSON
@@ -346,8 +346,7 @@ function smartreceive(
max_delay::Int = 5000 max_delay::Int = 5000
) )
# Parse the envelope # Parse the envelope
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
log_trace(env.correlation_id, "Processing received message") # Log message processing start log_trace(env.correlation_id, "Processing received message") # Log message processing start
# Check transport type # Check transport type
@@ -355,7 +354,7 @@ function smartreceive(
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
# Decode Base64 payload # Decode Base64 payload
payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
# Deserialize based on type # Deserialize based on type
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
@@ -365,12 +364,12 @@ function smartreceive(
log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling
# Fetch with exponential backoff # Fetch with exponential backoff
data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL downloaded_data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL
# Deserialize based on type # Deserialize based on type
result = _deserialize_data(data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data data = _deserialize_data(downloaded_data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
return (data = result, envelope = env) # Return data and envelope as tuple return (data = data, envelope = env) # Return data and envelope as tuple
else # Unknown transport type else # Unknown transport type
error("Unknown transport type: $(env.transport)") # Throw error for unknown transport error("Unknown transport type: $(env.transport)") # Throw error for unknown transport
end end
@@ -457,19 +456,19 @@ function _deserialize_data(
end end
""" Decode base64 string to bytes # """ Decode base64 string to bytes
This internal function decodes a base64-encoded string back to binary data. # This internal function decodes a base64-encoded string back to binary data.
It's a wrapper around Base64.decode for consistency in the module. # It's a wrapper around Base64.decode for consistency in the module.
Arguments: # Arguments:
- `str::String` - Base64-encoded string to decode # - `str::String` - Base64-encoded string to decode
Return: # Return:
- Vector{UInt8} - Decoded binary data # - Vector{UInt8} - Decoded binary data
""" # """
function base64decode(str::String) # function base64decode(str::String)
return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module # return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
end # end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode """ plik_oneshot_upload - Upload a single file to a plik server using one-shot mode

View File

@@ -1,66 +0,0 @@
#!/usr/bin/env julia
# Scenario 3: Julia-to-Julia Service Communication
# Tests bi-directional communication between two Julia services
using NATS
using Arrow
using DataFrames
using JSON3
using UUIDs
# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge
# Configuration
const SUBJECT1 = "julia_to_js"
const SUBJECT2 = "js_to_julia"
const RESPONSE_SUBJECT = "response"
const NATS_URL = "nats://localhost:4222"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# Julia-to-Julia Test: Large Arrow Table
function test_julia_to_julia_large_table()
conn = NATS.Connection(NATS_URL)
try
# Subscriber on SUBJECT2 to receive data from Julia sender
NATS.subscribe(conn, SUBJECT2) do msg
log_trace("[$(Dates.now())] Received on $SUBJECT2")
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Check transport type
if result.envelope.transport == "direct"
log_trace("Received direct transport with $(length(result.data)) bytes")
else
# For link transport, result.data is the URL
log_trace("Received link transport at $(result.data)")
end
# Send response back
response = Dict(
"status" => "Processed",
"correlation_id" => result.envelope.correlation_id,
"timestamp" => Dates.now()
)
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
end
# Keep listening
sleep(5)
finally
NATS.close(conn)
end
end
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Run the test
test_julia_to_julia_large_table()

View File

@@ -1,148 +0,0 @@
# Test Scenarios for Bi-Directional Data Bridge
## Scenario 1: Command & Control (Small JSON)
Tests small JSON payloads (< 1MB) sent directly via NATS.
### Julia (Receiver)
```julia
using NATS
using JSON3
# Subscribe to control subject
subscribe(nats, "control") do msg
env = MessageEnvelope(String(msg.data))
# Parse JSON payload
config = JSON3.read(env.payload)
# Execute simulation with parameters
step_size = config.step_size
iterations = config.iterations
# Send acknowledgment
response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
publish(nats, "control_response", JSON3.stringify(response))
end
```
### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
// Create small JSON config
const config = {
step_size: 0.01,
iterations: 1000
};
// Send via SmartSend with type="json"
await SmartSend("control", config, "json");
```
## Scenario 2: Deep Dive Analysis (Large Arrow Table)
Tests large Arrow tables (> 1MB) sent via HTTP fileserver.
### Julia (Sender)
```julia
using Arrow
using DataFrames
# Create large DataFrame (500MB, 10 million rows)
df = DataFrame(
id = 1:10_000_000,
value = rand(10_000_000),
category = rand(["A", "B", "C"], 10_000_000)
)
# Convert to Arrow IPC stream and send
await SmartSend("analysis_results", df, "table");
```
### JavaScript (Receiver)
```javascript
const { SmartReceive } = require('./js_bridge');
// Receive message with URL
const result = await SmartReceive(msg);
// Fetch data from HTTP server
const table = result.data;
// Load into Perspective.js or D3
// Use table data for visualization
```
## Scenario 3: Live Binary Processing
Tests binary data (binary) sent from JS to Julia for FFT/transcription.
### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
// Capture binary chunk (2 seconds, 44.1kHz, 1 channel)
const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true });
// Send as binary with metadata headers
await SmartSend("binary_input", binaryData, "binary", {
metadata: {
sample_rate: 44100,
channels: 1
}
});
```
### Julia (Receiver)
```julia
using WAV
using DSP
# Receive binary data
function process_binary(data)
# Perform FFT or AI transcription
spectrum = fft(data)
# Send results back (JSON + Arrow table)
results = Dict("transcription" => "sample text", "spectrum" => spectrum)
await SmartSend("binary_output", results, "json")
end
```
## Scenario 4: Catch-Up (JetStream)
Tests temporal decoupling with NATS JetStream.
### Julia (Producer)
```julia
# Publish to JetStream
using NATS
function publish_health_status(nats)
jetstream = JetStream(nats, "health_updates")
while true
status = Dict("cpu" => rand(), "memory" => rand())
publish(jetstream, "health", status)
sleep(5) # Every 5 seconds
end
end
```
### JavaScript (Consumer)
```javascript
const { connect } = require('nats');
const nc = await connect({ servers: ['nats://localhost:4222'] });
const js = nc.jetstream();
// Request replay from last 10 minutes
const consumer = await js.pullSubscribe("health", {
durable_name: "catchup",
max_batch: 100,
max_ack_wait: 30000
});
// Process historical and real-time messages
for await (const msg of consumer) {
const result = await SmartReceive(msg);
// Process the data
msg.ack();
}

View File

@@ -16,8 +16,14 @@ const FILESERVER_URL = "http://192.168.88.104:8080"
# Create correlation ID for tracing # Create correlation ID for tracing
correlation_id = string(uuid4()) correlation_id = string(uuid4())
# ------------------------------------------------------------------------------------------------ #
# test file transfer #
# ------------------------------------------------------------------------------------------------ #
# File path for large binary payload test # File path for large binary payload test
const LARGE_FILE_PATH = "./test.zip" const FILE_PATH = "./testFile_small.zip"
const filename = basename(FILE_PATH)
# Helper: Log with correlation ID # Helper: Log with correlation ID
function log_trace(message) function log_trace(message)
@@ -29,8 +35,8 @@ end
function test_large_binary_send() function test_large_binary_send()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
# Read the large file as binary data # Read the large file as binary data
log_trace("Reading large file: $LARGE_FILE_PATH") log_trace("Reading large file: $FILE_PATH")
file_data = read(LARGE_FILE_PATH) file_data = read(FILE_PATH)
file_size = length(file_data) file_size = length(file_data)
log_trace("File size: $file_size bytes") log_trace("File size: $file_size bytes")
@@ -43,7 +49,7 @@ function test_large_binary_send()
"binary", "binary",
nats_url = NATS_URL, nats_url = NATS_URL,
fileserver_url = FILESERVER_URL; fileserver_url = FILESERVER_URL;
dataname="test.zip" dataname=filename
) )
log_trace("Sent message with transport: $(env.transport)") log_trace("Sent message with transport: $(env.transport)")
@@ -65,17 +71,15 @@ function test_large_binary_receive()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)") log_trace("Received message on $(msg.subject)")
log_trace("Received message:\n$msg")
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Use NATSBridge.smartreceive to handle the data
result = NATSBridge.smartreceive(msg)
# Check transport type # Check transport type
if result.envelope.transport == "direct" if result.envelope.transport == "direct"
log_trace("Received direct transport with $(length(result.data)) bytes") log_trace("Received direct transport")
else else
# For link transport, result.data is the URL # For link transport, result.data is the URL
log_trace("Received link transport at $(result.data)") log_trace("Received link transport")
end end
# Verify the received data matches the original # Verify the received data matches the original
@@ -85,37 +89,102 @@ function test_large_binary_receive()
log_trace("Received $(file_size) bytes of binary data") log_trace("Received $(file_size) bytes of binary data")
# Save received data to a test file # Save received data to a test file
output_path = "test_output.bin" println("metadata ", result.envelope.metadata)
dataname = result.envelope.metadata["dataname"]
if dataname != "NA"
output_path = "./new_$dataname"
write(output_path, result.data) write(output_path, result.data)
log_trace("Saved received data to $output_path") log_trace("Saved received data to $output_path")
end
# Verify file size # Verify file size
original_size = length(read(LARGE_FILE_PATH)) original_size = length(read(FILE_PATH))
if file_size == original_size if file_size == result.envelope.metadata["content_length"]
log_trace("SUCCESS: File size matches! Original: $original_size bytes") log_trace("SUCCESS: File size matches! Original: $(result.envelope.metadata["content_length"]) bytes")
else else
log_trace("WARNING: File size mismatch! Original: $original_size, Received: $file_size") log_trace("WARNING: File size mismatch! Original: $(result.envelope.metadata["content_length"]), Received: $file_size")
end end
end end
end end
end end
# Keep listening for 10 seconds # Keep listening for 10 seconds
sleep(10) sleep(120)
NATS.drain(conn) NATS.drain(conn)
end end
# Run the test # Run the test
println("Starting large binary payload test...") println("Starting large binary payload test...")
println("Correlation ID: $correlation_id") println("Correlation ID: $correlation_id")
println("Large file: $LARGE_FILE_PATH") println("File: $FILE_PATH")
# Run sender first # Run sender first
println("start smartsend") println("start smartsend")
test_large_binary_send() test_large_binary_send()
# Run receiver # # Run receiver
println("testing smartreceive") # println("testing smartreceive")
test_large_binary_receive() # test_large_binary_receive()
println("Test completed.") println("Test completed.")

BIN
testFile_small.zip Normal file

Binary file not shown.