diff --git a/README.md b/README.md index 1fdf508..b7f25ec 100644 --- a/README.md +++ b/README.md @@ -45,22 +45,24 @@ NATSBridge enables seamless communication across multiple platforms through NATS | Platform | Implementation | Features | |----------|----------------|----------| | **Julia** | [`src/NATSBridge.jl`](src/NATSBridge.jl) | Full feature set, Arrow IPC, multiple dispatch | -| **JavaScript** | [`src/natsbridge.js`](src/natsbridge.js) | Node.js & browser, async/await | +| **JavaScript** | [`src/natsbridge.js`](src/natsbridge.js) | Node.js, async/await | +| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await | | **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints | | **MicroPython** | [`src/natsbridge_mpy.py`](src/natsbridge_mpy.py) | Memory-constrained, synchronous API | ### Platform Comparison -| Feature | Julia | JavaScript | Python | MicroPython | -|---------|-------|------------|--------|-------------| -| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | -| Async/Await | ❌ | ✅ Native | ✅ Native | ⚠️ (uasyncio) | -| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | -| Arrow IPC | ✅ Native | ✅ | ✅ | ❌ | -| Direct Transport | ✅ | ✅ | ✅ | ✅ | -| Link Transport | ✅ | ✅ | ✅ | ⚠️ (Limited) | -| Handler Functions | ✅ | ✅ | ✅ | ✅ | -| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | +| Feature | Julia | JavaScript | JavaScript (Browser) | Python | MicroPython | +|---------|-------|------------|----------------------|--------|-------------| +| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | ❌ | +| Async/Await | ❌ | ✅ Native | ✅ Native | ✅ Native | ⚠️ (uasyncio) | +| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | +| Arrow IPC | ✅ Native | ✅ | ✅ | ✅ | ❌ | +| Direct Transport | ✅ | ✅ | ✅ | ✅ | ✅ | +| Link Transport | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) | +| Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ | +| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | ✅ | +| WebSocket NATS | ❌ | ❌ | ✅ | ❌ | ❌ | --- diff --git a/src/natsbridge_csr.js b/src/natsbridge_csr.js new file mode 100644 index 0000000..b8026d3 --- /dev/null +++ b/src/natsbridge_csr.js @@ -0,0 +1,808 @@ +/** + * NATSBridge - Cross-Platform Bi-Directional Data Bridge + * Browser-Compatible Implementation (Client-Side Rendering) + * + * This module provides functionality for sending and receiving data across network boundaries + * using NATS as the message bus, with support for both direct payload transport and + * URL-based transport for larger payloads. + * + * Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * + * Browser-compatible version uses: + * - nats.ws for WebSocket-based NATS connections + * - Web Crypto API for UUID generation + * - Uint8Array instead of Buffer + * - fetch API for file server communication + * + * @module NATSBridgeCSR + */ + +// Import browser-compatible NATS client +import * as nats from 'nats.ws'; + +// Use native fetch available in browsers +import { tableFromArrays, tableToIPC } from 'apache-arrow/browser'; + +// ---------------------------------------------- Constants ---------------------------------------------- // + +/** + * Default size threshold for switching from direct to link transport (0.5MB) + */ +const DEFAULT_SIZE_THRESHOLD = 500_000; + +/** + * Default NATS server URL (WebSocket protocol) + */ +const DEFAULT_BROKER_URL = 'ws://localhost:4222'; + +/** + * Default HTTP file server URL for link transport + */ +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; + +// ---------------------------------------------- Utility Functions ---------------------------------------------- // + +/** + * Convert Uint8Array to Base64 string + * @param {Uint8Array} data - Data to encode + * @returns {string} Base64 encoded string + */ +function bufferToBase64(data) { + const bytes = new Uint8Array(data); + let binary = ''; + for (let i = 0; i < bytes.length; i++) { + binary += String.fromCharCode(bytes[i]); + } + return btoa(binary); +} + +/** + * Convert Base64 string to Uint8Array + * @param {string} base64 - Base64 encoded string + * @returns {Uint8Array} Decoded binary data + */ +function base64ToBuffer(base64) { + const binary = atob(base64); + const len = binary.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} + +/** + * Generate UUID v4 using Web Crypto API + * @returns {string} UUID string + */ +function uuidv4() { + const array = new Uint8Array(16); + crypto.getRandomValues(array); + array[6] = (array[6] & 0x0f) | 0x40; + array[8] = (array[8] & 0x3f) | 0x80; + return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join(''); +} + +/** + * Log a trace message with correlation ID and timestamp + * @param {string} correlationId - Correlation ID for tracing + * @param {string} message - Message content to log + */ +function logTrace(correlationId, message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`); +} + +// ---------------------------------------------- Serialization Functions ---------------------------------------------- // + +/** + * Serialize data according to specified format + * @param {any} data - Data to serialize + * @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * @returns {Uint8Array} Binary representation of the serialized data + */ +async function serializeData(data, payloadType) { + if (payloadType === 'text') { + if (typeof data === 'string') { + return new Uint8Array(new TextEncoder().encode(data)); + } else { + throw new Error('Text data must be a string'); + } + } else if (payloadType === 'dictionary') { + const jsonStr = JSON.stringify(data); + return new Uint8Array(new TextEncoder().encode(jsonStr)); + } else if (payloadType === 'arrowtable') { + // Convert array of objects to Arrow IPC format + if (!Array.isArray(data) || data.length === 0) { + throw new Error('Arrow table data must be a non-empty array of objects'); + } + + return serializeArrowTable(data); + } else if (payloadType === 'jsontable') { + // Serialize array of objects to JSON format + if (!Array.isArray(data)) { + throw new Error('JSON table data must be an array'); + } + const jsonStr = JSON.stringify(data); + return new Uint8Array(new TextEncoder().encode(jsonStr)); + } else if (payloadType === 'image') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Image data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'audio') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Audio data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'video') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Video data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'binary') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Binary data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else { + throw new Error(`Unknown payload_type: ${payloadType}`); + } +} + +/** + * Helper function to properly serialize table data to Arrow IPC + * @param {Array} data - Array of objects representing table rows + * @returns {Uint8Array} Arrow IPC formatted buffer + */ +function serializeArrowTable(data) { + if (!Array.isArray(data) || data.length === 0) { + throw new Error('Table data must be a non-empty array of objects'); + } + + logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`); + + // Convert array of objects to a key-value format expected by tableFromArrays + const columns = {}; + const keys = Object.keys(data[0]); + for (const key of keys) { + columns[key] = data.map(row => row[key]); + } + + logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`); + + const table = tableFromArrays(columns); + + logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`); + + // Convert to IPC format + const ipcBuffer = tableToIPC(table); + + logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, byteLength: ${ipcBuffer.byteLength}`); + + const resultBuffer = new Uint8Array(ipcBuffer); + logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`); + + // Debug: Show first 20 bytes in hex + const hexPreview = []; + for (let i = 0; i < Math.min(20, resultBuffer.length); i++) { + hexPreview.push(resultBuffer[i].toString(16).padStart(2, '0')); + } + logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview.join(' ')}`); + + return resultBuffer; +} + +/** + * Deserialize bytes to data based on type + * @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes + * @param {string} payloadType - Data type + * @param {string} correlationId - Correlation ID for logging + * @returns {any} Deserialized data + */ +async function deserializeData(data, payloadType, correlationId) { + const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); + + logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`); + + // Debug: Show first 20 bytes in hex for binary data + if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') { + const hexPreview = []; + for (let i = 0; i < Math.min(20, buffer.length); i++) { + hexPreview.push(buffer[i].toString(16).padStart(2, '0')); + } + logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview.join(' ')}`); + } + + if (payloadType === 'text') { + const result = new TextDecoder().decode(buffer); + logTrace(correlationId, `deserializeData: text result length=${result.length}`); + return result; + } else if (payloadType === 'dictionary') { + const jsonStr = new TextDecoder().decode(buffer); + const result = JSON.parse(jsonStr); + logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`); + return result; + } else if (payloadType === 'arrowtable') { + logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`); + + try { + // Try tableFromIPC (browser API) + const table = tableFromIPC(buffer); + logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`); + return table; + } catch (e) { + logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`); + throw new Error(`Unable to deserialize Arrow table: ${e.message}`); + } + } else if (payloadType === 'jsontable') { + const jsonStr = new TextDecoder().decode(buffer); + const result = JSON.parse(jsonStr); + logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`); + return result; + } else if (payloadType === 'image') { + logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'audio') { + logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'video') { + logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'binary') { + logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`); + return buffer; + } else { + throw new Error(`Unknown payload_type: ${payloadType}`); + } +} + +// ---------------------------------------------- File Server Handlers ---------------------------------------------- // + +/** + * Upload data to plik server in one-shot mode + * @param {string} fileServerUrl - Base URL of the plik server + * @param {string} dataname - Name of the file being uploaded + * @param {Uint8Array} data - Raw byte data of the file content + * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>} + */ +async function plikOneshotUpload(fileServerUrl, dataname, data) { + const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); + + // Get upload id + const urlGetUploadID = `${fileServerUrl}/upload`; + const headers = { 'Content-Type': 'application/json' }; + const body = JSON.stringify({ OneShot: true }); + + const httpResponse = await fetch(urlGetUploadID, { + method: 'POST', + headers, + body + }); + + const responseJson = await httpResponse.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; + + // Upload file + const urlUpload = `${fileServerUrl}/file/${uploadid}`; + const form = new FormData(); + const blob = new Blob([buffer], { type: 'application/octet-stream' }); + form.append('file', blob, dataname); + + const uploadHeaders = { + 'X-UploadToken': uploadtoken + }; + + const uploadResponse = await fetch(urlUpload, { + method: 'POST', + headers: uploadHeaders, + body: form + }); + + const uploadJson = await uploadResponse.json(); + const fileid = uploadJson.id; + + const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`; + + return { + status: uploadResponse.status, + uploadid, + fileid, + url + }; +} + +/** + * Fetch data from URL with exponential backoff + * @param {string} url - URL to fetch from + * @param {number} maxRetries - Maximum number of retry attempts + * @param {number} baseDelay - Initial delay in milliseconds + * @param {number} maxDelay - Maximum delay in milliseconds + * @param {string} correlationId - Correlation ID for logging + * @returns {Promise} Fetched data as bytes + */ +async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) { + let delay = baseDelay; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`); + const arrayBuffer = await response.arrayBuffer(); + return new Uint8Array(arrayBuffer); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (e) { + logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`); + + if (attempt < maxRetries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, maxDelay); + } + } + } + + throw new Error(`Failed to fetch data after ${maxRetries} attempts`); +} + +// ---------------------------------------------- NATS Client ---------------------------------------------- // + +/** + * NATS client wrapper for connection management + */ +class NATSClient { + /** + * Create a new NATS client + * @param {string} url - NATS server URL (ws:// or wss://) + */ + constructor(url) { + this.url = url; + this.connection = null; + } + + /** + * Connect to NATS server + * @returns {Promise} + */ + async connect() { + this.connection = await nats.connect({ servers: this.url }); + return this.connection; + } + + /** + * Publish message to NATS subject + * @param {string} subject - NATS subject to publish to + * @param {string} message - Message to publish + * @param {string} correlationId - Correlation ID for logging + */ + async publish(subject, message, correlationId) { + if (!this.connection) { + await this.connect(); + } + await this.connection.publish(subject, message); + logTrace(correlationId, `Message published to ${subject}`); + } + + /** + * Close the NATS connection + */ + async close() { + if (this.connection) { + this.connection.close(); + } + } +} + +// ---------------------------------------------- Core Functions ---------------------------------------------- // + +/** + * Publish message to NATS + * @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection + * @param {string} subject - NATS subject to publish to + * @param {string} message - JSON message to publish + * @param {string} correlationId - Correlation ID for tracing + */ +async function publishMessage(brokerUrlOrClient, subject, message, correlationId) { + let conn; + + if (brokerUrlOrClient instanceof NATSClient) { + conn = brokerUrlOrClient; + } else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') { + // Create a wrapper for direct connection (duck-typing check for NATS connection) + conn = { + async publish(subj, msg) { + await brokerUrlOrClient.publish(subj, msg); + }, + async close() { + await brokerUrlOrClient.close(); + } + }; + } else { + // String URL - create new client + const client = new NATSClient(brokerUrlOrClient); + conn = client; + } + + await conn.publish(subject, message, correlationId); + + if (conn instanceof NATSClient) { + await conn.close(); + } +} + +/** + * Build message envelope from payloads and metadata + * @param {string} subject - NATS subject + * @param {Array} payloads - Array of payload objects + * @param {Object} options - Envelope metadata options + * @returns {Object} Envelope object + */ +function buildEnvelope(subject, payloads, options) { + return { + correlation_id: options.correlation_id, + msg_id: options.msg_id, + timestamp: new Date().toISOString(), + send_to: subject, + msg_purpose: options.msg_purpose, + sender_name: options.sender_name, + sender_id: options.sender_id, + receiver_name: options.receiver_name, + receiver_id: options.receiver_id, + reply_to: options.reply_to, + reply_to_msg_id: options.reply_to_msg_id, + broker_url: options.broker_url, + metadata: options.metadata || {}, + payloads: payloads + }; +} + +/** + * Build payload object from serialized data + * @param {string} dataname - Name of the payload + * @param {string} payloadType - Type of the payload + * @param {Uint8Array} payloadBytes - Serialized payload bytes + * @param {string} transport - Transport type ("direct" or "link") + * @param {string} data - Data (base64 for direct, URL for link) + * @returns {Object} Payload object + */ +function buildPayload(dataname, payloadType, payloadBytes, transport, data) { + // Determine encoding based on payload type (matching Julia implementation) + let encoding = 'base64'; + if (payloadType === 'jsontable') { + encoding = 'json'; + } else if (payloadType === 'arrowtable') { + encoding = 'arrow-ipc'; + } + + return { + id: uuidv4(), + dataname, + payload_type: payloadType, + transport, + encoding, + size: payloadBytes.byteLength, + data, + metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {} + }; +} + +/** + * Send data via NATS with automatic transport selection + * + * This function intelligently routes data delivery based on payload size. + * If the serialized payload is smaller than size_threshold, it encodes the data as Base64 + * and publishes directly over NATS. Otherwise, it uploads the data to a fileserver + * and publishes only the download URL over NATS. + * + * @param {string} subject - NATS subject to publish the message to + * @param {Array} data - List of [dataname, data, type] tuples to send + * - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" + * @param {Object} options - Optional configuration + * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket) + * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server + * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads + * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport + * @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing + * @param {string} [options.msg_purpose="chat"] - Purpose of the message + * @param {string} [options.sender_name="NATSBridge"] - Name of the sender + * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast) + * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast) + * @param {string} [options.reply_to=""] - Topic to reply to + * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to + * @param {boolean} [options.is_publish=true] - Whether to automatically publish the message + * @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection + * @param {string} [options.msg_id=uuidv4()] - Message ID + * @param {string} [options.sender_id=uuidv4()] - Sender ID + * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str] + * + * @example + * // Send a single payload + * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( + * "/test", + * [["dataname1", data1, "dictionary"]], + * { broker_url: "ws://localhost:4222" } + * ); + * + * // Send multiple payloads + * const [env, envJsonStr] = await NATSBridgeCSR.smartsend( + * "/test", + * [ + * ["dataname1", data1, "dictionary"], + * ["dataname2", data2, "arrowtable"] + * ], + * { broker_url: "ws://localhost:4222" } + * ); + */ +async function smartsend(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = uuidv4(), + msg_purpose = 'chat', + sender_name = 'NATSBridge', + receiver_name = '', + receiver_id = '', + reply_to = '', + reply_to_msg_id = '', + is_publish = true, + nats_connection = null, + msg_id = uuidv4(), + sender_id = uuidv4() + } = options; + + logTrace(correlation_id, `Starting smartsend for subject: ${subject}`); + logTrace(correlation_id, `smartsend: data array length=${data.length}`); + + // Debug: Log input data structure + for (let i = 0; i < data.length; i++) { + const [dataname, payloadData, payloadType] = data[i]; + logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); + } + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`); + logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); + + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; + + logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + + // Debug: Show first 20 bytes of serialized data for table type + if (payloadType === 'table') { + const hexPreview = []; + for (let i = 0; i < Math.min(20, payloadBytes.length); i++) { + hexPreview.push(payloadBytes[i].toString(16).padStart(2, '0')); + } + logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview.join(' ')}`); + } + + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = bufferToBase64(payloadBytes); + logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`); + + const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64); + payloads.push(payload); + } else { + // Link path + logTrace(correlation_id, `Using link transport, uploading to fileserver`); + + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + logTrace(correlation_id, `Uploaded to URL: ${response.url}`); + + const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url); + payloads.push(payload); + } + } + + // Build envelope + const env = buildEnvelope(subject, payloads, { + correlation_id, + msg_id, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url + }); + + const env_json_str = JSON.stringify(env); + + if (is_publish) { + if (nats_connection) { + await publishMessage(nats_connection, subject, env_json_str, correlation_id); + } else { + await publishMessage(broker_url, subject, env_json_str, correlation_id); + } + } + + return [env, env_json_str]; +} + +/** + * Receive and process NATS message + * + * This function processes incoming NATS messages, handling both direct transport + * (base64 decoded payloads) and link transport (URL-based payloads). + * It deserializes the data based on the transport type and returns the result. + * + * @param {Object} msg - NATS message object with payload property + * @param {Object} options - Optional configuration + * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads + * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL + * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms + * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms + * @returns {Promise} Envelope object with processed payloads + * + * @example + * // Receive and process message + * const env = await NATSBridgeCSR.smartreceive(msg, { + * fileserver_download_handler: NATSBridgeCSR.fetchWithBackoff, + * max_retries: 5, + * base_delay: 100, + * max_delay: 5000 + * }); + * // env.payloads is an Array of [dataname, data, type] arrays + * for (const [dataname, data, type] of env.payloads) { + * console.log(`${dataname}: ${data} (type: ${type})`); + * } + */ +async function smartreceive(msg, options = {}) { + const { + fileserver_download_handler = fetchWithBackoff, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + } = options; + + // Debug: Log message object structure + logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`); + logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`); + logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`); + + // Parse the JSON envelope + // NATS.js v2.x uses msg.data instead of msg.payload + let payload; + if (msg.data !== undefined) { + payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data); + } else if (msg.payload !== undefined) { + payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); + } else { + throw new Error('Message has neither data nor payload property'); + } + + logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`); + + // Debug: Show first 200 chars of payload + const payloadPreview = payload.substring(0, 200); + logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`); + + let envJsonObj; + try { + envJsonObj = JSON.parse(payload); + } catch (e) { + logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`); + throw e; + } + + logTrace(envJsonObj.correlation_id, 'Processing received message'); + logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`); + + // Process all payloads in the envelope + const payloadsList = []; + const numPayloads = envJsonObj.payloads.length; + + logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`); + + for (let i = 0; i < numPayloads; i++) { + const payloadObj = envJsonObj.payloads[i]; + const transport = payloadObj.transport; + const dataname = payloadObj.dataname; + const payloadType = payloadObj.payload_type; + + logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`); + + if (transport === 'direct') { + logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`); + + // Extract base64 payload from the payload + const payloadB64 = payloadObj.data; + logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`); + + // Decode Base64 payload + const payloadBytes = base64ToBuffer(payloadB64); + logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`); + + // Deserialize based on type + const dataType = payloadObj.payload_type; + const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id); + logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`); + + payloadsList.push([dataname, data, dataType]); + } else if (transport === 'link') { + // Extract download URL from the payload + const url = payloadObj.data; + logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`); + + // Fetch with exponential backoff using the download handler + const downloadedData = await fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + envJsonObj.correlation_id + ); + + // Deserialize based on type + const dataType = payloadObj.payload_type; + const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id); + + payloadsList.push([dataname, data, dataType]); + } else { + throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`); + } + } + + logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`); + envJsonObj.payloads = payloadsList; + return envJsonObj; +} + +// ---------------------------------------------- Module Exports ---------------------------------------------- // + +const NATSBridgeCSR = { + /** + * NATS client class for connection management + */ + NATSClient, + + /** + * Send data via NATS with automatic transport selection + */ + smartsend, + + /** + * Receive and process NATS message + */ + smartreceive, + + /** + * Upload data to plik server in one-shot mode + */ + plikOneshotUpload, + + /** + * Fetch data from URL with exponential backoff + */ + fetchWithBackoff, + + /** + * Default constants + */ + DEFAULT_SIZE_THRESHOLD, + DEFAULT_BROKER_URL, + DEFAULT_FILESERVER_URL +}; + +export default NATSBridgeCSR; diff --git a/src/natsbridge.js b/src/natsbridge_ssr.js similarity index 98% rename from src/natsbridge.js rename to src/natsbridge_ssr.js index 1711b33..5276b23 100644 --- a/src/natsbridge.js +++ b/src/natsbridge_ssr.js @@ -1,6 +1,6 @@ /** * NATSBridge - Cross-Platform Bi-Directional Data Bridge - * JavaScript/Node.js Implementation + * JavaScript/Node.js Implementation (Client-Side Rendering) * * This module provides functionality for sending and receiving data across network boundaries * using NATS as the message bus, with support for both direct payload transport and @@ -16,6 +16,16 @@ const crypto = require('crypto'); // Use native fetch available in Node.js 18+ const arrow = require('apache-arrow'); +// ---------------------------------------------- UUID Helper ---------------------------------------------- // + +/** + * Generate UUID v4 using crypto module (Node.js compatible) + * @returns {string} UUID string + */ +function uuidv4() { + return crypto.randomUUID(); +} + // ---------------------------------------------- Constants ---------------------------------------------- // /** @@ -458,7 +468,7 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) { } return { - id: crypto.randomUUID(), + id: uuidv4(), dataname, payload_type: payloadType, transport, @@ -530,7 +540,7 @@ async function smartsend(subject, data, options = {}) { fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler = plikOneshotUpload, size_threshold = DEFAULT_SIZE_THRESHOLD, - correlation_id = crypto.randomUUID(), + correlation_id = uuidv4(), msg_purpose = 'chat', sender_name = 'NATSBridge', receiver_name = '', @@ -539,8 +549,8 @@ async function smartsend(subject, data, options = {}) { reply_to_msg_id = '', is_publish = true, nats_connection = null, - msg_id = crypto.randomUUID(), - sender_id = crypto.randomUUID() + msg_id = uuidv4(), + sender_id = uuidv4() } = options; logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);