# NATSBridge Walkthrough A comprehensive guide to building real-world applications with NATSBridge. ## Table of Contents 1. [Introduction](#introduction) 2. [Architecture Overview](#architecture-overview) 3. [Building a Chat Application](#building-a-chat-application) 4. [Building a File Transfer System](#building-a-file-transfer-system) 5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline) 6. [Building a Micropython IoT Device](#building-a-micropython-iot-device) 7. [Building a Cross-Platform Dashboard](#building-a-cross-platform-dashboard) 8. [Performance Optimization](#performance-optimimization) 9. [Best Practices](#best-practices) --- ## Introduction This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover: - Chat applications with rich media support - File transfer systems with claim-check pattern - Streaming data pipelines - Micropython IoT devices - Cross-platform dashboards Each section builds on the previous one, gradually increasing in complexity. --- ## Architecture Overview ### System Components ``` ┌─────────────────────────────────────────────────────────────────┐ │ NATSBridge Architecture │ ├─────────────────────────────────────────────────────────────────┤ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Julia │ │ JavaScript │ │ Python/Micr │ │ │ │ (NATS.jl) │◄──►│ (nats.js) │◄──►│ opython │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────┐ │ │ │ NATS │ │ │ │ (Message Broker) │ │ │ └──────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ File Server │ │ │ │ (HTTP Upload) │ │ │ └──────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### Message Flow 1. **Sender** creates a message envelope with payloads 2. **NATSBridge** serializes and encodes payloads 3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server 4. **NATS** routes messages to subscribers 5. **Receiver** fetches payloads (from NATS or file server) 6. **NATSBridge** deserializes and decodes payloads --- ## Building a Chat Application Let's build a full-featured chat application that supports text, images, and file attachments. ### Step 1: Set Up the Project ```bash # Create project directory mkdir -p chat-app/src cd chat-app # Create configuration file cat > config.json << 'EOF' { "nats_url": "nats://localhost:4222", "fileserver_url": "http://localhost:8080", "size_threshold": 1048576 } EOF ``` ### Step 2: Create the Chat Interface (JavaScript) ```javascript // src/chat-ui.js class ChatUI { constructor() { this.messages = []; this.currentRoom = null; this.setupEventListeners(); } setupEventListeners() { document.getElementById('send-btn').addEventListener('click', () => this.sendMessage()); document.getElementById('file-input').addEventListener('change', (e) => this.handleFileSelect(e)); } async sendMessage() { const messageInput = document.getElementById('message-input'); const text = messageInput.value.trim(); if (!text && !this.selectedFile) return; const data = []; // Add text message if (text) { data.push({ dataname: "text", data: text, type: "text" }); } // Add file if selected if (this.selectedFile) { const fileData = await this.readFile(this.selectedFile); data.push({ dataname: "attachment", data: fileData, type: this.getFileType(this.selectedFile.type) }); } await smartsend( `/chat/${this.currentRoom}`, data, { fileserverUrl: window.config.fileserver_url, sizeThreshold: window.config.size_threshold } ); messageInput.value = ''; this.selectedFile = null; document.getElementById('file-name').textContent = ''; } handleFileSelect(event) { this.selectedFile = event.target.files[0]; document.getElementById('file-name').textContent = `Selected: ${this.selectedFile.name}`; } readFile(file) { return new Promise((resolve, reject) => { const reader = new FileReader(); reader.onload = () => resolve(reader.result); reader.onerror = reject; reader.readAsArrayBuffer(file); }); } getFileType(mimeType) { if (mimeType.startsWith('image/')) return 'image'; if (mimeType.startsWith('audio/')) return 'audio'; if (mimeType.startsWith('video/')) return 'video'; return 'binary'; } addMessage(user, text, attachment = null) { const messagesContainer = document.getElementById('messages'); const messageDiv = document.createElement('div'); messageDiv.className = 'message'; let content = `
${user}
`; content += `
${text}
`; if (attachment) { if (attachment.type === 'image') { content += ``; } else { content += `Download attachment`; } } messageDiv.innerHTML = content; messagesContainer.appendChild(messageDiv); messagesContainer.scrollTop = messagesContainer.scrollHeight; } } ``` ### Step 3: Create the Message Handler ```javascript // src/chat-handler.js const { smartreceive } = require('./NATSBridge'); class ChatHandler { constructor(natsConnection) { this.nats = natsConnection; this.ui = new ChatUI(); } async start() { // Subscribe to chat rooms const rooms = ['general', 'tech', 'random']; for (const room of rooms) { await this.nats.subscribe(`/chat/${room}`, async (msg) => { await this.handleMessage(msg); }); } console.log('Chat handler started'); } async handleMessage(msg) { const envelope = await smartreceive(msg, { fileserverDownloadHandler: this.downloadFile.bind(this) }); // Extract sender info from envelope const sender = envelope.senderName || 'Anonymous'; // Process each payload for (const payload of envelope.payloads) { if (payload.type === 'text') { this.ui.addMessage(sender, payload.data); } else if (payload.type === 'image') { // Convert to data URL for display const base64 = this.arrayBufferToBase64(payload.data); this.ui.addMessage(sender, null, { type: 'image', data: `data:image/png;base64,${base64}` }); } else { // For other types, use file server URL this.ui.addMessage(sender, null, { type: payload.type, data: payload.data }); } } } downloadFile(url, max_retries, base_delay, max_delay, correlation_id) { return fetch(url) .then(response => response.arrayBuffer()); } arrayBufferToBase64(buffer) { const bytes = new Uint8Array(buffer); let binary = ''; for (let i = 0; i < bytes.length; i++) { binary += String.fromCharCode(bytes[i]); } return btoa(binary); } } ``` ### Step 4: Run the Application ```bash # Start NATS docker run -p 4222:4222 nats:latest # Start file server mkdir -p /tmp/fileserver python3 -m http.server 8080 --directory /tmp/fileserver # Run chat app node src/chat-ui.js node src/chat-handler.js ``` --- ## Building a File Transfer System Let's build a file transfer system that handles large files efficiently. ### Step 1: File Upload Service ```javascript // src/file-upload-service.js const { smartsend } = require('./NATSBridge'); class FileUploadService { constructor(natsUrl, fileserverUrl) { this.natsUrl = natsUrl; this.fileserverUrl = fileserverUrl; } async uploadFile(filePath, recipient) { const fs = require('fs'); const fileData = fs.readFileSync(filePath); const fileName = require('path').basename(filePath); const data = [{ dataname: fileName, data: fileData, type: 'binary' }]; const envelope = await smartsend( `/files/${recipient}`, data, { natsUrl: this.natsUrl, fileserverUrl: this.fileserverUrl, sizeThreshold: 1048576 } ); return envelope; } async uploadLargeFile(filePath, recipient) { // For very large files, stream upload const fs = require('fs'); const fileSize = fs.statSync(filePath).size; if (fileSize > 100 * 1024 * 1024) { // > 100MB console.log('File too large for direct upload, using streaming...'); return await this.streamUpload(filePath, recipient); } return await this.uploadFile(filePath, recipient); } async streamUpload(filePath, recipient) { // Implement streaming upload to file server // This would require a more sophisticated file server // For now, we'll use the standard upload return await this.uploadFile(filePath, recipient); } } module.exports = FileUploadService; ``` ### Step 2: File Download Service ```javascript // src/file-download-service.js const { smartreceive } = require('./NATSBridge'); const fs = require('fs'); class FileDownloadService { constructor(natsUrl) { this.natsUrl = natsUrl; this.downloads = new Map(); } async downloadFile(sender, downloadId) { // Subscribe to sender's file channel const envelope = await smartreceive(msg, { fileserverDownloadHandler: this.fetchFromUrl.bind(this) }); // Process each payload for (const payload of envelope.payloads) { if (payload.type === 'binary') { const filePath = `/downloads/${payload.dataname}`; fs.writeFileSync(filePath, payload.data); console.log(`File saved to ${filePath}`); } } } async fetchFromUrl(url, max_retries, base_delay, max_delay, correlation_id) { const response = await fetch(url); if (!response.ok) { throw new Error(`Failed to fetch: ${response.status}`); } return await response.arrayBuffer(); } } module.exports = FileDownloadService; ``` ### Step 3: File Transfer CLI ```javascript // src/cli.js const { smartsend, smartreceive } = require('./NATSBridge'); const fs = require('fs'); const readline = require('readline'); const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); async function main() { const config = JSON.parse(fs.readFileSync('config.json', 'utf8')); console.log('File Transfer System'); console.log('===================='); console.log('1. Upload file'); console.log('2. Download file'); console.log('3. List pending downloads'); const choice = await rl.question('Enter choice: '); if (choice === '1') { await uploadFile(config); } else if (choice === '2') { await downloadFile(config); } rl.close(); } async function uploadFile(config) { const filePath = await rl.question('Enter file path: '); const recipient = await rl.question('Enter recipient: '); const fileService = new FileUploadService(config.nats_url, config.fileserver_url); try { const envelope = await fileService.uploadFile(filePath, recipient); console.log('Upload successful!'); console.log(`File ID: ${envelope.payloads[0].id}`); } catch (error) { console.error('Upload failed:', error.message); } } async function downloadFile(config) { const sender = await rl.question('Enter sender: '); const fileService = new FileDownloadService(config.nats_url); try { await fileService.downloadFile(sender); console.log('Download complete!'); } catch (error) { console.error('Download failed:', error.message); } } main(); ``` --- ## Building a Streaming Data Pipeline Let's build a data pipeline that processes streaming data from sensors. ### Step 1: Sensor Data Model ```python # src/sensor_data.py from dataclasses import dataclass from datetime import datetime from typing import List, Dict, Any import json @dataclass class SensorReading: sensor_id: str timestamp: str value: float unit: str metadata: Dict[str, Any] = None def to_dict(self) -> Dict[str, Any]: return { "sensor_id": self.sensor_id, "timestamp": self.timestamp, "value": self.value, "unit": self.unit, "metadata": self.metadata or {} } class SensorBatch: def __init__(self): self.readings: List[SensorReading] = [] def add_reading(self, reading: SensorReading): self.readings.append(reading) def to_dataframe(self): import pandas as pd data = [r.to_dict() for r in self.readings] return pd.DataFrame(data) ``` ### Step 2: Sensor Sender ```python # src/sensor-sender.py from nats_bridge import smartsend from sensor_data import SensorReading, SensorBatch import time import random class SensorSender: def __init__(self, nats_url: str, fileserver_url: str): self.nats_url = nats_url self.fileserver_url = fileserver_url def send_reading(self, sensor_id: str, value: float, unit: str): reading = SensorReading( sensor_id=sensor_id, timestamp=datetime.now().isoformat(), value=value, unit=unit ) data = [("reading", reading.to_dict(), "dictionary")] # Default: is_publish=True (automatically publishes to NATS) smartsend( f"/sensors/{sensor_id}", data, nats_url=self.nats_url, fileserver_url=self.fileserver_url ) def prepare_message_only(self, sensor_id: str, value: float, unit: str): """Prepare a message without publishing (is_publish=False).""" reading = SensorReading( sensor_id=sensor_id, timestamp=datetime.now().isoformat(), value=value, unit=unit ) data = [("reading", reading.to_dict(), "dictionary")] # With is_publish=False, returns (envelope, json_str) without publishing env, msg_json_str = smartsend( f"/sensors/{sensor_id}/prepare", data, nats_url=self.nats_url, fileserver_url=self.fileserver_url, is_publish=False ) # Now you can publish manually using NATS request-reply pattern # nc.request(subject, msg_json_str, reply_to=reply_to_topic) return env, msg_json_str def send_batch(self, readings: List[SensorReading]): batch = SensorBatch() for reading in readings: batch.add_reading(reading) df = batch.to_dataframe() # Convert to Arrow IPC format import pyarrow as pa table = pa.Table.from_pandas(df) # Serialize to Arrow IPC import io buf = io.BytesIO() with pa.ipc.new_stream(buf, table.schema) as writer: writer.write_table(table) arrow_data = buf.getvalue() # Send based on size if len(arrow_data) < 1048576: # < 1MB data = [("batch", arrow_data, "table")] smartsend( f"/sensors/batch", data, nats_url=self.nats_url, fileserver_url=self.fileserver_url ) else: # Upload to file server from nats_bridge import plik_oneshot_upload response = plik_oneshot_upload(self.fileserver_url, "batch.arrow", arrow_data) print(f"Uploaded batch to {response['url']}") ``` ### Step 3: Sensor Receiver ```python # src/sensor-receiver.py from nats_bridge import smartreceive from sensor_data import SensorReading import pandas as pd import pyarrow as pa import io class SensorReceiver: def __init__(self, fileserver_download_handler): self.fileserver_download_handler = fileserver_download_handler def process_reading(self, msg): envelope = smartreceive(msg, self.fileserver_download_handler) for dataname, data, data_type in envelope["payloads"]: if data_type == "dictionary": reading = SensorReading( sensor_id=data["sensor_id"], timestamp=data["timestamp"], value=data["value"], unit=data["unit"], metadata=data.get("metadata", {}) ) print(f"Received: {reading}") elif data_type == "table": # Deserialize Arrow IPC table = pa.ipc.open_stream(io.BytesIO(data)).read_all() df = table.to_pandas() print(f"Received batch with {len(df)} readings") print(df) ``` --- ## Building a Micropython IoT Device Let's build an IoT device using Micropython that connects to NATS. ### Step 1: Device Configuration ```python # device_config.py import json class DeviceConfig: def __init__(self, ssid, password, nats_url, device_id): self.ssid = ssid self.password = password self.nats_url = nats_url self.device_id = device_id def to_dict(self): return { "ssid": self.ssid, "password": self.password, "nats_url": self.nats_url, "device_id": self.device_id } ``` ### Step 2: Device Bridge ```python # device_bridge.py from nats_bridge import smartsend, smartreceive import json class DeviceBridge: def __init__(self, config): self.config = config self.nats_url = config.nats_url def connect(self): # Connect to WiFi import network wlan = network.WLAN(network.STA_IF) wlan.active(True) wlan.connect(self.config.ssid, self.config.password) while not wlan.isconnected(): pass print('Connected to WiFi') print('Network config:', wlan.ifconfig()) def send_status(self, status): data = [("status", status, "dictionary")] smartsend( f"/devices/{self.config.device_id}/status", data, nats_url=self.nats_url ) def send_sensor_data(self, sensor_id, value, unit): data = [ ("sensor_id", sensor_id, "text"), ("value", value, "dictionary") ] smartsend( f"/devices/{self.config.device_id}/sensors/{sensor_id}", data, nats_url=self.nats_url ) def receive_commands(self, callback): # Subscribe to commands import socket # Simplified subscription - in production, use proper NATS client while True: # Poll for messages msg = self._poll_for_message() if msg: envelope = smartreceive(msg) # Process payloads for dataname, data, data_type in envelope["payloads"]: if dataname == "command": callback(data) def _poll_for_message(self): # Simplified message polling # In production, implement proper NATS subscription return None ``` ### Step 3: Device Application ```python # device_app.py from device_config import DeviceConfig from device_bridge import DeviceBridge import time import random # Load configuration config = DeviceConfig( ssid="MyNetwork", password="password123", nats_url="nats://localhost:4222", device_id="device-001" ) bridge = DeviceBridge(config) bridge.connect() # Send initial status bridge.send_status({ "status": "online", "version": "1.0.0", "uptime": 0 }) # Main loop while True: # Read sensor data temperature = random.uniform(20, 30) humidity = random.uniform(40, 60) bridge.send_sensor_data("temperature", temperature, "celsius") bridge.send_sensor_data("humidity", humidity, "percent") # Send status update bridge.send_status({ "status": "online", "temperature": temperature, "humidity": humidity }) time.sleep(60) # Every minute ``` --- ## Building a Cross-Platform Dashboard Let's build a dashboard that displays data from multiple platforms. ### Step 1: Dashboard Server (Python) ```python # src/dashboard-server.py from nats_bridge import smartsend, smartreceive import pandas as pd import pyarrow as pa import io class DashboardServer: def __init__(self, nats_url, fileserver_url): self.nats_url = nats_url self.fileserver_url = fileserver_url def broadcast_data(self, df): # Convert to Arrow IPC table = pa.Table.from_pandas(df) buf = io.BytesIO() with pa.ipc.new_stream(buf, table.schema) as writer: writer.write_table(table) arrow_data = buf.getvalue() # Broadcast to all subscribers data = [("data", arrow_data, "table")] smartsend( "/dashboard/data", data, nats_url=self.nats_url, fileserver_url=self.fileserver_url ) def receive_selection(self, callback): def handler(msg): envelope = smartreceive(msg) for dataname, data, data_type in envelope["payloads"]: if data_type == "dictionary": callback(data) # Subscribe to selections import threading thread = threading.Thread(target=self._listen_for_selections, args=(handler,)) thread.daemon = True thread.start() def _listen_for_selections(self, handler): # Simplified subscription # In production, implement proper NATS subscription pass ``` ### Step 2: Dashboard UI (JavaScript) ```javascript // src/dashboard-ui.js class DashboardUI { constructor() { this.data = null; this.setupEventListeners(); } setupEventListeners() { document.getElementById('refresh-btn').addEventListener('click', () => this.refreshData()); document.getElementById('export-btn').addEventListener('click', () => this.exportData()); } async refreshData() { // Request fresh data await smartsend("/dashboard/request", [ { dataname: "request", data: { type: "refresh" }, type: "dictionary" } ], { fileserverUrl: window.config.fileserver_url }); } async fetchData() { // Subscribe to data updates const envelope = await smartreceive(msg, { fileserverDownloadHandler: this.fetchFromUrl.bind(this) }); // Process table data for (const payload of envelope.payloads) { if (payload.type === 'table') { // Deserialize Arrow IPC this.data = this.deserializeArrow(payload.data); this.renderTable(); } } } deserializeArrow(data) { // Deserialize Arrow IPC to JavaScript array // This would require the apache-arrow library return JSON.parse(JSON.stringify(data)); // Simplified } renderTable() { const tableContainer = document.getElementById('data-table'); tableContainer.innerHTML = ''; if (!this.data) return; // Render table headers const headers = Object.keys(this.data[0]); const thead = document.createElement('thead'); const headerRow = document.createElement('tr'); headers.forEach(header => { const th = document.createElement('th'); th.textContent = header; headerRow.appendChild(th); }); thead.appendChild(headerRow); tableContainer.appendChild(thead); // Render table rows const tbody = document.createElement('tbody'); this.data.forEach(row => { const tr = document.createElement('tr'); headers.forEach(header => { const td = document.createElement('td'); td.textContent = row[header]; tr.appendChild(td); }); tbody.appendChild(tr); }); tableContainer.appendChild(tbody); } exportData() { const csv = this.toCSV(); const blob = new Blob([csv], { type: 'text/csv' }); const url = URL.createObjectURL(blob); const a = document.createElement('a'); a.href = url; a.download = 'dashboard_data.csv'; a.click(); } toCSV() { if (!this.data) return ''; const headers = Object.keys(this.data[0]); const rows = this.data.map(row => headers.map(h => row[h]).join(',') ); return [headers.join(','), ...rows].join('\n'); } fetchFromUrl(url) { return fetch(url) .then(response => response.arrayBuffer()); } } ``` --- ## Performance Optimization ### 1. Batch Processing ```python # Batch multiple readings into a single message def send_batch_readings(self, readings): batch = SensorBatch() for reading in readings: batch.add_reading(reading) df = batch.to_dataframe() # Convert to Arrow IPC table = pa.Table.from_pandas(df) buf = io.BytesIO() with pa.ipc.new_stream(buf, table.schema) as writer: writer.write_table(table) arrow_data = buf.getvalue() # Send as single message smartsend( "/sensors/batch", [("batch", arrow_data, "table")], nats_url=self.nats_url ) ``` ### 2. Connection Pooling ```javascript // Reuse NATS connections const nats = require('nats'); class ConnectionPool { constructor() { this.connections = new Map(); } getConnection(natsUrl) { if (!this.connections.has(natsUrl)) { this.connections.set(natsUrl, nats.connect({ servers: [natsUrl] })); } return this.connections.get(natsUrl); } closeAll() { this.connections.forEach(conn => conn.close()); this.connections.clear(); } } ``` ### 3. Caching ```python # Cache file server responses from functools import lru_cache @lru_cache(maxsize=100) def fetch_with_caching(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""): return _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) ``` --- ## Best Practices ### 1. Error Handling ```python def safe_smartsend(subject, data, **kwargs): try: return smartsend(subject, data, **kwargs) except Exception as e: print(f"Failed to send message: {e}") return None ``` ### 2. Logging ```python import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def log_send(subject, data, correlation_id): logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}") def log_receive(correlation_id, num_payloads): logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}") ``` ### 3. Rate Limiting ```python import time from collections import deque class RateLimiter: def __init__(self, max_requests, time_window): self.max_requests = max_requests self.time_window = time_window self.requests = deque() def allow(self): now = time.time() # Remove old requests while self.requests and self.requests[0] < now - self.time_window: self.requests.popleft() if len(self.requests) >= self.max_requests: return False self.requests.append(now) return True ``` --- ## Conclusion This walkthrough covered: - Building a chat application with rich media support - Building a file transfer system with claim-check pattern - Building a streaming data pipeline for sensor data - Building a Micropython IoT device - Building a cross-platform dashboard For more information, check the [API documentation](../src/README.md) and [test examples](../test/). --- ## License MIT