This commit is contained in:
2026-02-25 08:33:32 +07:00
parent 8c793a81b6
commit 647cadf497
2 changed files with 486 additions and 246 deletions

View File

@@ -98,6 +98,26 @@ function base64ToArrayBuffer(base64) {
return bytes.buffer; return bytes.buffer;
} }
// Helper: Convert Uint8Array to Base64 string
function uint8ArrayToBase64(uint8array) {
    /**
     * Encode raw bytes as a Base64 string.
     *
     * Builds a "binary string" (one character per byte, code points 0-255)
     * and hands it to btoa, which only accepts such strings.
     *
     * @param {Uint8Array} uint8array - Bytes to encode
     * @returns {string} Base64-encoded representation of the input bytes
     */
    const chars = [];
    for (const byte of uint8array) {
        chars.push(String.fromCharCode(byte));
    }
    return btoa(chars.join(''));
}
// Helper: Convert Base64 string to Uint8Array
function base64ToUint8Array(base64) {
    /**
     * Decode a Base64 string into raw bytes.
     *
     * atob yields a "binary string" (one character per byte); each
     * character's code point becomes one element of the typed array.
     *
     * @param {string} base64 - Base64-encoded input
     * @returns {Uint8Array} Decoded bytes
     */
    const binaryString = atob(base64);
    return Uint8Array.from(binaryString, (ch) => ch.charCodeAt(0));
}
// Helper: Serialize data based on type // Helper: Serialize data based on type
function _serialize_data(data, type) { function _serialize_data(data, type) {
/** /**
@@ -114,39 +134,39 @@ function _serialize_data(data, type) {
*/ */
if (type === "text") { if (type === "text") {
if (typeof data === 'string') { if (typeof data === 'string') {
return new TextEncoder().encode(data).buffer; return new TextEncoder().encode(data);
} else { } else {
throw new Error("Text data must be a String"); throw new Error("Text data must be a String");
} }
} else if (type === "dictionary") { } else if (type === "dictionary") {
// JSON data - serialize directly // JSON data - serialize directly
const jsonStr = JSON.stringify(data); const jsonStr = JSON.stringify(data);
return new TextEncoder().encode(jsonStr).buffer; return new TextEncoder().encode(jsonStr);
} else if (type === "table") { } else if (type === "table") {
// Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) // Table data - convert to Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
// This would require the apache-arrow library // This would require the apache-arrow library
throw new Error("Table serialization requires apache-arrow library"); throw new Error("Table serialization requires apache-arrow library");
} else if (type === "image") { } else if (type === "image") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Image data must be ArrayBuffer or Uint8Array"); throw new Error("Image data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "audio") { } else if (type === "audio") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Audio data must be ArrayBuffer or Uint8Array"); throw new Error("Audio data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "video") { } else if (type === "video") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Video data must be ArrayBuffer or Uint8Array"); throw new Error("Video data must be ArrayBuffer or Uint8Array");
} }
} else if (type === "binary") { } else if (type === "binary") {
if (data instanceof ArrayBuffer || data instanceof Uint8Array) { if (data instanceof ArrayBuffer || data instanceof Uint8Array) {
return data instanceof ArrayBuffer ? data : data.buffer; return data instanceof ArrayBuffer ? new Uint8Array(data) : data;
} else { } else {
throw new Error("Binary data must be ArrayBuffer or Uint8Array"); throw new Error("Binary data must be ArrayBuffer or Uint8Array");
} }
@@ -171,10 +191,10 @@ function _deserialize_data(data, type, correlation_id) {
*/ */
if (type === "text") { if (type === "text") {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
return decoder.decode(new Uint8Array(data)); return decoder.decode(data);
} else if (type === "dictionary") { } else if (type === "dictionary") {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const jsonStr = decoder.decode(new Uint8Array(data)); const jsonStr = decoder.decode(data);
return JSON.parse(jsonStr); return JSON.parse(jsonStr);
} else if (type === "table") { } else if (type === "table") {
// Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript) // Table data - deserialize Arrow IPC stream (NOT IMPLEMENTED in pure JavaScript)
@@ -230,7 +250,7 @@ async function _upload_to_fileserver(fileserver_url, dataname, data, correlation
// Create multipart form data // Create multipart form data
const formData = new FormData(); const formData = new FormData();
// Create a Blob from the ArrayBuffer // Create a Blob from the Uint8Array
const blob = new Blob([data], { type: "application/octet-stream" }); const blob = new Blob([data], { type: "application/octet-stream" });
formData.append("file", blob, dataname); formData.append("file", blob, dataname);
@@ -276,7 +296,7 @@ async function _fetch_with_backoff(url, max_retries, base_delay, max_delay, corr
if (response.status === 200) { if (response.status === 200) {
log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`); log_trace(correlation_id, `Successfully fetched data from ${url} on attempt ${attempt}`);
const arrayBuffer = await response.arrayBuffer(); const arrayBuffer = await response.arrayBuffer();
return arrayBuffer; return new Uint8Array(arrayBuffer);
} else { } else {
throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`); throw new Error(`Failed to fetch: ${response.status} ${response.statusText}`);
} }
@@ -306,25 +326,26 @@ function _get_payload_bytes(data) {
} }
} }
// MessagePayload class // MessagePayload class - matches msg_payload_v1 Julia struct
class MessagePayload { class MessagePayload {
/** /**
* Represents a single payload in the message envelope * Represents a single payload in the message envelope
* Matches Julia's msg_payload_v1 struct
* *
* @param {Object} options - Payload options * @param {Object} options - Payload options
* @param {string} options.id - ID of this payload (e.g., "uuid4") * @param {string} options.id - ID of this payload (e.g., "uuid4")
* @param {string} options.dataname - Name of this payload (e.g., "login_image") * @param {string} options.dataname - Name of this payload (e.g., "login_image")
* @param {string} options.type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" * @param {string} options.payload_type - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
* @param {string} options.transport - "direct" or "link" * @param {string} options.transport - "direct" or "link"
* @param {string} options.encoding - "none", "json", "base64", "arrow-ipc" * @param {string} options.encoding - "none", "json", "base64", "arrow-ipc"
* @param {number} options.size - Data size in bytes * @param {number} options.size - Data size in bytes
* @param {string|ArrayBuffer} options.data - Payload data (direct) or URL (link) * @param {string|Uint8Array} options.data - Payload data (Uint8Array for direct, URL string for link)
* @param {Object} options.metadata - Metadata for this payload * @param {Object} options.metadata - Metadata for this payload
*/ */
constructor(options) { constructor(options) {
this.id = options.id || uuid4(); this.id = options.id || uuid4();
this.dataname = options.dataname; this.dataname = options.dataname;
this.type = options.type; this.payload_type = options.payload_type;
this.transport = options.transport; this.transport = options.transport;
this.encoding = options.encoding; this.encoding = options.encoding;
this.size = options.size; this.size = options.size;
@@ -332,27 +353,27 @@ class MessagePayload {
this.metadata = options.metadata || {}; this.metadata = options.metadata || {};
} }
// Convert to JSON object // Convert to JSON object - uses snake_case to match Julia API
toJSON() { toJSON() {
const obj = { const obj = {
id: this.id, id: this.id,
dataname: this.dataname, dataname: this.dataname,
type: this.type, payload_type: this.payload_type,
transport: this.transport, transport: this.transport,
encoding: this.encoding, encoding: this.encoding,
size: this.size size: this.size
}; };
// Include data based on transport type // Include data based on transport type
if (this.transport === "direct" && this.data !== null) { if (this.transport === "direct" && this.data !== null && this.data !== undefined) {
if (this.encoding === "base64" || this.encoding === "json") { if (this.encoding === "base64" || this.encoding === "json") {
obj.data = this.data; obj.data = this.data;
} else { } else {
// For other encodings, use base64 // For other encodings, use base64
const payloadBytes = _get_payload_bytes(this.data); const payloadBytes = _get_payload_bytes(this.data);
obj.data = arrayBufferToBase64(payloadBytes); obj.data = uint8ArrayToBase64(payloadBytes);
} }
} else if (this.transport === "link" && this.data !== null) { } else if (this.transport === "link" && this.data !== null && this.data !== undefined) {
// For link transport, data is a URL string // For link transport, data is a URL string
obj.data = this.data; obj.data = this.data;
} }
@@ -365,59 +386,60 @@ class MessagePayload {
} }
} }
// MessageEnvelope class // MessageEnvelope class - matches msg_envelope_v1 Julia struct
class MessageEnvelope { class MessageEnvelope {
/** /**
* Represents the message envelope containing metadata and payloads * Represents the message envelope containing metadata and payloads
* Matches Julia's msg_envelope_v1 struct
* *
* @param {Object} options - Envelope options * @param {Object} options - Envelope options
* @param {string} options.sendTo - Topic/subject the sender sends to * @param {string} options.correlation_id - Unique identifier to track messages
* @param {Array<MessagePayload>} options.payloads - Array of payloads * @param {string} options.msg_id - This message id
* @param {string} options.correlationId - Unique identifier to track messages
* @param {string} options.msgId - This message id
* @param {string} options.timestamp - Message published timestamp * @param {string} options.timestamp - Message published timestamp
* @param {string} options.msgPurpose - Purpose of this message * @param {string} options.send_to - Topic/subject the sender sends to
* @param {string} options.senderName - Name of the sender * @param {string} options.msg_purpose - Purpose of this message
* @param {string} options.senderId - UUID of the sender * @param {string} options.sender_name - Name of the sender
* @param {string} options.receiverName - Name of the receiver * @param {string} options.sender_id - UUID of the sender
* @param {string} options.receiverId - UUID of the receiver * @param {string} options.receiver_name - Name of the receiver
* @param {string} options.replyTo - Topic to reply to * @param {string} options.receiver_id - UUID of the receiver
* @param {string} options.replyToMsgId - Message id this message is replying to * @param {string} options.reply_to - Topic to reply to
* @param {string} options.brokerURL - NATS server address * @param {string} options.reply_to_msg_id - Message id this message is replying to
* @param {string} options.broker_url - NATS server address
* @param {Object} options.metadata - Metadata for the envelope * @param {Object} options.metadata - Metadata for the envelope
* @param {Array<MessagePayload>} options.payloads - Array of payloads
*/ */
constructor(options) { constructor(options) {
this.correlationId = options.correlationId || uuid4(); this.correlation_id = options.correlation_id || uuid4();
this.msgId = options.msgId || uuid4(); this.msg_id = options.msg_id || uuid4();
this.timestamp = options.timestamp || new Date().toISOString(); this.timestamp = options.timestamp || new Date().toISOString();
this.sendTo = options.sendTo; this.send_to = options.send_to;
this.msgPurpose = options.msgPurpose || ""; this.msg_purpose = options.msg_purpose || "";
this.senderName = options.senderName || ""; this.sender_name = options.sender_name || "";
this.senderId = options.senderId || uuid4(); this.sender_id = options.sender_id || uuid4();
this.receiverName = options.receiverName || ""; this.receiver_name = options.receiver_name || "";
this.receiverId = options.receiverId || ""; this.receiver_id = options.receiver_id || "";
this.replyTo = options.replyTo || ""; this.reply_to = options.reply_to || "";
this.replyToMsgId = options.replyToMsgId || ""; this.reply_to_msg_id = options.reply_to_msg_id || "";
this.brokerURL = options.brokerURL || DEFAULT_NATS_URL; this.broker_url = options.broker_url || DEFAULT_NATS_URL;
this.metadata = options.metadata || {}; this.metadata = options.metadata || {};
this.payloads = options.payloads || []; this.payloads = options.payloads || [];
} }
// Convert to JSON string // Convert to JSON object - uses snake_case to match Julia API
toJSON() { toJSON() {
const obj = { const obj = {
correlationId: this.correlationId, correlation_id: this.correlation_id,
msgId: this.msgId, msg_id: this.msg_id,
timestamp: this.timestamp, timestamp: this.timestamp,
sendTo: this.sendTo, send_to: this.send_to,
msgPurpose: this.msgPurpose, msg_purpose: this.msg_purpose,
senderName: this.senderName, sender_name: this.sender_name,
senderId: this.senderId, sender_id: this.sender_id,
receiverName: this.receiverName, receiver_name: this.receiver_name,
receiverId: this.receiverId, receiver_id: this.receiver_id,
replyTo: this.replyTo, reply_to: this.reply_to,
replyToMsgId: this.replyToMsgId, reply_to_msg_id: this.reply_to_msg_id,
brokerURL: this.brokerURL broker_url: this.broker_url
}; };
if (Object.keys(this.metadata).length > 0) { if (Object.keys(this.metadata).length > 0) {
@@ -437,7 +459,7 @@ class MessageEnvelope {
} }
} }
// SmartSend function // SmartSend function - matches Julia smartsend signature and behavior
async function smartsend(subject, data, options = {}) { async function smartsend(subject, data, options = {}) {
/** /**
* Send data either directly via NATS or via a fileserver URL, depending on payload size * Send data either directly via NATS or via a fileserver URL, depending on payload size
@@ -447,42 +469,42 @@ async function smartsend(subject, data, options = {}) {
* Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. * Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS.
* *
* @param {string} subject - NATS subject to publish the message to * @param {string} subject - NATS subject to publish the message to
* @param {Array} data - List of {dataname, data, type} objects to send * @param {Array} data - List of {dataname, data, type} objects to send (must be a list, even for single payload)
* @param {Object} options - Additional options * @param {Object} options - Additional options
* @param {string} options.natsUrl - URL of the NATS server (default: "nats://localhost:4222") * @param {string} options.broker_url - URL of the NATS server (default: "nats://localhost:4222")
* @param {string} options.fileserverUrl - Base URL of the file server (default: "http://localhost:8080") * @param {string} options.fileserver_url - Base URL of the file server (default: "http://localhost:8080")
* @param {Function} options.fileserverUploadHandler - Function to handle fileserver uploads * @param {Function} options.fileserver_upload_handler - Function to handle fileserver uploads
* @param {number} options.sizeThreshold - Threshold in bytes separating direct vs link transport (default: 1MB) * @param {number} options.size_threshold - Threshold in bytes separating direct vs link transport (default: 1MB)
* @param {string} options.correlationId - Optional correlation ID for tracing * @param {string} options.correlation_id - Optional correlation ID for tracing
* @param {string} options.msgPurpose - Purpose of the message (default: "chat") * @param {string} options.msg_purpose - Purpose of the message (default: "chat")
* @param {string} options.senderName - Name of the sender (default: "NATSBridge") * @param {string} options.sender_name - Name of the sender (default: "NATSBridge")
* @param {string} options.receiverName - Name of the receiver (default: "") * @param {string} options.receiver_name - Name of the receiver (default: "")
* @param {string} options.receiverId - UUID of the receiver (default: "") * @param {string} options.receiver_id - UUID of the receiver (default: "")
* @param {string} options.replyTo - Topic to reply to (default: "") * @param {string} options.reply_to - Topic to reply to (default: "")
* @param {string} options.replyToMsgId - Message ID this message is replying to (default: "") * @param {string} options.reply_to_msg_id - Message ID this message is replying to (default: "")
* @param {boolean} options.isPublish - Whether to automatically publish the message to NATS (default: true) * @param {boolean} options.is_publish - Whether to automatically publish the message to NATS (default: true)
* *
* @returns {Promise<Object>} - An object with { env: MessageEnvelope, env_json_str: string } * @returns {Promise<Object>} - A tuple-like object with { env: MessageEnvelope, env_json_str: string }
*/ */
const { const {
natsUrl = DEFAULT_NATS_URL, broker_url = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL, fileserver_url = DEFAULT_FILESERVER_URL,
fileserverUploadHandler = _upload_to_fileserver, fileserver_upload_handler = _upload_to_fileserver,
sizeThreshold = DEFAULT_SIZE_THRESHOLD, size_threshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuid4(), correlation_id = uuid4(),
msgPurpose = "chat", msg_purpose = "chat",
senderName = "NATSBridge", sender_name = "NATSBridge",
receiverName = "", receiver_name = "",
receiverId = "", receiver_id = "",
replyTo = "", reply_to = "",
replyToMsgId = "", reply_to_msg_id = "",
isPublish = true // Whether to automatically publish the message to NATS is_publish = true // Whether to automatically publish the message to NATS
} = options; } = options;
log_trace(correlationId, `Starting smartsend for subject: ${subject}`); log_trace(correlation_id, `Starting smartsend for subject: ${subject}`);
// Generate message metadata // Generate message metadata
const msgId = uuid4(); const msg_id = uuid4();
// Process each payload in the list // Process each payload in the list
const payloads = []; const payloads = [];
@@ -496,18 +518,18 @@ async function smartsend(subject, data, options = {}) {
const payloadBytes = _serialize_data(payloadData, payloadType); const payloadBytes = _serialize_data(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength; const payloadSize = payloadBytes.byteLength;
log_trace(correlationId, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); log_trace(correlation_id, `Serialized payload '${dataname}' (payload_type: ${payloadType}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link // Decision: Direct vs Link
if (payloadSize < sizeThreshold) { if (payloadSize < size_threshold) {
// Direct path - Base64 encode and send via NATS // Direct path - Base64 encode and send via NATS
const payloadB64 = arrayBufferToBase64(payloadBytes); const payloadB64 = uint8ArrayToBase64(payloadBytes);
log_trace(correlationId, `Using direct transport for ${payloadSize} bytes`); log_trace(correlation_id, `Using direct transport for ${payloadSize} bytes`);
// Create MessagePayload for direct transport // Create MessagePayload for direct transport
const payloadObj = new MessagePayload({ const payloadObj = new MessagePayload({
dataname: dataname, dataname: dataname,
type: payloadType, payload_type: payloadType,
transport: "direct", transport: "direct",
encoding: "base64", encoding: "base64",
size: payloadSize, size: payloadSize,
@@ -517,22 +539,22 @@ async function smartsend(subject, data, options = {}) {
payloads.push(payloadObj); payloads.push(payloadObj);
} else { } else {
// Link path - Upload to HTTP server, send URL via NATS // Link path - Upload to HTTP server, send URL via NATS
log_trace(correlationId, `Using link transport, uploading to fileserver`); log_trace(correlation_id, `Using link transport, uploading to fileserver`);
// Upload to HTTP server // Upload to HTTP server
const response = await fileserverUploadHandler(fileserverUrl, dataname, payloadBytes, correlationId); const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes, correlation_id);
if (response.status !== 200) { if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`); throw new Error(`Failed to upload data to fileserver: ${response.status}`);
} }
const url = response.url; const url = response.url;
log_trace(correlationId, `Uploaded to URL: ${url}`); log_trace(correlation_id, `Uploaded to URL: ${url}`);
// Create MessagePayload for link transport // Create MessagePayload for link transport
const payloadObj = new MessagePayload({ const payloadObj = new MessagePayload({
dataname: dataname, dataname: dataname,
type: payloadType, payload_type: payloadType,
transport: "link", transport: "link",
encoding: "none", encoding: "none",
size: payloadSize, size: payloadSize,
@@ -545,16 +567,16 @@ async function smartsend(subject, data, options = {}) {
// Create MessageEnvelope with all payloads // Create MessageEnvelope with all payloads
const env = new MessageEnvelope({ const env = new MessageEnvelope({
correlationId: correlationId, correlation_id: correlation_id,
msgId: msgId, msg_id: msg_id,
sendTo: subject, send_to: subject,
msgPurpose: msgPurpose, msg_purpose: msg_purpose,
senderName: senderName, sender_name: sender_name,
receiverName: receiverName, receiver_name: receiver_name,
receiverId: receiverId, receiver_id: receiver_id,
replyTo: replyTo, reply_to: reply_to,
replyToMsgId: replyToMsgId, reply_to_msg_id: reply_to_msg_id,
brokerURL: natsUrl, broker_url: broker_url,
payloads: payloads payloads: payloads
}); });
@@ -562,11 +584,11 @@ async function smartsend(subject, data, options = {}) {
const env_json_str = env.toString(); const env_json_str = env.toString();
// Publish to NATS if isPublish is true // Publish to NATS if isPublish is true
if (isPublish) { if (is_publish) {
await publish_message(natsUrl, subject, env_json_str, correlationId); await publish_message(broker_url, subject, env_json_str, correlation_id);
} }
// Return both envelope and JSON string (tuple-like structure) // Return both envelope and JSON string (tuple-like structure, matching Julia API)
return { return {
env: env, env: env,
env_json_str: env_json_str env_json_str: env_json_str
@@ -574,11 +596,11 @@ async function smartsend(subject, data, options = {}) {
} }
// Helper: Publish message to NATS // Helper: Publish message to NATS
async function publish_message(natsUrl, subject, message, correlation_id) { async function publish_message(broker_url, subject, message, correlation_id) {
/** /**
* Publish a message to a NATS subject with proper connection management * Publish a message to a NATS subject with proper connection management
* *
* @param {string} natsUrl - NATS server URL * @param {string} broker_url - NATS server URL
* @param {string} subject - NATS subject to publish to * @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish * @param {string} message - JSON message to publish
* @param {string} correlation_id - Correlation ID for logging * @param {string} correlation_id - Correlation ID for logging
@@ -591,7 +613,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
// Example with nats.js: // Example with nats.js:
// import { connect } from 'nats'; // import { connect } from 'nats';
// const nc = await connect({ servers: [natsUrl] }); // const nc = await connect({ servers: [broker_url] });
// await nc.publish(subject, message); // await nc.publish(subject, message);
// nc.close(); // nc.close();
@@ -599,7 +621,7 @@ async function publish_message(natsUrl, subject, message, correlation_id) {
console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`); console.log(`[NATS PUBLISH] Subject: ${subject}, Message: ${message.substring(0, 100)}...`);
} }
// SmartReceive function // SmartReceive function - matches Julia smartreceive signature and behavior
async function smartreceive(msg, options = {}) { async function smartreceive(msg, options = {}) {
/** /**
* Receive and process messages from NATS * Receive and process messages from NATS
@@ -609,25 +631,25 @@ async function smartreceive(msg, options = {}) {
* *
* @param {Object} msg - NATS message object with payload property * @param {Object} msg - NATS message object with payload property
* @param {Object} options - Additional options * @param {Object} options - Additional options
* @param {Function} options.fileserverDownloadHandler - Function to handle downloading data from file server URLs * @param {Function} options.fileserver_download_handler - Function to handle downloading data from file server URLs
* @param {number} options.maxRetries - Maximum retry attempts for fetching URL (default: 5) * @param {number} options.max_retries - Maximum retry attempts for fetching URL (default: 5)
* @param {number} options.baseDelay - Initial delay for exponential backoff in ms (default: 100) * @param {number} options.base_delay - Initial delay for exponential backoff in ms (default: 100)
* @param {number} options.maxDelay - Maximum delay for exponential backoff in ms (default: 5000) * @param {number} options.max_delay - Maximum delay for exponential backoff in ms (default: 5000)
* *
* @returns {Promise<Object>} - Envelope dictionary with metadata and payloads field containing list of {dataname, data, type} objects * @returns {Promise<Object>} - JSON object of envelope with payloads field containing list of {dataname, data, type} tuples
*/ */
const { const {
fileserverDownloadHandler = _fetch_with_backoff, fileserver_download_handler = _fetch_with_backoff,
maxRetries = 5, max_retries = 5,
baseDelay = 100, base_delay = 100,
maxDelay = 5000 max_delay = 5000
} = options; } = options;
// Parse the JSON envelope // Parse the JSON envelope
const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); const jsonStr = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
const json_data = JSON.parse(jsonStr); const json_data = JSON.parse(jsonStr);
log_trace(json_data.correlationId, `Processing received message`); log_trace(json_data.correlation_id, `Processing received message`);
// Process all payloads in the envelope // Process all payloads in the envelope
const payloads_list = []; const payloads_list = [];
@@ -642,32 +664,32 @@ async function smartreceive(msg, options = {}) {
if (transport === "direct") { if (transport === "direct") {
// Direct transport - payload is in the message // Direct transport - payload is in the message
log_trace(json_data.correlationId, `Direct transport - decoding payload '${dataname}'`); log_trace(json_data.correlation_id, `Direct transport - decoding payload '${dataname}'`);
// Extract base64 payload from the payload // Extract base64 payload from the payload
const payload_b64 = payload.data; const payload_b64 = payload.data;
// Decode Base64 payload // Decode Base64 payload
const payload_bytes = base64ToArrayBuffer(payload_b64); const payload_bytes = base64ToUint8Array(payload_b64);
// Deserialize based on type // Deserialize based on type
const data_type = payload.type; const data_type = payload.payload_type;
const data = _deserialize_data(payload_bytes, data_type, json_data.correlationId); const data = _deserialize_data(payload_bytes, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type }); payloads_list.push({ dataname, data, type: data_type });
} else if (transport === "link") { } else if (transport === "link") {
// Link transport - payload is at URL // Link transport - payload is at URL
const url = payload.data; const url = payload.data;
log_trace(json_data.correlationId, `Link transport - fetching '${dataname}' from URL: ${url}`); log_trace(json_data.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
// Fetch with exponential backoff using the download handler // Fetch with exponential backoff using the download handler
const downloaded_data = await fileserverDownloadHandler( const downloaded_data = await fileserver_download_handler(
url, maxRetries, baseDelay, maxDelay, json_data.correlationId url, max_retries, base_delay, max_delay, json_data.correlation_id
); );
// Deserialize based on type // Deserialize based on type
const data_type = payload.type; const data_type = payload.payload_type;
const data = _deserialize_data(downloaded_data, data_type, json_data.correlationId); const data = _deserialize_data(downloaded_data, data_type, json_data.correlation_id);
payloads_list.push({ dataname, data, type: data_type }); payloads_list.push({ dataname, data, type: data_type });
} else { } else {
@@ -676,11 +698,69 @@ async function smartreceive(msg, options = {}) {
} }
// Replace payloads array with the processed list of {dataname, data, type} tuples // Replace payloads array with the processed list of {dataname, data, type} tuples
// This matches Julia's smartreceive return format
json_data.payloads = payloads_list; json_data.payloads = payloads_list;
return json_data; return json_data;
} }
// plik_oneshot_upload - matches Julia plik_oneshot_upload function
async function plik_oneshot_upload(file_server_url, dataname, data) {
    /**
     * Upload a single file to a plik server using one-shot mode
     *
     * One-shot mode needs no persistent upload session: first POST
     * {"OneShot": true} to /upload to obtain an upload ID and token,
     * then upload the file data as multipart form data using the token.
     *
     * @param {string} file_server_url - Base URL of the plik server (e.g., "http://localhost:8080")
     * @param {string} dataname - Name of the file being uploaded
     * @param {Uint8Array} data - Raw byte data of the file content
     * @returns {Promise<Object>} - Dictionary with keys: status, uploadid, fileid, url
     * @throws {Error} If either HTTP request returns a non-2xx status
     */
    // Step 1: Create a one-shot upload session to get the upload ID and token
    const url_getUploadID = `${file_server_url}/upload`;
    const headers = { "Content-Type": "application/json" };
    const body = JSON.stringify({ OneShot: true });

    let http_response = await fetch(url_getUploadID, {
        method: "POST",
        headers: headers,
        body: body
    });
    // Without this check a failed session POST would yield uploadid === undefined
    // and the file upload below would target "/file/undefined"
    if (!http_response.ok) {
        throw new Error(`Failed to create plik upload session: ${http_response.status}`);
    }
    const response_json = await http_response.json();
    const uploadid = response_json.id;
    const uploadtoken = response_json.uploadToken;

    // Step 2: Upload file data as multipart form data, authorized by the token
    const url_upload = `${file_server_url}/file/${uploadid}`;
    const formData = new FormData();
    const blob = new Blob([data], { type: "application/octet-stream" });
    formData.append("file", blob, dataname);

    http_response = await fetch(url_upload, {
        method: "POST",
        headers: { "X-UploadToken": uploadtoken },
        body: formData
    });
    if (!http_response.ok) {
        throw new Error(`Failed to upload '${dataname}' to plik server: ${http_response.status}`);
    }
    const fileResponseJson = await http_response.json();
    const fileid = fileResponseJson.id;

    // URL of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    const url = `${file_server_url}/file/${uploadid}/${fileid}/${encodeURIComponent(dataname)}`;
    return {
        status: http_response.status,
        uploadid: uploadid,
        fileid: fileid,
        url: url
    };
}
// Export for Node.js // Export for Node.js
if (typeof module !== 'undefined' && module.exports) { if (typeof module !== 'undefined' && module.exports) {
module.exports = { module.exports = {
@@ -692,6 +772,7 @@ if (typeof module !== 'undefined' && module.exports) {
_deserialize_data, _deserialize_data,
_fetch_with_backoff, _fetch_with_backoff,
_upload_to_fileserver, _upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD, DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL, DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL, DEFAULT_FILESERVER_URL,
@@ -711,6 +792,7 @@ if (typeof window !== 'undefined') {
_deserialize_data, _deserialize_data,
_fetch_with_backoff, _fetch_with_backoff,
_upload_to_fileserver, _upload_to_fileserver,
plik_oneshot_upload,
DEFAULT_SIZE_THRESHOLD, DEFAULT_SIZE_THRESHOLD,
DEFAULT_NATS_URL, DEFAULT_NATS_URL,
DEFAULT_FILESERVER_URL, DEFAULT_FILESERVER_URL,

View File

@@ -1,45 +1,60 @@
""" """
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython Python NATS Bridge - Bi-Directional Data Bridge
This module provides functionality for sending and receiving data over NATS This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads. using the Claim-Check pattern for large payloads.
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.
API Standard:
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
""" """
import json import json
import random
import time import time
import usocket
import uselect
import ustruct
import uuid import uuid
try:
import ussl
HAS_SSL = True
except ImportError:
HAS_SSL = False
# Constants # Constants
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
DEFAULT_NATS_URL = "nats://localhost:4222" DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080" DEFAULT_FILESERVER_URL = "http://localhost:8080"
# ============================================= 100 ============================================== # # ============================================= 100 ============================================== #
class MessagePayload: class MessagePayload:
"""Internal message payload structure representing a single payload within a NATS message envelope.""" """Internal message payload structure representing a single payload within a NATS message envelope.
def __init__(self, data, msg_type, id="", dataname="", transport="direct", This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
Attributes:
id: Unique identifier for this payload (e.g., "uuid4")
dataname: Name of the payload (e.g., "login_image")
payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
transport: Transport method ("direct" or "link")
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
size: Size of the payload in bytes
data: Payload data (bytes for direct, URL for link)
metadata: Optional metadata dictionary
"""
def __init__(self, data, payload_type, id="", dataname="", transport="direct",
encoding="none", size=0, metadata=None): encoding="none", size=0, metadata=None):
""" """
Initialize a MessagePayload. Initialize a MessagePayload.
Args: Args:
data: Payload data (bytes for direct, URL string for link) data: Payload data (base64 string for direct, URL string for link)
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
id: Unique identifier for this payload (auto-generated if empty) id: Unique identifier for this payload (auto-generated if empty)
dataname: Name of the payload (auto-generated UUID if empty) dataname: Name of the payload (auto-generated UUID if empty)
transport: Transport method ("direct" or "link") transport: Transport method ("direct" or "link")
@@ -49,7 +64,7 @@ class MessagePayload:
""" """
self.id = id if id else self._generate_uuid() self.id = id if id else self._generate_uuid()
self.dataname = dataname if dataname else self._generate_uuid() self.dataname = dataname if dataname else self._generate_uuid()
self.type = msg_type self.payload_type = payload_type
self.transport = transport self.transport = transport
self.encoding = encoding self.encoding = encoding
self.size = size self.size = size
@@ -65,7 +80,7 @@ class MessagePayload:
payload_dict = { payload_dict = {
"id": self.id, "id": self.id,
"dataname": self.dataname, "dataname": self.dataname,
"type": self.type, "payload_type": self.payload_type,
"transport": self.transport, "transport": self.transport,
"encoding": self.encoding, "encoding": self.encoding,
"size": self.size, "size": self.size,
@@ -152,20 +167,24 @@ class MessageEnvelope:
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime()) return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
def to_json(self): def to_json(self):
"""Convert envelope to JSON string.""" """Convert envelope to JSON string.
Returns:
str: JSON string representation of the envelope using snake_case field names
"""
obj = { obj = {
"correlationId": self.correlation_id, "correlation_id": self.correlation_id,
"msgId": self.msg_id, "msg_id": self.msg_id,
"timestamp": self.timestamp, "timestamp": self.timestamp,
"sendTo": self.send_to, "send_to": self.send_to,
"msgPurpose": self.msg_purpose, "msg_purpose": self.msg_purpose,
"senderName": self.sender_name, "sender_name": self.sender_name,
"senderId": self.sender_id, "sender_id": self.sender_id,
"receiverName": self.receiver_name, "receiver_name": self.receiver_name,
"receiverId": self.receiver_id, "receiver_id": self.receiver_id,
"replyTo": self.reply_to, "reply_to": self.reply_to,
"replyToMsgId": self.reply_to_msg_id, "reply_to_msg_id": self.reply_to_msg_id,
"brokerURL": self.broker_url "broker_url": self.broker_url
} }
# Include metadata if not empty # Include metadata if not empty
@@ -188,68 +207,126 @@ def log_trace(correlation_id, message):
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message)) print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
def _serialize_data(data, msg_type): def _serialize_data(data, payload_type):
"""Serialize data according to specified format. """Serialize data according to specified format.
This function serializes arbitrary data into a binary representation based on the specified type.
It supports multiple serialization formats for different data types.
Args: Args:
data: Data to serialize data: Data to serialize
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary") - "text": String
- "dictionary": JSON-serializable dict
- "table": Tabular data (pandas DataFrame or list of dicts)
- "image", "audio", "video", "binary": bytes
payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
Returns: Returns:
bytes: Binary representation of the serialized data bytes: Binary representation of the serialized data
Example:
>>> text_bytes = _serialize_data("Hello World", "text")
>>> json_bytes = _serialize_data({"key": "value"}, "dictionary")
>>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table")
""" """
if msg_type == "text": if payload_type == "text":
if isinstance(data, str): if isinstance(data, str):
return data.encode('utf-8') return data.encode('utf-8')
else: else:
raise ValueError("Text data must be a string") raise ValueError("Text data must be a string")
elif msg_type == "dictionary": elif payload_type == "dictionary":
if isinstance(data, dict): if isinstance(data, dict):
json_str = json.dumps(data) json_str = json.dumps(data)
return json_str.encode('utf-8') return json_str.encode('utf-8')
else: else:
raise ValueError("Dictionary data must be a dict") raise ValueError("Dictionary data must be a dict")
elif msg_type in ("image", "audio", "video", "binary"): elif payload_type == "table":
# Support pandas DataFrame or list of dicts
try:
import pandas as pd
if isinstance(data, pd.DataFrame):
# Convert DataFrame to JSON and then to bytes
json_str = data.to_json(orient='records', force_ascii=False)
return json_str.encode('utf-8')
elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
# List of dicts
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data must be a pandas DataFrame or list of dicts")
except ImportError:
# Fallback: if pandas not available, treat as list of dicts
if isinstance(data, list):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)")
elif payload_type in ("image", "audio", "video", "binary"):
if isinstance(data, bytes): if isinstance(data, bytes):
return data return data
else: else:
raise ValueError("{} data must be bytes".format(msg_type.capitalize())) raise ValueError("{} data must be bytes".format(payload_type.capitalize()))
else: else:
raise ValueError("Unknown type: {}".format(msg_type)) raise ValueError("Unknown payload_type: {}".format(payload_type))
def _deserialize_data(data_bytes, msg_type, correlation_id): def _deserialize_data(data_bytes, payload_type, correlation_id):
"""Deserialize bytes to data based on type. """Deserialize bytes to data based on type.
This function converts serialized bytes back to Python data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (JSON deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Args: Args:
data_bytes: Serialized data as bytes data_bytes: Serialized data as bytes
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
correlation_id: Correlation ID for logging correlation_id: Correlation ID for logging
Returns: Returns:
Deserialized data Deserialized data:
- "text": str
- "dictionary": dict
- "table": list of dicts (or pandas DataFrame if available)
- "image", "audio", "video", "binary": bytes
Example:
>>> text_data = _deserialize_data(b"Hello", "text", "corr_id")
>>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id")
>>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id")
""" """
if msg_type == "text": if payload_type == "text":
return data_bytes.decode('utf-8') return data_bytes.decode('utf-8')
elif msg_type == "dictionary": elif payload_type == "dictionary":
json_str = data_bytes.decode('utf-8') json_str = data_bytes.decode('utf-8')
return json.loads(json_str) return json.loads(json_str)
elif msg_type in ("image", "audio", "video", "binary"): elif payload_type == "table":
# Deserialize table data (JSON format)
json_str = data_bytes.decode('utf-8')
table_data = json.loads(json_str)
# If pandas is available, try to convert to DataFrame
try:
import pandas as pd
return pd.DataFrame(table_data)
except ImportError:
return table_data
elif payload_type in ("image", "audio", "video", "binary"):
return data_bytes return data_bytes
else: else:
raise ValueError("Unknown type: {}".format(msg_type)) raise ValueError("Unknown payload_type: {}".format(payload_type))
class NATSConnection: class NATSConnection:
"""Simple NATS connection for Micropython.""" """Simple NATS connection for Python and Micropython."""
def __init__(self, url=DEFAULT_NATS_URL): def __init__(self, url=DEFAULT_BROKER_URL):
"""Initialize NATS connection. """Initialize NATS connection.
Args: Args:
@@ -276,9 +353,19 @@ class NATSConnection:
def connect(self): def connect(self):
"""Connect to NATS server.""" """Connect to NATS server."""
# Use socket for both Python and Micropython
try:
import socket
addr = socket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect(addr)
except NameError:
# Micropython fallback
import usocket
addr = usocket.getaddrinfo(self.host, self.port)[0][-1] addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket() self.conn = usocket.socket()
self.conn.connect(addr) self.conn.connect(addr)
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port)) log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
def publish(self, subject, message): def publish(self, subject, message):
@@ -294,7 +381,15 @@ class NATSConnection:
# Simple NATS protocol implementation # Simple NATS protocol implementation
msg = "PUB {} {}\r\n".format(subject, len(message)) msg = "PUB {} {}\r\n".format(subject, len(message))
msg = msg.encode('utf-8') + message + b"\r\n" msg = msg.encode('utf-8') + message + b"\r\n"
try:
import socket
self.conn.send(msg) self.conn.send(msg)
except NameError:
# Micropython fallback
import usocket
self.conn.send(msg)
log_trace("", "Message published to {}".format(subject)) log_trace("", "Message published to {}".format(subject))
def subscribe(self, subject, callback): def subscribe(self, subject, callback):
@@ -335,11 +430,14 @@ class NATSConnection:
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""): def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
"""Fetch data from URL with exponential backoff. """Fetch data from URL with exponential backoff.
This function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Args: Args:
url: URL to fetch from url: URL to fetch from
max_retries: Maximum number of retry attempts max_retries: Maximum number of retry attempts (default: 5)
base_delay: Initial delay in milliseconds base_delay: Initial delay in milliseconds (default: 100)
max_delay: Maximum delay in milliseconds max_delay: Maximum delay in milliseconds (default: 5000)
correlation_id: Correlation ID for logging correlation_id: Correlation ID for logging
Returns: Returns:
@@ -347,33 +445,54 @@ def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, corr
Raises: Raises:
Exception: If all retry attempts fail Exception: If all retry attempts fail
Example:
>>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id")
""" """
delay = base_delay delay = base_delay
for attempt in range(1, max_retries + 1): for attempt in range(1, max_retries + 1):
try: try:
# Simple HTTP GET request # Simple HTTP GET request
# This is a simplified implementation # Try urequests for Micropython first, then requests for Python
# For production, you'd want a proper HTTP client try:
import urequests import urequests
response = urequests.get(url) response = urequests.get(url)
if response.status_code == 200: status_code = response.status_code
content = response.content
except ImportError:
try:
import requests
response = requests.get(url)
response.raise_for_status()
status_code = response.status_code
content = response.content
except ImportError:
raise Exception("No HTTP library available (urequests or requests)")
if status_code == 200:
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt)) log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
return response.content return content
else: else:
raise Exception("Failed to fetch: {}".format(response.status_code)) raise Exception("Failed to fetch: {}".format(status_code))
except Exception as e: except Exception as e:
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e))) log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
if attempt < max_retries: if attempt < max_retries:
time.sleep(delay / 1000.0) time.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay) delay = min(delay * 2, max_delay)
raise Exception("Failed to fetch data after {} attempts".format(max_retries))
def plik_oneshot_upload(file_server_url, filename, data):
def plik_oneshot_upload(fileserver_url, dataname, data):
"""Upload a single file to a plik server using one-shot mode. """Upload a single file to a plik server using one-shot mode.
This function uploads raw byte data to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
Args: Args:
file_server_url: Base URL of the plik server fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080")
filename: Name of the file being uploaded dataname: Name of the file being uploaded
data: Raw byte data of the file content data: Raw byte data of the file content
Returns: Returns:
@@ -382,23 +501,31 @@ def plik_oneshot_upload(file_server_url, filename, data):
- "uploadid": ID of the one-shot upload session - "uploadid": ID of the one-shot upload session
- "fileid": ID of the uploaded file within the session - "fileid": ID of the uploaded file within the session
- "url": Full URL to download the uploaded file - "url": Full URL to download the uploaded file
Example:
>>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world")
>>> result["status"], result["uploadid"], result["fileid"], result["url"]
""" """
import urequests
import json import json
try:
import urequests
except ImportError:
import requests as urequests
# Get upload ID # Get upload ID
url_get_upload_id = "{}/upload".format(file_server_url) url_get_upload_id = "{}/upload".format(fileserver_url)
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
body = json.dumps({"OneShot": True}) body = json.dumps({"OneShot": True})
response = urequests.post(url_get_upload_id, headers=headers, data=body) response = urequests.post(url_get_upload_id, headers=headers, data=body)
response_json = json.loads(response.content) response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
uploadid = response_json.get("id") uploadid = response_json.get("id")
uploadtoken = response_json.get("uploadToken") uploadtoken = response_json.get("uploadToken")
# Upload file # Upload file
url_upload = "{}/file/{}".format(file_server_url, uploadid) url_upload = "{}/file/{}".format(fileserver_url, uploadid)
headers = {"X-UploadToken": uploadtoken} headers = {"X-UploadToken": uploadtoken}
# For Micropython, we need to construct the multipart form data manually # For Micropython, we need to construct the multipart form data manually
@@ -407,7 +534,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
# Create multipart body # Create multipart body
part1 = "--{}\r\n".format(boundary) part1 = "--{}\r\n".format(boundary)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename) part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname)
part1 += "Content-Type: application/octet-stream\r\n\r\n" part1 += "Content-Type: application/octet-stream\r\n\r\n"
part1_bytes = part1.encode('utf-8') part1_bytes = part1.encode('utf-8')
@@ -421,10 +548,10 @@ def plik_oneshot_upload(file_server_url, filename, data):
content_type = "multipart/form-data; boundary={}".format(boundary) content_type = "multipart/form-data; boundary={}".format(boundary)
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body) response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
response_json = json.loads(response.content) response_json = json.loads(response.text if hasattr(response, 'text') else response.content)
fileid = response_json.get("id") fileid = response_json.get("id")
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename) url = "{}/file/{}/{}".format(fileserver_url, uploadid, dataname)
return { return {
"status": response.status_code, "status": response.status_code,
@@ -434,7 +561,7 @@ def plik_oneshot_upload(file_server_url, filename, data):
} }
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL, def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD, fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge", correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True): receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
@@ -447,27 +574,38 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
Args: Args:
subject: NATS subject to publish the message to subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples to send data: List of (dataname, data, payload_type) tuples to send
nats_url: URL of the NATS server - dataname: Name of the payload
- data: The actual data to send
- payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
broker_url: URL of the NATS server
fileserver_url: URL of the HTTP file server fileserver_url: URL of the HTTP file server
fileserver_upload_handler: Function to handle fileserver uploads fileserver_upload_handler: Function to handle fileserver uploads (must return dict with "status", "uploadid", "fileid", "url" keys)
size_threshold: Threshold in bytes separating direct vs link transport size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB)
correlation_id: Optional correlation ID for tracing correlation_id: Optional correlation ID for tracing; if None, a UUID is generated
msg_purpose: Purpose of the message msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
sender_name: Name of the sender sender_name: Name of the sender
receiver_name: Name of the receiver receiver_name: Name of the receiver (empty string means broadcast)
receiver_id: UUID of the receiver receiver_id: UUID of the receiver (empty string means broadcast)
reply_to: Topic to reply to reply_to: Topic to reply to (empty string if no reply expected)
reply_to_msg_id: Message ID this message is replying to reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS (default: True) is_publish: Whether to automatically publish the message to NATS (default: True)
- When True: message is published to NATS
- When False: returns envelope and JSON string without publishing
Returns: Returns:
tuple: (env, env_json_str) where: tuple: (env, env_json_str) where:
- env: MessageEnvelope object with all metadata and payloads - env: MessageEnvelope object with all metadata and payloads
- env_json_str: JSON string representation of the envelope for publishing - env_json_str: JSON string representation of the envelope for publishing
Example:
>>> data = [("message", "Hello World!", "text")]
>>> env, env_json_str = smartsend("/test", data)
>>> # env: MessageEnvelope with all metadata and payloads
>>> # env_json_str: JSON string for publishing
""" """
# Generate correlation ID if not provided # Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4()) cid = correlation_id if correlation_id is not None else str(uuid.uuid4())
log_trace(cid, "Starting smartsend for subject: {}".format(subject)) log_trace(cid, "Starting smartsend for subject: {}".format(subject))
@@ -482,16 +620,19 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
payload_bytes = _serialize_data(payload_data, payload_type) payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes) payload_size = len(payload_bytes)
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format( log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format(
dataname, payload_type, payload_size)) dataname, payload_type, payload_size))
# Decision: Direct vs Link # Decision: Direct vs Link
if payload_size < size_threshold: if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS # Direct path - Base64 encode and send via NATS
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
# Convert to base64 string for JSON # Convert to base64 string for JSON
try:
import ubinascii import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip() payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
except ImportError:
import base64
payload_b64_str = base64.b64encode(payload_bytes).decode('utf-8')
log_trace(cid, "Using direct transport for {} bytes".format(payload_size)) log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
@@ -514,10 +655,10 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Upload to HTTP server # Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200: if response.get("status") != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response["status"])) raise Exception("Failed to upload data to fileserver: {}".format(response.get("status")))
url = response["url"] url = response.get("url")
log_trace(cid, "Uploaded to URL: {}".format(url)) log_trace(cid, "Uploaded to URL: {}".format(url))
# Create MessagePayload for link transport # Create MessagePayload for link transport
@@ -546,7 +687,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
receiver_id=receiver_id, receiver_id=receiver_id,
reply_to=reply_to, reply_to=reply_to,
reply_to_msg_id=reply_to_msg_id, reply_to_msg_id=reply_to_msg_id,
broker_url=nats_url, broker_url=broker_url,
metadata={} metadata={}
) )
@@ -554,7 +695,7 @@ def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_F
# Publish to NATS if is_publish is True # Publish to NATS if is_publish is True
if is_publish: if is_publish:
nats_conn = NATSConnection(nats_url) nats_conn = NATSConnection(broker_url)
nats_conn.connect() nats_conn.connect()
nats_conn.publish(subject, msg_json) nats_conn.publish(subject, msg_json)
nats_conn.close() nats_conn.close()
@@ -571,18 +712,29 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
(base64 decoded payloads) and link transport (URL-based payloads). (base64 decoded payloads) and link transport (URL-based payloads).
Args: Args:
msg: NATS message to process (dict with payload data) msg: NATS message to process (dict or JSON string with envelope data)
fileserver_download_handler: Function to handle downloading data from file server URLs fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL Receives: (url, max_retries, base_delay, max_delay, correlation_id)
base_delay: Initial delay for exponential backoff in ms Returns: bytes (the downloaded data)
max_delay: Maximum delay for exponential backoff in ms max_retries: Maximum retry attempts for fetching URL (default: 5)
base_delay: Initial delay for exponential backoff in ms (default: 100)
max_delay: Maximum delay for exponential backoff in ms (default: 5000)
Returns: Returns:
dict: Envelope dictionary with metadata and 'payloads' field containing list of (dataname, data, type) tuples dict: Envelope dictionary with metadata and 'payloads' field containing list of
(dataname, data, payload_type) tuples
Example:
>>> env = smartreceive(msg)
>>> # env contains envelope metadata and payloads field
>>> # env["payloads"] = [(dataname1, data1, payload_type1), ...]
>>> for dataname, data, payload_type in env["payloads"]:
... print("Received {} of type {}: {}".format(dataname, payload_type, data))
""" """
# Parse the JSON envelope # Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg) json_data = msg if isinstance(msg, dict) else json.loads(msg)
log_trace(json_data.get("correlationId", ""), "Processing received message") correlation_id = json_data.get("correlation_id", "")
log_trace(correlation_id, "Processing received message")
# Process all payloads in the envelope # Process all payloads in the envelope
payloads_list = [] payloads_list = []
@@ -596,43 +748,47 @@ def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retri
dataname = payload.get("dataname", "") dataname = payload.get("dataname", "")
if transport == "direct": if transport == "direct":
log_trace(json_data.get("correlationId", ""), log_trace(correlation_id,
"Direct transport - decoding payload '{}'".format(dataname)) "Direct transport - decoding payload '{}'".format(dataname))
# Extract base64 payload from the payload # Extract base64 payload from the payload
payload_b64 = payload.get("data", "") payload_b64 = payload.get("data", "")
# Decode Base64 payload # Decode Base64 payload
try:
import ubinascii import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8')) payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
except ImportError:
import base64
payload_bytes = base64.b64decode(payload_b64)
# Deserialize based on type # Deserialize based on type
data_type = payload.get("type", "") payload_type = payload.get("payload_type", "")
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", "")) data = _deserialize_data(payload_bytes, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type)) payloads_list.append((dataname, data, payload_type))
elif transport == "link": elif transport == "link":
# Extract download URL from the payload # Extract download URL from the payload
url = payload.get("data", "") url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""), log_trace(correlation_id,
"Link transport - fetching '{}' from URL: {}".format(dataname, url)) "Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff # Fetch with exponential backoff
downloaded_data = fileserver_download_handler( downloaded_data = fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "") url, max_retries, base_delay, max_delay, correlation_id
) )
# Deserialize based on type # Deserialize based on type
data_type = payload.get("type", "") payload_type = payload.get("payload_type", "")
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", "")) data = _deserialize_data(downloaded_data, payload_type, correlation_id)
payloads_list.append((dataname, data, data_type)) payloads_list.append((dataname, data, payload_type))
else: else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport)) raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
# Replace payloads field with the processed list of (dataname, data, type) tuples # Replace payloads field with the processed list of (dataname, data, payload_type) tuples
json_data["payloads"] = payloads_list json_data["payloads"] = payloads_list
return json_data return json_data
@@ -651,11 +807,11 @@ def get_timestamp():
# Example usage # Example usage
if __name__ == "__main__": if __name__ == "__main__":
print("NATSBridge for Micropython") print("NATSBridge - Bi-Directional Data Bridge")
print("=========================") print("=======================================")
print("This module provides:") print("This module provides:")
print(" - MessageEnvelope: Message envelope structure") print(" - MessageEnvelope: Message envelope structure with snake_case fields")
print(" - MessagePayload: Payload structure") print(" - MessagePayload: Payload structure with payload_type field")
print(" - smartsend: Send data via NATS with automatic transport selection") print(" - smartsend: Send data via NATS with automatic transport selection")
print(" - smartreceive: Receive and process messages from NATS") print(" - smartreceive: Receive and process messages from NATS")
print(" - plik_oneshot_upload: Upload files to HTTP file server") print(" - plik_oneshot_upload: Upload files to HTTP file server")
@@ -663,10 +819,12 @@ if __name__ == "__main__":
print() print()
print("Usage:") print("Usage:")
print(" from nats_bridge import smartsend, smartreceive") print(" from nats_bridge import smartsend, smartreceive")
print(" data = [(\"message\", \"Hello World\", \"text\")]") print()
print(" env = smartsend(\"my.subject\", data)") print(" # Send data (list of (dataname, data, payload_type) tuples)")
print(" data = [(\"message\", \"Hello World!\", \"text\")]")
print(" env, env_json_str = smartsend(\"my.subject\", data)")
print() print()
print(" # On receiver:") print(" # On receiver:")
print(" payloads = smartreceive(msg)") print(" env = smartreceive(msg)")
print(" for dataname, data, type in payloads:") print(" for dataname, data, payload_type in env[\"payloads\"]:")
print(" print(f\"Received {dataname} of type {type}: {data}\")") print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))")