Files
NATSBridge/src
2026-02-22 13:26:44 +07:00
..
2026-02-22 13:26:44 +07:00
2026-02-22 13:26:44 +07:00
2026-02-19 19:25:34 +07:00
2026-02-22 13:26:44 +07:00

NATSBridge for Micropython

A high-performance, bi-directional data bridge for Micropython devices using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.

Overview

This module provides functionality for sending and receiving data over NATS with automatic transport selection based on payload size:

  • Direct Transport: Payloads < 1MB are sent directly via NATS (Base64 encoded)
  • Link Transport: Payloads >= 1MB are uploaded to an HTTP file server and referenced via URL

Features

  • Bi-directional NATS communication
  • Multi-payload support (mixed content in single message)
  • Automatic transport selection based on payload size
  • File server integration for large payloads
  • Exponential backoff for URL fetching
  • Correlation ID tracking
  • Reply-to support for request-response pattern

Supported Payload Types

Type Description
text Plain text strings
dictionary JSON-serializable dictionaries
table Tabular data (Arrow IPC format)
image Image data (PNG, JPG bytes)
audio Audio data (WAV, MP3 bytes)
video Video data (MP4, AVI bytes)
binary Generic binary data

Installation

  1. Copy nats_bridge.py to your Micropython device
  2. Ensure you have the following dependencies:
    • urequests for HTTP requests
    • ubinascii for base64 encoding
    • ujson for JSON handling
    • usocket for networking

Usage

Basic Text Message

from nats_bridge import smartsend, smartreceive

# Sender
data = [("message", "Hello World", "text")]
env = smartsend("/chat/room1", data, nats_url="nats://localhost:4222")

# Receiver
payloads = smartreceive(msg)
for dataname, data, type in payloads:
    print("Received {}: {}".format(dataname, data))

Sending JSON Configuration

from nats_bridge import smartsend

config = {
    "wifi_ssid": "MyNetwork",
    "wifi_password": "password123",
    "update_interval": 60
}

data = [("config", config, "dictionary")]
env = smartsend("/device/config", data, nats_url="nats://localhost:4222")

Mixed Content (Chat with Text + Image)

from nats_bridge import smartsend

image_data = b"\x89PNG..."  # PNG bytes

data = [
    ("message_text", "Hello with image!", "text"),
    ("user_avatar", image_data, "binary")
]

env = smartsend("/chat/mixed", data, nats_url="nats://localhost:4222")

Request-Response Pattern

from nats_bridge import smartsend

# Send command with reply-to
data = [("command", {"action": "read_sensor"}, "dictionary")]
env = smartsend(
    "/device/command",
    data,
    nats_url="nats://localhost:4222",
    reply_to="/device/response",
    reply_to_msg_id="cmd-001"
)

Large Payloads (File Server)

from nats_bridge import smartsend

# Large data (> 1MB)
large_data = b"A" * 2000000  # 2MB

env = smartsend(
    "/data/large",
    [("large_file", large_data, "binary")],
    nats_url="nats://localhost:4222",
    fileserver_url="http://localhost:8080",
    size_threshold=1000000  # 1MB threshold
)

API Reference

smartsend(subject, data, ...)

Send data via NATS with automatic transport selection.

Arguments:

  • subject (str): NATS subject to publish to
  • data (list): List of (dataname, data, type) tuples
  • nats_url (str): NATS server URL (default: nats://localhost:4222)
  • fileserver_url (str): HTTP file server URL (default: http://localhost:8080)
  • size_threshold (int): Threshold in bytes (default: 1,000,000)
  • correlation_id (str): Optional correlation ID for tracing
  • msg_purpose (str): Message purpose (default: "chat")
  • sender_name (str): Sender name (default: "NATSBridge")
  • receiver_name (str): Receiver name (default: "")
  • receiver_id (str): Receiver ID (default: "")
  • reply_to (str): Reply topic (default: "")
  • reply_to_msg_id (str): Reply message ID (default: "")

Returns: MessageEnvelope object

smartreceive(msg, ...)

Receive and process NATS messages.

Arguments:

  • msg: NATS message (dict or JSON string)
  • fileserver_download_handler (function): Function to fetch data from URLs
  • max_retries (int): Maximum retry attempts (default: 5)
  • base_delay (int): Initial delay in ms (default: 100)
  • max_delay (int): Maximum delay in ms (default: 5000)

Returns: List of (dataname, data, type) tuples

MessageEnvelope

Represents a complete NATS message envelope.

Attributes:

  • correlation_id: Unique identifier for tracing
  • msg_id: Unique message identifier
  • timestamp: Message publication timestamp
  • send_to: NATS subject
  • msg_purpose: Message purpose
  • sender_name: Sender name
  • sender_id: Sender UUID
  • receiver_name: Receiver name
  • receiver_id: Receiver UUID
  • reply_to: Reply topic
  • reply_to_msg_id: Reply message ID
  • broker_url: NATS broker URL
  • metadata: Message-level metadata
  • payloads: List of MessagePayload objects

MessagePayload

Represents a single payload within a message envelope.

Attributes:

  • id: Unique payload identifier
  • dataname: Name of the payload
  • type: Payload type ("text", "dictionary", etc.)
  • transport: Transport method ("direct" or "link")
  • encoding: Encoding method ("none", "base64", etc.)
  • size: Payload size in bytes
  • data: Payload data (bytes for direct, URL for link)
  • metadata: Payload-level metadata

Examples

See examples/micropython_example.py for more detailed examples.

Testing

Run the test suite:

python test/test_micropython_basic.py

Requirements

  • Micropython with networking support
  • NATS server (nats.io)
  • HTTP file server (optional, for large payloads)

License

MIT