""" NATSBridge - Cross-Platform Bi-Directional Data Bridge Python Desktop Implementation This module provides functionality for sending and receiving data across network boundaries using NATS as the message bus, with support for both direct payload transport and URL-based transport for larger payloads. @package natsbridge """ import asyncio import base64 import json import uuid from datetime import datetime from typing import Any, Callable, Dict, List, Tuple, Union import aiohttp try: import pyarrow as arrow import pyarrow.ipc as ipc ARROW_AVAILABLE = True except ImportError: ARROW_AVAILABLE = False try: import nats from nats.aio.client import Client as NATSClient NATS_AVAILABLE = True except ImportError: NATS_AVAILABLE = False # ---------------------------------------------- Constants ---------------------------------------------- # """ Default size threshold for switching from direct to link transport (1MB) """ DEFAULT_SIZE_THRESHOLD = 1_000_000 """ Default NATS server URL """ DEFAULT_BROKER_URL = "nats://localhost:4222" """ Default HTTP file server URL for link transport """ DEFAULT_FILESERVER_URL = "http://localhost:8080" # ---------------------------------------------- Utility Functions ---------------------------------------------- # def log_trace(correlation_id: str, message: str) -> None: """ Log a trace message with correlation ID and timestamp. Args: correlation_id: Correlation ID for tracing message: Message content to log """ timestamp = datetime.utcnow().isoformat() + 'Z' print(f"[{timestamp}] [Correlation: {correlation_id}] {message}") # ---------------------------------------------- Serialization Functions ---------------------------------------------- # def _serialize_data(data: Any, payload_type: str) -> bytes: """ Serialize data according to specified format. Args: data: Data to serialize (string for "text", JSON-serializable for "dictionary", table-like for "table", binary for "image", "audio", "video", "binary") payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" Returns: Binary representation of the serialized data Raises: Error: If payload_type is not one of the supported types Error: If payload_type is "image", "audio", or "video" but data is not bytes Error: If payload_type is "table" but data is not a pandas DataFrame or pyarrow Table """ if payload_type == 'text': if isinstance(data, str): return data.encode('utf-8') else: raise ValueError('Text data must be a string') elif payload_type == 'dictionary': json_str = json.dumps(data) return json_str.encode('utf-8') elif payload_type == 'table': if not ARROW_AVAILABLE: raise RuntimeError('pyarrow not available for table serialization') import io buf = io.BytesIO() import pandas as pd if isinstance(data, pd.DataFrame): table = arrow.Table.from_pandas(data) sink = ipc.new_file(buf, table.schema) ipc.write_table(table, sink) sink.close() return buf.getvalue() elif isinstance(data, arrow.Table): sink = ipc.new_file(buf, data.schema) ipc.write_table(data, sink) sink.close() return buf.getvalue() else: raise ValueError('Table data must be a pandas DataFrame or pyarrow Table') elif payload_type == 'image': if isinstance(data, (bytes, bytearray)): return bytes(data) else: raise ValueError('Image data must be bytes') elif payload_type == 'audio': if isinstance(data, (bytes, bytearray)): return bytes(data) else: raise ValueError('Audio data must be bytes') elif payload_type == 'video': if isinstance(data, (bytes, bytearray)): return bytes(data) else: raise ValueError('Video data must be bytes') elif payload_type == 'binary': if isinstance(data, (bytes, bytearray)): return bytes(data) else: raise ValueError('Binary data must be bytes') else: raise ValueError(f'Unknown payload_type: {payload_type}') def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: """ Deserialize bytes to data based on type. Args: data: Serialized data as bytes payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") correlation_id: Correlation ID for logging Returns: Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") Raises: Error: If payload_type is not one of the supported types """ if payload_type == 'text': return data.decode('utf-8') elif payload_type == 'dictionary': json_str = data.decode('utf-8') return json.loads(json_str) elif payload_type == 'table': if not ARROW_AVAILABLE: raise RuntimeError('pyarrow not available for table deserialization') import io buf = io.BytesIO(data) reader = ipc.open_file(buf) return reader.read_all().to_pandas() elif payload_type == 'image': return data elif payload_type == 'audio': return data elif payload_type == 'video': return data elif payload_type == 'binary': return data else: raise ValueError(f'Unknown payload_type: {payload_type}') # ---------------------------------------------- File Server Handlers ---------------------------------------------- # async def plik_oneshot_upload( file_server_url: str, dataname: str, data: bytes ) -> Dict[str, Any]: """ Upload data to plik server in one-shot mode. This function uploads a raw byte array to a plik server in one-shot mode (no upload session). It first creates a one-shot upload session by sending a POST request with {"OneShot": true}, retrieves an upload ID and token, then uploads the file data as multipart form data using the token. Args: file_server_url: Base URL of the plik server (e.g., "http://localhost:8080") dataname: Name of the file being uploaded data: Raw byte data of the file content Returns: Dict with keys: - "status": HTTP server response status - "uploadid": ID of the one-shot upload session - "fileid": ID of the uploaded file within the session - "url": Full URL to download the uploaded file Example: >>> fileserver_url = "http://localhost:8080" >>> dataname = "test.txt" >>> data = b"hello world" >>> result = await plik_oneshot_upload(file_server_url, dataname, data) >>> result["status"], result["uploadid"], result["fileid"], result["url"] """ async with aiohttp.ClientSession() as session: # Get upload id url_getUploadID = f"{file_server_url}/upload" headers = {'Content-Type': 'application/json'} body = json.dumps({"OneShot": True}) async with session.post(url_getUploadID, headers=headers, data=body) as response: response_json = await response.json() uploadid = response_json['id'] uploadtoken = response_json['uploadToken'] # Upload file url_upload = f"{file_server_url}/file/{uploadid}" headers = {'X-UploadToken': uploadtoken} form = aiohttp.FormData() form.add_field('file', data, filename=dataname, content_type='application/octet-stream') async with session.post(url_upload, headers=headers, data=form) as upload_response: upload_json = await upload_response.json() fileid = upload_json['id'] url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}" return { 'status': upload_response.status, 'uploadid': uploadid, 'fileid': fileid, 'url': url } async def fetch_with_backoff( url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str ) -> bytes: """ Fetch data from URL with exponential backoff. This internal function retrieves data from a URL with retry logic using exponential backoff to handle transient failures. Args: url: URL to fetch from max_retries: Maximum number of retry attempts base_delay: Initial delay in milliseconds max_delay: Maximum delay in milliseconds correlation_id: Correlation ID for logging Returns: Fetched data as bytes Raises: Error: If all retry attempts fail Example: >>> data = await fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123") """ delay = base_delay for attempt in range(1, max_retries + 1): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}") return await response.read() else: raise Exception(f"Failed to fetch: {response.status}") except Exception as e: log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}") if attempt < max_retries: await asyncio.sleep(delay / 1000.0) delay = min(delay * 2, max_delay) raise Exception(f"Failed to fetch data after {max_retries} attempts") # ---------------------------------------------- NATS Client ---------------------------------------------- # class NATSClient: """NATS client wrapper for connection management.""" def __init__(self, url: str = DEFAULT_BROKER_URL): """ Create a new NATS client. Args: url: NATS server URL """ self.url = url self._client: NATSClient = None async def connect(self) -> NATSClient: """ Connect to NATS server. Returns: NATS client instance """ if NATS_AVAILABLE: self._client = nats.connect(self.url) await self._client else: raise Error('nats-py not available') return self._client async def publish(self, subject: str, message: str, correlation_id: str = "") -> None: """ Publish message to NATS subject. Args: subject: NATS subject to publish to message: Message to publish correlation_id: Correlation ID for logging """ if self._client: await self._client.publish(subject, message) if correlation_id: log_trace(correlation_id, f"Message published to {subject}") async def close(self) -> None: """Close the NATS connection.""" if self._client: await self._client.drain() await self._client.close() # ---------------------------------------------- Core Functions ---------------------------------------------- # def _build_envelope( subject: str, payloads: List[Dict[str, Any]], options: Dict[str, Any] ) -> Dict[str, Any]: """ Build message envelope from payloads and metadata. Args: subject: NATS subject payloads: Array of payload objects options: Envelope metadata options Returns: Envelope object """ return { 'correlation_id': options['correlation_id'], 'msg_id': options['msg_id'], 'timestamp': datetime.utcnow().isoformat() + 'Z', 'send_to': subject, 'msg_purpose': options['msg_purpose'], 'sender_name': options['sender_name'], 'sender_id': options['sender_id'], 'receiver_name': options['receiver_name'], 'receiver_id': options['receiver_id'], 'reply_to': options['reply_to'], 'reply_to_msg_id': options['reply_to_msg_id'], 'broker_url': options['broker_url'], 'metadata': options.get('metadata', {}), 'payloads': payloads } def _build_payload( dataname: str, payload_type: str, payload_bytes: bytes, transport: str, data: Union[str, bytes] ) -> Dict[str, Any]: """ Build payload object from serialized data. Args: dataname: Name of the payload payload_type: Type of the payload payload_bytes: Serialized payload bytes transport: Transport type ("direct" or "link") data: Data (base64 for direct, URL for link) Returns: Payload object """ return { 'id': str(uuid.uuid4()), 'dataname': dataname, 'payload_type': payload_type, 'transport': transport, 'encoding': 'base64' if transport == 'direct' else 'none', 'size': len(payload_bytes), 'data': data, 'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {} } async def publish_message( broker_url_or_client: Union[str, NATSClient, Any], subject: str, message: str, correlation_id: str ) -> None: """ Publish message to NATS. Args: broker_url_or_client: NATS URL, client, or connection subject: NATS subject to publish to message: JSON message to publish correlation_id: Correlation ID for tracing """ if isinstance(broker_url_or_client, NATSClient): client = broker_url_or_client elif NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish'): # Direct NATS client connection await broker_url_or_client.publish(subject, message) log_trace(correlation_id, f"Message published to {subject}") return else: # String URL - create new client client = NATSClient(broker_url_or_client) await client.connect() await client.publish(subject, message, correlation_id) if isinstance(broker_url_or_client, NATSClient): await broker_url_or_client.close() elif not (NATS_AVAILABLE and hasattr(broker_url_or_client, 'publish')): await client.close() async def smartsend( subject: str, data: List[Tuple[str, Any, str]], broker_url: str = DEFAULT_BROKER_URL, fileserver_url: str = DEFAULT_FILESERVER_URL, fileserver_upload_handler: Callable = plik_oneshot_upload, size_threshold: int = DEFAULT_SIZE_THRESHOLD, correlation_id: str = None, msg_purpose: str = "chat", sender_name: str = "NATSBridge", receiver_name: str = "", receiver_id: str = "", reply_to: str = "", reply_to_msg_id: str = "", is_publish: bool = True, nats_connection: Any = None, msg_id: str = None, sender_id: str = None ) -> Tuple[Dict, str]: """ Send data via NATS with automatic transport selection. This function intelligently routes data delivery based on payload size. If the serialized payload is smaller than size_threshold, it encodes the data as Base64 and publishes directly over NATS. Otherwise, it uploads the data to a fileserver and publishes only the download URL over NATS. Args: subject: NATS subject to publish the message to data: List of (dataname, data, type) tuples to send - dataname: Name of the payload - data: The actual data to send - type: Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" broker_url: URL of the NATS server fileserver_url: URL of the HTTP file server for large payloads fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) size_threshold: Threshold in bytes separating direct vs link transport correlation_id: Correlation ID for tracing (auto-generated UUID if not provided) msg_purpose: Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. sender_name: Name of the sender receiver_name: Name of the receiver (empty string means broadcast) receiver_id: UUID of the receiver (empty string means broadcast) reply_to: Topic to reply to (empty string if no reply expected) reply_to_msg_id: Message ID this message is replying to is_publish: Whether to automatically publish the message to NATS nats_connection: Pre-existing NATS connection (if provided, uses this connection instead of creating a new one; saves connection establishment overhead) msg_id: Message ID (auto-generated UUID if not provided) sender_id: Sender ID (auto-generated UUID if not provided) Returns: Tuple of (env, env_json_str) where: - env: Dict containing all metadata and payloads - env_json_str: JSON string for publishing to NATS Example: >>> # Send a single payload (still wrapped in a list) >>> data = {"key": "value"} >>> env, env_json_str = await smartsend( ... "my.subject", ... [("dataname1", data, "dictionary")], ... broker_url="nats://localhost:4222" ... ) >>> >>> # Send multiple payloads with different types >>> data1 = {"key1": "value1"} >>> data2 = [1, 2, 3, 4, 5] >>> env, env_json_str = await smartsend( ... "my.subject", ... [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")] ... ) >>> >>> # Send a large array using fileserver upload >>> data = list(range(10_000_000)) # ~80 MB >>> env, env_json_str = await smartsend( ... "large.data", ... [("large_table", data, "table")] ... ) >>> >>> # Mixed content (e.g., chat with text and image) >>> env, env_json_str = await smartsend( ... "chat.subject", ... [ ... ("message_text", "Hello!", "text"), ... ("user_image", image_data, "image"), ... ("audio_clip", audio_data, "audio") ... ] ... ) >>> >>> # Publish the JSON string directly using NATS request-reply pattern >>> # reply = await nats.request(broker_url, subject, env_json_str, reply_to=reply_to_topic) """ if correlation_id is None: correlation_id = str(uuid.uuid4()) if msg_id is None: msg_id = str(uuid.uuid4()) if sender_id is None: sender_id = str(uuid.uuid4()) log_trace(correlation_id, f"Starting smartsend for subject: {subject}") # Process payloads payloads = [] for dataname, payload_data, payload_type in data: payload_bytes = _serialize_data(payload_data, payload_type) payload_size = len(payload_bytes) log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") if payload_size < size_threshold: # Direct path payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') log_trace(correlation_id, f"Using direct transport for {payload_size} bytes") payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64) payloads.append(payload) else: # Link path log_trace(correlation_id, "Using link transport, uploading to fileserver") response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) if response['status'] != 200: raise Exception(f"Failed to upload data to fileserver: {response['status']}") log_trace(correlation_id, f"Uploaded to URL: {response['url']}") payload = _build_payload(dataname, payload_type, payload_bytes, 'link', response['url']) payloads.append(payload) # Build envelope env = _build_envelope(subject, payloads, { 'correlation_id': correlation_id, 'msg_id': msg_id, 'msg_purpose': msg_purpose, 'sender_name': sender_name, 'sender_id': sender_id, 'receiver_name': receiver_name, 'receiver_id': receiver_id, 'reply_to': reply_to, 'reply_to_msg_id': reply_to_msg_id, 'broker_url': broker_url }) env_json_str = json.dumps(env) if is_publish: if nats_connection: await publish_message(nats_connection, subject, env_json_str, correlation_id) else: await publish_message(broker_url, subject, env_json_str, correlation_id) return env, env_json_str async def smartreceive( msg: Any, fileserver_download_handler: Callable = fetch_with_backoff, max_retries: int = 5, base_delay: int = 100, max_delay: int = 5000 ) -> Dict[str, Any]: """ Receive and process NATS messages. This function processes incoming NATS messages, handling both direct transport (base64 decoded payloads) and link transport (URL-based payloads). It deserializes the data based on the transport type and returns the result. Args: msg: NATS message to process fileserver_download_handler: Function to handle downloading data from file server URLs max_retries: Maximum retry attempts for fetching URL base_delay: Initial delay for exponential backoff in ms max_delay: Maximum delay for exponential backoff in ms Returns: Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]] Example: >>> # Receive and process message >>> env = await smartreceive(msg, fileserver_download_handler=fetch_with_backoff) >>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] >>> # Access payloads: for dataname, data, type_ in env["payloads"] >>> for dataname, data, type_ in env["payloads"]: >>> print(f"{dataname}: {data} (type: {type_})") """ # Parse the JSON envelope if isinstance(msg, dict): # Already parsed env_json_obj = msg elif hasattr(msg, 'payload'): # NATS message object payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8') env_json_obj = json.loads(payload) else: # Assume it's already a JSON string or dict env_json_obj = json.loads(msg) if isinstance(msg, str) else msg log_trace(env_json_obj['correlation_id'], "Processing received message") # Process all payloads in the envelope payloads_list = [] num_payloads = len(env_json_obj['payloads']) for i in range(num_payloads): payload_obj = env_json_obj['payloads'][i] transport = payload_obj['transport'] dataname = payload_obj['dataname'] if transport == 'direct': log_trace(env_json_obj['correlation_id'], f"Direct transport - decoding payload '{dataname}'") # Extract base64 payload from the payload payload_b64 = payload_obj['data'] # Decode Base64 payload payload_bytes = base64.b64decode(payload_b64) # Deserialize based on type data_type = payload_obj['payload_type'] data = _deserialize_data(payload_bytes, data_type, env_json_obj['correlation_id']) payloads_list.append((dataname, data, data_type)) elif transport == 'link': # Extract download URL from the payload url = payload_obj['data'] log_trace(env_json_obj['correlation_id'], f"Link transport - fetching '{dataname}' from URL: {url}") # Fetch with exponential backoff using the download handler downloaded_data = await fileserver_download_handler( url, max_retries, base_delay, max_delay, env_json_obj['correlation_id'] ) # Deserialize based on type data_type = payload_obj['payload_type'] data = _deserialize_data(downloaded_data, data_type, env_json_obj['correlation_id']) payloads_list.append((dataname, data, data_type)) else: raise Exception(f"Unknown transport type for payload '{dataname}': {transport}") env_json_obj['payloads'] = payloads_list return env_json_obj # ---------------------------------------------- Module Exports ---------------------------------------------- # class NATSBridge: """ Cross-platform NATS bridge implementation. This class provides a convenient interface for NATSBridge functionality, encapsulating the main functions and providing a class-based API. """ DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD DEFAULT_BROKER_URL = DEFAULT_BROKER_URL DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL def __init__(self, broker_url: str = None, fileserver_url: str = None): """ Initialize NATSBridge. Args: broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL) fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL) """ self.broker_url = broker_url or self.DEFAULT_BROKER_URL self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL async def smartsend( self, subject: str, data: List[Tuple[str, Any, str]], **kwargs ) -> Tuple[Dict, str]: """ Send data via NATS. Args: subject: NATS subject to publish to data: List of (dataname, data, type) tuples **kwargs: Additional options passed to smartsend Returns: Tuple of (env, env_json_str) """ kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url) kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url) return await smartsend(subject, data, **kwargs) async def smartreceive( self, msg: Any, **kwargs ) -> Dict[str, Any]: """ Receive and process NATS message. Args: msg: NATS message to process **kwargs: Additional options passed to smartreceive Returns: Dict with envelope metadata and payloads """ return await smartreceive(msg, **kwargs) # Convenience functions for module-level usage def send( subject: str, data: List[Tuple[str, Any, str]], **kwargs ) -> Tuple[Dict, str]: """ Convenience function for sending data. Args: subject: NATS subject to publish to data: List of (dataname, data, type) tuples **kwargs: Additional options Returns: Tuple of (env, env_json_str) """ return asyncio.run(smartsend(subject, data, **kwargs)) def receive( msg: Any, **kwargs ) -> Dict[str, Any]: """ Convenience function for receiving messages. Args: msg: NATS message to process **kwargs: Additional options Returns: Dict with envelope metadata and payloads """ return asyncio.run(smartreceive(msg, **kwargs)) __all__ = [ 'smartsend', 'smartreceive', 'plik_oneshot_upload', 'fetch_with_backoff', 'NATSBridge', 'send', 'receive', 'DEFAULT_SIZE_THRESHOLD', 'DEFAULT_BROKER_URL', 'DEFAULT_FILESERVER_URL', 'NATSClient', '_serialize_data', '_deserialize_data', 'log_trace', 'publish_message' ]