""" Python Mix Payloads Sender Test Tests the smartpack function with mixed payload types """ import asyncio import sys import os import base64 # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from msghandler import smartpack, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL TEST_SUBJECT = '/test/mix' TEST_BROKER_URL = os.environ.get('NATS_URL', 'nats://localhost:4222') TEST_FILESERVER_URL = os.environ.get('FILESERVER_URL', 'http://localhost:8080') async def run_test(): print('=== Python Mix Payloads Sender Test ===\n') correlation_id = 'py-mix-test-' + str(asyncio.get_event_loop().time() * 1000000) print(f'Correlation ID: {correlation_id}') print(f'Subject: {TEST_SUBJECT}') print(f'Broker URL: {TEST_BROKER_URL}\n') # Test data - mixed payload types text_data = 'Hello, msghandler!' dict_data = {'key1': 'value1', 'key2': 42, 'nested': {'a': 1, 'b': 2}} image_data = bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) # PNG header # Table data try: import pandas as pd table_data = pd.DataFrame({ 'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie'], 'age': [30, 25, 35] }) table_available = True except ImportError: table_available = False table_data = None test_data = [ ('message', text_data, 'text'), ('config', dict_data, 'dictionary'), ('image', image_data, 'image') ] if table_available: test_data.append(('users', table_data, 'table')) try: # Send the message print('Sending mixed payloads...') env, env_json_str = await smartpack( TEST_SUBJECT, test_data, broker_url=TEST_BROKER_URL, fileserver_url=TEST_FILESERVER_URL, correlation_id=correlation_id, msg_purpose='test', sender_name='py-mix-test', is_publish=False ) print('\n=== Envelope Created ===') print(f'Correlation ID: {env["correlation_id"]}') print(f'Message ID: {env["msg_id"]}') print(f'Timestamp: {env["timestamp"]}') print(f'Subject: {env["send_to"]}') print(f'Purpose: {env["msg_purpose"]}') print(f'Sender: {env["sender_name"]}') print(f'Payloads: {len(env["payloads"])}\n') # Validate envelope structure print('=== Validation ===') passed = True expected_count = 4 if table_available else 3 if len(env['payloads']) != expected_count: print(f'❌ Expected {expected_count} payloads, got {len(env["payloads"])}') passed = False else: print('✅ Correct number of payloads') # Test each payload expected_datanames = ['message', 'config', 'image'] expected_types = ['text', 'dictionary', 'image'] expected_data = [text_data, dict_data, image_data] if table_available: expected_datanames.append('users') expected_types.append('table') for i in range(len(env['payloads'])): payload = env['payloads'][i] if payload['dataname'] != expected_datanames[i]: print(f"❌ Payload {i + 1}: Expected dataname '{expected_datanames[i]}', got '{payload['dataname']}'") passed = False else: print(f'✅ Payload {i + 1}: Correct dataname') if payload['payload_type'] != expected_types[i]: print(f"❌ Payload {i + 1}: Expected type '{expected_types[i]}', got '{payload['payload_type']}'") passed = False else: print(f'✅ Payload {i + 1}: Correct type') if payload['transport'] != 'direct': print(f"❌ Payload {i + 1}: Expected transport 'direct', got '{payload['transport']}'") passed = False else: print(f'✅ Payload {i + 1}: Correct transport') if payload['encoding'] != 'base64': print(f"❌ Payload {i + 1}: Expected encoding 'base64', got '{payload['encoding']}'") passed = False else: print(f'✅ Payload {i + 1}: Correct encoding') # Verify data integrity based on type decoded_data = base64.b64decode(payload['data']) if expected_types[i] == 'text': decoded_text = decoded_data.decode('utf8') if decoded_text != expected_data[i]: print(f'❌ Payload {i + 1}: Data integrity mismatch') passed = False else: print(f'✅ Payload {i + 1}: Data integrity verified') elif expected_types[i] == 'dictionary': import json decoded_dict = json.loads(decoded_data.decode('utf8')) if json.dumps(decoded_dict, sort_keys=True) != json.dumps(expected_data[i], sort_keys=True): print(f'❌ Payload {i + 1}: Data integrity mismatch') passed = False else: print(f'✅ Payload {i + 1}: Data integrity verified') elif expected_types[i] == 'image': if decoded_data != expected_data[i]: print(f'❌ Payload {i + 1}: Data integrity mismatch') passed = False else: print(f'✅ Payload {i + 1}: Data integrity verified') elif expected_types[i] == 'table': if len(decoded_data) > 0: print(f'✅ Payload {i + 1}: Arrow IPC data present ({len(decoded_data)} bytes)') else: print(f'❌ Payload {i + 1}: Arrow IPC data is empty') passed = False print(f' Size: {payload["size"]} bytes\n') # Test with chat-like payload (text + image + audio) print('=== Chat-like Payload Test ===') chat_data = [ ('text', 'Hello!', 'text'), ('image', bytes([0xFF, 0xD8, 0xFF, 0xE0]), 'image'), ('audio', bytes([0x46, 0x4C, 0x41, 0x43]), 'audio') ] chat_env, _ = await smartpack( TEST_SUBJECT, chat_data, broker_url=TEST_BROKER_URL, fileserver_url=TEST_FILESERVER_URL, correlation_id='chat-' + correlation_id, is_publish=False ) if len(chat_env['payloads']) == 3: print('✅ Chat-like payloads handled correctly') else: print('❌ Chat-like payloads handling failed') passed = False # Final result print('\n=== Test Result ===') if passed: print('✅ ALL TESTS PASSED') sys.exit(0) else: print('❌ SOME TESTS FAILED') sys.exit(1) except Exception as e: print(f'❌ Test failed with error: {e}') import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': asyncio.run(run_test())