"""
msghandler - Cross-Platform Bi-Directional Data Bridge
Python Desktop Implementation

This module provides functionality for sending and receiving data across
network boundaries with support for both direct payload transport and
URL-based transport for larger payloads.

@package msghandler
"""

import asyncio
import base64
import io
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# aiohttp is only used by the built-in file server handlers
# (plik_oneshot_upload / fetch_with_backoff). It is imported defensively,
# like pyarrow below, so that direct transport works without it.
try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    aiohttp = None
    AIOHTTP_AVAILABLE = False

try:
    import pyarrow as arrow
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


# ---------------------------------------------- Constants ---------------------------------------------- #

# Default size threshold (in bytes) for switching from direct to link transport (0.5MB)
DEFAULT_SIZE_THRESHOLD = 500_000

# Default broker URL
DEFAULT_BROKER_URL = "localhost:4222"

# Default HTTP file server URL for link transport
DEFAULT_FILESERVER_URL = "http://localhost:8080"

# Payload types whose serialized form is the raw bytes themselves.
_RAW_BYTES_TYPES = ('image', 'audio', 'video', 'binary')


# ---------------------------------------------- Utility Functions ---------------------------------------------- #

def _utc_timestamp() -> str:
    """
    Return the current UTC time as a naive ISO-8601 string with a 'Z' suffix.

    The tzinfo is stripped before formatting so the output keeps the
    "YYYY-MM-DDTHH:MM:SS[.ffffff]Z" shape (no "+00:00" offset).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'


def _require_aiohttp(feature: str) -> None:
    """
    Raise a RuntimeError if aiohttp could not be imported.

    Args:
        feature: Name of the function that needs aiohttp (used in the message)
    """
    if not AIOHTTP_AVAILABLE:
        raise RuntimeError(f'aiohttp not available for {feature}')


def log_trace(correlation_id: str, message: str) -> None:
    """
    Log a trace message with correlation ID and timestamp.

    The message is written to standard output via print().

    Args:
        correlation_id: Correlation ID for tracing
        message: Message content to log
    """
    print(f"[{_utc_timestamp()}] [Correlation: {correlation_id}] {message}")


# ---------------------------------------------- Serialization Functions ---------------------------------------------- #

def _arrow_table_to_ipc_bytes(table: Any) -> bytes:
    """
    Write a pyarrow Table to an in-memory buffer in Arrow IPC file format.

    Args:
        table: pyarrow Table to write

    Returns:
        The Arrow IPC file content as bytes
    """
    buf = io.BytesIO()
    # write_table is a method of the writer returned by ipc.new_file; the
    # context manager closes the writer (writing the file footer) on exit.
    with ipc.new_file(buf, table.schema) as writer:
        writer.write_table(table)
    return buf.getvalue()


def _serialize_data(data: Any, payload_type: str) -> bytes:
    """
    Serialize data according to specified format.

    Args:
        data: Data to serialize (string for "text", JSON-serializable for
            "dictionary", pandas DataFrame or pyarrow Table for "arrowtable",
            list of dicts for "jsontable", bytes/bytearray for "image",
            "audio", "video", "binary")
        payload_type: Target format: "text", "dictionary", "arrowtable",
            "jsontable", "image", "audio", "video", "binary"

    Returns:
        Binary representation of the serialized data

    Raises:
        ValueError: If payload_type is not one of the supported types
        ValueError: If data does not have the type required by payload_type
        RuntimeError: If payload_type is "arrowtable" and pyarrow is not installed
        TypeError: If payload_type is "dictionary" and data is not
            JSON-serializable (raised by json.dumps)
    """
    if payload_type == 'text':
        if not isinstance(data, str):
            raise ValueError('Text data must be a string')
        return data.encode('utf-8')

    if payload_type == 'dictionary':
        return json.dumps(data).encode('utf-8')

    if payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for arrowtable serialization')
        if isinstance(data, arrow.Table):
            return _arrow_table_to_ipc_bytes(data)
        # pandas is only needed for DataFrame input, so a missing pandas must
        # not prevent serializing a pyarrow Table (handled above).
        try:
            import pandas as pd
        except ImportError:
            pd = None
        if pd is not None and isinstance(data, pd.DataFrame):
            return _arrow_table_to_ipc_bytes(arrow.Table.from_pandas(data))
        raise ValueError('Arrow table data must be a pandas DataFrame or pyarrow Table')

    if payload_type == 'jsontable':
        # Serialize list of dicts to JSON format
        if isinstance(data, list) and all(isinstance(row, dict) for row in data):
            return json.dumps(data).encode('utf-8')
        raise ValueError('JSON table data must be a list of dicts')

    if payload_type in _RAW_BYTES_TYPES:
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        # Produces 'Image data must be bytes', 'Audio data must be bytes', ...
        raise ValueError(f'{payload_type.capitalize()} data must be bytes')

    raise ValueError(f'Unknown payload_type: {payload_type}')


def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """
    Deserialize bytes to data based on type.

    Args:
        data: Serialized data as bytes
        payload_type: Data type ("text", "dictionary", "arrowtable",
            "jsontable", "image", "audio", "video", "binary")
        correlation_id: Correlation ID for logging (currently unused; kept
            for interface compatibility)

    Returns:
        Deserialized data: str for "text", the parsed JSON value for
        "dictionary" and "jsontable", pandas DataFrame for "arrowtable",
        and the input bytes unchanged for "image", "audio", "video", "binary"

    Raises:
        ValueError: If payload_type is not one of the supported types
        RuntimeError: If payload_type is "arrowtable" and pyarrow is not installed
    """
    if payload_type == 'text':
        return data.decode('utf-8')

    if payload_type in ('dictionary', 'jsontable'):
        return json.loads(data.decode('utf-8'))

    if payload_type == 'arrowtable':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for arrowtable deserialization')
        reader = ipc.open_file(io.BytesIO(data))
        return reader.read_all().to_pandas()

    if payload_type in _RAW_BYTES_TYPES:
        return data

    raise ValueError(f'Unknown payload_type: {payload_type}')


# ---------------------------------------------- File Server Handlers ---------------------------------------------- #

async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to plik server in one-shot mode.

    This function first creates a one-shot upload session by sending a POST
    request with {"OneShot": true}, reads the upload ID and token from the
    JSON response, then uploads the file data as multipart form data using
    the token.

    Args:
        file_server_url: Base URL of the plik server (e.g., "http://localhost:8080").
            A trailing slash is ignored.
        dataname: Name of the file being uploaded
        data: Raw byte data of the file content

    Returns:
        Dict with keys:
        - "status": HTTP status of the file upload response
        - "uploadid": ID of the one-shot upload session
        - "fileid": ID of the uploaded file within the session
        - "url": Full URL to download the uploaded file

    Raises:
        RuntimeError: If aiohttp is not installed

    Example:
        >>> file_server_url = "http://localhost:8080"
        >>> dataname = "test.txt"
        >>> data = b"hello world"
        >>> result = await plik_oneshot_upload(file_server_url, dataname, data)
        >>> result["status"], result["uploadid"], result["fileid"], result["url"]
    """
    _require_aiohttp('plik_oneshot_upload')
    # Avoid "//upload" when the configured base URL ends with a slash.
    base_url = file_server_url.rstrip('/')

    async with aiohttp.ClientSession() as session:
        # Get upload id
        session_headers = {'Content-Type': 'application/json'}
        session_body = json.dumps({"OneShot": True})
        async with session.post(f"{base_url}/upload", headers=session_headers, data=session_body) as response:
            response_json = await response.json()
        uploadid = response_json['id']
        uploadtoken = response_json['uploadToken']

        # Upload file
        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
        async with session.post(
            f"{base_url}/file/{uploadid}",
            headers={'X-UploadToken': uploadtoken},
            data=form
        ) as upload_response:
            upload_json = await upload_response.json()
            status = upload_response.status

    fileid = upload_json['id']
    return {
        'status': status,
        'uploadid': uploadid,
        'fileid': fileid,
        'url': f"{base_url}/file/{uploadid}/{fileid}/{dataname}"
    }


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Fetch data from URL with exponential backoff.

    Each attempt issues a GET request; any status other than 200 or any
    exception counts as a failed attempt. Between attempts the function
    sleeps for the current delay, which then doubles up to max_delay.

    Args:
        url: URL to fetch from
        max_retries: Maximum number of attempts
        base_delay: Initial delay in milliseconds
        max_delay: Maximum delay in milliseconds
        correlation_id: Correlation ID for logging

    Returns:
        Fetched data as bytes

    Raises:
        RuntimeError: If aiohttp is not installed
        Exception: If all attempts fail (chained to the last underlying error)

    Example:
        >>> data = await fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
    """
    _require_aiohttp('fetch_with_backoff')

    delay = base_delay
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f"Failed to fetch: {response.status}")
                    body = await response.read()
            log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
            return body
        except Exception as e:
            last_error = e
            log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")
            # No point sleeping after the final attempt.
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)

    # Chain the last failure so the root cause is visible in the traceback.
    raise Exception(f"Failed to fetch data after {max_retries} attempts") from last_error


# ---------------------------------------------- Core Functions ---------------------------------------------- #

def _build_envelope(
    subject: str,
    payloads: List[Dict[str, Any]],
    options: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Build message envelope from payloads and metadata.

    Args:
        subject: Subject/topic (stored as "send_to")
        payloads: Array of payload objects
        options: Envelope metadata options. All keys read below are required
            except "metadata", which defaults to an empty dict.

    Returns:
        Envelope object
    """
    return {
        'correlation_id': options['correlation_id'],
        'msg_id': options['msg_id'],
        'timestamp': _utc_timestamp(),
        'send_to': subject,
        'msg_purpose': options['msg_purpose'],
        'sender_name': options['sender_name'],
        'sender_id': options['sender_id'],
        'receiver_name': options['receiver_name'],
        'receiver_id': options['receiver_id'],
        'reply_to': options['reply_to'],
        'reply_to_msg_id': options['reply_to_msg_id'],
        'broker_url': options['broker_url'],
        'metadata': options.get('metadata', {}),
        'payloads': payloads
    }


def _build_payload(
    dataname: str,
    payload_type: str,
    payload_bytes: bytes,
    transport: str,
    data: Union[str, bytes]
) -> Dict[str, Any]:
    """
    Build payload object from serialized data.

    Args:
        dataname: Name of the payload
        payload_type: Type of the payload
        payload_bytes: Serialized payload bytes (only its length is used)
        transport: Transport type ("direct" or "link")
        data: Data (base64 for direct, URL for link)

    Returns:
        Payload object with a freshly generated UUID as "id"
    """
    # Determine encoding based on payload type (matching Julia/JS implementation)
    if payload_type == 'jsontable':
        encoding = 'json'
    elif payload_type == 'arrowtable':
        encoding = 'arrow-ipc'
    else:
        encoding = 'base64'

    size = len(payload_bytes)
    return {
        'id': str(uuid.uuid4()),
        'dataname': dataname,
        'payload_type': payload_type,
        'transport': transport,
        'encoding': encoding,
        'size': size,
        'data': data,
        'metadata': {'payload_bytes': size} if transport == 'direct' else {}
    }


async def smartpack(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: Optional[str] = None,
    msg_purpose: str = "chat",
    sender_name: str = "msghandler",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    msg_id: Optional[str] = None,
    sender_id: Optional[str] = None
) -> Tuple[Dict, str]:
    """
    Send data with automatic transport selection.

    This function routes each payload based on its serialized size. If the
    serialized payload is smaller than size_threshold, it encodes the data as
    Base64 into a "direct" payload. Otherwise, it uploads the data to a
    fileserver and creates a "link" payload with the URL.

    Transport publishing is the caller's responsibility. This function
    returns the envelope and its JSON string representation.

    Args:
        subject: Subject/topic to send the message to
        data: List of (dataname, data, type) tuples to send
            - dataname: Name of the payload
            - data: The actual data to send
            - type: Payload type: "text", "dictionary", "arrowtable",
              "jsontable", "image", "audio", "video", "binary"
        broker_url: Broker URL (for envelope metadata)
        fileserver_url: URL of the HTTP file server for large payloads
        fileserver_upload_handler: Async function called as
            handler(fileserver_url, dataname, payload_bytes); must return a
            Dict with at least "status" and "url" keys
        size_threshold: Threshold in bytes separating direct vs link transport
        correlation_id: Correlation ID for tracing (auto-generated UUID if not provided)
        msg_purpose: Purpose of the message: "ACK", "NACK", "updateStatus",
            "shutdown", "chat", etc.
        sender_name: Name of the sender
        receiver_name: Name of the receiver (empty string means broadcast)
        receiver_id: UUID of the receiver (empty string means broadcast)
        reply_to: Topic to reply to (empty string if no reply expected)
        reply_to_msg_id: Message ID this message is replying to
        msg_id: Message ID (auto-generated UUID if not provided)
        sender_id: Sender ID (auto-generated UUID if not provided)

    Returns:
        Tuple of (env, env_json_str) where:
        - env: Dict containing all metadata and payloads
        - env_json_str: JSON string for transport

    Raises:
        Exception: If the upload handler reports a status other than 200
        ValueError / RuntimeError / TypeError: Propagated from serialization

    Example:
        >>> # Send a single payload (still wrapped in a list)
        >>> data = {"key": "value"}
        >>> env, env_json_str = await smartpack(
        ...     "my.subject",
        ...     [("dataname1", data, "dictionary")]
        ... )
        >>>
        >>> # Publish the JSON string via your preferred transport
        >>> # await my_nats_client.publish("my.subject", env_json_str)
    """
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())
    if msg_id is None:
        msg_id = str(uuid.uuid4())
    if sender_id is None:
        sender_id = str(uuid.uuid4())

    log_trace(correlation_id, f"Starting smartpack for subject: {subject}")

    # Process payloads
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)
        log_trace(
            correlation_id,
            f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes"
        )

        if payload_size < size_threshold:
            # Direct path
            transport = 'direct'
            payload_ref = base64.b64encode(payload_bytes).decode('utf-8')
            log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
        else:
            # Link path
            transport = 'link'
            log_trace(correlation_id, "Using link transport, uploading to fileserver")
            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            if response['status'] != 200:
                raise Exception(f"Failed to upload data to fileserver: {response['status']}")
            payload_ref = response['url']
            log_trace(correlation_id, f"Uploaded to URL: {payload_ref}")

        payloads.append(_build_payload(dataname, payload_type, payload_bytes, transport, payload_ref))

    # Build envelope
    env = _build_envelope(subject, payloads, {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url
    })

    return env, json.dumps(env)


def _parse_envelope(msg: Any) -> Dict[str, Any]:
    """
    Turn an incoming message into an envelope dict owned by the caller.

    Accepts an already-parsed dict (shallow-copied so the input is never
    modified), a JSON string, raw UTF-8 JSON bytes, or an object exposing the
    JSON as a `payload` or `data` attribute (str or bytes).
    """
    if isinstance(msg, dict):
        # Copy so that replacing 'payloads' later does not alter the caller's dict.
        return dict(msg)
    if isinstance(msg, str):
        return json.loads(msg)
    if isinstance(msg, (bytes, bytearray)):
        return json.loads(bytes(msg).decode('utf-8'))

    for attr in ('payload', 'data'):
        if hasattr(msg, attr):
            raw = getattr(msg, attr)
            text = raw if isinstance(raw, str) else raw.decode('utf-8')
            return json.loads(text)

    raise ValueError('Invalid message format: expected JSON string or message object')


async def smartunpack(
    msg: Any,
    fileserver_download_handler: Callable = fetch_with_backoff,
    max_retries: int = 5,
    base_delay: int = 100,
    max_delay: int = 5000
) -> Dict[str, Any]:
    """
    Receive and process messages.

    This function processes incoming messages, handling both direct transport
    (base64 decoded payloads) and link transport (URL-based payloads). Each
    payload is deserialized according to its "payload_type".

    Args:
        msg: Message to process. Accepts an already-parsed envelope dict
            (which is left unmodified), a JSON string, UTF-8 JSON bytes, or
            an object with a `payload` or `data` attribute containing the
            JSON as str or bytes.
        fileserver_download_handler: Async function called as
            handler(url, max_retries, base_delay, max_delay, correlation_id)
            that returns the downloaded bytes
        max_retries: Maximum retry attempts for fetching URL
        base_delay: Initial delay for exponential backoff in ms
        max_delay: Maximum delay for exponential backoff in ms

    Returns:
        Dict with envelope metadata and a "payloads" field containing
        List[Tuple[str, Any, str]] of (dataname, data, payload_type)

    Raises:
        ValueError: If msg is not in one of the accepted formats
        Exception: If a payload has a transport other than "direct" or "link"

    Example:
        >>> # Receive from JSON string directly
        >>> env = await smartunpack(json_string)
        >>>
        >>> # Receive from transport message object (e.g., NATS, MQTT)
        >>> env = await smartunpack(nats_msg, fileserver_download_handler=fetch_with_backoff)
        >>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
        >>> for dataname, data, type_ in env["payloads"]:
        >>>     print(f"{dataname}: {data} (type: {type_})")
    """
    # Parse the JSON envelope
    env = _parse_envelope(msg)
    correlation_id = env['correlation_id']
    log_trace(correlation_id, "Processing received message")

    # Process all payloads in the envelope
    payloads_list = []
    for payload_obj in env['payloads']:
        transport = payload_obj['transport']
        dataname = payload_obj['dataname']

        if transport == 'direct':
            log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'")
            # Decode Base64 payload
            payload_bytes = base64.b64decode(payload_obj['data'])
        elif transport == 'link':
            # Extract download URL from the payload
            url = payload_obj['data']
            log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}")
            # Fetch with exponential backoff using the download handler
            payload_bytes = await fileserver_download_handler(
                url, max_retries, base_delay, max_delay, correlation_id
            )
        else:
            raise Exception(f"Unknown transport type for payload '{dataname}': {transport}")

        # Deserialize based on type
        data_type = payload_obj['payload_type']
        data = _deserialize_data(payload_bytes, data_type, correlation_id)
        payloads_list.append((dataname, data, data_type))

    env['payloads'] = payloads_list
    return env


# ---------------------------------------------- Module Exports ---------------------------------------------- #

class msghandler:
    """
    Cross-platform message bridge implementation.

    This class provides a convenient interface for msghandler functionality.
    It stores a broker URL and file server URL and passes them to the
    module-level smartpack() unless the caller overrides them.
    """

    DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD
    DEFAULT_BROKER_URL = DEFAULT_BROKER_URL
    DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL

    def __init__(self, broker_url: Optional[str] = None, fileserver_url: Optional[str] = None):
        """
        Initialize msghandler.

        Args:
            broker_url: Broker URL (defaults to DEFAULT_BROKER_URL when None or empty)
            fileserver_url: HTTP file server URL (defaults to
                DEFAULT_FILESERVER_URL when None or empty)
        """
        self.broker_url = broker_url or self.DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL

    async def smartpack(
        self,
        subject: str,
        data: List[Tuple[str, Any, str]],
        **kwargs
    ) -> Tuple[Dict, str]:
        """
        Send data.

        Args:
            subject: Subject/topic to send to
            data: List of (dataname, data, type) tuples
            **kwargs: Additional options passed to smartpack; "broker_url"
                and "fileserver_url" default to this instance's values

        Returns:
            Tuple of (env, env_json_str)
        """
        kwargs.setdefault('broker_url', self.broker_url)
        kwargs.setdefault('fileserver_url', self.fileserver_url)
        return await smartpack(subject, data, **kwargs)

    async def smartunpack(
        self,
        msg: Any,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Receive and process message.

        Args:
            msg: Message to process
            **kwargs: Additional options passed to smartunpack

        Returns:
            Dict with envelope metadata and payloads
        """
        return await smartunpack(msg, **kwargs)


# Convenience functions for module-level usage

def send(
    subject: str,
    data: List[Tuple[str, Any, str]],
    **kwargs
) -> Tuple[Dict, str]:
    """
    Convenience function for sending data.

    Runs smartpack() to completion with asyncio.run(), so it must not be
    called from code that is already running inside an event loop.

    Args:
        subject: Subject/topic to send to
        data: List of (dataname, data, type) tuples
        **kwargs: Additional options

    Returns:
        Tuple of (env, env_json_str)
    """
    return asyncio.run(smartpack(subject, data, **kwargs))


def receive(
    msg: Any,
    **kwargs
) -> Dict[str, Any]:
    """
    Convenience function for receiving messages.

    Runs smartunpack() to completion with asyncio.run(), so it must not be
    called from code that is already running inside an event loop.

    Args:
        msg: Message to process
        **kwargs: Additional options

    Returns:
        Dict with envelope metadata and payloads
    """
    return asyncio.run(smartunpack(msg, **kwargs))


__all__ = [
    'smartpack',
    'smartunpack',
    'plik_oneshot_upload',
    'fetch_with_backoff',
    'msghandler',
    'send',
    'receive',
    'DEFAULT_SIZE_THRESHOLD',
    'DEFAULT_BROKER_URL',
    'DEFAULT_FILESERVER_URL',
    '_serialize_data',
    '_deserialize_data',
    'log_trace'
]