update docs
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
|
||||
## 1. Executive Summary
|
||||
|
||||
This document provides the **end-to-end trace** for msghandler - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, **Dart**, **Rust**, and **MicroPython** applications using NATS as the message bus.
|
||||
This document provides the **end-to-end trace** for msghandler - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, **Dart**, **Rust**, and **MicroPython** applications using a message broker as the transport layer.
|
||||
|
||||
This walkthrough serves as the primary onboarding guide for new developers and explains:
|
||||
- **User scenarios** - Real-world use cases from developer perspective
|
||||
@@ -51,7 +51,7 @@ flowchart TB
|
||||
S3["Size Check"]
|
||||
S4["Transport Selection"]
|
||||
S5["Build Envelope"]
|
||||
S6["Publish to NATS"]
|
||||
S6["Publish to transport"]
|
||||
|
||||
S1 --> S2
|
||||
S2 --> S3
|
||||
@@ -62,7 +62,7 @@ flowchart TB
|
||||
|
||||
subgraph Receiver["Receiver (smartreceive)"]
|
||||
direction LR
|
||||
R1["Subscribe to NATS"]
|
||||
R1["Subscribe via transport"]
|
||||
R2["Parse Envelope"]
|
||||
R3["Check Transport"]
|
||||
R4["Deserialize Data"]
|
||||
@@ -99,7 +99,7 @@ flowchart TB
|
||||
|
||||
| Principle | Description | Rationale |
|
||||
|-----------|-------------|-----------|
|
||||
| **Claim-Check Pattern** | Large payloads uploaded to HTTP server, URL sent via NATS | NATS has message size limits; avoids NATS overflow |
|
||||
| **Claim-Check Pattern** | Large payloads uploaded to HTTP server, URL sent via transport | Transport has message size limits; avoids overflow |
|
||||
| **Automatic Transport Selection** | Direct (< threshold) vs Link (≥ threshold) based on size | Optimizes memory vs network I/O trade-off |
|
||||
| **Cross-Platform API** | Consistent `smartsend()`/`smartreceive()` across all platforms | Simplifies developer experience |
|
||||
| **Exponential Backoff** | Retry downloads with increasing delays | Handles transient failures gracefully |
|
||||
@@ -148,7 +148,7 @@ For each payload, msghandler determines transport:
|
||||
|
||||
**Rationale**:
|
||||
- Direct transport is faster for small payloads (no file server round-trip)
|
||||
- Link transport is used when payload ≥ 0.5MB (avoids NATS size limits)
|
||||
- Link transport is used when payload ≥ 0.5MB (avoids transport size limits)
|
||||
|
||||
#### Step 3: Serialization and Encoding
|
||||
|
||||
@@ -213,25 +213,26 @@ msghandler builds the message envelope:
|
||||
- **reply_to**: Tells backend where to send response
|
||||
- **payloads array**: Contains all data with metadata for proper handling
|
||||
|
||||
#### Step 5: Publish to NATS (Caller's Responsibility)
|
||||
#### Step 5: Publish to Transport (Caller's Responsibility)
|
||||
|
||||
```javascript
|
||||
// NATS publishing is the caller's responsibility
|
||||
const conn = await NATS.connect({ servers: "ws://localhost:4222" });
|
||||
await conn.publish("/agent/wine/api/v1/prompt", msgJson);
|
||||
// Publishing via the transport layer is the caller's responsibility
|
||||
// Example with any transport (NATS, MQTT, WebSocket, etc.)
|
||||
// const conn = await transportClient.connect({ servers: "ws://localhost:4222" });
|
||||
// await conn.publish("/agent/wine/api/v1/prompt", msgJson);
|
||||
```
|
||||
|
||||
**Rationale**:
|
||||
- NATS provides low-latency message delivery
|
||||
- The transport layer provides message delivery (NATS, MQTT, WebSocket, etc.)
|
||||
- JSON format ensures cross-platform compatibility
|
||||
- `smartsend()` returns `(env, msgJson)` - caller handles publishing
|
||||
- `smartsend()` returns `(env, msgJson)` - caller handles publishing via their chosen transport
|
||||
|
||||
#### Step 6: Julia Backend Receives Message
|
||||
|
||||
```julia
|
||||
# Julia backend
|
||||
nats_msg = NATS.subscription.next() # Get message from NATS
|
||||
env = smartreceive(String(nats_msg.payload))
|
||||
transport_msg = transport_subscription.next() # Get message from transport
|
||||
env = smartreceive(String(transport_msg.payload))
|
||||
|
||||
# env["payloads"] is now:
|
||||
# [
|
||||
@@ -302,7 +303,7 @@ const [env, msgJson] = await msghandler.smartsend(
|
||||
**Rationale**:
|
||||
- Link transport used for large payloads
|
||||
- File server handles large file upload
|
||||
- NATS only sends URL (small message)
|
||||
- Transport only sends URL (small message)
|
||||
|
||||
#### Step 3: File Server Upload
|
||||
|
||||
@@ -356,8 +357,8 @@ const response = await plikOneshotUpload(
|
||||
|
||||
```julia
|
||||
# Julia backend
|
||||
nats_msg = NATS.subscription.next()
|
||||
env = smartreceive(String(nats_msg.payload))
|
||||
transport_msg = transport_subscription.next()
|
||||
env = smartreceive(String(transport_msg.payload))
|
||||
|
||||
# msghandler automatically:
|
||||
# 1. Extracts URL from payload
|
||||
@@ -396,7 +397,7 @@ df = pd.DataFrame({
|
||||
env, msg_json = await smartsend(
|
||||
"/agent/wine/api/v1/analyze",
|
||||
[("data", df, "arrowtable")],
|
||||
broker_url="nats://localhost:4222",
|
||||
broker_url=DEFAULT_BROKER_URL,
|
||||
receiver_name="agent-backend"
|
||||
)
|
||||
```
|
||||
@@ -429,8 +430,8 @@ arrow_bytes = buf.getvalue()
|
||||
|
||||
```julia
|
||||
# Julia backend
|
||||
nats_msg = NATS.subscription.next()
|
||||
env = smartreceive(String(nats_msg.payload))
|
||||
transport_msg = transport_subscription.next()
|
||||
env = smartreceive(String(transport_msg.payload))
|
||||
|
||||
# env["payloads"][1] is now:
|
||||
# ("data", DataFrame with id, name, score columns, "arrowtable")
|
||||
@@ -479,7 +480,7 @@ use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let conn = nats::connect("nats://localhost:4222").unwrap();
|
||||
let conn = transport_client::connect("ws://localhost:4222").unwrap();
|
||||
|
||||
// Subscribe and receive messages
|
||||
let mut sub = conn.subscribe("/agent/wine/api/v1/analyze").unwrap();
|
||||
@@ -520,7 +521,7 @@ async fn main() {
|
||||
|
||||
**Rationale**:
|
||||
- **serde serialization**: Automatic JSON deserialization to `MsgEnvelopeV1`
|
||||
- **tokio runtime**: Efficient async I/O for NATS and HTTP operations
|
||||
- **tokio runtime**: Efficient async I/O for transport and HTTP operations
|
||||
- **smartreceive deserialization**: Payload data is deserialized and stored as strings in `payload.data`
|
||||
- **Type dispatch**: `payload_type` field determines how to interpret the `data` string
|
||||
|
||||
@@ -548,14 +549,14 @@ let (envelope, json_str) = smartsend(
|
||||
),
|
||||
],
|
||||
&SmartsendOptions {
|
||||
broker_url: "nats://localhost:4222".to_string(),
|
||||
broker_url: DEFAULT_BROKER_URL.to_string(),
|
||||
reply_to: "/python/worker/v1/results".to_string(),
|
||||
msg_purpose: "chat".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
).await?;
|
||||
|
||||
// Caller publishes to NATS
|
||||
// Caller publishes via transport
|
||||
conn.publish("/agent/wine/api/v1/results", &json_str)?;
|
||||
```
|
||||
|
||||
@@ -569,7 +570,7 @@ conn.publish("/agent/wine/api/v1/results", &json_str)?;
|
||||
|
||||
```python
|
||||
# Python backend receives Rust response
|
||||
env = await smartreceive(str(nats_msg.payload))
|
||||
env = await smartreceive(str(transport_msg.payload))
|
||||
|
||||
# env["payloads"][0] is now:
|
||||
# ("results", arrow_table_data, "arrowtable")
|
||||
@@ -598,9 +599,9 @@ let (envelope, json_str) = smartsend(
|
||||
),
|
||||
],
|
||||
&SmartsendOptions {
|
||||
broker_url: "nats://localhost:4222".to_string(),
|
||||
fileserver_url: "http://localhost:8080".to_string(),
|
||||
size_threshold: 500_000, // 0.5MB triggers link transport
|
||||
broker_url: DEFAULT_BROKER_URL.to_string(),
|
||||
fileserver_url: DEFAULT_FILESERVER_URL.to_string(),
|
||||
size_threshold: DEFAULT_SIZE_THRESHOLD, // threshold triggers link transport
|
||||
..Default::default()
|
||||
},
|
||||
).await?;
|
||||
@@ -637,7 +638,7 @@ sensor_data = {
|
||||
env, msg_json = smartsend(
|
||||
"/sensor/device/v1/readings",
|
||||
[("data", sensor_data, "dictionary")],
|
||||
broker_url="nats://localhost:4222",
|
||||
broker_url=DEFAULT_BROKER_URL,
|
||||
size_threshold=100000 # 100KB for MicroPython
|
||||
)
|
||||
```
|
||||
@@ -658,15 +659,15 @@ payload_b64 = base64.b64encode(json_bytes).decode('ascii')
|
||||
|
||||
**Rationale**:
|
||||
- JSON format for human-readable data
|
||||
- Base64 for NATS compatibility
|
||||
- Base64 for transport compatibility
|
||||
- UTF-8 for text encoding
|
||||
|
||||
#### Step 3: Python Backend Receives
|
||||
|
||||
```python
|
||||
# Python backend
|
||||
nats_msg = await nats_consumer.next()
|
||||
env = await smartreceive(str(nats_msg.payload))
|
||||
transport_msg = await transport_consumer.next()
|
||||
env = await smartreceive(str(transport_msg.payload))
|
||||
|
||||
# env["payloads"][0] is now:
|
||||
# ("data", {"temperature": 25.5, "humidity": 60.0, ...}, "dictionary")
|
||||
@@ -708,14 +709,14 @@ const [env, msgJson] = await msghandler.smartsend(
|
||||
**Rationale**:
|
||||
- Empty `receiver_name` = broadcast to all subscribers
|
||||
- Chat messages often include text + images
|
||||
- NATS wildcard subscriptions route to correct recipients
|
||||
- Transport wildcard subscriptions route to correct recipients
|
||||
|
||||
#### Step 2: Python Backend Receives
|
||||
|
||||
```python
|
||||
# Python (Backend)
|
||||
nats_msg = await nats_consumer.next()
|
||||
env = await smartreceive(str(nats_msg.payload))
|
||||
transport_msg = await transport_consumer.next()
|
||||
env = await smartreceive(str(transport_msg.payload))
|
||||
|
||||
# env["payloads"] is now:
|
||||
# [
|
||||
@@ -733,8 +734,8 @@ env = await smartreceive(str(nats_msg.payload))
|
||||
|
||||
```julia
|
||||
# Julia (Backend)
|
||||
nats_msg = NATS.subscription.next()
|
||||
env = smartreceive(String(nats_msg.payload))
|
||||
transport_msg = transport_subscription.next()
|
||||
env = smartreceive(String(transport_msg.payload))
|
||||
|
||||
# env["payloads"] is now:
|
||||
# [
|
||||
@@ -795,7 +796,7 @@ await msghandler.smartsend(
|
||||
| File server unavailable | `UPLOAD_FAILED` | Fall back to direct transport or smaller payloads |
|
||||
| File server download fails | `DOWNLOAD_FAILED` | Retry with exponential backoff |
|
||||
| Payload type mismatch | `DESERIALIZATION_ERROR` | Validate payload_type matches data |
|
||||
| NATS connection lost | `NATS_CONNECTION_FAILED` | NATS client auto-reconnects |
|
||||
| Transport connection lost | `TRANSPORT_CONNECTION_FAILED` | Transport client auto-reconnects |
|
||||
|
||||
### Error Response Format
|
||||
|
||||
@@ -828,14 +829,14 @@ correlation_id = string(uuid4())
|
||||
# Use throughout the flow
|
||||
log_trace(correlation_id, "Starting smartsend")
|
||||
log_trace(correlation_id, "Serialized payload size: 100 bytes")
|
||||
log_trace(correlation_id, "Published to NATS")
|
||||
log_trace(correlation_id, "Published to transport")
|
||||
```
|
||||
|
||||
**Log Format**:
|
||||
```
|
||||
[2026-03-13T16:30:00.000Z] [Correlation: abc123...] Starting smartsend
|
||||
[2026-03-13T16:30:00.001Z] [Correlation: abc123...] Serialized payload size: 100 bytes
|
||||
[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to NATS
|
||||
[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to transport
|
||||
```
|
||||
|
||||
---
|
||||
@@ -846,7 +847,7 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|
||||
| Strategy | Description | When to Use |
|
||||
|----------|-------------|-------------|
|
||||
| Pre-create NATS connection | Reuse connection for multiple sends | High-throughput scenarios |
|
||||
| Pre-create transport connection | Reuse connection for multiple sends | High-throughput scenarios |
|
||||
| Adjust size threshold | Increase threshold if file server slow | File server bottleneck |
|
||||
| Use direct transport | Avoid file server for small payloads | Low latency requirements |
|
||||
|
||||
@@ -868,7 +869,7 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|
||||
| Component | Minimum | Notes |
|
||||
|-----------|---------|-------|
|
||||
| NATS Server | 1 instance | Single node for development |
|
||||
| Message Broker | 1 instance | Single node for development |
|
||||
| File Server | 1 instance | HTTP server for large payloads |
|
||||
| Client Memory | 50MB | Desktop platforms (Julia/JS/Python/Dart) |
|
||||
| Client Memory | 256KB | MicroPython devices |
|
||||
@@ -877,7 +878,7 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
|
||||
| `BROKER_URL` | `ws://localhost:4222` | Message broker URL |
|
||||
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
|
||||
| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB) |
|
||||
|
||||
@@ -911,7 +912,7 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|------|----------|----------|---------------------------|
|
||||
| [`src/msghandler.jl`](../src/msghandler.jl) | Julia | Full feature set, Arrow IPC, multiple dispatch | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler_ssr.js`](../src/msghandler_ssr.js) | Node.js | Arrow IPC, async/await | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler_csr.js`](../src/msghandler_csr.js) | Browser | JSON table only, WebSocket NATS | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler_csr.js`](../src/msghandler_csr.js) | Browser | JSON table only | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler.py`](../src/msghandler.py) | Python | Arrow IPC, async/await | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler.dart`](../src/msghandler.dart) | Dart | Full feature set, Arrow IPC, async/await | specification.md:2-19 (all sections) |
|
||||
| [`src/msghandler.rs`](../src/msghandler.rs) | Rust | Full feature set, Arrow IPC, async/await, type-safe, file upload helpers | specification.md:2-19 (all sections) |
|
||||
@@ -923,6 +924,10 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|
||||
| Date | Version | Changes | Specification Reference |
|
||||
|------|---------|---------|------------------------|
|
||||
| 2026-05-15 | 1.5.0 | Made transport layer agnostic | All sections |
|
||||
| - | - | Removed all NATS-specific references from walkthrough | All sections |
|
||||
| - | - | Updated code examples to use transport-agnostic patterns | All sections |
|
||||
| - | - | Updated diagrams to remove NATS-specific labels | All sections |
|
||||
| 2026-05-14 | 1.4.0 | Updated Rust API to reflect `smartreceive` deserialization changes | All sections |
|
||||
| - | - | `smartreceive` now stores deserialized data in `MsgPayloadV1.data` | specification.md:8 |
|
||||
| - | - | Added `plik_upload_file` convenience function documentation | specification.md:13 |
|
||||
@@ -932,8 +937,8 @@ log_trace(correlation_id, "Published to NATS")
|
||||
| - | - | Added Rust user scenario (User Scenario 4) | specification.md:11 (Rust API) |
|
||||
| - | - | Updated scenario numbering (MicroPython → Scenario 5, Cross-Platform → Scenario 6) | All sections |
|
||||
| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/msghandler.jl) | All sections |
|
||||
| - | - | Updated smartreceive calls to use String(nats_msg.payload) pattern | All sections |
|
||||
| - | - | Removed NATSClient.publish() calls (caller responsible for NATS publishing) | All sections |
|
||||
| - | - | Updated smartreceive calls to use transport payload pattern | All sections |
|
||||
| - | - | Removed NATSClient.publish() calls (caller responsible for transport publishing) | All sections |
|
||||
| - | - | Removed is_publish and nats_connection parameter references | All sections |
|
||||
| 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines | All sections |
|
||||
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation | specification.md:2-19 (all sections) |
|
||||
|
||||
Reference in New Issue
Block a user