/**
 * NATSBridge - Cross-Platform Bi-Directional Data Bridge
 * JavaScript/Node.js Implementation
 *
 * This module provides functionality for sending and receiving data across network boundaries
 * using NATS as the message bus, with support for both direct payload transport and
 * URL-based transport for larger payloads.
 *
 * @module NATSBridge
 */

const nats = require('nats');
const { v4: uuidv4 } = require('uuid');
const fetch = require('node-fetch');
const arrow = require('apache-arrow');

// ---------------------------------------------- Constants ---------------------------------------------- //

/**
 * Default size threshold for switching from direct to link transport (1MB)
 */
const DEFAULT_SIZE_THRESHOLD = 1_000_000;

/**
 * Default NATS server URL
 */
const DEFAULT_BROKER_URL = 'nats://localhost:4222';

/**
 * Default HTTP file server URL for link transport
 */
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

/**
 * Payload types that are transported as opaque bytes (no encoding/decoding step).
 */
const RAW_BYTE_PAYLOAD_TYPES = new Set(['image', 'audio', 'video', 'binary']);

// ---------------------------------------------- Utility Functions ---------------------------------------------- //

/**
 * Convert Buffer to Base64 string
 * @param {Buffer} buffer - Buffer to encode
 * @returns {string} Base64 encoded string
 */
function bufferToBase64(buffer) {
  return buffer.toString('base64');
}

/**
 * Log a trace message with correlation ID and timestamp
 * @param {string} correlationId - Correlation ID for tracing
 * @param {string} message - Message content to log
 */
function logTrace(correlationId, message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}

// ---------------------------------------------- Serialization Functions ---------------------------------------------- //

/**
 * Serialize data according to specified format
 * @param {any} data - Data to serialize
 * @param {string} payloadType - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
 * @returns {Promise<Buffer>} Binary representation of the serialized data
 * @throws {Error} If the data does not match the payload type or the type is unknown
 */
async function serializeData(data, payloadType) {
  if (payloadType === 'text') {
    if (typeof data !== 'string') {
      throw new Error('Text data must be a string');
    }
    return Buffer.from(data, 'utf8');
  }

  if (payloadType === 'dictionary') {
    const jsonStr = JSON.stringify(data);
    // JSON.stringify yields undefined for undefined/functions/symbols; fail with a
    // clear message instead of letting Buffer.from raise an unrelated TypeError.
    if (jsonStr === undefined) {
      throw new Error('Dictionary data must be JSON-serializable');
    }
    return Buffer.from(jsonStr, 'utf8');
  }

  if (payloadType === 'table') {
    return serializeArrowTable(data);
  }

  if (RAW_BYTE_PAYLOAD_TYPES.has(payloadType)) {
    if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
      return Buffer.from(data);
    }
    const label = `${payloadType[0].toUpperCase()}${payloadType.slice(1)}`;
    throw new Error(`${label} data must be Uint8Array or Buffer`);
  }

  throw new Error(`Unknown payload_type: ${payloadType}`);
}

/**
 * Helper function to serialize table data to the Arrow IPC stream format
 * @param {Array} data - Array of objects representing table rows
 * @returns {Buffer} Arrow IPC formatted buffer
 * @throws {Error} If data is not a non-empty array
 */
function serializeArrowTable(data) {
  if (!Array.isArray(data) || data.length === 0) {
    throw new Error('Table data must be a non-empty array of objects');
  }

  // NOTE(review): column types are inferred by apache-arrow from the row values
  // rather than declared explicitly here - confirm the inferred schema is what
  // receivers on other platforms expect.
  const table = arrow.tableFromJSON(data);
  return Buffer.from(arrow.tableToIPC(table, 'stream'));
}

/**
 * Deserialize bytes to data based on type
 * @param {Buffer|Uint8Array} data - Serialized data as bytes
 * @param {string} payloadType - Data type
 * @param {string} correlationId - Correlation ID for logging (currently unused)
 * @returns {Promise<any>} Deserialized data: string for "text", parsed JSON for "dictionary",
 *   an Arrow table for "table", a Buffer for the raw byte types
 * @throws {Error} If the payload type is unknown
 */
async function deserializeData(data, payloadType, correlationId) {
  const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);

  if (payloadType === 'text') {
    return buffer.toString('utf8');
  }
  if (payloadType === 'dictionary') {
    return JSON.parse(buffer.toString('utf8'));
  }
  if (payloadType === 'table') {
    return arrow.tableFromIPC(buffer);
  }
  if (RAW_BYTE_PAYLOAD_TYPES.has(payloadType)) {
    return buffer;
  }
  throw new Error(`Unknown payload_type: ${payloadType}`);
}

// ---------------------------------------------- File Server Handlers ---------------------------------------------- //

/**
 * Upload data to plik server in one-shot mode
 *
 * Performs two requests: one to create the upload (obtaining its id and token),
 * one to post the file content as multipart form data.
 *
 * @param {string} fileServerUrl - Base URL of the plik server
 * @param {string} dataname - Name of the file being uploaded
 * @param {Buffer|Uint8Array} data - Raw byte data of the file content
 * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
 * @throws {Error} If the upload cannot be created on the file server
 */
async function plikOneshotUpload(fileServerUrl, dataname, data) {
  const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);

  // Get upload id
  const urlGetUploadID = `${fileServerUrl}/upload`;
  const headers = { 'Content-Type': 'application/json' };
  const body = JSON.stringify({ OneShot: true });
  const httpResponse = await fetch(urlGetUploadID, { method: 'POST', headers, body });
  // Without this check a failed request would continue with an undefined id/token.
  if (!httpResponse.ok) {
    throw new Error(`Failed to create upload on fileserver: ${httpResponse.status}`);
  }
  const responseJson = await httpResponse.json();
  const uploadid = responseJson.id;
  const uploadtoken = responseJson.uploadToken;

  // Upload file
  // NOTE(review): FormData and Blob are taken from the global scope; confirm the
  // imported fetch implementation accepts them as a request body.
  const urlUpload = `${fileServerUrl}/file/${uploadid}`;
  const form = new FormData();
  const blob = new Blob([buffer], { type: 'application/octet-stream' });
  form.append('file', blob, dataname);
  const uploadHeaders = { 'X-UploadToken': uploadtoken };
  const uploadResponse = await fetch(urlUpload, {
    method: 'POST',
    headers: uploadHeaders,
    body: form
  });
  const uploadJson = await uploadResponse.json();
  const fileid = uploadJson.id;
  const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`;

  // The upload status is returned rather than checked here; smartsend inspects it.
  return { status: uploadResponse.status, uploadid, fileid, url };
}

/**
 * Fetch data from URL with exponential backoff
 * @param {string} url - URL to fetch from
 * @param {number} maxRetries - Maximum number of retry attempts
 * @param {number} baseDelay - Initial delay in milliseconds
 * @param {number} maxDelay - Maximum delay in milliseconds
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Promise<Uint8Array>} Fetched data as bytes
 * @throws {Error} If every attempt fails; the last failure is attached as `cause`
 */
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
  let delay = baseDelay;
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url);
      if (response.status !== 200) {
        throw new Error(`Failed to fetch: ${response.status}`);
      }
      const arrayBuffer = await response.arrayBuffer();
      logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
      return new Uint8Array(arrayBuffer);
    } catch (e) {
      lastError = e;
      // Include the message, not just the error class, so failures are diagnosable.
      logTrace(correlationId, `Attempt ${attempt} failed: ${e?.name}: ${e?.message}`);
      if (attempt < maxRetries) {
        await new Promise(resolve => setTimeout(resolve, delay));
        // Double the wait each round, capped at maxDelay.
        delay = Math.min(delay * 2, maxDelay);
      }
    }
  }

  throw new Error(`Failed to fetch data after ${maxRetries} attempts`, { cause: lastError });
}

// ---------------------------------------------- NATS Client ---------------------------------------------- //

/**
 * NATS client wrapper for connection management
 */
class NATSClient {
  /**
   * Create a new NATS client
   * @param {string} url - NATS server URL
   */
  constructor(url) {
    this.url = url;
    this.connection = null;
  }

  /**
   * Connect to NATS server
   * @returns {Promise} The established connection
   */
  async connect() {
    this.connection = await nats.connect({ servers: this.url });
    return this.connection;
  }

  /**
   * Publish message to NATS subject, connecting first if necessary
   * @param {string} subject - NATS subject to publish to
   * @param {string} message - Message to publish
   * @param {string} correlationId - Correlation ID for logging
   */
  async publish(subject, message, correlationId) {
    if (!this.connection) {
      await this.connect();
    }
    await this.connection.publish(subject, message);
    logTrace(correlationId, `Message published to ${subject}`);
  }

  /**
   * Close the NATS connection (no-op when not connected)
   */
  async close() {
    const connection = this.connection;
    if (!connection) {
      return;
    }
    // Drop the reference first so a later publish() reconnects instead of
    // reusing a closed connection.
    this.connection = null;
    try {
      // NOTE(review): publish may only buffer the message client-side; flushing
      // before closing gives it a chance to reach the server - confirm against
      // the nats client version in use.
      if (typeof connection.flush === 'function') {
        await connection.flush();
      }
    } finally {
      await connection.close();
    }
  }
}

// ---------------------------------------------- Core Functions ---------------------------------------------- //

/**
 * Publish message to NATS
 *
 * Connections supplied by the caller (a NATSClient or any object exposing a
 * `publish` function) are left open; a connection created here from a URL is
 * closed once the message has been published.
 *
 * @param {string|NATSClient|Object} brokerUrlOrClient - NATS URL, client, or connection
 * @param {string} subject - NATS subject to publish to
 * @param {string} message - JSON message to publish
 * @param {string} correlationId - Correlation ID for tracing
 */
async function publishMessage(brokerUrlOrClient, subject, message, correlationId) {
  if (brokerUrlOrClient instanceof NATSClient) {
    // Caller-owned client: publish and keep it open for reuse.
    await brokerUrlOrClient.publish(subject, message, correlationId);
    return;
  }

  // Duck-type raw connections instead of `instanceof nats.Connection`, which
  // throws when the nats package exposes no such constructor.
  if (brokerUrlOrClient !== null
    && typeof brokerUrlOrClient === 'object'
    && typeof brokerUrlOrClient.publish === 'function') {
    await brokerUrlOrClient.publish(subject, message);
    logTrace(correlationId, `Message published to ${subject}`);
    return;
  }

  // Anything else is treated as server URL(s): create a short-lived client and
  // always release it, even when publishing fails.
  const client = new NATSClient(brokerUrlOrClient);
  try {
    await client.publish(subject, message, correlationId);
  } finally {
    await client.close();
  }
}

/**
 * Build message envelope from payloads and metadata
 * @param {string} subject - NATS subject
 * @param {Array} payloads - Array of payload objects
 * @param {Object} options - Envelope metadata options
 * @returns {Object} Envelope object
 */
function buildEnvelope(subject, payloads, options) {
  return {
    correlation_id: options.correlation_id,
    msg_id: options.msg_id,
    timestamp: new Date().toISOString(),
    send_to: subject,
    msg_purpose: options.msg_purpose,
    sender_name: options.sender_name,
    sender_id: options.sender_id,
    receiver_name: options.receiver_name,
    receiver_id: options.receiver_id,
    reply_to: options.reply_to,
    reply_to_msg_id: options.reply_to_msg_id,
    broker_url: options.broker_url,
    metadata: options.metadata || {},
    payloads: payloads
  };
}

/**
 * Build payload object from serialized data
 * @param {string} dataname - Name of the payload
 * @param {string} payloadType - Type of the payload
 * @param {Buffer} payloadBytes - Serialized payload bytes
 * @param {string} transport - Transport type ("direct" or "link")
 * @param {string} data - Data (base64 for direct, URL for link)
 * @returns {Object} Payload object
 */
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
  const isDirect = transport === 'direct';
  return {
    id: uuidv4(),
    dataname,
    payload_type: payloadType,
    transport,
    encoding: isDirect ? 'base64' : 'none',
    size: payloadBytes.byteLength,
    data,
    metadata: isDirect ? { payload_bytes: payloadBytes.byteLength } : {}
  };
}

/**
 * Send data via NATS with automatic transport selection
 *
 * This function intelligently routes data delivery based on payload size.
 * If the serialized payload is smaller than size_threshold, it encodes the data as Base64
 * and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
 * and publishes only the download URL over NATS.
 *
 * @param {string} subject - NATS subject to publish the message to
 * @param {Array} data - List of [dataname, data, type] tuples to send
 * @param {Object} options - Optional configuration
 * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server
 * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
 * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
 * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
 * @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing
 * @param {string} [options.msg_purpose="chat"] - Purpose of the message
 * @param {string} [options.sender_name="NATSBridge"] - Name of the sender
 * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
 * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
 * @param {string} [options.reply_to=""] - Topic to reply to
 * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
 * @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
 * @param {NATSClient|Object} [options.nats_connection=null] - Pre-existing NATS connection (left open)
 * @param {string} [options.msg_id=uuidv4()] - Message ID
 * @param {string} [options.sender_id=uuidv4()] - Sender ID
 * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
 * @throws {Error} If data is not an array, serialization fails, or the fileserver upload fails
 *
 * @example
 * // Send a single payload
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [["dataname1", data1, "dictionary"]],
 *   { broker_url: "nats://localhost:4222" }
 * );
 *
 * // Send multiple payloads
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [
 *     ["dataname1", data1, "dictionary"],
 *     ["dataname2", data2, "table"]
 *   ],
 *   { broker_url: "nats://localhost:4222" }
 * );
 *
 * // Send with pre-existing connection
 * const client = new NATSBridge.NATSClient("nats://localhost:4222");
 * await client.connect();
 * const [env, envJsonStr] = await smartsend(
 *   "/test",
 *   [["data", myData, "text"]],
 *   { nats_connection: client }
 * );
 */
async function smartsend(subject, data, options = {}) {
  const {
    broker_url = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler = plikOneshotUpload,
    size_threshold = DEFAULT_SIZE_THRESHOLD,
    correlation_id = uuidv4(),
    msg_purpose = 'chat',
    sender_name = 'NATSBridge',
    receiver_name = '',
    receiver_id = '',
    reply_to = '',
    reply_to_msg_id = '',
    is_publish = true,
    nats_connection = null,
    msg_id = uuidv4(),
    sender_id = uuidv4()
  } = options;

  if (!Array.isArray(data)) {
    throw new Error('data must be an array of [dataname, data, type] tuples');
  }

  logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);

  // Process payloads
  const payloads = [];
  for (const [dataname, payloadData, payloadType] of data) {
    const payloadBytes = await serializeData(payloadData, payloadType);
    const payloadSize = payloadBytes.byteLength;
    logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

    // NOTE(review): the threshold is compared against the raw byte size, while
    // the direct path publishes the larger Base64 form - confirm the threshold
    // leaves enough headroom for the broker's message size limit.
    if (payloadSize < size_threshold) {
      // Direct path
      const payloadB64 = bufferToBase64(payloadBytes);
      logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64));
    } else {
      // Link path
      logTrace(correlation_id, `Using link transport, uploading to fileserver`);
      const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
      if (response.status !== 200) {
        throw new Error(`Failed to upload data to fileserver: ${response.status}`);
      }
      logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'link', response.url));
    }
  }

  // Build envelope
  const env = buildEnvelope(subject, payloads, {
    correlation_id,
    msg_id,
    msg_purpose,
    sender_name,
    sender_id,
    receiver_name,
    receiver_id,
    reply_to,
    reply_to_msg_id,
    broker_url
  });
  const env_json_str = JSON.stringify(env);

  if (is_publish) {
    // Prefer the caller's connection; otherwise a short-lived one is created from broker_url.
    const target = nats_connection ? nats_connection : broker_url;
    await publishMessage(target, subject, env_json_str, correlation_id);
  }

  return [env, env_json_str];
}

/**
 * Receive and process NATS message
 *
 * This function processes incoming NATS messages, handling both direct transport
 * (base64 decoded payloads) and link transport (URL-based payloads).
 * It deserializes the data based on the transport type and returns the result.
 *
 * @param {Object} msg - NATS message object carrying the JSON envelope in its `payload`
 *   property (or, if `payload` is absent, its `data` property), as a string or bytes
 * @param {Object} options - Optional configuration
 * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
 * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
 * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
 * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
 * @returns {Promise<Object>} Envelope object with processed payloads
 * @throws {Error} If the message has no body, the envelope has no payload list,
 *   or a payload uses an unknown transport
 *
 * @example
 * // Receive and process message
 * const env = await smartreceive(msg, {
 *   fileserver_download_handler: fetchWithBackoff,
 *   max_retries: 5,
 *   base_delay: 100,
 *   max_delay: 5000
 * });
 * // env.payloads is an Array of [dataname, data, type] arrays
 * for (const [dataname, data, type] of env.payloads) {
 *   console.log(`${dataname}: ${data} (type: ${type})`);
 * }
 */
async function smartreceive(msg, options = {}) {
  const {
    fileserver_download_handler = fetchWithBackoff,
    max_retries = 5,
    base_delay = 100,
    max_delay = 5000
  } = options;

  // Parse the JSON envelope. `payload` is the documented property; `data` is
  // accepted as a fallback for message objects that carry the body there.
  const rawBody = msg.payload ?? msg.data;
  if (rawBody == null) {
    throw new Error('NATS message has no payload');
  }
  const payload = typeof rawBody === 'string' ? rawBody : Buffer.from(rawBody).toString('utf8');
  const envJsonObj = JSON.parse(payload);
  const correlationId = envJsonObj.correlation_id;

  logTrace(correlationId, 'Processing received message');

  if (!Array.isArray(envJsonObj.payloads)) {
    throw new Error('Envelope does not contain a payloads array');
  }

  // Process all payloads in the envelope
  const payloadsList = [];
  for (const payloadObj of envJsonObj.payloads) {
    const { transport, dataname, payload_type: dataType } = payloadObj;
    let payloadBytes;

    if (transport === 'direct') {
      logTrace(correlationId, `Direct transport - decoding payload '${dataname}'`);
      // The payload body is carried inline as Base64.
      payloadBytes = Buffer.from(payloadObj.data, 'base64');
    } else if (transport === 'link') {
      // The payload body is a download URL.
      const url = payloadObj.data;
      logTrace(correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
      payloadBytes = await fileserver_download_handler(
        url,
        max_retries,
        base_delay,
        max_delay,
        correlationId
      );
    } else {
      throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
    }

    // Deserialize based on type
    const data = await deserializeData(payloadBytes, dataType, correlationId);
    payloadsList.push([dataname, data, dataType]);
  }

  // Replace the wire-format payload objects with [dataname, data, type] tuples.
  envJsonObj.payloads = payloadsList;
  return envJsonObj;
}

// ---------------------------------------------- Module Exports ---------------------------------------------- //

const NATSBridge = {
  /**
   * NATS client class for connection management
   */
  NATSClient,

  /**
   * Send data via NATS with automatic transport selection
   */
  smartsend,

  /**
   * Receive and process NATS message
   */
  smartreceive,

  /**
   * Upload data to plik server in one-shot mode
   */
  plikOneshotUpload,

  /**
   * Fetch data from URL with exponential backoff
   */
  fetchWithBackoff,

  /**
   * Default constants
   */
  DEFAULT_SIZE_THRESHOLD,
  DEFAULT_BROKER_URL,
  DEFAULT_FILESERVER_URL
};

module.exports = NATSBridge;