From 8a5eef6b13bc1fa11b818b15c641427d026eed15 Mon Sep 17 00:00:00 2001 From: narawat Date: Fri, 13 Mar 2026 20:53:35 +0700 Subject: [PATCH] update --- docs/earlier_architecture.md | 475 -------- docs/implementation.md | 1859 ------------------------------ docs/walkthrough.md | 2087 ++++++++++++++-------------------- 3 files changed, 837 insertions(+), 3584 deletions(-) delete mode 100644 docs/earlier_architecture.md delete mode 100644 docs/implementation.md diff --git a/docs/earlier_architecture.md b/docs/earlier_architecture.md deleted file mode 100644 index b1f7929..0000000 --- a/docs/earlier_architecture.md +++ /dev/null @@ -1,475 +0,0 @@ -# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge - -## Overview - -This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. - -**Supported Platforms:** -- **Julia** - Ground truth implementation with full feature set -- **JavaScript** - Node.js and browser-compatible implementation -- **Python/MicroPython** - Desktop and embedded-compatible implementation - -### Cross-Platform Design Principles - -1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with identical signatures and behavior -2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python) -3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms -4. 
**Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility - ---- - -## High-Level API Standard (Cross-Platform) - -### Unified API Signature - -All three platforms expose the same high-level API: - -**Input Format (smartsend):** -``` -[(dataname1, data1, type1), (dataname2, data2, type2), ...] -``` - -**Output Format (smartreceive):** -``` -{ - "correlation_id": "...", - "msg_id": "...", - "timestamp": "...", - "send_to": "...", - "msg_purpose": "...", - "sender_name": "...", - "sender_id": "...", - "receiver_name": "...", - "receiver_id": "...", - "reply_to": "...", - "reply_to_msg_id": "...", - "broker_url": "...", - "metadata": {...}, - "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] -} -``` - -### Supported Payload Types - -| Type | Julia | JavaScript | Python/MicroPython | -|------|-------|------------|-------------------| -| `text` | `String` | `string` | `str` | -| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | -| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | -| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | -| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` | - -**Note on MicroPython:** MicroPython does not support table types (`arrowtable` or `jsontable`) due to memory constraints. Use `dictionary` or `binary` instead. 
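The input format and type table above can be checked before anything is sent. The following is a minimal sketch, not part of NATSBridge: `validate_payloads`, `SUPPORTED_TYPES`, and `MICROPYTHON_UNSUPPORTED` are hypothetical names, and the rule that MicroPython rejects all three table types is taken from the table and note above.

```python
# Payload type names as listed in the "Supported Payload Types" table.
SUPPORTED_TYPES = frozenset({
    "text", "dictionary", "arrowtable", "jsontable", "table",
    "image", "audio", "video", "binary",
})
# Table types that the documentation marks as unavailable on MicroPython.
MICROPYTHON_UNSUPPORTED = frozenset({"arrowtable", "jsontable", "table"})


def validate_payloads(payloads, micropython=False):
    """Hypothetical helper: check a smartsend-style list of
    (dataname, data, type) triples and return it unchanged."""
    for index, item in enumerate(payloads):
        if not isinstance(item, (tuple, list)) or len(item) != 3:
            raise ValueError(f"payload {index} must be a (dataname, data, type) triple")
        dataname, _data, payload_type = item
        if not isinstance(dataname, str) or not dataname:
            raise ValueError(f"payload {index} needs a non-empty string dataname")
        if not isinstance(payload_type, str) or payload_type not in SUPPORTED_TYPES:
            raise ValueError(f"payload {index} has unknown type {payload_type!r}")
        if micropython and payload_type in MICROPYTHON_UNSUPPORTED:
            raise ValueError(f"payload {index}: {payload_type!r} is not supported on MicroPython")
    return payloads
```

A caller would run this on the list before handing it to `smartsend()`, so malformed triples fail early with a readable message rather than during serialization.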
- -### Cross-Platform API Examples - -**Julia:** -```julia -using NATSBridge - -# Send -env, env_json_str = smartsend( - "/chat", - [("message", "Hello!", "text"), ("image", image_bytes, "image")], - broker_url="nats://localhost:4222" -) - -# Receive - returns JSON.Object{String, Any} -env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} -# Access payloads: for (dataname, data, type) in env["payloads"] -``` - -**JavaScript:** -```javascript -const NATSBridge = require('natsbridge'); - -// Send -const [env, env_json_str] = await NATSBridge.smartsend( - "/chat", - [ - ["message", "Hello!", "text"], - ["image", imageBuffer, "image"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Receive - returns Promise -const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: fetchWithBackoff -}); -// env is an object with "payloads" field containing Array of arrays -// Access payloads: for (const [dataname, data, type] of env.payloads) -``` - -**Python:** -```python -from natsbridge import NATSBridge - -# Send -env, env_json_str = NATSBridge.smartsend( - "/chat", - [("message", "Hello!", "text"), ("image", image_bytes, "image")], - broker_url="nats://localhost:4222" -) - -# Receive - returns Dict -env = NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff -) -# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] -# Access payloads: for dataname, data, type_ in env["payloads"] -``` - -**MicroPython:** -```python -from natsbridge import NATSBridge - -# Send (limited to direct transport due to memory constraints) -env, env_json_str = NATSBridge.smartsend( - "/chat", - [("message", "Hello!", "text")], - broker_url="nats://localhost:4222" -) -``` - ---- - -## Architecture Diagram (Cross-Platform) - -```mermaid -flowchart TD - subgraph Client - 
App[Julia/JS/Python/MicroPython Application] - end - - subgraph Server - Julia/JS/Python/MicroPython[Julia/JS/Python/MicroPython Service] - NATS[NATS Server] - FileServer[HTTP File Server] - end - - App -->|NATS| NATS - NATS -->|NATS| Julia/JS/Python/MicroPython - Julia/JS/Python/MicroPython -->|NATS| NATS - Julia/JS/Python/MicroPython -->|HTTP POST| FileServer - - style App fill:#e8f5e9 - style Julia/JS/Python/MicroPython fill:#e8f5e9 - style NATS fill:#fff3e0 - style FileServer fill:#f3e5f5 -``` - ---- - -## System Components - -### 1. msg_envelope_v1 - Message Envelope - -**JSON Schema (Identical Across All Platforms):** -```json -{ - "correlation_id": "uuid-v4-string", - "msg_id": "uuid-v4-string", - "timestamp": "2024-01-15T10:30:00Z", - - "send_to": "topic/subject", - "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", - "sender_name": "agent-wine-web-frontend", - "sender_id": "uuid4", - "receiver_name": "agent-backend", - "receiver_id": "uuid4", - "reply_to": "topic", - "reply_to_msg_id": "uuid4", - "broker_url": "nats://localhost:4222", - - "metadata": { - "content_type": "application/octet-stream", - "content_length": 123456 - }, - - "payloads": [ - { - "id": "uuid4", - "dataname": "login_image", - "payload_type": "image", - "transport": "direct", - "encoding": "base64", - "size": 15433, - "data": "base64-encoded-string", - "metadata": { - "checksum": "sha256_hash" - } - }, - { - "id": "uuid4", - "dataname": "large_arrow_table", - "payload_type": "arrowtable", - "transport": "link", - "encoding": "arrow-ipc", - "size": 524288, - "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", - "metadata": {} - } - ] -} -``` - -### 2. 
msg_payload_v1 - Payload Structure - -**JSON Schema (Identical Across All Platforms):** -```json -{ - "id": "uuid4", - "dataname": "login_image", - "payload_type": "image | dictionary | arrowtable | jsontable | table | text | audio | video | binary", - "transport": "direct | link", - "encoding": "none | json | base64 | arrow-ipc", - "size": 15433, - "data": "base64-encoded-string | http-url | json-string", - "metadata": { - "checksum": "sha256_hash" - } -} -``` - -### 3. Transport Strategy Decision Logic (Cross-Platform) - -``` -┌─────────────────────────────────────────────────────────────┐ -│ smartsend Function (All Platforms) │ -│ Accepts: [(dataname1, data1, type1), ...] │ -│ (Type is per payload, not standalone) │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ For each payload: │ -│ 1. Extract type from tuple/array │ -│ 2. Serialize based on type │ -│ 3. Check payload size │ -└─────────────────────────────────────────────────────────────┘ - │ - ┌───────────┴────────────┐ - ▼ ▼ - ┌──────────────┐ ┌──────────────┐ - │ Direct Path │ │ Link Path │ - │ (< 1MB) │ │ (>= 1MB) │ - │ │ │ │ - │ • Serialize │ │ • Serialize │ - │ to buffer │ │ to buffer │ - │ • Base64/JSON│ │ • Upload to │ - │ encode │ │ HTTP Server│ - │ • Publish to │ │ • Publish to │ - │ NATS │ │ NATS with │ - │ (in msg) │ │ URL │ - └──────────────┘ └──────────────┘ -``` - ---- - -## Platform Comparison Matrix - -| Feature | Julia | JavaScript | Python | MicroPython | -|---------|-------|------------|--------|-------------| -| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (Overload via `@overload`) | ❌ | -| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) | -| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | -| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | -| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | -| **JSON Serialization** | ✅ 
(JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) | -| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ | -| **jsontable Support** | ✅ | ✅ | ✅ | ❌ | -| **Direct Transport** | ✅ | ✅ | ✅ | ✅ | -| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | -| **Handler Functions** | ✅ | ✅ | ✅ | ✅ | -| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ | - ---- - -## Platform-Specific Architecture Patterns - -### Julia: Multiple Dispatch Pattern - -Julia leverages multiple dispatch for type-specific implementations: - -- **Function overloading** based on argument types -- **Struct-based data models** with explicit types -- **Native Arrow IPC** support via Arrow.jl - -### JavaScript: Prototype + Async Pattern - -JavaScript uses async/await for non-blocking I/O: - -- **Class-based NATS client** for connection management -- **Module-level utility functions** for serialization -- **Native ArrayBuffer** for binary data handling - -### Python: Class-Based Pattern - -Python uses classes for stateful operations: - -- **Class-based NATSBridge** with type hints -- **Dataclasses** for structured data (MsgPayloadV1, MsgEnvelopeV1) -- **Async/await** for I/O operations - -### MicroPython: Synchronous Pattern - -MicroPython has significant constraints: - -- **Synchronous API** (no async/await) -- **Memory-constrained** (256KB - 1MB) -- **Limited payload support** (no tables, max 50KB) - ---- - -## Cross-Platform Compatibility Notes - -### 1. Payload Type Consistency - -All platforms use the same payload type values for tabular data: - -| Platform | Table Types | -|----------|-------------| -| Julia | `"arrowtable"`, `"jsontable"` | -| JavaScript | `"arrowtable"`, `"jsontable"` | -| Python | `"arrowtable"`, `"jsontable"` | -| MicroPython | Not supported | - - -### 2. 
Direct Transport Encoding Field - -The encoding field in direct transport payloads differs between platforms: - -| Platform | Encoding for Direct Transport | -|----------|-------------------------------| -| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | -| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | -| Python | Always `"base64"` for all direct transport payloads | -| MicroPython | Always `"base64"` for all direct transport payloads | - -**Impact:** The encoding field may not accurately reflect the original serialization format when using Python or MicroPython. - -### 3. MicroPython Limitations - -MicroPython has significant constraints that affect feature support: - -| Feature | Desktop Platforms | MicroPython | -|---------|-------------------|-------------| -| `arrowtable` | ✅ | ❌ (not supported - memory constraints) | -| `jsontable` | ✅ | ❌ (not supported - memory constraints) | -| `table` | ✅ | ❌ (not supported - memory constraints) | -| Async/await | ✅ | ❌ (synchronous only) | -| File upload/download | ✅ | ⚠️ (placeholder implementations) | -| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) | -| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB | - -**Impact:** MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented. 
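The size limits in the table above combine with the direct/link decision roughly as follows. This is a sketch under stated assumptions: `choose_transport` and the three constants are hypothetical names, and reading 1MB, 100KB, and 50KB as decimal byte counts is an assumption rather than something the table states.

```python
# Assumed decimal interpretations of the limits in the table above.
DEFAULT_SIZE_THRESHOLD = 1_000_000       # desktop platforms: 1MB
MICROPYTHON_SIZE_THRESHOLD = 100_000     # MicroPython: 100KB
MICROPYTHON_MAX_PAYLOAD_SIZE = 50_000    # MicroPython hard limit: 50KB


def choose_transport(size, size_threshold=DEFAULT_SIZE_THRESHOLD, max_payload_size=None):
    """Hypothetical helper: return "direct" or "link" for a serialized
    payload of `size` bytes, or raise if it exceeds a hard limit."""
    if size < 0:
        raise ValueError("size must be non-negative")
    if max_payload_size is not None and size > max_payload_size:
        raise ValueError(f"payload of {size} bytes exceeds the {max_payload_size}-byte limit")
    # Below the threshold the payload travels inside the NATS message;
    # at or above it, only a file server URL is published.
    return "direct" if size < size_threshold else "link"
```

With the MicroPython values, any payload that passes the 50KB hard limit is also below the 100KB threshold, so it always takes the direct path. That is consistent with the recommendation to use MicroPython only for small payloads with direct transport.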
- ---- - -## Configuration - -### Environment Variables - -| Variable | Default | Description | -|----------|---------|-------------| -| `NATS_URL` | `nats://localhost:4222` | NATS server URL | -| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | - -### MicroPython-Specific Configuration - -```python -# micropython.conf -NATS_URL = "nats://broker.local:4222" -FILESERVER_URL = "http://fileserver.local:8080" -SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices -MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython -``` - ---- - -## Performance Considerations - -### Zero-Copy Reading - -| Platform | Strategy | -|----------|----------| -| **Julia** | `Arrow.read()` with memory-mapped files | -| **JavaScript** | `ArrayBuffer` with `DataView` | -| **Python** | `pyarrow` memory mapping | -| **MicroPython** | Not available (streaming only) | - -### Exponential Backoff - -All platforms implement exponential backoff for HTTP downloads: - -``` -delay = base_delay -for attempt in 1:max_retries: - try: - response = fetch(url) - if success: return response - except: - if attempt < max_retries: - sleep(delay) - delay = min(delay * 2, max_delay) -``` - -### Correlation ID Logging - -All platforms use correlation IDs for distributed tracing: - -``` -[timestamp] [Correlation: abc123] Message published to subject -``` - -### Serialization Performance Comparison - -| Format | Use Case | Pros | Cons | -|--------|----------|------|------| -| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | -| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | -| `table` (Python) | Large tabular data | Fast, zero-copy, schema-preserving | Python-specific, requires pyarrow | - ---- - -## Summary - -This cross-platform NATS bridge provides: - -1. 
**High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across Julia, JavaScript, and Python/MicroPython -2. **Idiomatic Implementations**: - - Julia: Multiple dispatch and struct-based design - - JavaScript: Async/await and prototype-based utilities - - Python: Class-based design with type hints - - MicroPython: Synchronous API with memory constraints -3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas -4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: - - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data - - **JSON** (`jsontable`): Universal human-readable format for smaller tables - - **Python table**: Unified table type for Python-specific implementations - - Streaming support in MicroPython - -The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. - -### Datatype Summary - -| Datatype | Serialization | Use Case | Encoding | Supported Platforms | -|----------|---------------|----------|----------|---------------------| -| `text` | UTF-8 bytes | Text messages, chat content | `utf-8` → `base64` | All | -| `dictionary` | JSON | Structured key-value data, config | `json` → `base64` | All | -| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python | -| `table` | Apache Arrow IPC | Python's unified table type | `arrow-ipc` → `base64` | Python | -| `image` | Binary | Image files (JPEG, PNG, etc.) | `binary` → `base64` | All | -| `audio` | Binary | Audio files (WAV, MP3, etc.) | `binary` → `base64` | All | -| `video` | Binary | Video files (MP4, AVI, etc.) 
| `binary` → `base64` | All | -| `binary` | Binary | Generic binary data, files | `binary` → `base64` | All | diff --git a/docs/implementation.md b/docs/implementation.md deleted file mode 100644 index 8467f78..0000000 --- a/docs/implementation.md +++ /dev/null @@ -1,1859 +0,0 @@ -# Cross-Platform Implementation Guide: Bi-Directional Data Bridge - -## Overview - -This document describes the detailed implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. - -**Supported Platforms:** -- **Julia** - Ground truth implementation (reference) -- **JavaScript** - Node.js and browser implementation -- **Python/MicroPython** - Desktop and embedded implementation - ---- - -## Cross-Platform Compatibility Notes - -### 1. Python Payload Type Naming - -The Python implementation uses `"table"` as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types (`"arrowtable"` and `"jsontable"`): - -| Platform | Table Types | -|----------|-------------| -| Julia | `"arrowtable"`, `"jsontable"` | -| JavaScript | `"arrowtable"`, `"jsontable"` | -| Python | `"table"` (single type) | -| MicroPython | Not supported | - -**Impact:** When exchanging data between Python and Julia/JavaScript, the payload type will differ. Python code should use `"table"` while Julia/JavaScript code should use `"arrowtable"` or `"jsontable"`. - -### 2. 
Direct Transport Encoding Field - -The encoding field in direct transport payloads differs between platforms: - -| Platform | Encoding for Direct Transport | -|----------|-------------------------------| -| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | -| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | -| Python | Always `"base64"` for all direct transport payloads | -| MicroPython | Always `"base64"` for all direct transport payloads | - -**Impact:** The encoding field may not accurately reflect the original serialization format when using Python or MicroPython. - -### 3. MicroPython Limitations - -MicroPython has significant constraints that affect feature support: - -| Feature | Desktop Platforms | MicroPython | -|---------|-------------------|-------------| -| `arrowtable` | ✅ | ❌ (not supported - memory constraints) | -| `jsontable` | ✅ | ❌ (not supported - memory constraints) | -| `table` | ✅ | ❌ (not supported - memory constraints) | -| Async/await | ✅ | ❌ (synchronous only) | -| File upload/download | ✅ | ⚠️ (placeholder implementations) | -| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) | -| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB | - -**Impact:** MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented. 
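Because Python and MicroPython always report `"base64"` in the encoding field, a receiver may prefer to pick its deserializer from `payload_type` instead. The sketch below illustrates that idea only. `decode_direct_payload` is a hypothetical name, and it assumes that the `data` field of every direct payload is the Base64 form of the serialized bytes; Arrow and media payloads are returned as raw bytes rather than parsed.

```python
import base64
import json


def decode_direct_payload(payload):
    """Hypothetical helper: decode one direct-transport payload dict,
    choosing the deserializer from payload_type, not from encoding."""
    if payload.get("transport") != "direct":
        raise ValueError("only direct-transport payloads are handled here")
    # Assumption: direct data is always Base64 text on the wire.
    raw = base64.b64decode(payload["data"])
    payload_type = payload["payload_type"]
    if payload_type == "text":
        return raw.decode("utf-8")
    if payload_type in ("dictionary", "jsontable"):
        return json.loads(raw.decode("utf-8"))
    # image, audio, video, binary, and Arrow-based tables stay as bytes.
    return raw
```

This keeps the receiver independent of which platform produced the message, at the cost of ignoring any extra information a Julia or JavaScript sender put in the encoding field.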
- ---- - -## Implementation Files - -| Language | Implementation File | Description | -|----------|---------------------|-------------| -| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support | -| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation | -| **Python** | `src/natsbridge.py` | Desktop Python implementation | -| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) | - ---- - -## File Server Handler Architecture - -The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). - -### Handler Function Signatures - -#### Julia - -```julia -# Upload handler - uploads data to file server and returns URL -fileserver_upload_handler( - fileserver_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} - -# Download handler - fetches data from file server URL with exponential backoff -fileserver_download_handler( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} -``` - -#### JavaScript - -```javascript -// Upload handler - async function -async function fileserver_upload_handler( - fileserver_url, - dataname, - data // Uint8Array -) { - // Returns: { status, uploadid, fileid, url } -} - -// Download handler - async function -async function fileserver_download_handler( - url, - max_retries, - base_delay, - max_delay, - correlation_id -) { - // Returns: Uint8Array -} -``` - -#### Python - -```python -# Upload handler - async function -async def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """ - Upload data to file server. 
- - Returns: - Dict with keys: 'status', 'uploadid', 'fileid', 'url' - """ - pass - -# Download handler - async function -async def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """ - Download data from URL with exponential backoff. - - Returns: - Downloaded bytes - """ - pass -``` - -#### MicroPython - -```python -# Upload handler - synchronous (no async in MicroPython) -def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytearray -) -> Dict: - """ - Upload data to file server (synchronous). - - Returns: - Dict with keys: 'status', 'url' - """ - pass - -# Download handler - synchronous -def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytearray: - """ - Download data from URL with exponential backoff (synchronous). - - Returns: - Downloaded bytes - """ - pass -``` - ---- - -## Multi-Payload Support (Standard API) - -The system uses a **standardized list-of-tuples format** for all payload operations across all platforms. - -### API Standard - -``` -# Input format for smartsend (always a list of tuples with type info) -[(dataname1, data1, type1), (dataname2, data2, type2), ...] - -# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) -{ - "correlation_id": "...", - "msg_id": "...", - "timestamp": "...", - "send_to": "...", - "msg_purpose": "...", - "sender_name": "...", - "sender_id": "...", - "receiver_name": "...", - "receiver_id": "...", - "reply_to": "...", - "reply_to_msg_id": "...", - "broker_url": "...", - "metadata": {...}, - "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] 
-} -``` - -### Supported Types - -| Type | Julia | JavaScript | Python | MicroPython | -|------|-------|------------|--------|-------------| -| `text` | `String` | `string` | `str` | `str` | -| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | -| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | ⚠️ (limited) | -| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ | -| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | - -**Note:** Python uses `"table"` as a single type for both Arrow and JSON table serialization. When exchanging data between Python and Julia/JavaScript, ensure the payload type is correctly translated (`"table"` ↔ `"arrowtable"` or `"jsontable"`). 
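The translation mentioned in the note above could look like the following sketch. `translate_payload_type` and its parameters are hypothetical and not part of any NATSBridge API. Since `"table"` alone does not say whether the bytes are Arrow or JSON, the caller has to state the intended format.

```python
TABLE_FORMATS = ("arrowtable", "jsontable")


def translate_payload_type(payload_type, target, table_format="arrowtable"):
    """Hypothetical helper: map a payload type name to the one expected by
    `target`, which is "python", "julia", or "javascript"."""
    if target == "python":
        # Python folds both table flavours into the single "table" type.
        return "table" if payload_type in TABLE_FORMATS else payload_type
    if target in ("julia", "javascript"):
        if payload_type == "table":
            if table_format not in TABLE_FORMATS:
                raise ValueError(f"table_format must be one of {TABLE_FORMATS}")
            return table_format
        return payload_type
    raise ValueError(f"unknown target platform: {target!r}")
```

Non-table types pass through unchanged, so the helper can be applied to every payload in an envelope without special-casing.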
- ---- - -## Platform-Specific Implementations - -### Julia Implementation - -#### Module Structure - -```julia -module NATSBridge - using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 - - # Constants - const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - const DEFAULT_BROKER_URL = "nats://localhost:4222" - const DEFAULT_FILESERVER_URL = "http://localhost:8080" - - # Structs - struct msg_payload_v1 - id::String - dataname::String - payload_type::String - transport::String - encoding::String - size::Integer - data::Any - metadata::Dict{String, Any} - end - - struct msg_envelope_v1 - correlation_id::String - msg_id::String - timestamp::String - send_to::String - msg_purpose::String - sender_name::String - sender_id::String - receiver_name::String - receiver_id::String - reply_to::String - reply_to_msg_id::String - broker_url::String - metadata::Dict{String, Any} - payloads::Vector{msg_payload_v1} - end - - # Main functions - function smartsend(...) end - function smartreceive(...) end - - # Utility functions - function _serialize_data(...) end - function _deserialize_data(...) end - function envelope_to_json(...) end - function log_trace(...) end - - # File server handlers - function plik_oneshot_upload(...) end - function _fetch_with_backoff(...) end - function publish_message(...) end - - # Internal helpers - function _get_payload_bytes(...) 
end -end -``` - -#### Multiple Dispatch Pattern - -Julia leverages multiple dispatch for type-specific implementations: - -```julia -# publish_message has two overloads based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(broker_url) - publish_message(conn, subject, message, correlation_id) -end - -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - try - NATS.publish(conn, subject, message) - log_trace(correlation_id, "Message published to $subject") - finally - NATS.drain(conn) - end -end - -# Type-specific serialization -function _serialize_data(data::String, payload_type::String) - # Text handling - return Vector{UInt8}(data) -end - -function _serialize_data(data::Dict, payload_type::String) - # Dictionary handling - json_str = JSON.json(data) - return Vector{UInt8}(json_str) -end - -function _serialize_data(data::DataFrame, payload_type::String) - # Table handling - arrowtable - io = IOBuffer() - Arrow.write(io, data) - return take!(io) -end -``` - -#### smartsend Implementation - -```julia -function smartsend( - subject::String, - data::AbstractArray{Tuple{String, T1, String}, 1}; - broker_url::String = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::String = string(uuid4()), - msg_purpose::String = "chat", - sender_name::String = "NATSBridge", - receiver_name::String = "", - receiver_id::String = "", - reply_to::String = "", - reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, - msg_id::String = string(uuid4()), - sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} where {T1<:Any} - - log_trace(correlation_id, "Starting smartsend for subject: $subject") - - # Process each payload in 
the list - payloads = msg_payload_v1[] - for (dataname, payload_data, payload_type) in data - # Serialize data based on type - payload_bytes = _serialize_data(payload_data, payload_type) - - payload_size = length(payload_bytes) - log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") - - # Decision: Direct vs Link - if payload_size < size_threshold - # Direct path - Base64 encode and send via NATS - payload_b64 = Base64.base64encode(payload_bytes) - log_trace(correlation_id, "Using direct transport for $payload_size bytes") - - payload = msg_payload_v1( - payload_b64, - payload_type; - id = string(uuid4()), - dataname = dataname, - transport = "direct", - encoding = "base64", - size = payload_size, - metadata = Dict{String, Any}("payload_bytes" => payload_size) - ) - push!(payloads, payload) - else - # Link path - Upload to HTTP server, send URL via NATS - log_trace(correlation_id, "Using link transport, uploading to fileserver") - - response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - - if response["status"] != 200 - error("Failed to upload data to fileserver: $(response["status"])") - end - - url = response["url"] - log_trace(correlation_id, "Uploaded to URL: $url") - - payload = msg_payload_v1( - url, - payload_type; - id = string(uuid4()), - dataname = dataname, - transport = "link", - encoding = "none", - size = payload_size, - metadata = Dict{String, Any}() - ) - push!(payloads, payload) - end - end - - # Create msg_envelope_v1 with all payloads - # Note: First positional argument is "send_to" (the NATS subject), not "subject" - env = msg_envelope_v1( - subject, # send_to: NATS subject to publish to - payloads; - correlation_id = correlation_id, - msg_id = msg_id, - msg_purpose = msg_purpose, - sender_name = sender_name, - sender_id = sender_id, - receiver_name = receiver_name, - receiver_id = receiver_id, - reply_to = reply_to, - reply_to_msg_id = reply_to_msg_id, - broker_url = broker_url, - metadata 
= Dict{String, Any}(), - ) - - env_json_str = envelope_to_json(env) - - if is_publish == false - # skip publish - elseif is_publish == true && NATS_connection === nothing - publish_message(broker_url, subject, env_json_str, correlation_id) - elseif is_publish == true && NATS_connection !== nothing - publish_message(NATS_connection, subject, env_json_str, correlation_id) - end - - return (env, env_json_str) -end -``` - -#### smartreceive Implementation - -```julia -function smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function = _fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 -)::JSON.Object{String, Any} - # Parse the JSON envelope - env_json_obj = JSON.parse(String(msg.payload)) - log_trace(env_json_obj["correlation_id"], "Processing received message") - - # Process all payloads in the envelope - payloads_list = Tuple{String, Any, String}[] - - num_payloads = length(env_json_obj["payloads"]) - - for i in 1:num_payloads - payload = env_json_obj["payloads"][i] - transport = String(payload["transport"]) - dataname = String(payload["dataname"]) - - if transport == "direct" - log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") - - # Extract base64 payload from the payload - payload_b64 = String(payload["data"]) - - # Decode Base64 payload - payload_bytes = Base64.base64decode(payload_b64) - - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - elseif transport == "link" - # Extract download URL from the payload - url = String(payload["data"]) - log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") - - # Fetch with exponential backoff using the download handler - downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, 
env_json_obj["correlation_id"]) - - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - else - error("Unknown transport type for payload '$dataname': $(transport)") - end - end - env_json_obj["payloads"] = payloads_list - return env_json_obj -end -``` - -#### _serialize_data Implementation - -```julia -function _serialize_data(data::Any, payload_type::String) - if payload_type == "text" - if isa(data, String) - data_bytes = Vector{UInt8}(data) - return data_bytes - else - error("Text data must be a String") - end - elseif payload_type == "dictionary" - json_str = JSON.json(data) - json_str_bytes = Vector{UInt8}(json_str) - return json_str_bytes - elseif payload_type == "arrowtable" - # Serialize DataFrame to Arrow IPC format - io = IOBuffer() - Arrow.write(io, data) - return take!(io) - elseif payload_type == "jsontable" - # Serialize to JSON - # data is Vector{NamedTuple} or Vector{Dict} - json_str = JSON.json(data) - return Vector{UInt8}(json_str) - elseif payload_type == "image" - if isa(data, Vector{UInt8}) - return data - else - error("Image data must be Vector{UInt8}") - end - elseif payload_type == "audio" - if isa(data, Vector{UInt8}) - return data - else - error("Audio data must be Vector{UInt8}") - end - elseif payload_type == "video" - if isa(data, Vector{UInt8}) - return data - else - error("Video data must be Vector{UInt8}") - end - elseif payload_type == "binary" - if isa(data, IOBuffer) - return take!(data) - elseif isa(data, Vector{UInt8}) - return data - else - error("Binary data must be binary (Vector{UInt8} or IOBuffer)") - end - else - error("Unknown payload_type: $payload_type") - end -end -``` - -#### _deserialize_data Implementation - -```julia -function _deserialize_data( - data::Vector{UInt8}, - payload_type::String, - correlation_id::String -) - if payload_type == "text" - 
return String(data) - elseif payload_type == "dictionary" - json_str = String(data) - return JSON.parse(json_str) - elseif payload_type == "arrowtable" - # Deserialize from Arrow IPC format - io = IOBuffer(data) - arrow_table = Arrow.Table(io) - return arrow_table - elseif payload_type == "jsontable" - # Deserialize from JSON format - # Returns Vector{NamedTuple} or Vector{Dict} - json_str = String(data) - parsed = JSON.parse(json_str) - return parsed - elseif payload_type == "image" - return data - elseif payload_type == "audio" - return data - elseif payload_type == "video" - return data - elseif payload_type == "binary" - return data - else - error("Unknown payload_type: $payload_type") - end -end -``` - -#### _fetch_with_backoff Implementation - -```julia -function _fetch_with_backoff( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -) - delay = base_delay - for attempt in 1:max_retries - try - response = HTTP.request("GET", url) - if response.status == 200 - log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") - return response.body - else - error("Failed to fetch: $(response.status)") - end - catch e - log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") - - if attempt < max_retries - sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - end - end - end - - error("Failed to fetch data after $max_retries attempts") -end -``` - -#### plik_oneshot_upload Implementation - -**Overload 1: Upload from binary data** - -```julia -function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) - # Get upload id - url_getUploadID = "$file_server_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - response_json = JSON.parse(http_response.body) - uploadid = response_json["id"] - uploadtoken = 
response_json["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$file_server_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict( - "file" => file_multipart - )) - - http_response = nothing - try - http_response = HTTP.post(url_upload, headers, form) - catch e - @error "Request failed" exception=e - end - response_json = JSON.parse(http_response.body) - fileid = response_json["id"] - - url = "$file_server_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end -``` - -**Overload 2: Upload from file path** - -```julia -function plik_oneshot_upload(file_server_url::String, filepath::String) - # Get upload id - filename = basename(filepath) - url_getUploadID = "$file_server_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - response_json = JSON.parse(http_response.body) - - uploadid = response_json["id"] - uploadtoken = response_json["uploadToken"] - - # Upload file - url_upload = "$file_server_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - http_response = open(filepath, "r") do file_stream - form = HTTP.Form(Dict("file" => file_stream)) - - # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' - HTTP.post(url_upload, headers, form; status_exception = false) - end - - if !isnothing(http_response) && http_response.status == 200 - # Success - response already logged by caller - else - error("Failed to upload file: server returned status $(http_response.status)") - end - response_json = JSON.parse(http_response.body) - fileid = response_json["id"] - - # url of the uploaded data e.g. 
"http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" - url = "$file_server_url/file/$uploadid/$fileid/$filename" - - return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end -``` - ---- - -### JavaScript Implementation - -#### Module Structure - -```javascript -// natsbridge.js -const nats = require('nats'); -const crypto = require('crypto'); -const fetch = require('node-fetch'); - -// UUID generation using built-in crypto module -const uuidv4 = () => crypto.randomUUID(); - -const DEFAULT_SIZE_THRESHOLD = 1_000_000; -const DEFAULT_BROKER_URL = 'nats://localhost:4222'; -const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; - -class NATSClient { - constructor(url) { - this.url = url; - this.connection = null; - } - - async connect() { - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } - - async publish(subject, message) { - if (!this.connection) { - await this.connect(); - } - await this.connection.publish(subject, message); - } - - async close() { - if (this.connection) { - this.connection.close(); - } - } -} - -async function smartsend(subject, data, options = {}) { - // Implementation -} - -async function smartreceive(msg, options = {}) { - // Implementation -} - -module.exports = { - NATSClient, - smartsend, - smartreceive, - plikOneshotUpload, - fetchWithBackoff -}; -``` - -#### smartsend Implementation - -```javascript -const nats = require('nats'); -const crypto = require('crypto'); -const fetch = require('node-fetch'); -const arrow = require('apache-arrow'); - -// UUID generation using built-in crypto module -const uuidv4 = () => crypto.randomUUID(); - -const DEFAULT_SIZE_THRESHOLD = 1_000_000; -const DEFAULT_BROKER_URL = 'nats://localhost:4222'; -const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; - -async function smartsend(subject, data, options = {}) { - const { - broker_url = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - 
fileserver_upload_handler = plikOneshotUpload, - size_threshold = DEFAULT_SIZE_THRESHOLD, - correlation_id = uuidv4(), - msg_purpose = 'chat', - sender_name = 'NATSBridge', - receiver_name = '', - receiver_id = '', - reply_to = '', - reply_to_msg_id = '', - is_publish = true, - nats_connection = null, - msg_id = uuidv4(), - sender_id = uuidv4() - } = options; - - console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`); - - // Process payloads - const payloads = []; - for (const [dataname, payloadData, payloadType] of data) { - const payloadBytes = await serializeData(payloadData, payloadType); - const payloadSize = payloadBytes.byteLength; - - console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); - - if (payloadSize < size_threshold) { - // Direct path - const payloadB64 = bufferToBase64(payloadBytes); - console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`); - - payloads.push({ - id: uuidv4(), - dataname, - payload_type: payloadType, - transport: 'direct', - encoding: 'base64', - size: payloadSize, - data: payloadB64, - metadata: { payload_bytes: payloadSize } - }); - } else { - // Link path - console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`); - - const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); - - if (response.status !== 200) { - throw new Error(`Failed to upload data to fileserver: ${response.status}`); - } - - console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`); - - payloads.push({ - id: uuidv4(), - dataname, - payload_type: payloadType, - transport: 'link', - encoding: 'none', - size: payloadSize, - data: response.url, - metadata: {} - }); - } - } - - // Build envelope - const env = { - correlation_id, - msg_id, - timestamp: new Date().toISOString(), - send_to: subject, - msg_purpose, - 
sender_name, - sender_id, - receiver_name, - receiver_id, - reply_to, - reply_to_msg_id, - broker_url, - metadata: {}, - payloads - }; - - const env_json_str = JSON.stringify(env); - - if (is_publish) { - if (nats_connection) { - await publishMessage(nats_connection, subject, env_json_str, correlation_id); - } else { - await publishMessage(broker_url, subject, env_json_str, correlation_id); - } - } - - return [env, env_json_str]; -} -``` - -#### serializeData Implementation - -```javascript -const arrow = require('apache-arrow'); - -async function serializeData(data, payload_type) { - if (payload_type === 'text') { - if (typeof data === 'string') { - return Buffer.from(data, 'utf8'); - } else { - throw new Error('Text data must be a string'); - } - } else if (payload_type === 'dictionary') { - const jsonStr = JSON.stringify(data); - return Buffer.from(jsonStr, 'utf8'); - } else if (payload_type === 'arrowtable') { - // Convert Array to Arrow IPC - if (!Array.isArray(data) || data.length === 0) { - throw new Error('arrowtable data must be a non-empty array of objects'); - } - - // Create schema from first row - const schemaFields = Object.keys(data[0]).map(key => - new arrow.Field(key, arrow.any()) - ); - const schema = new arrow.Schema(schemaFields); - - // Create writer - const writer = new arrow.RecordBatchWriter([schema]); - - // Write rows - for (const row of data) { - const recordBatch = arrow.recordBatch.fromObjects([row], schema); - writer.write(recordBatch); - } - await writer.close(); - - // Read buffer - return writer.toBuffer(); - } else if (payload_type === 'jsontable') { - // Serialize directly to JSON - const jsonStr = JSON.stringify(data); - return Buffer.from(jsonStr, 'utf8'); - } else if (payload_type === 'image') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Image data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'audio') { - if (data instanceof 
Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Audio data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'video') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Video data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'binary') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Binary data must be Uint8Array or Buffer'); - } - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} - -function bufferToBase64(buffer) { - return buffer.toString('base64'); -} -``` - -#### deserializeData Implementation - -```javascript -const arrow = require('apache-arrow'); - -async function deserializeData(data, payload_type, correlation_id) { - if (payload_type === 'text') { - return Buffer.from(data).toString('utf8'); - } else if (payload_type === 'dictionary') { - const jsonStr = Buffer.from(data).toString('utf8'); - return JSON.parse(jsonStr); - } else if (payload_type === 'arrowtable') { - // Deserialize from Arrow IPC - const buffer = Buffer.from(data); - const table = arrow.tableFromRawBytes(buffer); - return table; - } else if (payload_type === 'jsontable') { - // Deserialize from JSON - returns Array - const jsonStr = Buffer.from(data).toString('utf8'); - return JSON.parse(jsonStr); - } else if (payload_type === 'image') { - return Buffer.from(data); - } else if (payload_type === 'audio') { - return Buffer.from(data); - } else if (payload_type === 'video') { - return Buffer.from(data); - } else if (payload_type === 'binary') { - return Buffer.from(data); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} -``` - -#### fetchWithBackoff Implementation - -```javascript -async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { - let delay = base_delay; - - for (let 
attempt = 1; attempt <= max_retries; attempt++) { - try { - const response = await fetch(url); - - if (response.status === 200) { - console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`); - return await response.arrayBuffer(); - } else { - throw new Error(`Failed to fetch: ${response.status}`); - } - } catch (e) { - console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`); - - if (attempt < max_retries) { - await new Promise(resolve => setTimeout(resolve, delay)); - delay = Math.min(delay * 2, max_delay); - } - } - } - - throw new Error(`Failed to fetch data after ${max_retries} attempts`); -} -``` - -#### plikOneshotUpload Implementation - -```javascript -async function plikOneshotUpload(file_server_url, dataname, data) { - // Get upload id - const url_getUploadID = `${file_server_url}/upload`; - const headers = { 'Content-Type': 'application/json' }; - const body = JSON.stringify({ OneShot: true }); - - const http_response = await fetch(url_getUploadID, { - method: 'POST', - headers, - body - }); - - const response_json = await http_response.json(); - const uploadid = response_json.id; - const uploadtoken = response_json.uploadToken; - - // Upload file - const url_upload = `${file_server_url}/file/${uploadid}`; - const form = new FormData(); - const blob = new Blob([data]); - form.append('file', blob, dataname); - - const upload_headers = { - 'X-UploadToken': uploadtoken - }; - - const upload_response = await fetch(url_upload, { - method: 'POST', - headers: upload_headers, - body: form - }); - - const upload_json = await upload_response.json(); - const fileid = upload_json.id; - - const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; - - return { - status: upload_response.status, - uploadid, - fileid, - url - }; -} -``` - ---- - -### Python Implementation - -#### Module Structure - -```python -# natsbridge.py -import asyncio -import base64 -import json 
-import uuid -import time -from typing import Any, Dict, List, Tuple, Union, Callable -from dataclasses import dataclass, field -from datetime import datetime - -try: - import pyarrow as arrow - import pyarrow.parquet as pq - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - -try: - import aiohttp - import nats - from nats.aio.client import Client as NATSClient - NATS_AVAILABLE = True -except ImportError: - NATS_AVAILABLE = False - - -DEFAULT_SIZE_THRESHOLD = 1_000_000 -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" - - -@dataclass -class MsgPayloadV1: - """Message payload structure.""" - id: str - dataname: str - payload_type: str - transport: str - encoding: str - size: int - data: Union[str, bytes] - metadata: Dict[str, Any] = field(default_factory=dict) - - -@dataclass -class MsgEnvelopeV1: - """Message envelope structure.""" - correlation_id: str - msg_id: str - timestamp: str - send_to: str - msg_purpose: str - sender_name: str - sender_id: str - receiver_name: str - receiver_id: str - reply_to: str - reply_to_msg_id: str - broker_url: str - metadata: Dict[str, Any] = field(default_factory=dict) - payloads: List[MsgPayloadV1] = field(default_factory=list) - - -class NATSBridge: - """Cross-platform NATS bridge implementation.""" - - def __init__(self, broker_url: str = None, fileserver_url: str = None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL - self._nats_client: NATSClient = None - - async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]: - """Send data via NATS.""" - pass - - async def smartreceive(self, msg: Any, **kwargs) -> Dict: - """Receive and process NATS message.""" - pass -``` - -#### smartsend Implementation - -```python -import asyncio -import base64 -import json -import uuid -from typing import Any, Dict, List, Tuple, Union, Callable -from datetime 
import datetime - -DEFAULT_SIZE_THRESHOLD = 1_000_000 -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" - - -async def smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - broker_url: str = DEFAULT_BROKER_URL, - fileserver_url: str = DEFAULT_FILESERVER_URL, - fileserver_upload_handler: Callable = plik_oneshot_upload, - size_threshold: int = DEFAULT_SIZE_THRESHOLD, - correlation_id: str = None, - msg_purpose: str = "chat", - sender_name: str = "NATSBridge", - receiver_name: str = "", - receiver_id: str = "", - reply_to: str = "", - reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, - msg_id: str = None, - sender_id: str = None -) -> Tuple[Dict, str]: - """ - Send data via NATS with automatic transport selection. - - Args: - subject: NATS subject to publish to - data: List of (dataname, data, type) tuples - **kwargs: Additional options - - Returns: - Tuple of (env, env_json_str) - """ - if correlation_id is None: - correlation_id = str(uuid.uuid4()) - if msg_id is None: - msg_id = str(uuid.uuid4()) - if sender_id is None: - sender_id = str(uuid.uuid4()) - - print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}") - - # Process payloads - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = _serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") - - if payload_size < size_threshold: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') - print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes") - - payloads.append({ - 'id': str(uuid.uuid4()), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64, - 'metadata': 
{'payload_bytes': payload_size} - }) - else: - # Link path - print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver") - - response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - - if response['status'] != 200: - raise Exception(f"Failed to upload data to fileserver: {response['status']}") - - print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}") - - payloads.append({ - 'id': str(uuid.uuid4()), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 'size': payload_size, - 'data': response['url'], - 'metadata': {} - }) - - # Build envelope - env = { - 'correlation_id': correlation_id, - 'msg_id': msg_id, - 'timestamp': datetime.utcnow().isoformat() + 'Z', - 'send_to': subject, - 'msg_purpose': msg_purpose, - 'sender_name': sender_name, - 'sender_id': sender_id, - 'receiver_name': receiver_name, - 'receiver_id': receiver_id, - 'reply_to': reply_to, - 'reply_to_msg_id': reply_to_msg_id, - 'broker_url': broker_url, - 'metadata': {}, - 'payloads': payloads - } - - env_json_str = json.dumps(env) - - if is_publish: - if nats_connection: - await publish_message(nats_connection, subject, env_json_str, correlation_id) - else: - await publish_message(broker_url, subject, env_json_str, correlation_id) - - return env, env_json_str -``` - -#### serializeData Implementation - -```python -import base64 -import json -from typing import Any - -try: - import pyarrow as arrow - import pyarrow.feather as feather - import pyarrow.ipc as ipc - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - - -def _serialize_data(data: Any, payload_type: str) -> bytes: - """ - Serialize data to bytes based on type. - - Note: Python uses "table" as a single type for both Arrow and JSON table - serialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types. 
- """ - if payload_type == 'text': - if isinstance(data, str): - return data.encode('utf-8') - else: - raise ValueError('Text data must be a string') - elif payload_type == 'dictionary': - json_str = json.dumps(data) - return json_str.encode('utf-8') - elif payload_type == 'table': - # Python uses "table" for both arrowtable and jsontable - if not ARROW_AVAILABLE: - raise RuntimeError('pyarrow not available for table serialization') - - import io - buf = io.BytesIO() - import pandas as pd - if isinstance(data, pd.DataFrame): - # Serialize DataFrame to Arrow - table = arrow.Table.from_pandas(data) - sink = ipc.new_file(buf, table.schema) - ipc.write_table(table, sink) - sink.close() - return buf.getvalue() - elif isinstance(data, arrow.Table): - sink = ipc.new_file(buf, data.schema) - ipc.write_table(data, sink) - sink.close() - return buf.getvalue() - else: - raise ValueError('Table data must be a pandas DataFrame or pyarrow Table') - elif payload_type in ('image', 'audio', 'video', 'binary'): - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise ValueError(f'{payload_type} data must be bytes') - else: - raise ValueError(f'Unknown payload_type: {payload_type}') -``` - -#### deserializeData Implementation - -```python -import base64 -import json -from typing import Any - -try: - import pyarrow as arrow - import pyarrow.feather as feather - import pyarrow.ipc as ipc - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - - -def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: - """ - Deserialize bytes to data based on type. - - Note: Python uses "table" as a single type for both Arrow and JSON table - deserialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types. 
- """ - if payload_type == 'text': - return data.decode('utf-8') - elif payload_type == 'dictionary': - json_str = data.decode('utf-8') - return json.loads(json_str) - elif payload_type == 'table': - # Python uses "table" for both arrowtable and jsontable - if not ARROW_AVAILABLE: - raise RuntimeError('pyarrow not available for table deserialization') - - import io - buf = io.BytesIO(data) - reader = ipc.open_file(buf) - return reader.read_all().to_pandas() - elif payload_type in ('image', 'audio', 'video', 'binary'): - return data - else: - raise ValueError(f'Unknown payload_type: {payload_type}') -``` - -#### fetchWithBackoff Implementation - -```python -import asyncio -import aiohttp -from typing import Callable - - -async def fetch_with_backoff( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """Fetch URL with exponential backoff.""" - delay = base_delay - - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}") - return await response.read() - else: - raise Exception(f"Failed to fetch: {response.status}") - except Exception as e: - print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}") - - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - - raise Exception(f"Failed to fetch data after {max_retries} attempts") -``` - -#### plikOneshotUpload Implementation - -```python -import aiohttp -import json -from typing import Dict, Any - - -async def plik_oneshot_upload( - file_server_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """Upload data to plik server in one-shot mode.""" - - # Get upload id - async with aiohttp.ClientSession() as session: - url_getUploadID = f"{file_server_url}/upload" - 
headers = {'Content-Type': 'application/json'} - body = json.dumps({"OneShot": True}) - - async with session.post(url_getUploadID, headers=headers, data=body) as response: - response_json = await response.json() - uploadid = response_json['id'] - uploadtoken = response_json['uploadToken'] - - # Upload file - url_upload = f"{file_server_url}/file/{uploadid}" - headers = {'X-UploadToken': uploadtoken} - - form = aiohttp.FormData() - form.add_field('file', data, filename=dataname, content_type='application/octet-stream') - - async with session.post(url_upload, headers=headers, data=form) as upload_response: - upload_json = await upload_response.json() - fileid = upload_json['id'] - - url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}" - - return { - 'status': upload_response.status, - 'uploadid': uploadid, - 'fileid': fileid, - 'url': url - } -``` - ---- - -### MicroPython Implementation - -#### Limitations - -MicroPython has significant constraints compared to desktop implementations: - -| Feature | Desktop | MicroPython | -|---------|---------|-------------| -| Memory | Unlimited | ~256KB - 1MB | -| Arrow IPC | ✅ | ❌ (not supported) | -| Async/Await | ✅ | ❌ (synchronous only) | -| Large payloads (>1MB) | ✅ | ❌ (enforced limit) | -| arrowtable | ✅ | ❌ (not supported) | -| jsontable | ✅ | ❌ (not supported) | -| Multiple payloads | ✅ | ⚠️ (limited) | - -**Note:** MicroPython does NOT support table types (`arrowtable` or `jsontable`) due to memory constraints. 
- -#### Module Structure - -```python -# natsbridge_mpy.py (MicroPython) -import network -import time -import json -import base64 -import uos -import struct -import random - -# Constants -DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" -MAX_PAYLOAD_SIZE = 50000 # Hard limit (lower than threshold for safety) - -# Note: MicroPython does NOT support table types (arrowtable/jsontable) -# Only supports: text, dictionary, image, audio, video, binary - - -class NATSBridge: - """MicroPython NATS bridge implementation.""" - - def __init__(self, broker_url=None, fileserver_url=None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL - self._nats_conn = None - - def smartsend(self, subject, data, **kwargs): - """Send data (synchronous).""" - correlation_id = self._generate_uuid() - msg_id = self._generate_uuid() - sender_id = self._generate_uuid() - - print(f"[Correlation: {correlation_id}] Starting smartsend") - - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = self._serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - if payload_size > MAX_PAYLOAD_SIZE: - raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}") - - if payload_size < DEFAULT_SIZE_THRESHOLD: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('ascii') - payloads.append({ - 'id': self._generate_uuid(), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64 - }) - else: - # Link path (limited support) - response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes) - payloads.append({ - 'id': self._generate_uuid(), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 
'size': payload_size, - 'data': response['url'] - }) - - env = { - 'correlation_id': correlation_id, - 'msg_id': msg_id, - 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), - 'send_to': subject, - 'msg_purpose': kwargs.get('msg_purpose', 'chat'), - 'sender_name': kwargs.get('sender_name', 'NATSBridge'), - 'sender_id': sender_id, - 'receiver_name': kwargs.get('receiver_name', ''), - 'receiver_id': kwargs.get('receiver_id', ''), - 'reply_to': kwargs.get('reply_to', ''), - 'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''), - 'broker_url': self.broker_url, - 'metadata': {}, - 'payloads': payloads - } - - env_json_str = json.dumps(env) - - # Publish - self._publish(subject, env_json_str, correlation_id) - - return env, env_json_str - - def smartreceive(self, msg, **kwargs): - """Receive and process message (synchronous).""" - env_json_obj = json.loads(msg.payload) - correlation_id = env_json_obj['correlation_id'] - - payloads_list = [] - for payload in env_json_obj['payloads']: - transport = payload['transport'] - dataname = payload['dataname'] - - if transport == 'direct': - payload_b64 = payload['data'] - payload_bytes = base64.b64decode(payload_b64) - data_type = payload['payload_type'] - data = self._deserialize_data(payload_bytes, data_type) - payloads_list.append((dataname, data, data_type)) - elif transport == 'link': - url = payload['data'] - downloaded_data = self._sync_fileserver_download( - url, - kwargs.get('max_retries', 3), - kwargs.get('base_delay', 100), - kwargs.get('max_delay', 1000), - correlation_id - ) - data_type = payload['payload_type'] - data = self._deserialize_data(downloaded_data, data_type) - payloads_list.append((dataname, data, data_type)) - - env_json_obj['payloads'] = payloads_list - return env_json_obj - - def _serialize_data(self, data, payload_type): - """ - Serialize data (MicroPython version). - - Note: MicroPython does NOT support table types (arrowtable/jsontable). 
- Only supports: text, dictionary, image, audio, video, binary - """ - if payload_type == 'text': - if isinstance(data, str): - return data.encode('utf-8') - else: - raise ValueError('Text data must be a string') - elif payload_type == 'dictionary': - json_str = json.dumps(data) - return json_str.encode('utf-8') - elif payload_type in ('image', 'audio', 'video', 'binary'): - if isinstance(data, (bytes, bytearray, memoryview)): - return bytes(data) - else: - raise ValueError(f'{payload_type} data must be bytes') - else: - raise ValueError(f'Unknown payload_type: {payload_type}') - - def _deserialize_data(self, data, payload_type): - """ - Deserialize data (MicroPython version). - - Note: MicroPython does NOT support table types (arrowtable/jsontable). - Only supports: text, dictionary, image, audio, video, binary - """ - if payload_type == 'text': - return data.decode('utf-8') - elif payload_type == 'dictionary': - json_str = data.decode('utf-8') - return json.loads(json_str) - elif payload_type in ('image', 'audio', 'video', 'binary'): - return data - else: - raise ValueError(f'Unknown payload_type: {payload_type}') - - def _generate_uuid(self): - """Generate simple UUID (MicroPython compatible).""" - return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % ( - time.time_ns() // (10**6) % 0xFFFFFFFF, - time.time_ns() % 0xFFFFFFFF, - time.time_ns() >> 32 & 0xFFFF, - time.time_ns() >> 48 & 0xFFFF, - time.time_ns() >> 64 & 0xFFFF, - time.time_ns() >> 80 & 0xFFFF, - time.time_ns() >> 96 & 0xFFFF, - time.time_ns() >> 112 & 0xFFFF - ) - - def _sync_fileserver_upload(self, url, dataname, data): - """Synchronous file upload (limited).""" - # Simplified implementation for MicroPython - # In practice, would use network.HTTP or similar - raise NotImplementedError("File upload not implemented in MicroPython") - - def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id): - """Synchronous file download with backoff.""" - # Simplified implementation 
for MicroPython - raise NotImplementedError("File download not implemented in MicroPython") - - def _publish(self, subject, message, correlation_id): - """Publish message to NATS.""" - # Simplified implementation for MicroPython - raise NotImplementedError("NATS publishing not implemented in MicroPython") -``` - ---- - -## Configuration - -### Environment Variables - -| Variable | Default | Description | -|----------|---------|-------------| -| `NATS_URL` | `nats://localhost:4222` | NATS server URL | -| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | - -### MicroPython Configuration - -```python -# micropython.conf -NATS_URL = "nats://broker.local:4222" -FILESERVER_URL = "http://fileserver.local:8080" -SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices -MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython -``` - ---- - -## Performance Considerations - -### Zero-Copy Reading - -| Platform | Strategy | -|----------|----------| -| **Julia** | `Arrow.read()` with memory-mapped files | -| **JavaScript** | `ArrayBuffer` with `DataView` | -| **Python** | `pyarrow` memory mapping | -| **MicroPython** | Not available (streaming only) | - -### Exponential Backoff - -All platforms implement exponential backoff for HTTP downloads: - -```python -# Python -async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): - delay = base_delay - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - return await response.read() - except Exception as e: - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - raise Exception("Failed to fetch after max retries") -``` - -### Correlation ID Logging - -All platforms use correlation IDs for distributed tracing: - -``` -[timestamp] [Correlation: 
abc123] Message published to subject -``` - -### Serialization Performance - -| Format | Use Case | Pros | Cons | -|--------|----------|------|------| -| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython | -| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement | - ---- - -## Testing - -### Test File Organization - -| Platform | Sender Tests | Receiver Tests | -|----------|--------------|----------------| -| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | -| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | -| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | - -### Run Tests - -```bash -# Julia -julia test/test_julia_text_sender.jl -julia test/test_julia_text_receiver.jl - -# JavaScript (Node.js) -node test/test_js_text_sender.js -node test/test_js_text_receiver.js - -# Python -python3 test/test_py_text_sender.py -python3 test/test_py_text_receiver.py -``` - ---- - -## Troubleshooting - -### Common Issues - -1. **NATS Connection Failed** - - Ensure NATS server is running - - Check `broker_url` configuration - -2. **HTTP Upload Failed** - - Ensure file server is running - - Check `fileserver_url` configuration - - Verify upload permissions - -3. **Arrow IPC Deserialization Error** - - Ensure data is properly serialized to Arrow format - - Check Arrow version compatibility - - MicroPython doesn't support Arrow IPC - -4. **Memory Constraints (MicroPython)** - - Reduce `size_threshold` - - Use direct transport only (< 100KB) - - Avoid large payloads - - Use `jsontable` instead of `arrowtable` (arrowtable not supported) - ---- - -## Summary - -This cross-platform NATS bridge provides: - -1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms -2. 
**Idiomatic Implementations**: - - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC - - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client - - **Python**: Class-based design with dataclasses, type hints, async/await - - **MicroPython**: Synchronous API, memory-constrained optimizations -3. **Message Format Consistency**: Identical JSON schemas across all platforms -4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: - - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) - - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in Julia, JavaScript, Python; NOT supported in MicroPython) - -The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. - -### Datatype Summary - -| Datatype | Serialization | Use Case | Encoding | Supported Platforms | -|----------|---------------|----------|----------|---------------------| -| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python | -| `table` | Apache Arrow IPC (Python only) | Python's unified table type | `arrow-ipc` → `base64` | Python | diff --git a/docs/walkthrough.md b/docs/walkthrough.md index 39b7d1b..07a7595 100644 --- a/docs/walkthrough.md +++ b/docs/walkthrough.md @@ -1,1378 +1,965 @@ -# Cross-Platform NATSBridge Walkthrough +# Walkthrough: NATSBridge -A comprehensive guide to building real-world applications with NATSBridge across **Julia**, **JavaScript**, and **Python/MicroPython**. - -## Table of Contents - -1. [Introduction](#introduction) -2. [Architecture Overview](#architecture-overview) -3. 
[Building a Chat Application](#building-a-chat-application) -4. [Building a File Transfer System](#building-a-file-transfer-system) -5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline) -6. [Performance Optimization](#performance-optimization) -7. [Best Practices](#best-practices) +**Version**: 1.0.0 +**Date**: 2026-03-13 +**Status**: Active +**Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) --- -## Introduction +## Executive Summary -This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover: +This document provides the **story of flow** for NATSBridge - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, and **MicroPython** applications using NATS as the message bus. -- Chat applications with rich media support -- File transfer systems with claim-check pattern -- Streaming data pipelines - -Each section builds on the previous one, gradually increasing in complexity. 
+This walkthrough serves as the primary onboarding guide for new developers and explains: +- **How the system works** - Step-by-step flow of data transmission and reception +- **Why steps are sequenced** - The rationale behind architectural decisions +- **What could go wrong** - Common failure scenarios and recovery strategies --- -## Architecture Overview +## Overview: The Big Picture -### Cross-Platform System Components +NATSBridge implements the **Claim-Check pattern** for efficient handling of large payloads (>0.5MB): + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ NATSBridge Architecture │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ Sender │ │ Receiver │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │smartsend │◀─────────┤ │smartreceive│ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │Serialize │◀─────────┤ │Deserialize│ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │Transport │◀─────────┤ │Transport │ │ │ +│ │ │Selection │ │ │ │Selection │ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ▼ │ │ ▼ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │ NATS │◀─────────┤ │ NATS │ │ │ +│ │ │Publish │ │ │ │Subscribe │ │ │ +│ │ └──────────┘ │ │ └──────────┘ │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │File Server│◀─────────┤ │File Server│ │ │ +│ │ │Upload │ │ │ │Download │ │ │ +│ │ └──────────┘ │ │ └──────────┘ │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +### Key Design Principles + +| Principle | Description | Rationale | +|-----------|-------------|-----------| +| **Claim-Check Pattern** | Large payloads uploaded to HTTP server, URL sent via NATS | NATS has message size limits; 
avoids NATS overflow | +| **Automatic Transport Selection** | Direct (< threshold) vs Link (≥ threshold) based on size | Optimizes memory vs network I/O trade-off | +| **Cross-Platform API** | Consistent `smartsend()`/`smartreceive()` across all platforms | Simplifies developer experience | +| **Exponential Backoff** | Retry downloads with increasing delays | Handles transient failures gracefully | + +--- + +## The Sending Flow: `smartsend()` + +### Step-by-Step Journey ```mermaid -flowchart TB - subgraph JuliaApp["Julia Application"] - JuliaAppCode[App Code] - JuliaBridge[NATSBridge.jl] - JuliaNATS[NATS.jl] - end - - subgraph JSApp["JavaScript Application"] - JSAppCode[App Code] - JSBridge[NATSBridge.js] - JSNATS[nats.js] - end - - subgraph PythonApp["Python Application"] - PythonAppCode[App Code] - PythonBridge[NATSBridge.py] - PythonNATS[nats-py] - end - - subgraph Infrastructure["Infrastructure"] - NATS[NATS Server
Message Broker] - FileServer[HTTP File Server
Upload/Download] - end - - JuliaAppCode --> JuliaBridge - JuliaBridge --> JuliaNATS - JSAppCode --> JSBridge - JSBridge --> JSNATS - PythonAppCode --> PythonBridge - PythonBridge --> PythonNATS - - JuliaNATS --> NATS - JSNATS --> NATS - PythonNATS --> NATS - - NATS --> JuliaNATS - NATS --> JSNATS - NATS --> PythonNATS - - JuliaBridge -.->|HTTP POST upload| FileServer - JSBridge -.->|HTTP POST upload| FileServer - PythonBridge -.->|HTTP POST upload| FileServer - - FileServer -.->|HTTP GET download| JuliaBridge - FileServer -.->|HTTP GET download| JSBridge - FileServer -.->|HTTP GET download| PythonBridge - - style JuliaApp fill:#c5e1a5 - style JSApp fill:#bbdefb - style PythonApp fill:#f8bbd0 - style NATS fill:#fff3e0 - style FileServer fill:#f3e5f5 +flowchart TD + A[User calls smartsend subject data] --> B[Process each payload] + B --> C{Parse payload tuple} + C --> D[Extract: dataname, data, payload_type] + + D --> E[_serialize_data] + E --> F{payload_type} + + F -->|"text"| G[UTF-8 encode] + F -->|"dictionary"| H[JSON serialize] + F -->|"arrowtable"| I[Arrow IPC serialize] + F -->|"jsontable"| J[JSON serialize] + F -->|"image"| K[Raw bytes] + F -->|"audio"| L[Raw bytes] + F -->|"video"| M[Raw bytes] + F -->|"binary"| N[Raw bytes] + + G --> O[Return bytes] + H --> O + I --> O + J --> O + K --> O + L --> O + M --> O + N --> O + + O --> P[Calculate serialized size] + P --> Q{Size < Threshold?} + + Q -->|Yes| R[Direct Transport] + Q -->|No| S[Link Transport] + + R --> T[Base64 encode] + T --> U[Build payload with direct] + + S --> V[Upload to file server] + V --> W[Get download URL] + W --> U + + U --> X[Build envelope] + X --> Y[Convert to JSON] + Y --> Z[Publish to NATS] + + style A fill:#f9f9f9,stroke:#333 + style Z fill:#e0e7ff,stroke:#3b82f6 + style R fill:#d1fae5,stroke:#10b981 + style S fill:#fef3c7,stroke:#f59e0b ``` -### Message Flow +### Detailed Walkthrough -1. **Sender** creates a message envelope with payloads -2. 
**NATSBridge** serializes and encodes payloads -3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server -4. **NATS** routes messages to subscribers -5. **Receiver** fetches payloads (from NATS or file server) -6. **NATSBridge** deserializes and decodes payloads - ---- - -## Building a Chat Application - -Let's build a full-featured chat application that supports text, images, and file attachments. - -### Step 1: Set Up the Project - -```bash -# Create project directory -mkdir -p chat-app/src -cd chat-app - -# Create configuration file -cat > config.json << 'EOF' -{ - "nats_url": "nats://localhost:4222", - "fileserver_url": "http://localhost:8080", - "size_threshold": 1048576 -} -EOF -``` - -### Step 2: Create the Chat Interface - -#### Julia +#### Step 1: User Calls `smartsend()` ```julia -# src/chat_ui.jl -using NATSBridge, NATS - -struct ChatUI - messages::Vector{Dict} - current_room::String -end - -function ChatUI() - ChatUI(Dict[], "") -end - -function send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String}) - data = [] - - # Add text message - if !isempty(message_input) - push!(data, ("text", message_input, "text")) - end - - # Add file if selected - if selected_file !== nothing - file_data = read(selected_file) - file_type = get_file_type(selected_file) - push!(data, ("attachment", file_data, file_type)) - end - - return data -end - -function get_file_type(filename::String)::String - if endswith(filename, ".png") || endswith(filename, ".jpg") - return "image" - elseif endswith(filename, ".mp3") || endswith(filename, ".wav") - return "audio" - elseif endswith(filename, ".mp4") || endswith(filename, ".avi") - return "video" - else - return "binary" - end -end - -function add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict}) - push!(ui.messages, Dict( - "user" => user, - "text" => text, - "attachment" => attachment - )) -end -``` - -#### JavaScript - 
-```javascript -// src/chat_ui.js -const NATSBridge = require('./src/natsbridge.js'); - -class ChatUI { - constructor() { - this.messages = []; - this.currentRoom = ""; - } - - sendMessage(messageInput, selectedFile = null) { - const data = []; - - // Add text message - if (messageInput.length > 0) { - data.push(["text", messageInput, "text"]); - } - - // Add file if selected - if (selectedFile !== null) { - const fileData = fs.readFileSync(selectedFile); - const fileType = this.getFileType(selectedFile); - data.push(["attachment", fileData, fileType]); - } - - return data; - } - - getFileType(filename) { - if (filename.endsWith('.png') || filename.endsWith('.jpg')) { - return 'image'; - } else if (filename.endsWith('.mp3') || filename.endsWith('.wav')) { - return 'audio'; - } else if (filename.endsWith('.mp4') || filename.endsWith('.avi')) { - return 'video'; - } else { - return 'binary'; - } - } - - addMessage(user, text, attachment = null) { - this.messages.push({ - user, - text, - attachment - }); - } -} - -module.exports = ChatUI; -``` - -#### Python - -```python -# src/chat_ui.py -from typing import List, Dict, Optional, Union - -class ChatUI: - def __init__(self): - self.messages: List[Dict] = [] - self.current_room: str = "" - - def send_message(self, message_input: str, selected_file: Optional[str] = None) -> List[tuple]: - data = [] - - # Add text message - if message_input: - data.append(("text", message_input, "text")) - - # Add file if selected - if selected_file: - with open(selected_file, "rb") as f: - file_data = f.read() - file_type = self.get_file_type(selected_file) - data.append(("attachment", file_data, file_type)) - - return data - - def get_file_type(self, filename: str) -> str: - if filename.endswith(('.png', '.jpg')): - return "image" - elif filename.endswith(('.mp3', '.wav')): - return "audio" - elif filename.endswith(('.mp4', '.avi')): - return "video" - else: - return "binary" - - def add_message(self, user: str, text: str, attachment: 
Optional[Dict] = None): - self.messages.append({ - "user": user, - "text": text, - "attachment": attachment - }) -``` - -### Step 3: Create the Message Handler - -#### Julia - -```julia -# src/chat_handler.jl -using NATSBridge, NATS - -struct ChatHandler - nats::NATS.Connection - ui::ChatUI -end - -function ChatHandler(nats_connection::NATS.Connection) - ChatHandler(nats_connection, ChatUI()) -end - -function start(handler::ChatHandler) - # Subscribe to chat rooms - rooms = ["general", "tech", "random"] - - for room in rooms - NATS.subscribe(handler.nats, "/chat/$room") do msg - handle_message(handler, msg) - end - end - - println("Chat handler started") -end - -function handle_message(handler::ChatHandler, msg::NATS.Msg) - env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff) - - # Extract sender info from envelope - sender = get(env, "sender_name", "Anonymous") - - # Process each payload - for (dataname, data, type) in env["payloads"] - if type == "text" - add_message(handler.ui, sender, data, nothing) - elseif type == "image" - # Convert to data URL for display - base64_data = base64encode(data) - attachment = Dict( - "type" => "image", - "data" => "data:image/png;base64,$base64_data" - ) - add_message(handler.ui, sender, "", attachment) - else - # For other types, use file server URL - attachment = Dict("type" => type, "data" => data) - add_message(handler.ui, sender, "", attachment) - end - end -end -``` - -#### JavaScript - -```javascript -// src/chat_handler.js -const NATSBridge = require('./src/natsbridge.js'); -const nats = require('nats'); - -class ChatHandler { - constructor(natsConnection) { - this.nats = natsConnection; - this.ui = new (require('./chat_ui.js'))(); - } - - async start() { - // Subscribe to chat rooms - const rooms = ['general', 'tech', 'random']; - - for (const room of rooms) { - this.nats.subscribe(`/chat/${room}`, async (msg) => { - await this.handleMessage(msg); - }); - } - - console.log('Chat handler started'); - 
} - - async handleMessage(msg) { - const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: NATSBridge.fetchWithBackoff - }); - - // Extract sender info from envelope - const sender = env.sender_name || 'Anonymous'; - - // Process each payload - for (const [dataname, data, type] of env.payloads) { - if (type === 'text') { - this.ui.addMessage(sender, data, null); - } else if (type === 'image') { - // Convert to data URL for display - const base64Data = Buffer.from(data).toString('base64'); - const attachment = { - type: 'image', - data: `data:image/png;base64,${base64Data}` - }; - this.ui.addMessage(sender, '', attachment); - } else { - // For other types, use file server URL - const attachment = { type, data }; - this.ui.addMessage(sender, '', attachment); - } - } - } -} - -module.exports = ChatHandler; -``` - -#### Python - -```python -# src/chat_handler.py -import asyncio -from typing import Optional -from natsbridge import smartreceive, fetch_with_backoff - -class ChatHandler: - def __init__(self, nats_connection): - self.nats = nats_connection - self.ui = ChatUI() - - async def start(self): - # Subscribe to chat rooms - rooms = ['general', 'tech', 'random'] - - for room in rooms: - await self.nats.subscribe( - f'/chat/{room}', - callback=self.handle_message - ) - - print('Chat handler started') - - async def handle_message(self, msg): - env = await smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff - ) - - # Extract sender info from envelope - sender = env.get('sender_name', 'Anonymous') - - # Process each payload - for dataname, data, type_ in env['payloads']: - if type_ == 'text': - self.ui.add_message(sender, data, None) - elif type_ == 'image': - # Convert to data URL for display - import base64 - base64_data = base64.b64encode(data).decode('utf-8') - attachment = { - 'type': 'image', - 'data': f'data:image/png;base64,{base64_data}' - } - self.ui.add_message(sender, '', attachment) - else: - # For other types, use 
file server URL or data - attachment = {'type': type_, 'data': data} - self.ui.add_message(sender, '', attachment) -``` - -### Step 4: Run the Application - -```bash -# Start NATS -docker run -p 4222:4222 nats:latest - -# Start file server -mkdir -p /tmp/fileserver -python3 -m http.server 8080 --directory /tmp/fileserver - -# Run chat app # Julia -julia src/chat_ui.jl -julia src/chat_handler.jl - -# JavaScript -node src/chat_ui.js -node src/chat_handler.js +data = [ + ("msg", "Hello World", "text"), + ("img", binary_data, "image") +] +env, msg_json = smartsend("/chat/user/v1/message", data) +``` +```python # Python -python3 src/chat_ui.py -python3 src/chat_handler.py +data = [ + ("msg", "Hello World", "text"), + ("img", binary_data, "image") +] +env, msg_json = await smartsend("/chat/user/v1/message", data) +``` + +```javascript +// JavaScript +const data = [ + ["msg", "Hello World", "text"], + ["img", binaryData, "image"] +]; +const [env, msgJson] = await smartsend("/chat/user/v1/message", data); +``` + +**What happens**: +- User provides a list of tuples: `(dataname, data, payload_type)` +- `dataname`: Identifier for the payload (e.g., "msg", "login_image") +- `data`: The actual data to send +- `payload_type`: Type string determining serialization method + +#### Step 2: Serialization (`_serialize_data`) + +Each payload is serialized based on its type: + +| Payload Type | Julia | Python | JavaScript | Encoding | +|--------------|-------|--------|------------|----------| +| `text` | UTF-8 bytes | UTF-8 bytes | UTF-8 bytes | Base64 | +| `dictionary` | JSON string | JSON string | JSON string | Base64 | +| `arrowtable` | Arrow IPC | Arrow IPC | Arrow IPC | Base64/arrow-ipc | +| `jsontable` | JSON array | JSON array | JSON array | Base64/json | +| `image`/`audio`/`video`/`binary` | Raw bytes | Raw bytes | Raw bytes | Base64 | + +**Example**: +```julia +# Text serialization +text_bytes = Vector{UInt8}("Hello World") # 11 bytes + +# Dictionary serialization +dict_bytes = 
Vector{UInt8}("{\"key\":\"value\"}") # 15 bytes + +# Arrow table serialization +io = IOBuffer() +Arrow.write(io, data_frame) +arrow_bytes = take!(io) # Binary Arrow IPC stream +``` + +#### Step 3: Transport Selection + +The serialized size determines the transport method: + +| Platform | Threshold | Notes | +|----------|-----------|-------| +| Desktop (Julia/JS/Python) | 500,000 bytes (0.5MB) | Default threshold | +| MicroPython | 100,000 bytes (100KB) | Lower threshold for memory constraints | + +**Decision Logic**: +```julia +if payload_size < size_threshold + # Direct transport: send via NATS +else + # Link transport: upload to file server +end +``` + +#### Step 4: Direct Transport Path + +For payloads < threshold: + +1. **Base64 Encode**: Convert binary data to ASCII string +2. **Build Payload**: Create `msg_payload_v1` with `transport="direct"` + +```julia +# Encode as Base64 +payload_b64 = Base64.base64encode(payload_bytes) + +# Build payload +payload = msg_payload_v1( + payload_b64, + payload_type; + transport = "direct", + encoding = "base64", + size = payload_size +) +``` + +#### Step 5: Link Transport Path + +For payloads ≥ threshold: + +1. **Upload to File Server**: Use `plik_oneshot_upload()` +2. **Get Download URL**: Server returns URL for the uploaded file +3.
**Build Payload**: Create `msg_payload_v1` with `transport="link"` + +```julia +# Upload to Plik server +response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) + +# Extract URL +url = response["url"] + +# Build payload +payload = msg_payload_v1( + url, + payload_type; + transport = "link", + encoding = "none", + size = payload_size +) +``` + +**File Server Handler Contract**: +```julia +function fileserver_upload_handler( + file_server_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} + # Returns: Dict("status" => 200, "uploadid" => "...", "fileid" => "...", "url" => "...") +end +``` + +#### Step 6: Build Envelope + +All payloads are wrapped in a message envelope: + +```julia +env = msg_envelope_v1( + subject, + payloads; + correlation_id = correlation_id, + msg_id = msg_id, + msg_purpose = msg_purpose, + sender_name = sender_name, + sender_id = sender_id, + receiver_name = receiver_name, + receiver_id = receiver_id, + reply_to = reply_to, + reply_to_msg_id = reply_to_msg_id, + broker_url = broker_url +) +``` + +**Envelope Fields**: +| Field | Purpose | +|-------|---------| +| `correlation_id` | Track message flow across distributed systems | +| `msg_id` | Unique identifier for this message | +| `timestamp` | ISO 8601 UTC timestamp | +| `send_to` | NATS subject to publish to | +| `msg_purpose` | ACK, NACK, updateStatus, shutdown, chat, command, event | +| `sender_name`/`sender_id` | Sender identification | +| `receiver_name`/`receiver_id` | Receiver identification (empty = broadcast) | +| `reply_to` | Topic for reply messages | +| `broker_url` | NATS server URL | +| `metadata` | Message-level metadata | +| `payloads` | Array of payload objects | + +#### Step 7: Publish to NATS + +The envelope is converted to JSON and published to NATS: + +```julia +env_json_str = envelope_to_json(env) + +# Publish with existing connection +publish_message(nats_connection, subject, env_json_str, correlation_id) + +# Or publish by 
creating new connection +publish_message(broker_url, subject, env_json_str, correlation_id) ``` --- -## Building a File Transfer System +## The Receiving Flow: `smartreceive()` -Let's build a file transfer system that handles large files efficiently. +### Step-by-Step Journey -### Step 1: File Upload Service +```mermaid +flowchart TD + A[NATS message arrives] --> B[Parse JSON envelope] + B --> C[Extract payloads array] + C --> D{Iterate through payloads} + + D --> E[Get payload transport] + E --> F{transport == direct?} + + F -->|Yes| G[Extract Base64 data] + G --> H[Decode Base64] + H --> I[_deserialize_data] + + F -->|No| J[Extract download URL] + J --> K[Fetch with exponential backoff] + K --> L[_deserialize_data] + + I --> M[Build payload tuple] + L --> M + + M --> N{More payloads?} + N -->|Yes| D + N -->|No| O[Replace payloads array] + O --> P[Return envelope] + + style A fill:#f9f9f9,stroke:#333 + style P fill:#e0e7ff,stroke:#3b82f6 + style G fill:#d1fae5,stroke:#10b981 + style J fill:#fef3c7,stroke:#f59e0b +``` -#### Julia +### Detailed Walkthrough + +#### Step 1: NATS Message Arrives + +The receiver gets a message from NATS: ```julia -# src/file_upload_service.jl -using NATSBridge, HTTP - -struct FileUploadService - broker_url::String - fileserver_url::String -end - -function FileUploadService(broker_url::String, fileserver_url::String) - FileUploadService(broker_url, fileserver_url) -end - -function upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict - file_data = read(file_path) - file_name = basename(file_path) - - data = [("file", file_data, "binary")] - - env, env_json_str = smartsend( - "/files/$recipient", - data, - broker_url=service.broker_url, - fileserver_url=service.fileserver_url - ) - - return env -end - -function upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict - file_size = stat(file_path).size - - if file_size > 100 * 1024 * 1024 # > 100MB - println("File too large for 
direct upload, using streaming...") - return stream_upload(service, file_path, recipient) - end - - return upload_file(service, file_path, recipient) -end - -function stream_upload(service::FileUploadService, file_path::String, recipient::String)::Dict - # Implement streaming upload to file server - # This would require a more sophisticated file server - # For now, we'll use the standard upload - return upload_file(service, file_path, recipient) -end +# Julia +msg = nats_subscription.next() # Get next message +env = smartreceive(msg) ``` -#### JavaScript - -```javascript -// src/file_upload_service.js -const NATSBridge = require('./src/natsbridge.js'); -const fs = require('fs'); - -class FileUploadService { - constructor(brokerUrl, fileserverUrl) { - this.broker_url = brokerUrl; - this.fileserver_url = fileserverUrl; - } - - async uploadFile(filePath, recipient) { - const fileData = fs.readFileSync(filePath); - const fileName = require('path').basename(filePath); - - const data = [["file", fileData, "binary"]]; - - const [env, env_json_str] = await NATSBridge.smartsend( - `/files/${recipient}`, - data, - { - broker_url: this.broker_url, - fileserver_url: this.fileserver_url - } - ); - - return env; - } - - async uploadLargeFile(filePath, recipient) { - const stats = fs.statSync(filePath); - const fileSize = stats.size; - - if (fileSize > 100 * 1024 * 1024) { // > 100MB - console.log('File too large for direct upload, using streaming...'); - return this.streamUpload(filePath, recipient); - } - - return this.uploadFile(filePath, recipient); - } - - async streamUpload(filePath, recipient) { - // Implement streaming upload to file server - // This would require a more sophisticated file server - // For now, we'll use the standard upload - return this.uploadFile(filePath, recipient); - } -} - -module.exports = FileUploadService; -``` - -#### Python - ```python -# src/file_upload_service.py -from natsbridge import smartsend -import os - -class FileUploadService: - def 
__init__(self, broker_url: str, fileserver_url: str): - self.broker_url = broker_url - self.fileserver_url = fileserver_url - - async def upload_file(self, file_path: str, recipient: str) -> tuple: - with open(file_path, "rb") as f: - file_data = f.read() - file_name = os.path.basename(file_path) - - data = [("file", file_data, "binary")] - - env, env_json_str = await smartsend( - f"/files/{recipient}", - data, - broker_url=self.broker_url, - fileserver_url=self.fileserver_url - ) - - return env, env_json_str - - async def upload_large_file(self, file_path: str, recipient: str) -> tuple: - file_size = os.path.getsize(file_path) - - if file_size > 100 * 1024 * 1024: # > 100MB - print("File too large for direct upload, using streaming...") - return await self.stream_upload(file_path, recipient) - - return await self.upload_file(file_path, recipient) - - async def stream_upload(self, file_path: str, recipient: str) -> tuple: - # Implement streaming upload to file server - # This would require a more sophisticated file server - # For now, we'll use the standard upload - return await self.upload_file(file_path, recipient) +# Python +msg = await nats_consumer.next() # Get next message +env = await smartreceive(msg) ``` -### Step 2: File Download Service +```javascript +// JavaScript +const msg = await natsSubscription.next(); +const env = await smartreceive(msg); +``` -#### Julia +#### Step 2: Parse JSON Envelope + +The message payload is parsed as JSON: ```julia -# src/file_download_service.jl -using NATSBridge +env_json_obj = JSON.parse(String(msg.payload)) +``` -struct FileDownloadService - nats_url::String -end +**Expected Structure**: +```json +{ + "correlation_id": "abc123...", + "msg_id": "def456...", + "timestamp": "2026-03-13T07:02:50.443Z", + "send_to": "/chat/user/v1/message", + "msg_purpose": "chat", + "sender_name": "sender-app", + "sender_id": "sender-uuid...", + "receiver_name": "receiver-app", + "receiver_id": "receiver-uuid...", + "reply_to": 
"reply.subject", + "reply_to_msg_id": "msg-id...", + "broker_url": "nats://localhost:4222", + "metadata": {}, + "payloads": [ + { + "id": "payload-uuid...", + "dataname": "msg", + "payload_type": "text", + "transport": "direct", + "encoding": "base64", + "size": 11, + "data": "SGVsbG8gV29ybGQ=", + "metadata": {"payload_bytes": 11} + } + ] +} +``` -function FileDownloadService(nats_url::String) - FileDownloadService(nats_url) -end +#### Step 3: Process Each Payload -function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String) - env = smartreceive(msg, fileserver_download_handler=fetch_from_url) +For each payload in the envelope: + +```julia +num_payloads = length(env_json_obj["payloads"]) + +for i in 1:num_payloads + payload = env_json_obj["payloads"][i] + transport = String(payload["transport"]) + dataname = String(payload["dataname"]) - # Process each payload - for (dataname, data, type) in env["payloads"] - if type == "binary" - file_path = "/downloads/$dataname" - write(file_path, data) - println("File saved to $file_path") + if transport == "direct" + # Direct transport path + elseif transport == "link" + # Link transport path + else + error("Unknown transport type: $transport") + end +end +``` + +#### Step 4: Direct Transport Path + +For payloads with `transport == "direct"`: + +1. **Extract Base64 Data**: Get the Base64-encoded string +2. **Decode Base64**: Convert to binary data +3. 
**Deserialize**: Convert bytes to native data type + +```julia +# Extract Base64 payload +payload_b64 = String(payload["data"]) + +# Decode Base64 +payload_bytes = Base64.base64decode(payload_b64) + +# Deserialize based on type +data_type = String(payload["payload_type"]) +data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) +``` + +**Deserialization Logic**: +| Payload Type | Deserialization | +|--------------|-----------------| +| `text` | UTF-8 bytes → String | +| `dictionary` | UTF-8 bytes → JSON string → Julia object | +| `arrowtable` | Bytes → Arrow IPC → DataFrame | +| `jsontable` | UTF-8 bytes → JSON string → Vector{Dict} → DataFrame | +| `image`/`audio`/`video`/`binary` | Bytes directly | + +#### Step 5: Link Transport Path + +For payloads with `transport == "link"`: + +1. **Extract URL**: Get the download URL from payload +2. **Fetch with Backoff**: Download data with retry logic +3. **Deserialize**: Convert bytes to native data type + +```julia +# Extract download URL +url = String(payload["data"]) + +# Fetch with exponential backoff +downloaded_data = fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + env_json_obj["correlation_id"] +) + +# Deserialize based on type +data_type = String(payload["payload_type"]) +data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) +``` + +**Download Handler Contract**: +```julia +function fileserver_download_handler( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} + # Returns: Vector{UInt8} (downloaded bytes) +end +``` + +#### Step 6: Build Payload List + +Each processed payload is added to the result list: + +```julia +payloads_list = Tuple{String, Any, String}[] + +# After processing each payload +push!(payloads_list, (dataname, data, data_type)) +``` + +**Result Format**: +```julia +[ + ("msg", "Hello World", "text"), + ("img", binary_data, "image") +] +```
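On the consuming side, the payload list built in Step 6 can be walked as plain `(dataname, data, payload_type)` tuples. The following is a minimal Python sketch, assuming the received envelope exposes that list under `"payloads"` as the Result Format shows. `summarize_payloads` is a hypothetical helper for illustration, not part of NATSBridge, and the envelope here is a hand-built stand-in rather than real `smartreceive()` output.

```python
# Hypothetical helper (not part of NATSBridge): summarize the
# (dataname, data, payload_type) tuples from a received envelope.
def summarize_payloads(env):
    summary = {}
    for dataname, data, payload_type in env["payloads"]:
        if payload_type == "text":
            # Text payloads arrive as decoded strings.
            summary[dataname] = f"text ({len(data)} chars)"
        elif payload_type in ("image", "audio", "video", "binary"):
            # Media and binary payloads arrive as raw bytes.
            summary[dataname] = f"{payload_type} ({len(data)} bytes)"
        else:
            # Dictionary and table types arrive already deserialized.
            summary[dataname] = payload_type
    return summary


# Stand-in envelope shaped like the Result Format above (assumed shape).
env = {"payloads": [("msg", "Hello World", "text"), ("img", b"\x89PNG", "image")]}
summary = summarize_payloads(env)
```

Dispatching on the third tuple element keeps the consumer independent of whether a payload travelled by direct or link transport, since both paths end in the same tuple shape.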
+ +#### Step 7: Return Envelope + +The envelope is updated with the processed payloads and returned: + +```julia +env_json_obj["payloads"] = payloads_list +return env_json_obj +``` + +--- + +## File Server Integration + +### Plik One-Shot Upload + +NATSBridge uses **Plik** as the default HTTP file server for link transport: + +```julia +# Upload handler +function plik_oneshot_upload( + file_server_url::String, + dataname::String, + data::Vector{UInt8} +)::Dict{String, Any} +``` + +**Upload Flow**: +1. **Create One-Shot Session**: POST `/upload` with `{"OneShot": true}` +2. **Get Upload ID**: Server returns `uploadid` and `uploadtoken` +3. **Upload File**: POST `/file/{uploadid}` with multipart form data +4. **Get File ID**: Server returns `fileid` +5. **Return URL**: Construct download URL + +```julia +# Step 1: Create one-shot session +POST /upload +Headers: Content-Type: application/json +Body: {"OneShot": true} + +Response: +{ + "id": "UPLOAD_ID", + "uploadToken": "UPLOAD_TOKEN", + "status": 200 +} + +# Step 2: Upload file +POST /file/UPLOAD_ID +Headers: X-UploadToken: UPLOAD_TOKEN +Body: multipart/form-data (file) + +Response: +{ + "id": "FILE_ID", + "status": 200 +} + +# Final URL: http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext +``` + +### Exponential Backoff for Downloads + +File downloads use exponential backoff for resilience: + +```julia +function _fetch_with_backoff( + url::String, + max_retries::Int, + base_delay::Int, + max_delay::Int, + correlation_id::String +)::Vector{UInt8} +``` + +**Retry Policy**: +- Initial delay: `base_delay` milliseconds (default: 100ms) +- Multiplier: 2x per retry +- Maximum delay: `max_delay` milliseconds (default: 5000ms) +- Maximum retries: `max_retries` (default: 5) + +**Delay Calculation**: +```julia +delay = base_delay # Start with 100ms + +for attempt in 1:max_retries + try + # Try to fetch + response = HTTP.request("GET", url) + if response.status == 200 + return response.body + end + catch e + if attempt < 
max_retries
+            sleep(delay / 1000.0)  # Sleep before retry
+            delay = min(delay * 2, max_delay)  # Double delay, cap at max
         end
     end
 end
-function fetch_from_url(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
-    # Fetch data from URL with exponential backoff
-    # Return downloaded data as Vector{UInt8}
-end
+error("Failed after $max_retries attempts")
 ```

-#### JavaScript
+**Example Delays** (defaults: `base_delay` = 100ms, `max_delay` = 5000ms, `max_retries` = 5):
+| Failed Attempt | Delay Before Next Attempt |
+|----------------|---------------------------|
+| 1 | 100ms |
+| 2 | 200ms |
+| 3 | 400ms |
+| 4 | 800ms |
+| 5 | None (final attempt; error is raised) |

-```javascript
-// src/file_download_service.js
-const NATSBridge = require('./src/natsbridge.js');
-const fs = require('fs');
+---

-class FileDownloadService {
-    constructor(natsUrl) {
-        this.nats_url = natsUrl;
-    }
-
-    async downloadFile(msg, sender, downloadId) {
-        const env = await NATSBridge.smartreceive(msg, {
-            fileserver_download_handler: NATSBridge.fetchWithBackoff
-        });
-
-        // Process each payload
-        for (const [dataname, data, type] of env.payloads) {
-            if (type === 'binary') {
-                const filePath = `/downloads/${dataname}`;
-                fs.writeFileSync(filePath, data);
-                console.log(`File saved to ${filePath}`);
-            }
-        }
+## Cross-Platform Compatibility
+
+### Platform-Specific Implementations
+
+| Platform | File | Key Features |
+|----------|------|--------------|
+| Julia | `src/NATSBridge.jl` | Multiple dispatch, Arrow.jl support |
+| Python | `src/natsbridge.py` | Async/await, pyarrow support |
+| Node.js | `src/natsbridge_ssr.js` | Buffer, nats.js |
+| Browser | `src/natsbridge_csr.js` | Uint8Array, nats.ws, Web Crypto |
+| MicroPython | `src/natsbridge_mpy.py` | Synchronous, limited payload types |
+
+### API Parity
+
+All platforms implement the same core API:
+
+| Function | Julia | Python | JavaScript | MicroPython |
+|----------|-------|--------|------------|-------------|
+| `smartsend()` | ✅ | ✅ | ✅ | ✅ |
+| `smartreceive()` | ✅ | ✅ | ✅ | ✅ |
+| `plik_oneshot_upload()` | ✅ | ✅ | ✅ |
⚠️ (placeholder) | +| `fetch_with_backoff()` | ✅ | ✅ | ✅ | ⚠️ (placeholder) | + +### Payload Type Support by Platform + +| Type | Julia | Python | Node.js | Browser | MicroPython | +|------|-------|--------|---------|---------|-------------| +| `text` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `dictionary` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `arrowtable` | ✅ | ✅ | ✅ | ✅ | ❌ | +| `jsontable` | ✅ | ✅ | ✅ | ✅ | ⚠️ | +| `image` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `audio` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `video` | ✅ | ✅ | ✅ | ✅ | ✅ | +| `binary` | ✅ | ✅ | ✅ | ✅ | ✅ | + +--- + +## Error Handling + +### Common Error Scenarios + +| Scenario | Error Code | Recovery | +|----------|------------|----------| +| **Unknown payload_type** | `INVALID_PAYLOAD_TYPE` | Use supported payload_type | +| **Failed to upload** | `UPLOAD_FAILED` | Retry or use direct transport | +| **Failed to fetch** | `DOWNLOAD_FAILED` | Retry with exponential backoff | +| **Unknown transport** | `INVALID_TRANSPORT` | Check payload transport field | +| **NATS connection failed** | `NATS_CONNECTION_FAILED` | Check NATS server availability | +| **Deserialization error** | `DESERIALIZATION_ERROR` | Validate payload_type matches data | + +### Error Response Format + +```json +{ + "correlation_id": "abc123...", + "msg_id": "def456...", + "timestamp": "2026-03-13T07:02:50.443Z", + "send_to": "/chat/user/v1/message", + "error": { + "code": "DOWNLOAD_FAILED", + "message": "Failed to fetch data after 5 attempts", + "details": { + "url": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/filename.ext", + "correlation_id": "abc123..." 
     }
+  }
 }
-
-module.exports = FileDownloadService;
 ```

-#### Python
-
-```python
-# src/file_download_service.py
-from natsbridge import smartreceive, fetch_with_backoff
-import os
-
-class FileDownloadService:
-    def __init__(self, nats_url: str):
-        self.nats_url = nats_url
-
-    async def download_file(self, msg, sender: str, download_id: str):
-        env = await smartreceive(
-            msg,
-            fileserver_download_handler=fetch_with_backoff
-        )
-
-        # Process each payload
-        for dataname, data, type_ in env['payloads']:
-            if type_ == 'binary':
-                file_path = f'/downloads/{dataname}'
-                os.makedirs('/downloads', exist_ok=True)
-                with open(file_path, 'wb') as f:
-                    f.write(data)
-                print(f"File saved to {file_path}")
-```
-
-### Step 3: File Transfer CLI
-
-#### Julia
+### Exception Handling Examples

 ```julia
-# src/cli.jl
-using NATSBridge
-
-function main()
-    println("File Transfer System")
-    println("====================")
-    println("1. Upload file")
-    println("2. Download file")
-    println("3. List pending downloads")
-
-    print("Enter choice: ")
-    choice = readline()
-
-    if choice == "1"
-        upload_file_cli()
-    elseif choice == "2"
-        download_file_cli()
-    end
+# File server unavailable
+try
+    env, msg_json = smartsend("/subject", data)
+catch e
+    # Retry with direct transport or use smaller payloads
 end

-function upload_file_cli()
-    print("Enter file path: ")
-    file_path = readline()
-
-    print("Enter recipient: ")
-    recipient = readline()
-
-    file_service = FileUploadService("nats://localhost:4222", "http://localhost:8080")
-
-    try
-        env = upload_file(file_service, file_path, recipient)
-        println("Upload successful!")
-        println("File ID: $(env["payloads"][1][1])")
-    catch error
-        println("Upload failed: $(error)")
-    end
+# Deserialization error
+try
+    env = smartreceive(msg)
+catch e
+    # `env` is never assigned when smartreceive throws, so log the exception
+    # with its backtrace and inspect the raw payload structure instead
+    @error "Deserialization failed" exception=(e, catch_backtrace())
 end
-
-function download_file_cli()
-    print("Enter sender: ")
-    sender = readline()
-
-
file_service = FileDownloadService("nats://localhost:4222") - - try - download_file(file_service, sender) - println("Download complete!") - catch error - println("Download failed: $(error)") - end -end - -main() ``` --- -## Building a Streaming Data Pipeline +## Debugging and Tracing -Let's build a data pipeline that processes streaming data from sensors. +### Correlation ID Tracking -### Step 1: Sensor Data Model - -#### Julia +Every message includes a `correlation_id` for distributed tracing: ```julia -# src/sensor_data.jl -using Dates, DataFrames +# Generate correlation ID at start of request +correlation_id = string(uuid4()) -struct SensorReading - sensor_id::String - timestamp::String - value::Float64 - unit::String - metadata::Dict{String, Any} -end - -function SensorReading(sensor_id::String, value::Float64, unit::String, metadata::Dict{String, Any}=Dict()) - SensorReading( - sensor_id, - ISODateTime(now(), Dates.Second) |> string, - value, - unit, - metadata - ) -end - -struct SensorBatch - readings::Vector{SensorReading} -end - -function SensorBatch() - SensorBatch(SensorReading[]) -end - -function add_reading(batch::SensorBatch, reading::SensorReading) - push!(batch.readings, reading) -end - -function to_dataframe(batch::SensorBatch)::DataFrame - data = Dict{String, Any}() - data["sensor_id"] = [r.sensor_id for r in batch.readings] - data["timestamp"] = [r.timestamp for r in batch.readings] - data["value"] = [r.value for r in batch.readings] - data["unit"] = [r.unit for r in batch.readings] - - return DataFrame(data) -end +# Use throughout the request flow +log_trace(correlation_id, "Starting smartsend for subject: $subject") +log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") +log_trace(correlation_id, "Using direct transport for $payload_size bytes") ``` -#### JavaScript - -```javascript -// src/sensor_data.js -const NATSBridge = require('./src/natsbridge.js'); - -class SensorReading { - constructor(sensorId, value, 
unit, metadata = {}) { - this.sensor_id = sensorId; - this.timestamp = new Date().toISOString(); - this.value = value; - this.unit = unit; - this.metadata = metadata; - } -} - -class SensorBatch { - constructor() { - this.readings = []; - } - - addReading(reading) { - this.readings.push(reading); - } - - toDataFrame() { - return { - sensor_id: this.readings.map(r => r.sensor_id), - timestamp: this.readings.map(r => r.timestamp), - value: this.readings.map(r => r.value), - unit: this.readings.map(r => r.unit) - }; - } -} - -module.exports = { SensorReading, SensorBatch }; +**Log Format**: +``` +[2026-03-13T07:02:50.443Z] [Correlation: abc123...] Starting smartsend for subject: /chat/user/v1/message +[2026-03-13T07:02:50.445Z] [Correlation: abc123...] Serialized payload 'msg' (type: text) size: 11 bytes +[2026-03-13T07:02:50.446Z] [Correlation: abc123...] Using direct transport for 11 bytes ``` -#### Python +### Logging in All Implementations -```python -# src/sensor_data.py -from datetime import datetime -from dataclasses import dataclass, field -from typing import List, Dict, Any +| Platform | Logging Method | +|----------|----------------| +| Julia | `@info` macro | +| Python | `print()` with timestamp | +| JavaScript | `console.log()` | +| MicroPython | `print()` | -@dataclass -class SensorReading: - sensor_id: str - timestamp: str - value: float - unit: str - metadata: Dict[str, Any] = field(default_factory=dict) +--- - @classmethod - def create(cls, sensor_id: str, value: float, unit: str, metadata: Dict[str, Any] = None): - return cls( - sensor_id=sensor_id, - timestamp=datetime.utcnow().isoformat(), - value=value, - unit=unit, - metadata=metadata or {} - ) +## Testing the Flow -class SensorBatch: - def __init__(self): - self.readings: List[SensorReading] = [] - - def add_reading(self, reading: SensorReading): - self.readings.append(reading) - - def to_dataframe(self): - import pandas as pd - return pd.DataFrame({ - 'sensor_id': [r.sensor_id for r in 
self.readings], - 'timestamp': [r.timestamp for r in self.readings], - 'value': [r.value for r in self.readings], - 'unit': [r.unit for r in self.readings] - }) -``` - -### Step 2: Sensor Sender - -#### Julia +### Example: End-to-End Test ```julia -# src/sensor_sender.jl -using NATSBridge, Dates, Random +# Sender side +data = [ + ("msg", "Hello", "text"), + ("img", image_data, "image") +] +env, msg_json = smartsend("/chat/user/v1/message", data) -struct SensorSender - broker_url::String - fileserver_url::String -end +# Receiver side +msg = nats_subscription.next() +env = smartreceive(msg) -function SensorSender(broker_url::String, fileserver_url::String) - SensorSender(broker_url, fileserver_url) -end - -function send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String) - reading = SensorReading(sensor_id, value, unit) - - data = [("reading", reading.metadata, "dictionary")] - - # Default: is_publish=True (automatically publishes to NATS) - smartsend( - "/sensors/$sensor_id", - data, - broker_url=sender.broker_url, - fileserver_url=sender.fileserver_url - ) -end - -function send_batch(sender::SensorSender, readings::Vector{SensorReading}) - batch = SensorBatch() - for reading in readings - add_reading(batch, reading) - end - - df = to_dataframe(batch) - - # Convert to Arrow IPC format - import Arrow - table = Arrow.Table(df) - - # Serialize to Arrow IPC - import IOBuffer - buf = IOBuffer() - Arrow.write(buf, table) - - arrow_data = take!(buf) - - # Send based on size (auto-selected by smartsend) - data = [("batch", arrow_data, "arrowtable")] - smartsend( - "/sensors/batch", - data, - broker_url=sender.broker_url, - fileserver_url=sender.fileserver_url - ) +# Verify payloads +for (dataname, data, type_) in env["payloads"] + println("$dataname: $data (type: $type_)") end ``` -#### JavaScript +### Test Scenarios -```javascript -// src/sensor_sender.js -const NATSBridge = require('./src/natsbridge.js'); -const { SensorReading, SensorBatch } = 
require('./sensor_data.js'); +| Scenario | Payloads | Transport | Expected Result | +|----------|----------|-----------|-----------------| +| Single text (small) | `text` | direct | Round-trip successful | +| Single dictionary (small) | `dictionary` | direct | Round-trip successful | +| Single arrow table (small) | `arrowtable` | direct | Arrow IPC round-trip | +| Single image (large) | `image` | link | File server upload/download | +| Mixed payloads | `text` + `image` | direct + link | All payloads preserved | -class SensorSender { - constructor(brokerUrl, fileserverUrl) { - this.broker_url = brokerUrl; - this.fileserver_url = fileserverUrl; - } +--- - async sendReading(sensorId, value, unit) { - const reading = new SensorReading(sensorId, value, unit); - - const data = [["reading", reading.metadata, "dictionary"]]; - - await NATSBridge.smartsend( - `/sensors/${sensorId}`, - data, - { - broker_url: this.broker_url, - fileserver_url: this.fileserver_url - } - ); - } +## Deployment Considerations - async sendBatch(readings) { - const batch = new SensorBatch(); - for (const reading of readings) { - batch.addReading(reading); - } - - const df = batch.toDataFrame(); - - // Convert to Arrow IPC - const arrow = require('apache-arrow'); - const schema = new arrow.Schema([ - new arrow.Field('sensor_id', arrow.string()), - new arrow.Field('timestamp', arrow.string()), - new arrow.Field('value', arrow.float64()), - new arrow.Field('unit', arrow.string()) - ]); - - const arrays = { - sensor_id: new arrow.StringArray(df.sensor_id.map(s => String(s))), - timestamp: new arrow.StringArray(df.timestamp), - value: new arrow.Float64Array(df.value), - unit: new arrow.StringArray(df.unit) - }; - - const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, df.value.length); - const buffer = arrow.tableFromBatches([recordBatch]).toBuffer(); - const arrow_data = new Uint8Array(buffer); - - // Send based on size (auto-selected by smartsend) - const data = [["batch", arrow_data, 
"arrowtable"]]; - await NATSBridge.smartsend( - "/sensors/batch", - data, - { - broker_url: this.broker_url, - fileserver_url: this.fileserver_url - } - ); - } -} +### Minimum Infrastructure -module.exports = SensorSender; -``` +| Component | Minimum | Notes | +|-----------|---------|-------| +| NATS Server | 1 instance | Single node for development | +| File Server | 1 instance | HTTP server for large payloads | +| Client Memory | 50MB | Desktop platforms | +| Client Memory | 256KB | MicroPython devices | -#### Python +### Environment Variables -```python -# src/sensor_sender.py -from natsbridge import smartsend -from sensor_data import SensorReading, SensorBatch +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes | -class SensorSender: - def __init__(self, broker_url: str, fileserver_url: str): - self.broker_url = broker_url - self.fileserver_url = fileserver_url +### Container Deployment - async def send_reading(self, sensor_id: str, value: float, unit: str): - reading = SensorReading.create(sensor_id, value, unit) - - data = [("reading", reading.metadata, "dictionary")] - - await smartsend( - f"/sensors/{sensor_id}", - data, - broker_url=self.broker_url, - fileserver_url=self.fileserver_url - ) - - async def send_batch(self, readings): - batch = SensorBatch() - for reading in readings: - batch.add_reading(reading) - - df = batch.to_dataframe() - - # Convert to Arrow IPC - import pyarrow as arrow - import pyarrow.ipc as ipc - import io - - table = arrow.Table.from_pandas(df) - buf = io.BytesIO() - sink = ipc.new_file(buf, table.schema) - ipc.write_table(table, sink) - sink.close() - arrow_data = buf.getvalue() - - # Send based on size (auto-selected by smartsend) - data = [("batch", arrow_data, "arrowtable")] - await smartsend( - "/sensors/batch", - 
data, - broker_url=self.broker_url, - fileserver_url=self.fileserver_url - ) +```yaml +# docker-compose.yml +version: '3' +services: + nats: + image: nats:latest + ports: + - "4222:4222" + + plik: + image: rootfs/plik:latest + ports: + - "8080:8080" + volumes: + - plik-data:/data + + app: + image: my-app:latest + depends_on: + - nats + - plik ``` --- -## Performance Optimization +## Common Pitfalls -### 1. Batch Processing +### Pitfall 1: Payload Size Threshold -#### Julia +**Issue**: Payloads just above threshold may cause unnecessary file server uploads + +**Solution**: Monitor payload sizes and adjust threshold based on: +- Network latency to file server +- Memory constraints +- File server performance ```julia -# Batch multiple readings into a single message -function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}}) - batch = SensorBatch() - - for (sensor_id, value, unit) in readings - reading = SensorReading(sensor_id, value, unit) - add_reading(batch, reading) - end - - df = to_dataframe(batch) - - # Convert to Arrow IPC - import Arrow - table = Arrow.Table(df) - - # Serialize to Arrow IPC - import IOBuffer - buf = IOBuffer() - Arrow.write(buf, table) - - arrow_data = take!(buf) - - # Send as single message - smartsend( - "/sensors/batch", - [("batch", arrow_data, "arrowtable")], - broker_url=sender.broker_url - ) +# Adjust threshold based on use case +env, msg_json = smartsend("/subject", data; size_threshold = 1_000_000) # 1MB +``` + +### Pitfall 2: File Server Availability + +**Issue**: File server down during upload/download + +**Solution**: Implement fallback strategies: +- Fall back to direct transport for uploads +- Use smaller payloads to avoid link transport +- Implement application-level retries + +```julia +# Fallback to direct transport if file upload fails +try + response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) +catch e + # Fall back to direct transport + payload_b64 = 
Base64.base64encode(payload_bytes) + # Build payload with direct transport end ``` -#### JavaScript +### Pitfall 3: Payload Type Mismatch -```javascript -// Batch multiple readings into a single message -async function sendBatchReadings(sender, readings) { - const batch = new SensorBatch(); - - for (const [sensorId, value, unit] of readings) { - const reading = new SensorReading(sensorId, value, unit); - batch.addReading(reading); - } - - const df = batch.toDataFrame(); - - // Convert to Arrow IPC - const arrow = require('apache-arrow'); - const schema = new arrow.Schema([ - new arrow.Field('sensor_id', arrow.string()), - new arrow.Field('timestamp', arrow.string()), - new arrow.Field('value', arrow.float64()), - new arrow.Field('unit', arrow.string()) - ]); - - const arrays = { - sensor_id: new arrow.StringArray(df.sensor_id), - timestamp: new arrow.StringArray(df.timestamp), - value: new arrow.Float64Array(df.value), - unit: new arrow.StringArray(df.unit) - }; - - const recordBatch = arrow.RecordBatch.fromArrays(schema, arrays, df.value.length); - const buffer = arrow.tableFromBatches([recordBatch]).toBuffer(); - const arrow_data = new Uint8Array(buffer); - - // Send as single message - const data = [["batch", arrow_data, "arrowtable"]]; - await NATSBridge.smartsend( - "/sensors/batch", - data, - { broker_url: sender.broker_url } - ); -} -``` +**Issue**: Receiver deserializes with wrong payload_type -### 2. 
Connection Reuse - -#### Julia +**Solution**: Always validate payload_type matches data: +- Sender and receiver must agree on payload types +- Use consistent payload_type strings across platforms ```julia -# Reuse NATS connections -function create_connection_pool() - connections = Dict{String, NATS.Connection}() - - function get_connection(nats_url::String)::NATS.Connection - if !haskey(connections, nats_url) - connections[nats_url] = NATS.connect(nats_url) - end - return connections[nats_url] - end - - function close_all() - for conn in values(connections) - NATS.drain(conn) - end - empty!(connections) - end - - return (get_connection=get_connection, close_all=close_all) -end -``` +# Sender +smartsend("/subject", [("data", data, "arrowtable")]) -#### Python - -```python -# Reuse NATS connections -import asyncio -import nats - -class ConnectionPool: - def __init__(self): - self.connections = {} - - async def get_connection(self, nats_url: str): - if nats_url not in self.connections: - self.connections[nats_url] = await nats.connect(nats_url) - return self.connections[nats_url] - - async def close_all(self): - for conn in self.connections.values(): - await conn.drain() - self.connections.clear() -``` - -### 3. 
Caching - -#### Julia - -```julia -# Cache file server responses -using Base.Threads - -const file_cache = Dict{String, Vector{UInt8}}() - -function fetch_with_caching(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} - if haskey(file_cache, url) - return file_cache[url] - end - - # Fetch from file server - data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) - - # Cache the result - file_cache[url] = data - - return data -end -``` - -#### Python - -```python -# Cache file server responses -import asyncio -import threading -from natsbridge import fetch_with_backoff - -file_cache = {} -cache_lock = threading.Lock() - -async def fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id): - with cache_lock: - if url in file_cache: - return file_cache[url] - - # Fetch from file server - data = await fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) - - # Cache the result - with cache_lock: - file_cache[url] = data - - return data +# Receiver (must use same payload_type) +env = smartreceive(msg) +# env["payloads"][1][3] == "arrowtable" ``` --- -## Best Practices +## Performance Considerations -### 1. Error Handling +### Optimization Strategies -#### Julia +| Strategy | Description | When to Use | +|----------|-------------|-------------| +| **Pre-create NATS connection** | Reuse connection for multiple sends | High-throughput scenarios | +| **Batch small payloads** | Combine multiple small payloads | Reduce NATS overhead | +| **Adjust size threshold** | Increase threshold if file server slow | File server bottleneck | +| **Use direct transport** | Avoid file server for small payloads | Low latency requirements | + +### Benchmarking ```julia -function safe_smartsend(subject::String, data::Vector{Tuple}, kwargs...) - try - return smartsend(subject, data; kwargs...) 
- catch error - println("Failed to send message: $(error)") - return nothing - end -end -``` +# Benchmark direct vs link transport +using BenchmarkTools -#### JavaScript +# Direct transport +@btime smartsend("/subject", [("data", rand(1000), "arrowtable")]) -```javascript -async function safeSmartSend(subject, data, options = {}) { - try { - return await NATSBridge.smartsend(subject, data, options); - } catch (error) { - console.error(`Failed to send message: ${error}`); - return null; - } -} -``` - -#### Python - -```python -from typing import List, Tuple, Optional, Union - -async def safe_smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - **kwargs -) -> Optional[Tuple[dict, str]]: - try: - return await smartsend(subject, data, **kwargs) - except Exception as error: - print(f"Failed to send message: {error}") - return None -``` - -### 2. Logging - -#### Julia - -```julia -using Logging - -function log_send(subject::String, data::Vector{Tuple}, correlation_id::String) - @info "Sending to $subject: $(length(data)) payloads, correlation_id=$correlation_id" -end - -function log_receive(correlation_id::String, num_payloads::Int) - @info "Received message: $num_payloads payloads, correlation_id=$correlation_id" -end -``` - -#### Python - -```python -import logging -from typing import List, Tuple, Any - -logger = logging.getLogger(__name__) - -def log_send(subject: str, data: List[Tuple[str, Any, str]], correlation_id: str): - logger.info(f"Sending to {subject}: {len(data)} payloads, correlation_id={correlation_id}") - -def log_receive(correlation_id: str, num_payloads: int): - logger.info(f"Received message: {num_payloads} payloads, correlation_id={correlation_id}") +# Link transport (with file server) +@btime smartsend("/subject", [("data", rand(1_000_000), "arrowtable")]) ``` --- -## Conclusion +## Versioning -This walkthrough covered: +### Current Version -- Building a chat application with rich media support -- Building a file transfer system with 
claim-check pattern
-- Building a streaming data pipeline for sensor data
+- **Major**: 1 (Breaking changes require major version bump)
+- **Minor**: 0 (Feature additions)
+- **Patch**: 0 (Bug fixes)

-For more information, check the [API documentation](../src/README.md) and [test examples](../test/).
+### Version Compatibility
+
+| Version | Supported Platforms |
+|---------|---------------------|
+| v1.0.x | Julia 1.7+, Node.js 16+, Python 3.8+, MicroPython 1.19+ |

 ---

-## License
+## Change Log

-MIT
+| Date | Version | Changes |
+|------|---------|---------|
+| 2026-03-13 | 1.0.0 | Initial walkthrough documentation |
+
+---
+
+## References
+
+- [`docs/requirements.md`](./requirements.md) - Business requirements and user stories
+- [`docs/spec.md`](./spec.md) - Technical specification and contracts
+- [`docs/architecture.md`](./architecture.md) - System architecture diagrams
+- [`src/NATSBridge.jl`](../src/NATSBridge.jl) - Ground truth implementation
+- [`README.md`](../README.md) - Project overview
+
+---
+
+*This walkthrough document is versioned and maintained in git alongside the codebase. All implementations must adhere to this documentation.*
\ No newline at end of file