""" Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython This module provides functionality for sending and receiving data over NATS using the Claim-Check pattern for large payloads. Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" """ import json import random import time import usocket import uselect import ustruct import uuid try: import ussl HAS_SSL = True except ImportError: HAS_SSL = False # Constants DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport DEFAULT_NATS_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" # ============================================= 100 ============================================== # class MessagePayload: """Internal message payload structure representing a single payload within a NATS message envelope.""" def __init__(self, data, msg_type, id="", dataname="", transport="direct", encoding="none", size=0, metadata=None): """ Initialize a MessagePayload. 
Args: data: Payload data (bytes for direct, URL string for link) msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary") id: Unique identifier for this payload (auto-generated if empty) dataname: Name of the payload (auto-generated UUID if empty) transport: Transport method ("direct" or "link") encoding: Encoding method ("none", "json", "base64", "arrow-ipc") size: Size of the payload in bytes metadata: Optional metadata dictionary """ self.id = id if id else self._generate_uuid() self.dataname = dataname if dataname else self._generate_uuid() self.type = msg_type self.transport = transport self.encoding = encoding self.size = size self.data = data self.metadata = metadata if metadata else {} def _generate_uuid(self): """Generate a UUID string.""" return str(uuid.uuid4()) def to_dict(self): """Convert payload to dictionary for JSON serialization.""" payload_dict = { "id": self.id, "dataname": self.dataname, "type": self.type, "transport": self.transport, "encoding": self.encoding, "size": self.size, } # Include data based on transport type if self.transport == "direct" and self.data is not None: if self.encoding == "base64" or self.encoding == "json": payload_dict["data"] = self.data else: # For other encodings, use base64 payload_dict["data"] = self._to_base64(self.data) elif self.transport == "link" and self.data is not None: # For link transport, data is a URL string payload_dict["data"] = self.data if self.metadata: payload_dict["metadata"] = self.metadata return payload_dict def _to_base64(self, data): """Convert bytes to base64 string.""" if isinstance(data, bytes): # Simple base64 encoding without library import ubinascii return ubinascii.b2a_base64(data).decode('utf-8').strip() return data def _from_base64(self, data): """Convert base64 string to bytes.""" import ubinascii return ubinascii.a2b_base64(data) class MessageEnvelope: """Internal message envelope structure containing multiple payloads with metadata.""" def 
class MessageEnvelope:
    """Internal message envelope structure containing multiple payloads
    with metadata."""

    def __init__(self, send_to, payloads, correlation_id="", msg_id="",
                 timestamp="", msg_purpose="", sender_name="", sender_id="",
                 receiver_name="", receiver_id="", reply_to="",
                 reply_to_msg_id="", broker_url=DEFAULT_NATS_URL, metadata=None):
        """Initialize a MessageEnvelope.

        Args:
            send_to: NATS subject/topic to publish the message to
            payloads: List of MessagePayload objects
            correlation_id: Unique identifier to track messages (auto-generated if empty)
            msg_id: Unique message identifier (auto-generated if empty)
            timestamp: Message publication timestamp
            msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus",
                "shutdown", "chat", etc.)
            sender_name: Name of the sender
            sender_id: UUID of the sender
            receiver_name: Name of the receiver (empty means broadcast)
            receiver_id: UUID of the receiver (empty means broadcast)
            reply_to: Topic where receiver should reply
            reply_to_msg_id: Message ID this message is replying to
            broker_url: NATS broker URL
            metadata: Optional message-level metadata
        """
        self.correlation_id = correlation_id if correlation_id else self._generate_uuid()
        self.msg_id = msg_id if msg_id else self._generate_uuid()
        self.timestamp = timestamp if timestamp else self._get_timestamp()
        self.send_to = send_to
        self.msg_purpose = msg_purpose
        self.sender_name = sender_name
        self.sender_id = sender_id if sender_id else self._generate_uuid()
        self.receiver_name = receiver_name
        # NOTE(review): an auto-generated receiver_id contradicts the
        # "empty means broadcast" contract above - confirm intended semantics.
        self.receiver_id = receiver_id if receiver_id else self._generate_uuid()
        self.reply_to = reply_to
        self.reply_to_msg_id = reply_to_msg_id
        self.broker_url = broker_url
        self.metadata = metadata if metadata else {}
        self.payloads = payloads

    def _generate_uuid(self):
        """Generate a UUID string."""
        return str(uuid.uuid4())

    def _get_timestamp(self):
        """Get current timestamp in ISO format (YYYY-MM-DDTHH:MM:SS).

        BUG FIX: the previous implementation hard-coded the date portion
        ("2026-02-21T"), producing wrong timestamps on every other day.
        Use the same strftime pattern as log_trace()/get_timestamp() so all
        timestamps in this module agree. (MicroPython may not have full
        datetime, but time.strftime is already relied on by log_trace.)
        """
        return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())

    def to_json(self):
        """Convert envelope to JSON string.

        Returns:
            str: JSON object with camelCase keys; "metadata" and "payloads"
            are included only when non-empty.
        """
        obj = {
            "correlationId": self.correlation_id,
            "msgId": self.msg_id,
            "timestamp": self.timestamp,
            "sendTo": self.send_to,
            "msgPurpose": self.msg_purpose,
            "senderName": self.sender_name,
            "senderId": self.sender_id,
            "receiverName": self.receiver_name,
            "receiverId": self.receiver_id,
            "replyTo": self.reply_to,
            "replyToMsgId": self.reply_to_msg_id,
            "brokerURL": self.broker_url,
        }
        # Include metadata if not empty
        if self.metadata:
            obj["metadata"] = self.metadata
        # Convert payloads to a JSON array of dictionaries
        if self.payloads:
            obj["payloads"] = [payload.to_dict() for payload in self.payloads]
        return json.dumps(obj)


def log_trace(correlation_id, message):
    """Log a trace message with correlation ID and timestamp."""
    timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
    print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))


def _serialize_data(data, msg_type):
    """Serialize data according to specified format.

    Args:
        data: Data to serialize
        msg_type: Target format ("text", "dictionary", "table", "image",
            "audio", "video", "binary")

    Returns:
        bytes: Binary representation of the serialized data

    Raises:
        ValueError: If data does not match msg_type, or msg_type is unknown.
            NOTE(review): "table" is documented but not implemented here and
            currently raises "Unknown type" - confirm whether arrow-ipc
            support is planned.
    """
    if msg_type == "text":
        if isinstance(data, str):
            return data.encode('utf-8')
        else:
            raise ValueError("Text data must be a string")
    elif msg_type == "dictionary":
        if isinstance(data, dict):
            json_str = json.dumps(data)
            return json_str.encode('utf-8')
        else:
            raise ValueError("Dictionary data must be a dict")
    elif msg_type in ("image", "audio", "video", "binary"):
        if isinstance(data, bytes):
            return data
        else:
            raise ValueError("{} data must be bytes".format(msg_type.capitalize()))
    else:
        raise ValueError("Unknown type: {}".format(msg_type))


def _deserialize_data(data_bytes, msg_type, correlation_id):
    """Deserialize bytes to data based on type.

    Args:
        data_bytes: Serialized data as bytes
        msg_type: Data type ("text", "dictionary", "table", "image", "audio",
            "video", "binary")
        correlation_id: Correlation ID for logging

    Returns:
        Deserialized data (str for "text", dict for "dictionary", raw bytes
        for media/binary types)

    Raises:
        ValueError: If msg_type is unknown ("table" included - see
            _serialize_data note).
    """
    if msg_type == "text":
        return data_bytes.decode('utf-8')
    elif msg_type == "dictionary":
        json_str = data_bytes.decode('utf-8')
        return json.loads(json_str)
    elif msg_type in ("image", "audio", "video", "binary"):
        return data_bytes
    else:
        raise ValueError("Unknown type: {}".format(msg_type))
class NATSConnection:
    """Simple NATS connection for Micropython.

    NOTE(review): this client never sends a CONNECT op nor consumes the
    server INFO line; it works against permissive servers only - confirm
    against the target NATS deployment.
    """

    def __init__(self, url=DEFAULT_NATS_URL):
        """Initialize NATS connection.

        Args:
            url: NATS server URL (e.g., "nats://localhost:4222")
        """
        self.url = url
        self.host = "localhost"
        self.port = 4222
        self.conn = None
        self._parse_url(url)

    def _parse_url(self, url):
        """Parse NATS URL to extract host and port.

        Defaults to port 4222 when the URL carries no explicit port.
        """
        if url.startswith("nats://"):
            url = url[7:]
        elif url.startswith("tls://"):
            url = url[6:]
        if ":" in url:
            # rsplit with maxsplit=1 so a stray extra colon earlier in the
            # string cannot make the unpack raise (plain split(":") did).
            self.host, port_str = url.rsplit(":", 1)
            self.port = int(port_str)
        else:
            self.host = url

    def connect(self):
        """Connect to NATS server (plain TCP; no TLS, no CONNECT handshake)."""
        addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
        self.conn = usocket.socket()
        self.conn.connect(addr)
        log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))

    def publish(self, subject, message):
        """Publish a message to a NATS subject.

        Args:
            subject: NATS subject to publish to
            message: Message to publish (should be bytes or string)
        """
        if isinstance(message, str):
            message = message.encode('utf-8')
        # Simple NATS protocol implementation: PUB <subject> <size>\r\n<payload>\r\n
        frame = "PUB {} {}\r\n".format(subject, len(message)).encode('utf-8') + message + b"\r\n"
        # BUG FIX: socket.send() may transmit only part of the buffer; loop
        # until the whole frame is on the wire so large payloads aren't
        # silently truncated.
        view = memoryview(frame)
        sent = 0
        while sent < len(frame):
            sent += self.conn.send(view[sent:])
        log_trace("", "Message published to {}".format(subject))

    def subscribe(self, subject, callback):
        """Subscribe to a NATS subject.

        Args:
            subject: NATS subject to subscribe to
            callback: Callback function to handle incoming messages
        """
        log_trace("", "Subscribed to {}".format(subject))
        # Simplified subscription - in a real implementation, you'd handle SUB/PUB messages
        # For Micropython, we'll use a simple polling approach
        self.subscribed_subject = subject
        self.subscription_callback = callback

    def wait_message(self, timeout=1000):
        """Wait for incoming message.

        Args:
            timeout: Timeout in milliseconds

        Returns:
            NATS message object or None if timeout
        """
        # Simplified message reading
        # In a real implementation, you'd read from the socket
        # For now, this is a placeholder
        return None

    def close(self):
        """Close the NATS connection."""
        if self.conn:
            self.conn.close()
            self.conn = None
            log_trace("", "NATS connection closed")


def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
    """Fetch data from URL with exponential backoff.

    Args:
        url: URL to fetch from
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in milliseconds
        max_delay: Maximum delay in milliseconds
        correlation_id: Correlation ID for logging

    Returns:
        bytes: Fetched data

    Raises:
        Exception: If all retry attempts fail
    """
    delay = base_delay
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            # Simple HTTP GET request
            # This is a simplified implementation
            # For production, you'd want a proper HTTP client
            import urequests
            response = urequests.get(url)
            if response.status_code == 200:
                log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
                # Grab the body, then close to release the underlying socket
                # (important on MicroPython's small socket pool).
                content = response.content
                response.close()
                return content
            else:
                raise Exception("Failed to fetch: {}".format(response.status_code))
        except Exception as e:
            last_error = e
            log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
            if attempt < max_retries:
                time.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    # BUG FIX: previously the function fell through and implicitly returned
    # None after exhausting retries, despite documenting a raise. Surface
    # the failure explicitly so callers don't deserialize None.
    raise Exception("Failed to fetch {} after {} attempts: {}".format(url, max_retries, last_error))
def plik_oneshot_upload(file_server_url, filename, data):
    """Upload a single file to a plik server using one-shot mode.

    Args:
        file_server_url: Base URL of the plik server
        filename: Name of the file being uploaded
        data: Raw byte data of the file content

    Returns:
        dict: Dictionary with keys:
            - "status": HTTP server response status
            - "uploadid": ID of the one-shot upload session
            - "fileid": ID of the uploaded file within the session
            - "url": Full URL to download the uploaded file
    """
    import urequests
    import json

    # Step 1: create a one-shot upload session to obtain id + token.
    url_get_upload_id = "{}/upload".format(file_server_url)
    headers = {"Content-Type": "application/json"}
    body = json.dumps({"OneShot": True})
    response = urequests.post(url_get_upload_id, headers=headers, data=body)
    response_json = json.loads(response.content)
    # BUG FIX: close responses so the underlying sockets are released
    # (MicroPython has a very small socket pool; leaks exhaust it quickly).
    response.close()
    uploadid = response_json.get("id")
    uploadtoken = response_json.get("uploadToken")

    # Step 2: upload the file into the session.
    url_upload = "{}/file/{}".format(file_server_url, uploadid)
    headers = {"X-UploadToken": uploadtoken}

    # For Micropython, we need to construct the multipart form data manually
    # This is a simplified approach
    boundary = "----WebKitFormBoundary{}".format(uuid.uuid4().hex[:16])

    # Create multipart body
    part1 = "--{}\r\n".format(boundary)
    part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename)
    part1 += "Content-Type: application/octet-stream\r\n\r\n"
    part1_bytes = part1.encode('utf-8')

    part2 = "\r\n--{}--".format(boundary)
    part2_bytes = part2.encode('utf-8')

    # Combine all parts
    full_body = part1_bytes + data + part2_bytes

    # Set content type with boundary
    content_type = "multipart/form-data; boundary={}".format(boundary)
    response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
    response_json = json.loads(response.content)
    status = response.status_code
    response.close()
    fileid = response_json.get("id")
    url = "{}/file/{}/{}".format(file_server_url, uploadid, filename)

    return {
        "status": status,
        "uploadid": uploadid,
        "fileid": fileid,
        "url": url
    }


def smartsend(subject, data, nats_url=DEFAULT_NATS_URL,
              fileserver_url=DEFAULT_FILESERVER_URL,
              fileserver_upload_handler=plik_oneshot_upload,
              size_threshold=DEFAULT_SIZE_THRESHOLD, correlation_id=None,
              msg_purpose="chat", sender_name="NATSBridge", receiver_name="",
              receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
    """Send data either directly via NATS or via a fileserver URL, depending on payload size.

    This function intelligently routes data delivery based on payload size
    relative to a threshold. If the serialized payload is smaller than
    `size_threshold`, it encodes the data as Base64 and publishes directly
    over NATS. Otherwise, it uploads the data to a fileserver and publishes
    only the download URL over NATS.

    Args:
        subject: NATS subject to publish the message to
        data: List of (dataname, data, type) tuples to send
        nats_url: URL of the NATS server
        fileserver_url: URL of the HTTP file server
        fileserver_upload_handler: Function to handle fileserver uploads
        size_threshold: Threshold in bytes separating direct vs link transport
        correlation_id: Optional correlation ID for tracing
        msg_purpose: Purpose of the message
        sender_name: Name of the sender
        receiver_name: Name of the receiver
        receiver_id: UUID of the receiver
        reply_to: Topic to reply to
        reply_to_msg_id: Message ID this message is replying to
        is_publish: Whether to automatically publish the message to NATS (default: True)

    Returns:
        tuple: (env, env_json_str) where:
            - env: MessageEnvelope object with all metadata and payloads
            - env_json_str: JSON string representation of the envelope for publishing

    Raises:
        Exception: If the fileserver upload returns a non-200 status.
        ValueError: If a payload's data does not match its declared type.
    """
    # Generate correlation ID if not provided
    cid = correlation_id if correlation_id else str(uuid.uuid4())
    log_trace(cid, "Starting smartsend for subject: {}".format(subject))

    # Generate message metadata
    msg_id = str(uuid.uuid4())

    # Process each payload in the list
    payloads = []
    for dataname, payload_data, payload_type in data:
        # Serialize data based on type
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)
        log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format(
            dataname, payload_type, payload_size))

        # Decision: Direct vs Link
        if payload_size < size_threshold:
            # Direct path - Base64 encode and send via NATS.
            # (BUG FIX: removed a dead `_serialize_data(payload_bytes, "binary")`
            # call whose result was never used.)
            import ubinascii
            payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()

            log_trace(cid, "Using direct transport for {} bytes".format(payload_size))

            # Create MessagePayload for direct transport
            payload = MessagePayload(
                payload_b64_str,
                payload_type,
                id=str(uuid.uuid4()),
                dataname=dataname,
                transport="direct",
                encoding="base64",
                size=payload_size,
                metadata={"payload_bytes": payload_size}
            )
            payloads.append(payload)
        else:
            # Link path - Upload to HTTP server, send URL via NATS
            log_trace(cid, "Using link transport, uploading to fileserver")

            # Upload to HTTP server
            response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            if response["status"] != 200:
                raise Exception("Failed to upload data to fileserver: {}".format(response["status"]))

            url = response["url"]
            log_trace(cid, "Uploaded to URL: {}".format(url))

            # Create MessagePayload for link transport
            payload = MessagePayload(
                url,
                payload_type,
                id=str(uuid.uuid4()),
                dataname=dataname,
                transport="link",
                encoding="none",
                size=payload_size,
                metadata={}
            )
            payloads.append(payload)

    # Create MessageEnvelope with all payloads
    env = MessageEnvelope(
        subject,
        payloads,
        correlation_id=cid,
        msg_id=msg_id,
        msg_purpose=msg_purpose,
        sender_name=sender_name,
        sender_id=str(uuid.uuid4()),
        receiver_name=receiver_name,
        receiver_id=receiver_id,
        reply_to=reply_to,
        reply_to_msg_id=reply_to_msg_id,
        broker_url=nats_url,
        metadata={}
    )
    msg_json = env.to_json()

    # Publish to NATS if is_publish is True
    if is_publish:
        nats_conn = NATSConnection(nats_url)
        nats_conn.connect()
        nats_conn.publish(subject, msg_json)
        nats_conn.close()

    # Return tuple of (envelope, json_string) for both direct and link transport
    return (env, msg_json)
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff,
                 max_retries=5, base_delay=100, max_delay=5000):
    """Receive and process messages from NATS.

    This function processes incoming NATS messages, handling both direct
    transport (base64 decoded payloads) and link transport (URL-based payloads).

    Args:
        msg: NATS message to process (dict with payload data, or a JSON string)
        fileserver_download_handler: Function to handle downloading data from file server URLs
        max_retries: Maximum retry attempts for fetching URL
        base_delay: Initial delay for exponential backoff in ms
        max_delay: Maximum delay for exponential backoff in ms

    Returns:
        dict: Envelope dictionary with metadata and 'payloads' field containing
        list of (dataname, data, type) tuples. Note: when `msg` is a dict it is
        modified in place.

    Raises:
        ValueError: If a payload declares an unknown transport.
    """
    # Parse the JSON envelope
    json_data = msg if isinstance(msg, dict) else json.loads(msg)
    # Hoist the correlation id once instead of re-fetching it per payload.
    cid = json_data.get("correlationId", "")
    log_trace(cid, "Processing received message")

    # Process all payloads in the envelope (iterate directly rather than
    # indexing by range(len(...))).
    payloads_list = []
    for payload in json_data.get("payloads", []):
        transport = payload.get("transport", "")
        dataname = payload.get("dataname", "")

        if transport == "direct":
            log_trace(cid, "Direct transport - decoding payload '{}'".format(dataname))
            # Extract and decode the Base64 payload
            payload_b64 = payload.get("data", "")
            import ubinascii
            payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
            # Deserialize based on type
            data_type = payload.get("type", "")
            data = _deserialize_data(payload_bytes, data_type, cid)
            payloads_list.append((dataname, data, data_type))
        elif transport == "link":
            # Extract download URL from the payload
            url = payload.get("data", "")
            log_trace(cid, "Link transport - fetching '{}' from URL: {}".format(dataname, url))
            # Fetch with exponential backoff
            downloaded_data = fileserver_download_handler(
                url, max_retries, base_delay, max_delay, cid
            )
            # Deserialize based on type
            data_type = payload.get("type", "")
            data = _deserialize_data(downloaded_data, data_type, cid)
            payloads_list.append((dataname, data, data_type))
        else:
            raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))

    # Replace payloads field with the processed list of (dataname, data, type) tuples
    json_data["payloads"] = payloads_list
    return json_data


# Utility functions
def generate_uuid():
    """Generate a UUID string."""
    return str(uuid.uuid4())


def get_timestamp():
    """Get current timestamp in ISO format."""
    return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())


# Example usage
if __name__ == "__main__":
    print("NATSBridge for Micropython")
    print("=========================")
    print("This module provides:")
    print(" - MessageEnvelope: Message envelope structure")
    print(" - MessagePayload: Payload structure")
    print(" - smartsend: Send data via NATS with automatic transport selection")
    print(" - smartreceive: Receive and process messages from NATS")
    print(" - plik_oneshot_upload: Upload files to HTTP file server")
    print(" - _fetch_with_backoff: Fetch data from URLs with retry logic")
    print()
    print("Usage:")
    print(" from nats_bridge import smartsend, smartreceive")
    print(" data = [(\"message\", \"Hello World\", \"text\")]")
    # BUG FIX: the printed examples below were wrong - smartsend returns an
    # (env, json) tuple and smartreceive returns the envelope dict whose
    # 'payloads' key holds the (dataname, data, type) tuples.
    print(" env, msg_json = smartsend(\"my.subject\", data)")
    print()
    print(" # On receiver:")
    print(" env = smartreceive(msg)")
    print(" for dataname, data, type in env[\"payloads\"]:")
    print(" print(f\"Received {dataname} of type {type}: {data}\")")