843 lines
29 KiB
Python
843 lines
29 KiB
Python
"""
NATSBridge - Cross-Platform Bi-Directional Data Bridge
Python Desktop Implementation

This module provides functionality for sending and receiving data across network boundaries
using NATS as the message bus, with support for both direct payload transport and
URL-based transport for larger payloads.

@package natsbridge
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Dict, List, Tuple, Union
|
|
import aiohttp
|
|
|
|
try:
|
|
import pyarrow as arrow
|
|
import pyarrow.ipc as ipc
|
|
ARROW_AVAILABLE = True
|
|
except ImportError:
|
|
ARROW_AVAILABLE = False
|
|
|
|
try:
|
|
import nats
|
|
from nats.aio.client import Client as NATSClient
|
|
NATS_AVAILABLE = True
|
|
except ImportError:
|
|
NATS_AVAILABLE = False
|
|
|
|
# ---------------------------------------------- Constants ---------------------------------------------- #
|
|
|
|
# Size cutoff in bytes (1 MB): payloads below it are sent directly over NATS,
# payloads at or above it go through link transport.
DEFAULT_SIZE_THRESHOLD = 1_000_000

# NATS server URL used when the caller does not supply one.
DEFAULT_BROKER_URL = "nats://localhost:4222"

# HTTP file server URL used for link transport when the caller does not supply one.
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
|
|
|
|
|
# ---------------------------------------------- Utility Functions ---------------------------------------------- #
|
|
|
|
def log_trace(correlation_id: str, message: str) -> None:
    """
    Print a trace line tagged with a UTC timestamp and a correlation ID.

    The line has the form ``[<ISO-8601 UTC>Z] [Correlation: <id>] <message>``.

    Args:
        correlation_id: Correlation ID for tracing
        message: Message content to log
    """
    # Function-scope import: the module itself only imports `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and drop tzinfo so isoformat() keeps the previous naive form (no "+00:00")
    # before the literal "Z" suffix is appended.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
    print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
|
|
|
|
|
|
# ---------------------------------------------- Serialization Functions ---------------------------------------------- #
|
|
|
|
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|
"""
|
|
Serialize data according to specified format.
|
|
|
|
Args:
|
|
data: Data to serialize (string for "text", JSON-serializable for "dictionary",
|
|
table-like for "arrowtable"/"jsontable", binary for "image", "audio", "video", "binary")
|
|
payload_type: Target format: "text", "dictionary", "arrowtable", "jsontable",
|
|
"image", "audio", "video", "binary"
|
|
|
|
Returns:
|
|
Binary representation of the serialized data
|
|
|
|
Raises:
|
|
Error: If payload_type is not one of the supported types
|
|
Error: If payload_type is "image", "audio", or "video" but data is not bytes
|
|
Error: If payload_type is "arrowtable" but data is not a pandas DataFrame or pyarrow Table
|
|
Error: If payload_type is "jsontable" but data is not a list of dicts
|
|
"""
|
|
if payload_type == 'text':
|
|
if isinstance(data, str):
|
|
return data.encode('utf-8')
|
|
else:
|
|
raise ValueError('Text data must be a string')
|
|
elif payload_type == 'dictionary':
|
|
json_str = json.dumps(data)
|
|
return json_str.encode('utf-8')
|
|
elif payload_type == 'arrowtable':
|
|
if not ARROW_AVAILABLE:
|
|
raise RuntimeError('pyarrow not available for arrowtable serialization')
|
|
|
|
import io
|
|
buf = io.BytesIO()
|
|
|
|
import pandas as pd
|
|
if isinstance(data, pd.DataFrame):
|
|
table = arrow.Table.from_pandas(data)
|
|
sink = ipc.new_file(buf, table.schema)
|
|
ipc.write_table(table, sink)
|
|
sink.close()
|
|
return buf.getvalue()
|
|
elif isinstance(data, arrow.Table):
|
|
sink = ipc.new_file(buf, data.schema)
|
|
ipc.write_table(data, sink)
|
|
sink.close()
|
|
return buf.getvalue()
|
|
else:
|
|
raise ValueError('Arrow table data must be a pandas DataFrame or pyarrow Table')
|
|
elif payload_type == 'jsontable':
|
|
# Serialize list of dicts to JSON format
|
|
if isinstance(data, list) and all(isinstance(row, dict) for row in data):
|
|
json_str = json.dumps(data)
|
|
return json_str.encode('utf-8')
|
|
else:
|
|
raise ValueError('JSON table data must be a list of dicts')
|
|
elif payload_type == 'image':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise ValueError('Image data must be bytes')
|
|
elif payload_type == 'audio':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise ValueError('Audio data must be bytes')
|
|
elif payload_type == 'video':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise ValueError('Video data must be bytes')
|
|
elif payload_type == 'binary':
|
|
if isinstance(data, (bytes, bytearray)):
|
|
return bytes(data)
|
|
else:
|
|
raise ValueError('Binary data must be bytes')
|
|
else:
|
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
|
|
|
|
|
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
|
"""
|
|
Deserialize bytes to data based on type.
|
|
|
|
Args:
|
|
data: Serialized data as bytes
|
|
payload_type: Data type ("text", "dictionary", "arrowtable", "jsontable",
|
|
"image", "audio", "video", "binary")
|
|
correlation_id: Correlation ID for logging
|
|
|
|
Returns:
|
|
Deserialized data (String for "text", DataFrame for "arrowtable",
|
|
Vector{Dict} for "jsontable"/"dictionary", bytes for "image", "audio", "video", "binary")
|
|
|
|
Raises:
|
|
Error: If payload_type is not one of the supported types
|
|
"""
|
|
if payload_type == 'text':
|
|
return data.decode('utf-8')
|
|
elif payload_type == 'dictionary':
|
|
json_str = data.decode('utf-8')
|
|
return json.loads(json_str)
|
|
elif payload_type == 'arrowtable':
|
|
if not ARROW_AVAILABLE:
|
|
raise RuntimeError('pyarrow not available for arrowtable deserialization')
|
|
|
|
import io
|
|
buf = io.BytesIO(data)
|
|
reader = ipc.open_file(buf)
|
|
return reader.read_all().to_pandas()
|
|
elif payload_type == 'jsontable':
|
|
# Deserialize JSON to list of dicts
|
|
json_str = data.decode('utf-8')
|
|
return json.loads(json_str)
|
|
elif payload_type == 'image':
|
|
return data
|
|
elif payload_type == 'audio':
|
|
return data
|
|
elif payload_type == 'video':
|
|
return data
|
|
elif payload_type == 'binary':
|
|
return data
|
|
else:
|
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
|
|
|
|
|
# ---------------------------------------------- File Server Handlers ---------------------------------------------- #
|
|
|
|
async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to a plik server in one-shot mode.

    Two requests are made on one HTTP session: a POST to ``/upload`` with
    ``{"OneShot": true}`` to obtain an upload ID and upload token, then a
    multipart POST of the file content to ``/file/<uploadid>`` authorised with
    that token.

    Args:
        file_server_url: Base URL of the plik server (e.g., "http://localhost:8080");
            a trailing slash is ignored
        dataname: Name of the file being uploaded
        data: Raw byte data of the file content

    Returns:
        Dict with keys:
            - "status": HTTP status of the file upload response
            - "uploadid": ID of the one-shot upload session
            - "fileid": ID of the uploaded file within the session
            - "url": Full URL to download the uploaded file

    Raises:
        aiohttp.ClientResponseError: If creating the upload session returns an
            HTTP error status

    Example:
        >>> file_server_url = "http://localhost:8080"
        >>> dataname = "test.txt"
        >>> data = b"hello world"
        >>> result = await plik_oneshot_upload(file_server_url, dataname, data)
        >>> result["status"], result["uploadid"], result["fileid"], result["url"]
    """
    from urllib.parse import quote

    # Avoid "//" in request paths when the base URL ends with a slash.
    base_url = file_server_url.rstrip('/')

    async with aiohttp.ClientSession() as session:
        # Step 1: create the one-shot upload session
        async with session.post(
            f"{base_url}/upload",
            headers={'Content-Type': 'application/json'},
            data=json.dumps({"OneShot": True})
        ) as response:
            # Surface an HTTP failure here instead of an opaque KeyError when
            # the error body lacks "id"/"uploadToken".
            response.raise_for_status()
            response_json = await response.json()

        uploadid = response_json['id']
        uploadtoken = response_json['uploadToken']

        # Step 2: upload the file content as multipart form data
        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')

        async with session.post(
            f"{base_url}/file/{uploadid}",
            headers={'X-UploadToken': uploadtoken},
            data=form
        ) as upload_response:
            # The status is returned to the caller, which decides whether the
            # upload succeeded.
            status = upload_response.status
            upload_json = await upload_response.json()

        fileid = upload_json['id']

    # Percent-encode the file name so spaces, "#", "?" etc. cannot corrupt the URL.
    url = f"{base_url}/file/{uploadid}/{fileid}/{quote(dataname)}"

    return {
        'status': status,
        'uploadid': uploadid,
        'fileid': fileid,
        'url': url
    }
|
|
|
|
|
|
async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Fetch data from a URL, retrying with exponential backoff.

    Each failed attempt (any exception, or a non-200 response) is logged and,
    unless it was the last attempt, followed by a sleep whose duration doubles
    each time up to max_delay.

    Args:
        url: URL to fetch from
        max_retries: Maximum number of attempts
        base_delay: Initial delay in milliseconds
        max_delay: Maximum delay in milliseconds
        correlation_id: Correlation ID for logging

    Returns:
        Fetched data as bytes

    Raises:
        Exception: If all attempts fail; the last underlying error is attached
            as the exception's cause

    Example:
        >>> data = await fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
    """
    delay = base_delay
    last_error = None

    # One session serves every attempt instead of a new one per retry.
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f"Failed to fetch: {response.status}")
                    body = await response.read()
                # Log success only once the body has actually been read.
                log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
                return body
            except Exception as e:
                last_error = e
                log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")

                if attempt < max_retries:
                    # Delays are configured in milliseconds; asyncio.sleep takes seconds.
                    await asyncio.sleep(delay / 1000.0)
                    delay = min(delay * 2, max_delay)

    # Chain the last failure so the root cause is visible in tracebacks.
    raise Exception(f"Failed to fetch data after {max_retries} attempts") from last_error
|
|
|
|
|
|
# ---------------------------------------------- NATS Client ---------------------------------------------- #
|
|
|
|
class NATSClient:
    """
    NATS client wrapper for connection management.

    NOTE: defining this class rebinds the module-level name ``NATSClient``,
    which the optional ``from nats.aio.client import Client as NATSClient``
    import may have bound earlier in the file.
    """

    def __init__(self, url: str = DEFAULT_BROKER_URL):
        """
        Create a new, not yet connected, NATS client.

        Args:
            url: NATS server URL
        """
        self.url = url
        # Underlying nats-py connection; None until connect() succeeds.
        self._client: Any = None

    async def connect(self) -> Any:
        """
        Connect to the NATS server at ``self.url``.

        Returns:
            The underlying nats-py connection

        Raises:
            RuntimeError: If nats-py is not installed
        """
        if not NATS_AVAILABLE:
            raise RuntimeError('nats-py not available')
        # nats.connect() is a coroutine function: keep the awaited connection,
        # not the coroutine object itself.
        self._client = await nats.connect(self.url)
        return self._client

    async def publish(self, subject: str, message: Union[str, bytes], correlation_id: str = "") -> None:
        """
        Publish a message to a NATS subject.

        Does nothing when the client is not connected.

        Args:
            subject: NATS subject to publish to
            message: Message to publish (str is sent UTF-8 encoded)
            correlation_id: Correlation ID for logging; no trace line is
                written when empty
        """
        if not self._client:
            return
        # nats-py expects a bytes payload.
        payload = message.encode('utf-8') if isinstance(message, str) else message
        await self._client.publish(subject, payload)
        if correlation_id:
            log_trace(correlation_id, f"Message published to {subject}")

    async def close(self) -> None:
        """Drain and close the NATS connection, if one is open."""
        if self._client:
            # Clear the handle first so a repeated close() is a no-op rather
            # than a drain on an already-closed connection.
            client, self._client = self._client, None
            await client.drain()
            await client.close()
|
|
|
|
|
|
# ---------------------------------------------- Core Functions ---------------------------------------------- #
|
|
|
|
def _build_envelope(
|
|
subject: str,
|
|
payloads: List[Dict[str, Any]],
|
|
options: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build message envelope from payloads and metadata.
|
|
|
|
Args:
|
|
subject: NATS subject
|
|
payloads: Array of payload objects
|
|
options: Envelope metadata options
|
|
|
|
Returns:
|
|
Envelope object
|
|
"""
|
|
return {
|
|
'correlation_id': options['correlation_id'],
|
|
'msg_id': options['msg_id'],
|
|
'timestamp': datetime.utcnow().isoformat() + 'Z',
|
|
'send_to': subject,
|
|
'msg_purpose': options['msg_purpose'],
|
|
'sender_name': options['sender_name'],
|
|
'sender_id': options['sender_id'],
|
|
'receiver_name': options['receiver_name'],
|
|
'receiver_id': options['receiver_id'],
|
|
'reply_to': options['reply_to'],
|
|
'reply_to_msg_id': options['reply_to_msg_id'],
|
|
'broker_url': options['broker_url'],
|
|
'metadata': options.get('metadata', {}),
|
|
'payloads': payloads
|
|
}
|
|
|
|
|
|
def _build_payload(
|
|
dataname: str,
|
|
payload_type: str,
|
|
payload_bytes: bytes,
|
|
transport: str,
|
|
data: Union[str, bytes]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Build payload object from serialized data.
|
|
|
|
Args:
|
|
dataname: Name of the payload
|
|
payload_type: Type of the payload
|
|
payload_bytes: Serialized payload bytes
|
|
transport: Transport type ("direct" or "link")
|
|
data: Data (base64 for direct, URL for link)
|
|
|
|
Returns:
|
|
Payload object
|
|
"""
|
|
# Determine encoding based on payload type (matching Julia/JS implementation)
|
|
encoding = 'base64'
|
|
if payload_type == 'jsontable':
|
|
encoding = 'json'
|
|
elif payload_type == 'arrowtable':
|
|
encoding = 'arrow-ipc'
|
|
|
|
return {
|
|
'id': str(uuid.uuid4()),
|
|
'dataname': dataname,
|
|
'payload_type': payload_type,
|
|
'transport': transport,
|
|
'encoding': encoding,
|
|
'size': len(payload_bytes),
|
|
'data': data,
|
|
'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {}
|
|
}
|
|
|
|
|
|
async def publish_message(
    broker_url_or_client: Union[str, NATSClient, Any],
    subject: str,
    message: str,
    correlation_id: str
) -> None:
    """
    Publish a message to NATS.

    Clients and connections supplied by the caller are used as-is and left
    open, so they can be reused for further messages. Only a client created
    here from a URL is closed afterwards.

    Args:
        broker_url_or_client: NATS URL, NATSClient wrapper, or an object with a
            ``publish`` method (e.g. a nats-py connection)
        subject: NATS subject to publish to
        message: JSON message to publish
        correlation_id: Correlation ID for tracing
    """
    # nats-py expects a bytes payload; encode once for every path.
    payload = message.encode('utf-8') if isinstance(message, str) else message

    if isinstance(broker_url_or_client, NATSClient):
        # Caller-owned wrapper: publish through it and do not close it.
        await broker_url_or_client.publish(subject, payload, correlation_id)
        return

    if NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish'):
        # Direct NATS client connection
        await broker_url_or_client.publish(subject, payload)
        log_trace(correlation_id, f"Message published to {subject}")
        return

    # String URL - create a short-lived client owned by this call
    client = NATSClient(broker_url_or_client)
    await client.connect()
    try:
        await client.publish(subject, payload, correlation_id)
    finally:
        # Release the temporary connection even when publishing fails.
        await client.close()
|
|
|
|
|
|
async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: str = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: str = None,
    sender_id: str = None
) -> Tuple[Dict, str]:
    """
    Send data via NATS, choosing the transport per payload by size.

    Every (dataname, data, type) entry is serialized. A payload whose
    serialized size is below size_threshold is Base64-encoded and embedded in
    the envelope ("direct"). Otherwise it is uploaded through
    fileserver_upload_handler and only the download URL is embedded ("link").
    The envelope is then serialized to JSON and, if is_publish is true,
    published to the subject.

    Args:
        subject: NATS subject to publish the message to
        data: List of (dataname, data, type) tuples to send
            - dataname: Name of the payload
            - data: The actual data to send
            - type: Payload type: "text", "dictionary", "arrowtable", "jsontable",
              "image", "audio", "video", "binary"
        broker_url: URL of the NATS server
        fileserver_url: URL of the HTTP file server for large payloads
        fileserver_upload_handler: Async function called as
            handler(fileserver_url, dataname, payload_bytes); its result must
            provide "status" and "url" (the default handler also returns
            "uploadid" and "fileid")
        size_threshold: Threshold in bytes separating direct vs link transport
        correlation_id: Correlation ID for tracing (auto-generated UUID if not provided)
        msg_purpose: Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
        sender_name: Name of the sender
        receiver_name: Name of the receiver (empty string means broadcast)
        receiver_id: UUID of the receiver (empty string means broadcast)
        reply_to: Topic to reply to (empty string if no reply expected)
        reply_to_msg_id: Message ID this message is replying to
        is_publish: Whether to automatically publish the message to NATS
        nats_connection: Pre-existing NATS connection; when given it is used for
            publishing instead of broker_url
        msg_id: Message ID (auto-generated UUID if not provided)
        sender_id: Sender ID (auto-generated UUID if not provided)

    Returns:
        Tuple of (env, env_json_str) where:
            - env: Dict containing all metadata and payloads
            - env_json_str: JSON string of env, as published to NATS

    Raises:
        Exception: If the upload handler reports a status other than 200

    Example:
        >>> # Send a single payload (still wrapped in a list)
        >>> env, env_json_str = await smartsend(
        ...     "my.subject",
        ...     [("dataname1", {"key": "value"}, "dictionary")],
        ...     broker_url="nats://localhost:4222"
        ... )
        >>>
        >>> # Send a table ("arrowtable" takes a pandas DataFrame or pyarrow Table)
        >>> import pandas as pd
        >>> table = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
        >>> env, env_json_str = await smartsend(
        ...     "table.data",
        ...     [("users", table, "arrowtable")]
        ... )
        >>>
        >>> # Send jsontable (list of dicts)
        >>> users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
        >>> env, env_json_str = await smartsend(
        ...     "json.data",
        ...     [("users", users, "jsontable")]
        ... )
        >>>
        >>> # Mixed content (e.g., chat with text and image)
        >>> env, env_json_str = await smartsend(
        ...     "chat.subject",
        ...     [
        ...         ("message_text", "Hello!", "text"),
        ...         ("user_image", image_data, "image"),
        ...         ("audio_clip", audio_data, "audio")
        ...     ]
        ... )
    """
    # Fill in any identifiers the caller left out
    correlation_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    msg_id = str(uuid.uuid4()) if msg_id is None else msg_id
    sender_id = str(uuid.uuid4()) if sender_id is None else sender_id

    log_trace(correlation_id, f"Starting smartsend for subject: {subject}")

    # Serialize each entry and decide how it travels
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)

        log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")

        if payload_size < size_threshold:
            # Small enough: embed the Base64 text in the envelope
            transport = 'direct'
            carried = base64.b64encode(payload_bytes).decode('utf-8')
            log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
        else:
            # Too large: upload and carry only the download URL
            transport = 'link'
            log_trace(correlation_id, "Using link transport, uploading to fileserver")

            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            upload_status = response['status']
            if upload_status != 200:
                raise Exception(f"Failed to upload data to fileserver: {upload_status}")

            carried = response['url']
            log_trace(correlation_id, f"Uploaded to URL: {carried}")

        payloads.append(_build_payload(dataname, payload_type, payload_bytes, transport, carried))

    # Wrap the payloads in the envelope
    envelope_options = {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url
    }
    env = _build_envelope(subject, payloads, envelope_options)
    env_json_str = json.dumps(env)

    if is_publish:
        # Prefer the caller's connection when one was supplied
        publish_target = nats_connection if nats_connection else broker_url
        await publish_message(publish_target, subject, env_json_str, correlation_id)

    return env, env_json_str
|
|
|
|
|
|
async def smartreceive(
    msg: Any,
    fileserver_download_handler: Callable = fetch_with_backoff,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000
) -> Dict[str, Any]:
    """
    Receive and process a NATS message.

    The envelope is parsed, then each payload is resolved: "direct" payloads
    are Base64-decoded, "link" payloads are downloaded through
    fileserver_download_handler. The resulting bytes are deserialized
    according to the payload's "payload_type".

    The input is never modified; a new envelope dict is returned.

    Args:
        msg: Envelope to process. Accepted forms: an already parsed dict, a JSON
            str, JSON bytes, or a message object carrying the JSON in a
            ``payload`` or ``data`` attribute (nats-py messages use ``data``)
        fileserver_download_handler: Async function called as
            handler(url, max_retries, base_delay, max_delay, correlation_id)
            and returning the downloaded bytes
        max_retries: Maximum retry attempts for fetching URL
        base_delay: Initial delay for exponential backoff in ms
        max_delay: Maximum delay for exponential backoff in ms

    Returns:
        Dict with the envelope metadata and a "payloads" field containing
        List[Tuple[str, Any, str]] of (dataname, data, payload_type)

    Raises:
        Exception: If a payload's transport is neither "direct" nor "link"

    Example:
        >>> env = await smartreceive(msg, fileserver_download_handler=fetch_with_backoff)
        >>> for dataname, data, type_ in env["payloads"]:
        ...     print(f"{dataname}: {data} (type: {type_})")
    """
    # Locate the raw envelope inside whatever the caller passed
    if isinstance(msg, (dict, str, bytes, bytearray)):
        raw = msg
    elif hasattr(msg, 'payload'):
        raw = msg.payload
    elif hasattr(msg, 'data'):
        # nats-py message objects expose the body as `.data`
        raw = msg.data
    else:
        # Unknown object: assume it already behaves like a parsed envelope
        raw = msg

    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode('utf-8')
    env_json_obj = json.loads(raw) if isinstance(raw, str) else raw

    correlation_id = env_json_obj['correlation_id']
    log_trace(correlation_id, "Processing received message")

    # Resolve every payload in the envelope to its deserialized value
    payloads_list = []
    for payload_obj in env_json_obj['payloads']:
        transport = payload_obj['transport']
        dataname = payload_obj['dataname']

        if transport == 'direct':
            log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")
            payload_bytes = base64.b64decode(payload_obj['data'])
        elif transport == 'link':
            url = payload_obj['data']
            log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")
            payload_bytes = await fileserver_download_handler(
                url,
                max_retries,
                base_delay,
                max_delay,
                correlation_id
            )
        else:
            raise Exception(f"Unknown transport type for payload '{dataname}': {transport}")

        data_type = payload_obj['payload_type']
        data = _deserialize_data(payload_bytes, data_type, correlation_id)
        payloads_list.append((dataname, data, data_type))

    # Build a new dict instead of overwriting "payloads" in place, so an
    # envelope dict passed by the caller stays intact and can be processed again.
    return {**env_json_obj, 'payloads': payloads_list}
|
|
|
|
|
|
# ---------------------------------------------- Module Exports ---------------------------------------------- #
|
|
|
|
class NATSBridge:
    """
    Class-based front end for the module-level NATSBridge functions.

    An instance remembers a broker URL and a file server URL and supplies them
    as defaults when forwarding to the module-level smartsend; smartreceive is
    forwarded unchanged.
    """

    # Module defaults mirrored on the class for convenient access
    DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD
    DEFAULT_BROKER_URL = DEFAULT_BROKER_URL
    DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL

    def __init__(self, broker_url: str = None, fileserver_url: str = None):
        """
        Initialize NATSBridge.

        Args:
            broker_url: NATS server URL (falls back to DEFAULT_BROKER_URL when
                omitted or empty)
            fileserver_url: HTTP file server URL (falls back to
                DEFAULT_FILESERVER_URL when omitted or empty)
        """
        self.broker_url = broker_url if broker_url else self.DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url if fileserver_url else self.DEFAULT_FILESERVER_URL

    async def smartsend(
        self,
        subject: str,
        data: List[Tuple[str, Any, str]],
        **kwargs
    ) -> Tuple[Dict, str]:
        """
        Send data via NATS using this bridge's URLs as defaults.

        Args:
            subject: NATS subject to publish to
            data: List of (dataname, data, type) tuples
            **kwargs: Additional options passed to the module-level smartsend;
                explicit "broker_url"/"fileserver_url" values take precedence
                over the instance's

        Returns:
            Tuple of (env, env_json_str)
        """
        # Only fill in the URLs the caller did not pass explicitly
        kwargs.setdefault('broker_url', self.broker_url)
        kwargs.setdefault('fileserver_url', self.fileserver_url)
        result = await smartsend(subject, data, **kwargs)
        return result

    async def smartreceive(
        self,
        msg: Any,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Receive and process a NATS message.

        Args:
            msg: NATS message to process
            **kwargs: Additional options passed to the module-level smartreceive

        Returns:
            Dict with envelope metadata and payloads
        """
        envelope = await smartreceive(msg, **kwargs)
        return envelope
|
|
|
|
|
|
# Convenience functions for module-level usage
|
|
def send(
    subject: str,
    data: List[Tuple[str, Any, str]],
    **kwargs
) -> Tuple[Dict, str]:
    """
    Blocking wrapper around smartsend for synchronous callers.

    The coroutine is driven with asyncio.run().

    Args:
        subject: NATS subject to publish to
        data: List of (dataname, data, type) tuples
        **kwargs: Additional options forwarded to smartsend

    Returns:
        Tuple of (env, env_json_str)
    """
    pending = smartsend(subject, data, **kwargs)
    return asyncio.run(pending)
|
|
|
|
|
|
def receive(
    msg: Any,
    **kwargs
) -> Dict[str, Any]:
    """
    Blocking wrapper around smartreceive for synchronous callers.

    The coroutine is driven with asyncio.run().

    Args:
        msg: NATS message to process
        **kwargs: Additional options forwarded to smartreceive

    Returns:
        Dict with envelope metadata and payloads
    """
    pending = smartreceive(msg, **kwargs)
    return asyncio.run(pending)
|
|
|
|
|
|
# Names exported by `from <module> import *` (order kept as originally declared).
__all__ = [
    # Core send/receive API
    'smartsend',
    'smartreceive',
    # File server handlers
    'plik_oneshot_upload',
    'fetch_with_backoff',
    # Class-based API and blocking wrappers
    'NATSBridge',
    'send',
    'receive',
    # Defaults
    'DEFAULT_SIZE_THRESHOLD',
    'DEFAULT_BROKER_URL',
    'DEFAULT_FILESERVER_URL',
    # Lower-level helpers
    'NATSClient',
    '_serialize_data',
    '_deserialize_data',
    'log_trace',
    'publish_message',
]