Files
NATSBridge/examples/walkthrough.md
2026-03-06 08:19:15 +07:00

36 KiB

Cross-Platform NATSBridge Walkthrough

A comprehensive guide to building real-world applications with NATSBridge across Julia, JavaScript, and Python/MicroPython.

Table of Contents

  1. Introduction
  2. Architecture Overview
  3. Building a Chat Application
  4. Building a File Transfer System
  5. Building a Streaming Data Pipeline
  6. Performance Optimization
  7. Best Practices

Introduction

This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover:

  • Chat applications with rich media support
  • File transfer systems with claim-check pattern
  • Streaming data pipelines

Each section builds on the previous one, gradually increasing in complexity.


Architecture Overview

Cross-Platform System Components

flowchart TB
    subgraph JuliaApp["Julia Application"]
        JuliaAppCode[App Code]
        JuliaBridge[NATSBridge.jl]
        JuliaNATS[<b>NATS.jl</b>]
    end

    subgraph JSApp["JavaScript Application"]
        JSAppCode[App Code]
        JSBridge[NATSBridge.js]
        JSNATS[<b>nats.js</b>]
    end

    subgraph PythonApp["Python Application"]
        PythonAppCode[App Code]
        PythonBridge[NATSBridge.py]
        PythonNATS[<b>nats-py</b>]
    end

    subgraph Infrastructure["Infrastructure"]
        NATS[<b>NATS Server</b><br/>Message Broker]
        FileServer[<b>HTTP File Server</b><br/>Upload/Download]
    end

    JuliaAppCode --> JuliaBridge
    JuliaBridge --> JuliaNATS
    JSAppCode --> JSBridge
    JSBridge --> JSNATS
    PythonAppCode --> PythonBridge
    PythonBridge --> PythonNATS

    JuliaNATS --> NATS
    JSNATS --> NATS
    PythonNATS --> NATS

    NATS --> JuliaNATS
    NATS --> JSNATS
    NATS --> PythonNATS

    JuliaBridge -.->|HTTP POST upload| FileServer
    JSBridge -.->|HTTP POST upload| FileServer
    PythonBridge -.->|HTTP POST upload| FileServer

    FileServer -.->|HTTP GET download| JuliaBridge
    FileServer -.->|HTTP GET download| JSBridge
    FileServer -.->|HTTP GET download| PythonBridge

    style JuliaApp fill:#c5e1a5
    style JSApp fill:#bbdefb
    style PythonApp fill:#f8bbd0
    style NATS fill:#fff3e0
    style FileServer fill:#f3e5f5

Message Flow

  1. Sender creates a message envelope with payloads
  2. NATSBridge serializes and encodes payloads
  3. Transport Decision: Small payloads go directly to NATS, large payloads are uploaded to file server
  4. NATS routes messages to subscribers
  5. Receiver fetches payloads (from NATS or file server)
  6. NATSBridge deserializes and decodes payloads

Building a Chat Application

Let's build a full-featured chat application that supports text, images, and file attachments.

Step 1: Set Up the Project

# Create project directory
mkdir -p chat-app/src
cd chat-app

# Create configuration file
cat > config.json << 'EOF'
{
  "nats_url": "nats://localhost:4222",
  "fileserver_url": "http://localhost:8080",
  "size_threshold": 1048576
}
EOF

Step 2: Create the Chat Interface

Julia

# src/chat_ui.jl
using NATSBridge, NATS

struct ChatUI
    messages::Vector{Dict}
    current_room::String
end

function ChatUI()
    ChatUI(Dict[], "")
end

function send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String})
    data = []
    
    # Add text message
    if !isempty(message_input)
        push!(data, ("text", message_input, "text"))
    end
    
    # Add file if selected
    if selected_file !== nothing
        file_data = read(selected_file)
        file_type = get_file_type(selected_file)
        push!(data, ("attachment", file_data, file_type))
    end
    
    return data
end

function get_file_type(filename::String)::String
    if endswith(filename, ".png") || endswith(filename, ".jpg")
        return "image"
    elseif endswith(filename, ".mp3") || endswith(filename, ".wav")
        return "audio"
    elseif endswith(filename, ".mp4") || endswith(filename, ".avi")
        return "video"
    else
        return "binary"
    end
end

function add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict})
    push!(ui.messages, Dict(
        "user" => user,
        "text" => text,
        "attachment" => attachment
    ))
end

JavaScript

// src/chat_ui.js
const NATSBridge = require('./src/natsbridge.js');

class ChatUI {
    constructor() {
        this.messages = [];
        this.currentRoom = "";
    }

    sendMessage(messageInput, selectedFile = null) {
        const data = [];
        
        // Add text message
        if (messageInput.length > 0) {
            data.push(["text", messageInput, "text"]);
        }
        
        // Add file if selected
        if (selectedFile !== null) {
            const fileData = fs.readFileSync(selectedFile);
            const fileType = this.getFileType(selectedFile);
            data.push(["attachment", fileData, fileType]);
        }
        
        return data;
    }

    getFileType(filename) {
        if (filename.endsWith('.png') || filename.endsWith('.jpg')) {
            return 'image';
        } else if (filename.endsWith('.mp3') || filename.endsWith('.wav')) {
            return 'audio';
        } else if (filename.endsWith('.mp4') || filename.endsWith('.avi')) {
            return 'video';
        } else {
            return 'binary';
        }
    }

    addMessage(user, text, attachment = null) {
        this.messages.push({
            user,
            text,
            attachment
        });
    }
}

module.exports = ChatUI;

Python

# src/chat_ui.py
from typing import List, Dict, Optional, Union

class ChatUI:
    def __init__(self):
        self.messages: List[Dict] = []
        self.current_room: str = ""

    def send_message(self, message_input: str, selected_file: Optional[str] = None) -> List[tuple]:
        data = []
        
        # Add text message
        if message_input:
            data.append(("text", message_input, "text"))
        
        # Add file if selected
        if selected_file:
            with open(selected_file, "rb") as f:
                file_data = f.read()
            file_type = self.get_file_type(selected_file)
            data.append(("attachment", file_data, file_type))
        
        return data

    def get_file_type(self, filename: str) -> str:
        if filename.endswith(('.png', '.jpg')):
            return "image"
        elif filename.endswith(('.mp3', '.wav')):
            return "audio"
        elif filename.endswith(('.mp4', '.avi')):
            return "video"
        else:
            return "binary"

    def add_message(self, user: str, text: str, attachment: Optional[Dict] = None):
        self.messages.append({
            "user": user,
            "text": text,
            "attachment": attachment
        })

Step 3: Create the Message Handler

Julia

# src/chat_handler.jl
using NATSBridge, NATS

struct ChatHandler
    nats::NATS.Connection
    ui::ChatUI
end

function ChatHandler(nats_connection::NATS.Connection)
    ChatHandler(nats_connection, ChatUI())
end

function start(handler::ChatHandler)
    # Subscribe to chat rooms
    rooms = ["general", "tech", "random"]
    
    for room in rooms
        NATS.subscribe(handler.nats, "/chat/$room") do msg
            handle_message(handler, msg)
        end
    end
    
    println("Chat handler started")
end

function handle_message(handler::ChatHandler, msg::NATS.Msg)
    env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)
    
    # Extract sender info from envelope
    sender = get(env, "sender_name", "Anonymous")
    
    # Process each payload
    for (dataname, data, type) in env["payloads"]
        if type == "text"
            add_message(handler.ui, sender, data, nothing)
        elseif type == "image"
            # Convert to data URL for display
            base64_data = base64encode(data)
            attachment = Dict(
                "type" => "image",
                "data" => "data:image/png;base64,$base64_data"
            )
            add_message(handler.ui, sender, "", attachment)
        else
            # For other types, use file server URL
            attachment = Dict("type" => type, "data" => data)
            add_message(handler.ui, sender, "", attachment)
        end
    end
end

JavaScript

// src/chat_handler.js
const NATSBridge = require('./src/natsbridge.js');
const nats = require('nats');

class ChatHandler {
    constructor(natsConnection) {
        this.nats = natsConnection;
        this.ui = new (require('./chat_ui.js'))();
    }

    async start() {
        // Subscribe to chat rooms
        const rooms = ['general', 'tech', 'random'];
        
        for (const room of rooms) {
            this.nats.subscribe(`/chat/${room}`, async (msg) => {
                await this.handleMessage(msg);
            });
        }
        
        console.log('Chat handler started');
    }

    async handleMessage(msg) {
        const env = await NATSBridge.smartreceive(msg, {
            fileserver_download_handler: NATSBridge.fetchWithBackoff
        });
        
        // Extract sender info from envelope
        const sender = env.sender_name || 'Anonymous';
        
        // Process each payload
        for (const [dataname, data, type] of env.payloads) {
            if (type === 'text') {
                this.ui.addMessage(sender, data, null);
            } else if (type === 'image') {
                // Convert to data URL for display
                const base64Data = Buffer.from(data).toString('base64');
                const attachment = {
                    type: 'image',
                    data: `data:image/png;base64,${base64Data}`
                };
                this.ui.addMessage(sender, '', attachment);
            } else {
                // For other types, use file server URL
                const attachment = { type, data };
                this.ui.addMessage(sender, '', attachment);
            }
        }
    }
}

module.exports = ChatHandler;

Python

# src/chat_handler.py
import asyncio
from typing import Optional
from natsbridge import smartreceive, fetch_with_backoff

class ChatHandler:
    def __init__(self, nats_connection):
        self.nats = nats_connection
        self.ui = ChatUI()

    async def start(self):
        # Subscribe to chat rooms
        rooms = ['general', 'tech', 'random']
        
        for room in rooms:
            await self.nats.subscribe(
                f'/chat/{room}',
                callback=self.handle_message
            )
        
        print('Chat handler started')

    async def handle_message(self, msg):
        env = await smartreceive(
            msg,
            fileserver_download_handler=fetch_with_backoff
        )
        
        # Extract sender info from envelope
        sender = env.get('sender_name', 'Anonymous')
        
        # Process each payload
        for dataname, data, type_ in env['payloads']:
            if type_ == 'text':
                self.ui.add_message(sender, data, None)
            elif type_ == 'image':
                # Convert to data URL for display
                import base64
                base64_data = base64.b64encode(data).decode('utf-8')
                attachment = {
                    'type': 'image',
                    'data': f'data:image/png;base64,{base64_data}'
                }
                self.ui.add_message(sender, '', attachment)
            else:
                # For other types, use file server URL or data
                attachment = {'type': type_, 'data': data}
                self.ui.add_message(sender, '', attachment)

Step 4: Run the Application

# Start NATS
docker run -p 4222:4222 nats:latest

# Start file server
mkdir -p /tmp/fileserver
python3 -m http.server 8080 --directory /tmp/fileserver

# Run chat app
# Julia
julia src/chat_ui.jl
julia src/chat_handler.jl

# JavaScript
node src/chat_ui.js
node src/chat_handler.js

# Python
python3 src/chat_ui.py
python3 src/chat_handler.py

Building a File Transfer System

Let's build a file transfer system that handles large files efficiently.

Step 1: File Upload Service

Julia

# src/file_upload_service.jl
using NATSBridge, HTTP

struct FileUploadService
    broker_url::String
    fileserver_url::String
end

function FileUploadService(broker_url::String, fileserver_url::String)
    FileUploadService(broker_url, fileserver_url)
end

function upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    file_data = read(file_path)
    file_name = basename(file_path)
    
    data = [("file", file_data, "binary")]
    
    env, env_json_str = smartsend(
        "/files/$recipient",
        data,
        broker_url=service.broker_url,
        fileserver_url=service.fileserver_url
    )
    
    return env
end

function upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    file_size = stat(file_path).size
    
    if file_size > 100 * 1024 * 1024  # > 100MB
        println("File too large for direct upload, using streaming...")
        return stream_upload(service, file_path, recipient)
    end
    
    return upload_file(service, file_path, recipient)
end

function stream_upload(service::FileUploadService, file_path::String, recipient::String)::Dict
    # Implement streaming upload to file server
    # This would require a more sophisticated file server
    # For now, we'll use the standard upload
    return upload_file(service, file_path, recipient)
end

JavaScript

// src/file_upload_service.js
const NATSBridge = require('./src/natsbridge.js');
const fs = require('fs');

class FileUploadService {
    constructor(brokerUrl, fileserverUrl) {
        this.broker_url = brokerUrl;
        this.fileserver_url = fileserverUrl;
    }

    async uploadFile(filePath, recipient) {
        const fileData = fs.readFileSync(filePath);
        const fileName = require('path').basename(filePath);
        
        const data = [["file", fileData, "binary"]];
        
        const [env, env_json_str] = await NATSBridge.smartsend(
            `/files/${recipient}`,
            data,
            {
                broker_url: this.broker_url,
                fileserver_url: this.fileserver_url
            }
        );
        
        return env;
    }

    async uploadLargeFile(filePath, recipient) {
        const stats = fs.statSync(filePath);
        const fileSize = stats.size;
        
        if (fileSize > 100 * 1024 * 1024) {  // > 100MB
            console.log('File too large for direct upload, using streaming...');
            return this.streamUpload(filePath, recipient);
        }
        
        return this.uploadFile(filePath, recipient);
    }

    async streamUpload(filePath, recipient) {
        // Implement streaming upload to file server
        // This would require a more sophisticated file server
        // For now, we'll use the standard upload
        return this.uploadFile(filePath, recipient);
    }
}

module.exports = FileUploadService;

Python

# src/file_upload_service.py
from natsbridge import smartsend
import os

class FileUploadService:
    def __init__(self, broker_url: str, fileserver_url: str):
        self.broker_url = broker_url
        self.fileserver_url = fileserver_url

    async def upload_file(self, file_path: str, recipient: str) -> tuple:
        with open(file_path, "rb") as f:
            file_data = f.read()
        file_name = os.path.basename(file_path)
        
        data = [("file", file_data, "binary")]
        
        env, env_json_str = await smartsend(
            f"/files/{recipient}",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )
        
        return env, env_json_str

    async def upload_large_file(self, file_path: str, recipient: str) -> tuple:
        file_size = os.path.getsize(file_path)
        
        if file_size > 100 * 1024 * 1024:  # > 100MB
            print("File too large for direct upload, using streaming...")
            return await self.stream_upload(file_path, recipient)
        
        return await self.upload_file(file_path, recipient)

    async def stream_upload(self, file_path: str, recipient: str) -> tuple:
        # Implement streaming upload to file server
        # This would require a more sophisticated file server
        # For now, we'll use the standard upload
        return await self.upload_file(file_path, recipient)

Step 2: File Download Service

Julia

# src/file_download_service.jl
using NATSBridge

struct FileDownloadService
    nats_url::String
end

function FileDownloadService(nats_url::String)
    FileDownloadService(nats_url)
end

function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String)
    env = smartreceive(msg, fileserver_download_handler=fetch_from_url)
    
    # Process each payload
    for (dataname, data, type) in env["payloads"]
        if type == "binary"
            file_path = "/downloads/$dataname"
            write(file_path, data)
            println("File saved to $file_path")
        end
    end
end

function fetch_from_url(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
    # Fetch data from URL with exponential backoff
    # Return downloaded data as Vector{UInt8}
end

JavaScript

// src/file_download_service.js
const NATSBridge = require('./src/natsbridge.js');
const fs = require('fs');

class FileDownloadService {
    constructor(natsUrl) {
        this.nats_url = natsUrl;
    }

    async downloadFile(msg, sender, downloadId) {
        const env = await NATSBridge.smartreceive(msg, {
            fileserver_download_handler: NATSBridge.fetchWithBackoff
        });
        
        // Process each payload
        for (const [dataname, data, type] of env.payloads) {
            if (type === 'binary') {
                const filePath = `/downloads/${dataname}`;
                fs.writeFileSync(filePath, data);
                console.log(`File saved to ${filePath}`);
            }
        }
    }
}

module.exports = FileDownloadService;

Python

# src/file_download_service.py
from natsbridge import smartreceive, fetch_with_backoff
import os

class FileDownloadService:
    def __init__(self, nats_url: str):
        self.nats_url = nats_url

    async def download_file(self, msg, sender: str, download_id: str):
        env = await smartreceive(
            msg,
            fileserver_download_handler=fetch_with_backoff
        )
        
        # Process each payload
        for dataname, data, type_ in env['payloads']:
            if type_ == 'binary':
                file_path = f'/downloads/{dataname}'
                os.makedirs('/downloads', exist_ok=True)
                with open(file_path, 'wb') as f:
                    f.write(data)
                print(f"File saved to {file_path}")

Step 3: File Transfer CLI

Julia

# src/cli.jl
using NATSBridge

function main()
    println("File Transfer System")
    println("====================")
    println("1. Upload file")
    println("2. Download file")
    println("3. List pending downloads")
    
    print("Enter choice: ")
    choice = readline()
    
    if choice == "1"
        upload_file_cli()
    elseif choice == "2"
        download_file_cli()
    end
end

function upload_file_cli()
    print("Enter file path: ")
    file_path = readline()
    
    print("Enter recipient: ")
    recipient = readline()
    
    file_service = FileUploadService("nats://localhost:4222", "http://localhost:8080")
    
    try
        env = upload_file(file_service, file_path, recipient)
        println("Upload successful!")
        println("File ID: $(env["payloads"][1][1])")
    catch error
        println("Upload failed: $(error)")
    end
end

function download_file_cli()
    print("Enter sender: ")
    sender = readline()
    
    file_service = FileDownloadService("nats://localhost:4222")
    
    try
        download_file(file_service, sender)
        println("Download complete!")
    catch error
        println("Download failed: $(error)")
    end
end

main()

Building a Streaming Data Pipeline

Let's build a data pipeline that processes streaming data from sensors.

Step 1: Sensor Data Model

Julia

# src/sensor_data.jl
using Dates, DataFrames

struct SensorReading
    sensor_id::String
    timestamp::String
    value::Float64
    unit::String
    metadata::Dict{String, Any}
end

function SensorReading(sensor_id::String, value::Float64, unit::String, metadata::Dict{String, Any}=Dict())
    SensorReading(
        sensor_id,
        ISODateTime(now(), Dates.Second) |> string,
        value,
        unit,
        metadata
    )
end

struct SensorBatch
    readings::Vector{SensorReading}
end

function SensorBatch()
    SensorBatch(SensorReading[])
end

function add_reading(batch::SensorBatch, reading::SensorReading)
    push!(batch.readings, reading)
end

function to_dataframe(batch::SensorBatch)::DataFrame
    data = Dict{String, Any}()
    data["sensor_id"] = [r.sensor_id for r in batch.readings]
    data["timestamp"] = [r.timestamp for r in batch.readings]
    data["value"] = [r.value for r in batch.readings]
    data["unit"] = [r.unit for r in batch.readings]
    
    return DataFrame(data)
end

JavaScript

// src/sensor_data.js
const NATSBridge = require('./src/natsbridge.js');

class SensorReading {
    constructor(sensorId, value, unit, metadata = {}) {
        this.sensor_id = sensorId;
        this.timestamp = new Date().toISOString();
        this.value = value;
        this.unit = unit;
        this.metadata = metadata;
    }
}

class SensorBatch {
    constructor() {
        this.readings = [];
    }

    addReading(reading) {
        this.readings.push(reading);
    }

    toDataFrame() {
        return {
            sensor_id: this.readings.map(r => r.sensor_id),
            timestamp: this.readings.map(r => r.timestamp),
            value: this.readings.map(r => r.value),
            unit: this.readings.map(r => r.unit)
        };
    }
}

module.exports = { SensorReading, SensorBatch };

Python

# src/sensor_data.py
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Dict, Any

@dataclass
class SensorReading:
    sensor_id: str
    timestamp: str
    value: float
    unit: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, sensor_id: str, value: float, unit: str, metadata: Dict[str, Any] = None):
        return cls(
            sensor_id=sensor_id,
            timestamp=datetime.utcnow().isoformat(),
            value=value,
            unit=unit,
            metadata=metadata or {}
        )

class SensorBatch:
    def __init__(self):
        self.readings: List[SensorReading] = []

    def add_reading(self, reading: SensorReading):
        self.readings.append(reading)

    def to_dataframe(self):
        import pandas as pd
        return pd.DataFrame({
            'sensor_id': [r.sensor_id for r in self.readings],
            'timestamp': [r.timestamp for r in self.readings],
            'value': [r.value for r in self.readings],
            'unit': [r.unit for r in self.readings]
        })

Step 2: Sensor Sender

Julia

# src/sensor_sender.jl
using NATSBridge, Dates, Random

struct SensorSender
    broker_url::String
    fileserver_url::String
end

function SensorSender(broker_url::String, fileserver_url::String)
    SensorSender(broker_url, fileserver_url)
end

function send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String)
    reading = SensorReading(sensor_id, value, unit)
    
    data = [("reading", reading.metadata, "dictionary")]
    
    # Default: is_publish=True (automatically publishes to NATS)
    smartsend(
        "/sensors/$sensor_id",
        data,
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url
    )
end

function send_batch(sender::SensorSender, readings::Vector{SensorReading})
    batch = SensorBatch()
    for reading in readings
        add_reading(batch, reading)
    end
    
    df = to_dataframe(batch)
    
    # Convert to Arrow IPC format
    import Arrow
    table = Arrow.Table(df)
    
    # Serialize to Arrow IPC
    import IOBuffer
    buf = IOBuffer()
    Arrow.write(buf, table)
    
    arrow_data = take!(buf)
    
    # Send based on size
    if length(arrow_data) < 1048576  # < 1MB
        data = [("batch", arrow_data, "table")]
        smartsend(
            "/sensors/batch",
            data,
            broker_url=sender.broker_url,
            fileserver_url=sender.fileserver_url
        )
    else
        # Upload to file server
        data = [("batch", arrow_data, "table")]
        smartsend(
            "/sensors/batch",
            data,
            broker_url=sender.broker_url,
            fileserver_url=sender.fileserver_url
        )
    end
end

JavaScript

// src/sensor_sender.js
const NATSBridge = require('./src/natsbridge.js');
const { SensorReading, SensorBatch } = require('./sensor_data.js');

class SensorSender {
    constructor(brokerUrl, fileserverUrl) {
        this.broker_url = brokerUrl;
        this.fileserver_url = fileserverUrl;
    }

    async sendReading(sensorId, value, unit) {
        const reading = new SensorReading(sensorId, value, unit);
        
        const data = [["reading", reading.metadata, "dictionary"]];
        
        await NATSBridge.smartsend(
            `/sensors/${sensorId}`,
            data,
            {
                broker_url: this.broker_url,
                fileserver_url: this.fileserver_url
            }
        );
    }

    async sendBatch(readings) {
        const batch = new SensorBatch();
        for (const reading of readings) {
            batch.addReading(reading);
        }
        
        const df = batch.toDataFrame();
        
        // Convert to Arrow IPC
        const arrow = require('apache-arrow');
        const schema = new arrow.Schema([
            new arrow.Field('sensor_id', arrow.string()),
            new arrow.Field('timestamp', arrow.string()),
            new arrow.Field('value', arrow.float64()),
            new arrow.Field('unit', arrow.string())
        ]);
        
        const arrays = {
            sensor_id: new arrow.StringArray(df.sensor_id.map(s => String(s))),
            timestamp: new arrow.StringArray(df.timestamp),
            value: new arrow.Float64Array(df.value),
            unit: new arrow.StringArray(df.unit)
        };
        
        const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, df.value.length);
        const buffer = arrow.tableFromBatches([recordBatch]).toBuffer();
        const arrow_data = new Uint8Array(buffer);
        
        // Send based on size
        if (arrow_data.length < 1048576) {
            const data = [["batch", arrow_data, "table"]];
            await NATSBridge.smartsend(
                "/sensors/batch",
                data,
                {
                    broker_url: this.broker_url,
                    fileserver_url: this.fileserver_url
                }
            );
        } else {
            const data = [["batch", arrow_data, "table"]];
            await NATSBridge.smartsend(
                "/sensors/batch",
                data,
                {
                    broker_url: this.broker_url,
                    fileserver_url: this.fileserver_url
                }
            );
        }
    }
}

module.exports = SensorSender;

Python

# src/sensor_sender.py
from natsbridge import smartsend
from sensor_data import SensorReading, SensorBatch

class SensorSender:
    def __init__(self, broker_url: str, fileserver_url: str):
        self.broker_url = broker_url
        self.fileserver_url = fileserver_url

    async def send_reading(self, sensor_id: str, value: float, unit: str):
        reading = SensorReading.create(sensor_id, value, unit)
        
        data = [("reading", reading.metadata, "dictionary")]
        
        await smartsend(
            f"/sensors/{sensor_id}",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )

    async def send_batch(self, readings):
        batch = SensorBatch()
        for reading in readings:
            batch.add_reading(reading)
        
        df = batch.to_dataframe()
        
        # Convert to Arrow IPC
        import pyarrow as arrow
        import pyarrow.ipc as ipc
        import io
        
        table = arrow.Table.from_pandas(df)
        buf = io.BytesIO()
        sink = ipc.new_file(buf, table.schema)
        ipc.write_table(table, sink)
        sink.close()
        arrow_data = buf.getvalue()
        
        # Send based on size (auto-selected by smartsend)
        data = [("batch", arrow_data, "table")]
        await smartsend(
            "/sensors/batch",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )

Performance Optimization

1. Batch Processing

Julia

# Batch multiple readings into a single message
function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}})
    batch = SensorBatch()
    
    for (sensor_id, value, unit) in readings
        reading = SensorReading(sensor_id, value, unit)
        add_reading(batch, reading)
    end
    
    df = to_dataframe(batch)
    
    # Convert to Arrow IPC
    import Arrow
    table = Arrow.Table(df)
    
    # Serialize to Arrow IPC
    import IOBuffer
    buf = IOBuffer()
    Arrow.write(buf, table)
    
    arrow_data = take!(buf)
    
    # Send as single message
    smartsend(
        "/sensors/batch",
        [("batch", arrow_data, "table")],
        broker_url=sender.broker_url
    )
end

JavaScript

// Batch multiple readings into a single message
async function sendBatchReadings(sender, readings) {
    const batch = new SensorBatch();
    
    for (const [sensorId, value, unit] of readings) {
        const reading = new SensorReading(sensorId, value, unit);
        batch.addReading(reading);
    }
    
    const df = batch.toDataFrame();
    
    // Convert to Arrow IPC
    const arrow = require('apache-arrow');
    const schema = new arrow.Schema([
        new arrow.Field('sensor_id', arrow.string()),
        new arrow.Field('timestamp', arrow.string()),
        new arrow.Field('value', arrow.float64()),
        new arrow.Field('unit', arrow.string())
    ]);
    
    const arrays = {
        sensor_id: new arrow.StringArray(df.sensor_id),
        timestamp: new arrow.StringArray(df.timestamp),
        value: new arrow.Float64Array(df.value),
        unit: new arrow.StringArray(df.unit)
    };
    
    const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, df.value.length);
    const buffer = arrow.tableFromBatches([recordBatch]).toBuffer();
    const arrow_data = new Uint8Array(buffer);
    
    // Send as single message
    const data = [["batch", arrow_data, "table"]];
    await NATSBridge.smartsend(
        "/sensors/batch",
        data,
        { broker_url: sender.broker_url }
    );
}

2. Connection Reuse

Julia

# Reuse NATS connections
function create_connection_pool()
    connections = Dict{String, NATS.Connection}()
    
    function get_connection(nats_url::String)::NATS.Connection
        if !haskey(connections, nats_url)
            connections[nats_url] = NATS.connect(nats_url)
        end
        return connections[nats_url]
    end
    
    function close_all()
        for conn in values(connections)
            NATS.drain(conn)
        end
        empty!(connections)
    end
    
    return (get_connection=get_connection, close_all=close_all)
end

Python

# Reuse NATS connections
import asyncio
import nats

class ConnectionPool:
    def __init__(self):
        self.connections = {}
    
    async def get_connection(self, nats_url: str):
        if nats_url not in self.connections:
            self.connections[nats_url] = await nats.connect(nats_url)
        return self.connections[nats_url]
    
    async def close_all(self):
        for conn in self.connections.values():
            await conn.drain()
        self.connections.clear()

3. Caching

Julia

# Cache file server responses
using Base.Threads

const file_cache = Dict{String, Vector{UInt8}}()

function fetch_with_caching(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
    if haskey(file_cache, url)
        return file_cache[url]
    end
    
    # Fetch from file server
    data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)
    
    # Cache the result
    file_cache[url] = data
    
    return data
end

Python

# Cache file server responses
import asyncio
import threading
from natsbridge import fetch_with_backoff

file_cache = {}
cache_lock = threading.Lock()

async def fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id):
    with cache_lock:
        if url in file_cache:
            return file_cache[url]
    
    # Fetch from file server
    data = await fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)
    
    # Cache the result
    with cache_lock:
        file_cache[url] = data
    
    return data

Best Practices

1. Error Handling

Julia

function safe_smartsend(subject::String, data::Vector{Tuple}, kwargs...)
    try
        return smartsend(subject, data; kwargs...)
    catch error
        println("Failed to send message: $(error)")
        return nothing
    end
end

JavaScript

async function safeSmartSend(subject, data, options = {}) {
    try {
        return await NATSBridge.smartsend(subject, data, options);
    } catch (error) {
        console.error(`Failed to send message: ${error}`);
        return null;
    }
}

Python

from typing import List, Tuple, Optional, Union

async def safe_smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    **kwargs
) -> Optional[Tuple[dict, str]]:
    try:
        return await smartsend(subject, data, **kwargs)
    except Exception as error:
        print(f"Failed to send message: {error}")
        return None

2. Logging

Julia

using Logging

function log_send(subject::String, data::Vector{Tuple}, correlation_id::String)
    @info "Sending to $subject: $(length(data)) payloads, correlation_id=$correlation_id"
end

function log_receive(correlation_id::String, num_payloads::Int)
    @info "Received message: $num_payloads payloads, correlation_id=$correlation_id"
end

Python

import logging
from typing import List, Tuple, Any

logger = logging.getLogger(__name__)

def log_send(subject: str, data: List[Tuple[str, Any, str]], correlation_id: str):
    logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}")

def log_receive(correlation_id: str, num_payloads: int):
    logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}")

Conclusion

This walkthrough covered:

  • Building a chat application with rich media support
  • Building a file transfer system with claim-check pattern
  • Building a streaming data pipeline for sensor data

For more information, check the API documentation and test examples.


License

MIT