# Cross-Platform NATSBridge Walkthrough

A comprehensive guide to building real-world applications with NATSBridge across **Julia**, **JavaScript**, and **Python/MicroPython**.

## Table of Contents

1. [Introduction](#introduction)
2. [Architecture Overview](#architecture-overview)
3. [Building a Chat Application](#building-a-chat-application)
4. [Building a File Transfer System](#building-a-file-transfer-system)
5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline)
6. [Performance Optimization](#performance-optimization)
7. [Best Practices](#best-practices)

---

## Introduction

This walkthrough guides you through building several real-world applications using NATSBridge. We'll cover:

- Chat applications with rich media support
- File transfer systems using the claim-check pattern
- Streaming data pipelines

Each section builds on the previous one, gradually increasing in complexity.

---

## Architecture Overview

### Cross-Platform System Components

```mermaid
flowchart TB
    subgraph JuliaApp["Julia Application"]
        JuliaAppCode[App Code]
        JuliaBridge[NATSBridge.jl]
        JuliaNATS[NATS.jl]
    end

    subgraph JSApp["JavaScript Application"]
        JSAppCode[App Code]
        JSBridge[NATSBridge.js]
        JSNATS[nats.js]
    end

    subgraph PythonApp["Python Application"]
        PythonAppCode[App Code]
        PythonBridge[NATSBridge.py]
        PythonNATS[nats-py]
    end

    subgraph Infrastructure["Infrastructure"]
        NATS["NATS Server<br/>Message Broker"]
        FileServer["HTTP File Server<br/>Upload/Download"]
    end

    JuliaAppCode --> JuliaBridge
    JuliaBridge --> JuliaNATS
    JSAppCode --> JSBridge
    JSBridge --> JSNATS
    PythonAppCode --> PythonBridge
    PythonBridge --> PythonNATS

    JuliaNATS --> NATS
    JSNATS --> NATS
    PythonNATS --> NATS
    NATS --> JuliaNATS
    NATS --> JSNATS
    NATS --> PythonNATS

    JuliaBridge -.->|HTTP POST upload| FileServer
    JSBridge -.->|HTTP POST upload| FileServer
    PythonBridge -.->|HTTP POST upload| FileServer
    FileServer -.->|HTTP GET download| JuliaBridge
    FileServer -.->|HTTP GET download| JSBridge
    FileServer -.->|HTTP GET download| PythonBridge

    style JuliaApp fill:#c5e1a5
    style JSApp fill:#bbdefb
    style PythonApp fill:#f8bbd0
    style NATS fill:#fff3e0
    style FileServer fill:#f3e5f5
```

### Message Flow

1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to the file server
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or the file server)
6. **NATSBridge** deserializes and decodes payloads

---

## Building a Chat Application

Let's build a full-featured chat application that supports text, images, and file attachments.
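
Before setting up the project, it may help to see the transport decision from step 3 of the message flow in isolation. The sketch below is only an illustration, not NATSBridge's actual implementation: the names `choose_transport` and `plan_transports`, the `"direct"`/`"link"` labels, and the 1 MiB cutoff (chosen to mirror the `size_threshold` value in this guide's configuration) are all assumptions.

```python
from typing import List, Tuple

# Assumption: 1 MiB cutoff, mirroring the size_threshold value in this guide's config
SIZE_THRESHOLD = 1_048_576


def choose_transport(data: bytes, threshold: int = SIZE_THRESHOLD) -> str:
    """Return "direct" for payloads under the threshold, otherwise "link".

    The labels are hypothetical; they only name the two routes in step 3.
    """
    return "direct" if len(data) < threshold else "link"


def plan_transports(payloads: List[Tuple[str, bytes, str]],
                    threshold: int = SIZE_THRESHOLD) -> List[Tuple[str, str]]:
    """Pair each payload name with the route it would take."""
    return [(name, choose_transport(data, threshold)) for name, data, _type in payloads]


payloads = [
    ("text", b"hello", "text"),
    ("attachment", bytes(2 * 1024 * 1024), "binary"),  # 2 MiB of zeros
]
plan = plan_transports(payloads)
print(plan)
```

Each payload is routed independently, so one envelope can mix small inline payloads with large ones that travel through the file server.
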

### Step 1: Set Up the Project

```bash
# Create project directory
mkdir -p chat-app/src
cd chat-app

# Create configuration file
cat > config.json << 'EOF'
{
  "nats_url": "nats://localhost:4222",
  "fileserver_url": "http://localhost:8080",
  "size_threshold": 1048576
}
EOF
```

### Step 2: Create the Chat Interface

#### Julia

```julia
# src/chat_ui.jl
using NATSBridge, NATS

struct ChatUI
    messages::Vector{Dict}
    current_room::String
end

function ChatUI()
    ChatUI(Dict[], "")
end

# Build the list of (dataname, data, type) payloads for one outgoing message
function send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String})
    data = []

    # Add text message
    if !isempty(message_input)
        push!(data, ("text", message_input, "text"))
    end

    # Add file if selected
    if selected_file !== nothing
        file_data = read(selected_file)
        file_type = get_file_type(selected_file)
        push!(data, ("attachment", file_data, file_type))
    end

    return data
end

# Map a file extension to a payload type
function get_file_type(filename::String)::String
    if endswith(filename, ".png") || endswith(filename, ".jpg")
        return "image"
    elseif endswith(filename, ".mp3") || endswith(filename, ".wav")
        return "audio"
    elseif endswith(filename, ".mp4") || endswith(filename, ".avi")
        return "video"
    else
        return "binary"
    end
end

function add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict})
    push!(ui.messages, Dict(
        "user" => user,
        "text" => text,
        "attachment" => attachment
    ))
end
```

#### JavaScript

```javascript
// src/chat_ui.js
const fs = require('fs');
const NATSBridge = require('./src/natbridge.js');

class ChatUI {
    constructor() {
        this.messages = [];
        this.currentRoom = "";
    }

    // Build the list of [dataname, data, type] payloads for one outgoing message
    sendMessage(messageInput, selectedFile = null) {
        const data = [];

        // Add text message
        if (messageInput.length > 0) {
            data.push(["text", messageInput, "text"]);
        }

        // Add file if selected
        if (selectedFile !== null) {
            const fileData = fs.readFileSync(selectedFile);
            const fileType = this.getFileType(selectedFile);
            data.push(["attachment", fileData, fileType]);
        }

        return data;
    }

    // Map a file extension to a payload type
    getFileType(filename) {
        if (filename.endsWith('.png') || filename.endsWith('.jpg')) {
            return 'image';
        } else if (filename.endsWith('.mp3') || filename.endsWith('.wav')) {
            return 'audio';
        } else if (filename.endsWith('.mp4') || filename.endsWith('.avi')) {
            return 'video';
        } else {
            return 'binary';
        }
    }

    addMessage(user, text, attachment = null) {
        this.messages.push({ user, text, attachment });
    }
}

module.exports = ChatUI;
```

#### Python

```python
# src/chat_ui.py
from typing import List, Dict, Optional


class ChatUI:
    def __init__(self):
        self.messages: List[Dict] = []
        self.current_room: str = ""

    def send_message(self, message_input: str, selected_file: Optional[str] = None) -> List[tuple]:
        """Build the list of (dataname, data, type) payloads for one outgoing message."""
        data = []

        # Add text message
        if message_input:
            data.append(("text", message_input, "text"))

        # Add file if selected
        if selected_file:
            with open(selected_file, "rb") as f:
                file_data = f.read()
            file_type = self.get_file_type(selected_file)
            data.append(("attachment", file_data, file_type))

        return data

    def get_file_type(self, filename: str) -> str:
        """Map a file extension to a payload type."""
        if filename.endswith(('.png', '.jpg')):
            return "image"
        elif filename.endswith(('.mp3', '.wav')):
            return "audio"
        elif filename.endswith(('.mp4', '.avi')):
            return "video"
        else:
            return "binary"

    def add_message(self, user: str, text: str, attachment: Optional[Dict] = None):
        self.messages.append({
            "user": user,
            "text": text,
            "attachment": attachment
        })
```

### Step 3: Create the Message Handler

#### Julia

```julia
# src/chat_handler.jl
using NATSBridge, NATS
using Base64  # provides base64encode

include("chat_ui.jl")  # defines ChatUI and add_message

struct ChatHandler
    nats::NATS.Connection
    ui::ChatUI
end

function ChatHandler(nats_connection::NATS.Connection)
    ChatHandler(nats_connection, ChatUI())
end

function start(handler::ChatHandler)
    # Subscribe to chat rooms
    rooms = ["general", "tech", "random"]

    for room in rooms
        NATS.subscribe(handler.nats, "/chat/$room") do msg
            handle_message(handler, msg)
        end
    end

    println("Chat handler started")
end

function handle_message(handler::ChatHandler, msg::NATS.Msg)
    env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)

    # Extract sender info from envelope
    sender = get(env, "sender_name", "Anonymous")

    # Process each payload
    for (dataname, data, type) in env["payloads"]
        if type == "text"
            add_message(handler.ui, sender, data, nothing)
        elseif type == "image"
            # Convert to data URL for display (MIME type is hard-coded to PNG here)
            base64_data = base64encode(data)
            attachment = Dict(
                "type" => "image",
                "data" => "data:image/png;base64,$base64_data"
            )
            add_message(handler.ui, sender, "", attachment)
        else
            # For other types, pass the payload through as received
            attachment = Dict("type" => type, "data" => data)
            add_message(handler.ui, sender, "", attachment)
        end
    end
end
```

#### JavaScript

```javascript
// src/chat_handler.js
const NATSBridge = require('./src/natbridge.js');
const nats = require('nats');
const ChatUI = require('./chat_ui.js');

class ChatHandler {
    constructor(natsConnection) {
        this.nats = natsConnection;
        this.ui = new ChatUI();
    }

    async start() {
        // Subscribe to chat rooms
        const rooms = ['general', 'tech', 'random'];

        for (const room of rooms) {
            this.nats.subscribe(`/chat/${room}`, async (msg) => {
                await this.handleMessage(msg);
            });
        }

        console.log('Chat handler started');
    }

    async handleMessage(msg) {
        const env = await NATSBridge.smartreceive(msg, {
            fileserver_download_handler: NATSBridge.fetchWithBackoff
        });

        // Extract sender info from envelope
        const sender = env.sender_name || 'Anonymous';

        // Process each payload
        for (const [dataname, data, type] of env.payloads) {
            if (type === 'text') {
                this.ui.addMessage(sender, data, null);
            } else if (type === 'image') {
                // Convert to data URL for display (MIME type is hard-coded to PNG here)
                const base64Data = Buffer.from(data).toString('base64');
                const attachment = {
                    type: 'image',
                    data: `data:image/png;base64,${base64Data}`
                };
                this.ui.addMessage(sender, '', attachment);
            } else {
                // For other types, pass the payload through as received
                const attachment = { type, data };
                this.ui.addMessage(sender, '', attachment);
            }
        }
    }
}

module.exports = ChatHandler;
```

#### Python

```python
# src/chat_handler.py
import asyncio
import base64
from typing import Optional

from natbridge import smartreceive, fetch_with_backoff
from chat_ui import ChatUI


class ChatHandler:
    def __init__(self, nats_connection):
        self.nats = nats_connection
        self.ui = ChatUI()

    async def start(self):
        # Subscribe to chat rooms
        rooms = ['general', 'tech', 'random']

        for room in rooms:
            # nats-py takes the message handler through the `cb` keyword
            await self.nats.subscribe(
                f'/chat/{room}',
                cb=self.handle_message
            )

        print('Chat handler started')

    async def handle_message(self, msg):
        env = await smartreceive(
            msg,
            fileserver_download_handler=fetch_with_backoff
        )

        # Extract sender info from envelope
        sender = env.get('sender_name', 'Anonymous')

        # Process each payload
        for dataname, data, type_ in env['payloads']:
            if type_ == 'text':
                self.ui.add_message(sender, data, None)
            elif type_ == 'image':
                # Convert to data URL for display (MIME type is hard-coded to PNG here)
                base64_data = base64.b64encode(data).decode('utf-8')
                attachment = {
                    'type': 'image',
                    'data': f'data:image/png;base64,{base64_data}'
                }
                self.ui.add_message(sender, '', attachment)
            else:
                # For other types, pass the payload through as received
                attachment = {'type': type_, 'data': data}
                self.ui.add_message(sender, '', attachment)
```

### Step 4: Run the Application

```bash
# Start NATS (runs in the foreground; use a separate terminal)
docker run -p 4222:4222 nats:latest

# Start file server (runs in the foreground; use a separate terminal)
mkdir -p /tmp/fileserver
python3 -m http.server 8080 --directory /tmp/fileserver

# Run chat app
# Julia
julia src/chat_ui.jl
julia src/chat_handler.jl

# JavaScript
node src/chat_ui.js
node src/chat_handler.js

# Python
python3 src/chat_ui.py
python3 src/chat_handler.py
```

---

## Building a File Transfer System

Let's build a file transfer system that handles large files efficiently.
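
The file transfer system rests on the claim-check pattern named in the introduction: the file body is parked on the file server, and only a small reference travels through the broker. The sketch below shows that round trip under stated assumptions. `InMemoryFileServer`, `check_in`, `check_out`, and the `mem://` URL scheme are hypothetical stand-ins for the HTTP file server and are not part of NATSBridge.

```python
import uuid
from typing import Dict


class InMemoryFileServer:
    """Hypothetical stand-in for the HTTP file server (upload via POST, download via GET)."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def upload(self, data: bytes) -> str:
        """Store the bytes and return a URL-like reference to them."""
        key = uuid.uuid4().hex
        self._blobs[key] = data
        return f"mem://{key}"

    def download(self, url: str) -> bytes:
        """Return the bytes previously stored under this reference."""
        return self._blobs[url[len("mem://"):]]


def check_in(name: str, data: bytes, server: InMemoryFileServer) -> dict:
    """Sender side: park the payload and build the small claim ticket to publish."""
    return {"name": name, "size": len(data), "url": server.upload(data)}


def check_out(ticket: dict, server: InMemoryFileServer) -> bytes:
    """Receiver side: redeem the claim ticket for the original bytes."""
    return server.download(ticket["url"])


server = InMemoryFileServer()
original = b"x" * 5_000_000          # stands in for a large file
ticket = check_in("report.bin", original, server)
restored = check_out(ticket, server)
print(ticket["name"], ticket["size"], restored == original)
```

Only the ticket would be published on a subject such as `/files/<recipient>`, so the message stays small no matter how large the file is.
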

### Step 1: File Upload Service

#### Julia

```julia
# src/file_upload_service.jl
using NATSBridge, HTTP

# The default constructor FileUploadService(broker_url, fileserver_url) is sufficient.
# Redefining it with the same signature would make it call itself forever.
struct FileUploadService
    broker_url::String
    fileserver_url::String
end

function upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    file_data = read(file_path)
    file_name = basename(file_path)

    data = [("file", file_data, "binary")]

    env, env_json_str = smartsend(
        "/files/$recipient",
        data,
        broker_url=service.broker_url,
        fileserver_url=service.fileserver_url
    )

    return env
end

function upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    file_size = stat(file_path).size

    if file_size > 100 * 1024 * 1024  # > 100MB
        println("File too large for direct upload, using streaming...")
        return stream_upload(service, file_path, recipient)
    end

    return upload_file(service, file_path, recipient)
end

function stream_upload(service::FileUploadService, file_path::String, recipient::String)::Dict
    # Implement streaming upload to file server
    # This would require a more sophisticated file server
    # For now, we'll use the standard upload
    return upload_file(service, file_path, recipient)
end
```

#### JavaScript

```javascript
// src/file_upload_service.js
const NATSBridge = require('./src/natbridge.js');
const fs = require('fs');
const path = require('path');

class FileUploadService {
    constructor(brokerUrl, fileserverUrl) {
        this.broker_url = brokerUrl;
        this.fileserver_url = fileserverUrl;
    }

    async uploadFile(filePath, recipient) {
        const fileData = fs.readFileSync(filePath);
        const fileName = path.basename(filePath);

        const data = [["file", fileData, "binary"]];

        const [env, env_json_str] = await NATSBridge.smartsend(
            `/files/${recipient}`,
            data,
            {
                broker_url: this.broker_url,
                fileserver_url: this.fileserver_url
            }
        );

        return env;
    }

    async uploadLargeFile(filePath, recipient) {
        const stats = fs.statSync(filePath);
        const fileSize = stats.size;

        if (fileSize > 100 * 1024 * 1024) {  // > 100MB
            console.log('File too large for direct upload, using streaming...');
            return this.streamUpload(filePath, recipient);
        }

        return this.uploadFile(filePath, recipient);
    }

    async streamUpload(filePath, recipient) {
        // Implement streaming upload to file server
        // This would require a more sophisticated file server
        // For now, we'll use the standard upload
        return this.uploadFile(filePath, recipient);
    }
}

module.exports = FileUploadService;
```

#### Python

```python
# src/file_upload_service.py
import os

from natbridge import smartsend


class FileUploadService:
    def __init__(self, broker_url: str, fileserver_url: str):
        self.broker_url = broker_url
        self.fileserver_url = fileserver_url

    async def upload_file(self, file_path: str, recipient: str) -> tuple:
        with open(file_path, "rb") as f:
            file_data = f.read()

        file_name = os.path.basename(file_path)

        data = [("file", file_data, "binary")]

        env, env_json_str = await smartsend(
            f"/files/{recipient}",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )

        return env, env_json_str

    async def upload_large_file(self, file_path: str, recipient: str) -> tuple:
        file_size = os.path.getsize(file_path)

        if file_size > 100 * 1024 * 1024:  # > 100MB
            print("File too large for direct upload, using streaming...")
            return await self.stream_upload(file_path, recipient)

        return await self.upload_file(file_path, recipient)

    async def stream_upload(self, file_path: str, recipient: str) -> tuple:
        # Implement streaming upload to file server
        # This would require a more sophisticated file server
        # For now, we'll use the standard upload
        return await self.upload_file(file_path, recipient)
```

### Step 2: File Download Service

#### Julia

```julia
# src/file_download_service.jl
using NATSBridge, NATS, HTTP

# The default constructor FileDownloadService(nats_url) is sufficient
struct FileDownloadService
    nats_url::String
end

function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String)
    env = smartreceive(msg, fileserver_download_handler=fetch_from_url)

    # Process each payload
    for (dataname, data, type) in env["payloads"]
        if type == "binary"
            mkpath("/downloads")  # make sure the target directory exists
            file_path = "/downloads/$dataname"
            write(file_path, data)
            println("File saved to $file_path")
        end
    end
end

function fetch_from_url(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
    # Fetch data from URL with exponential backoff and
    # return the downloaded data as Vector{UInt8}.
    # Assumption: base_delay and max_delay are in milliseconds.
    delay = base_delay
    for attempt in 1:max_retries
        try
            return HTTP.get(url).body
        catch err
            attempt == max_retries && rethrow()
            @warn "Download failed, retrying" url attempt correlation_id
            sleep(delay / 1000)
            delay = min(delay * 2, max_delay)
        end
    end
    error("max_retries must be at least 1")
end
```

#### JavaScript

```javascript
// src/file_download_service.js
const NATSBridge = require('./src/natbridge.js');
const fs = require('fs');

class FileDownloadService {
    constructor(natsUrl) {
        this.nats_url = natsUrl;
    }

    async downloadFile(msg, sender, downloadId) {
        const env = await NATSBridge.smartreceive(msg, {
            fileserver_download_handler: NATSBridge.fetchWithBackoff
        });

        // Process each payload
        for (const [dataname, data, type] of env.payloads) {
            if (type === 'binary') {
                const filePath = `/downloads/${dataname}`;
                fs.mkdirSync('/downloads', { recursive: true });  // ensure the directory exists
                fs.writeFileSync(filePath, data);
                console.log(`File saved to ${filePath}`);
            }
        }
    }
}

module.exports = FileDownloadService;
```

#### Python

```python
# src/file_download_service.py
import os

from natbridge import smartreceive, fetch_with_backoff


class FileDownloadService:
    def __init__(self, nats_url: str):
        self.nats_url = nats_url

    async def download_file(self, msg, sender: str, download_id: str):
        env = await smartreceive(
            msg,
            fileserver_download_handler=fetch_with_backoff
        )

        # Process each payload
        for dataname, data, type_ in env['payloads']:
            if type_ == 'binary':
                file_path = f'/downloads/{dataname}'
                os.makedirs('/downloads', exist_ok=True)
                with open(file_path, 'wb') as f:
                    f.write(data)
                print(f"File saved to {file_path}")
```

### Step 3: File Transfer CLI

#### Julia

```julia
# src/cli.jl
using NATSBridge, NATS

include("file_upload_service.jl")
include("file_download_service.jl")

function main()
    println("File Transfer System")
    println("====================")
    println("1. Upload file")
    println("2. Download file")
    println("3. List pending downloads")
    print("Enter choice: ")

    choice = readline()

    if choice == "1"
        upload_file_cli()
    elseif choice == "2"
        download_file_cli()
    else
        println("Unsupported choice: $choice")
    end
end

function upload_file_cli()
    print("Enter file path: ")
    file_path = readline()

    print("Enter recipient: ")
    recipient = readline()

    file_service = FileUploadService("nats://localhost:4222", "http://localhost:8080")

    try
        env = upload_file(file_service, file_path, recipient)
        println("Upload successful!")
        println("File ID: $(env["payloads"][1][1])")
    catch err
        println("Upload failed: $(err)")
    end
end

function download_file_cli()
    print("Enter sender: ")
    sender = readline()

    print("Enter your recipient name: ")
    recipient = readline()

    file_service = FileDownloadService("nats://localhost:4222")
    conn = NATS.connect(file_service.nats_url)

    # Uploads are published to /files/<recipient>, so listen on that subject.
    # download_file needs the received message; "cli" is a placeholder download_id.
    NATS.subscribe(conn, "/files/$recipient") do msg
        try
            download_file(file_service, msg, sender, "cli")
            println("Download complete!")
        catch err
            println("Download failed: $(err)")
        end
    end

    println("Waiting for files. Press Enter to quit.")
    readline()
end

main()
```

---

## Building a Streaming Data Pipeline

Let's build a data pipeline that processes streaming data from sensors.

### Step 1: Sensor Data Model

#### Julia

```julia
# src/sensor_data.jl
using Dates, DataFrames

struct SensorReading
    sensor_id::String
    timestamp::String
    value::Float64
    unit::String
    metadata::Dict{String, Any}
end

# Convenience constructor that stamps the reading with the current time
function SensorReading(sensor_id::String, value::Float64, unit::String,
                       metadata::Dict{String, Any}=Dict{String, Any}())
    SensorReading(
        sensor_id,
        Dates.format(now(), dateformat"yyyy-mm-ddTHH:MM:SS"),
        value,
        unit,
        metadata
    )
end

struct SensorBatch
    readings::Vector{SensorReading}
end

function SensorBatch()
    SensorBatch(SensorReading[])
end

function add_reading(batch::SensorBatch, reading::SensorReading)
    push!(batch.readings, reading)
end

function to_dataframe(batch::SensorBatch)::DataFrame
    data = Dict{String, Any}()
    data["sensor_id"] = [r.sensor_id for r in batch.readings]
    data["timestamp"] = [r.timestamp for r in batch.readings]
    data["value"] = [r.value for r in batch.readings]
    data["unit"] = [r.unit for r in batch.readings]

    return DataFrame(data)
end
```

#### JavaScript

```javascript
// src/sensor_data.js
const NATSBridge = require('./src/natbridge.js');

class SensorReading {
    constructor(sensorId, value, unit, metadata = {}) {
        this.sensor_id = sensorId;
        this.timestamp = new Date().toISOString();
        this.value = value;
        this.unit = unit;
        this.metadata = metadata;
    }
}

class SensorBatch {
    constructor() {
        this.readings = [];
    }

    addReading(reading) {
        this.readings.push(reading);
    }

    // Column-oriented view of the batch (plain object of arrays)
    toDataFrame() {
        return {
            sensor_id: this.readings.map(r => r.sensor_id),
            timestamp: this.readings.map(r => r.timestamp),
            value: this.readings.map(r => r.value),
            unit: this.readings.map(r => r.unit)
        };
    }
}

module.exports = { SensorReading, SensorBatch };
```

#### Python

```python
# src/sensor_data.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass
class SensorReading:
    sensor_id: str
    timestamp: str
    value: float
    unit: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, sensor_id: str, value: float, unit: str,
               metadata: Optional[Dict[str, Any]] = None):
        return cls(
            sensor_id=sensor_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            value=value,
            unit=unit,
            metadata=metadata or {}
        )


class SensorBatch:
    def __init__(self):
        self.readings: List[SensorReading] = []

    def add_reading(self, reading: SensorReading):
        self.readings.append(reading)

    def to_dataframe(self):
        import pandas as pd

        return pd.DataFrame({
            'sensor_id': [r.sensor_id for r in self.readings],
            'timestamp': [r.timestamp for r in self.readings],
            'value': [r.value for r in self.readings],
            'unit': [r.unit for r in self.readings]
        })
```

### Step 2: Sensor Sender

#### Julia

```julia
# src/sensor_sender.jl
using NATSBridge, Dates, Random
using Arrow

include("sensor_data.jl")

# The default constructor SensorSender(broker_url, fileserver_url) is sufficient
struct SensorSender
    broker_url::String
    fileserver_url::String
end

function send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String)
    reading = SensorReading(sensor_id, value, unit)

    # Send the whole reading, not just its (possibly empty) metadata
    payload = Dict{String, Any}(
        "sensor_id" => reading.sensor_id,
        "timestamp" => reading.timestamp,
        "value" => reading.value,
        "unit" => reading.unit,
        "metadata" => reading.metadata
    )
    data = [("reading", payload, "dictionary")]

    # Default: is_publish=True (automatically publishes to NATS)
    smartsend(
        "/sensors/$sensor_id",
        data,
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url
    )
end

function send_batch(sender::SensorSender, readings::Vector{SensorReading})
    batch = SensorBatch()
    for reading in readings
        add_reading(batch, reading)
    end

    df = to_dataframe(batch)

    # Serialize to Arrow IPC (Arrow.write accepts any Tables.jl table)
    buf = IOBuffer()
    Arrow.write(buf, df)
    arrow_data = take!(buf)

    # smartsend picks the transport from the payload size:
    # below the threshold it goes directly to NATS, otherwise to the file server
    data = [("batch", arrow_data, "table")]
    smartsend(
        "/sensors/batch",
        data,
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url
    )
end
```

#### JavaScript

```javascript
// src/sensor_sender.js
const NATSBridge = require('./src/natbridge.js');
const arrow = require('apache-arrow');
const { SensorReading, SensorBatch } = require('./sensor_data.js');

class SensorSender {
    constructor(brokerUrl, fileserverUrl) {
        this.broker_url = brokerUrl;
        this.fileserver_url = fileserverUrl;
    }

    async sendReading(sensorId, value, unit) {
        const reading = new SensorReading(sensorId, value, unit);

        // Send the whole reading, not just its (possibly empty) metadata
        const data = [["reading", { ...reading }, "dictionary"]];

        await NATSBridge.smartsend(
            `/sensors/${sensorId}`,
            data,
            {
                broker_url: this.broker_url,
                fileserver_url: this.fileserver_url
            }
        );
    }

    async sendBatch(readings) {
        const batch = new SensorBatch();
        for (const reading of readings) {
            batch.addReading(reading);
        }

        const df = batch.toDataFrame();

        // Convert to Arrow IPC (tableFromArrays/tableToIPC need apache-arrow 7 or later)
        const table = arrow.tableFromArrays({
            sensor_id: df.sensor_id,
            timestamp: df.timestamp,
            value: Float64Array.from(df.value),
            unit: df.unit
        });
        const arrow_data = arrow.tableToIPC(table);  // Uint8Array

        // smartsend picks the transport from the payload size
        const data = [["batch", arrow_data, "table"]];
        await NATSBridge.smartsend(
            "/sensors/batch",
            data,
            {
                broker_url: this.broker_url,
                fileserver_url: this.fileserver_url
            }
        );
    }
}

module.exports = SensorSender;
```

#### Python

```python
# src/sensor_sender.py
import io
from dataclasses import asdict

import pyarrow as arrow
import pyarrow.ipc as ipc

from natbridge import smartsend
from sensor_data import SensorReading, SensorBatch


class SensorSender:
    def __init__(self, broker_url: str, fileserver_url: str):
        self.broker_url = broker_url
        self.fileserver_url = fileserver_url

    async def send_reading(self, sensor_id: str, value: float, unit: str):
        reading = SensorReading.create(sensor_id, value, unit)

        # Send the whole reading, not just its (possibly empty) metadata
        data = [("reading", asdict(reading), "dictionary")]

        await smartsend(
            f"/sensors/{sensor_id}",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )

    async def send_batch(self, readings):
        batch = SensorBatch()
        for reading in readings:
            batch.add_reading(reading)

        df = batch.to_dataframe()

        # Convert to Arrow IPC (file format)
        table = arrow.Table.from_pandas(df)
        buf = io.BytesIO()
        with ipc.new_file(buf, table.schema) as writer:
            writer.write_table(table)
        arrow_data = buf.getvalue()

        # Send based on size (auto-selected by smartsend)
        data = [("batch", arrow_data, "table")]
        await smartsend(
            "/sensors/batch",
            data,
            broker_url=self.broker_url,
            fileserver_url=self.fileserver_url
        )
```

---

## Performance Optimization

### 1. Batch Processing

#### Julia

```julia
# Batch multiple readings into a single message
using Arrow

function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}})
    batch = SensorBatch()

    for (sensor_id, value, unit) in readings
        reading = SensorReading(sensor_id, value, unit)
        add_reading(batch, reading)
    end

    df = to_dataframe(batch)

    # Serialize to Arrow IPC (Arrow.write accepts any Tables.jl table)
    buf = IOBuffer()
    Arrow.write(buf, df)
    arrow_data = take!(buf)

    # Send as single message
    smartsend(
        "/sensors/batch",
        [("batch", arrow_data, "table")],
        broker_url=sender.broker_url
    )
end
```

#### JavaScript

```javascript
// Batch multiple readings into a single message
const arrow = require('apache-arrow');

async function sendBatchReadings(sender, readings) {
    const batch = new SensorBatch();

    for (const [sensorId, value, unit] of readings) {
        const reading = new SensorReading(sensorId, value, unit);
        batch.addReading(reading);
    }

    const df = batch.toDataFrame();

    // Convert to Arrow IPC (tableFromArrays/tableToIPC need apache-arrow 7 or later)
    const table = arrow.tableFromArrays({
        sensor_id: df.sensor_id,
        timestamp: df.timestamp,
        value: Float64Array.from(df.value),
        unit: df.unit
    });
    const arrow_data = arrow.tableToIPC(table);  // Uint8Array

    // Send as single message
    const data = [["batch", arrow_data, "table"]];
    await NATSBridge.smartsend(
        "/sensors/batch",
        data,
        { broker_url: sender.broker_url }
    );
}
```

### 2. Connection Reuse

#### Julia

```julia
# Reuse NATS connections
function create_connection_pool()
    connections = Dict{String, NATS.Connection}()

    # Open one connection per URL and hand back the same one on later calls
    function get_connection(nats_url::String)::NATS.Connection
        if !haskey(connections, nats_url)
            connections[nats_url] = NATS.connect(nats_url)
        end
        return connections[nats_url]
    end

    function close_all()
        for conn in values(connections)
            NATS.drain(conn)
        end
        empty!(connections)
    end

    return (get_connection=get_connection, close_all=close_all)
end
```

#### Python

```python
# Reuse NATS connections
import asyncio

import nats


class ConnectionPool:
    def __init__(self):
        self.connections = {}

    async def get_connection(self, nats_url: str):
        # Open one connection per URL and hand back the same one on later calls
        if nats_url not in self.connections:
            self.connections[nats_url] = await nats.connect(nats_url)
        return self.connections[nats_url]

    async def close_all(self):
        for conn in self.connections.values():
            await conn.drain()
        self.connections.clear()
```

### 3. Caching

#### Julia

```julia
# Cache file server responses
const file_cache = Dict{String, Vector{UInt8}}()
const cache_lock = ReentrantLock()  # Dict is not thread-safe on its own

function fetch_with_caching(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
    cached = lock(cache_lock) do
        get(file_cache, url, nothing)
    end
    cached !== nothing && return cached

    # Fetch from file server
    data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)

    # Cache the result
    lock(cache_lock) do
        file_cache[url] = data
    end

    return data
end
```

#### Python

```python
# Cache file server responses
import asyncio
import threading

from natbridge import fetch_with_backoff

file_cache = {}
cache_lock = threading.Lock()


async def fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id):
    with cache_lock:
        if url in file_cache:
            return file_cache[url]

    # Fetch from file server
    data = await fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)

    # Cache the result
    with cache_lock:
        file_cache[url] = data

    return data
```

---

## Best Practices

### 1. Error Handling

#### Julia

```julia
# Keyword arguments must follow a semicolon so they can be forwarded to smartsend
function safe_smartsend(subject::String, data::AbstractVector; kwargs...)
    try
        return smartsend(subject, data; kwargs...)
    catch err
        println("Failed to send message: $(err)")
        return nothing
    end
end
```

#### JavaScript

```javascript
async function safeSmartSend(subject, data, options = {}) {
    try {
        return await NATSBridge.smartsend(subject, data, options);
    } catch (error) {
        console.error(`Failed to send message: ${error}`);
        return null;
    }
}
```

#### Python

```python
from typing import Any, List, Optional, Tuple

from natbridge import smartsend


async def safe_smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    **kwargs
) -> Optional[Tuple[dict, str]]:
    try:
        return await smartsend(subject, data, **kwargs)
    except Exception as error:
        print(f"Failed to send message: {error}")
        return None
```

### 2. Logging

#### Julia

```julia
using Logging

function log_send(subject::String, data::AbstractVector, correlation_id::String)
    @info "Sending to $subject: $(length(data)) payloads, correlation_id=$correlation_id"
end

function log_receive(correlation_id::String, num_payloads::Int)
    @info "Received message: $num_payloads payloads, correlation_id=$correlation_id"
end
```

#### Python

```python
import logging
from typing import List, Tuple, Any

logger = logging.getLogger(__name__)


def log_send(subject: str, data: List[Tuple[str, Any, str]], correlation_id: str):
    logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}")


def log_receive(correlation_id: str, num_payloads: int):
    logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}")
```

---

## Conclusion

This walkthrough covered:

- Building a chat application with rich media support
- Building a file transfer system using the claim-check pattern
- Building a streaming data pipeline for sensor data

For more information, check the [API documentation](../src/README.md) and [test examples](../test/).

---

## License

MIT