Merge pull request 'remove_arrow_in_natsbridge_csr' (#11) from remove_arrow_in_natsbridge_csr into main
Reviewed-on: #11
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
name = "NATSBridge"
|
||||
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
|
||||
version = "0.5.5"
|
||||
version = "0.5.6"
|
||||
authors = ["narawat <narawat@gmail.com>"]
|
||||
|
||||
[deps]
|
||||
|
||||
153
README.md
153
README.md
@@ -45,9 +45,9 @@ NATSBridge enables seamless communication across multiple platforms through NATS
|
||||
| Platform | Implementation | Features |
|
||||
|----------|----------------|----------|
|
||||
| **Julia** | [`src/NATSBridge.jl`](src/NATSBridge.jl) | Full feature set, Arrow IPC, multiple dispatch |
|
||||
| **JavaScript (Node.js)** | [`src/natsbridge.js`](src/natsbridge_ssr.js) | Node.js, async/await |
|
||||
| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await |
|
||||
| **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints |
|
||||
| **JavaScript (Node.js)** | [`src/natsbridge_ssr.js`](src/natsbridge_ssr.js) | Node.js, async/await, Arrow IPC |
|
||||
| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await, JSON table only |
|
||||
| **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints, Arrow IPC |
|
||||
| **MicroPython** | [`src/natsbridge_mpy.py`](src/natsbridge_mpy.py) | Memory-constrained, synchronous API |
|
||||
|
||||
### Platform Comparison
|
||||
@@ -57,7 +57,8 @@ NATSBridge enables seamless communication across multiple platforms through NATS
|
||||
| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | ❌ |
|
||||
| Async/Await | ❌ | ✅ Native | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
|
||||
| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ |
|
||||
| Arrow IPC | ✅ Native | ✅ | ✅ | ✅ | ❌ |
|
||||
| Arrow IPC | ✅ Native | ✅ Native | ❌ (Browser incompatible) | ✅ Native | ❌ |
|
||||
| JSON Table | ✅ | ✅ | ✅ (Only table type) | ✅ | ⚠️ (Limited) |
|
||||
| Direct Transport | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| Link Transport | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) |
|
||||
| Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
@@ -73,7 +74,8 @@ NATSBridge enables seamless communication across multiple platforms through NATS
|
||||
- ✅ **Multi-payload support** - send multiple payloads with different types in one message
|
||||
- ✅ **Automatic transport selection** - direct vs link based on payload size
|
||||
- ✅ **Claim-Check pattern** for payloads ≥ 500KB
|
||||
- ✅ **Apache Arrow IPC** support for tabular data (zero-copy reading)
|
||||
- ✅ **Apache Arrow IPC** support for tabular data (Desktop: Julia/Python/Node.js)
|
||||
- ✅ **JSON Table** support for tabular data (All platforms including Browser)
|
||||
- ✅ **Exponential backoff** for reliable file server downloads
|
||||
- ✅ **Correlation ID tracking** for message tracing
|
||||
- ✅ **Reply-to support** for request-response patterns
|
||||
@@ -424,8 +426,8 @@ env = NATSBridge.smartreceive(
|
||||
|------|-------|------------|--------|-------------|-------------|
|
||||
| `text` | `String` | `string` | `str` | `str` | Plain text strings |
|
||||
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries |
|
||||
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
|
||||
| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) |
|
||||
| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
|
||||
| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** |
|
||||
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) |
|
||||
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) |
|
||||
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) |
|
||||
@@ -611,6 +613,26 @@ data = [("students", df, "arrowtable")]
|
||||
env, env_json_str = await NATSBridge.smartsend("/data/analysis", data)
|
||||
```
|
||||
|
||||
#### JavaScript (Browser)
|
||||
|
||||
```javascript
|
||||
import NATSBridge from './src/natsbridge_csr.js';
|
||||
|
||||
// Browser uses jsontable (JSON array of objects) instead of arrowtable
|
||||
// Apache Arrow is not compatible with browsers
|
||||
const df = [
|
||||
{ id: 1, name: "Alice", score: 95 },
|
||||
{ id: 2, name: "Bob", score: 88 },
|
||||
{ id: 3, name: "Charlie", score: 92 }
|
||||
];
|
||||
|
||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||
"/data/analysis",
|
||||
[["students", df, "jsontable"]], // Use jsontable for browser
|
||||
{ broker_url: 'ws://localhost:4222' }
|
||||
);
|
||||
```
|
||||
|
||||
### Example 4: Request-Response Pattern
|
||||
|
||||
Bi-directional communication with reply-to support.
|
||||
@@ -768,6 +790,123 @@ python3 test/test_py_table_receiver.py
|
||||
|
||||
---
|
||||
|
||||
## Browser Deployment
|
||||
|
||||
### Using with Node.js Build Tools
|
||||
|
||||
The browser implementation (`src/natsbridge_csr.js`) can be bundled for production deployment using modern JavaScript build tools.
|
||||
|
||||
#### Prerequisites
|
||||
|
||||
```bash
|
||||
# Install the browser-compatible NATS client
|
||||
npm install nats.ws
|
||||
```
|
||||
|
||||
#### Vite (Recommended)
|
||||
|
||||
```bash
|
||||
npm create vite@latest my-app -- --template vanilla
|
||||
cd my-app
|
||||
npm install nats.ws
|
||||
```
|
||||
|
||||
In `vite.config.js`:
|
||||
```javascript
|
||||
import { defineConfig } from 'vite';
|
||||
export default defineConfig({
|
||||
resolve: {
|
||||
alias: {
|
||||
'nats.ws': 'nats.ws/dist/esm/browser.js'
|
||||
}
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
Build command:
|
||||
```bash
|
||||
npm run build # Outputs to dist/ folder
|
||||
```
|
||||
|
||||
#### Webpack
|
||||
|
||||
```bash
|
||||
npm install webpack webpack-cli --save-dev
|
||||
npm install nats.ws
|
||||
```
|
||||
|
||||
In `webpack.config.js`:
|
||||
```javascript
|
||||
module.exports = {
|
||||
entry: './src/index.js',
|
||||
output: {
|
||||
filename: 'bundle.js',
|
||||
path: __dirname + '/dist'
|
||||
},
|
||||
resolve: {
|
||||
alias: {
|
||||
'nats.ws': 'nats.ws/dist/esm/browser.js'
|
||||
}
|
||||
}
|
||||
};
|
||||
```
|
||||
|
||||
Build command:
|
||||
```bash
|
||||
npx webpack
|
||||
```
|
||||
|
||||
#### esbuild (Simple & Fast)
|
||||
|
||||
```bash
|
||||
npm install esbuild nats.ws --save-dev
|
||||
```
|
||||
|
||||
Create `build.js`:
|
||||
```javascript
|
||||
import esbuild from 'esbuild';
|
||||
|
||||
esbuild.buildSync({
|
||||
entryPoints: ['src/natsbridge_csr.js'],
|
||||
bundle: true,
|
||||
outfile: 'dist/natsbridge-csr-bundle.js',
|
||||
format: 'esm',
|
||||
platform: 'browser',
|
||||
target: 'es2020'
|
||||
});
|
||||
```
|
||||
|
||||
Build command:
|
||||
```bash
|
||||
node build.js
|
||||
```
|
||||
|
||||
### Using in Your HTML
|
||||
|
||||
```html
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>My App</title>
|
||||
</head>
|
||||
<body>
|
||||
<script type="module" src="dist/natsbridge-csr-bundle.js"></script>
|
||||
<script type="module">
|
||||
import NATSBridgeCSR from './dist/natsbridge-csr-bundle.js';
|
||||
|
||||
// Use the library
|
||||
const [env, envJson] = await NATSBridgeCSR.smartsend(
|
||||
"/chat/user/v1/message",
|
||||
[["msg", "Hello", "text"]],
|
||||
{ broker_url: "wss://nats.example.com" }
|
||||
);
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Documentation
|
||||
|
||||
For detailed architecture and implementation information, see:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Architecture Documentation: NATSBridge
|
||||
|
||||
**Version**: 1.0.0
|
||||
**Date**: 2026-03-13
|
||||
**Version**: 1.1.0
|
||||
**Date**: 2026-03-15
|
||||
**Status**: Active
|
||||
**Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
|
||||
**Architecture Level**: C4 Container Level
|
||||
@@ -275,8 +275,8 @@ end
|
||||
|------|-------------|---------------|----------|-----------|
|
||||
| `text` | Plain text string | UTF-8 bytes | Base64 | All |
|
||||
| `dictionary` | JSON object | JSON string | Base64/JSON | All |
|
||||
| `arrowtable` | Apache Arrow IPC | Arrow IPC stream | Base64/arrow-ipc | Desktop |
|
||||
| `jsontable` | JSON array of objects | JSON string | Base64/json | All |
|
||||
| `arrowtable` | Apache Arrow IPC | Arrow IPC stream | Base64/arrow-ipc | Desktop (Julia/Python/Node.js) |
|
||||
| `jsontable` | JSON array of objects | JSON string | Base64/json | All (including Browser) |
|
||||
| `image` | Binary image data | Raw bytes | Base64 | All |
|
||||
| `audio` | Binary audio data | Raw bytes | Base64 | All |
|
||||
| `video` | Binary video data | Raw bytes | Base64 | All |
|
||||
@@ -405,21 +405,68 @@ end
|
||||
|
||||
JavaScript uses async/await for non-blocking I/O:
|
||||
|
||||
- **Class-based NATS Client**: Connection management
|
||||
- **Class-based NATS Client**: Connection management with `keepAlive` support
|
||||
- **Module-level Utilities**: Serialization functions
|
||||
- **Native ArrayBuffer**: Binary data handling
|
||||
- **Native ArrayBuffer**: Binary data handling (Browser) / Buffer (Node.js)
|
||||
- **Fetch API**: HTTP file server communication
|
||||
- **Connection Pooling**: `NATSConnectionPool` for high-throughput scenarios
|
||||
|
||||
#### Node.js Implementation (natsbridge_ssr.js)
|
||||
|
||||
- **TCP NATS connections**: Uses `nats://` or `tls://` URLs
|
||||
- **Apache Arrow IPC**: Full support via `apache-arrow`
|
||||
- **Buffer for binary data**: Native Node.js Buffer handling
|
||||
|
||||
```javascript
|
||||
// Class-based NATS client
|
||||
// Class-based NATS client with keepAlive support
|
||||
class NATSClient {
|
||||
constructor(url) {
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (this.connection) return this.connection;
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
}
|
||||
|
||||
// Connection pool for managing multiple connections
|
||||
class NATSConnectionPool {
|
||||
constructor(url, maxSize = 10) {
|
||||
this.url = url;
|
||||
this.maxSize = maxSize;
|
||||
this.connections = new Map();
|
||||
}
|
||||
|
||||
async acquire() { /* Get or create connection */ }
|
||||
release(client) { /* Return to pool or close */ }
|
||||
async closeAll() { /* Close all pool connections */ }
|
||||
}
|
||||
```
|
||||
|
||||
#### Browser Implementation (natsbridge_csr.js)
|
||||
|
||||
- **WebSocket NATS connections**: Uses `ws://` or `wss://` URLs via `nats.ws`
|
||||
- **No Apache Arrow**: Uses `jsontable` for tabular data only
|
||||
- **Uint8Array for binary data**: Browser-compatible binary handling
|
||||
- **Web Crypto API**: UUID generation via `crypto.getRandomValues()`
|
||||
|
||||
```javascript
|
||||
// Class-based NATS client with keepAlive support
|
||||
class NATSClient {
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url; // ws:// or wss://
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (this.connection) return this.connection;
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -442,6 +489,16 @@ class NATSBridge:
|
||||
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
|
||||
```
|
||||
|
||||
### Browser Architecture
|
||||
|
||||
Browser JavaScript has specific constraints due to security and compatibility:
|
||||
|
||||
- **Async/await**: Native async/await support
|
||||
- **No Apache Arrow**: Arrow IPC not available in browsers
|
||||
- **JSON table only**: Use "jsontable" for tabular data
|
||||
- **WebSocket NATS**: Uses nats.ws for browser-compatible NATS connections
|
||||
- **Fetch API**: HTTP file server communication via fetch
|
||||
|
||||
### MicroPython Architecture
|
||||
|
||||
MicroPython has significant constraints:
|
||||
@@ -701,6 +758,10 @@ flowchart TD
|
||||
|
||||
| Date | Version | Changes |
|
||||
|------|---------|---------|
|
||||
| 2026-03-15 | 1.1.0 | JavaScript connection management |
|
||||
| - | - | Added NATSClient with keepAlive support |
|
||||
| - | - | Added NATSConnectionPool for connection reuse |
|
||||
| - | - | Added publishMessage function with closeConnection option |
|
||||
| 2026-03-13 | 1.0.0 | Initial architecture documentation |
|
||||
|
||||
---
|
||||
|
||||
@@ -115,8 +115,9 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless
|
||||
| Platform | Minimum Version | Notes |
|
||||
|----------|-----------------|-------|
|
||||
| Julia | 1.7+ | Arrow.jl required for arrowtable support |
|
||||
| Node.js | 16+ | nats.js required |
|
||||
| Node.js | 16+ | nats.js required, Arrow IPC supported |
|
||||
| Python | 3.8+ | pyarrow required for arrowtable support |
|
||||
| Browser | Latest | No Arrow IPC (uses jsontable only) |
|
||||
| MicroPython | 1.19+ | Limited to direct transport |
|
||||
|
||||
---
|
||||
@@ -181,8 +182,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless
|
||||
|------|-------|------------|--------|-------------|-------------|
|
||||
| `text` | `String` | `string` | `str` | `str` | Plain text strings |
|
||||
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable data |
|
||||
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
|
||||
| `jsontable` | `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) |
|
||||
| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
|
||||
| `jsontable` | `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** |
|
||||
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image binary data |
|
||||
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio binary data |
|
||||
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video binary data |
|
||||
@@ -194,8 +195,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless
|
||||
|--------------|-----------------|-------|
|
||||
| `text` | UTF-8 → Base64 | Text must be String type |
|
||||
| `dictionary` | JSON → Base64 | JSON.jl for Julia |
|
||||
| `arrowtable` | Arrow IPC → Base64 | Requires Arrow.jl/pyarrow |
|
||||
| `jsontable` | JSON → Base64 | Human-readable format |
|
||||
| `arrowtable` | Arrow IPC → Base64 | Requires Arrow.jl/pyarrow (Desktop only) |
|
||||
| `jsontable` | JSON → Base64 | Human-readable format - **Browser uses this only** |
|
||||
| `image`/`audio`/`video`/`binary` | Direct → Base64 | Binary data preserved |
|
||||
|
||||
---
|
||||
@@ -294,7 +295,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless
|
||||
| Test Scenario | Success Criteria |
|
||||
|-------------|-----------------|
|
||||
| Cross-platform text message | Julia ↔ JavaScript ↔ Python |
|
||||
| Cross-platform tabular data | Arrow IPC round-trip |
|
||||
| Cross-platform tabular data (Desktop) | Arrow IPC round-trip |
|
||||
| Cross-platform tabular data (Browser) | JSON table round-trip |
|
||||
| Large file transfer | File server upload/download |
|
||||
| Multi-payload mixed content | All payload types in one message |
|
||||
|
||||
@@ -356,6 +358,7 @@ function smartreceive(
|
||||
| Python | nats-py | Latest stable |
|
||||
| Python | aiohttp | Latest stable |
|
||||
| Python | pyarrow | Latest stable |
|
||||
| Browser | nats.ws | Latest stable |
|
||||
|
||||
### Optional Dependencies
|
||||
|
||||
@@ -399,7 +402,7 @@ function smartreceive(
|
||||
|
||||
| Version | Supported Platforms |
|
||||
|---------|---------------------|
|
||||
| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |
|
||||
| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, Browser (latest), MicroPython 1.19+ |
|
||||
|
||||
---
|
||||
|
||||
|
||||
94
docs/spec.md
94
docs/spec.md
@@ -1,7 +1,7 @@
|
||||
# Specification: NATSBridge
|
||||
|
||||
**Version**: 1.0.0
|
||||
**Date**: 2026-03-13
|
||||
**Version**: 1.1.0
|
||||
**Date**: 2026-03-15
|
||||
**Status**: Active
|
||||
**Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl)
|
||||
**Specification Format**: JSON Schema + AsyncAPI
|
||||
@@ -176,6 +176,7 @@ await smartsend("/agent/v1/process", data)
|
||||
| All | `String` | `"text"` |
|
||||
| All | `Dict`/`Object` | `"dictionary"` |
|
||||
| Desktop | `DataFrame` | `"arrowtable"` or `"jsontable"` |
|
||||
| Browser | `Array` of objects | `"jsontable"` (only table type) |
|
||||
| All | `Array` of objects | `"jsontable"` |
|
||||
| All | `Uint8Array`/`Buffer`/`bytes` | `"binary"` |
|
||||
| Desktop | `Arrow.Table` | `"arrowtable"` |
|
||||
@@ -203,8 +204,8 @@ await smartsend("/agent/v1/process", data)
|
||||
|-------|-------------|---------------------|------------------|
|
||||
| `text` | Plain text string | All | `base64` |
|
||||
| `dictionary` | JSON object/dictionary | All | `base64`, `json` |
|
||||
| `arrowtable` | Apache Arrow IPC table | Desktop (Julia/JS/Python) | `base64`, `arrow-ipc` |
|
||||
| `jsontable` | JSON array of objects | All | `base64`, `json` |
|
||||
| `arrowtable` | Apache Arrow IPC table | Desktop (Julia/Python/Node.js) | `base64`, `arrow-ipc` |
|
||||
| `jsontable` | JSON array of objects | All (including Browser) | `base64`, `json` |
|
||||
| `image` | Binary image data | All | `base64` |
|
||||
| `audio` | Binary audio data | All | `base64` |
|
||||
| `video` | Binary video data | All | `base64` |
|
||||
@@ -480,11 +481,38 @@ async function smartsend(
|
||||
reply_to?: string;
|
||||
reply_to_msg_id?: string;
|
||||
is_publish?: boolean;
|
||||
nats_connection?: NATS.Connection;
|
||||
nats_connection?: NATSClient | NATS.Connection;
|
||||
msg_id?: string;
|
||||
sender_id?: string;
|
||||
}
|
||||
): Promise<[Object, string]>;
|
||||
|
||||
// NATSClient class for connection management
|
||||
class NATSClient {
|
||||
constructor(url: string, keepAlive?: boolean);
|
||||
connect(): Promise<NATS.Connection>;
|
||||
publish(subject: string, message: string, correlationId: string): Promise<void>;
|
||||
close(): Promise<void>;
|
||||
getConnection(): NATS.Connection | null;
|
||||
isConnected(): boolean;
|
||||
}
|
||||
|
||||
// NATSConnectionPool for managing multiple connections
|
||||
class NATSConnectionPool {
|
||||
constructor(url: string, maxSize?: number);
|
||||
acquire(): Promise<NATSClient>;
|
||||
release(client: NATSClient): void;
|
||||
closeAll(): Promise<void>;
|
||||
}
|
||||
|
||||
// publishMessage function for manual publishing
|
||||
async function publishMessage(
|
||||
brokerUrlOrClient: string | NATSClient | NATS.Connection,
|
||||
subject: string,
|
||||
message: string,
|
||||
correlationId: string,
|
||||
closeConnection?: boolean
|
||||
): Promise<void>;
|
||||
```
|
||||
|
||||
#### MicroPython
|
||||
@@ -613,7 +641,7 @@ function fileserver_download_handler(
|
||||
|
||||
## Platform-Specific Constraints
|
||||
|
||||
### Desktop (Julia/JS/Python)
|
||||
### Desktop (Julia/Python/Node.js)
|
||||
|
||||
| Feature | Status | Notes |
|
||||
|---------|--------|-------|
|
||||
@@ -623,6 +651,16 @@ function fileserver_download_handler(
|
||||
| File server download | ✅ Supported | HTTP/HTTPS |
|
||||
| Size threshold | 500KB | Configurable |
|
||||
|
||||
### Browser (JavaScript)
|
||||
|
||||
| Feature | Status | Notes |
|
||||
|---------|--------|-------|
|
||||
| Arrow IPC | ❌ Not supported | Apache Arrow not browser-compatible |
|
||||
| JSON table | ✅ Supported | Only table type available in browser |
|
||||
| File server upload | ✅ Supported | HTTP/HTTPS |
|
||||
| File server download | ✅ Supported | HTTP/HTTPS |
|
||||
| Size threshold | 500KB | Configurable |
|
||||
|
||||
### MicroPython
|
||||
|
||||
| Feature | Status | Notes |
|
||||
@@ -636,6 +674,41 @@ function fileserver_download_handler(
|
||||
|
||||
---
|
||||
|
||||
## Implementation Files
|
||||
|
||||
| File | Platform | Features | Notes |
|
||||
|------|----------|----------|-------|
|
||||
| [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Julia | Full feature set, Arrow IPC, multiple dispatch | Ground truth implementation |
|
||||
| [`src/natsbridge_ssr.js`](../src/natsbridge_ssr.js) | Node.js | Arrow IPC, async/await | Server-side JavaScript |
|
||||
| [`src/natsbridge_csr.js`](../src/natsbridge_csr.js) | Browser | JSON table only, WebSocket NATS | Client-side rendering |
|
||||
| [`src/natsbridge.py`](../src/natsbridge.py) | Python | Arrow IPC, async/await | Desktop Python |
|
||||
| [`src/natsbridge_mpy.py`](../src/natsbridge_mpy.py) | MicroPython | Limited to direct transport | Memory-constrained |
|
||||
|
||||
### Browser Implementation Notes
|
||||
|
||||
The browser implementation ([`src/natsbridge_csr.js`](../src/natsbridge_csr.js)) has the following constraints:
|
||||
|
||||
| Constraint | Reason | Workaround |
|
||||
|------------|--------|------------|
|
||||
| No Apache Arrow IPC | Browser-incompatible dependency | Use `jsontable` for tabular data |
|
||||
| WebSocket NATS only | Browser cannot use TCP directly | Use `ws://` or `wss://` broker URLs |
|
||||
| Fetch API for HTTP | Browser fetch() API only | Compatible with Plik and other HTTP servers |
|
||||
|
||||
### Payload Type Availability by Platform
|
||||
|
||||
| Payload Type | Julia | Node.js | Browser | Python | MicroPython |
|
||||
|--------------|-------|---------|---------|--------|-------------|
|
||||
| `text` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| `dictionary` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| `arrowtable` | ✅ | ✅ | ❌ | ✅ | ❌ |
|
||||
| `jsontable` | ✅ | ✅ | ✅ | ✅ | ⚠️ |
|
||||
| `image` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| `audio` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| `video` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
| `binary` | ✅ | ✅ | ✅ | ✅ | ✅ |
|
||||
|
||||
---
|
||||
|
||||
## Message Flow
|
||||
|
||||
### Sending Flow
|
||||
@@ -759,8 +832,10 @@ flowchart TD
|
||||
| Julia | Arrow.jl | Latest | Arrow IPC support |
|
||||
| Julia | HTTP.jl | Latest | HTTP file server |
|
||||
| Julia | UUIDs.jl | Latest | UUID generation |
|
||||
| Node.js | nats | Latest | NATS client |
|
||||
| Node.js | nats | Latest | NATS client (TCP) |
|
||||
| Node.js | node-fetch | Latest | HTTP file server |
|
||||
| Browser | nats.ws | Latest | NATS client (WebSocket) |
|
||||
| Browser | nats | Latest | NATS client (for bundling) |
|
||||
| Python | nats-py | Latest | NATS client |
|
||||
| Python | aiohttp | Latest | HTTP file server |
|
||||
| Python | pyarrow | Latest | Arrow IPC support |
|
||||
@@ -779,6 +854,11 @@ flowchart TD
|
||||
|
||||
| Date | Version | Changes |
|
||||
|------|---------|---------|
|
||||
| 2026-03-15 | 1.1.0 | Browser connection management |
|
||||
| - | - | Added NATSClient class with keepAlive support |
|
||||
| - | - | Added NATSConnectionPool for connection reuse |
|
||||
| - | - | Added publishMessage function with closeConnection option |
|
||||
| - | - | Added nats.ws to browser dependencies |
|
||||
| 2026-03-13 | 1.0.0 | Initial specification |
|
||||
| - | - | Message envelope schema defined |
|
||||
| - | - | Payload schema with transport modes |
|
||||
|
||||
@@ -6,7 +6,15 @@
|
||||
* using NATS as the message bus, with support for both direct payload transport and
|
||||
* URL-based transport for larger payloads.
|
||||
*
|
||||
* Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
* Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints.
|
||||
* Use "jsontable" for tabular data in browser applications.
|
||||
*
|
||||
* Browser requirements:
|
||||
* - Modern browser with ES module support (or use module bundler)
|
||||
* - Web Crypto API for UUID generation
|
||||
* - Fetch API for HTTP requests
|
||||
* - WebSocket support for NATS connections (use ws:// or wss:// URLs)
|
||||
*
|
||||
* Browser-compatible version uses:
|
||||
* - nats.ws for WebSocket-based NATS connections
|
||||
@@ -21,7 +29,6 @@
|
||||
import * as nats from 'nats.ws';
|
||||
|
||||
// Use native fetch available in browsers
|
||||
import { tableFromArrays, tableToIPC } from 'apache-arrow/browser';
|
||||
|
||||
// ---------------------------------------------- Constants ---------------------------------------------- //
|
||||
|
||||
@@ -49,10 +56,7 @@ const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
||||
*/
|
||||
function bufferToBase64(data) {
|
||||
const bytes = new Uint8Array(data);
|
||||
let binary = '';
|
||||
for (let i = 0; i < bytes.length; i++) {
|
||||
binary += String.fromCharCode(bytes[i]);
|
||||
}
|
||||
const binary = String.fromCharCode(...bytes);
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
@@ -71,6 +75,34 @@ function base64ToBuffer(base64) {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Uint8Array to Base64 string (Unicode-safe version)
|
||||
* Uses TextEncoder/TextDecoder for proper Unicode handling
|
||||
* @param {Uint8Array} data - Data to encode
|
||||
* @returns {string} Base64 encoded string
|
||||
*/
|
||||
function bufferToBase64UnicodeSafe(data) {
|
||||
const bytes = new Uint8Array(data);
|
||||
// Use TextDecoder to properly handle the bytes as text
|
||||
const binary = String.fromCharCode(...bytes);
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Base64 string to Uint8Array (Unicode-safe version)
|
||||
* @param {string} base64 - Base64 encoded string
|
||||
* @returns {Uint8Array} Decoded binary data
|
||||
*/
|
||||
function base64ToBufferUnicodeSafe(base64) {
|
||||
const binary = atob(base64);
|
||||
const len = binary.length;
|
||||
const bytes = new Uint8Array(len);
|
||||
for (let i = 0; i < len; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate UUID v4 using Web Crypto API
|
||||
* @returns {string} UUID string
|
||||
@@ -98,7 +130,7 @@ function logTrace(correlationId, message) {
|
||||
/**
|
||||
* Serialize data according to specified format
|
||||
* @param {any} data - Data to serialize
|
||||
* @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
* @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* @returns {Uint8Array} Binary representation of the serialized data
|
||||
*/
|
||||
async function serializeData(data, payloadType) {
|
||||
@@ -111,13 +143,6 @@ async function serializeData(data, payloadType) {
|
||||
} else if (payloadType === 'dictionary') {
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return new Uint8Array(new TextEncoder().encode(jsonStr));
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
// Convert array of objects to Arrow IPC format
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
throw new Error('Arrow table data must be a non-empty array of objects');
|
||||
}
|
||||
|
||||
return serializeArrowTable(data);
|
||||
} else if (payloadType === 'jsontable') {
|
||||
// Serialize array of objects to JSON format
|
||||
if (!Array.isArray(data)) {
|
||||
@@ -154,49 +179,6 @@ async function serializeData(data, payloadType) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to properly serialize table data to Arrow IPC
|
||||
* @param {Array<Object>} data - Array of objects representing table rows
|
||||
* @returns {Uint8Array} Arrow IPC formatted buffer
|
||||
*/
|
||||
function serializeArrowTable(data) {
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
throw new Error('Table data must be a non-empty array of objects');
|
||||
}
|
||||
|
||||
logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`);
|
||||
|
||||
// Convert array of objects to a key-value format expected by tableFromArrays
|
||||
const columns = {};
|
||||
const keys = Object.keys(data[0]);
|
||||
for (const key of keys) {
|
||||
columns[key] = data.map(row => row[key]);
|
||||
}
|
||||
|
||||
logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`);
|
||||
|
||||
const table = tableFromArrays(columns);
|
||||
|
||||
logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`);
|
||||
|
||||
// Convert to IPC format
|
||||
const ipcBuffer = tableToIPC(table);
|
||||
|
||||
logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, byteLength: ${ipcBuffer.byteLength}`);
|
||||
|
||||
const resultBuffer = new Uint8Array(ipcBuffer);
|
||||
logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`);
|
||||
|
||||
// Debug: Show first 20 bytes in hex
|
||||
const hexPreview = [];
|
||||
for (let i = 0; i < Math.min(20, resultBuffer.length); i++) {
|
||||
hexPreview.push(resultBuffer[i].toString(16).padStart(2, '0'));
|
||||
}
|
||||
logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview.join(' ')}`);
|
||||
|
||||
return resultBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize bytes to data based on type
|
||||
* @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes
|
||||
@@ -210,7 +192,7 @@ async function deserializeData(data, payloadType, correlationId) {
|
||||
logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);
|
||||
|
||||
// Debug: Show first 20 bytes in hex for binary data
|
||||
if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
|
||||
if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
|
||||
const hexPreview = [];
|
||||
for (let i = 0; i < Math.min(20, buffer.length); i++) {
|
||||
hexPreview.push(buffer[i].toString(16).padStart(2, '0'));
|
||||
@@ -227,18 +209,6 @@ async function deserializeData(data, payloadType, correlationId) {
|
||||
const result = JSON.parse(jsonStr);
|
||||
logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`);
|
||||
return result;
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`);
|
||||
|
||||
try {
|
||||
// Try tableFromIPC (browser API)
|
||||
const table = tableFromIPC(buffer);
|
||||
logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`);
|
||||
return table;
|
||||
} catch (e) {
|
||||
logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`);
|
||||
throw new Error(`Unable to deserialize Arrow table: ${e.message}`);
|
||||
}
|
||||
} else if (payloadType === 'jsontable') {
|
||||
const jsonStr = new TextDecoder().decode(buffer);
|
||||
const result = JSON.parse(jsonStr);
|
||||
@@ -357,15 +327,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio
|
||||
|
||||
/**
|
||||
* NATS client wrapper for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*/
|
||||
class NATSClient {
|
||||
/**
|
||||
* Create a new NATS client
|
||||
* @param {string} url - NATS server URL (ws:// or wss://)
|
||||
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
|
||||
*/
|
||||
constructor(url) {
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -373,6 +346,9 @@ class NATSClient {
|
||||
* @returns {Promise<NATS.Connection>}
|
||||
*/
|
||||
async connect() {
|
||||
if (this.connection) {
|
||||
return this.connection;
|
||||
}
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
@@ -397,8 +373,94 @@ class NATSClient {
|
||||
async close() {
|
||||
if (this.connection) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current connection (for external use)
|
||||
* @returns {NATS.Connection|null}
|
||||
*/
|
||||
getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isConnected() {
|
||||
return this.connection !== null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*/
|
||||
class NATSConnectionPool {
|
||||
/**
|
||||
* Create a new connection pool
|
||||
* @param {string} url - NATS server URL (ws:// or wss://)
|
||||
* @param {number} [maxSize=10] - Maximum pool size
|
||||
*/
|
||||
constructor(url, maxSize = 10) {
|
||||
this.url = url;
|
||||
this.maxSize = maxSize;
|
||||
this.connections = new Map();
|
||||
this.idCounter = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a connection from the pool (or create new)
|
||||
* @returns {Promise<NATSClient>}
|
||||
*/
|
||||
async acquire() {
|
||||
// Try to find an existing idle connection
|
||||
for (const [id, client] of this.connections) {
|
||||
if (client.isConnected()) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection if under limit
|
||||
if (this.connections.size < this.maxSize) {
|
||||
const id = `conn_${++this.idCounter}`;
|
||||
const client = new NATSClient(this.url, true);
|
||||
await client.connect();
|
||||
this.connections.set(id, client);
|
||||
return client;
|
||||
}
|
||||
|
||||
// Pool exhausted - create new connection (caller should close when done)
|
||||
const client = new NATSClient(this.url, false);
|
||||
await client.connect();
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a connection to the pool
|
||||
* @param {NATSClient} client - Connection to return
|
||||
*/
|
||||
release(client) {
|
||||
// Only return persistent connections
|
||||
if (client.keepAlive && client.isConnected()) {
|
||||
// Connection already in pool, do nothing
|
||||
return;
|
||||
}
|
||||
// Non-persistent connection - close it
|
||||
client.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections in the pool
|
||||
*/
|
||||
async closeAll() {
|
||||
for (const [id, client] of this.connections) {
|
||||
await client.close();
|
||||
}
|
||||
this.connections.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Core Functions ---------------------------------------------- //
|
||||
@@ -409,9 +471,11 @@ class NATSClient {
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - JSON message to publish
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
|
||||
*/
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId) {
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
|
||||
let conn;
|
||||
let shouldClose = false;
|
||||
|
||||
if (brokerUrlOrClient instanceof NATSClient) {
|
||||
conn = brokerUrlOrClient;
|
||||
@@ -425,15 +489,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId
|
||||
await brokerUrlOrClient.close();
|
||||
}
|
||||
};
|
||||
shouldClose = true;
|
||||
} else {
|
||||
// String URL - create new client
|
||||
const client = new NATSClient(brokerUrlOrClient);
|
||||
conn = client;
|
||||
shouldClose = true;
|
||||
}
|
||||
|
||||
await conn.publish(subject, message, correlationId);
|
||||
|
||||
if (conn instanceof NATSClient) {
|
||||
// Only close if explicitly requested and it's a short-lived client
|
||||
if (shouldClose && closeConnection && conn instanceof NATSClient) {
|
||||
await conn.close();
|
||||
}
|
||||
}
|
||||
@@ -478,8 +545,6 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
let encoding = 'base64';
|
||||
if (payloadType === 'jsontable') {
|
||||
encoding = 'json';
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
encoding = 'arrow-ipc';
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -504,7 +569,8 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
*
|
||||
* @param {string} subject - NATS subject to publish the message to
|
||||
* @param {Array} data - List of [dataname, data, type] tuples to send
|
||||
* - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
* - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data)
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket)
|
||||
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
|
||||
@@ -528,17 +594,17 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
* const [env, envJsonStr] = await NATSBridgeCSR.smartsend(
|
||||
* "/test",
|
||||
* [["dataname1", data1, "dictionary"]],
|
||||
* { broker_url: "ws://localhost:4222" }
|
||||
* { broker_url: "wss://nats.example.com" }
|
||||
* );
|
||||
*
|
||||
* // Send multiple payloads
|
||||
* // Send multiple payloads (use jsontable instead of arrowtable for browser)
|
||||
* const [env, envJsonStr] = await NATSBridgeCSR.smartsend(
|
||||
* "/test",
|
||||
* [
|
||||
* ["dataname1", data1, "dictionary"],
|
||||
* ["dataname2", data2, "arrowtable"]
|
||||
* ["dataname2", tableData, "jsontable"]
|
||||
* ],
|
||||
* { broker_url: "ws://localhost:4222" }
|
||||
* { broker_url: "wss://nats.example.com" }
|
||||
* );
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
@@ -774,9 +840,37 @@ async function smartreceive(msg, options = {}) {
|
||||
const NATSBridgeCSR = {
|
||||
/**
|
||||
* NATS client class for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*
|
||||
* @example
|
||||
* // Single-use connection (closes after publish)
|
||||
* const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com");
|
||||
* await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* await client.close();
|
||||
*
|
||||
* // Persistent connection (keeps connection open)
|
||||
* const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true);
|
||||
* await client.connect();
|
||||
* await NATSBridgeCSR.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
|
||||
* await NATSBridgeCSR.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
|
||||
* // Connection remains open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
NATSClient,
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*
|
||||
* @example
|
||||
* const pool = new NATSBridgeCSR.NATSConnectionPool("wss://nats.example.com", 10);
|
||||
* const client = await pool.acquire();
|
||||
* await NATSBridgeCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* pool.release(client);
|
||||
* await pool.closeAll();
|
||||
*/
|
||||
NATSConnectionPool,
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*/
|
||||
@@ -787,6 +881,19 @@ const NATSBridgeCSR = {
|
||||
*/
|
||||
smartreceive,
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
*
|
||||
* @example
|
||||
* // Using a persistent connection
|
||||
* const client = new NATSBridgeCSR.NATSClient("wss://nats.example.com", true);
|
||||
* await client.connect();
|
||||
* await NATSBridgeCSR.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
|
||||
* // Connection stays open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
publishMessage,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/**
|
||||
* NATSBridge - Cross-Platform Bi-Directional Data Bridge
|
||||
* JavaScript/Node.js Implementation (Client-Side Rendering)
|
||||
* JavaScript/Node.js Implementation (Desktop/Server-Side)
|
||||
*
|
||||
* This module provides functionality for sending and receiving data across network boundaries
|
||||
* using NATS as the message bus, with support for both direct payload transport and
|
||||
@@ -8,6 +8,12 @@
|
||||
*
|
||||
* Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
*
|
||||
* Node.js-specific features:
|
||||
* - Apache Arrow IPC support via apache-arrow
|
||||
* - TCP NATS connections (nats:// or tls:// URLs)
|
||||
* - Buffer for binary data handling
|
||||
* - Connection pooling for high-throughput scenarios
|
||||
*
|
||||
* @module NATSBridge
|
||||
*/
|
||||
|
||||
@@ -342,15 +348,18 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio
|
||||
|
||||
/**
|
||||
* NATS client wrapper for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*/
|
||||
class NATSClient {
|
||||
/**
|
||||
* Create a new NATS client
|
||||
* @param {string} url - NATS server URL
|
||||
* @param {string} url - NATS server URL (nats:// or tls://)
|
||||
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
|
||||
*/
|
||||
constructor(url) {
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -358,6 +367,9 @@ class NATSClient {
|
||||
* @returns {Promise<NATS.Connection>}
|
||||
*/
|
||||
async connect() {
|
||||
if (this.connection) {
|
||||
return this.connection;
|
||||
}
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
@@ -382,8 +394,94 @@ class NATSClient {
|
||||
async close() {
|
||||
if (this.connection) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current connection (for external use)
|
||||
* @returns {NATS.Connection|null}
|
||||
*/
|
||||
getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isConnected() {
|
||||
return this.connection !== null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*/
|
||||
class NATSConnectionPool {
|
||||
/**
|
||||
* Create a new connection pool
|
||||
* @param {string} url - NATS server URL (nats:// or tls://)
|
||||
* @param {number} [maxSize=10] - Maximum pool size
|
||||
*/
|
||||
constructor(url, maxSize = 10) {
|
||||
this.url = url;
|
||||
this.maxSize = maxSize;
|
||||
this.connections = new Map();
|
||||
this.idCounter = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a connection from the pool (or create new)
|
||||
* @returns {Promise<NATSClient>}
|
||||
*/
|
||||
async acquire() {
|
||||
// Try to find an existing idle connection
|
||||
for (const [id, client] of this.connections) {
|
||||
if (client.isConnected()) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection if under limit
|
||||
if (this.connections.size < this.maxSize) {
|
||||
const id = `conn_${++this.idCounter}`;
|
||||
const client = new NATSClient(this.url, true);
|
||||
await client.connect();
|
||||
this.connections.set(id, client);
|
||||
return client;
|
||||
}
|
||||
|
||||
// Pool exhausted - create new connection (caller should close when done)
|
||||
const client = new NATSClient(this.url, false);
|
||||
await client.connect();
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a connection to the pool
|
||||
* @param {NATSClient} client - Connection to return
|
||||
*/
|
||||
release(client) {
|
||||
// Only return persistent connections
|
||||
if (client.keepAlive && client.isConnected()) {
|
||||
// Connection already in pool, do nothing
|
||||
return;
|
||||
}
|
||||
// Non-persistent connection - close it
|
||||
client.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections in the pool
|
||||
*/
|
||||
async closeAll() {
|
||||
for (const [id, client] of this.connections) {
|
||||
await client.close();
|
||||
}
|
||||
this.connections.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Core Functions ---------------------------------------------- //
|
||||
@@ -394,9 +492,11 @@ class NATSClient {
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - JSON message to publish
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
|
||||
*/
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId) {
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
|
||||
let conn;
|
||||
let shouldClose = false;
|
||||
|
||||
if (brokerUrlOrClient instanceof NATSClient) {
|
||||
conn = brokerUrlOrClient;
|
||||
@@ -410,15 +510,18 @@ async function publishMessage(brokerUrlOrClient, subject, message, correlationId
|
||||
await brokerUrlOrClient.close();
|
||||
}
|
||||
};
|
||||
shouldClose = true;
|
||||
} else {
|
||||
// String URL - create new client
|
||||
const client = new NATSClient(brokerUrlOrClient);
|
||||
conn = client;
|
||||
shouldClose = true;
|
||||
}
|
||||
|
||||
await conn.publish(subject, message, correlationId);
|
||||
|
||||
if (conn instanceof NATSClient) {
|
||||
// Only close if explicitly requested and it's a short-lived client
|
||||
if (shouldClose && closeConnection && conn instanceof NATSClient) {
|
||||
await conn.close();
|
||||
}
|
||||
}
|
||||
@@ -764,9 +867,37 @@ async function smartreceive(msg, options = {}) {
|
||||
const NATSBridge = {
|
||||
/**
|
||||
* NATS client class for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*
|
||||
* @example
|
||||
* // Single-use connection (closes after publish)
|
||||
* const client = new NATSBridge.NATSClient("nats://localhost:4222");
|
||||
* await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* await client.close();
|
||||
*
|
||||
* // Persistent connection (keeps connection open)
|
||||
* const client = new NATSBridge.NATSClient("nats://localhost:4222", true);
|
||||
* await client.connect();
|
||||
* await NATSBridge.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
|
||||
* await NATSBridge.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
|
||||
* // Connection remains open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
NATSClient,
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*
|
||||
* @example
|
||||
* const pool = new NATSBridge.NATSConnectionPool("nats://localhost:4222", 10);
|
||||
* const client = await pool.acquire();
|
||||
* await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* pool.release(client);
|
||||
* await pool.closeAll();
|
||||
*/
|
||||
NATSConnectionPool,
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*/
|
||||
@@ -777,6 +908,19 @@ const NATSBridge = {
|
||||
*/
|
||||
smartreceive,
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
*
|
||||
* @example
|
||||
* // Using a persistent connection
|
||||
* const client = new NATSBridge.NATSClient("nats://localhost:4222", true);
|
||||
* await client.connect();
|
||||
* await NATSBridge.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
|
||||
* // Connection stays open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
publishMessage,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user