97 lines
4.3 KiB
Python
97 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for mixed payload testing - Receiver
|
|
Tests receiving mixed payload types via NATS using nats_bridge.py smartreceive
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
# Add src to path for import
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
|
from nats_bridge import smartreceive, log_trace
|
|
import nats
|
|
import asyncio
|
|
|
|
# Configuration
|
|
SUBJECT = "/NATSBridge_mixed_test"
|
|
NATS_URL = "nats://nats.yiem.cc:4222"
|
|
|
|
|
|
async def main():
|
|
log_trace("", f"Starting mixed payload receiver test...")
|
|
log_trace("", f"Note: This receiver will wait for messages from the sender.")
|
|
log_trace("", f"Run test_micropython_mixed_sender.py first to send test data.")
|
|
|
|
# Connect to NATS
|
|
nc = await nats.connect(NATS_URL)
|
|
log_trace("", f"Connected to NATS at {NATS_URL}")
|
|
|
|
# Subscribe to the subject
|
|
async def message_handler(msg):
|
|
log_trace("", f"Received message on {msg.subject}")
|
|
|
|
# Use smartreceive to handle the data
|
|
result = smartreceive(msg.data)
|
|
|
|
log_trace(result.get("correlationId", ""), f"Received envelope with {len(result['payloads'])} payloads")
|
|
|
|
# Result is an envelope dictionary with payloads field containing list of (dataname, data, data_type) tuples
|
|
for dataname, data, data_type in result["payloads"]:
|
|
log_trace(result.get("correlationId", ""), f"\n--- Payload: {dataname} (type: {data_type}) ---")
|
|
|
|
if isinstance(data, str):
|
|
log_trace(result.get("correlationId", ""), f" Type: text/string")
|
|
log_trace(result.get("correlationId", ""), f" Length: {len(data)} characters")
|
|
if len(data) <= 100:
|
|
log_trace(result.get("correlationId", ""), f" Content: {data}")
|
|
else:
|
|
log_trace(result.get("correlationId", ""), f" First 100 chars: {data[:100]}...")
|
|
# Save to file
|
|
output_path = f"./received_{dataname}.txt"
|
|
with open(output_path, 'w') as f:
|
|
f.write(data)
|
|
log_trace(result.get("correlationId", ""), f" Saved to: {output_path}")
|
|
|
|
elif isinstance(data, dict):
|
|
log_trace(result.get("correlationId", ""), f" Type: dictionary")
|
|
log_trace(result.get("correlationId", ""), f" Keys: {list(data.keys())}")
|
|
log_trace(result.get("correlationId", ""), f" Content: {json.dumps(data, indent=2)}")
|
|
# Save to file
|
|
output_path = f"./received_{dataname}.json"
|
|
with open(output_path, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
log_trace(result.get("correlationId", ""), f" Saved to: {output_path}")
|
|
|
|
elif isinstance(data, bytes):
|
|
log_trace(result.get("correlationId", ""), f" Type: binary")
|
|
log_trace(result.get("correlationId", ""), f" Size: {len(data)} bytes")
|
|
log_trace(result.get("correlationId", ""), f" First 100 bytes (hex): {data[:100].hex()}")
|
|
# Save to file
|
|
output_path = f"./received_{dataname}.bin"
|
|
with open(output_path, 'wb') as f:
|
|
f.write(data)
|
|
log_trace(result.get("correlationId", ""), f" Saved to: {output_path}")
|
|
else:
|
|
log_trace(result.get("correlationId", ""), f" Received unexpected data type: {type(data)}")
|
|
|
|
# Log envelope metadata
|
|
log_trace(result.get("correlationId", ""), f"\n--- Envelope Metadata ---")
|
|
log_trace(result.get("correlationId", ""), f" Correlation ID: {result.get('correlationId', 'N/A')}")
|
|
log_trace(result.get("correlationId", ""), f" Message ID: {result.get('msgId', 'N/A')}")
|
|
log_trace(result.get("correlationId", ""), f" Sender: {result.get('senderName', 'N/A')}")
|
|
log_trace(result.get("correlationId", ""), f" Purpose: {result.get('msgPurpose', 'N/A')}")
|
|
|
|
sid = await nc.subscribe(SUBJECT, cb=message_handler)
|
|
log_trace("", f"Subscribed to {SUBJECT} with subscription ID: {sid}")
|
|
|
|
# Keep listening for 120 seconds
|
|
await asyncio.sleep(120)
|
|
await nc.close()
|
|
log_trace("", "Test completed.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |