172 lines
6.5 KiB
Python
172 lines
6.5 KiB
Python
"""
|
|
Python Dictionary Sender Test
|
|
Tests the smartsend function with dictionary payloads
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from natbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL
|
|
|
|
TEST_SUBJECT = '/test/dictionary'
|
|
TEST_BROKER_URL = os.environ.get('NATS_URL', 'nats://localhost:4222')
|
|
TEST_FILESERVER_URL = os.environ.get('FILESERVER_URL', 'http://localhost:8080')
|
|
|
|
|
|
async def run_test():
|
|
print('=== Python Dictionary Sender Test ===\n')
|
|
|
|
correlation_id = 'py-dict-test-' + str(asyncio.get_event_loop().time() * 1000000)
|
|
print(f'Correlation ID: {correlation_id}')
|
|
print(f'Subject: {TEST_SUBJECT}')
|
|
print(f'Broker URL: {TEST_BROKER_URL}\n')
|
|
|
|
# Test data - various dictionary structures
|
|
test_data = [
|
|
('simple_dict', {'key1': 'value1', 'key2': 'value2'}, 'dictionary'),
|
|
('nested_dict', {'outer': {'inner': 'value', 'number': 42}}, 'dictionary'),
|
|
('array_dict', {'items': [1, 2, 3, 'four', 'five']}, 'dictionary'),
|
|
('mixed_dict', {'string': 'text', 'number': 123, 'boolean': True, 'null_val': None}, 'dictionary')
|
|
]
|
|
|
|
try:
|
|
# Send the message
|
|
print('Sending dictionary payloads...')
|
|
env, env_json_str = await smartsend(
|
|
TEST_SUBJECT,
|
|
test_data,
|
|
broker_url=TEST_BROKER_URL,
|
|
fileserver_url=TEST_FILESERVER_URL,
|
|
correlation_id=correlation_id,
|
|
msg_purpose='test',
|
|
sender_name='py-dict-test',
|
|
is_publish=False
|
|
)
|
|
|
|
print('\n=== Envelope Created ===')
|
|
print(f'Correlation ID: {env["correlation_id"]}')
|
|
print(f'Message ID: {env["msg_id"]}')
|
|
print(f'Timestamp: {env["timestamp"]}')
|
|
print(f'Subject: {env["send_to"]}')
|
|
print(f'Purpose: {env["msg_purpose"]}')
|
|
print(f'Sender: {env["sender_name"]}')
|
|
print(f'Payloads: {len(env["payloads"])}\n')
|
|
|
|
# Validate envelope structure
|
|
print('=== Validation ===')
|
|
passed = True
|
|
|
|
if len(env['payloads']) != 4:
|
|
print(f'❌ Expected 4 payloads, got {len(env["payloads"])}')
|
|
passed = False
|
|
else:
|
|
print('✅ Correct number of payloads')
|
|
|
|
# Test each payload
|
|
expected_datanames = ['simple_dict', 'nested_dict', 'array_dict', 'mixed_dict']
|
|
expected_types = ['dictionary', 'dictionary', 'dictionary', 'dictionary']
|
|
|
|
for i in range(len(env['payloads'])):
|
|
payload = env['payloads'][i]
|
|
|
|
if payload['dataname'] != expected_datanames[i]:
|
|
print(f"❌ Payload {i + 1}: Expected dataname '{expected_datanames[i]}', got '{payload['dataname']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct dataname')
|
|
|
|
if payload['payload_type'] != expected_types[i]:
|
|
print(f"❌ Payload {i + 1}: Expected type '{expected_types[i]}', got '{payload['payload_type']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct type')
|
|
|
|
if payload['transport'] != 'direct':
|
|
print(f"❌ Payload {i + 1}: Expected transport 'direct', got '{payload['transport']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct transport')
|
|
|
|
if payload['encoding'] != 'base64':
|
|
print(f"❌ Payload {i + 1}: Expected encoding 'base64', got '{payload['encoding']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct encoding')
|
|
|
|
# Decode and verify the data
|
|
import base64
|
|
decoded_data = json.loads(base64.b64decode(payload['data']).decode('utf8'))
|
|
original_data = test_data[i][1]
|
|
|
|
# Normalize for comparison (None vs null, True vs true, etc.)
|
|
if json.dumps(decoded_data, sort_keys=True) != json.dumps(original_data, sort_keys=True):
|
|
print(f'❌ Payload {i + 1}: Data integrity mismatch')
|
|
print(f' Expected: {json.dumps(original_data)}')
|
|
print(f' Got: {json.dumps(decoded_data)}')
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Data integrity verified')
|
|
|
|
print(f' Size: {payload["size"]} bytes\n')
|
|
|
|
# Test round-trip serialization
|
|
print('=== Round-trip Serialization Test ===')
|
|
round_trip_data = [
|
|
('roundtrip', {'test': 'data', 'numbers': [1, 2, 3], 'nested': {'a': 1, 'b': 2}}, 'dictionary')
|
|
]
|
|
|
|
rt_env, _ = await smartsend(
|
|
TEST_SUBJECT,
|
|
round_trip_data,
|
|
broker_url=TEST_BROKER_URL,
|
|
fileserver_url=TEST_FILESERVER_URL,
|
|
correlation_id='roundtrip-' + correlation_id,
|
|
is_publish=False
|
|
)
|
|
|
|
rt_payload = rt_env['payloads'][0]
|
|
rt_decoded = json.loads(base64.b64decode(rt_payload['data']).decode('utf8'))
|
|
|
|
if json.dumps(rt_decoded, sort_keys=True) == json.dumps(round_trip_data[0][1], sort_keys=True):
|
|
print('✅ Round-trip serialization successful')
|
|
else:
|
|
print('❌ Round-trip serialization failed')
|
|
passed = False
|
|
|
|
# Test JSON string output
|
|
print('\n=== JSON String Output Test ===')
|
|
try:
|
|
parsed = json.loads(env_json_str)
|
|
if parsed['correlation_id'] == env['correlation_id'] and \
|
|
len(parsed['payloads']) == len(env['payloads']):
|
|
print('✅ JSON string is valid and matches envelope')
|
|
else:
|
|
print('❌ JSON string does not match envelope')
|
|
passed = False
|
|
except json.JSONDecodeError as e:
|
|
print(f'❌ JSON string is invalid: {e}')
|
|
passed = False
|
|
|
|
# Final result
|
|
print('\n=== Test Result ===')
|
|
if passed:
|
|
print('✅ ALL TESTS PASSED')
|
|
sys.exit(0)
|
|
else:
|
|
print('❌ SOME TESTS FAILED')
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
print(f'❌ Test failed with error: {e}')
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(run_test()) |