/**
 * NATSBridge.js - Bi-Directional Data Bridge for JavaScript
 * Implements smartsend and smartreceive for NATS communication
 *
 * This module provides functionality for sending and receiving data across network boundaries
 * using NATS as the message bus, with support for both direct payload transport and
 * URL-based transport for larger payloads.
 *
 * File Server Handler Architecture:
 * The system uses handler functions to abstract file server operations, allowing support
 * for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
 *
 * Handler Function Signatures:
 *
 * ```javascript
 * // Upload handler - uploads data to file server and returns URL
 * // The handler is passed to smartsend as fileserverUploadHandler parameter
 * // It receives: (fileserver_url, dataname, data, correlation_id)
 * // Returns: { status, uploadid, fileid, url }
 * async function fileserverUploadHandler(fileserver_url, dataname, data, correlation_id) { ... }
 *
 * // Download handler - fetches data from file server URL with exponential backoff
 * // The handler is passed to smartreceive as fileserverDownloadHandler parameter
 * // It receives: (url, max_retries, base_delay, max_delay, correlation_id)
 * // Returns: ArrayBuffer (the downloaded data)
 * async function fileserverDownloadHandler(url, max_retries, base_delay, max_delay, correlation_id) { ... }
 * ```
 *
 * Multi-Payload Support (Standard API):
 * The system uses a standardized list format for all payload operations: each entry is a
 * { dataname, data, type } object. Even when sending a single payload, the user must wrap
 * it in a list.
 *
 * API Standard:
 * ```javascript
 * // Input format for smartsend (always a list of entries with type info)
 * [{ dataname, data, type }, ...]
 *
 * // Output format for smartreceive (the envelope's payloads field is always a list)
 * [{ dataname, data, type }, ...]
 * ```
 *
 * Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
 */

// ---------------------------------------------- 100 --------------------------------------------- #

// Constants
// Payloads smaller than this many bytes (1 MB) travel directly over NATS;
// anything at or above it is uploaded and sent as a link.
const DEFAULT_SIZE_THRESHOLD = 1e6;
// NATS server URL used when the caller does not provide one
const DEFAULT_NATS_URL = "nats://localhost:4222";
// HTTP file server base URL used for link transport when the caller does not provide one
const DEFAULT_FILESERVER_URL = "http://localhost:8080";

// Helper: Generate UUID v4
function uuid4() {
    /**
     * Generate a random version 4 UUID string.
     *
     * Prefers the platform's Web Crypto generator (crypto.randomUUID, then
     * crypto.getRandomValues) so IDs come from a cryptographically secure
     * source. Math.random() is used only when no Web Crypto implementation is
     * exposed on the global object.
     *
     * @returns {string} UUID in the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx
     */
    const cryptoApi = typeof globalThis !== 'undefined' ? globalThis.crypto : undefined;

    if (cryptoApi && typeof cryptoApi.randomUUID === 'function') {
        return cryptoApi.randomUUID();
    }

    const bytes = new Uint8Array(16);
    if (cryptoApi && typeof cryptoApi.getRandomValues === 'function') {
        cryptoApi.getRandomValues(bytes);
    } else {
        // Last-resort fallback: not cryptographically secure
        for (let i = 0; i < bytes.length; i++) {
            bytes[i] = Math.floor(Math.random() * 256);
        }
    }

    // Stamp the version (4) and variant (10xx) bits
    bytes[6] = (bytes[6] & 0x0f) | 0x40;
    bytes[8] = (bytes[8] & 0x3f) | 0x80;

    const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('');
    return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`;
}

// Helper: Log with correlation ID and timestamp
function log_trace(correlation_id, message) {
    /**
     * Write one trace line to the console, prefixed with the current ISO-8601
     * timestamp and the correlation ID.
     *
     * @param {string} correlation_id - Identifier tying related log lines together
     * @param {string} message - Text to log
     */
    const stamp = new Date().toISOString();
    const line = `[${stamp}] [Correlation: ${correlation_id}] ${message}`;
    console.log(line);
}

// Helper: Get size of data in bytes
function getDataSize(data) {
    /**
     * Measure how many bytes a value occupies.
     *
     * - Strings are measured as their UTF-8 encoding.
     * - ArrayBuffers and every ArrayBuffer view (any typed array, DataView)
     *   report their byteLength.
     * - Other non-null objects are measured as the UTF-8 size of their JSON text.
     * - Anything else (numbers, booleans, null, undefined, ...) counts as 0.
     *
     * @param {*} data - Value to measure
     * @returns {number} Size in bytes
     */
    if (typeof data === 'string') {
        return new TextEncoder().encode(data).length;
    }
    // ArrayBuffer.isView covers all typed arrays and DataView, not just Uint8Array;
    // without it those would be measured by their JSON text instead of their bytes.
    if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
        return data.byteLength;
    }
    if (typeof data === 'object' && data !== null) {
        // For objects, serialize to JSON and measure
        return new TextEncoder().encode(JSON.stringify(data)).length;
    }
    return 0;
}

// Helper: Convert ArrayBuffer to Base64 string
function arrayBufferToBase64(buffer) {
    /**
     * Encode raw bytes as a Base64 string.
     *
     * Each byte is mapped to the character with the same code unit and the
     * resulting binary string is handed to btoa(). The bytes are converted in
     * fixed-size slices so no single fromCharCode call receives an
     * excessively long argument list.
     *
     * @param {ArrayBuffer|Uint8Array} buffer - Bytes to encode
     * @returns {string} Base64 text
     */
    const view = new Uint8Array(buffer);
    const sliceSize = 0x8000;
    const pieces = [];
    for (let start = 0; start < view.length; start += sliceSize) {
        pieces.push(String.fromCharCode.apply(null, view.subarray(start, start + sliceSize)));
    }
    return btoa(pieces.join(''));
}

// Helper: Convert Base64 string to ArrayBuffer
function base64ToArrayBuffer(base64) {
    /**
     * Decode a Base64 string into an ArrayBuffer.
     *
     * atob() yields a binary string with one character per byte; every
     * character's code unit becomes the byte at the same position.
     *
     * @param {string} base64 - Base64 text to decode
     * @returns {ArrayBuffer} Decoded bytes
     */
    const decoded = atob(base64);
    return Uint8Array.from(decoded, (ch) => ch.charCodeAt(0)).buffer;
}

// Helper: Serialize data based on type
function _serialize_data(data, type) {
    /**
     * Serialize data according to the specified payload type.
     *
     * Supported types:
     * - "text": data must be a string; returns its UTF-8 bytes
     * - "dictionary": data is serialized with JSON.stringify; returns the UTF-8 bytes
     * - "table": NOT IMPLEMENTED (would require the apache-arrow library); always throws
     * - "image" / "audio" / "video" / "binary": data must be an ArrayBuffer or
     *   Uint8Array and is returned as an ArrayBuffer
     *
     * A Uint8Array may be a window onto a larger backing buffer (a subarray, or
     * a Node.js Buffer allocated from the shared pool). In that case only the
     * bytes the view covers are returned, copied into a new ArrayBuffer, never
     * the whole backing buffer.
     *
     * @param {*} data - Value to serialize
     * @param {string} type - One of the supported payload types
     * @returns {ArrayBuffer} Serialized bytes
     * @throws {Error} On a type/data mismatch, for "table", or for an unknown type
     */
    // Labels used in the error message of each raw-binary payload type
    const binaryLabels = new Map([
        ["image", "Image"],
        ["audio", "Audio"],
        ["video", "Video"],
        ["binary", "Binary"]
    ]);

    // Return an ArrayBuffer holding exactly the bytes the view covers
    const exactBuffer = (view) => {
        const spansWholeBuffer = view.byteOffset === 0 && view.byteLength === view.buffer.byteLength;
        return spansWholeBuffer
            ? view.buffer
            : view.buffer.slice(view.byteOffset, view.byteOffset + view.byteLength);
    };

    if (type === "text") {
        if (typeof data !== 'string') {
            throw new Error("Text data must be a String");
        }
        return exactBuffer(new TextEncoder().encode(data));
    }

    if (type === "dictionary") {
        // JSON data - serialize directly
        return exactBuffer(new TextEncoder().encode(JSON.stringify(data)));
    }

    if (type === "table") {
        // Arrow IPC serialization is not available in pure JavaScript
        throw new Error("Table serialization requires apache-arrow library");
    }

    if (binaryLabels.has(type)) {
        if (data instanceof ArrayBuffer) {
            return data;
        }
        if (data instanceof Uint8Array) {
            return exactBuffer(data);
        }
        throw new Error(`${binaryLabels.get(type)} data must be ArrayBuffer or Uint8Array`);
    }

    throw new Error(`Unknown type: ${type}`);
}

// Helper: Deserialize bytes based on type
function _deserialize_data(data, type, correlation_id) {
    /**
     * Turn received bytes back into a value according to the payload type.
     *
     * - "text": bytes are decoded to a string
     * - "dictionary": bytes are decoded to a string and parsed as JSON
     * - "table": NOT IMPLEMENTED (would require the apache-arrow library); always throws
     * - "image" / "audio" / "video" / "binary": the bytes are returned untouched
     *
     * @param {ArrayBuffer|Uint8Array} data - Raw payload bytes
     * @param {string} type - Payload type
     * @param {string} correlation_id - Correlation ID (accepted but not used here)
     * @returns {*} Decoded value
     * @throws {Error} For "table" and for unknown types
     */
    switch (type) {
        case "text":
            return new TextDecoder().decode(new Uint8Array(data));

        case "dictionary": {
            const jsonText = new TextDecoder().decode(new Uint8Array(data));
            return JSON.parse(jsonText);
        }

        case "table":
            // Arrow IPC deserialization is not available in pure JavaScript
            throw new Error("Table deserialization requires apache-arrow library");

        case "image":
        case "audio":
        case "video":
        case "binary":
            return data;

        default:
            throw new Error(`Unknown type: ${type}`);
    }
}

// Helper: Upload data to file server
async function _upload_to_fileserver(fileserver_url, dataname, data, correlation_id) {
    /**
     * Push one payload to an HTTP file server exposing a plik-like API,
     * using its one-shot upload mode.
     *
     * Flow:
     *   1. POST {"OneShot": true} to <fileserver_url>/upload to open an upload
     *      session; the JSON reply supplies `id` and `uploadToken`.
     *   2. POST the bytes as multipart form data (field "file") to
     *      <fileserver_url>/file/<uploadid>, sending the token in X-UploadToken.
     *   3. Compose the download URL from the upload id, the returned file id
     *      and the URI-encoded data name.
     *
     * @param {string} fileserver_url - Base URL of the file server
     * @param {string} dataname - File name to store the payload under
     * @param {ArrayBuffer} data - Bytes to upload (wrapped in a Blob)
     * @param {string} correlation_id - Correlation ID used for trace logging
     * @returns {Promise<Object>} { status, uploadid, fileid, url }, where status
     *          is the HTTP status of the file upload request
     * @throws {Error} When either HTTP request answers with a non-OK status
     */
    log_trace(correlation_id, `Uploading ${dataname} to fileserver: ${fileserver_url}`);

    // Step 1: open a one-shot upload session to obtain the upload ID and token
    const sessionResponse = await fetch(`${fileserver_url}/upload`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ OneShot: true })
    });
    if (!sessionResponse.ok) {
        throw new Error(`Failed to get upload ID: ${sessionResponse.status} ${sessionResponse.statusText}`);
    }
    const session = await sessionResponse.json();
    const uploadid = session.id;
    const uploadtoken = session.uploadToken;

    // Step 2: send the bytes as a multipart "file" part
    const form = new FormData();
    form.append("file", new Blob([data], { type: "application/octet-stream" }), dataname);

    const fileResponse = await fetch(`${fileserver_url}/file/${uploadid}`, {
        method: "POST",
        headers: { "X-UploadToken": uploadtoken },
        body: form
    });
    if (!fileResponse.ok) {
        throw new Error(`Failed to upload file: ${fileResponse.status} ${fileResponse.statusText}`);
    }
    const uploadedFile = await fileResponse.json();
    const fileid = uploadedFile.id;

    // Step 3: build the download URL
    const url = `${fileserver_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
    log_trace(correlation_id, `Uploaded to URL: ${url}`);

    return { status: fileResponse.status, uploadid, fileid, url };
}

// Helper: Fetch data from URL with exponential backoff
async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) {
    /**
     * Fetch data from a URL, retrying with exponential backoff.
     *
     * Up to `max_retries` attempts are made. An attempt succeeds only when the
     * response status is exactly 200. After a failed attempt (other than the
     * last) the function sleeps for the current delay, then doubles the delay,
     * capped at `max_delay`.
     *
     * @param {string} url - URL to download
     * @param {number} max_retries - Maximum number of attempts
     * @param {number} base_delay - Delay before the second attempt, in ms
     * @param {number} max_delay - Upper bound for the delay, in ms
     * @param {string} correlation_id - Correlation ID used for trace logging
     * @returns {Promise<ArrayBuffer>} The downloaded body
     * @throws {Error} When every attempt failed; the error's `cause` holds the
     *         failure of the last attempt so callers can see why it gave up
     */
    let delay = base_delay;
    let lastError;

    for (let attempt = 1; attempt <= max_retries; attempt++) {
        try {
            const response = await fetch(url);

            if (response.status !== 200) {
                throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
            }

            log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
            // Awaited inside the try block so a failure while reading the body is retried too
            return await response.arrayBuffer();
        } catch (e) {
            lastError = e;
            log_trace(correlation_id, `Attempt ${attempt} failed: ${e.message}`);

            if (attempt < max_retries) {
                // Sleep with exponential backoff
                await new Promise(resolve => setTimeout(resolve, delay));
                delay = Math.min(delay * 2, max_delay);
            }
        }
    }

    throw new Error(`Failed to fetch data after ${max_retries} attempts`, { cause: lastError });
}

// Helper: Get payload bytes from data
function _get_payload_bytes(data) {
    /**
     * Coerce a payload value into a Uint8Array.
     *
     * - Uint8Array: returned as-is
     * - ArrayBuffer: wrapped in a Uint8Array view (no copy)
     * - string: UTF-8 encoded
     * - anything else: JSON-stringified, then UTF-8 encoded
     *
     * @param {*} data - Payload value
     * @returns {Uint8Array} Payload bytes
     */
    if (data instanceof Uint8Array) {
        return data;
    }
    if (data instanceof ArrayBuffer) {
        return new Uint8Array(data);
    }
    const text = typeof data === 'string' ? data : JSON.stringify(data);
    return new TextEncoder().encode(text);
}

// MessagePayload class
class MessagePayload {
    /**
     * One payload carried inside a message envelope.
     *
     * @param {Object} options - Payload options
     * @param {string} options.id - ID of this payload; a fresh UUID is generated when falsy
     * @param {string} options.dataname - Name of this payload (e.g., "login_image")
     * @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
     * @param {string} options.transport - "direct" or "link"
     * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
     * @param {number} options.size - Data size in bytes
     * @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link)
     * @param {Object} options.metadata - Metadata for this payload; defaults to {}
     */
    constructor(options) {
        const { id, dataname, type, transport, encoding, size, data, metadata } = options;
        this.id = id || uuid4();
        this.dataname = dataname;
        this.type = type;
        this.transport = transport;
        this.encoding = encoding;
        this.size = size;
        this.data = data;
        this.metadata = metadata || {};
    }

    /**
     * Build the plain-object form of this payload.
     *
     * `data` is included only when it is not null and the transport is
     * "direct" or "link". Direct data already marked "base64" or "json" is
     * passed through unchanged; direct data with any other encoding is
     * converted to bytes and Base64-encoded. Link data (a URL) is passed
     * through. `metadata` is included only when it has at least one key.
     *
     * @returns {Object} Serializable representation of the payload
     */
    toJSON() {
        const { id, dataname, type, transport, encoding, size } = this;
        const json = { id, dataname, type, transport, encoding, size };

        if (this.data !== null) {
            if (transport === "direct") {
                const alreadyEncoded = encoding === "base64" || encoding === "json";
                json.data = alreadyEncoded
                    ? this.data
                    : arrayBufferToBase64(_get_payload_bytes(this.data));
            } else if (transport === "link") {
                // For link transport, data is a URL string
                json.data = this.data;
            }
        }

        const hasMetadata = Object.keys(this.metadata).length > 0;
        if (hasMetadata) {
            json.metadata = this.metadata;
        }

        return json;
    }
}

// MessageEnvelope class
class MessageEnvelope {
    /**
     * The message envelope: routing/tracing metadata plus the list of payloads.
     *
     * Falsy options fall back to defaults: correlationId, msgId and senderId get
     * a fresh UUID, timestamp gets the current ISO-8601 time, brokerURL gets
     * DEFAULT_NATS_URL, metadata gets {}, payloads gets [], and the remaining
     * string fields (except sendTo, which is stored as given) get "".
     *
     * @param {Object} options - Envelope options
     * @param {string} options.sendTo - Topic/subject the sender sends to
     * @param {Array<MessagePayload>} options.payloads - Array of payloads
     * @param {string} options.correlationId - Unique identifier to track messages
     * @param {string} options.msgId - This message id
     * @param {string} options.timestamp - Message published timestamp
     * @param {string} options.msgPurpose - Purpose of this message
     * @param {string} options.senderName - Name of the sender
     * @param {string} options.senderId - UUID of the sender
     * @param {string} options.receiverName - Name of the receiver
     * @param {string} options.receiverId - UUID of the receiver
     * @param {string} options.replyTo - Topic to reply to
     * @param {string} options.replyToMsgId - Message id this message is replying to
     * @param {string} options.brokerURL - NATS server address
     * @param {Object} options.metadata - Metadata for the envelope
     */
    constructor(options) {
        Object.assign(this, {
            correlationId: options.correlationId || uuid4(),
            msgId: options.msgId || uuid4(),
            timestamp: options.timestamp || new Date().toISOString(),
            sendTo: options.sendTo,
            msgPurpose: options.msgPurpose || "",
            senderName: options.senderName || "",
            senderId: options.senderId || uuid4(),
            receiverName: options.receiverName || "",
            receiverId: options.receiverId || "",
            replyTo: options.replyTo || "",
            replyToMsgId: options.replyToMsgId || "",
            brokerURL: options.brokerURL || DEFAULT_NATS_URL,
            metadata: options.metadata || {},
            payloads: options.payloads || []
        });
    }

    /**
     * Build the plain-object form of the envelope.
     *
     * `metadata` is included only when it has at least one key, and `payloads`
     * only when the list is non-empty; each payload contributes the result of
     * its own toJSON().
     *
     * @returns {Object} Serializable representation of the envelope
     */
    toJSON() {
        const {
            correlationId, msgId, timestamp, sendTo, msgPurpose, senderName,
            senderId, receiverName, receiverId, replyTo, replyToMsgId, brokerURL
        } = this;
        const json = {
            correlationId, msgId, timestamp, sendTo, msgPurpose, senderName,
            senderId, receiverName, receiverId, replyTo, replyToMsgId, brokerURL
        };

        const hasMetadata = Object.keys(this.metadata).length > 0;
        if (hasMetadata) {
            json.metadata = this.metadata;
        }

        if (this.payloads.length > 0) {
            json.payloads = this.payloads.map((entry) => entry.toJSON());
        }

        return json;
    }

    /**
     * Serialize the envelope to a JSON string.
     *
     * @returns {string} JSON text of toJSON()
     */
    toString() {
        const plain = this.toJSON();
        return JSON.stringify(plain);
    }
}

// SmartSend function
async function smartsend(subject, data, options = {}) {
    /**
     * Send data either directly via NATS or via a fileserver URL, depending on payload size.
     *
     * Each entry of `data` is serialized according to its type. If the serialized
     * size is smaller than `sizeThreshold`, the bytes are Base64-encoded and
     * embedded in the envelope ("direct" transport). Otherwise the bytes are handed
     * to the upload handler and only the returned download URL is embedded ("link"
     * transport). Entries are processed one after another, in order.
     *
     * The upload handler is called as (fileserverUrl, dataname, bytes, correlationId)
     * and must resolve to an object with a `status` and a `url`. Any 2xx status is
     * treated as success, matching the default handler, which accepts every
     * successful HTTP response and passes its status through.
     *
     * @param {string} subject - NATS subject to publish the message to
     * @param {Array} data - List of {dataname, data, type} objects to send
     * @param {Object} options - Additional options
     * @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222")
     * @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080")
     * @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads
     * @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB)
     * @param {string} options.correlationId - Optional correlation ID for tracing
     * @param {string} options.msgPurpose - Purpose of the message (default: "chat")
     * @param {string} options.senderName - Name of the sender (default: "NATSBridge")
     * @param {string} options.receiverName - Name of the receiver (default: "")
     * @param {string} options.receiverId - UUID of the receiver (default: "")
     * @param {string} options.replyTo - Topic to reply to (default: "")
     * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "")
     * @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true)
     *
     * @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string }
     * @throws {TypeError} When `data` is not a list (not iterable)
     * @throws {Error} When serialization fails or the upload handler does not report a 2xx status
     */
    const {
        natsUrl = DEFAULT_NATS_URL,
        fileserverUrl = DEFAULT_FILESERVER_URL,
        fileserverUploadHandler = _upload_to_fileserver,
        sizeThreshold = DEFAULT_SIZE_THRESHOLD,
        correlationId = uuid4(),
        msgPurpose = "chat",
        senderName = "NATSBridge",
        receiverName = "",
        receiverId = "",
        replyTo = "",
        replyToMsgId = "",
        isPublish = true // Whether to automatically publish the message to NATS
    } = options;

    // Fail fast with a descriptive message instead of the engine's generic
    // "is not iterable" error when a bare payload is passed without a list.
    if (data == null || typeof data[Symbol.iterator] !== 'function') {
        throw new TypeError("smartsend data must be a list of { dataname, data, type } objects");
    }

    log_trace(correlationId, `Starting smartsend for subject: ${subject}`);

    // Generate message metadata
    const msgId = uuid4();

    // Process each payload in the list
    const payloads = [];

    for (const payload of data) {
        const dataname = payload.dataname;
        const payloadData = payload.data;
        const payloadType = payload.type;

        // Serialize data based on type
        const payloadBytes = _serialize_data(payloadData, payloadType);
        const payloadSize = payloadBytes.byteLength;

        log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

        // Decision: Direct vs Link
        if (payloadSize < sizeThreshold) {
            // Direct path - Base64 encode and send via NATS
            const payloadB64 = arrayBufferToBase64(payloadBytes);
            log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`);

            payloads.push(new MessagePayload({
                dataname: dataname,
                type: payloadType,
                transport: "direct",
                encoding: "base64",
                size: payloadSize,
                data: payloadB64,
                metadata: { payload_bytes: payloadSize }
            }));
        } else {
            // Link path - Upload to HTTP server, send URL via NATS
            log_trace(correlationId, `Using link transport, uploading to fileserver`);

            const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId);

            // Accept every 2xx status: file servers commonly answer uploads with
            // 201 Created, and the default handler only guarantees a successful response.
            const status = response == null ? undefined : response.status;
            if (!(status >= 200 && status < 300)) {
                throw new Error(`Failed to upload data to fileserver: ${status}`);
            }

            const url = response.url;
            log_trace(correlationId, `Uploaded to URL: ${url}`);

            payloads.push(new MessagePayload({
                dataname: dataname,
                type: payloadType,
                transport: "link",
                encoding: "none",
                size: payloadSize,
                data: url,
                metadata: {}
            }));
        }
    }

    // Create MessageEnvelope with all payloads
    const env = new MessageEnvelope({
        correlationId: correlationId,
        msgId: msgId,
        sendTo: subject,
        msgPurpose: msgPurpose,
        senderName: senderName,
        receiverName: receiverName,
        receiverId: receiverId,
        replyTo: replyTo,
        replyToMsgId: replyToMsgId,
        brokerURL: natsUrl,
        payloads: payloads
    });

    // Convert envelope to JSON string
    const env_json_str = env.toString();

    // Publish to NATS if isPublish is true
    if (isPublish) {
        await publish_message(natsUrl, subject, env_json_str, correlationId);
    }

    // Return both envelope and JSON string (tuple-like structure)
    return {
        env: env,
        env_json_str: env_json_str
    };
}

// Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) {
    /**
     * Publish a message to a NATS subject.
     *
     * NOTE: this is a placeholder. It does not open a NATS connection; it
     * writes a trace line and then logs the subject together with the first
     * 100 characters of the message. A real implementation would use the
     * nats.js library, for example:
     *
     *   import { connect } from 'nats';
     *   const nc = await connect({ servers: [natsUrl] });
     *   await nc.publish(subject, message);
     *   nc.close();
     *
     * @param {string} natsUrl - NATS server URL (unused by the placeholder)
     * @param {string} subject - NATS subject to publish to
     * @param {string} message - JSON message to publish
     * @param {string} correlation_id - Correlation ID for logging
     */
    log_trace(correlation_id, `Publishing message to ${subject}`);

    // For now, just log a preview of the message
    const preview = message.substring(0, 100);
    console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${preview}...`);
}

// SmartReceive function
async function smartreceive(msg, options = {}) {
    /**
     * Decode a received NATS message into an envelope with usable payloads.
     *
     * `msg.payload` (a string, or bytes that are decoded as text) is parsed as
     * the JSON envelope. Every entry of `payloads` is then resolved in order:
     * - "direct" transport: `data` is Base64-decoded
     * - "link" transport: `data` is a URL handed to the download handler
     * and the resulting bytes are deserialized according to the entry's `type`.
     *
     * @param {Object} msg - NATS message object with payload property
     * @param {Object} options - Additional options
     * @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs
     * @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5)
     * @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100)
     * @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000)
     *
     * @returns {Promise<Object>} - The parsed envelope, with its payloads field
     *          replaced by a list of {dataname, data, type} objects
     * @throws {Error} When an entry names a transport other than "direct" or "link"
     */
    const {
        fileserverDownloadHandler = _fetch_with_backoff,
        maxRetries = 5,
        baseDelay = 100,
        maxDelay = 5000
    } = options;

    // Parse the JSON envelope
    const rawMessage = msg.payload;
    const envelopeText = typeof rawMessage === 'string' ? rawMessage : new TextDecoder().decode(rawMessage);
    const envelope = JSON.parse(envelopeText);
    const correlationId = envelope.correlationId;

    log_trace(correlationId, `Processing received message`);

    const decoded = [];
    const count = envelope.payloads ? envelope.payloads.length : 0;

    for (let idx = 0; idx < count; idx += 1) {
        const entry = envelope.payloads[idx];
        const transport = entry.transport;
        const dataname = entry.dataname;

        // Obtain the raw bytes according to the transport
        let rawBytes;
        if (transport === "direct") {
            // Payload travels inside the message as Base64
            log_trace(correlationId, `Direct transport - decoding payload '${dataname}'`);
            rawBytes = base64ToArrayBuffer(entry.data);
        } else if (transport === "link") {
            // Payload lives at a URL; the handler is expected to retry with backoff
            const url = entry.data;
            log_trace(correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`);
            rawBytes = await fileserverDownloadHandler(url, maxRetries, baseDelay, maxDelay, correlationId);
        } else {
            throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
        }

        // Deserialize based on type
        const data_type = entry.type;
        const data = _deserialize_data(rawBytes, data_type, correlationId);
        decoded.push({ dataname, data, type: data_type });
    }

    // Swap the wire-format payloads for the decoded {dataname, data, type} list
    envelope.payloads = decoded;
    return envelope;
}

// Export for Node.js
if (typeof module !== 'undefined' && module.exports) {
|
|
module.exports = {
|
|
MessageEnvelope,
|
|
MessagePayload,
|
|
smartsend,
|
|
smartreceive,
|
|
_serialize_data,
|
|
_deserialize_data,
|
|
_fetch_with_backoff,
|
|
_upload_to_fileserver,
|
|
DEFAULT_SIZE_THRESHOLD,
|
|
DEFAULT_NATS_URL,
|
|
DEFAULT_FILESERVER_URL,
|
|
uuid4,
|
|
log_trace
|
|
};
|
|
}

// Export for browser
// Browser: expose the same API as a `NATSBridge` property on `window`
if (typeof window !== 'undefined') {
    window.NATSBridge = {
        // Envelope model
        MessageEnvelope: MessageEnvelope,
        MessagePayload: MessagePayload,
        // Public entry points
        smartsend: smartsend,
        smartreceive: smartreceive,
        // Internal helpers
        _serialize_data: _serialize_data,
        _deserialize_data: _deserialize_data,
        _fetch_with_backoff: _fetch_with_backoff,
        _upload_to_fileserver: _upload_to_fileserver,
        // Defaults
        DEFAULT_SIZE_THRESHOLD: DEFAULT_SIZE_THRESHOLD,
        DEFAULT_NATS_URL: DEFAULT_NATS_URL,
        DEFAULT_FILESERVER_URL: DEFAULT_FILESERVER_URL,
        // Utilities
        uuid4: uuid4,
        log_trace: log_trace
    };
}