NATSBridge/examples/walkthrough.md
2026-02-23 06:58:16 +07:00


# NATSBridge Walkthrough
A comprehensive guide to building real-world applications with NATSBridge.
## Table of Contents
1. [Introduction](#introduction)
2. [Architecture Overview](#architecture-overview)
3. [Building a Chat Application](#building-a-chat-application)
4. [Building a File Transfer System](#building-a-file-transfer-system)
5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline)
6. [Building a MicroPython IoT Device](#building-a-micropython-iot-device)
7. [Building a Cross-Platform Dashboard](#building-a-cross-platform-dashboard)
8. [Performance Optimization](#performance-optimization)
9. [Best Practices](#best-practices)
---
## Introduction
This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover:
- Chat applications with rich media support
- File transfer systems using the claim-check pattern
- Streaming data pipelines
- MicroPython IoT devices
- Cross-platform dashboards
Each section builds on the previous one, gradually increasing in complexity.
---
## Architecture Overview
### System Components
```
┌─────────────────────────────────────────────────────────────────┐
│                     NATSBridge Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │    Julia     │    │  JavaScript  │    │   Python /   │       │
│  │  (NATS.jl)   │◄──►│  (nats.js)   │◄──►│ MicroPython  │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                   │                   │               │
│         ▼                   ▼                   ▼               │
│  ┌──────────────────────────────────────────────────────┐       │
│  │                         NATS                         │       │
│  │                   (Message Broker)                   │       │
│  └──────────────────────────────────────────────────────┘       │
│                             │                                   │
│                             ▼                                   │
│                    ┌──────────────────┐                         │
│                    │   File Server    │                         │
│                    │  (HTTP Upload)   │                         │
│                    └──────────────────┘                         │
└─────────────────────────────────────────────────────────────────┘
```
### Message Flow
1. **Sender** creates a message envelope with payloads
2. **NATSBridge** serializes and encodes payloads
3. **Transport Decision**: Payloads below the size threshold travel inline in the NATS message; larger payloads are uploaded to the file server, and the message carries a reference to them
4. **NATS** routes messages to subscribers
5. **Receiver** fetches payloads (from NATS or file server)
6. **NATSBridge** deserializes and decodes payloads
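
The transport decision in step 3 can be sketched as follows. This is an illustration only, not NATSBridge's actual code: the function name `choose_transport`, the labels `"direct"` and `"link"`, and the strict greater-than comparison are assumptions. Only the 1 MiB threshold comes from the configuration used later in this guide.

```python
SIZE_THRESHOLD = 1_048_576  # 1 MiB, the size_threshold value used in this guide


def choose_transport(payload: bytes, size_threshold: int = SIZE_THRESHOLD) -> str:
    """Return "direct" for inline delivery over NATS, "link" for a file-server upload.

    Hypothetical helper: the labels and the comparison are assumptions.
    """
    return "link" if len(payload) > size_threshold else "direct"


small = b"hello"
large = bytes(2 * SIZE_THRESHOLD)
print(choose_transport(small))  # direct
print(choose_transport(large))  # link
```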
---
## Building a Chat Application
Let's build a full-featured chat application that supports text, images, and file attachments.
### Step 1: Set Up the Project
```bash
# Create project directory
mkdir -p chat-app/src
cd chat-app
# Create configuration file
cat > config.json << 'EOF'
{
  "nats_url": "nats://localhost:4222",
  "fileserver_url": "http://localhost:8080",
  "size_threshold": 1048576
}
EOF
```
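
A Python component can read the same `config.json`. The sketch below checks that the three keys written above are present before anything connects; the helper name `parse_config` and the validation rules are assumptions made for illustration, not part of NATSBridge.

```python
import json

REQUIRED_KEYS = ("nats_url", "fileserver_url", "size_threshold")


def parse_config(text: str) -> dict:
    """Parse config.json content and check the three keys used in this guide."""
    config = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"config is missing keys: {', '.join(missing)}")
    threshold = config["size_threshold"]
    if not isinstance(threshold, int) or threshold <= 0:
        raise ValueError("size_threshold must be a positive integer (bytes)")
    return config


example = (
    '{"nats_url": "nats://localhost:4222", '
    '"fileserver_url": "http://localhost:8080", '
    '"size_threshold": 1048576}'
)
config = parse_config(example)
print(config["size_threshold"] // 1024, "KiB")  # 1024 KiB
```

In a real project the text would come from `open("config.json").read()`.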
### Step 2: Create the Chat Interface (JavaScript)
```javascript
// src/chat-ui.js
// Browser-side code. Assumes `smartsend` from NATSBridge is in scope (for example
// via a bundler) and that `window.config` holds the parsed config.json.
class ChatUI {
  constructor() {
    this.messages = [];
    this.currentRoom = null;
    this.selectedFile = null;
    this.setupEventListeners();
  }

  setupEventListeners() {
    document.getElementById('send-btn').addEventListener('click', () => this.sendMessage());
    document.getElementById('file-input').addEventListener('change', (e) => this.handleFileSelect(e));
  }

  async sendMessage() {
    const messageInput = document.getElementById('message-input');
    const text = messageInput.value.trim();
    // Nothing to send, or no room joined yet
    if ((!text && !this.selectedFile) || !this.currentRoom) return;
    const data = [];
    // Add text message
    if (text) {
      data.push({
        dataname: "text",
        data: text,
        type: "text"
      });
    }
    // Add file if selected
    if (this.selectedFile) {
      const fileData = await this.readFile(this.selectedFile);
      data.push({
        dataname: "attachment",
        data: fileData,
        type: this.getFileType(this.selectedFile.type)
      });
    }
    await smartsend(
      `/chat/${this.currentRoom}`,
      data,
      {
        fileserverUrl: window.config.fileserver_url,
        sizeThreshold: window.config.size_threshold
      }
    );
    messageInput.value = '';
    this.selectedFile = null;
    document.getElementById('file-name').textContent = '';
  }

  handleFileSelect(event) {
    // files[0] is undefined when the user cancels the file dialog
    this.selectedFile = event.target.files[0] || null;
    document.getElementById('file-name').textContent =
      this.selectedFile ? `Selected: ${this.selectedFile.name}` : '';
  }

  readFile(file) {
    return new Promise((resolve, reject) => {
      const reader = new FileReader();
      reader.onload = () => resolve(reader.result);
      reader.onerror = reject;
      reader.readAsArrayBuffer(file);
    });
  }

  getFileType(mimeType) {
    if (mimeType.startsWith('image/')) return 'image';
    if (mimeType.startsWith('audio/')) return 'audio';
    if (mimeType.startsWith('video/')) return 'video';
    return 'binary';
  }

  addMessage(user, text, attachment = null) {
    const messagesContainer = document.getElementById('messages');
    const messageDiv = document.createElement('div');
    messageDiv.className = 'message';

    // Build nodes with textContent instead of innerHTML so that user-supplied
    // names and text cannot inject markup
    const userDiv = document.createElement('div');
    userDiv.className = 'user';
    userDiv.textContent = user;
    messageDiv.appendChild(userDiv);

    // Attachment-only messages arrive with text === null
    if (text) {
      const textDiv = document.createElement('div');
      textDiv.className = 'text';
      textDiv.textContent = text;
      messageDiv.appendChild(textDiv);
    }

    if (attachment) {
      let node;
      if (attachment.type === 'image') {
        node = document.createElement('img');
        node.src = attachment.data;
      } else {
        node = document.createElement('a');
        node.href = attachment.data;
        node.textContent = 'Download attachment';
      }
      node.className = 'attachment';
      messageDiv.appendChild(node);
    }

    messagesContainer.appendChild(messageDiv);
    messagesContainer.scrollTop = messagesContainer.scrollHeight;
  }
}
```
### Step 3: Create the Message Handler
```javascript
// src/chat-handler.js
// ChatUI is defined in src/chat-ui.js; load or import it before this file.
const { smartreceive } = require('./NATSBridge');

class ChatHandler {
  constructor(natsConnection) {
    this.nats = natsConnection;
    this.ui = new ChatUI();
  }

  async start() {
    // Subscribe to chat rooms
    const rooms = ['general', 'tech', 'random'];
    for (const room of rooms) {
      await this.nats.subscribe(`/chat/${room}`, async (msg) => {
        try {
          await this.handleMessage(msg);
        } catch (error) {
          // One bad message should not take down the subscription
          console.error(`Failed to handle message in ${room}:`, error);
        }
      });
    }
    console.log('Chat handler started');
  }

  async handleMessage(msg) {
    const envelope = await smartreceive(msg, {
      fileserverDownloadHandler: this.downloadFile.bind(this)
    });
    // Extract sender info from envelope
    const sender = envelope.senderName || 'Anonymous';
    // Process each payload
    for (const payload of envelope.payloads) {
      if (payload.type === 'text') {
        this.ui.addMessage(sender, payload.data);
      } else if (payload.type === 'image') {
        // Convert to data URL for display (the MIME type is hard-coded to PNG here)
        const base64 = this.arrayBufferToBase64(payload.data);
        this.ui.addMessage(sender, null, {
          type: 'image',
          data: `data:image/png;base64,${base64}`
        });
      } else {
        // Other payloads hold raw bytes; wrap them in an object URL so the
        // download link has something to point at
        const url = URL.createObjectURL(new Blob([payload.data]));
        this.ui.addMessage(sender, null, {
          type: payload.type,
          data: url
        });
      }
    }
  }

  // The retry parameters are part of the handler signature but unused in this simple version
  async downloadFile(url, max_retries, base_delay, max_delay, correlation_id) {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Failed to fetch ${url}: ${response.status}`);
    }
    return response.arrayBuffer();
  }

  arrayBufferToBase64(buffer) {
    const bytes = new Uint8Array(buffer);
    let binary = '';
    for (let i = 0; i < bytes.length; i++) {
      binary += String.fromCharCode(bytes[i]);
    }
    return btoa(binary);
  }
}
```
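
The handler labels every image as `image/png`. A receiver could instead detect the format from the first bytes of the payload. The following is a minimal sketch of that idea in Python; the helper names `sniff_image_mime` and `to_data_url` are hypothetical, and only three common formats are covered.

```python
import base64

# Leading "magic" bytes of a few common image formats
_SIGNATURES = (
    (b"\x89PNG\r\n\x1a\n", "image/png"),
    (b"\xff\xd8\xff", "image/jpeg"),
    (b"GIF87a", "image/gif"),
    (b"GIF89a", "image/gif"),
)


def sniff_image_mime(data: bytes, default: str = "application/octet-stream") -> str:
    """Guess the MIME type from the payload's leading bytes."""
    for signature, mime in _SIGNATURES:
        if data.startswith(signature):
            return mime
    return default


def to_data_url(data: bytes) -> str:
    """Build a data URL whose MIME type matches the detected format."""
    mime = sniff_image_mime(data)
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{mime};base64,{encoded}"


png_header = b"\x89PNG\r\n\x1a\n" + b"\x00" * 4
print(to_data_url(png_header)[:22])  # data:image/png;base64,
```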
### Step 4: Run the Application
```bash
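# Start NATS (detached, so the terminal stays free)
docker run -d --name nats -p 4222:4222 nats:latest
# Start a file server. Note: http.server only serves downloads; payloads above
# the size threshold need a server that also accepts uploads.
mkdir -p /tmp/fileserver
python3 -m http.server 8080 --directory /tmp/fileserver &
# Run chat app
# chat-ui.js and chat-handler.js use browser APIs (document, FileReader, btoa),
# so bundle them and load them from an HTML page instead of running them with node.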
```
---
## Building a File Transfer System
Let's build a file transfer system that handles large files efficiently.
### Step 1: File Upload Service
```javascript
// src/file-upload-service.js
const fs = require('fs');
const path = require('path');
const { smartsend } = require('./NATSBridge');

class FileUploadService {
  constructor(natsUrl, fileserverUrl) {
    this.natsUrl = natsUrl;
    this.fileserverUrl = fileserverUrl;
  }

  async uploadFile(filePath, recipient) {
    const fileData = fs.readFileSync(filePath);
    const fileName = path.basename(filePath);
    const data = [{
      dataname: fileName,
      data: fileData,
      type: 'binary'
    }];
    const envelope = await smartsend(
      `/files/${recipient}`,
      data,
      {
        natsUrl: this.natsUrl,
        fileserverUrl: this.fileserverUrl,
        sizeThreshold: 1048576
      }
    );
    return envelope;
  }

  async uploadLargeFile(filePath, recipient) {
    // For very large files, stream upload
    const fileSize = fs.statSync(filePath).size;
    if (fileSize > 100 * 1024 * 1024) { // > 100 MiB
      console.log('File too large for direct upload, using streaming...');
      return await this.streamUpload(filePath, recipient);
    }
    return await this.uploadFile(filePath, recipient);
  }

  async streamUpload(filePath, recipient) {
    // Placeholder: a real streaming upload needs a more sophisticated file server.
    // For now, fall back to the standard upload.
    return await this.uploadFile(filePath, recipient);
  }
}

module.exports = FileUploadService;
```
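
`streamUpload` is left as a stub above. One building block for a streaming upload is reading the file in fixed-size chunks so that the whole file never sits in memory at once. The sketch below shows that part only, in Python; `iter_chunks`, `sha256_of_file`, and the 4 MiB default chunk size are assumptions for illustration, and how the chunks are sent depends on what the file server accepts.

```python
import hashlib
import os
import tempfile
from typing import Iterator


def iter_chunks(path: str, chunk_size: int = 4 * 1024 * 1024) -> Iterator[bytes]:
    """Yield the file's content in pieces of at most chunk_size bytes."""
    with open(path, "rb") as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                break
            yield chunk


def sha256_of_file(path: str) -> str:
    """Hash a file chunk by chunk, e.g. to verify it after transfer."""
    digest = hashlib.sha256()
    for chunk in iter_chunks(path):
        digest.update(chunk)
    return digest.hexdigest()


# Demo with a small temporary file and a tiny chunk size
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"a" * 10)
    demo_path = tmp.name
print([len(chunk) for chunk in iter_chunks(demo_path, chunk_size=4)])  # [4, 4, 2]
os.remove(demo_path)
```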
### Step 2: File Download Service
```javascript
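// src/file-download-service.js
const fs = require('fs');
const path = require('path');
const nats = require('nats');
const { smartreceive } = require('./NATSBridge');

class FileDownloadService {
  constructor(natsUrl, downloadDir = 'downloads') {
    this.natsUrl = natsUrl;
    this.downloadDir = downloadDir;
  }

  async downloadFile(sender) {
    // Wait for the next message on the file channel. The subject must match the
    // one the uploader published to (`/files/<name>`). connect() and subscribe()
    // are called the same way as in the other examples in this guide.
    const nc = await nats.connect({ servers: [this.natsUrl] });
    const msg = await new Promise((resolve) => {
      nc.subscribe(`/files/${sender}`, resolve);
    });
    const envelope = await smartreceive(msg, {
      fileserverDownloadHandler: this.fetchFromUrl.bind(this)
    });
    // Process each payload
    fs.mkdirSync(this.downloadDir, { recursive: true });
    for (const payload of envelope.payloads) {
      if (payload.type === 'binary') {
        // basename() keeps a hostile dataname from escaping the download directory
        const filePath = path.join(this.downloadDir, path.basename(payload.dataname));
        fs.writeFileSync(filePath, Buffer.from(payload.data));
        console.log(`File saved to ${filePath}`);
      }
    }
    nc.close();
  }

  // The retry parameters are part of the handler signature but unused in this simple version
  async fetchFromUrl(url, max_retries, base_delay, max_delay, correlation_id) {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Failed to fetch: ${response.status}`);
    }
    return await response.arrayBuffer();
  }
}

module.exports = FileDownloadService;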
```
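
The download handlers in this guide accept `max_retries`, `base_delay`, and `max_delay` but do not use them. One way to honour them is retrying with exponential backoff, sketched below in Python. The helper names are hypothetical, and treating the delays as milliseconds that double on each attempt is an assumption; NATSBridge's own retry behaviour may differ.

```python
import time
from typing import Callable, List


def backoff_delays(max_retries: int, base_delay: int, max_delay: int) -> List[int]:
    """Delays in ms before each retry: base, 2*base, 4*base, ... capped at max_delay."""
    return [min(base_delay * 2 ** attempt, max_delay) for attempt in range(max_retries)]


def fetch_with_retries(fetch: Callable[[str], bytes], url: str, max_retries: int = 5,
                       base_delay: int = 100, max_delay: int = 5000,
                       sleep: Callable[[float], None] = time.sleep) -> bytes:
    """Call fetch(url), retrying on any exception with exponential backoff."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            return fetch(url)
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(delays[attempt] / 1000)  # ms -> seconds
    raise RuntimeError("unreachable")


print(backoff_delays(7, 100, 5000))  # [100, 200, 400, 800, 1600, 3200, 5000]
```

The `sleep` parameter exists so that the waiting can be replaced in tests.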
### Step 3: File Transfer CLI
```javascript
// src/cli.js
const fs = require('fs');
// The promise-based readline API (Node.js 17+) is what makes `await rl.question()` work
const readline = require('readline/promises');
const FileUploadService = require('./file-upload-service');
const FileDownloadService = require('./file-download-service');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

async function main() {
  const config = JSON.parse(fs.readFileSync('config.json', 'utf8'));
  console.log('File Transfer System');
  console.log('====================');
  console.log('1. Upload file');
  console.log('2. Download file');
  console.log('3. List pending downloads');
  const choice = (await rl.question('Enter choice: ')).trim();
  if (choice === '1') {
    await uploadFile(config);
  } else if (choice === '2') {
    await downloadFile(config);
  } else if (choice === '3') {
    console.log('Listing pending downloads is not implemented in this walkthrough.');
  } else {
    console.log(`Unknown choice: ${choice}`);
  }
  rl.close();
}

async function uploadFile(config) {
  const filePath = await rl.question('Enter file path: ');
  const recipient = await rl.question('Enter recipient: ');
  const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
  try {
    const envelope = await fileService.uploadFile(filePath, recipient);
    console.log('Upload successful!');
    console.log(`File ID: ${envelope.payloads[0].id}`);
  } catch (error) {
    console.error('Upload failed:', error.message);
  }
}

async function downloadFile(config) {
  const sender = await rl.question('Enter sender: ');
  const fileService = new FileDownloadService(config.nats_url);
  try {
    await fileService.downloadFile(sender);
    console.log('Download complete!');
  } catch (error) {
    console.error('Download failed:', error.message);
  }
}

main().catch((error) => {
  console.error(error);
  rl.close();
});
```
---
## Building a Streaming Data Pipeline
Let's build a data pipeline that processes streaming data from sensors.
### Step 1: Sensor Data Model
```python
# src/sensor_data.py
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class SensorReading:
    sensor_id: str
    timestamp: str
    value: float
    unit: str
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "sensor_id": self.sensor_id,
            "timestamp": self.timestamp,
            "value": self.value,
            "unit": self.unit,
            "metadata": self.metadata or {}
        }


class SensorBatch:
    def __init__(self):
        self.readings: List[SensorReading] = []

    def add_reading(self, reading: SensorReading):
        self.readings.append(reading)

    def to_dataframe(self):
        # Imported lazily so the module also loads where pandas is not installed
        import pandas as pd
        data = [r.to_dict() for r in self.readings]
        return pd.DataFrame(data)
```
### Step 2: Sensor Sender
```python
# src/sensor-sender.py
import io
from datetime import datetime
from typing import List

import pyarrow as pa

from nats_bridge import smartsend
from sensor_data import SensorReading, SensorBatch


class SensorSender:
    def __init__(self, nats_url: str, fileserver_url: str):
        self.nats_url = nats_url
        self.fileserver_url = fileserver_url

    def send_reading(self, sensor_id: str, value: float, unit: str):
        reading = SensorReading(
            sensor_id=sensor_id,
            timestamp=datetime.now().isoformat(),
            value=value,
            unit=unit
        )
        data = [("reading", reading.to_dict(), "dictionary")]
        smartsend(
            f"/sensors/{sensor_id}",
            data,
            nats_url=self.nats_url,
            fileserver_url=self.fileserver_url
        )

    def send_batch(self, readings: List[SensorReading]):
        batch = SensorBatch()
        for reading in readings:
            batch.add_reading(reading)
        df = batch.to_dataframe()
        # Convert to an Arrow table and serialize it in Arrow IPC stream format
        table = pa.Table.from_pandas(df)
        buf = io.BytesIO()
        with pa.ipc.new_stream(buf, table.schema) as writer:
            writer.write_table(table)
        arrow_data = buf.getvalue()
        # Send based on size
        if len(arrow_data) < 1048576:  # < 1 MiB
            data = [("batch", arrow_data, "table")]
            smartsend(
                "/sensors/batch",
                data,
                nats_url=self.nats_url,
                fileserver_url=self.fileserver_url
            )
        else:
            # Upload to file server
            from nats_bridge import plik_oneshot_upload
            response = plik_oneshot_upload(self.fileserver_url, "batch.arrow", arrow_data)
            print(f"Uploaded batch to {response['url']}")
```
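
`send_batch` expects the caller to have collected a list of readings already. A small buffer that flushes once it holds a fixed number of readings is one way to do that collecting. The sketch below is stand-alone Python: `ReadingBuffer`, `flush_size`, and `on_flush` are hypothetical names, and in the pipeline `on_flush` would be something like `sender.send_batch`.

```python
from typing import Any, Callable, Dict, List


class ReadingBuffer:
    """Collect readings and hand them to on_flush once flush_size is reached."""

    def __init__(self, flush_size: int, on_flush: Callable[[List[Dict[str, Any]]], None]):
        if flush_size < 1:
            raise ValueError("flush_size must be at least 1")
        self.flush_size = flush_size
        self.on_flush = on_flush
        self._pending: List[Dict[str, Any]] = []

    def add(self, reading: Dict[str, Any]) -> None:
        self._pending.append(reading)
        if len(self._pending) >= self.flush_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is pending, for example at shutdown."""
        if self._pending:
            batch, self._pending = self._pending, []
            self.on_flush(batch)


sent = []
buffer = ReadingBuffer(flush_size=3, on_flush=sent.append)
for i in range(7):
    buffer.add({"sensor_id": "t1", "value": 20.0 + i})
buffer.flush()  # push out the final partial batch
print([len(batch) for batch in sent])  # [3, 3, 1]
```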
### Step 3: Sensor Receiver
```python
# src/sensor-receiver.py
import io

import pyarrow as pa

from nats_bridge import smartreceive
from sensor_data import SensorReading


class SensorReceiver:
    def __init__(self, fileserver_download_handler):
        self.fileserver_download_handler = fileserver_download_handler

    def process_reading(self, msg):
        envelope = smartreceive(msg, self.fileserver_download_handler)
        for dataname, data, data_type in envelope["payloads"]:
            if data_type == "dictionary":
                reading = SensorReading(
                    sensor_id=data["sensor_id"],
                    timestamp=data["timestamp"],
                    value=data["value"],
                    unit=data["unit"],
                    metadata=data.get("metadata", {})
                )
                print(f"Received: {reading}")
            elif data_type == "table":
                # Deserialize Arrow IPC (to_pandas() needs pandas installed)
                table = pa.ipc.open_stream(io.BytesIO(data)).read_all()
                df = table.to_pandas()
                print(f"Received batch with {len(df)} readings")
                print(df)
```
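
As more payload types are added, the `if`/`elif` chain in the receiver can be replaced by a lookup table that maps each `data_type` to a handler. The following sketch assumes the `(dataname, data, data_type)` tuple layout used above; `dispatch_payloads` is a hypothetical helper, and the payload list is hand-written instead of coming from `smartreceive`.

```python
from typing import Any, Callable, Dict, List, Tuple

Payload = Tuple[str, Any, str]  # (dataname, data, data_type)


def dispatch_payloads(payloads: List[Payload],
                      handlers: Dict[str, Callable[[str, Any], Any]]) -> List[Any]:
    """Call the handler registered for each payload's data_type; skip unknown types."""
    results = []
    for dataname, data, data_type in payloads:
        handler = handlers.get(data_type)
        if handler is None:
            continue  # no handler registered for this type
        results.append(handler(dataname, data))
    return results


# A hand-written stand-in for envelope["payloads"]
payloads = [
    ("reading", {"sensor_id": "t1", "value": 21.5, "unit": "celsius"}, "dictionary"),
    ("note", "calibrated", "text"),
]
handlers = {"dictionary": lambda name, data: f"{name}: {data['value']} {data['unit']}"}
print(dispatch_payloads(payloads, handlers))  # ['reading: 21.5 celsius']
```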
---
## Building a MicroPython IoT Device
Let's build an IoT device using MicroPython that connects to NATS.
### Step 1: Device Configuration
```python
# device_config.py
import json


class DeviceConfig:
    def __init__(self, ssid, password, nats_url, device_id):
        self.ssid = ssid
        self.password = password
        self.nats_url = nats_url
        self.device_id = device_id

    def to_dict(self):
        return {
            "ssid": self.ssid,
            "password": self.password,
            "nats_url": self.nats_url,
            "device_id": self.device_id
        }
```
### Step 2: Device Bridge
```python
# device_bridge.py
import time

from nats_bridge import smartsend, smartreceive


class DeviceBridge:
    def __init__(self, config):
        self.config = config
        self.nats_url = config.nats_url

    def connect(self):
        # Connect to WiFi
        import network
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        wlan.connect(self.config.ssid, self.config.password)
        while not wlan.isconnected():
            time.sleep(0.5)  # wait between checks instead of spinning
        print('Connected to WiFi')
        print('Network config:', wlan.ifconfig())

    def send_status(self, status):
        data = [("status", status, "dictionary")]
        smartsend(
            f"/devices/{self.config.device_id}/status",
            data,
            nats_url=self.nats_url
        )

    def send_sensor_data(self, sensor_id, value, unit):
        # The value and its unit travel together in one dictionary payload
        data = [
            ("sensor_id", sensor_id, "text"),
            ("reading", {"value": value, "unit": unit}, "dictionary")
        ]
        smartsend(
            f"/devices/{self.config.device_id}/sensors/{sensor_id}",
            data,
            nats_url=self.nats_url
        )

    def receive_commands(self, callback):
        # Simplified subscription - in production, use a proper NATS client
        while True:
            # Poll for messages
            msg = self._poll_for_message()
            if msg:
                envelope = smartreceive(msg)
                # Process payloads
                for dataname, data, data_type in envelope["payloads"]:
                    if dataname == "command":
                        callback(data)
            else:
                time.sleep(1)  # avoid a busy loop while nothing is pending

    def _poll_for_message(self):
        # Simplified message polling
        # In production, implement proper NATS subscription
        return None
```
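
`connect` waits for WiFi without a time limit, so a wrong password leaves the device stuck. A bounded wait is one option. The sketch below runs on CPython and uses a stand-in for `wlan.isconnected()`; `wait_until` is a hypothetical helper, and on MicroPython `time.ticks_ms()` and `time.ticks_diff()` would take the place of `time.monotonic()`.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float, poll_s: float = 0.1) -> bool:
    """Poll predicate until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


# Stand-in for wlan.isconnected(): becomes True on the third call
calls = {"n": 0}


def fake_isconnected() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(fake_isconnected, timeout_s=2.0, poll_s=0.01))  # True
print(wait_until(lambda: False, timeout_s=0.05, poll_s=0.01))    # False
```

When the call returns `False`, the device can report the failure or retry instead of hanging.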
### Step 3: Device Application
```python
# device_app.py
from device_config import DeviceConfig
from device_bridge import DeviceBridge
import time
import random

# Load configuration
# Note: on the device, "localhost" is the device itself; use the broker's
# address on your network instead.
config = DeviceConfig(
    ssid="MyNetwork",
    password="password123",
    nats_url="nats://localhost:4222",
    device_id="device-001"
)
bridge = DeviceBridge(config)
bridge.connect()

# Send initial status
bridge.send_status({
    "status": "online",
    "version": "1.0.0",
    "uptime": 0
})

# Main loop
while True:
    # Read sensor data (random values stand in for real sensors)
    temperature = random.uniform(20, 30)
    humidity = random.uniform(40, 60)
    bridge.send_sensor_data("temperature", temperature, "celsius")
    bridge.send_sensor_data("humidity", humidity, "percent")
    # Send status update
    bridge.send_status({
        "status": "online",
        "temperature": temperature,
        "humidity": humidity
    })
    time.sleep(60)  # Every minute
```
---
## Building a Cross-Platform Dashboard
Let's build a dashboard that displays data from multiple platforms.
### Step 1: Dashboard Server (Python)
```python
# src/dashboard-server.py
from nats_bridge import smartsend, smartreceive
import pandas as pd
import pyarrow as pa
import io


class DashboardServer:
    def __init__(self, nats_url, fileserver_url):
        self.nats_url = nats_url
        self.fileserver_url = fileserver_url

    def broadcast_data(self, df):
        # Convert to Arrow IPC
        table = pa.Table.from_pandas(df)
        buf = io.BytesIO()
        with pa.ipc.new_stream(buf, table.schema) as writer:
            writer.write_table(table)
        arrow_data = buf.getvalue()
        # Broadcast to all subscribers
        data = [("data", arrow_data, "table")]
        smartsend(
            "/dashboard/data",
            data,
            nats_url=self.nats_url,
            fileserver_url=self.fileserver_url
        )

    def receive_selection(self, callback):
        def handler(msg):
            envelope = smartreceive(msg)
            for dataname, data, data_type in envelope["payloads"]:
                if data_type == "dictionary":
                    callback(data)

        # Subscribe to selections
        import threading
        thread = threading.Thread(target=self._listen_for_selections, args=(handler,))
        thread.daemon = True
        thread.start()

    def _listen_for_selections(self, handler):
        # Simplified subscription
        # In production, implement proper NATS subscription
        pass
```
### Step 2: Dashboard UI (JavaScript)
```javascript
// src/dashboard-ui.js
// Browser-side code. Assumes `smartsend` and `smartreceive` from NATSBridge and
// the apache-arrow package (imported as `arrow`) are in scope, e.g. via a bundler.
class DashboardUI {
  constructor() {
    this.data = null;
    this.setupEventListeners();
  }

  setupEventListeners() {
    document.getElementById('refresh-btn').addEventListener('click', () => this.refreshData());
    document.getElementById('export-btn').addEventListener('click', () => this.exportData());
  }

  async refreshData() {
    // Request fresh data
    await smartsend("/dashboard/request", [
      { dataname: "request", data: { type: "refresh" }, type: "dictionary" }
    ], {
      fileserverUrl: window.config.fileserver_url
    });
  }

  // Call this with each message received on the dashboard data subject
  async fetchData(msg) {
    const envelope = await smartreceive(msg, {
      fileserverDownloadHandler: this.fetchFromUrl.bind(this)
    });
    // Process table data
    for (const payload of envelope.payloads) {
      if (payload.type === 'table') {
        // Deserialize Arrow IPC
        this.data = this.deserializeArrow(payload.data);
        this.renderTable();
      }
    }
  }

  deserializeArrow(data) {
    // Decode Arrow IPC bytes into an array of plain row objects
    // (tableFromIPC is part of recent apache-arrow releases)
    const table = arrow.tableFromIPC(new Uint8Array(data));
    return table.toArray().map((row) => row.toJSON());
  }

  renderTable() {
    const tableContainer = document.getElementById('data-table');
    tableContainer.innerHTML = '';
    if (!this.data || this.data.length === 0) return;
    // Render table headers
    const headers = Object.keys(this.data[0]);
    const thead = document.createElement('thead');
    const headerRow = document.createElement('tr');
    headers.forEach(header => {
      const th = document.createElement('th');
      th.textContent = header;
      headerRow.appendChild(th);
    });
    thead.appendChild(headerRow);
    tableContainer.appendChild(thead);
    // Render table rows
    const tbody = document.createElement('tbody');
    this.data.forEach(row => {
      const tr = document.createElement('tr');
      headers.forEach(header => {
        const td = document.createElement('td');
        td.textContent = row[header];
        tr.appendChild(td);
      });
      tbody.appendChild(tr);
    });
    tableContainer.appendChild(tbody);
  }

  exportData() {
    const csv = this.toCSV();
    const blob = new Blob([csv], { type: 'text/csv' });
    const url = URL.createObjectURL(blob);
    const a = document.createElement('a');
    a.href = url;
    a.download = 'dashboard_data.csv';
    a.click();
    URL.revokeObjectURL(url); // release the temporary object URL
  }

  toCSV() {
    if (!this.data || this.data.length === 0) return '';
    const headers = Object.keys(this.data[0]);
    // Quote fields that contain commas, quotes, or newlines
    const escape = (value) => {
      const s = String(value ?? '');
      return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
    };
    const rows = this.data.map(row =>
      headers.map(h => escape(row[h])).join(',')
    );
    return [headers.map(escape).join(','), ...rows].join('\n');
  }

  async fetchFromUrl(url) {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Failed to fetch: ${response.status}`);
    }
    return response.arrayBuffer();
  }
}
```
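
CSV export needs quoting as soon as a value contains a comma, a quote, or a newline. If the same rows are exported on the Python side of the dashboard, the standard `csv` module takes care of this. The sketch below assumes rows are plain dictionaries with identical keys; `rows_to_csv` is a hypothetical helper.

```python
import csv
import io
from typing import Any, Dict, List


def rows_to_csv(rows: List[Dict[str, Any]]) -> str:
    """Serialize a list of row dicts to CSV text, quoting fields where needed."""
    if not rows:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


rows = [
    {"sensor_id": "t1", "note": "ok"},
    {"sensor_id": "t2", "note": 'door "A", floor 2'},
]
print(rows_to_csv(rows))
```

The second row's note comes out as `"door ""A"", floor 2"`, with the embedded quotes doubled and the whole field quoted.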
---
## Performance Optimization
### 1. Batch Processing
```python
# Batch multiple readings into a single message
def send_batch_readings(self, readings):
    batch = SensorBatch()
    for reading in readings:
        batch.add_reading(reading)
    df = batch.to_dataframe()
    # Convert to Arrow IPC
    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    with pa.ipc.new_stream(buf, table.schema) as writer:
        writer.write_table(table)
    arrow_data = buf.getvalue()
    # Send as single message
    smartsend(
        "/sensors/batch",
        [("batch", arrow_data, "table")],
        nats_url=self.nats_url
    )
```
### 2. Connection Pooling
```javascript
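// Reuse NATS connections
const nats = require('nats');

class ConnectionPool {
  constructor() {
    this.connections = new Map();
  }

  // Note: with nats.js v2, connect() returns a promise, so callers must
  // `await pool.getConnection(url)` before using the connection.
  getConnection(natsUrl) {
    if (!this.connections.has(natsUrl)) {
      this.connections.set(natsUrl, nats.connect({ servers: [natsUrl] }));
    }
    return this.connections.get(natsUrl);
  }

  async closeAll() {
    // Promise.resolve() handles both a plain connection and a promise for one
    const pending = [...this.connections.values()].map((conn) =>
      Promise.resolve(conn).then((c) => c.close())
    );
    await Promise.all(pending);
    this.connections.clear();
  }
}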
```
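
The same idea carries over to the Python components: keep one connection per URL and hand it out on every request. The sketch below is generic, taking the connect and close functions as parameters, since this guide does not show how the Python `nats_bridge` module manages connections. All names in it are hypothetical, and the demo uses a fake connection factory.

```python
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class ConnectionPool(Generic[T]):
    """Create at most one connection per URL and reuse it afterwards."""

    def __init__(self, connect: Callable[[str], T], close: Callable[[T], None]):
        self._connect = connect
        self._close = close
        self._connections: Dict[str, T] = {}

    def get(self, url: str) -> T:
        if url not in self._connections:
            self._connections[url] = self._connect(url)
        return self._connections[url]

    def close_all(self) -> None:
        for connection in self._connections.values():
            self._close(connection)
        self._connections.clear()


# Demo with a fake connection factory that records how often it is called
created = []


def fake_connect(url: str) -> dict:
    created.append(url)
    return {"url": url, "open": True}


pool = ConnectionPool(fake_connect, close=lambda conn: conn.update(open=False))
first = pool.get("nats://localhost:4222")
second = pool.get("nats://localhost:4222")
print(first is second, len(created))  # True 1
```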
### 3. Caching
```python
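# Cache file server responses
# _fetch_with_backoff is the underlying download helper and must be in scope here.
from functools import lru_cache


@lru_cache(maxsize=100)
def _fetch_cached(url, max_retries, base_delay, max_delay):
    return _fetch_with_backoff(url, max_retries, base_delay, max_delay, "")


def fetch_with_caching(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
    # correlation_id is kept out of the cache key: it usually differs per
    # message, which would turn every lookup into a cache miss
    return _fetch_cached(url, max_retries, base_delay, max_delay)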
```
---
## Best Practices
### 1. Error Handling
```python
def safe_smartsend(subject, data, **kwargs):
    try:
        return smartsend(subject, data, **kwargs)
    except Exception as e:
        print(f"Failed to send message: {e}")
        return None
```
### 2. Logging
```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def log_send(subject, data, correlation_id):
    logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}")


def log_receive(correlation_id, num_payloads):
    logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}")
```
### 3. Rate Limiting
```python
import time
from collections import deque


class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()

    def allow(self):
        now = time.time()
        # Remove old requests
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True
```
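
To see the sliding window in action without waiting in real time, the limiter can take its clock as a parameter. The sketch below is a variant of the class above with that one addition; the `clock` parameter is an assumption made for the demonstration, not part of the original class.

```python
import time
from collections import deque
from typing import Callable


class RateLimiter:
    def __init__(self, max_requests: int, time_window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.time_window = time_window
        self.clock = clock
        self.requests = deque()

    def allow(self) -> bool:
        now = self.clock()
        # Drop timestamps that have left the window
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        if len(self.requests) >= self.max_requests:
            return False
        self.requests.append(now)
        return True


# Fake clock: two requests allowed per 10-second window
current = [0.0]
limiter = RateLimiter(max_requests=2, time_window=10, clock=lambda: current[0])
results = []
for t in (0.0, 1.0, 2.0, 11.5):
    current[0] = t
    results.append(limiter.allow())
print(results)  # [True, True, False, True]
```

The third request is rejected because two requests already fall inside the window; by 11.5 seconds both have expired. A sender would call `limiter.allow()` before each `smartsend` and skip or delay the message when it returns `False`.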
---
## Conclusion
This walkthrough covered:
- Building a chat application with rich media support
- Building a file transfer system using the claim-check pattern
- Building a streaming data pipeline for sensor data
- Building a MicroPython IoT device
- Building a cross-platform dashboard
For more information, check the [API documentation](../src/README.md) and [test examples](../test/).
---
## License
MIT