#!/usr/bin/env python3 """ Test script for mixed payload testing - Receiver Tests receiving mixed payload types via NATS using nats_bridge.py smartreceive """ import sys import os import json # Add src to path for import sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) from nats_bridge import smartreceive, log_trace import nats import asyncio # Configuration SUBJECT = "/NATSBridge_mixed_test" NATS_URL = "nats://nats.yiem.cc:4222" async def main(): log_trace("", f"Starting mixed payload receiver test...") log_trace("", f"Note: This receiver will wait for messages from the sender.") log_trace("", f"Run test_micropython_mixed_sender.py first to send test data.") # Connect to NATS nc = await nats.connect(NATS_URL) log_trace("", f"Connected to NATS at {NATS_URL}") # Subscribe to the subject async def message_handler(msg): log_trace("", f"Received message on {msg.subject}") # Use smartreceive to handle the data result = smartreceive(msg.data) log_trace(result.get("correlationId", ""), f"Received envelope with {len(result['payloads'])} payloads") # Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples for dataname, data, data_type in result["payloads"]: log_trace(result.get("correlationId", ""), f"\n--- Payload: {dataname} (type: {data_type}) ---") if isinstance(data, str): log_trace(result.get("correlationId", ""), f" Type: text/string") log_trace(result.get("correlationId", ""), f" Length: {len(data)} characters") if len(data) <= 100: log_trace(result.get("correlationId", ""), f" Content: {data}") else: log_trace(result.get("correlationId", ""), f" First 100 chars: {data[:100]}...") # Save to file output_path = f"./received_{dataname}.txt" with open(output_path, 'w') as f: f.write(data) log_trace(result.get("correlationId", ""), f" Saved to: {output_path}") elif isinstance(data, dict): log_trace(result.get("correlationId", ""), f" Type: dictionary") log_trace(result.get("correlationId", ""), f" Keys: {list(data.keys())}") log_trace(result.get("correlationId", ""), f" Content: {json.dumps(data, indent=2)}") # Save to file output_path = f"./received_{dataname}.json" with open(output_path, 'w') as f: json.dump(data, f, indent=2) log_trace(result.get("correlationId", ""), f" Saved to: {output_path}") elif isinstance(data, bytes): log_trace(result.get("correlationId", ""), f" Type: binary") log_trace(result.get("correlationId", ""), f" Size: {len(data)} bytes") log_trace(result.get("correlationId", ""), f" First 100 bytes (hex): {data[:100].hex()}") # Save to file output_path = f"./received_{dataname}.bin" with open(output_path, 'wb') as f: f.write(data) log_trace(result.get("correlationId", ""), f" Saved to: {output_path}") else: log_trace(result.get("correlationId", ""), f" Received unexpected data type: {type(data)}") # Log envelope metadata log_trace(result.get("correlationId", ""), f"\n--- Envelope Metadata ---") log_trace(result.get("correlationId", ""), f" Correlation ID: {result.get('correlationId', 'N/A')}") log_trace(result.get("correlationId", ""), f" Message ID: {result.get('msgId', 'N/A')}") log_trace(result.get("correlationId", ""), f" Sender: {result.get('senderName', 'N/A')}") log_trace(result.get("correlationId", ""), f" Purpose: {result.get('msgPurpose', 'N/A')}") sid = await nc.subscribe(SUBJECT, cb=message_handler) log_trace("", f"Subscribed to {SUBJECT} with subscription ID: {sid}") # Keep listening for 120 seconds await asyncio.sleep(120) await nc.close() log_trace("", "Test completed.") if __name__ == "__main__": asyncio.run(main())