Walkthrough: msghandler

Version: 1.6.0
Date: 2026-05-22
Status: Active
Ground Truth: src/msghandler.jl
ASG Framework Alignment: v8 pillars - Requirements → Solution Design → Specification → Walkthrough → Implementation Plan → Validation → Runbook


1. Executive Summary

This document provides the end-to-end trace for msghandler - the cross-platform bi-directional data bridge that enables seamless communication between Julia, JavaScript, Python, Dart, Rust, and MicroPython applications using a message broker as the transport layer.

This walkthrough serves as the primary onboarding guide for new developers and explains:

  • User scenarios - Real-world use cases from developer perspective
  • Why steps are sequenced - The rationale behind architectural decisions
  • What could go wrong - Common failure scenarios and recovery strategies
  • Transport integration - Complete end-to-end flow with publish/subscribe patterns

1.1 Specification Traceability

| Walkthrough Section | Specification Reference | Requirement ID(s) | Solution Design Ref(s) | Description |
|---|---|---|---|---|
| Section 2 (Big Picture) | specification.md:2, specification.md:15 | FR-001, FR-002, FR-003, FR-004, FR-005, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-002, SD-005, SD-006 | End-to-end system flow diagrams and transport pattern |
| Section 3 (Chat Scenario) | specification.md:2, specification.md:3, specification.md:5, specification.md:11 | FR-001, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-004, SD-005, SD-006 | Chat webapp ↔ Julia backend with mixed payloads |
| Section 4 (Large File) | specification.md:6, specification.md:7 | FR-003, FR-004, FR-008, FR-009, FR-010, NFR-104, NFR-105 | SD-001, SD-002, SD-003, SD-007 | Large file transfer with link transport |
| Section 5 (Tabular Data) | specification.md:5, specification.md:10 | FR-002, FR-012, NFR-101, NFR-102 | SD-004, SD-005 | Arrow IPC tabular data exchange |
| Section 7 (MicroPython) | specification.md:13, specification.md:17 | FR-005, FR-006, FR-012, NFR-106 | SD-002, SD-004 | Memory-constrained device communication |
| Section 8 (Cross-Platform) | specification.md:3, specification.md:4, specification.md:5, specification.md:11 | FR-001, FR-002, FR-003, FR-004, FR-005, FR-006, FR-007, FR-012, FR-013, FR-014 | SD-001, SD-002, SD-004, SD-005, SD-006 | Multi-platform chat application |
| Section 9 (Transport Layer) | specification.md:3, specification.md:5 | FR-013, FR-014 | SD-006 | Complete end-to-end transport integration examples |
| Section 10 (Error Handling) | specification.md:9 | FR-008, FR-009, FR-010, NFR-201, NFR-202, NFR-203 | SD-003, SD-007 | Common error scenarios and recovery |
| Section 11 (Debugging) | specification.md:4, specification.md:11 | FR-011, NFR-401, NFR-403 | SD-008 | Correlation ID tracking |
| Section 12 (Performance) | specification.md:7, specification.md:13 | NFR-101, NFR-102, NFR-103, NFR-104, NFR-105, NFR-106, NFR-107 | SD-001, SD-002, SD-006 | Optimization strategies |
| Section 13 (Deployment) | specification.md:12, specification.md:18 | FR-013, FR-014, NFR-201, NFR-203 | SD-006 | Infrastructure requirements |

2. Overview: The Big Picture

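msghandler implements the Claim-Check pattern for efficient handling of large payloads (≥ 0.5MB by default). Payloads at or above the threshold are uploaded to an HTTP file server, and only the download URL travels through the transport.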

2.0 Transport Layer Pattern

Critical: msghandler is transport-agnostic. The caller is responsible for:

  1. Sending side: Call smartpack() → receive (envelope, json_string) → publish JSON string via transport
  2. Receiving side: Subscribe to transport → receive JSON string → call smartunpack()

flowchart LR
    subgraph msghandler["msghandler Module"]
        direction TB
        S1["smartpack(data)"]
        S2["Returns (envelope, json_str)"]
        R1["smartunpack(json_str)"]
        R2["Returns payloads"]
    end
    
    subgraph Transport["Transport Layer (Caller's Responsibility)"]
        direction TB
        PUBLISH["Publish JSON string"]
        SUBSCRIBE["Subscribe to messages"]
    end
    
    S1 --> S2
    S2 --> PUBLISH
    SUBSCRIBE --> R1
    R1 --> R2
    PUBLISH --> SUBSCRIBE
    
    style msghandler fill:#b3e5fc,stroke:#0288d1
    style Transport fill:#ffe0b2,stroke:#f57c00

Supported Transports:

  • NATS (WebSocket/TCP)
  • MQTT (WebSocket/TCP)
  • WebSocket (direct HTTP)
  • HTTP (polling/long-polling)

Key Principle: msghandler only handles serialization/deserialization. Transport is handled by caller.
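The division of responsibility can be sketched in Python. This is only an illustration: `fake_smartpack` and `fake_smartunpack` are hypothetical stand-ins rather than the real msghandler functions, and a `queue.Queue` plays the role of a broker topic so the sketch runs without any transport client.

```python
import json
import queue


# Hypothetical stand-ins for smartpack()/smartunpack(). The real functions
# also handle encoding, transport selection, and envelope metadata.
def fake_smartpack(send_to, payloads):
    envelope = {
        "send_to": send_to,
        "payloads": [
            {"dataname": name, "data": data, "payload_type": ptype}
            for name, data, ptype in payloads
        ],
    }
    return envelope, json.dumps(envelope)


def fake_smartunpack(json_str):
    envelope = json.loads(json_str)
    envelope["payloads"] = [
        (p["dataname"], p["data"], p["payload_type"]) for p in envelope["payloads"]
    ]
    return envelope


# The transport is owned by the caller; a queue stands in for a broker topic.
broker_topic = queue.Queue()

# Sending side: pack, then publish the JSON string.
_, msg_json = fake_smartpack("/agent/wine/api/v1/prompt", [("msg", "Hello!", "text")])
broker_topic.put(msg_json)

# Receiving side: take the JSON string off the transport, then unpack.
received = fake_smartunpack(broker_topic.get())
print(received["payloads"])
```

Swapping the queue for a NATS, MQTT, or WebSocket client changes only the two transport lines; the pack and unpack calls stay the same.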

flowchart TB
    subgraph msghandler["msghandler Module"]
        direction TB
        
        subgraph Sender["Sender (smartpack)"]
            direction LR
            S1["Data Tuples<br/>[(dataname, data, type)]"]
            S2["Serialize Data"]
            S3["Size Check"]
            S4["Transport Selection"]
            S5["Build Envelope"]
            S6["Publish to transport"]
            
            S1 --> S2
            S2 --> S3
            S3 --> S4
            S4 --> S5
            S5 --> S6
        end
        
        subgraph Receiver["Receiver (smartunpack)"]
            direction LR
            R1["Subscribe via transport"]
            R2["Parse Envelope"]
            R3["Check Transport"]
            R4["Deserialize Data"]
            R5["Return Payloads"]
            
            R1 --> R2
            R2 --> R3
            R3 --> R4
            R4 --> R5
        end
        
        S6 -.->|Message| R1
    end
    
    subgraph FileServer["HTTP File Server (Plik)"]
        direction TB
        FS1["Upload URL"]
        FS2["Download URL"]
        
        S4 -.->|Large Payload| FS1
        FS1 -.->|URL| S5
        R3 -.->|Fetch URL| FS2
    end
    
    style msghandler fill:#e1f5fe,stroke:#0288d1,stroke-width:2px
    style Sender fill:#b3e5fc,stroke:#0288d1
    style Receiver fill:#b3e5fc,stroke:#0288d1
    style FileServer fill:#ffe0b2,stroke:#f57c00

2.2 Key Design Principles

| Principle | Description | Rationale |
|---|---|---|
| Claim-Check Pattern | Large payloads uploaded to HTTP server, URL sent via transport | Transport has message size limits; avoids overflow |
| Automatic Transport Selection | Direct (< threshold) vs Link (≥ threshold) based on size | Optimizes memory vs network I/O trade-off |
| Cross-Platform API | Consistent smartpack()/smartunpack() across all platforms | Simplifies developer experience |
| Exponential Backoff | Retry downloads with increasing delays | Handles transient failures gracefully |
| Transport Agnostic | Caller handles transport (NATS/MQTT/WebSocket) | No vendor lock-in; works with any broker |

3. User Scenario 1: Chat Webapp ↔ Julia Backend

Scenario Description

A JavaScript chat webapp wants to send mixed payloads (text message + user avatar image) to a Julia backend, and receive mixed payloads (text response + AI-generated image) back.

Step-by-Step Flow

3.1 Step 1: JavaScript Webapp Sends Mixed Payloads

// JavaScript (Browser or Node.js)
const [env, msgJson] = await msghandler.smartpack(
    "/agent/wine/api/v1/prompt",
    [
        ["msg", "Hello! I'm Ton.", "text"],
        ["avatar", avatarImageData, "image"]
    ],
    {
        broker_url: "ws://localhost:4222",
        receiver_name: "agent-backend",
        msg_purpose: "chat"
    }
);

Rationale:

  • Why mixed payloads? Real chat apps often send both text and images together
  • Why text first? Text is smaller, sent via direct transport (fast, no file server needed)
  • Why image second? Images trigger link transport once they reach the 0.5MB threshold

3.1b Step 1b: JavaScript Publishes via Transport

// Publish the JSON string via WebSocket (or NATS/MQTT)
const conn = await transportClient.connect({ servers: "ws://localhost:4222" });
await conn.publish("/agent/wine/api/v1/prompt", msgJson);

3.2 Step 2: Transport Selection

For each payload, msghandler determines transport:

| Payload | Size | Transport | Reason |
|---|---|---|---|
| "msg" (text) | ~20 bytes | direct | < 0.5MB threshold |
| "avatar" (image) | ~150KB | direct | < 0.5MB threshold |

Rationale:

  • Direct transport is faster for small payloads (no file server round-trip)
  • Link transport is used when payload ≥ 0.5MB (avoids transport size limits)
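The selection rule above amounts to a single comparison. A minimal sketch follows; `select_transport` is a hypothetical helper name, not a documented msghandler function, and the 500,000-byte default is the desktop threshold stated in this document.

```python
DEFAULT_SIZE_THRESHOLD = 500_000  # bytes (0.5MB), the desktop default in this document


def select_transport(payload_size: int, size_threshold: int = DEFAULT_SIZE_THRESHOLD) -> str:
    """Return "direct" below the threshold and "link" at or above it."""
    return "link" if payload_size >= size_threshold else "direct"


# Sizes taken from the scenarios in this walkthrough.
for name, size in [("msg", 20), ("avatar", 150_000), ("file", 10_000_000)]:
    print(name, select_transport(size))
```

Passing a smaller `size_threshold` (for example 100,000 on MicroPython) moves the cut-over point without changing the rule.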

3.3 Step 3: Serialization and Encoding

Each payload is serialized:

| Payload | Type | Serialization | Encoding |
|---|---|---|---|
| "msg" | text | UTF-8 bytes | Base64 |
| "avatar" | image | Raw bytes | Base64 |

Rationale:

  • Text uses UTF-8 encoding for human-readable data
  • Images use raw bytes to preserve binary data integrity
  • All payloads encoded as Base64 for JSON compatibility
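For the text payload, the two stages (UTF-8 serialization, then Base64 encoding) can be sketched with the Python standard library. The helper names `encode_text_payload` and `decode_text_payload` are illustrative assumptions, not part of the msghandler API.

```python
import base64


def encode_text_payload(text: str) -> str:
    """Serialize text as UTF-8 bytes, then Base64-encode for embedding in JSON."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def decode_text_payload(data: str) -> str:
    """Reverse of encode_text_payload: Base64-decode, then read as UTF-8."""
    return base64.b64decode(data).decode("utf-8")


raw_size = len("Hello! I'm Ton.".encode("utf-8"))
encoded = encode_text_payload("Hello! I'm Ton.")
print(raw_size, len(encoded), encoded)
```

Base64 emits four characters for every three input bytes, so the encoded string is roughly a third larger than the raw payload.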

3.4 Step 4: Envelope Building

msghandler builds the message envelope:

{
  "correlation_id": "a1b2c3d4...",
  "msg_id": "e5f6g7h8...",
  "timestamp": "2026-03-13T16:30:00.000Z",
  "send_to": "/agent/wine/api/v1/prompt",
  "msg_purpose": "chat",
  "sender_name": "chat-webapp",
  "sender_id": "sender-uuid...",
  "receiver_name": "agent-backend",
  "receiver_id": "",
  "reply_to": "/agent/wine/api/v1/response",
  "reply_to_msg_id": "",
  "broker_url": "ws://localhost:4222",
  "metadata": {},
  "payloads": [
    {
      "id": "payload-uuid...",
      "dataname": "msg",
      "payload_type": "text",
      "transport": "direct",
      "encoding": "base64",
      "size": 15,
      "data": "SGVsbG8hIEknbSBUb24u",
      "metadata": {"payload_bytes": 15}
    },
    {
      "id": "payload-uuid...",
      "dataname": "avatar",
      "payload_type": "image",
      "transport": "direct",
      "encoding": "base64",
      "size": 150000,
      "data": "iVBORw0KGgoAAAANSUhEUgAA...",
      "metadata": {"payload_bytes": 150000}
    }
  ]
}

Rationale:

  • correlation_id: Tracks this chat session across all systems
  • reply_to: Tells backend where to send response
  • payloads array: Contains all data with metadata for proper handling
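A rough sketch of how such an envelope could be assembled is shown below. `build_envelope` is a hypothetical helper covering only a subset of the fields above, every payload is treated as direct transport, and the timestamp is rendered with a `+00:00` offset rather than the `Z` suffix used in the example.

```python
import base64
import json
import uuid
from datetime import datetime, timezone


def build_envelope(send_to, payloads, correlation_id=None, reply_to=""):
    """Assemble an envelope dict with a subset of the fields shown above."""
    return {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "msg_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        "send_to": send_to,
        "reply_to": reply_to,
        "payloads": [
            {
                "id": str(uuid.uuid4()),
                "dataname": name,
                "payload_type": ptype,
                "transport": "direct",  # assumption: all payloads are small
                "encoding": "base64",
                "size": len(raw),  # size of the raw bytes before encoding
                "data": base64.b64encode(raw).decode("ascii"),
            }
            for name, raw, ptype in payloads
        ],
    }


envelope = build_envelope(
    "/agent/wine/api/v1/prompt",
    [("msg", "Hello! I'm Ton.".encode("utf-8"), "text")],
    reply_to="/agent/wine/api/v1/response",
)
msg_json = json.dumps(envelope)  # this string is what the caller publishes
```

Passing an existing `correlation_id` when replying keeps the whole exchange traceable under one identifier.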

3.5 Step 5: Publish JSON String via Transport

// JavaScript: Publish via WebSocket (NATS/MQTT/HTTP work similarly)
const conn = await transportClient.connect({ servers: "ws://localhost:4222" });
await conn.publish("/agent/wine/api/v1/prompt", msgJson);

3.6 Step 6: Julia Backend Receives and Unpacks

# Julia backend
transport_msg = transport_subscription.next()  # Get message from transport
env = smartunpack(String(transport_msg.payload))

# env["payloads"] is now:
# [
#     ("msg", "Hello! I'm Ton.", "text"),
#     ("avatar", binary_data, "image")
# ]

3.7 Step 7: Julia Backend Sends Response

# Julia backend processes the message
response_text = "Hello Ton! I'm the AI assistant."
generated_image = generate_ai_image(response_text);

env, msg_json = smartpack(
    "/agent/wine/api/v1/response",
    [
        ("response", response_text, "text"),
        ("generated_image", generated_image, "image")
    ],
    reply_to = "/chat/user/v1/message",
    reply_to_msg_id = env["msg_id"]
);
publish(transport_conn, "/agent/wine/api/v1/response", msg_json);

Rationale:

  • Mixed response: Text explanation + AI-generated image
  • reply_to: Ensures response goes to correct topic
  • reply_to_msg_id: Links response to original message for tracing

4. User Scenario 2: Large File Transfer

Scenario Description

A JavaScript webapp wants to upload a large file (10MB) to a Julia backend for processing.

Step-by-Step Flow

4.1 Step 1: JavaScript Webapp Sends Large File

const [env, msgJson] = await msghandler.smartpack(
    "/agent/wine/api/v1/process",
    [
        ["file", largeFileData, "binary"]
    ],
    {
        broker_url: "ws://localhost:4222",
        receiver_name: "agent-backend"
    }
);

4.1b Step 1b: JavaScript Publishes via Transport

// Publish the JSON string via WebSocket (or NATS/MQTT)
const conn = await transportClient.connect({ servers: "ws://localhost:4222" });
await conn.publish("/agent/wine/api/v1/process", msgJson);

4.2 Step 2: Transport Selection

| Payload | Size | Transport | Reason |
|---|---|---|---|
| "file" | 10MB | link | ≥ 0.5MB threshold |

Rationale:

  • Link transport used for large payloads
  • File server handles large file upload
  • Transport only sends URL (small message)

4.3 Step 3: File Server Upload

// msghandler internally calls:
const response = await plikOneshotUpload(
    "http://localhost:8080",
    "file",
    largeFileData
);

// Response:
// {
//   status: 200,
//   uploadid: "UPLOAD_ID",
//   fileid: "FILE_ID",
//   url: "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file"
// }

Rationale:

  • Plik handles multipart upload
  • One-shot mode simplifies API
  • Returns URL for download

4.4 Step 4: Envelope Building

{
  "correlation_id": "a1b2c3d4...",
  "payloads": [
    {
      "id": "payload-uuid...",
      "dataname": "file",
      "payload_type": "binary",
      "transport": "link",
      "encoding": "none",
      "size": 10000000,
      "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file"
    }
  ]
}

Rationale:

  • data field contains URL instead of Base64
  • transport: "link" signals URL-based download
  • encoding: "none" indicates no additional encoding
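On the receiving side, the `transport` field decides how the `data` field is interpreted. The sketch below illustrates that dispatch under stated assumptions: `resolve_payload_bytes` is a hypothetical helper, and the download function is injected (here a dictionary lookup) so the example runs without a file server.

```python
import base64


def resolve_payload_bytes(payload: dict, fetch) -> bytes:
    """Return the raw bytes for one payload entry.

    `fetch` is a caller-supplied function (URL -> bytes), injected so the
    sketch does not depend on a real HTTP client.
    """
    if payload["transport"] == "direct":
        return base64.b64decode(payload["data"])
    if payload["transport"] == "link":
        return fetch(payload["data"])  # data holds the download URL
    raise ValueError(f"unknown transport: {payload['transport']}")


# Stand-in for the file server: URL -> stored bytes.
fake_store = {"http://localhost:8080/file/UPLOAD_ID/FILE_ID/file": b"\x00" * 10}

link_payload = {
    "transport": "link",
    "encoding": "none",
    "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file",
}
direct_payload = {"transport": "direct", "encoding": "base64", "data": "aGk="}

link_bytes = resolve_payload_bytes(link_payload, fake_store.__getitem__)
direct_bytes = resolve_payload_bytes(direct_payload, fake_store.__getitem__)
```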

4.5 Step 5: Julia Backend Receives, Downloads, and Unpacks

# Julia backend
transport_msg = transport_subscription.next();
env = smartunpack(String(transport_msg.payload));

# msghandler automatically:
# 1. Extracts URL from payload (link transport)
# 2. Downloads file with exponential backoff
# 3. Deserializes to binary data

Rationale:

  • Exponential backoff handles transient failures
  • Automatic download simplifies receiver code
  • Binary data returned directly
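The retry behaviour can be sketched as follows. This is an assumption-laden illustration, not msghandler's implementation: `download_with_backoff`, the base delay of 0.1 seconds, and the doubling factor are hypothetical, while the limit of 5 attempts mirrors the error message shown in Section 10.2.

```python
import time


def download_with_backoff(fetch, url, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call fetch(url), retrying on failure with delays of base_delay * 2**attempt."""
    for attempt in range(max_attempts):
        try:
            return fetch(url)
        except OSError as exc:
            if attempt == max_attempts - 1:
                raise RuntimeError(
                    f"Failed to fetch data after {max_attempts} attempts"
                ) from exc
            sleep(base_delay * (2 ** attempt))


# Demo: a fetch that fails twice with a transient error, then succeeds.
calls = {"count": 0}
delays = []


def flaky_fetch(url):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return b"file-bytes"


result = download_with_backoff(
    flaky_fetch,
    "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file",
    sleep=delays.append,  # record the delays instead of actually sleeping
)
```

Injecting `sleep` keeps the sketch testable; production code would typically also cap the maximum delay and add jitter.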

5. User Scenario 3: Tabular Data Exchange

Scenario Description

A Python application sends tabular data (pandas DataFrame) to a Julia backend for analysis, and receives processed results back.

Step-by-Step Flow

5.1 Step 1: Python Sends Tabular Data

# Python
import pandas as pd
from msghandler import smartpack

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "score": [95, 88, 92]
})

# smartpack is awaited here, so this call must run inside an async function
env, msg_json = await smartpack(
    "/agent/wine/api/v1/analyze",
    [("data", df, "arrowtable")],
    broker_url=DEFAULT_BROKER_URL,
    receiver_name="agent-backend"
)

Rationale:

  • arrowtable type for efficient tabular data transfer
  • Arrow IPC format preserves data types
  • Much faster than JSON serialization

5.1b Step 1b: Python Publishes via Transport

# Publish the JSON string via WebSocket/NATS/MQTT
await transport_publisher.publish("/agent/wine/api/v1/analyze", msg_json)

5.2 Step 2: Serialization to Arrow IPC

# msghandler internally:
import io

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.Table.from_pandas(df)
buf = io.BytesIO()
# The writer must be closed before reading the buffer so the IPC footer is written
with ipc.new_file(buf, table.schema) as writer:
    writer.write_table(table)
arrow_bytes = buf.getvalue()

Rationale:

  • Arrow IPC preserves column types
  • Binary format is compact
  • No schema information loss

5.3 Step 3: Julia Receives and Deserializes

# Julia backend
transport_msg = transport_subscription.next();
env = smartunpack(String(transport_msg.payload));

# env["payloads"][1] is now:
# ("data", DataFrame with id, name, score columns, "arrowtable")

Rationale:

  • Arrow.jl reads IPC format directly
  • DataFrame returned with correct types
  • No manual parsing needed

5.4 Step 4: Julia Sends Results (Complete Round-Trip)

# Julia backend
results = analyze_data(env["payloads"][1][2]);

# Send results back
env, msg_json = smartpack(
    "/agent/wine/api/v1/results",
    [("results", results, "arrowtable")],
    reply_to = "/python/worker/v1/results"
);
publish(transport_conn, "/agent/wine/api/v1/results", msg_json);

Rationale:

  • Arrow IPC format for efficient round-trip
  • Results preserve DataFrame structure
  • Python can deserialize to pandas DataFrame

6. User Scenario 4: Rust Service with Type-Safe API

Scenario Description

A Rust service needs to process messages from a Julia analytics pipeline and send typed results back. The Rust implementation leverages compile-time type safety via Rust enums and serde for serialization.

Step-by-Step Flow

6.1 Step 1: Rust Service Receives Message

// Rust service - using tokio async runtime
use msghandler::{smartunpack, MsgEnvelopeV1};
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};

#[tokio::main]
async fn main() {
    let conn = transport_client::connect("ws://localhost:4222").unwrap();
    
    // Subscribe and receive messages
    let mut sub = conn.subscribe("/agent/wine/api/v1/analyze").unwrap();
    
    for msg in sub.messages() {
        let envelope = smartunpack(
            &String::from_utf8_lossy(&msg.payload),
            &Default::default(),
        ).await.unwrap();
        
        // Access deserialized payloads by type
        for payload in &envelope.payloads {
            match payload.payload_type.as_str() {
                "arrowtable" => {
                    // Data is base64-encoded Arrow IPC bytes after smartunpack()
                    let arrow_bytes = BASE64.decode(&payload.data).unwrap();
                    println!("Received arrowtable payload ({} bytes)", arrow_bytes.len());
                },
                "text" => {
                    // Data is the decoded text string
                    println!("Message: {}", payload.data);
                },
                "image" | "audio" | "video" | "binary" => {
                    // Data is base64-encoded binary content
                    let bytes = BASE64.decode(&payload.data).unwrap();
                    println!("Received {} bytes of {} data", bytes.len(), payload.payload_type);
                },
                "dictionary" | "jsontable" => {
                    // Data is a JSON string
                    println!("Data: {}", payload.data);
                },
                _ => println!("Unknown payload type: {}", payload.payload_type),
            }
        }
    }
}

Rationale:

  • serde serialization: Automatic JSON deserialization to MsgEnvelopeV1
  • tokio runtime: Efficient async I/O for transport and HTTP operations
  • smartunpack deserialization: Payload data is deserialized and stored as strings in payload.data
  • Type dispatch: payload_type field determines how to interpret the data string

6.2 Step 2: Rust Service Sends Processed Results

// Rust service sends results back with mixed payload types
use msghandler::{smartpack, Payload, smartpackOptions};

let results_df = /* processed Arrow table */;
let result_bytes = /* serialize to Arrow IPC */;

let (envelope, json_str) = smartpack(
    "/agent/wine/api/v1/results",
    &[
        (
            "results".to_string(),
            Payload::ArrowTable(result_bytes),
            "arrowtable".to_string(),
        ),
        (
            "summary".to_string(),
            Payload::Text("Analysis complete: 1500 rows processed".to_string()),
            "text".to_string(),
        ),
    ],
    &smartpackOptions {
        broker_url: DEFAULT_BROKER_URL.to_string(),
        reply_to: "/python/worker/v1/results".to_string(),
        msg_purpose: "chat".to_string(),
        ..Default::default()
    },
).await?;

// Caller publishes via transport (WebSocket/MQTT/NATS)
conn.publish("/agent/wine/api/v1/results", &json_str)?;

Rationale:

  • Options struct: smartpackOptions combined with ..Default::default() keeps configuration concise
  • Enum-based payloads: Type safety prevents sending incorrect data types
  • Default options: sensible defaults reduce boilerplate
  • Result<T, E>: idiomatic Rust error handling

6.3 Step 3: Python/Julia Receives Rust Response

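# Python backend receives Rust response
# Assumes payload is bytes: decode explicitly, since str() on bytes yields "b'...'" rather than the JSON text
env = await smartunpack(transport_msg.payload.decode("utf-8"))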

# env["payloads"][0] is now:
# ("results", arrow_table_data, "arrowtable")
# env["payloads"][1] is now:
# ("summary", "Analysis complete: 1500 rows processed", "text")

Rationale:

  • Cross-platform parity: Rust envelope matches other platform envelopes exactly
  • Same JSON wire format: No protocol translation needed
  • Type preservation: Arrow IPC and text types preserved across all platforms

6.4 Step 4: Large File Transfer from Rust

// Rust service sends large binary file via link transport
let large_file_data: Vec<u8> = std::fs::read("/data/large_dataset.parquet")?;

let (envelope, json_str) = smartpack(
    "/agent/wine/api/v1/upload",
    &[
        (
            "dataset".to_string(),
            Payload::Binary(large_file_data),
            "binary".to_string(),
        ),
    ],
    &smartpackOptions {
        broker_url: DEFAULT_BROKER_URL.to_string(),
        fileserver_url: DEFAULT_FILESERVER_URL.to_string(),
        size_threshold: DEFAULT_SIZE_THRESHOLD,  // threshold triggers link transport
        ..Default::default()
    },
).await?;

// Caller publishes via transport
conn.publish("/agent/wine/api/v1/upload", &json_str)?;

Rationale:

  • Automatic transport selection: Same 0.5MB threshold as other desktop platforms
  • reqwest integration: Efficient HTTP client for file server upload/download
  • Exponential backoff: Built-in retry with configurable parameters
  • Zero-copy where possible: Vec<u8> passed directly without intermediate copies

7. User Scenario 5: MicroPython Device

Scenario Description

A MicroPython sensor device sends sensor readings to a Python backend.

Step-by-Step Flow

7.1 Step 1: MicroPython Sends Sensor Data

# MicroPython
from msghandler import smartpack

sensor_data = {
    "temperature": 25.5,
    "humidity": 60.0,
    "pressure": 1013.25
}

env, msg_json = smartpack(
    "/sensor/device/v1/readings",
    [("data", sensor_data, "dictionary")],
    broker_url=DEFAULT_BROKER_URL,
    size_threshold=100000  # 100KB for MicroPython
)

Rationale:

  • dictionary type for JSON-serializable sensor data
  • Smaller threshold (100KB) for memory constraints
  • Direct transport only (no file server support)

7.1b Step 1b: MicroPython Publishes via Transport

# Publish the JSON string via MQTT (common on IoT devices)
mqtt_client.publish("/sensor/device/v1/readings", msg_json)

7.2 Step 2: Serialization

# msghandler internally:
json_str = json.dumps(sensor_data)
json_bytes = json_str.encode('utf-8')
payload_b64 = base64.b64encode(json_bytes).decode('ascii')

Rationale:

  • JSON format for human-readable data
  • Base64 for transport compatibility
  • UTF-8 for text encoding

7.3 Step 3: Python Backend Receives

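# Python backend
transport_msg = await transport_consumer.next()
# Assumes payload is bytes: decode explicitly, since str() on bytes yields "b'...'" rather than the JSON text
env = await smartunpack(transport_msg.payload.decode("utf-8"))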

# env["payloads"][0] is now:
# ("data", {"temperature": 25.5, "humidity": 60.0, ...}, "dictionary")

Rationale:

  • JSON deserialization
  • Dictionary returned directly
  • No Arrow support (memory constraints)
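The receiving side simply reverses the serialization from Step 2. A minimal round-trip sketch using CPython's standard library follows (module availability on a given MicroPython build may differ); it illustrates the encoding chain only and is not msghandler's internal code.

```python
import base64
import json

sensor_data = {"temperature": 25.5, "humidity": 60.0, "pressure": 1013.25}

# Sender: dict -> JSON text -> UTF-8 bytes -> Base64 string
payload_b64 = base64.b64encode(json.dumps(sensor_data).encode("utf-8")).decode("ascii")

# Receiver: Base64 string -> UTF-8 bytes -> JSON text -> dict
restored = json.loads(base64.b64decode(payload_b64).decode("utf-8"))

print(restored)
```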

7.4 Step 4: Complete MicroPython End-to-End Flow

flowchart LR
    subgraph MicroPython["MicroPython Device"]
        MPY1["sensor_data = {temp, humidity}"]
        MPY2["smartpack() → JSON"]
        MPY3["publish via MQTT"]
    end
    
    subgraph Transport["MQTT Broker"]
        direction TB
        MQTT["broker.local:1883"]
    end
    
    subgraph Python["Python Backend"]
        PY1["subscribe via MQTT"]
        PY2["smartunpack(JSON)"]
        PY3["get dictionary"]
    end
    
    MPY1 --> MPY2
    MPY2 --> MPY3
    MPY3 --> MQTT
    MQTT --> PY1
    PY1 --> PY2
    PY2 --> PY3

Key Points:

  • MicroPython limited to text and dictionary types (memory constraints)
  • Direct transport only (no file server support)
  • Size threshold: 100KB (vs 500KB for desktop)

8. User Scenario 6: Cross-Platform Chat with Mixed Payloads

Scenario Description

Multiple platforms (JavaScript, Python, Julia) communicate in a chat application with mixed payload types.

Step-by-Step Flow

8.1 Step 1: JavaScript Sends Chat Message

// JavaScript (Frontend)
const [env, msgJson] = await msghandler.smartpack(
    "/chat/user/v1/message",
    [
        ["text", "Check this out!", "text"],
        ["image", imageData, "image"]
    ],
    {
        broker_url: "ws://localhost:4222",
        receiver_name: "",
        msg_purpose: "chat"
    }
);

Rationale:

  • Empty receiver_name = broadcast to all subscribers
  • Chat messages often include text + images
  • Transport wildcard subscriptions route to correct recipients

8.1b Step 1b: JavaScript Publishes via Transport

// Publish via WebSocket/NATS/MQTT
const conn = await transportClient.connect({ servers: "ws://localhost:4222" });
await conn.publish("/chat/user/v1/message", msgJson);

8.2 Step 2: Python Backend Receives

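# Python (Backend)
transport_msg = await transport_consumer.next()
# Assumes payload is bytes: decode explicitly, since str() on bytes yields "b'...'" rather than the JSON text
env = await smartunpack(transport_msg.payload.decode("utf-8"))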

# env["payloads"] is now:
# [
#     ("text", "Check this out!", "text"),
#     ("image", binary_data, "image")
# ]

Rationale:

  • Consistent API across platforms
  • Same payload structure regardless of sender
  • Type information preserved

8.3 Step 3: Julia Backend Receives

# Julia (Backend)
transport_msg = transport_subscription.next();
env = smartunpack(String(transport_msg.payload));

# env["payloads"] is now:
# [
#     ("text", "Check this out!", "text"),
#     ("image", binary_data, "image")
# ]

Rationale:

  • Cross-platform API parity
  • Same function signature across platforms
  • Type information enables proper deserialization

8.4 Step 4: Complete End-to-End Flow

flowchart TB
    subgraph Sender["Sender (JavaScript)"]
        direction TB
        JS1["Create data tuples"]
        JS2["smartpack() → JSON string"]
        JS3["Publish via WebSocket"]
    end
    
    subgraph Transport["Transport Layer"]
        direction TB
        BROKER["Message Broker"]
    end
    
    subgraph Receiver["Receiver (Julia/Python)"]
        direction TB
        REC1["Subscribe via WebSocket"]
        REC2["smartunpack(JSON string)"]
        REC3["Get payload tuples"]
    end
    
    JS1 --> JS2
    JS2 --> JS3
    JS3 --> BROKER
    BROKER --> REC1
    REC1 --> REC2
    REC2 --> REC3

Rationale:

  • smartpack() handles serialization, transport selection, and envelope building
  • Caller publishes JSON string via WebSocket/NATS/MQTT
  • smartunpack() handles transport detection, file downloads (if link), and deserialization
  • Same API across all platforms ensures consistency

8.6 Step 6: Complete Cross-Platform Flow

flowchart TB
    subgraph JS["JavaScript (Frontend)"]
        J1["Create message tuples"]
        J2["smartpack()"]
        J3["Publish via WebSocket"]
        J4["Subscribe for replies"]
    end
    
    subgraph Python["Python (Backend)"]
        P1["Subscribe via NATS"]
        P2["smartunpack()"]
        P3["Process data"]
        P4["smartpack()"]
        P5["Publish reply via NATS"]
    end
    
    subgraph Julia["Julia (Backend)"]
        L1["Subscribe via MQTT"]
        L2["smartunpack()"]
        L3["Process data"]
        L4["smartpack()"]
        L5["Publish reply via MQTT"]
    end
    
    subgraph Transport["Message Broker (NATS/MQTT/WebSocket)"]
        direction TB
        NATS["NATS: ws://localhost:4222"]
        MQTT["MQTT: broker.local:1883"]
    end
    
    J2 --> J3
    J3 --> NATS
    NATS --> P1
    P1 --> P2
    P2 --> P3
    P3 --> P4
    P4 --> P5
    P5 --> MQTT
    MQTT --> L1
    L1 --> L2
    L2 --> L3
    L3 --> L4
    L4 --> L5
    L5 --> NATS
    NATS --> J4
    
    style JS fill:#e3f2fd,stroke:#1976d2
    style Python fill:#fff3e0,stroke:#f57c00
    style Julia fill:#e8f5e9,stroke:#388e3c
    style Transport fill:#f3e5f5,stroke:#7b1fa2

Key Points:

  • Same smartpack()/smartunpack() API across all platforms
  • JSON wire format ensures compatibility
  • Each platform uses their preferred transport protocol

9. Transport Layer Integration

msghandler is transport-agnostic: it only handles serialization and deserialization. The caller is responsible for publishing and subscribing, as described in the flows below.

9.1 Publishing Flow (Sender)

  1. Call smartpack() → gets (envelope, json_string)
  2. Publish JSON string via transport (WebSocket/MQTT/NATS/etc.)
  3. Transport delivers message to subscriber

9.2 Subscribing Flow (Receiver)

  1. Subscribe to transport topic
  2. Receive JSON string from transport
  3. Call smartunpack(json_string) → gets the envelope, with the deserialized payloads in env["payloads"]

9.3 Complete End-to-End Example

JavaScript → Julia Complete Flow

flowchart LR
    subgraph JavaScript["JavaScript Webapp"]
        J1["Create tuples"]
        J2["smartpack()"]
        J3["Publish via WebSocket"]
    end
    
    subgraph Transport["NATS Broker"]
        direction TB
        NATS["ws://localhost:4222"]
    end
    
    subgraph Julia["Julia Backend"]
        J4["Subscribe via WebSocket"]
        J5["smartunpack()"]
        J6["Process payloads"]
    end
    
    J1 --> J2
    J2 --> J3
    J3 --> NATS
    NATS --> J4
    J4 --> J5
    J5 --> J6

// JavaScript - Sender
const [env, msgJson] = await msghandler.smartpack(
    "/agent/wine/api/v1/prompt",
    [["msg", "Hello!", "text"]],
    { broker_url: "ws://localhost:4222" }
);
const conn = await nats.connect({ servers: "ws://localhost:4222" });
await conn.publish("/agent/wine/api/v1/prompt", msgJson);

# Julia - Receiver
transport_msg = transport_subscription.next()
env = smartunpack(String(transport_msg.payload))
# env["payloads"] is now: [("msg", "Hello!", "text")]

Transport Examples (Single Platform)

NATS (JavaScript)

const conn = await nats.connect({ servers: "ws://localhost:4222" });
await conn.publish(topic, json_string);

MQTT (Python)

await mqtt_client.publish(topic, json_string)

WebSocket (Browser)

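const ws = new WebSocket("ws://localhost:4222");
// send() throws if called before the connection is open, so wait for the open event
ws.addEventListener("open", () => ws.send(json_string));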

Rust (Tokio)

conn.publish(topic, &json_str)?;

9.4 Important Notes

  • JSON format: All messages use JSON string format for cross-platform compatibility
  • Caller responsibility: Transport publishing/subscription is always the caller's code
  • No vendor lock-in: Works with any message broker that supports your platform

10. Error Handling

10.1 Common Error Scenarios

| Scenario | Error | Recovery |
|---|---|---|
| File server unavailable | UPLOAD_FAILED | Fall back to direct transport or smaller payloads |
| File server download fails | DOWNLOAD_FAILED | Retry with exponential backoff |
| Payload type mismatch | DESERIALIZATION_ERROR | Validate payload_type matches data |
| Transport connection lost | TRANSPORT_CONNECTION_FAILED | Transport client auto-reconnects |

10.2 Error Response Format

{
  "correlation_id": "abc123...",
  "error": {
    "code": "DOWNLOAD_FAILED",
    "message": "Failed to fetch data after 5 attempts",
    "details": {
      "url": "http://localhost:8080/file/...",
      "correlation_id": "abc123..."
    }
  }
}
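A small helper can produce responses in this shape. The sketch below is illustrative only: `build_error_response` is a hypothetical name, and the field layout is copied from the example above rather than from the specification.

```python
import json


def build_error_response(correlation_id: str, code: str, message: str, **details) -> dict:
    """Assemble an error response in the shape shown above."""
    return {
        "correlation_id": correlation_id,
        "error": {
            "code": code,
            "message": message,
            # The correlation ID is repeated inside details, as in the example.
            "details": {**details, "correlation_id": correlation_id},
        },
    }


error = build_error_response(
    "abc123",
    "DOWNLOAD_FAILED",
    "Failed to fetch data after 5 attempts",
    url="http://localhost:8080/file/UPLOAD_ID/FILE_ID/file",
)
print(json.dumps(error, indent=2))
```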

11. Debugging and Tracing

11.1 Correlation ID Tracking

Every message includes a correlation_id:

# At start of request
using UUIDs  # provides uuid4()
correlation_id = string(uuid4());

# Use throughout the flow
log_trace(correlation_id, "Starting smartpack");
log_trace(correlation_id, "Serialized payload size: 100 bytes");
log_trace(correlation_id, "Published to transport");

Log Format:

[2026-03-13T16:30:00.000Z] [Correlation: abc123...] Starting smartpack
[2026-03-13T16:30:00.001Z] [Correlation: abc123...] Serialized payload size: 100 bytes
[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to transport
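A trace helper producing this line format might look like the following Python sketch. `format_trace` and `log_trace` here are hypothetical implementations written to match the log lines above; the `now` parameter exists only to make the output reproducible.

```python
from datetime import datetime, timezone
from typing import Optional


def format_trace(correlation_id: str, message: str, now: Optional[datetime] = None) -> str:
    """Render one trace line: [UTC timestamp] [Correlation: id] message."""
    utc = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    stamp = utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{utc.microsecond // 1000:03d}Z"
    return f"[{stamp}] [Correlation: {correlation_id}] {message}"


def log_trace(correlation_id: str, message: str) -> None:
    print(format_trace(correlation_id, message))


fixed = datetime(2026, 3, 13, 16, 30, 0, 1000, tzinfo=timezone.utc)
line = format_trace("abc123", "Serialized payload size: 100 bytes", now=fixed)
log_trace("abc123", "Published to transport")
```

Because every line carries the same correlation ID, a single search over the logs of all participating services reconstructs the full path of one message.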

12. Performance Considerations

12.1 Optimization Strategies

| Strategy | Description | When to Use |
|---|---|---|
| Pre-create transport connection | Reuse connection for multiple sends | High-throughput scenarios |
| Adjust size threshold | Increase threshold if file server slow | File server bottleneck |
| Use direct transport | Avoid file server for small payloads | Low latency requirements |

12.2 Size Threshold by Platform

| Platform | Threshold | Notes |
|---|---|---|
| Desktop (Julia/JS/Python/Dart) | 500,000 bytes (0.5MB) | Default threshold |
| Dart Desktop | 500,000 bytes (0.5MB) | Default threshold |
| Dart Flutter | 500,000 bytes (0.5MB) | Default threshold |
| Dart Web | 500,000 bytes (0.5MB) | Default threshold |
| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints |

13. Deployment Considerations

13.1 Minimum Infrastructure

| Component | Minimum | Notes |
|---|---|---|
| Message Broker | 1 instance | Single node for development |
| File Server | 1 instance | HTTP server for large payloads |
| Client Memory | 50MB | Desktop platforms (Julia/JS/Python/Dart) |
| Client Memory | 256KB | MicroPython devices |

13.2 Environment Variables

| Variable | Default | Description |
|---|---|---|
| BROKER_URL | ws://localhost:4222 | Message broker URL |
| FILESERVER_URL | http://localhost:8080 | HTTP file server URL |
| SIZE_THRESHOLD | 500000 | Size threshold in bytes (0.5MB) |
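An application could read these settings as sketched below. Whether msghandler itself reads the variables or expects the caller to pass them in is not stated here, so `load_config` is a hypothetical caller-side helper; the defaults are the ones listed in the table.

```python
import os


def load_config(environ=os.environ) -> dict:
    """Read the three settings above, falling back to the documented defaults."""
    return {
        "broker_url": environ.get("BROKER_URL", "ws://localhost:4222"),
        "fileserver_url": environ.get("FILESERVER_URL", "http://localhost:8080"),
        "size_threshold": int(environ.get("SIZE_THRESHOLD", "500000")),
    }


defaults = load_config({})  # empty mapping: every value falls back to its default
overridden = load_config({"SIZE_THRESHOLD": "100000"})
print(defaults, overridden["size_threshold"])
```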

Change Log

| Date | Version | Changes |
|---|---|---|
| 2026-05-22 | 1.5.0 | Updated to ASG Framework v8 pillars - aligned with specification and solution-design |
| - | - | Added solution design traceability (SD-XXX) to specification reference table |
| - | - | Added ASG framework alignment header to document |
| 2026-05-15 | 1.4.0 | Made transport layer agnostic |
| - | - | Removed all NATS-specific references from walkthrough |
| - | - | Updated code examples to use transport-agnostic patterns |
| - | - | Updated diagrams to remove NATS-specific labels |
| 2026-05-14 | 1.3.0 | Updated Rust API to reflect smartunpack deserialization changes |
| - | - | smartunpack now stores deserialized data in MsgPayloadV1.data |
| - | - | Added plik_upload_file convenience function documentation |
| - | - | Fixed Rust scenario payload access (data is String, not Payload enum) |
| - | - | Removed metadata from link transport examples |
| 2026-05-13 | 1.2.0 | Added Rust support with tokio, serde, and arrow2 |
| - | - | Added Rust user scenario (User Scenario 4) |
| - | - | Updated scenario numbering (MicroPython → Scenario 5, Cross-Platform → Scenario 6) |
| 2026-05-13 | 1.1.0 | Aligned with ground truth implementation (src/msghandler.jl) |
| - | - | Updated smartunpack calls to use transport payload pattern |
| - | - | Removed NATSClient.publish() calls (caller responsible for transport publishing) |
| - | - | Removed is_publish and nats_connection parameter references |
| 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines |
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |

14. Gap-Check Validation

| Stage Transition | Gap-Check Question | Status |
|---|---|---|
| Requirements → Specification | Does the Specification define all edge cases and conflict scenarios from the Requirements? | Verified - All FR-XXX requirements have corresponding spec rules |
| Specification → UI Specification | Does the UI Specification expose all the data and states defined in the Specification? | Pending - UI spec not yet created |
| UI Specification → Walkthrough | Does the Walkthrough reflect the complete flow including error states and timing? | Pending - UI spec not yet created |
| Walkthrough → Architecture | Does the Architecture support the performance and integration requirements defined in the Walkthrough? | Pending - Architecture not yet created |

15. Complete End-to-End Flow Summary

The msghandler transport-agnostic pattern works as follows:

Sender:                  Transport Layer:         Receiver:
smartpack()              Publish JSON string      Subscribe
    ↓                        ↓                        ↓
(envelope, json_str) ──→ JSON string ───────────→ smartunpack()
                                                      ↓
                                                  payloads array

Key Points:

  • smartpack() handles serialization, transport selection, and envelope building
  • Caller publishes JSON string via WebSocket/NATS/MQTT
  • smartunpack() handles transport detection, file downloads (if link), and deserialization
  • Same API across all platforms ensures consistency
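
The flow above can be illustrated with a toy round trip in Python that uses an in-memory queue in place of a real broker. The functions `toy_smartpack` and `toy_smartunpack` are simplified stand-ins written for this sketch, not the real msghandler functions, and the envelope fields other than `correlation_id` are assumptions. The sketch covers only the direct path; link transport and file downloads are left out.

```python
import json
import queue
import uuid


def toy_smartpack(payloads):
    """Stand-in for smartpack(): build an envelope and its JSON string."""
    envelope = {
        "correlation_id": str(uuid.uuid4()),
        "payloads": payloads,            # hypothetical field name
    }
    return envelope, json.dumps(envelope)


def toy_smartunpack(json_str):
    """Stand-in for smartunpack(): parse the JSON string, return payloads."""
    envelope = json.loads(json_str)
    return envelope["payloads"]


# In-memory queue standing in for the broker subject or topic
transport = queue.Queue()

# Sender: pack, then the caller publishes the JSON string itself
envelope, json_str = toy_smartpack([{"type": "text", "data": "hello"}])
transport.put(json_str)

# Receiver: take the JSON string from the subscription and unpack it
received = transport.get()
payloads = toy_smartunpack(received)
```

The point of the separation is that packing and unpacking never touch the transport: swapping the queue for WebSocket, NATS, or MQTT changes only the two lines that publish and receive the string.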

16. References

16.1 Documentation Artifacts

| Document | Purpose | Specification Traceability | Solution Design Traceability |
|---|---|---|---|
| docs/requirements.md | Business requirements and user stories | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| docs/specification.md | Technical contract for msghandler | specification.md:2-19 (all sections) | SD-001 through SD-008 |
| docs/ui-specification.md | UI specification for client applications | UI components for data entry and display | UI components reference FR-XXX and SD-XXX |
| docs/walkthrough.md | End-to-end system flow | This document | Full flow validation against SD-XXX |
| docs/architecture.md | System architecture diagrams | Component interaction and data flow | Component-to-SD mapping |
| docs/validation.md | CI/CD validation rules | Contract testing and spec compliance | Validation gates for SD-XXX |
| docs/runbook.md | Operational runbook | Deployment, scaling, and troubleshooting | Operation-to-SD mapping |

16.2 Implementation Files

| File | Platform | Features | Specification Traceability | Solution Design Traceability |
|---|---|---|---|---|
| src/msghandler.jl | Julia | Full feature set, Arrow IPC, multiple dispatch | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler_ssr.js | Node.js | Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler_csr.js | Browser | JSON table only | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler.py | Python | Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler.dart | Dart | Full feature set, Arrow IPC, async/await | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler.rs | Rust | Full feature set, Arrow IPC, async/await, type-safe, file upload helpers | FR-001 through FR-014, NFR-101 through NFR-405 | SD-001 through SD-008 |
| src/msghandler_mpy.py | MicroPython | Limited to direct transport | FR-005, FR-006, FR-012 | SD-002, SD-004 |

17. Change Log

| Date | Version | Changes | Specification Reference | Solution Design Reference |
|---|---|---|---|---|
| 2026-05-22 | 1.6.0 | Added complete transport integration examples with end-to-end flow diagrams | Section 9 | SD-006 |
| 2026-05-22 | 1.5.0 | Updated to ASG Framework v8 pillars - aligned with specification and solution-design | All sections | All SD-XXX |
| 2026-05-15 | 1.4.0 | Made transport layer agnostic | All sections | SD-001 through SD-008 |
| 2026-05-14 | 1.3.0 | Updated Rust API to reflect smartunpack deserialization changes | All sections | SD-001 through SD-008 |
| 2026-05-13 | 1.2.0 | Added Rust support with tokio, serde, and arrow2 | specification.md:11 (Rust API) | SD-001 through SD-008 |
| 2026-05-13 | 1.1.0 | Aligned with ground truth implementation (src/msghandler.jl) | All sections | SD-001 through SD-008 |
| 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines | All sections | SD-001 through SD-008 |
| 2026-03-13 | 1.0.0 | Initial walkthrough documentation | specification.md:2-19 (all sections) | SD-001 through SD-008 |

This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.


18. ASG Framework Validation

| Pillar | Status | Reference |
|---|---|---|
| Requirements | Complete | requirements.md: FR-001 through FR-014, NFR-101 through NFR-405 |
| Solution Design | Complete | solution-design.md: SD-001 through SD-008 |
| Specification | Complete | specification.md: Section 2-19 |
| Walkthrough | Complete | walkthrough.md: Sections 2-18 (includes Transport Layer Integration) |
| Implementation Plan | Pending | implementation-plan.md |
| Validation | Pending | validation.md |
| Runbook | Pending | runbook.md |
