""" Python Text Sender Test Tests the smartsend function with text payloads """ import asyncio import sys import os # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from natsbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL TEST_SUBJECT = '/test/text' TEST_BROKER_URL = os.environ.get('NATS_URL', 'nats://localhost:4222') TEST_FILESERVER_URL = os.environ.get('FILESERVER_URL', 'http://localhost:8080') async def run_test(): print('=== Python Text Sender Test ===\n') correlation_id = 'py-text-test-' + str(asyncio.get_event_loop().time() * 1000000) print(f'Correlation ID: {correlation_id}') print(f'Subject: {TEST_SUBJECT}') print(f'Broker URL: {TEST_BROKER_URL}\n') # Test data text_data = 'Hello, NATSBridge! This is a test message.' test_data = [ ('message', text_data, 'text') ] try: # Send the message print('Sending text payload...') env, env_json_str = await smartsend( TEST_SUBJECT, test_data, broker_url=TEST_BROKER_URL, fileserver_url=TEST_FILESERVER_URL, correlation_id=correlation_id, msg_purpose='test', sender_name='py-text-test', is_publish=False # Don't actually publish for this test ) print('\n=== Envelope Created ===') print(f'Correlation ID: {env["correlation_id"]}') print(f'Message ID: {env["msg_id"]}') print(f'Timestamp: {env["timestamp"]}') print(f'Subject: {env["send_to"]}') print(f'Purpose: {env["msg_purpose"]}') print(f'Sender: {env["sender_name"]}') print(f'Payloads: {len(env["payloads"])}\n') # Validate envelope structure print('=== Validation ===') passed = True if not env.get('correlation_id'): print('❌ correlation_id is missing') passed = False else: print('✅ correlation_id present') if not env.get('msg_id'): print('❌ msg_id is missing') passed = False else: print('✅ msg_id present') if not env.get('timestamp'): print('❌ timestamp is missing') passed = False else: print('✅ timestamp present') if len(env['payloads']) != 1: print(f'❌ Expected 1 payload, got {len(env["payloads"])}') passed = False else: print('✅ Correct number of payloads') payload = env['payloads'][0] if payload['dataname'] != 'message': print(f"❌ Expected dataname 'message', got '{payload['dataname']}'") passed = False else: print('✅ Correct dataname') if payload['payload_type'] != 'text': print(f"❌ Expected payload_type 'text', got '{payload['payload_type']}'") passed = False else: print('✅ Correct payload_type') if payload['transport'] != 'direct': print(f"❌ Expected transport 'direct', got '{payload['transport']}'") passed = False else: print('✅ Correct transport') if payload['encoding'] != 'base64': print(f"❌ Expected encoding 'base64', got '{payload['encoding']}'") passed = False else: print('✅ Correct encoding') # Decode and verify the data import base64 decoded_data = base64.b64decode(payload['data']).decode('utf8') if decoded_data != text_data: print('❌ Decoded data mismatch') print(f' Expected: {text_data}') print(f' Got: {decoded_data}') passed = False else: print('✅ Data integrity verified') print(f"\nPayload size: {payload['size']} bytes") print(f'Base64 data length: {len(payload["data"])} chars') # Test with multiple text payloads print('\n=== Multiple Text Payloads Test ===') multi_test_data = [ ('msg1', 'First message', 'text'), ('msg2', 'Second message', 'text'), ('msg3', 'Third message', 'text') ] multi_env, _ = await smartsend( TEST_SUBJECT, multi_test_data, broker_url=TEST_BROKER_URL, fileserver_url=TEST_FILESERVER_URL, correlation_id='multi-test-' + correlation_id, is_publish=False ) if len(multi_env['payloads']) == 3: print('✅ Multiple payloads handled correctly') else: print(f"❌ Expected 3 payloads, got {len(multi_env['payloads'])}") passed = False # Final result print('\n=== Test Result ===') if passed: print('✅ ALL TESTS PASSED') sys.exit(0) else: print('❌ SOME TESTS FAILED') sys.exit(1) except Exception as e: print(f'❌ Test failed with error: {e}') import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': asyncio.run(run_test())