update
This commit is contained in:
@@ -1,220 +1,207 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Micropython NATS Bridge - Basic Test Examples
|
||||
|
||||
This module demonstrates basic usage of the NATSBridge for Micropython.
|
||||
Basic functionality test for nats_bridge.py
|
||||
Tests the core classes and functions without NATS connection
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, "../src")
|
||||
import os
|
||||
|
||||
from nats_bridge import MessageEnvelope, MessagePayload, smartsend, smartreceive, log_trace
|
||||
# Add src to path for import
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
||||
|
||||
from nats_bridge import (
|
||||
MessagePayload,
|
||||
MessageEnvelope,
|
||||
smartsend,
|
||||
smartreceive,
|
||||
log_trace,
|
||||
generate_uuid,
|
||||
get_timestamp,
|
||||
_serialize_data,
|
||||
_deserialize_data
|
||||
)
|
||||
import json
|
||||
|
||||
# ============================================= 100 ============================================== #
|
||||
|
||||
|
||||
def test_text_message():
|
||||
"""Test sending and receiving text messages."""
|
||||
print("\n=== Test 1: Text Message ===")
|
||||
def test_message_payload():
|
||||
"""Test MessagePayload class"""
|
||||
print("\n=== Testing MessagePayload ===")
|
||||
|
||||
# Send text message
|
||||
data = [
|
||||
("message", "Hello World", "text"),
|
||||
("greeting", "Good morning!", "text")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/text",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
# Test direct transport with text
|
||||
payload1 = MessagePayload(
|
||||
data="Hello World",
|
||||
msg_type="text",
|
||||
id="test-id-1",
|
||||
dataname="message",
|
||||
transport="direct",
|
||||
encoding="base64",
|
||||
size=11
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Correlation ID: {}".format(env.correlation_id))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
assert payload1.id == "test-id-1"
|
||||
assert payload1.dataname == "message"
|
||||
assert payload1.type == "text"
|
||||
assert payload1.transport == "direct"
|
||||
assert payload1.encoding == "base64"
|
||||
assert payload1.size == 11
|
||||
print(" [PASS] MessagePayload with text data")
|
||||
|
||||
# Expected output on receiver:
|
||||
# envelope = smartreceive(msg)
|
||||
# for dataname, data, type in envelope["payloads"]:
|
||||
# print("Received {}: {}".format(dataname, data))
|
||||
|
||||
|
||||
def test_dictionary_message():
|
||||
"""Test sending and receiving dictionary messages."""
|
||||
print("\n=== Test 2: Dictionary Message ===")
|
||||
|
||||
# Send dictionary message
|
||||
config = {
|
||||
"step_size": 0.01,
|
||||
"iterations": 1000,
|
||||
"threshold": 0.5
|
||||
}
|
||||
|
||||
data = [
|
||||
("config", config, "dictionary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/dictionary",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
# Test link transport with URL
|
||||
payload2 = MessagePayload(
|
||||
data="http://example.com/file.txt",
|
||||
msg_type="binary",
|
||||
id="test-id-2",
|
||||
dataname="file",
|
||||
transport="link",
|
||||
encoding="none",
|
||||
size=1000
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
assert payload2.transport == "link"
|
||||
assert payload2.data == "http://example.com/file.txt"
|
||||
print(" [PASS] MessagePayload with link transport")
|
||||
|
||||
# Expected output on receiver:
|
||||
# envelope = smartreceive(msg)
|
||||
# for dataname, data, type in envelope["payloads"]:
|
||||
# if type == "dictionary":
|
||||
# print("Config: {}".format(data))
|
||||
# Test to_dict method
|
||||
payload_dict = payload1.to_dict()
|
||||
assert "id" in payload_dict
|
||||
assert "dataname" in payload_dict
|
||||
assert "type" in payload_dict
|
||||
assert "transport" in payload_dict
|
||||
assert "data" in payload_dict
|
||||
print(" [PASS] MessagePayload.to_dict() method")
|
||||
|
||||
|
||||
def test_mixed_payloads():
|
||||
"""Test sending mixed payload types in a single message."""
|
||||
print("\n=== Test 3: Mixed Payloads ===")
|
||||
def test_message_envelope():
|
||||
"""Test MessageEnvelope class"""
|
||||
print("\n=== Testing MessageEnvelope ===")
|
||||
|
||||
# Mixed content: text, dictionary, and binary
|
||||
image_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR" # PNG header (example)
|
||||
# Create payloads
|
||||
payload1 = MessagePayload("Hello", "text", id="p1", dataname="msg1")
|
||||
payload2 = MessagePayload("http://example.com/file", "binary", id="p2", dataname="file", transport="link")
|
||||
|
||||
data = [
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_config", {"theme": "dark", "volume": 80}, "dictionary"),
|
||||
("user_image", image_data, "binary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/mixed",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
# Create envelope
|
||||
env = MessageEnvelope(
|
||||
send_to="/test/subject",
|
||||
payloads=[payload1, payload2],
|
||||
correlation_id="test-correlation-id",
|
||||
msg_id="test-msg-id",
|
||||
msg_purpose="chat",
|
||||
sender_name="test_sender",
|
||||
receiver_name="test_receiver",
|
||||
reply_to="/test/reply"
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
assert env.send_to == "/test/subject"
|
||||
assert env.correlation_id == "test-correlation-id"
|
||||
assert env.msg_id == "test-msg-id"
|
||||
assert env.msg_purpose == "chat"
|
||||
assert len(env.payloads) == 2
|
||||
print(" [PASS] MessageEnvelope creation")
|
||||
|
||||
# Expected output on receiver:
|
||||
# envelope = smartreceive(msg)
|
||||
# for dataname, data, type in envelope["payloads"]:
|
||||
# print("Received {}: {} (type: {})".format(dataname, data if type != "binary" else len(data), type))
|
||||
# Test to_json method
|
||||
json_str = env.to_json()
|
||||
json_data = json.loads(json_str)
|
||||
assert json_data["sendTo"] == "/test/subject"
|
||||
assert json_data["correlationId"] == "test-correlation-id"
|
||||
assert json_data["msgPurpose"] == "chat"
|
||||
assert len(json_data["payloads"]) == 2
|
||||
print(" [PASS] MessageEnvelope.to_json() method")
|
||||
|
||||
|
||||
def test_large_payload():
|
||||
"""Test sending large payloads that require fileserver upload."""
|
||||
print("\n=== Test 4: Large Payload (Link Transport) ===")
|
||||
def test_serialize_data():
|
||||
"""Test _serialize_data function"""
|
||||
print("\n=== Testing _serialize_data ===")
|
||||
|
||||
# Create large data (> 1MB would trigger link transport)
|
||||
# For testing, we'll use a smaller size but configure threshold lower
|
||||
large_data = b"A" * 100000 # 100KB
|
||||
# Test text serialization
|
||||
text_bytes = _serialize_data("Hello", "text")
|
||||
assert isinstance(text_bytes, bytes)
|
||||
assert text_bytes == b"Hello"
|
||||
print(" [PASS] Text serialization")
|
||||
|
||||
data = [
|
||||
("large_data", large_data, "binary")
|
||||
]
|
||||
# Test dictionary serialization
|
||||
dict_data = {"key": "value", "number": 42}
|
||||
dict_bytes = _serialize_data(dict_data, "dictionary")
|
||||
assert isinstance(dict_bytes, bytes)
|
||||
parsed = json.loads(dict_bytes.decode('utf-8'))
|
||||
assert parsed["key"] == "value"
|
||||
print(" [PASS] Dictionary serialization")
|
||||
|
||||
# Use a lower threshold for testing
|
||||
env = smartsend(
|
||||
"/test/large",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
fileserver_url="http://localhost:8080",
|
||||
size_threshold=50000 # 50KB threshold for testing
|
||||
)
|
||||
# Test binary serialization
|
||||
binary_data = b"\x00\x01\x02"
|
||||
binary_bytes = _serialize_data(binary_data, "binary")
|
||||
assert binary_bytes == b"\x00\x01\x02"
|
||||
print(" [PASS] Binary serialization")
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
for p in env.payloads:
|
||||
print(" - Transport: {}, Type: {}".format(p.transport, p.type))
|
||||
# Test image serialization
|
||||
image_data = bytes([1, 2, 3, 4, 5])
|
||||
image_bytes = _serialize_data(image_data, "image")
|
||||
assert image_bytes == image_data
|
||||
print(" [PASS] Image serialization")
|
||||
|
||||
|
||||
def test_reply_to():
|
||||
"""Test sending messages with reply-to functionality."""
|
||||
print("\n=== Test 5: Reply To ===")
|
||||
def test_deserialize_data():
|
||||
"""Test _deserialize_data function"""
|
||||
print("\n=== Testing _deserialize_data ===")
|
||||
|
||||
data = [
|
||||
("command", {"action": "start"}, "dictionary")
|
||||
]
|
||||
# Test text deserialization
|
||||
text_bytes = b"Hello"
|
||||
text_data = _deserialize_data(text_bytes, "text", "test-correlation-id")
|
||||
assert text_data == "Hello"
|
||||
print(" [PASS] Text deserialization")
|
||||
|
||||
env = smartsend(
|
||||
"/test/command",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
reply_to="/test/response",
|
||||
reply_to_msg_id="reply-123",
|
||||
msg_purpose="command"
|
||||
)
|
||||
# Test dictionary deserialization
|
||||
dict_bytes = b'{"key": "value"}'
|
||||
dict_data = _deserialize_data(dict_bytes, "dictionary", "test-correlation-id")
|
||||
assert dict_data == {"key": "value"}
|
||||
print(" [PASS] Dictionary deserialization")
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Reply To: {}".format(env.reply_to))
|
||||
print(" Reply To Msg ID: {}".format(env.reply_to_msg_id))
|
||||
# Test binary deserialization
|
||||
binary_data = b"\x00\x01\x02"
|
||||
binary_result = _deserialize_data(binary_data, "binary", "test-correlation-id")
|
||||
assert binary_result == b"\x00\x01\x02"
|
||||
print(" [PASS] Binary deserialization")
|
||||
|
||||
|
||||
def test_correlation_id():
|
||||
"""Test using custom correlation IDs for tracing."""
|
||||
print("\n=== Test 6: Custom Correlation ID ===")
|
||||
def test_utilities():
|
||||
"""Test utility functions"""
|
||||
print("\n=== Testing Utility Functions ===")
|
||||
|
||||
custom_cid = "trace-abc123"
|
||||
data = [
|
||||
("message", "Test with correlation ID", "text")
|
||||
]
|
||||
# Test generate_uuid
|
||||
uuid1 = generate_uuid()
|
||||
uuid2 = generate_uuid()
|
||||
assert uuid1 != uuid2
|
||||
print(f" [PASS] generate_uuid() - generated: {uuid1}")
|
||||
|
||||
env = smartsend(
|
||||
"/test/correlation",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
correlation_id=custom_cid
|
||||
)
|
||||
|
||||
print("Sent envelope with correlation ID: {}".format(env.correlation_id))
|
||||
print("This ID can be used to trace the message flow.")
|
||||
# Test get_timestamp
|
||||
timestamp = get_timestamp()
|
||||
assert "T" in timestamp
|
||||
print(f" [PASS] get_timestamp() - generated: {timestamp}")
|
||||
|
||||
|
||||
def test_multiple_payloads():
|
||||
"""Test sending multiple payloads in one message."""
|
||||
print("\n=== Test 7: Multiple Payloads ===")
|
||||
def main():
|
||||
"""Run all tests"""
|
||||
print("=" * 60)
|
||||
print("NATSBridge Python/Micropython - Basic Functionality Tests")
|
||||
print("=" * 60)
|
||||
|
||||
data = [
|
||||
("text_message", "Hello", "text"),
|
||||
("json_data", {"key": "value", "number": 42}, "dictionary"),
|
||||
("table_data", b"\x01\x02\x03\x04", "binary"),
|
||||
("audio_data", b"\x00\x01\x02\x03", "binary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/multiple",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
)
|
||||
|
||||
print("Sent {} payloads in one message".format(len(env.payloads)))
|
||||
try:
|
||||
test_message_payload()
|
||||
test_message_envelope()
|
||||
test_serialize_data()
|
||||
test_deserialize_data()
|
||||
test_utilities()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("ALL TESTS PASSED!")
|
||||
print("=" * 60)
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n[FAIL] Test failed with error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Micropython NATS Bridge Test Suite")
|
||||
print("==================================")
|
||||
print()
|
||||
|
||||
# Run tests
|
||||
test_text_message()
|
||||
test_dictionary_message()
|
||||
test_mixed_payloads()
|
||||
test_large_payload()
|
||||
test_reply_to()
|
||||
test_correlation_id()
|
||||
test_multiple_payloads()
|
||||
|
||||
print("\n=== All tests completed ===")
|
||||
print()
|
||||
print("Note: These tests require:")
|
||||
print(" 1. A running NATS server at nats://localhost:4222")
|
||||
print(" 2. An HTTP file server at http://localhost:8080 (for large payloads)")
|
||||
print()
|
||||
print("To run the tests:")
|
||||
print(" python test_micropython_basic.py")
|
||||
main()
|
||||
Reference in New Issue
Block a user