1080 lines
43 KiB
Markdown
1080 lines
43 KiB
Markdown
# Walkthrough: msghandler
|
|
|
|
**Version**: 1.5.0
|
|
**Date**: 2026-05-22
|
|
**Status**: Active
|
|
**Ground Truth**: [`src/msghandler.jl`](../src/msghandler.jl)
|
|
**ASG Framework Alignment**: v8 pillars - Requirements → Solution Design → Specification → Walkthrough → Implementation Plan → Validation → Runbook
|
|
|
|
---
|
|
|
|
## 1. Executive Summary
|
|
|
|
This document provides the **end-to-end trace** for msghandler - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, **Dart**, **Rust**, and **MicroPython** applications using a message broker as the transport layer.
|
|
|
|
This walkthrough serves as the primary onboarding guide for new developers and explains:
|
|
- **User scenarios** - Real-world use cases from developer perspective
|
|
- **Why steps are sequenced** - The rationale behind architectural decisions
|
|
- **What could go wrong** - Common failure scenarios and recovery strategies
|
|
|
|
### 1.1 Specification Traceability
|
|
|
|
| Walkthrough Section | Specification Reference | Requirement ID(s) | Solution Design Ref(s) | Description |
|
|
|---------------------|-------------------------|-------------------|------------------------|-------------|
|
|
| Section 2 (Big Picture) | specification.md:2, specification.md:15 | FR-001, FR-002, FR-003, FR-004, FR-005, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-002, SD-005, SD-006 | End-to-end system flow diagrams |
|
|
| Section 3 (Chat Scenario) | specification.md:2, specification.md:3, specification.md:5, specification.md:11 | FR-001, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-004, SD-005, SD-006 | Chat webapp ↔ Julia backend with mixed payloads |
|
|
| Section 4 (Large File) | specification.md:6, specification.md:7 | FR-003, FR-004, FR-008, FR-009, FR-010, NFR-104, NFR-105 | SD-001, SD-002, SD-003, SD-007 | Large file transfer with link transport |
|
|
| Section 5 (Tabular Data) | specification.md:5, specification.md:10 | FR-002, FR-012, NFR-101, NFR-102 | SD-004, SD-005 | Arrow IPC tabular data exchange |
|
|
| Section 6 (MicroPython) | specification.md:13, specification.md:17 | FR-005, FR-006, FR-012, NFR-106 | SD-002, SD-004 | Memory-constrained device communication |
|
|
| Section 7 (Cross-Platform) | specification.md:3, specification.md:4, specification.md:5, specification.md:11 | FR-001, FR-002, FR-003, FR-004, FR-005, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-002, SD-004, SD-005, SD-006 | Multi-platform chat application |
|
|
| Section 8 (Error Handling) | specification.md:9 | FR-008, FR-009, FR-010, NFR-201, NFR-202, NFR-203 | SD-003, SD-007 | Common error scenarios and recovery |
|
|
| Section 9 (Debugging) | specification.md:4, specification.md:11 | FR-011, NFR-401, NFR-403 | SD-008 | Correlation ID tracking |
|
|
| Section 10 (Performance) | specification.md:7, specification.md:13 | NFR-101, NFR-102, NFR-103, NFR-104, NFR-105, NFR-106, NFR-107 | SD-001, SD-002, SD-006 | Optimization strategies |
|
|
| Section 11 (Deployment) | specification.md:12, specification.md:18 | FR-013, FR-014, NFR-201, NFR-203 | SD-006 | Infrastructure requirements |
|
|
|
|
---
|
|
|
|
## 2. Overview: The Big Picture
|
|
|
|
msghandler implements the **Claim-Check pattern** for efficient handling of large payloads (>0.5MB):
|
|
|
|
```mermaid
|
|
flowchart TB
|
|
subgraph msghandler["msghandler Module"]
|
|
direction TB
|
|
|
|
subgraph Sender["Sender (smartpack)"]
|
|
direction LR
|
|
S1["Data Tuples<br/>[(dataname, data, type)]"]
|
|
S2["Serialize Data"]
|
|
S3["Size Check"]
|
|
S4["Transport Selection"]
|
|
S5["Build Envelope"]
|
|
S6["Publish to transport"]
|
|
|
|
S1 --> S2
|
|
S2 --> S3
|
|
S3 --> S4
|
|
S4 --> S5
|
|
S5 --> S6
|
|
end
|
|
|
|
subgraph Receiver["Receiver (smartunpack)"]
|
|
direction LR
|
|
R1["Subscribe via transport"]
|
|
R2["Parse Envelope"]
|
|
R3["Check Transport"]
|
|
R4["Deserialize Data"]
|
|
R5["Return Payloads"]
|
|
|
|
R1 --> R2
|
|
R2 --> R3
|
|
R3 --> R4
|
|
R4 --> R5
|
|
end
|
|
|
|
S6 -.->|Message| R1
|
|
end
|
|
|
|
subgraph FileServer["HTTP File Server (Plik)"]
|
|
direction TB
|
|
FS1["Upload URL"]
|
|
FS2["Download URL"]
|
|
|
|
S4 -.->|Large Payload| FS1
|
|
FS1 -.->|URL| S5
|
|
R3 -.->|Fetch URL| FS2
|
|
end
|
|
|
|
style msghandler fill:#e1f5fe,stroke:#0288d1,stroke-width:2px
|
|
style Sender fill:#b3e5fc,stroke:#0288d1
|
|
style Receiver fill:#b3e5fc,stroke:#0288d1
|
|
style FileServer fill:#ffe0b2,stroke:#f57c00
|
|
```
|
|
|
|
### 2.1 Key Design Principles
|
|
|
|
| Principle | Description | Rationale |
|
|
|-----------|-------------|-----------|
|
|
| **Claim-Check Pattern** | Large payloads uploaded to HTTP server, URL sent via transport | Transport has message size limits; avoids overflow |
|
|
| **Automatic Transport Selection** | Direct (< threshold) vs Link (≥ threshold) based on size | Optimizes memory vs network I/O trade-off |
|
|
| **Cross-Platform API** | Consistent `smartpack()`/`smartunpack()` across all platforms | Simplifies developer experience |
|
|
| **Exponential Backoff** | Retry downloads with increasing delays | Handles transient failures gracefully |
|
|
|
|
---
|
|
|
|
## 3. User Scenario 1: Chat Webapp ↔ Julia Backend
|
|
|
|
### Scenario Description
|
|
|
|
A JavaScript chat webapp wants to send mixed payloads (text message + user avatar image) to a Julia backend, and receive mixed payloads (text response + AI-generated image) back.
|
|
|
|
### Complete End-to-End Round-Trip Flow
|
|
|
|
msghandler implements a **transport-agnostic** messaging pattern. The library handles serialization and envelope building, while the caller is responsible for publishing/subscribing via their chosen transport (NATS, MQTT, WebSocket, HTTP, etc.).
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
|
│ END-TO-END ROUND-TRIP FLOW │
|
|
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
|
|
|
SENDER (JavaScript Webapp) TRANSPORT LAYER RECEIVER (Julia Backend)
|
|
┌─────────────────────┐ ┌───────────────────┐ ┌─────────────────────┐
|
|
│ │ │ │ │ │
|
|
│ 1. Prepare data │ │ │ │ │
|
|
│ [(msg, data, │ │ │ │ │
|
|
│ type)] │ │ │ │ │
|
|
│ │ │ │ │ │
|
|
│ 2. Call smartpack() │──────────────────────────────────────>│ │ │ │
|
|
│ Returns (env, │ │ JSON Message │ │ │
|
|
│ json_str) │ │ (json_str) │ │ │
|
|
│ │ │ │ │ │
|
|
│ 3. Publish via │ │ │ │ │
|
|
│ transport: │──────────────────────────────────────>│ (NATS/MQTT/ │────────────────────────>│ 4. Subscribe via │
|
|
│ MY_TRANSPORT. │ │ WebSocket, etc.) │ │ transport: │
|
|
│ publish(subject, │ │ │ │ msg = │
|
|
│ msgJson) │ │ │ │ SUBSCRIBE() │
|
|
│ │ │ │ │ │
|
|
│ │ │ │ │ 5. Call │
|
|
│ │ │ │ │ smartunpack( │
|
|
│ │ │ │ │ String( │
|
|
│ │ │ │ │ msg.payload) │
|
|
│ │ │ │ │ │
|
|
│ │ │ │ │ 6. Receive │
|
|
│ │ │ │ │ payloads: │
|
|
│ │ │ │ │ [(msg, data, │
|
|
│ │ │ │ │ type), ...] │
|
|
│ │ │ │ │ │
|
|
└─────────────────────┘ └───────────────────┘ └─────────────────────┘
|
|
|
|
┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
|
|
│ JULIA BACKEND RESPONSE (Reverse Flow) │
|
|
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
|
|
|
|
RECEIVER (Julia Backend) TRANSPORT LAYER SENDER (JavaScript Webapp)
|
|
┌─────────────────────┐ ┌───────────────────┐ ┌─────────────────────┐
|
|
│ │ │ │ │ │
|
|
│ 7. Process data │ │ │ │ │
|
|
│ (AI inference) │ │ │ │ │
|
|
│ │ │ │ │ │
|
|
│ 8. Call smartpack() │──────────────────────────────────────>│ │ │ │
|
|
│ Returns (env, │ │ JSON Message │ │ │
|
|
│ json_str) │ │ (json_str) │ │ │
|
|
│ │ │ │ │ │
|
|
│ 9. Publish reply │ │ │ │ │
|
|
│ via transport: │──────────────────────────────────────>│ (NATS/MQTT/ │────────────────────────>│ 10. Subscribe via │
|
|
│ MY_TRANSPORT. │ │ WebSocket, etc.) │ │ reply_to topic │
|
|
│ publish(replyTo, │ │ │ │ │
|
|
│ msgJson) │ │ │ │ │
|
|
│ │ │ │ │ │
|
|
└─────────────────────┘ └───────────────────┘ └─────────────────────┘
|
|
```
|
|
|
|
### Step-by-Step Flow (Sender Perspective)
|
|
|
|
#### 3.1 Step 1: JavaScript Webapp Sends Mixed Payloads
|
|
|
|
```javascript
|
|
// JavaScript (Browser or Node.js)
|
|
const [env, msgJson] = await msghandler.smartpack(
|
|
"/agent/wine/api/v1/prompt",
|
|
[
|
|
["msg", "Hello! I'm Ton.", "text"],
|
|
["avatar", avatarImageData, "image"]
|
|
],
|
|
{
|
|
broker_url: "ws://localhost:4222",
|
|
receiver_name: "agent-backend",
|
|
msg_purpose: "chat"
|
|
}
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- **Why mixed payloads?** Real chat apps often send both text and images together
|
|
- **Why text first?** Text is smaller, sent via direct transport (fast, no file server needed)
|
|
- **Why image second?** Images may trigger link transport if >0.5MB
|
|
|
|
#### 3.2 Step 2: Transport Selection
|
|
|
|
For each payload, msghandler determines transport:
|
|
|
|
| Payload | Size | Transport | Reason |
|
|
|---------|------|-----------|--------|
|
|
| `"msg"` (text) | ~20 bytes | direct | < 0.5MB threshold |
|
|
| `"avatar"` (image) | ~150KB | direct | < 0.5MB threshold |
|
|
|
|
**Rationale**:
|
|
- Direct transport is faster for small payloads (no file server round-trip)
|
|
- Link transport is used when payload ≥ 0.5MB (avoids transport size limits)
|
|
|
|
#### 3.3 Step 3: Serialization and Encoding
|
|
|
|
Each payload is serialized:
|
|
|
|
| Payload | Type | Serialization | Encoding |
|
|
|---------|------|---------------|----------|
|
|
| `"msg"` | `text` | UTF-8 bytes | Base64 |
|
|
| `"avatar"` | `image` | Raw bytes | Base64 |
|
|
|
|
**Rationale**:
|
|
- Text uses UTF-8 encoding for human-readable data
|
|
- Images use raw bytes to preserve binary data integrity
|
|
- All payloads encoded as Base64 for JSON compatibility
|
|
|
|
#### 3.4 Step 4: Envelope Building
|
|
|
|
msghandler builds the message envelope:
|
|
|
|
```json
|
|
{
|
|
"correlation_id": "a1b2c3d4...",
|
|
"msg_id": "e5f6g7h8...",
|
|
"timestamp": "2026-03-13T16:30:00.000Z",
|
|
"send_to": "/agent/wine/api/v1/prompt",
|
|
"msg_purpose": "chat",
|
|
"sender_name": "chat-webapp",
|
|
"sender_id": "sender-uuid...",
|
|
"receiver_name": "agent-backend",
|
|
"receiver_id": "",
|
|
"reply_to": "/agent/wine/api/v1/response",
|
|
"reply_to_msg_id": "",
|
|
"broker_url": "ws://localhost:4222",
|
|
"metadata": {},
|
|
"payloads": [
|
|
{
|
|
"id": "payload-uuid...",
|
|
"dataname": "msg",
|
|
"payload_type": "text",
|
|
"transport": "direct",
|
|
"encoding": "base64",
|
|
"size": 20,
|
|
"data": "SGVsbG8hIEknIHRlbCB5b3UgSW4gZW5nbGlzaC4=",
|
|
"metadata": {"payload_bytes": 20}
|
|
},
|
|
{
|
|
"id": "payload-uuid...",
|
|
"dataname": "avatar",
|
|
"payload_type": "image",
|
|
"transport": "direct",
|
|
"encoding": "base64",
|
|
"size": 150000,
|
|
"data": "iVBORw0KGgoAAAANSUhEUgAA...",
|
|
"metadata": {"payload_bytes": 150000}
|
|
}
|
|
]
|
|
}
|
|
```
|
|
|
|
**Rationale**:
|
|
- **correlation_id**: Tracks this chat session across all systems
|
|
- **reply_to**: Tells backend where to send response
|
|
- **payloads array**: Contains all data with metadata for proper handling
|
|
|
|
#### 3.5 Step 5: Publish to Transport (Caller's Responsibility)
|
|
|
|
**Choose your transport** (replace `MY_TRANSPORT` with your actual library):
|
|
|
|
```javascript
|
|
// Example 1: NATS (Node.js)
|
|
import { connect } from 'nats';
|
|
const nats = connect({ servers: 'ws://localhost:4222' });
|
|
await nats.publish("/agent/wine/api/v1/prompt", msgJson);
|
|
|
|
// Example 2: MQTT (Node.js)
|
|
import * as mqtt from 'mqtt';
|
|
const client = mqtt.connect('ws://localhost:1883');
|
|
client.publish("/agent/wine/api/v1/prompt", msgJson);
|
|
|
|
// Example 3: WebSocket (Browser)
|
|
const ws = new WebSocket('ws://localhost:4222/ws');
|
|
ws.send(msgJson);
|
|
|
|
// Example 4: Custom HTTP POST
|
|
fetch('http://localhost:8000/publish', {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: msgJson
|
|
});
|
|
```
|
|
|
|
**Why caller responsibility?**
|
|
- **Transport agnostic**: msghandler supports NATS, MQTT, WebSocket, HTTP, or any custom transport
|
|
- **Connection reuse**: Callers can manage connection pools efficiently
|
|
- **Flexibility**: No library lock-in; use whatever transport best fits your stack
|
|
|
|
#### 3.6 Step 6: Julia Backend Receives Message
|
|
|
|
```julia
|
|
# Julia backend
|
|
# Choose your transport (replace MY_TRANSPORT with your actual library)
|
|
|
|
# Example: NATS subscription
|
|
using NATS
|
|
conn = NATS.connect("nats.yiem.cc")
|
|
NATS.subscribe(conn, "/agent/wine/api/v1/prompt") do msg
|
|
env = smartunpack(String(msg.payload))
|
|
|
|
# env["payloads"] is now:
|
|
# [
|
|
# ("msg", "Hello! I'm Ton.", "text"),
|
|
# ("avatar", binary_data, "image")
|
|
# ]
|
|
end
|
|
```
|
|
|
|
**Rationale**:
|
|
- `smartunpack()` handles both transport types automatically
|
|
- Deserialization is type-aware based on `payload_type`
|
|
- Returns consistent tuple format regardless of transport
|
|
|
|
#### 3.7 Step 7: Julia Backend Sends Response
|
|
|
|
```julia
|
|
# Julia backend processes the message
|
|
response_text = "Hello Ton! I'm the AI assistant."
|
|
generated_image = generate_ai_image(response_text);
|
|
|
|
env, msg_json = smartpack(
|
|
"/agent/wine/api/v1/response",
|
|
[
|
|
("response", response_text, "text"),
|
|
("generated_image", generated_image, "image")
|
|
],
|
|
reply_to = "/chat/user/v1/message",
|
|
reply_to_msg_id = env["msg_id"]
|
|
);
|
|
|
|
# Publish response via transport (caller's responsibility)
|
|
# Example: NATS
|
|
using NATS
|
|
NATS.publish(conn, "/agent/wine/api/v1/response", msg_json)
|
|
```
|
|
|
|
**Rationale**:
|
|
- **Mixed response**: Text explanation + AI-generated image
|
|
- **reply_to**: Ensures response goes to correct topic
|
|
- **reply_to_msg_id**: Links response to original message for tracing
|
|
|
|
---
|
|
|
|
## 4. User Scenario 2: Large File Transfer
|
|
|
|
### Scenario Description
|
|
|
|
A JavaScript webapp wants to upload a large file (10MB) to a Julia backend for processing.
|
|
|
|
### Step-by-Step Flow
|
|
|
|
#### 4.1 Step 1: JavaScript Webapp Sends Large File
|
|
|
|
```javascript
|
|
const [env, msgJson] = await msghandler.smartpack(
|
|
"/agent/wine/api/v1/process",
|
|
[
|
|
["file", largeFileData, "binary"]
|
|
],
|
|
{
|
|
broker_url: "ws://localhost:4222",
|
|
receiver_name: "agent-backend"
|
|
}
|
|
);
|
|
```
|
|
|
|
#### 4.2 Step 2: Transport Selection (Link)
|
|
|
|
| Payload | Size | Transport | Reason |
|
|
|---------|------|-----------|--------|
|
|
| `"file"` | 10MB | link | ≥ 0.5MB threshold |
|
|
|
|
**Rationale**:
|
|
- Link transport used for large payloads
|
|
- File server handles large file upload
|
|
- Transport only sends URL (small message)
|
|
|
|
#### 4.3 Step 3: File Server Upload
|
|
|
|
```javascript
|
|
// msghandler internally calls:
|
|
const response = await plikOneshotUpload(
|
|
"http://localhost:8080",
|
|
"file",
|
|
largeFileData
|
|
);
|
|
|
|
// Response:
|
|
// {
|
|
// status: 200,
|
|
// uploadid: "UPLOAD_ID",
|
|
// fileid: "FILE_ID",
|
|
// url: "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file"
|
|
// }
|
|
```
|
|
|
|
**Rationale**:
|
|
- Plik handles multipart upload
|
|
- One-shot mode simplifies API
|
|
- Returns URL for download
|
|
|
|
#### 4.4 Step 4: Envelope with Link Transport
|
|
|
|
```json
|
|
{
|
|
"correlation_id": "a1b2c3d4...",
|
|
"payloads": [
|
|
{
|
|
"id": "payload-uuid...",
|
|
"dataname": "file",
|
|
"payload_type": "binary",
|
|
"transport": "link",
|
|
"encoding": "none",
|
|
"size": 10000000,
|
|
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file"
|
|
}
|
|
]
|
|
}
|
|
```
|
|
|
|
**Rationale**:
|
|
- `data` field contains URL instead of Base64
|
|
- `transport: "link"` signals URL-based download
|
|
- `encoding: "none"` indicates no additional encoding
|
|
|
|
#### 4.5 Step 5: Julia Backend Receives and Downloads
|
|
|
|
```julia
|
|
# Julia backend
|
|
transport_msg = transport_subscription.next();
|
|
env = smartunpack(String(transport_msg.payload));
|
|
|
|
# msghandler automatically:
|
|
# 1. Extracts URL from payload
|
|
# 2. Downloads with exponential backoff
|
|
# 3. Deserializes to binary data
|
|
```
|
|
|
|
**Rationale**:
|
|
- Exponential backoff handles transient failures
|
|
- Automatic download simplifies receiver code
|
|
- Binary data returned directly
|
|
|
|
---
|
|
|
|
## 5. User Scenario 3: Tabular Data Exchange
|
|
|
|
### Scenario Description
|
|
|
|
A Python application sends tabular data (pandas DataFrame) to a Julia backend for analysis, and receives processed results back.
|
|
|
|
### Step-by-Step Flow
|
|
|
|
#### 5.1 Step 1: Python Sends Tabular Data
|
|
|
|
```python
|
|
# Python
|
|
import pandas as pd
|
|
from msghandler import smartpack
|
|
|
|
df = pd.DataFrame({
|
|
"id": [1, 2, 3],
|
|
"name": ["Alice", "Bob", "Charlie"],
|
|
"score": [95, 88, 92]
|
|
});
|
|
|
|
env, msg_json = await smartpack(
|
|
"/agent/wine/api/v1/analyze",
|
|
[("data", df, "arrowtable")],
|
|
broker_url=DEFAULT_BROKER_URL,
|
|
receiver_name="agent-backend"
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- `arrowtable` type for efficient tabular data transfer
|
|
- Arrow IPC format preserves data types
|
|
- Much faster than JSON serialization
|
|
|
|
#### 5.2 Step 2: Serialization to Arrow IPC
|
|
|
|
```python
|
|
# msghandler internally:
|
|
import pyarrow as pa
|
|
import pyarrow.ipc as ipc
|
|
|
|
table = pa.Table.from_pandas(df);
|
|
buf = io.BytesIO();
|
|
sink = ipc.new_file(buf, table.schema);
|
|
ipc.write_table(table, sink);
|
|
arrow_bytes = buf.getvalue();
|
|
```
|
|
|
|
**Rationale**:
|
|
- Arrow IPC preserves column types
|
|
- Binary format is compact
|
|
- No schema information loss
|
|
|
|
#### 5.3 Step 3: Julia Receives and Deserializes
|
|
|
|
```julia
|
|
# Julia backend
|
|
transport_msg = transport_subscription.next();
|
|
env = smartunpack(String(transport_msg.payload));
|
|
|
|
# env["payloads"][1] is now:
|
|
# ("data", DataFrame with id, name, score columns, "arrowtable")
|
|
```
|
|
|
|
**Rationale**:
|
|
- Arrow.jl reads IPC format directly
|
|
- DataFrame returned with correct types
|
|
- No manual parsing needed
|
|
|
|
#### 5.4 Step 4: Julia Sends Results
|
|
|
|
```julia
|
|
# Julia backend
|
|
results = analyze_data(env["payloads"][1][2]);
|
|
|
|
# Send results back
|
|
env, msg_json = smartpack(
|
|
"/agent/wine/api/v1/results",
|
|
[("results", results, "arrowtable")],
|
|
reply_to = "/python/worker/v1/results"
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- Arrow IPC format for efficient round-trip
|
|
- Results preserve DataFrame structure
|
|
- Python can deserialize to pandas DataFrame
|
|
|
|
---
|
|
|
|
## 6. User Scenario 4: Rust Service with Type-Safe API
|
|
|
|
### Scenario Description
|
|
|
|
A Rust service needs to process messages from a Julia analytics pipeline and send typed results back. The Rust implementation leverages compile-time type safety via Rust enums and serde for serialization.
|
|
|
|
### Step-by-Step Flow
|
|
|
|
#### 6.1 Step 1: Rust Service Receives Message
|
|
|
|
```rust
|
|
// Rust service - using tokio async runtime
|
|
use msghandler::{smartunpack, MsgEnvelopeV1};
|
|
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
let conn = transport_client::connect("ws://localhost:4222").unwrap();
|
|
|
|
// Subscribe and receive messages
|
|
let mut sub = conn.subscribe("/agent/wine/api/v1/analyze").unwrap();
|
|
|
|
for msg in sub.messages() {
|
|
let envelope = smartunpack(
|
|
&String::from_utf8_lossy(&msg.payload),
|
|
&Default::default(),
|
|
).await.unwrap();
|
|
|
|
// Access deserialized payloads by type
|
|
for payload in &envelope.payloads {
|
|
match payload.payload_type.as_str() {
|
|
"arrowtable" => {
|
|
// Data is base64-encoded Arrow IPC bytes after smartunpack()
|
|
let arrow_bytes = BASE64.decode(&payload.data).unwrap();
|
|
println!("Received arrowtable payload ({} bytes)", arrow_bytes.len());
|
|
},
|
|
"text" => {
|
|
// Data is the decoded text string
|
|
println!("Message: {}", payload.data);
|
|
},
|
|
"image" | "audio" | "video" | "binary" => {
|
|
// Data is base64-encoded binary content
|
|
let bytes = BASE64.decode(&payload.data).unwrap();
|
|
println!("Received {} bytes of {} data", bytes.len(), payload.payload_type);
|
|
},
|
|
"dictionary" | "jsontable" => {
|
|
// Data is a JSON string
|
|
println!("Data: {}", payload.data);
|
|
},
|
|
_ => println!("Unknown payload type: {}", payload.payload_type),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
**Rationale**:
|
|
- **serde serialization**: Automatic JSON deserialization to `MsgEnvelopeV1`
|
|
- **tokio runtime**: Efficient async I/O for transport and HTTP operations
|
|
- **smartunpack deserialization**: Payload data is deserialized and stored as strings in `payload.data`
|
|
- **Type dispatch**: `payload_type` field determines how to interpret the `data` string
|
|
|
|
#### 6.2 Step 2: Rust Service Sends Processed Results
|
|
|
|
```rust
|
|
// Rust service sends results back with mixed payload types
|
|
use msghandler::{smartpack, Payload, smartpackOptions};
|
|
|
|
let results_df = /* processed Arrow table */;
|
|
let result_bytes = /* serialize to Arrow IPC */;
|
|
|
|
let (envelope, json_str) = smartpack(
|
|
"/agent/wine/api/v1/results",
|
|
&[
|
|
(
|
|
"results".to_string(),
|
|
Payload::ArrowTable(result_bytes),
|
|
"arrowtable".to_string(),
|
|
),
|
|
(
|
|
"summary".to_string(),
|
|
Payload::Text("Analysis complete: 1500 rows processed".to_string()),
|
|
"text".to_string(),
|
|
),
|
|
],
|
|
&smartpackOptions {
|
|
broker_url: DEFAULT_BROKER_URL.to_string(),
|
|
reply_to: "/python/worker/v1/results".to_string(),
|
|
msg_purpose: "chat".to_string(),
|
|
..Default::default()
|
|
},
|
|
).await?;
|
|
|
|
// Caller publishes via transport
|
|
conn.publish("/agent/wine/api/v1/results", &json_str)?;
|
|
```
|
|
|
|
**Rationale**:
|
|
- **Builder pattern**: `smartpackOptions` provides clean configuration
|
|
- **Enum-based payloads**: Type safety prevents sending incorrect data types
|
|
- **Default options**: sensible defaults reduce boilerplate
|
|
- **Result<T, E>**: idiomatic Rust error handling
|
|
|
|
#### 6.3 Step 3: Python/Julia Receives Rust Response
|
|
|
|
```python
|
|
# Python backend receives Rust response
|
|
env = await smartunpack(str(transport_msg.payload));
|
|
|
|
# env["payloads"][0] is now:
|
|
# ("results", arrow_table_data, "arrowtable")
|
|
# env["payloads"][1] is now:
|
|
# ("summary", "Analysis complete: 1500 rows processed", "text")
|
|
```
|
|
|
|
**Rationale**:
|
|
- **Cross-platform parity**: Rust envelope matches other platform envelopes exactly
|
|
- **Same JSON wire format**: No protocol translation needed
|
|
- **Type preservation**: Arrow IPC and text types preserved across all platforms
|
|
|
|
#### 6.4 Step 4: Large File Transfer from Rust
|
|
|
|
```rust
|
|
// Rust service sends large binary file via link transport
|
|
let large_file_data: Vec<u8> = std::fs::read("/data/large_dataset.parquet")?;
|
|
|
|
let (envelope, json_str) = smartpack(
|
|
"/agent/wine/api/v1/upload",
|
|
&[
|
|
(
|
|
"dataset".to_string(),
|
|
Payload::Binary(large_file_data),
|
|
"binary".to_string(),
|
|
),
|
|
],
|
|
&smartpackOptions {
|
|
broker_url: DEFAULT_BROKER_URL.to_string(),
|
|
fileserver_url: DEFAULT_FILESERVER_URL.to_string(),
|
|
size_threshold: DEFAULT_SIZE_THRESHOLD, // threshold triggers link transport
|
|
..Default::default()
|
|
},
|
|
).await?;
|
|
```
|
|
|
|
**Rationale**:
|
|
- **Automatic transport selection**: Same 0.5MB threshold as other desktop platforms
|
|
- **reqwest integration**: Efficient HTTP client for file server upload/download
|
|
- **Exponential backoff**: Built-in retry with configurable parameters
|
|
- **Zero-copy where possible**: `Vec<u8>` passed directly without intermediate copies
|
|
|
|
---
|
|
|
|
## 7. User Scenario 5: MicroPython Device
|
|
|
|
### Scenario Description
|
|
|
|
A MicroPython sensor device sends sensor readings to a Python backend.
|
|
|
|
### Step-by-Step Flow
|
|
|
|
#### 7.1 Step 1: MicroPython Sends Sensor Data
|
|
|
|
```python
|
|
# MicroPython
|
|
from msghandler import smartpack
|
|
|
|
sensor_data = {
|
|
"temperature": 25.5,
|
|
"humidity": 60.0,
|
|
"pressure": 1013.25
|
|
};
|
|
|
|
env, msg_json = smartpack(
|
|
"/sensor/device/v1/readings",
|
|
[("data", sensor_data, "dictionary")],
|
|
broker_url=DEFAULT_BROKER_URL,
|
|
size_threshold=100000 # 100KB for MicroPython
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- `dictionary` type for JSON-serializable sensor data
|
|
- Smaller threshold (100KB) for memory constraints
|
|
- Direct transport only (no file server support)
|
|
|
|
#### 7.2 Step 2: Serialization
|
|
|
|
```python
|
|
# msghandler internally:
|
|
json_str = json.dumps(sensor_data);
|
|
json_bytes = json_str.encode('utf-8');
|
|
payload_b64 = base64.b64encode(json_bytes).decode('ascii');
|
|
```
|
|
|
|
**Rationale**:
|
|
- JSON format for human-readable data
|
|
- Base64 for transport compatibility
|
|
- UTF-8 for text encoding
|
|
|
|
#### 7.3 Step 3: Python Backend Receives
|
|
|
|
```python
|
|
# Python backend
|
|
transport_msg = await transport_consumer.next();
|
|
env = await smartunpack(str(transport_msg.payload));
|
|
|
|
# env["payloads"][0] is now:
|
|
# ("data", {"temperature": 25.5, "humidity": 60.0, ...}, "dictionary")
|
|
```
|
|
|
|
**Rationale**:
|
|
- JSON deserialization
|
|
- Dictionary returned directly
|
|
- No Arrow support (memory constraints)
|
|
|
|
---
|
|
|
|
## 8. User Scenario 6: Cross-Platform Chat with Mixed Payloads
|
|
|
|
### Scenario Description
|
|
|
|
Multiple platforms (JavaScript, Python, Julia) communicate in a chat application with mixed payload types.
|
|
|
|
### Step-by-Step Flow
|
|
|
|
#### 8.1 Step 1: JavaScript Sends Chat Message
|
|
|
|
```javascript
|
|
// JavaScript (Frontend)
|
|
const [env, msgJson] = await msghandler.smartpack(
|
|
"/chat/user/v1/message",
|
|
[
|
|
["text", "Check this out!", "text"],
|
|
["image", imageData, "image"]
|
|
],
|
|
{
|
|
broker_url: "ws://localhost:4222",
|
|
receiver_name: "",
|
|
msg_purpose: "chat"
|
|
}
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- Empty `receiver_name` = broadcast to all subscribers
|
|
- Chat messages often include text + images
|
|
- Transport wildcard subscriptions route to correct recipients
|
|
|
|
#### 8.2 Step 2: Python Backend Receives
|
|
|
|
```python
|
|
# Python (Backend)
|
|
transport_msg = await transport_consumer.next();
|
|
env = await smartunpack(str(transport_msg.payload));
|
|
|
|
# env["payloads"] is now:
|
|
# [
|
|
# ("text", "Check this out!", "text"),
|
|
# ("image", binary_data, "image")
|
|
# ]
|
|
```
|
|
|
|
**Rationale**:
|
|
- Consistent API across platforms
|
|
- Same payload structure regardless of sender
|
|
- Type information preserved
|
|
|
|
#### 8.3 Step 3: Julia Backend Receives
|
|
|
|
```julia
|
|
# Julia (Backend)
|
|
transport_msg = transport_subscription.next();
|
|
env = smartunpack(String(transport_msg.payload));
|
|
|
|
# env["payloads"] is now:
|
|
# [
|
|
# ("text", "Check this out!", "text"),
|
|
# ("image", binary_data, "image")
|
|
# ]
|
|
```
|
|
|
|
**Rationale**:
|
|
- Cross-platform API parity
|
|
- Same function signature across platforms
|
|
- Type information enables proper deserialization
|
|
|
|
#### 8.4 Step 4: All Platforms Reply
|
|
|
|
Each platform can reply using the same API:
|
|
|
|
```python
|
|
# Python reply
|
|
await smartpack(
|
|
"/chat/user/v1/reply",
|
|
[("response", "Nice!", "text")],
|
|
reply_to="/chat/user/v1/message"
|
|
);
|
|
```
|
|
|
|
```julia
|
|
# Julia reply
|
|
smartpack(
|
|
"/chat/user/v1/reply",
|
|
[("response", "Nice!", "text")],
|
|
reply_to="/chat/user/v1/message"
|
|
);
|
|
```
|
|
|
|
```javascript
|
|
// JavaScript reply
|
|
await msghandler.smartpack(
|
|
"/chat/user/v1/reply",
|
|
[["response", "Nice!", "text"]],
|
|
{ reply_to: "/chat/user/v1/message" }
|
|
);
|
|
```
|
|
|
|
**Rationale**:
|
|
- Same API across platforms
|
|
- Consistent behavior
|
|
- Easy to maintain parity
|
|
|
|
---
|
|
|
|
## 9. Error Handling
|
|
|
|
### 9.1 Common Error Scenarios
|
|
|
|
| Scenario | Error | Recovery |
|
|
|----------|-------|----------|
|
|
| File server unavailable | `UPLOAD_FAILED` | Fall back to direct transport or smaller payloads |
|
|
| File server download fails | `DOWNLOAD_FAILED` | Retry with exponential backoff |
|
|
| Payload type mismatch | `DESERIALIZATION_ERROR` | Validate payload_type matches data |
|
|
| Transport connection lost | `TRANSPORT_CONNECTION_FAILED` | Transport client auto-reconnects |
|
|
|
|
### 9.2 Error Response Format
|
|
|
|
```json
|
|
{
|
|
"correlation_id": "abc123...",
|
|
"error": {
|
|
"code": "DOWNLOAD_FAILED",
|
|
"message": "Failed to fetch data after 5 attempts",
|
|
"details": {
|
|
"url": "http://localhost:8080/file/...",
|
|
"correlation_id": "abc123..."
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 10. Debugging and Tracing
|
|
|
|
### 10.1 Correlation ID Tracking
|
|
|
|
Every message includes a `correlation_id`:
|
|
|
|
```julia
|
|
# At start of request
|
|
correlation_id = string(uuid4());
|
|
|
|
# Use throughout the flow
|
|
log_trace(correlation_id, "Starting smartpack");
|
|
log_trace(correlation_id, "Serialized payload size: 100 bytes");
|
|
log_trace(correlation_id, "Published to transport");
|
|
```
|
|
|
|
**Log Format**:
|
|
```
|
|
[2026-03-13T16:30:00.000Z] [Correlation: abc123...] Starting smartpack
|
|
[2026-03-13T16:30:00.001Z] [Correlation: abc123...] Serialized payload size: 100 bytes
|
|
[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to transport
|
|
```
|
|
|
|
**Log Format**:
|
|
```
|
|
[2026-03-13T16:30:00.000Z] [Correlation: abc123...] Starting smartpack
|
|
[2026-03-13T16:30:00.001Z] [Correlation: abc123...] Serialized payload size: 100 bytes
|
|
[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to transport
|
|
```
|
|
|
|
---
|
|
|
|
## 11. Performance Considerations
|
|
|
|
### 11.1 Optimization Strategies
|
|
|
|
| Strategy | Description | When to Use |
|
|
|----------|-------------|-------------|
|
|
| Pre-create transport connection | Reuse connection for multiple sends | High-throughput scenarios |
|
|
| Adjust size threshold | Increase threshold if file server slow | File server bottleneck |
|
|
| Use direct transport | Avoid file server for small payloads | Low latency requirements |
|
|
|
|
### 11.2 Size Threshold by Platform
|
|
|
|
| Platform | Threshold | Notes |
|
|
|----------|-----------|-------|
|
|
| Desktop (Julia/JS/Python/Dart) | 500,000 bytes (0.5MB) | Default threshold |
|
|
| Dart Desktop | 500,000 bytes (0.5MB) | Default threshold |
|
|
| Dart Flutter | 500,000 bytes (0.5MB) | Default threshold |
|
|
| Dart Web | 500,000 bytes (0.5MB) | Default threshold |
|
|
| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints |
|
|
|
|
---
|
|
|
|
## 12. Deployment Considerations
|
|
|
|
### 12.1 Minimum Infrastructure
|
|
|
|
| Component | Minimum | Notes |
|
|
|-----------|---------|-------|
|
|
| Message Broker | 1 instance | Single node for development |
|
|
| File Server | 1 instance | HTTP server for large payloads |
|
|
| Client Memory | 50MB | Desktop platforms (Julia/JS/Python/Dart) |
|
|
| Client Memory | 256KB | MicroPython devices |
|
|
|
|
### 12.2 Environment Variables
|
|
|
|
| Variable | Default | Description |
|
|
|----------|---------|-------------|
|
|
| `BROKER_URL` | `ws://localhost:4222` | Message broker URL |
|
|
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
|
|
| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB) |
|
|
|
|
---
|
|
|
|
## Change Log
|
|
|
|
| Date | Version | Changes |
|
|
|------|---------|---------|
|
|
| 2026-05-22 | 1.5.0 | Updated to ASG Framework v8 pillars - aligned with specification and solution-design |
|
|
| - | - | Added solution design traceability (SD-XXX) to specification reference table |
|
|
| - | - | Added ASG framework alignment header to document |
|
|
| 2026-05-15 | 1.4.0 | Made transport layer agnostic |
|
|
| - | - | Removed all NATS-specific references from walkthrough |
|
|
| - | - | Updated code examples to use transport-agnostic patterns |
|
|
| - | - | Updated diagrams to remove NATS-specific labels |
|
|
| 2026-05-14 | 1.3.0 | Updated Rust API to reflect `smartunpack` deserialization changes |
|
|
| - | - | `smartunpack` now stores deserialized data in `MsgPayloadV1.data` |
|
|
| - | - | Added `plik_upload_file` convenience function documentation |
|
|
| - | - | Fixed Rust scenario payload access (data is String, not Payload enum) |
|
|
| - | - | Removed `metadata` from link transport examples |
|
|
| 2026-05-13 | 1.2.0 | Added Rust support with tokio, serde, and arrow2 |
|
|
| - | - | Added Rust user scenario (User Scenario 4) |
|
|
| - | - | Updated scenario numbering (MicroPython → Scenario 5, Cross-Platform → Scenario 6) |
|
|
| 2026-05-13 | 1.1.0 | Aligned with ground truth implementation (src/msghandler.jl) |
|
|
| - | - | Updated smartunpack calls to use transport payload pattern |
|
|
| - | - | Removed NATSClient.publish() calls (caller responsible for transport publishing) |
|
|
| - | - | Removed is_publish and nats_connection parameter references |
|
|
| 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines |
|
|
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |
|
|
|
|
---
|
|
|
|
## 15. Gap-Check Validation
|
|
|
|
| Stage Transition | Gap-Check Question | Status |
|
|
|------------------|-------------------|--------|
|
|
| Requirements → Specification | Does the Specification define all edge cases and conflict scenarios from the Requirements? | ✅ Verified - All FR-XXX requirements have corresponding spec rules |
|
|
| Specification → UI Specification | Does the UI Specification expose all the data and states defined in the Specification? | ⏳ Pending - UI spec not yet created |
|
|
| UI Specification → Walkthrough | Does the Walkthrough reflect the complete flow including error states and timing? | ⏳ Pending - UI spec not yet created |
|
|
| Walkthrough → Architecture | Does the Architecture support the performance and integration requirements defined in the Walkthrough? | ⏳ Pending - Architecture not yet created |
|
|
|
|
---
|
|
|
|
## 16. References
|
|
|
|
### 16.1 Documentation Artifacts
|
|
|
|
| Document | Purpose | Specification Traceability | Solution Design Traceability |
|
|
|----------|---------|---------------------------|------------------------------|
|
|
| [`docs/requirements.md`](./requirements.md) | Business requirements and user stories | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`docs/specification.md`](./specification.md) | Technical contract for msghandler | specification.md:2-19 (all sections) | SD-001 through SD-008 |
|
|
| [`docs/ui-specification.md`](./ui-specification.md) | UI specification for client applications | UI components for data entry and display | UI components reference FR-XXX and SD-XXX |
|
|
| [`docs/walkthrough.md`](./walkthrough.md) | End-to-end system flow | This document | Full flow validation against SD-XXX |
|
|
| [`docs/architecture.md`](./architecture.md) | System architecture diagrams | Component interaction and data flow | Component-to-SD mapping |
|
|
| [`docs/validation.md`](./validation.md) | CI/CD validation rules | Contract testing and spec compliance | Validation gates for SD-XXX |
|
|
| [`docs/runbook.md`](./runbook.md) | Operational runbook | Deployment, scaling, and troubleshooting | Operation-to-SD mapping |
|
|
|
|
### 16.2 Implementation Files
|
|
|
|
| File | Platform | Features | Specification Traceability | Solution Design Traceability |
|
|
|------|----------|----------|---------------------------|------------------------------|
|
|
| [`src/msghandler.jl`](../src/msghandler.jl) | Julia | Full feature set, Arrow IPC, multiple dispatch | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler_ssr.js`](../src/msghandler_ssr.js) | Node.js | Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler_csr.js`](../src/msghandler_csr.js) | Browser | JSON table only | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler.py`](../src/msghandler.py) | Python | Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler.dart`](../src/msghandler.dart) | Dart | Full feature set, Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler.rs`](../src/msghandler.rs) | Rust | Full feature set, Arrow IPC, async/await, type-safe, file upload helpers | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
|
|
| [`src/msghandler_mpy.py`](../src/msghandler_mpy.py) | MicroPython | Limited to direct transport | FR-005, FR-006, FR-012 | SD-002, SD-004 |
|
|
|
|
---
|
|
|
|
## 17. Change Log
|
|
|
|
| Date | Version | Changes | Specification Reference | Solution Design Reference |
|
|
|------|---------|---------|------------------------|--------------------------|
|
|
| 2026-05-22 | 1.5.0 | Updated to ASG Framework v8 pillars - aligned with specification and solution-design | All sections | All SD-XXX |
|
|
| 2026-05-15 | 1.4.0 | Made transport layer agnostic | All sections | SD-001 through SD-008 |
|
|
| 2026-05-14 | 1.3.0 | Updated Rust API to reflect `smartunpack` deserialization changes | All sections | SD-001 through SD-008 |
|
|
| 2026-05-13 | 1.2.0 | Added Rust support with tokio, serde, and arrow2 | specification.md:11 (Rust API) | SD-001 through SD-008 |
|
|
| 2026-05-13 | 1.1.0 | Aligned with ground truth implementation (src/msghandler.jl) | All sections | SD-001 through SD-008 |
|
|
| 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines | All sections | SD-001 through SD-008 |
|
|
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation | specification.md:2-19 (all sections) | SD-001 through SD-008 |
|
|
|
|
---
|
|
|
|
*This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.*
|
|
|
|
---
|
|
|
|
## 18. ASG Framework Validation
|
|
|
|
| Pillar | Status | Reference |
|
|
|--------|--------|-----------|
|
|
| Requirements | ✅ Complete | requirements.md: FR-001 through FR-014, NFR-101 through NFR-405 |
|
|
| Solution Design | ✅ Complete | solution-design.md: SD-001 through SD-008 |
|
|
| Specification | ✅ Complete | specification.md: Section 2-19 |
|
|
| Walkthrough | ✅ Complete | walkthrough.md: Sections 2-17 |
|
|
| Implementation Plan | ⏳ Pending | implementation-plan.md |
|
|
| Validation | ⏳ Pending | validation.md |
|
|
| Runbook | ⏳ Pending | runbook.md |
|
|
|
|
---
|
|
|
|
*This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.* |