Compare commits
3 Commits
add_mix_co
...
8a35f1d4dc
| Author | SHA1 | Date | |
|---|---|---|---|
| 8a35f1d4dc | |||
| 9e5ee61785 | |||
| 4b5b5d6ed8 |
221
examples/micropython_example.py
Normal file
221
examples/micropython_example.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""
|
||||
Micropython NATS Bridge - Simple Example
|
||||
|
||||
This example demonstrates the basic usage of the NATSBridge for Micropython.
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, "../src")
|
||||
|
||||
from nats_bridge import smartsend, smartreceive, log_trace
|
||||
import json
|
||||
|
||||
|
||||
def example_simple_chat():
|
||||
"""
|
||||
Simple chat example: Send text messages via NATS.
|
||||
|
||||
Sender (this script):
|
||||
- Sends a text message to NATS
|
||||
- Uses direct transport (no fileserver needed)
|
||||
|
||||
Receiver (separate script):
|
||||
- Listens to NATS
|
||||
- Receives and processes the message
|
||||
"""
|
||||
print("=== Simple Chat Example ===")
|
||||
print()
|
||||
|
||||
# Define the message data as list of (dataname, data, type) tuples
|
||||
data = [
|
||||
("message", "Hello from Micropython!", "text")
|
||||
]
|
||||
|
||||
# Send the message
|
||||
env = smartsend(
|
||||
"/chat/room1",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
msg_purpose="chat",
|
||||
sender_name="micropython-client"
|
||||
)
|
||||
|
||||
print("Message sent!")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Correlation ID: {}".format(env.correlation_id))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
print()
|
||||
|
||||
# Expected receiver output:
|
||||
print("Expected receiver output:")
|
||||
print(" [timestamp] [Correlation: ...] Starting smartsend for subject: /chat/room1")
|
||||
print(" [timestamp] [Correlation: ...] Serialized payload 'message' (type: text) size: 22 bytes")
|
||||
print(" [timestamp] [Correlation: ...] Using direct transport for 22 bytes")
|
||||
print(" [timestamp] [Correlation: ...] Message published to /chat/room1")
|
||||
print()
|
||||
|
||||
return env
|
||||
|
||||
|
||||
def example_send_json():
|
||||
"""
|
||||
Example: Send JSON configuration to a Micropython device.
|
||||
|
||||
This demonstrates sending structured data (dictionary type).
|
||||
"""
|
||||
print("\n=== Send JSON Configuration ===")
|
||||
print()
|
||||
|
||||
# Define configuration as dictionary
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"server_host": "mqtt.example.com",
|
||||
"server_port": 1883,
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
# Send configuration
|
||||
data = [
|
||||
("device_config", config, "dictionary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/device/config",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
msg_purpose="updateStatus",
|
||||
sender_name="server"
|
||||
)
|
||||
|
||||
print("Configuration sent!")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
print()
|
||||
|
||||
return env
|
||||
|
||||
|
||||
def example_receive_message(msg):
|
||||
"""
|
||||
Example: Receive and process a NATS message.
|
||||
|
||||
Args:
|
||||
msg: The NATS message received (should be dict or JSON string)
|
||||
|
||||
Returns:
|
||||
list: List of (dataname, data, type) tuples
|
||||
"""
|
||||
print("\n=== Receive Message ===")
|
||||
print()
|
||||
|
||||
# Process the message
|
||||
payloads = smartreceive(
|
||||
msg,
|
||||
fileserver_download_handler=None, # Not needed for direct transport
|
||||
max_retries=3,
|
||||
base_delay=100,
|
||||
max_delay=1000
|
||||
)
|
||||
|
||||
print("Received {} payload(s):".format(len(payloads)))
|
||||
for dataname, data, type in payloads:
|
||||
print(" - {}: {} (type: {})".format(dataname, data, type))
|
||||
|
||||
return payloads
|
||||
|
||||
|
||||
def example_mixed_content():
|
||||
"""
|
||||
Example: Send mixed content (text + dictionary + binary).
|
||||
|
||||
This demonstrates the multi-payload capability.
|
||||
"""
|
||||
print("\n=== Mixed Content Example ===")
|
||||
print()
|
||||
|
||||
# Create mixed content
|
||||
image_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR" # Example PNG header
|
||||
|
||||
data = [
|
||||
("message_text", "Hello with image!", "text"),
|
||||
("user_config", {"theme": "dark", "notifications": True}, "dictionary"),
|
||||
("user_avatar", image_data, "binary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/chat/mixed",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
msg_purpose="chat",
|
||||
sender_name="micropython-client"
|
||||
)
|
||||
|
||||
print("Mixed content sent!")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads:")
|
||||
for p in env.payloads:
|
||||
print(" - {} (transport: {}, type: {}, size: {} bytes)".format(
|
||||
p.dataname, p.transport, p.type, p.size))
|
||||
|
||||
return env
|
||||
|
||||
|
||||
def example_reply():
|
||||
"""
|
||||
Example: Send a message with reply-to functionality.
|
||||
|
||||
This demonstrates request-response pattern.
|
||||
"""
|
||||
print("\n=== Request-Response Example ===")
|
||||
print()
|
||||
|
||||
# Send command
|
||||
data = [
|
||||
("command", {"action": "read_sensor", "sensor_id": "temp1"}, "dictionary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/device/command",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
msg_purpose="command",
|
||||
sender_name="server",
|
||||
reply_to="/device/response",
|
||||
reply_to_msg_id="cmd-001"
|
||||
)
|
||||
|
||||
print("Command sent!")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Reply To: {}".format(env.reply_to))
|
||||
print(" Reply To Msg ID: {}".format(env.reply_to_msg_id))
|
||||
print()
|
||||
|
||||
print("Expected receiver behavior:")
|
||||
print(" 1. Receive command on /device/command")
|
||||
print(" 2. Process command")
|
||||
print(" 3. Send response to /device/response")
|
||||
print(" 4. Include replyToMsgId in response")
|
||||
|
||||
return env
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Micropython NATS Bridge Examples")
|
||||
print("================================")
|
||||
print()
|
||||
|
||||
# Run examples
|
||||
example_simple_chat()
|
||||
example_send_json()
|
||||
example_mixed_content()
|
||||
example_reply()
|
||||
|
||||
print("\n=== Examples Completed ===")
|
||||
print()
|
||||
print("To run these examples, you need:")
|
||||
print(" 1. A running NATS server at nats://localhost:4222")
|
||||
print(" 2. Import the nats_bridge module")
|
||||
print(" 3. Call the desired example function")
|
||||
print()
|
||||
print("For more examples, see test/test_micropython_basic.py")
|
||||
@@ -10,7 +10,7 @@
|
||||
#
|
||||
# Handler Function Signatures:
|
||||
#
|
||||
# ```julia
|
||||
# ```jldoctest
|
||||
# # Upload handler - uploads data to file server and returns URL
|
||||
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
#
|
||||
@@ -23,7 +23,7 @@
|
||||
# Even when sending a single payload, the user must wrap it in a list.
|
||||
#
|
||||
# API Standard:
|
||||
# ```julia
|
||||
# ```jldoctest
|
||||
# # Input format for smartsend (always a list of tuples with type info)
|
||||
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
#
|
||||
@@ -45,6 +45,59 @@ const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
|
||||
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
|
||||
|
||||
|
||||
""" msgPayload_v1 - Internal message payload structure
|
||||
This structure represents a single payload within a NATS message envelope.
|
||||
It supports both direct transport (base64-encoded data) and link transport (URL-based).
|
||||
|
||||
# Arguments:
|
||||
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
|
||||
- `dataname::String` - Name of the payload (e.g., "login_image")
|
||||
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
- `transport::String` - Transport method: "direct" or "link"
|
||||
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
|
||||
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
|
||||
- `data::Any` - Payload data (bytes for direct, URL for link)
|
||||
- `metadata::Dict{String, Any}` - Optional metadata dictionary
|
||||
|
||||
# Keyword Arguments:
|
||||
- `id::String = ""` - Payload ID, auto-generated if empty
|
||||
- `dataname::String = string(uuid4())` - Payload name, auto-generated UUID if empty
|
||||
- `transport::String = "direct"` - Transport method
|
||||
- `encoding::String = "none"` - Encoding method
|
||||
- `size::Integer = 0` - Payload size
|
||||
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
|
||||
|
||||
# Return:
|
||||
- A msgPayload_v1 struct instance
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
using UUIDs
|
||||
|
||||
# Create a direct transport payload
|
||||
payload = msgPayload_v1(
|
||||
"Hello World",
|
||||
"text";
|
||||
id = string(uuid4()),
|
||||
dataname = "message",
|
||||
transport = "direct",
|
||||
encoding = "base64",
|
||||
size = 11,
|
||||
metadata = Dict{String, Any}()
|
||||
)
|
||||
|
||||
# Create a link transport payload
|
||||
payload = msgPayload_v1(
|
||||
"http://example.com/file.zip",
|
||||
"binary";
|
||||
id = string(uuid4()),
|
||||
dataname = "file",
|
||||
transport = "link",
|
||||
encoding = "none",
|
||||
size = 1000000
|
||||
)
|
||||
```
|
||||
"""
|
||||
struct msgPayload_v1
|
||||
id::String # id of this payload e.g. "uuid4"
|
||||
dataname::String # name of this payload e.g. "login_image"
|
||||
@@ -80,6 +133,51 @@ function msgPayload_v1(
|
||||
end
|
||||
|
||||
|
||||
""" msgEnvelope_v1 - Internal message envelope structure
|
||||
This structure represents a complete NATS message envelope containing multiple payloads
|
||||
with metadata for routing, tracing, and message context.
|
||||
|
||||
# Arguments:
|
||||
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
|
||||
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
|
||||
|
||||
# Keyword Arguments:
|
||||
- `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
|
||||
- `msgId::String = ""` - Unique message identifier; auto-generated if empty
|
||||
- `timestamp::String = string(Dates.now())` - Message publication timestamp
|
||||
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
|
||||
- `senderId::String = ""` - UUID of the sender; auto-generated if empty
|
||||
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
|
||||
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
|
||||
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
|
||||
- `replyToMsgId::String = ""` - Message ID this message is replying to
|
||||
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
|
||||
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
|
||||
|
||||
# Return:
|
||||
- A msgEnvelope_v1 struct instance
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
using UUIDs, NATSBridge
|
||||
|
||||
# Create payloads for the message
|
||||
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
|
||||
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
|
||||
|
||||
# Create message envelope
|
||||
env = msgEnvelope_v1(
|
||||
"my.subject",
|
||||
[payload1, payload2];
|
||||
correlationId = string(uuid4()),
|
||||
msgPurpose = "chat",
|
||||
senderName = "my-app",
|
||||
receiverName = "receiver-app",
|
||||
replyTo = "reply.subject"
|
||||
)
|
||||
```
|
||||
"""
|
||||
struct msgEnvelope_v1
|
||||
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
|
||||
msgId::String # this message id
|
||||
@@ -137,8 +235,34 @@ end
|
||||
|
||||
|
||||
|
||||
""" Convert msgEnvelope_v1 to JSON string
|
||||
This function converts the msgEnvelope_v1 struct to a JSON string representation.
|
||||
""" envelope_to_json - Convert msgEnvelope_v1 to JSON string
|
||||
This function converts the msgEnvelope_v1 struct to a JSON string representation,
|
||||
preserving all metadata and payload information for NATS message publishing.
|
||||
|
||||
# Function Workflow:
|
||||
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
|
||||
2. Conditionally includes metadata dictionary if not empty
|
||||
3. Iterates through payloads and converts each to JSON-compatible dictionary
|
||||
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
|
||||
5. Returns final JSON string representation
|
||||
|
||||
# Arguments:
|
||||
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
|
||||
|
||||
# Return:
|
||||
- `String` - JSON string representation of the envelope
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
using UUIDs
|
||||
|
||||
# Create an envelope with payloads
|
||||
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
|
||||
env = msgEnvelope_v1("my.subject", [payload])
|
||||
|
||||
# Convert to JSON for publishing
|
||||
json_msg = envelope_to_json(env)
|
||||
```
|
||||
"""
|
||||
function envelope_to_json(env::msgEnvelope_v1)
|
||||
obj = Dict{String, Any}(
|
||||
@@ -197,9 +321,24 @@ function envelope_to_json(env::msgEnvelope_v1)
|
||||
end
|
||||
|
||||
|
||||
""" Log a trace message with correlation ID and timestamp
|
||||
""" log_trace - Log a trace message with correlation ID and timestamp
|
||||
This function logs information messages with a correlation ID for tracing purposes,
|
||||
making it easier to track message flow across distributed systems.
|
||||
|
||||
# Arguments:
|
||||
- `correlation_id::String` - Correlation ID to identify the message flow
|
||||
- `message::String` - The message content to log
|
||||
|
||||
# Return:
|
||||
- `nothing` - This function performs logging but returns nothing
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
using Dates
|
||||
|
||||
log_trace("abc123", "Starting message processing")
|
||||
# Logs: [2026-02-21T05:39:00] [Correlation: abc123] Starting message processing
|
||||
```
|
||||
"""
|
||||
function log_trace(correlation_id::String, message::String)
|
||||
timestamp = Dates.now() # Get current timestamp
|
||||
@@ -216,7 +355,7 @@ Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_u
|
||||
The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually.
|
||||
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
|
||||
|
||||
The function workflow:
|
||||
# Function Workflow:
|
||||
1. Iterates through the list of (dataname, data, type) tuples
|
||||
2. For each payload: extracts the type from the tuple and serializes accordingly
|
||||
3. Compares the serialized size against `size_threshold`
|
||||
@@ -247,7 +386,7 @@ The function workflow:
|
||||
- A `msgEnvelope_v1` object containing metadata and transport information
|
||||
|
||||
# Example
|
||||
```julia
|
||||
```jldoctest
|
||||
using UUIDs
|
||||
|
||||
# Send a single payload (still wrapped in a list)
|
||||
@@ -376,26 +515,20 @@ end
|
||||
|
||||
|
||||
""" _serialize_data - Serialize data according to specified format
|
||||
|
||||
This function serializes arbitrary Julia data into a binary representation based on the specified format.
|
||||
It supports multiple serialization formats:
|
||||
- `"text"`: Treats data as text and converts to UTF-8 bytes
|
||||
- `"dictionary"`: Serializes data as JSON and returns the UTF-8 byte representation
|
||||
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
|
||||
- `"image"`: Expects binary image data (Vector{UInt8}) and returns it as bytes
|
||||
- `"audio"`: Expects binary audio data (Vector{UInt8}) and returns it as bytes
|
||||
- `"video"`: Expects binary video data (Vector{UInt8}) and returns it as bytes
|
||||
- `"binary"`: Generic binary data (Vector{UInt8} or IOBuffer) and returns bytes
|
||||
It supports multiple serialization formats for different data types.
|
||||
|
||||
The function handles format-specific serialization logic:
|
||||
1. For `"text"`: Converts string to UTF-8 bytes
|
||||
2. For `"dictionary"`: Converts Julia data to JSON string, then encodes to bytes
|
||||
3. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer
|
||||
4. For `"image"`, `"audio"`, `"video"`: Treats data as binary (Vector{UInt8})
|
||||
5. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly
|
||||
# Function Workflow:
|
||||
1. Validates the data type against the specified format
|
||||
2. Converts data to binary representation according to format rules
|
||||
3. For text: converts string to UTF-8 bytes
|
||||
4. For dictionary: serializes as JSON then converts to bytes
|
||||
5. For table: uses Arrow.jl to write as IPC stream
|
||||
6. For image/audio/video/binary: returns binary data directly
|
||||
|
||||
# Arguments:
|
||||
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
|
||||
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
|
||||
# Return:
|
||||
- `Vector{UInt8}` - Binary representation of the serialized data
|
||||
@@ -405,7 +538,7 @@ The function handles format-specific serialization logic:
|
||||
- `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
|
||||
|
||||
# Example
|
||||
```julia
|
||||
```jldoctest
|
||||
using JSON, Arrow, DataFrames
|
||||
|
||||
# Text serialization
|
||||
@@ -505,15 +638,29 @@ function _serialize_data(data::Any, type::String)
|
||||
end
|
||||
|
||||
|
||||
""" Publish message to NATS
|
||||
""" publish_message - Publish message to NATS
|
||||
This internal function publishes a message to a NATS subject with proper
|
||||
connection management and logging.
|
||||
|
||||
Arguments:
|
||||
- `nats_url::String` - NATS server URL
|
||||
- `subject::String` - NATS subject to publish to
|
||||
# Arguments:
|
||||
- `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
|
||||
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
|
||||
- `message::String` - JSON message to publish
|
||||
- `correlation_id::String` - Correlation ID for logging
|
||||
- `correlation_id::String` - Correlation ID for tracing and logging
|
||||
|
||||
# Return:
|
||||
- `nothing` - This function performs publishing but returns nothing
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
using NATS
|
||||
|
||||
# Prepare JSON message
|
||||
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
|
||||
|
||||
# Publish to NATS
|
||||
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
|
||||
```
|
||||
"""
|
||||
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
|
||||
conn = NATS.connect(nats_url) # Create NATS connection
|
||||
@@ -532,20 +679,27 @@ This function processes incoming NATS messages, handling both direct transport
|
||||
It deserializes the data based on the transport type and returns the result.
|
||||
A HTTP file server is required along with its download function.
|
||||
|
||||
Arguments:
|
||||
# Function Workflow:
|
||||
1. Parses the JSON envelope from the NATS message
|
||||
2. Iterates through each payload in the envelope
|
||||
3. For each payload: determines the transport type (direct or link)
|
||||
4. For direct transport: decodes Base64 payload and deserializes based on type
|
||||
5. For link transport: fetches data from URL with exponential backoff, then deserializes
|
||||
|
||||
# Arguments:
|
||||
- `msg::NATS.Msg` - NATS message to process
|
||||
- `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs
|
||||
|
||||
Keyword Arguments:
|
||||
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5)
|
||||
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100)
|
||||
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
|
||||
# Keyword Arguments:
|
||||
- `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
|
||||
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
|
||||
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
|
||||
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
|
||||
|
||||
Return:
|
||||
# Return:
|
||||
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
|
||||
|
||||
# Example
|
||||
```julia
|
||||
```jldoctest
|
||||
# Receive and process message
|
||||
msg = nats_message # NATS message
|
||||
payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
|
||||
@@ -610,19 +764,35 @@ function smartreceive(
|
||||
end
|
||||
|
||||
|
||||
""" Fetch data from URL with exponential backoff
|
||||
""" _fetch_with_backoff - Fetch data from URL with exponential backoff
|
||||
This internal function retrieves data from a URL with retry logic using
|
||||
exponential backoff to handle transient failures.
|
||||
|
||||
Arguments:
|
||||
# Function Workflow:
|
||||
1. Initializes delay with base_delay value
|
||||
2. Attempts to fetch data from URL in a retry loop
|
||||
3. On success: logs success and returns response body as bytes
|
||||
4. On failure: sleeps using exponential backoff and retries
|
||||
5. After max_retries: throws error indicating failure
|
||||
|
||||
# Arguments:
|
||||
- `url::String` - URL to fetch from
|
||||
- `max_retries::Int` - Maximum number of retry attempts
|
||||
- `base_delay::Int` - Initial delay in milliseconds
|
||||
- `max_delay::Int` - Maximum delay in milliseconds
|
||||
- `correlation_id::String` - Correlation ID for logging
|
||||
|
||||
Return:
|
||||
- Vector{UInt8} - Fetched data as bytes
|
||||
# Return:
|
||||
- `Vector{UInt8}` - Fetched data as bytes
|
||||
|
||||
# Throws:
|
||||
- `Error` if all retry attempts fail
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
# Fetch data with exponential backoff
|
||||
data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
|
||||
```
|
||||
"""
|
||||
function _fetch_with_backoff(
|
||||
url::String,
|
||||
@@ -655,18 +825,44 @@ function _fetch_with_backoff(
|
||||
end
|
||||
|
||||
|
||||
""" Deserialize bytes to data based on type
|
||||
""" _deserialize_data - Deserialize bytes to data based on type
|
||||
This internal function converts serialized bytes back to Julia data based on type.
|
||||
It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization),
|
||||
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
|
||||
|
||||
Arguments:
|
||||
# Function Workflow:
|
||||
1. Validates the data type against supported formats
|
||||
2. Converts bytes to appropriate Julia data type based on format
|
||||
3. For text: converts bytes to string
|
||||
4. For dictionary: converts bytes to JSON string then parses to Julia object
|
||||
5. For table: reads Arrow IPC format and returns DataFrame
|
||||
6. For image/audio/video/binary: returns bytes directly
|
||||
|
||||
# Arguments:
|
||||
- `data::Vector{UInt8}` - Serialized data as bytes
|
||||
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
||||
- `correlation_id::String` - Correlation ID for logging
|
||||
|
||||
Return:
|
||||
# Return:
|
||||
- Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
|
||||
|
||||
# Throws:
|
||||
- `Error` if `type` is not one of the supported types
|
||||
|
||||
# Example
|
||||
```jldoctest
|
||||
# Text data
|
||||
text_bytes = UInt8["Hello World"]
|
||||
text_data = _deserialize_data(text_bytes, "text", "correlation123")
|
||||
|
||||
# JSON data
|
||||
json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 125] # {"name":"Alice"}
|
||||
json_data = _deserialize_data(json_bytes, "dictionary", "correlation123")
|
||||
|
||||
# Arrow IPC data (table)
|
||||
arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes
|
||||
table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
|
||||
```
|
||||
"""
|
||||
function _deserialize_data(
|
||||
data::Vector{UInt8},
|
||||
@@ -697,15 +893,15 @@ end
|
||||
|
||||
|
||||
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
|
||||
|
||||
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
|
||||
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
||||
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
||||
|
||||
The function workflow:
|
||||
1. Obtains an upload ID and token from the server
|
||||
2. Uploads the provided binary data as a file using the `X-UploadToken` header
|
||||
3. Returns identifiers and download URL for the uploaded file
|
||||
# Function Workflow:
|
||||
1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
|
||||
2. Retrieves upload ID and token from server response
|
||||
3. Uploads binary data as multipart form data using the token
|
||||
4. Returns identifiers and download URL for the uploaded file
|
||||
|
||||
# Arguments:
|
||||
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
||||
@@ -713,14 +909,14 @@ The function workflow:
|
||||
- `data::Vector{UInt8}` - Raw byte data of the file content
|
||||
|
||||
# Return:
|
||||
- A Dict with keys:
|
||||
- `Dict{String, Any}` - Dictionary with keys:
|
||||
- `"status"` - HTTP server response status
|
||||
- `"uploadid"` - ID of the one-shot upload session
|
||||
- `"fileid"` - ID of the uploaded file within the session
|
||||
- `"url"` - Full URL to download the uploaded file
|
||||
|
||||
# Example
|
||||
```julia
|
||||
```jldoctest
|
||||
using HTTP, JSON
|
||||
|
||||
fileServerURL = "http://localhost:8080"
|
||||
@@ -776,31 +972,29 @@ end
|
||||
|
||||
|
||||
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
|
||||
|
||||
Upload a single file to a plik server using one-shot mode.
|
||||
|
||||
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
|
||||
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
||||
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
||||
|
||||
The function workflow:
|
||||
1. Obtains an upload ID and token from the server
|
||||
2. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
|
||||
3. Returns identifiers and download URL for the uploaded file
|
||||
# Function Workflow:
|
||||
1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
|
||||
2. Retrieves upload ID and token from server response
|
||||
3. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
|
||||
4. Returns identifiers and download URL for the uploaded file
|
||||
|
||||
# Arguments:
|
||||
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
||||
- `filepath::String` - Full path to the local file to upload
|
||||
|
||||
# Return:
|
||||
- A Dict with keys:
|
||||
- `Dict{String, Any}` - Dictionary with keys:
|
||||
- `"status"` - HTTP server response status
|
||||
- `"uploadid"` - ID of the one-shot upload session
|
||||
- `"fileid"` - ID of the uploaded file within the session
|
||||
- `"url"` - Full URL to download the uploaded file
|
||||
|
||||
# Example
|
||||
```julia
|
||||
```jldoctest
|
||||
using HTTP, JSON
|
||||
|
||||
fileServerURL = "http://localhost:8080"
|
||||
|
||||
212
src/README.md
Normal file
212
src/README.md
Normal file
@@ -0,0 +1,212 @@
|
||||
# NATSBridge for Micropython
|
||||
|
||||
A high-performance, bi-directional data bridge for Micropython devices using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
|
||||
|
||||
## Overview
|
||||
|
||||
This module provides functionality for sending and receiving data over NATS with automatic transport selection based on payload size:
|
||||
|
||||
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
|
||||
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
|
||||
|
||||
## Features
|
||||
|
||||
- ✅ Bi-directional NATS communication
|
||||
- ✅ Multi-payload support (mixed content in single message)
|
||||
- ✅ Automatic transport selection based on payload size
|
||||
- ✅ File server integration for large payloads
|
||||
- ✅ Exponential backoff for URL fetching
|
||||
- ✅ Correlation ID tracking
|
||||
- ✅ Reply-to support for request-response pattern
|
||||
|
||||
## Supported Payload Types
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `text` | Plain text strings |
|
||||
| `dictionary` | JSON-serializable dictionaries |
|
||||
| `table` | Tabular data (Arrow IPC format) |
|
||||
| `image` | Image data (PNG, JPG bytes) |
|
||||
| `audio` | Audio data (WAV, MP3 bytes) |
|
||||
| `video` | Video data (MP4, AVI bytes) |
|
||||
| `binary` | Generic binary data |
|
||||
|
||||
## Installation
|
||||
|
||||
1. Copy `nats_bridge.py` to your Micropython device
|
||||
2. Ensure you have the following dependencies:
|
||||
- `urequests` for HTTP requests
|
||||
- `ubinascii` for base64 encoding
|
||||
- `ujson` for JSON handling
|
||||
- `usocket` for networking
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Text Message
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend, smartreceive
|
||||
|
||||
# Sender
|
||||
data = [("message", "Hello World", "text")]
|
||||
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
|
||||
|
||||
# Receiver
|
||||
payloads = smartreceive(msg)
|
||||
for dataname, data, type in payloads:
|
||||
print("Received {}: {}".format(dataname, data))
|
||||
```
|
||||
|
||||
### Sending JSON Configuration
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
config = {
|
||||
"wifi_ssid": "MyNetwork",
|
||||
"wifi_password": "password123",
|
||||
"update_interval": 60
|
||||
}
|
||||
|
||||
data = [("config", config, "dictionary")]
|
||||
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
### Mixed Content (Chat with Text + Image)
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
image_data = b"\x89PNG..." # PNG bytes
|
||||
|
||||
data = [
|
||||
("message_text", "Hello with image!", "text"),
|
||||
("user_avatar", image_data, "binary")
|
||||
]
|
||||
|
||||
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
|
||||
```
|
||||
|
||||
### Request-Response Pattern
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
# Send command with reply-to
|
||||
data = [("command", {"action": "read_sensor"}, "dictionary")]
|
||||
env = smartsend(
|
||||
"/device/command",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
reply_to="/device/response",
|
||||
reply_to_msg_id="cmd-001"
|
||||
)
|
||||
```
|
||||
|
||||
### Large Payloads (File Server)
|
||||
|
||||
```python
|
||||
from nats_bridge import smartsend
|
||||
|
||||
# Large data (> 1MB)
|
||||
large_data = b"A" * 2000000 # 2MB
|
||||
|
||||
env = smartsend(
|
||||
"/data/large",
|
||||
[("large_file", large_data, "binary")],
|
||||
nats_url="nats://localhost:4222",
|
||||
fileserver_url="http://localhost:8080",
|
||||
size_threshold=1000000 # 1MB threshold
|
||||
)
|
||||
```
|
||||
|
||||
## API Reference
|
||||
|
||||
### `smartsend(subject, data, ...)`
|
||||
|
||||
Send data via NATS with automatic transport selection.
|
||||
|
||||
**Arguments:**
|
||||
- `subject` (str): NATS subject to publish to
|
||||
- `data` (list): List of `(dataname, data, type)` tuples
|
||||
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
|
||||
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
|
||||
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
|
||||
- `correlation_id` (str): Optional correlation ID for tracing
|
||||
- `msg_purpose` (str): Message purpose (default: `"chat"`)
|
||||
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
|
||||
- `receiver_name` (str): Receiver name (default: `""`)
|
||||
- `receiver_id` (str): Receiver ID (default: `""`)
|
||||
- `reply_to` (str): Reply topic (default: `""`)
|
||||
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
|
||||
|
||||
**Returns:** `MessageEnvelope` object
|
||||
|
||||
### `smartreceive(msg, ...)`
|
||||
|
||||
Receive and process NATS messages.
|
||||
|
||||
**Arguments:**
|
||||
- `msg`: NATS message (dict or JSON string)
|
||||
- `fileserver_download_handler` (function): Function to fetch data from URLs
|
||||
- `max_retries` (int): Maximum retry attempts (default: 5)
|
||||
- `base_delay` (int): Initial delay in ms (default: 100)
|
||||
- `max_delay` (int): Maximum delay in ms (default: 5000)
|
||||
|
||||
**Returns:** List of `(dataname, data, type)` tuples
|
||||
|
||||
### `MessageEnvelope`
|
||||
|
||||
Represents a complete NATS message envelope.
|
||||
|
||||
**Attributes:**
|
||||
- `correlation_id`: Unique identifier for tracing
|
||||
- `msg_id`: Unique message identifier
|
||||
- `timestamp`: Message publication timestamp
|
||||
- `send_to`: NATS subject
|
||||
- `msg_purpose`: Message purpose
|
||||
- `sender_name`: Sender name
|
||||
- `sender_id`: Sender UUID
|
||||
- `receiver_name`: Receiver name
|
||||
- `receiver_id`: Receiver UUID
|
||||
- `reply_to`: Reply topic
|
||||
- `reply_to_msg_id`: Reply message ID
|
||||
- `broker_url`: NATS broker URL
|
||||
- `metadata`: Message-level metadata
|
||||
- `payloads`: List of MessagePayload objects
|
||||
|
||||
### `MessagePayload`
|
||||
|
||||
Represents a single payload within a message envelope.
|
||||
|
||||
**Attributes:**
|
||||
- `id`: Unique payload identifier
|
||||
- `dataname`: Name of the payload
|
||||
- `type`: Payload type ("text", "dictionary", etc.)
|
||||
- `transport`: Transport method ("direct" or "link")
|
||||
- `encoding`: Encoding method ("none", "base64", etc.)
|
||||
- `size`: Payload size in bytes
|
||||
- `data`: Payload data (bytes for direct, URL for link)
|
||||
- `metadata`: Payload-level metadata
|
||||
|
||||
## Examples
|
||||
|
||||
See `examples/micropython_example.py` for more detailed examples.
|
||||
|
||||
## Testing
|
||||
|
||||
Run the test suite:
|
||||
|
||||
```bash
|
||||
python test/test_micropython_basic.py
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- Micropython with networking support
|
||||
- NATS server (nats.io)
|
||||
- HTTP file server (optional, for large payloads)
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
664
src/nats_bridge.py
Normal file
664
src/nats_bridge.py
Normal file
@@ -0,0 +1,664 @@
|
||||
"""
|
||||
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython
|
||||
|
||||
This module provides functionality for sending and receiving data over NATS
|
||||
using the Claim-Check pattern for large payloads.
|
||||
|
||||
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||
"""
|
||||
|
||||
import json
|
||||
import random
|
||||
import time
|
||||
import usocket
|
||||
import uselect
|
||||
import ustruct
|
||||
import uuid
|
||||
|
||||
try:
|
||||
import ussl
|
||||
HAS_SSL = True
|
||||
except ImportError:
|
||||
HAS_SSL = False
|
||||
|
||||
# Constants
|
||||
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
|
||||
DEFAULT_NATS_URL = "nats://localhost:4222"
|
||||
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
||||
|
||||
# ============================================= 100 ============================================== #
|
||||
|
||||
|
||||
class MessagePayload:
|
||||
"""Internal message payload structure representing a single payload within a NATS message envelope."""
|
||||
|
||||
def __init__(self, data, msg_type, id="", dataname="", transport="direct",
|
||||
encoding="none", size=0, metadata=None):
|
||||
"""
|
||||
Initialize a MessagePayload.
|
||||
|
||||
Args:
|
||||
data: Payload data (bytes for direct, URL string for link)
|
||||
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
||||
id: Unique identifier for this payload (auto-generated if empty)
|
||||
dataname: Name of the payload (auto-generated UUID if empty)
|
||||
transport: Transport method ("direct" or "link")
|
||||
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
|
||||
size: Size of the payload in bytes
|
||||
metadata: Optional metadata dictionary
|
||||
"""
|
||||
self.id = id if id else self._generate_uuid()
|
||||
self.dataname = dataname if dataname else self._generate_uuid()
|
||||
self.type = msg_type
|
||||
self.transport = transport
|
||||
self.encoding = encoding
|
||||
self.size = size
|
||||
self.data = data
|
||||
self.metadata = metadata if metadata else {}
|
||||
|
||||
def _generate_uuid(self):
|
||||
"""Generate a UUID string."""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert payload to dictionary for JSON serialization."""
|
||||
payload_dict = {
|
||||
"id": self.id,
|
||||
"dataname": self.dataname,
|
||||
"type": self.type,
|
||||
"transport": self.transport,
|
||||
"encoding": self.encoding,
|
||||
"size": self.size,
|
||||
}
|
||||
|
||||
# Include data based on transport type
|
||||
if self.transport == "direct" and self.data is not None:
|
||||
if self.encoding == "base64" or self.encoding == "json":
|
||||
payload_dict["data"] = self.data
|
||||
else:
|
||||
# For other encodings, use base64
|
||||
payload_dict["data"] = self._to_base64(self.data)
|
||||
elif self.transport == "link" and self.data is not None:
|
||||
# For link transport, data is a URL string
|
||||
payload_dict["data"] = self.data
|
||||
|
||||
if self.metadata:
|
||||
payload_dict["metadata"] = self.metadata
|
||||
|
||||
return payload_dict
|
||||
|
||||
def _to_base64(self, data):
|
||||
"""Convert bytes to base64 string."""
|
||||
if isinstance(data, bytes):
|
||||
# Simple base64 encoding without library
|
||||
import ubinascii
|
||||
return ubinascii.b2a_base64(data).decode('utf-8').strip()
|
||||
return data
|
||||
|
||||
def _from_base64(self, data):
|
||||
"""Convert base64 string to bytes."""
|
||||
import ubinascii
|
||||
return ubinascii.a2b_base64(data)
|
||||
|
||||
|
||||
class MessageEnvelope:
|
||||
"""Internal message envelope structure containing multiple payloads with metadata."""
|
||||
|
||||
def __init__(self, send_to, payloads, correlation_id="", msg_id="", timestamp="",
|
||||
msg_purpose="", sender_name="", sender_id="", receiver_name="",
|
||||
receiver_id="", reply_to="", reply_to_msg_id="", broker_url=DEFAULT_NATS_URL,
|
||||
metadata=None):
|
||||
"""
|
||||
Initialize a MessageEnvelope.
|
||||
|
||||
Args:
|
||||
send_to: NATS subject/topic to publish the message to
|
||||
payloads: List of MessagePayload objects
|
||||
correlation_id: Unique identifier to track messages (auto-generated if empty)
|
||||
msg_id: Unique message identifier (auto-generated if empty)
|
||||
timestamp: Message publication timestamp
|
||||
msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
|
||||
sender_name: Name of the sender
|
||||
sender_id: UUID of the sender
|
||||
receiver_name: Name of the receiver (empty means broadcast)
|
||||
receiver_id: UUID of the receiver (empty means broadcast)
|
||||
reply_to: Topic where receiver should reply
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
broker_url: NATS broker URL
|
||||
metadata: Optional message-level metadata
|
||||
"""
|
||||
self.correlation_id = correlation_id if correlation_id else self._generate_uuid()
|
||||
self.msg_id = msg_id if msg_id else self._generate_uuid()
|
||||
self.timestamp = timestamp if timestamp else self._get_timestamp()
|
||||
self.send_to = send_to
|
||||
self.msg_purpose = msg_purpose
|
||||
self.sender_name = sender_name
|
||||
self.sender_id = sender_id if sender_id else self._generate_uuid()
|
||||
self.receiver_name = receiver_name
|
||||
self.receiver_id = receiver_id if receiver_id else self._generate_uuid()
|
||||
self.reply_to = reply_to
|
||||
self.reply_to_msg_id = reply_to_msg_id
|
||||
self.broker_url = broker_url
|
||||
self.metadata = metadata if metadata else {}
|
||||
self.payloads = payloads
|
||||
|
||||
def _generate_uuid(self):
|
||||
"""Generate a UUID string."""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
def _get_timestamp(self):
|
||||
"""Get current timestamp in ISO format."""
|
||||
# Simplified timestamp - Micropython may not have full datetime
|
||||
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
|
||||
|
||||
def to_json(self):
|
||||
"""Convert envelope to JSON string."""
|
||||
obj = {
|
||||
"correlationId": self.correlation_id,
|
||||
"msgId": self.msg_id,
|
||||
"timestamp": self.timestamp,
|
||||
"sendTo": self.send_to,
|
||||
"msgPurpose": self.msg_purpose,
|
||||
"senderName": self.sender_name,
|
||||
"senderId": self.sender_id,
|
||||
"receiverName": self.receiver_name,
|
||||
"receiverId": self.receiver_id,
|
||||
"replyTo": self.reply_to,
|
||||
"replyToMsgId": self.reply_to_msg_id,
|
||||
"brokerURL": self.broker_url
|
||||
}
|
||||
|
||||
# Include metadata if not empty
|
||||
if self.metadata:
|
||||
obj["metadata"] = self.metadata
|
||||
|
||||
# Convert payloads to JSON array
|
||||
if self.payloads:
|
||||
payloads_json = []
|
||||
for payload in self.payloads:
|
||||
payloads_json.append(payload.to_dict())
|
||||
obj["payloads"] = payloads_json
|
||||
|
||||
return json.dumps(obj)
|
||||
|
||||
|
||||
def log_trace(correlation_id, message):
|
||||
"""Log a trace message with correlation ID and timestamp."""
|
||||
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
|
||||
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
|
||||
|
||||
|
||||
def _serialize_data(data, msg_type):
|
||||
"""Serialize data according to specified format.
|
||||
|
||||
Args:
|
||||
data: Data to serialize
|
||||
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
||||
|
||||
Returns:
|
||||
bytes: Binary representation of the serialized data
|
||||
"""
|
||||
if msg_type == "text":
|
||||
if isinstance(data, str):
|
||||
return data.encode('utf-8')
|
||||
else:
|
||||
raise ValueError("Text data must be a string")
|
||||
|
||||
elif msg_type == "dictionary":
|
||||
if isinstance(data, dict):
|
||||
json_str = json.dumps(data)
|
||||
return json_str.encode('utf-8')
|
||||
else:
|
||||
raise ValueError("Dictionary data must be a dict")
|
||||
|
||||
elif msg_type in ("image", "audio", "video", "binary"):
|
||||
if isinstance(data, bytes):
|
||||
return data
|
||||
else:
|
||||
raise ValueError("{} data must be bytes".format(msg_type.capitalize()))
|
||||
|
||||
else:
|
||||
raise ValueError("Unknown type: {}".format(msg_type))
|
||||
|
||||
|
||||
def _deserialize_data(data_bytes, msg_type, correlation_id):
|
||||
"""Deserialize bytes to data based on type.
|
||||
|
||||
Args:
|
||||
data_bytes: Serialized data as bytes
|
||||
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
Deserialized data
|
||||
"""
|
||||
if msg_type == "text":
|
||||
return data_bytes.decode('utf-8')
|
||||
|
||||
elif msg_type == "dictionary":
|
||||
json_str = data_bytes.decode('utf-8')
|
||||
return json.loads(json_str)
|
||||
|
||||
elif msg_type in ("image", "audio", "video", "binary"):
|
||||
return data_bytes
|
||||
|
||||
else:
|
||||
raise ValueError("Unknown type: {}".format(msg_type))
|
||||
|
||||
|
||||
class NATSConnection:
|
||||
"""Simple NATS connection for Micropython."""
|
||||
|
||||
def __init__(self, url=DEFAULT_NATS_URL):
|
||||
"""Initialize NATS connection.
|
||||
|
||||
Args:
|
||||
url: NATS server URL (e.g., "nats://localhost:4222")
|
||||
"""
|
||||
self.url = url
|
||||
self.host = "localhost"
|
||||
self.port = 4222
|
||||
self.conn = None
|
||||
self._parse_url(url)
|
||||
|
||||
def _parse_url(self, url):
|
||||
"""Parse NATS URL to extract host and port."""
|
||||
if url.startswith("nats://"):
|
||||
url = url[7:]
|
||||
elif url.startswith("tls://"):
|
||||
url = url[6:]
|
||||
|
||||
if ":" in url:
|
||||
self.host, port_str = url.split(":")
|
||||
self.port = int(port_str)
|
||||
else:
|
||||
self.host = url
|
||||
|
||||
def connect(self):
|
||||
"""Connect to NATS server."""
|
||||
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
|
||||
self.conn = usocket.socket()
|
||||
self.conn.connect(addr)
|
||||
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
|
||||
|
||||
def publish(self, subject, message):
|
||||
"""Publish a message to a NATS subject.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish to
|
||||
message: Message to publish (should be bytes or string)
|
||||
"""
|
||||
if isinstance(message, str):
|
||||
message = message.encode('utf-8')
|
||||
|
||||
# Simple NATS protocol implementation
|
||||
msg = "PUB {} {}\r\n".format(subject, len(message))
|
||||
msg = msg.encode('utf-8') + message + b"\r\n"
|
||||
self.conn.send(msg)
|
||||
log_trace("", "Message published to {}".format(subject))
|
||||
|
||||
def subscribe(self, subject, callback):
|
||||
"""Subscribe to a NATS subject.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to subscribe to
|
||||
callback: Callback function to handle incoming messages
|
||||
"""
|
||||
log_trace("", "Subscribed to {}".format(subject))
|
||||
# Simplified subscription - in a real implementation, you'd handle SUB/PUB messages
|
||||
# For Micropython, we'll use a simple polling approach
|
||||
self.subscribed_subject = subject
|
||||
self.subscription_callback = callback
|
||||
|
||||
def wait_message(self, timeout=1000):
|
||||
"""Wait for incoming message.
|
||||
|
||||
Args:
|
||||
timeout: Timeout in milliseconds
|
||||
|
||||
Returns:
|
||||
NATS message object or None if timeout
|
||||
"""
|
||||
# Simplified message reading
|
||||
# In a real implementation, you'd read from the socket
|
||||
# For now, this is a placeholder
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
"""Close the NATS connection."""
|
||||
if self.conn:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
log_trace("", "NATS connection closed")
|
||||
|
||||
|
||||
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
|
||||
"""Fetch data from URL with exponential backoff.
|
||||
|
||||
Args:
|
||||
url: URL to fetch from
|
||||
max_retries: Maximum number of retry attempts
|
||||
base_delay: Initial delay in milliseconds
|
||||
max_delay: Maximum delay in milliseconds
|
||||
correlation_id: Correlation ID for logging
|
||||
|
||||
Returns:
|
||||
bytes: Fetched data
|
||||
|
||||
Raises:
|
||||
Exception: If all retry attempts fail
|
||||
"""
|
||||
delay = base_delay
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
# Simple HTTP GET request
|
||||
# This is a simplified implementation
|
||||
# For production, you'd want a proper HTTP client
|
||||
import urequests
|
||||
response = urequests.get(url)
|
||||
if response.status_code == 200:
|
||||
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
|
||||
return response.content
|
||||
else:
|
||||
raise Exception("Failed to fetch: {}".format(response.status_code))
|
||||
except Exception as e:
|
||||
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
|
||||
if attempt < max_retries:
|
||||
time.sleep(delay / 1000.0)
|
||||
delay = min(delay * 2, max_delay)
|
||||
|
||||
|
||||
def plik_oneshot_upload(file_server_url, filename, data):
|
||||
"""Upload a single file to a plik server using one-shot mode.
|
||||
|
||||
Args:
|
||||
file_server_url: Base URL of the plik server
|
||||
filename: Name of the file being uploaded
|
||||
data: Raw byte data of the file content
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with keys:
|
||||
- "status": HTTP server response status
|
||||
- "uploadid": ID of the one-shot upload session
|
||||
- "fileid": ID of the uploaded file within the session
|
||||
- "url": Full URL to download the uploaded file
|
||||
"""
|
||||
import urequests
|
||||
import json
|
||||
|
||||
# Get upload ID
|
||||
url_get_upload_id = "{}/upload".format(file_server_url)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
body = json.dumps({"OneShot": True})
|
||||
|
||||
response = urequests.post(url_get_upload_id, headers=headers, data=body)
|
||||
response_json = json.loads(response.content)
|
||||
|
||||
uploadid = response_json.get("id")
|
||||
uploadtoken = response_json.get("uploadToken")
|
||||
|
||||
# Upload file
|
||||
url_upload = "{}/file/{}".format(file_server_url, uploadid)
|
||||
headers = {"X-UploadToken": uploadtoken}
|
||||
|
||||
# For Micropython, we need to construct the multipart form data manually
|
||||
# This is a simplified approach
|
||||
boundary = "----WebKitFormBoundary{}".format(uuid.uuid4().hex[:16])
|
||||
|
||||
# Create multipart body
|
||||
part1 = "--{}\r\n".format(boundary)
|
||||
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename)
|
||||
part1 += "Content-Type: application/octet-stream\r\n\r\n"
|
||||
part1_bytes = part1.encode('utf-8')
|
||||
|
||||
part2 = "\r\n--{}--".format(boundary)
|
||||
part2_bytes = part2.encode('utf-8')
|
||||
|
||||
# Combine all parts
|
||||
full_body = part1_bytes + data + part2_bytes
|
||||
|
||||
# Set content type with boundary
|
||||
content_type = "multipart/form-data; boundary={}".format(boundary)
|
||||
|
||||
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
|
||||
response_json = json.loads(response.content)
|
||||
|
||||
fileid = response_json.get("id")
|
||||
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename)
|
||||
|
||||
return {
|
||||
"status": response.status_code,
|
||||
"uploadid": uploadid,
|
||||
"fileid": fileid,
|
||||
"url": url
|
||||
}
|
||||
|
||||
|
||||
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
|
||||
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
|
||||
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
|
||||
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
|
||||
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
|
||||
|
||||
This function intelligently routes data delivery based on payload size relative to a threshold.
|
||||
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and
|
||||
publishes directly over NATS. Otherwise, it uploads the data to a fileserver and publishes
|
||||
only the download URL over NATS.
|
||||
|
||||
Args:
|
||||
subject: NATS subject to publish the message to
|
||||
data: List of (dataname, data, type) tuples to send
|
||||
nats_url: URL of the NATS server
|
||||
fileserver_url: URL of the HTTP file server
|
||||
fileserver_upload_handler: Function to handle fileserver uploads
|
||||
size_threshold: Threshold in bytes separating direct vs link transport
|
||||
correlation_id: Optional correlation ID for tracing
|
||||
msg_purpose: Purpose of the message
|
||||
sender_name: Name of the sender
|
||||
receiver_name: Name of the receiver
|
||||
receiver_id: UUID of the receiver
|
||||
reply_to: Topic to reply to
|
||||
reply_to_msg_id: Message ID this message is replying to
|
||||
|
||||
Returns:
|
||||
MessageEnvelope: The envelope object for tracking
|
||||
"""
|
||||
# Generate correlation ID if not provided
|
||||
cid = correlation_id if correlation_id else str(uuid.uuid4())
|
||||
|
||||
log_trace(cid, "Starting smartsend for subject: {}".format(subject))
|
||||
|
||||
# Generate message metadata
|
||||
msg_id = str(uuid.uuid4())
|
||||
|
||||
# Process each payload in the list
|
||||
payloads = []
|
||||
|
||||
for dataname, payload_data, payload_type in data:
|
||||
# Serialize data based on type
|
||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||
|
||||
payload_size = len(payload_bytes)
|
||||
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format(
|
||||
dataname, payload_type, payload_size))
|
||||
|
||||
# Decision: Direct vs Link
|
||||
if payload_size < size_threshold:
|
||||
# Direct path - Base64 encode and send via NATS
|
||||
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
|
||||
# Convert to base64 string for JSON
|
||||
import ubinascii
|
||||
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
|
||||
|
||||
log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
|
||||
|
||||
# Create MessagePayload for direct transport
|
||||
payload = MessagePayload(
|
||||
payload_b64_str,
|
||||
payload_type,
|
||||
id=str(uuid.uuid4()),
|
||||
dataname=dataname,
|
||||
transport="direct",
|
||||
encoding="base64",
|
||||
size=payload_size,
|
||||
metadata={"payload_bytes": payload_size}
|
||||
)
|
||||
payloads.append(payload)
|
||||
else:
|
||||
# Link path - Upload to HTTP server, send URL via NATS
|
||||
log_trace(cid, "Using link transport, uploading to fileserver")
|
||||
|
||||
# Upload to HTTP server
|
||||
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||
|
||||
if response["status"] != 200:
|
||||
raise Exception("Failed to upload data to fileserver: {}".format(response["status"]))
|
||||
|
||||
url = response["url"]
|
||||
log_trace(cid, "Uploaded to URL: {}".format(url))
|
||||
|
||||
# Create MessagePayload for link transport
|
||||
payload = MessagePayload(
|
||||
url,
|
||||
payload_type,
|
||||
id=str(uuid.uuid4()),
|
||||
dataname=dataname,
|
||||
transport="link",
|
||||
encoding="none",
|
||||
size=payload_size,
|
||||
metadata={}
|
||||
)
|
||||
payloads.append(payload)
|
||||
|
||||
# Create MessageEnvelope with all payloads
|
||||
env = MessageEnvelope(
|
||||
subject,
|
||||
payloads,
|
||||
correlation_id=cid,
|
||||
msg_id=msg_id,
|
||||
msg_purpose=msg_purpose,
|
||||
sender_name=sender_name,
|
||||
sender_id=str(uuid.uuid4()),
|
||||
receiver_name=receiver_name,
|
||||
receiver_id=receiver_id,
|
||||
reply_to=reply_to,
|
||||
reply_to_msg_id=reply_to_msg_id,
|
||||
broker_url=nats_url,
|
||||
metadata={}
|
||||
)
|
||||
|
||||
msg_json = env.to_json()
|
||||
|
||||
# Publish to NATS
|
||||
nats_conn = NATSConnection(nats_url)
|
||||
nats_conn.connect()
|
||||
nats_conn.publish(subject, msg_json)
|
||||
nats_conn.close()
|
||||
|
||||
return env
|
||||
|
||||
|
||||
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,
|
||||
base_delay=100, max_delay=5000):
|
||||
"""Receive and process messages from NATS.
|
||||
|
||||
This function processes incoming NATS messages, handling both direct transport
|
||||
(base64 decoded payloads) and link transport (URL-based payloads).
|
||||
|
||||
Args:
|
||||
msg: NATS message to process (dict with payload data)
|
||||
fileserver_download_handler: Function to handle downloading data from file server URLs
|
||||
max_retries: Maximum retry attempts for fetching URL
|
||||
base_delay: Initial delay for exponential backoff in ms
|
||||
max_delay: Maximum delay for exponential backoff in ms
|
||||
|
||||
Returns:
|
||||
list: List of (dataname, data, type) tuples
|
||||
"""
|
||||
# Parse the JSON envelope
|
||||
json_data = msg if isinstance(msg, dict) else json.loads(msg)
|
||||
log_trace(json_data.get("correlationId", ""), "Processing received message")
|
||||
|
||||
# Process all payloads in the envelope
|
||||
payloads_list = []
|
||||
|
||||
# Get number of payloads
|
||||
num_payloads = len(json_data.get("payloads", []))
|
||||
|
||||
for i in range(num_payloads):
|
||||
payload = json_data["payloads"][i]
|
||||
transport = payload.get("transport", "")
|
||||
dataname = payload.get("dataname", "")
|
||||
|
||||
if transport == "direct":
|
||||
log_trace(json_data.get("correlationId", ""),
|
||||
"Direct transport - decoding payload '{}'".format(dataname))
|
||||
|
||||
# Extract base64 payload from the payload
|
||||
payload_b64 = payload.get("data", "")
|
||||
|
||||
# Decode Base64 payload
|
||||
import ubinascii
|
||||
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload.get("type", "")
|
||||
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", ""))
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
|
||||
elif transport == "link":
|
||||
# Extract download URL from the payload
|
||||
url = payload.get("data", "")
|
||||
log_trace(json_data.get("correlationId", ""),
|
||||
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
|
||||
|
||||
# Fetch with exponential backoff
|
||||
downloaded_data = fileserver_download_handler(
|
||||
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "")
|
||||
)
|
||||
|
||||
# Deserialize based on type
|
||||
data_type = payload.get("type", "")
|
||||
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", ""))
|
||||
|
||||
payloads_list.append((dataname, data, data_type))
|
||||
|
||||
else:
|
||||
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
|
||||
|
||||
return payloads_list
|
||||
|
||||
|
||||
# Utility functions
|
||||
def generate_uuid():
|
||||
"""Generate a UUID string."""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def get_timestamp():
|
||||
"""Get current timestamp in ISO format."""
|
||||
return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
|
||||
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
print("NATSBridge for Micropython")
|
||||
print("=========================")
|
||||
print("This module provides:")
|
||||
print(" - MessageEnvelope: Message envelope structure")
|
||||
print(" - MessagePayload: Payload structure")
|
||||
print(" - smartsend: Send data via NATS with automatic transport selection")
|
||||
print(" - smartreceive: Receive and process messages from NATS")
|
||||
print(" - plik_oneshot_upload: Upload files to HTTP file server")
|
||||
print(" - _fetch_with_backoff: Fetch data from URLs with retry logic")
|
||||
print()
|
||||
print("Usage:")
|
||||
print(" from nats_bridge import smartsend, smartreceive")
|
||||
print(" data = [(\"message\", \"Hello World\", \"text\")]")
|
||||
print(" env = smartsend(\"my.subject\", data)")
|
||||
print()
|
||||
print(" # On receiver:")
|
||||
print(" payloads = smartreceive(msg)")
|
||||
print(" for dataname, data, type in payloads:")
|
||||
print(" print(f\"Received {dataname} of type {type}: {data}\")")
|
||||
220
test/test_micropython_basic.py
Normal file
220
test/test_micropython_basic.py
Normal file
@@ -0,0 +1,220 @@
|
||||
"""
|
||||
Micropython NATS Bridge - Basic Test Examples
|
||||
|
||||
This module demonstrates basic usage of the NATSBridge for Micropython.
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, "../src")
|
||||
|
||||
from nats_bridge import MessageEnvelope, MessagePayload, smartsend, smartreceive, log_trace
|
||||
import json
|
||||
|
||||
# ============================================= 100 ============================================== #
|
||||
|
||||
|
||||
def test_text_message():
|
||||
"""Test sending and receiving text messages."""
|
||||
print("\n=== Test 1: Text Message ===")
|
||||
|
||||
# Send text message
|
||||
data = [
|
||||
("message", "Hello World", "text"),
|
||||
("greeting", "Good morning!", "text")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/text",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Correlation ID: {}".format(env.correlation_id))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
|
||||
# Expected output on receiver:
|
||||
# payloads = smartreceive(msg)
|
||||
# for dataname, data, type in payloads:
|
||||
# print("Received {}: {}".format(dataname, data))
|
||||
|
||||
|
||||
def test_dictionary_message():
|
||||
"""Test sending and receiving dictionary messages."""
|
||||
print("\n=== Test 2: Dictionary Message ===")
|
||||
|
||||
# Send dictionary message
|
||||
config = {
|
||||
"step_size": 0.01,
|
||||
"iterations": 1000,
|
||||
"threshold": 0.5
|
||||
}
|
||||
|
||||
data = [
|
||||
("config", config, "dictionary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/dictionary",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
|
||||
# Expected output on receiver:
|
||||
# payloads = smartreceive(msg)
|
||||
# for dataname, data, type in payloads:
|
||||
# if type == "dictionary":
|
||||
# print("Config: {}".format(data))
|
||||
|
||||
|
||||
def test_mixed_payloads():
|
||||
"""Test sending mixed payload types in a single message."""
|
||||
print("\n=== Test 3: Mixed Payloads ===")
|
||||
|
||||
# Mixed content: text, dictionary, and binary
|
||||
image_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR" # PNG header (example)
|
||||
|
||||
data = [
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_config", {"theme": "dark", "volume": 80}, "dictionary"),
|
||||
("user_image", image_data, "binary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/mixed",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
|
||||
# Expected output on receiver:
|
||||
# payloads = smartreceive(msg)
|
||||
# for dataname, data, type in payloads:
|
||||
# print("Received {}: {} (type: {})".format(dataname, data if type != "binary" else len(data), type))
|
||||
|
||||
|
||||
def test_large_payload():
|
||||
"""Test sending large payloads that require fileserver upload."""
|
||||
print("\n=== Test 4: Large Payload (Link Transport) ===")
|
||||
|
||||
# Create large data (> 1MB would trigger link transport)
|
||||
# For testing, we'll use a smaller size but configure threshold lower
|
||||
large_data = b"A" * 100000 # 100KB
|
||||
|
||||
data = [
|
||||
("large_data", large_data, "binary")
|
||||
]
|
||||
|
||||
# Use a lower threshold for testing
|
||||
env = smartsend(
|
||||
"/test/large",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
fileserver_url="http://localhost:8080",
|
||||
size_threshold=50000 # 50KB threshold for testing
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Payloads: {}".format(len(env.payloads)))
|
||||
for p in env.payloads:
|
||||
print(" - Transport: {}, Type: {}".format(p.transport, p.type))
|
||||
|
||||
|
||||
def test_reply_to():
|
||||
"""Test sending messages with reply-to functionality."""
|
||||
print("\n=== Test 5: Reply To ===")
|
||||
|
||||
data = [
|
||||
("command", {"action": "start"}, "dictionary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/command",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
reply_to="/test/response",
|
||||
reply_to_msg_id="reply-123",
|
||||
msg_purpose="command"
|
||||
)
|
||||
|
||||
print("Sent envelope:")
|
||||
print(" Subject: {}".format(env.send_to))
|
||||
print(" Reply To: {}".format(env.reply_to))
|
||||
print(" Reply To Msg ID: {}".format(env.reply_to_msg_id))
|
||||
|
||||
|
||||
def test_correlation_id():
|
||||
"""Test using custom correlation IDs for tracing."""
|
||||
print("\n=== Test 6: Custom Correlation ID ===")
|
||||
|
||||
custom_cid = "trace-abc123"
|
||||
data = [
|
||||
("message", "Test with correlation ID", "text")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/correlation",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
correlation_id=custom_cid
|
||||
)
|
||||
|
||||
print("Sent envelope with correlation ID: {}".format(env.correlation_id))
|
||||
print("This ID can be used to trace the message flow.")
|
||||
|
||||
|
||||
def test_multiple_payloads():
|
||||
"""Test sending multiple payloads in one message."""
|
||||
print("\n=== Test 7: Multiple Payloads ===")
|
||||
|
||||
data = [
|
||||
("text_message", "Hello", "text"),
|
||||
("json_data", {"key": "value", "number": 42}, "dictionary"),
|
||||
("table_data", b"\x01\x02\x03\x04", "binary"),
|
||||
("audio_data", b"\x00\x01\x02\x03", "binary")
|
||||
]
|
||||
|
||||
env = smartsend(
|
||||
"/test/multiple",
|
||||
data,
|
||||
nats_url="nats://localhost:4222",
|
||||
size_threshold=1000000
|
||||
)
|
||||
|
||||
print("Sent {} payloads in one message".format(len(env.payloads)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Micropython NATS Bridge Test Suite")
|
||||
print("==================================")
|
||||
print()
|
||||
|
||||
# Run tests
|
||||
test_text_message()
|
||||
test_dictionary_message()
|
||||
test_mixed_payloads()
|
||||
test_large_payload()
|
||||
test_reply_to()
|
||||
test_correlation_id()
|
||||
test_multiple_payloads()
|
||||
|
||||
print("\n=== All tests completed ===")
|
||||
print()
|
||||
print("Note: These tests require:")
|
||||
print(" 1. A running NATS server at nats://localhost:4222")
|
||||
print(" 2. An HTTP file server at http://localhost:8080 (for large payloads)")
|
||||
print()
|
||||
print("To run the tests:")
|
||||
print(" python test_micropython_basic.py")
|
||||
Reference in New Issue
Block a user