/**
 * msghandler - Cross-Platform Bi-Directional Data Bridge
 * Browser-Compatible Implementation (Client-Side Rendering)
 *
 * This module provides functionality for sending and receiving data across network boundaries
 * with support for both direct payload transport and URL-based transport for larger payloads.
 *
 * Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
 * Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser
 * compatibility constraints. Use "jsontable" for tabular data in browser applications.
 *
 * Browser requirements:
 * - Modern browser with ES module support (or use a module bundler)
 * - Web Crypto API for ID generation
 * - Fetch API for HTTP requests
 *
 * Browser-compatible version uses:
 * - Web Crypto API for ID generation
 * - Uint8Array instead of Buffer
 * - fetch API for file server communication
 *
 * @module msghandlerCSR
 */

// ---------------------------------------------- Constants ---------------------------------------------- //

/**
 * Default size threshold (in bytes) below which payloads are sent with direct transport.
 */
const DEFAULT_SIZE_THRESHOLD = 500_000;

/**
 * Default broker URL
 */
const DEFAULT_BROKER_URL = 'localhost:4222';

/**
 * Default HTTP file server URL for link transport
 */
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

/**
 * Number of bytes converted per String.fromCharCode call. Kept small because
 * engines cap the number of arguments a single call may receive.
 */
const BASE64_CHUNK_SIZE = 0x8000;

/**
 * Raw-byte payload types mapped to the label used in their validation error message.
 */
const BINARY_TYPE_LABELS = new Map([
  ['image', 'Image'],
  ['audio', 'Audio'],
  ['video', 'Video'],
  ['binary', 'Binary']
]);

// ---------------------------------------------- Utility Functions ---------------------------------------------- //

/**
 * View arbitrary binary input as a Uint8Array of its raw bytes.
 *
 * A Uint8Array is returned as-is. Any other ArrayBuffer view (Float32Array, DataView, ...)
 * is re-viewed over its underlying bytes; passing such a view straight to
 * `new Uint8Array(view)` would convert element VALUES instead of exposing the bytes.
 * Everything else (ArrayBuffer, array of numbers) goes through the Uint8Array constructor.
 *
 * @param {Uint8Array|ArrayBuffer|ArrayBufferView|number[]} data - Binary input
 * @returns {Uint8Array} Byte view of the input
 */
function toBytes(data) {
  if (data instanceof Uint8Array) {
    return data;
  }
  if (ArrayBuffer.isView(data)) {
    return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
  }
  return new Uint8Array(data);
}

/**
 * Render the first bytes of a buffer as space-separated two-digit hex (debug logging only).
 * @param {Uint8Array} bytes - Buffer to preview
 * @param {number} [count=20] - Maximum number of bytes to include
 * @returns {string} Hex preview
 */
function hexPreview(bytes, count = 20) {
  return Array.from(bytes.subarray(0, count), (b) => b.toString(16).padStart(2, '0')).join(' ');
}

/**
 * Convert binary data to a Base64 string.
 *
 * The bytes are converted in fixed-size chunks: spreading the whole buffer into a single
 * String.fromCharCode call throws a RangeError once the payload has more bytes than the
 * engine allows call arguments.
 *
 * @param {Uint8Array|ArrayBuffer} data - Data to encode
 * @returns {string} Base64 encoded string
 */
function bufferToBase64(data) {
  const bytes = toBytes(data);
  let binary = '';
  for (let offset = 0; offset < bytes.length; offset += BASE64_CHUNK_SIZE) {
    binary += String.fromCharCode(...bytes.subarray(offset, offset + BASE64_CHUNK_SIZE));
  }
  return btoa(binary);
}

/**
 * Convert Base64 string to Uint8Array
 * @param {string} base64 - Base64 encoded string
 * @returns {Uint8Array} Decoded binary data
 */
function base64ToBuffer(base64) {
  const binary = atob(base64);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return bytes;
}

/**
 * Generate a random version-4 identifier using the Web Crypto API.
 *
 * NOTE(review): the result is 32 uppercase hex digits WITHOUT dashes, not the canonical
 * 8-4-4-4-12 UUID text form. Kept as-is because peers may depend on it - confirm.
 *
 * @returns {string} 32-character uppercase hex string with version/variant bits set
 */
function uuidv4() {
  const array = new Uint8Array(16);
  crypto.getRandomValues(array);
  array[6] = (array[6] & 0x0f) | 0x40; // version 4
  array[8] = (array[8] & 0x3f) | 0x80; // variant 10xx
  return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join('');
}

/**
 * Log a trace message with correlation ID and timestamp
 * @param {string} correlationId - Correlation ID for tracing
 * @param {string} message - Message content to log
 */
function logTrace(correlationId, message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}

// ---------------------------------------------- Serialization Functions ---------------------------------------------- //

/**
 * Serialize data according to specified format
 * @param {any} data - Data to serialize
 * @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
 * @returns {Promise<Uint8Array>} Binary representation of the serialized data
 * @throws {Error} If the data does not match the payload type or the type is unknown
 */
async function serializeData(data, payloadType) {
  if (payloadType === 'text') {
    if (typeof data !== 'string') {
      throw new Error('Text data must be a string');
    }
    return new TextEncoder().encode(data);
  }

  if (payloadType === 'dictionary') {
    const jsonStr = JSON.stringify(data);
    // JSON.stringify yields undefined for undefined/functions/symbols, which would
    // otherwise be encoded as an empty payload the receiver cannot parse.
    if (jsonStr === undefined) {
      throw new Error('Dictionary data must be JSON-serializable');
    }
    return new TextEncoder().encode(jsonStr);
  }

  if (payloadType === 'jsontable') {
    // Serialize array of objects to JSON format
    if (!Array.isArray(data)) {
      throw new Error('JSON table data must be an array');
    }
    return new TextEncoder().encode(JSON.stringify(data));
  }

  const label = BINARY_TYPE_LABELS.get(payloadType);
  if (label !== undefined) {
    if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
      return toBytes(data);
    }
    throw new Error(`${label} data must be Uint8Array, ArrayBuffer, or ArrayBuffer view`);
  }

  throw new Error(`Unknown payload_type: ${payloadType}`);
}

/**
 * Deserialize bytes to data based on type
 * @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes
 * @param {string} payloadType - Data type
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Promise<any>} Decoded string (text), parsed JSON (dictionary, jsontable),
 *   or the bytes themselves (image, audio, video, binary)
 * @throws {Error} If the payload type is unknown (JSON.parse errors propagate as-is)
 */
async function deserializeData(data, payloadType, correlationId) {
  const buffer = toBytes(data);
  logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);

  // Debug: Show first 20 bytes in hex for binary data
  if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
    logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview(buffer)}`);
  }

  if (payloadType === 'text') {
    const result = new TextDecoder().decode(buffer);
    logTrace(correlationId, `deserializeData: text result length=${result.length}`);
    return result;
  }

  if (payloadType === 'dictionary') {
    const result = JSON.parse(new TextDecoder().decode(buffer));
    // A dictionary payload may legally decode to null or a primitive; Object.keys(null) throws.
    const keys = result !== null && typeof result === 'object' ? Object.keys(result) : [];
    logTrace(correlationId, `deserializeData: dictionary keys=${keys.join(', ')}`);
    return result;
  }

  if (payloadType === 'jsontable') {
    const result = JSON.parse(new TextDecoder().decode(buffer));
    logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`);
    return result;
  }

  if (BINARY_TYPE_LABELS.has(payloadType)) {
    logTrace(correlationId, `deserializeData: ${payloadType} buffer length=${buffer.length}`);
    return buffer;
  }

  throw new Error(`Unknown payload_type: ${payloadType}`);
}

// ---------------------------------------------- File Server Handlers ---------------------------------------------- //

/**
 * Upload data to plik server in one-shot mode.
 *
 * Performs two requests: a POST to `<fileServerUrl>/upload` to obtain an upload id and
 * token, then a multipart POST of the file to `<fileServerUrl>/file/<uploadid>`.
 *
 * @param {string} fileServerUrl - Base URL of the plik server
 * @param {string} dataname - Name of the file being uploaded
 * @param {Uint8Array} data - Raw byte data of the file content
 * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
 * @throws {Error} If either request returns a non-OK HTTP status
 */
async function plikOneshotUpload(fileServerUrl, dataname, data) {
  const buffer = toBytes(data);

  // Get upload id
  const createResponse = await fetch(`${fileServerUrl}/upload`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ OneShot: true })
  });
  // Fail with the HTTP status instead of a confusing JSON parse error on an error body.
  if (!createResponse.ok) {
    throw new Error(`Failed to create upload on fileserver: ${createResponse.status}`);
  }
  const { id: uploadid, uploadToken: uploadtoken } = await createResponse.json();

  // Upload file
  const form = new FormData();
  form.append('file', new Blob([buffer], { type: 'application/octet-stream' }), dataname);
  const uploadResponse = await fetch(`${fileServerUrl}/file/${uploadid}`, {
    method: 'POST',
    headers: { 'X-UploadToken': uploadtoken },
    body: form
  });
  if (!uploadResponse.ok) {
    throw new Error(`Failed to upload file to fileserver: ${uploadResponse.status}`);
  }
  const { id: fileid } = await uploadResponse.json();

  const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`;
  return { status: uploadResponse.status, uploadid, fileid, url };
}

/**
 * Fetch data from URL with exponential backoff.
 *
 * Only HTTP 200 counts as success. After each failed attempt (except the last) the
 * function waits `delay` ms, then doubles the delay up to `maxDelay`.
 *
 * @param {string} url - URL to fetch from
 * @param {number} maxRetries - Maximum number of attempts
 * @param {number} baseDelay - Initial delay in milliseconds
 * @param {number} maxDelay - Maximum delay in milliseconds
 * @param {string} correlationId - Correlation ID for logging
 * @returns {Promise<Uint8Array>} Fetched data as bytes
 * @throws {Error} If every attempt fails; the last failure is attached as `cause`
 */
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
  let delay = baseDelay;
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url);
      if (response.status !== 200) {
        throw new Error(`Failed to fetch: ${response.status}`);
      }
      logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
      return new Uint8Array(await response.arrayBuffer());
    } catch (e) {
      lastError = e;
      logTrace(correlationId, `Attempt ${attempt} failed: ${e?.constructor?.name} - ${e?.message}`);
      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, delay));
        delay = Math.min(delay * 2, maxDelay);
      }
    }
  }

  throw new Error(`Failed to fetch data after ${maxRetries} attempts`, { cause: lastError });
}

// ---------------------------------------------- Core Functions ---------------------------------------------- //

/**
 * Build message envelope from payloads and metadata
 * @param {string} subject - Subject/topic (stored as `send_to`)
 * @param {Array} payloads - Array of payload objects
 * @param {Object} options - Envelope metadata options
 * @returns {Object} Envelope object, timestamped with the current time in ISO-8601 form
 */
function buildEnvelope(subject, payloads, options) {
  return {
    correlation_id: options.correlation_id,
    msg_id: options.msg_id,
    timestamp: new Date().toISOString(),
    send_to: subject,
    msg_purpose: options.msg_purpose,
    sender_name: options.sender_name,
    sender_id: options.sender_id,
    receiver_name: options.receiver_name,
    receiver_id: options.receiver_id,
    reply_to: options.reply_to,
    reply_to_msg_id: options.reply_to_msg_id,
    broker_url: options.broker_url,
    metadata: options.metadata || {},
    payloads: payloads
  };
}

/**
 * Build payload object from serialized data
 * @param {string} dataname - Name of the payload
 * @param {string} payloadType - Type of the payload
 * @param {Uint8Array} payloadBytes - Serialized payload bytes (used for size bookkeeping only)
 * @param {string} transport - Transport type ("direct" or "link")
 * @param {string} data - Data (base64 for direct, URL for link)
 * @returns {Object} Payload object
 */
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
  // Determine encoding based on payload type (matching Julia implementation).
  // NOTE(review): direct jsontable data is still Base64 text even though it is labelled
  // 'json'; smartunpack ignores this field - confirm what other receivers expect.
  const encoding = payloadType === 'jsontable' ? 'json' : 'base64';

  return {
    id: uuidv4(),
    dataname,
    payload_type: payloadType,
    transport,
    encoding,
    size: payloadBytes.byteLength,
    data,
    metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {}
  };
}

/**
 * Send data with automatic transport selection
 *
 * This function intelligently routes data delivery based on payload size.
 * If the serialized payload is smaller than size_threshold, it encodes the data as Base64
 * into a "direct" payload. Otherwise, it uploads the data to a fileserver
 * and creates a "link" payload with the URL.
 *
 * Transport publishing is the caller's responsibility. This function returns the
 * envelope and its JSON string representation.
 *
 * @param {string} subject - Subject/topic to send the message to
 * @param {Array} data - List of [dataname, data, type] tuples to send
 *   - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
 *   - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data)
 * @param {Object} options - Optional configuration
 * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - Broker URL (for envelope metadata)
 * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
 * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
 * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
 * @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing
 * @param {string} [options.msg_purpose="chat"] - Purpose of the message
 * @param {string} [options.sender_name="msghandler"] - Name of the sender
 * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
 * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
 * @param {string} [options.reply_to=""] - Topic to reply to
 * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
 * @param {string} [options.msg_id=uuidv4()] - Message ID
 * @param {string} [options.sender_id=uuidv4()] - Sender ID
 * @param {Object} [options.metadata={}] - Free-form metadata stored on the envelope
 * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
 * @throws {Error} If serialization fails or the upload handler reports a status other than 200
 *
 * @example
 * // Send a single payload
 * const [env, envJsonStr] = await msghandlerCSR.smartpack(
 *   "/test",
 *   [["dataname1", data1, "dictionary"]]
 * );
 *
 * // Send multiple payloads (use jsontable instead of arrowtable for browser)
 * const [env, envJsonStr] = await msghandlerCSR.smartpack(
 *   "/test",
 *   [
 *     ["dataname1", data1, "dictionary"],
 *     ["dataname2", tableData, "jsontable"]
 *   ]
 * );
 *
 * // Publish via your transport (NATS, MQTT, HTTP, etc.)
 * // await myNatsClient.publish("/test", envJsonStr);
 */
async function smartpack(subject, data, options = {}) {
  const {
    broker_url = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler = plikOneshotUpload,
    size_threshold = DEFAULT_SIZE_THRESHOLD,
    correlation_id = uuidv4(),
    msg_purpose = 'chat',
    sender_name = 'msghandler',
    receiver_name = '',
    receiver_id = '',
    reply_to = '',
    reply_to_msg_id = '',
    msg_id = uuidv4(),
    sender_id = uuidv4(),
    metadata = {}
  } = options;

  logTrace(correlation_id, `Starting smartpack for subject: ${subject}`);
  logTrace(correlation_id, `smartpack: data array length=${data.length}`);

  // Process payloads sequentially so envelope order matches input order.
  const payloads = [];
  for (const [dataname, payloadData, payloadType] of data) {
    logTrace(correlation_id, `smartpack: Processing payload '${dataname}' type=${payloadType}`);
    logTrace(correlation_id, `smartpack: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);

    const payloadBytes = await serializeData(payloadData, payloadType);
    const payloadSize = payloadBytes.byteLength;
    logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

    // Debug: Show first 20 bytes of serialized data for table type
    if (payloadType === 'jsontable') {
      logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview(payloadBytes)}`);
    }

    if (payloadSize < size_threshold) {
      // Direct path
      const payloadB64 = bufferToBase64(payloadBytes);
      logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`);
      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64));
    } else {
      // Link path
      logTrace(correlation_id, `Using link transport, uploading to fileserver`);
      const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
      if (response.status !== 200) {
        throw new Error(`Failed to upload data to fileserver: ${response.status}`);
      }
      logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
      payloads.push(buildPayload(dataname, payloadType, payloadBytes, 'link', response.url));
    }
  }

  // Build envelope
  const env = buildEnvelope(subject, payloads, {
    correlation_id,
    msg_id,
    msg_purpose,
    sender_name,
    sender_id,
    receiver_name,
    receiver_id,
    reply_to,
    reply_to_msg_id,
    broker_url,
    metadata
  });

  return [env, JSON.stringify(env)];
}

/**
 * Normalize the accepted message shapes into a parsed envelope object.
 *
 * Accepts: a JSON string, raw UTF-8 JSON bytes, an already-parsed envelope (an object
 * with a `payloads` array), or a transport message object whose `data` or `payload`
 * property holds any of the former.
 *
 * @param {any} msg - Incoming message
 * @returns {Object} Parsed envelope with a `payloads` array
 * @throws {Error} If the message is not valid JSON or not an envelope
 */
function parseEnvelope(msg) {
  const isBinary = (value) => value instanceof ArrayBuffer || ArrayBuffer.isView(value);

  let candidate = msg;
  // Unwrap a transport message object; an object that already has `payloads` IS the envelope.
  if (candidate !== null && typeof candidate === 'object' && !isBinary(candidate) && !Array.isArray(candidate.payloads)) {
    candidate = candidate.data ?? candidate.payload;
  }
  if (isBinary(candidate)) {
    candidate = new TextDecoder().decode(toBytes(candidate));
  }
  if (typeof candidate === 'string') {
    try {
      candidate = JSON.parse(candidate);
    } catch (err) {
      throw new Error('Invalid message format: message is not valid JSON', { cause: err });
    }
  }
  if (candidate === null || typeof candidate !== 'object' || !Array.isArray(candidate.payloads)) {
    throw new Error('Invalid message format: expected JSON string or message object');
  }
  return candidate;
}

/**
 * Receive and process messages
 *
 * This function processes incoming messages, handling both direct transport
 * (base64 decoded payloads) and link transport (URL-based payloads).
 * It deserializes the data based on the payload type and returns the envelope with
 * `payloads` replaced by an array of [dataname, data, type] arrays.
 *
 * When an already-parsed envelope object is passed in, that same object is updated in
 * place and returned.
 *
 * @param {string|Object} msg - Message payload. Accepts a JSON string, an already-parsed
 *   envelope object, or an object with a `data` or `payload` property containing the JSON
 *   (as a string or as UTF-8 bytes).
 * @param {Object} options - Optional configuration
 * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
 * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
 * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
 * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
 * @returns {Promise<Object>} Envelope object with processed payloads
 * @throws {Error} If the message is not a valid envelope, a payload has an unknown
 *   transport or type, or a download fails
 *
 * @example
 * // Receive from JSON string directly
 * const env = await msghandlerCSR.smartunpack(jsonString, {
 *   fileserver_download_handler: msghandlerCSR.fetchWithBackoff,
 *   max_retries: 5,
 *   base_delay: 100,
 *   max_delay: 5000
 * });
 *
 * // Receive from transport message object (e.g., NATS, MQTT)
 * const env = await msghandlerCSR.smartunpack(natsMsg, {
 *   fileserver_download_handler: msghandlerCSR.fetchWithBackoff
 * });
 * // env.payloads is an Array of [dataname, data, type] arrays
 * for (const [dataname, data, type] of env.payloads) {
 *   console.log(`${dataname}: ${data} (type: ${type})`);
 * }
 */
async function smartunpack(msg, options = {}) {
  const {
    fileserver_download_handler = fetchWithBackoff,
    max_retries = 5,
    base_delay = 100,
    max_delay = 5000
  } = options;

  const envJsonObj = parseEnvelope(msg);
  const correlationId = envJsonObj.correlation_id;
  const numPayloads = envJsonObj.payloads.length;

  logTrace(correlationId, 'Processing received message');
  logTrace(correlationId, `smartunpack: envelope has ${numPayloads} payloads`);
  logTrace(correlationId, `smartunpack: Processing ${numPayloads} payloads`);

  // Process all payloads in the envelope
  const payloadsList = [];
  for (let i = 0; i < numPayloads; i++) {
    const payloadObj = envJsonObj.payloads[i];
    const { transport, dataname, payload_type: payloadType } = payloadObj;
    logTrace(correlationId, `smartunpack: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);

    let payloadBytes;
    if (transport === 'direct') {
      logTrace(correlationId, `Direct transport - decoding payload '${dataname}'`);
      const payloadB64 = payloadObj.data;
      logTrace(correlationId, `Direct transport: base64 length=${payloadB64?.length}`);
      // atob() would coerce a missing value to the string "undefined" and fail obscurely.
      if (typeof payloadB64 !== 'string') {
        throw new Error(`Direct payload '${dataname}' has no base64 data`);
      }
      payloadBytes = base64ToBuffer(payloadB64);
      logTrace(correlationId, `Direct transport: decoded bytes=${payloadBytes.length}`);
    } else if (transport === 'link') {
      const url = payloadObj.data;
      logTrace(correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
      // Fetch with exponential backoff using the download handler
      payloadBytes = await fileserver_download_handler(url, max_retries, base_delay, max_delay, correlationId);
    } else {
      throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
    }

    // Deserialize based on type
    const data = await deserializeData(payloadBytes, payloadType, correlationId);
    logTrace(correlationId, `smartunpack: deserialized '${dataname}' data type=${typeof data}, constructor=${data?.constructor?.name}`);
    payloadsList.push([dataname, data, payloadType]);
  }

  logTrace(correlationId, `smartunpack: Successfully processed all ${payloadsList.length} payloads`);
  envJsonObj.payloads = payloadsList;
  return envJsonObj;
}

// ---------------------------------------------- Module Exports ---------------------------------------------- //

const msghandlerCSR = {
  /**
   * Send data with automatic transport selection
   */
  smartpack,

  /**
   * Receive and process messages
   */
  smartunpack,

  /**
   * Upload data to plik server in one-shot mode
   */
  plikOneshotUpload,

  /**
   * Fetch data from URL with exponential backoff
   */
  fetchWithBackoff,

  /**
   * Default constants
   */
  DEFAULT_SIZE_THRESHOLD,
  DEFAULT_BROKER_URL,
  DEFAULT_FILESERVER_URL
};

export default msghandlerCSR;