199 lines
7.2 KiB
Python
199 lines
7.2 KiB
Python
"""
|
|
Python Mix Payloads Sender Test
|
|
Tests the smartsend function with mixed payload types
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import base64
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from natsbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL
|
|
|
|
TEST_SUBJECT = '/test/mix'
|
|
TEST_BROKER_URL = os.environ.get('NATS_URL', 'nats://localhost:4222')
|
|
TEST_FILESERVER_URL = os.environ.get('FILESERVER_URL', 'http://localhost:8080')
|
|
|
|
|
|
async def run_test():
|
|
print('=== Python Mix Payloads Sender Test ===\n')
|
|
|
|
correlation_id = 'py-mix-test-' + str(asyncio.get_event_loop().time() * 1000000)
|
|
print(f'Correlation ID: {correlation_id}')
|
|
print(f'Subject: {TEST_SUBJECT}')
|
|
print(f'Broker URL: {TEST_BROKER_URL}\n')
|
|
|
|
# Test data - mixed payload types
|
|
text_data = 'Hello, NATSBridge!'
|
|
dict_data = {'key1': 'value1', 'key2': 42, 'nested': {'a': 1, 'b': 2}}
|
|
image_data = bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) # PNG header
|
|
|
|
# Table data
|
|
try:
|
|
import pandas as pd
|
|
table_data = pd.DataFrame({
|
|
'id': [1, 2, 3],
|
|
'name': ['Alice', 'Bob', 'Charlie'],
|
|
'age': [30, 25, 35]
|
|
})
|
|
table_available = True
|
|
except ImportError:
|
|
table_available = False
|
|
table_data = None
|
|
|
|
test_data = [
|
|
('message', text_data, 'text'),
|
|
('config', dict_data, 'dictionary'),
|
|
('image', image_data, 'image')
|
|
]
|
|
|
|
if table_available:
|
|
test_data.append(('users', table_data, 'table'))
|
|
|
|
try:
|
|
# Send the message
|
|
print('Sending mixed payloads...')
|
|
env, env_json_str = await smartsend(
|
|
TEST_SUBJECT,
|
|
test_data,
|
|
broker_url=TEST_BROKER_URL,
|
|
fileserver_url=TEST_FILESERVER_URL,
|
|
correlation_id=correlation_id,
|
|
msg_purpose='test',
|
|
sender_name='py-mix-test',
|
|
is_publish=False
|
|
)
|
|
|
|
print('\n=== Envelope Created ===')
|
|
print(f'Correlation ID: {env["correlation_id"]}')
|
|
print(f'Message ID: {env["msg_id"]}')
|
|
print(f'Timestamp: {env["timestamp"]}')
|
|
print(f'Subject: {env["send_to"]}')
|
|
print(f'Purpose: {env["msg_purpose"]}')
|
|
print(f'Sender: {env["sender_name"]}')
|
|
print(f'Payloads: {len(env["payloads"])}\n')
|
|
|
|
# Validate envelope structure
|
|
print('=== Validation ===')
|
|
passed = True
|
|
|
|
expected_count = 4 if table_available else 3
|
|
if len(env['payloads']) != expected_count:
|
|
print(f'❌ Expected {expected_count} payloads, got {len(env["payloads"])}')
|
|
passed = False
|
|
else:
|
|
print('✅ Correct number of payloads')
|
|
|
|
# Test each payload
|
|
expected_datanames = ['message', 'config', 'image']
|
|
expected_types = ['text', 'dictionary', 'image']
|
|
expected_data = [text_data, dict_data, image_data]
|
|
|
|
if table_available:
|
|
expected_datanames.append('users')
|
|
expected_types.append('table')
|
|
|
|
for i in range(len(env['payloads'])):
|
|
payload = env['payloads'][i]
|
|
|
|
if payload['dataname'] != expected_datanames[i]:
|
|
print(f"❌ Payload {i + 1}: Expected dataname '{expected_datanames[i]}', got '{payload['dataname']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct dataname')
|
|
|
|
if payload['payload_type'] != expected_types[i]:
|
|
print(f"❌ Payload {i + 1}: Expected type '{expected_types[i]}', got '{payload['payload_type']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct type')
|
|
|
|
if payload['transport'] != 'direct':
|
|
print(f"❌ Payload {i + 1}: Expected transport 'direct', got '{payload['transport']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct transport')
|
|
|
|
if payload['encoding'] != 'base64':
|
|
print(f"❌ Payload {i + 1}: Expected encoding 'base64', got '{payload['encoding']}'")
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Correct encoding')
|
|
|
|
# Verify data integrity based on type
|
|
decoded_data = base64.b64decode(payload['data'])
|
|
|
|
if expected_types[i] == 'text':
|
|
decoded_text = decoded_data.decode('utf8')
|
|
if decoded_text != expected_data[i]:
|
|
print(f'❌ Payload {i + 1}: Data integrity mismatch')
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Data integrity verified')
|
|
elif expected_types[i] == 'dictionary':
|
|
import json
|
|
decoded_dict = json.loads(decoded_data.decode('utf8'))
|
|
if json.dumps(decoded_dict, sort_keys=True) != json.dumps(expected_data[i], sort_keys=True):
|
|
print(f'❌ Payload {i + 1}: Data integrity mismatch')
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Data integrity verified')
|
|
elif expected_types[i] == 'image':
|
|
if decoded_data != expected_data[i]:
|
|
print(f'❌ Payload {i + 1}: Data integrity mismatch')
|
|
passed = False
|
|
else:
|
|
print(f'✅ Payload {i + 1}: Data integrity verified')
|
|
elif expected_types[i] == 'table':
|
|
if len(decoded_data) > 0:
|
|
print(f'✅ Payload {i + 1}: Arrow IPC data present ({len(decoded_data)} bytes)')
|
|
else:
|
|
print(f'❌ Payload {i + 1}: Arrow IPC data is empty')
|
|
passed = False
|
|
|
|
print(f' Size: {payload["size"]} bytes\n')
|
|
|
|
# Test with chat-like payload (text + image + audio)
|
|
print('=== Chat-like Payload Test ===')
|
|
chat_data = [
|
|
('text', 'Hello!', 'text'),
|
|
('image', bytes([0xFF, 0xD8, 0xFF, 0xE0]), 'image'),
|
|
('audio', bytes([0x46, 0x4C, 0x41, 0x43]), 'audio')
|
|
]
|
|
|
|
chat_env, _ = await smartsend(
|
|
TEST_SUBJECT,
|
|
chat_data,
|
|
broker_url=TEST_BROKER_URL,
|
|
fileserver_url=TEST_FILESERVER_URL,
|
|
correlation_id='chat-' + correlation_id,
|
|
is_publish=False
|
|
)
|
|
|
|
if len(chat_env['payloads']) == 3:
|
|
print('✅ Chat-like payloads handled correctly')
|
|
else:
|
|
print('❌ Chat-like payloads handling failed')
|
|
passed = False
|
|
|
|
# Final result
|
|
print('\n=== Test Result ===')
|
|
if passed:
|
|
print('✅ ALL TESTS PASSED')
|
|
sys.exit(0)
|
|
else:
|
|
print('❌ SOME TESTS FAILED')
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
print(f'❌ Test failed with error: {e}')
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(run_test()) |