From 49d789872086260e29a73a0394c51024ee881d93 Mon Sep 17 00:00:00 2001 From: narawat Date: Sun, 15 Mar 2026 11:58:08 +0700 Subject: [PATCH 1/3] remove arrow support for natsbridge_csr.js --- README.md | 36 +++++++++++++++++----- docs/architecture.md | 14 +++++++-- docs/requirements.md | 17 ++++++----- docs/spec.md | 17 +++++++++-- src/natsbridge_csr.js | 71 +++---------------------------------------- 5 files changed, 69 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index d4f33a7..27a3e2d 100644 --- a/README.md +++ b/README.md @@ -45,9 +45,9 @@ NATSBridge enables seamless communication across multiple platforms through NATS | Platform | Implementation | Features | |----------|----------------|----------| | **Julia** | [`src/NATSBridge.jl`](src/NATSBridge.jl) | Full feature set, Arrow IPC, multiple dispatch | -| **JavaScript (Node.js)** | [`src/natsbridge.js`](src/natsbridge_ssr.js) | Node.js, async/await | -| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await | -| **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints | +| **JavaScript (Node.js)** | [`src/natsbridge_ssr.js`](src/natsbridge_ssr.js) | Node.js, async/await, Arrow IPC | +| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await, JSON table only | +| **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints, Arrow IPC | | **MicroPython** | [`src/natsbridge_mpy.py`](src/natsbridge_mpy.py) | Memory-constrained, synchronous API | ### Platform Comparison @@ -57,7 +57,8 @@ NATSBridge enables seamless communication across multiple platforms through NATS | Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | ❌ | | Async/Await | ❌ | ✅ Native | ✅ Native | ✅ Native | ⚠️ (uasyncio) | | Type Safety | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | -| Arrow IPC | ✅ Native | ✅ | ✅ | ✅ | ❌ | +| Arrow IPC | ✅ Native | ✅ Native | ❌ (Browser incompatible) | ✅ Native | ❌ | +| JSON Table | ✅ | ✅ | ✅ (Only table type) | ✅ | ⚠️ (Limited) | | Direct Transport | ✅ | ✅ | ✅ | ✅ | ✅ | | Link Transport | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) | | Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ | @@ -73,7 +74,8 @@ NATSBridge enables seamless communication across multiple platforms through NATS - ✅ **Multi-payload support** - send multiple payloads with different types in one message - ✅ **Automatic transport selection** - direct vs link based on payload size - ✅ **Claim-Check pattern** for payloads ≥ 500KB -- ✅ **Apache Arrow IPC** support for tabular data (zero-copy reading) +- ✅ **Apache Arrow IPC** support for tabular data (Desktop: Julia/Python/Node.js) +- ✅ **JSON Table** support for tabular data (All platforms including Browser) - ✅ **Exponential backoff** for reliable file server downloads - ✅ **Correlation ID tracking** for message tracing - ✅ **Reply-to support** for request-response patterns @@ -424,8 +426,8 @@ env = NATSBridge.smartreceive( |------|-------|------------|--------|-------------|-------------| | `text` | `String` | `string` | `str` | `str` | Plain text strings | | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | -| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array` | `list[dict]` | ⚠️ | Tabular data (JSON) | +| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | +| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) | @@ -611,6 +613,26 @@ data = [("students", df, "arrowtable")] env, env_json_str = await NATSBridge.smartsend("/data/analysis", data) ``` +#### JavaScript (Browser) + +```javascript +import NATSBridge from './src/natsbridge_csr.js'; + +// Browser uses jsontable (JSON array of objects) instead of arrowtable +// Apache Arrow is not compatible with browsers +const df = [ + { id: 1, name: "Alice", score: 95 }, + { id: 2, name: "Bob", score: 88 }, + { id: 3, name: "Charlie", score: 92 } +]; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/data/analysis", + [["students", df, "jsontable"]], // Use jsontable for browser + { broker_url: 'ws://localhost:4222' } +); +``` + ### Example 4: Request-Response Pattern Bi-directional communication with reply-to support. diff --git a/docs/architecture.md b/docs/architecture.md index 340679b..3aff666 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -275,8 +275,8 @@ end |------|-------------|---------------|----------|-----------| | `text` | Plain text string | UTF-8 bytes | Base64 | All | | `dictionary` | JSON object | JSON string | Base64/JSON | All | -| `arrowtable` | Apache Arrow IPC | Arrow IPC stream | Base64/arrow-ipc | Desktop | -| `jsontable` | JSON array of objects | JSON string | Base64/json | All | +| `arrowtable` | Apache Arrow IPC | Arrow IPC stream | Base64/arrow-ipc | Desktop (Julia/Python/Node.js) | +| `jsontable` | JSON array of objects | JSON string | Base64/json | All (including Browser) | | `image` | Binary image data | Raw bytes | Base64 | All | | `audio` | Binary audio data | Raw bytes | Base64 | All | | `video` | Binary video data | Raw bytes | Base64 | All | @@ -442,6 +442,16 @@ class NATSBridge: self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL ``` +### Browser Architecture + +Browser JavaScript has specific constraints due to security and compatibility: + +- **Async/await**: Native async/await support +- **No Apache Arrow**: Arrow IPC not available in browsers +- **JSON table only**: Use "jsontable" for tabular data +- **WebSocket NATS**: Uses nats.ws for browser-compatible NATS connections +- **Fetch API**: HTTP file server communication via fetch + ### MicroPython Architecture MicroPython has significant constraints: diff --git a/docs/requirements.md b/docs/requirements.md index dc6c0e4..a0cfaea 100644 --- a/docs/requirements.md +++ b/docs/requirements.md @@ -115,8 +115,9 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless | Platform | Minimum Version | Notes | |----------|-----------------|-------| | Julia | 1.7+ | Arrow.jl required for arrowtable support | -| Node.js | 16+ | nats.js required | +| Node.js | 16+ | nats.js required, Arrow IPC supported | | Python | 3.8+ | pyarrow required for arrowtable support | +| Browser | Latest | No Arrow IPC (uses jsontable only) | | MicroPython | 1.19+ | Limited to direct transport | --- @@ -181,8 +182,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless |------|-------|------------|--------|-------------|-------------| | `text` | `String` | `string` | `str` | `str` | Plain text strings | | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable data | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | -| `jsontable` | `Vector{NamedTuple}` | `Array` | `list[dict]` | ⚠️ | Tabular data (JSON) | +| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) | +| `jsontable` | `Vector{NamedTuple}` | `Array` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image binary data | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio binary data | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video binary data | @@ -194,8 +195,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless |--------------|-----------------|-------| | `text` | UTF-8 → Base64 | Text must be String type | | `dictionary` | JSON → Base64 | JSON.jl for Julia | -| `arrowtable` | Arrow IPC → Base64 | Requires Arrow.jl/pyarrow | -| `jsontable` | JSON → Base64 | Human-readable format | +| `arrowtable` | Arrow IPC → Base64 | Requires Arrow.jl/pyarrow (Desktop only) | +| `jsontable` | JSON → Base64 | Human-readable format - **Browser uses this only** | | `image`/`audio`/`video`/`binary` | Direct → Base64 | Binary data preserved | --- @@ -294,7 +295,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless | Test Scenario | Success Criteria | |-------------|-----------------| | Cross-platform text message | Julia ↔ JavaScript ↔ Python | -| Cross-platform tabular data | Arrow IPC round-trip | +| Cross-platform tabular data (Desktop) | Arrow IPC round-trip | +| Cross-platform tabular data (Browser) | JSON table round-trip | | Large file transfer | File server upload/download | | Multi-payload mixed content | All payload types in one message | @@ -356,6 +358,7 @@ function smartreceive( | Python | nats-py | Latest stable | | Python | aiohttp | Latest stable | | Python | pyarrow | Latest stable | +| Browser | nats.ws | Latest stable | ### Optional Dependencies @@ -399,7 +402,7 @@ function smartreceive( | Version | Supported Platforms | |---------|---------------------| -| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ | +| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, Browser (latest), MicroPython 1.19+ | --- diff --git a/docs/spec.md b/docs/spec.md index 7908319..e18f560 100644 --- a/docs/spec.md +++ b/docs/spec.md @@ -176,6 +176,7 @@ await smartsend("/agent/v1/process", data) | All | `String` | `"text"` | | All | `Dict`/`Object` | `"dictionary"` | | Desktop | `DataFrame` | `"arrowtable"` or `"jsontable"` | +| Browser | `Array` of objects | `"jsontable"` (only table type) | | All | `Array` of objects | `"jsontable"` | | All | `Uint8Array`/`Buffer`/`bytes` | `"binary"` | | Desktop | `Arrow.Table` | `"arrowtable"` | @@ -203,8 +204,8 @@ await smartsend("/agent/v1/process", data) |-------|-------------|---------------------|------------------| | `text` | Plain text string | All | `base64` | | `dictionary` | JSON object/dictionary | All | `base64`, `json` | -| `arrowtable` | Apache Arrow IPC table | Desktop (Julia/JS/Python) | `base64`, `arrow-ipc` | -| `jsontable` | JSON array of objects | All | `base64`, `json` | +| `arrowtable` | Apache Arrow IPC table | Desktop (Julia/Python/Node.js) | `base64`, `arrow-ipc` | +| `jsontable` | JSON array of objects | All (including Browser) | `base64`, `json` | | `image` | Binary image data | All | `base64` | | `audio` | Binary audio data | All | `base64` | | `video` | Binary video data | All | `base64` | @@ -613,7 +614,7 @@ function fileserver_download_handler( ## Platform-Specific Constraints -### Desktop (Julia/JS/Python) +### Desktop (Julia/Python/Node.js) | Feature | Status | Notes | |---------|--------|-------| @@ -623,6 +624,16 @@ function fileserver_download_handler( | File server download | ✅ Supported | HTTP/HTTPS | | Size threshold | 500KB | Configurable | +### Browser (JavaScript) + +| Feature | Status | Notes | +|---------|--------|-------| +| Arrow IPC | ❌ Not supported | Apache Arrow not browser-compatible | +| JSON table | ✅ Supported | Only table type available in browser | +| File server upload | ✅ Supported | HTTP/HTTPS | +| File server download | ✅ Supported | HTTP/HTTPS | +| Size threshold | 500KB | Configurable | + ### MicroPython | Feature | Status | Notes | diff --git a/src/natsbridge_csr.js b/src/natsbridge_csr.js index b8026d3..3db7e1c 100644 --- a/src/natsbridge_csr.js +++ b/src/natsbridge_csr.js @@ -6,7 +6,9 @@ * using NATS as the message bus, with support for both direct payload transport and * URL-based transport for larger payloads. * - * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints. + * Use "jsontable" for tabular data in browser applications. * * Browser-compatible version uses: * - nats.ws for WebSocket-based NATS connections @@ -21,7 +23,6 @@ import * as nats from 'nats.ws'; // Use native fetch available in browsers -import { tableFromArrays, tableToIPC } from 'apache-arrow/browser'; // ---------------------------------------------- Constants ---------------------------------------------- // @@ -111,13 +112,6 @@ async function serializeData(data, payloadType) { } else if (payloadType === 'dictionary') { const jsonStr = JSON.stringify(data); return new Uint8Array(new TextEncoder().encode(jsonStr)); - } else if (payloadType === 'arrowtable') { - // Convert array of objects to Arrow IPC format - if (!Array.isArray(data) || data.length === 0) { - throw new Error('Arrow table data must be a non-empty array of objects'); - } - - return serializeArrowTable(data); } else if (payloadType === 'jsontable') { // Serialize array of objects to JSON format if (!Array.isArray(data)) { @@ -154,49 +148,6 @@ async function serializeData(data, payloadType) { } } -/** - * Helper function to properly serialize table data to Arrow IPC - * @param {Array} data - Array of objects representing table rows - * @returns {Uint8Array} Arrow IPC formatted buffer - */ -function serializeArrowTable(data) { - if (!Array.isArray(data) || data.length === 0) { - throw new Error('Table data must be a non-empty array of objects'); - } - - logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`); - - // Convert array of objects to a key-value format expected by tableFromArrays - const columns = {}; - const keys = Object.keys(data[0]); - for (const key of keys) { - columns[key] = data.map(row => row[key]); - } - - logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`); - - const table = tableFromArrays(columns); - - logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`); - - // Convert to IPC format - const ipcBuffer = tableToIPC(table); - - logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, byteLength: ${ipcBuffer.byteLength}`); - - const resultBuffer = new Uint8Array(ipcBuffer); - logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`); - - // Debug: Show first 20 bytes in hex - const hexPreview = []; - for (let i = 0; i < Math.min(20, resultBuffer.length); i++) { - hexPreview.push(resultBuffer[i].toString(16).padStart(2, '0')); - } - logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview.join(' ')}`); - - return resultBuffer; -} - /** * Deserialize bytes to data based on type * @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes @@ -210,7 +161,7 @@ async function deserializeData(data, payloadType, correlationId) { logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`); // Debug: Show first 20 bytes in hex for binary data - if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') { + if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') { const hexPreview = []; for (let i = 0; i < Math.min(20, buffer.length); i++) { hexPreview.push(buffer[i].toString(16).padStart(2, '0')); @@ -227,18 +178,6 @@ async function deserializeData(data, payloadType, correlationId) { const result = JSON.parse(jsonStr); logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`); return result; - } else if (payloadType === 'arrowtable') { - logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`); - - try { - // Try tableFromIPC (browser API) - const table = tableFromIPC(buffer); - logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`); - return table; - } catch (e) { - logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`); - throw new Error(`Unable to deserialize Arrow table: ${e.message}`); - } } else if (payloadType === 'jsontable') { const jsonStr = new TextDecoder().decode(buffer); const result = JSON.parse(jsonStr); @@ -478,8 +417,6 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { let encoding = 'base64'; if (payloadType === 'jsontable') { encoding = 'json'; - } else if (payloadType === 'arrowtable') { - encoding = 'arrow-ipc'; } return { From 34d8e3fad82b8180ffec83e1601c8f9640ca5343 Mon Sep 17 00:00:00 2001 From: narawat Date: Sun, 15 Mar 2026 12:09:04 +0700 Subject: [PATCH 2/3] update --- Project.toml | 2 +- docs/spec.md | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 85f16c3..bc368e8 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "NATSBridge" uuid = "f2724d33-f338-4a57-b9f8-1be882570d10" -version = "0.5.5" +version = "0.5.6" authors = ["narawat "] [deps] diff --git a/docs/spec.md b/docs/spec.md index e18f560..6c4b6e8 100644 --- a/docs/spec.md +++ b/docs/spec.md @@ -647,6 +647,41 @@ function fileserver_download_handler( --- +## Implementation Files + +| File | Platform | Features | Notes | +|------|----------|----------|-------| +| [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Julia | Full feature set, Arrow IPC, multiple dispatch | Ground truth implementation | +| [`src/natsbridge_ssr.js`](../src/natsbridge_ssr.js) | Node.js | Arrow IPC, async/await | Server-side JavaScript | +| [`src/natsbridge_csr.js`](../src/natsbridge_csr.js) | Browser | JSON table only, WebSocket NATS | Client-side rendering | +| [`src/natsbridge.py`](../src/natsbridge.py) | Python | Arrow IPC, async/await | Desktop Python | +| [`src/natsbridge_mpy.py`](../src/natsbridge_mpy.py) | MicroPython | Limited to direct transport | Memory-constrained | + +### Browser Implementation Notes + +The browser implementation ([`src/natsbridge_csr.js`](../src/natsbridge_csr.js)) has the following constraints: + +| Constraint | Reason | Workaround | +|------------|--------|------------| +| No Apache Arrow IPC | Browser-incompatible dependency | Use `jsontable` for tabular data | +| WebSocket NATS only | Browser cannot use TCP directly | Use `ws://` or `wss://` broker URLs | +| Fetch API for HTTP | Browser fetch() API only | Compatible with Plik and other HTTP servers | + +### Payload Type Availability by Platform + +| Payload Type | Julia | Node.js | Browser | Python | MicroPython | +|--------------|-------|---------|---------|--------|-------------| +| `text` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `dictionary` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `arrowtable` | ✅ | ✅ | ❌ | ✅ | ❌ | +| `jsontable` | ✅ | ✅ | ✅ | ✅ | ⚠️ | +| `image` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `audio` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `video` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `binary` | ✅ | ✅ | ✅ | ✅ | ✅ | + +--- + ## Message Flow ### Sending Flow From 9c4c9418403ad613173a8c996b99a63057cd7448 Mon Sep 17 00:00:00 2001 From: narawat Date: Sun, 15 Mar 2026 18:41:45 +0700 Subject: [PATCH 3/3] add js class --- README.md | 117 +++++++++++++++++++++++++ docs/architecture.md | 63 ++++++++++++-- docs/spec.md | 42 ++++++++- src/natsbridge_csr.js | 196 +++++++++++++++++++++++++++++++++++++++--- src/natsbridge_ssr.js | 154 +++++++++++++++++++++++++++++++-- 5 files changed, 544 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 27a3e2d..ab0b711 100644 --- a/README.md +++ b/README.md @@ -790,6 +790,123 @@ python3 test/test_py_table_receiver.py --- +## Browser Deployment + +### Using with Node.js Build Tools + +The browser implementation (`src/natsbridge_csr.js`) can be bundled for production deployment using modern JavaScript build tools. + +#### Prerequisites + +```bash +# Install the browser-compatible NATS client +npm install nats.ws +``` + +#### Vite (Recommended) + +```bash +npm create vite@latest my-app -- --template vanilla +cd my-app +npm install nats.ws +``` + +In `vite.config.js`: +```javascript +import { defineConfig } from 'vite'; +export default defineConfig({ + resolve: { + alias: { + 'nats.ws': 'nats.ws/dist/esm/browser.js' + } + } +}); +``` + +Build command: +```bash +npm run build # Outputs to dist/ folder +``` + +#### Webpack + +```bash +npm install webpack webpack-cli --save-dev +npm install nats.ws +``` + +In `webpack.config.js`: +```javascript +module.exports = { + entry: './src/index.js', + output: { + filename: 'bundle.js', + path: __dirname + '/dist' + }, + resolve: { + alias: { + 'nats.ws': 'nats.ws/dist/esm/browser.js' + } + } +}; +``` + +Build command: +```bash +npx webpack +``` + +#### esbuild (Simple & Fast) + +```bash +npm install esbuild nats.ws --save-dev +``` + +Create `build.js`: +```javascript +import esbuild from 'esbuild'; + +esbuild.buildSync({ + entryPoints: ['src/natsbridge_csr.js'], + bundle: true, + outfile: 'dist/natsbridge-csr-bundle.js', + format: 'esm', + platform: 'browser', + target: 'es2020' +}); +``` + +Build command: +```bash +node build.js +``` + +### Using in Your HTML + +```html + + + + My App + + + + + + +``` + +--- + ## Documentation For detailed architecture and implementation information, see: diff --git a/docs/architecture.md b/docs/architecture.md index 3aff666..2697ccf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -1,7 +1,7 @@ # Architecture Documentation: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-13 +**Version**: 1.1.0 +**Date**: 2026-03-15 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Architecture Level**: C4 Container Level @@ -405,21 +405,68 @@ end JavaScript uses async/await for non-blocking I/O: -- **Class-based NATS Client**: Connection management +- **Class-based NATS Client**: Connection management with `keepAlive` support - **Module-level Utilities**: Serialization functions -- **Native ArrayBuffer**: Binary data handling +- **Native ArrayBuffer**: Binary data handling (Browser) / Buffer (Node.js) - **Fetch API**: HTTP file server communication +- **Connection Pooling**: `NATSConnectionPool` for high-throughput scenarios + +#### Node.js Implementation (natsbridge_ssr.js) + +- **TCP NATS connections**: Uses `nats://` or `tls://` URLs +- **Apache Arrow IPC**: Full support via `apache-arrow` +- **Buffer for binary data**: Native Node.js Buffer handling ```javascript -// Class-based NATS client +// Class-based NATS client with keepAlive support class NATSClient { - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } async connect() { + if (this.connection) return this.connection; this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } +} + +// Connection pool for managing multiple connections +class NATSConnectionPool { + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + } + + async acquire() { /* Get or create connection */ } + release(client) { /* Return to pool or close */ } + async closeAll() { /* Close all pool connections */ } +} +``` + +#### Browser Implementation (natsbridge_csr.js) + +- **WebSocket NATS connections**: Uses `ws://` or `wss://` URLs via `nats.ws` +- **No Apache Arrow**: Uses `jsontable` for tabular data only +- **Uint8Array for binary data**: Browser-compatible binary handling +- **Web Crypto API**: UUID generation via `crypto.getRandomValues()` + +```javascript +// Class-based NATS client with keepAlive support +class NATSClient { + constructor(url, keepAlive = false) { + this.url = url; // ws:// or wss:// + this.connection = null; + this.keepAlive = keepAlive; + } + + async connect() { + if (this.connection) return this.connection; + this.connection = await nats.connect({ servers: this.url }); + return this.connection; } } ``` @@ -711,6 +758,10 @@ flowchart TD | Date | Version | Changes | |------|---------|---------| +| 2026-03-15 | 1.1.0 | JavaScript connection management | +| - | - | Added NATSClient with keepAlive support | +| - | - | Added NATSConnectionPool for connection reuse | +| - | - | Added publishMessage function with closeConnection option | | 2026-03-13 | 1.0.0 | Initial architecture documentation | --- diff --git a/docs/spec.md b/docs/spec.md index 6c4b6e8..7a94d92 100644 --- a/docs/spec.md +++ b/docs/spec.md @@ -1,7 +1,7 @@ # Specification: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-13 +**Version**: 1.1.0 +**Date**: 2026-03-15 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Specification Format**: JSON Schema + AsyncAPI @@ -481,11 +481,38 @@ async function smartsend( reply_to?: string; reply_to_msg_id?: string; is_publish?: boolean; - nats_connection?: NATS.Connection; + nats_connection?: NATSClient | NATS.Connection; msg_id?: string; sender_id?: string; } ): Promise<[Object, string]>; + +// NATSClient class for connection management +class NATSClient { + constructor(url: string, keepAlive?: boolean); + connect(): Promise; + publish(subject: string, message: string, correlationId: string): Promise; + close(): Promise; + getConnection(): NATS.Connection | null; + isConnected(): boolean; +} + +// NATSConnectionPool for managing multiple connections +class NATSConnectionPool { + constructor(url: string, maxSize?: number); + acquire(): Promise; + release(client: NATSClient): void; + closeAll(): Promise; +} + +// publishMessage function for manual publishing +async function publishMessage( + brokerUrlOrClient: string | NATSClient | NATS.Connection, + subject: string, + message: string, + correlationId: string, + closeConnection?: boolean +): Promise; ``` #### MicroPython @@ -805,8 +832,10 @@ flowchart TD | Julia | Arrow.jl | Latest | Arrow IPC support | | Julia | HTTP.jl | Latest | HTTP file server | | Julia | UUIDs.jl | Latest | UUID generation | -| Node.js | nats | Latest | NATS client | +| Node.js | nats | Latest | NATS client (TCP) | | Node.js | node-fetch | Latest | HTTP file server | +| Browser | nats.ws | Latest | NATS client (WebSocket) | +| Browser | nats | Latest | NATS client (for bundling) | | Python | nats-py | Latest | NATS client | | Python | aiohttp | Latest | HTTP file server | | Python | pyarrow | Latest | Arrow IPC support | @@ -825,6 +854,11 @@ flowchart TD | Date | Version | Changes | |------|---------|---------| +| 2026-03-15 | 1.1.0 | Browser connection management | +| - | - | Added NATSClient class with keepAlive support | +| - | - | Added NATSConnectionPool for connection reuse | +| - | - | Added publishMessage function with closeConnection option | +| - | - | Added nats.ws to browser dependencies | | 2026-03-13 | 1.0.0 | Initial specification | | - | - | Message envelope schema defined | | - | - | Payload schema with transport modes | diff --git a/src/natsbridge_csr.js b/src/natsbridge_csr.js index 3db7e1c..a5552d1 100644 --- a/src/natsbridge_csr.js +++ b/src/natsbridge_csr.js @@ -10,6 +10,12 @@ * Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints. * Use "jsontable" for tabular data in browser applications. * + * Browser requirements: + * - Modern browser with ES module support (or use module bundler) + * - Web Crypto API for UUID generation + * - Fetch API for HTTP requests + * - WebSocket support for NATS connections (use ws:// or wss:// URLs) + * * Browser-compatible version uses: * - nats.ws for WebSocket-based NATS connections * - Web Crypto API for UUID generation @@ -50,10 +56,7 @@ const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; */ function bufferToBase64(data) { const bytes = new Uint8Array(data); - let binary = ''; - for (let i = 0; i < bytes.length; i++) { - binary += String.fromCharCode(bytes[i]); - } + const binary = String.fromCharCode(...bytes); return btoa(binary); } @@ -72,6 +75,34 @@ function base64ToBuffer(base64) { return bytes; } +/** + * Convert Uint8Array to Base64 string (Unicode-safe version) + * Uses TextEncoder/TextDecoder for proper Unicode handling + * @param {Uint8Array} data - Data to encode + * @returns {string} Base64 encoded string + */ +function bufferToBase64UnicodeSafe(data) { + const bytes = new Uint8Array(data); + // Use TextDecoder to properly handle the bytes as text + const binary = String.fromCharCode(...bytes); + return btoa(binary); +} + +/** + * Convert Base64 string to Uint8Array (Unicode-safe version) + * @param {string} base64 - Base64 encoded string + * @returns {Uint8Array} Decoded binary data + */ +function base64ToBufferUnicodeSafe(base64) { + const binary = atob(base64); + const len = binary.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} + /** * Generate UUID v4 using Web Crypto API * @returns {string} UUID string @@ -99,7 +130,7 @@ function logTrace(correlationId, message) { /** * Serialize data according to specified format * @param {any} data - Data to serialize - * @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" * @returns {Uint8Array} Binary representation of the serialized data */ async function serializeData(data, payloadType) { @@ -296,15 +327,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio /** * NATS client wrapper for connection management + * Supports both single-use and persistent connection modes */ class NATSClient { /** * Create a new NATS client * @param {string} url - NATS server URL (ws:// or wss://) + * @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes */ - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } /** @@ -312,6 +346,9 @@ class NATSClient { * @returns {Promise} */ async connect() { + if (this.connection) { + return this.connection; + } this.connection = await nats.connect({ servers: this.url }); return this.connection; } @@ -336,8 +373,94 @@ class NATSClient { async close() { if (this.connection) { this.connection.close(); + this.connection = null; } } + + /** + * Get the current connection (for external use) + * @returns {NATS.Connection|null} + */ + getConnection() { + return this.connection; + } + + /** + * Check if connected + * @returns {boolean} + */ + isConnected() { + return this.connection !== null; + } +} + +/** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + */ +class NATSConnectionPool { + /** + * Create a new connection pool + * @param {string} url - NATS server URL (ws:// or wss://) + * @param {number} [maxSize=10] - Maximum pool size + */ + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + this.idCounter = 0; + } + + /** + * Get a connection from the pool (or create new) + * @returns {Promise} + */ + async acquire() { + // Try to find an existing idle connection + for (const [id, client] of this.connections) { + if (client.isConnected()) { + return client; + } + } + + // Create new connection if under limit + if (this.connections.size < this.maxSize) { + const id = `conn_${++this.idCounter}`; + const client = new NATSClient(this.url, true); + await client.connect(); + this.connections.set(id, client); + return client; + } + + // Pool exhausted - create new connection (caller should close when done) + const client = new NATSClient(this.url, false); + await client.connect(); + return client; + } + + /** + * Return a connection to the pool + * @param {NATSClient} client - Connection to return + */ + release(client) { + // Only return persistent connections + if (client.keepAlive && client.isConnected()) { + // Connection already in pool, do nothing + return; + } + // Non-persistent connection - close it + client.close(); + } + + /** + * Close all connections in the pool + */ + async closeAll() { + for (const [id, client] of this.connections) { + await client.close(); + } + this.connections.clear(); + } } // ---------------------------------------------- Core Functions ---------------------------------------------- // @@ -348,9 +471,11 @@ class NATSClient { * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlationId - Correlation ID for tracing + * @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections) */ -async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { +async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) { let conn; + let shouldClose = false; if (brokerUrlOrClient instanceof NATSClient) { conn = brokerUrlOrClient; @@ -364,15 +489,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId await brokerUrlOrClient.close(); } }; + shouldClose = true; } else { // String URL - create new client const client = new NATSClient(brokerUrlOrClient); conn = client; + shouldClose = true; } await conn.publish(subject, message, correlationId); - if (conn instanceof NATSClient) { + // Only close if explicitly requested and it's a short-lived client + if (shouldClose && closeConnection && conn instanceof NATSClient) { await conn.close(); } } @@ -441,7 +569,8 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { * * @param {string} subject - NATS subject to publish the message to * @param {Array} data - List of [dataname, data, type] tuples to send - * - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data) * @param {Object} options - Optional configuration * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket) * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server @@ -465,17 +594,17 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [["dataname1", data1, "dictionary"]], - * { broker_url: "ws://localhost:4222" } + * { broker_url: "wss://nats.example.com" } * ); * - * // Send multiple payloads + * // Send multiple payloads (use jsontable instead of arrowtable for browser) * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [ * ["dataname1", data1, "dictionary"], - * ["dataname2", data2, "arrowtable"] + * ["dataname2", tableData, "jsontable"] * ], - * { broker_url: "ws://localhost:4222" } + * { broker_url: "wss://nats.example.com" } * ); */ async function smartsend(subject, data, options = {}) { @@ -711,9 +840,37 @@ async function smartreceive(msg, options = {}) { const NATSBridgeCSR = { /** * NATS client class for connection management + * Supports both single-use and persistent connection modes + * + * @example + * // Single-use connection (closes after publish) + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com"); + * await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * await client.close(); + * + * // Persistent connection (keeps connection open) + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true); + * await client.connect(); + * await NATSBridgeCSR.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false }); + * await NATSBridgeCSR.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id"); + * // Connection remains open for more publishes + * await client.close(); */ NATSClient, + /** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + * + * @example + * const pool = new NATSBridgeCSR.NATSConnectionPool("wss://nats.example.com", 10); + * const client = await pool.acquire(); + * await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * pool.release(client); + * await pool.closeAll(); + */ + NATSConnectionPool, + /** * Send data via NATS with automatic transport selection */ @@ -724,6 +881,19 @@ const NATSBridgeCSR = { */ smartreceive, + /** + * Publish message to NATS + * + * @example + * // Using a persistent connection + * const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true); + * await client.connect(); + * await NATSBridgeCSR.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false); + * // Connection stays open for more publishes + * await client.close(); + */ + publishMessage, + /** * Upload data to plik server in one-shot mode */ diff --git a/src/natsbridge_ssr.js b/src/natsbridge_ssr.js index 5276b23..4c9ed81 100644 --- a/src/natsbridge_ssr.js +++ b/src/natsbridge_ssr.js @@ -1,6 +1,6 @@ /** * NATSBridge - Cross-Platform Bi-Directional Data Bridge - * JavaScript/Node.js Implementation (Client-Side Rendering) + * JavaScript/Node.js Implementation (Desktop/Server-Side) * * This module provides functionality for sending and receiving data across network boundaries * using NATS as the message bus, with support for both direct payload transport and @@ -8,6 +8,12 @@ * * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" * + * Node.js-specific features: + * - Apache Arrow IPC support via apache-arrow + * - TCP NATS connections (nats:// or tls:// URLs) + * - Buffer for binary data handling + * - Connection pooling for high-throughput scenarios + * * @module NATSBridge */ @@ -342,15 +348,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio /** * NATS client wrapper for connection management + * Supports both single-use and persistent connection modes */ class NATSClient { /** * Create a new NATS client - * @param {string} url - NATS server URL + * @param {string} url - NATS server URL (nats:// or tls://) + * @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes */ - constructor(url) { + constructor(url, keepAlive = false) { this.url = url; this.connection = null; + this.keepAlive = keepAlive; } /** @@ -358,6 +367,9 @@ class NATSClient { * @returns {Promise} */ async connect() { + if (this.connection) { + return this.connection; + } this.connection = await nats.connect({ servers: this.url }); return this.connection; } @@ -382,8 +394,94 @@ class NATSClient { async close() { if (this.connection) { this.connection.close(); + this.connection = null; } } + + /** + * Get the current connection (for external use) + * @returns {NATS.Connection|null} + */ + getConnection() { + return this.connection; + } + + /** + * Check if connected + * @returns {boolean} + */ + isConnected() { + return this.connection !== null; + } +} + +/** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + */ +class NATSConnectionPool { + /** + * Create a new connection pool + * @param {string} url - NATS server URL (nats:// or tls://) + * @param {number} [maxSize=10] - Maximum pool size + */ + constructor(url, maxSize = 10) { + this.url = url; + this.maxSize = maxSize; + this.connections = new Map(); + this.idCounter = 0; + } + + /** + * Get a connection from the pool (or create new) + * @returns {Promise} + */ + async acquire() { + // Try to find an existing idle connection + for (const [id, client] of this.connections) { + if (client.isConnected()) { + return client; + } + } + + // Create new connection if under limit + if (this.connections.size < this.maxSize) { + const id = `conn_${++this.idCounter}`; + const client = new NATSClient(this.url, true); + await client.connect(); + this.connections.set(id, client); + return client; + } + + // Pool exhausted - create new connection (caller should close when done) + const client = new NATSClient(this.url, false); + await client.connect(); + return client; + } + + /** + * Return a connection to the pool + * @param {NATSClient} client - Connection to return + */ + release(client) { + // Only return persistent connections + if (client.keepAlive && client.isConnected()) { + // Connection already in pool, do nothing + return; + } + // Non-persistent connection - close it + client.close(); + } + + /** + * Close all connections in the pool + */ + async closeAll() { + for (const [id, client] of this.connections) { + await client.close(); + } + this.connections.clear(); + } } // ---------------------------------------------- Core Functions ---------------------------------------------- // @@ -394,9 +492,11 @@ class NATSClient { * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlationId - Correlation ID for tracing + * @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections) */ -async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { +async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) { let conn; + let shouldClose = false; if (brokerUrlOrClient instanceof NATSClient) { conn = brokerUrlOrClient; @@ -410,15 +510,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId await brokerUrlOrClient.close(); } }; + shouldClose = true; } else { // String URL - create new client const client = new NATSClient(brokerUrlOrClient); conn = client; + shouldClose = true; } await conn.publish(subject, message, correlationId); - if (conn instanceof NATSClient) { + // Only close if explicitly requested and it's a short-lived client + if (shouldClose && closeConnection && conn instanceof NATSClient) { await conn.close(); } } @@ -764,9 +867,37 @@ async function smartreceive(msg, options = {}) { const NATSBridge = { /** * NATS client class for connection management + * Supports both single-use and persistent connection modes + * + * @example + * // Single-use connection (closes after publish) + * const client = new NATSBridge.NATSClient("nats://localhost:4222"); + * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * await client.close(); + * + * // Persistent connection (keeps connection open) + * const client = new NATSBridge.NATSClient("nats://localhost:4222", true); + * await client.connect(); + * await NATSBridge.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false }); + * await NATSBridge.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id"); + * // Connection remains open for more publishes + * await client.close(); */ NATSClient, + /** + * Connection pool for managing multiple NATS connections + * Useful for applications with multiple concurrent publishers + * + * @example + * const pool = new NATSBridge.NATSConnectionPool("nats://localhost:4222", 10); + * const client = await pool.acquire(); + * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client }); + * pool.release(client); + * await pool.closeAll(); + */ + NATSConnectionPool, + /** * Send data via NATS with automatic transport selection */ @@ -777,6 +908,19 @@ const NATSBridge = { */ smartreceive, + /** + * Publish message to NATS + * + * @example + * // Using a persistent connection + * const client = new NATSBridge.NATSClient("nats://localhost:4222", true); + * await client.connect(); + * await NATSBridge.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false); + * // Connection stays open for more publishes + * await client.close(); + */ + publishMessage, + /** * Upload data to plik server in one-shot mode */