diff --git a/src/NATSBridge.js b/src/NATSBridge.js index 536e123..64b1c91 100644 --- a/src/NATSBridge.js +++ b/src/NATSBridge.js @@ -98,6 +98,26 @@ function base64ToArrayBuffer(base64) { return bytes.buffer; } +// Helper: Convert Uint8Array to Base64 string +function uint8ArrayToBase64(uint8array) { + let binary = ''; + for (let i = 0; i < uint8array.byteLength; i++) { + binary += String.fromCharCode(uint8array[i]); + } + return btoa(binary); +} + +// Helper: Convert Base64 string to Uint8Array +function base64ToUint8Array(base64) { + const binaryString = atob(base64); + const len = binaryString.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + return bytes; +} + // Helper: Serialize data based on type function _serialize_data(data, type) { /** @@ -114,39 +134,39 @@ function _serialize_data(data, type) { */ if (type === "text") { if (typeof data === 'string') { - return new TextEncoder().encode(data).buffer; + return new TextEncoder().encode(data); } else { throw new Error("Text data must be a String"); } } else if (type === "dictionary") { // JSON data - serialize directly const jsonStr = JSON.stringify(data); - return new TextEncoder().encode(jsonStr).buffer; + return new TextEncoder().encode(jsonStr); } else if (type === "table") { // Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) // This would require the apache-arrow library throw new Error("Table serialization requires apache-arrow library"); } else if (type === "image") { if (data instanceof ArrayBuffer || data instanceof Uint8Array) { - return data instanceof ArrayBuffer ? data : data.buffer; + return data instanceof ArrayBuffer ? new Uint8Array(data) : data; } else { throw new Error("Image data must be ArrayBuffer or Uint8Array"); } } else if (type === "audio") { if (data instanceof ArrayBuffer || data instanceof Uint8Array) { - return data instanceof ArrayBuffer ? data : data.buffer; + return data instanceof ArrayBuffer ? new Uint8Array(data) : data; } else { throw new Error("Audio data must be ArrayBuffer or Uint8Array"); } } else if (type === "video") { if (data instanceof ArrayBuffer || data instanceof Uint8Array) { - return data instanceof ArrayBuffer ? data : data.buffer; + return data instanceof ArrayBuffer ? new Uint8Array(data) : data; } else { throw new Error("Video data must be ArrayBuffer or Uint8Array"); } } else if (type === "binary") { if (data instanceof ArrayBuffer || data instanceof Uint8Array) { - return data instanceof ArrayBuffer ? data : data.buffer; + return data instanceof ArrayBuffer ? new Uint8Array(data) : data; } else { throw new Error("Binary data must be ArrayBuffer or Uint8Array"); } @@ -171,10 +191,10 @@ function _deserialize_data(data, type, correlation_id) { */ if (type === "text") { const decoder = new TextDecoder(); - return decoder.decode(new Uint8Array(data)); + return decoder.decode(data); } else if (type === "dictionary") { const decoder = new TextDecoder(); - const jsonStr = decoder.decode(new Uint8Array(data)); + const jsonStr = decoder.decode(data); return JSON.parse(jsonStr); } else if (type === "table") { // Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) @@ -230,7 +250,7 @@ async function _upload_to_fileserver(fileserver_url, dataname, data, correlation // Create multipart form data const formData = new FormData(); - // Create a Blob from the ArrayBuffer + // Create a Blob from the Uint8Array const blob = new Blob([data], { type: "application/octet-stream" }); formData.append("file", blob, dataname); @@ -276,7 +296,7 @@ async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, corr if (response.status === 200) { log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`); const arrayBuffer = await response.arrayBuffer(); - return arrayBuffer; + return new Uint8Array(arrayBuffer); } else { throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); } @@ -306,25 +326,26 @@ function _get_payload_bytes(data) { } } -// MessagePayload class +// MessagePayload class - matches msg_payload_v1 Julia struct class MessagePayload { /** * Represents a single payload in the message envelope + * Matches Julia's msg_payload_v1 struct * * @param {Object} options - Payload options * @param {string} options.id - ID of this payload (e.g., "uuid4") * @param {string} options.dataname - Name of this payload (e.g., "login_image") - * @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + * @param {string} options.payload_type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" * @param {string} options.transport - "direct" or "link" * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc" * @param {number} options.size - Data size in bytes - * @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link) + * @param {string|Uint8Array} options.data - Payload data (Uint8Array for direct, URL string for link) * @param {Object} options.metadata - Metadata for this payload */ constructor(options) { this.id = options.id || uuid4(); this.dataname = options.dataname; - this.type = options.type; + this.payload_type = options.payload_type; this.transport = options.transport; this.encoding = options.encoding; this.size = options.size; @@ -332,27 +353,27 @@ class MessagePayload { this.metadata = options.metadata || {}; } - // Convert to JSON object + // Convert to JSON object - uses snake_case to match Julia API toJSON() { const obj = { id: this.id, dataname: this.dataname, - type: this.type, + payload_type: this.payload_type, transport: this.transport, encoding: this.encoding, size: this.size }; // Include data based on transport type - if (this.transport === "direct" && this.data !== null) { + if (this.transport === "direct" && this.data !== null && this.data !== undefined) { if (this.encoding === "base64" || this.encoding === "json") { obj.data = this.data; } else { // For other encodings, use base64 const payloadBytes = _get_payload_bytes(this.data); - obj.data = arrayBufferToBase64(payloadBytes); + obj.data = uint8ArrayToBase64(payloadBytes); } - } else if (this.transport === "link" && this.data !== null) { + } else if (this.transport === "link" && this.data !== null && this.data !== undefined) { // For link transport, data is a URL string obj.data = this.data; } @@ -365,59 +386,60 @@ class MessagePayload { } } -// MessageEnvelope class +// MessageEnvelope class - matches msg_envelope_v1 Julia struct class MessageEnvelope { /** * Represents the message envelope containing metadata and payloads + * Matches Julia's msg_envelope_v1 struct * * @param {Object} options - Envelope options - * @param {string} options.sendTo - Topic/subject the sender sends to - * @param {Array} options.payloads - Array of payloads - * @param {string} options.correlationId - Unique identifier to track messages - * @param {string} options.msgId - This message id + * @param {string} options.correlation_id - Unique identifier to track messages + * @param {string} options.msg_id - This message id * @param {string} options.timestamp - Message published timestamp - * @param {string} options.msgPurpose - Purpose of this message - * @param {string} options.senderName - Name of the sender - * @param {string} options.senderId - UUID of the sender - * @param {string} options.receiverName - Name of the receiver - * @param {string} options.receiverId - UUID of the receiver - * @param {string} options.replyTo - Topic to reply to - * @param {string} options.replyToMsgId - Message id this message is replying to - * @param {string} options.brokerURL - NATS server address + * @param {string} options.send_to - Topic/subject the sender sends to + * @param {string} options.msg_purpose - Purpose of this message + * @param {string} options.sender_name - Name of the sender + * @param {string} options.sender_id - UUID of the sender + * @param {string} options.receiver_name - Name of the receiver + * @param {string} options.receiver_id - UUID of the receiver + * @param {string} options.reply_to - Topic to reply to + * @param {string} options.reply_to_msg_id - Message id this message is replying to + * @param {string} options.broker_url - NATS server address * @param {Object} options.metadata - Metadata for the envelope + * @param {Array} options.payloads - Array of payloads */ constructor(options) { - this.correlationId = options.correlationId || uuid4(); - this.msgId = options.msgId || uuid4(); + this.correlation_id = options.correlation_id || uuid4(); + this.msg_id = options.msg_id || uuid4(); this.timestamp = options.timestamp || new Date().toISOString(); - this.sendTo = options.sendTo; - this.msgPurpose = options.msgPurpose || ""; - this.senderName = options.senderName || ""; - this.senderId = options.senderId || uuid4(); - this.receiverName = options.receiverName || ""; - this.receiverId = options.receiverId || ""; - this.replyTo = options.replyTo || ""; - this.replyToMsgId = options.replyToMsgId || ""; - this.brokerURL = options.brokerURL || DEFAULT_NATS_URL; + this.send_to = options.send_to; + this.msg_purpose = options.msg_purpose || ""; + this.sender_name = options.sender_name || ""; + this.sender_id = options.sender_id || uuid4(); + this.receiver_name = options.receiver_name || ""; + this.receiver_id = options.receiver_id || ""; + this.reply_to = options.reply_to || ""; + this.reply_to_msg_id = options.reply_to_msg_id || ""; + this.broker_url = options.broker_url || DEFAULT_NATS_URL; this.metadata = options.metadata || {}; this.payloads = options.payloads || []; } - // Convert to JSON string + // Convert to JSON object - uses snake_case to match Julia API toJSON() { const obj = { - correlationId: this.correlationId, - msgId: this.msgId, + correlation_id: this.correlation_id, + msg_id: this.msg_id, timestamp: this.timestamp, - sendTo: this.sendTo, - msgPurpose: this.msgPurpose, - senderName: this.senderName, - senderId: this.senderId, - receiverName: this.receiverName, - receiverId: this.receiverId, - replyTo: this.replyTo, - replyToMsgId: this.replyToMsgId, - brokerURL: this.brokerURL + send_to: this.send_to, + msg_purpose: this.msg_purpose, + sender_name: this.sender_name, + sender_id: this.sender_id, + receiver_name: this.receiver_name, + receiver_id: this.receiver_id, + reply_to: this.reply_to, + reply_to_msg_id: this.reply_to_msg_id, + broker_url: this.broker_url }; if (Object.keys(this.metadata).length > 0) { @@ -437,7 +459,7 @@ class MessageEnvelope { } } -// SmartSend function +// SmartSend function - matches Julia smartsend signature and behavior async function smartsend(subject, data, options = {}) { /** * Send data either directly via NATS or via a fileserver URL, depending on payload size @@ -447,42 +469,42 @@ async function smartsend(subject, data, options = {}) { * Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. * * @param {string} subject - NATS subject to publish the message to - * @param {Array} data - List of {dataname, data, type} objects to send + * @param {Array} data - List of {dataname, data, type} objects to send (must be a list, even for single payload) * @param {Object} options - Additional options - * @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222") - * @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080") - * @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads - * @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB) - * @param {string} options.correlationId - Optional correlation ID for tracing - * @param {string} options.msgPurpose - Purpose of the message (default: "chat") - * @param {string} options.senderName - Name of the sender (default: "NATSBridge") - * @param {string} options.receiverName - Name of the receiver (default: "") - * @param {string} options.receiverId - UUID of the receiver (default: "") - * @param {string} options.replyTo - Topic to reply to (default: "") - * @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") - * @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true) + * @param {string} options.broker_url - URL of the NATS server (default: "nats://localhost:4222") + * @param {string} options.fileserver_url - Base URL of the file server (default: "http://localhost:8080") + * @param {Function} options.fileserver_upload_handler - Function to handle fileserver uploads + * @param {number} options.size_threshold - Threshold in bytes separating direct vs link transport (default: 1MB) + * @param {string} options.correlation_id - Optional correlation ID for tracing + * @param {string} options.msg_purpose - Purpose of the message (default: "chat") + * @param {string} options.sender_name - Name of the sender (default: "NATSBridge") + * @param {string} options.receiver_name - Name of the receiver (default: "") + * @param {string} options.receiver_id - UUID of the receiver (default: "") + * @param {string} options.reply_to - Topic to reply to (default: "") + * @param {string} options.reply_to_msg_id - Message ID this message is replying to (default: "") + * @param {boolean} options.is_publish - Whether to automatically publish the message to NATS (default: true) * - * @returns {Promise} - An object with { env: MessageEnvelope, env_json_str: string } + * @returns {Promise} - A tuple-like object with { env: MessageEnvelope, env_json_str: string } */ const { - natsUrl = DEFAULT_NATS_URL, - fileserverUrl = DEFAULT_FILESERVER_URL, - fileserverUploadHandler = _upload_to_fileserver, - sizeThreshold = DEFAULT_SIZE_THRESHOLD, - correlationId = uuid4(), - msgPurpose = "chat", - senderName = "NATSBridge", - receiverName = "", - receiverId = "", - replyTo = "", - replyToMsgId = "", - isPublish = true // Whether to automatically publish the message to NATS + broker_url = DEFAULT_NATS_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = _upload_to_fileserver, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = uuid4(), + msg_purpose = "chat", + sender_name = "NATSBridge", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "", + is_publish = true // Whether to automatically publish the message to NATS } = options; - log_trace(correlationId, `Starting smartsend for subject: ${subject}`); + log_trace(correlation_id, `Starting smartsend for subject: ${subject}`); // Generate message metadata - const msgId = uuid4(); + const msg_id = uuid4(); // Process each payload in the list const payloads = []; @@ -496,18 +518,18 @@ async function smartsend(subject, data, options = {}) { const payloadBytes = _serialize_data(payloadData, payloadType); const payloadSize = payloadBytes.byteLength; - log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + log_trace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`); // Decision: Direct vs Link - if (payloadSize < sizeThreshold) { + if (payloadSize < size_threshold) { // Direct path - Base64 encode and send via NATS - const payloadB64 = arrayBufferToBase64(payloadBytes); - log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`); + const payloadB64 = uint8ArrayToBase64(payloadBytes); + log_trace(correlation_id, `Using direct transport for ${payloadSize} bytes`); // Create MessagePayload for direct transport const payloadObj = new MessagePayload({ dataname: dataname, - type: payloadType, + payload_type: payloadType, transport: "direct", encoding: "base64", size: payloadSize, @@ -517,22 +539,22 @@ async function smartsend(subject, data, options = {}) { payloads.push(payloadObj); } else { // Link path - Upload to HTTP server, send URL via NATS - log_trace(correlationId, `Using link transport, uploading to fileserver`); + log_trace(correlation_id, `Using link transport, uploading to fileserver`); // Upload to HTTP server - const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId); + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes, correlation_id); if (response.status !== 200) { throw new Error(`Failed to upload data to fileserver: ${response.status}`); } const url = response.url; - log_trace(correlationId, `Uploaded to URL: ${url}`); + log_trace(correlation_id, `Uploaded to URL: ${url}`); // Create MessagePayload for link transport const payloadObj = new MessagePayload({ dataname: dataname, - type: payloadType, + payload_type: payloadType, transport: "link", encoding: "none", size: payloadSize, @@ -545,16 +567,16 @@ async function smartsend(subject, data, options = {}) { // Create MessageEnvelope with all payloads const env = new MessageEnvelope({ - correlationId: correlationId, - msgId: msgId, - sendTo: subject, - msgPurpose: msgPurpose, - senderName: senderName, - receiverName: receiverName, - receiverId: receiverId, - replyTo: replyTo, - replyToMsgId: replyToMsgId, - brokerURL: natsUrl, + correlation_id: correlation_id, + msg_id: msg_id, + send_to: subject, + msg_purpose: msg_purpose, + sender_name: sender_name, + receiver_name: receiver_name, + receiver_id: receiver_id, + reply_to: reply_to, + reply_to_msg_id: reply_to_msg_id, + broker_url: broker_url, payloads: payloads }); @@ -562,11 +584,11 @@ async function smartsend(subject, data, options = {}) { const env_json_str = env.toString(); // Publish to NATS if isPublish is true - if (isPublish) { - await publish_message(natsUrl, subject, env_json_str, correlationId); + if (is_publish) { + await publish_message(broker_url, subject, env_json_str, correlation_id); } - // Return both envelope and JSON string (tuple-like structure) + // Return both envelope and JSON string (tuple-like structure, matching Julia API) return { env: env, env_json_str: env_json_str @@ -574,11 +596,11 @@ async function smartsend(subject, data, options = {}) { } // Helper: Publish message to NATS -async function publish_message(natsUrl, subject, message, correlation_id) { +async function publish_message(broker_url, subject, message, correlation_id) { /** * Publish a message to a NATS subject with proper connection management * - * @param {string} natsUrl - NATS server URL + * @param {string} broker_url - NATS server URL * @param {string} subject - NATS subject to publish to * @param {string} message - JSON message to publish * @param {string} correlation_id - Correlation ID for logging @@ -591,7 +613,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) { // Example with nats.js: // import { connect } from 'nats'; - // const nc = await connect({ servers: [natsUrl] }); + // const nc = await connect({ servers: [broker_url] }); // await nc.publish(subject, message); // nc.close(); @@ -599,7 +621,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) { console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`); } -// SmartReceive function +// SmartReceive function - matches Julia smartreceive signature and behavior async function smartreceive(msg, options = {}) { /** * Receive and process messages from NATS @@ -609,25 +631,25 @@ async function smartreceive(msg, options = {}) { * * @param {Object} msg - NATS message object with payload property * @param {Object} options - Additional options - * @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs - * @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5) - * @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100) - * @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000) + * @param {Function} options.fileserver_download_handler - Function to handle downloading data from file server URLs + * @param {number} options.max_retries - Maximum retry attempts for fetching URL (default: 5) + * @param {number} options.base_delay - Initial delay for exponential backoff in ms (default: 100) + * @param {number} options.max_delay - Maximum delay for exponential backoff in ms (default: 5000) * - * @returns {Promise} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects + * @returns {Promise} - JSON object of envelope with payloads field containing list of {dataname, data, type} tuples */ const { - fileserverDownloadHandler = _fetch_with_backoff, - maxRetries = 5, - baseDelay = 100, - maxDelay = 5000 + fileserver_download_handler = _fetch_with_backoff, + max_retries = 5, + base_delay = 100, + max_delay = 5000 } = options; // Parse the JSON envelope const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); const json_data = JSON.parse(jsonStr); - log_trace(json_data.correlationId, `Processing received message`); + log_trace(json_data.correlation_id, `Processing received message`); // Process all payloads in the envelope const payloads_list = []; @@ -642,32 +664,32 @@ async function smartreceive(msg, options = {}) { if (transport === "direct") { // Direct transport - payload is in the message - log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`); + log_trace(json_data.correlation_id, `Direct transport - decoding payload '${dataname}'`); // Extract base64 payload from the payload const payload_b64 = payload.data; // Decode Base64 payload - const payload_bytes = base64ToArrayBuffer(payload_b64); + const payload_bytes = base64ToUint8Array(payload_b64); // Deserialize based on type - const data_type = payload.type; - const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId); + const data_type = payload.payload_type; + const data = _deserialize_data(payload_bytes, data_type, json_data.correlation_id); payloads_list.push({ dataname, data, type: data_type }); } else if (transport === "link") { // Link transport - payload is at URL const url = payload.data; - log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`); + log_trace(json_data.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`); // Fetch with exponential backoff using the download handler - const downloaded_data = await fileserverDownloadHandler( - url, maxRetries, baseDelay, maxDelay, json_data.correlationId + const downloaded_data = await fileserver_download_handler( + url, max_retries, base_delay, max_delay, json_data.correlation_id ); // Deserialize based on type - const data_type = payload.type; - const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId); + const data_type = payload.payload_type; + const data = _deserialize_data(downloaded_data, data_type, json_data.correlation_id); payloads_list.push({ dataname, data, type: data_type }); } else { @@ -676,11 +698,69 @@ async function smartreceive(msg, options = {}) { } // Replace payloads array with the processed list of {dataname, data, type} tuples + // This matches Julia's smartreceive return format json_data.payloads = payloads_list; return json_data; } +// plik_oneshot_upload - matches Julia plik_oneshot_upload function +async function plik_oneshot_upload(file_server_url, dataname, data) { + /** + * Upload a single file to a plik server using one-shot mode + * This function uploads raw byte array to a plik server in one-shot mode (no upload session). + * It first creates a one-shot upload session by sending a POST request with {"OneShot": true}, + * retrieves an upload ID and token, then uploads the file data as multipart form data using the token. + * + * @param {string} file_server_url - Base URL of the plik server (e.g., "http://localhost:8080") + * @param {string} dataname - Name of the file being uploaded + * @param {Uint8Array} data - Raw byte data of the file content + * @returns {Promise} - Dictionary with keys: status, uploadid, fileid, url + */ + + // Step 1: Get upload ID and token + const url_getUploadID = `${file_server_url}/upload`; + const headers = { "Content-Type": "application/json" }; + const body = JSON.stringify({ OneShot: true }); + + let http_response = await fetch(url_getUploadID, { + method: "POST", + headers: headers, + body: body + }); + + const response_json = await http_response.json(); + const uploadid = response_json.id; + const uploadtoken = response_json.uploadToken; + + // Step 2: Upload file data + const url_upload = `${file_server_url}/file/${uploadid}`; + + // Create multipart form data + const formData = new FormData(); + const blob = new Blob([data], { type: "application/octet-stream" }); + formData.append("file", blob, dataname); + + http_response = await fetch(url_upload, { + method: "POST", + headers: { "X-UploadToken": uploadtoken }, + body: formData + }); + + const fileResponseJson = await http_response.json(); + const fileid = fileResponseJson.id; + + // URL of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" + const url = `${file_server_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`; + + return { + status: http_response.status, + uploadid: uploadid, + fileid: fileid, + url: url + }; +} + // Export for Node.js if (typeof module !== 'undefined' && module.exports) { module.exports = { @@ -692,6 +772,7 @@ if (typeof module !== 'undefined' && module.exports) { _deserialize_data, _fetch_with_backoff, _upload_to_fileserver, + plik_oneshot_upload, DEFAULT_SIZE_THRESHOLD, DEFAULT_NATS_URL, DEFAULT_FILESERVER_URL, @@ -711,6 +792,7 @@ if (typeof window !== 'undefined') { _deserialize_data, _fetch_with_backoff, _upload_to_fileserver, + plik_oneshot_upload, DEFAULT_SIZE_THRESHOLD, DEFAULT_NATS_URL, DEFAULT_FILESERVER_URL, diff --git a/src/nats_bridge.py b/src/nats_bridge.py index 7e73473..2b4c457 100644 --- a/src/nats_bridge.py +++ b/src/nats_bridge.py @@ -1,45 +1,60 @@ """ -Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython +Python NATS Bridge - Bi-Directional Data Bridge This module provides functionality for sending and receiving data over NATS using the Claim-Check pattern for large payloads. Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" + +Multi-Payload Support (Standard API): +The system uses a standardized list-of-tuples format for all payload operations. +Even when sending a single payload, the user must wrap it in a list. + +API Standard: + # Input format for smartsend (always a list of tuples with type info) + [(dataname1, data1, type1), (dataname2, data2, type2), ...] + + # Output format for smartreceive (always returns a list of tuples) + [(dataname1, data1, type1), (dataname2, data2, type2), ...] """ import json -import random import time -import usocket -import uselect -import ustruct import uuid -try: - import ussl - HAS_SSL = True -except ImportError: - HAS_SSL = False - # Constants DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport -DEFAULT_NATS_URL = "nats://localhost:4222" +DEFAULT_BROKER_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" # ============================================= 100 ============================================== # class MessagePayload: - """Internal message payload structure representing a single payload within a NATS message envelope.""" + """Internal message payload structure representing a single payload within a NATS message envelope. - def __init__(self, data, msg_type, id="", dataname="", transport="direct", + This structure represents a single payload within a NATS message envelope. + It supports both direct transport (base64-encoded data) and link transport (URL-based). + + Attributes: + id: Unique identifier for this payload (e.g., "uuid4") + dataname: Name of the payload (e.g., "login_image") + payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") + transport: Transport method ("direct" or "link") + encoding: Encoding method ("none", "json", "base64", "arrow-ipc") + size: Size of the payload in bytes + data: Payload data (bytes for direct, URL for link) + metadata: Optional metadata dictionary + """ + + def __init__(self, data, payload_type, id="", dataname="", transport="direct", encoding="none", size=0, metadata=None): """ Initialize a MessagePayload. Args: - data: Payload data (bytes for direct, URL string for link) - msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") + data: Payload data (base64 string for direct, URL string for link) + payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") id: Unique identifier for this payload (auto-generated if empty) dataname: Name of the payload (auto-generated UUID if empty) transport: Transport method ("direct" or "link") @@ -49,7 +64,7 @@ class MessagePayload: """ self.id = id if id else self._generate_uuid() self.dataname = dataname if dataname else self._generate_uuid() - self.type = msg_type + self.payload_type = payload_type self.transport = transport self.encoding = encoding self.size = size @@ -65,7 +80,7 @@ class MessagePayload: payload_dict = { "id": self.id, "dataname": self.dataname, - "type": self.type, + "payload_type": self.payload_type, "transport": self.transport, "encoding": self.encoding, "size": self.size, @@ -152,20 +167,24 @@ class MessageEnvelope: return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime()) def to_json(self): - """Convert envelope to JSON string.""" + """Convert envelope to JSON string. + + Returns: + str: JSON string representation of the envelope using snake_case field names + """ obj = { - "correlationId": self.correlation_id, - "msgId": self.msg_id, + "correlation_id": self.correlation_id, + "msg_id": self.msg_id, "timestamp": self.timestamp, - "sendTo": self.send_to, - "msgPurpose": self.msg_purpose, - "senderName": self.sender_name, - "senderId": self.sender_id, - "receiverName": self.receiver_name, - "receiverId": self.receiver_id, - "replyTo": self.reply_to, - "replyToMsgId": self.reply_to_msg_id, - "brokerURL": self.broker_url + "send_to": self.send_to, + "msg_purpose": self.msg_purpose, + "sender_name": self.sender_name, + "sender_id": self.sender_id, + "receiver_name": self.receiver_name, + "receiver_id": self.receiver_id, + "reply_to": self.reply_to, + "reply_to_msg_id": self.reply_to_msg_id, + "broker_url": self.broker_url } # Include metadata if not empty @@ -188,68 +207,126 @@ def log_trace(correlation_id, message): print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message)) -def _serialize_data(data, msg_type): +def _serialize_data(data, payload_type): """Serialize data according to specified format. + This function serializes arbitrary data into a binary representation based on the specified type. + It supports multiple serialization formats for different data types. + Args: data: Data to serialize - msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary") + - "text": String + - "dictionary": JSON-serializable dict + - "table": Tabular data (pandas DataFrame or list of dicts) + - "image", "audio", "video", "binary": bytes + payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary") Returns: bytes: Binary representation of the serialized data + + Example: + >>> text_bytes = _serialize_data("Hello World", "text") + >>> json_bytes = _serialize_data({"key": "value"}, "dictionary") + >>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table") """ - if msg_type == "text": + if payload_type == "text": if isinstance(data, str): return data.encode('utf-8') else: raise ValueError("Text data must be a string") - elif msg_type == "dictionary": + elif payload_type == "dictionary": if isinstance(data, dict): json_str = json.dumps(data) return json_str.encode('utf-8') else: raise ValueError("Dictionary data must be a dict") - elif msg_type in ("image", "audio", "video", "binary"): + elif payload_type == "table": + # Support pandas DataFrame or list of dicts + try: + import pandas as pd + if isinstance(data, pd.DataFrame): + # Convert DataFrame to JSON and then to bytes + json_str = data.to_json(orient='records', force_ascii=False) + return json_str.encode('utf-8') + elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): + # List of dicts + json_str = json.dumps(data) + return json_str.encode('utf-8') + else: + raise ValueError("Table data must be a pandas DataFrame or list of dicts") + except ImportError: + # Fallback: if pandas not available, treat as list of dicts + if isinstance(data, list): + json_str = json.dumps(data) + return json_str.encode('utf-8') + else: + raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)") + + elif payload_type in ("image", "audio", "video", "binary"): if isinstance(data, bytes): return data else: - raise ValueError("{} data must be bytes".format(msg_type.capitalize())) + raise ValueError("{} data must be bytes".format(payload_type.capitalize())) else: - raise ValueError("Unknown type: {}".format(msg_type)) + raise ValueError("Unknown payload_type: {}".format(payload_type)) -def _deserialize_data(data_bytes, msg_type, correlation_id): +def _deserialize_data(data_bytes, payload_type, correlation_id): """Deserialize bytes to data based on type. + This function converts serialized bytes back to Python data based on type. + It handles "text" (string), "dictionary" (JSON deserialization), "table" (JSON deserialization), + "image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data). + Args: data_bytes: Serialized data as bytes - msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") + payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") correlation_id: Correlation ID for logging Returns: - Deserialized data + Deserialized data: + - "text": str + - "dictionary": dict + - "table": list of dicts (or pandas DataFrame if available) + - "image", "audio", "video", "binary": bytes + + Example: + >>> text_data = _deserialize_data(b"Hello", "text", "corr_id") + >>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id") + >>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id") """ - if msg_type == "text": + if payload_type == "text": return data_bytes.decode('utf-8') - elif msg_type == "dictionary": + elif payload_type == "dictionary": json_str = data_bytes.decode('utf-8') return json.loads(json_str) - elif msg_type in ("image", "audio", "video", "binary"): + elif payload_type == "table": + # Deserialize table data (JSON format) + json_str = data_bytes.decode('utf-8') + table_data = json.loads(json_str) + # If pandas is available, try to convert to DataFrame + try: + import pandas as pd + return pd.DataFrame(table_data) + except ImportError: + return table_data + + elif payload_type in ("image", "audio", "video", "binary"): return data_bytes else: - raise ValueError("Unknown type: {}".format(msg_type)) + raise ValueError("Unknown payload_type: {}".format(payload_type)) class NATSConnection: - """Simple NATS connection for Micropython.""" + """Simple NATS connection for Python and Micropython.""" - def __init__(self, url=DEFAULT_NATS_URL): + def __init__(self, url=DEFAULT_BROKER_URL): """Initialize NATS connection. Args: @@ -276,9 +353,19 @@ class NATSConnection: def connect(self): """Connect to NATS server.""" - addr = usocket.getaddrinfo(self.host, self.port)[0][-1] - self.conn = usocket.socket() - self.conn.connect(addr) + # Use socket for both Python and Micropython + try: + import socket + addr = socket.getaddrinfo(self.host, self.port)[0][-1] + self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.conn.connect(addr) + except NameError: + # Micropython fallback + import usocket + addr = usocket.getaddrinfo(self.host, self.port)[0][-1] + self.conn = usocket.socket() + self.conn.connect(addr) + log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port)) def publish(self, subject, message): @@ -294,7 +381,15 @@ class NATSConnection: # Simple NATS protocol implementation msg = "PUB {} {}\r\n".format(subject, len(message)) msg = msg.encode('utf-8') + message + b"\r\n" - self.conn.send(msg) + + try: + import socket + self.conn.send(msg) + except NameError: + # Micropython fallback + import usocket + self.conn.send(msg) + log_trace("", "Message published to {}".format(subject)) def subscribe(self, subject, callback): @@ -335,11 +430,14 @@ class NATSConnection: def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""): """Fetch data from URL with exponential backoff. + This function retrieves data from a URL with retry logic using + exponential backoff to handle transient failures. + Args: url: URL to fetch from - max_retries: Maximum number of retry attempts - base_delay: Initial delay in milliseconds - max_delay: Maximum delay in milliseconds + max_retries: Maximum number of retry attempts (default: 5) + base_delay: Initial delay in milliseconds (default: 100) + max_delay: Maximum delay in milliseconds (default: 5000) correlation_id: Correlation ID for logging Returns: @@ -347,33 +445,54 @@ def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, corr Raises: Exception: If all retry attempts fail + + Example: + >>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id") """ delay = base_delay for attempt in range(1, max_retries + 1): try: # Simple HTTP GET request - # This is a simplified implementation - # For production, you'd want a proper HTTP client - import urequests - response = urequests.get(url) - if response.status_code == 200: + # Try urequests for Micropython first, then requests for Python + try: + import urequests + response = urequests.get(url) + status_code = response.status_code + content = response.content + except ImportError: + try: + import requests + response = requests.get(url) + response.raise_for_status() + status_code = response.status_code + content = response.content + except ImportError: + raise Exception("No HTTP library available (urequests or requests)") + + if status_code == 200: log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt)) - return response.content + return content else: - raise Exception("Failed to fetch: {}".format(response.status_code)) + raise Exception("Failed to fetch: {}".format(status_code)) except Exception as e: log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e))) if attempt < max_retries: time.sleep(delay / 1000.0) delay = min(delay * 2, max_delay) + + raise Exception("Failed to fetch data after {} attempts".format(max_retries)) -def plik_oneshot_upload(file_server_url, filename, data): +def plik_oneshot_upload(fileserver_url, dataname, data): """Upload a single file to a plik server using one-shot mode. + This function uploads raw byte data to a plik server in one-shot mode (no upload session). + It first creates a one-shot upload session by sending a POST request with {"OneShot": true}, + retrieves an upload ID and token, then uploads the file data as multipart form data using the token. + Args: - file_server_url: Base URL of the plik server - filename: Name of the file being uploaded + fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080") + dataname: Name of the file being uploaded data: Raw byte data of the file content Returns: @@ -382,23 +501,31 @@ def plik_oneshot_upload(file_server_url, filename, data): - "uploadid": ID of the one-shot upload session - "fileid": ID of the uploaded file within the session - "url": Full URL to download the uploaded file + + Example: + >>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world") + >>> result["status"], result["uploadid"], result["fileid"], result["url"] """ - import urequests import json + try: + import urequests + except ImportError: + import requests as urequests + # Get upload ID - url_get_upload_id = "{}/upload".format(file_server_url) + url_get_upload_id = "{}/upload".format(fileserver_url) headers = {"Content-Type": "application/json"} body = json.dumps({"OneShot": True}) response = urequests.post(url_get_upload_id, headers=headers, data=body) - response_json = json.loads(response.content) + response_json = json.loads(response.text if hasattr(response, 'text') else response.content) uploadid = response_json.get("id") uploadtoken = response_json.get("uploadToken") # Upload file - url_upload = "{}/file/{}".format(file_server_url, uploadid) + url_upload = "{}/file/{}".format(fileserver_url, uploadid) headers = {"X-UploadToken": uploadtoken} # For Micropython, we need to construct the multipart form data manually @@ -407,7 +534,7 @@ def plik_oneshot_upload(file_server_url, filename, data): # Create multipart body part1 = "--{}\r\n".format(boundary) - part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename) + part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname) part1 += "Content-Type: application/octet-stream\r\n\r\n" part1_bytes = part1.encode('utf-8') @@ -421,10 +548,10 @@ def plik_oneshot_upload(file_server_url, filename, data): content_type = "multipart/form-data; boundary={}".format(boundary) response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body) - response_json = json.loads(response.content) + response_json = json.loads(response.text if hasattr(response, 'text') else response.content) fileid = response_json.get("id") - url = "{}/file/{}/{}".format(file_server_url, uploadid, filename) + url = "{}/file/{}/{}".format(fileserver_url, uploadid, dataname) return { "status": response.status_code, @@ -434,7 +561,7 @@ def plik_oneshot_upload(file_server_url, filename, data): } -def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL, +def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL, fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD, correlation_id=None, msg_purpose="chat", sender_name="NATSBridge", receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True): @@ -447,27 +574,38 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F Args: subject: NATS subject to publish the message to - data: List of (dataname, data, type) tuples to send - nats_url: URL of the NATS server + data: List of (dataname, data, payload_type) tuples to send + - dataname: Name of the payload + - data: The actual data to send + - payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") + broker_url: URL of the NATS server fileserver_url: URL of the HTTP file server - fileserver_upload_handler: Function to handle fileserver uploads - size_threshold: Threshold in bytes separating direct vs link transport - correlation_id: Optional correlation ID for tracing - msg_purpose: Purpose of the message + fileserver_upload_handler: Function to handle fileserver uploads (must return dict with "status", "uploadid", "fileid", "url" keys) + size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB) + correlation_id: Optional correlation ID for tracing; if None, a UUID is generated + msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.) sender_name: Name of the sender - receiver_name: Name of the receiver - receiver_id: UUID of the receiver - reply_to: Topic to reply to + receiver_name: Name of the receiver (empty string means broadcast) + receiver_id: UUID of the receiver (empty string means broadcast) + reply_to: Topic to reply to (empty string if no reply expected) reply_to_msg_id: Message ID this message is replying to is_publish: Whether to automatically publish the message to NATS (default: True) + - When True: message is published to NATS + - When False: returns envelope and JSON string without publishing Returns: tuple: (env, env_json_str) where: - env: MessageEnvelope object with all metadata and payloads - env_json_str: JSON string representation of the envelope for publishing + + Example: + >>> data = [("message", "Hello World!", "text")] + >>> env, env_json_str = smartsend("/test", data) + >>> # env: MessageEnvelope with all metadata and payloads + >>> # env_json_str: JSON string for publishing """ # Generate correlation ID if not provided - cid = correlation_id if correlation_id else str(uuid.uuid4()) + cid = correlation_id if correlation_id is not None else str(uuid.uuid4()) log_trace(cid, "Starting smartsend for subject: {}".format(subject)) @@ -482,16 +620,19 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F payload_bytes = _serialize_data(payload_data, payload_type) payload_size = len(payload_bytes) - log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format( + log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format( dataname, payload_type, payload_size)) # Decision: Direct vs Link if payload_size < size_threshold: # Direct path - Base64 encode and send via NATS - payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes # Convert to base64 string for JSON - import ubinascii - payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip() + try: + import ubinascii + payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip() + except ImportError: + import base64 + payload_b64_str = base64.b64encode(payload_bytes).decode('utf-8') log_trace(cid, "Using direct transport for {} bytes".format(payload_size)) @@ -514,10 +655,10 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F # Upload to HTTP server response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - if response["status"] != 200: - raise Exception("Failed to upload data to fileserver: {}".format(response["status"])) + if response.get("status") != 200: + raise Exception("Failed to upload data to fileserver: {}".format(response.get("status"))) - url = response["url"] + url = response.get("url") log_trace(cid, "Uploaded to URL: {}".format(url)) # Create MessagePayload for link transport @@ -546,7 +687,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F receiver_id=receiver_id, reply_to=reply_to, reply_to_msg_id=reply_to_msg_id, - broker_url=nats_url, + broker_url=broker_url, metadata={} ) @@ -554,7 +695,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F # Publish to NATS if is_publish is True if is_publish: - nats_conn = NATSConnection(nats_url) + nats_conn = NATSConnection(broker_url) nats_conn.connect() nats_conn.publish(subject, msg_json) nats_conn.close() @@ -571,18 +712,29 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri (base64 decoded payloads) and link transport (URL-based payloads). Args: - msg: NATS message to process (dict with payload data) + msg: NATS message to process (dict or JSON string with envelope data) fileserver_download_handler: Function to handle downloading data from file server URLs - max_retries: Maximum retry attempts for fetching URL - base_delay: Initial delay for exponential backoff in ms - max_delay: Maximum delay for exponential backoff in ms + Receives: (url, max_retries, base_delay, max_delay, correlation_id) + Returns: bytes (the downloaded data) + max_retries: Maximum retry attempts for fetching URL (default: 5) + base_delay: Initial delay for exponential backoff in ms (default: 100) + max_delay: Maximum delay for exponential backoff in ms (default: 5000) Returns: - dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples + dict: Envelope dictionary with metadata and 'payloads' field containing list of + (dataname, data, payload_type) tuples + + Example: + >>> env = smartreceive(msg) + >>> # env contains envelope metadata and payloads field + >>> # env["payloads"] = [(dataname1, data1, payload_type1), ...] + >>> for dataname, data, payload_type in env["payloads"]: + ... print("Received {} of type {}: {}".format(dataname, payload_type, data)) """ # Parse the JSON envelope json_data = msg if isinstance(msg, dict) else json.loads(msg) - log_trace(json_data.get("correlationId", ""), "Processing received message") + correlation_id = json_data.get("correlation_id", "") + log_trace(correlation_id, "Processing received message") # Process all payloads in the envelope payloads_list = [] @@ -596,43 +748,47 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri dataname = payload.get("dataname", "") if transport == "direct": - log_trace(json_data.get("correlationId", ""), + log_trace(correlation_id, "Direct transport - decoding payload '{}'".format(dataname)) # Extract base64 payload from the payload payload_b64 = payload.get("data", "") # Decode Base64 payload - import ubinascii - payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8')) + try: + import ubinascii + payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8')) + except ImportError: + import base64 + payload_bytes = base64.b64decode(payload_b64) # Deserialize based on type - data_type = payload.get("type", "") - data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", "")) + payload_type = payload.get("payload_type", "") + data = _deserialize_data(payload_bytes, payload_type, correlation_id) - payloads_list.append((dataname, data, data_type)) + payloads_list.append((dataname, data, payload_type)) elif transport == "link": # Extract download URL from the payload url = payload.get("data", "") - log_trace(json_data.get("correlationId", ""), + log_trace(correlation_id, "Link transport - fetching '{}' from URL: {}".format(dataname, url)) # Fetch with exponential backoff downloaded_data = fileserver_download_handler( - url, max_retries, base_delay, max_delay, json_data.get("correlationId", "") + url, max_retries, base_delay, max_delay, correlation_id ) # Deserialize based on type - data_type = payload.get("type", "") - data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", "")) + payload_type = payload.get("payload_type", "") + data = _deserialize_data(downloaded_data, payload_type, correlation_id) - payloads_list.append((dataname, data, data_type)) + payloads_list.append((dataname, data, payload_type)) else: raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport)) - # Replace payloads field with the processed list of (dataname, data, type) tuples + # Replace payloads field with the processed list of (dataname, data, payload_type) tuples json_data["payloads"] = payloads_list return json_data @@ -651,11 +807,11 @@ def get_timestamp(): # Example usage if __name__ == "__main__": - print("NATSBridge for Micropython") - print("=========================") + print("NATSBridge - Bi-Directional Data Bridge") + print("=======================================") print("This module provides:") - print(" - MessageEnvelope: Message envelope structure") - print(" - MessagePayload: Payload structure") + print(" - MessageEnvelope: Message envelope structure with snake_case fields") + print(" - MessagePayload: Payload structure with payload_type field") print(" - smartsend: Send data via NATS with automatic transport selection") print(" - smartreceive: Receive and process messages from NATS") print(" - plik_oneshot_upload: Upload files to HTTP file server") @@ -663,10 +819,12 @@ if __name__ == "__main__": print() print("Usage:") print(" from nats_bridge import smartsend, smartreceive") - print(" data = [(\"message\", \"Hello World\", \"text\")]") - print(" env = smartsend(\"my.subject\", data)") + print() + print(" # Send data (list of (dataname, data, payload_type) tuples)") + print(" data = [(\"message\", \"Hello World!\", \"text\")]") + print(" env, env_json_str = smartsend(\"my.subject\", data)") print() print(" # On receiver:") - print(" payloads = smartreceive(msg)") - print(" for dataname, data, type in payloads:") - print(" print(f\"Received {dataname} of type {type}: {data}\")") \ No newline at end of file + print(" env = smartreceive(msg)") + print(" for dataname, data, payload_type in env[\"payloads\"]:") + print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))") \ No newline at end of file