""" msghandler - Cross-Platform Bi-Directional Data Bridge MicroPython Implementation This module provides functionality for sending and receiving data across network boundaries with support for both direct payload transport and URL-based transport for larger payloads. Note: MicroPython has significant constraints compared to desktop implementations: - Limited memory (~256KB - 1MB) - No Arrow IPC support (memory constraints) - Synchronous API (no async/await) - Lower size threshold for direct transport """ import network import time import json import base64 import uos import struct import random # ---------------------------------------------- Constants ---------------------------------------------- # """ Default size threshold for switching from direct to link transport (100KB for MicroPython) """ DEFAULT_SIZE_THRESHOLD = 100000 """ Default broker URL """ DEFAULT_BROKER_URL = "localhost:4222" """ Default HTTP file server URL for link transport """ DEFAULT_FILESERVER_URL = "http://localhost:8080" """ Hard limit for payload size in MicroPython (50KB) """ MAX_PAYLOAD_SIZE = 50000 # ---------------------------------------------- Utility Functions ---------------------------------------------- # def log_trace(correlation_id, message): """ Log a trace message with correlation ID and timestamp. Args: correlation_id: Correlation ID for tracing message: Message content to log """ timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()) print(f"[{timestamp}] [Correlation: {correlation_id}] {message}") def _generate_uuid(): """ Generate a simple UUID compatible with MicroPython. Returns: UUID string """ # Generate a simple UUID-like string # Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx hex_chars = '0123456789abcdef' uuid_str = ''.join([random.choice(hex_chars) for _ in range(32)]) # Insert hyphens at proper positions return f"{uuid_str[:8]}-{uuid_str[8:12]}-{uuid_str[12:16]}-{uuid_str[16:20]}-{uuid_str[20:]}" # ---------------------------------------------- Serialization Functions ---------------------------------------------- # def _serialize_data(data, payload_type): """ Serialize data according to specified format. Args: data: Data to serialize (string for "text", dict for "dictionary", bytes for "image", "audio", "video", "binary") payload_type: Target format: "text", "dictionary", "image", "audio", "video", "binary" Returns: Binary representation of the serialized data Note: MicroPython does not support "table" type due to memory constraints. Raises: ValueError: If payload_type is not one of the supported types """ if payload_type == 'text': if isinstance(data, str): return data.encode('utf-8') else: raise ValueError('Text data must be a string') elif payload_type == 'dictionary': json_str = json.dumps(data) return json_str.encode('utf-8') elif payload_type in ('image', 'audio', 'video', 'binary'): if isinstance(data, (bytes, bytearray, memoryview)): return bytes(data) else: raise ValueError(f'{payload_type} data must be bytes') else: raise ValueError(f'Unknown payload_type: {payload_type}') def _deserialize_data(data, payload_type): """ Deserialize bytes to data based on type. Args: data: Serialized data as bytes payload_type: Data type ("text", "dictionary", "image", "audio", "video", "binary") Returns: Deserialized data (String for "text", dict for "dictionary", bytes for others) Note: MicroPython does not support "table" type due to memory constraints. Raises: ValueError: If payload_type is not one of the supported types """ if payload_type == 'text': return data.decode('utf-8') elif payload_type == 'dictionary': json_str = data.decode('utf-8') return json.loads(json_str) elif payload_type in ('image', 'audio', 'video', 'binary'): return data else: raise ValueError(f'Unknown payload_type: {payload_type}') # ---------------------------------------------- File Server Handlers ---------------------------------------------- # def _sync_fileserver_upload(file_server_url, dataname, data): """ Synchronous file upload to HTTP server. Note: This is a simplified implementation for MicroPython. In practice, would use network.HTTP or similar. Currently raises NotImplementedError as file upload is not fully supported. Args: file_server_url: Base URL of the file server dataname: Name of the file being uploaded data: Raw byte data of the file content Returns: Dict with keys: 'status', 'url' Raises: NotImplementedError: File upload is not implemented in MicroPython """ raise NotImplementedError("File upload not fully implemented in MicroPython. " "Use direct transport only for memory-constrained devices.") def _sync_fileserver_download(url, max_retries, base_delay, max_delay, correlation_id): """ Synchronous file download with exponential backoff. Note: This is a simplified implementation for MicroPython. In practice, would use network.HTTP or similar. Currently raises NotImplementedError as file download is not fully supported. Args: url: URL to download from max_retries: Maximum retry attempts base_delay: Initial delay in ms max_delay: Maximum delay in ms correlation_id: Correlation ID for logging Returns: Downloaded bytes Raises: NotImplementedError: File download is not implemented in MicroPython """ raise NotImplementedError("File download not fully implemented in MicroPython. " "Use direct transport only for memory-constrained devices.") # ---------------------------------------------- Core Functions ---------------------------------------------- # def _build_envelope(subject, payloads, options): """ Build message envelope from payloads and metadata. Args: subject: Subject/topic payloads: Array of payload objects options: Envelope metadata options Returns: Envelope dict """ return { 'correlation_id': options['correlation_id'], 'msg_id': options['msg_id'], 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), 'send_to': subject, 'msg_purpose': options['msg_purpose'], 'sender_name': options['sender_name'], 'sender_id': options['sender_id'], 'receiver_name': options['receiver_name'], 'receiver_id': options['receiver_id'], 'reply_to': options['reply_to'], 'reply_to_msg_id': options['reply_to_msg_id'], 'broker_url': options['broker_url'], 'metadata': {}, 'payloads': payloads } def _build_payload(dataname, payload_type, payload_bytes, transport, data): """ Build payload object from serialized data. Args: dataname: Name of the payload payload_type: Type of the payload payload_bytes: Serialized payload bytes transport: Transport type ("direct" or "link") data: Data (base64 for direct, URL for link) Returns: Payload dict """ return { 'id': _generate_uuid(), 'dataname': dataname, 'payload_type': payload_type, 'transport': transport, 'encoding': 'base64' if transport == 'direct' else 'none', 'size': len(payload_bytes), 'data': data, 'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {} } def _publish(subject, message, correlation_id): """ Publish message via transport. Note: This is a simplified implementation for MicroPython. Args: subject: Subject to publish to message: JSON message to publish correlation_id: Correlation ID for logging """ log_trace(correlation_id, f"Publishing to {subject}") # Placeholder - actual implementation would publish via preferred transport def smartpack(subject, data, **kwargs): """ Send data with automatic transport selection. This function intelligently routes data delivery based on payload size. If the serialized payload is smaller than size_threshold, it encodes the data as Base64 into a "direct" payload. Otherwise, it uploads the data to a fileserver and creates a "link" payload with the URL. Transport publishing is the caller's responsibility. This function returns the envelope and its JSON string representation. Note: MicroPython has memory constraints, so the default size_threshold is lower (100KB). Table type is not supported due to memory constraints. Args: subject: Subject/topic to send the message to data: List of (dataname, data, type) tuples to send - dataname: Name of the payload - data: The actual data to send - type: Payload type: "text", "dictionary", "image", "audio", "video", "binary" broker_url: Broker URL (for envelope metadata, default: DEFAULT_BROKER_URL) fileserver_url: HTTP file server URL (default: DEFAULT_FILESERVER_URL) fileserver_upload_handler: Function to handle fileserver uploads (default: _sync_fileserver_upload) size_threshold: Threshold in bytes separating direct vs link transport (default: 100000) correlation_id: Correlation ID for tracing (auto-generated if not provided) msg_purpose: Purpose of the message (default: "chat") sender_name: Name of the sender (default: "msghandler") receiver_name: Name of the receiver (empty means broadcast) receiver_id: UUID of the receiver (empty means broadcast) reply_to: Topic to reply to (empty if no reply expected) reply_to_msg_id: Message ID this message is replying to msg_id: Message ID (auto-generated if not provided) sender_id: Sender ID (auto-generated if not provided) Returns: Tuple of (env, env_json_str) where: - env: Dict containing all metadata and payloads - env_json_str: JSON string for transport Example: >>> # Send text payload >>> env, env_json_str = smartpack( ... "/chat", ... [("message", "Hello!", "text")] ... ) >>> >>> # Publish via your transport >>> # my_transport.publish("/chat", env_json_str) """ # Extract options with defaults correlation_id = kwargs.get('correlation_id', _generate_uuid()) msg_id = kwargs.get('msg_id', _generate_uuid()) sender_id = kwargs.get('sender_id', _generate_uuid()) broker_url = kwargs.get('broker_url', DEFAULT_BROKER_URL) fileserver_url = kwargs.get('fileserver_url', DEFAULT_FILESERVER_URL) size_threshold = kwargs.get('size_threshold', DEFAULT_SIZE_THRESHOLD) msg_purpose = kwargs.get('msg_purpose', 'chat') sender_name = kwargs.get('sender_name', 'msghandler') receiver_name = kwargs.get('receiver_name', '') receiver_id = kwargs.get('receiver_id', '') reply_to = kwargs.get('reply_to', '') reply_to_msg_id = kwargs.get('reply_to_msg_id', '') is_publish = kwargs.get('is_publish', True) fileserver_upload_handler = kwargs.get('fileserver_upload_handler', _sync_fileserver_upload) log_trace(correlation_id, f"Starting smartpack for subject: {subject}") # Process payloads payloads = [] for dataname, payload_data, payload_type in data: payload_bytes = _serialize_data(payload_data, payload_type) payload_size = len(payload_bytes) # Check against hard limit for MicroPython if payload_size > MAX_PAYLOAD_SIZE: raise MemoryError(f"Payload '{dataname}' exceeds max size {MAX_PAYLOAD_SIZE} bytes") log_trace(correlation_id, f"Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") if payload_size < size_threshold: # Direct path payload_b64 = base64.b64encode(payload_bytes).decode('ascii') log_trace(correlation_id, f"Using direct transport for {payload_size} bytes") payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64) payloads.append(payload) else: # Link path (limited support) log_trace(correlation_id, "Using link transport, uploading to fileserver") try: response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) log_trace(correlation_id, f"Uploaded to URL: {response['url']}") payload = _build_payload(dataname, payload_type, payload_bytes, 'link', response['url']) payloads.append(payload) except NotImplementedError: # Fall back to direct transport if file upload not available log_trace(correlation_id, "File upload not available, using direct transport") payload_b64 = base64.b64encode(payload_bytes).decode('ascii') payload = _build_payload(dataname, payload_type, payload_bytes, 'direct', payload_b64) payloads.append(payload) # Build envelope env = _build_envelope(subject, payloads, { 'correlation_id': correlation_id, 'msg_id': msg_id, 'msg_purpose': msg_purpose, 'sender_name': sender_name, 'sender_id': sender_id, 'receiver_name': receiver_name, 'receiver_id': receiver_id, 'reply_to': reply_to, 'reply_to_msg_id': reply_to_msg_id, 'broker_url': broker_url }) env_json_str = json.dumps(env) if is_publish: _publish(subject, env_json_str, correlation_id) return env, env_json_str def smartunpack(msg, **kwargs): """ Receive and process messages. This function processes incoming messages, handling both direct transport (base64 decoded payloads) and link transport (URL-based payloads). It deserializes the data based on the transport type and returns the result. Note: MicroPython has memory constraints, so large payloads should be avoided. Table type is not supported due to memory constraints. Args: msg: Message to process (can be JSON string, dict, or object with 'payload'/'data' attribute) fileserver_download_handler: Function to handle downloading data from file server URLs max_retries: Maximum retry attempts (default: 3) base_delay: Initial delay in ms (default: 100) max_delay: Maximum delay in ms (default: 1000) Returns: Dict with envelope metadata and payloads field containing List[Tuple[str, Any, str]] Example: >>> # Receive from JSON string >>> env = smartunpack(json_string) >>> >>> # Receive from transport message object >>> env = smartunpack(transport_msg, fileserver_download_handler=_sync_fileserver_download) >>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] >>> for dataname, data, type_ in env["payloads"]: ... print(f"{dataname}: {data} (type: {type_})") """ # Parse the JSON envelope if isinstance(msg, dict): # Already parsed env_json_obj = msg elif isinstance(msg, str): # Raw JSON string env_json_obj = json.loads(msg) elif hasattr(msg, 'payload'): # Object with payload attribute payload = msg.payload if isinstance(msg.payload, str) else msg.payload.decode('utf-8') env_json_obj = json.loads(payload) elif hasattr(msg, 'data'): # Object with data attribute payload = msg.data if isinstance(msg.data, str) else msg.data.decode('utf-8') env_json_obj = json.loads(payload) else: raise ValueError('Invalid message format: expected JSON string or message object') correlation_id = env_json_obj['correlation_id'] log_trace(correlation_id, "Processing received message") # Process all payloads in the envelope payloads_list = [] num_payloads = len(env_json_obj['payloads']) for i in range(num_payloads): payload_obj = env_json_obj['payloads'][i] transport = payload_obj['transport'] dataname = payload_obj['dataname'] if transport == 'direct': log_trace(correlation_id, f"Direct transport - decoding payload '{dataname}'") # Extract base64 payload from the payload payload_b64 = payload_obj['data'] # Decode Base64 payload payload_bytes = base64.b64decode(payload_b64) # Deserialize based on type data_type = payload_obj['payload_type'] data = _deserialize_data(payload_bytes, data_type) payloads_list.append((dataname, data, data_type)) elif transport == 'link': # Extract download URL from the payload url = payload_obj['data'] log_trace(correlation_id, f"Link transport - fetching '{dataname}' from URL: {url}") # Fetch with exponential backoff using the download handler fileserver_download_handler = kwargs.get('fileserver_download_handler', _sync_fileserver_download) max_retries = kwargs.get('max_retries', 3) base_delay = kwargs.get('base_delay', 100) max_delay = kwargs.get('max_delay', 1000) downloaded_data = fileserver_download_handler( url, max_retries, base_delay, max_delay, correlation_id ) # Deserialize based on type data_type = payload_obj['payload_type'] data = _deserialize_data(downloaded_data, data_type) payloads_list.append((dataname, data, data_type)) else: raise ValueError(f"Unknown transport type for payload '{dataname}': {transport}") env_json_obj['payloads'] = payloads_list return env_json_obj # ---------------------------------------------- Module Exports ---------------------------------------------- # class msghandler: """ MicroPython message bridge implementation. This class provides a convenient interface for msghandler functionality, encapsulating the main functions and providing a class-based API. Note: MicroPython has significant constraints: - No Arrow IPC support (memory constraints) - Only direct transport (< 100KB threshold enforced) - Simplified UUID generation - No async/await (synchronous API) """ DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD DEFAULT_BROKER_URL = DEFAULT_BROKER_URL DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL MAX_PAYLOAD_SIZE = MAX_PAYLOAD_SIZE def __init__(self, broker_url=None, fileserver_url=None): """ Initialize msghandler. Args: broker_url: Broker URL (defaults to DEFAULT_BROKER_URL) fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL) """ self.broker_url = broker_url or self.DEFAULT_BROKER_URL self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL def smartpack(self, subject, data, **kwargs): """ Send data. Args: subject: Subject/topic to send to data: List of (dataname, data, type) tuples **kwargs: Additional options passed to smartpack Returns: Tuple of (env, env_json_str) """ kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url) kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url) return smartpack(subject, data, **kwargs) def smartunpack(self, msg, **kwargs): """ Receive and process message. Args: msg: Message to process **kwargs: Additional options passed to smartunpack Returns: Dict with envelope metadata and payloads """ return smartunpack(msg, **kwargs) # Convenience functions for module-level usage def send(subject, data, **kwargs): """ Convenience function for sending data. Args: subject: Subject/topic to send to data: List of (dataname, data, type) tuples **kwargs: Additional options Returns: Tuple of (env, env_json_str) """ return smartpack(subject, data, **kwargs) def receive(msg, **kwargs): """ Convenience function for receiving messages. Args: msg: Message to process **kwargs: Additional options Returns: Dict with envelope metadata and payloads """ return smartunpack(msg, **kwargs) __all__ = [ 'smartpack', 'smartunpack', 'msghandler', 'send', 'receive', 'DEFAULT_SIZE_THRESHOLD', 'DEFAULT_BROKER_URL', 'DEFAULT_FILESERVER_URL', 'MAX_PAYLOAD_SIZE', '_serialize_data', '_deserialize_data', 'log_trace', '_sync_fileserver_upload', '_sync_fileserver_download' ]