1071 lines
31 KiB
Markdown
1071 lines
31 KiB
Markdown
# NATSBridge Walkthrough
|
|
|
|
A comprehensive guide to building real-world applications with NATSBridge.
|
|
|
|
## Table of Contents
|
|
|
|
1. [Introduction](#introduction)
|
|
2. [Architecture Overview](#architecture-overview)
|
|
3. [Building a Chat Application](#building-a-chat-application)
|
|
4. [Building a File Transfer System](#building-a-file-transfer-system)
|
|
5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline)
|
|
6. [Building a Micropython IoT Device](#building-a-micropython-iot-device)
|
|
7. [Building a Cross-Platform Dashboard](#building-a-cross-platform-dashboard)
|
|
8. [Performance Optimization](#performance-optimimization)
|
|
9. [Best Practices](#best-practices)
|
|
|
|
---
|
|
|
|
## Introduction
|
|
|
|
This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover:
|
|
|
|
- Chat applications with rich media support
|
|
- File transfer systems with claim-check pattern
|
|
- Streaming data pipelines
|
|
- Micropython IoT devices
|
|
- Cross-platform dashboards
|
|
|
|
Each section builds on the previous one, gradually increasing in complexity.
|
|
|
|
---
|
|
|
|
## Architecture Overview
|
|
|
|
### System Components
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ NATSBridge Architecture │
|
|
├─────────────────────────────────────────────────────────────────┤
|
|
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
|
│ │ Julia │ │ JavaScript │ │ Python/Micr │ │
|
|
│ │ (NATS.jl) │◄──►│ (nats.js) │◄──►│ opython │ │
|
|
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
|
│ │ │ │ │
|
|
│ ▼ ▼ ▼ │
|
|
│ ┌──────────────────────────────────────────────┐ │
|
|
│ │ NATS │ │
|
|
│ │ (Message Broker) │ │
|
|
│ └──────────────────────────────────────────────┘ │
|
|
│ │ │
|
|
│ ▼ │
|
|
│ ┌──────────────────┐ │
|
|
│ │ File Server │ │
|
|
│ │ (HTTP Upload) │ │
|
|
│ └──────────────────┘ │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
### Message Flow
|
|
|
|
1. **Sender** creates a message envelope with payloads
|
|
2. **NATSBridge** serializes and encodes payloads
|
|
3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server
|
|
4. **NATS** routes messages to subscribers
|
|
5. **Receiver** fetches payloads (from NATS or file server)
|
|
6. **NATSBridge** deserializes and decodes payloads
|
|
|
|
---
|
|
|
|
## Building a Chat Application
|
|
|
|
Let's build a full-featured chat application that supports text, images, and file attachments.
|
|
|
|
### Step 1: Set Up the Project
|
|
|
|
```bash
|
|
# Create project directory
|
|
mkdir -p chat-app/src
|
|
cd chat-app
|
|
|
|
# Create configuration file
|
|
cat > config.json << 'EOF'
|
|
{
|
|
"nats_url": "nats://localhost:4222",
|
|
"fileserver_url": "http://localhost:8080",
|
|
"size_threshold": 1048576
|
|
}
|
|
EOF
|
|
```
|
|
|
|
### Step 2: Create the Chat Interface (JavaScript)
|
|
|
|
```javascript
|
|
// src/chat-ui.js
|
|
class ChatUI {
|
|
constructor() {
|
|
this.messages = [];
|
|
this.currentRoom = null;
|
|
this.setupEventListeners();
|
|
}
|
|
|
|
setupEventListeners() {
|
|
document.getElementById('send-btn').addEventListener('click', () => this.sendMessage());
|
|
document.getElementById('file-input').addEventListener('change', (e) => this.handleFileSelect(e));
|
|
}
|
|
|
|
async sendMessage() {
|
|
const messageInput = document.getElementById('message-input');
|
|
const text = messageInput.value.trim();
|
|
|
|
if (!text && !this.selectedFile) return;
|
|
|
|
const data = [];
|
|
|
|
// Add text message
|
|
if (text) {
|
|
data.push({
|
|
dataname: "text",
|
|
data: text,
|
|
type: "text"
|
|
});
|
|
}
|
|
|
|
// Add file if selected
|
|
if (this.selectedFile) {
|
|
const fileData = await this.readFile(this.selectedFile);
|
|
data.push({
|
|
dataname: "attachment",
|
|
data: fileData,
|
|
type: this.getFileType(this.selectedFile.type)
|
|
});
|
|
}
|
|
|
|
const { env, env_json_str } = await smartsend(
|
|
`/chat/${this.currentRoom}`,
|
|
data,
|
|
{
|
|
fileserverUrl: window.config.fileserver_url,
|
|
sizeThreshold: window.config.size_threshold
|
|
}
|
|
);
|
|
|
|
messageInput.value = '';
|
|
this.selectedFile = null;
|
|
document.getElementById('file-name').textContent = '';
|
|
}
|
|
|
|
handleFileSelect(event) {
|
|
this.selectedFile = event.target.files[0];
|
|
document.getElementById('file-name').textContent = `Selected: ${this.selectedFile.name}`;
|
|
}
|
|
|
|
readFile(file) {
|
|
return new Promise((resolve, reject) => {
|
|
const reader = new FileReader();
|
|
reader.onload = () => resolve(reader.result);
|
|
reader.onerror = reject;
|
|
reader.readAsArrayBuffer(file);
|
|
});
|
|
}
|
|
|
|
getFileType(mimeType) {
|
|
if (mimeType.startsWith('image/')) return 'image';
|
|
if (mimeType.startsWith('audio/')) return 'audio';
|
|
if (mimeType.startsWith('video/')) return 'video';
|
|
return 'binary';
|
|
}
|
|
|
|
addMessage(user, text, attachment = null) {
|
|
const messagesContainer = document.getElementById('messages');
|
|
const messageDiv = document.createElement('div');
|
|
messageDiv.className = 'message';
|
|
|
|
let content = `<div class="user">${user}</div>`;
|
|
content += `<div class="text">${text}</div>`;
|
|
|
|
if (attachment) {
|
|
if (attachment.type === 'image') {
|
|
content += `<img src="${attachment.data}" class="attachment" />`;
|
|
} else {
|
|
content += `<a href="${attachment.data}" class="attachment">Download attachment</a>`;
|
|
}
|
|
}
|
|
|
|
messageDiv.innerHTML = content;
|
|
messagesContainer.appendChild(messageDiv);
|
|
messagesContainer.scrollTop = messagesContainer.scrollHeight;
|
|
}
|
|
}
|
|
```
|
|
|
|
### Step 3: Create the Message Handler
|
|
|
|
```javascript
|
|
// src/chat-handler.js
|
|
const { smartreceive } = require('./NATSBridge');
|
|
|
|
class ChatHandler {
|
|
constructor(natsConnection) {
|
|
this.nats = natsConnection;
|
|
this.ui = new ChatUI();
|
|
}
|
|
|
|
async start() {
|
|
// Subscribe to chat rooms
|
|
const rooms = ['general', 'tech', 'random'];
|
|
|
|
for (const room of rooms) {
|
|
await this.nats.subscribe(`/chat/${room}`, async (msg) => {
|
|
await this.handleMessage(msg);
|
|
});
|
|
}
|
|
|
|
console.log('Chat handler started');
|
|
}
|
|
|
|
async handleMessage(msg) {
|
|
const envelope = await smartreceive(msg, {
|
|
fileserverDownloadHandler: this.downloadFile.bind(this)
|
|
});
|
|
|
|
// Extract sender info from envelope
|
|
const sender = envelope.senderName || 'Anonymous';
|
|
|
|
// Process each payload
|
|
for (const payload of envelope.payloads) {
|
|
if (payload.type === 'text') {
|
|
this.ui.addMessage(sender, payload.data);
|
|
} else if (payload.type === 'image') {
|
|
// Convert to data URL for display
|
|
const base64 = this.arrayBufferToBase64(payload.data);
|
|
this.ui.addMessage(sender, null, {
|
|
type: 'image',
|
|
data: `data:image/png;base64,${base64}`
|
|
});
|
|
} else {
|
|
// For other types, use file server URL
|
|
this.ui.addMessage(sender, null, {
|
|
type: payload.type,
|
|
data: payload.data
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
downloadFile(url, max_retries, base_delay, max_delay, correlation_id) {
|
|
return fetch(url)
|
|
.then(response => response.arrayBuffer());
|
|
}
|
|
|
|
arrayBufferToBase64(buffer) {
|
|
const bytes = new Uint8Array(buffer);
|
|
let binary = '';
|
|
for (let i = 0; i < bytes.length; i++) {
|
|
binary += String.fromCharCode(bytes[i]);
|
|
}
|
|
return btoa(binary);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Step 4: Run the Application
|
|
|
|
```bash
|
|
# Start NATS
|
|
docker run -p 4222:4222 nats:latest
|
|
|
|
# Start file server
|
|
mkdir -p /tmp/fileserver
|
|
python3 -m http.server 8080 --directory /tmp/fileserver
|
|
|
|
# Run chat app
|
|
node src/chat-ui.js
|
|
node src/chat-handler.js
|
|
```
|
|
|
|
---
|
|
|
|
## Building a File Transfer System
|
|
|
|
Let's build a file transfer system that handles large files efficiently.
|
|
|
|
### Step 1: File Upload Service
|
|
|
|
```javascript
|
|
// src/file-upload-service.js
|
|
const { smartsend } = require('./NATSBridge');
|
|
|
|
class FileUploadService {
|
|
constructor(natsUrl, fileserverUrl) {
|
|
this.natsUrl = natsUrl;
|
|
this.fileserverUrl = fileserverUrl;
|
|
}
|
|
|
|
async uploadFile(filePath, recipient) {
|
|
const fs = require('fs');
|
|
const fileData = fs.readFileSync(filePath);
|
|
const fileName = require('path').basename(filePath);
|
|
|
|
const data = [{
|
|
dataname: fileName,
|
|
data: fileData,
|
|
type: 'binary'
|
|
}];
|
|
|
|
const { env, env_json_str } = await smartsend(
|
|
`/files/${recipient}`,
|
|
data,
|
|
{
|
|
natsUrl: this.natsUrl,
|
|
fileserverUrl: this.fileserverUrl,
|
|
sizeThreshold: 1048576
|
|
}
|
|
);
|
|
|
|
return env;
|
|
}
|
|
|
|
async uploadLargeFile(filePath, recipient) {
|
|
// For very large files, stream upload
|
|
const fs = require('fs');
|
|
const fileSize = fs.statSync(filePath).size;
|
|
|
|
if (fileSize > 100 * 1024 * 1024) { // > 100MB
|
|
console.log('File too large for direct upload, using streaming...');
|
|
return await this.streamUpload(filePath, recipient);
|
|
}
|
|
|
|
return await this.uploadFile(filePath, recipient);
|
|
}
|
|
|
|
async streamUpload(filePath, recipient) {
|
|
// Implement streaming upload to file server
|
|
// This would require a more sophisticated file server
|
|
// For now, we'll use the standard upload
|
|
return await this.uploadFile(filePath, recipient);
|
|
}
|
|
}
|
|
|
|
module.exports = FileUploadService;
|
|
```
|
|
|
|
### Step 2: File Download Service
|
|
|
|
```javascript
|
|
// src/file-download-service.js
|
|
const { smartreceive } = require('./NATSBridge');
|
|
const fs = require('fs');
|
|
|
|
class FileDownloadService {
|
|
constructor(natsUrl) {
|
|
this.natsUrl = natsUrl;
|
|
this.downloads = new Map();
|
|
}
|
|
|
|
async downloadFile(sender, downloadId) {
|
|
// Subscribe to sender's file channel
|
|
const envelope = await smartreceive(msg, {
|
|
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
|
});
|
|
|
|
// Process each payload
|
|
for (const payload of envelope.payloads) {
|
|
if (payload.type === 'binary') {
|
|
const filePath = `/downloads/${payload.dataname}`;
|
|
fs.writeFileSync(filePath, payload.data);
|
|
console.log(`File saved to ${filePath}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fetchFromUrl(url, max_retries, base_delay, max_delay, correlation_id) {
|
|
const response = await fetch(url);
|
|
if (!response.ok) {
|
|
throw new Error(`Failed to fetch: ${response.status}`);
|
|
}
|
|
return await response.arrayBuffer();
|
|
}
|
|
}
|
|
|
|
module.exports = FileDownloadService;
|
|
```
|
|
|
|
### Step 3: File Transfer CLI
|
|
|
|
```javascript
|
|
// src/cli.js
|
|
const { smartsend, smartreceive } = require('./NATSBridge');
|
|
const fs = require('fs');
|
|
const readline = require('readline');
|
|
|
|
const rl = readline.createInterface({
|
|
input: process.stdin,
|
|
output: process.stdout
|
|
});
|
|
|
|
async function main() {
|
|
const config = JSON.parse(fs.readFileSync('config.json', 'utf8'));
|
|
|
|
console.log('File Transfer System');
|
|
console.log('====================');
|
|
console.log('1. Upload file');
|
|
console.log('2. Download file');
|
|
console.log('3. List pending downloads');
|
|
|
|
const choice = await rl.question('Enter choice: ');
|
|
|
|
if (choice === '1') {
|
|
await uploadFile(config);
|
|
} else if (choice === '2') {
|
|
await downloadFile(config);
|
|
}
|
|
|
|
rl.close();
|
|
}
|
|
|
|
async function uploadFile(config) {
|
|
const filePath = await rl.question('Enter file path: ');
|
|
const recipient = await rl.question('Enter recipient: ');
|
|
|
|
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
|
|
|
|
try {
|
|
const envelope = await fileService.uploadFile(filePath, recipient);
|
|
console.log('Upload successful!');
|
|
console.log(`File ID: ${envelope.payloads[0].id}`);
|
|
} catch (error) {
|
|
console.error('Upload failed:', error.message);
|
|
}
|
|
}
|
|
|
|
async function downloadFile(config) {
|
|
const sender = await rl.question('Enter sender: ');
|
|
const fileService = new FileDownloadService(config.nats_url);
|
|
|
|
try {
|
|
await fileService.downloadFile(sender);
|
|
console.log('Download complete!');
|
|
} catch (error) {
|
|
console.error('Download failed:', error.message);
|
|
}
|
|
}
|
|
|
|
main();
|
|
```
|
|
|
|
---
|
|
|
|
## Building a Streaming Data Pipeline
|
|
|
|
Let's build a data pipeline that processes streaming data from sensors.
|
|
|
|
### Step 1: Sensor Data Model
|
|
|
|
```python
|
|
# src/sensor_data.py
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any
|
|
import json
|
|
|
|
@dataclass
|
|
class SensorReading:
|
|
sensor_id: str
|
|
timestamp: str
|
|
value: float
|
|
unit: str
|
|
metadata: Dict[str, Any] = None
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"sensor_id": self.sensor_id,
|
|
"timestamp": self.timestamp,
|
|
"value": self.value,
|
|
"unit": self.unit,
|
|
"metadata": self.metadata or {}
|
|
}
|
|
|
|
class SensorBatch:
|
|
def __init__(self):
|
|
self.readings: List[SensorReading] = []
|
|
|
|
def add_reading(self, reading: SensorReading):
|
|
self.readings.append(reading)
|
|
|
|
def to_dataframe(self):
|
|
import pandas as pd
|
|
data = [r.to_dict() for r in self.readings]
|
|
return pd.DataFrame(data)
|
|
```
|
|
|
|
### Step 2: Sensor Sender
|
|
|
|
```python
|
|
# src/sensor-sender.py
|
|
from nats_bridge import smartsend
|
|
from sensor_data import SensorReading, SensorBatch
|
|
import time
|
|
import random
|
|
|
|
class SensorSender:
|
|
def __init__(self, nats_url: str, fileserver_url: str):
|
|
self.nats_url = nats_url
|
|
self.fileserver_url = fileserver_url
|
|
|
|
def send_reading(self, sensor_id: str, value: float, unit: str):
|
|
reading = SensorReading(
|
|
sensor_id=sensor_id,
|
|
timestamp=datetime.now().isoformat(),
|
|
value=value,
|
|
unit=unit
|
|
)
|
|
|
|
data = [("reading", reading.to_dict(), "dictionary")]
|
|
|
|
# Default: is_publish=True (automatically publishes to NATS)
|
|
smartsend(
|
|
f"/sensors/{sensor_id}",
|
|
data,
|
|
nats_url=self.nats_url,
|
|
fileserver_url=self.fileserver_url
|
|
)
|
|
|
|
def prepare_message_only(self, sensor_id: str, value: float, unit: str):
|
|
"""Prepare a message without publishing (is_publish=False)."""
|
|
reading = SensorReading(
|
|
sensor_id=sensor_id,
|
|
timestamp=datetime.now().isoformat(),
|
|
value=value,
|
|
unit=unit
|
|
)
|
|
|
|
data = [("reading", reading.to_dict(), "dictionary")]
|
|
|
|
# With is_publish=False, returns (envelope, json_str) without publishing
|
|
env, env_json_str = smartsend(
|
|
f"/sensors/{sensor_id}/prepare",
|
|
data,
|
|
nats_url=self.nats_url,
|
|
fileserver_url=self.fileserver_url,
|
|
is_publish=False
|
|
)
|
|
|
|
# Now you can publish manually using NATS request-reply pattern
|
|
# nc.request(subject, env_json_str, reply_to=reply_to_topic)
|
|
|
|
return env, env_json_str
|
|
|
|
def send_batch(self, readings: List[SensorReading]):
|
|
batch = SensorBatch()
|
|
for reading in readings:
|
|
batch.add_reading(reading)
|
|
|
|
df = batch.to_dataframe()
|
|
|
|
# Convert to Arrow IPC format
|
|
import pyarrow as pa
|
|
table = pa.Table.from_pandas(df)
|
|
|
|
# Serialize to Arrow IPC
|
|
import io
|
|
buf = io.BytesIO()
|
|
with pa.ipc.new_stream(buf, table.schema) as writer:
|
|
writer.write_table(table)
|
|
|
|
arrow_data = buf.getvalue()
|
|
|
|
# Send based on size
|
|
if len(arrow_data) < 1048576: # < 1MB
|
|
data = [("batch", arrow_data, "table")]
|
|
smartsend(
|
|
f"/sensors/batch",
|
|
data,
|
|
nats_url=self.nats_url,
|
|
fileserver_url=self.fileserver_url
|
|
)
|
|
else:
|
|
# Upload to file server
|
|
from nats_bridge import plik_oneshot_upload
|
|
response = plik_oneshot_upload(self.fileserver_url, "batch.arrow", arrow_data)
|
|
print(f"Uploaded batch to {response['url']}")
|
|
```
|
|
|
|
### Step 3: Sensor Receiver
|
|
|
|
```python
|
|
# src/sensor-receiver.py
|
|
from nats_bridge import smartreceive
|
|
from sensor_data import SensorReading
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
import io
|
|
|
|
class SensorReceiver:
|
|
def __init__(self, fileserver_download_handler):
|
|
self.fileserver_download_handler = fileserver_download_handler
|
|
|
|
def process_reading(self, msg):
|
|
envelope = smartreceive(msg, self.fileserver_download_handler)
|
|
|
|
for dataname, data, data_type in envelope["payloads"]:
|
|
if data_type == "dictionary":
|
|
reading = SensorReading(
|
|
sensor_id=data["sensor_id"],
|
|
timestamp=data["timestamp"],
|
|
value=data["value"],
|
|
unit=data["unit"],
|
|
metadata=data.get("metadata", {})
|
|
)
|
|
print(f"Received: {reading}")
|
|
|
|
elif data_type == "table":
|
|
# Deserialize Arrow IPC
|
|
table = pa.ipc.open_stream(io.BytesIO(data)).read_all()
|
|
df = table.to_pandas()
|
|
print(f"Received batch with {len(df)} readings")
|
|
print(df)
|
|
```
|
|
|
|
---
|
|
|
|
## Building a Micropython IoT Device
|
|
|
|
Let's build an IoT device using Micropython that connects to NATS.
|
|
|
|
### Step 1: Device Configuration
|
|
|
|
```python
|
|
# device_config.py
|
|
import json
|
|
|
|
class DeviceConfig:
|
|
def __init__(self, ssid, password, nats_url, device_id):
|
|
self.ssid = ssid
|
|
self.password = password
|
|
self.nats_url = nats_url
|
|
self.device_id = device_id
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"ssid": self.ssid,
|
|
"password": self.password,
|
|
"nats_url": self.nats_url,
|
|
"device_id": self.device_id
|
|
}
|
|
```
|
|
|
|
### Step 2: Device Bridge
|
|
|
|
```python
|
|
# device_bridge.py
|
|
from nats_bridge import smartsend, smartreceive
|
|
import json
|
|
|
|
class DeviceBridge:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.nats_url = config.nats_url
|
|
|
|
def connect(self):
|
|
# Connect to WiFi
|
|
import network
|
|
wlan = network.WLAN(network.STA_IF)
|
|
wlan.active(True)
|
|
wlan.connect(self.config.ssid, self.config.password)
|
|
|
|
while not wlan.isconnected():
|
|
pass
|
|
|
|
print('Connected to WiFi')
|
|
print('Network config:', wlan.ifconfig())
|
|
|
|
def send_status(self, status):
|
|
data = [("status", status, "dictionary")]
|
|
smartsend(
|
|
f"/devices/{self.config.device_id}/status",
|
|
data,
|
|
nats_url=self.nats_url
|
|
)
|
|
|
|
def send_sensor_data(self, sensor_id, value, unit):
|
|
data = [
|
|
("sensor_id", sensor_id, "text"),
|
|
("value", value, "dictionary")
|
|
]
|
|
smartsend(
|
|
f"/devices/{self.config.device_id}/sensors/{sensor_id}",
|
|
data,
|
|
nats_url=self.nats_url
|
|
)
|
|
|
|
def receive_commands(self, callback):
|
|
# Subscribe to commands
|
|
import socket
|
|
# Simplified subscription - in production, use proper NATS client
|
|
|
|
while True:
|
|
# Poll for messages
|
|
msg = self._poll_for_message()
|
|
if msg:
|
|
envelope = smartreceive(msg)
|
|
|
|
# Process payloads
|
|
for dataname, data, data_type in envelope["payloads"]:
|
|
if dataname == "command":
|
|
callback(data)
|
|
|
|
def _poll_for_message(self):
|
|
# Simplified message polling
|
|
# In production, implement proper NATS subscription
|
|
return None
|
|
```
|
|
|
|
### Step 3: Device Application
|
|
|
|
```python
|
|
# device_app.py
|
|
from device_config import DeviceConfig
|
|
from device_bridge import DeviceBridge
|
|
import time
|
|
import random
|
|
|
|
# Load configuration
|
|
config = DeviceConfig(
|
|
ssid="MyNetwork",
|
|
password="password123",
|
|
nats_url="nats://localhost:4222",
|
|
device_id="device-001"
|
|
)
|
|
|
|
bridge = DeviceBridge(config)
|
|
bridge.connect()
|
|
|
|
# Send initial status
|
|
bridge.send_status({
|
|
"status": "online",
|
|
"version": "1.0.0",
|
|
"uptime": 0
|
|
})
|
|
|
|
# Main loop
|
|
while True:
|
|
# Read sensor data
|
|
temperature = random.uniform(20, 30)
|
|
humidity = random.uniform(40, 60)
|
|
|
|
bridge.send_sensor_data("temperature", temperature, "celsius")
|
|
bridge.send_sensor_data("humidity", humidity, "percent")
|
|
|
|
# Send status update
|
|
bridge.send_status({
|
|
"status": "online",
|
|
"temperature": temperature,
|
|
"humidity": humidity
|
|
})
|
|
|
|
time.sleep(60) # Every minute
|
|
```
|
|
|
|
---
|
|
|
|
## Building a Cross-Platform Dashboard
|
|
|
|
Let's build a dashboard that displays data from multiple platforms.
|
|
|
|
### Step 1: Dashboard Server (Python)
|
|
|
|
```python
|
|
# src/dashboard-server.py
|
|
from nats_bridge import smartsend, smartreceive
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
import io
|
|
|
|
class DashboardServer:
|
|
def __init__(self, nats_url, fileserver_url):
|
|
self.nats_url = nats_url
|
|
self.fileserver_url = fileserver_url
|
|
|
|
def broadcast_data(self, df):
|
|
# Convert to Arrow IPC
|
|
table = pa.Table.from_pandas(df)
|
|
buf = io.BytesIO()
|
|
with pa.ipc.new_stream(buf, table.schema) as writer:
|
|
writer.write_table(table)
|
|
|
|
arrow_data = buf.getvalue()
|
|
|
|
# Broadcast to all subscribers
|
|
data = [("data", arrow_data, "table")]
|
|
smartsend(
|
|
"/dashboard/data",
|
|
data,
|
|
nats_url=self.nats_url,
|
|
fileserver_url=self.fileserver_url
|
|
)
|
|
|
|
def receive_selection(self, callback):
|
|
def handler(msg):
|
|
envelope = smartreceive(msg)
|
|
|
|
for dataname, data, data_type in envelope["payloads"]:
|
|
if data_type == "dictionary":
|
|
callback(data)
|
|
|
|
# Subscribe to selections
|
|
import threading
|
|
thread = threading.Thread(target=self._listen_for_selections, args=(handler,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
def _listen_for_selections(self, handler):
|
|
# Simplified subscription
|
|
# In production, implement proper NATS subscription
|
|
pass
|
|
```
|
|
|
|
### Step 2: Dashboard UI (JavaScript)
|
|
|
|
```javascript
|
|
// src/dashboard-ui.js
|
|
class DashboardUI {
|
|
constructor() {
|
|
this.data = null;
|
|
this.setupEventListeners();
|
|
}
|
|
|
|
setupEventListeners() {
|
|
document.getElementById('refresh-btn').addEventListener('click', () => this.refreshData());
|
|
document.getElementById('export-btn').addEventListener('click', () => this.exportData());
|
|
}
|
|
|
|
async refreshData() {
|
|
// Request fresh data
|
|
const { env, env_json_str } = await smartsend("/dashboard/request", [
|
|
{ dataname: "request", data: { type: "refresh" }, type: "dictionary" }
|
|
], {
|
|
fileserverUrl: window.config.fileserver_url
|
|
});
|
|
}
|
|
|
|
async fetchData() {
|
|
// Subscribe to data updates
|
|
const envelope = await smartreceive(msg, {
|
|
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
|
});
|
|
|
|
// Process table data
|
|
for (const payload of envelope.payloads) {
|
|
if (payload.type === 'table') {
|
|
// Deserialize Arrow IPC
|
|
this.data = this.deserializeArrow(payload.data);
|
|
this.renderTable();
|
|
}
|
|
}
|
|
}
|
|
|
|
deserializeArrow(data) {
|
|
// Deserialize Arrow IPC to JavaScript array
|
|
// This would require the apache-arrow library
|
|
return JSON.parse(JSON.stringify(data)); // Simplified
|
|
}
|
|
|
|
renderTable() {
|
|
const tableContainer = document.getElementById('data-table');
|
|
tableContainer.innerHTML = '';
|
|
|
|
if (!this.data) return;
|
|
|
|
// Render table headers
|
|
const headers = Object.keys(this.data[0]);
|
|
const thead = document.createElement('thead');
|
|
const headerRow = document.createElement('tr');
|
|
|
|
headers.forEach(header => {
|
|
const th = document.createElement('th');
|
|
th.textContent = header;
|
|
headerRow.appendChild(th);
|
|
});
|
|
|
|
thead.appendChild(headerRow);
|
|
tableContainer.appendChild(thead);
|
|
|
|
// Render table rows
|
|
const tbody = document.createElement('tbody');
|
|
|
|
this.data.forEach(row => {
|
|
const tr = document.createElement('tr');
|
|
headers.forEach(header => {
|
|
const td = document.createElement('td');
|
|
td.textContent = row[header];
|
|
tr.appendChild(td);
|
|
});
|
|
tbody.appendChild(tr);
|
|
});
|
|
|
|
tableContainer.appendChild(tbody);
|
|
}
|
|
|
|
exportData() {
|
|
const csv = this.toCSV();
|
|
const blob = new Blob([csv], { type: 'text/csv' });
|
|
const url = URL.createObjectURL(blob);
|
|
|
|
const a = document.createElement('a');
|
|
a.href = url;
|
|
a.download = 'dashboard_data.csv';
|
|
a.click();
|
|
}
|
|
|
|
toCSV() {
|
|
if (!this.data) return '';
|
|
|
|
const headers = Object.keys(this.data[0]);
|
|
const rows = this.data.map(row =>
|
|
headers.map(h => row[h]).join(',')
|
|
);
|
|
|
|
return [headers.join(','), ...rows].join('\n');
|
|
}
|
|
|
|
fetchFromUrl(url) {
|
|
return fetch(url)
|
|
.then(response => response.arrayBuffer());
|
|
}
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Performance Optimization
|
|
|
|
### 1. Batch Processing
|
|
|
|
```python
|
|
# Batch multiple readings into a single message
|
|
def send_batch_readings(self, readings):
|
|
batch = SensorBatch()
|
|
for reading in readings:
|
|
batch.add_reading(reading)
|
|
|
|
df = batch.to_dataframe()
|
|
|
|
# Convert to Arrow IPC
|
|
table = pa.Table.from_pandas(df)
|
|
buf = io.BytesIO()
|
|
with pa.ipc.new_stream(buf, table.schema) as writer:
|
|
writer.write_table(table)
|
|
|
|
arrow_data = buf.getvalue()
|
|
|
|
# Send as single message
|
|
smartsend(
|
|
"/sensors/batch",
|
|
[("batch", arrow_data, "table")],
|
|
nats_url=self.nats_url
|
|
)
|
|
```
|
|
|
|
### 2. Connection Pooling
|
|
|
|
```javascript
|
|
// Reuse NATS connections
|
|
const nats = require('nats');
|
|
|
|
class ConnectionPool {
|
|
constructor() {
|
|
this.connections = new Map();
|
|
}
|
|
|
|
getConnection(natsUrl) {
|
|
if (!this.connections.has(natsUrl)) {
|
|
this.connections.set(natsUrl, nats.connect({ servers: [natsUrl] }));
|
|
}
|
|
return this.connections.get(natsUrl);
|
|
}
|
|
|
|
closeAll() {
|
|
this.connections.forEach(conn => conn.close());
|
|
this.connections.clear();
|
|
}
|
|
}
|
|
```
|
|
|
|
### 3. Caching
|
|
|
|
```python
|
|
# Cache file server responses
|
|
from functools import lru_cache
|
|
|
|
@lru_cache(maxsize=100)
|
|
def fetch_with_caching(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
|
|
return _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)
|
|
```
|
|
|
|
---
|
|
|
|
## Best Practices
|
|
|
|
### 1. Error Handling
|
|
|
|
```python
|
|
def safe_smartsend(subject, data, **kwargs):
|
|
try:
|
|
return smartsend(subject, data, **kwargs)
|
|
except Exception as e:
|
|
print(f"Failed to send message: {e}")
|
|
return None
|
|
```
|
|
|
|
### 2. Logging
|
|
|
|
```python
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def log_send(subject, data, correlation_id):
|
|
logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}")
|
|
|
|
def log_receive(correlation_id, num_payloads):
|
|
logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}")
|
|
```
|
|
|
|
### 3. Rate Limiting
|
|
|
|
```python
|
|
import time
|
|
from collections import deque
|
|
|
|
class RateLimiter:
|
|
def __init__(self, max_requests, time_window):
|
|
self.max_requests = max_requests
|
|
self.time_window = time_window
|
|
self.requests = deque()
|
|
|
|
def allow(self):
|
|
now = time.time()
|
|
|
|
# Remove old requests
|
|
while self.requests and self.requests[0] < now - self.time_window:
|
|
self.requests.popleft()
|
|
|
|
if len(self.requests) >= self.max_requests:
|
|
return False
|
|
|
|
self.requests.append(now)
|
|
return True
|
|
```
|
|
|
|
---
|
|
|
|
## Conclusion
|
|
|
|
This walkthrough covered:
|
|
|
|
- Building a chat application with rich media support
|
|
- Building a file transfer system with claim-check pattern
|
|
- Building a streaming data pipeline for sensor data
|
|
- Building a Micropython IoT device
|
|
- Building a cross-platform dashboard
|
|
|
|
For more information, check the [API documentation](../src/README.md) and [test examples](../test/).
|
|
|
|
---
|
|
|
|
## License
|
|
|
|
MIT |