update
This commit is contained in:
@@ -464,60 +464,175 @@ end
|
||||
#### Dependencies
|
||||
- `nats.js` - Core NATS functionality
|
||||
- `apache-arrow` - Arrow IPC serialization
|
||||
- `uuid` - Correlation ID generation
|
||||
- `uuid` - Correlation ID and message ID generation
|
||||
- `base64-arraybuffer` - Base64 encoding/decoding
|
||||
- `node-fetch` or `fetch` - HTTP client for file server
|
||||
|
||||
#### smartsend Function
|
||||
|
||||
```javascript
|
||||
async function smartsend(subject, data, options = {})
|
||||
// data format: [(dataname, data, type), ...]
|
||||
// options object should include:
|
||||
// - natsUrl: NATS server URL
|
||||
// - fileserverUrl: base URL of the file server
|
||||
// - sizeThreshold: threshold in bytes for transport selection
|
||||
// - correlationId: optional correlation ID for tracing
|
||||
async function smartsend(
|
||||
subject,
|
||||
data, // List of (dataname, data, type) tuples: [(dataname1, data1, type1), ...]
|
||||
options = {}
|
||||
)
|
||||
```
|
||||
|
||||
**Options:**
|
||||
- `broker_url` (String) - NATS server URL (default: `"nats://localhost:4222"`)
|
||||
- `fileserver_url` (String) - Base URL of the file server (default: `"http://localhost:8080"`)
|
||||
- `size_threshold` (Number) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
|
||||
- `correlation_id` (String) - Optional correlation ID for tracing
|
||||
- `msg_purpose` (String) - Purpose of the message (default: `"chat"`)
|
||||
- `sender_name` (String) - Sender name (default: `"NATSBridge"`)
|
||||
- `receiver_name` (String) - Message receiver name (default: `""`)
|
||||
- `receiver_id` (String) - Message receiver ID (default: `""`)
|
||||
- `reply_to` (String) - Topic to reply to (default: `""`)
|
||||
- `reply_to_msg_id` (String) - Message ID this message is replying to (default: `""`)
|
||||
- `fileserver_upload_handler` (Function) - Custom upload handler function
|
||||
|
||||
**Return Value:**
|
||||
- Returns a Promise that resolves to an object containing:
|
||||
- `envelope` - The envelope object containing all metadata and payloads
|
||||
- `envelope_json` - JSON string representation of the envelope for publishing
|
||||
- `published` - Boolean indicating whether the message was automatically published to NATS
|
||||
|
||||
**Input Format:**
|
||||
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
|
||||
- Even for single payloads: `[(dataname1, data1, "type1")]`
|
||||
- Each payload can have a different type, enabling mixed-content messages
|
||||
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
|
||||
|
||||
**Flow:**
|
||||
1. Iterate through the list of (dataname, data, type) tuples
|
||||
2. For each payload: extract the type from the tuple and serialize accordingly
|
||||
3. Check payload size
|
||||
4. If < threshold: publish directly to NATS
|
||||
5. If >= threshold: upload to HTTP server, publish NATS with URL
|
||||
1. Generate correlation ID and message ID if not provided
|
||||
2. Iterate through the list of `(dataname, data, type)` tuples
|
||||
3. For each payload:
|
||||
- Serialize based on payload type
|
||||
- Check payload size
|
||||
- If < threshold: Base64 encode and include in envelope
|
||||
- If >= threshold: Upload to HTTP server, store URL in envelope
|
||||
4. Publish the JSON envelope to NATS
|
||||
5. Return envelope object and JSON string
|
||||
|
||||
#### smartreceive Handler
|
||||
|
||||
```javascript
|
||||
async function smartreceive(msg, options = {})
|
||||
// options object should include:
|
||||
// - fileserverDownloadHandler: function to fetch data from file server URL
|
||||
// - max_retries: maximum retry attempts for fetching URL
|
||||
// - base_delay: initial delay for exponential backoff in ms
|
||||
// - max_delay: maximum delay for exponential backoff in ms
|
||||
// - correlationId: optional correlation ID for tracing
|
||||
```
|
||||
|
||||
**Options:**
|
||||
- `fileserver_download_handler` (Function) - Custom download handler function
|
||||
- `max_retries` (Number) - Maximum retry attempts for fetching URL (default: `5`)
|
||||
- `base_delay` (Number) - Initial delay for exponential backoff in ms (default: `100`)
|
||||
- `max_delay` (Number) - Maximum delay for exponential backoff in ms (default: `5000`)
|
||||
- `correlation_id` (String) - Optional correlation ID for tracing
|
||||
|
||||
**Output Format:**
|
||||
- Returns a dictionary (key-value map) containing all envelope fields:
|
||||
- `correlationId`, `msgId`, `timestamp`, `sendTo`, `msgPurpose`, `senderName`, `senderId`, `receiverName`, `receiverId`, `replyTo`, `replyToMsgId`, `brokerURL`
|
||||
- Returns a Promise that resolves to an object containing all envelope fields:
|
||||
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
||||
- `metadata` - Message-level metadata dictionary
|
||||
- `payloads` - List of dictionaries, each containing deserialized payload data
|
||||
- `payloads` - List of dictionaries, each containing deserialized payload data with keys: `dataname`, `data`, `payload_type`
|
||||
|
||||
**Process Flow:**
|
||||
1. Parse the JSON envelope to extract all fields
|
||||
2. Iterate through each payload in `payloads`
|
||||
2. Iterate through each payload in `payloads` array
|
||||
3. For each payload:
|
||||
- Determine transport type (`direct` or `link`)
|
||||
- If `direct`: decode Base64 data from the message
|
||||
- If `link`: fetch data from URL using exponential backoff
|
||||
- If `direct`: Base64 decode the data from the message
|
||||
- If `link`: Fetch data from URL using exponential backoff (via `fileserverDownloadHandler`)
|
||||
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
|
||||
4. Return envelope object with `payloads` field containing list of `(dataname, data, type)` tuples
|
||||
|
||||
**Note:** The `fileserver_download_handler` receives `(url, max_retries, base_delay, max_delay, correlation_id)` and returns `ArrayBuffer` or `Uint8Array`.
|
||||
|
||||
### Python/Micropython Implementation
|
||||
|
||||
#### Dependencies
|
||||
- `nats-python` - Core NATS functionality
|
||||
- `pyarrow` - Arrow IPC serialization
|
||||
- `uuid` - Correlation ID and message ID generation
|
||||
- `base64` - Base64 encoding/decoding
|
||||
- `requests` or `aiohttp` - HTTP client for file server
|
||||
|
||||
#### smartsend Function
|
||||
|
||||
```python
|
||||
async def smartsend(
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]], # List of (dataname, data, type) tuples
|
||||
options: Dict = {}
|
||||
)
|
||||
```
|
||||
|
||||
**Options:**
|
||||
- `broker_url` (str) - NATS server URL (default: `"nats://localhost:4222"`)
|
||||
- `fileserver_url` (str) - Base URL of the file server (default: `"http://localhost:8080"`)
|
||||
- `size_threshold` (int) - Threshold in bytes for transport selection (default: `1048576` = 1MB)
|
||||
- `correlation_id` (str) - Optional correlation ID for tracing
|
||||
- `msg_purpose` (str) - Purpose of the message (default: `"chat"`)
|
||||
- `sender_name` (str) - Sender name (default: `"NATSBridge"`)
|
||||
- `receiver_name` (str) - Message receiver name (default: `""`)
|
||||
- `receiver_id` (str) - Message receiver ID (default: `""`)
|
||||
- `reply_to` (str) - Topic to reply to (default: `""`)
|
||||
- `reply_to_msg_id` (str) - Message ID this message is replying to (default: `""`)
|
||||
- `fileserver_upload_handler` (Callable) - Custom upload handler function
|
||||
|
||||
**Return Value:**
|
||||
- Returns a tuple `(envelope, envelope_json)` where:
|
||||
- `envelope` - The envelope dictionary containing all metadata and payloads
|
||||
- `envelope_json` - JSON string representation of the envelope for publishing
|
||||
|
||||
**Input Format:**
|
||||
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
|
||||
- Even for single payloads: `[(dataname1, data1, "type1")]`
|
||||
- Each payload can have a different type, enabling mixed-content messages
|
||||
- Supported types: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"`
|
||||
|
||||
**Flow:**
|
||||
1. Generate correlation ID and message ID if not provided
|
||||
2. Iterate through the list of `(dataname, data, type)` tuples
|
||||
3. For each payload:
|
||||
- Serialize based on payload type
|
||||
- Check payload size
|
||||
- If < threshold: Base64 encode and include in envelope
|
||||
- If >= threshold: Upload to HTTP server, store URL in envelope
|
||||
4. Publish the JSON envelope to NATS
|
||||
5. Return envelope dictionary and JSON string
|
||||
|
||||
#### smartreceive Handler
|
||||
|
||||
```python
|
||||
async def smartreceive(
|
||||
msg: NATS.Message,
|
||||
options: Dict = {}
|
||||
)
|
||||
```
|
||||
|
||||
**Options:**
|
||||
- `fileserver_download_handler` (Callable) - Custom download handler function
|
||||
- `max_retries` (int) - Maximum retry attempts for fetching URL (default: `5`)
|
||||
- `base_delay` (int) - Initial delay for exponential backoff in ms (default: `100`)
|
||||
- `max_delay` (int) - Maximum delay for exponential backoff in ms (default: `5000`)
|
||||
- `correlation_id` (str) - Optional correlation ID for tracing
|
||||
|
||||
**Output Format:**
|
||||
- Returns a dictionary containing all envelope fields:
|
||||
- `correlation_id`, `msg_id`, `timestamp`, `send_to`, `msg_purpose`, `sender_name`, `sender_id`, `receiver_name`, `receiver_id`, `reply_to`, `reply_to_msg_id`, `broker_url`
|
||||
- `metadata` - Message-level metadata dictionary
|
||||
- `payloads` - List of tuples, each containing `(dataname, data, payload_type)` with deserialized payload data
|
||||
|
||||
**Process Flow:**
|
||||
1. Parse the JSON envelope to extract all fields
|
||||
2. Iterate through each payload in `payloads` list
|
||||
3. For each payload:
|
||||
- Determine transport type (`direct` or `link`)
|
||||
- If `direct`: Base64 decode the data from the message
|
||||
- If `link`: Fetch data from URL using exponential backoff (via `fileserver_download_handler`)
|
||||
- Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
|
||||
4. Return envelope dictionary with `payloads` field containing list of `(dataname, data, type)` tuples
|
||||
|
||||
**Note:** The `fileserver_download_handler` receives `(url: str, max_retries: int, base_delay: int, max_delay: int, correlation_id: str)` and returns `bytes`.
|
||||
|
||||
## Scenario Implementations
|
||||
|
||||
### Scenario 1: Command & Control (Small Dictionary)
|
||||
|
||||
Reference in New Issue
Block a user