# File: NATSBridge/test/test_py_table_sender.py
# Last modified: 2026-03-05 20:17:36 +07:00
#
# 167 lines
# 5.4 KiB
# Python

"""
Python Table Sender Test
Tests the smartsend function with table (Arrow IPC) payloads
"""
import asyncio
import sys
import os
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from natbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL
# Subject that every message in this script is sent to via smartsend.
TEST_SUBJECT = '/test/table'

# Endpoints may be overridden through the environment; otherwise the
# localhost defaults below are used.
TEST_BROKER_URL = os.getenv('NATS_URL', 'nats://localhost:4222')
TEST_FILESERVER_URL = os.getenv('FILESERVER_URL', 'http://localhost:8080')
def _check_field(payload, field, expected):
    """Print a pass/fail line for one payload field and return True on a match."""
    actual = payload[field]
    if actual != expected:
        print(f"❌ Expected {field} '{expected}', got '{actual}'")
        return False
    print(f'✅ Correct {field}')
    return True


async def run_test():
    """Send table payloads through ``smartsend`` and validate the envelopes.

    Steps performed:
      1. Build a five-row pandas DataFrame and send it as a ``'table'`` payload.
      2. Check the returned envelope: payload count, dataname, payload_type,
         transport and encoding of the first payload.
      3. Send a 100-row DataFrame and check that exactly one payload comes back.
      4. Parse the returned JSON string and compare it with the envelope dict.

    Both calls pass ``is_publish=False``.
    NOTE(review): presumably this builds the envelope without publishing to
    the broker - confirm against ``smartsend``.

    This coroutine never returns normally: it terminates the process with
    ``sys.exit(0)`` when every check passes (or when pandas is not installed)
    and with ``sys.exit(1)`` when a check fails or an exception is raised.
    """
    import json
    import traceback

    print('=== Python Table Sender Test ===\n')

    # We are inside a coroutine, so the running loop is the one to query.
    loop_time = asyncio.get_running_loop().time()
    correlation_id = 'py-table-test-' + str(loop_time * 1000000)
    print(f'Correlation ID: {correlation_id}')
    print(f'Subject: {TEST_SUBJECT}')
    print(f'Broker URL: {TEST_BROKER_URL}\n')

    # Only the import itself is guarded: a missing pandas means "skip",
    # any other problem while building the test data should surface.
    try:
        import pandas as pd
    except ImportError:
        print('❌ pandas not available - skipping table tests')
        sys.exit(0)

    # Test data - pandas DataFrame
    table_data = pd.DataFrame({
        'id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'age': [30, 25, 35, 28, 32],
        'active': [True, False, True, True, False],
    })
    test_data = [
        ('users_table', table_data, 'table'),
    ]

    try:
        # Send the message
        print('Sending table payload...')
        env, env_json_str = await smartsend(
            TEST_SUBJECT,
            test_data,
            broker_url=TEST_BROKER_URL,
            fileserver_url=TEST_FILESERVER_URL,
            correlation_id=correlation_id,
            msg_purpose='test',
            sender_name='py-table-test',
            is_publish=False,
        )

        print('\n=== Envelope Created ===')
        print(f'Correlation ID: {env["correlation_id"]}')
        print(f'Message ID: {env["msg_id"]}')
        print(f'Timestamp: {env["timestamp"]}')
        print(f'Subject: {env["send_to"]}')
        print(f'Purpose: {env["msg_purpose"]}')
        print(f'Sender: {env["sender_name"]}')
        print(f'Payloads: {len(env["payloads"])}\n')

        # Validate envelope structure
        print('=== Validation ===')
        passed = True
        payloads = env['payloads']

        if len(payloads) != 1:
            print(f'❌ Expected 1 payload, got {len(payloads)}')
            passed = False
        else:
            print('✅ Correct number of payloads')

        # Only inspect the first payload when there is one; an empty list has
        # already been reported above and must not turn into an IndexError.
        if payloads:
            payload = payloads[0]
            expected_fields = (
                ('dataname', 'users_table'),
                ('payload_type', 'table'),
                ('transport', 'direct'),
                ('encoding', 'base64'),
            )
            for field, expected in expected_fields:
                if not _check_field(payload, field, expected):
                    passed = False
            print(f'\nPayload size: {payload["size"]} bytes')

        # Test with larger table
        print('\n=== Larger Table Test ===')
        large_table_data = pd.DataFrame({
            'id': range(100),
            'name': [f'User{i}' for i in range(100)],
            'age': [20 + (i % 50) for i in range(100)],
            'active': [i % 2 == 0 for i in range(100)],
        })
        large_test_data = [
            ('large_table', large_table_data, 'table'),
        ]
        large_env, _ = await smartsend(
            TEST_SUBJECT,
            large_test_data,
            broker_url=TEST_BROKER_URL,
            fileserver_url=TEST_FILESERVER_URL,
            correlation_id='large-' + correlation_id,
            is_publish=False,
        )

        large_payloads = large_env['payloads']
        if len(large_payloads) == 1:
            print('✅ Large table handled correctly')
            print(f' Size: {large_payloads[0]["size"]} bytes')
        else:
            print('❌ Large table handling failed')
            passed = False

        # Test JSON string output
        print('\n=== JSON String Output Test ===')
        try:
            parsed = json.loads(env_json_str)
        except json.JSONDecodeError as e:
            print(f'❌ JSON string is invalid: {e}')
            passed = False
        else:
            same_correlation = parsed['correlation_id'] == env['correlation_id']
            same_payload_count = len(parsed['payloads']) == len(env['payloads'])
            if same_correlation and same_payload_count:
                print('✅ JSON string is valid and matches envelope')
            else:
                print('❌ JSON string does not match envelope')
                passed = False
    except Exception as e:
        # Top-level boundary of the script: report, dump the traceback, fail.
        print(f'❌ Test failed with error: {e}')
        traceback.print_exc()
        sys.exit(1)

    # Final result
    print('\n=== Test Result ===')
    if passed:
        print('✅ ALL TESTS PASSED')
        sys.exit(0)
    else:
        print('❌ SOME TESTS FAILED')
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point: drive the async test to completion. run_test()
    # sets the process exit status itself via sys.exit().
    asyncio.run(run_test())