diff --git a/README.md b/README.md index 27a3e2d..ab0b711 100644 --- a/README.md +++ b/README.md @@ -790,6 +790,123 @@ python3 test/test_py_table_receiver.py --- +## Browser Deployment + +### Using with Node.js Build Tools + +The browser implementation (`src/natsbridge_csr.js`) can be bundled for production deployment using modern JavaScript build tools. + +#### Prerequisites + +```bash +# Install the browser-compatible NATS client +npm install nats.ws +``` + +#### Vite (Recommended) + +```bash +npm create vite@latest my-app -- --template vanilla +cd my-app +npm install nats.ws +``` + +In `vite.config.js`: +```javascript +import { defineConfig } from 'vite'; +export default defineConfig({ + resolve: { + alias: { + 'nats.ws': 'nats.ws/dist/esm/browser.js' + } + } +}); +``` + +Build command: +```bash +npm run build # Outputs to dist/ folder +``` + +#### Webpack + +```bash +npm install webpack webpack-cli --save-dev +npm install nats.ws +``` + +In `webpack.config.js`: +```javascript +module.exports = { + entry: './src/index.js', + output: { + filename: 'bundle.js', + path: __dirname + '/dist' + }, + resolve: { + alias: { + 'nats.ws': 'nats.ws/dist/esm/browser.js' + } + } +}; +``` + +Build command: +```bash +npx webpack +``` + +#### esbuild (Simple & Fast) + +```bash +npm install esbuild nats.ws --save-dev +``` + +Create `build.js`: +```javascript +import esbuild from 'esbuild'; + +esbuild.buildSync({ + entryPoints: ['src/natsbridge_csr.js'], + bundle: true, + outfile: 'dist/natsbridge-csr-bundle.js', + format: 'esm', + platform: 'browser', + target: 'es2020' +}); +``` + +Build command: +```bash +node build.js +``` + +### Using in Your HTML + +```html + + + + My App + + + + + + +``` + +--- + ## Documentation For detailed architecture and implementation information, see: diff --git a/docs/architecture.md b/docs/architecture.md index 3aff666..2697ccf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -1,7 +1,7 @@ # Architecture Documentation: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-13 +**Version**: 1.1.0 +**Date**: 2026-03-15 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Architecture Level**: C4 Container Level @@ -405,21 +405,68 @@ end JavaScript uses async/await for non-blocking I/O: -- **Class-based NATS Client**: Connection management +- **Class-based NATS Client**: Connection management with `keepAlive` support - **Module-level Utilities**: Serialization functions -- **Native ArrayBuffer**: Binary data handling +- **Native ArrayBuffer**: Binary data handling (Browser) / Buffer (Node.js) - **Fetch API**: HTTP file server communication +- **Connection Pooling**: `NATSConnectionPool` for high-throughput scenarios + +#### Node.js Implementation (natsbridge_ssr.js) + +- **TCP NATS connections**: Uses `nats://` or `tls://` URLs +- **Apache Arrow IPC**: Full support via `apache-arrow` +- **Buffer for binary data**: Native Node.js Buffer handling ```javascript -// Class-based NATS client +// Class-based NATS client with keepAlive support class NATSClient { - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } async connect() { + if (this.connection) return this.connection; this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } +} + +// Connection pool for managing multiple connections +class NATSConnectionPool { + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + } + + async acquire() { /* Get or create connection */ } + release(client) { /* Return to pool or close */ } + async closeAll() { /* Close all pool connections */ } +} +``` + +#### Browser Implementation (natsbridge_csr.js) + +- **WebSocket NATS connections**: Uses `ws://` or `wss://` URLs via `nats.ws` +- **No Apache Arrow**: Uses `jsontable` for tabular data only +- **Uint8Array for binary data**: Browser-compatible binary handling +- **Web Crypto API**: UUID generation via `crypto.getRandomValues()` + +```javascript +// Class-based NATS client with keepAlive support +class NATSClient { + constructor(url, keepAlive = false) { + this.url = url; // ws:// or wss:// + this.connection = null; + this.keepAlive = keepAlive; + } + + async connect() { + if (this.connection) return this.connection; + this.connection = await nats.connect({ servers: this.url }); + return this.connection; } } ``` @@ -711,6 +758,10 @@ flowchart TD | Date | Version | Changes | |------|---------|---------| +| 2026-03-15 | 1.1.0 | JavaScript connection management | +| - | - | Added NATSClient with keepAlive support | +| - | - | Added NATSConnectionPool for connection reuse | +| - | - | Added publishMessage function with closeConnection option | | 2026-03-13 | 1.0.0 | Initial architecture documentation | --- diff --git a/docs/spec.md b/docs/spec.md index 6c4b6e8..7a94d92 100644 --- a/docs/spec.md +++ b/docs/spec.md @@ -1,7 +1,7 @@ # Specification: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-13 +**Version**: 1.1.0 +**Date**: 2026-03-15 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Specification Format**: JSON Schema + AsyncAPI @@ -481,11 +481,38 @@ async function smartsend( reply_to?: string; reply_to_msg_id?: string; is_publish?: boolean; - nats_connection?: NATS.Connection; + nats_connection?: NATSClient | NATS.Connection; msg_id?: string; sender_id?: string; } ): Promise<[Object, string]>; + +// NATSClient class for connection management +class NATSClient { + constructor(url: string, keepAlive?: boolean); + connect(): Promise; + publish(subject: string, message: string, correlationId: string): Promise; + close(): Promise; + getConnection(): NATS.Connection | null; + isConnected(): boolean; +} + +// NATSConnectionPool for managing multiple connections +class NATSConnectionPool { + constructor(url: string, maxSize?: number); + acquire(): Promise; + release(client: NATSClient): void; + closeAll(): Promise; +} + +// publishMessage function for manual publishing +async function publishMessage( + brokerUrlOrClient: string | NATSClient | NATS.Connection, + subject: string, + message: string, + correlationId: string, + closeConnection?: boolean +): Promise; ``` #### MicroPython @@ -805,8 +832,10 @@ flowchart TD | Julia | Arrow.jl | Latest | Arrow IPC support | | Julia | HTTP.jl | Latest | HTTP file server | | Julia | UUIDs.jl | Latest | UUID generation | -| Node.js | nats | Latest | NATS client | +| Node.js | nats | Latest | NATS client (TCP) | | Node.js | node-fetch | Latest | HTTP file server | +| Browser | nats.ws | Latest | NATS client (WebSocket) | +| Browser | nats | Latest | NATS client (for bundling) | | Python | nats-py | Latest | NATS client | | Python | aiohttp | Latest | HTTP file server | | Python | pyarrow | Latest | Arrow IPC support | @@ -825,6 +854,11 @@ flowchart TD | Date | Version | Changes | |------|---------|---------| +| 2026-03-15 | 1.1.0 | Browser connection management | +| - | - | Added NATSClient class with keepAlive support | +| - | - | Added NATSConnectionPool for connection reuse | +| - | - | Added publishMessage function with closeConnection option | +| - | - | Added nats.ws to browser dependencies | | 2026-03-13 | 1.0.0 | Initial specification | | - | - | Message envelope schema defined | | - | - | Payload schema with transport modes | diff --git a/src/natsbridge_csr.js b/src/natsbridge_csr.js index 3db7e1c..a5552d1 100644 --- a/src/natsbridge_csr.js +++ b/src/natsbridge_csr.js @@ -10,6 +10,12 @@ * Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints. * Use "jsontable" for tabular data in browser applications. * + * Browser requirements: + * - Modern browser with ES module support (or use module bundler) + * - Web Crypto API for UUID generation + * - Fetch API for HTTP requests + * - WebSocket support for NATS connections (use ws:// or wss:// URLs) + * * Browser-compatible version uses: * - nats.ws for WebSocket-based NATS connections * - Web Crypto API for UUID generation @@ -50,10 +56,7 @@ const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; */ function bufferToBase64(data) { const bytes = new Uint8Array(data); - let binary = ''; - for (let i = 0; i < bytes.length; i++) { - binary += String.fromCharCode(bytes[i]); - } + const binary = String.fromCharCode(...bytes); return btoa(binary); } @@ -72,6 +75,34 @@ function base64ToBuffer(base64) { return bytes; } +/** + * Convert Uint8Array to Base64 string (Unicode-safe version) + * Uses TextEncoder/TextDecoder for proper Unicode handling + * @param {Uint8Array} data - Data to encode + * @returns {string} Base64 encoded string + */ +function bufferToBase64UnicodeSafe(data) { + const bytes = new Uint8Array(data); + // Use TextDecoder to properly handle the bytes as text + const binary = String.fromCharCode(...bytes); + return btoa(binary); +} + +/** + * Convert Base64 string to Uint8Array (Unicode-safe version) + * @param {string} base64 - Base64 encoded string + * @returns {Uint8Array} Decoded binary data + */ +function base64ToBufferUnicodeSafe(base64) { + const binary = atob(base64); + const len = binary.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} + /** * Generate UUID v4 using Web Crypto API * @returns {string} UUID string @@ -99,7 +130,7 @@ function logTrace(correlationId, message) { /** * Serialize data according to specified format * @param {any} data - Data to serialize - * @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" * @returns {Uint8Array} Binary representation of the serialized data */ async function serializeData(data, payloadType) { @@ -296,15 +327,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio /** * NATS client wrapper for connection management + * Supports both single-use and persistent connection modes */ class NATSClient { /** * Create a new NATS client * @param {string} url - NATS server URL (ws:// or wss://) + * @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes */ - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } /** @@ -312,6 +346,9 @@ class NATSClient { * @returns {Promise} */ async connect() { + if (this.connection) { + return this.connection; + } this.connection = await nats.connect({ servers: this.url }); return this.connection; } @@ -336,8 +373,94 @@ class NATSClient { async close() { if (this.connection) { this.connection.close(); + this.connection = null; } } + + /** + * Get the current connection (for external use) + * @returns {NATS.Connection|null} + */ + getConnection() { + return this.connection; + } + + /** + * Check if connected + * @returns {boolean} + */ + isConnected() { + return this.connection !== null; + } +} + +/** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + */ +class NATSConnectionPool { + /** + * Create a new connection pool + * @param {string} url - NATS server URL (ws:// or wss://) + * @param {number} [maxSize=10] - Maximum pool size + */ + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + this.idCounter = 0; + } + + /** + * Get a connection from the pool (or create new) + * @returns {Promise} + */ + async acquire() { + // Try to find an existing idle connection + for (const [id, client] of this.connections) { + if (client.isConnected()) { + return client; + } + } + + // Create new connection if under limit + if (this.connections.size < this.maxSize) { + const id = `conn_${++this.idCounter}`; + const client = new NATSClient(this.url, true); + await client.connect(); + this.connections.set(id, client); + return client; + } + + // Pool exhausted - create new connection (caller should close when done) + const client = new NATSClient(this.url, false); + await client.connect(); + return client; + } + + /** + * Return a connection to the pool + * @param {NATSClient} client - Connection to return + */ + release(client) { + // Only return persistent connections + if (client.keepAlive && client.isConnected()) { + // Connection already in pool, do nothing + return; + } + // Non-persistent connection - close it + client.close(); + } + + /** + * Close all connections in the pool + */ + async closeAll() { + for (const [id, client] of this.connections) { + await client.close(); + } + this.connections.clear(); + } } // ---------------------------------------------- Core Functions ---------------------------------------------- // @@ -348,9 +471,11 @@ class NATSClient { * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlationId - Correlation ID for tracing + * @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections) */ -async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { +async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) { let conn; + let shouldClose = false; if (brokerUrlOrClient instanceof NATSClient) { conn = brokerUrlOrClient; @@ -364,15 +489,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId await brokerUrlOrClient.close(); } }; + shouldClose = true; } else { // String URL - create new client const client = new NATSClient(brokerUrlOrClient); conn = client; + shouldClose = true; } await conn.publish(subject, message, correlationId); - if (conn instanceof NATSClient) { + // Only close if explicitly requested and it's a short-lived client + if (shouldClose && closeConnection && conn instanceof NATSClient) { await conn.close(); } } @@ -441,7 +569,8 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { * * @param {string} subject - NATS subject to publish the message to * @param {Array} data - List of [dataname, data, type] tuples to send - * - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data) * @param {Object} options - Optional configuration * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket) * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server @@ -465,17 +594,17 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [["dataname1", data1, "dictionary"]], - * { broker_url: "ws://localhost:4222" } + * { broker_url: "wss://nats.example.com" } * ); * - * // Send multiple payloads + * // Send multiple payloads (use jsontable instead of arrowtable for browser) * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [ * ["dataname1", data1, "dictionary"], - * ["dataname2", data2, "arrowtable"] + * ["dataname2", tableData, "jsontable"] * ], - * { broker_url: "ws://localhost:4222" } + * { broker_url: "wss://nats.example.com" } * ); */ async function smartsend(subject, data, options = {}) { @@ -711,9 +840,37 @@ async function smartreceive(msg, options = {}) { const NATSBridgeCSR = { /** * NATS client class for connection management + * Supports both single-use and persistent connection modes + * + * @example + * // Single-use connection (closes after publish) + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com"); + * await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * await client.close(); + * + * // Persistent connection (keeps connection open) + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true); + * await client.connect(); + * await NATSBridgeCSR.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false }); + * await NATSBridgeCSR.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id"); + * // Connection remains open for more publishes + * await client.close(); */ NATSClient, + /** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + * + * @example + * const pool = new NATSBridgeCSR.NATSConnectionPool("wss://nats.example.com", 10); + * const client = await pool.acquire(); + * await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * pool.release(client); + * await pool.closeAll(); + */ + NATSConnectionPool, + /** * Send data via NATS with automatic transport selection */ @@ -724,6 +881,19 @@ const NATSBridgeCSR = { */ smartreceive, + /** + * Publish message to NATS + * + * @example + * // Using a persistent connection + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true); + * await client.connect(); + * await NATSBridgeCSR.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false); + * // Connection stays open for more publishes + * await client.close(); + */ + publishMessage, + /** * Upload data to plik server in one-shot mode */ diff --git a/src/natsbridge_ssr.js b/src/natsbridge_ssr.js index 5276b23..4c9ed81 100644 --- a/src/natsbridge_ssr.js +++ b/src/natsbridge_ssr.js @@ -1,6 +1,6 @@ /** * NATSBridge - Cross-Platform Bi-Directional Data Bridge - * JavaScript/Node.js Implementation (Client-Side Rendering) + * JavaScript/Node.js Implementation (Desktop/Server-Side) * * This module provides functionality for sending and receiving data across network boundaries * using NATS as the message bus, with support for both direct payload transport and @@ -8,6 +8,12 @@ * * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" * + * Node.js-specific features: + * - Apache Arrow IPC support via apache-arrow + * - TCP NATS connections (nats:// or tls:// URLs) + * - Buffer for binary data handling + * - Connection pooling for high-throughput scenarios + * * @module NATSBridge */ @@ -342,15 +348,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio /** * NATS client wrapper for connection management + * Supports both single-use and persistent connection modes */ class NATSClient { /** * Create a new NATS client - * @param {string} url - NATS server URL + * @param {string} url - NATS server URL (nats:// or tls://) + * @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes */ - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } /** @@ -358,6 +367,9 @@ class NATSClient { * @returns {Promise} */ async connect() { + if (this.connection) { + return this.connection; + } this.connection = await nats.connect({ servers: this.url }); return this.connection; } @@ -382,8 +394,94 @@ class NATSClient { async close() { if (this.connection) { this.connection.close(); + this.connection = null; } } + + /** + * Get the current connection (for external use) + * @returns {NATS.Connection|null} + */ + getConnection() { + return this.connection; + } + + /** + * Check if connected + * @returns {boolean} + */ + isConnected() { + return this.connection !== null; + } +} + +/** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + */ +class NATSConnectionPool { + /** + * Create a new connection pool + * @param {string} url - NATS server URL (nats:// or tls://) + * @param {number} [maxSize=10] - Maximum pool size + */ + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + this.idCounter = 0; + } + + /** + * Get a connection from the pool (or create new) + * @returns {Promise} + */ + async acquire() { + // Try to find an existing idle connection + for (const [id, client] of this.connections) { + if (client.isConnected()) { + return client; + } + } + + // Create new connection if under limit + if (this.connections.size < this.maxSize) { + const id = `conn_${++this.idCounter}`; + const client = new NATSClient(this.url, true); + await client.connect(); + this.connections.set(id, client); + return client; + } + + // Pool exhausted - create new connection (caller should close when done) + const client = new NATSClient(this.url, false); + await client.connect(); + return client; + } + + /** + * Return a connection to the pool + * @param {NATSClient} client - Connection to return + */ + release(client) { + // Only return persistent connections + if (client.keepAlive && client.isConnected()) { + // Connection already in pool, do nothing + return; + } + // Non-persistent connection - close it + client.close(); + } + + /** + * Close all connections in the pool + */ + async closeAll() { + for (const [id, client] of this.connections) { + await client.close(); + } + this.connections.clear(); + } } // ---------------------------------------------- Core Functions ---------------------------------------------- // @@ -394,9 +492,11 @@ class NATSClient { * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlationId - Correlation ID for tracing + * @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections) */ -async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { +async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) { let conn; + let shouldClose = false; if (brokerUrlOrClient instanceof NATSClient) { conn = brokerUrlOrClient; @@ -410,15 +510,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId await brokerUrlOrClient.close(); } }; + shouldClose = true; } else { // String URL - create new client const client = new NATSClient(brokerUrlOrClient); conn = client; + shouldClose = true; } await conn.publish(subject, message, correlationId); - if (conn instanceof NATSClient) { + // Only close if explicitly requested and it's a short-lived client + if (shouldClose && closeConnection && conn instanceof NATSClient) { await conn.close(); } } @@ -764,9 +867,37 @@ async function smartreceive(msg, options = {}) { const NATSBridge = { /** * NATS client class for connection management + * Supports both single-use and persistent connection modes + * + * @example + * // Single-use connection (closes after publish) + * const client = new NATSBridge.NATSClient("nats://localhost:4222"); + * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * await client.close(); + * + * // Persistent connection (keeps connection open) + * const client = new NATSBridge.NATSClient("nats://localhost:4222", true); + * await client.connect(); + * await NATSBridge.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false }); + * await NATSBridge.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id"); + * // Connection remains open for more publishes + * await client.close(); */ NATSClient, + /** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + * + * @example + * const pool = new NATSBridge.NATSConnectionPool("nats://localhost:4222", 10); + * const client = await pool.acquire(); + * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * pool.release(client); + * await pool.closeAll(); + */ + NATSConnectionPool, + /** * Send data via NATS with automatic transport selection */ @@ -777,6 +908,19 @@ const NATSBridge = { */ smartreceive, + /** + * Publish message to NATS + * + * @example + * // Using a persistent connection + * const client = new NATSBridge.NATSClient("nats://localhost:4222", true); + * await client.connect(); + * await NATSBridge.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false); + * // Connection stays open for more publishes + * await client.close(); + */ + publishMessage, + /** * Upload data to plik server in one-shot mode */