From 3f45052193b55e59f3d2d07b16cf895103cd03e0 Mon Sep 17 00:00:00 2001 From: narawat Date: Thu, 19 Feb 2026 19:25:34 +0700 Subject: [PATCH] update --- docs/implementation.md | 98 ++- package.json | 28 + src/NATSBridge.js | 706 ++++++++++++++++++++ test/test_js_to_js_dict_receiver.js | 79 +++ test/test_js_to_js_dict_sender.js | 164 +++++ test/test_js_to_js_file_receiver.js | 70 ++ test/test_js_to_js_file_sender.js | 143 ++++ test/test_js_to_js_mix_payload_sender.js | 276 ++++++++ test/test_js_to_js_mix_payloads_receiver.js | 172 +++++ test/test_js_to_js_table_receiver.js | 86 +++ test/test_js_to_js_table_sender.js | 164 +++++ test/test_js_to_js_text_receiver.js | 80 +++ test/test_js_to_js_text_sender.js | 140 ++++ 13 files changed, 2177 insertions(+), 29 deletions(-) create mode 100644 package.json create mode 100644 src/NATSBridge.js create mode 100644 test/test_js_to_js_dict_receiver.js create mode 100644 test/test_js_to_js_dict_sender.js create mode 100644 test/test_js_to_js_file_receiver.js create mode 100644 test/test_js_to_js_file_sender.js create mode 100644 test/test_js_to_js_mix_payload_sender.js create mode 100644 test/test_js_to_js_mix_payloads_receiver.js create mode 100644 test/test_js_to_js_table_receiver.js create mode 100644 test/test_js_to_js_table_sender.js create mode 100644 test/test_js_to_js_text_receiver.js create mode 100644 test/test_js_to_js_text_sender.js diff --git a/docs/implementation.md b/docs/implementation.md index ba3ff2b..109de2e 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -71,13 +71,14 @@ The Julia implementation provides: - **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size - **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport -### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js) +### JavaScript Module: [`src/NATSBridge.js`](../src/NATSBridge.js) The JavaScript implementation provides: - **`MessageEnvelope` class**: For the unified JSON envelope -- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size -- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport +- **`MessagePayload` class**: For individual payload representation +- **[`smartsend()`](../src/NATSBridge.js)**: Handles transport selection based on payload size +- **[`smartreceive()`](../src/NATSBridge.js)**: Handles both direct and link transport ## Installation @@ -167,16 +168,16 @@ payloads = smartreceive(msg, "http://localhost:8080") #### JavaScript (Sender) ```javascript -const { SmartSend } = require('./js_bridge'); +const { smartsend } = require('./src/NATSBridge'); // Single payload wrapped in a list const config = [{ dataname: "config", data: { step_size: 0.01, iterations: 1000 }, - type: "json" + type: "dictionary" }]; -await SmartSend("control", config, "json", { +await smartsend("control", config, { correlationId: "unique-id" }); @@ -185,16 +186,16 @@ const configs = [ { dataname: "config1", data: { step_size: 0.01 }, - type: "json" + type: "dictionary" }, { dataname: "config2", data: { iterations: 1000 }, - type: "json" + type: "dictionary" } ]; -await SmartSend("control", configs, "json"); +await smartsend("control", configs); ``` #### Julia (Receiver) @@ -217,6 +218,25 @@ subscribe(nats, "control") do msg end ``` +### JavaScript (Receiver) +```javascript +const { smartreceive } = require('./src/NATSBridge'); + +// Subscribe to messages +const nc = await connect({ servers: ['nats://localhost:4222'] }); +const sub = nc.subscribe("control"); + +for await (const msg of sub) { + const result = await smartreceive(msg); + + // Process the result + for (const { dataname, data, type } of result) { + console.log(`Received ${dataname} of type ${type}`); + console.log(`Data: ${JSON.stringify(data)}`); + } +} +``` + ### Scenario 2: Deep Dive Analysis (Large Arrow Table) #### Julia (Sender) @@ -237,28 +257,29 @@ await SmartSend("analysis_results", [("table_data", df, "table")]); #### JavaScript (Receiver) ```javascript -const { SmartReceive } = require('./js_bridge'); +const { smartreceive } = require('./src/NATSBridge'); -const result = await SmartReceive(msg); +const result = await smartreceive(msg); // Use table data for visualization with Perspective.js or D3 -const table = result.data; +// Note: Tables are sent as arrays of objects in JavaScript +const table = result; ``` ### Scenario 3: Live Binary Processing #### JavaScript (Sender) ```javascript -const { SmartSend } = require('./js_bridge'); +const { smartsend } = require('./src/NATSBridge'); // Binary data wrapped in a list const binaryData = [{ dataname: "audio_chunk", - data: binaryBuffer, + data: binaryBuffer, // ArrayBuffer or Uint8Array type: "binary" }]; -await SmartSend("binary_input", binaryData, "binary", { +await smartsend("binary_input", binaryData, { metadata: { sample_rate: 44100, channels: 1 @@ -282,6 +303,25 @@ function process_binary(data) end ``` +### JavaScript (Receiver) +```javascript +const { smartreceive } = require('./src/NATSBridge'); + +// Receive binary data +function process_binary(msg) { + const result = await smartreceive(msg); + + // Process the binary data + for (const { dataname, data, type } of result) { + if (type === "binary") { + // data is an ArrayBuffer or Uint8Array + console.log(`Received binary data: ${dataname}, size: ${data.length}`); + // Perform FFT or AI transcription here + } + } +} +``` + ### Scenario 4: Catch-Up (JetStream) #### Julia (Producer) @@ -299,6 +339,7 @@ end #### JavaScript (Consumer) ```javascript const { connect } = require('nats'); +const { smartreceive } = require('./src/NATSBridge'); const nc = await connect({ servers: ['nats://localhost:4222'] }); const js = nc.jetstream(); @@ -312,9 +353,9 @@ const consumer = await js.pullSubscribe("health", { // Process historical and real-time messages for await (const msg of consumer) { - const result = await SmartReceive(msg); - // result.data contains the list of payloads - // result.envelope contains the message envelope + const result = await smartreceive(msg); + // result contains the list of payloads + // Each payload has: dataname, data, type msg.ack(); } ``` @@ -351,22 +392,21 @@ smartsend( **JavaScript (Receiver):** ```javascript -const { SmartReceive } = require('./js_bridge'); +const { smartreceive, smartsend } = require('./src/NATSBridge'); // Receive NATS message with direct transport -const result = await SmartReceive(msg); +const result = await smartreceive(msg); -// Decode Base64 payload -// Parse Arrow IPC with zero-copy -// Load into selection UI component (e.g., dropdown, table) -const table = result[2]; // Get the DataFrame from the tuple +// Decode Base64 payload (for direct transport) +// For tables, data is an array of objects +const table = result; // Array of objects // User makes selection const selection = uiComponent.getSelectedOption(); // Send selection back to Julia -await SmartSend("dashboard.response", [ - ("selected_option", selection, "dictionary") +await smartsend("dashboard.response", [ + { dataname: "selected_option", data: selection, type: "dictionary" } ]); ``` @@ -416,7 +456,7 @@ smartsend( **JavaScript (Sender/Receiver):** ```javascript -const { SmartSend, SmartReceive } = require('./js_bridge'); +const { smartsend, smartreceive } = require('./src/NATSBridge'); // Build chat message with mixed content: // - User input text: direct transport @@ -441,7 +481,7 @@ const message = [ }, { dataname: "image", - data: selectedImageBuffer, // Small image + data: selectedImageBuffer, // Small image (ArrayBuffer or Uint8Array) type: "image" }, { @@ -451,7 +491,7 @@ const message = [ } ]; -await SmartSend("chat.room123", message); +await smartsend("chat.room123", message); ``` **Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components. diff --git a/package.json b/package.json new file mode 100644 index 0000000..1b5ccdc --- /dev/null +++ b/package.json @@ -0,0 +1,28 @@ +{ + "name": "natsbridge", + "version": "1.0.0", + "description": "Bi-Directional Data Bridge for JavaScript using NATS", + "main": "src/NATSBridge.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1", + "lint": "eslint src/*.js test/*.js" + }, + "keywords": [ + "nats", + "message-broker", + "bridge", + "arrow", + "serialization" + ], + "author": "", + "license": "MIT", + "dependencies": { + "nats": "^2.9.0", + "apache-arrow": "^14.0.0", + "uuid": "^9.0.0" + }, + "devDependencies": { + "eslint": "^8.0.0", + "jest": "^29.0.0" + } +} \ No newline at end of file diff --git a/src/NATSBridge.js b/src/NATSBridge.js new file mode 100644 index 0000000..e794c4f --- /dev/null +++ b/src/NATSBridge.js @@ -0,0 +1,706 @@ +/** + * NATSBridge.js - Bi-Directional Data Bridge for JavaScript + * Implements smartsend and smartreceive for NATS communication + * + * This module provides functionality for sending and receiving data across network boundaries + * using NATS as the message bus, with support for both direct payload transport and + * URL-based transport for larger payloads. + * + * File Server Handler Architecture: + * The system uses handler functions to abstract file server operations, allowing support + * for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). + * + * Handler Function Signatures: + * + * ```javascript + * // Upload handler - uploads data to file server and returns URL + * // The handler is passed to smartsend as fileserverUploadHandler parameter + * // It receives: (fileserver_url, dataname, data) + * // Returns: { status, uploadid, fileid, url } + * async function fileserverUploadHandler(fileserver_url, dataname, data) { ... } + * + * // Download handler - fetches data from file server URL with exponential backoff + * // The handler is passed to smartreceive as fileserverDownloadHandler parameter + * // It receives: (url, max_retries, base_delay, max_delay, correlation_id) + * // Returns: ArrayBuffer (the downloaded data) + * async function fileserverDownloadHandler(url, max_retries, base_delay, max_delay, correlation_id) { ... } + * ``` + * + * Multi-Payload Support (Standard API): + * The system uses a standardized list-of-tuples format for all payload operations. + * Even when sending a single payload, the user must wrap it in a list. + * + * API Standard: + * ```javascript + * // Input format for smartsend (always a list of tuples with type info) + * [{ dataname, data, type }, ...] + * + * // Output format for smartreceive (always returns a list of tuples) + * [{ dataname, data, type }, ...] + * ``` + * + * Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" + */ + +// ---------------------------------------------- 100 --------------------------------------------- # + +// Constants +const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB - threshold for switching from direct to link transport +const DEFAULT_NATS_URL = "nats://localhost:4222"; // Default NATS server URL +const DEFAULT_FILESERVER_URL = "http://localhost:8080"; // Default HTTP file server URL for link transport + +// Helper: Generate UUID v4 +function uuid4() { + // Simple UUID v4 generator + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { + var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); + }); +} + +// Helper: Log with correlation ID and timestamp +function log_trace(correlation_id, message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// Helper: Get size of data in bytes +function getDataSize(data) { + if (typeof data === 'string') { + return new TextEncoder().encode(data).length; + } else if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data.byteLength; + } else if (typeof data === 'object' && data !== null) { + // For objects, serialize to JSON and measure + return new TextEncoder().encode(JSON.stringify(data)).length; + } + return 0; +} + +// Helper: Convert ArrayBuffer to Base64 string +function arrayBufferToBase64(buffer) { + const bytes = new Uint8Array(buffer); + let binary = ''; + for (let i = 0; i < bytes.length; i++) { + binary += String.fromCharCode(bytes[i]); + } + return btoa(binary); +} + +// Helper: Convert Base64 string to ArrayBuffer +function base64ToArrayBuffer(base64) { + const binaryString = atob(base64); + const len = binaryString.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + return bytes.buffer; +} + +// Helper: Serialize data based on type +function _serialize_data(data, type) { + /** + * Serialize data according to specified format + * + * Supported formats: + * - "text": Treats data as text and converts to UTF-8 bytes + * - "dictionary": Serializes data as JSON and returns the UTF-8 byte representation + * - "table": Serializes data as an Arrow IPC stream (table format) - NOT IMPLEMENTED (requires arrow library) + * - "image": Expects binary data (ArrayBuffer) and returns it as bytes + * - "audio": Expects binary data (ArrayBuffer) and returns it as bytes + * - "video": Expects binary data (ArrayBuffer) and returns it as bytes + * - "binary": Generic binary data (ArrayBuffer or Uint8Array) and returns bytes + */ + if (type === "text") { + if (typeof data === 'string') { + return new TextEncoder().encode(data).buffer; + } else { + throw new Error("Text data must be a String"); + } + } else if (type === "dictionary") { + // JSON data - serialize directly + const jsonStr = JSON.stringify(data); + return new TextEncoder().encode(jsonStr).buffer; + } else if (type === "table") { + // Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) + // This would require the apache-arrow library + throw new Error("Table serialization requires apache-arrow library"); + } else if (type === "image") { + if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data instanceof ArrayBuffer ? data : data.buffer; + } else { + throw new Error("Image data must be ArrayBuffer or Uint8Array"); + } + } else if (type === "audio") { + if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data instanceof ArrayBuffer ? data : data.buffer; + } else { + throw new Error("Audio data must be ArrayBuffer or Uint8Array"); + } + } else if (type === "video") { + if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data instanceof ArrayBuffer ? data : data.buffer; + } else { + throw new Error("Video data must be ArrayBuffer or Uint8Array"); + } + } else if (type === "binary") { + if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data instanceof ArrayBuffer ? data : data.buffer; + } else { + throw new Error("Binary data must be ArrayBuffer or Uint8Array"); + } + } else { + throw new Error(`Unknown type: ${type}`); + } +} + +// Helper: Deserialize bytes based on type +function _deserialize_data(data, type, correlation_id) { + /** + * Deserialize bytes to data based on type + * + * Supported formats: + * - "text": Converts bytes to string + * - "dictionary": Parses JSON string + * - "table": Parses Arrow IPC stream - NOT IMPLEMENTED (requires apache-arrow library) + * - "image": Returns binary data + * - "audio": Returns binary data + * - "video": Returns binary data + * - "binary": Returns binary data + */ + if (type === "text") { + const decoder = new TextDecoder(); + return decoder.decode(new Uint8Array(data)); + } else if (type === "dictionary") { + const decoder = new TextDecoder(); + const jsonStr = decoder.decode(new Uint8Array(data)); + return JSON.parse(jsonStr); + } else if (type === "table") { + // Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) + throw new Error("Table deserialization requires apache-arrow library"); + } else if (type === "image") { + return data; + } else if (type === "audio") { + return data; + } else if (type === "video") { + return data; + } else if (type === "binary") { + return data; + } else { + throw new Error(`Unknown type: ${type}`); + } +} + +// Helper: Upload data to file server +async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) { + /** + * Upload data to HTTP file server (plik-like API) + * + * This function implements the plik one-shot upload mode: + * 1. Creates a one-shot upload session by sending POST request with {"OneShot": true} + * 2. Uploads the file data as multipart form data + * 3. Returns identifiers and download URL for the uploaded file + */ + log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`); + + // Step 1: Get upload ID and token + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Step 2: Upload file data + const url_upload = `${fileserver_url}/file/${uploadid}`; + + // Create multipart form data + const formData = new FormData(); + // Create a Blob from the ArrayBuffer + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(url_upload, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + // Build the download URL + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + log_trace(correlation_id, `Uploaded to URL: ${url}`); + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Helper: Fetch data from URL with exponential backoff +async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) { + /** + * Fetch data from URL with retry logic using exponential backoff + */ + let delay = base_delay; + + for (let attempt = 1; attempt <= max_retries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`); + const arrayBuffer = await response.arrayBuffer(); + return arrayBuffer; + } else { + throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); + } + } catch (e) { + log_trace(correlation_id, `Attempt ${attempt} failed: ${e.message}`); + + if (attempt < max_retries) { + // Sleep with exponential backoff + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, max_delay); + } + } + } + + throw new Error(`Failed to fetch data after ${max_retries} attempts`); +} + +// Helper: Get payload bytes from data +function _get_payload_bytes(data) { + if (data instanceof ArrayBuffer || data instanceof Uint8Array) { + return data instanceof ArrayBuffer ? new Uint8Array(data) : data; + } else if (typeof data === 'string') { + return new TextEncoder().encode(data); + } else { + // For objects, serialize to JSON + return new TextEncoder().encode(JSON.stringify(data)); + } +} + +// MessagePayload class +class MessagePayload { + /** + * Represents a single payload in the message envelope + * + * @param {Object} options - Payload options + * @param {string} options.id - ID of this payload (e.g., "uuid4") + * @param {string} options.dataname - Name of this payload (e.g., "login_image") + * @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + * @param {string} options.transport - "direct" or "link" + * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc" + * @param {number} options.size - Data size in bytes + * @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link) + * @param {Object} options.metadata - Metadata for this payload + */ + constructor(options) { + this.id = options.id || uuid4(); + this.dataname = options.dataname; + this.type = options.type; + this.transport = options.transport; + this.encoding = options.encoding; + this.size = options.size; + this.data = options.data; + this.metadata = options.metadata || {}; + } + + // Convert to JSON object + toJSON() { + const obj = { + id: this.id, + dataname: this.dataname, + type: this.type, + transport: this.transport, + encoding: this.encoding, + size: this.size + }; + + // Include data based on transport type + if (this.transport === "direct" && this.data !== null) { + if (this.encoding === "base64" || this.encoding === "json") { + obj.data = this.data; + } else { + // For other encodings, use base64 + const payloadBytes = _get_payload_bytes(this.data); + obj.data = arrayBufferToBase64(payloadBytes); + } + } else if (this.transport === "link" && this.data !== null) { + // For link transport, data is a URL string + obj.data = this.data; + } + + if (Object.keys(this.metadata).length > 0) { + obj.metadata = this.metadata; + } + + return obj; + } +} + +// MessageEnvelope class +class MessageEnvelope { + /** + * Represents the message envelope containing metadata and payloads + * + * @param {Object} options - Envelope options + * @param {string} options.sendTo - Topic/subject the sender sends to + * @param {Array} options.payloads - Array of payloads + * @param {string} options.correlationId - Unique identifier to track messages + * @param {string} options.msgId - This message id + * @param {string} options.timestamp - Message published timestamp + * @param {string} options.msgPurpose - Purpose of this message + * @param {string} options.senderName - Name of the sender + * @param {string} options.senderId - UUID of the sender + * @param {string} options.receiverName - Name of the receiver + * @param {string} options.receiverId - UUID of the receiver + * @param {string} options.replyTo - Topic to reply to + * @param {string} options.replyToMsgId - Message id this message is replying to + * @param {string} options.brokerURL - NATS server address + * @param {Object} options.metadata - Metadata for the envelope + */ + constructor(options) { + this.correlationId = options.correlationId || uuid4(); + this.msgId = options.msgId || uuid4(); + this.timestamp = options.timestamp || new Date().toISOString(); + this.sendTo = options.sendTo; + this.msgPurpose = options.msgPurpose || ""; + this.senderName = options.senderName || ""; + this.senderId = options.senderId || uuid4(); + this.receiverName = options.receiverName || ""; + this.receiverId = options.receiverId || ""; + this.replyTo = options.replyTo || ""; + this.replyToMsgId = options.replyToMsgId || ""; + this.brokerURL = options.brokerURL || DEFAULT_NATS_URL; + this.metadata = options.metadata || {}; + this.payloads = options.payloads || []; + } + + // Convert to JSON string + toJSON() { + const obj = { + correlationId: this.correlationId, + msgId: this.msgId, + timestamp: this.timestamp, + sendTo: this.sendTo, + msgPurpose: this.msgPurpose, + senderName: this.senderName, + senderId: this.senderId, + receiverName: this.receiverName, + receiverId: this.receiverId, + replyTo: this.replyTo, + replyToMsgId: this.replyToMsgId, + brokerURL: this.brokerURL + }; + + if (Object.keys(this.metadata).length > 0) { + obj.metadata = this.metadata; + } + + if (this.payloads.length > 0) { + obj.payloads = this.payloads.map(p => p.toJSON()); + } + + return obj; + } + + // Convert to JSON string + toString() { + return JSON.stringify(this.toJSON()); + } +} + +// SmartSend function +async function smartsend(subject, data, options = {}) { + /** + * Send data either directly via NATS or via a fileserver URL, depending on payload size + * + * This function intelligently routes data delivery based on payload size relative to a threshold. + * If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS. + * Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. + * + * @param {string} subject - NATS subject to publish the message to + * @param {Array} data - List of {dataname, data, type} objects to send + * @param {Object} options - Additional options + * @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222") + * @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080") + * @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads + * @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB) + * @param {string} options.correlationId - Optional correlation ID for tracing + * @param {string} options.msgPurpose - Purpose of the message (default: "chat") + * @param {string} options.senderName - Name of the sender (default: "NATSBridge") + * @param {string} options.receiverName - Name of the receiver (default: "") + * @param {string} options.receiverId - UUID of the receiver (default: "") + * @param {string} options.replyTo - Topic to reply to (default: "") + * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") + * + * @returns {Promise} - The envelope for tracking + */ + const { + natsUrl = DEFAULT_NATS_URL, + fileserverUrl = DEFAULT_FILESERVER_URL, + fileserverUploadHandler = _upload_to_fileserver, + sizeThreshold = DEFAULT_SIZE_THRESHOLD, + correlationId = uuid4(), + msgPurpose = "chat", + senderName = "NATSBridge", + receiverName = "", + receiverId = "", + replyTo = "", + replyToMsgId = "" + } = options; + + log_trace(correlationId, `Starting smartsend for subject: ${subject}`); + + // Generate message metadata + const msgId = uuid4(); + + // Process each payload in the list + const payloads = []; + + for (const payload of data) { + const dataname = payload.dataname; + const payloadData = payload.data; + const payloadType = payload.type; + + // Serialize data based on type + const payloadBytes = _serialize_data(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + + // Decision: Direct vs Link + if (payloadSize < sizeThreshold) { + // Direct path - Base64 encode and send via NATS + const payloadB64 = arrayBufferToBase64(payloadBytes); + log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`); + + // Create MessagePayload for direct transport + const payloadObj = new MessagePayload({ + dataname: dataname, + type: payloadType, + transport: "direct", + encoding: "base64", + size: payloadSize, + data: payloadB64, + metadata: { payload_bytes: payloadSize } + }); + payloads.push(payloadObj); + } else { + // Link path - Upload to HTTP server, send URL via NATS + log_trace(correlationId, `Using link transport, uploading to fileserver`); + + // Upload to HTTP server + const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + const url = response.url; + log_trace(correlationId, `Uploaded to URL: ${url}`); + + // Create MessagePayload for link transport + const payloadObj = new MessagePayload({ + dataname: dataname, + type: payloadType, + transport: "link", + encoding: "none", + size: payloadSize, + data: url, + metadata: {} + }); + payloads.push(payloadObj); + } + } + + // Create MessageEnvelope with all payloads + const env = new MessageEnvelope({ + correlationId: correlationId, + msgId: msgId, + sendTo: subject, + msgPurpose: msgPurpose, + senderName: senderName, + receiverName: receiverName, + receiverId: receiverId, + replyTo: replyTo, + replyToMsgId: replyToMsgId, + brokerURL: natsUrl, + payloads: payloads + }); + + // Publish message to NATS + await publish_message(natsUrl, subject, env.toString(), correlationId); + + return env; +} + +// Helper: Publish message to NATS +async function publish_message(natsUrl, subject, message, correlation_id) { + /** + * Publish a message to a NATS subject with proper connection management + * + * @param {string} natsUrl - NATS server URL + * @param {string} subject - NATS subject to publish to + * @param {string} message - JSON message to publish + * @param {string} correlation_id - Correlation ID for logging + */ + log_trace(correlation_id, `Publishing message to ${subject}`); + + // For Node.js, we would use nats.js library + // This is a placeholder that throws an error + // In production, you would import and use the actual nats library + + // Example with nats.js: + // import { connect } from 'nats'; + // const nc = await connect({ servers: [natsUrl] }); + // await nc.publish(subject, message); + // nc.close(); + + // For now, just log the message + console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`); +} + +// SmartReceive function +async function smartreceive(msg, options = {}) { + /** + * Receive and process messages from NATS + * + * This function processes incoming NATS messages, handling both direct transport + * (base64 decoded payloads) and link transport (URL-based payloads). + * + * @param {Object} msg - NATS message object with payload property + * @param {Object} options - Additional options + * @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs + * @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5) + * @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100) + * @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000) + * + * @returns {Promise} - List of {dataname, data, type} objects + */ + const { + fileserverDownloadHandler = _fetch_with_backoff, + maxRetries = 5, + baseDelay = 100, + maxDelay = 5000 + } = options; + + // Parse the JSON envelope + const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); + const json_data = JSON.parse(jsonStr); + + log_trace(json_data.correlationId, `Processing received message`); + + // Process all payloads in the envelope + const payloads_list = []; + + // Get number of payloads + const num_payloads = json_data.payloads ? json_data.payloads.length : 0; + + for (let i = 0; i < num_payloads; i++) { + const payload = json_data.payloads[i]; + const transport = payload.transport; + const dataname = payload.dataname; + + if (transport === "direct") { + // Direct transport - payload is in the message + log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`); + + // Extract base64 payload from the payload + const payload_b64 = payload.data; + + // Decode Base64 payload + const payload_bytes = base64ToArrayBuffer(payload_b64); + + // Deserialize based on type + const data_type = payload.type; + const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId); + + payloads_list.push({ dataname, data, type: data_type }); + } else if (transport === "link") { + // Link transport - payload is at URL + const url = payload.data; + log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`); + + // Fetch with exponential backoff using the download handler + const downloaded_data = await fileserverDownloadHandler( + url, maxRetries, baseDelay, maxDelay, json_data.correlationId + ); + + // Deserialize based on type + const data_type = payload.type; + const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId); + + payloads_list.push({ dataname, data, type: data_type }); + } else { + throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`); + } + } + + return payloads_list; +} + +// Export for Node.js +if (typeof module !== 'undefined' && module.exports) { + module.exports = { + MessageEnvelope, + MessagePayload, + smartsend, + smartreceive, + _serialize_data, + _deserialize_data, + _fetch_with_backoff, + _upload_to_fileserver, + DEFAULT_SIZE_THRESHOLD, + DEFAULT_NATS_URL, + DEFAULT_FILESERVER_URL, + uuid4, + log_trace + }; +} + +// Export for browser +if (typeof window !== 'undefined') { + window.NATSBridge = { + MessageEnvelope, + MessagePayload, + smartsend, + smartreceive, + _serialize_data, + _deserialize_data, + _fetch_with_backoff, + _upload_to_fileserver, + DEFAULT_SIZE_THRESHOLD, + DEFAULT_NATS_URL, + DEFAULT_FILESERVER_URL, + uuid4, + log_trace + }; +} \ No newline at end of file diff --git a/test/test_js_to_js_dict_receiver.js b/test/test_js_to_js_dict_receiver.js new file mode 100644 index 0000000..de1db08 --- /dev/null +++ b/test/test_js_to_js_dict_receiver.js @@ -0,0 +1,79 @@ +#!/usr/bin/env node +// Test script for Dictionary transport testing +// Tests receiving 1 large and 1 small Dictionaries via direct and link transport +// Uses NATSBridge.js smartreceive with "dictionary" type + +const { smartreceive, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_dict_test"; +const NATS_URL = "nats.yiem.cc"; + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] ${message}`); +} + +// Receiver: Listen for messages and verify Dictionary handling +async function test_dict_receive() { + // Connect to NATS + const { connect } = require('nats'); + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + log_trace(`Received message on ${msg.subject}`); + + // Use NATSBridge.smartreceive to handle the data + const result = await smartreceive( + msg, + { + maxRetries: 5, + baseDelay: 100, + maxDelay: 5000 + } + ); + + // Result is a list of {dataname, data, type} objects + for (const { dataname, data, type } of result) { + if (typeof data === 'object' && data !== null && !Array.isArray(data)) { + log_trace(`Received Dictionary '${dataname}' of type ${type}`); + + // Display dictionary contents + console.log(" Contents:"); + for (const [key, value] of Object.entries(data)) { + console.log(` ${key} => ${value}`); + } + + // Save to JSON file + const fs = require('fs'); + const output_path = `./received_${dataname}.json`; + const json_str = JSON.stringify(data, null, 2); + fs.writeFileSync(output_path, json_str); + log_trace(`Saved Dictionary to ${output_path}`); + } else { + log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`); + } + } + } + + // Keep listening for 10 seconds + setTimeout(() => { + nc.close(); + process.exit(0); + }, 120000); +} + +// Run the test +console.log("Starting Dictionary transport test..."); +console.log("Note: This receiver will wait for messages from the sender."); +console.log("Run test_js_to_js_dict_sender.js first to send test data."); + +// Run receiver +console.log("testing smartreceive"); +test_dict_receive(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_dict_sender.js b/test/test_js_to_js_dict_sender.js new file mode 100644 index 0000000..4eaf7e8 --- /dev/null +++ b/test/test_js_to_js_dict_sender.js @@ -0,0 +1,164 @@ +#!/usr/bin/env node +// Test script for Dictionary transport testing +// Tests sending 1 large and 1 small Dictionaries via direct and link transport +// Uses NATSBridge.js smartsend with "dictionary" type + +const { smartsend, uuid4, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_dict_test"; +const NATS_URL = "nats.yiem.cc"; +const FILESERVER_URL = "http://192.168.88.104:8080"; + +// Create correlation ID for tracing +const correlation_id = uuid4(); + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// File upload handler for plik server +async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) { + // Get upload ID + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Upload file + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(`${fileserver_url}/file/${uploadid}`, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Sender: Send Dictionaries via smartsend +async function test_dict_send() { + // Create a small Dictionary (will use direct transport) + const small_dict = { + name: "Alice", + age: 30, + scores: [95, 88, 92], + metadata: { + height: 155, + weight: 55 + } + }; + + // Create a large Dictionary (will use link transport if > 1MB) + const large_dict_ids = []; + const large_dict_names = []; + const large_dict_scores = []; + const large_dict_categories = []; + + for (let i = 0; i < 50000; i++) { + large_dict_ids.push(i + 1); + large_dict_names.push(`User_${i}`); + large_dict_scores.push(Math.floor(Math.random() * 100) + 1); + large_dict_categories.push(`Category_${Math.floor(Math.random() * 10) + 1}`); + } + + const large_dict = { + ids: large_dict_ids, + names: large_dict_names, + scores: large_dict_scores, + categories: large_dict_categories, + metadata: { + source: "test_generator", + timestamp: new Date().toISOString() + } + }; + + // Test data 1: small Dictionary + const data1 = { dataname: "small_dict", data: small_dict, type: "dictionary" }; + + // Test data 2: large Dictionary + const data2 = { dataname: "large_dict", data: large_dict, type: "dictionary" }; + + // Use smartsend with dictionary type + // For small Dictionary: will use direct transport (JSON encoded) + // For large Dictionary: will use link transport (uploaded to fileserver) + const env = await smartsend( + SUBJECT, + [data1, data2], + { + natsUrl: NATS_URL, + fileserverUrl: FILESERVER_URL, + fileserverUploadHandler: plik_upload_handler, + sizeThreshold: 1_000_000, + correlationId: correlation_id, + msgPurpose: "chat", + senderName: "dict_sender", + receiverName: "", + receiverId: "", + replyTo: "", + replyToMsgId: "" + } + ); + + log_trace(`Sent message with ${env.payloads.length} payloads`); + + // Log transport type for each payload + for (let i = 0; i < env.payloads.length; i++) { + const payload = env.payloads[i]; + log_trace(`Payload ${i + 1} ('${payload.dataname}'):`); + log_trace(` Transport: ${payload.transport}`); + log_trace(` Type: ${payload.type}`); + log_trace(` Size: ${payload.size} bytes`); + log_trace(` Encoding: ${payload.encoding}`); + + if (payload.transport === "link") { + log_trace(` URL: ${payload.data}`); + } + } +} + +// Run the test +console.log("Starting Dictionary transport test..."); +console.log(`Correlation ID: ${correlation_id}`); + +// Run sender +console.log("start smartsend for dictionaries"); +test_dict_send(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_file_receiver.js b/test/test_js_to_js_file_receiver.js new file mode 100644 index 0000000..e8018d1 --- /dev/null +++ b/test/test_js_to_js_file_receiver.js @@ -0,0 +1,70 @@ +#!/usr/bin/env node +// Test script for large payload testing using binary transport +// Tests receiving a large file (> 1MB) via smartsend with binary type + +const { smartreceive, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_test"; +const NATS_URL = "nats.yiem.cc"; + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] ${message}`); +} + +// Receiver: Listen for messages and verify large payload handling +async function test_large_binary_receive() { + // Connect to NATS + const { connect } = require('nats'); + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + log_trace(`Received message on ${msg.subject}`); + + // Use NATSBridge.smartreceive to handle the data + const result = await smartreceive( + msg, + { + maxRetries: 5, + baseDelay: 100, + maxDelay: 5000 + } + ); + + // Result is a list of {dataname, data, type} objects + for (const { dataname, data, type } of result) { + if (data instanceof Uint8Array || Array.isArray(data)) { + const file_size = data.length; + log_trace(`Received ${file_size} bytes of binary data for '${dataname}' of type ${type}`); + + // Save received data to a test file + const fs = require('fs'); + const output_path = `./new_${dataname}`; + fs.writeFileSync(output_path, Buffer.from(data)); + log_trace(`Saved received data to ${output_path}`); + } else { + log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`); + } + } + } + + // Keep listening for 10 seconds + setTimeout(() => { + nc.close(); + process.exit(0); + }, 120000); +} + +// Run the test +console.log("Starting large binary payload test..."); + +// Run receiver +console.log("testing smartreceive"); +test_large_binary_receive(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_file_sender.js b/test/test_js_to_js_file_sender.js new file mode 100644 index 0000000..a53986e --- /dev/null +++ b/test/test_js_to_js_file_sender.js @@ -0,0 +1,143 @@ +#!/usr/bin/env node +// Test script for large payload testing using binary transport +// Tests sending a large file (> 1MB) via smartsend with binary type + +const { smartsend, uuid4, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_test"; +const NATS_URL = "nats.yiem.cc"; +const FILESERVER_URL = "http://192.168.88.104:8080"; + +// Create correlation ID for tracing +const correlation_id = uuid4(); + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// File upload handler for plik server +async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) { + log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`); + + // Step 1: Get upload ID and token + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Step 2: Upload file data + const url_upload = `${fileserver_url}/file/${uploadid}`; + + // Create multipart form data + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(url_upload, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + // Build the download URL + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + log_trace(correlation_id, `Uploaded to URL: ${url}`); + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Sender: Send large binary file via smartsend +async function test_large_binary_send() { + // Read the large file as binary data + const fs = require('fs'); + + // Test data 1 + const file_path1 = './testFile_large.zip'; + const file_data1 = fs.readFileSync(file_path1); + const filename1 = 'testFile_large.zip'; + const data1 = { dataname: filename1, data: file_data1, type: "binary" }; + + // Test data 2 + const file_path2 = './testFile_small.zip'; + const file_data2 = fs.readFileSync(file_path2); + const filename2 = 'testFile_small.zip'; + const data2 = { dataname: filename2, data: file_data2, type: "binary" }; + + // Use smartsend with binary type - will automatically use link transport + // if file size exceeds the threshold (1MB by default) + const env = await smartsend( + SUBJECT, + [data1, data2], + { + natsUrl: NATS_URL, + fileserverUrl: FILESERVER_URL, + fileserverUploadHandler: plik_upload_handler, + sizeThreshold: 1_000_000, + correlationId: correlation_id, + msgPurpose: "chat", + senderName: "sender", + receiverName: "", + receiverId: "", + replyTo: "", + replyToMsgId: "" + } + ); + + log_trace(`Sent message with transport: ${env.payloads[0].transport}`); + log_trace(`Envelope type: ${env.payloads[0].type}`); + + // Check if link transport was used + if (env.payloads[0].transport === "link") { + log_trace("Using link transport - file uploaded to HTTP server"); + log_trace(`URL: ${env.payloads[0].data}`); + } else { + log_trace("Using direct transport - payload sent via NATS"); + } +} + +// Run the test +console.log("Starting large binary payload test..."); +console.log(`Correlation ID: ${correlation_id}`); + +// Run sender first +console.log("start smartsend"); +test_large_binary_send(); + +// Run receiver +// console.log("testing smartreceive"); +// test_large_binary_receive(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_mix_payload_sender.js b/test/test_js_to_js_mix_payload_sender.js new file mode 100644 index 0000000..f16389c --- /dev/null +++ b/test/test_js_to_js_mix_payload_sender.js @@ -0,0 +1,276 @@ +#!/usr/bin/env node +// Test script for mixed-content message testing +// Tests sending a mix of text, json, table, image, audio, video, and binary data +// from JavaScript serviceA to JavaScript serviceB using NATSBridge.js smartsend +// +// This test demonstrates that any combination and any number of mixed content +// can be sent and received correctly. + +const { smartsend, uuid4, log_trace, _serialize_data } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_mix_test"; +const NATS_URL = "nats.yiem.cc"; +const FILESERVER_URL = "http://192.168.88.104:8080"; + +// Create correlation ID for tracing +const correlation_id = uuid4(); + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// File upload handler for plik server +async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) { + log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`); + + // Step 1: Get upload ID and token + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Step 2: Upload file data + const url_upload = `${fileserver_url}/file/${uploadid}`; + + // Create multipart form data + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(url_upload, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + // Build the download URL + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + log_trace(correlation_id, `Uploaded to URL: ${url}`); + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Helper: Create sample data for each type +function create_sample_data() { + // Text data (small - direct transport) + const text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊"; + + // Dictionary/JSON data (medium - could be direct or link) + const dict_data = { + type: "chat", + sender: "serviceA", + receiver: "serviceB", + metadata: { + timestamp: new Date().toISOString(), + priority: "high", + tags: ["urgent", "chat", "test"] + }, + content: { + text: "This is a JSON-formatted chat message with nested structure.", + format: "markdown", + mentions: ["user1", "user2"] + } + }; + + // Table data (small - direct transport) - NOT IMPLEMENTED (requires apache-arrow) + // const table_data_small = {...}; + + // Table data (large - link transport) - NOT IMPLEMENTED (requires apache-arrow) + // const table_data_large = {...}; + + // Image data (small binary - direct transport) + // Create a simple 10x10 pixel PNG-like data + const image_width = 10; + const image_height = 10; + let image_data = new Uint8Array(128); // PNG header + pixel data + // PNG header + image_data[0] = 0x89; + image_data[1] = 0x50; + image_data[2] = 0x4E; + image_data[3] = 0x47; + image_data[4] = 0x0D; + image_data[5] = 0x0A; + image_data[6] = 0x1A; + image_data[7] = 0x0A; + // Simple RGB data (10*10*3 = 300 bytes) + for (let i = 0; i < 300; i++) { + image_data[i + 8] = 0xFF; // Red pixel + } + + // Image data (large - link transport) + const large_image_width = 500; + const large_image_height = 1000; + const large_image_data = new Uint8Array(large_image_width * large_image_height * 3 + 8); + // PNG header + large_image_data[0] = 0x89; + large_image_data[1] = 0x50; + large_image_data[2] = 0x4E; + large_image_data[3] = 0x47; + large_image_data[4] = 0x0D; + large_image_data[5] = 0x0A; + large_image_data[6] = 0x1A; + large_image_data[7] = 0x0A; + // Random RGB data + for (let i = 0; i < large_image_width * large_image_height * 3; i++) { + large_image_data[i + 8] = Math.floor(Math.random() * 255); + } + + // Audio data (small binary - direct transport) + const audio_data = new Uint8Array(100); + for (let i = 0; i < 100; i++) { + audio_data[i] = Math.floor(Math.random() * 255); + } + + // Audio data (large - link transport) + const large_audio_data = new Uint8Array(1_500_000); + for (let i = 0; i < 1_500_000; i++) { + large_audio_data[i] = Math.floor(Math.random() * 255); + } + + // Video data (small binary - direct transport) + const video_data = new Uint8Array(150); + for (let i = 0; i < 150; i++) { + video_data[i] = Math.floor(Math.random() * 255); + } + + // Video data (large - link transport) + const large_video_data = new Uint8Array(1_500_000); + for (let i = 0; i < 1_500_000; i++) { + large_video_data[i] = Math.floor(Math.random() * 255); + } + + // Binary data (small - direct transport) + const binary_data = new Uint8Array(200); + for (let i = 0; i < 200; i++) { + binary_data[i] = Math.floor(Math.random() * 255); + } + + // Binary data (large - link transport) + const large_binary_data = new Uint8Array(1_500_000); + for (let i = 0; i < 1_500_000; i++) { + large_binary_data[i] = Math.floor(Math.random() * 255); + } + + return { + text_data, + dict_data, + // table_data_small, + // table_data_large, + image_data, + large_image_data, + audio_data, + large_audio_data, + video_data, + large_video_data, + binary_data, + large_binary_data + }; +} + +// Sender: Send mixed content via smartsend +async function test_mix_send() { + // Create sample data + const { text_data, dict_data, image_data, large_image_data, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data } = create_sample_data(); + + // Create payloads list - mixed content with both small and large data + // Small data uses direct transport, large data uses link transport + const payloads = [ + // Small data (direct transport) - text, dictionary + { dataname: "chat_text", data: text_data, type: "text" }, + { dataname: "chat_json", data: dict_data, type: "dictionary" }, + // { dataname: "chat_table_small", data: table_data_small, type: "table" }, + + // Large data (link transport) - large image, large audio, large video, large binary + // { dataname: "chat_table_large", data: table_data_large, type: "table" }, + { dataname: "user_image_large", data: large_image_data, type: "image" }, + { dataname: "audio_clip_large", data: large_audio_data, type: "audio" }, + { dataname: "video_clip_large", data: large_video_data, type: "video" }, + { dataname: "binary_file_large", data: large_binary_data, type: "binary" } + ]; + + // Use smartsend with mixed content + const env = await smartsend( + SUBJECT, + payloads, + { + natsUrl: NATS_URL, + fileserverUrl: FILESERVER_URL, + fileserverUploadHandler: plik_upload_handler, + sizeThreshold: 1_000_000, + correlationId: correlation_id, + msgPurpose: "chat", + senderName: "mix_sender", + receiverName: "", + receiverId: "", + replyTo: "", + replyToMsgId: "" + } + ); + + log_trace(`Sent message with ${env.payloads.length} payloads`); + + // Log transport type for each payload + for (let i = 0; i < env.payloads.length; i++) { + const payload = env.payloads[i]; + log_trace(`Payload ${i + 1} ('${payload.dataname}'):`); + log_trace(` Transport: ${payload.transport}`); + log_trace(` Type: ${payload.type}`); + log_trace(` Size: ${payload.size} bytes`); + log_trace(` Encoding: ${payload.encoding}`); + + if (payload.transport === "link") { + log_trace(` URL: ${payload.data}`); + } + } + + // Summary + console.log("\n--- Transport Summary ---"); + const direct_count = env.payloads.filter(p => p.transport === "direct").length; + const link_count = env.payloads.filter(p => p.transport === "link").length; + log_trace(`Direct transport: ${direct_count} payloads`); + log_trace(`Link transport: ${link_count} payloads`); +} + +// Run the test +console.log("Starting mixed-content transport test..."); +console.log(`Correlation ID: ${correlation_id}`); + +// Run sender +console.log("start smartsend for mixed content"); +test_mix_send(); + +console.log("\nTest completed."); +console.log("Note: Run test_js_to_js_mix_receiver.js to receive the messages."); \ No newline at end of file diff --git a/test/test_js_to_js_mix_payloads_receiver.js b/test/test_js_to_js_mix_payloads_receiver.js new file mode 100644 index 0000000..5d9c758 --- /dev/null +++ b/test/test_js_to_js_mix_payloads_receiver.js @@ -0,0 +1,172 @@ +#!/usr/bin/env node +// Test script for mixed-content message testing +// Tests receiving a mix of text, json, table, image, audio, video, and binary data +// from JavaScript serviceA to JavaScript serviceB using NATSBridge.js smartreceive +// +// This test demonstrates that any combination and any number of mixed content +// can be sent and received correctly. + +const { smartreceive, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_mix_test"; +const NATS_URL = "nats.yiem.cc"; + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] ${message}`); +} + +// Receiver: Listen for messages and verify mixed content handling +async function test_mix_receive() { + // Connect to NATS + const { connect } = require('nats'); + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + log_trace(`Received message on ${msg.subject}`); + + // Use NATSBridge.smartreceive to handle the data + const result = await smartreceive( + msg, + { + maxRetries: 5, + baseDelay: 100, + maxDelay: 5000 + } + ); + + log_trace(`Received ${result.length} payloads`); + + // Result is a list of {dataname, data, type} objects + for (const { dataname, data, type } of result) { + log_trace(`\n=== Payload: ${dataname} (type: ${type}) ===`); + + // Handle different data types + if (type === "text") { + // Text data - should be a String + if (typeof data === 'string') { + log_trace(` Type: String`); + log_trace(` Length: ${data.length} characters`); + + // Display first 200 characters + if (data.length > 200) { + log_trace(` First 200 chars: ${data.substring(0, 200)}...`); + } else { + log_trace(` Content: ${data}`); + } + + // Save to file + const fs = require('fs'); + const output_path = `./received_${dataname}.txt`; + fs.writeFileSync(output_path, data); + log_trace(` Saved to: ${output_path}`); + } else { + log_trace(` ERROR: Expected String, got ${typeof data}`); + } + + } else if (type === "dictionary") { + // Dictionary data - should be an object + if (typeof data === 'object' && data !== null && !Array.isArray(data)) { + log_trace(` Type: Object`); + log_trace(` Keys: ${Object.keys(data).join(', ')}`); + + // Display nested content + for (const [key, value] of Object.entries(data)) { + log_trace(` ${key} => ${value}`); + } + + // Save to JSON file + const fs = require('fs'); + const output_path = `./received_${dataname}.json`; + const json_str = JSON.stringify(data, null, 2); + fs.writeFileSync(output_path, json_str); + log_trace(` Saved to: ${output_path}`); + } else { + log_trace(` ERROR: Expected Object, got ${typeof data}`); + } + + } else if (type === "table") { + // Table data - should be an array of objects (requires apache-arrow) + log_trace(` Type: Array (requires apache-arrow for full deserialization)`); + if (Array.isArray(data)) { + log_trace(` Length: ${data.length} items`); + log_trace(` First item: ${JSON.stringify(data[0])}`); + } else { + log_trace(` ERROR: Expected Array, got ${typeof data}`); + } + + } else if (type === "image" || type === "audio" || type === "video" || type === "binary") { + // Binary data - should be Uint8Array + if (data instanceof Uint8Array || Array.isArray(data)) { + log_trace(` Type: Uint8Array (binary)`); + log_trace(` Size: ${data.length} bytes`); + + // Save to file + const fs = require('fs'); + const output_path = `./received_${dataname}.bin`; + fs.writeFileSync(output_path, Buffer.from(data)); + log_trace(` Saved to: ${output_path}`); + } else { + log_trace(` ERROR: Expected Uint8Array, got ${typeof data}`); + } + + } else { + log_trace(` ERROR: Unknown data type '${type}'`); + } + } + + // Summary + console.log("\n=== Verification Summary ==="); + const text_count = result.filter(x => x.type === "text").length; + const dict_count = result.filter(x => x.type === "dictionary").length; + const table_count = result.filter(x => x.type === "table").length; + const image_count = result.filter(x => x.type === "image").length; + const audio_count = result.filter(x => x.type === "audio").length; + const video_count = result.filter(x => x.type === "video").length; + const binary_count = result.filter(x => x.type === "binary").length; + + log_trace(`Text payloads: ${text_count}`); + log_trace(`Dictionary payloads: ${dict_count}`); + log_trace(`Table payloads: ${table_count}`); + log_trace(`Image payloads: ${image_count}`); + log_trace(`Audio payloads: ${audio_count}`); + log_trace(`Video payloads: ${video_count}`); + log_trace(`Binary payloads: ${binary_count}`); + + // Print transport type info for each payload if available + console.log("\n=== Payload Details ==="); + for (const { dataname, data, type } of result) { + if (["image", "audio", "video", "binary"].includes(type)) { + log_trace(`${dataname}: ${data.length} bytes (binary)`); + } else if (type === "table") { + log_trace(`${dataname}: ${data.length} items (Array)`); + } else if (type === "dictionary") { + log_trace(`${dataname}: ${JSON.stringify(data).length} bytes (Object)`); + } else if (type === "text") { + log_trace(`${dataname}: ${data.length} characters (String)`); + } + } + } + + // Keep listening for 2 minutes + setTimeout(() => { + nc.close(); + process.exit(0); + }, 120000); +} + +// Run the test +console.log("Starting mixed-content transport test..."); +console.log("Note: This receiver will wait for messages from the sender."); +console.log("Run test_js_to_js_mix_sender.js first to send test data."); + +// Run receiver +console.log("\ntesting smartreceive for mixed content"); +test_mix_receive(); + +console.log("\nTest completed."); \ No newline at end of file diff --git a/test/test_js_to_js_table_receiver.js b/test/test_js_to_js_table_receiver.js new file mode 100644 index 0000000..9ecadd5 --- /dev/null +++ b/test/test_js_to_js_table_receiver.js @@ -0,0 +1,86 @@ +#!/usr/bin/env node +// Test script for Table transport testing +// Tests receiving 1 large and 1 small Tables via direct and link transport +// Uses NATSBridge.js smartreceive with "table" type +// +// Note: This test requires the apache-arrow library to deserialize table data. +// The JavaScript implementation uses apache-arrow for Arrow IPC deserialization. + +const { smartreceive, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_table_test"; +const NATS_URL = "nats.yiem.cc"; + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] ${message}`); +} + +// Receiver: Listen for messages and verify Table handling +async function test_table_receive() { + // Connect to NATS + const { connect } = require('nats'); + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + log_trace(`Received message on ${msg.subject}`); + + // Use NATSBridge.smartreceive to handle the data + const result = await smartreceive( + msg, + { + maxRetries: 5, + baseDelay: 100, + maxDelay: 5000 + } + ); + + // Result is a list of {dataname, data, type} objects + for (const { dataname, data, type } of result) { + if (Array.isArray(data)) { + log_trace(`Received Table '${dataname}' of type ${type}`); + + // Display table contents + console.log(` Dimensions: ${data.length} rows x ${data.length > 0 ? Object.keys(data[0]).length : 0} columns`); + console.log(` Columns: ${data.length > 0 ? Object.keys(data[0]).join(', ') : ''}`); + + // Display first few rows + console.log(` First 5 rows:`); + for (let i = 0; i < Math.min(5, data.length); i++) { + console.log(` Row ${i}: ${JSON.stringify(data[i])}`); + } + + // Save to JSON file + const fs = require('fs'); + const output_path = `./received_${dataname}.json`; + const json_str = JSON.stringify(data, null, 2); + fs.writeFileSync(output_path, json_str); + log_trace(`Saved Table to ${output_path}`); + } else { + log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`); + } + } + } + + // Keep listening for 10 seconds + setTimeout(() => { + nc.close(); + process.exit(0); + }, 120000); +} + +// Run the test +console.log("Starting Table transport test..."); +console.log("Note: This receiver will wait for messages from the sender."); +console.log("Run test_js_to_js_table_sender.js first to send test data."); + +// Run receiver +console.log("testing smartreceive"); +test_table_receive(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_table_sender.js b/test/test_js_to_js_table_sender.js new file mode 100644 index 0000000..4b5a8a9 --- /dev/null +++ b/test/test_js_to_js_table_sender.js @@ -0,0 +1,164 @@ +#!/usr/bin/env node +// Test script for Table transport testing +// Tests sending 1 large and 1 small Tables via direct and link transport +// Uses NATSBridge.js smartsend with "table" type +// +// Note: This test requires the apache-arrow library to serialize/deserialize table data. +// The JavaScript implementation uses apache-arrow for Arrow IPC serialization. + +const { smartsend, uuid4, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_table_test"; +const NATS_URL = "nats.yiem.cc"; +const FILESERVER_URL = "http://192.168.88.104:8080"; + +// Create correlation ID for tracing +const correlation_id = uuid4(); + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// File upload handler for plik server +async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) { + log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`); + + // Step 1: Get upload ID and token + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Step 2: Upload file data + const url_upload = `${fileserver_url}/file/${uploadid}`; + + // Create multipart form data + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(url_upload, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + // Build the download URL + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + log_trace(correlation_id, `Uploaded to URL: ${url}`); + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Sender: Send Tables via smartsend +async function test_table_send() { + // Note: This test requires apache-arrow library to create Arrow IPC data. + // For now, we'll use a simple array of objects as table data. + // In production, you would use the apache-arrow library to create Arrow IPC data. + + // Create a small Table (will use direct transport) + const small_table = [ + { id: 1, name: "Alice", score: 95 }, + { id: 2, name: "Bob", score: 88 }, + { id: 3, name: "Charlie", score: 92 } + ]; + + // Create a large Table (will use link transport if > 1MB) + // Generate a larger dataset (~2MB to ensure link transport) + const large_table = []; + for (let i = 0; i < 50000; i++) { + large_table.push({ + id: i, + message: `msg_${i}`, + sender: `sender_${i}`, + timestamp: new Date().toISOString(), + priority: Math.floor(Math.random() * 3) + 1 + }); + } + + // Test data 1: small Table + const data1 = { dataname: "small_table", data: small_table, type: "table" }; + + // Test data 2: large Table + const data2 = { dataname: "large_table", data: large_table, type: "table" }; + + // Use smartsend with table type + // For small Table: will use direct transport (Arrow IPC encoded) + // For large Table: will use link transport (uploaded to fileserver) + const env = await smartsend( + SUBJECT, + [data1, data2], + { + natsUrl: NATS_URL, + fileserverUrl: FILESERVER_URL, + fileserverUploadHandler: plik_upload_handler, + sizeThreshold: 1_000_000, + correlationId: correlation_id, + msgPurpose: "chat", + senderName: "table_sender", + receiverName: "", + receiverId: "", + replyTo: "", + replyToMsgId: "" + } + ); + + log_trace(`Sent message with ${env.payloads.length} payloads`); + + // Log transport type for each payload + for (let i = 0; i < env.payloads.length; i++) { + const payload = env.payloads[i]; + log_trace(`Payload ${i + 1} ('${payload.dataname}'):`); + log_trace(` Transport: ${payload.transport}`); + log_trace(` Type: ${payload.type}`); + log_trace(` Size: ${payload.size} bytes`); + log_trace(` Encoding: ${payload.encoding}`); + + if (payload.transport === "link") { + log_trace(` URL: ${payload.data}`); + } + } +} + +// Run the test +console.log("Starting Table transport test..."); +console.log(`Correlation ID: ${correlation_id}`); + +// Run sender +console.log("start smartsend for tables"); +test_table_send(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_text_receiver.js b/test/test_js_to_js_text_receiver.js new file mode 100644 index 0000000..0bf8c4f --- /dev/null +++ b/test/test_js_to_js_text_receiver.js @@ -0,0 +1,80 @@ +#!/usr/bin/env node +// Test script for text transport testing +// Tests receiving 1 large and 1 small text from JavaScript serviceA to JavaScript serviceB +// Uses NATSBridge.js smartreceive with "text" type + +const { smartreceive, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_text_test"; +const NATS_URL = "nats.yiem.cc"; + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] ${message}`); +} + +// Receiver: Listen for messages and verify text handling +async function test_text_receive() { + // Connect to NATS + const { connect } = require('nats'); + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + log_trace(`Received message on ${msg.subject}`); + + // Use NATSBridge.smartreceive to handle the data + const result = await smartreceive( + msg, + { + maxRetries: 5, + baseDelay: 100, + maxDelay: 5000 + } + ); + + // Result is a list of {dataname, data, type} objects + for (const { dataname, data, type } of result) { + if (typeof data === 'string') { + log_trace(`Received text '${dataname}' of type ${type}`); + log_trace(` Length: ${data.length} characters`); + + // Display first 100 characters + if (data.length > 100) { + log_trace(` First 100 characters: ${data.substring(0, 100)}...`); + } else { + log_trace(` Content: ${data}`); + } + + // Save to file + const fs = require('fs'); + const output_path = `./received_${dataname}.txt`; + fs.writeFileSync(output_path, data); + log_trace(`Saved text to ${output_path}`); + } else { + log_trace(`Received unexpected data type for '${dataname}': ${typeof data}`); + } + } + } + + // Keep listening for 10 seconds + setTimeout(() => { + nc.close(); + process.exit(0); + }, 120000); +} + +// Run the test +console.log("Starting text transport test..."); +console.log("Note: This receiver will wait for messages from the sender."); +console.log("Run test_js_to_js_text_sender.js first to send test data."); + +// Run receiver +console.log("testing smartreceive for text"); +test_text_receive(); + +console.log("Test completed."); \ No newline at end of file diff --git a/test/test_js_to_js_text_sender.js b/test/test_js_to_js_text_sender.js new file mode 100644 index 0000000..6a75f59 --- /dev/null +++ b/test/test_js_to_js_text_sender.js @@ -0,0 +1,140 @@ +#!/usr/bin/env node +// Test script for text transport testing +// Tests sending 1 large and 1 small text from JavaScript serviceA to JavaScript serviceB +// Uses NATSBridge.js smartsend with "text" type + +const { smartsend, uuid4, log_trace } = require('./src/NATSBridge'); + +// Configuration +const SUBJECT = "/NATSBridge_text_test"; +const NATS_URL = "nats.yiem.cc"; +const FILESERVER_URL = "http://192.168.88.104:8080"; + +// Create correlation ID for tracing +const correlation_id = uuid4(); + +// Helper: Log with correlation ID +function log_trace(message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`); +} + +// File upload handler for plik server +async function plik_upload_handler(fileserver_url, dataname, data, correlation_id) { + // Get upload ID + const url_getUploadID = `${fileserver_url}/upload`; + const headers = { + "Content-Type": "application/json" + }; + const body = JSON.stringify({ OneShot: true }); + + let response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + if (!response.ok) { + throw new Error(`Failed to get upload ID: ${response.status} ${response.statusText}`); + } + + const responseJson = await response.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Upload file + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + response = await fetch(`${fileserver_url}/file/${uploadid}`, { + method: "POST", + headers: { + "X-UploadToken": uploadtoken + }, + body: formData + }); + + if (!response.ok) { + throw new Error(`Failed to upload file: ${response.status} ${response.statusText}`); + } + + const fileResponseJson = await response.json(); + const fileid = fileResponseJson.id; + + const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + return { + status: response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + +// Sender: Send text via smartsend +async function test_text_send() { + // Create a small text (will use direct transport) + const small_text = "Hello, this is a small text message. Testing direct transport via NATS."; + + // Create a large text (will use link transport if > 1MB) + // Generate a larger text (~2MB to ensure link transport) + const large_text_lines = []; + for (let i = 0; i < 50000; i++) { + large_text_lines.push(`Line ${i}: This is a sample text line with some content to pad the size. `); + } + const large_text = large_text_lines.join(""); + + // Test data 1: small text + const data1 = { dataname: "small_text", data: small_text, type: "text" }; + + // Test data 2: large text + const data2 = { dataname: "large_text", data: large_text, type: "text" }; + + // Use smartsend with text type + // For small text: will use direct transport (Base64 encoded UTF-8) + // For large text: will use link transport (uploaded to fileserver) + const env = await smartsend( + SUBJECT, + [data1, data2], + { + natsUrl: NATS_URL, + fileserverUrl: FILESERVER_URL, + fileserverUploadHandler: plik_upload_handler, + sizeThreshold: 1_000_000, + correlationId: correlation_id, + msgPurpose: "chat", + senderName: "text_sender", + receiverName: "", + receiverId: "", + replyTo: "", + replyToMsgId: "" + } + ); + + log_trace(`Sent message with ${env.payloads.length} payloads`); + + // Log transport type for each payload + for (let i = 0; i < env.payloads.length; i++) { + const payload = env.payloads[i]; + log_trace(`Payload ${i + 1} ('${payload.dataname}'):`); + log_trace(` Transport: ${payload.transport}`); + log_trace(` Type: ${payload.type}`); + log_trace(` Size: ${payload.size} bytes`); + log_trace(` Encoding: ${payload.encoding}`); + + if (payload.transport === "link") { + log_trace(` URL: ${payload.data}`); + } + } +} + +// Run the test +console.log("Starting text transport test..."); +console.log(`Correlation ID: ${correlation_id}`); + +// Run sender +console.log("start smartsend for text"); +test_text_send(); + +console.log("Test completed."); \ No newline at end of file