"""
Python NATS Bridge - Bi-Directional Data Bridge

This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads.

Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"

Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.

API Standard:
    # Input format for smartsend (always a list of tuples with type info)
    [(dataname1, data1, type1), (dataname2, data2, type2), ...]

    # Output format for smartreceive (always returns a list of tuples)
    [(dataname1, data1, type1), (dataname2, data2, type2), ...]
"""

import json
import time
import uuid

# Constants
DEFAULT_SIZE_THRESHOLD = 1000000  # 1MB - threshold for switching from direct to link transport
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"

# ============================================= 100 ============================================== #


def _b64encode(raw):
    """Encode bytes as a base64 string without a trailing newline.

    Tries MicroPython's ``ubinascii`` first and falls back to the CPython
    ``base64`` module, so the same code runs on both interpreters.
    """
    try:
        import ubinascii
    except ImportError:
        import base64
        return base64.b64encode(raw).decode('utf-8')
    return ubinascii.b2a_base64(raw).decode('utf-8').strip()


def _b64decode(text):
    """Decode a base64 string (or bytes) back to raw bytes.

    Mirrors :func:`_b64encode`: ``ubinascii`` when available, ``base64``
    otherwise.
    """
    try:
        import ubinascii
    except ImportError:
        import base64
        return base64.b64decode(text)
    if isinstance(text, str):
        text = text.encode('utf-8')
    return ubinascii.a2b_base64(text)


def _import_http_client():
    """Return the available HTTP client module (``urequests`` or ``requests``).

    Raises:
        Exception: If neither library can be imported.
    """
    try:
        import urequests
        return urequests
    except ImportError:
        pass
    try:
        import requests
        return requests
    except ImportError:
        raise Exception("No HTTP library available (urequests or requests)")


class MessagePayload:
    """Internal message payload structure representing a single payload within a NATS message envelope.

    It supports both direct transport (base64-encoded data) and link
    transport (URL-based).

    Attributes:
        id: Unique identifier for this payload (e.g., "uuid4")
        dataname: Name of the payload (e.g., "login_image")
        payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
        transport: Transport method ("direct" or "link")
        encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
        size: Size of the payload in bytes
        data: Payload data (bytes for direct, URL for link)
        metadata: Optional metadata dictionary
    """

    def __init__(self, data, payload_type, id="", dataname="", transport="direct",
                 encoding="none", size=0, metadata=None):
        """Initialize a MessagePayload.

        Args:
            data: Payload data (base64 string for direct, URL string for link)
            payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
            id: Unique identifier for this payload (auto-generated if empty)
            dataname: Name of the payload (auto-generated UUID if empty)
            transport: Transport method ("direct" or "link")
            encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
            size: Size of the payload in bytes
            metadata: Optional metadata dictionary
        """
        self.id = id if id else self._generate_uuid()
        self.dataname = dataname if dataname else self._generate_uuid()
        self.payload_type = payload_type
        self.transport = transport
        self.encoding = encoding
        self.size = size
        self.data = data
        self.metadata = metadata if metadata else {}

    def _generate_uuid(self):
        """Generate a UUID string."""
        return str(uuid.uuid4())

    def to_dict(self):
        """Convert payload to dictionary for JSON serialization.

        The "data" key is only present when ``self.data`` is not None and the
        transport is "direct" or "link"; "metadata" is only present when it is
        non-empty.
        """
        payload_dict = {
            "id": self.id,
            "dataname": self.dataname,
            "payload_type": self.payload_type,
            "transport": self.transport,
            "encoding": self.encoding,
            "size": self.size,
        }

        if self.data is not None:
            if self.transport == "direct":
                if self.encoding in ("base64", "json"):
                    # Already JSON-safe text; pass through unchanged.
                    payload_dict["data"] = self.data
                else:
                    # For other encodings, raw bytes are base64-encoded.
                    payload_dict["data"] = self._to_base64(self.data)
            elif self.transport == "link":
                # For link transport, data is a URL string.
                payload_dict["data"] = self.data

        if self.metadata:
            payload_dict["metadata"] = self.metadata

        return payload_dict

    def _to_base64(self, data):
        """Convert bytes to a base64 string; non-bytes values are returned unchanged."""
        if isinstance(data, bytes):
            return _b64encode(data)
        return data

    def _from_base64(self, data):
        """Convert a base64 string to bytes."""
        return _b64decode(data)


class MessageEnvelope:
    """Internal message envelope structure containing multiple payloads with metadata."""

    def __init__(self, send_to, payloads, correlation_id="", msg_id="", timestamp="",
                 msg_purpose="", sender_name="", sender_id="", receiver_name="",
                 receiver_id="", reply_to="", reply_to_msg_id="",
                 broker_url=DEFAULT_BROKER_URL, metadata=None):
        """Initialize a MessageEnvelope.

        Args:
            send_to: NATS subject/topic to publish the message to
            payloads: List of MessagePayload objects
            correlation_id: Unique identifier to track messages (auto-generated if empty)
            msg_id: Unique message identifier (auto-generated if empty)
            timestamp: Message publication timestamp (current local time if empty)
            msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
            sender_name: Name of the sender
            sender_id: UUID of the sender (auto-generated if empty)
            receiver_name: Name of the receiver (empty means broadcast)
            receiver_id: UUID of the receiver (empty means broadcast)
            reply_to: Topic where receiver should reply
            reply_to_msg_id: Message ID this message is replying to
            broker_url: NATS broker URL
            metadata: Optional message-level metadata
        """
        self.correlation_id = correlation_id if correlation_id else self._generate_uuid()
        self.msg_id = msg_id if msg_id else self._generate_uuid()
        self.timestamp = timestamp if timestamp else self._get_timestamp()
        self.send_to = send_to
        self.msg_purpose = msg_purpose
        self.sender_name = sender_name
        self.sender_id = sender_id if sender_id else self._generate_uuid()
        self.receiver_name = receiver_name
        # An empty receiver_id is the documented "broadcast" marker, so it is
        # stored as-is rather than being replaced by a random UUID.
        self.receiver_id = receiver_id
        self.reply_to = reply_to
        self.reply_to_msg_id = reply_to_msg_id
        self.broker_url = broker_url
        self.metadata = metadata if metadata else {}
        self.payloads = payloads

    def _generate_uuid(self):
        """Generate a UUID string."""
        return str(uuid.uuid4())

    def _get_timestamp(self):
        """Get the current local time as "YYYY-MM-DDTHH:MM:SS"."""
        return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())

    def to_json(self):
        """Convert envelope to JSON string.

        Returns:
            str: JSON string representation of the envelope using snake_case
            field names. "metadata" and "payloads" are omitted when empty.
        """
        obj = {
            "correlation_id": self.correlation_id,
            "msg_id": self.msg_id,
            "timestamp": self.timestamp,
            "send_to": self.send_to,
            "msg_purpose": self.msg_purpose,
            "sender_name": self.sender_name,
            "sender_id": self.sender_id,
            "receiver_name": self.receiver_name,
            "receiver_id": self.receiver_id,
            "reply_to": self.reply_to,
            "reply_to_msg_id": self.reply_to_msg_id,
            "broker_url": self.broker_url
        }

        if self.metadata:
            obj["metadata"] = self.metadata

        if self.payloads:
            obj["payloads"] = [payload.to_dict() for payload in self.payloads]

        return json.dumps(obj)


def log_trace(correlation_id, message):
    """Log a trace message with correlation ID and timestamp."""
    timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
    print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))


def _serialize_data(data, payload_type):
    """Serialize data according to specified format.

    Args:
        data: Data to serialize
            - "text": String
            - "dictionary": JSON-serializable dict
            - "table": Tabular data (pandas DataFrame or list of dicts)
            - "image", "audio", "video", "binary": bytes
        payload_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")

    Returns:
        bytes: Binary representation of the serialized data

    Raises:
        ValueError: If ``data`` does not match ``payload_type`` or the type is unknown.

    Example:
        >>> text_bytes = _serialize_data("Hello World", "text")
        >>> json_bytes = _serialize_data({"key": "value"}, "dictionary")
        >>> table_bytes = _serialize_data([{"id": 1, "name": "Alice"}], "table")
    """
    if payload_type == "text":
        if not isinstance(data, str):
            raise ValueError("Text data must be a string")
        return data.encode('utf-8')

    if payload_type == "dictionary":
        if not isinstance(data, dict):
            raise ValueError("Dictionary data must be a dict")
        return json.dumps(data).encode('utf-8')

    if payload_type == "table":
        # pandas is optional; the list-of-dicts path behaves the same whether
        # or not it is installed.
        try:
            import pandas as pd
        except ImportError:
            pd = None
        if pd is not None and isinstance(data, pd.DataFrame):
            return data.to_json(orient='records', force_ascii=False).encode('utf-8')
        if isinstance(data, list) and all(isinstance(row, dict) for row in data):
            return json.dumps(data).encode('utf-8')
        if pd is None:
            raise ValueError("Table data requires pandas DataFrame or list of dicts (pandas not available)")
        raise ValueError("Table data must be a pandas DataFrame or list of dicts")

    if payload_type in ("image", "audio", "video", "binary"):
        if not isinstance(data, bytes):
            raise ValueError("{} data must be bytes".format(payload_type.capitalize()))
        return data

    raise ValueError("Unknown payload_type: {}".format(payload_type))


def _deserialize_data(data_bytes, payload_type, correlation_id):
    """Deserialize bytes to data based on type.

    Args:
        data_bytes: Serialized data as bytes
        payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
        correlation_id: Correlation ID for logging (currently unused here)

    Returns:
        Deserialized data:
            - "text": str
            - "dictionary": dict
            - "table": pandas DataFrame if pandas is importable, else the parsed JSON value
            - "image", "audio", "video", "binary": bytes

    Raises:
        ValueError: If ``payload_type`` is unknown.

    Example:
        >>> text_data = _deserialize_data(b"Hello", "text", "corr_id")
        >>> json_data = _deserialize_data(b'{"key": "value"}', "dictionary", "corr_id")
        >>> table_data = _deserialize_data(b'[{"id": 1}]', "table", "corr_id")
    """
    if payload_type == "text":
        return data_bytes.decode('utf-8')

    if payload_type == "dictionary":
        return json.loads(data_bytes.decode('utf-8'))

    if payload_type == "table":
        table_data = json.loads(data_bytes.decode('utf-8'))
        try:
            import pandas as pd
        except ImportError:
            return table_data
        return pd.DataFrame(table_data)

    if payload_type in ("image", "audio", "video", "binary"):
        return data_bytes

    raise ValueError("Unknown payload_type: {}".format(payload_type))


class NATSConnection:
    """Simple NATS connection for Python and Micropython."""

    def __init__(self, url=DEFAULT_BROKER_URL):
        """Initialize NATS connection (does not open the socket).

        Args:
            url: NATS server URL (e.g., "nats://localhost:4222")
        """
        self.url = url
        self.host = "localhost"
        self.port = 4222
        self.conn = None
        # Set by subscribe(); initialised here so they always exist.
        self.subscribed_subject = None
        self.subscription_callback = None
        self._parse_url(url)

    def _parse_url(self, url):
        """Parse NATS URL to extract host and port.

        Accepts an optional "nats://" or "tls://" scheme, optional
        "user:pass@" userinfo (ignored for addressing), and bracketed IPv6
        literals. Missing host/port keep the defaults set in ``__init__``.
        """
        for scheme in ("nats://", "tls://"):
            if url.startswith(scheme):
                url = url[len(scheme):]
                break

        # Credentials may contain ':'; drop them before looking for the port.
        if "@" in url:
            url = url.rsplit("@", 1)[1]
        url = url.rstrip("/")

        host = url
        port_str = ""
        if url.startswith("["):
            # Bracketed IPv6 literal: "[::1]" or "[::1]:4222".
            end = url.find("]")
            if end != -1:
                host = url[1:end]
                rest = url[end + 1:]
                if rest.startswith(":"):
                    port_str = rest[1:]
        elif ":" in url:
            host, port_str = url.rsplit(":", 1)

        if host:
            self.host = host
        if port_str:
            self.port = int(port_str)

    def connect(self):
        """Connect to NATS server over a plain TCP socket."""
        # CPython ships `socket`; older MicroPython builds only have `usocket`.
        try:
            import socket
        except ImportError:
            import usocket as socket

        # Use the address family reported by getaddrinfo so an IPv6 result
        # is not forced through an IPv4 socket.
        info = socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM)[0]
        conn = socket.socket(info[0], info[1])
        try:
            conn.connect(info[-1])
        except Exception:
            conn.close()
            raise
        self.conn = conn
        log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))

    def publish(self, subject, message):
        """Publish a message to a NATS subject.

        Args:
            subject: NATS subject to publish to
            message: Message to publish (should be bytes or string)

        Raises:
            RuntimeError: If connect() has not been called successfully.
        """
        if self.conn is None:
            raise RuntimeError("NATS connection is not open; call connect() first")

        if isinstance(message, str):
            message = message.encode('utf-8')

        # Simple NATS protocol implementation: "PUB <subject> <#bytes>\r\n<payload>\r\n"
        # NOTE(review): no CONNECT handshake is sent and the server's INFO line
        # is never read — confirm the target broker accepts this.
        header = "PUB {} {}\r\n".format(subject, len(message)).encode('utf-8')
        # sendall: a plain send() may write only part of a large frame.
        self.conn.sendall(header + message + b"\r\n")
        log_trace("", "Message published to {}".format(subject))

    def subscribe(self, subject, callback):
        """Record a subscription (placeholder; no SUB is sent to the server).

        Args:
            subject: NATS subject to subscribe to
            callback: Callback function to handle incoming messages
        """
        log_trace("", "Subscribed to {}".format(subject))
        # Simplified subscription - in a real implementation, you'd handle SUB/PUB messages
        self.subscribed_subject = subject
        self.subscription_callback = callback

    def wait_message(self, timeout=1000):
        """Wait for incoming message (placeholder; always returns None).

        Args:
            timeout: Timeout in milliseconds

        Returns:
            NATS message object or None if timeout
        """
        # In a real implementation, you'd read from the socket.
        return None

    def close(self):
        """Close the NATS connection if it is open."""
        if self.conn:
            self.conn.close()
            self.conn = None
            log_trace("", "NATS connection closed")


def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
    """Fetch data from URL with exponential backoff.

    Args:
        url: URL to fetch from
        max_retries: Maximum number of retry attempts (default: 5)
        base_delay: Initial delay in milliseconds (default: 100)
        max_delay: Maximum delay in milliseconds (default: 5000)
        correlation_id: Correlation ID for logging

    Returns:
        bytes: Fetched data

    Raises:
        Exception: If no HTTP library is available, or all attempts fail

    Example:
        >>> data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "corr_id")
    """
    # Resolve the client once, up front: a missing library is not a transient
    # failure, so it must not be retried with sleeps.
    http = _import_http_client()

    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            response = http.get(url)
            try:
                status_code = response.status_code
                content = response.content
            finally:
                # Release the underlying socket (matters on MicroPython).
                response.close()

            if status_code != 200:
                raise Exception("Failed to fetch: {}".format(status_code))

            log_trace(correlation_id,
                      "Successfully fetched data from {} on attempt {}".format(url, attempt))
            return content
        except Exception as e:
            log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
            if attempt < max_retries:
                time.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)

    raise Exception("Failed to fetch data after {} attempts".format(max_retries))


def plik_oneshot_upload(fileserver_url, dataname, data):
    """Upload a single file to a plik server using one-shot mode.

    First POSTs {"OneShot": true} to "<fileserver_url>/upload" to obtain an
    upload id and token, then POSTs the file as multipart form data to
    "<fileserver_url>/file/<uploadid>" with the token in "X-UploadToken".

    Args:
        fileserver_url: Base URL of the plik server (e.g., "http://localhost:8080")
        dataname: Name of the file being uploaded
        data: Raw byte data of the file content

    Returns:
        dict: Dictionary with keys:
            - "status": HTTP status code of the file upload response
            - "uploadid": ID of the one-shot upload session
            - "fileid": ID of the uploaded file within the session
            - "url": Full URL to download the uploaded file

    Raises:
        Exception: If no HTTP library is available or no upload id is returned.

    Example:
        >>> result = plik_oneshot_upload("http://localhost:8080", "test.txt", b"hello world")
        >>> result["status"], result["uploadid"], result["fileid"], result["url"]
    """
    http = _import_http_client()

    # Step 1: create the one-shot upload session.
    response = http.post(
        "{}/upload".format(fileserver_url),
        headers={"Content-Type": "application/json"},
        data=json.dumps({"OneShot": True}),
    )
    session = json.loads(response.text if hasattr(response, 'text') else response.content)
    uploadid = session.get("id")
    uploadtoken = session.get("uploadToken")
    if not uploadid:
        raise Exception("Failed to create upload session: {}".format(response.status_code))

    # Step 2: build the multipart body by hand (MicroPython's urequests has
    # no multipart support).
    boundary = "----WebKitFormBoundary{}".format(uuid.uuid4().hex[:16])
    preamble = "--{}\r\n".format(boundary)
    preamble += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(dataname)
    preamble += "Content-Type: application/octet-stream\r\n\r\n"
    # The closing delimiter must be terminated by CRLF.
    epilogue = "\r\n--{}--\r\n".format(boundary)
    full_body = preamble.encode('utf-8') + data + epilogue.encode('utf-8')

    # The upload token must travel with the file request, together with the
    # multipart content type.
    upload_headers = {
        "Content-Type": "multipart/form-data; boundary={}".format(boundary),
    }
    if uploadtoken:
        upload_headers["X-UploadToken"] = uploadtoken

    response = http.post(
        "{}/file/{}".format(fileserver_url, uploadid),
        headers=upload_headers,
        data=full_body,
    )
    uploaded = json.loads(response.text if hasattr(response, 'text') else response.content)
    fileid = uploaded.get("id")

    # Plik serves files at /file/<uploadid>/<fileid>/<filename>.
    url = "{}/file/{}/{}/{}".format(fileserver_url, uploadid, fileid, dataname)

    return {
        "status": response.status_code,
        "uploadid": uploadid,
        "fileid": fileid,
        "url": url
    }


def smartsend(subject, data, broker_url=DEFAULT_BROKER_URL, fileserver_url=DEFAULT_FILESERVER_URL,
              fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
              correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
              receiver_name="", receiver_id="", reply_to="", reply_to_msg_id="", is_publish=True):
    """Send data either directly via NATS or via a fileserver URL, depending on payload size.

    Each payload whose serialized size is below `size_threshold` is
    base64-encoded and embedded in the envelope ("direct"). Otherwise it is
    uploaded through `fileserver_upload_handler` and only the download URL is
    embedded ("link").

    Args:
        subject: NATS subject to publish the message to
        data: List of (dataname, data, payload_type) tuples to send
            - dataname: Name of the payload
            - data: The actual data to send
            - payload_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
        broker_url: URL of the NATS server
        fileserver_url: URL of the HTTP file server
        fileserver_upload_handler: Function to handle fileserver uploads
            (must return dict with "status", "uploadid", "fileid", "url" keys)
        size_threshold: Threshold in bytes separating direct vs link transport (default: 1MB)
        correlation_id: Optional correlation ID for tracing; if None or empty, a UUID is generated
        msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
        sender_name: Name of the sender
        receiver_name: Name of the receiver (empty string means broadcast)
        receiver_id: UUID of the receiver (empty string means broadcast)
        reply_to: Topic to reply to (empty string if no reply expected)
        reply_to_msg_id: Message ID this message is replying to
        is_publish: Whether to automatically publish the message to NATS (default: True)
            - When True: message is published to NATS
            - When False: returns envelope and JSON string without publishing

    Returns:
        tuple: (env, env_json_str) where:
            - env: MessageEnvelope object with all metadata and payloads
            - env_json_str: JSON string representation of the envelope for publishing

    Raises:
        ValueError: If a payload does not match its declared payload_type.
        Exception: If the upload handler reports a status other than 200.

    Example:
        >>> data = [("message", "Hello World!", "text")]
        >>> env, env_json_str = smartsend("/test", data)
    """
    # Treat "" like None so the logged ID and the envelope's ID always agree.
    cid = correlation_id if correlation_id else str(uuid.uuid4())
    log_trace(cid, "Starting smartsend for subject: {}".format(subject))

    msg_id = str(uuid.uuid4())

    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)
        log_trace(cid, "Serialized payload '{}' (payload_type: {}) size: {} bytes".format(
            dataname, payload_type, payload_size))

        if payload_size < size_threshold:
            # Direct path - Base64 encode and send via NATS
            log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
            payload = MessagePayload(
                _b64encode(payload_bytes),
                payload_type,
                id=str(uuid.uuid4()),
                dataname=dataname,
                transport="direct",
                encoding="base64",
                size=payload_size,
                metadata={"payload_bytes": payload_size}
            )
        else:
            # Link path - Upload to HTTP server, send URL via NATS
            log_trace(cid, "Using link transport, uploading to fileserver")
            response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            if response.get("status") != 200:
                raise Exception("Failed to upload data to fileserver: {}".format(
                    response.get("status")))

            url = response.get("url")
            log_trace(cid, "Uploaded to URL: {}".format(url))
            payload = MessagePayload(
                url,
                payload_type,
                id=str(uuid.uuid4()),
                dataname=dataname,
                transport="link",
                encoding="none",
                size=payload_size,
                metadata={}
            )
        payloads.append(payload)

    env = MessageEnvelope(
        subject,
        payloads,
        correlation_id=cid,
        msg_id=msg_id,
        msg_purpose=msg_purpose,
        sender_name=sender_name,
        sender_id=str(uuid.uuid4()),
        receiver_name=receiver_name,
        receiver_id=receiver_id,
        reply_to=reply_to,
        reply_to_msg_id=reply_to_msg_id,
        broker_url=broker_url,
        metadata={}
    )

    msg_json = env.to_json()

    if is_publish:
        nats_conn = NATSConnection(broker_url)
        try:
            nats_conn.connect()
            nats_conn.publish(subject, msg_json)
        finally:
            # Always release the socket, even if connect/publish failed.
            nats_conn.close()

    # Return tuple of (envelope, json_string) for both direct and link transport
    return (env, msg_json)


def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,
                 base_delay=100, max_delay=5000):
    """Receive and process messages from NATS.

    Handles both direct transport (base64 decoded payloads) and link
    transport (URL-based payloads).

    Args:
        msg: NATS message to process (dict or JSON string with envelope data)
        fileserver_download_handler: Function to handle downloading data from file server URLs
            Receives: (url, max_retries, base_delay, max_delay, correlation_id)
            Returns: bytes (the downloaded data)
        max_retries: Maximum retry attempts for fetching URL (default: 5)
        base_delay: Initial delay for exponential backoff in ms (default: 100)
        max_delay: Maximum delay for exponential backoff in ms (default: 5000)

    Returns:
        dict: Envelope dictionary with metadata and 'payloads' field containing
        a list of (dataname, data, payload_type) tuples. When `msg` is a dict,
        a shallow copy is returned and `msg` itself is left untouched.

    Raises:
        ValueError: If a payload has an unknown transport or payload_type.

    Example:
        >>> env = smartreceive(msg)
        >>> for dataname, data, payload_type in env["payloads"]:
        ...     print("Received {} of type {}: {}".format(dataname, payload_type, data))
    """
    # Shallow-copy dict input so replacing "payloads" below does not mutate
    # the caller's object.
    envelope = dict(msg) if isinstance(msg, dict) else json.loads(msg)
    correlation_id = envelope.get("correlation_id", "")
    log_trace(correlation_id, "Processing received message")

    payloads_list = []
    for payload in envelope.get("payloads", []):
        transport = payload.get("transport", "")
        dataname = payload.get("dataname", "")
        payload_type = payload.get("payload_type", "")

        if transport == "direct":
            log_trace(correlation_id,
                      "Direct transport - decoding payload '{}'".format(dataname))
            raw = _b64decode(payload.get("data", ""))
        elif transport == "link":
            url = payload.get("data", "")
            log_trace(correlation_id,
                      "Link transport - fetching '{}' from URL: {}".format(dataname, url))
            raw = fileserver_download_handler(
                url, max_retries, base_delay, max_delay, correlation_id
            )
        else:
            raise ValueError("Unknown transport type for payload '{}': {}".format(
                dataname, transport))

        data = _deserialize_data(raw, payload_type, correlation_id)
        payloads_list.append((dataname, data, payload_type))

    # Replace payloads field with the processed list of (dataname, data, payload_type) tuples
    envelope["payloads"] = payloads_list
    return envelope


# Utility functions
def generate_uuid():
    """Generate a UUID string."""
    return str(uuid.uuid4())


def get_timestamp():
    """Get current timestamp in ISO format."""
    return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())


# Example usage
if __name__ == "__main__":
    print("NATSBridge - Bi-Directional Data Bridge")
    print("=======================================")
    print("This module provides:")
    print(" - MessageEnvelope: Message envelope structure with snake_case fields")
    print(" - MessagePayload: Payload structure with payload_type field")
    print(" - smartsend: Send data via NATS with automatic transport selection")
    print(" - smartreceive: Receive and process messages from NATS")
    print(" - plik_oneshot_upload: Upload files to HTTP file server")
    print(" - _fetch_with_backoff: Fetch data from URLs with retry logic")
    print()
    print("Usage:")
    print(" from nats_bridge import smartsend, smartreceive")
    print()
    print(" # Send data (list of (dataname, data, payload_type) tuples)")
    print(" data = [(\"message\", \"Hello World!\", \"text\")]")
    print(" env, env_json_str = smartsend(\"my.subject\", data)")
    print()
    print(" # On receiver:")
    print(" env = smartreceive(msg)")
    print(" for dataname, data, payload_type in env[\"payloads\"]:")
    print(" print(\"Received {} of type {}: {}\".format(dataname, payload_type, data))")