1st commit
This commit is contained in:
1167
src/NATSBridge.jl
Normal file
1167
src/NATSBridge.jl
Normal file
File diff suppressed because it is too large
Load Diff
843
src/natsbridge.py
Normal file
843
src/natsbridge.py
Normal file
@@ -0,0 +1,843 @@
|
||||
"""
|
||||
msghandler - Cross-Platform Bi-Directional Data Bridge
|
||||
Python Desktop Implementation
|
||||
|
||||
This module provides functionality for sending and receiving data across network boundaries
|
||||
using NATS as the message bus, with support for both direct payload transport and
|
||||
URL-based transport for larger payloads.
|
||||
|
||||
@package msghandler
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Dict, List, Tuple, Union
|
||||
import aiohttp
|
||||
|
||||
try:
|
||||
import pyarrow as arrow
|
||||
import pyarrow.ipc as ipc
|
||||
ARROW_AVAILABLE = True
|
||||
except ImportError:
|
||||
ARROW_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import nats
|
||||
from nats.aio.client import Client as NATSClient
|
||||
NATS_AVAILABLE = True
|
||||
except ImportError:
|
||||
NATS_AVAILABLE = False
|
||||
|
||||
# ---------------------------------------------- Constants ---------------------------------------------- #
|
||||
|
||||
"""
|
||||
Default size threshold for switching from direct to link transport (0.5MB)
|
||||
"""
|
||||
DEFAULT_SIZE_THRESHOLD = 500_000
|
||||
|
||||
"""
|
||||
Default NATS server URL
|
||||
"""
|
||||
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
||||
|
||||
"""
|
||||
Default HTTP file server URL for link transport
|
||||
"""
|
||||
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
||||
|
||||
|
||||
# ---------------------------------------------- Utility Functions ---------------------------------------------- #
|
||||
|
||||
def log_trace(correlation_id: str, message: str) -> None:
|
||||
"""
|
||||
Log a trace message with correlation ID and timestamp.
|
||||
|
||||
Args:
|
||||
correlation_id: Correlation ID for tracing
|
||||
message: Message content to log
|
||||
"""
|
||||
timestamp = datetime.utcnow().isoformat() + 'Z'
|
||||
print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
|
||||
|
||||
|
||||
# ---------------------------------------------- Serialization Functions ---------------------------------------------- #
|
||||
|
||||
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
||||
"""
|
||||
Serialize data according to specified format.
|
||||
|
||||
Args:
|
||||
data: Data to serialize (string for "text", JSON-serializable for "dictionary",
|
||||
table-like for "arrowtable"/"jsontable", binary for "image", "audio", "video", "binary")
|
||||
payload_type: Target format: "text", "dictionary", "arrowtable", "jsontable",
|
||||
"image", "audio", "video", "binary"
|
||||
|
||||
Returns:
|
||||
Binary representation of the serialized data
|
||||
|
||||
Raises:
|
||||
Error: If payload_type is not one of the supported types
|
||||
Error: If payload_type is "image", "audio", or "video" but data is not bytes
|
||||
Error: If payload_type is "arrowtable" but data is not a pandas DataFrame or pyarrow Table
|
||||
Error: If payload_type is "jsontable" but data is not a list of dicts
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
if isinstance(data, str):
|
||||
return data.encode('utf-8')
|
||||
else:
|
||||
raise ValueError('Text data must be a string')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
elif payload_type == 'arrowtable':
|
||||
if not ARROW_AVAILABLE:
|
||||
raise RuntimeError('pyarrow not available for arrowtable serialization')
|
||||
|
||||
import io
|
||||
buf = io.BytesIO()
|
||||
|
||||
import pandas as pd
|
||||
if isinstance(data, pd.DataFrame):
|
||||
table = arrow.Table.from_pandas(data)
|
||||
sink = ipc.new_file(buf, table.schema)
|
||||
ipc.write_table(table, sink)
|
||||
sink.close()
|
||||
return buf.getvalue()
|
||||
elif isinstance(data, arrow.Table):
|
||||
sink = ipc.new_file(buf, data.schema)
|
||||
ipc.write_table(data, sink)
|
||||
sink.close()
|
||||
return buf.getvalue()
|
||||
else:
|
||||
raise ValueError('Arrow table data must be a pandas DataFrame or pyarrow Table')
|
||||
elif payload_type == 'jsontable':
|
||||
# Serialize list of dicts to JSON format
|
||||
if isinstance(data, list) and all(isinstance(row, dict) for row in data):
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
else:
|
||||
raise ValueError('JSON table data must be a list of dicts')
|
||||
elif payload_type == 'image':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError('Image data must be bytes')
|
||||
elif payload_type == 'audio':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError('Audio data must be bytes')
|
||||
elif payload_type == 'video':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError('Video data must be bytes')
|
||||
elif payload_type == 'binary':
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError('Binary data must be bytes')
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
||||
"""
|
||||
Deserialize bytes to data based on type.
|
||||
|
||||
Args:
|
||||
data: Serialized data as bytes
|
||||
payload_type: Data type ("text", "dictionary", "arrowtable", "jsontable",
|
||||
"image", "audio", "video", "binary")
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Deserialized data (String for "text", DataFrame for "arrowtable",
|
||||
Vector{Dict} for "jsontable"/"dictionary", bytes for "image", "audio", "video", "binary")
|
||||
|
||||
Raises:
|
||||
Error: If payload_type is not one of the supported types
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
return data.decode('utf-8')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = data.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
elif payload_type == 'arrowtable':
|
||||
if not ARROW_AVAILABLE:
|
||||
raise RuntimeError('pyarrow not available for arrowtable deserialization')
|
||||
|
||||
import io
|
||||
buf = io.BytesIO(data)
|
||||
reader = ipc.open_file(buf)
|
||||
return reader.read_all().to_pandas()
|
||||
elif payload_type == 'jsontable':
|
||||
# Deserialize JSON to list of dicts
|
||||
json_str = data.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
elif payload_type == 'image':
|
||||
return data
|
||||
elif payload_type == 'audio':
|
||||
return data
|
||||
elif payload_type == 'video':
|
||||
return data
|
||||
elif payload_type == 'binary':
|
||||
return data
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
# ---------------------------------------------- File Server Handlers ---------------------------------------------- #
|
||||
|
||||
async def plik_oneshot_upload(
|
||||
file_server_url: str,
|
||||
dataname: str,
|
||||
data: bytes
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Upload data to plik server in one-shot mode.
|
||||
|
||||
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
|
||||
It first creates a one-shot upload session by sending a POST request with {"OneShot": true},
|
||||
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
||||
|
||||
Args:
|
||||
file_server_url: Base URL of the plik server (e.g., "http://localhost:8080")
|
||||
dataname: Name of the file being uploaded
|
||||
data: Raw byte data of the file content
|
||||
|
||||
Returns:
|
||||
Dict with keys:
|
||||
- "status": HTTP server response status
|
||||
- "uploadid": ID of the one-shot upload session
|
||||
- "fileid": ID of the uploaded file within the session
|
||||
- "url": Full URL to download the uploaded file
|
||||
|
||||
Example:
|
||||
>>> fileserver_url = "http://localhost:8080"
|
||||
>>> dataname = "test.txt"
|
||||
>>> data = b"hello world"
|
||||
>>> result = await plik_oneshot_upload(file_server_url, dataname, data)
|
||||
>>> result["status"], result["uploadid"], result["fileid"], result["url"]
|
||||
"""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# Get upload id
|
||||
url_getUploadID = f"{file_server_url}/upload"
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
body = json.dumps({"OneShot": True})
|
||||
|
||||
async with session.post(url_getUploadID, headers=headers, data=body) as response:
|
||||
response_json = await response.json()
|
||||
uploadid = response_json['id']
|
||||
uploadtoken = response_json['uploadToken']
|
||||
|
||||
# Upload file
|
||||
url_upload = f"{file_server_url}/file/{uploadid}"
|
||||
headers = {'X-UploadToken': uploadtoken}
|
||||
|
||||
form = aiohttp.FormData()
|
||||
form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
|
||||
|
||||
async with session.post(url_upload, headers=headers, data=form) as upload_response:
|
||||
upload_json = await upload_response.json()
|
||||
fileid = upload_json['id']
|
||||
|
||||
url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
|
||||
|
||||
return {
|
||||
'status': upload_response.status,
|
||||
'uploadid': uploadid,
|
||||
'fileid': fileid,
|
||||
'url': url
|
||||
}
|
||||
|
||||
|
||||
async def fetch_with_backoff(
|
||||
url: str,
|
||||
max_retries: int,
|
||||
base_delay: int,
|
||||
max_delay: int,
|
||||
correlation_id: str
|
||||
) -> bytes:
|
||||
"""
|
||||
Fetch data from URL with exponential backoff.
|
||||
|
||||
This internal function retrieves data from a URL with retry logic using
|
||||
exponential backoff to handle transient failures.
|
||||
|
||||
Args:
|
||||
url: URL to fetch from
|
||||
max_retries: Maximum number of retry attempts
|
||||
base_delay: Initial delay in milliseconds
|
||||
max_delay: Maximum delay in milliseconds
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Fetched data as bytes
|
||||
|
||||
Raises:
|
||||
Error: If all retry attempts fail
|
||||
|
||||
Example:
|
||||
>>> data = await fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
|
||||
"""
|
||||
delay = base_delay
|
||||
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
|
||||
return await response.read()
|
||||
else:
|
||||
raise Exception(f"Failed to fetch: {response.status}")
|
||||
except Exception as e:
|
||||
log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")
|
||||
|
||||
if attempt < max_retries:
|
||||
await asyncio.sleep(delay / 1000.0)
|
||||
delay = min(delay * 2, max_delay)
|
||||
|
||||
raise Exception(f"Failed to fetch data after {max_retries} attempts")
|
||||
|
||||
|
||||
# ---------------------------------------------- NATS Client ---------------------------------------------- #
|
||||
|
||||
class NATSClient:
|
||||
"""NATS client wrapper for connection management."""
|
||||
|
||||
def __init__(self, url: str = DEFAULT_BROKER_URL):
|
||||
"""
|
||||
Create a new NATS client.
|
||||
|
||||
Args:
|
||||
url: NATS server URL
|
||||
"""
|
||||
self.url = url
|
||||
self._client: NATSClient = None
|
||||
|
||||
async def connect(self) -> NATSClient:
|
||||
"""
|
||||
Connect to NATS server.
|
||||
|
||||
Returns:
|
||||
NATS client instance
|
||||
"""
|
||||
if NATS_AVAILABLE:
|
||||
self._client = nats.connect(self.url)
|
||||
await self._client
|
||||
else:
|
||||
raise RuntimeError('nats-py not available')
|
||||
return self._client
|
||||
|
||||
async def publish(self, subject: str, message: str, correlation_id: str = "") -> None:
|
||||
"""
|
||||
Publish message to NATS subject.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
message: Message to publish
|
||||
correlation_id: Correlation ID for logging
|
||||
"""
|
||||
if self._client:
|
||||
await self._client.publish(subject, message)
|
||||
if correlation_id:
|
||||
log_trace(correlation_id, f"Message published to {subject}")
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the NATS connection."""
|
||||
if self._client:
|
||||
await self._client.drain()
|
||||
await self._client.close()
|
||||
|
||||
|
||||
# ---------------------------------------------- Core Functions ---------------------------------------------- #
|
||||
|
||||
def _build_envelope(
|
||||
subject: str,
|
||||
payloads: List[Dict[str, Any]],
|
||||
options: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Build message envelope from payloads and metadata.
|
||||
|
||||
Args:
|
||||
subject: NATS subject
|
||||
payloads: Array of payload objects
|
||||
options: Envelope metadata options
|
||||
|
||||
Returns:
|
||||
Envelope object
|
||||
"""
|
||||
return {
|
||||
'correlation_id': options['correlation_id'],
|
||||
'msg_id': options['msg_id'],
|
||||
'timestamp': datetime.utcnow().isoformat() + 'Z',
|
||||
'send_to': subject,
|
||||
'msg_purpose': options['msg_purpose'],
|
||||
'sender_name': options['sender_name'],
|
||||
'sender_id': options['sender_id'],
|
||||
'receiver_name': options['receiver_name'],
|
||||
'receiver_id': options['receiver_id'],
|
||||
'reply_to': options['reply_to'],
|
||||
'reply_to_msg_id': options['reply_to_msg_id'],
|
||||
'broker_url': options['broker_url'],
|
||||
'metadata': options.get('metadata', {}),
|
||||
'payloads': payloads
|
||||
}
|
||||
|
||||
|
||||
def _build_payload(
|
||||
dataname: str,
|
||||
payload_type: str,
|
||||
payload_bytes: bytes,
|
||||
transport: str,
|
||||
data: Union[str, bytes]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Build payload object from serialized data.
|
||||
|
||||
Args:
|
||||
dataname: Name of the payload
|
||||
payload_type: Type of the payload
|
||||
payload_bytes: Serialized payload bytes
|
||||
transport: Transport type ("direct" or "link")
|
||||
data: Data (base64 for direct, URL for link)
|
||||
|
||||
Returns:
|
||||
Payload object
|
||||
"""
|
||||
# Determine encoding based on payload type (matching Julia/JS implementation)
|
||||
encoding = 'base64'
|
||||
if payload_type == 'jsontable':
|
||||
encoding = 'json'
|
||||
elif payload_type == 'arrowtable':
|
||||
encoding = 'arrow-ipc'
|
||||
|
||||
return {
|
||||
'id': str(uuid.uuid4()),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': transport,
|
||||
'encoding': encoding,
|
||||
'size': len(payload_bytes),
|
||||
'data': data,
|
||||
'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {}
|
||||
}
|
||||
|
||||
|
||||
async def publish_message(
|
||||
broker_url_or_client: Union[str, NATSClient, Any],
|
||||
subject: str,
|
||||
message: str,
|
||||
correlation_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Publish message to NATS.
|
||||
|
||||
Args:
|
||||
broker_url_or_client: NATS URL, client, or connection
|
||||
subject: NATS subject to publish to
|
||||
message: JSON message to publish
|
||||
correlation_id: Correlation ID for tracing
|
||||
"""
|
||||
if isinstance(broker_url_or_client, NATSClient):
|
||||
client = broker_url_or_client
|
||||
elif NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish'):
|
||||
# Direct NATS client connection
|
||||
await broker_url_or_client.publish(subject, message)
|
||||
log_trace(correlation_id, f"Message published to {subject}")
|
||||
return
|
||||
else:
|
||||
# String URL - create new client
|
||||
client = NATSClient(broker_url_or_client)
|
||||
await client.connect()
|
||||
|
||||
await client.publish(subject, message, correlation_id)
|
||||
|
||||
if isinstance(broker_url_or_client, NATSClient):
|
||||
await broker_url_or_client.close()
|
||||
elif not (NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish')):
|
||||
await client.close()
|
||||
|
||||
|
||||
async def smartsend(
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
broker_url: str = DEFAULT_BROKER_URL,
|
||||
fileserver_url: str = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler: Callable = plik_oneshot_upload,
|
||||
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id: str = None,
|
||||
msg_purpose: str = "chat",
|
||||
sender_name: str = "msghandler",
|
||||
receiver_name: str = "",
|
||||
receiver_id: str = "",
|
||||
reply_to: str = "",
|
||||
reply_to_msg_id: str = "",
|
||||
is_publish: bool = True,
|
||||
nats_connection: Any = None,
|
||||
msg_id: str = None,
|
||||
sender_id: str = None
|
||||
) -> Tuple[Dict, str]:
|
||||
"""
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
This function intelligently routes data delivery based on payload size.
|
||||
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||
and publishes only the download URL over NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish the message to
|
||||
data: List of (dataname, data, type) tuples to send
|
||||
- dataname: Name of the payload
|
||||
- data: The actual data to send
|
||||
- type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
broker_url: URL of the NATS server
|
||||
fileserver_url: URL of the HTTP file server for large payloads
|
||||
fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status",
|
||||
"uploadid", "fileid", "url" keys)
|
||||
size_threshold: Threshold in bytes separating direct vs link transport
|
||||
correlation_id: Correlation ID for tracing (auto-generated UUID if not provided)
|
||||
msg_purpose: Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||
sender_name: Name of the sender
|
||||
receiver_name: Name of the receiver (empty string means broadcast)
|
||||
receiver_id: UUID of the receiver (empty string means broadcast)
|
||||
reply_to: Topic to reply to (empty string if no reply expected)
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
is_publish: Whether to automatically publish the message to NATS
|
||||
nats_connection: Pre-existing NATS connection (if provided, uses this connection instead of
|
||||
creating a new one; saves connection establishment overhead)
|
||||
msg_id: Message ID (auto-generated UUID if not provided)
|
||||
sender_id: Sender ID (auto-generated UUID if not provided)
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str) where:
|
||||
- env: Dict containing all metadata and payloads
|
||||
- env_json_str: JSON string for publishing to NATS
|
||||
|
||||
Example:
|
||||
>>> # Send a single payload (still wrapped in a list)
|
||||
>>> data = {"key": "value"}
|
||||
>>> env, env_json_str = await smartsend(
|
||||
... "my.subject",
|
||||
... [("dataname1", data, "dictionary")],
|
||||
... broker_url="nats://localhost:4222"
|
||||
... )
|
||||
>>>
|
||||
>>> # Send multiple payloads with different types
|
||||
>>> data1 = {"key1": "value1"}
|
||||
>>> data2 = [1, 2, 3, 4, 5]
|
||||
>>> env, env_json_str = await smartsend(
|
||||
... "my.subject",
|
||||
... [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")]
|
||||
... )
|
||||
>>>
|
||||
>>> # Send a large array using fileserver upload
|
||||
>>> data = list(range(10_000_000)) # ~80 MB
|
||||
>>> env, env_json_str = await smartsend(
|
||||
... "large.data",
|
||||
... [("large_table", data, "arrowtable")]
|
||||
... )
|
||||
>>>
|
||||
>>> # Send jsontable (JSON format for human-readable tabular data)
|
||||
>>> users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
|
||||
>>> env, env_json_str = await smartsend(
|
||||
... "json.data",
|
||||
... [("users", users, "jsontable")]
|
||||
... )
|
||||
>>>
|
||||
>>> # Mixed content (e.g., chat with text and image)
|
||||
>>> env, env_json_str = await smartsend(
|
||||
... "chat.subject",
|
||||
... [
|
||||
... ("message_text", "Hello!", "text"),
|
||||
... ("user_image", image_data, "image"),
|
||||
... ("audio_clip", audio_data, "audio")
|
||||
... ]
|
||||
... )
|
||||
>>>
|
||||
>>> # Publish the JSON string directly using NATS request-reply pattern
|
||||
>>> # reply = await nats.request(broker_url, subject, env_json_str, reply_to=reply_to_topic)
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = str(uuid.uuid4())
|
||||
if msg_id is None:
|
||||
msg_id = str(uuid.uuid4())
|
||||
if sender_id is None:
|
||||
sender_id = str(uuid.uuid4())
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
|
||||
# Process payloads
|
||||
payloads = []
|
||||
for dataname, payload_data, payload_type in data:
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
payload_size = len(payload_bytes)
|
||||
|
||||
log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
|
||||
|
||||
if payload_size < size_threshold:
|
||||
# Direct path
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
|
||||
log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
|
||||
|
||||
payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64)
|
||||
payloads.append(payload)
|
||||
else:
|
||||
# Link path
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver")
|
||||
|
||||
response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
|
||||
if response['status'] != 200:
|
||||
raise Exception(f"Failed to upload data to fileserver: {response['status']}")
|
||||
|
||||
log_trace(correlation_id, f"Uploaded to URL: {response['url']}")
|
||||
|
||||
payload = _build_payload(dataname, payload_type, payload_bytes, 'link', response['url'])
|
||||
payloads.append(payload)
|
||||
|
||||
# Build envelope
|
||||
env = _build_envelope(subject, payloads, {
|
||||
'correlation_id': correlation_id,
|
||||
'msg_id': msg_id,
|
||||
'msg_purpose': msg_purpose,
|
||||
'sender_name': sender_name,
|
||||
'sender_id': sender_id,
|
||||
'receiver_name': receiver_name,
|
||||
'receiver_id': receiver_id,
|
||||
'reply_to': reply_to,
|
||||
'reply_to_msg_id': reply_to_msg_id,
|
||||
'broker_url': broker_url
|
||||
})
|
||||
|
||||
env_json_str = json.dumps(env)
|
||||
|
||||
if is_publish:
|
||||
if nats_connection:
|
||||
await publish_message(nats_connection, subject, env_json_str, correlation_id)
|
||||
else:
|
||||
await publish_message(broker_url, subject, env_json_str, correlation_id)
|
||||
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
async def smartreceive(
|
||||
msg: Any,
|
||||
fileserver_download_handler: Callable = fetch_with_backoff,
|
||||
max_retries: int = 5,
|
||||
base_delay: int = 100,
|
||||
max_delay: int = 5000
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Receive and process NATS messages.
|
||||
|
||||
This function processes incoming NATS messages, handling both direct transport
|
||||
(base64 decoded payloads) and link transport (URL-based payloads).
|
||||
It deserializes the data based on the transport type and returns the result.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process
|
||||
fileserver_download_handler: Function to handle downloading data from file server URLs
|
||||
max_retries: Maximum retry attempts for fetching URL
|
||||
base_delay: Initial delay for exponential backoff in ms
|
||||
max_delay: Maximum delay for exponential backoff in ms
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]]
|
||||
|
||||
Example:
|
||||
>>> # Receive and process message
|
||||
>>> env = await smartreceive(msg, fileserver_download_handler=fetch_with_backoff)
|
||||
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
||||
>>> # Access payloads: for dataname, data, type_ in env["payloads"]
|
||||
>>> for dataname, data, type_ in env["payloads"]:
|
||||
>>> print(f"{dataname}: {data} (type: {type_})")
|
||||
"""
|
||||
# Parse the JSON envelope
|
||||
if isinstance(msg, dict):
|
||||
# Already parsed
|
||||
env_json_obj = msg
|
||||
elif hasattr(msg, 'payload'):
|
||||
# NATS message object
|
||||
payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8')
|
||||
env_json_obj = json.loads(payload)
|
||||
else:
|
||||
# Assume it's already a JSON string or dict
|
||||
env_json_obj = json.loads(msg) if isinstance(msg, str) else msg
|
||||
|
||||
log_trace(env_json_obj['correlation_id'], "Processing received message")
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = []
|
||||
num_payloads = len(env_json_obj['payloads'])
|
||||
|
||||
for i in range(num_payloads):
|
||||
payload_obj = env_json_obj['payloads'][i]
|
||||
transport = payload_obj['transport']
|
||||
dataname = payload_obj['dataname']
|
||||
|
||||
if transport == 'direct':
|
||||
log_trace(env_json_obj['correlation_id'], f"Direct transport - decoding payload '{dataname}'")
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = payload_obj['data']
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = base64.b64decode(payload_b64)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload_obj['payload_type']
|
||||
data = _deserialize_data(payload_bytes, data_type, env_json_obj['correlation_id'])
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
elif transport == 'link':
|
||||
# Extract download URL from the payload
|
||||
url = payload_obj['data']
|
||||
log_trace(env_json_obj['correlation_id'], f"Link transport - fetching '{dataname}' from URL: {url}")
|
||||
|
||||
# Fetch with exponential backoff using the download handler
|
||||
downloaded_data = await fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
env_json_obj['correlation_id']
|
||||
)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload_obj['payload_type']
|
||||
data = _deserialize_data(downloaded_data, data_type, env_json_obj['correlation_id'])
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
else:
|
||||
raise Exception(f"Unknown transport type for payload '{dataname}': {transport}")
|
||||
|
||||
env_json_obj['payloads'] = payloads_list
|
||||
return env_json_obj
|
||||
|
||||
|
||||
# ---------------------------------------------- Module Exports ---------------------------------------------- #
|
||||
|
||||
class msghandler:
|
||||
"""
|
||||
Cross-platform NATS bridge implementation.
|
||||
|
||||
This class provides a convenient interface for msghandler functionality,
|
||||
encapsulating the main functions and providing a class-based API.
|
||||
"""
|
||||
|
||||
DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD
|
||||
DEFAULT_BROKER_URL = DEFAULT_BROKER_URL
|
||||
DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL
|
||||
|
||||
def __init__(self, broker_url: str = None, fileserver_url: str = None):
|
||||
"""
|
||||
Initialize msghandler.
|
||||
|
||||
Args:
|
||||
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
|
||||
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
|
||||
"""
|
||||
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
|
||||
|
||||
async def smartsend(
|
||||
self,
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
**kwargs
|
||||
) -> Tuple[Dict, str]:
|
||||
"""
|
||||
Send data via NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options passed to smartsend
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url)
|
||||
kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url)
|
||||
return await smartsend(subject, data, **kwargs)
|
||||
|
||||
async def smartreceive(
|
||||
self,
|
||||
msg: Any,
|
||||
**kwargs
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Receive and process NATS message.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process
|
||||
**kwargs: Additional options passed to smartreceive
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return await smartreceive(msg, **kwargs)
|
||||
|
||||
|
||||
# Convenience functions for module-level usage
|
||||
def send(
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
**kwargs
|
||||
) -> Tuple[Dict, str]:
|
||||
"""
|
||||
Convenience function for sending data.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
return asyncio.run(smartsend(subject, data, **kwargs))
|
||||
|
||||
|
||||
def receive(
|
||||
msg: Any,
|
||||
**kwargs
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Convenience function for receiving messages.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return asyncio.run(smartreceive(msg, **kwargs))
|
||||
|
||||
|
||||
__all__ = [
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
'plik_oneshot_upload',
|
||||
'fetch_with_backoff',
|
||||
'msghandler',
|
||||
'send',
|
||||
'receive',
|
||||
'DEFAULT_SIZE_THRESHOLD',
|
||||
'DEFAULT_BROKER_URL',
|
||||
'DEFAULT_FILESERVER_URL',
|
||||
'NATSClient',
|
||||
'_serialize_data',
|
||||
'_deserialize_data',
|
||||
'log_trace',
|
||||
'publish_message'
|
||||
]
|
||||
1230
src/natsbridge.rs
Normal file
1230
src/natsbridge.rs
Normal file
File diff suppressed because it is too large
Load Diff
915
src/natsbridge_csr.js
Normal file
915
src/natsbridge_csr.js
Normal file
@@ -0,0 +1,915 @@
|
||||
/**
|
||||
* msghandler - Cross-Platform Bi-Directional Data Bridge
|
||||
* Browser-Compatible Implementation (Client-Side Rendering)
|
||||
*
|
||||
* This module provides functionality for sending and receiving data across network boundaries
|
||||
* using NATS as the message bus, with support for both direct payload transport and
|
||||
* URL-based transport for larger payloads.
|
||||
*
|
||||
* Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints.
|
||||
* Use "jsontable" for tabular data in browser applications.
|
||||
*
|
||||
* Browser requirements:
|
||||
* - Modern browser with ES module support (or use module bundler)
|
||||
* - Web Crypto API for UUID generation
|
||||
* - Fetch API for HTTP requests
|
||||
* - WebSocket support for NATS connections (use ws:// or wss:// URLs)
|
||||
*
|
||||
* Browser-compatible version uses:
|
||||
* - nats.ws for WebSocket-based NATS connections
|
||||
* - Web Crypto API for UUID generation
|
||||
* - Uint8Array instead of Buffer
|
||||
* - fetch API for file server communication
|
||||
*
|
||||
* @module msghandlerCSR
|
||||
*/
|
||||
|
||||
// Import browser-compatible NATS client
|
||||
import * as nats from 'nats.ws';
|
||||
|
||||
// Use native fetch available in browsers
|
||||
|
||||
// ---------------------------------------------- Constants ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Default size threshold for switching from direct to link transport (0.5MB)
|
||||
*/
|
||||
const DEFAULT_SIZE_THRESHOLD = 500_000;
|
||||
|
||||
/**
|
||||
* Default NATS server URL (WebSocket protocol)
|
||||
*/
|
||||
const DEFAULT_BROKER_URL = 'ws://localhost:4222';
|
||||
|
||||
/**
|
||||
* Default HTTP file server URL for link transport
|
||||
*/
|
||||
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
||||
|
||||
// ---------------------------------------------- Utility Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Convert Uint8Array to Base64 string
|
||||
* @param {Uint8Array} data - Data to encode
|
||||
* @returns {string} Base64 encoded string
|
||||
*/
|
||||
function bufferToBase64(data) {
|
||||
const bytes = new Uint8Array(data);
|
||||
const binary = String.fromCharCode(...bytes);
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Base64 string to Uint8Array
|
||||
* @param {string} base64 - Base64 encoded string
|
||||
* @returns {Uint8Array} Decoded binary data
|
||||
*/
|
||||
function base64ToBuffer(base64) {
|
||||
const binary = atob(base64);
|
||||
const len = binary.length;
|
||||
const bytes = new Uint8Array(len);
|
||||
for (let i = 0; i < len; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Uint8Array to Base64 string (Unicode-safe version)
|
||||
* Uses TextEncoder/TextDecoder for proper Unicode handling
|
||||
* @param {Uint8Array} data - Data to encode
|
||||
* @returns {string} Base64 encoded string
|
||||
*/
|
||||
function bufferToBase64UnicodeSafe(data) {
|
||||
const bytes = new Uint8Array(data);
|
||||
// Use TextDecoder to properly handle the bytes as text
|
||||
const binary = String.fromCharCode(...bytes);
|
||||
return btoa(binary);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Base64 string to Uint8Array (Unicode-safe version)
|
||||
* @param {string} base64 - Base64 encoded string
|
||||
* @returns {Uint8Array} Decoded binary data
|
||||
*/
|
||||
function base64ToBufferUnicodeSafe(base64) {
|
||||
const binary = atob(base64);
|
||||
const len = binary.length;
|
||||
const bytes = new Uint8Array(len);
|
||||
for (let i = 0; i < len; i++) {
|
||||
bytes[i] = binary.charCodeAt(i);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate UUID v4 using Web Crypto API
|
||||
* @returns {string} UUID string
|
||||
*/
|
||||
function uuidv4() {
|
||||
const array = new Uint8Array(16);
|
||||
crypto.getRandomValues(array);
|
||||
array[6] = (array[6] & 0x0f) | 0x40;
|
||||
array[8] = (array[8] & 0x3f) | 0x80;
|
||||
return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join('');
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a trace message with correlation ID and timestamp
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {string} message - Message content to log
|
||||
*/
|
||||
function logTrace(correlationId, message) {
|
||||
const timestamp = new Date().toISOString();
|
||||
console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Serialization Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Serialize data according to specified format
|
||||
* @param {any} data - Data to serialize
|
||||
* @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* @returns {Uint8Array} Binary representation of the serialized data
|
||||
*/
|
||||
async function serializeData(data, payloadType) {
|
||||
if (payloadType === 'text') {
|
||||
if (typeof data === 'string') {
|
||||
return new Uint8Array(new TextEncoder().encode(data));
|
||||
} else {
|
||||
throw new Error('Text data must be a string');
|
||||
}
|
||||
} else if (payloadType === 'dictionary') {
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return new Uint8Array(new TextEncoder().encode(jsonStr));
|
||||
} else if (payloadType === 'jsontable') {
|
||||
// Serialize array of objects to JSON format
|
||||
if (!Array.isArray(data)) {
|
||||
throw new Error('JSON table data must be an array');
|
||||
}
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return new Uint8Array(new TextEncoder().encode(jsonStr));
|
||||
} else if (payloadType === 'image') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
|
||||
return new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Image data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
|
||||
}
|
||||
} else if (payloadType === 'audio') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
|
||||
return new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Audio data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
|
||||
}
|
||||
} else if (payloadType === 'video') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
|
||||
return new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Video data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
|
||||
}
|
||||
} else if (payloadType === 'binary') {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
|
||||
return new Uint8Array(data);
|
||||
} else {
|
||||
throw new Error('Binary data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
|
||||
}
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payloadType}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize bytes to data based on type
|
||||
* @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes
|
||||
* @param {string} payloadType - Data type
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
* @returns {any} Deserialized data
|
||||
*/
|
||||
async function deserializeData(data, payloadType, correlationId) {
|
||||
const buffer = data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
|
||||
logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);
|
||||
|
||||
// Debug: Show first 20 bytes in hex for binary data
|
||||
if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
|
||||
const hexPreview = [];
|
||||
for (let i = 0; i < Math.min(20, buffer.length); i++) {
|
||||
hexPreview.push(buffer[i].toString(16).padStart(2, '0'));
|
||||
}
|
||||
logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview.join(' ')}`);
|
||||
}
|
||||
|
||||
if (payloadType === 'text') {
|
||||
const result = new TextDecoder().decode(buffer);
|
||||
logTrace(correlationId, `deserializeData: text result length=${result.length}`);
|
||||
return result;
|
||||
} else if (payloadType === 'dictionary') {
|
||||
const jsonStr = new TextDecoder().decode(buffer);
|
||||
const result = JSON.parse(jsonStr);
|
||||
logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`);
|
||||
return result;
|
||||
} else if (payloadType === 'jsontable') {
|
||||
const jsonStr = new TextDecoder().decode(buffer);
|
||||
const result = JSON.parse(jsonStr);
|
||||
logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`);
|
||||
return result;
|
||||
} else if (payloadType === 'image') {
|
||||
logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'audio') {
|
||||
logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'video') {
|
||||
logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'binary') {
|
||||
logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payloadType}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- File Server Handlers ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
* @param {string} fileServerUrl - Base URL of the plik server
|
||||
* @param {string} dataname - Name of the file being uploaded
|
||||
* @param {Uint8Array} data - Raw byte data of the file content
|
||||
* @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
|
||||
*/
|
||||
async function plikOneshotUpload(fileServerUrl, dataname, data) {
|
||||
const buffer = data instanceof Uint8Array ? data : new Uint8Array(data);
|
||||
|
||||
// Get upload id
|
||||
const urlGetUploadID = `${fileServerUrl}/upload`;
|
||||
const headers = { 'Content-Type': 'application/json' };
|
||||
const body = JSON.stringify({ OneShot: true });
|
||||
|
||||
const httpResponse = await fetch(urlGetUploadID, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body
|
||||
});
|
||||
|
||||
const responseJson = await httpResponse.json();
|
||||
const uploadid = responseJson.id;
|
||||
const uploadtoken = responseJson.uploadToken;
|
||||
|
||||
// Upload file
|
||||
const urlUpload = `${fileServerUrl}/file/${uploadid}`;
|
||||
const form = new FormData();
|
||||
const blob = new Blob([buffer], { type: 'application/octet-stream' });
|
||||
form.append('file', blob, dataname);
|
||||
|
||||
const uploadHeaders = {
|
||||
'X-UploadToken': uploadtoken
|
||||
};
|
||||
|
||||
const uploadResponse = await fetch(urlUpload, {
|
||||
method: 'POST',
|
||||
headers: uploadHeaders,
|
||||
body: form
|
||||
});
|
||||
|
||||
const uploadJson = await uploadResponse.json();
|
||||
const fileid = uploadJson.id;
|
||||
|
||||
const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`;
|
||||
|
||||
return {
|
||||
status: uploadResponse.status,
|
||||
uploadid,
|
||||
fileid,
|
||||
url
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch data from URL with exponential backoff
|
||||
* @param {string} url - URL to fetch from
|
||||
* @param {number} maxRetries - Maximum number of retry attempts
|
||||
* @param {number} baseDelay - Initial delay in milliseconds
|
||||
* @param {number} maxDelay - Maximum delay in milliseconds
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
* @returns {Promise<Uint8Array>} Fetched data as bytes
|
||||
*/
|
||||
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
|
||||
let delay = baseDelay;
|
||||
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
|
||||
if (response.status === 200) {
|
||||
logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
|
||||
const arrayBuffer = await response.arrayBuffer();
|
||||
return new Uint8Array(arrayBuffer);
|
||||
} else {
|
||||
throw new Error(`Failed to fetch: ${response.status}`);
|
||||
}
|
||||
} catch (e) {
|
||||
logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`);
|
||||
|
||||
if (attempt < maxRetries) {
|
||||
await new Promise(resolve => setTimeout(resolve, delay));
|
||||
delay = Math.min(delay * 2, maxDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------- NATS Client ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* NATS client wrapper for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*/
|
||||
class NATSClient {
|
||||
/**
|
||||
* Create a new NATS client
|
||||
* @param {string} url - NATS server URL (ws:// or wss://)
|
||||
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
|
||||
*/
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to NATS server
|
||||
* @returns {Promise<NATS.Connection>}
|
||||
*/
|
||||
async connect() {
|
||||
if (this.connection) {
|
||||
return this.connection;
|
||||
}
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish message to NATS subject
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - Message to publish
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
*/
|
||||
async publish(subject, message, correlationId) {
|
||||
if (!this.connection) {
|
||||
await this.connect();
|
||||
}
|
||||
await this.connection.publish(subject, message);
|
||||
logTrace(correlationId, `Message published to ${subject}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the NATS connection
|
||||
*/
|
||||
async close() {
|
||||
if (this.connection) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current connection (for external use)
|
||||
* @returns {NATS.Connection|null}
|
||||
*/
|
||||
getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isConnected() {
|
||||
return this.connection !== null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*/
|
||||
class NATSConnectionPool {
|
||||
/**
|
||||
* Create a new connection pool
|
||||
* @param {string} url - NATS server URL (ws:// or wss://)
|
||||
* @param {number} [maxSize=10] - Maximum pool size
|
||||
*/
|
||||
constructor(url, maxSize = 10) {
|
||||
this.url = url;
|
||||
this.maxSize = maxSize;
|
||||
this.connections = new Map();
|
||||
this.idCounter = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a connection from the pool (or create new)
|
||||
* @returns {Promise<NATSClient>}
|
||||
*/
|
||||
async acquire() {
|
||||
// Try to find an existing idle connection
|
||||
for (const [id, client] of this.connections) {
|
||||
if (client.isConnected()) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection if under limit
|
||||
if (this.connections.size < this.maxSize) {
|
||||
const id = `conn_${++this.idCounter}`;
|
||||
const client = new NATSClient(this.url, true);
|
||||
await client.connect();
|
||||
this.connections.set(id, client);
|
||||
return client;
|
||||
}
|
||||
|
||||
// Pool exhausted - create new connection (caller should close when done)
|
||||
const client = new NATSClient(this.url, false);
|
||||
await client.connect();
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a connection to the pool
|
||||
* @param {NATSClient} client - Connection to return
|
||||
*/
|
||||
release(client) {
|
||||
// Only return persistent connections
|
||||
if (client.keepAlive && client.isConnected()) {
|
||||
// Connection already in pool, do nothing
|
||||
return;
|
||||
}
|
||||
// Non-persistent connection - close it
|
||||
client.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections in the pool
|
||||
*/
|
||||
async closeAll() {
|
||||
for (const [id, client] of this.connections) {
|
||||
await client.close();
|
||||
}
|
||||
this.connections.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Core Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
* @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - JSON message to publish
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
|
||||
*/
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
|
||||
let conn;
|
||||
let shouldClose = false;
|
||||
|
||||
if (brokerUrlOrClient instanceof NATSClient) {
|
||||
conn = brokerUrlOrClient;
|
||||
} else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') {
|
||||
// Create a wrapper for direct connection (duck-typing check for NATS connection)
|
||||
conn = {
|
||||
async publish(subj, msg) {
|
||||
await brokerUrlOrClient.publish(subj, msg);
|
||||
},
|
||||
async close() {
|
||||
await brokerUrlOrClient.close();
|
||||
}
|
||||
};
|
||||
shouldClose = true;
|
||||
} else {
|
||||
// String URL - create new client
|
||||
const client = new NATSClient(brokerUrlOrClient);
|
||||
conn = client;
|
||||
shouldClose = true;
|
||||
}
|
||||
|
||||
await conn.publish(subject, message, correlationId);
|
||||
|
||||
// Only close if explicitly requested and it's a short-lived client
|
||||
if (shouldClose && closeConnection && conn instanceof NATSClient) {
|
||||
await conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build message envelope from payloads and metadata
|
||||
* @param {string} subject - NATS subject
|
||||
* @param {Array} payloads - Array of payload objects
|
||||
* @param {Object} options - Envelope metadata options
|
||||
* @returns {Object} Envelope object
|
||||
*/
|
||||
function buildEnvelope(subject, payloads, options) {
|
||||
return {
|
||||
correlation_id: options.correlation_id,
|
||||
msg_id: options.msg_id,
|
||||
timestamp: new Date().toISOString(),
|
||||
send_to: subject,
|
||||
msg_purpose: options.msg_purpose,
|
||||
sender_name: options.sender_name,
|
||||
sender_id: options.sender_id,
|
||||
receiver_name: options.receiver_name,
|
||||
receiver_id: options.receiver_id,
|
||||
reply_to: options.reply_to,
|
||||
reply_to_msg_id: options.reply_to_msg_id,
|
||||
broker_url: options.broker_url,
|
||||
metadata: options.metadata || {},
|
||||
payloads: payloads
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build payload object from serialized data
|
||||
* @param {string} dataname - Name of the payload
|
||||
* @param {string} payloadType - Type of the payload
|
||||
* @param {Uint8Array} payloadBytes - Serialized payload bytes
|
||||
* @param {string} transport - Transport type ("direct" or "link")
|
||||
* @param {string} data - Data (base64 for direct, URL for link)
|
||||
* @returns {Object} Payload object
|
||||
*/
|
||||
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
// Determine encoding based on payload type (matching Julia implementation)
|
||||
let encoding = 'base64';
|
||||
if (payloadType === 'jsontable') {
|
||||
encoding = 'json';
|
||||
}
|
||||
|
||||
return {
|
||||
id: uuidv4(),
|
||||
dataname,
|
||||
payload_type: payloadType,
|
||||
transport,
|
||||
encoding,
|
||||
size: payloadBytes.byteLength,
|
||||
data,
|
||||
metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*
|
||||
* This function intelligently routes data delivery based on payload size.
|
||||
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||
* and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||
* and publishes only the download URL over NATS.
|
||||
*
|
||||
* @param {string} subject - NATS subject to publish the message to
|
||||
* @param {Array} data - List of [dataname, data, type] tuples to send
|
||||
* - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
|
||||
* - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data)
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server (WebSocket)
|
||||
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
|
||||
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
|
||||
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
|
||||
* @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing
|
||||
* @param {string} [options.msg_purpose="chat"] - Purpose of the message
|
||||
* @param {string} [options.sender_name="msghandler"] - Name of the sender
|
||||
* @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
|
||||
* @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
|
||||
* @param {string} [options.reply_to=""] - Topic to reply to
|
||||
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
|
||||
* @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
|
||||
* @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection
|
||||
* @param {string} [options.msg_id=uuidv4()] - Message ID
|
||||
* @param {string} [options.sender_id=uuidv4()] - Sender ID
|
||||
* @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
|
||||
*
|
||||
* @example
|
||||
* // Send a single payload
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartsend(
|
||||
* "/test",
|
||||
* [["dataname1", data1, "dictionary"]],
|
||||
* { broker_url: "wss://nats.example.com" }
|
||||
* );
|
||||
*
|
||||
* // Send multiple payloads (use jsontable instead of arrowtable for browser)
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartsend(
|
||||
* "/test",
|
||||
* [
|
||||
* ["dataname1", data1, "dictionary"],
|
||||
* ["dataname2", tableData, "jsontable"]
|
||||
* ],
|
||||
* { broker_url: "wss://nats.example.com" }
|
||||
* );
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
const {
|
||||
broker_url = DEFAULT_BROKER_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler = plikOneshotUpload,
|
||||
size_threshold = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id = uuidv4(),
|
||||
msg_purpose = 'chat',
|
||||
sender_name = 'msghandler',
|
||||
receiver_name = '',
|
||||
receiver_id = '',
|
||||
reply_to = '',
|
||||
reply_to_msg_id = '',
|
||||
is_publish = true,
|
||||
nats_connection = null,
|
||||
msg_id = uuidv4(),
|
||||
sender_id = uuidv4()
|
||||
} = options;
|
||||
|
||||
logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartsend: data array length=${data.length}`);
|
||||
|
||||
// Debug: Log input data structure
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const [dataname, payloadData, payloadType] = data[i];
|
||||
logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
}
|
||||
|
||||
// Process payloads
|
||||
const payloads = [];
|
||||
for (const [dataname, payloadData, payloadType] of data) {
|
||||
logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
|
||||
const payloadBytes = await serializeData(payloadData, payloadType);
|
||||
const payloadSize = payloadBytes.byteLength;
|
||||
|
||||
logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
|
||||
|
||||
// Debug: Show first 20 bytes of serialized data for table type
|
||||
if (payloadType === 'table') {
|
||||
const hexPreview = [];
|
||||
for (let i = 0; i < Math.min(20, payloadBytes.length); i++) {
|
||||
hexPreview.push(payloadBytes[i].toString(16).padStart(2, '0'));
|
||||
}
|
||||
logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview.join(' ')}`);
|
||||
}
|
||||
|
||||
if (payloadSize < size_threshold) {
|
||||
// Direct path
|
||||
const payloadB64 = bufferToBase64(payloadBytes);
|
||||
logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`);
|
||||
|
||||
const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64);
|
||||
payloads.push(payload);
|
||||
} else {
|
||||
// Link path
|
||||
logTrace(correlation_id, `Using link transport, uploading to fileserver`);
|
||||
|
||||
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
|
||||
|
||||
if (response.status !== 200) {
|
||||
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
|
||||
}
|
||||
|
||||
logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
|
||||
|
||||
const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url);
|
||||
payloads.push(payload);
|
||||
}
|
||||
}
|
||||
|
||||
// Build envelope
|
||||
const env = buildEnvelope(subject, payloads, {
|
||||
correlation_id,
|
||||
msg_id,
|
||||
msg_purpose,
|
||||
sender_name,
|
||||
sender_id,
|
||||
receiver_name,
|
||||
receiver_id,
|
||||
reply_to,
|
||||
reply_to_msg_id,
|
||||
broker_url
|
||||
});
|
||||
|
||||
const env_json_str = JSON.stringify(env);
|
||||
|
||||
if (is_publish) {
|
||||
if (nats_connection) {
|
||||
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
|
||||
} else {
|
||||
await publishMessage(broker_url, subject, env_json_str, correlation_id);
|
||||
}
|
||||
}
|
||||
|
||||
return [env, env_json_str];
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive and process NATS message
|
||||
*
|
||||
* This function processes incoming NATS messages, handling both direct transport
|
||||
* (base64 decoded payloads) and link transport (URL-based payloads).
|
||||
* It deserializes the data based on the transport type and returns the result.
|
||||
*
|
||||
* @param {Object} msg - NATS message object with payload property
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
|
||||
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
|
||||
* @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
|
||||
* @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
|
||||
* @returns {Promise<Object>} Envelope object with processed payloads
|
||||
*
|
||||
* @example
|
||||
* // Receive and process message
|
||||
* const env = await msghandlerCSR.smartreceive(msg, {
|
||||
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff,
|
||||
* max_retries: 5,
|
||||
* base_delay: 100,
|
||||
* max_delay: 5000
|
||||
* });
|
||||
* // env.payloads is an Array of [dataname, data, type] arrays
|
||||
* for (const [dataname, data, type] of env.payloads) {
|
||||
* console.log(`${dataname}: ${data} (type: ${type})`);
|
||||
* }
|
||||
*/
|
||||
async function smartreceive(msg, options = {}) {
|
||||
const {
|
||||
fileserver_download_handler = fetchWithBackoff,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
} = options;
|
||||
|
||||
// Debug: Log message object structure
|
||||
logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`);
|
||||
logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`);
|
||||
logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`);
|
||||
|
||||
// Parse the JSON envelope
|
||||
// NATS.js v2.x uses msg.data instead of msg.payload
|
||||
let payload;
|
||||
if (msg.data !== undefined) {
|
||||
payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data);
|
||||
} else if (msg.payload !== undefined) {
|
||||
payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
|
||||
} else {
|
||||
throw new Error('Message has neither data nor payload property');
|
||||
}
|
||||
|
||||
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
|
||||
|
||||
// Debug: Show first 200 chars of payload
|
||||
const payloadPreview = payload.substring(0, 200);
|
||||
logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`);
|
||||
|
||||
let envJsonObj;
|
||||
try {
|
||||
envJsonObj = JSON.parse(payload);
|
||||
} catch (e) {
|
||||
logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`);
|
||||
throw e;
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, 'Processing received message');
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
|
||||
// Process all payloads in the envelope
|
||||
const payloadsList = [];
|
||||
const numPayloads = envJsonObj.payloads.length;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`);
|
||||
|
||||
for (let i = 0; i < numPayloads; i++) {
|
||||
const payloadObj = envJsonObj.payloads[i];
|
||||
const transport = payloadObj.transport;
|
||||
const dataname = payloadObj.dataname;
|
||||
const payloadType = payloadObj.payload_type;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
|
||||
if (transport === 'direct') {
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
|
||||
|
||||
// Extract base64 payload from the payload
|
||||
const payloadB64 = payloadObj.data;
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`);
|
||||
|
||||
// Decode Base64 payload
|
||||
const payloadBytes = base64ToBuffer(payloadB64);
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`);
|
||||
|
||||
// Deserialize based on type
|
||||
const dataType = payloadObj.payload_type;
|
||||
const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id);
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`);
|
||||
|
||||
payloadsList.push([dataname, data, dataType]);
|
||||
} else if (transport === 'link') {
|
||||
// Extract download URL from the payload
|
||||
const url = payloadObj.data;
|
||||
logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
|
||||
|
||||
// Fetch with exponential backoff using the download handler
|
||||
const downloadedData = await fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
envJsonObj.correlation_id
|
||||
);
|
||||
|
||||
// Deserialize based on type
|
||||
const dataType = payloadObj.payload_type;
|
||||
const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id);
|
||||
|
||||
payloadsList.push([dataname, data, dataType]);
|
||||
} else {
|
||||
throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
|
||||
}
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`);
|
||||
envJsonObj.payloads = payloadsList;
|
||||
return envJsonObj;
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Module Exports ---------------------------------------------- //
|
||||
|
||||
const msghandlerCSR = {
|
||||
/**
|
||||
* NATS client class for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*
|
||||
* @example
|
||||
* // Single-use connection (closes after publish)
|
||||
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com");
|
||||
* await msghandlerCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* await client.close();
|
||||
*
|
||||
* // Persistent connection (keeps connection open)
|
||||
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com", true);
|
||||
* await client.connect();
|
||||
* await msghandlerCSR.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
|
||||
* await msghandlerCSR.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
|
||||
* // Connection remains open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
NATSClient,
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*
|
||||
* @example
|
||||
* const pool = new msghandlerCSR.NATSConnectionPool("wss://nats.example.com", 10);
|
||||
* const client = await pool.acquire();
|
||||
* await msghandlerCSR.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* pool.release(client);
|
||||
* await pool.closeAll();
|
||||
*/
|
||||
NATSConnectionPool,
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*/
|
||||
smartsend,
|
||||
|
||||
/**
|
||||
* Receive and process NATS message
|
||||
*/
|
||||
smartreceive,
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
*
|
||||
* @example
|
||||
* // Using a persistent connection
|
||||
* const client = new msghandlerCSR.NATSClient("wss://nats.example.com", true);
|
||||
* await client.connect();
|
||||
* await msghandlerCSR.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
|
||||
* // Connection stays open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
publishMessage,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
*/
|
||||
plikOneshotUpload,
|
||||
|
||||
/**
|
||||
* Fetch data from URL with exponential backoff
|
||||
*/
|
||||
fetchWithBackoff,
|
||||
|
||||
/**
|
||||
* Default constants
|
||||
*/
|
||||
DEFAULT_SIZE_THRESHOLD,
|
||||
DEFAULT_BROKER_URL,
|
||||
DEFAULT_FILESERVER_URL
|
||||
};
|
||||
|
||||
export default msghandlerCSR;
|
||||
673
src/natsbridge_mpy.py
Normal file
673
src/natsbridge_mpy.py
Normal file
@@ -0,0 +1,673 @@
|
||||
"""
|
||||
msghandler - Cross-Platform Bi-Directional Data Bridge
|
||||
MicroPython Implementation
|
||||
|
||||
This module provides functionality for sending and receiving data across network boundaries
|
||||
using NATS as the message bus, with support for both direct payload transport and
|
||||
URL-based transport for larger payloads.
|
||||
|
||||
Note: MicroPython has significant constraints compared to desktop implementations:
|
||||
- Limited memory (~256KB - 1MB)
|
||||
- No Arrow IPC support (memory constraints)
|
||||
- Synchronous API (no async/await)
|
||||
- Lower size threshold for direct transport
|
||||
"""
|
||||
|
||||
import network
|
||||
import time
|
||||
import json
|
||||
import base64
|
||||
import uos
|
||||
import struct
|
||||
import random
|
||||
|
||||
# ---------------------------------------------- Constants ---------------------------------------------- #
|
||||
|
||||
"""
|
||||
Default size threshold for switching from direct to link transport (100KB for MicroPython)
|
||||
"""
|
||||
DEFAULT_SIZE_THRESHOLD = 100000
|
||||
|
||||
"""
|
||||
Default NATS server URL
|
||||
"""
|
||||
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
||||
|
||||
"""
|
||||
Default HTTP file server URL for link transport
|
||||
"""
|
||||
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
||||
|
||||
"""
|
||||
Hard limit for payload size in MicroPython (50KB)
|
||||
"""
|
||||
MAX_PAYLOAD_SIZE = 50000
|
||||
|
||||
|
||||
# ---------------------------------------------- Utility Functions ---------------------------------------------- #
|
||||
|
||||
def log_trace(correlation_id, message):
|
||||
"""
|
||||
Log a trace message with correlation ID and timestamp.
|
||||
|
||||
Args:
|
||||
correlation_id: Correlation ID for tracing
|
||||
message: Message content to log
|
||||
"""
|
||||
timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime())
|
||||
print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
|
||||
|
||||
|
||||
def _generate_uuid():
|
||||
"""
|
||||
Generate a simple UUID compatible with MicroPython.
|
||||
|
||||
Returns:
|
||||
UUID string
|
||||
"""
|
||||
# Generate a simple UUID-like string
|
||||
# Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
|
||||
hex_chars = '0123456789abcdef'
|
||||
uuid_str = ''.join([random.choice(hex_chars) for _ in range(32)])
|
||||
# Insert hyphens at proper positions
|
||||
return f"{uuid_str[:8]}-{uuid_str[8:12]}-{uuid_str[12:16]}-{uuid_str[16:20]}-{uuid_str[20:]}"
|
||||
|
||||
|
||||
# ---------------------------------------------- Serialization Functions ---------------------------------------------- #
|
||||
|
||||
def _serialize_data(data, payload_type):
|
||||
"""
|
||||
Serialize data according to specified format.
|
||||
|
||||
Args:
|
||||
data: Data to serialize (string for "text", dict for "dictionary",
|
||||
bytes for "image", "audio", "video", "binary")
|
||||
payload_type: Target format: "text", "dictionary", "image", "audio", "video", "binary"
|
||||
|
||||
Returns:
|
||||
Binary representation of the serialized data
|
||||
|
||||
Note:
|
||||
MicroPython does not support "table" type due to memory constraints.
|
||||
|
||||
Raises:
|
||||
ValueError: If payload_type is not one of the supported types
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
if isinstance(data, str):
|
||||
return data.encode('utf-8')
|
||||
else:
|
||||
raise ValueError('Text data must be a string')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||
if isinstance(data, (bytes, bytearray, memoryview)):
|
||||
return bytes(data)
|
||||
else:
|
||||
raise ValueError(f'{payload_type} data must be bytes')
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
def _deserialize_data(data, payload_type):
|
||||
"""
|
||||
Deserialize bytes to data based on type.
|
||||
|
||||
Args:
|
||||
data: Serialized data as bytes
|
||||
payload_type: Data type ("text", "dictionary", "image", "audio", "video", "binary")
|
||||
|
||||
Returns:
|
||||
Deserialized data (String for "text", dict for "dictionary", bytes for others)
|
||||
|
||||
Note:
|
||||
MicroPython does not support "table" type due to memory constraints.
|
||||
|
||||
Raises:
|
||||
ValueError: If payload_type is not one of the supported types
|
||||
"""
|
||||
if payload_type == 'text':
|
||||
return data.decode('utf-8')
|
||||
elif payload_type == 'dictionary':
|
||||
json_str = data.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||
return data
|
||||
else:
|
||||
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||
|
||||
|
||||
# ---------------------------------------------- File Server Handlers ---------------------------------------------- #
|
||||
|
||||
def _sync_fileserver_upload(file_server_url, dataname, data):
|
||||
"""
|
||||
Synchronous file upload to HTTP server.
|
||||
|
||||
Note:
|
||||
This is a simplified implementation for MicroPython.
|
||||
In practice, would use network.HTTP or similar.
|
||||
Currently raises NotImplementedError as file upload is not fully supported.
|
||||
|
||||
Args:
|
||||
file_server_url: Base URL of the file server
|
||||
dataname: Name of the file being uploaded
|
||||
data: Raw byte data of the file content
|
||||
|
||||
Returns:
|
||||
Dict with keys: 'status', 'url'
|
||||
|
||||
Raises:
|
||||
NotImplementedError: File upload is not implemented in MicroPython
|
||||
"""
|
||||
raise NotImplementedError("File upload not fully implemented in MicroPython. "
|
||||
"Use direct transport only for memory-constrained devices.")
|
||||
|
||||
|
||||
def _sync_fileserver_download(url, max_retries, base_delay, max_delay, correlation_id):
|
||||
"""
|
||||
Synchronous file download with exponential backoff.
|
||||
|
||||
Note:
|
||||
This is a simplified implementation for MicroPython.
|
||||
In practice, would use network.HTTP or similar.
|
||||
Currently raises NotImplementedError as file download is not fully supported.
|
||||
|
||||
Args:
|
||||
url: URL to download from
|
||||
max_retries: Maximum retry attempts
|
||||
base_delay: Initial delay in ms
|
||||
max_delay: Maximum delay in ms
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Downloaded bytes
|
||||
|
||||
Raises:
|
||||
NotImplementedError: File download is not implemented in MicroPython
|
||||
"""
|
||||
raise NotImplementedError("File download not fully implemented in MicroPython. "
|
||||
"Use direct transport only for memory-constrained devices.")
|
||||
|
||||
|
||||
# ---------------------------------------------- NATS Client ---------------------------------------------- #
|
||||
|
||||
class NATSClient:
|
||||
"""
|
||||
NATS client wrapper for MicroPython.
|
||||
|
||||
Note:
|
||||
This is a simplified implementation for MicroPython.
|
||||
Full NATS client implementation would require additional network stack support.
|
||||
"""
|
||||
|
||||
def __init__(self, url=DEFAULT_BROKER_URL):
|
||||
"""
|
||||
Initialize NATS client.
|
||||
|
||||
Args:
|
||||
url: NATS server URL
|
||||
"""
|
||||
self.url = url
|
||||
self._connected = False
|
||||
|
||||
def connect(self):
|
||||
"""
|
||||
Connect to NATS server.
|
||||
|
||||
Note:
|
||||
This is a placeholder implementation.
|
||||
Actual NATS client would require network stack support.
|
||||
|
||||
Returns:
|
||||
True if connected, False otherwise
|
||||
"""
|
||||
# Placeholder - actual implementation would connect to NATS server
|
||||
self._connected = True
|
||||
return self._connected
|
||||
|
||||
def publish(self, subject, message):
|
||||
"""
|
||||
Publish message to NATS subject.
|
||||
|
||||
Note:
|
||||
This is a placeholder implementation.
|
||||
Actual NATS client would require network stack support.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
message: Message to publish
|
||||
"""
|
||||
if not self._connected:
|
||||
raise RuntimeError("Not connected to NATS server")
|
||||
# Placeholder - actual implementation would publish to NATS
|
||||
print(f"[NATS] Publish to {subject}: {message[:50]}...")
|
||||
|
||||
def close(self):
|
||||
"""Close the NATS connection."""
|
||||
self._connected = False
|
||||
|
||||
|
||||
# ---------------------------------------------- Core Functions ---------------------------------------------- #
|
||||
|
||||
def _build_envelope(subject, payloads, options):
|
||||
"""
|
||||
Build message envelope from payloads and metadata.
|
||||
|
||||
Args:
|
||||
subject: NATS subject
|
||||
payloads: Array of payload objects
|
||||
options: Envelope metadata options
|
||||
|
||||
Returns:
|
||||
Envelope dict
|
||||
"""
|
||||
return {
|
||||
'correlation_id': options['correlation_id'],
|
||||
'msg_id': options['msg_id'],
|
||||
'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()),
|
||||
'send_to': subject,
|
||||
'msg_purpose': options['msg_purpose'],
|
||||
'sender_name': options['sender_name'],
|
||||
'sender_id': options['sender_id'],
|
||||
'receiver_name': options['receiver_name'],
|
||||
'receiver_id': options['receiver_id'],
|
||||
'reply_to': options['reply_to'],
|
||||
'reply_to_msg_id': options['reply_to_msg_id'],
|
||||
'broker_url': options['broker_url'],
|
||||
'metadata': {},
|
||||
'payloads': payloads
|
||||
}
|
||||
|
||||
|
||||
def _build_payload(dataname, payload_type, payload_bytes, transport, data):
|
||||
"""
|
||||
Build payload object from serialized data.
|
||||
|
||||
Args:
|
||||
dataname: Name of the payload
|
||||
payload_type: Type of the payload
|
||||
payload_bytes: Serialized payload bytes
|
||||
transport: Transport type ("direct" or "link")
|
||||
data: Data (base64 for direct, URL for link)
|
||||
|
||||
Returns:
|
||||
Payload dict
|
||||
"""
|
||||
return {
|
||||
'id': _generate_uuid(),
|
||||
'dataname': dataname,
|
||||
'payload_type': payload_type,
|
||||
'transport': transport,
|
||||
'encoding': 'base64' if transport == 'direct' else 'none',
|
||||
'size': len(payload_bytes),
|
||||
'data': data,
|
||||
'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {}
|
||||
}
|
||||
|
||||
|
||||
def _publish(subject, message, correlation_id):
|
||||
"""
|
||||
Publish message to NATS.
|
||||
|
||||
Note:
|
||||
This is a simplified implementation for MicroPython.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
message: JSON message to publish
|
||||
correlation_id: Correlation ID for logging
|
||||
"""
|
||||
log_trace(correlation_id, f"Publishing to {subject}")
|
||||
# Placeholder - actual implementation would use NATSClient
|
||||
# client = NATSClient()
|
||||
# client.connect()
|
||||
# client.publish(subject, message)
|
||||
# client.close()
|
||||
|
||||
|
||||
def smartsend(subject, data, **kwargs):
|
||||
"""
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
This function intelligently routes data delivery based on payload size.
|
||||
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||
and publishes only the download URL over NATS.
|
||||
|
||||
Note:
|
||||
MicroPython has memory constraints, so the default size_threshold is lower (100KB).
|
||||
Table type is not supported due to memory constraints.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish the message to
|
||||
data: List of (dataname, data, type) tuples to send
|
||||
- dataname: Name of the payload
|
||||
- data: The actual data to send
|
||||
- type: Payload type: "text", "dictionary", "image", "audio", "video", "binary"
|
||||
broker_url: NATS server URL (default: DEFAULT_BROKER_URL)
|
||||
fileserver_url: HTTP file server URL (default: DEFAULT_FILESERVER_URL)
|
||||
fileserver_upload_handler: Function to handle fileserver uploads (default: _sync_fileserver_upload)
|
||||
size_threshold: Threshold in bytes separating direct vs link transport (default: 100000)
|
||||
correlation_id: Correlation ID for tracing (auto-generated if not provided)
|
||||
msg_purpose: Purpose of the message (default: "chat")
|
||||
sender_name: Name of the sender (default: "msghandler")
|
||||
receiver_name: Name of the receiver (empty means broadcast)
|
||||
receiver_id: UUID of the receiver (empty means broadcast)
|
||||
reply_to: Topic to reply to (empty if no reply expected)
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
is_publish: Whether to automatically publish the message (default: True)
|
||||
msg_id: Message ID (auto-generated if not provided)
|
||||
sender_id: Sender ID (auto-generated if not provided)
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str) where:
|
||||
- env: Dict containing all metadata and payloads
|
||||
- env_json_str: JSON string for publishing to NATS
|
||||
|
||||
Example:
|
||||
>>> # Send text payload
|
||||
>>> env, env_json_str = msghandler.smartsend(
|
||||
... "/chat",
|
||||
... [("message", "Hello!", "text")],
|
||||
... broker_url="nats://localhost:4222"
|
||||
... )
|
||||
>>>
|
||||
>>> # Send dictionary payload
|
||||
>>> env, env_json_str = msghandler.smartsend(
|
||||
... "/config",
|
||||
... [("config", {"key": "value"}, "dictionary")],
|
||||
... broker_url="nats://localhost:4222"
|
||||
... )
|
||||
>>>
|
||||
>>> # Send binary payload (image, audio, video)
|
||||
>>> env, env_json_str = msghandler.smartsend(
|
||||
... "/media",
|
||||
... [("image", image_bytes, "image")],
|
||||
... broker_url="nats://localhost:4222"
|
||||
... )
|
||||
"""
|
||||
# Extract options with defaults
|
||||
correlation_id = kwargs.get('correlation_id', _generate_uuid())
|
||||
msg_id = kwargs.get('msg_id', _generate_uuid())
|
||||
sender_id = kwargs.get('sender_id', _generate_uuid())
|
||||
broker_url = kwargs.get('broker_url', DEFAULT_BROKER_URL)
|
||||
fileserver_url = kwargs.get('fileserver_url', DEFAULT_FILESERVER_URL)
|
||||
size_threshold = kwargs.get('size_threshold', DEFAULT_SIZE_THRESHOLD)
|
||||
msg_purpose = kwargs.get('msg_purpose', 'chat')
|
||||
sender_name = kwargs.get('sender_name', 'msghandler')
|
||||
receiver_name = kwargs.get('receiver_name', '')
|
||||
receiver_id = kwargs.get('receiver_id', '')
|
||||
reply_to = kwargs.get('reply_to', '')
|
||||
reply_to_msg_id = kwargs.get('reply_to_msg_id', '')
|
||||
is_publish = kwargs.get('is_publish', True)
|
||||
fileserver_upload_handler = kwargs.get('fileserver_upload_handler', _sync_fileserver_upload)
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
|
||||
# Process payloads
|
||||
payloads = []
|
||||
for dataname, payload_data, payload_type in data:
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
payload_size = len(payload_bytes)
|
||||
|
||||
# Check against hard limit for MicroPython
|
||||
if payload_size > MAX_PAYLOAD_SIZE:
|
||||
raise MemoryError(f"Payload '{dataname}' exceeds max size {MAX_PAYLOAD_SIZE} bytes")
|
||||
|
||||
log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
|
||||
|
||||
if payload_size < size_threshold:
|
||||
# Direct path
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
|
||||
log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
|
||||
|
||||
payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64)
|
||||
payloads.append(payload)
|
||||
else:
|
||||
# Link path (limited support)
|
||||
log_trace(correlation_id, "Using link transport, uploading to fileserver")
|
||||
|
||||
try:
|
||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
log_trace(correlation_id, f"Uploaded to URL: {response['url']}")
|
||||
|
||||
payload = _build_payload(dataname, payload_type, payload_bytes, 'link', response['url'])
|
||||
payloads.append(payload)
|
||||
except NotImplementedError:
|
||||
# Fall back to direct transport if file upload not available
|
||||
log_trace(correlation_id, "File upload not available, using direct transport")
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
|
||||
payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64)
|
||||
payloads.append(payload)
|
||||
|
||||
# Build envelope
|
||||
env = _build_envelope(subject, payloads, {
|
||||
'correlation_id': correlation_id,
|
||||
'msg_id': msg_id,
|
||||
'msg_purpose': msg_purpose,
|
||||
'sender_name': sender_name,
|
||||
'sender_id': sender_id,
|
||||
'receiver_name': receiver_name,
|
||||
'receiver_id': receiver_id,
|
||||
'reply_to': reply_to,
|
||||
'reply_to_msg_id': reply_to_msg_id,
|
||||
'broker_url': broker_url
|
||||
})
|
||||
|
||||
env_json_str = json.dumps(env)
|
||||
|
||||
if is_publish:
|
||||
_publish(subject, env_json_str, correlation_id)
|
||||
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
def smartreceive(msg, **kwargs):
|
||||
"""
|
||||
Receive and process NATS message.
|
||||
|
||||
This function processes incoming NATS messages, handling both direct transport
|
||||
(base64 decoded payloads) and link transport (URL-based payloads).
|
||||
It deserializes the data based on the transport type and returns the result.
|
||||
|
||||
Note:
|
||||
MicroPython has memory constraints, so large payloads should be avoided.
|
||||
Table type is not supported due to memory constraints.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process (can be string, dict, or object with 'payload' attribute)
|
||||
fileserver_download_handler: Function to handle downloading data from file server URLs
|
||||
max_retries: Maximum retry attempts (default: 3)
|
||||
base_delay: Initial delay in ms (default: 100)
|
||||
max_delay: Maximum delay in ms (default: 1000)
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]]
|
||||
|
||||
Example:
|
||||
>>> # Receive and process message
|
||||
>>> env = msghandler.smartreceive(msg, fileserver_download_handler=_sync_fileserver_download)
|
||||
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
||||
>>> for dataname, data, type_ in env["payloads"]:
|
||||
... print(f"{dataname}: {data} (type: {type_})")
|
||||
"""
|
||||
# Parse the JSON envelope
|
||||
if isinstance(msg, dict):
|
||||
# Already parsed
|
||||
env_json_obj = msg
|
||||
elif hasattr(msg, 'payload'):
|
||||
# Object with payload attribute
|
||||
payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8')
|
||||
env_json_obj = json.loads(payload)
|
||||
else:
|
||||
# Assume it's already a JSON string or dict
|
||||
env_json_obj = json.loads(msg) if isinstance(msg, str) else msg
|
||||
|
||||
correlation_id = env_json_obj['correlation_id']
|
||||
log_trace(correlation_id, "Processing received message")
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = []
|
||||
num_payloads = len(env_json_obj['payloads'])
|
||||
|
||||
for i in range(num_payloads):
|
||||
payload_obj = env_json_obj['payloads'][i]
|
||||
transport = payload_obj['transport']
|
||||
dataname = payload_obj['dataname']
|
||||
|
||||
if transport == 'direct':
|
||||
log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = payload_obj['data']
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = base64.b64decode(payload_b64)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload_obj['payload_type']
|
||||
data = _deserialize_data(payload_bytes, data_type)
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
elif transport == 'link':
|
||||
# Extract download URL from the payload
|
||||
url = payload_obj['data']
|
||||
log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")
|
||||
|
||||
# Fetch with exponential backoff using the download handler
|
||||
fileserver_download_handler = kwargs.get('fileserver_download_handler', _sync_fileserver_download)
|
||||
max_retries = kwargs.get('max_retries', 3)
|
||||
base_delay = kwargs.get('base_delay', 100)
|
||||
max_delay = kwargs.get('max_delay', 1000)
|
||||
|
||||
downloaded_data = fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload_obj['payload_type']
|
||||
data = _deserialize_data(downloaded_data, data_type)
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
else:
|
||||
raise ValueError(f"Unknown transport type for payload '{dataname}': {transport}")
|
||||
|
||||
env_json_obj['payloads'] = payloads_list
|
||||
return env_json_obj
|
||||
|
||||
|
||||
# ---------------------------------------------- Module Exports ---------------------------------------------- #
|
||||
|
||||
class msghandler:
|
||||
"""
|
||||
MicroPython NATS bridge implementation.
|
||||
|
||||
This class provides a convenient interface for msghandler functionality,
|
||||
encapsulating the main functions and providing a class-based API.
|
||||
|
||||
Note:
|
||||
MicroPython has significant constraints:
|
||||
- No Arrow IPC support (memory constraints)
|
||||
- Only direct transport (< 100KB threshold enforced)
|
||||
- Simplified UUID generation
|
||||
- No async/await (synchronous API)
|
||||
"""
|
||||
|
||||
DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD
|
||||
DEFAULT_BROKER_URL = DEFAULT_BROKER_URL
|
||||
DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL
|
||||
MAX_PAYLOAD_SIZE = MAX_PAYLOAD_SIZE
|
||||
|
||||
def __init__(self, broker_url=None, fileserver_url=None):
|
||||
"""
|
||||
Initialize msghandler.
|
||||
|
||||
Args:
|
||||
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
|
||||
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
|
||||
"""
|
||||
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
|
||||
|
||||
def smartsend(self, subject, data, **kwargs):
|
||||
"""
|
||||
Send data via NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options passed to smartsend
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url)
|
||||
kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url)
|
||||
return smartsend(subject, data, **kwargs)
|
||||
|
||||
def smartreceive(self, msg, **kwargs):
|
||||
"""
|
||||
Receive and process NATS message.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process
|
||||
**kwargs: Additional options passed to smartreceive
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return smartreceive(msg, **kwargs)
|
||||
|
||||
|
||||
# Convenience functions for module-level usage
|
||||
def send(subject, data, **kwargs):
|
||||
"""
|
||||
Convenience function for sending data.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
return smartsend(subject, data, **kwargs)
|
||||
|
||||
|
||||
def receive(msg, **kwargs):
|
||||
"""
|
||||
Convenience function for receiving messages.
|
||||
|
||||
Args:
|
||||
msg: NATS message to process
|
||||
**kwargs: Additional options
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return smartreceive(msg, **kwargs)
|
||||
|
||||
|
||||
__all__ = [
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
'msghandler',
|
||||
'send',
|
||||
'receive',
|
||||
'DEFAULT_SIZE_THRESHOLD',
|
||||
'DEFAULT_BROKER_URL',
|
||||
'DEFAULT_FILESERVER_URL',
|
||||
'MAX_PAYLOAD_SIZE',
|
||||
'NATSClient',
|
||||
'_serialize_data',
|
||||
'_deserialize_data',
|
||||
'log_trace',
|
||||
'_sync_fileserver_upload',
|
||||
'_sync_fileserver_download'
|
||||
]
|
||||
942
src/natsbridge_ssr.js
Normal file
942
src/natsbridge_ssr.js
Normal file
@@ -0,0 +1,942 @@
|
||||
/**
|
||||
* msghandler - Cross-Platform Bi-Directional Data Bridge
|
||||
* JavaScript/Node.js Implementation (Desktop/Server-Side)
|
||||
*
|
||||
* This module provides functionality for sending and receiving data across network boundaries
|
||||
* using NATS as the message bus, with support for both direct payload transport and
|
||||
* URL-based transport for larger payloads.
|
||||
*
|
||||
* Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
*
|
||||
* Node.js-specific features:
|
||||
* - Apache Arrow IPC support via apache-arrow
|
||||
* - TCP NATS connections (nats:// or tls:// URLs)
|
||||
* - Buffer for binary data handling
|
||||
* - Connection pooling for high-throughput scenarios
|
||||
*
|
||||
* @module msghandler
|
||||
*/
|
||||
|
||||
const nats = require('nats');
|
||||
const crypto = require('crypto');
|
||||
// Use native fetch available in Node.js 18+
|
||||
const arrow = require('apache-arrow');
|
||||
|
||||
// ---------------------------------------------- UUID Helper ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Generate UUID v4 using crypto module (Node.js compatible)
|
||||
* @returns {string} UUID string
|
||||
*/
|
||||
function uuidv4() {
|
||||
return crypto.randomUUID();
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Constants ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Default size threshold for switching from direct to link transport (0.5MB)
|
||||
*/
|
||||
const DEFAULT_SIZE_THRESHOLD = 500_000;
|
||||
|
||||
/**
|
||||
* Default NATS server URL
|
||||
*/
|
||||
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
|
||||
|
||||
/**
|
||||
* Default HTTP file server URL for link transport
|
||||
*/
|
||||
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
||||
|
||||
// ---------------------------------------------- Utility Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Convert Buffer to Base64 string
|
||||
* @param {Buffer} buffer - Buffer to encode
|
||||
* @returns {string} Base64 encoded string
|
||||
*/
|
||||
function bufferToBase64(buffer) {
|
||||
return buffer.toString('base64');
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a trace message with correlation ID and timestamp
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {string} message - Message content to log
|
||||
*/
|
||||
function logTrace(correlationId, message) {
|
||||
const timestamp = new Date().toISOString();
|
||||
console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Serialization Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Serialize data according to specified format
|
||||
* @param {any} data - Data to serialize
|
||||
* @param {string} payloadType - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
* @returns {Buffer} Binary representation of the serialized data
|
||||
*/
|
||||
async function serializeData(data, payloadType) {
|
||||
if (payloadType === 'text') {
|
||||
if (typeof data === 'string') {
|
||||
return Buffer.from(data, 'utf8');
|
||||
} else {
|
||||
throw new Error('Text data must be a string');
|
||||
}
|
||||
} else if (payloadType === 'dictionary') {
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return Buffer.from(jsonStr, 'utf8');
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
// Convert array of objects to Arrow IPC format
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
throw new Error('Arrow table data must be a non-empty array of objects');
|
||||
}
|
||||
|
||||
return serializeArrowTable(data);
|
||||
} else if (payloadType === 'jsontable') {
|
||||
// Serialize array of objects to JSON format
|
||||
if (!Array.isArray(data)) {
|
||||
throw new Error('JSON table data must be an array');
|
||||
}
|
||||
const jsonStr = JSON.stringify(data);
|
||||
return Buffer.from(jsonStr, 'utf8');
|
||||
} else if (payloadType === 'image') {
|
||||
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
||||
return Buffer.from(data);
|
||||
} else {
|
||||
throw new Error('Image data must be Uint8Array or Buffer');
|
||||
}
|
||||
} else if (payloadType === 'audio') {
|
||||
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
||||
return Buffer.from(data);
|
||||
} else {
|
||||
throw new Error('Audio data must be Uint8Array or Buffer');
|
||||
}
|
||||
} else if (payloadType === 'video') {
|
||||
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
||||
return Buffer.from(data);
|
||||
} else {
|
||||
throw new Error('Video data must be Uint8Array or Buffer');
|
||||
}
|
||||
} else if (payloadType === 'binary') {
|
||||
if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
|
||||
return Buffer.from(data);
|
||||
} else {
|
||||
throw new Error('Binary data must be Uint8Array or Buffer');
|
||||
}
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payloadType}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to properly serialize table data to Arrow IPC
|
||||
* @param {Array<Object>} data - Array of objects representing table rows
|
||||
* @returns {Buffer} Arrow IPC formatted buffer
|
||||
*/
|
||||
function serializeArrowTable(data) {
|
||||
if (!Array.isArray(data) || data.length === 0) {
|
||||
throw new Error('Table data must be a non-empty array of objects');
|
||||
}
|
||||
|
||||
logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`);
|
||||
|
||||
// Use arrow.tableFromArrays which handles the conversion properly
|
||||
// Convert array of objects to a key-value format expected by tableFromArrays
|
||||
const columns = {};
|
||||
for (const key of Object.keys(data[0])) {
|
||||
columns[key] = data.map(row => row[key]);
|
||||
}
|
||||
|
||||
logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`);
|
||||
|
||||
const table = arrow.tableFromArrays(columns);
|
||||
|
||||
logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`);
|
||||
|
||||
// Convert to IPC format
|
||||
const ipcBuffer = arrow.tableToIPC(table);
|
||||
|
||||
logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, length: ${ipcBuffer.byteLength}`);
|
||||
|
||||
const resultBuffer = Buffer.from(ipcBuffer);
|
||||
logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`);
|
||||
|
||||
// Debug: Show first 20 bytes in hex
|
||||
const hexPreview = resultBuffer.slice(0, 20).toString('hex');
|
||||
logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview}`);
|
||||
|
||||
return resultBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize bytes to data based on type
|
||||
* @param {Buffer|Uint8Array} data - Serialized data as bytes
|
||||
* @param {string} payloadType - Data type
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
* @returns {any} Deserialized data
|
||||
*/
|
||||
async function deserializeData(data, payloadType, correlationId) {
|
||||
const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);
|
||||
|
||||
logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);
|
||||
|
||||
// Debug: Show first 20 bytes in hex for binary data
|
||||
if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
|
||||
const hexPreview = buffer.slice(0, 20).toString('hex');
|
||||
logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview}`);
|
||||
}
|
||||
|
||||
if (payloadType === 'text') {
|
||||
const result = buffer.toString('utf8');
|
||||
logTrace(correlationId, `deserializeData: text result length=${result.length}`);
|
||||
return result;
|
||||
} else if (payloadType === 'dictionary') {
|
||||
const jsonStr = buffer.toString('utf8');
|
||||
const result = JSON.parse(jsonStr);
|
||||
logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`);
|
||||
return result;
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`);
|
||||
|
||||
// Debug: Check available arrow methods
|
||||
logTrace(correlationId, `deserializeData: arrow.tableFromRawBytes exists: ${typeof arrow.tableFromRawBytes}`);
|
||||
logTrace(correlationId, `deserializeData: arrow.tableFromIPC exists: ${typeof arrow.tableFromIPC}`);
|
||||
|
||||
try {
|
||||
// Try tableFromRawBytes first (older API)
|
||||
if (typeof arrow.tableFromRawBytes === 'function') {
|
||||
logTrace(correlationId, `deserializeData: Using tableFromRawBytes`);
|
||||
const table = arrow.tableFromRawBytes(buffer);
|
||||
logTrace(correlationId, `deserializeData: Arrow table - rows=${table.numRows}, cols=${table.numCols}`);
|
||||
return table;
|
||||
}
|
||||
} catch (e) {
|
||||
logTrace(correlationId, `deserializeData: tableFromRawBytes failed: ${e.message}`);
|
||||
}
|
||||
|
||||
try {
|
||||
// Try tableFromIPC (newer API)
|
||||
if (typeof arrow.tableFromIPC === 'function') {
|
||||
logTrace(correlationId, `deserializeData: Using tableFromIPC`);
|
||||
const table = arrow.tableFromIPC(buffer);
|
||||
logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`);
|
||||
return table;
|
||||
}
|
||||
} catch (e) {
|
||||
logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`);
|
||||
}
|
||||
|
||||
throw new Error(`Unable to deserialize Arrow table: neither tableFromRawBytes nor tableFromIPC worked`);
|
||||
} else if (payloadType === 'jsontable') {
|
||||
const jsonStr = buffer.toString('utf8');
|
||||
const result = JSON.parse(jsonStr);
|
||||
logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`);
|
||||
return result;
|
||||
} else if (payloadType === 'image') {
|
||||
logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'audio') {
|
||||
logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'video') {
|
||||
logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else if (payloadType === 'binary') {
|
||||
logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`);
|
||||
return buffer;
|
||||
} else {
|
||||
throw new Error(`Unknown payload_type: ${payloadType}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- File Server Handlers ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
* @param {string} fileServerUrl - Base URL of the plik server
|
||||
* @param {string} dataname - Name of the file being uploaded
|
||||
* @param {Buffer|Uint8Array} data - Raw byte data of the file content
|
||||
* @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
|
||||
*/
|
||||
async function plikOneshotUpload(fileServerUrl, dataname, data) {
|
||||
const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data);
|
||||
|
||||
// Get upload id
|
||||
const urlGetUploadID = `${fileServerUrl}/upload`;
|
||||
const headers = { 'Content-Type': 'application/json' };
|
||||
const body = JSON.stringify({ OneShot: true });
|
||||
|
||||
const httpResponse = await fetch(urlGetUploadID, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body
|
||||
});
|
||||
|
||||
const responseJson = await httpResponse.json();
|
||||
const uploadid = responseJson.id;
|
||||
const uploadtoken = responseJson.uploadToken;
|
||||
|
||||
// Upload file
|
||||
const urlUpload = `${fileServerUrl}/file/${uploadid}`;
|
||||
const form = new FormData();
|
||||
const blob = new Blob([buffer], { type: 'application/octet-stream' });
|
||||
form.append('file', blob, dataname);
|
||||
|
||||
const uploadHeaders = {
|
||||
'X-UploadToken': uploadtoken
|
||||
};
|
||||
|
||||
const uploadResponse = await fetch(urlUpload, {
|
||||
method: 'POST',
|
||||
headers: uploadHeaders,
|
||||
body: form
|
||||
});
|
||||
|
||||
const uploadJson = await uploadResponse.json();
|
||||
const fileid = uploadJson.id;
|
||||
|
||||
const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`;
|
||||
|
||||
return {
|
||||
status: uploadResponse.status,
|
||||
uploadid,
|
||||
fileid,
|
||||
url
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch data from URL with exponential backoff
|
||||
* @param {string} url - URL to fetch from
|
||||
* @param {number} maxRetries - Maximum number of retry attempts
|
||||
* @param {number} baseDelay - Initial delay in milliseconds
|
||||
* @param {number} maxDelay - Maximum delay in milliseconds
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
* @returns {Promise<Uint8Array>} Fetched data as bytes
|
||||
*/
|
||||
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
|
||||
let delay = baseDelay;
|
||||
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
|
||||
if (response.status === 200) {
|
||||
logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
|
||||
const arrayBuffer = await response.arrayBuffer();
|
||||
return new Uint8Array(arrayBuffer);
|
||||
} else {
|
||||
throw new Error(`Failed to fetch: ${response.status}`);
|
||||
}
|
||||
} catch (e) {
|
||||
logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`);
|
||||
|
||||
if (attempt < maxRetries) {
|
||||
await new Promise(resolve => setTimeout(resolve, delay));
|
||||
delay = Math.min(delay * 2, maxDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------- NATS Client ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* NATS client wrapper for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*/
|
||||
class NATSClient {
|
||||
/**
|
||||
* Create a new NATS client
|
||||
* @param {string} url - NATS server URL (nats:// or tls://)
|
||||
* @param {boolean} [keepAlive=false] - Keep connection open for multiple publishes
|
||||
*/
|
||||
constructor(url, keepAlive = false) {
|
||||
this.url = url;
|
||||
this.connection = null;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to NATS server
|
||||
* @returns {Promise<NATS.Connection>}
|
||||
*/
|
||||
async connect() {
|
||||
if (this.connection) {
|
||||
return this.connection;
|
||||
}
|
||||
this.connection = await nats.connect({ servers: this.url });
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish message to NATS subject
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - Message to publish
|
||||
* @param {string} correlationId - Correlation ID for logging
|
||||
*/
|
||||
async publish(subject, message, correlationId) {
|
||||
if (!this.connection) {
|
||||
await this.connect();
|
||||
}
|
||||
await this.connection.publish(subject, message);
|
||||
logTrace(correlationId, `Message published to ${subject}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the NATS connection
|
||||
*/
|
||||
async close() {
|
||||
if (this.connection) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current connection (for external use)
|
||||
* @returns {NATS.Connection|null}
|
||||
*/
|
||||
getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isConnected() {
|
||||
return this.connection !== null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*/
|
||||
class NATSConnectionPool {
|
||||
/**
|
||||
* Create a new connection pool
|
||||
* @param {string} url - NATS server URL (nats:// or tls://)
|
||||
* @param {number} [maxSize=10] - Maximum pool size
|
||||
*/
|
||||
constructor(url, maxSize = 10) {
|
||||
this.url = url;
|
||||
this.maxSize = maxSize;
|
||||
this.connections = new Map();
|
||||
this.idCounter = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a connection from the pool (or create new)
|
||||
* @returns {Promise<NATSClient>}
|
||||
*/
|
||||
async acquire() {
|
||||
// Try to find an existing idle connection
|
||||
for (const [id, client] of this.connections) {
|
||||
if (client.isConnected()) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection if under limit
|
||||
if (this.connections.size < this.maxSize) {
|
||||
const id = `conn_${++this.idCounter}`;
|
||||
const client = new NATSClient(this.url, true);
|
||||
await client.connect();
|
||||
this.connections.set(id, client);
|
||||
return client;
|
||||
}
|
||||
|
||||
// Pool exhausted - create new connection (caller should close when done)
|
||||
const client = new NATSClient(this.url, false);
|
||||
await client.connect();
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a connection to the pool
|
||||
* @param {NATSClient} client - Connection to return
|
||||
*/
|
||||
release(client) {
|
||||
// Only return persistent connections
|
||||
if (client.keepAlive && client.isConnected()) {
|
||||
// Connection already in pool, do nothing
|
||||
return;
|
||||
}
|
||||
// Non-persistent connection - close it
|
||||
client.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all connections in the pool
|
||||
*/
|
||||
async closeAll() {
|
||||
for (const [id, client] of this.connections) {
|
||||
await client.close();
|
||||
}
|
||||
this.connections.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Core Functions ---------------------------------------------- //
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
* @param {string|NATSClient|NATS.Connection} brokerUrlOrClient - NATS URL, client, or connection
|
||||
* @param {string} subject - NATS subject to publish to
|
||||
* @param {string} message - JSON message to publish
|
||||
* @param {string} correlationId - Correlation ID for tracing
|
||||
* @param {boolean} [closeConnection=true] - Close connection after publish (set false for persistent connections)
|
||||
*/
|
||||
async function publishMessage(brokerUrlOrClient, subject, message, correlationId, closeConnection = true) {
|
||||
let conn;
|
||||
let shouldClose = false;
|
||||
|
||||
if (brokerUrlOrClient instanceof NATSClient) {
|
||||
conn = brokerUrlOrClient;
|
||||
} else if (brokerUrlOrClient && typeof brokerUrlOrClient.publish === 'function') {
|
||||
// Create a wrapper for direct connection (duck-typing check for NATS connection)
|
||||
conn = {
|
||||
async publish(subj, msg) {
|
||||
await brokerUrlOrClient.publish(subj, msg);
|
||||
},
|
||||
async close() {
|
||||
await brokerUrlOrClient.close();
|
||||
}
|
||||
};
|
||||
shouldClose = true;
|
||||
} else {
|
||||
// String URL - create new client
|
||||
const client = new NATSClient(brokerUrlOrClient);
|
||||
conn = client;
|
||||
shouldClose = true;
|
||||
}
|
||||
|
||||
await conn.publish(subject, message, correlationId);
|
||||
|
||||
// Only close if explicitly requested and it's a short-lived client
|
||||
if (shouldClose && closeConnection && conn instanceof NATSClient) {
|
||||
await conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build message envelope from payloads and metadata
|
||||
* @param {string} subject - NATS subject
|
||||
* @param {Array} payloads - Array of payload objects
|
||||
* @param {Object} options - Envelope metadata options
|
||||
* @returns {Object} Envelope object
|
||||
*/
|
||||
function buildEnvelope(subject, payloads, options) {
|
||||
return {
|
||||
correlation_id: options.correlation_id,
|
||||
msg_id: options.msg_id,
|
||||
timestamp: new Date().toISOString(),
|
||||
send_to: subject,
|
||||
msg_purpose: options.msg_purpose,
|
||||
sender_name: options.sender_name,
|
||||
sender_id: options.sender_id,
|
||||
receiver_name: options.receiver_name,
|
||||
receiver_id: options.receiver_id,
|
||||
reply_to: options.reply_to,
|
||||
reply_to_msg_id: options.reply_to_msg_id,
|
||||
broker_url: options.broker_url,
|
||||
metadata: options.metadata || {},
|
||||
payloads: payloads
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build payload object from serialized data
|
||||
* @param {string} dataname - Name of the payload
|
||||
* @param {string} payloadType - Type of the payload
|
||||
* @param {Buffer} payloadBytes - Serialized payload bytes
|
||||
* @param {string} transport - Transport type ("direct" or "link")
|
||||
* @param {string} data - Data (base64 for direct, URL for link)
|
||||
* @returns {Object} Payload object
|
||||
*/
|
||||
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
// Determine encoding based on payload type (matching Julia implementation)
|
||||
let encoding = 'base64';
|
||||
if (payloadType === 'jsontable') {
|
||||
encoding = 'json';
|
||||
} else if (payloadType === 'arrowtable') {
|
||||
encoding = 'arrow-ipc';
|
||||
}
|
||||
|
||||
return {
|
||||
id: uuidv4(),
|
||||
dataname,
|
||||
payload_type: payloadType,
|
||||
transport,
|
||||
encoding,
|
||||
size: payloadBytes.byteLength,
|
||||
data,
|
||||
metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*
|
||||
* This function intelligently routes data delivery based on payload size.
|
||||
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||
* and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||
* and publishes only the download URL over NATS.
|
||||
*
|
||||
* @param {string} subject - NATS subject to publish the message to
|
||||
* @param {Array} data - List of [dataname, data, type] tuples to send
|
||||
* - type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - URL of the NATS server
|
||||
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
|
||||
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
|
||||
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
|
||||
* @param {string} [options.correlation_id=crypto.randomUUID()] - Correlation ID for tracing
|
||||
* @param {string} [options.msg_purpose="chat"] - Purpose of the message
|
||||
* @param {string} [options.sender_name="msghandler"] - Name of the sender
|
||||
* @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
|
||||
* @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
|
||||
* @param {string} [options.reply_to=""] - Topic to reply to
|
||||
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
|
||||
* @param {boolean} [options.is_publish=true] - Whether to automatically publish the message
|
||||
* @param {NATSClient|NATS.Connection} [options.nats_connection=null] - Pre-existing NATS connection
|
||||
* @param {string} [options.msg_id=crypto.randomUUID()] - Message ID
|
||||
* @param {string} [options.sender_id=crypto.randomUUID()] - Sender ID
|
||||
* @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
|
||||
*
|
||||
* @example
|
||||
* // Send a single payload
|
||||
* const [env, envJsonStr] = await smartsend(
|
||||
* "/test",
|
||||
* [["dataname1", data1, "dictionary"]],
|
||||
* { broker_url: "nats://localhost:4222" }
|
||||
* );
|
||||
*
|
||||
* // Send multiple payloads
|
||||
* const [env, envJsonStr] = await smartsend(
|
||||
* "/test",
|
||||
* [
|
||||
* ["dataname1", data1, "dictionary"],
|
||||
* ["dataname2", data2, "arrowtable"]
|
||||
* ],
|
||||
* { broker_url: "nats://localhost:4222" }
|
||||
* );
|
||||
*
|
||||
* // Send with pre-existing connection
|
||||
* const client = await msghandler.NATSClient.connect("nats://localhost:4222");
|
||||
* const [env, envJsonStr] = await smartsend(
|
||||
* "/test",
|
||||
* [["data", myData, "text"]],
|
||||
* { nats_connection: client }
|
||||
* );
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
const {
|
||||
broker_url = DEFAULT_BROKER_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler = plikOneshotUpload,
|
||||
size_threshold = DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id = uuidv4(),
|
||||
msg_purpose = 'chat',
|
||||
sender_name = 'msghandler',
|
||||
receiver_name = '',
|
||||
receiver_id = '',
|
||||
reply_to = '',
|
||||
reply_to_msg_id = '',
|
||||
is_publish = true,
|
||||
nats_connection = null,
|
||||
msg_id = uuidv4(),
|
||||
sender_id = uuidv4()
|
||||
} = options;
|
||||
|
||||
logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartsend: data array length=${data.length}`);
|
||||
|
||||
// Debug: Log input data structure
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const [dataname, payloadData, payloadType] = data[i];
|
||||
logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
}
|
||||
|
||||
// Process payloads
|
||||
const payloads = [];
|
||||
for (const [dataname, payloadData, payloadType] of data) {
|
||||
logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
|
||||
const payloadBytes = await serializeData(payloadData, payloadType);
|
||||
const payloadSize = payloadBytes.byteLength;
|
||||
|
||||
logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
|
||||
|
||||
// Debug: Show first 20 bytes of serialized data for table type
|
||||
if (payloadType === 'table') {
|
||||
const hexPreview = payloadBytes.slice(0, 20).toString('hex');
|
||||
logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview}`);
|
||||
}
|
||||
|
||||
if (payloadSize < size_threshold) {
|
||||
// Direct path
|
||||
const payloadB64 = bufferToBase64(payloadBytes);
|
||||
logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`);
|
||||
|
||||
const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64);
|
||||
payloads.push(payload);
|
||||
} else {
|
||||
// Link path
|
||||
logTrace(correlation_id, `Using link transport, uploading to fileserver`);
|
||||
|
||||
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
|
||||
|
||||
if (response.status !== 200) {
|
||||
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
|
||||
}
|
||||
|
||||
logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
|
||||
|
||||
const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url);
|
||||
payloads.push(payload);
|
||||
}
|
||||
}
|
||||
|
||||
// Build envelope
|
||||
const env = buildEnvelope(subject, payloads, {
|
||||
correlation_id,
|
||||
msg_id,
|
||||
msg_purpose,
|
||||
sender_name,
|
||||
sender_id,
|
||||
receiver_name,
|
||||
receiver_id,
|
||||
reply_to,
|
||||
reply_to_msg_id,
|
||||
broker_url
|
||||
});
|
||||
|
||||
const env_json_str = JSON.stringify(env);
|
||||
|
||||
if (is_publish) {
|
||||
if (nats_connection) {
|
||||
await publishMessage(nats_connection, subject, env_json_str, correlation_id);
|
||||
} else {
|
||||
await publishMessage(broker_url, subject, env_json_str, correlation_id);
|
||||
}
|
||||
}
|
||||
|
||||
return [env, env_json_str];
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive and process NATS message
|
||||
*
|
||||
* This function processes incoming NATS messages, handling both direct transport
|
||||
* (base64 decoded payloads) and link transport (URL-based payloads).
|
||||
* It deserializes the data based on the transport type and returns the result.
|
||||
*
|
||||
* @param {Object} msg - NATS message object with payload property
|
||||
* @param {Object} options - Optional configuration
|
||||
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
|
||||
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
|
||||
* @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
|
||||
* @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
|
||||
* @returns {Promise<Object>} Envelope object with processed payloads
|
||||
*
|
||||
* @example
|
||||
* // Receive and process message
|
||||
* const env = await smartreceive(msg, {
|
||||
* fileserver_download_handler: fetchWithBackoff,
|
||||
* max_retries: 5,
|
||||
* base_delay: 100,
|
||||
* max_delay: 5000
|
||||
* });
|
||||
* // env.payloads is an Array of [dataname, data, type] arrays
|
||||
* for (const [dataname, data, type] of env.payloads) {
|
||||
* console.log(`${dataname}: ${data} (type: ${type})`);
|
||||
* }
|
||||
*/
|
||||
async function smartreceive(msg, options = {}) {
|
||||
const {
|
||||
fileserver_download_handler = fetchWithBackoff,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
} = options;
|
||||
|
||||
// Debug: Log message object structure
|
||||
logTrace('smartreceive', `smartreceive: msg object keys: ${Object.keys(msg).join(', ')}`);
|
||||
logTrace('smartreceive', `smartreceive: msg.data type: ${typeof msg.data}, constructor: ${msg.data?.constructor?.name}`);
|
||||
logTrace('smartreceive', `smartreceive: msg.payload type: ${typeof msg.payload}, constructor: ${msg.payload?.constructor?.name}`);
|
||||
|
||||
// Parse the JSON envelope
|
||||
// NATS.js v2.x uses msg.data instead of msg.payload
|
||||
let payload;
|
||||
if (msg.data !== undefined) {
|
||||
payload = typeof msg.data === 'string' ? msg.data : Buffer.from(msg.data).toString('utf8');
|
||||
} else if (msg.payload !== undefined) {
|
||||
payload = typeof msg.payload === 'string' ? msg.payload : Buffer.from(msg.payload).toString('utf8');
|
||||
} else {
|
||||
throw new Error('Message has neither data nor payload property');
|
||||
}
|
||||
|
||||
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
|
||||
|
||||
// Debug: Show first 200 chars of payload
|
||||
const payloadPreview = payload.substring(0, 200);
|
||||
logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`);
|
||||
|
||||
let envJsonObj;
|
||||
try {
|
||||
envJsonObj = JSON.parse(payload);
|
||||
} catch (e) {
|
||||
logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`);
|
||||
throw e;
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, 'Processing received message');
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
|
||||
// Process all payloads in the envelope
|
||||
const payloadsList = [];
|
||||
const numPayloads = envJsonObj.payloads.length;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`);
|
||||
|
||||
for (let i = 0; i < numPayloads; i++) {
|
||||
const payloadObj = envJsonObj.payloads[i];
|
||||
const transport = payloadObj.transport;
|
||||
const dataname = payloadObj.dataname;
|
||||
const payloadType = payloadObj.payload_type;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
|
||||
if (transport === 'direct') {
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
|
||||
|
||||
// Extract base64 payload from the payload
|
||||
const payloadB64 = payloadObj.data;
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`);
|
||||
|
||||
// Decode Base64 payload
|
||||
const payloadBytes = Buffer.from(payloadB64, 'base64');
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`);
|
||||
|
||||
// Deserialize based on type
|
||||
const dataType = payloadObj.payload_type;
|
||||
const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id);
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`);
|
||||
|
||||
payloadsList.push([dataname, data, dataType]);
|
||||
} else if (transport === 'link') {
|
||||
// Extract download URL from the payload
|
||||
const url = payloadObj.data;
|
||||
logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
|
||||
|
||||
// Fetch with exponential backoff using the download handler
|
||||
const downloadedData = await fileserver_download_handler(
|
||||
url,
|
||||
max_retries,
|
||||
base_delay,
|
||||
max_delay,
|
||||
envJsonObj.correlation_id
|
||||
);
|
||||
|
||||
// Deserialize based on type
|
||||
const dataType = payloadObj.payload_type;
|
||||
const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id);
|
||||
|
||||
payloadsList.push([dataname, data, dataType]);
|
||||
} else {
|
||||
throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
|
||||
}
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`);
|
||||
envJsonObj.payloads = payloadsList;
|
||||
return envJsonObj;
|
||||
}
|
||||
|
||||
// ---------------------------------------------- Module Exports ---------------------------------------------- //
|
||||
|
||||
const msghandler = {
|
||||
/**
|
||||
* NATS client class for connection management
|
||||
* Supports both single-use and persistent connection modes
|
||||
*
|
||||
* @example
|
||||
* // Single-use connection (closes after publish)
|
||||
* const client = new msghandler.NATSClient("nats://localhost:4222");
|
||||
* await msghandler.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* await client.close();
|
||||
*
|
||||
* // Persistent connection (keeps connection open)
|
||||
* const client = new msghandler.NATSClient("nats://localhost:4222", true);
|
||||
* await client.connect();
|
||||
* await msghandler.smartsend("/test1", [["msg", "Hello", "text"]], { nats_connection: client, is_publish: false });
|
||||
* await msghandler.publishMessage(client, "/test2", JSON.stringify({msg: "World"}), "trace-id");
|
||||
* // Connection remains open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
NATSClient,
|
||||
|
||||
/**
|
||||
* Connection pool for managing multiple NATS connections
|
||||
* Useful for applications with multiple concurrent publishers
|
||||
*
|
||||
* @example
|
||||
* const pool = new msghandler.NATSConnectionPool("nats://localhost:4222", 10);
|
||||
* const client = await pool.acquire();
|
||||
* await msghandler.smartsend("/test", [["msg", "Hello", "text"]], { nats_connection: client });
|
||||
* pool.release(client);
|
||||
* await pool.closeAll();
|
||||
*/
|
||||
NATSConnectionPool,
|
||||
|
||||
/**
|
||||
* Send data via NATS with automatic transport selection
|
||||
*/
|
||||
smartsend,
|
||||
|
||||
/**
|
||||
* Receive and process NATS message
|
||||
*/
|
||||
smartreceive,
|
||||
|
||||
/**
|
||||
* Publish message to NATS
|
||||
*
|
||||
* @example
|
||||
* // Using a persistent connection
|
||||
* const client = new msghandler.NATSClient("nats://localhost:4222", true);
|
||||
* await client.connect();
|
||||
* await msghandler.publishMessage(client, "/subject", JSON.stringify({msg: "Hello"}), "trace-id", false);
|
||||
* // Connection stays open for more publishes
|
||||
* await client.close();
|
||||
*/
|
||||
publishMessage,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
*/
|
||||
plikOneshotUpload,
|
||||
|
||||
/**
|
||||
* Fetch data from URL with exponential backoff
|
||||
*/
|
||||
fetchWithBackoff,
|
||||
|
||||
/**
|
||||
* Default constants
|
||||
*/
|
||||
DEFAULT_SIZE_THRESHOLD,
|
||||
DEFAULT_BROKER_URL,
|
||||
DEFAULT_FILESERVER_URL
|
||||
};
|
||||
|
||||
module.exports = msghandler;
|
||||
Reference in New Issue
Block a user