# Implementation Guide: Bi-Directional Data Bridge

## Overview

This document describes the implementation of a high-performance, bi-directional data bridge between Julia and JavaScript services. The bridge uses NATS (Core and JetStream) for messaging and applies the Claim-Check pattern to large payloads.

## Architecture

The implementation follows the Claim-Check pattern: payloads below the size threshold travel inside the NATS message itself, while larger payloads are uploaded to an HTTP server and only their URL is published.

```
┌─────────────────────────────────────────────────────────────────────────┐
│                           SmartSend Function                            │
└─────────────────────────────────────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                         Is payload size < 1MB?                          │
└─────────────────────────────────────────────────────────────────────────┘
                                     │
                   ┌─────────────────┴─────────────────┐
                   ▼                                   ▼
          ┌─────────────────┐                 ┌─────────────────┐
          │   Direct Path   │                 │    Link Path    │
          │     (< 1MB)     │                 │     (≥ 1MB)     │
          │                 │                 │                 │
          │ • Serialize to  │                 │ • Serialize to  │
          │   IOBuffer      │                 │   IOBuffer      │
          │ • Base64 encode │                 │ • Upload to     │
          │ • Publish to    │                 │   HTTP Server   │
          │   NATS          │                 │ • Publish to    │
          │                 │                 │   NATS with URL │
          └─────────────────┘                 └─────────────────┘
```
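
The size check in the diagram can be sketched in a few lines of JavaScript. This is a minimal illustration, not the code in `src/js_bridge.js`: the helper name `chooseTransport` is hypothetical, and the assumption that a payload of exactly the threshold size takes the link path follows from the `< 1MB` test in the decision box.

```javascript
// Hypothetical helper: the real SmartSend may structure this differently.
const SIZE_THRESHOLD = 1_000_000; // bytes; mirrors the documented default

function chooseTransport(payloadBytes, threshold = SIZE_THRESHOLD) {
    // Payloads strictly below the threshold are embedded in the NATS message;
    // everything else is uploaded and referenced by URL.
    return payloadBytes.length < threshold ? "direct" : "link";
}

const small = Buffer.from(JSON.stringify({ step_size: 0.01 }));
const large = Buffer.alloc(2_000_000);
console.log(chooseTransport(small)); // "direct"
console.log(chooseTransport(large)); // "link"
```

Measuring the serialized bytes rather than the in-memory object keeps the decision consistent with what is actually sent over the wire.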

## Files

### Julia Module: [`src/julia_bridge.jl`](../src/julia_bridge.jl)

The Julia implementation provides:

- **[`MessageEnvelope`](../src/julia_bridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport

### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js)

The JavaScript implementation provides:

- **`MessageEnvelope` class**: For the unified JSON envelope
- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport
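
To make "handles both direct and link transport" concrete, the receive-side dispatch might look roughly like the sketch below. It is an illustration under stated assumptions, not the module's actual code: `resolvePayload` and its injectable `fetchImpl` parameter are hypothetical names, and the global `fetch` default assumes Node 18 or later.

```javascript
// Hypothetical sketch of the receive-side dispatch; not the module's actual code.
// `fetchImpl` is injectable so the link path can be exercised without a server.
async function resolvePayload(envelope, fetchImpl = fetch) {
    if (envelope.transport === "direct") {
        // Direct transport: the bytes travel Base64-encoded inside the envelope.
        return Buffer.from(envelope.payload, "base64");
    }
    if (envelope.transport === "link") {
        // Link transport: the envelope only carries a URL (the "claim check").
        const response = await fetchImpl(envelope.url);
        if (!response.ok) {
            throw new Error(`Download failed with status ${response.status}`);
        }
        return Buffer.from(await response.arrayBuffer());
    }
    throw new Error(`Unknown transport: ${envelope.transport}`);
}

// Usage: a direct envelope decodes without any network access.
resolvePayload({
    transport: "direct",
    payload: Buffer.from('{"ok":true}').toString("base64"),
}).then((bytes) => console.log(bytes.toString())); // {"ok":true}
```

Either branch returns raw bytes, so the caller can decode them by `type` (JSON, Arrow table, or binary) without caring how they arrived.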

## Installation

### Julia Dependencies

```julia
using Pkg

# UUIDs and Dates are Julia standard libraries; adding them records them in the project.
# The scenario examples in this guide additionally use DataFrames, WAV, and DSP.
Pkg.add(["NATS", "Arrow", "JSON3", "HTTP", "UUIDs", "Dates"])
```

### JavaScript Dependencies

```bash
# The NATS client is published on npm as "nats" (nats.js is the repository name)
npm install nats apache-arrow uuid base64-url
```

## Usage Tutorial

### Step 1: Start NATS Server

```bash
# -js enables JetStream, which Scenario 4 requires
docker run -p 4222:4222 nats:latest -js
```
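
### Step 2: Start HTTP File Server (optional)

The file server is only needed for the link path, that is, for payloads at or above the size threshold.

```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver

# Serve the directory for downloads with Python's built-in server.
# Note: http.server handles GET and HEAD only. Uploads require a server
# that accepts POST at the path configured in FILESERVER_URL.
python3 -m http.server 8080 --directory /tmp/fileserver
```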

### Step 3: Run Test Scenarios

```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js

# Scenario 3: Julia-to-Julia communication
# Run both Julia and JavaScript versions
julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js
```

## Usage

### Scenario 1: Command & Control (Small JSON)

#### JavaScript (Sender)

```javascript
const { SmartSend } = require('./js_bridge');

const config = {
    step_size: 0.01,
    iterations: 1000
};

// CommonJS modules have no top-level await, so wrap the call in an async function
async function main() {
    await SmartSend("control", config, "json", {
        correlationId: "unique-id"
    });
}

main().catch(console.error);
```

#### Julia (Receiver)

```julia
using NATS
using JSON3
using Base64

# `nats` is an open NATS connection; MessageEnvelope comes from src/julia_bridge.jl

# Subscribe to control subject
subscribe(nats, "control") do msg
    env = MessageEnvelope(String(msg.data))
    # Direct payloads are Base64-encoded per the envelope schema
    # (skip the decode if MessageEnvelope already performs it)
    config = JSON3.read(base64decode(env.payload))

    # Execute simulation with parameters
    step_size = config.step_size
    iterations = config.iterations

    # Send acknowledgment (JSON3 serializes with `write`)
    response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
    publish(nats, "control_response", JSON3.write(response))
end
```

### Scenario 2: Deep Dive Analysis (Large Arrow Table)

#### Julia (Sender)

```julia
using Arrow
using DataFrames

# Create large DataFrame (well above the 1 MB threshold, so the link path is used)
df = DataFrame(
    id = 1:10_000_000,
    value = rand(10_000_000),
    category = rand(["A", "B", "C"], 10_000_000)
)

# Send via SmartSend with type="table" (Julia has no `await`; the call is made directly)
SmartSend("analysis_results", df, "table")
```

#### JavaScript (Receiver)

```javascript
const { SmartReceive } = require('./js_bridge');

// `msg` is a message delivered by a NATS subscription on "analysis_results"
async function handleAnalysisResult(msg) {
    const result = await SmartReceive(msg);

    // Use table data for visualization with Perspective.js or D3
    const table = result.data;
    return table;
}
```
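
### Scenario 3: Live Binary Processing

#### JavaScript (Sender)

```javascript
const { SmartSend } = require('./js_bridge');

// Capture binary chunks in the browser. getUserMedia() returns a MediaStream,
// so a MediaRecorder is used to turn the stream into binary data.
// Note: MediaRecorder emits container-encoded audio, not raw samples.
async function sendAudioChunks() {
    const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
    const recorder = new MediaRecorder(stream);
    recorder.ondataavailable = async (event) => {
        const binaryData = new Uint8Array(await event.data.arrayBuffer());
        await SmartSend("binary_input", binaryData, "binary", {
            metadata: {
                sample_rate: 44100,
                channels: 1
            }
        });
    };
    recorder.start(1000); // emit a chunk roughly every second
}
```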

#### Julia (Receiver)

```julia
using WAV
using DSP
using FFTW  # provides fft

# Receive binary data
function process_binary(data)
    # Perform FFT or AI transcription
    spectrum = fft(data)

    # Send results back as JSON (Julia has no `await`; the call is made directly)
    results = Dict("transcription" => "sample text", "spectrum" => spectrum)
    SmartSend("binary_output", results, "json")
end
```

### Scenario 4: Catch-Up (JetStream)

#### Julia (Producer)

```julia
using NATS
using JSON3

# Note: the JetStream calls below are schematic; check the NATS.jl
# documentation for the exact JetStream API of your client version.
function publish_health_status(nats)
    jetstream = JetStream(nats, "health_updates")

    while true
        status = Dict("cpu" => rand(), "memory" => rand())
        # Serialize the Dict before publishing
        publish(jetstream, "health", JSON3.write(status))
        sleep(5)  # Every 5 seconds
    end
end
```
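
#### JavaScript (Consumer)

```javascript
const { connect, AckPolicy, DeliverPolicy } = require('nats');
const { SmartReceive } = require('./js_bridge');

// Uses the consumers API of nats v2.14+; assumes the "health_updates" stream exists
async function catchUp() {
    const nc = await connect({ servers: ['nats://localhost:4222'] });
    const jsm = await nc.jetstreamManager();
    const js = nc.jetstream();

    // Request replay from last 10 minutes
    await jsm.consumers.add("health_updates", {
        durable_name: "catchup",
        filter_subject: "health",
        ack_policy: AckPolicy.Explicit,
        ack_wait: 30_000_000_000, // 30 s, expressed in nanoseconds
        deliver_policy: DeliverPolicy.StartTime,
        opt_start_time: new Date(Date.now() - 10 * 60 * 1000).toISOString()
    });

    // Process historical and real-time messages
    const consumer = await js.consumers.get("health_updates", "catchup");
    const messages = await consumer.consume({ max_messages: 100 });
    for await (const msg of messages) {
        const result = await SmartReceive(msg);
        // Process the data
        msg.ack();
    }
}

catchUp().catch(console.error);
```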

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080/upload` | HTTP file server upload URL |
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1 MB) |
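
Reading these variables with their defaults could look like the following sketch. The function name `loadConfig` and the returned property names are hypothetical; only the variable names and default values come from the table. Note that `parseInt` stops at the first non-digit character, so the threshold should be supplied as plain digits without separators.

```javascript
// Hypothetical loader; the variable names and defaults come from the table above.
function loadConfig(env = process.env) {
    const threshold = Number.parseInt(env.SIZE_THRESHOLD ?? "1000000", 10);
    if (!Number.isFinite(threshold) || threshold <= 0) {
        throw new Error(`Invalid SIZE_THRESHOLD: ${env.SIZE_THRESHOLD}`);
    }
    return {
        natsUrl: env.NATS_URL ?? "nats://localhost:4222",
        fileserverUrl: env.FILESERVER_URL ?? "http://localhost:8080/upload",
        sizeThreshold: threshold,
    };
}

console.log(loadConfig({ SIZE_THRESHOLD: "2000000" }).sizeThreshold); // 2000000
```

Passing the environment as a parameter keeps the loader easy to test without mutating `process.env`.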

### Message Envelope Schema

```json
{
  "correlation_id": "uuid-v4-string",
  "type": "json|table|binary",
  "transport": "direct|link",
  "payload": "base64-encoded-string",
  "url": "http://fileserver/path/to/data",
  "metadata": {
    "content_type": "application/octet-stream",
    "content_length": 123456,
    "format": "arrow_ipc_stream"
  }
}
```

`payload` is present only when `transport` is `direct`; `url` is present only when `transport` is `link`.
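
A receiver can check these rules before touching the payload. The validator below is a hedged sketch derived from the schema, not part of the bridge modules; `validateEnvelope` and the exact error messages are illustrative.

```javascript
// Hypothetical validator based on the schema above; field names follow the schema.
const TYPES = ["json", "table", "binary"];
const TRANSPORTS = ["direct", "link"];

function validateEnvelope(env) {
    const errors = [];
    if (typeof env.correlation_id !== "string" || env.correlation_id === "") {
        errors.push("correlation_id must be a non-empty string");
    }
    if (!TYPES.includes(env.type)) errors.push(`unknown type: ${env.type}`);
    if (!TRANSPORTS.includes(env.transport)) {
        errors.push(`unknown transport: ${env.transport}`);
    }
    // Exactly one of payload/url is required, depending on the transport.
    if (env.transport === "direct" && typeof env.payload !== "string") {
        errors.push("direct transport requires a Base64 payload");
    }
    if (env.transport === "link" && typeof env.url !== "string") {
        errors.push("link transport requires a url");
    }
    return errors;
}

const sample = {
    correlation_id: "unique-id",
    type: "json",
    transport: "direct",
    payload: Buffer.from('{"step_size":0.01}').toString("base64"),
};
console.log(validateEnvelope(sample)); // []
```

Returning a list of problems instead of throwing on the first one makes it easier to log every defect alongside the correlation ID.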

## Performance Considerations

### Zero-Copy Reading

- Use Arrow's memory-mapped file reading
- Avoid unnecessary data copying during deserialization
- Use Apache Arrow's native IPC reader

### Exponential Backoff

- Maximum retry count: 5
- Base delay: 100ms, max delay: 5000ms
- Implemented in both the Julia and JavaScript modules
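
The limits above translate into a small retry helper. The sketch below is one plausible reading, not the bridge's actual implementation: it assumes the delay doubles on each retry, that no jitter is applied, and that "maximum retry count" means five retries after the initial attempt. The names `backoffDelay` and `withRetry` are hypothetical.

```javascript
// Hypothetical retry helper using the documented limits.
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 100;
const MAX_DELAY_MS = 5000;

function backoffDelay(attempt) {
    // attempt 0 -> 100 ms, 1 -> 200 ms, 2 -> 400 ms, ... capped at 5000 ms
    return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

async function withRetry(operation, sleep = (ms) => new Promise((r) => setTimeout(r, ms))) {
    let lastError;
    for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
        try {
            return await operation(attempt);
        } catch (err) {
            lastError = err;
            // Wait before the next try, but not after the final failure.
            if (attempt < MAX_RETRIES) await sleep(backoffDelay(attempt));
        }
    }
    throw lastError;
}
```

With these constants the waits between attempts are 100, 200, 400, 800, and 1600 ms, so the 5000 ms cap only matters if the retry count is raised.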

### Correlation ID Logging

- Log correlation_id at every stage
- Include: send, receive, serialize, deserialize
- Use structured logging format
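
One way to follow these points is to emit a single JSON object per stage. The sketch below is illustrative: `logStage` is a hypothetical helper, and every field name other than `correlation_id` is an assumption rather than something the bridge defines.

```javascript
// Hypothetical logger; the field names other than correlation_id are illustrative.
function logStage(stage, correlationId, extra = {}) {
    const entry = {
        timestamp: new Date().toISOString(),
        stage, // one of: send, receive, serialize, deserialize
        correlation_id: correlationId,
        ...extra,
    };
    const line = JSON.stringify(entry);
    console.log(line);
    return line;
}

logStage("serialize", "unique-id", { content_length: 123456 });
```

Because each line is valid JSON, log aggregators can filter on `correlation_id` to reconstruct the path of a single message across both services.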

## Testing

Run the test scripts:

```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js
```

## Troubleshooting

### Common Issues

1. **NATS Connection Failed**
   - Ensure NATS server is running
   - Check NATS_URL configuration

2. **HTTP Upload Failed**
   - Ensure file server is running
   - Check FILESERVER_URL configuration
   - Verify upload permissions

3. **Arrow IPC Deserialization Error**
   - Ensure data is properly serialized to Arrow format
   - Check Arrow version compatibility

## License

MIT