/** * NATSBridge - Cross-Platform Bi-Directional Data Bridge * Browser-Compatible Implementation (Client-Side Rendering) * * This module provides functionality for sending and receiving data across network boundaries * using NATS as the message bus, with support for both direct payload transport and * URL-based transport for larger payloads. * * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" * * Browser-compatible version uses: * - nats.ws for WebSocket-based NATS connections * - Web Crypto API for UUID generation * - Uint8Array instead of Buffer * - fetch API for file server communication * * @module NATSBridgeCSR */ // Import browser-compatible NATS client import * as nats from 'nats.ws'; // Use native fetch available in browsers import { tableFromArrays, tableToIPC } from 'apache-arrow/browser'; // ---------------------------------------------- Constants ---------------------------------------------- // /** * Default size threshold for switching from direct to link transport (0.5MB) */ const DEFAULT_SIZE_THRESHOLD = 500_000; /** * Default NATS server URL (WebSocket protocol) */ const DEFAULT_BROKER_URL = 'ws://localhost:4222'; /** * Default HTTP file server URL for link transport */ const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; // ---------------------------------------------- Utility Functions ---------------------------------------------- // /** * Convert Uint8Array to Base64 string * @param {Uint8Array} data - Data to encode * @returns {string} Base64 encoded string */ function bufferToBase64(data) { const bytes = new Uint8Array(data); let binary = ''; for (let i = 0; i < bytes.length; i++) { binary += String.fromCharCode(bytes[i]); } return btoa(binary); } /** * Convert Base64 string to Uint8Array * @param {string} base64 - Base64 encoded string * @returns {Uint8Array} Decoded binary data */ function base64ToBuffer(base64) { const binary = atob(base64); const len = binary.length; const bytes = new Uint8Array(len); for (let i = 0; i < len; i++) { bytes[i] = binary.charCodeAt(i); } return bytes; } /** * Generate UUID v4 using Web Crypto API * @returns {string} UUID string */ function uuidv4() { const array = new Uint8Array(16); crypto.getRandomValues(array); array[6] = (array[6] & 0x0f) | 0x40; array[8] = (array[8] & 0x3f) | 0x80; return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join(''); } /** * Log a trace message with correlation ID and timestamp * @param {string} correlationId - Correlation ID for tracing * @param {string} message - Message content to log */ function logTrace(correlationId, message) { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`); } // ---------------------------------------------- Serialization Functions ---------------------------------------------- // /** * Serialize data according to specified format * @param {any} data - Data to serialize * @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" * @returns {Uint8Array} Binary representation of the serialized data */ async function serializeData(data, payloadType) { if (payloadType === 'text') { if (typeof data === 'string') { return new Uint8Array(new TextEncoder().encode(data)); } else { throw new Error('Text data must be a string'); } } else if (payloadType === 'dictionary') { const jsonStr = JSON.stringify(data); return new Uint8Array(new TextEncoder().encode(jsonStr)); } else if (payloadType === 'arrowtable') { // Convert array of objects to Arrow IPC format if (!Array.isArray(data) || data.length === 0) { throw new Error('Arrow table data must be a non-empty array of objects'); } return serializeArrowTable(data); } else if (payloadType === 'jsontable') { // Serialize array of objects to JSON format if (!Array.isArray(data)) { throw new Error('JSON table data must be an array'); } const jsonStr = JSON.stringify(data); return new Uint8Array(new TextEncoder().encode(jsonStr)); } else if (payloadType === 'image') { if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { return new Uint8Array(data); } else { throw new Error('Image data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); } } else if (payloadType === 'audio') { if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { return new Uint8Array(data); } else { throw new Error('Audio data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); } } else if (payloadType === 'video') { if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { return new Uint8Array(data); } else { throw new Error('Video data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); } } else if (payloadType === 'binary') { if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { return new Uint8Array(data); } else { throw new Error('Binary data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); } } else { throw new Error(`Unknown payload_type: ${payloadType}`); } } /** * Helper function to properly serialize table data to Arrow IPC * @param {Array} data - Array of objects representing table rows * @returns {Uint8Array} Arrow IPC formatted buffer */ function serializeArrowTable(data) { if (!Array.isArray(data) || data.length === 0) { throw new Error('Table data must be a non-empty array of objects'); } logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`); // Convert array of objects to a key-value format expected by tableFromArrays const columns = {}; const keys = Object.keys(data[0]); for (const key of keys) { columns[key] = data.map(row => row[key]); } logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`); const table = tableFromArrays(columns); logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`); // Convert to IPC format const ipcBuffer = tableToIPC(table); logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, byteLength: ${ipcBuffer.byteLength}`); const resultBuffer = new Uint8Array(ipcBuffer); logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`); // Debug: Show first 20 bytes in hex const hexPreview = []; for (let i = 0; i < Math.min(20, resultBuffer.length); i++) { hexPreview.push(resultBuffer[i].toString(16).padStart(2, '0')); } logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview.join(' ')}`); return resultBuffer; } /** * Deserialize bytes to data based on type * @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes * @param {string} payloadType - Data type * @param {string} correlationId - Correlation ID for logging * @returns {any} Deserialized data */ async function deserializeData(data, payloadType, correlationId) { const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`); // Debug: Show first 20 bytes in hex for binary data if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') { const hexPreview = []; for (let i = 0; i < Math.min(20, buffer.length); i++) { hexPreview.push(buffer[i].toString(16).padStart(2, '0')); } logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview.join(' ')}`); } if (payloadType === 'text') { const result = new TextDecoder().decode(buffer); logTrace(correlationId, `deserializeData: text result length=${result.length}`); return result; } else if (payloadType === 'dictionary') { const jsonStr = new TextDecoder().decode(buffer); const result = JSON.parse(jsonStr); logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`); return result; } else if (payloadType === 'arrowtable') { logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`); try { // Try tableFromIPC (browser API) const table = tableFromIPC(buffer); logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`); return table; } catch (e) { logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`); throw new Error(`Unable to deserialize Arrow table: ${e.message}`); } } else if (payloadType === 'jsontable') { const jsonStr = new TextDecoder().decode(buffer); const result = JSON.parse(jsonStr); logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`); return result; } else if (payloadType === 'image') { logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`); return buffer; } else if (payloadType === 'audio') { logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`); return buffer; } else if (payloadType === 'video') { logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`); return buffer; } else if (payloadType === 'binary') { logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`); return buffer; } else { throw new Error(`Unknown payload_type: ${payloadType}`); } } // ---------------------------------------------- File Server Handlers ---------------------------------------------- // /** * Upload data to plik server in one-shot mode * @param {string} fileServerUrl - Base URL of the plik server * @param {string} dataname - Name of the file being uploaded * @param {Uint8Array} data - Raw byte data of the file content * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>} */ async function plikOneshotUpload(fileServerUrl, dataname, data) { const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); // Get upload id const urlGetUploadID = `${fileServerUrl}/upload`; const headers = { 'Content-Type': 'application/json' }; const body = JSON.stringify({ OneShot: true }); const httpResponse = await fetch(urlGetUploadID, { method: 'POST', headers, body }); const responseJson = await httpResponse.json(); const uploadid = responseJson.id; const uploadtoken = responseJson.uploadToken; // Upload file const urlUpload = `${fileServerUrl}/file/${uploadid}`; const form = new FormData(); const blob = new Blob([buffer], { type: 'application/octet-stream' }); form.append('file', blob, dataname); const uploadHeaders = { 'X-UploadToken': uploadtoken }; const uploadResponse = await fetch(urlUpload, { method: 'POST', headers: uploadHeaders, body: form }); const uploadJson = await uploadResponse.json(); const fileid = uploadJson.id; const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`; return { status: uploadResponse.status, uploadid, fileid, url }; } /** * Fetch data from URL with exponential backoff * @param {string} url - URL to fetch from * @param {number} maxRetries - Maximum number of retry attempts * @param {number} baseDelay - Initial delay in milliseconds * @param {number} maxDelay - Maximum delay in milliseconds * @param {string} correlationId - Correlation ID for logging * @returns {Promise} Fetched data as bytes */ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) { let delay = baseDelay; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { const response = await fetch(url); if (response.status === 200) { logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`); const arrayBuffer = await response.arrayBuffer(); return new Uint8Array(arrayBuffer); } else { throw new Error(`Failed to fetch: ${response.status}`); } } catch (e) { logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`); if (attempt < maxRetries) { await new Promise(resolve => setTimeout(resolve, delay)); delay = Math.min(delay * 2, maxDelay); } } } throw new Error(`Failed to fetch data after ${maxRetries} attempts`); } // ---------------------------------------------- NATS Client ---------------------------------------------- // /** * NATS client wrapper for connection management */ class NATSClient { /** * Create a new NATS client * @param {string} url - NATS server URL (ws:// or wss://) */ constructor(url) { this.url = url; this.connection = null; } /** * Connect to NATS server * @returns {Promise} */ async connect() { this.connection = await nats.connect({ servers: this.url }); return this.connection; } /** * Publish message to NATS subject * @param {string} subject - NATS subject to publish to * @param {string} message - Message to publish * @param {string} correlationId - Correlation ID for logging */ async publish(subject, message, correlationId) { if (!this.connection) { await this.connect(); } await this.connection.publish(subject, message); logTrace(correlationId, `Message published to ${subject}`); } /** * Close the NATS connection */ async close() { if (this.connection) { this.connection.close(); } } } // ---------------------------------------------- Core Functions ---------------------------------------------- // /** * Publish message to NATS * @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlationId - Correlation ID for tracing */ async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { let conn; if (brokerUrlOrClient instanceof NATSClient) { conn = brokerUrlOrClient; } else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') { // Create a wrapper for direct connection (duck-typing check for NATS connection) conn = { async publish(subj, msg) { await brokerUrlOrClient.publish(subj, msg); }, async close() { await brokerUrlOrClient.close(); } }; } else { // String URL - create new client const client = new NATSClient(brokerUrlOrClient); conn = client; } await conn.publish(subject, message, correlationId); if (conn instanceof NATSClient) { await conn.close(); } } /** * Build message envelope from payloads and metadata * @param {string} subject - NATS subject * @param {Array} payloads - Array of payload objects * @param {Object} options - Envelope metadata options * @returns {Object} Envelope object */ function buildEnvelope(subject, payloads, options) { return { correlation_id: options.correlation_id, msg_id: options.msg_id, timestamp: new Date().toISOString(), send_to: subject, msg_purpose: options.msg_purpose, sender_name: options.sender_name, sender_id: options.sender_id, receiver_name: options.receiver_name, receiver_id: options.receiver_id, reply_to: options.reply_to, reply_to_msg_id: options.reply_to_msg_id, broker_url: options.broker_url, metadata: options.metadata || {}, payloads: payloads }; } /** * Build payload object from serialized data * @param {string} dataname - Name of the payload * @param {string} payloadType - Type of the payload * @param {Uint8Array} payloadBytes - Serialized payload bytes * @param {string} transport - Transport type ("direct" or "link") * @param {string} data - Data (base64 for direct, URL for link) * @returns {Object} Payload object */ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { // Determine encoding based on payload type (matching Julia implementation) let encoding = 'base64'; if (payloadType === 'jsontable') { encoding = 'json'; } else if (payloadType === 'arrowtable') { encoding = 'arrow-ipc'; } return { id: uuidv4(), dataname, payload_type: payloadType, transport, encoding, size: payloadBytes.byteLength, data, metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {} }; } /** * Send data via NATS with automatic transport selection * * This function intelligently routes data delivery based on payload size. * If the serialized payload is smaller than size_threshold, it encodes the data as Base64 * and publishes directly over NATS. Otherwise, it uploads the data to a fileserver * and publishes only the download URL over NATS. * * @param {string} subject - NATS subject to publish the message to * @param {Array} data - List of [dataname, data, type] tuples to send * - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" * @param {Object} options - Optional configuration * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket) * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport * @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing * @param {string} [options.msg_purpose="chat"] - Purpose of the message * @param {string} [options.sender_name="NATSBridge"] - Name of the sender * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast) * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast) * @param {string} [options.reply_to=""] - Topic to reply to * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to * @param {boolean} [options.is_publish=true] - Whether to automatically publish the message * @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection * @param {string} [options.msg_id=uuidv4()] - Message ID * @param {string} [options.sender_id=uuidv4()] - Sender ID * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str] * * @example * // Send a single payload * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [["dataname1", data1, "dictionary"]], * { broker_url: "ws://localhost:4222" } * ); * * // Send multiple payloads * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( * "/test", * [ * ["dataname1", data1, "dictionary"], * ["dataname2", data2, "arrowtable"] * ], * { broker_url: "ws://localhost:4222" } * ); */ async function smartsend(subject, data, options = {}) { const { broker_url = DEFAULT_BROKER_URL, fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler = plikOneshotUpload, size_threshold = DEFAULT_SIZE_THRESHOLD, correlation_id = uuidv4(), msg_purpose = 'chat', sender_name = 'NATSBridge', receiver_name = '', receiver_id = '', reply_to = '', reply_to_msg_id = '', is_publish = true, nats_connection = null, msg_id = uuidv4(), sender_id = uuidv4() } = options; logTrace(correlation_id, `Starting smartsend for subject: ${subject}`); logTrace(correlation_id, `smartsend: data array length=${data.length}`); // Debug: Log input data structure for (let i = 0; i < data.length; i++) { const [dataname, payloadData, payloadType] = data[i]; logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); } // Process payloads const payloads = []; for (const [dataname, payloadData, payloadType] of data) { logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`); logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); const payloadBytes = await serializeData(payloadData, payloadType); const payloadSize = payloadBytes.byteLength; logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); // Debug: Show first 20 bytes of serialized data for table type if (payloadType === 'table') { const hexPreview = []; for (let i = 0; i < Math.min(20, payloadBytes.length); i++) { hexPreview.push(payloadBytes[i].toString(16).padStart(2, '0')); } logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview.join(' ')}`); } if (payloadSize < size_threshold) { // Direct path const payloadB64 = bufferToBase64(payloadBytes); logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`); const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64); payloads.push(payload); } else { // Link path logTrace(correlation_id, `Using link transport, uploading to fileserver`); const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); if (response.status !== 200) { throw new Error(`Failed to upload data to fileserver: ${response.status}`); } logTrace(correlation_id, `Uploaded to URL: ${response.url}`); const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url); payloads.push(payload); } } // Build envelope const env = buildEnvelope(subject, payloads, { correlation_id, msg_id, msg_purpose, sender_name, sender_id, receiver_name, receiver_id, reply_to, reply_to_msg_id, broker_url }); const env_json_str = JSON.stringify(env); if (is_publish) { if (nats_connection) { await publishMessage(nats_connection, subject, env_json_str, correlation_id); } else { await publishMessage(broker_url, subject, env_json_str, correlation_id); } } return [env, env_json_str]; } /** * Receive and process NATS message * * This function processes incoming NATS messages, handling both direct transport * (base64 decoded payloads) and link transport (URL-based payloads). * It deserializes the data based on the transport type and returns the result. * * @param {Object} msg - NATS message object with payload property * @param {Object} options - Optional configuration * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms * @returns {Promise} Envelope object with processed payloads * * @example * // Receive and process message * const env = await NATSBridgeCSR.smartreceive(msg, { * fileserver_download_handler: NATSBridgeCSR.fetchWithBackoff, * max_retries: 5, * base_delay: 100, * max_delay: 5000 * }); * // env.payloads is an Array of [dataname, data, type] arrays * for (const [dataname, data, type] of env.payloads) { * console.log(`${dataname}: ${data} (type: ${type})`); * } */ async function smartreceive(msg, options = {}) { const { fileserver_download_handler = fetchWithBackoff, max_retries = 5, base_delay = 100, max_delay = 5000 } = options; // Debug: Log message object structure logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`); logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`); logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`); // Parse the JSON envelope // NATS.js v2.x uses msg.data instead of msg.payload let payload; if (msg.data !== undefined) { payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data); } else if (msg.payload !== undefined) { payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); } else { throw new Error('Message has neither data nor payload property'); } logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`); // Debug: Show first 200 chars of payload const payloadPreview = payload.substring(0, 200); logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`); let envJsonObj; try { envJsonObj = JSON.parse(payload); } catch (e) { logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`); throw e; } logTrace(envJsonObj.correlation_id, 'Processing received message'); logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`); // Process all payloads in the envelope const payloadsList = []; const numPayloads = envJsonObj.payloads.length; logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`); for (let i = 0; i < numPayloads; i++) { const payloadObj = envJsonObj.payloads[i]; const transport = payloadObj.transport; const dataname = payloadObj.dataname; const payloadType = payloadObj.payload_type; logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`); if (transport === 'direct') { logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`); // Extract base64 payload from the payload const payloadB64 = payloadObj.data; logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`); // Decode Base64 payload const payloadBytes = base64ToBuffer(payloadB64); logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`); // Deserialize based on type const dataType = payloadObj.payload_type; const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id); logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`); payloadsList.push([dataname, data, dataType]); } else if (transport === 'link') { // Extract download URL from the payload const url = payloadObj.data; logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`); // Fetch with exponential backoff using the download handler const downloadedData = await fileserver_download_handler( url, max_retries, base_delay, max_delay, envJsonObj.correlation_id ); // Deserialize based on type const dataType = payloadObj.payload_type; const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id); payloadsList.push([dataname, data, dataType]); } else { throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`); } } logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`); envJsonObj.payloads = payloadsList; return envJsonObj; } // ---------------------------------------------- Module Exports ---------------------------------------------- // const NATSBridgeCSR = { /** * NATS client class for connection management */ NATSClient, /** * Send data via NATS with automatic transport selection */ smartsend, /** * Receive and process NATS message */ smartreceive, /** * Upload data to plik server in one-shot mode */ plikOneshotUpload, /** * Fetch data from URL with exponential backoff */ fetchWithBackoff, /** * Default constants */ DEFAULT_SIZE_THRESHOLD, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL }; export default NATSBridgeCSR;