Files
NATSBridge/test/test_py_binary_sender.py
2026-03-06 08:19:15 +07:00

183 lines
6.5 KiB
Python

"""
Python Binary Sender Test
Tests the smartsend function with binary/image/audio/video/table payloads
"""
import asyncio
import sys
import os
import base64
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from natsbridge import smartsend, DEFAULT_BROKER_URL, DEFAULT_FILESERVER_URL
TEST_SUBJECT = '/test/binary'
TEST_BROKER_URL = os.environ.get('NATS_URL', 'nats://localhost:4222')
TEST_FILESERVER_URL = os.environ.get('FILESERVER_URL', 'http://localhost:8080')
async def run_test():
print('=== Python Binary Sender Test ===\n')
correlation_id = 'py-binary-test-' + str(asyncio.get_event_loop().time() * 1000000)
print(f'Correlation ID: {correlation_id}')
print(f'Subject: {TEST_SUBJECT}')
print(f'Broker URL: {TEST_BROKER_URL}\n')
# Test data - binary data for different types
image_data = bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) # PNG header
audio_data = bytes([0x46, 0x4C, 0x41, 0x43, 0x00, 0x00, 0x00, 0x00]) # FLAC header
video_data = bytes([0x00, 0x00, 0x00, 0x18, 0x66, 0x74, 0x79, 0x70]) # MP4 header
generic_binary = bytes([0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0xBA, 0xBE])
# Test table data
try:
import pandas as pd
table_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'value': [10.5, 20.3, 30.1, 40.9, 50.7]
})
table_available = True
except ImportError:
table_available = False
table_data = None
test_data = [
('image', image_data, 'image'),
('audio', audio_data, 'audio'),
('video', video_data, 'video'),
('binary', generic_binary, 'binary')
]
if table_available:
test_data.append(('table', table_data, 'table'))
try:
# Send the message
print('Sending binary payloads...')
env, env_json_str = await smartsend(
TEST_SUBJECT,
test_data,
broker_url=TEST_BROKER_URL,
fileserver_url=TEST_FILESERVER_URL,
correlation_id=correlation_id,
msg_purpose='test',
sender_name='py-binary-test',
is_publish=False
)
print('\n=== Envelope Created ===')
print(f'Correlation ID: {env["correlation_id"]}')
print(f'Message ID: {env["msg_id"]}')
print(f'Timestamp: {env["timestamp"]}')
print(f'Subject: {env["send_to"]}')
print(f'Purpose: {env["msg_purpose"]}')
print(f'Sender: {env["sender_name"]}')
print(f'Payloads: {len(env["payloads"])}\n')
# Validate envelope structure
print('=== Validation ===')
passed = True
expected_count = 5 if table_available else 4
if len(env['payloads']) != expected_count:
print(f'❌ Expected {expected_count} payloads, got {len(env["payloads"])}')
passed = False
else:
print('✅ Correct number of payloads')
# Test each payload
expected_datanames = ['image', 'audio', 'video', 'binary']
expected_types = ['image', 'audio', 'video', 'binary']
expected_data = [image_data, audio_data, video_data, generic_binary]
if table_available:
expected_datanames.append('table')
expected_types.append('table')
for i in range(len(env['payloads'])):
payload = env['payloads'][i]
if payload['dataname'] != expected_datanames[i]:
print(f"❌ Payload {i + 1}: Expected dataname '{expected_datanames[i]}', got '{payload['dataname']}'")
passed = False
else:
print(f'✅ Payload {i + 1}: Correct dataname')
if payload['payload_type'] != expected_types[i]:
print(f"❌ Payload {i + 1}: Expected type '{expected_types[i]}', got '{payload['payload_type']}'")
passed = False
else:
print(f'✅ Payload {i + 1}: Correct type')
if payload['transport'] != 'direct':
print(f"❌ Payload {i + 1}: Expected transport 'direct', got '{payload['transport']}'")
passed = False
else:
print(f'✅ Payload {i + 1}: Correct transport')
if payload['encoding'] != 'base64':
print(f"❌ Payload {i + 1}: Expected encoding 'base64', got '{payload['encoding']}'")
passed = False
else:
print(f'✅ Payload {i + 1}: Correct encoding')
# Decode and verify the data
decoded_data = base64.b64decode(payload['data'])
if i < len(expected_data):
original_data = expected_data[i]
if decoded_data != original_data:
print(f'❌ Payload {i + 1}: Data integrity mismatch')
passed = False
else:
print(f'✅ Payload {i + 1}: Data integrity verified')
else:
# Table payload - just verify it's present
print(f'✅ Payload {i + 1}: Table data present (size: {payload["size"]} bytes)')
print(f' Size: {payload["size"]} bytes\n')
# Test with larger binary data
print('=== Large Binary Data Test ===')
large_data = bytes([0xFF] * 10000) # 10KB of binary data
large_test_data = [
('large_binary', large_data, 'binary')
]
large_env, _ = await smartsend(
TEST_SUBJECT,
large_test_data,
broker_url=TEST_BROKER_URL,
fileserver_url=TEST_FILESERVER_URL,
correlation_id='large-' + correlation_id,
is_publish=False
)
if len(large_env['payloads']) == 1 and large_env['payloads'][0]['size'] == 10000:
print('✅ Large binary data handled correctly')
else:
print('❌ Large binary data handling failed')
passed = False
# Final result
print('\n=== Test Result ===')
if passed:
print('✅ ALL TESTS PASSED')
sys.exit(0)
else:
print('❌ SOME TESTS FAILED')
sys.exit(1)
except Exception as e:
print(f'❌ Test failed with error: {e}')
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
asyncio.run(run_test())