Files
NATSBridge/test/test_py_dictionary_sender.py
2026-03-05 20:17:36 +07:00

172 lines
6.5 KiB
Python

"""
Python Dictionary Sender Test
Tests the smartsend function with dictionary payloads
"""
import asyncio
import sys
import os
import json
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from natbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL
# Subject handed to smartsend as the destination of every test envelope.
TEST_SUBJECT = '/test/dictionary'

# Service endpoints: taken from the environment when set, otherwise the
# localhost defaults below are used.
TEST_BROKER_URL = os.getenv('NATS_URL', 'nats://localhost:4222')
TEST_FILESERVER_URL = os.getenv('FILESERVER_URL', 'http://localhost:8080')
def _decode_payload_data(payload):
    """Decode a payload's base64 ``data`` field and parse it as JSON.

    Args:
        payload: Envelope payload mapping holding base64 text under ``'data'``.

    Returns:
        The Python object parsed from the decoded UTF-8 JSON text.
    """
    # Function-scope import: base64 is not among the file's top-level imports.
    # Importing it here (rather than inside a loop in run_test) guarantees the
    # name is bound on every code path that decodes a payload.
    import base64

    return json.loads(base64.b64decode(payload['data']).decode('utf8'))


def _same_json(left, right):
    """Return True when both values serialize to identical key-sorted JSON."""
    return json.dumps(left, sort_keys=True) == json.dumps(right, sort_keys=True)


def _validate_payload(number, payload, spec):
    """Check one envelope payload against the test tuple it was built from.

    Prints one pass/fail line per checked field plus a data-integrity line.

    Args:
        number: 1-based payload position, used only in the printed messages.
        payload: Payload mapping taken from the envelope's ``'payloads'`` list.
        spec: The ``(dataname, data, payload_type)`` tuple given to smartsend.

    Returns:
        True when every check passed, False otherwise.
    """
    dataname, original_data, payload_type = spec
    ok = True

    # (envelope key, label used in the messages, expected value)
    field_checks = (
        ('dataname', 'dataname', dataname),
        ('payload_type', 'type', payload_type),
        ('transport', 'transport', 'direct'),
        ('encoding', 'encoding', 'base64'),
    )
    for key, label, expected in field_checks:
        if payload[key] != expected:
            print(f"❌ Payload {number}: Expected {label} '{expected}', got '{payload[key]}'")
            ok = False
        else:
            print(f'✅ Payload {number}: Correct {label}')

    # Compare via key-sorted JSON text so dict ordering cannot cause a
    # spurious mismatch.
    decoded_data = _decode_payload_data(payload)
    if not _same_json(decoded_data, original_data):
        print(f'❌ Payload {number}: Data integrity mismatch')
        print(f' Expected: {json.dumps(original_data)}')
        print(f' Got: {json.dumps(decoded_data)}')
        ok = False
    else:
        print(f'✅ Payload {number}: Data integrity verified')
    print(f' Size: {payload["size"]} bytes\n')
    return ok


async def run_test():
    """Build envelopes from dictionary payloads with smartsend and validate them.

    Calls smartsend (with ``is_publish=False``) for four dictionary payloads,
    checks the returned envelope's payload count, per-payload metadata and
    decoded data, repeats the decode check for a single round-trip payload, and
    verifies that the returned JSON string parses back to the same correlation
    id and payload count.

    The function does not return: it terminates the process with
    ``sys.exit(0)`` when every check passed and ``sys.exit(1)`` otherwise,
    including when any unexpected exception is raised (the traceback is
    printed first).
    """
    print('=== Python Dictionary Sender Test ===\n')
    # Inside a coroutine the running loop is the one to query.
    correlation_id = 'py-dict-test-' + str(asyncio.get_running_loop().time() * 1000000)
    print(f'Correlation ID: {correlation_id}')
    print(f'Subject: {TEST_SUBJECT}')
    print(f'Broker URL: {TEST_BROKER_URL}\n')

    # Test data - various dictionary structures
    test_data = [
        ('simple_dict', {'key1': 'value1', 'key2': 'value2'}, 'dictionary'),
        ('nested_dict', {'outer': {'inner': 'value', 'number': 42}}, 'dictionary'),
        ('array_dict', {'items': [1, 2, 3, 'four', 'five']}, 'dictionary'),
        ('mixed_dict', {'string': 'text', 'number': 123, 'boolean': True, 'null_val': None}, 'dictionary'),
    ]

    try:
        # Send the message
        print('Sending dictionary payloads...')
        env, env_json_str = await smartsend(
            TEST_SUBJECT,
            test_data,
            broker_url=TEST_BROKER_URL,
            fileserver_url=TEST_FILESERVER_URL,
            correlation_id=correlation_id,
            msg_purpose='test',
            sender_name='py-dict-test',
            is_publish=False,
        )

        print('\n=== Envelope Created ===')
        print(f'Correlation ID: {env["correlation_id"]}')
        print(f'Message ID: {env["msg_id"]}')
        print(f'Timestamp: {env["timestamp"]}')
        print(f'Subject: {env["send_to"]}')
        print(f'Purpose: {env["msg_purpose"]}')
        print(f'Sender: {env["sender_name"]}')
        print(f'Payloads: {len(env["payloads"])}\n')

        # Validate envelope structure
        print('=== Validation ===')
        passed = True
        if len(env['payloads']) != 4:
            print(f'❌ Expected 4 payloads, got {len(env["payloads"])}')
            passed = False
        else:
            print('✅ Correct number of payloads')

        # Test each payload. zip() stops at the shorter sequence, so a wrong
        # payload count (already reported above) cannot cause an IndexError.
        for number, (payload, spec) in enumerate(zip(env['payloads'], test_data), start=1):
            if not _validate_payload(number, payload, spec):
                passed = False

        # Test round-trip serialization
        print('=== Round-trip Serialization Test ===')
        round_trip_data = [
            ('roundtrip', {'test': 'data', 'numbers': [1, 2, 3], 'nested': {'a': 1, 'b': 2}}, 'dictionary'),
        ]
        rt_env, _ = await smartsend(
            TEST_SUBJECT,
            round_trip_data,
            broker_url=TEST_BROKER_URL,
            fileserver_url=TEST_FILESERVER_URL,
            correlation_id='roundtrip-' + correlation_id,
            is_publish=False,
        )
        rt_payloads = rt_env['payloads']
        # An empty payload list is a failed round trip, not a crash.
        if rt_payloads and _same_json(_decode_payload_data(rt_payloads[0]), round_trip_data[0][1]):
            print('✅ Round-trip serialization successful')
        else:
            print('❌ Round-trip serialization failed')
            passed = False

        # Test JSON string output
        print('\n=== JSON String Output Test ===')
        try:
            parsed = json.loads(env_json_str)
        except json.JSONDecodeError as e:
            print(f'❌ JSON string is invalid: {e}')
            passed = False
        else:
            if parsed['correlation_id'] == env['correlation_id'] and \
                    len(parsed['payloads']) == len(env['payloads']):
                print('✅ JSON string is valid and matches envelope')
            else:
                print('❌ JSON string does not match envelope')
                passed = False

        # Final result. SystemExit is not an Exception subclass, so the
        # handler below does not intercept these exits.
        print('\n=== Test Result ===')
        if passed:
            print('✅ ALL TESTS PASSED')
            sys.exit(0)
        else:
            print('❌ SOME TESTS FAILED')
            sys.exit(1)
    except Exception as e:
        print(f'❌ Test failed with error: {e}')
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == '__main__':
    # Executed only when the file is run as a script: drive the async test to
    # completion; run_test() ends the process itself through sys.exit().
    test_coroutine = run_test()
    asyncio.run(test_coroutine)