From 5a9e93d6e77f344cbe005921326f16d0b5f87462 Mon Sep 17 00:00:00 2001 From: narawat Date: Tue, 24 Feb 2026 20:38:45 +0700 Subject: [PATCH] update --- docs/architecture.md | 165 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 140 insertions(+), 25 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 31e5c99..215bfbf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -464,60 +464,175 @@ end #### Dependencies - `nats.js` - Core NATS functionality - `apache-arrow` - Arrow IPC serialization -- `uuid` - Correlation ID generation +- `uuid` - Correlation ID and message ID generation +- `base64-arraybuffer` - Base64 encoding/decoding +- `node-fetch` or `fetch` - HTTP client for file server #### smartsend Function ```javascript -async function smartsend(subject, data, options = {}) - // data format: [(dataname, data, type), ...] - // options object should include: - // - natsUrl: NATS server URL - // - fileserverUrl: base URL of the file server - // - sizeThreshold: threshold in bytes for transport selection - // - correlationId: optional correlation ID for tracing +async function smartsend( + subject, + data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...] 
+ options = {} +) ``` +**Options:** +- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`) +- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`) +- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB) +- `correlation_id` (String) - Optional correlation ID for tracing +- `msg_purpose` (String) - Purpose of the message (default: `"chat"`) +- `sender_name` (String) - Sender name (default: `"NATSBridge"`) +- `receiver_name` (String) - Message receiver name (default: `""`) +- `receiver_id` (String) - Message receiver ID (default: `""`) +- `reply_to` (String) - Topic to reply to (default: `""`) +- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`) +- `fileserver_upload_handler` (Function) - Custom upload handler function + +**Return Value:** +- Returns a Promise that resolves to an object containing: + - `envelope` - The envelope object containing all metadata and payloads + - `envelope_json` - JSON string representation of the envelope for publishing + - `published` - Boolean indicating whether the message was automatically published to NATS + **Input Format:** - `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]` - Even for single payloads: `[(dataname1, data1, "type1")]` - Each payload can have a different type, enabling mixed-content messages +- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"` **Flow:** -1. Iterate through the list of (dataname, data, type) tuples -2. For each payload: extract the type from the tuple and serialize accordingly -3. Check payload size -4. If < threshold: publish directly to NATS -5. If >= threshold: upload to HTTP server, publish NATS with URL +1. Generate correlation ID and message ID if not provided +2. Iterate through the list of `(dataname, data, type)` tuples +3. 
For each payload: + - Serialize based on payload type + - Check payload size + - If < threshold: Base64 encode and include in envelope + - If >= threshold: Upload to HTTP server, store URL in envelope +4. Publish the JSON envelope to NATS +5. Return envelope object and JSON string #### smartreceive Handler ```javascript async function smartreceive(msg, options = {}) - // options object should include: - // - fileserverDownloadHandler: function to fetch data from file server URL - // - max_retries: maximum retry attempts for fetching URL - // - base_delay: initial delay for exponential backoff in ms - // - max_delay: maximum delay for exponential backoff in ms - // - correlationId: optional correlation ID for tracing ``` +**Options:** +- `fileserver_download_handler` (Function) - Custom download handler function +- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`) +- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`) +- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`) +- `correlation_id` (String) - Optional correlation ID for tracing + **Output Format:** -- Returns a dictionary (key-value map) containing all envelope fields: - - `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL` +- Returns a Promise that resolves to an object containing all envelope fields: + - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` - `metadata` - Message-level metadata dictionary - - `payloads` - List of dictionaries, each containing deserialized payload data + - `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type` **Process Flow:** 1. Parse the JSON envelope to extract all fields -2. 
Iterate through each payload in `payloads` +2. Iterate through each payload in `payloads` array 3. For each payload: - Determine transport type (`direct` or `link`) - - If `direct`: decode Base64 data from the message - - If `link`: fetch data from URL using exponential backoff + - If `direct`: Base64 decode the data from the message + - If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`) + - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) +4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples + +**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`. + +### Python/MicroPython Implementation + +#### Dependencies +- `nats-python` - Core NATS functionality +- `pyarrow` - Arrow IPC serialization +- `uuid` - Correlation ID and message ID generation +- `base64` - Base64 encoding/decoding +- `requests` or `aiohttp` - HTTP client for file server + +#### smartsend Function + +```python +async def smartsend( + subject: str, + data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples + options: Dict = {} +) +``` + +**Options:** +- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`) +- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`) +- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB) +- `correlation_id` (str) - Optional correlation ID for tracing +- `msg_purpose` (str) - Purpose of the message (default: `"chat"`) +- `sender_name` (str) - Sender name (default: `"NATSBridge"`) +- `receiver_name` (str) - Message receiver name (default: `""`) +- `receiver_id` (str) - Message receiver ID (default: `""`) +- `reply_to` (str) - Topic to reply to (default: `""`) +- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`) +-
`fileserver_upload_handler` (Callable) - Custom upload handler function + +**Return Value:** +- Returns a tuple `(envelope, envelope_json)` where: + - `envelope` - The envelope dictionary containing all metadata and payloads + - `envelope_json` - JSON string representation of the envelope for publishing + +**Input Format:** +- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]` +- Even for single payloads: `[(dataname1, data1, "type1")]` +- Each payload can have a different type, enabling mixed-content messages +- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"` + +**Flow:** +1. Generate correlation ID and message ID if not provided +2. Iterate through the list of `(dataname, data, type)` tuples +3. For each payload: + - Serialize based on payload type + - Check payload size + - If < threshold: Base64 encode and include in envelope + - If >= threshold: Upload to HTTP server, store URL in envelope +4. Publish the JSON envelope to NATS +5. 
Return envelope dictionary and JSON string + +#### smartreceive Handler + +```python +async def smartreceive( + msg: NATS.Message, + options: Dict = {} +) +``` + +**Options:** +- `fileserver_download_handler` (Callable) - Custom download handler function +- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`) +- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`) +- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`) +- `correlation_id` (str) - Optional correlation ID for tracing + +**Output Format:** +- Returns a dictionary containing all envelope fields: + - `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url` + - `metadata` - Message-level metadata dictionary + - `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data + +**Process Flow:** +1. Parse the JSON envelope to extract all fields +2. Iterate through each payload in `payloads` list +3. For each payload: + - Determine transport type (`direct` or `link`) + - If `direct`: Base64 decode the data from the message + - If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) 4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples +**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`. + ## Scenario Implementations ### Scenario 1: Command & Control (Small Dictionary)