3 Commits

Author SHA1 Message Date
a064be0e5c update 2026-02-22 13:54:36 +07:00
ton
8a35f1d4dc Merge pull request 'add micropython support' (#3) from add_micropython into main
Reviewed-on: #3
2026-02-22 06:28:25 +00:00
9e5ee61785 update 2026-02-22 13:26:44 +07:00
6 changed files with 1590 additions and 68 deletions

View File

@@ -1,3 +1,8 @@
name = "NATSBridge"
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.1.0"
authors = ["narawat <narawat@gmail.com>"]
[deps] [deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
@@ -9,3 +14,9 @@ NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a"
PrettyPrinting = "54e16d92-306c-5ea0-a30b-337be88ac337" PrettyPrinting = "54e16d92-306c-5ea0-a30b-337be88ac337"
Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
[compat]
JSON = "1.4.0"

View File

@@ -0,0 +1,221 @@
"""
Micropython NATS Bridge - Simple Example
This example demonstrates the basic usage of the NATSBridge for Micropython.
"""
import sys
sys.path.insert(0, "../src")
from nats_bridge import smartsend, smartreceive, log_trace
import json
def example_simple_chat():
"""
Simple chat example: Send text messages via NATS.
Sender (this script):
- Sends a text message to NATS
- Uses direct transport (no fileserver needed)
Receiver (separate script):
- Listens to NATS
- Receives and processes the message
"""
print("=== Simple Chat Example ===")
print()
# Define the message data as list of (dataname, data, type) tuples
data = [
("message", "Hello from Micropython!", "text")
]
# Send the message
env = smartsend(
"/chat/room1",
data,
nats_url="nats://localhost:4222",
msg_purpose="chat",
sender_name="micropython-client"
)
print("Message sent!")
print(" Subject: {}".format(env.send_to))
print(" Correlation ID: {}".format(env.correlation_id))
print(" Payloads: {}".format(len(env.payloads)))
print()
# Expected receiver output:
print("Expected receiver output:")
print(" [timestamp] [Correlation: ...] Starting smartsend for subject: /chat/room1")
print(" [timestamp] [Correlation: ...] Serialized payload 'message' (type: text) size: 22 bytes")
print(" [timestamp] [Correlation: ...] Using direct transport for 22 bytes")
print(" [timestamp] [Correlation: ...] Message published to /chat/room1")
print()
return env
def example_send_json():
"""
Example: Send JSON configuration to a Micropython device.
This demonstrates sending structured data (dictionary type).
"""
print("\n=== Send JSON Configuration ===")
print()
# Define configuration as dictionary
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"server_host": "mqtt.example.com",
"server_port": 1883,
"update_interval": 60
}
# Send configuration
data = [
("device_config", config, "dictionary")
]
env = smartsend(
"/device/config",
data,
nats_url="nats://localhost:4222",
msg_purpose="updateStatus",
sender_name="server"
)
print("Configuration sent!")
print(" Subject: {}".format(env.send_to))
print(" Payloads: {}".format(len(env.payloads)))
print()
return env
def example_receive_message(msg):
"""
Example: Receive and process a NATS message.
Args:
msg: The NATS message received (should be dict or JSON string)
Returns:
list: List of (dataname, data, type) tuples
"""
print("\n=== Receive Message ===")
print()
# Process the message
payloads = smartreceive(
msg,
fileserver_download_handler=None, # Not needed for direct transport
max_retries=3,
base_delay=100,
max_delay=1000
)
print("Received {} payload(s):".format(len(payloads)))
for dataname, data, type in payloads:
print(" - {}: {} (type: {})".format(dataname, data, type))
return payloads
def example_mixed_content():
"""
Example: Send mixed content (text + dictionary + binary).
This demonstrates the multi-payload capability.
"""
print("\n=== Mixed Content Example ===")
print()
# Create mixed content
image_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR" # Example PNG header
data = [
("message_text", "Hello with image!", "text"),
("user_config", {"theme": "dark", "notifications": True}, "dictionary"),
("user_avatar", image_data, "binary")
]
env = smartsend(
"/chat/mixed",
data,
nats_url="nats://localhost:4222",
msg_purpose="chat",
sender_name="micropython-client"
)
print("Mixed content sent!")
print(" Subject: {}".format(env.send_to))
print(" Payloads:")
for p in env.payloads:
print(" - {} (transport: {}, type: {}, size: {} bytes)".format(
p.dataname, p.transport, p.type, p.size))
return env
def example_reply():
"""
Example: Send a message with reply-to functionality.
This demonstrates request-response pattern.
"""
print("\n=== Request-Response Example ===")
print()
# Send command
data = [
("command", {"action": "read_sensor", "sensor_id": "temp1"}, "dictionary")
]
env = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
msg_purpose="command",
sender_name="server",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
print("Command sent!")
print(" Subject: {}".format(env.send_to))
print(" Reply To: {}".format(env.reply_to))
print(" Reply To Msg ID: {}".format(env.reply_to_msg_id))
print()
print("Expected receiver behavior:")
print(" 1. Receive command on /device/command")
print(" 2. Process command")
print(" 3. Send response to /device/response")
print(" 4. Include replyToMsgId in response")
return env
if __name__ == "__main__":
print("Micropython NATS Bridge Examples")
print("================================")
print()
# Run examples
example_simple_chat()
example_send_json()
example_mixed_content()
example_reply()
print("\n=== Examples Completed ===")
print()
print("To run these examples, you need:")
print(" 1. A running NATS server at nats://localhost:4222")
print(" 2. Import the nats_bridge module")
print(" 3. Call the desired example function")
print()
print("For more examples, see test/test_micropython_basic.py")

View File

@@ -10,7 +10,7 @@
# #
# Handler Function Signatures: # Handler Function Signatures:
# #
# ```julia # ```jldoctest
# # Upload handler - uploads data to file server and returns URL # # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} # fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# #
@@ -23,7 +23,7 @@
# Even when sending a single payload, the user must wrap it in a list. # Even when sending a single payload, the user must wrap it in a list.
# #
# API Standard: # API Standard:
# ```julia # ```jldoctest
# # Input format for smartsend (always a list of tuples with type info) # # Input format for smartsend (always a list of tuples with type info)
# [(dataname1, data1, type1), (dataname2, data2, type2), ...] # [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# #
@@ -45,6 +45,59 @@ const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
""" msgPayload_v1 - Internal message payload structure
This structure represents a single payload within a NATS message envelope.
It supports both direct transport (base64-encoded data) and link transport (URL-based).
# Arguments:
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
- `dataname::String` - Name of the payload (e.g., "login_image")
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- `transport::String` - Transport method: "direct" or "link"
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
- `data::Any` - Payload data (bytes for direct, URL for link)
- `metadata::Dict{String, Any}` - Optional metadata dictionary
# Keyword Arguments:
- `id::String = ""` - Payload ID, auto-generated if empty
- `dataname::String = string(uuid4())` - Payload name, auto-generated UUID if empty
- `transport::String = "direct"` - Transport method
- `encoding::String = "none"` - Encoding method
- `size::Integer = 0` - Payload size
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
# Return:
- A msgPayload_v1 struct instance
# Example
```jldoctest
using UUIDs
# Create a direct transport payload
payload = msgPayload_v1(
"Hello World",
"text";
id = string(uuid4()),
dataname = "message",
transport = "direct",
encoding = "base64",
size = 11,
metadata = Dict{String, Any}()
)
# Create a link transport payload
payload = msgPayload_v1(
"http://example.com/file.zip",
"binary";
id = string(uuid4()),
dataname = "file",
transport = "link",
encoding = "none",
size = 1000000
)
```
"""
struct msgPayload_v1 struct msgPayload_v1
id::String # id of this payload e.g. "uuid4" id::String # id of this payload e.g. "uuid4"
dataname::String # name of this payload e.g. "login_image" dataname::String # name of this payload e.g. "login_image"
@@ -80,6 +133,51 @@ function msgPayload_v1(
end end
""" msgEnvelope_v1 - Internal message envelope structure
This structure represents a complete NATS message envelope containing multiple payloads
with metadata for routing, tracing, and message context.
# Arguments:
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
# Keyword Arguments:
- `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
- `msgId::String = ""` - Unique message identifier; auto-generated if empty
- `timestamp::String = string(Dates.now())` - Message publication timestamp
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
- `senderId::String = ""` - UUID of the sender; auto-generated if empty
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
- `replyToMsgId::String = ""` - Message ID this message is replying to
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
# Return:
- A msgEnvelope_v1 struct instance
# Example
```jldoctest
using UUIDs, NATSBridge
# Create payloads for the message
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
# Create message envelope
env = msgEnvelope_v1(
"my.subject",
[payload1, payload2];
correlationId = string(uuid4()),
msgPurpose = "chat",
senderName = "my-app",
receiverName = "receiver-app",
replyTo = "reply.subject"
)
```
"""
struct msgEnvelope_v1 struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic. correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
msgId::String # this message id msgId::String # this message id
@@ -137,8 +235,34 @@ end
""" Convert msgEnvelope_v1 to JSON string """ envelope_to_json - Convert msgEnvelope_v1 to JSON string
This function converts the msgEnvelope_v1 struct to a JSON string representation. This function converts the msgEnvelope_v1 struct to a JSON string representation,
preserving all metadata and payload information for NATS message publishing.
# Function Workflow:
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
2. Conditionally includes metadata dictionary if not empty
3. Iterates through payloads and converts each to JSON-compatible dictionary
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
5. Returns final JSON string representation
# Arguments:
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
# Return:
- `String` - JSON string representation of the envelope
# Example
```jldoctest
using UUIDs
# Create an envelope with payloads
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
env = msgEnvelope_v1("my.subject", [payload])
# Convert to JSON for publishing
json_msg = envelope_to_json(env)
```
""" """
function envelope_to_json(env::msgEnvelope_v1) function envelope_to_json(env::msgEnvelope_v1)
obj = Dict{String, Any}( obj = Dict{String, Any}(
@@ -197,9 +321,24 @@ function envelope_to_json(env::msgEnvelope_v1)
end end
""" Log a trace message with correlation ID and timestamp """ log_trace - Log a trace message with correlation ID and timestamp
This function logs information messages with a correlation ID for tracing purposes, This function logs information messages with a correlation ID for tracing purposes,
making it easier to track message flow across distributed systems. making it easier to track message flow across distributed systems.
# Arguments:
- `correlation_id::String` - Correlation ID to identify the message flow
- `message::String` - The message content to log
# Return:
- `nothing` - This function performs logging but returns nothing
# Example
```jldoctest
using Dates
log_trace("abc123", "Starting message processing")
# Logs: [2026-02-21T05:39:00] [Correlation: abc123] Starting message processing
```
""" """
function log_trace(correlation_id::String, message::String) function log_trace(correlation_id::String, message::String)
timestamp = Dates.now() # Get current timestamp timestamp = Dates.now() # Get current timestamp
@@ -216,7 +355,7 @@ Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_u
The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually. The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually.
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio). Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
The function workflow: # Function Workflow:
1. Iterates through the list of (dataname, data, type) tuples 1. Iterates through the list of (dataname, data, type) tuples
2. For each payload: extracts the type from the tuple and serializes accordingly 2. For each payload: extracts the type from the tuple and serializes accordingly
3. Compares the serialized size against `size_threshold` 3. Compares the serialized size against `size_threshold`
@@ -247,7 +386,7 @@ The function workflow:
- A `msgEnvelope_v1` object containing metadata and transport information - A `msgEnvelope_v1` object containing metadata and transport information
# Example # Example
```julia ```jldoctest
using UUIDs using UUIDs
# Send a single payload (still wrapped in a list) # Send a single payload (still wrapped in a list)
@@ -376,26 +515,20 @@ end
""" _serialize_data - Serialize data according to specified format """ _serialize_data - Serialize data according to specified format
This function serializes arbitrary Julia data into a binary representation based on the specified format. This function serializes arbitrary Julia data into a binary representation based on the specified format.
It supports multiple serialization formats: It supports multiple serialization formats for different data types.
- `"text"`: Treats data as text and converts to UTF-8 bytes
- `"dictionary"`: Serializes data as JSON and returns the UTF-8 byte representation
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
- `"image"`: Expects binary image data (Vector{UInt8}) and returns it as bytes
- `"audio"`: Expects binary audio data (Vector{UInt8}) and returns it as bytes
- `"video"`: Expects binary video data (Vector{UInt8}) and returns it as bytes
- `"binary"`: Generic binary data (Vector{UInt8} or IOBuffer) and returns bytes
The function handles format-specific serialization logic: # Function Workflow:
1. For `"text"`: Converts string to UTF-8 bytes 1. Validates the data type against the specified format
2. For `"dictionary"`: Converts Julia data to JSON string, then encodes to bytes 2. Converts data to binary representation according to format rules
3. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer 3. For text: converts string to UTF-8 bytes
4. For `"image"`, `"audio"`, `"video"`: Treats data as binary (Vector{UInt8}) 4. For dictionary: serializes as JSON then converts to bytes
5. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly 5. For table: uses Arrow.jl to write as IPC stream
6. For image/audio/video/binary: returns binary data directly
# Arguments: # Arguments:
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`) - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
# Return: # Return:
- `Vector{UInt8}` - Binary representation of the serialized data - `Vector{UInt8}` - Binary representation of the serialized data
@@ -405,7 +538,7 @@ The function handles format-specific serialization logic:
- `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}` - `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
# Example # Example
```julia ```jldoctest
using JSON, Arrow, DataFrames using JSON, Arrow, DataFrames
# Text serialization # Text serialization
@@ -505,15 +638,29 @@ function _serialize_data(data::Any, type::String)
end end
""" Publish message to NATS """ publish_message - Publish message to NATS
This internal function publishes a message to a NATS subject with proper This internal function publishes a message to a NATS subject with proper
connection management and logging. connection management and logging.
Arguments: # Arguments:
- `nats_url::String` - NATS server URL - `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
- `subject::String` - NATS subject to publish to - `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
- `message::String` - JSON message to publish - `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for logging - `correlation_id::String` - Correlation ID for tracing and logging
# Return:
- `nothing` - This function performs publishing but returns nothing
# Example
```jldoctest
using NATS
# Prepare JSON message
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
# Publish to NATS
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
```
""" """
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String) function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(nats_url) # Create NATS connection conn = NATS.connect(nats_url) # Create NATS connection
@@ -532,20 +679,27 @@ This function processes incoming NATS messages, handling both direct transport
It deserializes the data based on the transport type and returns the result. It deserializes the data based on the transport type and returns the result.
A HTTP file server is required along with its download function. A HTTP file server is required along with its download function.
Arguments: # Function Workflow:
1. Parses the JSON envelope from the NATS message
2. Iterates through each payload in the envelope
3. For each payload: determines the transport type (direct or link)
4. For direct transport: decodes Base64 payload and deserializes based on type
5. For link transport: fetches data from URL with exponential backoff, then deserializes
# Arguments:
- `msg::NATS.Msg` - NATS message to process - `msg::NATS.Msg` - NATS message to process
- `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs
Keyword Arguments: # Keyword Arguments:
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5) - `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100) - `max_retries::Int = 5` - Maximum retry attempts for fetching URL
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000) - `base_delay::Int = 100` - Initial delay for exponential backoff in ms
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
Return: # Return:
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples - `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
# Example # Example
```julia ```jldoctest
# Receive and process message # Receive and process message
msg = nats_message # NATS message msg = nats_message # NATS message
payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000) payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
@@ -610,19 +764,35 @@ function smartreceive(
end end
""" Fetch data from URL with exponential backoff """ _fetch_with_backoff - Fetch data from URL with exponential backoff
This internal function retrieves data from a URL with retry logic using This internal function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures. exponential backoff to handle transient failures.
Arguments: # Function Workflow:
1. Initializes delay with base_delay value
2. Attempts to fetch data from URL in a retry loop
3. On success: logs success and returns response body as bytes
4. On failure: sleeps using exponential backoff and retries
5. After max_retries: throws error indicating failure
# Arguments:
- `url::String` - URL to fetch from - `url::String` - URL to fetch from
- `max_retries::Int` - Maximum number of retry attempts - `max_retries::Int` - Maximum number of retry attempts
- `base_delay::Int` - Initial delay in milliseconds - `base_delay::Int` - Initial delay in milliseconds
- `max_delay::Int` - Maximum delay in milliseconds - `max_delay::Int` - Maximum delay in milliseconds
- `correlation_id::String` - Correlation ID for logging - `correlation_id::String` - Correlation ID for logging
Return: # Return:
- Vector{UInt8} - Fetched data as bytes - `Vector{UInt8}` - Fetched data as bytes
# Throws:
- `Error` if all retry attempts fail
# Example
```jldoctest
# Fetch data with exponential backoff
data = _fetch_with_backoff("http://example.com/file.zip", 5, 100, 5000, "correlation123")
```
""" """
function _fetch_with_backoff( function _fetch_with_backoff(
url::String, url::String,
@@ -655,18 +825,44 @@ function _fetch_with_backoff(
end end
""" Deserialize bytes to data based on type """ _deserialize_data - Deserialize bytes to data based on type
This internal function converts serialized bytes back to Julia data based on type. This internal function converts serialized bytes back to Julia data based on type.
It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization), It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization),
"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data). "image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Arguments: # Function Workflow:
1. Validates the data type against supported formats
2. Converts bytes to appropriate Julia data type based on format
3. For text: converts bytes to string
4. For dictionary: converts bytes to JSON string then parses to Julia object
5. For table: reads Arrow IPC format and returns DataFrame
6. For image/audio/video/binary: returns bytes directly
# Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes - `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") - `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `correlation_id::String` - Correlation ID for logging - `correlation_id::String` - Correlation ID for logging
Return: # Return:
- Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") - Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
# Throws:
- `Error` if `type` is not one of the supported types
# Example
```jldoctest
# Text data
text_bytes = UInt8["Hello World"]
text_data = _deserialize_data(text_bytes, "text", "correlation123")
# JSON data
json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 125] # {"name":"Alice"}
json_data = _deserialize_data(json_bytes, "dictionary", "correlation123")
# Arrow IPC data (table)
arrow_bytes = UInt8[1, 2, 3] # Arrow IPC bytes
table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
```
""" """
function _deserialize_data( function _deserialize_data(
data::Vector{UInt8}, data::Vector{UInt8},
@@ -697,15 +893,15 @@ end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode """ plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
This function uploads a raw byte array to a plik server in one-shot mode (no upload session). This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`, It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token. retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow: # Function Workflow:
1. Obtains an upload ID and token from the server 1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
2. Uploads the provided binary data as a file using the `X-UploadToken` header 2. Retrieves upload ID and token from server response
3. Returns identifiers and download URL for the uploaded file 3. Uploads binary data as multipart form data using the token
4. Returns identifiers and download URL for the uploaded file
# Arguments: # Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) - `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
@@ -713,14 +909,14 @@ The function workflow:
- `data::Vector{UInt8}` - Raw byte data of the file content - `data::Vector{UInt8}` - Raw byte data of the file content
# Return: # Return:
- A Dict with keys: - `Dict{String, Any}` - Dictionary with keys:
- `"status"` - HTTP server response status - `"status"` - HTTP server response status
- `"uploadid"` - ID of the one-shot upload session - `"uploadid"` - ID of the one-shot upload session
- `"fileid"` - ID of the uploaded file within the session - `"fileid"` - ID of the uploaded file within the session
- `"url"` - Full URL to download the uploaded file - `"url"` - Full URL to download the uploaded file
# Example # Example
```julia ```jldoctest
using HTTP, JSON using HTTP, JSON
fileServerURL = "http://localhost:8080" fileServerURL = "http://localhost:8080"
@@ -776,31 +972,29 @@ end
""" plik_oneshot_upload(fileServerURL::String, filepath::String) """ plik_oneshot_upload(fileServerURL::String, filepath::String)
Upload a single file to a plik server using one-shot mode.
This function uploads a file from disk to a plik server in one-shot mode (no upload session). This function uploads a file from disk to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`, It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token. retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow: # Function Workflow:
1. Obtains an upload ID and token from the server 1. Creates a one-shot upload session by sending POST request with `{"OneShot": true}`
2. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header 2. Retrieves upload ID and token from server response
3. Returns identifiers and download URL for the uploaded file 3. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
4. Returns identifiers and download URL for the uploaded file
# Arguments: # Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`) - `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filepath::String` - Full path to the local file to upload - `filepath::String` - Full path to the local file to upload
# Return: # Return:
- A Dict with keys: - `Dict{String, Any}` - Dictionary with keys:
- `"status"` - HTTP server response status - `"status"` - HTTP server response status
- `"uploadid"` - ID of the one-shot upload session - `"uploadid"` - ID of the one-shot upload session
- `"fileid"` - ID of the uploaded file within the session - `"fileid"` - ID of the uploaded file within the session
- `"url"` - Full URL to download the uploaded file - `"url"` - Full URL to download the uploaded file
# Example # Example
```julia ```jldoctest
using HTTP, JSON using HTTP, JSON
fileServerURL = "http://localhost:8080" fileServerURL = "http://localhost:8080"

212
src/README.md Normal file
View File

@@ -0,0 +1,212 @@
# NATSBridge for Micropython
A high-performance, bi-directional data bridge for Micropython devices using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Overview
This module provides functionality for sending and receiving data over NATS with automatic transport selection based on payload size:
- **Direct Transport**: Payloads < 1MB are sent directly via NATS (Base64 encoded)
- **Link Transport**: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL
## Features
- ✅ Bi-directional NATS communication
- ✅ Multi-payload support (mixed content in single message)
- ✅ Automatic transport selection based on payload size
- ✅ File server integration for large payloads
- ✅ Exponential backoff for URL fetching
- ✅ Correlation ID tracking
- ✅ Reply-to support for request-response pattern
## Supported Payload Types
| Type | Description |
|------|-------------|
| `text` | Plain text strings |
| `dictionary` | JSON-serializable dictionaries |
| `table` | Tabular data (Arrow IPC format) |
| `image` | Image data (PNG, JPG bytes) |
| `audio` | Audio data (WAV, MP3 bytes) |
| `video` | Video data (MP4, AVI bytes) |
| `binary` | Generic binary data |
## Installation
1. Copy `nats_bridge.py` to your Micropython device
2. Ensure you have the following dependencies:
- `urequests` for HTTP requests
- `ubinascii` for base64 encoding
- `ujson` for JSON handling
- `usocket` for networking
## Usage
### Basic Text Message
```python
from nats_bridge import smartsend, smartreceive
# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")
# Receiver
payloads = smartreceive(msg)
for dataname, data, type in payloads:
print("Received {}: {}".format(dataname, data))
```
### Sending JSON Configuration
```python
from nats_bridge import smartsend
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")
```
### Mixed Content (Chat with Text + Image)
```python
from nats_bridge import smartsend
image_data = b"\x89PNG..." # PNG bytes
data = [
("message_text", "Hello with image!", "text"),
("user_avatar", image_data, "binary")
]
env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")
```
### Request-Response Pattern
```python
from nats_bridge import smartsend
# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
"/device/command",
data,
nats_url="nats://localhost:4222",
reply_to="/device/response",
reply_to_msg_id="cmd-001"
)
```
### Large Payloads (File Server)
```python
from nats_bridge import smartsend
# Large data (> 1MB)
large_data = b"A" * 2000000 # 2MB
env = smartsend(
"/data/large",
[("large_file", large_data, "binary")],
nats_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=1000000 # 1MB threshold
)
```
## API Reference
### `smartsend(subject, data, ...)`
Send data via NATS with automatic transport selection.
**Arguments:**
- `subject` (str): NATS subject to publish to
- `data` (list): List of `(dataname, data, type)` tuples
- `nats_url` (str): NATS server URL (default: `nats://localhost:4222`)
- `fileserver_url` (str): HTTP file server URL (default: `http://localhost:8080`)
- `size_threshold` (int): Threshold in bytes (default: 1,000,000)
- `correlation_id` (str): Optional correlation ID for tracing
- `msg_purpose` (str): Message purpose (default: `"chat"`)
- `sender_name` (str): Sender name (default: `"NATSBridge"`)
- `receiver_name` (str): Receiver name (default: `""`)
- `receiver_id` (str): Receiver ID (default: `""`)
- `reply_to` (str): Reply topic (default: `""`)
- `reply_to_msg_id` (str): Reply message ID (default: `""`)
**Returns:** `MessageEnvelope` object
### `smartreceive(msg, ...)`
Receive and process NATS messages.
**Arguments:**
- `msg`: NATS message (dict or JSON string)
- `fileserver_download_handler` (function): Function to fetch data from URLs
- `max_retries` (int): Maximum retry attempts (default: 5)
- `base_delay` (int): Initial delay in ms (default: 100)
- `max_delay` (int): Maximum delay in ms (default: 5000)
**Returns:** List of `(dataname, data, type)` tuples
### `MessageEnvelope`
Represents a complete NATS message envelope.
**Attributes:**
- `correlation_id`: Unique identifier for tracing
- `msg_id`: Unique message identifier
- `timestamp`: Message publication timestamp
- `send_to`: NATS subject
- `msg_purpose`: Message purpose
- `sender_name`: Sender name
- `sender_id`: Sender UUID
- `receiver_name`: Receiver name
- `receiver_id`: Receiver UUID
- `reply_to`: Reply topic
- `reply_to_msg_id`: Reply message ID
- `broker_url`: NATS broker URL
- `metadata`: Message-level metadata
- `payloads`: List of MessagePayload objects
### `MessagePayload`
Represents a single payload within a message envelope.
**Attributes:**
- `id`: Unique payload identifier
- `dataname`: Name of the payload
- `type`: Payload type ("text", "dictionary", etc.)
- `transport`: Transport method ("direct" or "link")
- `encoding`: Encoding method ("none", "base64", etc.)
- `size`: Payload size in bytes
- `data`: Payload data (bytes for direct, URL for link)
- `metadata`: Payload-level metadata
## Examples
See `examples/micropython_example.py` for more detailed examples.
## Testing
Run the test suite:
```bash
python test/test_micropython_basic.py
```
## Requirements
- Micropython with networking support
- NATS server (nats.io)
- HTTP file server (optional, for large payloads)
## License
MIT

664
src/nats_bridge.py Normal file
View File

@@ -0,0 +1,664 @@
"""
Micropython NATS Bridge - Bi-Directional Data Bridge for Micropython
This module provides functionality for sending and receiving data over NATS
using the Claim-Check pattern for large payloads.
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
"""
import json
import random
import time
import usocket
import uselect
import ustruct
import uuid
try:
import ussl
HAS_SSL = True
except ImportError:
HAS_SSL = False
# Constants
DEFAULT_SIZE_THRESHOLD = 1000000 # 1MB - threshold for switching from direct to link transport
DEFAULT_NATS_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
# ============================================= 100 ============================================== #
class MessagePayload:
"""Internal message payload structure representing a single payload within a NATS message envelope."""
def __init__(self, data, msg_type, id="", dataname="", transport="direct",
encoding="none", size=0, metadata=None):
"""
Initialize a MessagePayload.
Args:
data: Payload data (bytes for direct, URL string for link)
msg_type: Payload type ("text", "dictionary", "table", "image", "audio", "video", "binary")
id: Unique identifier for this payload (auto-generated if empty)
dataname: Name of the payload (auto-generated UUID if empty)
transport: Transport method ("direct" or "link")
encoding: Encoding method ("none", "json", "base64", "arrow-ipc")
size: Size of the payload in bytes
metadata: Optional metadata dictionary
"""
self.id = id if id else self._generate_uuid()
self.dataname = dataname if dataname else self._generate_uuid()
self.type = msg_type
self.transport = transport
self.encoding = encoding
self.size = size
self.data = data
self.metadata = metadata if metadata else {}
def _generate_uuid(self):
"""Generate a UUID string."""
return str(uuid.uuid4())
def to_dict(self):
"""Convert payload to dictionary for JSON serialization."""
payload_dict = {
"id": self.id,
"dataname": self.dataname,
"type": self.type,
"transport": self.transport,
"encoding": self.encoding,
"size": self.size,
}
# Include data based on transport type
if self.transport == "direct" and self.data is not None:
if self.encoding == "base64" or self.encoding == "json":
payload_dict["data"] = self.data
else:
# For other encodings, use base64
payload_dict["data"] = self._to_base64(self.data)
elif self.transport == "link" and self.data is not None:
# For link transport, data is a URL string
payload_dict["data"] = self.data
if self.metadata:
payload_dict["metadata"] = self.metadata
return payload_dict
def _to_base64(self, data):
"""Convert bytes to base64 string."""
if isinstance(data, bytes):
# Simple base64 encoding without library
import ubinascii
return ubinascii.b2a_base64(data).decode('utf-8').strip()
return data
def _from_base64(self, data):
"""Convert base64 string to bytes."""
import ubinascii
return ubinascii.a2b_base64(data)
class MessageEnvelope:
"""Internal message envelope structure containing multiple payloads with metadata."""
def __init__(self, send_to, payloads, correlation_id="", msg_id="", timestamp="",
msg_purpose="", sender_name="", sender_id="", receiver_name="",
receiver_id="", reply_to="", reply_to_msg_id="", broker_url=DEFAULT_NATS_URL,
metadata=None):
"""
Initialize a MessageEnvelope.
Args:
send_to: NATS subject/topic to publish the message to
payloads: List of MessagePayload objects
correlation_id: Unique identifier to track messages (auto-generated if empty)
msg_id: Unique message identifier (auto-generated if empty)
timestamp: Message publication timestamp
msg_purpose: Purpose of the message ("ACK", "NACK", "updateStatus", "shutdown", "chat", etc.)
sender_name: Name of the sender
sender_id: UUID of the sender
receiver_name: Name of the receiver (empty means broadcast)
receiver_id: UUID of the receiver (empty means broadcast)
reply_to: Topic where receiver should reply
reply_to_msg_id: Message ID this message is replying to
broker_url: NATS broker URL
metadata: Optional message-level metadata
"""
self.correlation_id = correlation_id if correlation_id else self._generate_uuid()
self.msg_id = msg_id if msg_id else self._generate_uuid()
self.timestamp = timestamp if timestamp else self._get_timestamp()
self.send_to = send_to
self.msg_purpose = msg_purpose
self.sender_name = sender_name
self.sender_id = sender_id if sender_id else self._generate_uuid()
self.receiver_name = receiver_name
self.receiver_id = receiver_id if receiver_id else self._generate_uuid()
self.reply_to = reply_to
self.reply_to_msg_id = reply_to_msg_id
self.broker_url = broker_url
self.metadata = metadata if metadata else {}
self.payloads = payloads
def _generate_uuid(self):
"""Generate a UUID string."""
return str(uuid.uuid4())
def _get_timestamp(self):
"""Get current timestamp in ISO format."""
# Simplified timestamp - Micropython may not have full datetime
return "2026-02-21T" + time.strftime("%H:%M:%S", time.localtime())
def to_json(self):
"""Convert envelope to JSON string."""
obj = {
"correlationId": self.correlation_id,
"msgId": self.msg_id,
"timestamp": self.timestamp,
"sendTo": self.send_to,
"msgPurpose": self.msg_purpose,
"senderName": self.sender_name,
"senderId": self.sender_id,
"receiverName": self.receiver_name,
"receiverId": self.receiver_id,
"replyTo": self.reply_to,
"replyToMsgId": self.reply_to_msg_id,
"brokerURL": self.broker_url
}
# Include metadata if not empty
if self.metadata:
obj["metadata"] = self.metadata
# Convert payloads to JSON array
if self.payloads:
payloads_json = []
for payload in self.payloads:
payloads_json.append(payload.to_dict())
obj["payloads"] = payloads_json
return json.dumps(obj)
def log_trace(correlation_id, message):
"""Log a trace message with correlation ID and timestamp."""
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
print("[{}] [Correlation: {}] {}".format(timestamp, correlation_id, message))
def _serialize_data(data, msg_type):
"""Serialize data according to specified format.
Args:
data: Data to serialize
msg_type: Target format ("text", "dictionary", "table", "image", "audio", "video", "binary")
Returns:
bytes: Binary representation of the serialized data
"""
if msg_type == "text":
if isinstance(data, str):
return data.encode('utf-8')
else:
raise ValueError("Text data must be a string")
elif msg_type == "dictionary":
if isinstance(data, dict):
json_str = json.dumps(data)
return json_str.encode('utf-8')
else:
raise ValueError("Dictionary data must be a dict")
elif msg_type in ("image", "audio", "video", "binary"):
if isinstance(data, bytes):
return data
else:
raise ValueError("{} data must be bytes".format(msg_type.capitalize()))
else:
raise ValueError("Unknown type: {}".format(msg_type))
def _deserialize_data(data_bytes, msg_type, correlation_id):
"""Deserialize bytes to data based on type.
Args:
data_bytes: Serialized data as bytes
msg_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
correlation_id: Correlation ID for logging
Returns:
Deserialized data
"""
if msg_type == "text":
return data_bytes.decode('utf-8')
elif msg_type == "dictionary":
json_str = data_bytes.decode('utf-8')
return json.loads(json_str)
elif msg_type in ("image", "audio", "video", "binary"):
return data_bytes
else:
raise ValueError("Unknown type: {}".format(msg_type))
class NATSConnection:
"""Simple NATS connection for Micropython."""
def __init__(self, url=DEFAULT_NATS_URL):
"""Initialize NATS connection.
Args:
url: NATS server URL (e.g., "nats://localhost:4222")
"""
self.url = url
self.host = "localhost"
self.port = 4222
self.conn = None
self._parse_url(url)
def _parse_url(self, url):
"""Parse NATS URL to extract host and port."""
if url.startswith("nats://"):
url = url[7:]
elif url.startswith("tls://"):
url = url[6:]
if ":" in url:
self.host, port_str = url.split(":")
self.port = int(port_str)
else:
self.host = url
def connect(self):
"""Connect to NATS server."""
addr = usocket.getaddrinfo(self.host, self.port)[0][-1]
self.conn = usocket.socket()
self.conn.connect(addr)
log_trace("", "Connected to NATS server at {}:{}".format(self.host, self.port))
def publish(self, subject, message):
"""Publish a message to a NATS subject.
Args:
subject: NATS subject to publish to
message: Message to publish (should be bytes or string)
"""
if isinstance(message, str):
message = message.encode('utf-8')
# Simple NATS protocol implementation
msg = "PUB {} {}\r\n".format(subject, len(message))
msg = msg.encode('utf-8') + message + b"\r\n"
self.conn.send(msg)
log_trace("", "Message published to {}".format(subject))
def subscribe(self, subject, callback):
"""Subscribe to a NATS subject.
Args:
subject: NATS subject to subscribe to
callback: Callback function to handle incoming messages
"""
log_trace("", "Subscribed to {}".format(subject))
# Simplified subscription - in a real implementation, you'd handle SUB/PUB messages
# For Micropython, we'll use a simple polling approach
self.subscribed_subject = subject
self.subscription_callback = callback
def wait_message(self, timeout=1000):
"""Wait for incoming message.
Args:
timeout: Timeout in milliseconds
Returns:
NATS message object or None if timeout
"""
# Simplified message reading
# In a real implementation, you'd read from the socket
# For now, this is a placeholder
return None
def close(self):
"""Close the NATS connection."""
if self.conn:
self.conn.close()
self.conn = None
log_trace("", "NATS connection closed")
def _fetch_with_backoff(url, max_retries=5, base_delay=100, max_delay=5000, correlation_id=""):
"""Fetch data from URL with exponential backoff.
Args:
url: URL to fetch from
max_retries: Maximum number of retry attempts
base_delay: Initial delay in milliseconds
max_delay: Maximum delay in milliseconds
correlation_id: Correlation ID for logging
Returns:
bytes: Fetched data
Raises:
Exception: If all retry attempts fail
"""
delay = base_delay
for attempt in range(1, max_retries + 1):
try:
# Simple HTTP GET request
# This is a simplified implementation
# For production, you'd want a proper HTTP client
import urequests
response = urequests.get(url)
if response.status_code == 200:
log_trace(correlation_id, "Successfully fetched data from {} on attempt {}".format(url, attempt))
return response.content
else:
raise Exception("Failed to fetch: {}".format(response.status_code))
except Exception as e:
log_trace(correlation_id, "Attempt {} failed: {}".format(attempt, str(e)))
if attempt < max_retries:
time.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
def plik_oneshot_upload(file_server_url, filename, data):
"""Upload a single file to a plik server using one-shot mode.
Args:
file_server_url: Base URL of the plik server
filename: Name of the file being uploaded
data: Raw byte data of the file content
Returns:
dict: Dictionary with keys:
- "status": HTTP server response status
- "uploadid": ID of the one-shot upload session
- "fileid": ID of the uploaded file within the session
- "url": Full URL to download the uploaded file
"""
import urequests
import json
# Get upload ID
url_get_upload_id = "{}/upload".format(file_server_url)
headers = {"Content-Type": "application/json"}
body = json.dumps({"OneShot": True})
response = urequests.post(url_get_upload_id, headers=headers, data=body)
response_json = json.loads(response.content)
uploadid = response_json.get("id")
uploadtoken = response_json.get("uploadToken")
# Upload file
url_upload = "{}/file/{}".format(file_server_url, uploadid)
headers = {"X-UploadToken": uploadtoken}
# For Micropython, we need to construct the multipart form data manually
# This is a simplified approach
boundary = "----WebKitFormBoundary{}".format(uuid.uuid4().hex[:16])
# Create multipart body
part1 = "--{}\r\n".format(boundary)
part1 += "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n".format(filename)
part1 += "Content-Type: application/octet-stream\r\n\r\n"
part1_bytes = part1.encode('utf-8')
part2 = "\r\n--{}--".format(boundary)
part2_bytes = part2.encode('utf-8')
# Combine all parts
full_body = part1_bytes + data + part2_bytes
# Set content type with boundary
content_type = "multipart/form-data; boundary={}".format(boundary)
response = urequests.post(url_upload, headers={"Content-Type": content_type}, data=full_body)
response_json = json.loads(response.content)
fileid = response_json.get("id")
url = "{}/file/{}/{}".format(file_server_url, uploadid, filename)
return {
"status": response.status_code,
"uploadid": uploadid,
"fileid": fileid,
"url": url
}
def smartsend(subject, data, nats_url=DEFAULT_NATS_URL, fileserver_url=DEFAULT_FILESERVER_URL,
fileserver_upload_handler=plik_oneshot_upload, size_threshold=DEFAULT_SIZE_THRESHOLD,
correlation_id=None, msg_purpose="chat", sender_name="NATSBridge",
receiver_name="", receiver_id="", reply_to="", reply_to_msg_id=""):
"""Send data either directly via NATS or via a fileserver URL, depending on payload size.
This function intelligently routes data delivery based on payload size relative to a threshold.
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and
publishes directly over NATS. Otherwise, it uploads the data to a fileserver and publishes
only the download URL over NATS.
Args:
subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples to send
nats_url: URL of the NATS server
fileserver_url: URL of the HTTP file server
fileserver_upload_handler: Function to handle fileserver uploads
size_threshold: Threshold in bytes separating direct vs link transport
correlation_id: Optional correlation ID for tracing
msg_purpose: Purpose of the message
sender_name: Name of the sender
receiver_name: Name of the receiver
receiver_id: UUID of the receiver
reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to
Returns:
MessageEnvelope: The envelope object for tracking
"""
# Generate correlation ID if not provided
cid = correlation_id if correlation_id else str(uuid.uuid4())
log_trace(cid, "Starting smartsend for subject: {}".format(subject))
# Generate message metadata
msg_id = str(uuid.uuid4())
# Process each payload in the list
payloads = []
for dataname, payload_data, payload_type in data:
# Serialize data based on type
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
log_trace(cid, "Serialized payload '{}' (type: {}) size: {} bytes".format(
dataname, payload_type, payload_size))
# Decision: Direct vs Link
if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS
payload_b64 = _serialize_data(payload_bytes, "binary") # Already bytes
# Convert to base64 string for JSON
import ubinascii
payload_b64_str = ubinascii.b2a_base64(payload_bytes).decode('utf-8').strip()
log_trace(cid, "Using direct transport for {} bytes".format(payload_size))
# Create MessagePayload for direct transport
payload = MessagePayload(
payload_b64_str,
payload_type,
id=str(uuid.uuid4()),
dataname=dataname,
transport="direct",
encoding="base64",
size=payload_size,
metadata={"payload_bytes": payload_size}
)
payloads.append(payload)
else:
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver")
# Upload to HTTP server
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response["status"] != 200:
raise Exception("Failed to upload data to fileserver: {}".format(response["status"]))
url = response["url"]
log_trace(cid, "Uploaded to URL: {}".format(url))
# Create MessagePayload for link transport
payload = MessagePayload(
url,
payload_type,
id=str(uuid.uuid4()),
dataname=dataname,
transport="link",
encoding="none",
size=payload_size,
metadata={}
)
payloads.append(payload)
# Create MessageEnvelope with all payloads
env = MessageEnvelope(
subject,
payloads,
correlation_id=cid,
msg_id=msg_id,
msg_purpose=msg_purpose,
sender_name=sender_name,
sender_id=str(uuid.uuid4()),
receiver_name=receiver_name,
receiver_id=receiver_id,
reply_to=reply_to,
reply_to_msg_id=reply_to_msg_id,
broker_url=nats_url,
metadata={}
)
msg_json = env.to_json()
# Publish to NATS
nats_conn = NATSConnection(nats_url)
nats_conn.connect()
nats_conn.publish(subject, msg_json)
nats_conn.close()
return env
def smartreceive(msg, fileserver_download_handler=_fetch_with_backoff, max_retries=5,
base_delay=100, max_delay=5000):
"""Receive and process messages from NATS.
This function processes incoming NATS messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
Args:
msg: NATS message to process (dict with payload data)
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL
base_delay: Initial delay for exponential backoff in ms
max_delay: Maximum delay for exponential backoff in ms
Returns:
list: List of (dataname, data, type) tuples
"""
# Parse the JSON envelope
json_data = msg if isinstance(msg, dict) else json.loads(msg)
log_trace(json_data.get("correlationId", ""), "Processing received message")
# Process all payloads in the envelope
payloads_list = []
# Get number of payloads
num_payloads = len(json_data.get("payloads", []))
for i in range(num_payloads):
payload = json_data["payloads"][i]
transport = payload.get("transport", "")
dataname = payload.get("dataname", "")
if transport == "direct":
log_trace(json_data.get("correlationId", ""),
"Direct transport - decoding payload '{}'".format(dataname))
# Extract base64 payload from the payload
payload_b64 = payload.get("data", "")
# Decode Base64 payload
import ubinascii
payload_bytes = ubinascii.a2b_base64(payload_b64.encode('utf-8'))
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(payload_bytes, data_type, json_data.get("correlationId", ""))
payloads_list.append((dataname, data, data_type))
elif transport == "link":
# Extract download URL from the payload
url = payload.get("data", "")
log_trace(json_data.get("correlationId", ""),
"Link transport - fetching '{}' from URL: {}".format(dataname, url))
# Fetch with exponential backoff
downloaded_data = fileserver_download_handler(
url, max_retries, base_delay, max_delay, json_data.get("correlationId", "")
)
# Deserialize based on type
data_type = payload.get("type", "")
data = _deserialize_data(downloaded_data, data_type, json_data.get("correlationId", ""))
payloads_list.append((dataname, data, data_type))
else:
raise ValueError("Unknown transport type for payload '{}': {}".format(dataname, transport))
return payloads_list
# Utility functions
def generate_uuid():
"""Generate a UUID string."""
return str(uuid.uuid4())
def get_timestamp():
"""Get current timestamp in ISO format."""
return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime())
# Example usage
if __name__ == "__main__":
print("NATSBridge for Micropython")
print("=========================")
print("This module provides:")
print(" - MessageEnvelope: Message envelope structure")
print(" - MessagePayload: Payload structure")
print(" - smartsend: Send data via NATS with automatic transport selection")
print(" - smartreceive: Receive and process messages from NATS")
print(" - plik_oneshot_upload: Upload files to HTTP file server")
print(" - _fetch_with_backoff: Fetch data from URLs with retry logic")
print()
print("Usage:")
print(" from nats_bridge import smartsend, smartreceive")
print(" data = [(\"message\", \"Hello World\", \"text\")]")
print(" env = smartsend(\"my.subject\", data)")
print()
print(" # On receiver:")
print(" payloads = smartreceive(msg)")
print(" for dataname, data, type in payloads:")
print(" print(f\"Received {dataname} of type {type}: {data}\")")

View File

@@ -0,0 +1,220 @@
"""
Micropython NATS Bridge - Basic Test Examples
This module demonstrates basic usage of the NATSBridge for Micropython.
"""
import sys
sys.path.insert(0, "../src")
from nats_bridge import MessageEnvelope, MessagePayload, smartsend, smartreceive, log_trace
import json
# ============================================= 100 ============================================== #
def test_text_message():
"""Test sending and receiving text messages."""
print("\n=== Test 1: Text Message ===")
# Send text message
data = [
("message", "Hello World", "text"),
("greeting", "Good morning!", "text")
]
env = smartsend(
"/test/text",
data,
nats_url="nats://localhost:4222",
size_threshold=1000000
)
print("Sent envelope:")
print(" Subject: {}".format(env.send_to))
print(" Correlation ID: {}".format(env.correlation_id))
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# print("Received {}: {}".format(dataname, data))
def test_dictionary_message():
"""Test sending and receiving dictionary messages."""
print("\n=== Test 2: Dictionary Message ===")
# Send dictionary message
config = {
"step_size": 0.01,
"iterations": 1000,
"threshold": 0.5
}
data = [
("config", config, "dictionary")
]
env = smartsend(
"/test/dictionary",
data,
nats_url="nats://localhost:4222",
size_threshold=1000000
)
print("Sent envelope:")
print(" Subject: {}".format(env.send_to))
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# if type == "dictionary":
# print("Config: {}".format(data))
def test_mixed_payloads():
"""Test sending mixed payload types in a single message."""
print("\n=== Test 3: Mixed Payloads ===")
# Mixed content: text, dictionary, and binary
image_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR" # PNG header (example)
data = [
("message_text", "Hello!", "text"),
("user_config", {"theme": "dark", "volume": 80}, "dictionary"),
("user_image", image_data, "binary")
]
env = smartsend(
"/test/mixed",
data,
nats_url="nats://localhost:4222",
size_threshold=1000000
)
print("Sent envelope:")
print(" Subject: {}".format(env.send_to))
print(" Payloads: {}".format(len(env.payloads)))
# Expected output on receiver:
# payloads = smartreceive(msg)
# for dataname, data, type in payloads:
# print("Received {}: {} (type: {})".format(dataname, data if type != "binary" else len(data), type))
def test_large_payload():
"""Test sending large payloads that require fileserver upload."""
print("\n=== Test 4: Large Payload (Link Transport) ===")
# Create large data (> 1MB would trigger link transport)
# For testing, we'll use a smaller size but configure threshold lower
large_data = b"A" * 100000 # 100KB
data = [
("large_data", large_data, "binary")
]
# Use a lower threshold for testing
env = smartsend(
"/test/large",
data,
nats_url="nats://localhost:4222",
fileserver_url="http://localhost:8080",
size_threshold=50000 # 50KB threshold for testing
)
print("Sent envelope:")
print(" Subject: {}".format(env.send_to))
print(" Payloads: {}".format(len(env.payloads)))
for p in env.payloads:
print(" - Transport: {}, Type: {}".format(p.transport, p.type))
def test_reply_to():
"""Test sending messages with reply-to functionality."""
print("\n=== Test 5: Reply To ===")
data = [
("command", {"action": "start"}, "dictionary")
]
env = smartsend(
"/test/command",
data,
nats_url="nats://localhost:4222",
reply_to="/test/response",
reply_to_msg_id="reply-123",
msg_purpose="command"
)
print("Sent envelope:")
print(" Subject: {}".format(env.send_to))
print(" Reply To: {}".format(env.reply_to))
print(" Reply To Msg ID: {}".format(env.reply_to_msg_id))
def test_correlation_id():
"""Test using custom correlation IDs for tracing."""
print("\n=== Test 6: Custom Correlation ID ===")
custom_cid = "trace-abc123"
data = [
("message", "Test with correlation ID", "text")
]
env = smartsend(
"/test/correlation",
data,
nats_url="nats://localhost:4222",
correlation_id=custom_cid
)
print("Sent envelope with correlation ID: {}".format(env.correlation_id))
print("This ID can be used to trace the message flow.")
def test_multiple_payloads():
"""Test sending multiple payloads in one message."""
print("\n=== Test 7: Multiple Payloads ===")
data = [
("text_message", "Hello", "text"),
("json_data", {"key": "value", "number": 42}, "dictionary"),
("table_data", b"\x01\x02\x03\x04", "binary"),
("audio_data", b"\x00\x01\x02\x03", "binary")
]
env = smartsend(
"/test/multiple",
data,
nats_url="nats://localhost:4222",
size_threshold=1000000
)
print("Sent {} payloads in one message".format(len(env.payloads)))
if __name__ == "__main__":
print("Micropython NATS Bridge Test Suite")
print("==================================")
print()
# Run tests
test_text_message()
test_dictionary_message()
test_mixed_payloads()
test_large_payload()
test_reply_to()
test_correlation_id()
test_multiple_payloads()
print("\n=== All tests completed ===")
print()
print("Note: These tests require:")
print(" 1. A running NATS server at nats://localhost:4222")
print(" 2. An HTTP file server at http://localhost:8080 (for large payloads)")
print()
print("To run the tests:")
print(" python test_micropython_basic.py")