diff --git a/docs/walkthrough.md b/docs/walkthrough.md index 589bf39..c243035 100644 --- a/docs/walkthrough.md +++ b/docs/walkthrough.md @@ -12,7 +12,7 @@ This document provides the **story of flow** for NATSBridge - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, and **MicroPython** applications using NATS as the message bus. This walkthrough serves as the primary onboarding guide for new developers and explains: -- **How the system works** - Step-by-step flow of data transmission and reception +- **User scenarios** - Real-world use cases from developer perspective - **Why steps are sequenced** - The rationale behind architectural decisions - **What could go wrong** - Common failure scenarios and recovery strategies @@ -22,45 +22,42 @@ This walkthrough serves as the primary onboarding guide for new developers and e NATSBridge implements the **Claim-Check pattern** for efficient handling of large payloads (>0.5MB): -```mermaid -flowchart TD - subgraph "Sender Application" - A1[User Code] - A2[smartsend] - A3[_serialize_data] - A4[Transport Selection] - A5[Build Envelope] - A6[Publish to NATS] - end - - subgraph "NATS Broker" - B1[NATS Server] - end - - subgraph "Receiver Application" - C1[Subscribe to NATS] - C2[smartreceive] - C3[Parse Envelope] - C4[Process Payloads] - C5[Deserialize Data] - end - - A1 --> A2 - A2 --> A3 - A3 --> A4 - A4 -->|Direct| A5 - A4 -->|Link| A6 - A6 --> B1 - B1 --> C1 - C1 --> C2 - C2 --> C3 - C3 --> C4 - C4 --> C5 - - style A1 fill:#e8f5e9,stroke:#4caf50 - style B1 fill:#fff3e0,stroke:#f57c00 - style C1 fill:#e3f2fd,stroke:#2196f3 - style A4 fill:#fef3c7,stroke:#f59e0b +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ NATSBridge Architecture │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ Sender │ │ Receiver │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ 
┌──────────┐ │ │ +│ │ │smartsend │◀─────────┤ │smartreceive│ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │Serialize │◀─────────┤ │Deserialize│ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │Transport │◀─────────┤ │Transport │ │ │ +│ │ │Selection │ │ │ │Selection │ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │ NATS │◀─────────┤ │ NATS │ │ │ +│ │ │Publish │ │ │ │Subscribe │ │ │ +│ │ └──────────┘ │ │ └──────────┘ │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │File Server│◀─────────┤ │File Server│ │ │ +│ │ │Upload │ │ │ │Download │ │ │ +│ │ └──────────┘ │ │ └──────────┘ │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────┘ ``` ### Key Design Principles @@ -74,332 +71,82 @@ flowchart TD --- -## The Sending Flow: `smartsend()` +## User Scenario 1: Chat Webapp ↔ Julia Backend -### Step-by-Step Journey +### Scenario Description -```mermaid -flowchart TD - A[User calls smartsend subject data] --> B[Process each payload] - B --> C{Parse payload tuple} - C --> D[Extract: dataname, data, payload_type] - - D --> E[_serialize_data] - E --> F{payload_type} - - F -->|"text"| G[UTF-8 encode] - F -->|"dictionary"| H[JSON serialize] - F -->|"arrowtable"| I[Arrow IPC serialize] - F -->|"jsontable"| J[JSON serialize] - F -->|"image"| K[Raw bytes] - F -->|"audio"| L[Raw bytes] - F -->|"video"| M[Raw bytes] - F -->|"binary"| N[Raw bytes] - - G --> O[Return bytes] - H --> O - I --> O - J --> O - K --> O - L --> O - M --> O - N --> O - - O --> P[Calculate serialized size] - P --> Q{Size < Threshold?} - - Q -->|Yes| R[Direct Transport] - Q -->|No| S[Link Transport] - - R --> T[Base64 encode] - T --> U[Build payload with direct] - - S --> V[Upload to file server] - V --> W[Get download 
URL] - W --> U - - U --> X[Build envelope] - X --> Y[Convert to JSON] - Y --> Z[Publish to NATS] - - style A fill:#f9f9f9,stroke:#333 - style Z fill:#e0e7ff,stroke:#3b82f6 - style R fill:#d1fae5,stroke:#10b981 - style S fill:#fef3c7,stroke:#f59e0b -``` +A JavaScript chat webapp wants to send mixed payloads (text message + user avatar image) to a Julia backend, and receive mixed payloads (text response + AI-generated image) back. -### Detailed Walkthrough +### Step-by-Step Flow -#### Step 1: User Calls `smartsend()` - -```julia -# Julia -data = [ - ("msg", "Hello World", "text"), - ("img", binary_data, "image") -] -env, msg_json = smartsend("/chat/user/v1/message", data) -``` - -```python -# Python -data = [ - ("msg", "Hello World", "text"), - ("img", binary_data, "image") -] -env, msg_json = await smartsend("/chat/user/v1/message", data) -``` +#### Step 1: JavaScript Webapp Sends Mixed Payloads ```javascript -// JavaScript -const data = [ - ["msg", "Hello World", "text"], - ["img", binaryData, "image"] -]; -const [env, msgJson] = await smartsend("/chat/user/v1/message", data); +// JavaScript (Browser or Node.js) +const [env, msgJson] = await NATSBridge.smartsend( + "/agent/wine/api/v1/prompt", + [ + ["msg", "Hello! 
I'm Ton.", "text"], + ["avatar", avatarImageData, "image"] + ], + { + broker_url: "ws://localhost:4222", + receiver_name: "agent-backend", + msg_purpose: "chat" + } +); ``` -**What happens**: -- User provides a list of tuples: `(dataname, data, payload_type)` -- `dataname`: Identifier for the payload (e.g., "msg", "login_image") -- `data`: The actual data to send -- `payload_type`: Type string determining serialization method +**Rationale**: +- **Why mixed payloads?** Real chat apps often send both text and images together +- **Why text first?** Text is smaller, sent via direct transport (fast, no file server needed) +- **Why image second?** Images may trigger link transport if >0.5MB -#### Step 2: Serialization (`_serialize_data`) +#### Step 2: Transport Selection -Each payload is serialized based on its type: +For each payload, NATSBridge determines transport: -| Payload Type | Julia | Python | JavaScript | Encoding | -|--------------|-------|--------|------------|----------| -| `text` | UTF-8 bytes | UTF-8 bytes | UTF-8 bytes | Base64 | -| `dictionary` | JSON string | JSON string | JSON string | Base64 | -| `arrowtable` | Arrow IPC | Arrow IPC | Arrow IPC | Base64/arrow-ipc | -| `jsontable` | JSON array | JSON array | JSON array | Base64/json | -| `image`/`audio`/`video`/`binary` | Raw bytes | Raw bytes | Raw bytes | Base64 | +| Payload | Size | Transport | Reason | +|---------|------|-----------|--------| +| `"msg"` (text) | ~20 bytes | direct | < 0.5MB threshold | +| `"avatar"` (image) | ~150KB | direct | < 0.5MB threshold | -**Example**: -```julia -# Text serialization -text_bytes = Vector{UInt8}("Hello World") # 11 bytes +**Rationale**: +- Direct transport is faster for small payloads (no file server round-trip) +- Link transport is used when payload ≥ 0.5MB (avoids NATS size limits) -# Dictionary serialization -dict_bytes = Vector{UInt8}("{\"key\":\"value\"}") # 17 bytes +#### Step 3: Serialization and Encoding -# Arrow table serialization -io = IOBuffer() 
-Arrow.write(io, data_frame) -arrow_bytes = take!(io) # Binary Arrow IPC stream -``` +Each payload is serialized: -#### Step 3: Transport Selection +| Payload | Type | Serialization | Encoding | +|---------|------|---------------|----------| +| `"msg"` | `text` | UTF-8 bytes | Base64 | +| `"avatar"` | `image` | Raw bytes | Base64 | -The serialized size determines the transport method: +**Rationale**: +- Text uses UTF-8 encoding for human-readable data +- Images use raw bytes to preserve binary data integrity +- All payloads encoded as Base64 for JSON compatibility -| Platform | Threshold | Notes | -|----------|-----------|-------| -| Desktop (Julia/JS/Python) | 500,000 bytes (0.5MB) | Default threshold | -| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints | +#### Step 4: Envelope Building -**Decision Logic**: -```julia -if payload_size < size_threshold - # Direct transport: send via NATS -else - # Link transport: upload to file server -end -``` +NATSBridge builds the message envelope: -#### Step 4: Direct Transport Path - -For payloads < threshold: - -1. **Base64 Encode**: Convert binary data to ASCII string -2. **Build Payload**: Create `msg_payload_v1` with `transport="direct"` - -```julia -# Encode as Base64 -payload_b64 = Base64.base64encode(payload_bytes) - -# Build payload -payload = msg_payload_v1( - payload_b64, - payload_type; - transport = "direct", - encoding = "base64", - size = payload_size -) -``` - -#### Step 5: Link Transport Path - -For payloads ≥ threshold: - -1. **Upload to File Server**: Use `plik_oneshot_upload()` -2. **Get Download URL**: Server returns URL for the uploaded file -3. 
**Build Payload**: Create `msg_payload_v1` with `transport="link"` - -```julia -# Upload to Plik server -response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - -# Extract URL -url = response["url"] - -# Build payload -payload = msg_payload_v1( - url, - payload_type; - transport = "link", - encoding = "none", - size = payload_size -) -``` - -**File Server Handler Contract**: -```julia -function fileserver_upload_handler( - file_server_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} - # Returns: Dict("status" => 200, "uploadid" => "...", "fileid" => "...", "url" => "...") -end -``` - -#### Step 6: Build Envelope - -All payloads are wrapped in a message envelope: - -```julia -env = msg_envelope_v1( - subject, - payloads; - correlation_id = correlation_id, - msg_id = msg_id, - msg_purpose = msg_purpose, - sender_name = sender_name, - sender_id = sender_id, - receiver_name = receiver_name, - receiver_id = receiver_id, - reply_to = reply_to, - reply_to_msg_id = reply_to_msg_id, - broker_url = broker_url -) -``` - -**Envelope Fields**: -| Field | Purpose | -|-------|---------| -| `correlation_id` | Track message flow across distributed systems | -| `msg_id` | Unique identifier for this message | -| `timestamp` | ISO 8601 UTC timestamp | -| `send_to` | NATS subject to publish to | -| `msg_purpose` | ACK, NACK, updateStatus, shutdown, chat, command, event | -| `sender_name`/`sender_id` | Sender identification | -| `receiver_name`/`receiver_id` | Receiver identification (empty = broadcast) | -| `reply_to` | Topic for reply messages | -| `broker_url` | NATS server URL | -| `metadata` | Message-level metadata | -| `payloads` | Array of payload objects | - -#### Step 7: Publish to NATS - -The envelope is converted to JSON and published to NATS: - -```julia -env_json_str = envelope_to_json(env) - -# Publish with existing connection -publish_message(nats_connection, subject, env_json_str, correlation_id) - -# Or publish by 
creating new connection -publish_message(broker_url, subject, env_json_str, correlation_id) -``` - ---- - -## The Receiving Flow: `smartreceive()` - -### Step-by-Step Journey - -```mermaid -flowchart TD - A[NATS message arrives] --> B[Parse JSON envelope] - B --> C[Extract payloads array] - C --> D{Iterate through payloads} - - D --> E[Get payload transport] - E --> F{transport == direct?} - - F -->|Yes| G[Extract Base64 data] - G --> H[Decode Base64] - H --> I[_deserialize_data] - - F -->|No| J[Extract download URL] - J --> K[Fetch with exponential backoff] - K --> L[_deserialize_data] - - I --> M[Build payload tuple] - L --> M - - M --> N{More payloads?} - N -->|Yes| D - N -->|No| O[Replace payloads array] - O --> P[Return envelope] - - style A fill:#f9f9f9,stroke:#333 - style P fill:#e0e7ff,stroke:#3b82f6 - style G fill:#d1fae5,stroke:#10b981 - style J fill:#fef3c7,stroke:#f59e0b -``` - -### Detailed Walkthrough - -#### Step 1: NATS Message Arrives - -The receiver gets a message from NATS: - -```julia -# Julia -msg = nats_subscription.next() # Get next message -env = smartreceive(msg) -``` - -```python -# Python -msg = await nats_consumer.next() # Get next message -env = await smartreceive(msg) -``` - -```javascript -// JavaScript -const msg = await natsSubscription.next(); -const env = await smartreceive(msg); -``` - -#### Step 2: Parse JSON Envelope - -The message payload is parsed as JSON: - -```julia -env_json_obj = JSON.parse(String(msg.payload)) -``` - -**Expected Structure**: ```json { - "correlation_id": "abc123...", - "msg_id": "def456...", - "timestamp": "2026-03-13T07:02:50.443Z", - "send_to": "/chat/user/v1/message", + "correlation_id": "a1b2c3d4...", + "msg_id": "e5f6g7h8...", + "timestamp": "2026-03-13T16:30:00.000Z", + "send_to": "/agent/wine/api/v1/prompt", "msg_purpose": "chat", - "sender_name": "sender-app", + "sender_name": "chat-webapp", "sender_id": "sender-uuid...", - "receiver_name": "receiver-app", - "receiver_id": "receiver-uuid...", - 
"reply_to": "reply.subject", - "reply_to_msg_id": "msg-id...", - "broker_url": "nats://localhost:4222", + "receiver_name": "agent-backend", + "receiver_id": "", + "reply_to": "/agent/wine/api/v1/response", + "reply_to_msg_id": "", + "broker_url": "ws://localhost:4222", "metadata": {}, "payloads": [ { @@ -408,271 +155,446 @@ env_json_obj = JSON.parse(String(msg.payload)) "payload_type": "text", "transport": "direct", "encoding": "base64", - "size": 11, - "data": "SGVsbG8gV29ybGQ=", - "metadata": {"payload_bytes": 11} + "size": 15, + "data": "SGVsbG8hIEknbSBUb24u", + "metadata": {"payload_bytes": 15} + }, + { + "id": "payload-uuid...", + "dataname": "avatar", + "payload_type": "image", + "transport": "direct", + "encoding": "base64", + "size": 150000, + "data": "iVBORw0KGgoAAAANSUhEUgAA...", + "metadata": {"payload_bytes": 150000} } ] } ``` -#### Step 3: Process Each Payload +**Rationale**: +- **correlation_id**: Tracks this chat session across all systems +- **reply_to**: Tells backend where to send response +- **payloads array**: Contains all data with metadata for proper handling -For each payload in the envelope: +#### Step 5: Publish to NATS -```julia -num_payloads = length(env_json_obj["payloads"]) - -for i in 1:num_payloads - payload = env_json_obj["payloads"][i] - transport = String(payload["transport"]) - dataname = String(payload["dataname"]) - - if transport == "direct" - # Direct transport path - elseif transport == "link" - # Link transport path - else - error("Unknown transport type: $transport") - end -end +```javascript +await NATSBridge.NATSClient.connect("ws://localhost:4222"); +await NATSBridge.NATSClient.publish("/agent/wine/api/v1/prompt", msgJson); ``` -#### Step 4: Direct Transport Path +**Rationale**: +- NATS provides low-latency message delivery +- JSON format ensures cross-platform compatibility -For payloads with `transport == "direct"`: - -1. **Extract Base64 Data**: Get the Base64-encoded string -2. 
**Decode Base64**: Convert to binary data -3. **Deserialize**: Convert bytes to native data type +#### Step 6: Julia Backend Receives Message ```julia -# Extract Base64 payload -payload_b64 = String(payload["data"]) +# Julia backend +msg = NATS.subscription.next() # Get message from NATS +env = smartreceive(msg) -# Decode Base64 -payload_bytes = Base64.base64decode(payload_b64) - -# Deserialize based on type -data_type = String(payload["payload_type"]) -data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) +# env["payloads"] is now: +# [ +# ("msg", "Hello! I'm Ton.", "text"), +# ("avatar", binary_data, "image") +# ] ``` -**Deserialization Logic**: -| Payload Type | Deserialization | -|--------------|-----------------| -| `text` | UTF-8 bytes → String | -| `dictionary` | UTF-8 bytes → JSON string → Julia object | -| `arrowtable` | UTF-8 bytes → Arrow IPC → DataFrame | -| `jsontable` | UTF-8 bytes → JSON string → Vector{Dict} → DataFrame | -| `image`/`audio`/`video`/`binary` | Bytes directly | +**Rationale**: +- `smartreceive()` handles both transport types automatically +- Deserialization is type-aware based on `payload_type` +- Returns consistent tuple format regardless of transport -#### Step 5: Link Transport Path - -For payloads with `transport == "link"`: - -1. **Extract URL**: Get the download URL from payload -2. **Fetch with Backoff**: Download data with retry logic -3. **Deserialize**: Convert bytes to native data type +#### Step 7: Julia Backend Sends Response ```julia -# Extract download URL -url = String(payload["data"]) +# Julia backend processes the message +response_text = "Hello Ton! I'm the AI assistant." 
+generated_image = generate_ai_image(response_text) -# Fetch with exponential backoff -downloaded_data = fileserver_download_handler( - url, - max_retries, - base_delay, - max_delay, - env_json_obj["correlation_id"] +env, msg_json = smartsend( + "/agent/wine/api/v1/response", + [ + ("response", response_text, "text"), + ("generated_image", generated_image, "image") + ], + reply_to = "/chat/user/v1/message", + reply_to_msg_id = env["msg_id"] ) - -# Deserialize based on type -data_type = String(payload["payload_type"]) -data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) ``` -**Download Handler Contract**: -```julia -function fileserver_download_handler( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} - # Returns: Vector{UInt8} (downloaded bytes) -end -``` - -#### Step 6: Build Payload List - -Each processed payload is added to the result list: - -```julia -payloads_list = Tuple{String, Any, String}[] - -# After processing each payload -push!(payloads_list, (dataname, data, data_type)) -``` - -**Result Format**: -```julia -[ - ("msg", "Hello World", "text"), - ("img", binary_data, "image") -] -``` - -#### Step 7: Return Envelope - -The envelope is updated with the processed payloads and returned: - -```julia -env_json_obj["payloads"] = payloads_list -return env_json_obj -``` +**Rationale**: +- **Mixed response**: Text explanation + AI-generated image +- **reply_to**: Ensures response goes to correct topic +- **reply_to_msg_id**: Links response to original message for tracing --- -## File Server Integration +## User Scenario 2: Large File Transfer -### Plik One-Shot Upload +### Scenario Description -NATSBridge uses **Plik** as the default HTTP file server for link transport: +A JavaScript webapp wants to upload a large file (10MB) to a Julia backend for processing. 
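For the 10MB file in this scenario, the transport decision can be sketched as below. This is a minimal illustration based only on the thresholds stated in this walkthrough (500,000 bytes on desktop platforms, 100,000 bytes on MicroPython). The helper name `select_transport` and the constant names are hypothetical and are not part of the NATSBridge API.

```python
# Hypothetical helper: NATSBridge's real selection logic may differ.
DEFAULT_THRESHOLD = 500_000      # desktop default stated in this walkthrough (0.5MB)
MICROPYTHON_THRESHOLD = 100_000  # MicroPython default stated in this walkthrough (100KB)


def select_transport(size_bytes: int, threshold: int = DEFAULT_THRESHOLD) -> str:
    """Return "direct" for payloads below the threshold, otherwise "link"."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    return "direct" if size_bytes < threshold else "link"


# The three payload sizes used in the scenarios of this walkthrough.
for name, size in [("msg", 20), ("avatar", 150_000), ("file", 10_000_000)]:
    print(name, size, select_transport(size))
```

A payload whose size equals the threshold goes over link transport in this sketch, which matches the "≥ 0.5MB" wording used in the transport tables.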
-```julia -# Upload handler -function plik_oneshot_upload( - file_server_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} +### Step-by-Step Flow + +#### Step 1: JavaScript Webapp Sends Large File + +```javascript +const [env, msgJson] = await NATSBridge.smartsend( + "/agent/wine/api/v1/process", + [ + ["file", largeFileData, "binary"] + ], + { + broker_url: "ws://localhost:4222", + receiver_name: "agent-backend" + } +); ``` -**Upload Flow**: -1. **Create One-Shot Session**: POST `/upload` with `{"OneShot": true}` -2. **Get Upload ID**: Server returns `uploadid` and `uploadtoken` -3. **Upload File**: POST `/file/{uploadid}` with multipart form data -4. **Get File ID**: Server returns `fileid` -5. **Return URL**: Construct download URL +#### Step 2: Transport Selection (Link) -```julia -# Step 1: Create one-shot session -POST /upload -Headers: Content-Type: application/json -Body: {"OneShot": true} +| Payload | Size | Transport | Reason | +|---------|------|-----------|--------| +| `"file"` | 10MB | link | ≥ 0.5MB threshold | -Response: +**Rationale**: +- Link transport used for large payloads +- File server handles large file upload +- NATS only sends URL (small message) + +#### Step 3: File Server Upload + +```javascript +// NATSBridge internally calls: +const response = await plikOneshotUpload( + "http://localhost:8080", + "file", + largeFileData +); + +// Response: +// { +// status: 200, +// uploadid: "UPLOAD_ID", +// fileid: "FILE_ID", +// url: "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file" +// } +``` + +**Rationale**: +- Plik handles multipart upload +- One-shot mode simplifies API +- Returns URL for download + +#### Step 4: Envelope with Link Transport + +```json { - "id": "UPLOAD_ID", - "uploadToken": "UPLOAD_TOKEN", - "status": 200 + "correlation_id": "a1b2c3d4...", + "payloads": [ + { + "id": "payload-uuid...", + "dataname": "file", + "payload_type": "binary", + "transport": "link", + "encoding": "none", + "size": 10000000, 
+ "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/file", + "metadata": {} + } + ] } - -# Step 2: Upload file -POST /file/UPLOAD_ID -Headers: X-UploadToken: UPLOAD_TOKEN -Body: multipart/form-data (file) - -Response: -{ - "id": "FILE_ID", - "status": 200 -} - -# Final URL: http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext ``` -### Exponential Backoff for Downloads +**Rationale**: +- `data` field contains URL instead of Base64 +- `transport: "link"` signals URL-based download +- `encoding: "none"` indicates no additional encoding -File downloads use exponential backoff for resilience: +#### Step 5: Julia Backend Receives and Downloads ```julia -function _fetch_with_backoff( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} +# Julia backend +msg = NATS.subscription.next() +env = smartreceive(msg) + +# NATSBridge automatically: +# 1. Extracts URL from payload +# 2. Downloads with exponential backoff +# 3. Deserializes to binary data ``` -**Retry Policy**: -- Initial delay: `base_delay` milliseconds (default: 100ms) -- Multiplier: 2x per retry -- Maximum delay: `max_delay` milliseconds (default: 5000ms) -- Maximum retries: `max_retries` (default: 5) - -**Delay Calculation**: -```julia -delay = base_delay # Start with 100ms - -for attempt in 1:max_retries - try - # Try to fetch - response = HTTP.request("GET", url) - if response.status == 200 - return response.body - end - catch e - if attempt < max_retries - sleep(delay / 1000.0) # Sleep before retry - delay = min(delay * 2, max_delay) # Double delay, cap at max - end - end -end - -error("Failed after $max_retries attempts") -``` - -**Example Delays**: -| Attempt | Delay | -|---------|-------| -| 1 | 100ms | -| 2 | 200ms | -| 3 | 400ms | -| 4 | 800ms | -| 5 | 1600ms (capped at 5000ms) | +**Rationale**: +- Exponential backoff handles transient failures +- Automatic download simplifies receiver code +- Binary data returned directly --- -## 
Cross-Platform Compatibility +## User Scenario 3: Tabular Data Exchange -### Platform-Specific Implementations +### Scenario Description -| Platform | File | Key Features | -|----------|------|--------------| -| Julia | `src/NATSBridge.jl` | Multiple dispatch, Arrow.jl support | -| Python | `src/natsbridge.py` | Async/await, pyarrow support | -| Node.js | `src/natsbridge_ssr.js` | Buffer, nats.js | -| Browser | `src/natsbridge_csr.js` | Uint8Array, nats.ws, Web Crypto | -| MicroPython | `src/natsbridge_mpy.py` | Synchronous, limited payload types | +A Python application sends tabular data (pandas DataFrame) to a Julia backend for analysis, and receives processed results back. -### API Parity +### Step-by-Step Flow -All platforms implement the same core API: +#### Step 1: Python Sends Tabular Data -| Function | Julia | Python | JavaScript | MicroPython | -|----------|-------|--------|------------|-------------| -| `smartsend()` | ✅ | ✅ | ✅ | ✅ | -| `smartreceive()` | ✅ | ✅ | ✅ | ✅ | -| `plik_oneshot_upload()` | ✅ | ✅ | ✅ | ⚠️ (placeholder) | -| `fetch_with_backoff()` | ✅ | ✅ | ✅ | ⚠️ (placeholder) | +```python +# Python +import pandas as pd +from natsbridge import smartsend -### Payload Type Support by Platform +df = pd.DataFrame({ + "id": [1, 2, 3], + "name": ["Alice", "Bob", "Charlie"], + "score": [95, 88, 92] +}) -| Type | Julia | Python | Node.js | Browser | MicroPython | -|------|-------|--------|---------|---------|-------------| -| `text` | ✅ | ✅ | ✅ | ✅ | ✅ | -| `dictionary` | ✅ | ✅ | ✅ | ✅ | ✅ | -| `arrowtable` | ✅ | ✅ | ✅ | ✅ | ❌ | -| `jsontable` | ✅ | ✅ | ✅ | ✅ | ⚠️ | -| `image` | ✅ | ✅ | ✅ | ✅ | ✅ | -| `audio` | ✅ | ✅ | ✅ | ✅ | ✅ | -| `video` | ✅ | ✅ | ✅ | ✅ | ✅ | -| `binary` | ✅ | ✅ | ✅ | ✅ | ✅ | +env, msg_json = await smartsend( + "/agent/wine/api/v1/analyze", + [("data", df, "arrowtable")], + broker_url="nats://localhost:4222", + receiver_name="agent-backend" +) +``` + +**Rationale**: +- `arrowtable` type for efficient tabular data transfer +- Arrow 
IPC format preserves data types +- Typically faster than JSON serialization for large tables + +#### Step 2: Serialization to Arrow IPC + +```python +# NATSBridge internally: +import io +import pyarrow as pa +import pyarrow.ipc as ipc + +table = pa.Table.from_pandas(df) +buf = io.BytesIO() +with ipc.new_file(buf, table.schema) as sink: +    sink.write_table(table) +arrow_bytes = buf.getvalue() +``` + +**Rationale**: +- Arrow IPC preserves column types +- Binary format is compact +- No schema information loss + +#### Step 3: Julia Receives and Deserializes + +```julia +# Julia backend +msg = NATS.subscription.next() +env = smartreceive(msg) + +# env["payloads"][1] is now: +# ("data", DataFrame with id, name, score columns, "arrowtable") +``` + +**Rationale**: +- Arrow.jl reads IPC format directly +- DataFrame returned with correct types +- No manual parsing needed + +#### Step 4: Julia Sends Results + +```julia +# Julia backend +results = analyze_data(env["payloads"][1][2]) + +# Send results back +env, msg_json = smartsend( + "/agent/wine/api/v1/results", + [("results", results, "arrowtable")], + reply_to = "/python/worker/v1/results" +) +``` + +**Rationale**: +- Arrow IPC format for efficient round-trip +- Results preserve DataFrame structure +- Python can deserialize to pandas DataFrame + +--- + +## User Scenario 4: MicroPython Device + +### Scenario Description + +A MicroPython sensor device sends sensor readings to a Python backend. 
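As a rough sketch of what a `dictionary` payload goes through in this scenario, the round trip below encodes sensor readings as JSON, UTF-8, and Base64, then reverses the steps the way a receiving backend would. The function names `encode_dictionary` and `decode_dictionary` are hypothetical helpers for illustration only. They use the standard library and do not claim to mirror NATSBridge internals.

```python
import base64
import json


def encode_dictionary(data: dict) -> str:
    """Serialize a dict to JSON, encode as UTF-8, then Base64 (ASCII string)."""
    json_bytes = json.dumps(data).encode("utf-8")
    return base64.b64encode(json_bytes).decode("ascii")


def decode_dictionary(payload_b64: str) -> dict:
    """Reverse encode_dictionary: Base64 decode, UTF-8 decode, JSON parse."""
    json_bytes = base64.b64decode(payload_b64)
    return json.loads(json_bytes.decode("utf-8"))


readings = {"temperature": 25.5, "humidity": 60.0, "pressure": 1013.25}
wire = encode_dictionary(readings)   # string that would sit in the payload "data" field
restored = decode_dictionary(wire)   # what the receiver would hand back to user code
print(restored == readings)
```

Because the wire form is plain ASCII, it can be embedded in the JSON envelope without further escaping.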
+ +### Step-by-Step Flow + +#### Step 1: MicroPython Sends Sensor Data + +```python +# MicroPython +from natsbridge import smartsend + +sensor_data = { + "temperature": 25.5, + "humidity": 60.0, + "pressure": 1013.25 +} + +env, msg_json = smartsend( + "/sensor/device/v1/readings", + [("data", sensor_data, "dictionary")], + broker_url="nats://localhost:4222", + size_threshold=100000 # 100KB for MicroPython +) +``` + +**Rationale**: +- `dictionary` type for JSON-serializable sensor data +- Smaller threshold (100KB) for memory constraints +- Direct transport only (no file server support) + +#### Step 2: Serialization + +```python +# NATSBridge internally: +json_str = json.dumps(sensor_data) +json_bytes = json_str.encode('utf-8') +payload_b64 = base64.b64encode(json_bytes).decode('ascii') +``` + +**Rationale**: +- JSON format for human-readable data +- Base64 for NATS compatibility +- UTF-8 for text encoding + +#### Step 3: Python Backend Receives + +```python +# Python backend +msg = await nats_consumer.next() +env = await smartreceive(msg) + +# env["payloads"][0] is now: +# ("data", {"temperature": 25.5, "humidity": 60.0, ...}, "dictionary") +``` + +**Rationale**: +- JSON deserialization +- Dictionary returned directly +- No Arrow support (memory constraints) + +--- + +## User Scenario 5: Cross-Platform Chat with Mixed Payloads + +### Scenario Description + +Multiple platforms (JavaScript, Python, Julia) communicate in a chat application with mixed payload types. 
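To make the shared payload structure concrete, the sketch below builds one direct-transport payload entry using the field names that appear in the envelope examples in this walkthrough (`id`, `dataname`, `payload_type`, `transport`, `encoding`, `size`, `data`, `metadata`). The function `build_direct_payload` is a hypothetical helper, not a NATSBridge function, and a real implementation may populate these fields differently.

```python
import base64
import uuid


def build_direct_payload(dataname: str, raw: bytes, payload_type: str) -> dict:
    """Build one direct-transport payload entry from already-serialized bytes."""
    return {
        "id": str(uuid.uuid4()),            # unique payload identifier
        "dataname": dataname,
        "payload_type": payload_type,
        "transport": "direct",
        "encoding": "base64",
        "size": len(raw),                   # size of the serialized bytes, before Base64
        "data": base64.b64encode(raw).decode("ascii"),
        "metadata": {"payload_bytes": len(raw)},
    }


# The text part of the chat message in this scenario.
text_payload = build_direct_payload("text", "Check this out!".encode("utf-8"), "text")
print(text_payload["size"], text_payload["data"])
```

Since every platform emits the same field names, a receiver can dispatch on `transport` and `payload_type` without knowing which language produced the message.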
+ +### Step-by-Step Flow + +#### Step 1: JavaScript Sends Chat Message + +```javascript +// JavaScript (Frontend) +const [env, msgJson] = await NATSBridge.smartsend( + "/chat/user/v1/message", + [ + ["text", "Check this out!", "text"], + ["image", imageData, "image"] + ], + { + broker_url: "ws://localhost:4222", + receiver_name: "", + msg_purpose: "chat" + } +); +``` + +**Rationale**: +- Empty `receiver_name` = broadcast to all subscribers +- Chat messages often include text + images +- NATS wildcard subscriptions route to correct recipients + +#### Step 2: Python Backend Receives + +```python +# Python (Backend) +msg = await nats_consumer.next() +env = await smartreceive(msg) + +# env["payloads"] is now: +# [ +# ("text", "Check this out!", "text"), +# ("image", binary_data, "image") +# ] +``` + +**Rationale**: +- Consistent API across platforms +- Same payload structure regardless of sender +- Type information preserved + +#### Step 3: Julia Backend Receives + +```julia +# Julia (Backend) +msg = NATS.subscription.next() +env = smartreceive(msg) + +# env["payloads"] is now: +# [ +# ("text", "Check this out!", "text"), +# ("image", binary_data, "image") +# ] +``` + +**Rationale**: +- Cross-platform API parity +- Same function signature across platforms +- Type information enables proper deserialization + +#### Step 4: All Platforms Reply + +Each platform can reply using the same API: + +```python +# Python reply +await smartsend( + "/chat/user/v1/reply", + [("response", "Nice!", "text")], + reply_to="/chat/user/v1/message" +) +``` + +```julia +# Julia reply +smartsend( + "/chat/user/v1/reply", + [("response", "Nice!", "text")], + reply_to="/chat/user/v1/message" +) +``` + +```javascript +// JavaScript reply +await NATSBridge.smartsend( + "/chat/user/v1/reply", + [["response", "Nice!", "text"]], + { reply_to: "/chat/user/v1/message" } +); +``` + +**Rationale**: +- Same API across platforms +- Consistent behavior +- Easy to maintain parity --- @@ -680,120 +602,72 @@ 
 All platforms implement the same core API:
 
 ### Common Error Scenarios
 
-| Scenario | Error Code | Recovery |
-|----------|------------|----------|
-| **Unknown payload_type** | `INVALID_PAYLOAD_TYPE` | Use supported payload_type |
-| **Failed to upload** | `UPLOAD_FAILED` | Retry or use direct transport |
-| **Failed to fetch** | `DOWNLOAD_FAILED` | Retry with exponential backoff |
-| **Unknown transport** | `INVALID_TRANSPORT` | Check payload transport field |
-| **NATS connection failed** | `NATS_CONNECTION_FAILED` | Check NATS server availability |
-| **Deserialization error** | `DESERIALIZATION_ERROR` | Validate payload_type matches data |
+| Scenario | Error | Recovery |
+|----------|-------|----------|
+| File server unavailable | `UPLOAD_FAILED` | Fall back to direct transport or smaller payloads |
+| File server download fails | `DOWNLOAD_FAILED` | Retry with exponential backoff |
+| Payload type mismatch | `DESERIALIZATION_ERROR` | Validate payload_type matches data |
+| NATS connection lost | `NATS_CONNECTION_FAILED` | NATS client auto-reconnects |
 
 ### Error Response Format
 
 ```json
 {
     "correlation_id": "abc123...",
-    "msg_id": "def456...",
-    "timestamp": "2026-03-13T07:02:50.443Z",
-    "send_to": "/chat/user/v1/message",
     "error": {
         "code": "DOWNLOAD_FAILED",
         "message": "Failed to fetch data after 5 attempts",
         "details": {
-            "url": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext",
+            "url": "http://localhost:8080/file/...",
             "correlation_id": "abc123..."
         }
     }
 }
 ```
 
-### Exception Handling Examples
-
-```julia
-# File server unavailable
-try
-    env, msg_json = smartsend("/subject", data)
-catch e
-    # Retry with direct transport or use smaller payloads
-end
-
-# Deserialization error
-try
-    env = smartreceive(msg)
-catch e
-    # Log correlation_id and inspect payload structure
-    @error "Deserialization failed" exception=(e, env.correlation_id)
-end
-```
-
 ---
 
 ## Debugging and Tracing
 
 ### Correlation ID Tracking
 
-Every message includes a `correlation_id` for distributed tracing:
+Every message includes a `correlation_id`:
 
 ```julia
-# Generate correlation ID at start of request
+# At start of request
 correlation_id = string(uuid4())
 
-# Use throughout the request flow
-log_trace(correlation_id, "Starting smartsend for subject: $subject")
-log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes")
-log_trace(correlation_id, "Using direct transport for $payload_size bytes")
+# Use throughout the flow
+log_trace(correlation_id, "Starting smartsend")
+log_trace(correlation_id, "Serialized payload size: 100 bytes")
+log_trace(correlation_id, "Published to NATS")
 ```
 
 **Log Format**:
 ```
-[2026-03-13T07:02:50.443Z] [Correlation: abc123...] Starting smartsend for subject: /chat/user/v1/message
-[2026-03-13T07:02:50.445Z] [Correlation: abc123...] Serialized payload 'msg' (type: text) size: 11 bytes
-[2026-03-13T07:02:50.446Z] [Correlation: abc123...] Using direct transport for 11 bytes
+[2026-03-13T16:30:00.000Z] [Correlation: abc123...] Starting smartsend
+[2026-03-13T16:30:00.001Z] [Correlation: abc123...] Serialized payload size: 100 bytes
+[2026-03-13T16:30:00.002Z] [Correlation: abc123...] Published to NATS
 ```
 
-### Logging in All Implementations
-
-| Platform | Logging Method |
-|----------|----------------|
-| Julia | `@info` macro |
-| Python | `print()` with timestamp |
-| JavaScript | `console.log()` |
-| MicroPython | `print()` |
-
 ---
 
-## Testing the Flow
+## Performance Considerations
 
-### Example: End-to-End Test
+### Optimization Strategies
 
-```julia
-# Sender side
-data = [
-    ("msg", "Hello", "text"),
-    ("img", image_data, "image")
-]
-env, msg_json = smartsend("/chat/user/v1/message", data)
+| Strategy | Description | When to Use |
+|----------|-------------|-------------|
+| Pre-create NATS connection | Reuse connection for multiple sends | High-throughput scenarios |
+| Adjust size threshold | Increase threshold if file server slow | File server bottleneck |
+| Use direct transport | Avoid file server for small payloads | Low latency requirements |
 
-# Receiver side
-msg = nats_subscription.next()
-env = smartreceive(msg)
+### Size Threshold by Platform
 
-# Verify payloads
-for (dataname, data, type_) in env["payloads"]
-    println("$dataname: $data (type: $type_)")
-end
-```
-
-### Test Scenarios
-
-| Scenario | Payloads | Transport | Expected Result |
-|----------|----------|-----------|-----------------|
-| Single text (small) | `text` | direct | Round-trip successful |
-| Single dictionary (small) | `dictionary` | direct | Round-trip successful |
-| Single arrow table (small) | `arrowtable` | direct | Arrow IPC round-trip |
-| Single image (large) | `image` | link | File server upload/download |
-| Mixed payloads | `text` + `image` | direct + link | All payloads preserved |
+| Platform | Threshold | Notes |
+|----------|-----------|-------|
+| Desktop (Julia/JS/Python) | 500,000 bytes (0.5MB) | Default threshold |
+| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints |
 
 ---
 
@@ -816,128 +690,6 @@ end
 | `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
 | `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes |
 
-### Container Deployment
-
-```yaml
-# docker-compose.yml
-version: '3'
-services:
-  nats:
-    image: nats:latest
-    ports:
-      - "4222:4222"
-
-  plik:
-    image: rootfs/plik:latest
-    ports:
-      - "8080:8080"
-    volumes:
-      - plik-data:/data
-
-  app:
-    image: my-app:latest
-    depends_on:
-      - nats
-      - plik
-```
-
----
-
-## Common Pitfalls
-
-### Pitfall 1: Payload Size Threshold
-
-**Issue**: Payloads just above threshold may cause unnecessary file server uploads
-
-**Solution**: Monitor payload sizes and adjust threshold based on:
-- Network latency to file server
-- Memory constraints
-- File server performance
-
-```julia
-# Adjust threshold based on use case
-env, msg_json = smartsend("/subject", data; size_threshold = 1_000_000) # 1MB
-```
-
-### Pitfall 2: File Server Availability
-
-**Issue**: File server down during upload/download
-
-**Solution**: Implement fallback strategies:
-- Fall back to direct transport for uploads
-- Use smaller payloads to avoid link transport
-- Implement application-level retries
-
-```julia
-# Fallback to direct transport if file upload fails
-try
-    response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
-catch e
-    # Fall back to direct transport
-    payload_b64 = Base64.base64encode(payload_bytes)
-    # Build payload with direct transport
-end
-```
-
-### Pitfall 3: Payload Type Mismatch
-
-**Issue**: Receiver deserializes with wrong payload_type
-
-**Solution**: Always validate payload_type matches data:
-- Sender and receiver must agree on payload types
-- Use consistent payload_type strings across platforms
-
-```julia
-# Sender
-smartsend("/subject", [("data", data, "arrowtable")])
-
-# Receiver (must use same payload_type)
-env = smartreceive(msg)
-# env["payloads"][1][3] == "arrowtable"
-```
-
----
-
-## Performance Considerations
-
-### Optimization Strategies
-
-| Strategy | Description | When to Use |
-|----------|-------------|-------------|
-| **Pre-create NATS connection** | Reuse connection for multiple sends | High-throughput scenarios |
-| **Batch small payloads** | Combine multiple small payloads | Reduce NATS overhead |
-| **Adjust size threshold** | Increase threshold if file server slow | File server bottleneck |
-| **Use direct transport** | Avoid file server for small payloads | Low latency requirements |
-
-### Benchmarking
-
-```julia
-# Benchmark direct vs link transport
-using BenchmarkTools
-
-# Direct transport
-@btime smartsend("/subject", [("data", rand(1000), "arrowtable")])
-
-# Link transport (with file server)
-@btime smartsend("/subject", [("data", rand(1_000_000), "arrowtable")])
-```
-
----
-
-## Versioning
-
-### Current Version
-
-- **Major**: 1 (Breaking changes require major version bump)
-- **Minor**: 0 (Feature additions)
-- **Patch**: 0 (Bug fixes)
-
-### Version Compatibility
-
-| Version | Supported Platforms |
-|---------|---------------------|
-| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |
-
 ---
 
 ## Change Log
@@ -965,4 +717,4 @@ using BenchmarkTools
 
 [x] Analyze existing documentation (requirements.md, spec.md, architecture.md)
 [x] Read all source files in src/ folder
-[x] Write docs/walkthrough.md according to SDD framework
\ No newline at end of file
+[x] Write docs/walkthrough.md according to SDD framework with user scenarios
\ No newline at end of file
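
The per-platform thresholds added in the `Size Threshold by Platform` table can be illustrated with a minimal Python sketch (Python being one of the platforms the bridge targets). `SIZE_THRESHOLDS` and `choose_transport` are hypothetical names used only for illustration, not part of the NATSBridge API. The sketch assumes that payloads strictly smaller than the threshold use direct transport and everything else uses link transport, which is how the removed flowchart's "Size < Threshold?" branch reads.

```python
# Thresholds copied from the "Size Threshold by Platform" table in this diff.
# The dictionary and its keys are illustrative, not a NATSBridge structure.
SIZE_THRESHOLDS = {
    "desktop": 500_000,      # Julia / JS / Python default (0.5MB)
    "micropython": 100_000,  # lower, for memory-constrained devices (100KB)
}


def choose_transport(payload: bytes, platform: str = "desktop") -> str:
    """Return "direct" or "link" for an already-serialized payload.

    Hypothetical helper: assumes sizes strictly below the platform
    threshold go direct and all other sizes go through the file server.
    """
    threshold = SIZE_THRESHOLDS[platform]
    return "direct" if len(payload) < threshold else "link"


if __name__ == "__main__":
    print(choose_transport(b"Hello"))
    print(choose_transport(bytes(200_000), platform="micropython"))
```

Under these assumptions a 200,000-byte payload is sent directly on a desktop platform but is routed through the file server on MicroPython. The configuration table shown as context in this diff lists a `SIZE_THRESHOLD` default of `1000000`, which does not match the 500,000-byte default in the threshold table; the sketch follows the threshold table.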