Files
NATSBridge/docs/IMPLEMENTATION.md
2026-02-14 13:04:28 +07:00

11 KiB

Implementation Guide: Bi-Directional Data Bridge

Overview

This document describes the implementation of the high-performance, bi-directional data bridge between Julia and JavaScript services using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.

Multi-Payload Support

The implementation uses a standardized list-of-tuples format for all payload operations. Even when sending a single payload, the user must wrap it in a list.

API Standard:

# Input format for smartsend (always a list of tuples)
[(dataname1, data1), (dataname2, data2), ...]

# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1), (dataname2, data2), ...]

Examples:

# Single payload - still wrapped in a list
smartsend("/test", [(dataname1, data1)], ...)

# Multiple payloads in one message
smartsend("/test", [(dataname1, data1), (dataname2, data2)], ...)

# Receive always returns a list
payloads = smartreceive(msg, ...)
# payloads = [(dataname1, data1), (dataname2, data2), ...]

Architecture

The implementation follows the Claim-Check pattern:

┌─────────────────────────────────────────────────────────────────────────┐
│                         SmartSend Function                              │
└─────────────────────────────────────────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  Is payload size < 1MB?                                                 │
└─────────────────────────────────────────────────────────────────────────┘
                             │
           ┌─────────────────┴─────────────────┐
           ▼                                   ▼
     ┌─────────────────┐                 ┌─────────────────┐
     │  Direct Path    │                 │  Link Path      │
     │  (< 1MB)        │                 │  (> 1MB)        │
     │                 │                 │                 │
     │ • Serialize to  │                 │ • Serialize to  │
     │   IOBuffer      │                 │   IOBuffer      │
     │ • Base64 encode │                 │ • Upload to     │
     │ • Publish to    │                 │   HTTP Server   │
     │   NATS          │                 │ • Publish to    │
     │                 │                 │   NATS with URL │
     └─────────────────┘                 └─────────────────┘

Files

Julia Module: src/julia_bridge.jl

The Julia implementation provides:

JavaScript Module: src/js_bridge.js

The JavaScript implementation provides:

  • MessageEnvelope class: For the unified JSON envelope
  • SmartSend(): Handles transport selection based on payload size
  • SmartReceive(): Handles both direct and link transport

Installation

Julia Dependencies

using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")

JavaScript Dependencies

npm install nats.js apache-arrow uuid base64-url

Usage Tutorial

Step 1: Start NATS Server

docker run -p 4222:4222 nats:latest

Step 2: Start HTTP File Server (optional)

# Create a directory for file uploads
mkdir -p /tmp/fileserver

# Use any HTTP server that supports POST for file uploads
# Example: Python's built-in server
python3 -m http.server 8080 --directory /tmp/fileserver

Step 3: Run Test Scenarios

# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js

# Scenario 3: Julia-to-Julia communication
# Run both Julia and JavaScript versions
julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js

Usage

Scenario 0: Basic Multi-Payload Example

Julia (Sender)

using NATSBridge

# Send multiple payloads in one message
smartsend(
    "/test",
    [("dataname1", data1), ("dataname2", data2)],
    nats_url="nats://localhost:4222",
    fileserver_url="http://localhost:8080/upload",
    metadata=Dict("custom_key" => "custom_value")
)

# Even single payload must be wrapped in a list
smartsend("/test", [("single_data", mydata)])

Julia (Receiver)

using NATSBridge

# Receive returns a list of payloads
payloads = smartreceive(msg, "http://localhost:8080/upload")
# payloads = [(dataname1, data1), (dataname2, data2), ...]

Scenario 1: Command & Control (Small JSON)

JavaScript (Sender)

const { SmartSend } = require('./js_bridge');

// Single payload wrapped in a list
const config = [{
    dataname: "config",
    data: { step_size: 0.01, iterations: 1000 },
    type: "json"
}];

await SmartSend("control", config, "json", {
    correlationId: "unique-id"
});

// Multiple payloads
const configs = [
    {
        dataname: "config1",
        data: { step_size: 0.01 },
        type: "json"
    },
    {
        dataname: "config2", 
        data: { iterations: 1000 },
        type: "json"
    }
];

await SmartSend("control", configs, "json");

Julia (Receiver)

using NATS
using JSON3

# Subscribe to control subject
subscribe(nats, "control") do msg
    env = MessageEnvelope(String(msg.data))
    config = JSON3.read(env.payload)
    
    # Execute simulation with parameters
    step_size = config.step_size
    iterations = config.iterations
    
    # Send acknowledgment
    response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
    publish(nats, "control_response", JSON3.stringify(response))
end

Scenario 2: Deep Dive Analysis (Large Arrow Table)

Julia (Sender)

using Arrow
using DataFrames

# Create large DataFrame
df = DataFrame(
    id = 1:10_000_000,
    value = rand(10_000_000),
    category = rand(["A", "B", "C"], 10_000_000)
)

# Send via SmartSend - wrapped in a list
await SmartSend("analysis_results", [("table_data", df)], "table");

JavaScript (Receiver)

const { SmartReceive } = require('./js_bridge');

const result = await SmartReceive(msg);

// Use table data for visualization with Perspective.js or D3
const table = result.data;

Scenario 3: Live Binary Processing

JavaScript (Sender)

const { SmartSend } = require('./js_bridge');

// Binary data wrapped in a list
const binaryData = [{
    dataname: "audio_chunk",
    data: binaryBuffer,
    type: "binary"
}];

await SmartSend("binary_input", binaryData, "binary", {
    metadata: {
        sample_rate: 44100,
        channels: 1
    }
});

Julia (Receiver)

using WAV
using DSP

# Receive binary data
function process_binary(data)
    # Perform FFT or AI transcription
    spectrum = fft(data)
    
    # Send results back (JSON + Arrow table)
    results = Dict("transcription" => "sample text", "spectrum" => spectrum)
    await SmartSend("binary_output", results, "json")
end

Scenario 4: Catch-Up (JetStream)

Julia (Producer)

using NATSBridge

function publish_health_status(nats_url)
    # Send status wrapped in a list
    status = Dict("cpu" => rand(), "memory" => rand())
    smartsend("health", [("status", status)], "json", nats_url=nats_url)
    sleep(5)  # Every 5 seconds
end

JavaScript (Consumer)

const { connect } = require('nats');

const nc = await connect({ servers: ['nats://localhost:4222'] });
const js = nc.jetstream();

// Request replay from last 10 minutes
const consumer = await js.pullSubscribe("health", { 
    durable_name: "catchup",
    max_batch: 100,
    max_ack_wait: 30000
});

// Process historical and real-time messages
for await (const msg of consumer) {
    const result = await SmartReceive(msg);
    // result.data contains the list of payloads
    // result.envelope contains the message envelope
    msg.ack();
}

Configuration

Environment Variables

Variable Default Description
NATS_URL nats://localhost:4222 NATS server URL
FILESERVER_URL http://localhost:8080/upload HTTP file server URL
SIZE_THRESHOLD 1_000_000 Size threshold in bytes (1MB)

Message Envelope Schema

{
  "correlationId": "uuid-v4-string",
  "msgId": "uuid-v4-string",
  "timestamp": "2024-01-15T10:30:00Z",
  
  "sendTo": "topic/subject",
  "msgPurpose": "ACK | NACK | updateStatus | shutdown | chat",
  "senderName": "agent-wine-web-frontend",
  "senderId": "uuid4",
  "receiverName": "agent-backend",
  "receiverId": "uuid4",
  "replyTo": "topic",
  "replyToMsgId": "uuid4",
  "BrokerURL": "nats://localhost:4222",
  
  "metadata": {
    "content_type": "application/octet-stream",
    "content_length": 123456
  },
  
  "payloads": [
    {
      "id": "uuid4",
      "dataname": "login_image",
      "type": "image",
      "transport": "direct",
      "encoding": "base64",
      "size": 15433,
      "data": "base64-encoded-string",
      "metadata": {
        "checksum": "sha256_hash"
      }
    }
  ]
}

Performance Considerations

Zero-Copy Reading

  • Use Arrow's memory-mapped file reading
  • Avoid unnecessary data copying during deserialization
  • Use Apache Arrow's native IPC reader

Exponential Backoff

  • Maximum retry count: 5
  • Base delay: 100ms, max delay: 5000ms
  • Implemented in both Julia and JavaScript implementations

Correlation ID Logging

  • Log correlation_id at every stage
  • Include: send, receive, serialize, deserialize
  • Use structured logging format

Testing

Run the test scripts:

# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js

# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js

Troubleshooting

Common Issues

  1. NATS Connection Failed

    • Ensure NATS server is running
    • Check NATS_URL configuration
  2. HTTP Upload Failed

    • Ensure file server is running
    • Check FILESERVER_URL configuration
    • Verify upload permissions
  3. Arrow IPC Deserialization Error

    • Ensure data is properly serialized to Arrow format
    • Check Arrow version compatibility

License

MIT