# Implementation Guide: Bi-Directional Data Bridge

## Overview

This document describes the implementation of a high-performance, bi-directional data bridge between Julia and JavaScript services. The bridge uses NATS (Core and JetStream) for messaging and applies the Claim-Check pattern to large payloads.

## Architecture

The implementation follows the Claim-Check pattern: payloads under 1MB travel inside the NATS message, while larger payloads are uploaded to an HTTP server and only their URL is published.

```
┌─────────────────────────────────────────────────────────────────────────┐
│                           SmartSend Function                            │
└─────────────────────────────────────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                         Is payload size < 1MB?                          │
└─────────────────────────────────────────────────────────────────────────┘
                                     │
                   ┌─────────────────┴─────────────────┐
                   ▼                                   ▼
          ┌─────────────────┐                 ┌─────────────────┐
          │ Direct Path     │                 │ Link Path       │
          │ (< 1MB)         │                 │ (>= 1MB)        │
          │                 │                 │                 │
          │ • Serialize to  │                 │ • Serialize to  │
          │   IOBuffer      │                 │   IOBuffer      │
          │ • Base64 encode │                 │ • Upload to     │
          │ • Publish to    │                 │   HTTP Server   │
          │   NATS          │                 │ • Publish to    │
          │                 │                 │   NATS with URL │
          └─────────────────┘                 └─────────────────┘
```

## Files

### Julia Module: [`src/julia_bridge.jl`](../src/julia_bridge.jl)

The Julia implementation provides:

- **[`MessageEnvelope`](../src/julia_bridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport

### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js)

The JavaScript implementation provides:

- **`MessageEnvelope` class**: For the unified JSON envelope
- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport

## Installation

### Julia Dependencies

```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```

### JavaScript Dependencies

```bash
# The NATS client is published on npm as "nats" (matching require('nats') below)
npm install nats apache-arrow uuid base64-url
```

## Usage Tutorial

### Step 1: Start NATS Server

```bash
docker run -p 4222:4222 nats:latest
# Append -js to enable JetStream, which Scenario 4 requires:
# docker run -p 4222:4222 nats:latest -js
```

### Step 2: Start HTTP File Server (optional)

```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver

# The Link Path needs an HTTP server that accepts POST for file uploads.
# Python's built-in server handles only GET and HEAD, so it can serve
# files from this directory but cannot receive uploads.
python3 -m http.server 8080 --directory /tmp/fileserver
```

### Step 3: Run Test Scenarios

```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js

# Scenario 3: Julia-to-Julia communication
# Run both Julia and JavaScript versions
julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js
```

## Usage

### Scenario 1: Command & Control (Small JSON)

#### JavaScript (Sender)

```javascript
const { SmartSend } = require('./js_bridge');

// CommonJS modules have no top-level await, so the call runs inside an async function
async function main() {
  const config = { step_size: 0.01, iterations: 1000 };
  await SmartSend("control", config, "json", { correlationId: "unique-id" });
}
main().catch(console.error);
```

#### Julia (Receiver)

```julia
using NATS
using JSON3
using Base64

# `nats` is an open NATS connection; MessageEnvelope comes from src/julia_bridge.jl

# Subscribe to control subject
subscribe(nats, "control") do msg
    env = MessageEnvelope(String(msg.data))
    # Direct-transport payloads are Base64-encoded, so decode before parsing
    config = JSON3.read(base64decode(env.payload))

    # Execute simulation with parameters
    step_size = config.step_size
    iterations = config.iterations

    # Send acknowledgment (JSON3.write serializes the Dict to a JSON string)
    response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
    publish(nats, "control_response", JSON3.write(response))
end
```

### Scenario 2: Deep Dive Analysis (Large Arrow Table)

#### Julia (Sender)

```julia
using Arrow
using DataFrames  # Not in the dependency list above; install with Pkg.add("DataFrames")

# Create large DataFrame
df = DataFrame(
    id = 1:10_000_000,
    value = rand(10_000_000),
    category = rand(["A", "B", "C"], 10_000_000)
)

# Send via SmartSend with type="table" (a plain call; Julia has no `await` keyword)
SmartSend("analysis_results", df, "table")
```

#### JavaScript (Receiver)

```javascript
const { SmartReceive } = require('./js_bridge');

// `msg` is the NATS message delivered to the "analysis_results" subscription
async function handleAnalysisResult(msg) {
  const result = await SmartReceive(msg);
  // Use table data for visualization with Perspective.js or D3
  const table = result.data;
  return table;
}
```

### Scenario 3: Live Binary Processing

#### JavaScript (Sender)

```javascript
const { SmartSend } = require('./js_bridge');
// getUserMedia() takes { audio, video } constraints and resolves to a MediaStream;
// MediaRecorder turns that stream into binary chunks (encoded audio, not raw PCM).
async function streamAudio() {
  const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
  const recorder = new MediaRecorder(stream);
  recorder.ondataavailable = async (event) => {
    const binaryData = new Uint8Array(await event.data.arrayBuffer());
    const options = { metadata: { sample_rate: 44100, channels: 1 } };
    await SmartSend("binary_input", binaryData, "binary", options);
  };
  recorder.start(1000); // Emit one chunk per second
}
```

#### Julia (Receiver)

```julia
using WAV
using DSP
using FFTW  # Provides fft

# Receive binary data
function process_binary(data)
    # Perform FFT or AI transcription
    # Keep the magnitude spectrum: it is real-valued, so it serializes to JSON
    spectrum = abs.(fft(data))

    # Send results back (JSON + Arrow table)
    results = Dict("transcription" => "sample text", "spectrum" => spectrum)
    SmartSend("binary_output", results, "json")
end
```

### Scenario 4: Catch-Up (JetStream)

#### Julia (Producer)

```julia
using NATS
using JSON3

function publish_health_status(nats)
    jetstream = JetStream(nats, "health_updates")
    while true
        status = Dict("cpu" => rand(), "memory" => rand())
        # Serialize the status to a JSON string before publishing
        publish(jetstream, "health", JSON3.write(status))
        sleep(5)  # Every 5 seconds
    end
end
```

#### JavaScript (Consumer)

```javascript
const { connect, consumerOpts } = require('nats');
const { SmartReceive } = require('./js_bridge');

async function main() {
  const nc = await connect({ servers: ['nats://localhost:4222'] });
  const js = nc.jetstream();

  // Request replay from the last 10 minutes (consumerOpts() builder, nats.js 2.x)
  const opts = consumerOpts();
  opts.durable("catchup");
  opts.manualAck();
  opts.ackExplicit();
  opts.ackWait(30000);
  opts.startTime(new Date(Date.now() - 10 * 60 * 1000));
  const consumer = await js.pullSubscribe("health", opts);

  // A pull subscription delivers only what is requested, so ask for batches periodically
  const requestBatch = () => consumer.pull({ batch: 100, expires: 5000 });
  requestBatch();
  setInterval(requestBatch, 5000);

  // Process historical and real-time messages
  for await (const msg of consumer) {
    const result = await SmartReceive(msg);
    // Process the data
    msg.ack();
  }
}
main().catch(console.error);
```

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080/upload` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) |

### Message Envelope Schema

```json
{
  "correlation_id": "uuid-v4-string",
  "type": "json|table|binary",
  "transport": "direct|link",
  "payload": "base64-encoded-string",
  "url": "http://fileserver/path/to/data",
  "metadata": {
    "content_type": "application/octet-stream",
    "content_length": 123456,
    "format": "arrow_ipc_stream"
  }
}
```

`payload` is present only when `transport` is `direct`; `url` is present only when `transport` is `link`.

## Performance Considerations

### Zero-Copy Reading

- Use Arrow's memory-mapped file reading
- Avoid unnecessary data copying during deserialization
- Use Apache Arrow's native IPC reader

### Exponential Backoff

- Maximum retry count: 5
- Base delay: 100ms, max delay: 5000ms
- Implemented in both the Julia and JavaScript modules

### Correlation ID Logging

- Log `correlation_id` at every stage
- Include: send, receive, serialize, deserialize
- Use structured logging format

## Testing

Run the test scripts:

```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js
```

## Troubleshooting

### Common Issues

1. **NATS Connection Failed**
   - Ensure the NATS server is running
   - Check the `NATS_URL` configuration

2. **HTTP Upload Failed**
   - Ensure the file server is running and accepts POST requests
   - Check the `FILESERVER_URL` configuration
   - Verify upload permissions

3. **Arrow IPC Deserialization Error**
   - Ensure data is properly serialized to Arrow format
   - Check Arrow version compatibility

## License

MIT