/**
 * NATSBridge - Cross-Platform Bi-Directional Data Bridge
 * JavaScript/Node.js Implementation (Desktop/Server-Side)
 *
 * This module provides functionality for sending and receiving data across network boundaries
 * using NATS as the message bus, with support for both direct payload transport and
 * URL-based transport for larger payloads.
 *
 * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
 *
 * Node.js-specific features:
 * - Apache Arrow IPC support via apache-arrow
 * - TCP NATS connections (nats:// or tls:// URLs)
 * - Buffer for binary data handling
 * - Connection pooling for high-throughput scenarios
 *
 * @module NATSBridge
 */

const nats = require('nats');
const crypto = require('crypto');
// Use native fetch available in Node.js 18+
const arrow = require('apache-arrow');

// ---------------------------------------------- UUID Helper ---------------------------------------------- //

/**
 * Generate UUID v4 using crypto module (Node.js compatible)
 * @returns {string} UUID string
 */
function uuidv4() {
  return crypto.randomUUID();
}

// ---------------------------------------------- Constants ---------------------------------------------- //

/**
 * Default size threshold (in bytes) for switching from direct to link transport.
 * Payloads strictly smaller than this are sent inline; everything else is uploaded.
 */
const DEFAULT_SIZE_THRESHOLD = 500_000;

/**
 * Default NATS server URL
 */
const DEFAULT_BROKER_URL = 'nats://localhost:4222';

/**
 * Default HTTP file server URL for link transport
 */
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

/**
 * Payload types that are passed through as raw bytes, mapped to the label used
 * in their validation error message.
 */
const RAW_BYTE_PAYLOAD_LABELS = Object.freeze({
  image: 'Image',
  audio: 'Audio',
  video: 'Video',
  binary: 'Binary',
});

/**
 * Payload types for which deserializeData logs a hex preview of the first bytes.
 */
const HEX_PREVIEW_PAYLOAD_TYPES = new Set(['arrowtable', 'jsontable', 'image', 'binary']);

// ---------------------------------------------- Utility Functions ---------------------------------------------- //

/**
 * Convert Buffer to Base64 string
 * @param {Buffer} buffer - Buffer to encode
 * @returns {string} Base64 encoded string
 */
function bufferToBase64(buffer) {
  return buffer.toString('base64');
}

/**
 * Log a trace message with correlation ID and timestamp
 * @param {string} correlationId - Correlation ID for tracing
 * @param {string} message - Message content to log
 */
function logTrace(correlationId, message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}

// ---------------------------------------------- Serialization Functions ---------------------------------------------- //

/**
 * Serialize data according to specified format
 * @param {any} data - Data to serialize
 * @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
 * @returns {Promise<Buffer>} Binary representation of the serialized data
 * @throws {Error} If the data does not match the payload type or the type is unknown
 */
async function serializeData(data, payloadType) {
  switch (payloadType) {
    case 'text':
      if (typeof data !== 'string') {
        throw new Error('Text data must be a string');
      }
      return Buffer.from(data, 'utf8');

    case 'dictionary': {
      const jsonStr = JSON.stringify(data);
      // JSON.stringify yields undefined for undefined/functions/symbols; report
      // that explicitly instead of letting Buffer.from fail with a TypeError.
      if (jsonStr === undefined) {
        throw new Error('Dictionary data must be JSON-serializable');
      }
      return Buffer.from(jsonStr, 'utf8');
    }

    case 'arrowtable':
      // Convert array of objects to Arrow IPC format
      if (!Array.isArray(data) || data.length === 0) {
        throw new Error('Arrow table data must be a non-empty array of objects');
      }
      return serializeArrowTable(data);

    case 'jsontable':
      // Serialize array of objects to JSON format
      if (!Array.isArray(data)) {
        throw new Error('JSON table data must be an array');
      }
      return Buffer.from(JSON.stringify(data), 'utf8');

    case 'image':
    case 'audio':
    case 'video':
    case 'binary':
      if (!(data instanceof Uint8Array || Buffer.isBuffer(data))) {
        throw new Error(`${RAW_BYTE_PAYLOAD_LABELS[payloadType]} data must be Uint8Array or Buffer`);
      }
      // Buffer.from(bytes) copies, so the result is independent of the caller's array.
      return Buffer.from(data);

    default:
      throw new Error(`Unknown payload_type: ${payloadType}`);
  }
}

/**
 * Helper function to properly serialize table data to Arrow IPC
 *
 * The column set is taken from the keys of the first row only.
 *
 * @param {Array} data - Array of objects representing table rows
 * @returns {Buffer} Arrow IPC formatted buffer
 * @throws {Error} If data is not a non-empty array
 */
function serializeArrowTable(data) {
  if (!Array.isArray(data) || data.length === 0) {
    throw new Error('Table data must be a non-empty array of objects');
  }

  logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`);

  // Use arrow.tableFromArrays which handles the conversion properly
  // Convert array of objects to a key-value format expected by tableFromArrays
  const columns = {};
  for (const key of Object.keys(data[0])) {
    columns[key] = data.map((row) => row[key]);
  }

  logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`);

  const table = arrow.tableFromArrays(columns);
  logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`);

  // Convert to IPC format
  const ipcBuffer = arrow.tableToIPC(table);
  logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, length: ${ipcBuffer.byteLength}`);

  const resultBuffer = Buffer.from(ipcBuffer);
  logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`);

  // Debug: Show first 20 bytes in hex (subarray: Buffer#slice is deprecated)
  const hexPreview = resultBuffer.subarray(0, 20).toString('hex');
  logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview}`);

  return resultBuffer;
}

/**
 * Decode an Arrow IPC buffer, trying each reader function the installed
 * apache-arrow build exposes.
 *
 * @param {Buffer} buffer - Arrow IPC bytes
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Object} Table object produced by apache-arrow
 * @throws {Error} If no reader succeeds; the last reader error is attached as `cause`
 */
function deserializeArrowTable(buffer, correlationId) {
  logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`);

  // Debug: Check available arrow methods
  logTrace(correlationId, `deserializeData: arrow.tableFromRawBytes exists: ${typeof arrow.tableFromRawBytes}`);
  logTrace(correlationId, `deserializeData: arrow.tableFromIPC exists: ${typeof arrow.tableFromIPC}`);

  let lastError;

  // Try tableFromRawBytes first (older API)
  if (typeof arrow.tableFromRawBytes === 'function') {
    try {
      logTrace(correlationId, `deserializeData: Using tableFromRawBytes`);
      const table = arrow.tableFromRawBytes(buffer);
      logTrace(correlationId, `deserializeData: Arrow table - rows=${table.numRows}, cols=${table.numCols}`);
      return table;
    } catch (e) {
      lastError = e;
      logTrace(correlationId, `deserializeData: tableFromRawBytes failed: ${e.message}`);
    }
  }

  // Try tableFromIPC (newer API)
  if (typeof arrow.tableFromIPC === 'function') {
    try {
      logTrace(correlationId, `deserializeData: Using tableFromIPC`);
      const table = arrow.tableFromIPC(buffer);
      logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`);
      return table;
    } catch (e) {
      lastError = e;
      logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`);
    }
  }

  // Keep the underlying reader failure so callers can see why decoding failed.
  throw new Error(
    `Unable to deserialize Arrow table: neither tableFromRawBytes nor tableFromIPC worked`,
    lastError ? { cause: lastError } : undefined,
  );
}

/**
 * Deserialize bytes to data based on type
 *
 * Returns a string for "text", the parsed JSON value for "dictionary" and
 * "jsontable", an apache-arrow table object for "arrowtable", and a Buffer for
 * "image", "audio", "video" and "binary".
 *
 * @param {Buffer|Uint8Array} data - Serialized data as bytes
 * @param {string} payloadType - Data type
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Promise<any>} Deserialized data
 * @throws {Error} If the payload type is unknown or the bytes cannot be decoded
 */
async function deserializeData(data, payloadType, correlationId) {
  const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);

  logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);

  // Debug: Show first 20 bytes in hex for binary data
  if (HEX_PREVIEW_PAYLOAD_TYPES.has(payloadType)) {
    const hexPreview = buffer.subarray(0, 20).toString('hex');
    logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview}`);
  }

  switch (payloadType) {
    case 'text': {
      const result = buffer.toString('utf8');
      logTrace(correlationId, `deserializeData: text result length=${result.length}`);
      return result;
    }

    case 'dictionary': {
      const result = JSON.parse(buffer.toString('utf8'));
      // A JSON `null` or primitive is valid JSON but has no keys to list.
      const keys = result !== null && typeof result === 'object' ? Object.keys(result) : [];
      logTrace(correlationId, `deserializeData: dictionary keys=${keys.join(', ')}`);
      return result;
    }

    case 'arrowtable':
      return deserializeArrowTable(buffer, correlationId);

    case 'jsontable': {
      const result = JSON.parse(buffer.toString('utf8'));
      logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`);
      return result;
    }

    case 'image':
    case 'audio':
    case 'video':
    case 'binary':
      logTrace(correlationId, `deserializeData: ${payloadType} buffer length=${buffer.length}`);
      return buffer;

    default:
      throw new Error(`Unknown payload_type: ${payloadType}`);
  }
}

// ---------------------------------------------- File Server Handlers ---------------------------------------------- //

/**
 * Upload data to plik server in one-shot mode
 *
 * Performs two requests: POST `${fileServerUrl}/upload` to create a one-shot
 * upload, then POST `${fileServerUrl}/file/${uploadid}` with the file as
 * multipart form data.
 *
 * @param {string} fileServerUrl - Base URL of the plik server
 * @param {string} dataname - Name of the file being uploaded
 * @param {Buffer|Uint8Array} data - Raw byte data of the file content
 * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
 *   `status` is the HTTP status of the file upload request. When it is not a
 *   2xx status, `fileid` and `url` are undefined.
 * @throws {Error} If the upload-creation request does not succeed
 */
async function plikOneshotUpload(fileServerUrl, dataname, data) {
  const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);

  // Get upload id
  const urlGetUploadID = `${fileServerUrl}/upload`;
  const httpResponse = await fetch(urlGetUploadID, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ OneShot: true }),
  });
  // Without an upload id there is nothing to upload to; fail here rather than
  // posting to ".../file/undefined".
  if (!httpResponse.ok) {
    throw new Error(`Failed to create upload on fileserver: ${httpResponse.status}`);
  }
  const responseJson = await httpResponse.json();
  const uploadid = responseJson.id;
  const uploadtoken = responseJson.uploadToken;

  // Upload file
  const urlUpload = `${fileServerUrl}/file/${uploadid}`;
  const form = new FormData();
  const blob = new Blob([buffer], { type: 'application/octet-stream' });
  form.append('file', blob, dataname);

  const uploadResponse = await fetch(urlUpload, {
    method: 'POST',
    headers: { 'X-UploadToken': uploadtoken },
    body: form,
  });

  // Report the failing status to the caller instead of trying to parse an
  // error body that may not be JSON.
  if (!uploadResponse.ok) {
    return { status: uploadResponse.status, uploadid, fileid: undefined, url: undefined };
  }

  const uploadJson = await uploadResponse.json();
  const fileid = uploadJson.id;

  // Encode the name so characters such as '#', '?' or spaces cannot break the URL.
  const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;

  return { status: uploadResponse.status, uploadid, fileid, url };
}

/**
 * Fetch data from URL with exponential backoff
 *
 * Only an HTTP 200 response counts as success. After each failed attempt
 * except the last, waits `delay` ms and doubles it, capped at `maxDelay`.
 *
 * @param {string} url - URL to fetch from
 * @param {number} maxRetries - Maximum number of attempts
 * @param {number} baseDelay - Initial delay in milliseconds
 * @param {number} maxDelay - Maximum delay in milliseconds
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Promise<Uint8Array>} Fetched data as bytes
 * @throws {Error} If every attempt fails; the last failure is attached as `cause`
 */
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
  let delay = baseDelay;
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url);
      if (response.status !== 200) {
        throw new Error(`Failed to fetch: ${response.status}`);
      }
      logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
      const arrayBuffer = await response.arrayBuffer();
      return new Uint8Array(arrayBuffer);
    } catch (e) {
      lastError = e;
      logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`);

      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, delay));
        delay = Math.min(delay * 2, maxDelay);
      }
    }
  }

  throw new Error(
    `Failed to fetch data after ${maxRetries} attempts`,
    lastError ? { cause: lastError } : undefined,
  );
}

// ---------------------------------------------- NATS Client ---------------------------------------------- //

/**
 * NATS client wrapper for connection management
 * Supports both single-use and persistent connection modes
 */
class NATSClient {
  /**
   * Create a new NATS client
   * @param {string} url - NATS server URL (nats:// or tls://)
   * @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
   */
  constructor(url, keepAlive = false) {
    this.url = url;
    this.connection = null;
    this.keepAlive = keepAlive;
  }

  /**
   * Connect to NATS server. Returns the existing connection if there is one.
   * @returns {Promise<Object>} The underlying NATS connection
   */
  async connect() {
    if (this.connection) {
      return this.connection;
    }

    this.connection = await nats.connect({ servers: this.url });
    return this.connection;
  }

  /**
   * Publish message to NATS subject, connecting first if necessary
   * @param {string} subject - NATS subject to publish to
   * @param {string} message - Message to publish
   * @param {string} correlationId - Correlation ID for logging
   */
  async publish(subject, message, correlationId) {
    if (!this.connection) {
      await this.connect();
    }

    await this.connection.publish(subject, message);
    logTrace(correlationId, `Message published to ${subject}`);
  }

  /**
   * Close the NATS connection.
   *
   * Pending publishes are flushed first (when the connection exposes `flush`)
   * so a publish immediately followed by close() is not dropped.
   *
   * @returns {Promise<void>}
   */
  async close() {
    const connection = this.connection;
    if (!connection) {
      return;
    }
    // Clear the reference first so isConnected() is false even if closing fails.
    this.connection = null;

    const alreadyClosed = typeof connection.isClosed === 'function' && connection.isClosed();
    try {
      if (!alreadyClosed && typeof connection.flush === 'function') {
        await connection.flush();
      }
    } finally {
      await connection.close();
    }
  }

  /**
   * Get the current connection (for external use)
   * @returns {NATS.Connection|null}
   */
  getConnection() {
    return this.connection;
  }

  /**
   * Check whether this client holds a connection object.
   * This reflects only whether connect() has run and close() has not.
   * @returns {boolean}
   */
  isConnected() {
    return this.connection !== null;
  }
}

/**
 * Connection pool for managing multiple NATS connections
 * Useful for applications with multiple concurrent publishers
 *
 * NOTE: acquire() returns the first pooled client that is still connected; the
 * pool does not track which clients are in use, so callers share clients.
 */
class NATSConnectionPool {
  /**
   * Create a new connection pool
   * @param {string} url - NATS server URL (nats:// or tls://)
   * @param {number} [maxSize=10] - Maximum pool size
   */
  constructor(url, maxSize = 10) {
    this.url = url;
    this.maxSize = maxSize;
    this.connections = new Map();
    this.idCounter = 0;
  }

  /**
   * Get a connection from the pool (or create new)
   * @returns {Promise<NATSClient>}
   */
  async acquire() {
    // Reuse an existing pooled connection; drop entries whose client has been
    // closed so they no longer count towards maxSize.
    for (const [id, client] of this.connections) {
      if (client.isConnected()) {
        return client;
      }
      this.connections.delete(id);
    }

    // Create new connection if under limit
    if (this.connections.size < this.maxSize) {
      const id = `conn_${++this.idCounter}`;
      const client = new NATSClient(this.url, true);
      await client.connect();
      this.connections.set(id, client);
      return client;
    }

    // Pool exhausted - create new connection (caller should close when done)
    const client = new NATSClient(this.url, false);
    await client.connect();
    return client;
  }

  /**
   * Return a connection to the pool
   *
   * Persistent (keepAlive) clients stay open; any other client is closed.
   *
   * @param {NATSClient} client - Connection to return
   * @returns {Promise<void>} Resolves once the client is released; never rejects,
   *   so callers may ignore it
   */
  release(client) {
    // Only return persistent connections
    if (client.keepAlive && client.isConnected()) {
      // Connection already in pool, do nothing
      return Promise.resolve();
    }

    // Non-persistent connection - close it. Failures are logged rather than
    // thrown so an un-awaited release() cannot cause an unhandled rejection.
    return client.close().catch((err) => {
      logTrace('NATSConnectionPool', `Failed to close released connection: ${err.message}`);
    });
  }

  /**
   * Close all connections in the pool
   * @returns {Promise<void>}
   * @throws {Error} The first close failure, after every client has been attempted
   */
  async closeAll() {
    const clients = [...this.connections.values()];
    this.connections.clear();

    // Attempt every close so one failure does not leave the rest open.
    const results = await Promise.allSettled(clients.map((client) => client.close()));
    const failure = results.find((result) => result.status === 'rejected');
    if (failure) {
      throw failure.reason;
    }
  }
}

// ---------------------------------------------- Core Functions ---------------------------------------------- //

/**
 * Publish message to NATS
 *
 * - NATSClient instance: published through the client; never closed here.
 * - Object with a `publish` function (raw NATS connection): published directly;
 *   never closed here.
 * - Anything else is treated as a URL: a temporary NATSClient is created and
 *   closed afterwards when `closeConnection` is true.
 *
 * @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection
 * @param {string} subject - NATS subject to publish to
 * @param {string} message - JSON message to publish
 * @param {string} correlationId - Correlation ID for tracing
 * @param {boolean} [closeConnection=true] - Close connection after publish (only applies to URL input)
 */
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
  if (brokerUrlOrClient instanceof NATSClient) {
    await brokerUrlOrClient.publish(subject, message, correlationId);
    return;
  }

  if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') {
    // Direct connection (duck-typing check for NATS connection); the caller owns it.
    await brokerUrlOrClient.publish(subject, message);
    return;
  }

  // String URL - create new short-lived client
  const client = new NATSClient(brokerUrlOrClient);
  try {
    await client.publish(subject, message, correlationId);
  } catch (err) {
    if (closeConnection) {
      // Do not leak the connection when publishing fails; the publish error is
      // the one the caller needs to see, so a cleanup failure is only logged.
      await client.close().catch((closeErr) => {
        logTrace(correlationId, `Failed to close connection after publish error: ${closeErr.message}`);
      });
    }
    throw err;
  }

  if (closeConnection) {
    await client.close();
  }
}

/**
 * Build message envelope from payloads and metadata
 * @param {string} subject - NATS subject
 * @param {Array} payloads - Array of payload objects
 * @param {Object} options - Envelope metadata options
 * @returns {Object} Envelope object
 */
function buildEnvelope(subject, payloads, options) {
  return {
    correlation_id: options.correlation_id,
    msg_id: options.msg_id,
    timestamp: new Date().toISOString(),
    send_to: subject,
    msg_purpose: options.msg_purpose,
    sender_name: options.sender_name,
    sender_id: options.sender_id,
    receiver_name: options.receiver_name,
    receiver_id: options.receiver_id,
    reply_to: options.reply_to,
    reply_to_msg_id: options.reply_to_msg_id,
    broker_url: options.broker_url,
    metadata: options.metadata || {},
    payloads: payloads,
  };
}

/**
 * Build payload object from serialized data
 * @param {string} dataname - Name of the payload
 * @param {string} payloadType - Type of the payload
 * @param {Buffer} payloadBytes - Serialized payload bytes
 * @param {string} transport - Transport type ("direct" or "link")
 * @param {string} data - Data (base64 for direct, URL for link)
 * @returns {Object} Payload object
 */
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
  // Determine encoding based on payload type (matching Julia implementation)
  let encoding = 'base64';
  if (payloadType === 'jsontable') {
    encoding = 'json';
  } else if (payloadType === 'arrowtable') {
    encoding = 'arrow-ipc';
  }

  return {
    id: uuidv4(),
    dataname,
    payload_type: payloadType,
    transport,
    encoding,
    size: payloadBytes.byteLength,
    data,
    metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {},
  };
}

/**
 * Send data via NATS with automatic transport selection
 *
 * This function intelligently routes data delivery based on payload size.
 * If the serialized payload is smaller than size_threshold, it encodes the data as Base64
 * and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
 * and publishes only the download URL over NATS.
 *
 * @param {string} subject - NATS subject to publish the message to
 * @param {Array} data - List of [dataname, data, type] tuples to send
 *   - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
 * @param {Object} options - Optional configuration
 * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server
 * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
 * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
 * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
 * @param {string} [options.correlation_id=crypto.randomUUID()] - Correlation ID for tracing
 * @param {string} [options.msg_purpose="chat"] - Purpose of the message
 * @param {string} [options.sender_name="NATSBridge"] - Name of the sender
 * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
 * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
 * @param {string} [options.reply_to=""] - Topic to reply to
 * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
 * @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
 * @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection
 * @param {string} [options.msg_id=crypto.randomUUID()] - Message ID
 * @param {string} [options.sender_id=crypto.randomUUID()] - Sender ID
 * @param {Object} [options.metadata={}] - Free-form metadata stored on the envelope
 * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
 * @throws {Error} If data is not an array, a payload cannot be serialized, or the upload fails
 *
 * @example
 * // Send a single payload
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [["dataname1", data1, "dictionary"]],
 *   { broker_url: "nats://localhost:4222" }
 * );
 *
 * // Send multiple payloads
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [
 *     ["dataname1", data1, "dictionary"],
 *     ["dataname2", data2, "arrowtable"]
 *   ],
 *   { broker_url: "nats://localhost:4222" }
 * );
 *
 * // Send with pre-existing connection
 * const client = new NATSBridge.NATSClient("nats://localhost:4222", true);
 * await client.connect();
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [["data", myData, "text"]],
 *   { nats_connection: client }
 * );
 */
async function smartsend(subject, data, options = {}) {
  const {
    broker_url = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler = plikOneshotUpload,
    size_threshold = DEFAULT_SIZE_THRESHOLD,
    correlation_id = uuidv4(),
    msg_purpose = 'chat',
    sender_name = 'NATSBridge',
    receiver_name = '',
    receiver_id = '',
    reply_to = '',
    reply_to_msg_id = '',
    is_publish = true,
    nats_connection = null,
    msg_id = uuidv4(),
    sender_id = uuidv4(),
    metadata = {},
  } = options;

  if (!Array.isArray(data)) {
    throw new Error('smartsend data must be an array of [dataname, data, type] tuples');
  }

  logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
  logTrace(correlation_id, `smartsend: data array length=${data.length}`);

  // Debug: Log input data structure
  for (let i = 0; i < data.length; i++) {
    const [dataname, payloadData, payloadType] = data[i];
    logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
  }

  // Process payloads
  const payloads = [];

  for (const [dataname, payloadData, payloadType] of data) {
    logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`);
    logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);

    const payloadBytes = await serializeData(payloadData, payloadType);
    const payloadSize = payloadBytes.byteLength;

    logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

    // Debug: Show first 20 bytes of serialized data for table types
    if (payloadType === 'arrowtable' || payloadType === 'jsontable') {
      const hexPreview = payloadBytes.subarray(0, 20).toString('hex');
      logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview}`);
    }

    if (payloadSize < size_threshold) {
      // Direct path
      const payloadB64 = bufferToBase64(payloadBytes);
      logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`);

      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64));
    } else {
      // Link path
      logTrace(correlation_id, `Using link transport, uploading to fileserver`);

      const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);

      if (response.status !== 200) {
        throw new Error(`Failed to upload data to fileserver: ${response.status}`);
      }

      logTrace(correlation_id, `Uploaded to URL: ${response.url}`);

      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'link', response.url));
    }
  }

  // Build envelope
  const env = buildEnvelope(subject, payloads, {
    correlation_id,
    msg_id,
    msg_purpose,
    sender_name,
    sender_id,
    receiver_name,
    receiver_id,
    reply_to,
    reply_to_msg_id,
    broker_url,
    metadata,
  });

  const env_json_str = JSON.stringify(env);

  if (is_publish) {
    // A supplied connection takes precedence over broker_url.
    await publishMessage(nats_connection || broker_url, subject, env_json_str, correlation_id);
  }

  return [env, env_json_str];
}

/**
 * Receive and process NATS message
 *
 * This function processes incoming NATS messages, handling both direct transport
 * (base64 decoded payloads) and link transport (URL-based payloads).
 * It deserializes the data based on the transport type and returns the result.
 *
 * @param {Object} msg - NATS message object with a `data` or `payload` property
 * @param {Object} options - Optional configuration
 * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
 * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
 * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
 * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
 * @returns {Promise<Object>} Envelope object with processed payloads
 * @throws {Error} If the message has no body, the envelope is malformed, or a payload cannot be decoded
 *
 * @example
 * // Receive and process message
 * const env = await smartreceive(msg, {
 *   fileserver_download_handler: fetchWithBackoff,
 *   max_retries: 5,
 *   base_delay: 100,
 *   max_delay: 5000
 * });
 * // env.payloads is an Array of [dataname, data, type] arrays
 * for (const [dataname, data, type] of env.payloads) {
 *   console.log(`${dataname}: ${data} (type: ${type})`);
 * }
 */
async function smartreceive(msg, options = {}) {
  const {
    fileserver_download_handler = fetchWithBackoff,
    max_retries = 5,
    base_delay = 100,
    max_delay = 5000,
  } = options;

  // Debug: Log message object structure
  logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`);
  logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`);
  logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`);

  // Parse the JSON envelope
  // NATS.js v2.x uses msg.data instead of msg.payload
  let payload;
  if (msg.data !== undefined) {
    payload = typeof msg.data === 'string' ? msg.data : Buffer.from(msg.data).toString('utf8');
  } else if (msg.payload !== undefined) {
    payload = typeof msg.payload === 'string' ? msg.payload : Buffer.from(msg.payload).toString('utf8');
  } else {
    throw new Error('Message has neither data nor payload property');
  }

  logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);

  // Debug: Show first 200 chars of payload
  const payloadPreview = payload.substring(0, 200);
  logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`);

  let envJsonObj;
  try {
    envJsonObj = JSON.parse(payload);
  } catch (e) {
    logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`);
    throw e;
  }

  // Reject envelopes without a payload list up front instead of failing later
  // with a TypeError on `.length`.
  if (!Array.isArray(envJsonObj?.payloads)) {
    throw new Error('Invalid envelope: "payloads" must be an array');
  }

  const correlationId = envJsonObj.correlation_id;

  logTrace(correlationId, 'Processing received message');
  logTrace(correlationId, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`);

  // Process all payloads in the envelope
  const payloadsList = [];
  const numPayloads = envJsonObj.payloads.length;

  logTrace(correlationId, `smartreceive: Processing ${numPayloads} payloads`);

  for (let i = 0; i < numPayloads; i++) {
    const payloadObj = envJsonObj.payloads[i];
    const transport = payloadObj.transport;
    const dataname = payloadObj.dataname;
    const payloadType = payloadObj.payload_type;

    logTrace(correlationId, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);

    if (transport === 'direct') {
      logTrace(correlationId, `Direct transport - decoding payload '${dataname}'`);

      // Extract base64 payload from the payload
      const payloadB64 = payloadObj.data;
      logTrace(correlationId, `Direct transport: base64 length=${payloadB64?.length}`);

      if (typeof payloadB64 !== 'string') {
        throw new Error(`Direct payload '${dataname}' has no base64 data`);
      }

      // Decode Base64 payload
      const payloadBytes = Buffer.from(payloadB64, 'base64');
      logTrace(correlationId, `Direct transport: decoded bytes=${payloadBytes.length}`);

      // Deserialize based on type
      const data = await deserializeData(payloadBytes, payloadType, correlationId);
      logTrace(correlationId, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`);

      payloadsList.push([dataname, data, payloadType]);
    } else if (transport === 'link') {
      // Extract download URL from the payload
      const url = payloadObj.data;
      logTrace(correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);

      // Fetch with exponential backoff using the download handler
      const downloadedData = await fileserver_download_handler(
        url,
        max_retries,
        base_delay,
        max_delay,
        correlationId,
      );

      // Deserialize based on type
      const data = await deserializeData(downloadedData, payloadType, correlationId);

      payloadsList.push([dataname, data, payloadType]);
    } else {
      throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
    }
  }

  logTrace(correlationId, `smartreceive: Successfully processed all ${payloadsList.length} payloads`);

  // Replace the raw payload objects with [dataname, data, type] tuples.
  envJsonObj.payloads = payloadsList;

  return envJsonObj;
}

// ---------------------------------------------- Module Exports ---------------------------------------------- //

const NATSBridge = {
  /**
   * NATS client class for connection management
   * Supports both single-use and persistent connection modes
   *
   * @example
   * // Caller-managed client (a client passed in is never closed automatically)
   * const client = new NATSBridge.NATSClient("nats://localhost:4222");
   * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
   * await client.close();
   *
   * // Persistent connection (keeps connection open)
   * const client = new NATSBridge.NATSClient("nats://localhost:4222", true);
   * await client.connect();
   * await NATSBridge.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
   * await NATSBridge.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
   * // Connection remains open for more publishes
   * await client.close();
   */
  NATSClient,

  /**
   * Connection pool for managing multiple NATS connections
   * Useful for applications with multiple concurrent publishers
   *
   * @example
   * const pool = new NATSBridge.NATSConnectionPool("nats://localhost:4222", 10);
   * const client = await pool.acquire();
   * await NATSBridge.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
   * pool.release(client);
   * await pool.closeAll();
   */
  NATSConnectionPool,

  /**
   * Send data via NATS with automatic transport selection
   */
  smartsend,

  /**
   * Receive and process NATS message
   */
  smartreceive,

  /**
   * Publish message to NATS
   *
   * @example
   * // Using a persistent connection
   * const client = new NATSBridge.NATSClient("nats://localhost:4222", true);
   * await client.connect();
   * await NATSBridge.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
   * // Connection stays open for more publishes
   * await client.close();
   */
  publishMessage,

  /**
   * Upload data to plik server in one-shot mode
   */
  plikOneshotUpload,

  /**
   * Fetch data from URL with exponential backoff
   */
  fetchWithBackoff,

  /**
   * Default constants
   */
  DEFAULT_SIZE_THRESHOLD,
  DEFAULT_BROKER_URL,
  DEFAULT_FILESERVER_URL,
};

module.exports = NATSBridge;