remove NATS integration

This commit is contained in:
2026-05-15 11:55:41 +07:00
parent 0e24b7d044
commit df9012e0eb
10 changed files with 254 additions and 973 deletions

View File

@@ -3,8 +3,7 @@
* Browser-Compatible Implementation (Client-Side Rendering)
*
* This module provides functionality for sending and receiving data across network boundaries
* using NATS as the message bus, with support for both direct payload transport and
* URL-based transport for larger payloads.
* with support for both direct payload transport and URL-based transport for larger payloads.
*
* Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints.
@@ -14,10 +13,8 @@
* - Modern browser with ES module support (or use module bundler)
* - Web Crypto API for UUID generation
* - Fetch API for HTTP requests
* - WebSocket support for NATS connections (use ws:// or wss:// URLs)
*
* Browser-compatible version uses:
* - nats.ws for WebSocket-based NATS connections
* - Web Crypto API for UUID generation
* - Uint8Array instead of Buffer
* - fetch API for file server communication
@@ -25,9 +22,6 @@
* @module msghandlerCSR
*/
// Import browser-compatible NATS client
import * as nats from 'nats.ws';
// Use native fetch available in browsers
// ---------------------------------------------- Constants ---------------------------------------------- //
@@ -38,9 +32,9 @@ import * as nats from 'nats.ws';
const DEFAULT_SIZE_THRESHOLD = 500_000;
/**
* Default NATS server URL (WebSocket protocol)
* Default broker URL
*/
const DEFAULT_BROKER_URL = 'ws://localhost:4222';
const DEFAULT_BROKER_URL = 'localhost:4222';
/**
* Default HTTP file server URL for link transport
@@ -75,34 +69,6 @@ function base64ToBuffer(base64) {
return bytes;
}
/**
* Convert Uint8Array to Base64 string (Unicode-safe version)
* Uses TextEncoder/TextDecoder for proper Unicode handling
* @param {Uint8Array} data - Data to encode
* @returns {string} Base64 encoded string
*/
function bufferToBase64UnicodeSafe(data) {
const bytes = new Uint8Array(data);
// Use TextDecoder to properly handle the bytes as text
const binary = String.fromCharCode(...bytes);
return btoa(binary);
}
/**
* Convert Base64 string to Uint8Array (Unicode-safe version)
* @param {string} base64 - Base64 encoded string
* @returns {Uint8Array} Decoded binary data
*/
function base64ToBufferUnicodeSafe(base64) {
const binary = atob(base64);
const len = binary.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binary.charCodeAt(i);
}
return bytes;
}
/**
* Generate UUID v4 using Web Crypto API
* @returns {string} UUID string
@@ -323,191 +289,11 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
}
// ---------------------------------------------- NATS Client ---------------------------------------------- //
/**
* NATS client wrapper for connection management
* Supports both single-use and persistent connection modes
*/
class NATSClient {
/**
* Create a new NATS client
* @param {string} url - NATS server URL (ws:// or wss://)
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
*/
constructor(url, keepAlive = false) {
this.url = url;
this.connection = null;
this.keepAlive = keepAlive;
}
/**
* Connect to NATS server
* @returns {Promise<NATS.Connection>}
*/
async connect() {
if (this.connection) {
return this.connection;
}
this.connection = await nats.connect({ servers: this.url });
return this.connection;
}
/**
* Publish message to NATS subject
* @param {string} subject - NATS subject to publish to
* @param {string} message - Message to publish
* @param {string} correlationId - Correlation ID for logging
*/
async publish(subject, message, correlationId) {
if (!this.connection) {
await this.connect();
}
await this.connection.publish(subject, message);
logTrace(correlationId, `Message published to ${subject}`);
}
/**
* Close the NATS connection
*/
async close() {
if (this.connection) {
this.connection.close();
this.connection = null;
}
}
/**
* Get the current connection (for external use)
* @returns {NATS.Connection|null}
*/
getConnection() {
return this.connection;
}
/**
* Check if connected
* @returns {boolean}
*/
isConnected() {
return this.connection !== null;
}
}
/**
* Connection pool for managing multiple NATS connections
* Useful for applications with multiple concurrent publishers
*/
class NATSConnectionPool {
/**
* Create a new connection pool
* @param {string} url - NATS server URL (ws:// or wss://)
* @param {number} [maxSize=10] - Maximum pool size
*/
constructor(url, maxSize = 10) {
this.url = url;
this.maxSize = maxSize;
this.connections = new Map();
this.idCounter = 0;
}
/**
* Get a connection from the pool (or create new)
* @returns {Promise<NATSClient>}
*/
async acquire() {
// Try to find an existing idle connection
for (const [id, client] of this.connections) {
if (client.isConnected()) {
return client;
}
}
// Create new connection if under limit
if (this.connections.size < this.maxSize) {
const id = `conn_${++this.idCounter}`;
const client = new NATSClient(this.url, true);
await client.connect();
this.connections.set(id, client);
return client;
}
// Pool exhausted - create new connection (caller should close when done)
const client = new NATSClient(this.url, false);
await client.connect();
return client;
}
/**
* Return a connection to the pool
* @param {NATSClient} client - Connection to return
*/
release(client) {
// Only return persistent connections
if (client.keepAlive && client.isConnected()) {
// Connection already in pool, do nothing
return;
}
// Non-persistent connection - close it
client.close();
}
/**
* Close all connections in the pool
*/
async closeAll() {
for (const [id, client] of this.connections) {
await client.close();
}
this.connections.clear();
}
}
// ---------------------------------------------- Core Functions ---------------------------------------------- //
/**
* Publish message to NATS
* @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection
* @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish
* @param {string} correlationId - Correlation ID for tracing
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
*/
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
let conn;
let shouldClose = false;
if (brokerUrlOrClient instanceof NATSClient) {
conn = brokerUrlOrClient;
} else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') {
// Create a wrapper for direct connection (duck-typing check for NATS connection)
conn = {
async publish(subj, msg) {
await brokerUrlOrClient.publish(subj, msg);
},
async close() {
await brokerUrlOrClient.close();
}
};
shouldClose = true;
} else {
// String URL - create new client
const client = new NATSClient(brokerUrlOrClient);
conn = client;
shouldClose = true;
}
await conn.publish(subject, message, correlationId);
// Only close if explicitly requested and it's a short-lived client
if (shouldClose && closeConnection && conn instanceof NATSClient) {
await conn.close();
}
}
/**
* Build message envelope from payloads and metadata
* @param {string} subject - NATS subject
* @param {string} subject - Subject/topic
* @param {Array} payloads - Array of payload objects
* @param {Object} options - Envelope metadata options
* @returns {Object} Envelope object
@@ -560,19 +346,22 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
}
/**
* Send data via NATS with automatic transport selection
* Send data with automatic transport selection
*
* This function intelligently routes data delivery based on payload size.
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64
* and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
* and publishes only the download URL over NATS.
* into a "direct" payload. Otherwise, it uploads the data to a fileserver
* and creates a "link" payload with the URL.
*
* @param {string} subject - NATS subject to publish the message to
* Transport publishing is the caller's responsibility. This function returns the
* envelope and its JSON string representation.
*
* @param {string} subject - Subject/topic to send the message to
* @param {Array} data - List of [dataname, data, type] tuples to send
* - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data)
* @param {Object} options - Optional configuration
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket)
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - Broker URL (for envelope metadata)
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
@@ -583,8 +372,6 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
* @param {string} [options.reply_to=""] - Topic to reply to
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
* @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
* @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection
* @param {string} [options.msg_id=uuidv4()] - Message ID
* @param {string} [options.sender_id=uuidv4()] - Sender ID
* @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
@@ -593,8 +380,7 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* // Send a single payload
* const [env, envJsonStr] = await msghandlerCSR.smartsend(
* "/test",
* [["dataname1", data1, "dictionary"]],
* { broker_url: "wss://nats.example.com" }
* [["dataname1", data1, "dictionary"]]
* );
*
* // Send multiple payloads (use jsontable instead of arrowtable for browser)
@@ -603,9 +389,11 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* [
* ["dataname1", data1, "dictionary"],
* ["dataname2", tableData, "jsontable"]
* ],
* { broker_url: "wss://nats.example.com" }
* ]
* );
*
* // Publish via your transport (NATS, MQTT, HTTP, etc.)
* // await myNatsClient.publish("/test", envJsonStr);
*/
async function smartsend(subject, data, options = {}) {
const {
@@ -620,8 +408,6 @@ async function smartsend(subject, data, options = {}) {
receiver_id = '',
reply_to = '',
reply_to_msg_id = '',
is_publish = true,
nats_connection = null,
msg_id = uuidv4(),
sender_id = uuidv4()
} = options;
@@ -695,25 +481,18 @@ async function smartsend(subject, data, options = {}) {
const env_json_str = JSON.stringify(env);
if (is_publish) {
if (nats_connection) {
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
} else {
await publishMessage(broker_url, subject, env_json_str, correlation_id);
}
}
return [env, env_json_str];
}
/**
* Receive and process NATS message
* Receive and process messages
*
* This function processes incoming NATS messages, handling both direct transport
* This function processes incoming messages, handling both direct transport
* (base64 decoded payloads) and link transport (URL-based payloads).
* It deserializes the data based on the transport type and returns the result.
*
* @param {Object} msg - NATS message object with payload property
* @param {string|Object} msg - Message payload. Accepts either a JSON string directly,
* or an object with a `data` or `payload` property containing the JSON string.
* @param {Object} options - Optional configuration
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
@@ -722,13 +501,18 @@ async function smartsend(subject, data, options = {}) {
* @returns {Promise<Object>} Envelope object with processed payloads
*
* @example
* // Receive and process message
* const env = await msghandlerCSR.smartreceive(msg, {
* // Receive from JSON string directly
* const env = await msghandlerCSR.smartreceive(jsonString, {
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff,
* max_retries: 5,
* base_delay: 100,
* max_delay: 5000
* });
*
* // Receive from transport message object (e.g., NATS, MQTT)
* const env = await msghandlerCSR.smartreceive(natsMsg, {
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff
* });
* // env.payloads is an Array of [dataname, data, type] arrays
* for (const [dataname, data, type] of env.payloads) {
* console.log(`${dataname}: ${data} (type: ${type})`);
@@ -742,20 +526,20 @@ async function smartreceive(msg, options = {}) {
max_delay = 5000
} = options;
// Debug: Log message object structure
logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`);
logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`);
logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`);
// Parse the JSON envelope
// NATS.js v2.x uses msg.data instead of msg.payload
// Handle both raw JSON strings and transport message objects
let payload;
if (msg.data !== undefined) {
payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data);
} else if (msg.payload !== undefined) {
payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
if (typeof msg === 'string') {
payload = msg;
} else if (msg !== null && typeof msg === 'object') {
if (msg.data !== undefined) {
payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data);
} else if (msg.payload !== undefined) {
payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
} else {
throw new Error('Message has neither data nor payload property');
}
} else {
throw new Error('Message has neither data nor payload property');
throw new Error('Invalid message format: expected JSON string or message object');
}
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
@@ -839,61 +623,15 @@ async function smartreceive(msg, options = {}) {
const msghandlerCSR = {
/**
* NATS client class for connection management
* Supports both single-use and persistent connection modes
*
* @example
* // Single-use connection (closes after publish)
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com");
* await msghandlerCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
* await client.close();
*
* // Persistent connection (keeps connection open)
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com", true);
* await client.connect();
* await msghandlerCSR.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
* await msghandlerCSR.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
* // Connection remains open for more publishes
* await client.close();
*/
NATSClient,
/**
* Connection pool for managing multiple NATS connections
* Useful for applications with multiple concurrent publishers
*
* @example
* const pool = new msghandlerCSR.NATSConnectionPool("wss://nats.example.com", 10);
* const client = await pool.acquire();
* await msghandlerCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
* pool.release(client);
* await pool.closeAll();
*/
NATSConnectionPool,
/**
* Send data via NATS with automatic transport selection
* Send data with automatic transport selection
*/
smartsend,
/**
* Receive and process NATS message
* Receive and process messages
*/
smartreceive,
/**
* Publish message to NATS
*
* @example
* // Using a persistent connection
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com", true);
* await client.connect();
* await msghandlerCSR.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
* // Connection stays open for more publishes
* await client.close();
*/
publishMessage,
/**
* Upload data to plik server in one-shot mode
*/

View File

@@ -1,7 +1,7 @@
# Bi-Directional Data Bridge - Julia Module
# Implements smartsend and smartreceive for NATS communication
# Implements smartsend and smartreceive for message transport
# This module provides functionality for sending and receiving data across network boundaries
# using NATS as the message bus, with support for both direct payload transport and
# with support for both direct payload transport and
# URL-based transport for larger payloads.
#
# File Server Handler Architecture:
@@ -48,12 +48,12 @@ using JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting, DataFrames
# Constants
const DEFAULT_SIZE_THRESHOLD = 500_000 # 0.5MB - threshold for switching from direct to link transport
const DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_BROKER_URL = "localhost:4222" # Default broker URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
""" msg_payload_v1 - Internal message payload structure
This structure represents a single payload within a NATS message envelope.
This structure represents a single payload within a message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
# Arguments:
@@ -141,11 +141,11 @@ end
""" msg_envelope_v1 - Internal message envelope structure
This structure represents a complete NATS message envelope containing multiple payloads
This structure represents a complete message envelope containing multiple payloads
with metadata for routing, tracing, and message context.
# Arguments:
- `send_to::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `send_to::String` - Subject/topic to send the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::Vector{msg_payload_v1}` - List of payloads to include in the message
# Keyword Arguments:
@@ -159,7 +159,7 @@ with metadata for routing, tracing, and message context.
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
- `reply_to::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
- `broker_url::String = DEFAULT_BROKER_URL` - NATS broker URL
- `broker_url::String = DEFAULT_BROKER_URL` - Broker URL
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
# Return:
@@ -199,7 +199,7 @@ struct msg_envelope_v1
reply_to::String # sender ask receiver to reply to this topic
reply_to_msg_id::String # the message id this message is replying to
broker_url::String # NATS server address
broker_url::String # Broker address
metadata::Dict{String, Any}
payloads::Vector{msg_payload_v1} # multiple payload store here
@@ -244,7 +244,7 @@ end
""" envelope_to_json - Convert msg_envelope_v1 to JSON string
This function converts the msg_envelope_v1 struct to a JSON string representation,
preserving all metadata and payload information for NATS message publishing.
preserving all metadata and payload information for transport publishing.
# Function Workflow:
1. Creates a dictionary with envelope metadata (correlation_id, msg_id, timestamp, etc.)
@@ -337,7 +337,7 @@ function log_trace(correlation_id::String, message::String)
end
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
""" smartsend - Send data with automatic transport selection, depending on payload size
This function intelligently routes data delivery based on payload size relative to a threshold.
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and constructs a "direct" msg_payload_v1.
@@ -347,7 +347,7 @@ The function accepts a list of (dataname, data, type) tuples as input and proces
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
This function creates and returns the msg_envelope_v1 and its JSON string representation only.
NATS publishing must be performed by the caller.
Transport publishing must be performed by the caller.
# Function Workflow:
1. Iterates through the list of (dataname, data, type) tuples
@@ -356,10 +356,10 @@ NATS publishing must be performed by the caller.
4. For small payloads: encodes as Base64, constructs a "direct" msg_payload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msg_payload_v1 with the URL
6. Constructs msg_envelope_v1 with all payloads and metadata
7. Converts envelope to JSON string and returns (NATS publishing is handled by the caller)
7. Converts envelope to JSON string and returns (transport publishing is handled by the caller)
# Arguments:
- `subject::String` - NATS subject to publish the message to
- `subject::String` - Subject/topic to send the message to
- `data::AbstractArray{Tuple{String, T1, String}, 1}` - List of (dataname, data, type) tuples to send
- `dataname::String` - Name of the payload
- `data::T1` - The actual data to send (any type supported by `_serialize_data`)
@@ -367,7 +367,7 @@ NATS publishing must be performed by the caller.
- No standalone `type` parameter - type is specified per payload
# Keyword Arguments:
- `broker_url::String = DEFAULT_BROKER_URL` - URL of the NATS server
- `broker_url::String = DEFAULT_BROKER_URL` - URL of the broker
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
@@ -414,15 +414,15 @@ env, msg_json = smartsend("chat.subject", [
("audio_clip", audio_data, "audio")
])
# Publish the JSON string directly using NATS (manual publish)
# conn = NATS.connect(broker_url)
# NATS.publish(conn, subject, env_json_str)
# Publish the JSON string directly via your transport (manual publish)
# conn = my_transport.connect(broker_url)
# my_transport.publish(conn, subject, env_json_str)
```
"""
function smartsend(
subject::String, # smartreceive's subject
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
broker_url::String = DEFAULT_BROKER_URL, # Broker URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
@@ -461,7 +461,7 @@ function smartsend(
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
# Direct path - Base64 encode and include in message envelope
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice
@@ -486,7 +486,7 @@ function smartsend(
)
push!(payloads, payload)
else
# Link path - Upload to HTTP server, send URL via NATS
# Link path - Upload to HTTP server, include URL in message envelope
log_trace(correlation_id, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
@@ -703,13 +703,13 @@ function _serialize_data(data::Any, payload_type::String)
end
# """ publish_message - Publish message to NATS
# This function publishes a message to a NATS subject with proper
# """ publish_message - Publish message via transport
# This function publishes a message via the transport with proper
# connection management and logging.
# # Arguments:
# - `broker_url::String` - NATS server URL (e.g., "nats://localhost:4222")
# - `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
# - `broker_url::String` - Broker URL (e.g., "localhost:4222")
# - `subject::String` - Subject to publish to (e.g., "/agent/wine/api/v1/prompt")
# - `message::String` - JSON message to publish
# - `correlation_id::String` - Correlation ID for tracing and logging
@@ -723,8 +723,8 @@ end
# # Prepare JSON message
# message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
# # Publish to NATS
# publish_message("nats://localhost:4222", "my.subject", message, "abc123")
# # Publish via transport
# publish_message("localhost:4222", "my.subject", message, "abc123")
# ```
# """
# function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
@@ -732,13 +732,13 @@ end
# publish_message(conn, subject, message, correlation_id)
# end
# """ publish_message - Publish message to NATS using pre-existing connection
# This function publishes a message to a NATS subject using a pre-existing NATS connection,
# """ publish_message - Publish message via transport using pre-existing connection
# This function publishes a message via the transport using a pre-existing connection,
# avoiding the overhead of connection establishment.
# # Arguments:
# - `conn::NATS.Connection` - Pre-existing NATS connection
# - `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
# - `conn` - Pre-existing connection object with publish/close methods
# - `subject::String` - Subject to publish to (e.g., "/agent/wine/api/v1/prompt")
# - `message::String` - JSON message to publish
# - `correlation_id::String` - Correlation ID for tracing and logging
@@ -759,7 +759,7 @@ end
# ```
# # Use Case:
# Use this version when you already have an established NATS connection and want to publish
# Use this version when you already have an established connection and want to publish
# multiple messages without the overhead of creating a new connection for each publish.
# """
# function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
@@ -772,21 +772,21 @@ end
# end
""" smartreceive - Receive and process messages from NATS
This function processes incoming NATS messages, handling both direct transport
""" smartreceive - Receive and process messages
This function processes incoming messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
A HTTP file server is required along with its download function.
# Function Workflow:
1. Parses the JSON envelope from the NATS message
1. Parses the JSON envelope from the message
2. Iterates through each payload in the envelope
3. For each payload: determines the transport type (direct or link)
4. For direct transport: decodes Base64 payload and deserializes based on type
5. For link transport: fetches data from URL with exponential backoff, then deserializes
# Arguments:
- `msg_json_str::String` - JSON string from NATS message payload (e.g., `String(nats_msg.payload)`)
- `msg_json_str::String` - JSON string from the message payload (e.g., `String(msg.payload)`)
# Keyword Arguments:
- `fileserver_download_handler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
@@ -800,7 +800,6 @@ A HTTP file server is required along with its download function.
# Example
```jldoctest
# Receive and process message
msg = nats_message # NATS message
msg_json_str = String(msg.payload)
env = smartreceive(msg_json_str; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]

View File

@@ -3,21 +3,18 @@
* JavaScript/Node.js Implementation (Desktop/Server-Side)
*
* This module provides functionality for sending and receiving data across network boundaries
* using NATS as the message bus, with support for both direct payload transport and
* URL-based transport for larger payloads.
* with support for both direct payload transport and URL-based transport for larger payloads.
*
* Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
*
* Node.js-specific features:
* - Apache Arrow IPC support via apache-arrow
* - TCP NATS connections (nats:// or tls:// URLs)
* - Buffer for binary data handling
* - Connection pooling for high-throughput scenarios
* - Native fetch for HTTP operations
*
* @module msghandler
*/
const nats = require('nats');
const crypto = require('crypto');
// Use native fetch available in Node.js 18+
const arrow = require('apache-arrow');
@@ -40,9 +37,9 @@ function uuidv4() {
const DEFAULT_SIZE_THRESHOLD = 500_000;
/**
* Default NATS server URL
* Default broker URL
*/
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_BROKER_URL = 'localhost:4222';
/**
* Default HTTP file server URL for link transport
@@ -344,191 +341,11 @@ async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlatio
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
}
// ---------------------------------------------- NATS Client ---------------------------------------------- //
/**
* NATS client wrapper for connection management
* Supports both single-use and persistent connection modes
*/
class NATSClient {
/**
* Create a new NATS client
* @param {string} url - NATS server URL (nats:// or tls://)
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
*/
constructor(url, keepAlive = false) {
this.url = url;
this.connection = null;
this.keepAlive = keepAlive;
}
/**
* Connect to NATS server
* @returns {Promise<NATS.Connection>}
*/
async connect() {
if (this.connection) {
return this.connection;
}
this.connection = await nats.connect({ servers: this.url });
return this.connection;
}
/**
* Publish message to NATS subject
* @param {string} subject - NATS subject to publish to
* @param {string} message - Message to publish
* @param {string} correlationId - Correlation ID for logging
*/
async publish(subject, message, correlationId) {
if (!this.connection) {
await this.connect();
}
await this.connection.publish(subject, message);
logTrace(correlationId, `Message published to ${subject}`);
}
/**
* Close the NATS connection
*/
async close() {
if (this.connection) {
this.connection.close();
this.connection = null;
}
}
/**
* Get the current connection (for external use)
* @returns {NATS.Connection|null}
*/
getConnection() {
return this.connection;
}
/**
* Check if connected
* @returns {boolean}
*/
isConnected() {
return this.connection !== null;
}
}
/**
* Connection pool for managing multiple NATS connections
* Useful for applications with multiple concurrent publishers
*/
class NATSConnectionPool {
/**
* Create a new connection pool
* @param {string} url - NATS server URL (nats:// or tls://)
* @param {number} [maxSize=10] - Maximum pool size
*/
constructor(url, maxSize = 10) {
this.url = url;
this.maxSize = maxSize;
this.connections = new Map();
this.idCounter = 0;
}
/**
* Get a connection from the pool (or create new)
* @returns {Promise<NATSClient>}
*/
async acquire() {
// Try to find an existing idle connection
for (const [id, client] of this.connections) {
if (client.isConnected()) {
return client;
}
}
// Create new connection if under limit
if (this.connections.size < this.maxSize) {
const id = `conn_${++this.idCounter}`;
const client = new NATSClient(this.url, true);
await client.connect();
this.connections.set(id, client);
return client;
}
// Pool exhausted - create new connection (caller should close when done)
const client = new NATSClient(this.url, false);
await client.connect();
return client;
}
/**
* Return a connection to the pool
* @param {NATSClient} client - Connection to return
*/
release(client) {
// Only return persistent connections
if (client.keepAlive && client.isConnected()) {
// Connection already in pool, do nothing
return;
}
// Non-persistent connection - close it
client.close();
}
/**
* Close all connections in the pool
*/
async closeAll() {
for (const [id, client] of this.connections) {
await client.close();
}
this.connections.clear();
}
}
// ---------------------------------------------- Core Functions ---------------------------------------------- //
/**
* Publish message to NATS
* @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection
* @param {string} subject - NATS subject to publish to
* @param {string} message - JSON message to publish
* @param {string} correlationId - Correlation ID for tracing
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
*/
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
let conn;
let shouldClose = false;
if (brokerUrlOrClient instanceof NATSClient) {
conn = brokerUrlOrClient;
} else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') {
// Create a wrapper for direct connection (duck-typing check for NATS connection)
conn = {
async publish(subj, msg) {
await brokerUrlOrClient.publish(subj, msg);
},
async close() {
await brokerUrlOrClient.close();
}
};
shouldClose = true;
} else {
// String URL - create new client
const client = new NATSClient(brokerUrlOrClient);
conn = client;
shouldClose = true;
}
await conn.publish(subject, message, correlationId);
// Only close if explicitly requested and it's a short-lived client
if (shouldClose && closeConnection && conn instanceof NATSClient) {
await conn.close();
}
}
/**
* Build message envelope from payloads and metadata
* @param {string} subject - NATS subject
* @param {string} subject - Subject/topic
* @param {Array} payloads - Array of payload objects
* @param {Object} options - Envelope metadata options
* @returns {Object} Envelope object
@@ -583,18 +400,21 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
}
/**
* Send data via NATS with automatic transport selection
* Send data with automatic transport selection
*
* This function intelligently routes data delivery based on payload size.
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64
* and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
* and publishes only the download URL over NATS.
* into a "direct" payload. Otherwise, it uploads the data to a fileserver
* and creates a "link" payload with the URL.
*
* @param {string} subject - NATS subject to publish the message to
* Transport publishing is the caller's responsibility. This function returns the
* envelope and its JSON string representation.
*
* @param {string} subject - Subject/topic to send the message to
* @param {Array} data - List of [dataname, data, type] tuples to send
* - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
* @param {Object} options - Optional configuration
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - Broker URL (for envelope metadata)
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
@@ -605,8 +425,6 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
* @param {string} [options.reply_to=""] - Topic to reply to
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
* @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
* @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection
* @param {string} [options.msg_id=crypto.randomUUID()] - Message ID
* @param {string} [options.sender_id=crypto.randomUUID()] - Sender ID
* @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
@@ -615,8 +433,7 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* // Send a single payload
* const [env, envJsonStr] = await smartsend(
* "/test",
* [["dataname1", data1, "dictionary"]],
* { broker_url: "nats://localhost:4222" }
* [["dataname1", data1, "dictionary"]]
* );
*
* // Send multiple payloads
@@ -625,17 +442,11 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
* [
* ["dataname1", data1, "dictionary"],
* ["dataname2", data2, "arrowtable"]
* ],
* { broker_url: "nats://localhost:4222" }
* ]
* );
*
* // Send with pre-existing connection
* const client = await msghandler.NATSClient.connect("nats://localhost:4222");
* const [env, envJsonStr] = await smartsend(
* "/test",
* [["data", myData, "text"]],
* { nats_connection: client }
* );
* // Publish via your transport (NATS, MQTT, HTTP, etc.)
* // await myNatsClient.publish("/test", envJsonStr);
*/
async function smartsend(subject, data, options = {}) {
const {
@@ -650,8 +461,6 @@ async function smartsend(subject, data, options = {}) {
receiver_id = '',
reply_to = '',
reply_to_msg_id = '',
is_publish = true,
nats_connection = null,
msg_id = uuidv4(),
sender_id = uuidv4()
} = options;
@@ -722,25 +531,18 @@ async function smartsend(subject, data, options = {}) {
const env_json_str = JSON.stringify(env);
if (is_publish) {
if (nats_connection) {
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
} else {
await publishMessage(broker_url, subject, env_json_str, correlation_id);
}
}
return [env, env_json_str];
}
/**
* Receive and process NATS message
* Receive and process messages
*
* This function processes incoming NATS messages, handling both direct transport
* This function processes incoming messages, handling both direct transport
* (base64 decoded payloads) and link transport (URL-based payloads).
* It deserializes the data based on the transport type and returns the result.
*
* @param {Object} msg - NATS message object with payload property
* @param {string|Object} msg - Message payload. Accepts either a JSON string directly,
* or an object with a `data` or `payload` property containing the JSON string.
* @param {Object} options - Optional configuration
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
@@ -749,13 +551,18 @@ async function smartsend(subject, data, options = {}) {
* @returns {Promise<Object>} Envelope object with processed payloads
*
* @example
* // Receive and process message
* const env = await smartreceive(msg, {
* // Receive from JSON string directly
* const env = await smartreceive(jsonString, {
* fileserver_download_handler: fetchWithBackoff,
* max_retries: 5,
* base_delay: 100,
* max_delay: 5000
* });
*
* // Receive from transport message object (e.g., NATS, MQTT)
* const env = await smartreceive(natsMsg, {
* fileserver_download_handler: fetchWithBackoff
* });
* // env.payloads is an Array of [dataname, data, type] arrays
* for (const [dataname, data, type] of env.payloads) {
* console.log(`${dataname}: ${data} (type: ${type})`);
@@ -769,20 +576,20 @@ async function smartreceive(msg, options = {}) {
max_delay = 5000
} = options;
// Debug: Log message object structure
logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`);
logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`);
logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`);
// Parse the JSON envelope
// NATS.js v2.x uses msg.data instead of msg.payload
// Handle both raw JSON strings and transport message objects
let payload;
if (msg.data !== undefined) {
payload = typeof msg.data === 'string' ? msg.data : Buffer.from(msg.data).toString('utf8');
} else if (msg.payload !== undefined) {
payload = typeof msg.payload === 'string' ? msg.payload : Buffer.from(msg.payload).toString('utf8');
if (typeof msg === 'string') {
payload = msg;
} else if (msg !== null && typeof msg === 'object') {
if (msg.data !== undefined) {
payload = typeof msg.data === 'string' ? msg.data : Buffer.from(msg.data).toString('utf8');
} else if (msg.payload !== undefined) {
payload = typeof msg.payload === 'string' ? msg.payload : Buffer.from(msg.payload).toString('utf8');
} else {
throw new Error('Message has neither data nor payload property');
}
} else {
throw new Error('Message has neither data nor payload property');
throw new Error('Invalid message format: expected JSON string or message object');
}
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
@@ -866,61 +673,15 @@ async function smartreceive(msg, options = {}) {
const msghandler = {
/**
* NATS client class for connection management
* Supports both single-use and persistent connection modes
*
* @example
* // Single-use connection (closes after publish)
* const client = new msghandler.NATSClient("nats://localhost:4222");
* await msghandler.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
* await client.close();
*
* // Persistent connection (keeps connection open)
* const client = new msghandler.NATSClient("nats://localhost:4222", true);
* await client.connect();
* await msghandler.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
* await msghandler.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
* // Connection remains open for more publishes
* await client.close();
*/
NATSClient,
/**
* Connection pool for managing multiple NATS connections
* Useful for applications with multiple concurrent publishers
*
* @example
* const pool = new msghandler.NATSConnectionPool("nats://localhost:4222", 10);
* const client = await pool.acquire();
* await msghandler.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
* pool.release(client);
* await pool.closeAll();
*/
NATSConnectionPool,
/**
* Send data via NATS with automatic transport selection
* Send data with automatic transport selection
*/
smartsend,
/**
* Receive and process NATS message
* Receive and process messages
*/
smartreceive,
/**
* Publish message to NATS
*
* @example
* // Using a persistent connection
* const client = new msghandler.NATSClient("nats://localhost:4222", true);
* await client.connect();
* await msghandler.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
* // Connection stays open for more publishes
* await client.close();
*/
publishMessage,
/**
* Upload data to plik server in one-shot mode
*/
@@ -939,4 +700,4 @@ const msghandler = {
DEFAULT_FILESERVER_URL
};
module.exports = msghandler;
module.exports = msghandler;

View File

@@ -3,8 +3,7 @@ msghandler - Cross-Platform Bi-Directional Data Bridge
Python Desktop Implementation
This module provides functionality for sending and receiving data across network boundaries
using NATS as the message bus, with support for both direct payload transport and
URL-based transport for larger payloads.
with support for both direct payload transport and URL-based transport for larger payloads.
@package msghandler
"""
@@ -24,13 +23,6 @@ try:
except ImportError:
ARROW_AVAILABLE = False
try:
import nats
from nats.aio.client import Client as NATSClient
NATS_AVAILABLE = True
except ImportError:
NATS_AVAILABLE = False
# ---------------------------------------------- Constants ---------------------------------------------- #
"""
@@ -39,9 +31,9 @@ Default size threshold for switching from direct to link transport (0.5MB)
DEFAULT_SIZE_THRESHOLD = 500_000
"""
Default NATS server URL
Default broker URL
"""
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_BROKER_URL = "localhost:4222"
"""
Default HTTP file server URL for link transport
@@ -305,56 +297,6 @@ async def fetch_with_backoff(
raise Exception(f"Failed to fetch data after {max_retries} attempts")
# ---------------------------------------------- NATS Client ---------------------------------------------- #
class NATSClient:
"""NATS client wrapper for connection management."""
def __init__(self, url: str = DEFAULT_BROKER_URL):
"""
Create a new NATS client.
Args:
url: NATS server URL
"""
self.url = url
self._client: NATSClient = None
async def connect(self) -> NATSClient:
"""
Connect to NATS server.
Returns:
NATS client instance
"""
if NATS_AVAILABLE:
self._client = nats.connect(self.url)
await self._client
else:
raise RuntimeError('nats-py not available')
return self._client
async def publish(self, subject: str, message: str, correlation_id: str = "") -> None:
"""
Publish message to NATS subject.
Args:
subject: NATS subject to publish to
message: Message to publish
correlation_id: Correlation ID for logging
"""
if self._client:
await self._client.publish(subject, message)
if correlation_id:
log_trace(correlation_id, f"Message published to {subject}")
async def close(self) -> None:
"""Close the NATS connection."""
if self._client:
await self._client.drain()
await self._client.close()
# ---------------------------------------------- Core Functions ---------------------------------------------- #
def _build_envelope(
@@ -366,7 +308,7 @@ def _build_envelope(
Build message envelope from payloads and metadata.
Args:
subject: NATS subject
subject: Subject/topic
payloads: Array of payload objects
options: Envelope metadata options
@@ -430,41 +372,6 @@ def _build_payload(
}
async def publish_message(
broker_url_or_client: Union[str, NATSClient, Any],
subject: str,
message: str,
correlation_id: str
) -> None:
"""
Publish message to NATS.
Args:
broker_url_or_client: NATS URL, client, or connection
subject: NATS subject to publish to
message: JSON message to publish
correlation_id: Correlation ID for tracing
"""
if isinstance(broker_url_or_client, NATSClient):
client = broker_url_or_client
elif NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish'):
# Direct NATS client connection
await broker_url_or_client.publish(subject, message)
log_trace(correlation_id, f"Message published to {subject}")
return
else:
# String URL - create new client
client = NATSClient(broker_url_or_client)
await client.connect()
await client.publish(subject, message, correlation_id)
if isinstance(broker_url_or_client, NATSClient):
await broker_url_or_client.close()
elif not (NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish')):
await client.close()
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]],
@@ -479,26 +386,27 @@ async def smartsend(
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True,
nats_connection: Any = None,
msg_id: str = None,
sender_id: str = None
) -> Tuple[Dict, str]:
"""
Send data via NATS with automatic transport selection.
Send data with automatic transport selection.
This function intelligently routes data delivery based on payload size.
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
and publishes only the download URL over NATS.
into a "direct" payload. Otherwise, it uploads the data to a fileserver
and creates a "link" payload with the URL.
Transport publishing is the caller's responsibility. This function returns the
envelope and its JSON string representation.
Args:
subject: NATS subject to publish the message to
subject: Subject/topic to send the message to
data: List of (dataname, data, type) tuples to send
- dataname: Name of the payload
- data: The actual data to send
- type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
broker_url: URL of the NATS server
broker_url: Broker URL (for envelope metadata)
fileserver_url: URL of the HTTP file server for large payloads
fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status",
"uploadid", "fileid", "url" keys)
@@ -510,60 +418,24 @@ async def smartsend(
receiver_id: UUID of the receiver (empty string means broadcast)
reply_to: Topic to reply to (empty string if no reply expected)
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS
nats_connection: Pre-existing NATS connection (if provided, uses this connection instead of
creating a new one; saves connection establishment overhead)
msg_id: Message ID (auto-generated UUID if not provided)
sender_id: Sender ID (auto-generated UUID if not provided)
Returns:
Tuple of (env, env_json_str) where:
- env: Dict containing all metadata and payloads
- env_json_str: JSON string for publishing to NATS
- env_json_str: JSON string for transport
Example:
>>> # Send a single payload (still wrapped in a list)
>>> data = {"key": "value"}
>>> env, env_json_str = await smartsend(
... "my.subject",
... [("dataname1", data, "dictionary")],
... broker_url="nats://localhost:4222"
... [("dataname1", data, "dictionary")]
... )
>>>
>>> # Send multiple payloads with different types
>>> data1 = {"key1": "value1"}
>>> data2 = [1, 2, 3, 4, 5]
>>> env, env_json_str = await smartsend(
... "my.subject",
... [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")]
... )
>>>
>>> # Send a large array using fileserver upload
>>> data = list(range(10_000_000)) # ~80 MB
>>> env, env_json_str = await smartsend(
... "large.data",
... [("large_table", data, "arrowtable")]
... )
>>>
>>> # Send jsontable (JSON format for human-readable tabular data)
>>> users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
>>> env, env_json_str = await smartsend(
... "json.data",
... [("users", users, "jsontable")]
... )
>>>
>>> # Mixed content (e.g., chat with text and image)
>>> env, env_json_str = await smartsend(
... "chat.subject",
... [
... ("message_text", "Hello!", "text"),
... ("user_image", image_data, "image"),
... ("audio_clip", audio_data, "audio")
... ]
... )
>>>
>>> # Publish the JSON string directly using NATS request-reply pattern
>>> # reply = await nats.request(broker_url, subject, env_json_str, reply_to=reply_to_topic)
>>> # Publish the JSON string via your preferred transport
>>> # await my_nats_client.publish("my.subject", env_json_str)
"""
if correlation_id is None:
correlation_id = str(uuid.uuid4())
@@ -619,12 +491,6 @@ async def smartsend(
env_json_str = json.dumps(env)
if is_publish:
if nats_connection:
await publish_message(nats_connection, subject, env_json_str, correlation_id)
else:
await publish_message(broker_url, subject, env_json_str, correlation_id)
return env, env_json_str
@@ -636,14 +502,15 @@ async def smartreceive(
max_delay: int = 5000
) -> Dict[str, Any]:
"""
Receive and process NATS messages.
Receive and process messages.
This function processes incoming NATS messages, handling both direct transport
This function processes incoming messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
Args:
msg: NATS message to process
msg: Message to process. Accepts JSON string directly, or an object with
a `payload` or `data` property containing the JSON string.
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL
base_delay: Initial delay for exponential backoff in ms
@@ -653,10 +520,12 @@ async def smartreceive(
Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]]
Example:
>>> # Receive and process message
>>> env = await smartreceive(msg, fileserver_download_handler=fetch_with_backoff)
>>> # Receive from JSON string directly
>>> env = await smartreceive(json_string)
>>>
>>> # Receive from transport message object (e.g., NATS, MQTT)
>>> env = await smartreceive(nats_msg, fileserver_download_handler=fetch_with_backoff)
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
>>> # Access payloads: for dataname, data, type_ in env["payloads"]
>>> for dataname, data, type_ in env["payloads"]:
>>> print(f"{dataname}: {data} (type: {type_})")
"""
@@ -664,13 +533,19 @@ async def smartreceive(
if isinstance(msg, dict):
# Already parsed
env_json_obj = msg
elif isinstance(msg, str):
# Raw JSON string
env_json_obj = json.loads(msg)
elif hasattr(msg, 'payload'):
# NATS message object
# Transport message object with payload property
payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8')
env_json_obj = json.loads(payload)
elif hasattr(msg, 'data'):
# Transport message object with data property
payload = msg.data if isinstance(msg.data, str) else msg.data.decode('utf-8')
env_json_obj = json.loads(payload)
else:
# Assume it's already a JSON string or dict
env_json_obj = json.loads(msg) if isinstance(msg, str) else msg
raise ValueError('Invalid message format: expected JSON string or message object')
log_trace(env_json_obj['correlation_id'], "Processing received message")
@@ -727,7 +602,7 @@ async def smartreceive(
class msghandler:
"""
Cross-platform NATS bridge implementation.
Cross-platform message bridge implementation.
This class provides a convenient interface for msghandler functionality,
encapsulating the main functions and providing a class-based API.
@@ -742,7 +617,7 @@ class msghandler:
Initialize msghandler.
Args:
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
broker_url: Broker URL (defaults to DEFAULT_BROKER_URL)
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
"""
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
@@ -755,10 +630,10 @@ class msghandler:
**kwargs
) -> Tuple[Dict, str]:
"""
Send data via NATS.
Send data.
Args:
subject: NATS subject to publish to
subject: Subject/topic to send to
data: List of (dataname, data, type) tuples
**kwargs: Additional options passed to smartsend
@@ -775,10 +650,10 @@ class msghandler:
**kwargs
) -> Dict[str, Any]:
"""
Receive and process NATS message.
Receive and process message.
Args:
msg: NATS message to process
msg: Message to process
**kwargs: Additional options passed to smartreceive
Returns:
@@ -797,7 +672,7 @@ def send(
Convenience function for sending data.
Args:
subject: NATS subject to publish to
subject: Subject/topic to send to
data: List of (dataname, data, type) tuples
**kwargs: Additional options
@@ -815,7 +690,7 @@ def receive(
Convenience function for receiving messages.
Args:
msg: NATS message to process
msg: Message to process
**kwargs: Additional options
Returns:
@@ -835,9 +710,7 @@ __all__ = [
'DEFAULT_SIZE_THRESHOLD',
'DEFAULT_BROKER_URL',
'DEFAULT_FILESERVER_URL',
'NATSClient',
'_serialize_data',
'_deserialize_data',
'log_trace',
'publish_message'
]
'log_trace'
]

View File

@@ -1,6 +1,6 @@
// msghandler Rust Module
// Cross-platform bi-directional data bridge for NATS communication
// Implements smartsend and smartreceive for NATS communication
// Cross-platform bi-directional data bridge
// Implements smartsend and smartreceive for message transport
// with support for both direct payload transport and URL-based transport
// for larger payloads using the Claim-Check pattern.
//
@@ -38,8 +38,8 @@ use uuid::Uuid;
/// Default size threshold (0.5MB) for switching from direct to link transport
pub const DEFAULT_SIZE_THRESHOLD: usize = 500_000;
/// Default NATS server URL
pub const DEFAULT_BROKER_URL: &str = "nats://localhost:4222";
/// Default broker URL
pub const DEFAULT_BROKER_URL: &str = "localhost:4222";
/// Default HTTP file server URL for link transport
pub const DEFAULT_FILESERVER_URL: &str = "http://localhost:8080";
@@ -68,8 +68,8 @@ pub enum msghandlerError {
DownloadFailed { url: String, retries: u32 },
/// Unknown transport type
UnknownTransport(String),
/// NATS connection failed
NatConnectionFailed(String),
/// Connection failed
ConnectionFailed(String),
/// Payload deserialization error
DeserializationError(String),
/// HTTP request error
@@ -95,7 +95,7 @@ impl fmt::Display for msghandlerError {
write!(f, "Failed to fetch {} after {} attempts", url, retries)
}
msghandlerError::UnknownTransport(t) => write!(f, "Unknown transport type: {}", t),
msghandlerError::NatConnectionFailed(msg) => write!(f, "NATS connection failed: {}", msg),
msghandlerError::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
msghandlerError::DeserializationError(msg) => {
write!(f, "Deserialization error: {}", msg)
}
@@ -172,7 +172,7 @@ impl Payload {
// Message Payload Structure (wire format)
// ============================================================================
/// Represents a single payload within a NATS message envelope.
/// Represents a single payload within a message envelope.
/// Supports both direct transport (base64-encoded data) and link transport (URL-based).
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct MsgPayloadV1 {
@@ -257,7 +257,7 @@ impl MsgPayloadV1 {
// Message Envelope Structure (wire format)
// ============================================================================
/// Represents a complete NATS message envelope containing multiple payloads
/// Represents a complete message envelope containing multiple payloads
/// with metadata for routing, tracing, and message context.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct MsgEnvelopeV1 {
@@ -268,7 +268,7 @@ pub struct MsgEnvelopeV1 {
/// Message publication timestamp (ISO 8601 UTC)
pub timestamp: String,
/// NATS subject/topic to publish the message to
/// Subject/topic to send the message to
pub send_to: String,
/// Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown",
/// "chat", "command", "event"
@@ -286,7 +286,7 @@ pub struct MsgEnvelopeV1 {
pub reply_to: String,
/// Message ID this message is replying to
pub reply_to_msg_id: String,
/// NATS broker URL
/// Broker URL
pub broker_url: String,
/// Optional message-level metadata
@@ -317,7 +317,7 @@ impl MsgEnvelopeV1 {
}
}
/// Convert the envelope to a JSON string for NATS publishing
/// Convert the envelope to a JSON string for transport
pub fn to_json(&self) -> Result<String, msghandlerError> {
serde_json::to_string(self).map_err(|e| msghandlerError::JsonError(e.to_string()))
}
@@ -329,7 +329,7 @@ impl MsgEnvelopeV1 {
/// Options for the `smartsend` function
pub struct SmartsendOptions {
/// NATS server URL
/// Broker URL
pub broker_url: String,
/// HTTP file server URL for large payloads
pub fileserver_url: String,
@@ -701,7 +701,7 @@ pub fn log_trace(correlation_id: &str, message: &str) {
// Public API: smartsend
// ============================================================================
/// Send data via NATS with automatic transport selection.
/// Send data with automatic transport selection.
///
/// This function intelligently routes data delivery based on payload size.
/// If the serialized payload is smaller than `size_threshold`, it encodes the
@@ -711,11 +711,11 @@ pub fn log_trace(correlation_id: &str, message: &str) {
/// Each payload in the list can have a different type, enabling mixed-content
/// messages (e.g., chat with text, images, audio).
///
/// NATS publishing is the caller's responsibility. This function returns the
/// Transport publishing is the caller's responsibility. This function returns the
/// envelope and its JSON string representation.
///
/// # Arguments
/// - `subject`: NATS subject to publish the message to
/// - `subject`: Subject/topic to send the message to
/// - `data`: Slice of (dataname, payload, payload_type) tuples
/// - `options`: Configuration options
///
@@ -736,8 +736,7 @@ pub fn log_trace(correlation_id: &str, message: &str) {
/// &SmartsendOptions::default(),
/// ).await?;
///
/// // Caller publishes to NATS
/// // conn.publish("/agent/wine/api/v1/prompt", &json_str)?;
/// // Caller publishes via their preferred transport
/// # Ok(())
/// # }
/// ```
@@ -793,7 +792,7 @@ pub async fn smartsend(
));
if payload_size < options.size_threshold {
// Direct transport: Base64 encode and include in NATS message
// Direct transport: Base64 encode and include in message envelope
let payload_b64 = BASE64.encode(&payload_bytes);
log_trace(&correlation_id, &format!(
"Using direct transport for {} bytes", payload_size
@@ -807,7 +806,7 @@ pub async fn smartsend(
);
payloads.push(msg_payload);
} else {
// Link transport: Upload to file server, include URL in NATS message
// Link transport: Upload to file server, include URL in message envelope
log_trace(&correlation_id, "Using link transport, uploading to fileserver");
let upload_result = upload_handler
@@ -878,15 +877,15 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
// Public API: smartreceive
// ============================================================================
/// Receive and process messages from NATS.
/// Receive and process messages.
///
/// This function processes incoming NATS messages, handling both direct transport
/// This function processes incoming messages, handling both direct transport
/// (base64 decoded payloads) and link transport (URL-based payloads).
/// It deserializes the data based on the payload type and returns the envelope
/// with deserialized payloads.
///
/// # Arguments
/// - `msg_json_str`: JSON string from NATS message payload
/// - `msg_json_str`: JSON string from the message payload
/// - `options`: Configuration options
///
/// # Returns
@@ -902,7 +901,7 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
/// "timestamp":"2026-01-01T00:00:00Z","send_to":"/test",
/// "msg_purpose":"chat","sender_name":"test","sender_id":"sender-uuid",
/// "receiver_name":"","receiver_id":"","reply_to":"","reply_to_msg_id":"",
/// "broker_url":"nats://localhost:4222","payloads":[{
/// "broker_url":"localhost:4222","payloads":[{
/// "id":"payload-uuid","dataname":"msg","payload_type":"text",
/// "transport":"direct","encoding":"base64","size":5,
/// "data":"SGVsbG8=","metadata":{"payload_bytes":5}

View File

@@ -3,8 +3,7 @@ msghandler - Cross-Platform Bi-Directional Data Bridge
MicroPython Implementation
This module provides functionality for sending and receiving data across network boundaries
using NATS as the message bus, with support for both direct payload transport and
URL-based transport for larger payloads.
with support for both direct payload transport and URL-based transport for larger payloads.
Note: MicroPython has significant constraints compared to desktop implementations:
- Limited memory (~256KB - 1MB)
@@ -29,9 +28,9 @@ Default size threshold for switching from direct to link transport (100KB for Mi
DEFAULT_SIZE_THRESHOLD = 100000
"""
Default NATS server URL
Default broker URL
"""
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_BROKER_URL = "localhost:4222"
"""
Default HTTP file server URL for link transport
@@ -190,64 +189,6 @@ def _sync_fileserver_download(url, max_retries, base_delay, max_delay, correlati
"Use direct transport only for memory-constrained devices.")
# ---------------------------------------------- NATS Client ---------------------------------------------- #
class NATSClient:
"""
NATS client wrapper for MicroPython.
Note:
This is a simplified implementation for MicroPython.
Full NATS client implementation would require additional network stack support.
"""
def __init__(self, url=DEFAULT_BROKER_URL):
"""
Initialize NATS client.
Args:
url: NATS server URL
"""
self.url = url
self._connected = False
def connect(self):
"""
Connect to NATS server.
Note:
This is a placeholder implementation.
Actual NATS client would require network stack support.
Returns:
True if connected, False otherwise
"""
# Placeholder - actual implementation would connect to NATS server
self._connected = True
return self._connected
def publish(self, subject, message):
"""
Publish message to NATS subject.
Note:
This is a placeholder implementation.
Actual NATS client would require network stack support.
Args:
subject: NATS subject to publish to
message: Message to publish
"""
if not self._connected:
raise RuntimeError("Not connected to NATS server")
# Placeholder - actual implementation would publish to NATS
print(f"[NATS] Publish to {subject}: {message[:50]}...")
def close(self):
"""Close the NATS connection."""
self._connected = False
# ---------------------------------------------- Core Functions ---------------------------------------------- #
def _build_envelope(subject, payloads, options):
@@ -255,7 +196,7 @@ def _build_envelope(subject, payloads, options):
Build message envelope from payloads and metadata.
Args:
subject: NATS subject
subject: Subject/topic
payloads: Array of payload objects
options: Envelope metadata options
@@ -308,44 +249,43 @@ def _build_payload(dataname, payload_type, payload_bytes, transport, data):
def _publish(subject, message, correlation_id):
"""
Publish message to NATS.
Publish message via transport.
Note:
This is a simplified implementation for MicroPython.
Args:
subject: NATS subject to publish to
subject: Subject to publish to
message: JSON message to publish
correlation_id: Correlation ID for logging
"""
log_trace(correlation_id, f"Publishing to {subject}")
# Placeholder - actual implementation would use NATSClient
# client = NATSClient()
# client.connect()
# client.publish(subject, message)
# client.close()
# Placeholder - actual implementation would publish via preferred transport
def smartsend(subject, data, **kwargs):
"""
Send data via NATS with automatic transport selection.
Send data with automatic transport selection.
This function intelligently routes data delivery based on payload size.
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
and publishes only the download URL over NATS.
into a "direct" payload. Otherwise, it uploads the data to a fileserver
and creates a "link" payload with the URL.
Transport publishing is the caller's responsibility. This function returns the
envelope and its JSON string representation.
Note:
MicroPython has memory constraints, so the default size_threshold is lower (100KB).
Table type is not supported due to memory constraints.
Args:
subject: NATS subject to publish the message to
subject: Subject/topic to send the message to
data: List of (dataname, data, type) tuples to send
- dataname: Name of the payload
- data: The actual data to send
- type: Payload type: "text", "dictionary", "image", "audio", "video", "binary"
broker_url: NATS server URL (default: DEFAULT_BROKER_URL)
broker_url: Broker URL (for envelope metadata, default: DEFAULT_BROKER_URL)
fileserver_url: HTTP file server URL (default: DEFAULT_FILESERVER_URL)
fileserver_upload_handler: Function to handle fileserver uploads (default: _sync_fileserver_upload)
size_threshold: Threshold in bytes separating direct vs link transport (default: 100000)
@@ -356,36 +296,23 @@ def smartsend(subject, data, **kwargs):
receiver_id: UUID of the receiver (empty means broadcast)
reply_to: Topic to reply to (empty if no reply expected)
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message (default: True)
msg_id: Message ID (auto-generated if not provided)
sender_id: Sender ID (auto-generated if not provided)
Returns:
Tuple of (env, env_json_str) where:
- env: Dict containing all metadata and payloads
- env_json_str: JSON string for publishing to NATS
- env_json_str: JSON string for transport
Example:
>>> # Send text payload
>>> env, env_json_str = msghandler.smartsend(
>>> env, env_json_str = smartsend(
... "/chat",
... [("message", "Hello!", "text")],
... broker_url="nats://localhost:4222"
... [("message", "Hello!", "text")]
... )
>>>
>>> # Send dictionary payload
>>> env, env_json_str = msghandler.smartsend(
... "/config",
... [("config", {"key": "value"}, "dictionary")],
... broker_url="nats://localhost:4222"
... )
>>>
>>> # Send binary payload (image, audio, video)
>>> env, env_json_str = msghandler.smartsend(
... "/media",
... [("image", image_bytes, "image")],
... broker_url="nats://localhost:4222"
... )
>>> # Publish via your transport
>>> # my_transport.publish("/chat", env_json_str)
"""
# Extract options with defaults
correlation_id = kwargs.get('correlation_id', _generate_uuid())
@@ -465,9 +392,9 @@ def smartsend(subject, data, **kwargs):
def smartreceive(msg, **kwargs):
"""
Receive and process NATS message.
Receive and process messages.
This function processes incoming NATS messages, handling both direct transport
This function processes incoming messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
@@ -476,7 +403,7 @@ def smartreceive(msg, **kwargs):
Table type is not supported due to memory constraints.
Args:
msg: NATS message to process (can be string, dict, or object with 'payload' attribute)
msg: Message to process (can be JSON string, dict, or object with 'payload'/'data' attribute)
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts (default: 3)
base_delay: Initial delay in ms (default: 100)
@@ -486,8 +413,11 @@ def smartreceive(msg, **kwargs):
Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]]
Example:
>>> # Receive and process message
>>> env = msghandler.smartreceive(msg, fileserver_download_handler=_sync_fileserver_download)
>>> # Receive from JSON string
>>> env = smartreceive(json_string)
>>>
>>> # Receive from transport message object
>>> env = smartreceive(transport_msg, fileserver_download_handler=_sync_fileserver_download)
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
>>> for dataname, data, type_ in env["payloads"]:
... print(f"{dataname}: {data} (type: {type_})")
@@ -496,13 +426,19 @@ def smartreceive(msg, **kwargs):
if isinstance(msg, dict):
# Already parsed
env_json_obj = msg
elif isinstance(msg, str):
# Raw JSON string
env_json_obj = json.loads(msg)
elif hasattr(msg, 'payload'):
# Object with payload attribute
payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8')
env_json_obj = json.loads(payload)
elif hasattr(msg, 'data'):
# Object with data attribute
payload = msg.data if isinstance(msg.data, str) else msg.data.decode('utf-8')
env_json_obj = json.loads(payload)
else:
# Assume it's already a JSON string or dict
env_json_obj = json.loads(msg) if isinstance(msg, str) else msg
raise ValueError('Invalid message format: expected JSON string or message object')
correlation_id = env_json_obj['correlation_id']
log_trace(correlation_id, "Processing received message")
@@ -565,7 +501,7 @@ def smartreceive(msg, **kwargs):
class msghandler:
"""
MicroPython NATS bridge implementation.
MicroPython message bridge implementation.
This class provides a convenient interface for msghandler functionality,
encapsulating the main functions and providing a class-based API.
@@ -588,7 +524,7 @@ class msghandler:
Initialize msghandler.
Args:
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
broker_url: Broker URL (defaults to DEFAULT_BROKER_URL)
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
"""
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
@@ -596,10 +532,10 @@ class msghandler:
def smartsend(self, subject, data, **kwargs):
"""
Send data via NATS.
Send data.
Args:
subject: NATS subject to publish to
subject: Subject/topic to send to
data: List of (dataname, data, type) tuples
**kwargs: Additional options passed to smartsend
@@ -612,10 +548,10 @@ class msghandler:
def smartreceive(self, msg, **kwargs):
"""
Receive and process NATS message.
Receive and process message.
Args:
msg: NATS message to process
msg: Message to process
**kwargs: Additional options passed to smartreceive
Returns:
@@ -630,7 +566,7 @@ def send(subject, data, **kwargs):
Convenience function for sending data.
Args:
subject: NATS subject to publish to
subject: Subject/topic to send to
data: List of (dataname, data, type) tuples
**kwargs: Additional options
@@ -645,7 +581,7 @@ def receive(msg, **kwargs):
Convenience function for receiving messages.
Args:
msg: NATS message to process
msg: Message to process
**kwargs: Additional options
Returns:
@@ -664,10 +600,9 @@ __all__ = [
'DEFAULT_BROKER_URL',
'DEFAULT_FILESERVER_URL',
'MAX_PAYLOAD_SIZE',
'NATSClient',
'_serialize_data',
'_deserialize_data',
'log_trace',
'_sync_fileserver_upload',
'_sync_fileserver_download'
]
]