/**
 * NATSBridge.js - Bi-Directional Data Bridge for JavaScript
 * Implements smartsend and smartreceive for NATS communication
 *
 * This module provides functionality for sending and receiving data across network boundaries
 * using NATS as the message bus, with support for both direct payload transport and
 * URL-based transport for larger payloads.
 *
 * File Server Handler Architecture:
 * The system uses handler functions to abstract file server operations, allowing support
 * for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
 *
 * Handler Function Signatures:
 *
 * ```javascript
 * // Upload handler - uploads data to file server and returns URL
 * // The handler is passed to smartsend as fileserverUploadHandler parameter
 * // It receives: (fileserver_url, dataname, data, correlation_id)
 * // Returns: { status, uploadid, fileid, url } where status is a 2xx HTTP status on success
 * async function fileserverUploadHandler(fileserver_url, dataname, data, correlation_id) { ... }
 *
 * // Download handler - fetches data from file server URL with exponential backoff
 * // The handler is passed to smartreceive as fileserverDownloadHandler parameter
 * // It receives: (url, max_retries, base_delay, max_delay, correlation_id)
 * // Returns: ArrayBuffer (the downloaded data)
 * async function fileserverDownloadHandler(url, max_retries, base_delay, max_delay, correlation_id) { ... }
 * ```
 *
 * Multi-Payload Support (Standard API):
 * The system uses a standardized list-of-tuples format for all payload operations.
 * Even when sending a single payload, the user must wrap it in a list.
 *
 * API Standard:
 * ```javascript
 * // Input format for smartsend (always a list of tuples with type info)
 * [{ dataname, data, type }, ...]
 *
 * // Output format for smartreceive (always returns a list of tuples)
 * [{ dataname, data, type }, ...]
 * ```
 *
 * Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
 */
// ---------------------------------------------- 100 --------------------------------------------- #

// Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB - threshold for switching from direct to link transport
const DEFAULT_NATS_URL = "nats://localhost:4222"; // Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080"; // Default HTTP file server URL for link transport

// Capitalized labels used in the error messages of the pass-through binary types.
const BINARY_TYPE_LABELS = {
    image: "Image",
    audio: "Audio",
    video: "Video",
    binary: "Binary"
};

/**
 * Generate a UUID v4 string.
 *
 * Uses `crypto.randomUUID()` when the runtime exposes it and falls back to a
 * Math.random-based generator otherwise (the fallback is not cryptographically secure).
 *
 * @returns {string} UUID in the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx
 */
function uuid4() {
    const cryptoApi = typeof globalThis !== 'undefined' ? globalThis.crypto : undefined;
    if (cryptoApi && typeof cryptoApi.randomUUID === 'function') {
        return cryptoApi.randomUUID();
    }
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
        const r = (Math.random() * 16) | 0;
        // 'y' positions carry the RFC 4122 variant bits (10xx).
        const v = c === 'x' ? r : (r & 0x3) | 0x8;
        return v.toString(16);
    });
}

/**
 * Log a message to the console, prefixed with an ISO timestamp and the correlation ID.
 *
 * @param {string} correlation_id - Correlation ID used to tie log lines together
 * @param {string} message - Text to log
 */
function log_trace(correlation_id, message) {
    const timestamp = new Date().toISOString();
    console.log(`[${timestamp}] [Correlation: ${correlation_id}] ${message}`);
}

/**
 * Get the size of a value in bytes.
 *
 * Strings are measured as UTF-8, ArrayBuffer/Uint8Array by byteLength, and other
 * non-null objects by the UTF-8 length of their JSON representation.
 *
 * @param {*} data - Value to measure
 * @returns {number} Size in bytes, or 0 for unsupported values
 */
function getDataSize(data) {
    if (typeof data === 'string') {
        return new TextEncoder().encode(data).length;
    }
    if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
        return data.byteLength;
    }
    if (typeof data === 'object' && data !== null) {
        return new TextEncoder().encode(JSON.stringify(data)).length;
    }
    return 0;
}

/**
 * Convert an ArrayBuffer (or Uint8Array) to a Base64 string.
 *
 * @param {ArrayBuffer|Uint8Array} buffer - Bytes to encode
 * @returns {string} Base64 text
 */
function arrayBufferToBase64(buffer) {
    const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer);
    // Convert in chunks: one call per byte is slow, and a single apply() over a
    // large array can exceed the engine's argument limit.
    const CHUNK_SIZE = 0x8000;
    const parts = [];
    for (let offset = 0; offset < bytes.length; offset += CHUNK_SIZE) {
        parts.push(String.fromCharCode.apply(null, bytes.subarray(offset, offset + CHUNK_SIZE)));
    }
    return btoa(parts.join(''));
}

/**
 * Convert a Base64 string to an ArrayBuffer.
 *
 * @param {string} base64 - Base64 text
 * @returns {ArrayBuffer} Decoded bytes
 */
function base64ToArrayBuffer(base64) {
    const binaryString = atob(base64);
    const len = binaryString.length;
    const bytes = new Uint8Array(len);
    for (let i = 0; i < len; i++) {
        bytes[i] = binaryString.charCodeAt(i);
    }
    return bytes.buffer;
}

/**
 * Return an ArrayBuffer holding exactly the bytes of `data`.
 *
 * A Uint8Array may be a view onto part of a larger buffer (e.g. the result of
 * `subarray`), so its `.buffer` is only returned as-is when the view covers the
 * whole buffer; otherwise the viewed range is copied out.
 *
 * @param {ArrayBuffer|Uint8Array} data - Binary input
 * @param {string} label - Capitalized type name used in the error message
 * @returns {ArrayBuffer}
 * @throws {Error} If data is neither an ArrayBuffer nor a Uint8Array
 */
function _binary_to_array_buffer(data, label) {
    if (data instanceof ArrayBuffer) {
        return data;
    }
    if (data instanceof Uint8Array) {
        const coversWholeBuffer = data.byteOffset === 0 && data.byteLength === data.buffer.byteLength;
        return coversWholeBuffer
            ? data.buffer
            : data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength);
    }
    throw new Error(`${label} data must be ArrayBuffer or Uint8Array`);
}

/**
 * Serialize data according to the specified type.
 *
 * Supported types:
 * - "text": a string, encoded as UTF-8 bytes
 * - "dictionary": any JSON-serializable value, encoded as UTF-8 JSON bytes
 * - "table": NOT IMPLEMENTED (would require the apache-arrow library)
 * - "image" / "audio" / "video" / "binary": ArrayBuffer or Uint8Array, passed through as bytes
 *
 * @param {*} data - Value to serialize
 * @param {string} type - One of the supported type names
 * @returns {ArrayBuffer} Serialized bytes
 * @throws {Error} On an unknown type, "table", or data that does not match the type
 */
function _serialize_data(data, type) {
    if (type === "text") {
        if (typeof data !== 'string') {
            throw new Error("Text data must be a String");
        }
        return new TextEncoder().encode(data).buffer;
    }
    if (type === "dictionary") {
        const jsonStr = JSON.stringify(data);
        // JSON.stringify yields undefined for undefined/functions/symbols; fail here
        // instead of silently sending an empty payload the receiver cannot parse.
        if (jsonStr === undefined) {
            throw new Error("Dictionary data must be JSON-serializable");
        }
        return new TextEncoder().encode(jsonStr).buffer;
    }
    if (type === "table") {
        // Arrow IPC serialization is not available without the apache-arrow library.
        throw new Error("Table serialization requires apache-arrow library");
    }
    if (Object.prototype.hasOwnProperty.call(BINARY_TYPE_LABELS, type)) {
        return _binary_to_array_buffer(data, BINARY_TYPE_LABELS[type]);
    }
    throw new Error(`Unknown type: ${type}`);
}

/**
 * Deserialize bytes to data based on type.
 *
 * Supported types:
 * - "text": decodes the bytes as a UTF-8 string
 * - "dictionary": decodes the bytes as UTF-8 and parses them as JSON
 * - "table": NOT IMPLEMENTED (would require the apache-arrow library)
 * - "image" / "audio" / "video" / "binary": returns the input unchanged
 *
 * @param {ArrayBuffer|Uint8Array} data - Serialized bytes
 * @param {string} type - One of the supported type names
 * @param {string} correlation_id - Correlation ID (currently unused; kept for interface compatibility)
 * @returns {*} Deserialized value
 * @throws {Error} On an unknown type or "table"
 */
function _deserialize_data(data, type, correlation_id) {
    switch (type) {
        case "text":
            return new TextDecoder().decode(new Uint8Array(data));
        case "dictionary":
            return JSON.parse(new TextDecoder().decode(new Uint8Array(data)));
        case "table":
            throw new Error("Table deserialization requires apache-arrow library");
        case "image":
        case "audio":
        case "video":
        case "binary":
            return data;
        default:
            throw new Error(`Unknown type: ${type}`);
    }
}

/**
 * Upload data to an HTTP file server (plik-like API).
 *
 * Steps performed:
 * 1. POST {"OneShot": true} to `<fileserver_url>/upload` and read `id` / `uploadToken` from the JSON reply
 * 2. POST the data as multipart form data to `<fileserver_url>/file/<uploadid>`
 * 3. Build the download URL from the upload id, the file id and the data name
 *
 * @param {string} fileserver_url - Base URL of the file server
 * @param {string} dataname - File name to upload under
 * @param {ArrayBuffer|Uint8Array} data - Bytes to upload
 * @param {string} correlation_id - Correlation ID for logging
 * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
 * @throws {Error} If either request fails or no upload id is returned
 */
async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) {
    log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`);

    // Avoid "//" in the request paths when the base URL ends with a slash.
    const baseUrl = String(fileserver_url).replace(/\/+$/, '');

    // Step 1: Get upload ID and token
    const sessionResponse = await fetch(`${baseUrl}/upload`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ OneShot: true })
    });
    if (!sessionResponse.ok) {
        throw new Error(`Failed to get upload ID: ${sessionResponse.status} ${sessionResponse.statusText}`);
    }
    const sessionJson = await sessionResponse.json();
    const uploadid = sessionJson.id;
    const uploadtoken = sessionJson.uploadToken;
    if (uploadid === undefined || uploadid === null) {
        throw new Error("Failed to get upload ID: response did not contain an id");
    }

    // Step 2: Upload file data as multipart form data
    const formData = new FormData();
    const blob = new Blob([data], { type: "application/octet-stream" });
    formData.append("file", blob, dataname);

    // Only send the token header when the server actually issued a token.
    const uploadHeaders = {};
    if (uploadtoken !== undefined && uploadtoken !== null) {
        uploadHeaders["X-UploadToken"] = uploadtoken;
    }
    const uploadResponse = await fetch(`${baseUrl}/file/${uploadid}`, {
        method: "POST",
        headers: uploadHeaders,
        body: formData
    });
    if (!uploadResponse.ok) {
        throw new Error(`Failed to upload file: ${uploadResponse.status} ${uploadResponse.statusText}`);
    }
    const fileResponseJson = await uploadResponse.json();
    const fileid = fileResponseJson.id;

    // Build the download URL
    const url = `${baseUrl}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
    log_trace(correlation_id, `Uploaded to URL: ${url}`);

    return {
        status: uploadResponse.status,
        uploadid: uploadid,
        fileid: fileid,
        url: url
    };
}

/**
 * Fetch data from a URL, retrying with exponential backoff.
 *
 * The delay starts at `base_delay`, doubles after every failed attempt and is capped
 * at `max_delay`. Only an HTTP 200 response counts as success.
 *
 * @param {string} url - URL to fetch
 * @param {number} max_retries - Maximum number of attempts
 * @param {number} base_delay - Initial delay between attempts in ms
 * @param {number} max_delay - Upper bound for the delay in ms
 * @param {string} correlation_id - Correlation ID for logging
 * @returns {Promise<ArrayBuffer>} The response body
 * @throws {Error} When all attempts fail; the last failure is attached as `cause`
 */
async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) {
    let delay = base_delay;
    let lastError;

    for (let attempt = 1; attempt <= max_retries; attempt++) {
        try {
            const response = await fetch(url);
            if (response.status !== 200) {
                throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
            }
            log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
            return await response.arrayBuffer();
        } catch (e) {
            lastError = e;
            log_trace(correlation_id, `Attempt ${attempt} failed: ${e.message}`);
            if (attempt < max_retries) {
                // Sleep, then double the delay for the next round (capped at max_delay)
                const currentDelay = delay;
                await new Promise((resolve) => setTimeout(resolve, currentDelay));
                delay = Math.min(delay * 2, max_delay);
            }
        }
    }

    throw new Error(`Failed to fetch data after ${max_retries} attempts`, { cause: lastError });
}

/**
 * Get the bytes of a value as a Uint8Array.
 *
 * Binary input is wrapped/returned as-is, strings are UTF-8 encoded and any other
 * value is UTF-8 encoded from its JSON representation.
 *
 * @param {*} data - Value to convert
 * @returns {Uint8Array}
 */
function _get_payload_bytes(data) {
    if (data instanceof Uint8Array) {
        return data;
    }
    if (data instanceof ArrayBuffer) {
        return new Uint8Array(data);
    }
    if (typeof data === 'string') {
        return new TextEncoder().encode(data);
    }
    return new TextEncoder().encode(JSON.stringify(data));
}

// MessagePayload class
class MessagePayload {
    /**
     * Represents a single payload in the message envelope
     *
     * @param {Object} options - Payload options
     * @param {string} options.id - ID of this payload (generated with uuid4 when omitted)
     * @param {string} options.dataname - Name of this payload (e.g., "login_image")
     * @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
     * @param {string} options.transport - "direct" or "link"
     * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
     * @param {number} options.size - Data size in bytes
     * @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link)
     * @param {Object} options.metadata - Metadata for this payload
     */
    constructor(options) {
        this.id = options.id || uuid4();
        this.dataname = options.dataname;
        this.type = options.type;
        this.transport = options.transport;
        this.encoding = options.encoding;
        this.size = options.size;
        this.data = options.data;
        this.metadata = options.metadata || {};
    }

    /**
     * Convert to a plain object suitable for JSON.stringify.
     *
     * `data` is included only when present. For direct transport it is emitted as-is when
     * the encoding is "base64" or "json", and Base64-encoded otherwise; for link transport
     * it is the URL string. `metadata` is included only when non-empty.
     *
     * @returns {Object}
     */
    toJSON() {
        const obj = {
            id: this.id,
            dataname: this.dataname,
            type: this.type,
            transport: this.transport,
            encoding: this.encoding,
            size: this.size
        };

        // `!= null` skips both null and undefined data
        const hasData = this.data != null;
        if (hasData && this.transport === "direct") {
            if (this.encoding === "base64" || this.encoding === "json") {
                obj.data = this.data;
            } else {
                // For other encodings, use base64
                obj.data = arrayBufferToBase64(_get_payload_bytes(this.data));
            }
        } else if (hasData && this.transport === "link") {
            // For link transport, data is a URL string
            obj.data = this.data;
        }

        if (Object.keys(this.metadata).length > 0) {
            obj.metadata = this.metadata;
        }

        return obj;
    }
}

// MessageEnvelope class
class MessageEnvelope {
    /**
     * Represents the message envelope containing metadata and payloads
     *
     * @param {Object} options - Envelope options
     * @param {string} options.sendTo - Topic/subject the sender sends to
     * @param {Array} options.payloads - Array of payloads
     * @param {string} options.correlationId - Unique identifier to track messages
     * @param {string} options.msgId - This message id
     * @param {string} options.timestamp - Message published timestamp
     * @param {string} options.msgPurpose - Purpose of this message
     * @param {string} options.senderName - Name of the sender
     * @param {string} options.senderId - UUID of the sender
     * @param {string} options.receiverName - Name of the receiver
     * @param {string} options.receiverId - UUID of the receiver
     * @param {string} options.replyTo - Topic to reply to
     * @param {string} options.replyToMsgId - Message id this message is replying to
     * @param {string} options.brokerURL - NATS server address
     * @param {Object} options.metadata - Metadata for the envelope
     */
    constructor(options) {
        this.correlationId = options.correlationId || uuid4();
        this.msgId = options.msgId || uuid4();
        this.timestamp = options.timestamp || new Date().toISOString();

        this.sendTo = options.sendTo;
        this.msgPurpose = options.msgPurpose || "";
        this.senderName = options.senderName || "";
        this.senderId = options.senderId || uuid4();
        this.receiverName = options.receiverName || "";
        this.receiverId = options.receiverId || "";
        this.replyTo = options.replyTo || "";
        this.replyToMsgId = options.replyToMsgId || "";
        this.brokerURL = options.brokerURL || DEFAULT_NATS_URL;

        this.metadata = options.metadata || {};
        this.payloads = options.payloads || [];
    }

    /**
     * Convert to a plain object suitable for JSON.stringify.
     *
     * `metadata` and `payloads` are included only when non-empty. Payloads that expose
     * a `toJSON` method are converted with it; plain objects are passed through.
     *
     * @returns {Object}
     */
    toJSON() {
        const obj = {
            correlationId: this.correlationId,
            msgId: this.msgId,
            timestamp: this.timestamp,
            sendTo: this.sendTo,
            msgPurpose: this.msgPurpose,
            senderName: this.senderName,
            senderId: this.senderId,
            receiverName: this.receiverName,
            receiverId: this.receiverId,
            replyTo: this.replyTo,
            replyToMsgId: this.replyToMsgId,
            brokerURL: this.brokerURL
        };

        if (Object.keys(this.metadata).length > 0) {
            obj.metadata = this.metadata;
        }

        if (this.payloads.length > 0) {
            obj.payloads = this.payloads.map((p) => (p && typeof p.toJSON === 'function' ? p.toJSON() : p));
        }

        return obj;
    }

    /**
     * Convert to a JSON string.
     *
     * @returns {string}
     */
    toString() {
        return JSON.stringify(this.toJSON());
    }
}

/**
 * Send data either directly via NATS or via a fileserver URL, depending on payload size.
 *
 * Each payload is serialized according to its type. If the serialized payload is smaller
 * than `sizeThreshold`, it is Base64-encoded and embedded in the message ("direct").
 * Otherwise it is uploaded through `fileserverUploadHandler` and only the download URL is
 * embedded ("link"). All payloads are wrapped in one MessageEnvelope, which is published.
 *
 * @param {string} subject - NATS subject to publish the message to
 * @param {Array} data - List of {dataname, data, type} objects to send
 * @param {Object} options - Additional options
 * @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222")
 * @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080")
 * @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads
 * @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB)
 * @param {string} options.correlationId - Optional correlation ID for tracing
 * @param {string} options.msgPurpose - Purpose of the message (default: "chat")
 * @param {string} options.senderName - Name of the sender (default: "NATSBridge")
 * @param {string} options.receiverName - Name of the receiver (default: "")
 * @param {string} options.receiverId - UUID of the receiver (default: "")
 * @param {string} options.replyTo - Topic to reply to (default: "")
 * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
 *
 * @returns {Promise<MessageEnvelope>} - The envelope for tracking
 * @throws {TypeError} If data is not an array
 * @throws {Error} If serialization fails or the upload handler reports a non-2xx status
 */
async function smartsend(subject, data, options = {}) {
    const {
        natsUrl = DEFAULT_NATS_URL,
        fileserverUrl = DEFAULT_FILESERVER_URL,
        fileserverUploadHandler = _upload_to_fileserver,
        sizeThreshold = DEFAULT_SIZE_THRESHOLD,
        correlationId = uuid4(),
        msgPurpose = "chat",
        senderName = "NATSBridge",
        receiverName = "",
        receiverId = "",
        replyTo = "",
        replyToMsgId = ""
    } = options;

    if (!Array.isArray(data)) {
        throw new TypeError("smartsend data must be an array of { dataname, data, type } objects");
    }

    log_trace(correlationId, `Starting smartsend for subject: ${subject}`);

    // Generate message metadata
    const msgId = uuid4();

    // Process each payload in the list
    const payloads = [];
    for (const payload of data) {
        const dataname = payload.dataname;
        const payloadType = payload.type;

        // Serialize data based on type
        const payloadBytes = _serialize_data(payload.data, payloadType);
        const payloadSize = payloadBytes.byteLength;

        log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

        // Decision: Direct vs Link
        if (payloadSize < sizeThreshold) {
            // Direct path - Base64 encode and send via NATS
            const payloadB64 = arrayBufferToBase64(payloadBytes);
            log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`);

            payloads.push(new MessagePayload({
                dataname: dataname,
                type: payloadType,
                transport: "direct",
                encoding: "base64",
                size: payloadSize,
                data: payloadB64,
                metadata: { payload_bytes: payloadSize }
            }));
        } else {
            // Link path - Upload to HTTP server, send URL via NATS
            log_trace(correlationId, `Using link transport, uploading to fileserver`);

            const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId);
            const status = response ? response.status : undefined;

            // Any 2xx status counts as success so handlers that answer e.g. 201 work too.
            if (!(status >= 200 && status < 300)) {
                throw new Error(`Failed to upload data to fileserver: ${status}`);
            }

            const url = response.url;
            log_trace(correlationId, `Uploaded to URL: ${url}`);

            payloads.push(new MessagePayload({
                dataname: dataname,
                type: payloadType,
                transport: "link",
                encoding: "none",
                size: payloadSize,
                data: url,
                metadata: {}
            }));
        }
    }

    // Create MessageEnvelope with all payloads
    const env = new MessageEnvelope({
        correlationId: correlationId,
        msgId: msgId,
        sendTo: subject,
        msgPurpose: msgPurpose,
        senderName: senderName,
        receiverName: receiverName,
        receiverId: receiverId,
        replyTo: replyTo,
        replyToMsgId: replyToMsgId,
        brokerURL: natsUrl,
        payloads: payloads
    });

    // Publish message to NATS
    await publish_message(natsUrl, subject, env.toString(), correlationId);

    return env;
}

/**
 * Publish a message to a NATS subject.
 *
 * NOTE: this is a placeholder. It does not connect to NATS; it only logs the subject and
 * the first 100 characters of the message. A real implementation would use a NATS client
 * library, for example with nats.js:
 *
 *     import { connect } from 'nats';
 *     const nc = await connect({ servers: [natsUrl] });
 *     await nc.publish(subject, message);
 *     nc.close();
 *
 * @param {string} natsUrl - NATS server URL (unused by the placeholder)
 * @param {string} subject - NATS subject to publish to
 * @param {string} message - JSON message to publish
 * @param {string} correlation_id - Correlation ID for logging
 */
async function publish_message(natsUrl, subject, message, correlation_id) {
    log_trace(correlation_id, `Publishing message to ${subject}`);

    // For now, just log the message
    console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`);
}

/**
 * Receive and process a message from NATS.
 *
 * Parses the JSON envelope and resolves every payload in it: "direct" payloads are
 * Base64-decoded from the message, "link" payloads are downloaded through
 * `fileserverDownloadHandler`. Each payload is then deserialized according to its type.
 *
 * @param {Object} msg - NATS message object; the envelope is read from `msg.payload`, or
 *   from `msg.data` when `payload` is absent. Either may be a string or UTF-8 bytes.
 * @param {Object} options - Additional options
 * @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs
 * @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5)
 * @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
 * @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
 *
 * @returns {Promise<Array>} - List of {dataname, data, type} objects
 * @throws {Error} If the message has no body, a payload has an unknown transport, or a
 *   direct payload carries no string data
 */
async function smartreceive(msg, options = {}) {
    const {
        fileserverDownloadHandler = _fetch_with_backoff,
        maxRetries = 5,
        baseDelay = 100,
        maxDelay = 5000
    } = options;

    // Accept the body under `payload` (original contract) or `data`.
    const rawBody = msg ? (msg.payload ?? msg.data) : undefined;
    if (rawBody === undefined || rawBody === null) {
        throw new Error("smartreceive: message has no payload");
    }

    // Parse the JSON envelope
    const jsonStr = typeof rawBody === 'string' ? rawBody : new TextDecoder().decode(rawBody);
    const envelope = JSON.parse(jsonStr);
    const correlationId = envelope.correlationId;

    log_trace(correlationId, `Processing received message`);

    // Process all payloads in the envelope
    const results = [];
    const incoming = envelope.payloads ? envelope.payloads : [];

    for (const payload of incoming) {
        const transport = payload.transport;
        const dataname = payload.dataname;
        const data_type = payload.type;

        if (transport === "direct") {
            // Direct transport - payload is in the message
            log_trace(correlationId, `Direct transport - decoding payload '${dataname}'`);

            if (typeof payload.data !== 'string') {
                throw new Error(`Direct payload '${dataname}' has no Base64 data`);
            }
            const payload_bytes = base64ToArrayBuffer(payload.data);
            const data = _deserialize_data(payload_bytes, data_type, correlationId);
            results.push({ dataname, data, type: data_type });
        } else if (transport === "link") {
            // Link transport - payload is at URL
            const url = payload.data;
            log_trace(correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);

            // Fetch with exponential backoff using the download handler
            const downloaded_data = await fileserverDownloadHandler(
                url,
                maxRetries,
                baseDelay,
                maxDelay,
                correlationId
            );
            const data = _deserialize_data(downloaded_data, data_type, correlationId);
            results.push({ dataname, data, type: data_type });
        } else {
            throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
        }
    }

    return results;
}

// Public API, shared by the Node.js and browser exports below
const NATSBridgeExports = {
    MessageEnvelope,
    MessagePayload,
    smartsend,
    smartreceive,
    _serialize_data,
    _deserialize_data,
    _fetch_with_backoff,
    _upload_to_fileserver,
    DEFAULT_SIZE_THRESHOLD,
    DEFAULT_NATS_URL,
    DEFAULT_FILESERVER_URL,
    uuid4,
    log_trace
};

// Export for Node.js
if (typeof module !== 'undefined' && module.exports) {
    module.exports = NATSBridgeExports;
}

// Export for browser
if (typeof window !== 'undefined') {
    window.NATSBridge = NATSBridgeExports;
}