update
This commit is contained in:
@@ -71,13 +71,14 @@ The Julia implementation provides:
|
|||||||
- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size
|
- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size
|
||||||
- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport
|
- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport
|
||||||
|
|
||||||
### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js)
|
### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js)
|
||||||
|
|
||||||
The JavaScript implementation provides:
|
The JavaScript implementation provides:
|
||||||
|
|
||||||
- **`MessageEnvelope` class**: For the unified JSON envelope
|
- **`MessageEnvelope` class**: For the unified JSON envelope
|
||||||
- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size
|
- **`MessagePayload` class**: For individual payload representation
|
||||||
- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport
|
- **[`smartsend()`](../src/NATSBridge.js)**: Handles transport selection based on payload size
|
||||||
|
- **[`smartreceive()`](../src/NATSBridge.js)**: Handles both direct and link transport
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@@ -167,16 +168,16 @@ payloads = smartreceive(msg, "http://localhost:8080")
|
|||||||
|
|
||||||
#### JavaScript (Sender)
|
#### JavaScript (Sender)
|
||||||
```javascript
|
```javascript
|
||||||
const { SmartSend } = require('./js_bridge');
|
const { smartsend } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Single payload wrapped in a list
|
// Single payload wrapped in a list
|
||||||
const config = [{
|
const config = [{
|
||||||
dataname: "config",
|
dataname: "config",
|
||||||
data: { step_size: 0.01, iterations: 1000 },
|
data: { step_size: 0.01, iterations: 1000 },
|
||||||
type: "json"
|
type: "dictionary"
|
||||||
}];
|
}];
|
||||||
|
|
||||||
await SmartSend("control", config, "json", {
|
await smartsend("control", config, {
|
||||||
correlationId: "unique-id"
|
correlationId: "unique-id"
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -185,16 +186,16 @@ const configs = [
|
|||||||
{
|
{
|
||||||
dataname: "config1",
|
dataname: "config1",
|
||||||
data: { step_size: 0.01 },
|
data: { step_size: 0.01 },
|
||||||
type: "json"
|
type: "dictionary"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
dataname: "config2",
|
dataname: "config2",
|
||||||
data: { iterations: 1000 },
|
data: { iterations: 1000 },
|
||||||
type: "json"
|
type: "dictionary"
|
||||||
}
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
await SmartSend("control", configs, "json");
|
await smartsend("control", configs);
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Julia (Receiver)
|
#### Julia (Receiver)
|
||||||
@@ -217,6 +218,25 @@ subscribe(nats, "control") do msg
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### JavaScript (Receiver)
|
||||||
|
```javascript
|
||||||
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
|
// Subscribe to messages
|
||||||
|
const nc = await connect({ servers: ['nats://localhost:4222'] });
|
||||||
|
const sub = nc.subscribe("control");
|
||||||
|
|
||||||
|
for await (const msg of sub) {
|
||||||
|
const result = await smartreceive(msg);
|
||||||
|
|
||||||
|
// Process the result
|
||||||
|
for (const { dataname, data, type } of result) {
|
||||||
|
console.log(`Received ${dataname} of type ${type}`);
|
||||||
|
console.log(`Data: ${JSON.stringify(data)}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### Scenario 2: Deep Dive Analysis (Large Arrow Table)
|
### Scenario 2: Deep Dive Analysis (Large Arrow Table)
|
||||||
|
|
||||||
#### Julia (Sender)
|
#### Julia (Sender)
|
||||||
@@ -237,28 +257,29 @@ await SmartSend("analysis_results", [("table_data", df, "table")]);
|
|||||||
|
|
||||||
#### JavaScript (Receiver)
|
#### JavaScript (Receiver)
|
||||||
```javascript
|
```javascript
|
||||||
const { SmartReceive } = require('./js_bridge');
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
const result = await SmartReceive(msg);
|
const result = await smartreceive(msg);
|
||||||
|
|
||||||
// Use table data for visualization with Perspective.js or D3
|
// Use table data for visualization with Perspective.js or D3
|
||||||
const table = result.data;
|
// Note: Tables are sent as arrays of objects in JavaScript
|
||||||
|
const table = result;
|
||||||
```
|
```
|
||||||
|
|
||||||
### Scenario 3: Live Binary Processing
|
### Scenario 3: Live Binary Processing
|
||||||
|
|
||||||
#### JavaScript (Sender)
|
#### JavaScript (Sender)
|
||||||
```javascript
|
```javascript
|
||||||
const { SmartSend } = require('./js_bridge');
|
const { smartsend } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Binary data wrapped in a list
|
// Binary data wrapped in a list
|
||||||
const binaryData = [{
|
const binaryData = [{
|
||||||
dataname: "audio_chunk",
|
dataname: "audio_chunk",
|
||||||
data: binaryBuffer,
|
data: binaryBuffer, // ArrayBuffer or Uint8Array
|
||||||
type: "binary"
|
type: "binary"
|
||||||
}];
|
}];
|
||||||
|
|
||||||
await SmartSend("binary_input", binaryData, "binary", {
|
await smartsend("binary_input", binaryData, {
|
||||||
metadata: {
|
metadata: {
|
||||||
sample_rate: 44100,
|
sample_rate: 44100,
|
||||||
channels: 1
|
channels: 1
|
||||||
@@ -282,6 +303,25 @@ function process_binary(data)
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### JavaScript (Receiver)
|
||||||
|
```javascript
|
||||||
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
|
// Receive binary data
|
||||||
|
function process_binary(msg) {
|
||||||
|
const result = await smartreceive(msg);
|
||||||
|
|
||||||
|
// Process the binary data
|
||||||
|
for (const { dataname, data, type } of result) {
|
||||||
|
if (type === "binary") {
|
||||||
|
// data is an ArrayBuffer or Uint8Array
|
||||||
|
console.log(`Received binary data: ${dataname}, size: ${data.length}`);
|
||||||
|
// Perform FFT or AI transcription here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### Scenario 4: Catch-Up (JetStream)
|
### Scenario 4: Catch-Up (JetStream)
|
||||||
|
|
||||||
#### Julia (Producer)
|
#### Julia (Producer)
|
||||||
@@ -299,6 +339,7 @@ end
|
|||||||
#### JavaScript (Consumer)
|
#### JavaScript (Consumer)
|
||||||
```javascript
|
```javascript
|
||||||
const { connect } = require('nats');
|
const { connect } = require('nats');
|
||||||
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
const nc = await connect({ servers: ['nats://localhost:4222'] });
|
const nc = await connect({ servers: ['nats://localhost:4222'] });
|
||||||
const js = nc.jetstream();
|
const js = nc.jetstream();
|
||||||
@@ -312,9 +353,9 @@ const consumer = await js.pullSubscribe("health", {
|
|||||||
|
|
||||||
// Process historical and real-time messages
|
// Process historical and real-time messages
|
||||||
for await (const msg of consumer) {
|
for await (const msg of consumer) {
|
||||||
const result = await SmartReceive(msg);
|
const result = await smartreceive(msg);
|
||||||
// result.data contains the list of payloads
|
// result contains the list of payloads
|
||||||
// result.envelope contains the message envelope
|
// Each payload has: dataname, data, type
|
||||||
msg.ack();
|
msg.ack();
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -351,22 +392,21 @@ smartsend(
|
|||||||
|
|
||||||
**JavaScript (Receiver):**
|
**JavaScript (Receiver):**
|
||||||
```javascript
|
```javascript
|
||||||
const { SmartReceive } = require('./js_bridge');
|
const { smartreceive, smartsend } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Receive NATS message with direct transport
|
// Receive NATS message with direct transport
|
||||||
const result = await SmartReceive(msg);
|
const result = await smartreceive(msg);
|
||||||
|
|
||||||
// Decode Base64 payload
|
// Decode Base64 payload (for direct transport)
|
||||||
// Parse Arrow IPC with zero-copy
|
// For tables, data is an array of objects
|
||||||
// Load into selection UI component (e.g., dropdown, table)
|
const table = result; // Array of objects
|
||||||
const table = result[2]; // Get the DataFrame from the tuple
|
|
||||||
|
|
||||||
// User makes selection
|
// User makes selection
|
||||||
const selection = uiComponent.getSelectedOption();
|
const selection = uiComponent.getSelectedOption();
|
||||||
|
|
||||||
// Send selection back to Julia
|
// Send selection back to Julia
|
||||||
await SmartSend("dashboard.response", [
|
await smartsend("dashboard.response", [
|
||||||
("selected_option", selection, "dictionary")
|
{ dataname: "selected_option", data: selection, type: "dictionary" }
|
||||||
]);
|
]);
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -416,7 +456,7 @@ smartsend(
|
|||||||
|
|
||||||
**JavaScript (Sender/Receiver):**
|
**JavaScript (Sender/Receiver):**
|
||||||
```javascript
|
```javascript
|
||||||
const { SmartSend, SmartReceive } = require('./js_bridge');
|
const { smartsend, smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Build chat message with mixed content:
|
// Build chat message with mixed content:
|
||||||
// - User input text: direct transport
|
// - User input text: direct transport
|
||||||
@@ -441,7 +481,7 @@ const message = [
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
dataname: "image",
|
dataname: "image",
|
||||||
data: selectedImageBuffer, // Small image
|
data: selectedImageBuffer, // Small image (ArrayBuffer or Uint8Array)
|
||||||
type: "image"
|
type: "image"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -451,7 +491,7 @@ const message = [
|
|||||||
}
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
await SmartSend("chat.room123", message);
|
await smartsend("chat.room123", message);
|
||||||
```
|
```
|
||||||
|
|
||||||
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
|
**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components.
|
||||||
|
|||||||
28
package.json
Normal file
28
package.json
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
{
|
||||||
|
"name": "natsbridge",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"description": "Bi-Directional Data Bridge for JavaScript using NATS",
|
||||||
|
"main": "src/NATSBridge.js",
|
||||||
|
"scripts": {
|
||||||
|
"test": "echo \"Error: no test specified\" && exit 1",
|
||||||
|
"lint": "eslint src/*.js test/*.js"
|
||||||
|
},
|
||||||
|
"keywords": [
|
||||||
|
"nats",
|
||||||
|
"message-broker",
|
||||||
|
"bridge",
|
||||||
|
"arrow",
|
||||||
|
"serialization"
|
||||||
|
],
|
||||||
|
"author": "",
|
||||||
|
"license": "MIT",
|
||||||
|
"dependencies": {
|
||||||
|
"nats": "^2.9.0",
|
||||||
|
"apache-arrow": "^14.0.0",
|
||||||
|
"uuid": "^9.0.0"
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"eslint": "^8.0.0",
|
||||||
|
"jest": "^29.0.0"
|
||||||
|
}
|
||||||
|
}
|
||||||
706
src/NATSBridge.js
Normal file
706
src/NATSBridge.js
Normal file
@@ -0,0 +1,706 @@
|
|||||||
|
/**
|
||||||
|
* NATSBridge.js - Bi-Directional Data Bridge for JavaScript
|
||||||
|
* Implements smartsend and smartreceive for NATS communication
|
||||||
|
*
|
||||||
|
* This module provides functionality for sending and receiving data across network boundaries
|
||||||
|
* using NATS as the message bus, with support for both direct payload transport and
|
||||||
|
* URL-based transport for larger payloads.
|
||||||
|
*
|
||||||
|
* File Server Handler Architecture:
|
||||||
|
* The system uses handler functions to abstract file server operations, allowing support
|
||||||
|
* for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
|
||||||
|
*
|
||||||
|
* Handler Function Signatures:
|
||||||
|
*
|
||||||
|
* ```javascript
|
||||||
|
* // Upload handler - uploads data to file server and returns URL
|
||||||
|
* // The handler is passed to smartsend as fileserverUploadHandler parameter
|
||||||
|
* // It receives: (fileserver_url, dataname, data)
|
||||||
|
* // Returns: { status, uploadid, fileid, url }
|
||||||
|
* async function fileserverUploadHandler(fileserver_url, dataname, data) { ... }
|
||||||
|
*
|
||||||
|
* // Download handler - fetches data from file server URL with exponential backoff
|
||||||
|
* // The handler is passed to smartreceive as fileserverDownloadHandler parameter
|
||||||
|
* // It receives: (url, max_retries, base_delay, max_delay, correlation_id)
|
||||||
|
* // Returns: ArrayBuffer (the downloaded data)
|
||||||
|
* async function fileserverDownloadHandler(url, max_retries, base_delay, max_delay, correlation_id) { ... }
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* Multi-Payload Support (Standard API):
|
||||||
|
* The system uses a standardized list-of-tuples format for all payload operations.
|
||||||
|
* Even when sending a single payload, the user must wrap it in a list.
|
||||||
|
*
|
||||||
|
* API Standard:
|
||||||
|
* ```javascript
|
||||||
|
* // Input format for smartsend (always a list of tuples with type info)
|
||||||
|
* [{ dataname, data, type }, ...]
|
||||||
|
*
|
||||||
|
* // Output format for smartreceive (always returns a list of tuples)
|
||||||
|
* [{ dataname, data, type }, ...]
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||||
|
*/
|
||||||
|
|
||||||
|
// ---------------------------------------------- 100 --------------------------------------------- #
|
||||||
|
|
||||||
|
// Constants
|
||||||
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB - threshold for switching from direct to link transport
|
||||||
|
const DEFAULT_NATS_URL = "nats://localhost:4222"; // Default NATS server URL
|
||||||
|
const DEFAULT_FILESERVER_URL = "http://localhost:8080"; // Default HTTP file server URL for link transport
|
||||||
|
|
||||||
|
// Helper: Generate UUID v4
|
||||||
|
function uuid4() {
|
||||||
|
// Simple UUID v4 generator
|
||||||
|
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
|
||||||
|
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
|
||||||
|
return v.toString(16);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Log with correlation ID and timestamp
|
||||||
|
function log_trace(correlation_id, message) {
|
||||||
|
const timestamp = new Date().toISOString();
|
||||||
|
console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Get size of data in bytes
|
||||||
|
function getDataSize(data) {
|
||||||
|
if (typeof data === 'string') {
|
||||||
|
return new TextEncoder().encode(data).length;
|
||||||
|
} else if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data.byteLength;
|
||||||
|
} else if (typeof data === 'object' && data !== null) {
|
||||||
|
// For objects, serialize to JSON and measure
|
||||||
|
return new TextEncoder().encode(JSON.stringify(data)).length;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Convert ArrayBuffer to Base64 string
|
||||||
|
function arrayBufferToBase64(buffer) {
|
||||||
|
const bytes = new Uint8Array(buffer);
|
||||||
|
let binary = '';
|
||||||
|
for (let i = 0; i < bytes.length; i++) {
|
||||||
|
binary += String.fromCharCode(bytes[i]);
|
||||||
|
}
|
||||||
|
return btoa(binary);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Convert Base64 string to ArrayBuffer
|
||||||
|
function base64ToArrayBuffer(base64) {
|
||||||
|
const binaryString = atob(base64);
|
||||||
|
const len = binaryString.length;
|
||||||
|
const bytes = new Uint8Array(len);
|
||||||
|
for (let i = 0; i < len; i++) {
|
||||||
|
bytes[i] = binaryString.charCodeAt(i);
|
||||||
|
}
|
||||||
|
return bytes.buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Serialize data based on type
|
||||||
|
function _serialize_data(data, type) {
|
||||||
|
/**
|
||||||
|
* Serialize data according to specified format
|
||||||
|
*
|
||||||
|
* Supported formats:
|
||||||
|
* - "text": Treats data as text and converts to UTF-8 bytes
|
||||||
|
* - "dictionary": Serializes data as JSON and returns the UTF-8 byte representation
|
||||||
|
* - "table": Serializes data as an Arrow IPC stream (table format) - NOT IMPLEMENTED (requires arrow library)
|
||||||
|
* - "image": Expects binary data (ArrayBuffer) and returns it as bytes
|
||||||
|
* - "audio": Expects binary data (ArrayBuffer) and returns it as bytes
|
||||||
|
* - "video": Expects binary data (ArrayBuffer) and returns it as bytes
|
||||||
|
* - "binary": Generic binary data (ArrayBuffer or Uint8Array) and returns bytes
|
||||||
|
*/
|
||||||
|
if (type === "text") {
|
||||||
|
if (typeof data === 'string') {
|
||||||
|
return new TextEncoder().encode(data).buffer;
|
||||||
|
} else {
|
||||||
|
throw new Error("Text data must be a String");
|
||||||
|
}
|
||||||
|
} else if (type === "dictionary") {
|
||||||
|
// JSON data - serialize directly
|
||||||
|
const jsonStr = JSON.stringify(data);
|
||||||
|
return new TextEncoder().encode(jsonStr).buffer;
|
||||||
|
} else if (type === "table") {
|
||||||
|
// Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
|
||||||
|
// This would require the apache-arrow library
|
||||||
|
throw new Error("Table serialization requires apache-arrow library");
|
||||||
|
} else if (type === "image") {
|
||||||
|
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data instanceof ArrayBuffer ? data : data.buffer;
|
||||||
|
} else {
|
||||||
|
throw new Error("Image data must be ArrayBuffer or Uint8Array");
|
||||||
|
}
|
||||||
|
} else if (type === "audio") {
|
||||||
|
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data instanceof ArrayBuffer ? data : data.buffer;
|
||||||
|
} else {
|
||||||
|
throw new Error("Audio data must be ArrayBuffer or Uint8Array");
|
||||||
|
}
|
||||||
|
} else if (type === "video") {
|
||||||
|
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data instanceof ArrayBuffer ? data : data.buffer;
|
||||||
|
} else {
|
||||||
|
throw new Error("Video data must be ArrayBuffer or Uint8Array");
|
||||||
|
}
|
||||||
|
} else if (type === "binary") {
|
||||||
|
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data instanceof ArrayBuffer ? data : data.buffer;
|
||||||
|
} else {
|
||||||
|
throw new Error("Binary data must be ArrayBuffer or Uint8Array");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new Error(`Unknown type: ${type}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Deserialize bytes based on type
|
||||||
|
function _deserialize_data(data, type, correlation_id) {
|
||||||
|
/**
|
||||||
|
* Deserialize bytes to data based on type
|
||||||
|
*
|
||||||
|
* Supported formats:
|
||||||
|
* - "text": Converts bytes to string
|
||||||
|
* - "dictionary": Parses JSON string
|
||||||
|
* - "table": Parses Arrow IPC stream - NOT IMPLEMENTED (requires apache-arrow library)
|
||||||
|
* - "image": Returns binary data
|
||||||
|
* - "audio": Returns binary data
|
||||||
|
* - "video": Returns binary data
|
||||||
|
* - "binary": Returns binary data
|
||||||
|
*/
|
||||||
|
if (type === "text") {
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
return decoder.decode(new Uint8Array(data));
|
||||||
|
} else if (type === "dictionary") {
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
const jsonStr = decoder.decode(new Uint8Array(data));
|
||||||
|
return JSON.parse(jsonStr);
|
||||||
|
} else if (type === "table") {
|
||||||
|
// Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
|
||||||
|
throw new Error("Table deserialization requires apache-arrow library");
|
||||||
|
} else if (type === "image") {
|
||||||
|
return data;
|
||||||
|
} else if (type === "audio") {
|
||||||
|
return data;
|
||||||
|
} else if (type === "video") {
|
||||||
|
return data;
|
||||||
|
} else if (type === "binary") {
|
||||||
|
return data;
|
||||||
|
} else {
|
||||||
|
throw new Error(`Unknown type: ${type}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Upload data to file server
|
||||||
|
async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) {
|
||||||
|
/**
|
||||||
|
* Upload data to HTTP file server (plik-like API)
|
||||||
|
*
|
||||||
|
* This function implements the plik one-shot upload mode:
|
||||||
|
* 1. Creates a one-shot upload session by sending POST request with {"OneShot": true}
|
||||||
|
* 2. Uploads the file data as multipart form data
|
||||||
|
* 3. Returns identifiers and download URL for the uploaded file
|
||||||
|
*/
|
||||||
|
log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`);
|
||||||
|
|
||||||
|
// Step 1: Get upload ID and token
|
||||||
|
const url_getUploadID = `${fileserver_url}/upload`;
|
||||||
|
const headers = {
|
||||||
|
"Content-Type": "application/json"
|
||||||
|
};
|
||||||
|
const body = JSON.stringify({ OneShot: true });
|
||||||
|
|
||||||
|
let response = await fetch(url_getUploadID, {
|
||||||
|
method: "POST",
|
||||||
|
headers: headers,
|
||||||
|
body: body
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const responseJson = await response.json();
|
||||||
|
const uploadid = responseJson.id;
|
||||||
|
const uploadtoken = responseJson.uploadToken;
|
||||||
|
|
||||||
|
// Step 2: Upload file data
|
||||||
|
const url_upload = `${fileserver_url}/file/${uploadid}`;
|
||||||
|
|
||||||
|
// Create multipart form data
|
||||||
|
const formData = new FormData();
|
||||||
|
// Create a Blob from the ArrayBuffer
|
||||||
|
const blob = new Blob([data], { type: "application/octet-stream" });
|
||||||
|
formData.append("file", blob, dataname);
|
||||||
|
|
||||||
|
response = await fetch(url_upload, {
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"X-UploadToken": uploadtoken
|
||||||
|
},
|
||||||
|
body: formData
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const fileResponseJson = await response.json();
|
||||||
|
const fileid = fileResponseJson.id;
|
||||||
|
|
||||||
|
// Build the download URL
|
||||||
|
const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
|
||||||
|
|
||||||
|
log_trace(correlation_id, `Uploaded to URL: ${url}`);
|
||||||
|
|
||||||
|
return {
|
||||||
|
status: response.status,
|
||||||
|
uploadid: uploadid,
|
||||||
|
fileid: fileid,
|
||||||
|
url: url
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Fetch data from URL with exponential backoff
|
||||||
|
async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) {
|
||||||
|
/**
|
||||||
|
* Fetch data from URL with retry logic using exponential backoff
|
||||||
|
*/
|
||||||
|
let delay = base_delay;
|
||||||
|
|
||||||
|
for (let attempt = 1; attempt <= max_retries; attempt++) {
|
||||||
|
try {
|
||||||
|
const response = await fetch(url);
|
||||||
|
|
||||||
|
if (response.status === 200) {
|
||||||
|
log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
|
||||||
|
const arrayBuffer = await response.arrayBuffer();
|
||||||
|
return arrayBuffer;
|
||||||
|
} else {
|
||||||
|
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
log_trace(correlation_id, `Attempt ${attempt} failed: ${e.message}`);
|
||||||
|
|
||||||
|
if (attempt < max_retries) {
|
||||||
|
// Sleep with exponential backoff
|
||||||
|
await new Promise(resolve => setTimeout(resolve, delay));
|
||||||
|
delay = Math.min(delay * 2, max_delay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(`Failed to fetch data after ${max_retries} attempts`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper: Get payload bytes from data
|
||||||
|
function _get_payload_bytes(data) {
|
||||||
|
if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
|
||||||
|
return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
|
||||||
|
} else if (typeof data === 'string') {
|
||||||
|
return new TextEncoder().encode(data);
|
||||||
|
} else {
|
||||||
|
// For objects, serialize to JSON
|
||||||
|
return new TextEncoder().encode(JSON.stringify(data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessagePayload class
|
||||||
|
class MessagePayload {
|
||||||
|
/**
|
||||||
|
* Represents a single payload in the message envelope
|
||||||
|
*
|
||||||
|
* @param {Object} options - Payload options
|
||||||
|
* @param {string} options.id - ID of this payload (e.g., "uuid4")
|
||||||
|
* @param {string} options.dataname - Name of this payload (e.g., "login_image")
|
||||||
|
* @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||||
|
* @param {string} options.transport - "direct" or "link"
|
||||||
|
* @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
|
||||||
|
* @param {number} options.size - Data size in bytes
|
||||||
|
* @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link)
|
||||||
|
* @param {Object} options.metadata - Metadata for this payload
|
||||||
|
*/
|
||||||
|
constructor(options) {
|
||||||
|
this.id = options.id || uuid4();
|
||||||
|
this.dataname = options.dataname;
|
||||||
|
this.type = options.type;
|
||||||
|
this.transport = options.transport;
|
||||||
|
this.encoding = options.encoding;
|
||||||
|
this.size = options.size;
|
||||||
|
this.data = options.data;
|
||||||
|
this.metadata = options.metadata || {};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to JSON object
|
||||||
|
toJSON() {
|
||||||
|
const obj = {
|
||||||
|
id: this.id,
|
||||||
|
dataname: this.dataname,
|
||||||
|
type: this.type,
|
||||||
|
transport: this.transport,
|
||||||
|
encoding: this.encoding,
|
||||||
|
size: this.size
|
||||||
|
};
|
||||||
|
|
||||||
|
// Include data based on transport type
|
||||||
|
if (this.transport === "direct" && this.data !== null) {
|
||||||
|
if (this.encoding === "base64" || this.encoding === "json") {
|
||||||
|
obj.data = this.data;
|
||||||
|
} else {
|
||||||
|
// For other encodings, use base64
|
||||||
|
const payloadBytes = _get_payload_bytes(this.data);
|
||||||
|
obj.data = arrayBufferToBase64(payloadBytes);
|
||||||
|
}
|
||||||
|
} else if (this.transport === "link" && this.data !== null) {
|
||||||
|
// For link transport, data is a URL string
|
||||||
|
obj.data = this.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Object.keys(this.metadata).length > 0) {
|
||||||
|
obj.metadata = this.metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageEnvelope class
|
||||||
|
class MessageEnvelope {
|
||||||
|
/**
|
||||||
|
* Represents the message envelope containing metadata and payloads
|
||||||
|
*
|
||||||
|
* @param {Object} options - Envelope options
|
||||||
|
* @param {string} options.sendTo - Topic/subject the sender sends to
|
||||||
|
* @param {Array<MessagePayload>} options.payloads - Array of payloads
|
||||||
|
* @param {string} options.correlationId - Unique identifier to track messages
|
||||||
|
* @param {string} options.msgId - This message id
|
||||||
|
* @param {string} options.timestamp - Message published timestamp
|
||||||
|
* @param {string} options.msgPurpose - Purpose of this message
|
||||||
|
* @param {string} options.senderName - Name of the sender
|
||||||
|
* @param {string} options.senderId - UUID of the sender
|
||||||
|
* @param {string} options.receiverName - Name of the receiver
|
||||||
|
* @param {string} options.receiverId - UUID of the receiver
|
||||||
|
* @param {string} options.replyTo - Topic to reply to
|
||||||
|
* @param {string} options.replyToMsgId - Message id this message is replying to
|
||||||
|
* @param {string} options.brokerURL - NATS server address
|
||||||
|
* @param {Object} options.metadata - Metadata for the envelope
|
||||||
|
*/
|
||||||
|
constructor(options) {
|
||||||
|
this.correlationId = options.correlationId || uuid4();
|
||||||
|
this.msgId = options.msgId || uuid4();
|
||||||
|
this.timestamp = options.timestamp || new Date().toISOString();
|
||||||
|
this.sendTo = options.sendTo;
|
||||||
|
this.msgPurpose = options.msgPurpose || "";
|
||||||
|
this.senderName = options.senderName || "";
|
||||||
|
this.senderId = options.senderId || uuid4();
|
||||||
|
this.receiverName = options.receiverName || "";
|
||||||
|
this.receiverId = options.receiverId || "";
|
||||||
|
this.replyTo = options.replyTo || "";
|
||||||
|
this.replyToMsgId = options.replyToMsgId || "";
|
||||||
|
this.brokerURL = options.brokerURL || DEFAULT_NATS_URL;
|
||||||
|
this.metadata = options.metadata || {};
|
||||||
|
this.payloads = options.payloads || [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to JSON string
|
||||||
|
toJSON() {
|
||||||
|
const obj = {
|
||||||
|
correlationId: this.correlationId,
|
||||||
|
msgId: this.msgId,
|
||||||
|
timestamp: this.timestamp,
|
||||||
|
sendTo: this.sendTo,
|
||||||
|
msgPurpose: this.msgPurpose,
|
||||||
|
senderName: this.senderName,
|
||||||
|
senderId: this.senderId,
|
||||||
|
receiverName: this.receiverName,
|
||||||
|
receiverId: this.receiverId,
|
||||||
|
replyTo: this.replyTo,
|
||||||
|
replyToMsgId: this.replyToMsgId,
|
||||||
|
brokerURL: this.brokerURL
|
||||||
|
};
|
||||||
|
|
||||||
|
if (Object.keys(this.metadata).length > 0) {
|
||||||
|
obj.metadata = this.metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.payloads.length > 0) {
|
||||||
|
obj.payloads = this.payloads.map(p => p.toJSON());
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to JSON string
|
||||||
|
toString() {
|
||||||
|
return JSON.stringify(this.toJSON());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SmartSend function
async function smartsend(subject, data, options = {}) {
  /**
   * Send data either directly via NATS or via a fileserver URL, depending on payload size.
   *
   * Each entry in `data` is serialized; entries smaller than `sizeThreshold` are
   * Base64-encoded and carried inline ("direct" transport), larger ones are uploaded
   * to the fileserver and only their download URL is carried ("link" transport).
   * All payloads are wrapped in one MessageEnvelope and published to NATS.
   *
   * @param {string} subject - NATS subject to publish the message to
   * @param {Array} data - List of {dataname, data, type} objects to send
   * @param {Object} options - Additional options
   * @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222")
   * @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080")
   * @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads
   * @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB)
   * @param {string} options.correlationId - Optional correlation ID for tracing
   * @param {string} options.msgPurpose - Purpose of the message (default: "chat")
   * @param {string} options.senderName - Name of the sender (default: "NATSBridge")
   * @param {string} options.receiverName - Name of the receiver (default: "")
   * @param {string} options.receiverId - UUID of the receiver (default: "")
   * @param {string} options.replyTo - Topic to reply to (default: "")
   * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
   * @returns {Promise<MessageEnvelope>} - The envelope for tracking
   */
  const {
    natsUrl = DEFAULT_NATS_URL,
    fileserverUrl = DEFAULT_FILESERVER_URL,
    fileserverUploadHandler = _upload_to_fileserver,
    sizeThreshold = DEFAULT_SIZE_THRESHOLD,
    correlationId = uuid4(),
    msgPurpose = "chat",
    senderName = "NATSBridge",
    receiverName = "",
    receiverId = "",
    replyTo = "",
    replyToMsgId = ""
  } = options;

  log_trace(correlationId, `Starting smartsend for subject: ${subject}`);

  // Fresh message ID for this envelope.
  const msgId = uuid4();

  // One MessagePayload per input entry, built below.
  const payloads = [];

  for (const entry of data) {
    const dataname = entry.dataname;

    // Serialize according to the declared type so the size decision is made
    // on the actual wire representation.
    const serialized = _serialize_data(entry.data, entry.type);
    const byteCount = serialized.byteLength;

    log_trace(correlationId, `Serialized payload '${dataname}' (type: ${entry.type}) size: ${byteCount} bytes`);

    if (byteCount < sizeThreshold) {
      // Direct path: Base64 encode and carry the bytes inside the envelope.
      log_trace(correlationId, `Using direct transport for ${byteCount} bytes`);
      payloads.push(new MessagePayload({
        dataname: dataname,
        type: entry.type,
        transport: "direct",
        encoding: "base64",
        size: byteCount,
        data: arrayBufferToBase64(serialized),
        metadata: { payload_bytes: byteCount }
      }));
    } else {
      // Link path: upload the bytes and carry only the download URL.
      log_trace(correlationId, `Using link transport, uploading to fileserver`);
      const response = await fileserverUploadHandler(fileserverUrl, dataname, serialized, correlationId);
      if (response.status !== 200) {
        throw new Error(`Failed to upload data to fileserver: ${response.status}`);
      }
      const url = response.url;
      log_trace(correlationId, `Uploaded to URL: ${url}`);
      payloads.push(new MessagePayload({
        dataname: dataname,
        type: entry.type,
        transport: "link",
        encoding: "none",
        size: byteCount,
        data: url,
        metadata: {}
      }));
    }
  }

  // Wrap every payload in a single envelope and publish it.
  const env = new MessageEnvelope({
    correlationId: correlationId,
    msgId: msgId,
    sendTo: subject,
    msgPurpose: msgPurpose,
    senderName: senderName,
    receiverName: receiverName,
    receiverId: receiverId,
    replyTo: replyTo,
    replyToMsgId: replyToMsgId,
    brokerURL: natsUrl,
    payloads: payloads
  });

  await publish_message(natsUrl, subject, env.toString(), correlationId);

  return env;
}
|
||||||
|
|
||||||
|
// Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) {
  /**
   * Publish a message to a NATS subject with proper connection management.
   *
   * NOTE: this is a placeholder implementation — it only logs a preview of the
   * message. A production build would use the nats.js client, e.g.:
   *   import { connect } from 'nats';
   *   const nc = await connect({ servers: [natsUrl] });
   *   await nc.publish(subject, message);
   *   nc.close();
   *
   * @param {string} natsUrl - NATS server URL
   * @param {string} subject - NATS subject to publish to
   * @param {string} message - JSON message to publish
   * @param {string} correlation_id - Correlation ID for logging
   */
  log_trace(correlation_id, `Publishing message to ${subject}`);

  // Log a 100-character preview of the outgoing message.
  console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.slice(0, 100)}...`);
}
|
||||||
|
|
||||||
|
// SmartReceive function
async function smartreceive(msg, options = {}) {
  /**
   * Receive and process messages from NATS.
   *
   * Parses the JSON envelope and resolves each payload: "direct" payloads are
   * Base64-decoded in place, "link" payloads are fetched from their URL with
   * exponential backoff. Each payload is then deserialized by its type.
   *
   * @param {Object} msg - NATS message object with payload property
   * @param {Object} options - Additional options
   * @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs
   * @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5)
   * @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
   * @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
   * @returns {Promise<Array>} - List of {dataname, data, type} objects
   * @throws {Error} when a payload carries an unknown transport type
   */
  const {
    fileserverDownloadHandler = _fetch_with_backoff,
    maxRetries = 5,
    baseDelay = 100,
    maxDelay = 5000
  } = options;

  // The payload may arrive as a string or as raw bytes; normalize to a string
  // before parsing the JSON envelope.
  const rawJson = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
  const envelope = JSON.parse(rawJson);

  log_trace(envelope.correlationId, `Processing received message`);

  const results = [];
  const payloadList = envelope.payloads ? envelope.payloads : [];

  for (const payload of payloadList) {
    const dataname = payload.dataname;
    const dataType = payload.type;

    if (payload.transport === "direct") {
      // Inline payload: decode Base64, then deserialize by declared type.
      log_trace(envelope.correlationId, `Direct transport - decoding payload '${dataname}'`);
      const decodedBytes = base64ToArrayBuffer(payload.data);
      const value = _deserialize_data(decodedBytes, dataType, envelope.correlationId);
      results.push({ dataname, data: value, type: dataType });
    } else if (payload.transport === "link") {
      // Remote payload: fetch from the URL with retry/backoff, then deserialize.
      const url = payload.data;
      log_trace(envelope.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
      const fetchedBytes = await fileserverDownloadHandler(
        url, maxRetries, baseDelay, maxDelay, envelope.correlationId
      );
      const value = _deserialize_data(fetchedBytes, dataType, envelope.correlationId);
      results.push({ dataname, data: value, type: dataType });
    } else {
      throw new Error(`Unknown transport type for payload '${dataname}': ${payload.transport}`);
    }
  }

  return results;
}
|
||||||
|
|
||||||
|
// Single export surface, shared by both the Node.js and the browser path so
// the two lists can never drift apart.
const _bridgeExports = {
  MessageEnvelope,
  MessagePayload,
  smartsend,
  smartreceive,
  _serialize_data,
  _deserialize_data,
  _fetch_with_backoff,
  _upload_to_fileserver,
  DEFAULT_SIZE_THRESHOLD,
  DEFAULT_NATS_URL,
  DEFAULT_FILESERVER_URL,
  uuid4,
  log_trace
};

// Export for Node.js (CommonJS)
if (typeof module !== 'undefined' && module.exports) {
  module.exports = _bridgeExports;
}

// Export for browser (global namespace)
if (typeof window !== 'undefined') {
  window.NATSBridge = _bridgeExports;
}
|
||||||
79
test/test_js_to_js_dict_receiver.js
Normal file
79
test/test_js_to_js_dict_receiver.js
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for Dictionary transport testing
// Tests receiving 1 large and 1 small Dictionary via direct and link transport
// Uses NATSBridge.js smartreceive with "dictionary" type

// FIX: `log_trace` was previously imported here while also being declared as a
// local function below — a SyntaxError ("Identifier 'log_trace' has already
// been declared") at module load. Only `smartreceive` is imported now.
// FIX: this file lives in test/, so the bridge module is one directory up.
const { smartreceive } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_dict_test";
const NATS_URL = "nats.yiem.cc";

// Helper: log a message with an ISO-timestamp prefix.
function log_trace(message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] ${message}`);
}

// Receiver: listen for messages on SUBJECT and verify Dictionary handling.
// For each received envelope, payloads are resolved via smartreceive, printed,
// and saved to ./received_<dataname>.json.
async function test_dict_receive() {
  // Connect to NATS
  const { connect } = require('nats');
  const nc = await connect({ servers: [NATS_URL] });

  // Subscribe to the subject
  const sub = nc.subscribe(SUBJECT);

  // Stop listening after 120 seconds.
  // FIX: the shutdown timer must be armed BEFORE the for-await loop — the loop
  // only ends when the subscription closes, so a setTimeout placed after it
  // (as in the original version) was unreachable.
  setTimeout(() => {
    nc.close();
    process.exit(0);
  }, 120000);

  for await (const msg of sub) {
    log_trace(`Received message on ${msg.subject}`);

    // Use NATSBridge.smartreceive to handle the data
    const result = await smartreceive(
      msg,
      {
        maxRetries: 5,
        baseDelay: 100,
        maxDelay: 5000
      }
    );

    // Result is a list of {dataname, data, type} objects
    for (const { dataname, data, type } of result) {
      if (typeof data === 'object' && data !== null && !Array.isArray(data)) {
        log_trace(`Received Dictionary '${dataname}' of type ${type}`);

        // Display dictionary contents
        console.log("  Contents:");
        for (const [key, value] of Object.entries(data)) {
          console.log(`    ${key} => ${value}`);
        }

        // Save to JSON file
        const fs = require('fs');
        const output_path = `./received_${dataname}.json`;
        const json_str = JSON.stringify(data, null, 2);
        fs.writeFileSync(output_path, json_str);
        log_trace(`Saved Dictionary to ${output_path}`);
      } else {
        log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`);
      }
    }
  }
}

// Run the test
console.log("Starting Dictionary transport test...");
console.log("Note: This receiver will wait for messages from the sender.");
console.log("Run test_js_to_js_dict_sender.js first to send test data.");

// Run receiver
console.log("testing smartreceive");
// FIX: the promise was previously floating; surface failures instead of
// letting an unhandled rejection kill the process silently.
test_dict_receive().catch((err) => {
  console.error("Receiver failed:", err);
  process.exit(1);
});

console.log("Test completed.");
|
||||||
164
test/test_js_to_js_dict_sender.js
Normal file
164
test/test_js_to_js_dict_sender.js
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for Dictionary transport testing
// Tests sending 1 large and 1 small Dictionary via direct and link transport
// Uses NATSBridge.js smartsend with "dictionary" type

// FIX: `log_trace` was previously imported here while also being declared as a
// local function below — a SyntaxError at module load. Only the names that are
// actually used are imported now.
// FIX: this file lives in test/, so the bridge module is one directory up.
const { smartsend, uuid4 } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_dict_test";
const NATS_URL = "nats.yiem.cc";
const FILESERVER_URL = "http://192.168.88.104:8080";

// Create correlation ID for tracing
const correlation_id = uuid4();

// Helper: log a message prefixed with timestamp and correlation ID.
function log_trace(message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
}

// File upload handler for a plik file server.
// Returns {status, uploadid, fileid, url} as expected by smartsend's
// fileserverUploadHandler contract.
async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) {
  // Step 1: create a one-shot upload and obtain its ID and token
  const url_getUploadID = `${fileserver_url}/upload`;
  const headers = {
    "Content-Type": "application/json"
  };
  const body = JSON.stringify({ OneShot: true });

  let response = await fetch(url_getUploadID, {
    method: "POST",
    headers: headers,
    body: body
  });

  if (!response.ok) {
    throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
  }

  const responseJson = await response.json();
  const uploadid = responseJson.id;
  const uploadtoken = responseJson.uploadToken;

  // Step 2: upload the payload as multipart form data
  const formData = new FormData();
  const blob = new Blob([data], { type: "application/octet-stream" });
  formData.append("file", blob, dataname);

  response = await fetch(`${fileserver_url}/file/${uploadid}`, {
    method: "POST",
    headers: {
      "X-UploadToken": uploadtoken
    },
    body: formData
  });

  if (!response.ok) {
    throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
  }

  const fileResponseJson = await response.json();
  const fileid = fileResponseJson.id;

  // Build the download URL handed back in the envelope
  const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

  return {
    status: response.status,
    uploadid: uploadid,
    fileid: fileid,
    url: url
  };
}

// Sender: build one small and one large dictionary and send both via smartsend.
async function test_dict_send() {
  // Create a small Dictionary (will use direct transport)
  const small_dict = {
    name: "Alice",
    age: 30,
    scores: [95, 88, 92],
    metadata: {
      height: 155,
      weight: 55
    }
  };

  // Create a large Dictionary (will use link transport if > 1MB)
  const large_dict_ids = [];
  const large_dict_names = [];
  const large_dict_scores = [];
  const large_dict_categories = [];

  for (let i = 0; i < 50000; i++) {
    large_dict_ids.push(i + 1);
    large_dict_names.push(`User_${i}`);
    large_dict_scores.push(Math.floor(Math.random() * 100) + 1);
    large_dict_categories.push(`Category_${Math.floor(Math.random() * 10) + 1}`);
  }

  const large_dict = {
    ids: large_dict_ids,
    names: large_dict_names,
    scores: large_dict_scores,
    categories: large_dict_categories,
    metadata: {
      source: "test_generator",
      timestamp: new Date().toISOString()
    }
  };

  // Test data 1: small Dictionary
  const data1 = { dataname: "small_dict", data: small_dict, type: "dictionary" };

  // Test data 2: large Dictionary
  const data2 = { dataname: "large_dict", data: large_dict, type: "dictionary" };

  // Use smartsend with dictionary type:
  // small Dictionary -> direct transport (JSON encoded)
  // large Dictionary -> link transport (uploaded to fileserver)
  const env = await smartsend(
    SUBJECT,
    [data1, data2],
    {
      natsUrl: NATS_URL,
      fileserverUrl: FILESERVER_URL,
      fileserverUploadHandler: plik_upload_handler,
      sizeThreshold: 1_000_000,
      correlationId: correlation_id,
      msgPurpose: "chat",
      senderName: "dict_sender",
      receiverName: "",
      receiverId: "",
      replyTo: "",
      replyToMsgId: ""
    }
  );

  log_trace(`Sent message with ${env.payloads.length} payloads`);

  // Log transport type for each payload
  for (let i = 0; i < env.payloads.length; i++) {
    const payload = env.payloads[i];
    log_trace(`Payload ${i + 1} ('${payload.dataname}'):`);
    log_trace(`  Transport: ${payload.transport}`);
    log_trace(`  Type: ${payload.type}`);
    log_trace(`  Size: ${payload.size} bytes`);
    log_trace(`  Encoding: ${payload.encoding}`);

    if (payload.transport === "link") {
      log_trace(`  URL: ${payload.data}`);
    }
  }
}

// Run the test
console.log("Starting Dictionary transport test...");
console.log(`Correlation ID: ${correlation_id}`);

// Run sender
console.log("start smartsend for dictionaries");
// FIX: the promise was previously floating without a rejection handler.
test_dict_send().catch((err) => {
  console.error("Sender failed:", err);
  process.exit(1);
});

console.log("Test completed.");
|
||||||
70
test/test_js_to_js_file_receiver.js
Normal file
70
test/test_js_to_js_file_receiver.js
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for large payload testing using binary transport
// Tests receiving a large file (> 1MB) via smartsend with binary type

// FIX: `log_trace` was previously imported here while also being declared as a
// local function below — a SyntaxError at module load. Only `smartreceive` is
// imported now.
// FIX: this file lives in test/, so the bridge module is one directory up.
const { smartreceive } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_test";
const NATS_URL = "nats.yiem.cc";

// Helper: log a message with an ISO-timestamp prefix.
function log_trace(message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] ${message}`);
}

// Receiver: listen for messages and verify large binary payload handling.
// Each binary payload is saved to ./new_<dataname>.
async function test_large_binary_receive() {
  // Connect to NATS
  const { connect } = require('nats');
  const nc = await connect({ servers: [NATS_URL] });

  // Subscribe to the subject
  const sub = nc.subscribe(SUBJECT);

  // Stop listening after 120 seconds.
  // FIX: the shutdown timer must be armed BEFORE the for-await loop — the loop
  // only ends when the subscription closes, so a setTimeout placed after it
  // (as in the original version) was unreachable.
  setTimeout(() => {
    nc.close();
    process.exit(0);
  }, 120000);

  for await (const msg of sub) {
    log_trace(`Received message on ${msg.subject}`);

    // Use NATSBridge.smartreceive to handle the data
    const result = await smartreceive(
      msg,
      {
        maxRetries: 5,
        baseDelay: 100,
        maxDelay: 5000
      }
    );

    // Result is a list of {dataname, data, type} objects
    for (const { dataname, data, type } of result) {
      if (data instanceof Uint8Array || Array.isArray(data)) {
        const file_size = data.length;
        log_trace(`Received ${file_size} bytes of binary data for '${dataname}' of type ${type}`);

        // Save received data to a test file
        const fs = require('fs');
        const output_path = `./new_${dataname}`;
        fs.writeFileSync(output_path, Buffer.from(data));
        log_trace(`Saved received data to ${output_path}`);
      } else {
        log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`);
      }
    }
  }
}

// Run the test
console.log("Starting large binary payload test...");

// Run receiver
console.log("testing smartreceive");
// FIX: the promise was previously floating without a rejection handler.
test_large_binary_receive().catch((err) => {
  console.error("Receiver failed:", err);
  process.exit(1);
});

console.log("Test completed.");
|
||||||
143
test/test_js_to_js_file_sender.js
Normal file
143
test/test_js_to_js_file_sender.js
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for large payload testing using binary transport
// Tests sending a large file (> 1MB) via smartsend with binary type

// FIX: `log_trace` was previously imported here while also being declared as a
// local function below — a SyntaxError at module load. Only the names that are
// actually used are imported now.
// FIX: this file lives in test/, so the bridge module is one directory up.
const { smartsend, uuid4 } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_test";
const NATS_URL = "nats.yiem.cc";
const FILESERVER_URL = "http://192.168.88.104:8080";

// Create correlation ID for tracing
const correlation_id = uuid4();

// Helper: log a message prefixed with timestamp and correlation ID.
function log_trace(message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
}

// File upload handler for a plik file server.
// Returns {status, uploadid, fileid, url} as expected by smartsend's
// fileserverUploadHandler contract.
async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) {
  // FIX: the local log_trace takes a single message argument (the correlation
  // ID is baked into the prefix); the original passed (correlation_id, msg)
  // and logged the correlation ID instead of the message.
  log_trace(`Uploading ${dataname} to fileserver: ${fileserver_url}`);

  // Step 1: Get upload ID and token
  const url_getUploadID = `${fileserver_url}/upload`;
  const headers = {
    "Content-Type": "application/json"
  };
  const body = JSON.stringify({ OneShot: true });

  let response = await fetch(url_getUploadID, {
    method: "POST",
    headers: headers,
    body: body
  });

  if (!response.ok) {
    throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
  }

  const responseJson = await response.json();
  const uploadid = responseJson.id;
  const uploadtoken = responseJson.uploadToken;

  // Step 2: Upload file data
  const url_upload = `${fileserver_url}/file/${uploadid}`;

  // Create multipart form data
  const formData = new FormData();
  const blob = new Blob([data], { type: "application/octet-stream" });
  formData.append("file", blob, dataname);

  response = await fetch(url_upload, {
    method: "POST",
    headers: {
      "X-UploadToken": uploadtoken
    },
    body: formData
  });

  if (!response.ok) {
    throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
  }

  const fileResponseJson = await response.json();
  const fileid = fileResponseJson.id;

  // Build the download URL
  const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

  log_trace(`Uploaded to URL: ${url}`);

  return {
    status: response.status,
    uploadid: uploadid,
    fileid: fileid,
    url: url
  };
}

// Sender: read two local test files and send both via smartsend with
// binary type; link transport is chosen automatically above the threshold.
async function test_large_binary_send() {
  // Read the large file as binary data
  const fs = require('fs');

  // Test data 1
  const file_path1 = './testFile_large.zip';
  const file_data1 = fs.readFileSync(file_path1);
  const filename1 = 'testFile_large.zip';
  const data1 = { dataname: filename1, data: file_data1, type: "binary" };

  // Test data 2
  const file_path2 = './testFile_small.zip';
  const file_data2 = fs.readFileSync(file_path2);
  const filename2 = 'testFile_small.zip';
  const data2 = { dataname: filename2, data: file_data2, type: "binary" };

  // Use smartsend with binary type - will automatically use link transport
  // if file size exceeds the threshold (1MB by default)
  const env = await smartsend(
    SUBJECT,
    [data1, data2],
    {
      natsUrl: NATS_URL,
      fileserverUrl: FILESERVER_URL,
      fileserverUploadHandler: plik_upload_handler,
      sizeThreshold: 1_000_000,
      correlationId: correlation_id,
      msgPurpose: "chat",
      senderName: "sender",
      receiverName: "",
      receiverId: "",
      replyTo: "",
      replyToMsgId: ""
    }
  );

  log_trace(`Sent message with transport: ${env.payloads[0].transport}`);
  log_trace(`Envelope type: ${env.payloads[0].type}`);

  // Check if link transport was used
  if (env.payloads[0].transport === "link") {
    log_trace("Using link transport - file uploaded to HTTP server");
    log_trace(`URL: ${env.payloads[0].data}`);
  } else {
    log_trace("Using direct transport - payload sent via NATS");
  }
}

// Run the test
console.log("Starting large binary payload test...");
console.log(`Correlation ID: ${correlation_id}`);

// Run sender first
console.log("start smartsend");
// FIX: the promise was previously floating without a rejection handler.
test_large_binary_send().catch((err) => {
  console.error("Sender failed:", err);
  process.exit(1);
});

console.log("Test completed.");
|
||||||
276
test/test_js_to_js_mix_payload_sender.js
Normal file
276
test/test_js_to_js_mix_payload_sender.js
Normal file
@@ -0,0 +1,276 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for mixed-content message testing
// Tests sending a mix of text, json, table, image, audio, video, and binary data
// from JavaScript serviceA to JavaScript serviceB using NATSBridge.js smartsend
//
// This test demonstrates that any combination and any number of mixed content
// can be sent and received correctly.

// FIX: `log_trace` was previously imported here while also being declared as a
// local function below — a SyntaxError at module load. Only the names that are
// actually used are imported now.
// FIX: this file lives in test/, so the bridge module is one directory up.
const { smartsend, uuid4, _serialize_data } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_mix_test";
const NATS_URL = "nats.yiem.cc";
const FILESERVER_URL = "http://192.168.88.104:8080";

// Create correlation ID for tracing
const correlation_id = uuid4();

// Helper: log a message prefixed with timestamp and correlation ID.
function log_trace(message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
}

// File upload handler for a plik file server.
// Returns {status, uploadid, fileid, url} as expected by smartsend's
// fileserverUploadHandler contract.
async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) {
  // FIX: the local log_trace takes a single message argument (the correlation
  // ID is baked into the prefix); the original passed (correlation_id, msg)
  // and logged the correlation ID instead of the message.
  log_trace(`Uploading ${dataname} to fileserver: ${fileserver_url}`);

  // Step 1: Get upload ID and token
  const url_getUploadID = `${fileserver_url}/upload`;
  const headers = {
    "Content-Type": "application/json"
  };
  const body = JSON.stringify({ OneShot: true });

  let response = await fetch(url_getUploadID, {
    method: "POST",
    headers: headers,
    body: body
  });

  if (!response.ok) {
    throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
  }

  const responseJson = await response.json();
  const uploadid = responseJson.id;
  const uploadtoken = responseJson.uploadToken;

  // Step 2: Upload file data
  const url_upload = `${fileserver_url}/file/${uploadid}`;

  // Create multipart form data
  const formData = new FormData();
  const blob = new Blob([data], { type: "application/octet-stream" });
  formData.append("file", blob, dataname);

  response = await fetch(url_upload, {
    method: "POST",
    headers: {
      "X-UploadToken": uploadtoken
    },
    body: formData
  });

  if (!response.ok) {
    throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
  }

  const fileResponseJson = await response.json();
  const fileid = fileResponseJson.id;

  // Build the download URL
  const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

  log_trace(`Uploaded to URL: ${url}`);

  return {
    status: response.status,
    uploadid: uploadid,
    fileid: fileid,
    url: url
  };
}
|
||||||
|
|
||||||
|
// Helper: Create sample data for each type.
// Returns an object with small fixtures (direct transport) and large
// fixtures (link transport). Math.random is fine here: test data only.
function create_sample_data() {
    // Write the 8-byte PNG file signature into the start of `arr`.
    const write_png_signature = (arr) => {
        const sig = [0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
        for (let i = 0; i < sig.length; i++) {
            arr[i] = sig[i];
        }
    };

    // Fill `arr` from `offset` to the end with pseudo-random byte values
    // (0..254, matching the original Math.random()*255 fill).
    const fill_random = (arr, offset = 0) => {
        for (let i = offset; i < arr.length; i++) {
            arr[i] = Math.floor(Math.random() * 255);
        }
    };

    // Text data (small - direct transport)
    const text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊";

    // Dictionary/JSON data (medium - could be direct or link)
    const dict_data = {
        type: "chat",
        sender: "serviceA",
        receiver: "serviceB",
        metadata: {
            timestamp: new Date().toISOString(),
            priority: "high",
            tags: ["urgent", "chat", "test"]
        },
        content: {
            text: "This is a JSON-formatted chat message with nested structure.",
            format: "markdown",
            mentions: ["user1", "user2"]
        }
    };

    // Table data (small - direct transport) - NOT IMPLEMENTED (requires apache-arrow)
    // const table_data_small = {...};

    // Table data (large - link transport) - NOT IMPLEMENTED (requires apache-arrow)
    // const table_data_large = {...};

    // Image data (small binary - direct transport)
    // A 10x10 "image": 8-byte PNG signature + 10*10*3 = 300 pixel bytes.
    // BUG FIX: the buffer was previously new Uint8Array(128), so most of the
    // 300 pixel bytes written at offset 8 were silently dropped
    // (out-of-range writes on a typed array are no-ops). Size the buffer to
    // hold signature + all pixels.
    const image_width = 10;
    const image_height = 10;
    const image_data = new Uint8Array(8 + image_width * image_height * 3);
    write_png_signature(image_data);
    image_data.fill(0xFF, 8); // constant 0xFF pixel bytes, as before

    // Image data (large - link transport)
    const large_image_width = 500;
    const large_image_height = 1000;
    const large_image_data = new Uint8Array(large_image_width * large_image_height * 3 + 8);
    write_png_signature(large_image_data);
    fill_random(large_image_data, 8); // random RGB data after the signature

    // Audio data (small binary - direct transport)
    const audio_data = new Uint8Array(100);
    fill_random(audio_data);

    // Audio data (large - link transport)
    const large_audio_data = new Uint8Array(1_500_000);
    fill_random(large_audio_data);

    // Video data (small binary - direct transport)
    const video_data = new Uint8Array(150);
    fill_random(video_data);

    // Video data (large - link transport)
    const large_video_data = new Uint8Array(1_500_000);
    fill_random(large_video_data);

    // Binary data (small - direct transport)
    const binary_data = new Uint8Array(200);
    fill_random(binary_data);

    // Binary data (large - link transport)
    const large_binary_data = new Uint8Array(1_500_000);
    fill_random(large_binary_data);

    return {
        text_data,
        dict_data,
        // table_data_small,
        // table_data_large,
        image_data,
        large_image_data,
        audio_data,
        large_audio_data,
        video_data,
        large_video_data,
        binary_data,
        large_binary_data
    };
}
|
||||||
|
|
||||||
|
// Sender: Send mixed content via smartsend.
// Builds a mixed payload list (small items inline, large items via link
// transport), sends it as one envelope, then logs how each payload went out.
async function test_mix_send() {
    // Create sample data
    const sample = create_sample_data();

    // Create payloads list - mixed content with both small and large data
    // Small data uses direct transport, large data uses link transport
    const payloads = [
        // Small data (direct transport) - text, dictionary
        { dataname: "chat_text", data: sample.text_data, type: "text" },
        { dataname: "chat_json", data: sample.dict_data, type: "dictionary" },
        // { dataname: "chat_table_small", data: table_data_small, type: "table" },

        // Large data (link transport) - large image, large audio, large video, large binary
        // { dataname: "chat_table_large", data: table_data_large, type: "table" },
        { dataname: "user_image_large", data: sample.large_image_data, type: "image" },
        { dataname: "audio_clip_large", data: sample.large_audio_data, type: "audio" },
        { dataname: "video_clip_large", data: sample.large_video_data, type: "video" },
        { dataname: "binary_file_large", data: sample.large_binary_data, type: "binary" }
    ];

    // Use smartsend with mixed content; it decides per payload whether to
    // inline the data or upload it through the handler.
    const env = await smartsend(SUBJECT, payloads, {
        natsUrl: NATS_URL,
        fileserverUrl: FILESERVER_URL,
        fileserverUploadHandler: plik_upload_handler,
        sizeThreshold: 1_000_000,
        correlationId: correlation_id,
        msgPurpose: "chat",
        senderName: "mix_sender",
        receiverName: "",
        receiverId: "",
        replyTo: "",
        replyToMsgId: ""
    });

    log_trace(`Sent message with ${env.payloads.length} payloads`);

    // Log transport type for each payload
    env.payloads.forEach((payload, idx) => {
        log_trace(`Payload ${idx + 1} ('${payload.dataname}'):`);
        log_trace(`  Transport: ${payload.transport}`);
        log_trace(`  Type: ${payload.type}`);
        log_trace(`  Size: ${payload.size} bytes`);
        log_trace(`  Encoding: ${payload.encoding}`);

        if (payload.transport === "link") {
            log_trace(`  URL: ${payload.data}`);
        }
    });

    // Summary
    console.log("\n--- Transport Summary ---");
    let direct_count = 0;
    let link_count = 0;
    for (const { transport } of env.payloads) {
        if (transport === "direct") {
            direct_count += 1;
        } else if (transport === "link") {
            link_count += 1;
        }
    }
    log_trace(`Direct transport: ${direct_count} payloads`);
    log_trace(`Link transport: ${link_count} payloads`);
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting mixed-content transport test...");
console.log(`Correlation ID: ${correlation_id}`);

// Run sender
console.log("start smartsend for mixed content");
// BUG FIX: the promise returned by test_mix_send() was left floating, so a
// failure became an unhandled rejection and "Test completed." was printed
// before the async send actually finished. Chain the completion logs and
// add an explicit error handler.
test_mix_send()
    .then(() => {
        console.log("\nTest completed.");
        console.log("Note: Run test_js_to_js_mix_receiver.js to receive the messages.");
    })
    .catch((err) => {
        console.error("Test failed:", err);
        process.exitCode = 1;
    });
|
||||||
172
test/test_js_to_js_mix_payloads_receiver.js
Normal file
172
test/test_js_to_js_mix_payloads_receiver.js
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for mixed-content message testing
// Tests receiving a mix of text, json, table, image, audio, video, and binary data
// from JavaScript serviceA to JavaScript serviceB using NATSBridge.js smartreceive
//
// This test demonstrates that any combination and any number of mixed content
// can be sent and received correctly.

// BUG FIX: import only `smartreceive`. Destructuring `log_trace` here
// collided with the local `function log_trace` declared below
// (SyntaxError: Identifier 'log_trace' has already been declared).
// Also, the bridge module lives in ../src relative to this test/ directory,
// not ./src.
const { smartreceive } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_mix_test";
const NATS_URL = "nats.yiem.cc";
|
||||||
|
|
||||||
|
// Helper: Log a message prefixed with an ISO-8601 timestamp.
function log_trace(message) {
    console.log(`[${new Date().toISOString()}] ${message}`);
}
|
||||||
|
|
||||||
|
// Receiver: Listen for messages and verify mixed content handling.
// Connects to NATS, subscribes to SUBJECT, decodes each envelope with
// smartreceive, validates every payload against its declared type, and
// saves the decoded data to local files.
async function test_mix_receive() {
    // Connect to NATS
    const { connect } = require('nats');
    const nc = await connect({ servers: [NATS_URL] });

    // Subscribe to the subject
    const sub = nc.subscribe(SUBJECT);

    // BUG FIX: the shutdown timer used to be scheduled AFTER the `for await`
    // loop below. That loop only ends when the subscription closes, so the
    // timer was never armed and the receiver never exited. Arm it up front:
    // stop listening after 2 minutes.
    setTimeout(() => {
        nc.close();
        process.exit(0);
    }, 120000);

    for await (const msg of sub) {
        log_trace(`Received message on ${msg.subject}`);

        // Use NATSBridge.smartreceive to handle the data. The retry options
        // cover transient failures when fetching link-transport payloads.
        const result = await smartreceive(
            msg,
            {
                maxRetries: 5,
                baseDelay: 100,
                maxDelay: 5000
            }
        );

        log_trace(`Received ${result.length} payloads`);

        // Result is a list of {dataname, data, type} objects
        for (const { dataname, data, type } of result) {
            log_trace(`\n=== Payload: ${dataname} (type: ${type}) ===`);

            // Handle different data types
            if (type === "text") {
                // Text data - should be a String
                if (typeof data === 'string') {
                    log_trace(`  Type: String`);
                    log_trace(`  Length: ${data.length} characters`);

                    // Display first 200 characters
                    if (data.length > 200) {
                        log_trace(`  First 200 chars: ${data.substring(0, 200)}...`);
                    } else {
                        log_trace(`  Content: ${data}`);
                    }

                    // Save to file
                    const fs = require('fs');
                    const output_path = `./received_${dataname}.txt`;
                    fs.writeFileSync(output_path, data);
                    log_trace(`  Saved to: ${output_path}`);
                } else {
                    log_trace(`  ERROR: Expected String, got ${typeof data}`);
                }

            } else if (type === "dictionary") {
                // Dictionary data - should be a plain object (not an array)
                if (typeof data === 'object' && data !== null && !Array.isArray(data)) {
                    log_trace(`  Type: Object`);
                    log_trace(`  Keys: ${Object.keys(data).join(', ')}`);

                    // Display nested content
                    for (const [key, value] of Object.entries(data)) {
                        log_trace(`    ${key} => ${value}`);
                    }

                    // Save to JSON file
                    const fs = require('fs');
                    const output_path = `./received_${dataname}.json`;
                    const json_str = JSON.stringify(data, null, 2);
                    fs.writeFileSync(output_path, json_str);
                    log_trace(`  Saved to: ${output_path}`);
                } else {
                    log_trace(`  ERROR: Expected Object, got ${typeof data}`);
                }

            } else if (type === "table") {
                // Table data - should be an array of objects (requires apache-arrow)
                log_trace(`  Type: Array (requires apache-arrow for full deserialization)`);
                if (Array.isArray(data)) {
                    log_trace(`  Length: ${data.length} items`);
                    log_trace(`  First item: ${JSON.stringify(data[0])}`);
                } else {
                    log_trace(`  ERROR: Expected Array, got ${typeof data}`);
                }

            } else if (type === "image" || type === "audio" || type === "video" || type === "binary") {
                // Binary data - should be Uint8Array
                if (data instanceof Uint8Array || Array.isArray(data)) {
                    log_trace(`  Type: Uint8Array (binary)`);
                    log_trace(`  Size: ${data.length} bytes`);

                    // Save to file
                    const fs = require('fs');
                    const output_path = `./received_${dataname}.bin`;
                    fs.writeFileSync(output_path, Buffer.from(data));
                    log_trace(`  Saved to: ${output_path}`);
                } else {
                    log_trace(`  ERROR: Expected Uint8Array, got ${typeof data}`);
                }

            } else {
                log_trace(`  ERROR: Unknown data type '${type}'`);
            }
        }

        // Summary
        console.log("\n=== Verification Summary ===");
        const text_count = result.filter(x => x.type === "text").length;
        const dict_count = result.filter(x => x.type === "dictionary").length;
        const table_count = result.filter(x => x.type === "table").length;
        const image_count = result.filter(x => x.type === "image").length;
        const audio_count = result.filter(x => x.type === "audio").length;
        const video_count = result.filter(x => x.type === "video").length;
        const binary_count = result.filter(x => x.type === "binary").length;

        log_trace(`Text payloads: ${text_count}`);
        log_trace(`Dictionary payloads: ${dict_count}`);
        log_trace(`Table payloads: ${table_count}`);
        log_trace(`Image payloads: ${image_count}`);
        log_trace(`Audio payloads: ${audio_count}`);
        log_trace(`Video payloads: ${video_count}`);
        log_trace(`Binary payloads: ${binary_count}`);

        // Print transport type info for each payload if available
        console.log("\n=== Payload Details ===");
        for (const { dataname, data, type } of result) {
            if (["image", "audio", "video", "binary"].includes(type)) {
                log_trace(`${dataname}: ${data.length} bytes (binary)`);
            } else if (type === "table") {
                log_trace(`${dataname}: ${data.length} items (Array)`);
            } else if (type === "dictionary") {
                log_trace(`${dataname}: ${JSON.stringify(data).length} bytes (Object)`);
            } else if (type === "text") {
                log_trace(`${dataname}: ${data.length} characters (String)`);
            }
        }
    }
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting mixed-content transport test...");
console.log("Note: This receiver will wait for messages from the sender.");
console.log("Run test_js_to_js_mix_sender.js first to send test data.");

// Run receiver
console.log("\ntesting smartreceive for mixed content");
// BUG FIX: the returned promise was floating, so errors became unhandled
// rejections and "Test completed." was printed before the receiver ran.
test_mix_receive()
    .then(() => {
        console.log("\nTest completed.");
    })
    .catch((err) => {
        console.error("Receiver failed:", err);
        process.exitCode = 1;
    });
|
||||||
86
test/test_js_to_js_table_receiver.js
Normal file
86
test/test_js_to_js_table_receiver.js
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for Table transport testing
// Tests receiving 1 large and 1 small Tables via direct and link transport
// Uses NATSBridge.js smartreceive with "table" type
//
// Note: This test requires the apache-arrow library to deserialize table data.
// The JavaScript implementation uses apache-arrow for Arrow IPC deserialization.

// BUG FIX: import only `smartreceive`. Destructuring `log_trace` here
// collided with the local `function log_trace` declared below
// (SyntaxError: Identifier 'log_trace' has already been declared).
// The bridge module lives in ../src relative to this test/ directory.
const { smartreceive } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_table_test";
const NATS_URL = "nats.yiem.cc";
|
||||||
|
|
||||||
|
// Helper: Log a message prefixed with an ISO-8601 timestamp.
function log_trace(message) {
    console.log(`[${new Date().toISOString()}] ${message}`);
}
|
||||||
|
|
||||||
|
// Receiver: Listen for messages and verify Table handling.
// Subscribes to SUBJECT, decodes each envelope with smartreceive, prints a
// preview of every received table, and saves it to a local JSON file.
async function test_table_receive() {
    // Connect to NATS
    const { connect } = require('nats');
    const nc = await connect({ servers: [NATS_URL] });

    // Subscribe to the subject
    const sub = nc.subscribe(SUBJECT);

    // BUG FIX: arm the shutdown timer BEFORE the `for await` loop. It used
    // to sit after the loop, which only ends when the subscription closes,
    // so it never ran and the process never exited. Also, the old comment
    // said "10 seconds" while the delay is 120000 ms (2 minutes).
    setTimeout(() => {
        nc.close();
        process.exit(0);
    }, 120000);

    for await (const msg of sub) {
        log_trace(`Received message on ${msg.subject}`);

        // Use NATSBridge.smartreceive to handle the data (retries cover
        // transient fetch failures for link-transport payloads)
        const result = await smartreceive(
            msg,
            {
                maxRetries: 5,
                baseDelay: 100,
                maxDelay: 5000
            }
        );

        // Result is a list of {dataname, data, type} objects
        for (const { dataname, data, type } of result) {
            if (Array.isArray(data)) {
                log_trace(`Received Table '${dataname}' of type ${type}`);

                // Display table contents
                console.log(`  Dimensions: ${data.length} rows x ${data.length > 0 ? Object.keys(data[0]).length : 0} columns`);
                console.log(`  Columns: ${data.length > 0 ? Object.keys(data[0]).join(', ') : ''}`);

                // Display first few rows
                console.log(`  First 5 rows:`);
                for (let i = 0; i < Math.min(5, data.length); i++) {
                    console.log(`    Row ${i}: ${JSON.stringify(data[i])}`);
                }

                // Save to JSON file
                const fs = require('fs');
                const output_path = `./received_${dataname}.json`;
                const json_str = JSON.stringify(data, null, 2);
                fs.writeFileSync(output_path, json_str);
                log_trace(`Saved Table to ${output_path}`);
            } else {
                log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`);
            }
        }
    }
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting Table transport test...");
console.log("Note: This receiver will wait for messages from the sender.");
console.log("Run test_js_to_js_table_sender.js first to send test data.");

// Run receiver
console.log("testing smartreceive");
// BUG FIX: the returned promise was floating, so errors became unhandled
// rejections and "Test completed." was printed before the receiver ran.
test_table_receive()
    .then(() => {
        console.log("Test completed.");
    })
    .catch((err) => {
        console.error("Receiver failed:", err);
        process.exitCode = 1;
    });
|
||||||
164
test/test_js_to_js_table_sender.js
Normal file
164
test/test_js_to_js_table_sender.js
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for Table transport testing
// Tests sending 1 large and 1 small Tables via direct and link transport
// Uses NATSBridge.js smartsend with "table" type
//
// Note: This test requires the apache-arrow library to serialize/deserialize table data.
// The JavaScript implementation uses apache-arrow for Arrow IPC serialization.

// BUG FIX: import only what is used. Destructuring `log_trace` here collided
// with the local `function log_trace` declared below (SyntaxError:
// Identifier 'log_trace' has already been declared). The bridge module
// lives in ../src relative to this test/ directory.
const { smartsend, uuid4 } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_table_test";
const NATS_URL = "nats.yiem.cc";
const FILESERVER_URL = "http://192.168.88.104:8080";

// Create correlation ID for tracing
const correlation_id = uuid4();
|
||||||
|
|
||||||
|
// Helper: Log a message with a timestamp and the module-level correlation
// id, so sender log lines can be matched against the receiver side.
function log_trace(message) {
    const ts = new Date().toISOString();
    console.log(`[${ts}] [Correlation: ${correlation_id}] ${message}`);
}
|
||||||
|
|
||||||
|
// File upload handler for plik server.
//
// Uploads `data` under `dataname` to the plik fileserver at `fileserver_url`
// and returns { status, uploadid, fileid, url }, where `url` is the direct
// download link for the uploaded file.
// `correlation_id` is accepted for handler-interface compatibility; the
// local log_trace already embeds the module-level correlation id.
async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) {
    // BUG FIX: log_trace takes a single message argument. The old code
    // called log_trace(correlation_id, message), which logged only the
    // correlation id and silently discarded the message.
    log_trace(`Uploading ${dataname} to fileserver: ${fileserver_url}`);

    // Step 1: Get upload ID and token
    const url_getUploadID = `${fileserver_url}/upload`;
    const headers = {
        "Content-Type": "application/json"
    };
    // OneShot upload request -- presumably downloadable once; confirm
    // against the plik server API.
    const body = JSON.stringify({ OneShot: true });

    let response = await fetch(url_getUploadID, {
        method: "POST",
        headers: headers,
        body: body
    });

    if (!response.ok) {
        throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
    }

    const responseJson = await response.json();
    const uploadid = responseJson.id;
    const uploadtoken = responseJson.uploadToken;

    // Step 2: Upload file data
    const url_upload = `${fileserver_url}/file/${uploadid}`;

    // Create multipart form data
    const formData = new FormData();
    const blob = new Blob([data], { type: "application/octet-stream" });
    formData.append("file", blob, dataname);

    response = await fetch(url_upload, {
        method: "POST",
        headers: {
            "X-UploadToken": uploadtoken // token from step 1 authorizes the upload
        },
        body: formData
    });

    if (!response.ok) {
        throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
    }

    const fileResponseJson = await response.json();
    const fileid = fileResponseJson.id;

    // Build the download URL (dataname is URL-encoded to stay path-safe)
    const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

    log_trace(`Uploaded to URL: ${url}`);

    return {
        status: response.status,
        uploadid: uploadid,
        fileid: fileid,
        url: url
    };
}
|
||||||
|
|
||||||
|
// Sender: Send Tables via smartsend.
// Builds one small table (below the size threshold -> direct transport) and
// one large table (above it -> link transport), sends both in a single
// envelope, then logs how each payload was transported.
async function test_table_send() {
    // Note: This test requires apache-arrow library to create Arrow IPC data.
    // For now, we'll use a simple array of objects as table data.
    // In production, you would use the apache-arrow library to create Arrow IPC data.

    // Create a small Table (will use direct transport)
    const small_table = [
        { id: 1, name: "Alice", score: 95 },
        { id: 2, name: "Bob", score: 88 },
        { id: 3, name: "Charlie", score: 92 }
    ];

    // Create a large Table (will use link transport if > 1MB)
    // Generate a larger dataset (~2MB to ensure link transport)
    const large_table = [];
    for (let i = 0; i < 50000; i++) {
        large_table.push({
            id: i,
            message: `msg_${i}`,
            sender: `sender_${i}`,
            timestamp: new Date().toISOString(),
            priority: Math.floor(Math.random() * 3) + 1 // random priority 1..3
        });
    }

    // Test data 1: small Table
    const data1 = { dataname: "small_table", data: small_table, type: "table" };

    // Test data 2: large Table
    const data2 = { dataname: "large_table", data: large_table, type: "table" };

    // Use smartsend with table type
    // For small Table: will use direct transport (Arrow IPC encoded)
    // For large Table: will use link transport (uploaded to fileserver)
    const env = await smartsend(
        SUBJECT,
        [data1, data2],
        {
            natsUrl: NATS_URL,
            fileserverUrl: FILESERVER_URL,
            fileserverUploadHandler: plik_upload_handler,
            sizeThreshold: 1_000_000, // bytes; payloads above this go via link
            correlationId: correlation_id,
            msgPurpose: "chat",
            senderName: "table_sender",
            receiverName: "",
            receiverId: "",
            replyTo: "",
            replyToMsgId: ""
        }
    );

    log_trace(`Sent message with ${env.payloads.length} payloads`);

    // Log transport type for each payload
    for (let i = 0; i < env.payloads.length; i++) {
        const payload = env.payloads[i];
        log_trace(`Payload ${i + 1} ('${payload.dataname}'):`);
        log_trace(`  Transport: ${payload.transport}`);
        log_trace(`  Type: ${payload.type}`);
        log_trace(`  Size: ${payload.size} bytes`);
        log_trace(`  Encoding: ${payload.encoding}`);

        // For link transport the payload's data field holds the download URL
        if (payload.transport === "link") {
            log_trace(`  URL: ${payload.data}`);
        }
    }
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting Table transport test...");
console.log(`Correlation ID: ${correlation_id}`);

// Run sender
console.log("start smartsend for tables");
// BUG FIX: the promise returned by test_table_send() was left floating, so
// a failure became an unhandled rejection and "Test completed." was printed
// before the async send actually finished.
test_table_send()
    .then(() => {
        console.log("Test completed.");
    })
    .catch((err) => {
        console.error("Test failed:", err);
        process.exitCode = 1;
    });
|
||||||
80
test/test_js_to_js_text_receiver.js
Normal file
80
test/test_js_to_js_text_receiver.js
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for text transport testing
// Tests receiving 1 large and 1 small text from JavaScript serviceA to JavaScript serviceB
// Uses NATSBridge.js smartreceive with "text" type

// BUG FIX: import only `smartreceive`. Destructuring `log_trace` here
// collided with the local `function log_trace` declared below
// (SyntaxError: Identifier 'log_trace' has already been declared).
// The bridge module lives in ../src relative to this test/ directory.
const { smartreceive } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_text_test";
const NATS_URL = "nats.yiem.cc";
|
||||||
|
|
||||||
|
// Helper: Log a message prefixed with an ISO-8601 timestamp.
function log_trace(message) {
    console.log(`[${new Date().toISOString()}] ${message}`);
}
|
||||||
|
|
||||||
|
// Receiver: Listen for messages and verify text handling.
// Subscribes to SUBJECT, decodes each envelope with smartreceive, prints a
// preview of each received text payload, and saves it to a local file.
async function test_text_receive() {
    // Connect to NATS
    const { connect } = require('nats');
    const nc = await connect({ servers: [NATS_URL] });

    // Subscribe to the subject
    const sub = nc.subscribe(SUBJECT);

    // BUG FIX: arm the shutdown timer BEFORE the `for await` loop. It used
    // to sit after the loop, which only ends when the subscription closes,
    // so it never ran and the process never exited. Also, the old comment
    // said "10 seconds" while the delay is 120000 ms (2 minutes).
    setTimeout(() => {
        nc.close();
        process.exit(0);
    }, 120000);

    for await (const msg of sub) {
        log_trace(`Received message on ${msg.subject}`);

        // Use NATSBridge.smartreceive to handle the data (retries cover
        // transient fetch failures for link-transport payloads)
        const result = await smartreceive(
            msg,
            {
                maxRetries: 5,
                baseDelay: 100,
                maxDelay: 5000
            }
        );

        // Result is a list of {dataname, data, type} objects
        for (const { dataname, data, type } of result) {
            if (typeof data === 'string') {
                log_trace(`Received text '${dataname}' of type ${type}`);
                log_trace(`  Length: ${data.length} characters`);

                // Display first 100 characters
                if (data.length > 100) {
                    log_trace(`  First 100 characters: ${data.substring(0, 100)}...`);
                } else {
                    log_trace(`  Content: ${data}`);
                }

                // Save to file
                const fs = require('fs');
                const output_path = `./received_${dataname}.txt`;
                fs.writeFileSync(output_path, data);
                log_trace(`Saved text to ${output_path}`);
            } else {
                log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`);
            }
        }
    }
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting text transport test...");
console.log("Note: This receiver will wait for messages from the sender.");
console.log("Run test_js_to_js_text_sender.js first to send test data.");

// Run receiver
console.log("testing smartreceive for text");
// BUG FIX: the returned promise was floating, so errors became unhandled
// rejections and "Test completed." was printed before the receiver ran.
test_text_receive()
    .then(() => {
        console.log("Test completed.");
    })
    .catch((err) => {
        console.error("Receiver failed:", err);
        process.exitCode = 1;
    });
|
||||||
140
test/test_js_to_js_text_sender.js
Normal file
140
test/test_js_to_js_text_sender.js
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
#!/usr/bin/env node
// Test script for text transport testing
// Tests sending 1 large and 1 small text from JavaScript serviceA to JavaScript serviceB
// Uses NATSBridge.js smartsend with "text" type

// BUG FIX: import only what is used. Destructuring `log_trace` here collided
// with the local `function log_trace` declared below (SyntaxError:
// Identifier 'log_trace' has already been declared). The bridge module
// lives in ../src relative to this test/ directory.
const { smartsend, uuid4 } = require('../src/NATSBridge');

// Configuration
const SUBJECT = "/NATSBridge_text_test";
const NATS_URL = "nats.yiem.cc";
const FILESERVER_URL = "http://192.168.88.104:8080";

// Create correlation ID for tracing
const correlation_id = uuid4();
|
||||||
|
|
||||||
|
// Helper: Log a message with a timestamp and the module-level correlation
// id, so sender log lines can be matched against the receiver side.
function log_trace(message) {
    const ts = new Date().toISOString();
    console.log(`[${ts}] [Correlation: ${correlation_id}] ${message}`);
}
|
||||||
|
|
||||||
|
// File upload handler for plik server.
//
// Uploads `data` as a file named `dataname` to the plik server at
// `fileserver_url` and returns { status, uploadid, fileid, url }, where
// `url` is the direct download link for the uploaded file.
// Throws an Error if either the upload-ID request or the file upload
// returns a non-OK HTTP status.
// NOTE(review): `correlation_id` is accepted for handler-interface
// compatibility but is not used in this variant of the handler.
async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) {
    // Get upload ID
    const url_getUploadID = `${fileserver_url}/upload`;
    const headers = {
        "Content-Type": "application/json"
    };
    // OneShot upload request -- presumably downloadable once; confirm
    // against the plik server API.
    const body = JSON.stringify({ OneShot: true });

    let response = await fetch(url_getUploadID, {
        method: "POST",
        headers: headers,
        body: body
    });

    if (!response.ok) {
        throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`);
    }

    const responseJson = await response.json();
    const uploadid = responseJson.id;
    const uploadtoken = responseJson.uploadToken;

    // Upload file as multipart form data
    const formData = new FormData();
    const blob = new Blob([data], { type: "application/octet-stream" });
    formData.append("file", blob, dataname);

    response = await fetch(`${fileserver_url}/file/${uploadid}`, {
        method: "POST",
        headers: {
            "X-UploadToken": uploadtoken // token from step 1 authorizes the upload
        },
        body: formData
    });

    if (!response.ok) {
        throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`);
    }

    const fileResponseJson = await response.json();
    const fileid = fileResponseJson.id;

    // Build the download URL (dataname is URL-encoded to stay path-safe)
    const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

    return {
        status: response.status,
        uploadid: uploadid,
        fileid: fileid,
        url: url
    };
}
|
||||||
|
|
||||||
|
// Sender: Send text via smartsend.
// Builds one small text (below the size threshold -> direct transport) and
// one large text (above it -> link transport), sends both in a single
// envelope, then logs how each payload was transported.
async function test_text_send() {
    // Create a small text (will use direct transport)
    const small_text = "Hello, this is a small text message. Testing direct transport via NATS.";

    // Create a large text (will use link transport if > 1MB)
    // Generate a larger text (~2MB to ensure link transport)
    const large_text_lines = [];
    for (let i = 0; i < 50000; i++) {
        large_text_lines.push(`Line ${i}: This is a sample text line with some content to pad the size. `);
    }
    const large_text = large_text_lines.join("");

    // Test data 1: small text
    const data1 = { dataname: "small_text", data: small_text, type: "text" };

    // Test data 2: large text
    const data2 = { dataname: "large_text", data: large_text, type: "text" };

    // Use smartsend with text type
    // For small text: will use direct transport (Base64 encoded UTF-8)
    // For large text: will use link transport (uploaded to fileserver)
    const env = await smartsend(
        SUBJECT,
        [data1, data2],
        {
            natsUrl: NATS_URL,
            fileserverUrl: FILESERVER_URL,
            fileserverUploadHandler: plik_upload_handler,
            sizeThreshold: 1_000_000, // bytes; payloads above this go via link
            correlationId: correlation_id,
            msgPurpose: "chat",
            senderName: "text_sender",
            receiverName: "",
            receiverId: "",
            replyTo: "",
            replyToMsgId: ""
        }
    );

    log_trace(`Sent message with ${env.payloads.length} payloads`);

    // Log transport type for each payload
    for (let i = 0; i < env.payloads.length; i++) {
        const payload = env.payloads[i];
        log_trace(`Payload ${i + 1} ('${payload.dataname}'):`);
        log_trace(`  Transport: ${payload.transport}`);
        log_trace(`  Type: ${payload.type}`);
        log_trace(`  Size: ${payload.size} bytes`);
        log_trace(`  Encoding: ${payload.encoding}`);

        // For link transport the payload's data field holds the download URL
        if (payload.transport === "link") {
            log_trace(`  URL: ${payload.data}`);
        }
    }
}
|
||||||
|
|
||||||
|
// Run the test
console.log("Starting text transport test...");
console.log(`Correlation ID: ${correlation_id}`);

// Run sender
console.log("start smartsend for text");
// BUG FIX: the promise returned by test_text_send() was left floating, so a
// failure became an unhandled rejection and "Test completed." was printed
// before the async send actually finished.
test_text_send()
    .then(() => {
        console.log("Test completed.");
    })
    .catch((err) => {
        console.error("Test failed:", err);
        process.exitCode = 1;
    });
|
||||||
Reference in New Issue
Block a user