diff --git a/docs/architecture.md b/docs/architecture.md index 2f04b01..2043353 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -57,11 +57,14 @@ All three platforms expose the same high-level API: | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | | `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | | `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | +| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | | `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` | +**Note on MicroPython:** MicroPython does not support table types (`arrowtable` or `jsontable`) due to memory constraints. Use `dictionary` or `binary` instead. + ### Cross-Platform API Examples **Julia:** @@ -244,16 +247,6 @@ flowchart TB "size": 524288, "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", "metadata": {} - }, - { - "id": "uuid4", - "dataname": "json_table", - "payload_type": "jsontable", - "transport": "direct", - "encoding": "json", - "size": 1024, - "data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]", - "metadata": {} } ] } @@ -266,7 +259,7 @@ flowchart TB { "id": "uuid4", "dataname": "login_image", - "payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary", + "payload_type": "image | dictionary | arrowtable | jsontable | table | text | audio | video | binary", "transport": "direct | link", "encoding": "none | json | base64 | arrow-ipc", "size": 15433, @@ -312,735 +305,6 @@ flowchart TB --- -## Platform-Specific Implementations - -### Julia Implementation - -#### Architecture Patterns - -**Multiple Dispatch:** Julia's core strength is leveraged through function overloading: - -```julia -# publish_message has two overloads based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(broker_url) - publish_message(conn, subject, message, correlation_id) -end - -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - try - NATS.publish(conn, subject, message) - log_trace(correlation_id, "Message published to $subject") - finally - NATS.drain(conn) - end -end -``` - -**Struct-Based Data Models:** -```julia -struct msg_payload_v1 - id::String - dataname::String - payload_type::String - transport::String - encoding::String - size::Integer - data::Any - metadata::Dict{String, Any} -end - -struct msg_envelope_v1 - correlation_id::String - msg_id::String - timestamp::String - send_to::String - msg_purpose::String - sender_name::String - sender_id::String - receiver_name::String - receiver_id::String - reply_to::String - reply_to_msg_id::String - broker_url::String - metadata::Dict{String, Any} - payloads::Vector{msg_payload_v1} -end -``` - -#### Dependencies - -| Package | Purpose | -|---------|---------| -| `NATS.jl` | Core NATS functionality | -| `Arrow.jl` | Arrow IPC serialization | -| `JSON.jl` | JSON parsing | -| `HTTP.jl` | HTTP client for file server | -| `UUIDs.jl` | UUID generation | -| `Dates.jl` | Timestamps | -| `Base64` | Base64 encoding | - -#### File Server Handler Signatures - -```julia -# Upload handler -fileserver_upload_handler( - fileserver_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} - -# Download handler -fileserver_download_handler( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} -``` - -#### Key Functions - -```julia -# Main send/receive functions -function smartsend( - subject::String, - data::AbstractArray{Tuple{String, Any, String}, 1}; - broker_url::String = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::String = string(uuid4()), - msg_purpose::String = "chat", - sender_name::String = "NATSBridge", - receiver_name::String = "", - receiver_id::String = "", - reply_to::String = "", - reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, - msg_id::String = string(uuid4()), - sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} - -function smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function = _fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 -)::JSON.Object{String, Any} -``` - -#### Serialization Logic for Tables - -```julia -# Serialize table data based on payload_type -function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8} - if payload_type == "arrowtable" - # Serialize to Apache Arrow IPC format - buffer = IOBuffer() - Arrow.write(buffer, data) - return take!(buffer) - elseif payload_type == "jsontable" - # Serialize to JSON format - json_str = JSON.json(data) - return Vector{UInt8}(json_str) - else - throw(ArgumentError("Unknown payload_type: $payload_type")) - end -end - -# Deserialize table data based on payload_type -function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any - if payload_type == "arrowtable" - # Deserialize from Apache Arrow IPC format - buffer = Buffer(data) - return Arrow.read(buffer) - elseif payload_type == "jsontable" - # Deserialize from JSON format - json_str = String(data) - return JSON.parse(json_str) - else - throw(ArgumentError("Unknown payload_type: $payload_type")) - end -end -``` - ---- - -### JavaScript Implementation - -#### Architecture Patterns - -**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O: - -```javascript -// smartsend is async and returns a Promise -async function smartsend(subject, data, options = {}) { - const { - broker_url = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler = plikOneshotUpload, - size_threshold = DEFAULT_SIZE_THRESHOLD, - correlation_id = generateUUID(), - msg_purpose = "chat", - sender_name = "NATSBridge", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true, - nats_connection = null, - msg_id = generateUUID(), - sender_id = generateUUID() - } = options; - - // Process payloads - const payloads = []; - for (const [dataname, payloadData, payloadType] of data) { - const payloadBytes = await serializeData(payloadData, payloadType); - const payloadSize = payloadBytes.byteLength; - - if (payloadSize < size_threshold) { - // Direct path - const payloadB64 = base64Encode(payloadBytes); - payloads.push({ - id: generateUUID(), - dataname, - payload_type: payloadType, - transport: "direct", - encoding: "base64", - size: payloadSize, - data: payloadB64 - }); - } else { - // Link path - const response = await fileserver_upload_handler( - fileserver_url, dataname, payloadBytes - ); - payloads.push({ - id: generateUUID(), - dataname, - payload_type: payloadType, - transport: "link", - encoding: "none", - size: payloadSize, - data: response.url - }); - } - } - - const env = buildEnvelope(subject, payloads, { - correlation_id, msg_id, msg_purpose, - sender_name, sender_id, receiver_name, - receiver_id, reply_to, reply_to_msg_id, - broker_url - }); - - const env_json_str = JSON.stringify(env); - - if (is_publish) { - if (nats_connection) { - await publishMessage(nats_connection, subject, env_json_str, correlation_id); - } else { - await publishMessage(broker_url, subject, env_json_str, correlation_id); - } - } - - return [env, env_json_str]; -} -``` - -**Prototype-Based Utilities:** -```javascript -// NATS client wrapper (prototype-based) -class NATSClient { - constructor(url) { - this.url = url; - this.connection = null; - } - - async connect() { - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } - - async publish(subject, message) { - if (!this.connection) { - await this.connect(); - } - await this.connection.publish(subject, message); - } - - async close() { - if (this.connection) { - this.connection.close(); - } - } -} -``` - -#### Dependencies (Node.js) - -| Package | Purpose | -|---------|---------| -| `nats` | Core NATS functionality (nats.js) | -| `crypto` (built-in) | UUID generation (Node.js) | -| `node-fetch` or `axios` | HTTP client for file server | -| `apache-arrow` | Arrow IPC serialization | - -#### Dependencies (Browser) - -| Package | Purpose | -|---------|---------| -| `nats` | Browser-compatible NATS client | -| `crypto` (built-in) | UUID generation (browser) | -| `fetch` (native) | HTTP client for file server | -| `apache-arrow` | Arrow IPC serialization | - -#### Dependencies (MicroPython) - -| Module | Purpose | -|--------|---------| -| `nats` (custom) | MicroPython NATS client | -| `time` | Timestamps | -| `uos` | File operations | -| `base64` | Base64 encoding | - -#### File Server Handler Signatures - -```javascript -// Upload handler - async function returning Promise -async function fileserver_upload_handler( - fileserver_url, - dataname, - data // Uint8Array -) { - // Returns: { status, uploadid, fileid, url } -} - -// Download handler - async function returning Promise -async function fileserver_download_handler( - url, - max_retries, - base_delay, - max_delay, - correlation_id -) { - // Returns: Uint8Array -} -``` - -#### Key Functions - -```javascript -// Main send/receive functions -async function smartsend(subject, data, options = {}) { - // data: Array of [dataname, data, type] tuples - // Returns: Promise<[env, env_json_str]> -} - -async function smartreceive(msg, options = {}) { - // msg: NATS message object - // Returns: Promise -} - -// Utility functions -async function serializeData(data, payload_type) { - // Returns: Uint8Array -} - -async function deserializeData(data, payload_type) { - // Returns: deserialized data -} - -async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { - // Returns: Uint8Array -} -``` - -#### Serialization Logic for Tables - -```javascript -// Serialize table data based on payload_type -async function serializeTableData(data, payload_type) { - if (payload_type === "arrowtable") { - // Serialize to Apache Arrow IPC format - const schema = new arrow.Schema([...]); // Define schema - const arr = arrow.tableToArrowTable(data, schema); - const buffer = arrow.RecordBatch.from(arr).toBuffer(); - return new Uint8Array(buffer); - } else if (payload_type === "jsontable") { - // Serialize to JSON format - const jsonStr = JSON.stringify(data); - return new TextEncoder().encode(jsonStr); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} - -// Deserialize table data based on payload_type -async function deserializeTableData(data, payload_type) { - if (payload_type === "arrowtable") { - // Deserialize from Apache Arrow IPC format - const buffer = arrow.arrayBufferToBuffer(data.buffer); - const batch = arrow.RecordBatch.deserialize(buffer); - return arrow.tableFromBatch(batch); - } else if (payload_type === "jsontable") { - // Deserialize from JSON format - const jsonStr = new TextDecoder().decode(data); - return JSON.parse(jsonStr); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} -``` - ---- - -### Python/MicroPython Implementation - -#### Architecture Patterns - -**Class-Based Design:** Python uses classes for stateful operations: - -```python -class NATSBridge: - """Cross-platform NATS bridge implementation.""" - - DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - DEFAULT_BROKER_URL = "nats://localhost:4222" - DEFAULT_FILESERVER_URL = "http://localhost:8080" - - def __init__(self, broker_url=None, fileserver_url=None): - self.broker_url = broker_url or self.DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL - self._nats_client = None - - async def smartsend(self, subject, data, **kwargs): - """ - Send data via NATS with automatic transport selection. - - Args: - subject: NATS subject to publish to - data: List of (dataname, data, type) tuples - **kwargs: Additional options (broker_url, fileserver_url, etc.) - - Returns: - Tuple of (env, env_json_str) - """ - # Extract options with defaults - options = self._merge_options(kwargs) - - # Process payloads - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = self._serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - if payload_size < options['size_threshold']: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') - payloads.append({ - 'id': uuid.uuid4().hex, - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64 - }) - else: - # Link path - response = await options['fileserver_upload_handler']( - options['fileserver_url'], dataname, payload_bytes - ) - payloads.append({ - 'id': uuid.uuid4().hex, - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 'size': payload_size, - 'data': response['url'] - }) - - # Build envelope - env = self._build_envelope(subject, payloads, options) - env_json_str = json.dumps(env) - - if options['is_publish']: - await self._publish_message( - subject, env_json_str, options['correlation_id'], - nats_connection=options.get('nats_connection') - ) - - return env, env_json_str - - async def smartreceive(self, msg, **kwargs): - """ - Receive and process NATS message. - - Args: - msg: NATS message object - **kwargs: Additional options (fileserver_download_handler, etc.) - - Returns: - Dict with envelope metadata and payloads - """ - # Parse envelope - env_json_obj = json.loads(msg.payload) - - # Process payloads - payloads_list = [] - for payload in env_json_obj['payloads']: - transport = payload['transport'] - dataname = payload['dataname'] - - if transport == 'direct': - payload_b64 = payload['data'] - payload_bytes = base64.b64decode(payload_b64) - data_type = payload['payload_type'] - data = self._deserialize_data(payload_bytes, data_type) - payloads_list.append((dataname, data, data_type)) - elif transport == 'link': - url = payload['data'] - downloaded_data = await options['fileserver_download_handler']( - url, - options['max_retries'], - options['base_delay'], - options['max_delay'], - env_json_obj['correlation_id'] - ) - data_type = payload['payload_type'] - data = self._deserialize_data(downloaded_data, data_type) - payloads_list.append((dataname, data, data_type)) - - env_json_obj['payloads'] = payloads_list - return env_json_obj -``` - -**Dataclass for Type Safety:** -```python -from dataclasses import dataclass, field -from typing import Any, Dict, List, Tuple, Union - -@dataclass -class MsgPayloadV1: - """Message payload structure.""" - id: str - dataname: str - payload_type: str - transport: str - encoding: str - size: int - data: Union[str, bytes] # URL for link, base64 for direct - metadata: Dict[str, Any] = field(default_factory=dict) - -@dataclass -class MsgEnvelopeV1: - """Message envelope structure.""" - correlation_id: str - msg_id: str - timestamp: str - send_to: str - msg_purpose: str - sender_name: str - sender_id: str - receiver_name: str - receiver_id: str - reply_to: str - reply_to_msg_id: str - broker_url: str - metadata: Dict[str, Any] = field(default_factory=dict) - payloads: List[MsgPayloadV1] = field(default_factory=list) -``` - -#### Dependencies (Desktop Python) - -| Package | Purpose | -|---------|---------| -| `nats-py` | Core NATS functionality | -| `uuid` | UUID generation (stdlib) | -| `aiohttp` or `requests` | HTTP client for file server | -| `pyarrow` | Arrow IPC serialization | -| `pandas` | DataFrame support (optional) | -| `python-dateutil` | Timestamps | -| `base64` | Base64 encoding (stdlib) | - -#### Dependencies (MicroPython) - -| Module | Purpose | -|--------|---------| -| `network` | NATS connection (custom) | -| `time` | Timestamps | -| `uos` | File operations | -| `base64` | Base64 encoding | -| `json` | JSON parsing | -| `struct` | Binary data handling | - -**MicroPython Limitations:** -- No Arrow IPC support (memory constraints) -- Only direct transport (< 1MB threshold enforced) -- Simplified UUID generation -- No async/await (use callbacks or uasyncio) - -#### File Server Handler Signatures - -```python -# Upload handler - async function -async def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """ - Upload data to file server. - - Args: - fileserver_url: Base URL of file server - dataname: Name of the file - data: Binary data - - Returns: - Dict with keys: 'status', 'uploadid', 'fileid', 'url' - """ - pass - -# Download handler - async function -async def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """ - Download data from URL with exponential backoff. - - Args: - url: URL to download from - max_retries: Maximum retry attempts - base_delay: Initial delay in ms - max_delay: Maximum delay in ms - correlation_id: Correlation ID for logging - - Returns: - Downloaded bytes - """ - pass -``` - -#### Key Functions - -```python -# Main send/receive functions (standalone or class methods) -async def smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - broker_url: str = DEFAULT_BROKER_URL, - fileserver_url: str = DEFAULT_FILESERVER_URL, - fileserver_upload_handler: Callable = plik_oneshot_upload, - size_threshold: int = DEFAULT_SIZE_THRESHOLD, - correlation_id: str = None, - msg_purpose: str = "chat", - sender_name: str = "NATSBridge", - receiver_name: str = "", - receiver_id: str = "", - reply_to: str = "", - reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, - msg_id: str = None, - sender_id: str = None -) -> Tuple[Dict, str]: - """Send data via NATS.""" - pass - -async def smartreceive( - msg: Any, - fileserver_download_handler: Callable = fetch_with_backoff, - max_retries: int = 5, - base_delay: int = 100, - max_delay: int = 5000 -) -> Dict: - """Receive and process NATS message.""" - pass - -# Utility functions -def _serialize_data(data: Any, payload_type: str) -> bytes: - """Serialize data to bytes.""" - pass - -def _deserialize_data(data: bytes, payload_type: str) -> Any: - """Deserialize bytes to data.""" - pass - -async def fetch_with_backoff( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """Fetch URL with exponential backoff.""" - pass -``` - -#### Serialization Logic for Tables - -```python -# Serialize table data based on payload_type -def serialize_table_data(data: Any, payload_type: str) -> bytes: - if payload_type == "arrowtable": - # Serialize to Apache Arrow IPC format - import pyarrow as pa - import pyarrow.feather as feather - import io - - if isinstance(data, pd.DataFrame): - table = pa.Table.from_pandas(data) - buffer = io.BytesIO() - feather.write_feather(table, buffer) - return buffer.getvalue() - else: - raise TypeError("Expected pandas DataFrame for arrowtable") - - elif payload_type == "jsontable": - # Serialize to JSON format - if isinstance(data, list) and all(isinstance(row, dict) for row in data): - return json.dumps(data).encode('utf-8') - else: - raise TypeError("Expected list of dicts for jsontable") - - else: - raise ValueError(f"Unknown payload_type: {payload_type}") - -# Deserialize table data based on payload_type -def deserialize_table_data(data: bytes, payload_type: str) -> Any: - if payload_type == "arrowtable": - # Deserialize from Apache Arrow IPC format - import pyarrow as pa - import pyarrow.feather as feather - import io - - buffer = io.BytesIO(data) - table = feather.read_table(buffer) - return table.to_pandas() - - elif payload_type == "jsontable": - # Deserialize from JSON format - json_str = data.decode('utf-8') - return json.loads(json_str) - - else: - raise ValueError(f"Unknown payload_type: {payload_type}") -``` - ---- - ## Platform Comparison Matrix | Feature | Julia | JavaScript | Python | MicroPython | @@ -1052,7 +316,8 @@ def deserialize_table_data(data: bytes, payload_type: str) -> Any: | **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | | **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) | | **arrowtable Support** | ✅ | ✅ | ✅ | ❌ | -| **jsontable Support** | ✅ | ✅ | ✅ | ✅ | +| **jsontable Support** | ✅ | ✅ | ✅ | ❌ | +| **table Support** | ❌ | ❌ | ✅ | ❌ | | **Direct Transport** | ✅ | ✅ | ✅ | ✅ | | **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | | **Handler Functions** | ✅ | ✅ | ✅ | ✅ | @@ -1060,217 +325,85 @@ def deserialize_table_data(data: bytes, payload_type: str) -> Any: --- -## Implementation Details by Platform +## Platform-Specific Architecture Patterns ### Julia: Multiple Dispatch Pattern -```julia -# Function overloading based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - # Creates new connection -end +Julia leverages multiple dispatch for type-specific implementations: -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - # Uses pre-existing connection -end - -# Type-specific serialization -function _serialize_data(data::String, payload_type::String) - # Text handling -end - -function _serialize_data(data::Dict, payload_type::String) - # Dictionary handling -end - -function _serialize_data(data::DataFrame, payload_type::String) - # Table handling - arrowtable -end - -function _serialize_data(data::Vector{NamedTuple}, payload_type::String) - # Table handling - jsontable -end -``` +- **Function overloading** based on argument types +- **Struct-based data models** with explicit types +- **Native Arrow IPC** support via Arrow.jl ### JavaScript: Prototype + Async Pattern -```javascript -// Class-based NATS client -class NATSClient { - constructor(url) { - this.url = url; - } - - async connect() { - // Connection logic - } - - async publish(subject, message) { - // Publish logic - } -} +JavaScript uses async/await for non-blocking I/O: -// Module-level utility functions -function generateUUID() { - return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => { - const r = Math.random() * 16 | 0; - return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16); - }); -} - -async function serializeData(data, payload_type) { - // Serialization logic for arrowtable and jsontable -} -``` +- **Class-based NATS client** for connection management +- **Module-level utility functions** for serialization +- **Native ArrayBuffer** for binary data handling ### Python: Class-Based Pattern -```python -class NATSBridge: - """Main bridge class.""" - - def __init__(self, broker_url=None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - - async def smartsend(self, subject, data, **kwargs): - """Send data.""" - pass - - async def smartreceive(self, msg, **kwargs): - """Receive message.""" - pass +Python uses classes for stateful operations: -# Module-level convenience functions -def smartsend(subject, data, **kwargs): - """Convenience function using default NATSBridge instance.""" - bridge = NATSBridge() - return await bridge.smartsend(subject, data, **kwargs) +- **Class-based NATSBridge** with type hints +- **Dataclasses** for structured data (MsgPayloadV1, MsgEnvelopeV1) +- **Async/await** for I/O operations -def smartreceive(msg, **kwargs): - """Convenience function using default NATSBridge instance.""" - bridge = NATSBridge() - return await bridge.smartreceive(msg, **kwargs) -``` +### MicroPython: Synchronous Pattern + +MicroPython has significant constraints: + +- **Synchronous API** (no async/await) +- **Memory-constrained** (256KB - 1MB) +- **Limited payload support** (no tables, max 50KB) --- -## Scenario Implementations (Cross-Platform) +## Cross-Platform Compatibility Notes -### Scenario 1: Command & Control (Small Dictionary) +### 1. Python Payload Type Naming -| Platform | Code | -|----------|------| -| **Julia** | ```julia
config = Dict("step_size" => 0.01)
env, env_json_str = smartsend("control", [("config", config, "dictionary")])``` | -| **JavaScript** | ```javascript
const config = { step_size: 0.01 };
[env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);``` | -| **Python** | ```python
config = {"step_size": 0.01}
env, env_json_str = await smartsend("control", [("config", config, "dictionary")])``` | +The Python implementation uses `"table"` as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types (`"arrowtable"` and `"jsontable"`): -### Scenario 2: Deep Dive Analysis (Large Arrow Table) +| Platform | Table Types | +|----------|-------------| +| Julia | `"arrowtable"`, `"jsontable"` | +| JavaScript | `"arrowtable"`, `"jsontable"` | +| Python | `"table"` (single type) | +| MicroPython | Not supported | -| Platform | Code | -|----------|------| -| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])``` | -| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
[env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);``` | -| **Python** | ```python
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])``` | +**Impact:** When exchanging data between Python and Julia/JavaScript, the payload type will differ. Python code should use `"table"` while Julia/JavaScript code should use `"arrowtable"` or `"jsontable"`. -### Scenario 3: Chat System (Multi-Payload) +### 2. Direct Transport Encoding Field -| Platform | Code | -|----------|------| -| **Julia** | ```julia
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = smartsend("chat", chat)``` | -| **JavaScript** | ```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
[env, env_json_str] = await smartsend("chat", chat);``` | -| **Python** | ```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)``` | +The encoding field in direct transport payloads differs between platforms: -### Scenario 4: JSON Table Transfer (Cross-Platform) +| Platform | Encoding for Direct Transport | +|----------|-------------------------------| +| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | +| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | +| Python | Always `"base64"` for all direct transport payloads | +| MicroPython | Always `"base64"` for all direct transport payloads | -| Platform | Code | -|----------|------| -| **Julia** | ```julia
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
env, env_json_str = smartsend("data", [("users", rows, "jsontable")])``` | -| **JavaScript** | ```javascript
const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];
[env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);``` | -| **Python** | ```python
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
env, env_json_str = await smartsend("data", [("users", users, "jsontable")])``` | +**Impact:** The encoding field may not accurately reflect the original serialization format when using Python or MicroPython. -### Scenario 5: Smart Transport Selection +### 3. MicroPython Limitations -The `smartsend` function automatically selects the transport method based on payload size: +MicroPython has significant constraints that affect feature support: -- **Direct Transport (< 1MB)**: Payload is serialized and embedded directly in the NATS message - - `arrowtable`: Serialized to Arrow IPC, base64 encoded - - `jsontable`: Serialized to JSON, base64 encoded - - `dictionary`: Serialized to JSON, base64 encoded - - `text`: Serialized to UTF-8, base64 encoded - - `image/audio/video/binary`: Base64 encoded +| Feature | Desktop Platforms | MicroPython | +|---------|-------------------|-------------| +| `arrowtable` | ✅ | ❌ (not supported - memory constraints) | +| `jsontable` | ✅ | ❌ (not supported - memory constraints) | +| `table` | ✅ | ❌ (not supported - memory constraints) | +| Async/await | ✅ | ❌ (synchronous only) | +| File upload/download | ✅ | ⚠️ (placeholder implementations) | +| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) | +| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB | -- **Link Transport (>= 1MB)**: Payload is uploaded to HTTP file server, URL embedded in message - - All types supported - - Receiver downloads from URL and deserializes - ---- - -## Performance Considerations (Cross-Platform) - -### Zero-Copy Reading - -| Platform | Strategy | -|----------|----------| -| **Julia** | `Arrow.read()` with memory-mapped files | -| **JavaScript** | `ArrayBuffer` with `DataView` | -| **Python** | `pyarrow` memory mapping | -| **MicroPython** | Not available (streaming only) | - -### Exponential Backoff - -```python -# Python/MicroPython -async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): - delay = base_delay - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - return await response.read() - except Exception as e: - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - raise Exception("Failed to fetch after max retries") -``` - -### Correlation ID Logging - -All platforms use correlation IDs for distributed tracing: - -``` -[timestamp] [Correlation: abc123] Message published to subject -``` - -### Serialization Performance Comparison - -| Format | Use Case | Pros | Cons | -|--------|----------|------|------| -| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | -| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | - ---- - -## Testing Strategy (Cross-Platform) - -### Unit Tests - -| Test Type | Julia | JavaScript | Python | -|-----------|-------|------------|--------| -| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` | -| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` | -| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` | -| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` | -| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` | - -### Integration Tests - -- NATS server communication -- File server upload/download -- Cross-platform message exchange -- Arrow table serialization/deserialization -- JSON table serialization/deserialization +**Impact:** MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented. --- @@ -1296,6 +429,51 @@ MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython --- +## Performance Considerations + +### Zero-Copy Reading + +| Platform | Strategy | +|----------|----------| +| **Julia** | `Arrow.read()` with memory-mapped files | +| **JavaScript** | `ArrayBuffer` with `DataView` | +| **Python** | `pyarrow` memory mapping | +| **MicroPython** | Not available (streaming only) | + +### Exponential Backoff + +All platforms implement exponential backoff for HTTP downloads: + +``` +delay = base_delay +for attempt in 1:max_retries: + try: + response = fetch(url) + if success: return response + except: + if attempt < max_retries: + sleep(delay) + delay = min(delay * 2, max_delay) +``` + +### Correlation ID Logging + +All platforms use correlation IDs for distributed tracing: + +``` +[timestamp] [Correlation: abc123] Message published to subject +``` + +### Serialization Performance Comparison + +| Format | Use Case | Pros | Cons | +|--------|----------|------|------| +| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | +| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | +| `table` (Python) | Large tabular data | Fast, zero-copy, schema-preserving | Python-specific, requires pyarrow | + +--- + ## Summary This cross-platform NATS bridge provides: @@ -1305,24 +483,27 @@ This cross-platform NATS bridge provides: - Julia: Multiple dispatch and struct-based design - JavaScript: Async/await and prototype-based utilities - Python: Class-based design with type hints + - MicroPython: Synchronous API with memory constraints 3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas 4. **Handler Abstraction**: File server operations abstracted through configurable handlers 5. **Platform-Specific Optimizations**: - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data - **JSON** (`jsontable`): Universal human-readable format for smaller tables + - **Python table**: Unified table type for Python-specific implementations - Streaming support in MicroPython The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. ### Datatype Summary -| Datatype | Serialization | Use Case | Encoding | -|----------|---------------|----------|----------| -| `text` | UTF-8 bytes | Text messages, chat content | `utf-8` → `base64` | -| `dictionary` | JSON | Structured key-value data, config | `json` → `base64` | -| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | -| `image` | Binary | Image files (JPEG, PNG, etc.) | `binary` → `base64` | -| `audio` | Binary | Audio files (WAV, MP3, etc.) | `binary` → `base64` | -| `video` | Binary | Video files (MP4, AVI, etc.) | `binary` → `base64` | -| `binary` | Binary | Generic binary data, files | `binary` → `base64` | +| Datatype | Serialization | Use Case | Encoding | Supported Platforms | +|----------|---------------|----------|----------|---------------------| +| `text` | UTF-8 bytes | Text messages, chat content | `utf-8` → `base64` | All | +| `dictionary` | JSON | Structured key-value data, config | `json` → `base64` | All | +| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python | +| `table` | Apache Arrow IPC | Python's unified table type | `arrow-ipc` → `base64` | Python | +| `image` | Binary | Image files (JPEG, PNG, etc.) | `binary` → `base64` | All | +| `audio` | Binary | Audio files (WAV, MP3, etc.) | `binary` → `base64` | All | +| `video` | Binary | Video files (MP4, AVI, etc.) | `binary` → `base64` | All | +| `binary` | Binary | Generic binary data, files | `binary` → `base64` | All | diff --git a/docs/implementation.md b/docs/implementation.md index 18a8105..8467f78 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -2,7 +2,7 @@ ## Overview -This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. +This document describes the detailed implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. **Supported Platforms:** - **Julia** - Ground truth implementation (reference) @@ -11,6 +11,52 @@ This document describes the implementation of the high-performance, bi-direction --- +## Cross-Platform Compatibility Notes + +### 1. Python Payload Type Naming + +The Python implementation uses `"table"` as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types (`"arrowtable"` and `"jsontable"`): + +| Platform | Table Types | +|----------|-------------| +| Julia | `"arrowtable"`, `"jsontable"` | +| JavaScript | `"arrowtable"`, `"jsontable"` | +| Python | `"table"` (single type) | +| MicroPython | Not supported | + +**Impact:** When exchanging data between Python and Julia/JavaScript, the payload type will differ. Python code should use `"table"` while Julia/JavaScript code should use `"arrowtable"` or `"jsontable"`. + +### 2. Direct Transport Encoding Field + +The encoding field in direct transport payloads differs between platforms: + +| Platform | Encoding for Direct Transport | +|----------|-------------------------------| +| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | +| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` | +| Python | Always `"base64"` for all direct transport payloads | +| MicroPython | Always `"base64"` for all direct transport payloads | + +**Impact:** The encoding field may not accurately reflect the original serialization format when using Python or MicroPython. + +### 3. MicroPython Limitations + +MicroPython has significant constraints that affect feature support: + +| Feature | Desktop Platforms | MicroPython | +|---------|-------------------|-------------| +| `arrowtable` | ✅ | ❌ (not supported - memory constraints) | +| `jsontable` | ✅ | ❌ (not supported - memory constraints) | +| `table` | ✅ | ❌ (not supported - memory constraints) | +| Async/await | ✅ | ❌ (synchronous only) | +| File upload/download | ✅ | ⚠️ (placeholder implementations) | +| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) | +| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB | + +**Impact:** MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented. + +--- + ## Implementation Files | Language | Implementation File | Description | @@ -178,579 +224,14 @@ The system uses a **standardized list-of-tuples format** for all payload operati | `text` | `String` | `string` | `str` | `str` | | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | | `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | -| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | `list` | +| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | ⚠️ (limited) | +| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | | `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | -### Cross-Platform Examples - -#### Julia - -```julia -using NATSBridge - -# Single payload - still wrapped in a list -env, env_json_str = smartsend( - "/test", - [("dataname1", data1, "dictionary")], - broker_url="nats://localhost:4222", - fileserver_upload_handler=plik_oneshot_upload -) - -# Multiple payloads with different types -env, env_json_str = smartsend( - "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], - broker_url="nats://localhost:4222" -) - -# Mixed content (chat with text, image, audio) -env, env_json_str = smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("user_image", image_data, "image"), - ("audio_clip", audio_data, "audio") - ], - broker_url="nats://localhost:4222" -) - -# Receive returns a JSON.Object{String, Any} envelope -env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} -# Access payloads: env["payloads"] which is a Vector of tuples -for (dataname, data, type) in env["payloads"] - println("$dataname: $data (type: $type)") -end -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natsbridge'); - -// Single payload -const [env, env_json_str] = await NATSBridge.smartsend( - "/test", - [["dataname1", data1, "dictionary"]], - { - broker_url: "nats://localhost:4222", - fileserver_upload_handler: plikOneshotUpload - } -); - -// Multiple payloads -const [env, env_json_str] = await NATSBridge.smartsend( - "/test", - [ - ["dataname1", data1, "dictionary"], - ["dataname2", data2, "arrowtable"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Mixed content -const [env, env_json_str] = await NATSBridge.smartsend( - "/chat", - [ - ["message_text", "Hello!", "text"], - ["user_image", imageData, "image"], - ["audio_clip", audioData, "audio"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Receive -const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: fetchWithBackoff -}); -// env is an object with "payloads" field containing Array of arrays -// Access payloads: env.payloads which is an Array of [dataname, data, type] arrays -for (const [dataname, data, type] of env.payloads) { - console.log(`${dataname}: ${data} (type: ${type})`); -} -``` - -#### Python - -```python -from natsbridge import NATSBridge - -# Single payload -env, env_json_str = await NATSBridge.smartsend( - "/test", - [("dataname1", data1, "dictionary")], - broker_url="nats://localhost:4222", - fileserver_upload_handler=plik_oneshot_upload -) - -# Multiple payloads -env, env_json_str = await NATSBridge.smartsend( - "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], - broker_url="nats://localhost:4222" -) - -# Mixed content -env, env_json_str = await NATSBridge.smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("user_image", image_data, "image"), - ("audio_clip", audio_data, "audio") - ], - broker_url="nats://localhost:4222" -) - -# Receive -env = await NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff -) -# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] -# Access payloads: env["payloads"] which is a list of tuples -for dataname, data, type_ in env["payloads"]: - print(f"{dataname}: {data} (type: {type_})") -``` - -#### MicroPython - -```python -from natsbridge import NATSBridge - -# Limited to text and binary (no tables due to memory constraints) -env, env_json_str = NATSBridge.smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("binary_data", data_bytes, "binary") - ], - broker_url="nats://localhost:4222", - size_threshold=100000 # Lower threshold for memory constraints -) -# Note: MicroPython uses synchronous handlers -``` - ---- - -### Mixed-Content Transport Examples - -The NATSBridge.jl library supports sending mixed content types in a single message, with automatic transport selection based on payload size. Small payloads (< 1MB threshold) use direct transport (Base64-encoded in NATS message), while large payloads (≥ 1MB) use link transport (uploaded to file server, URL sent via NATS). - -#### Julia Mixed-Content Example - -```julia -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 -include("../src/NATSBridge.jl") -using .NATSBridge - -# Configuration -const SUBJECT = "/natsbridge" -const NATS_URL = "nats://localhost:4222" -const FILESERVER_URL = "http://localhost:8080" - -# Create sample data - mix of small and large payloads -text_data = "Hello! This is a test chat message. 🎉" - -dict_data = Dict( - "type" => "chat", - "sender" => "serviceA", - "content" => Dict("text" => "JSON-formatted message", "format" => "markdown") -) - -# Small arrow table (< 1MB) - direct transport -arrow_table_small = DataFrame( - id = 1:10, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], - score = rand(50:100, 10), - active = rand([true, false], 10) -) - -# Large arrow table (≥ 1MB) - link transport -arrow_table_large = DataFrame( - id = 1:200_000, - name = ["user_$i" for i in 1:200_000], - score = rand(50:100, 200_000), - active = rand([true, false], 200_000) -) - -# Small jsontable (< 1MB) - direct transport -json_table_small = DataFrame( - id = 1:10, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], - score = rand(50:100, 10) -) - -# Large jsontable (≥ 1MB) - link transport -json_table_large = DataFrame( - id = 1:1_500_000, - name = ["user_$i" for i in 1:1_500_000], - score = rand(50:100, 1_500_000) -) - -# Binary data (small) - direct transport -binary_data_small = UInt8[rand(1:255) for _ in 1:200] - -# Binary data (large) - link transport -binary_data_large = UInt8[rand(1:255) for _ in 1:1_500_000] - -# Read image file -file_path_small_image = "./test/small_image.jpg" -file_data_small_image = read(file_path_small_image) -filename_small_image = basename(file_path_small_image) - -file_path_large_image = "./test/large_image.png" -file_data_large_image = read(file_path_large_image) -filename_large_image = basename(file_path_large_image) - -# Create payloads list - mixed content with both small and large data -payloads = [ - # Small data (direct transport) - ("chat_text", text_data, "text"), - ("chat_json", dict_data, "dictionary"), - ("arrow_table_small", arrow_table_small, "arrowtable"), - ("json_table_small", json_table_small, "jsontable"), - (filename_small_image, file_data_small_image, "binary"), - - # Large data (link transport) - ("arrow_table_large", arrow_table_large, "arrowtable"), - ("json_table_large", json_table_large, "jsontable"), - (filename_large_image, file_data_large_image, "binary"), - ("binary_file_large", binary_data_large, "binary") -] - -# Use smartsend with mixed content -correlation_id = string(uuid4()) -sendinfo = NATSBridge.smartsend( - SUBJECT, - payloads; - broker_url = NATS_URL, - fileserver_url = FILESERVER_URL, - fileserver_upload_handler = plik_oneshot_upload, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "mix_sender", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true -) - -env, env_json_str = sendinfo - -# Log transport type for each payload -for (i, payload) in enumerate(env.payloads) - println("Payload $i ('$payload.dataname'):") - println(" Transport: $(payload.transport)") - println(" Type: $(payload.payload_type)") - println(" Size: $(payload.size) bytes") - println(" Encoding: $(payload.encoding)") - - if payload.transport == "link" - println(" URL: $(payload.data)") - end -end - -# Summary -direct_count = count(p -> p.transport == "direct", env.payloads) -link_count = count(p -> p.transport == "link", env.payloads) -println("\n--- Transport Summary ---") -println("Direct transport: $direct_count payloads") -println("Link transport: $link_count payloads") -``` - -**Expected Output:** -``` -Payload 1 ('chat_text'): - Transport: direct - Type: text - Size: 38 bytes - Encoding: base64 - -Payload 2 ('chat_json'): - Transport: direct - Type: dictionary - Size: 156 bytes - Encoding: json - -Payload 3 ('arrow_table_small'): - Transport: direct - Type: arrowtable - Size: 1245 bytes - Encoding: arrow-ipc - -Payload 4 ('json_table_small'): - Transport: direct - Type: jsontable - Size: 892 bytes - Encoding: json - -Payload 5 ('small_image.jpg'): - Transport: direct - Type: binary - Size: 73269 bytes - Encoding: base64 - -Payload 6 ('arrow_table_large'): - Transport: link - Type: arrowtable - Size: 5242880 bytes - Encoding: arrow-ipc - URL: http://localhost:8080/file/ABC123/DEF456/arrow_table_large.arrow - -Payload 7 ('json_table_large'): - Transport: link - Type: jsontable - Size: 45678900 bytes - Encoding: json - URL: http://localhost:8080/file/GHI789/JKL012/json_table_large.json - -Payload 8 ('large_image.png'): - Transport: link - Type: binary - Size: 1168437 bytes - Encoding: base64 - URL: http://localhost:8080/file/MNO345/PQR678/large_image.png - -Payload 9 ('binary_file_large'): - Transport: link - Type: binary - Size: 1500000 bytes - Encoding: base64 - URL: http://localhost:8080/file/STU901/VWX234/binary_file_large.bin - ---- Transport Summary --- -Direct transport: 5 payloads -Link transport: 4 payloads -``` - -#### Receiver Example - -```julia -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 -include("../src/NATSBridge.jl") -using .NATSBridge - -const SUBJECT = "/natsbridge" -const NATS_URL = "nats://localhost:4222" - -conn = NATS.connect(NATS_URL) -NATS.subscribe(conn, SUBJECT) do msg - # Use NATSBridge.smartreceive to handle the data - result = NATSBridge.smartreceive( - msg; - max_retries = 5, - base_delay = 100, - max_delay = 5000 - ) - - println("Received $(length(result["payloads"])) payloads") - - # Process each payload - for (dataname, data, data_type) in result["payloads"] - println("\n=== Payload: $dataname (type: $data_type) ===") - - if data_type == "arrowtable" - if isa(data, Arrow.Table) - df = DataFrame(data) - println(" Type: Arrow.Table") - println(" Size: $(size(df, 1)) rows x $(size(df, 2)) columns") - end - elseif data_type == "jsontable" - if isa(data, Vector{Any}) - df = DataFrame(data) - println(" Type: Vector{Dict}") - println(" Size: $(length(data)) rows") - end - elseif data_type == "text" - if isa(data, String) - println(" Type: String") - println(" Length: $(length(data)) characters") - end - elseif data_type in ["image", "audio", "video", "binary"] - if isa(data, Vector{UInt8}) - println(" Type: Vector{UInt8} (binary)") - println(" Size: $(length(data)) bytes") - end - end - end -end - -# Keep listening -sleep(120) -NATS.drain(conn) -``` - ---- - -## Architecture - -### Cross-Platform Claim-Check Pattern - -```mermaid -flowchart TD - A[SmartSend Function] --> B{Is payload size < 1MB?} - B -->|Yes | C[Direct Path
< 1MB] - B -->|No | D[Link Path
>= 1MB] - - C --> C1[Serialize to Buffer] - C1 --> C2[Base64/JSON encode] - C2 --> C3[Publish to NATS] - - D --> D1[Serialize to Buffer] - D1 --> D2[Upload to HTTP Server] - D2 --> D3[Publish to NATS with URL] - - style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px - style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px - style C fill:#e8f5e9,stroke:#008000,stroke-width:2px - style D fill:#e8f5e9,stroke:#008000,stroke-width:2px - style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px - style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px - style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px -``` - -**Claim-Check Pattern Overview:** -- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS -- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern) - -### smartsend Return Value - -All platforms return a tuple/array containing both the envelope and JSON string: - -#### Julia - -```julia -env, env_json_str = smartsend(...) -# Returns: ::Tuple{msg_envelope_v1, String} -# env::msg_envelope_v1 - The envelope object with all metadata and payloads -# env_json_str::String - JSON string for publishing to NATS -``` - -#### JavaScript - -```javascript -const [env, env_json_str] = await smartsend(...); -// Returns: Promise<[env, env_json_str]> -// env: Object with all metadata and payloads -// env_json_str: String for publishing to NATS -``` - -#### Python - -```python -env, env_json_str = await smartsend(...) -# Returns: Tuple[Dict, str] -# env: Dict with all metadata and payloads -# env_json_str: String for publishing to NATS -``` - -#### MicroPython - -```python -env, env_json_str = NATSBridge.smartsend(...) -# Returns: Tuple[Dict, str] -# Note: MicroPython returns plain dicts (no structured envelope object) -``` - ---- - -## Installation - -### Julia Dependencies - -```julia -using Pkg -Pkg.add("NATS") -Pkg.add("Arrow") -Pkg.add("JSON") -Pkg.add("HTTP") -Pkg.add("UUIDs") -Pkg.add("Dates") -Pkg.add("PrettyPrinting") -Pkg.add("DataFrames") -``` - -### JavaScript Dependencies (Node.js) - -```bash -npm install nats apache-arrow node-fetch -# or -yarn add nats apache-arrow node-fetch -``` - -**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed. - -### JavaScript Dependencies (Browser) - -```bash -npm install nats apache-arrow -# or use CDN: -# https://unpkg.com/nats-js/dist/bundle/nats.min.js -# https://unpkg.com/apache-arrow/arrow.min.js -``` - -**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative like `uuidv4` package. - -### Python Dependencies (Desktop) - -```bash -pip install nats-py aiohttp pyarrow pandas python-dateutil -``` - -### MicroPython Dependencies - -MicroPython uses built-in modules: -- `network` - NATS connection (custom implementation) -- `time` - Timestamps -- `uos` - File operations -- `base64` - Base64 encoding -- `json` - JSON parsing -- `struct` - Binary data handling - ---- - -## Usage Tutorial - -### Step 1: Start NATS Server - -```bash -docker run -p 4222:4222 nats:latest -``` - -### Step 2: Start HTTP File Server (optional) - -```bash -# Create a directory for file uploads -mkdir -p /tmp/fileserver - -# Use any HTTP server that supports POST for file uploads -# Example: Python's built-in server -python3 -m http.server 8080 --directory /tmp/fileserver -``` - -### Step 3: Run Test Scenarios - -```bash -# Julia tests -julia test/test_julia_to_julia_text_sender.jl -julia test/test_julia_to_julia_text_receiver.jl - -# JavaScript tests (Node.js) -node test/test_js_text_sender.js -node test/test_js_text_receiver.js - -# Python tests -python3 test/test_py_text_sender.py -python3 test/test_py_text_receiver.py -``` +**Note:** Python uses `"table"` as a single type for both Arrow and JSON table serialization. When exchanging data between Python and Julia/JavaScript, ensure the payload type is correctly translated (`"table"` ↔ `"arrowtable"` or `"jsontable"`). --- @@ -1236,17 +717,6 @@ function plik_oneshot_upload(file_server_url::String, filepath::String) end ``` -**Usage Examples:** - -```julia -# Upload binary data -data = read("./test_image.jpg") -result = plik_oneshot_upload("http://localhost:8080", "my_image.jpg", data) - -# Upload file directly from disk -result = plik_oneshot_upload("http://localhost:8080", "./test_image.jpg") -``` - --- ### JavaScript Implementation @@ -1846,18 +1316,24 @@ except ImportError: def _serialize_data(data: Any, payload_type: str) -> bytes: - """Serialize data to bytes based on type.""" + """ + Serialize data to bytes based on type. + + Note: Python uses "table" as a single type for both Arrow and JSON table + serialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types. + """ if payload_type == 'text': if isinstance(data, str): return data.encode('utf-8') else: - raise Error('Text data must be a string') + raise ValueError('Text data must be a string') elif payload_type == 'dictionary': json_str = json.dumps(data) return json_str.encode('utf-8') - elif payload_type == 'arrowtable': + elif payload_type == 'table': + # Python uses "table" for both arrowtable and jsontable if not ARROW_AVAILABLE: - raise Error('pyarrow not available for table serialization') + raise RuntimeError('pyarrow not available for table serialization') import io buf = io.BytesIO() @@ -1865,38 +1341,24 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: if isinstance(data, pd.DataFrame): # Serialize DataFrame to Arrow table = arrow.Table.from_pandas(data) - sink = arrow.ipc.new_file(buf) - arrow.ipc.write_table(table, sink) + sink = ipc.new_file(buf, table.schema) + ipc.write_table(table, sink) + sink.close() + return buf.getvalue() + elif isinstance(data, arrow.Table): + sink = ipc.new_file(buf, data.schema) + ipc.write_table(data, sink) sink.close() return buf.getvalue() else: - raise Error('arrowtable data must be a pandas DataFrame') - elif payload_type == 'jsontable': - # Serialize directly to JSON - json_str = json.dumps(data) - return json_str.encode('utf-8') - elif payload_type == 'image': + raise ValueError('Table data must be a pandas DataFrame or pyarrow Table') + elif payload_type in ('image', 'audio', 'video', 'binary'): if isinstance(data, (bytes, bytearray)): return bytes(data) else: - raise Error('Image data must be bytes') - elif payload_type == 'audio': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Audio data must be bytes') - elif payload_type == 'video': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Video data must be bytes') - elif payload_type == 'binary': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Binary data must be bytes') + raise ValueError(f'{payload_type} data must be bytes') else: - raise Error(f'Unknown payload_type: {payload_type}') + raise ValueError(f'Unknown payload_type: {payload_type}') ``` #### deserializeData Implementation @@ -1916,34 +1378,30 @@ except ImportError: def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: - """Deserialize bytes to data based on type.""" + """ + Deserialize bytes to data based on type. + + Note: Python uses "table" as a single type for both Arrow and JSON table + deserialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types. + """ if payload_type == 'text': return data.decode('utf-8') elif payload_type == 'dictionary': json_str = data.decode('utf-8') return json.loads(json_str) - elif payload_type == 'arrowtable': + elif payload_type == 'table': + # Python uses "table" for both arrowtable and jsontable if not ARROW_AVAILABLE: - raise Error('pyarrow not available for table deserialization') + raise RuntimeError('pyarrow not available for table deserialization') import io buf = io.BytesIO(data) - reader = arrow.ipc.open_file(buf) + reader = ipc.open_file(buf) return reader.read_all().to_pandas() - elif payload_type == 'jsontable': - # Deserialize from JSON - returns list[dict] - json_str = data.decode('utf-8') - return json.loads(json_str) - elif payload_type == 'image': - return data - elif payload_type == 'audio': - return data - elif payload_type == 'video': - return data - elif payload_type == 'binary': + elif payload_type in ('image', 'audio', 'video', 'binary'): return data else: - raise Error(f'Unknown payload_type: {payload_type}') + raise ValueError(f'Unknown payload_type: {payload_type}') ``` #### fetchWithBackoff Implementation @@ -2032,9 +1490,9 @@ async def plik_oneshot_upload( --- -## MicroPython Implementation +### MicroPython Implementation -### Limitations +#### Limitations MicroPython has significant constraints compared to desktop implementations: @@ -2042,13 +1500,15 @@ MicroPython has significant constraints compared to desktop implementations: |---------|---------|-------------| | Memory | Unlimited | ~256KB - 1MB | | Arrow IPC | ✅ | ❌ (not supported) | -| Async/Await | ✅ | ⚠️ (uasyncio only) | +| Async/Await | ✅ | ❌ (synchronous only) | | Large payloads (>1MB) | ✅ | ❌ (enforced limit) | -| arrowtable | ✅ | ❌ | -| jsontable | ⚠️ (limited) | ⚠️ (limited) | +| arrowtable | ✅ | ❌ (not supported) | +| jsontable | ✅ | ❌ (not supported) | | Multiple payloads | ✅ | ⚠️ (limited) | -### MicroPython Module Structure +**Note:** MicroPython does NOT support table types (`arrowtable` or `jsontable`) due to memory constraints. + +#### Module Structure ```python # natsbridge_mpy.py (MicroPython) @@ -2058,15 +1518,16 @@ import json import base64 import uos import struct +import random # Constants DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython DEFAULT_BROKER_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" -MAX_PAYLOAD_SIZE = 50000 # Hard limit +MAX_PAYLOAD_SIZE = 50000 # Hard limit (lower than threshold for safety) -# Note: MicroPython uses list[list] for jsontable -# No DataFrame support +# Note: MicroPython does NOT support table types (arrowtable/jsontable) +# Only supports: text, dictionary, image, audio, video, binary class NATSBridge: @@ -2175,32 +1636,44 @@ class NATSBridge: return env_json_obj def _serialize_data(self, data, payload_type): - """Serialize data (MicroPython version - no arrowtable support).""" + """ + Serialize data (MicroPython version). + + Note: MicroPython does NOT support table types (arrowtable/jsontable). + Only supports: text, dictionary, image, audio, video, binary + """ if payload_type == 'text': - return data.encode('utf-8') + if isinstance(data, str): + return data.encode('utf-8') + else: + raise ValueError('Text data must be a string') elif payload_type == 'dictionary': - return json.dumps(data).encode('utf-8') - elif payload_type == 'jsontable': - # Serialize list of lists to JSON - return json.dumps(data).encode('utf-8') + json_str = json.dumps(data) + return json_str.encode('utf-8') elif payload_type in ('image', 'audio', 'video', 'binary'): - return bytes(data) + if isinstance(data, (bytes, bytearray, memoryview)): + return bytes(data) + else: + raise ValueError(f'{payload_type} data must be bytes') else: - raise ValueError(f"Unknown payload_type: {payload_type}") + raise ValueError(f'Unknown payload_type: {payload_type}') def _deserialize_data(self, data, payload_type): - """Deserialize data (MicroPython version).""" + """ + Deserialize data (MicroPython version). + + Note: MicroPython does NOT support table types (arrowtable/jsontable). + Only supports: text, dictionary, image, audio, video, binary + """ if payload_type == 'text': return data.decode('utf-8') elif payload_type == 'dictionary': - return json.loads(data.decode('utf-8')) - elif payload_type == 'jsontable': - # Returns list of lists - return json.loads(data.decode('utf-8')) + json_str = data.decode('utf-8') + return json.loads(json_str) elif payload_type in ('image', 'audio', 'video', 'binary'): return data else: - raise ValueError(f"Unknown payload_type: {payload_type}") + raise ValueError(f'Unknown payload_type: {payload_type}') def _generate_uuid(self): """Generate simple UUID (MicroPython compatible).""" @@ -2372,8 +1845,8 @@ This cross-platform NATS bridge provides: 3. **Message Format Consistency**: Identical JSON schemas across all platforms 4. **Handler Abstraction**: File server operations abstracted through configurable handlers 5. **Platform-Specific Optimizations**: - - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) - - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms) + - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) + - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in Julia, JavaScript, Python; NOT supported in MicroPython) The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. @@ -2382,4 +1855,5 @@ The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as | Datatype | Serialization | Use Case | Encoding | Supported Platforms | |----------|---------------|----------|----------|---------------------| | `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python | +| `table` | Apache Arrow IPC (Python only) | Python's unified table type | `arrow-ipc` → `base64` | Python | diff --git a/docs/tutorial.md b/docs/tutorial.md index b43715d..562e082 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -33,18 +33,28 @@ smartsend(subject, [(dataname, data, type), ...], options) env = smartreceive(msg, options) ``` +**Important Platform Differences:** + +1. **Encoding field:** Julia and JavaScript preserve the original serialization format in the encoding field (`"base64"`, `"json"`, or `"arrow-ipc"`), while Python and MicroPython always use `"base64"` for all direct transport payloads. + +2. **Async vs Sync:** JavaScript and Python desktop use async/await, while MicroPython uses synchronous API. + ### Supported Payload Types | Type | Julia | JavaScript | Python | MicroPython | |------|-------|------------|--------|-------------| | `text` | `String` | `string` | `str` | `str` | | `dictionary` | `Dict` | `Object` | `dict` | `dict` | -| `table` | `DataFrame` | `Array` | `DataFrame` | ❌ | +| `arrowtable` | `DataFrame` | `Array` | `pandas.DataFrame` | ❌ | +| `jsontable` | `Vector{NamedTuple}` | `Array` | `list[dict]` | ❌ | +| `table` | ❌ | ❌ | `pandas.DataFrame` | ❌ | | `image` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` | | `audio` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` | | `video` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` | | `binary` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` | +**Note on MicroPython:** MicroPython does not support table types (`arrowtable`, `jsontable`, or `table`) due to memory constraints. Use `dictionary` or `binary` instead. + --- ## Prerequisites @@ -645,7 +655,7 @@ df = DataFrame( score = [95, 88, 92] ) -data = [("students", df, "table")] +data = [("students", df, "arrowtable")] env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222") ``` @@ -661,7 +671,7 @@ const table_data = [ { id: 3, name: "Charlie", score: 92 } ]; -const data = [["students", table_data, "table"]]; +const data = [["students", table_data, "arrowtable"]]; const [env, env_json_str] = await NATSBridge.smartsend( "/data/students", data, @@ -728,4 +738,4 @@ MicroPython does not support table type due to memory constraints. Use dictionar ## License -MIT \ No newline at end of file +MIT diff --git a/docs/walkthrough.md b/docs/walkthrough.md index 28cb59f..39b7d1b 100644 --- a/docs/walkthrough.md +++ b/docs/walkthrough.md @@ -959,25 +959,14 @@ function send_batch(sender::SensorSender, readings::Vector{SensorReading}) arrow_data = take!(buf) - # Send based on size - if length(arrow_data) < 1048576 # < 1MB - data = [("batch", arrow_data, "table")] - smartsend( - "/sensors/batch", - data, - broker_url=sender.broker_url, - fileserver_url=sender.fileserver_url - ) - else - # Upload to file server - data = [("batch", arrow_data, "table")] - smartsend( - "/sensors/batch", - data, - broker_url=sender.broker_url, - fileserver_url=sender.fileserver_url - ) - end + # Send based on size (auto-selected by smartsend) + data = [("batch", arrow_data, "arrowtable")] + smartsend( + "/sensors/batch", + data, + broker_url=sender.broker_url, + fileserver_url=sender.fileserver_url + ) end ``` @@ -1037,28 +1026,16 @@ class SensorSender { const buffer = arrow.tableFromBatches([recordBatch]).toBuffer(); const arrow_data = new Uint8Array(buffer); - // Send based on size - if (arrow_data.length < 1048576) { - const data = [["batch", arrow_data, "table"]]; - await NATSBridge.smartsend( - "/sensors/batch", - data, - { - broker_url: this.broker_url, - fileserver_url: this.fileserver_url - } - ); - } else { - const data = [["batch", arrow_data, "table"]]; - await NATSBridge.smartsend( - "/sensors/batch", - data, - { - broker_url: this.broker_url, - fileserver_url: this.fileserver_url - } - ); - } + // Send based on size (auto-selected by smartsend) + const data = [["batch", arrow_data, "arrowtable"]]; + await NATSBridge.smartsend( + "/sensors/batch", + data, + { + broker_url: this.broker_url, + fileserver_url: this.fileserver_url + } + ); } } @@ -1109,7 +1086,7 @@ class SensorSender: arrow_data = buf.getvalue() # Send based on size (auto-selected by smartsend) - data = [("batch", arrow_data, "table")] + data = [("batch", arrow_data, "arrowtable")] await smartsend( "/sensors/batch", data, @@ -1152,7 +1129,7 @@ function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String # Send as single message smartsend( "/sensors/batch", - [("batch", arrow_data, "table")], + [("batch", arrow_data, "arrowtable")], broker_url=sender.broker_url ) end @@ -1193,7 +1170,7 @@ async function sendBatchReadings(sender, readings) { const arrow_data = new Uint8Array(buffer); // Send as single message - const data = [["batch", arrow_data, "table"]]; + const data = [["batch", arrow_data, "arrowtable"]]; await NATSBridge.smartsend( "/sensors/batch", data, @@ -1398,4 +1375,4 @@ For more information, check the [API documentation](../src/README.md) and [test ## License -MIT \ No newline at end of file +MIT diff --git a/src/natsbridge.py b/src/natsbridge.py index f30851d..98e456a 100644 --- a/src/natsbridge.py +++ b/src/natsbridge.py @@ -71,8 +71,9 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: Args: data: Data to serialize (string for "text", JSON-serializable for "dictionary", - table-like for "table", binary for "image", "audio", "video", "binary") - payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" + table-like for "arrowtable"/"jsontable", binary for "image", "audio", "video", "binary") + payload_type: Target format: "text", "dictionary", "arrowtable", "jsontable", + "image", "audio", "video", "binary" Returns: Binary representation of the serialized data @@ -80,7 +81,8 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: Raises: Error: If payload_type is not one of the supported types Error: If payload_type is "image", "audio", or "video" but data is not bytes - Error: If payload_type is "table" but data is not a pandas DataFrame or pyarrow Table + Error: If payload_type is "arrowtable" but data is not a pandas DataFrame or pyarrow Table + Error: If payload_type is "jsontable" but data is not a list of dicts """ if payload_type == 'text': if isinstance(data, str): @@ -90,9 +92,9 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: elif payload_type == 'dictionary': json_str = json.dumps(data) return json_str.encode('utf-8') - elif payload_type == 'table': + elif payload_type == 'arrowtable': if not ARROW_AVAILABLE: - raise RuntimeError('pyarrow not available for table serialization') + raise RuntimeError('pyarrow not available for arrowtable serialization') import io buf = io.BytesIO() @@ -110,7 +112,14 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: sink.close() return buf.getvalue() else: - raise ValueError('Table data must be a pandas DataFrame or pyarrow Table') + raise ValueError('Arrow table data must be a pandas DataFrame or pyarrow Table') + elif payload_type == 'jsontable': + # Serialize list of dicts to JSON format + if isinstance(data, list) and all(isinstance(row, dict) for row in data): + json_str = json.dumps(data) + return json_str.encode('utf-8') + else: + raise ValueError('JSON table data must be a list of dicts') elif payload_type == 'image': if isinstance(data, (bytes, bytearray)): return bytes(data) @@ -141,12 +150,13 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An Args: data: Serialized data as bytes - payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") + payload_type: Data type ("text", "dictionary", "arrowtable", "jsontable", + "image", "audio", "video", "binary") correlation_id: Correlation ID for logging Returns: - Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", - bytes for "image", "audio", "video", "binary") + Deserialized data (String for "text", DataFrame for "arrowtable", + Vector{Dict} for "jsontable"/"dictionary", bytes for "image", "audio", "video", "binary") Raises: Error: If payload_type is not one of the supported types @@ -156,14 +166,18 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An elif payload_type == 'dictionary': json_str = data.decode('utf-8') return json.loads(json_str) - elif payload_type == 'table': + elif payload_type == 'arrowtable': if not ARROW_AVAILABLE: - raise RuntimeError('pyarrow not available for table deserialization') + raise RuntimeError('pyarrow not available for arrowtable deserialization') import io buf = io.BytesIO(data) reader = ipc.open_file(buf) return reader.read_all().to_pandas() + elif payload_type == 'jsontable': + # Deserialize JSON to list of dicts + json_str = data.decode('utf-8') + return json.loads(json_str) elif payload_type == 'image': return data elif payload_type == 'audio': @@ -317,7 +331,7 @@ class NATSClient: self._client = nats.connect(self.url) await self._client else: - raise Error('nats-py not available') + raise RuntimeError('nats-py not available') return self._client async def publish(self, subject: str, message: str, correlation_id: str = "") -> None: @@ -397,12 +411,19 @@ def _build_payload( Returns: Payload object """ + # Determine encoding based on payload type (matching Julia/JS implementation) + encoding = 'base64' + if payload_type == 'jsontable': + encoding = 'json' + elif payload_type == 'arrowtable': + encoding = 'arrow-ipc' + return { 'id': str(uuid.uuid4()), 'dataname': dataname, 'payload_type': payload_type, 'transport': transport, - 'encoding': 'base64' if transport == 'direct' else 'none', + 'encoding': encoding, 'size': len(payload_bytes), 'data': data, 'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {} @@ -476,7 +497,7 @@ async def smartsend( data: List of (dataname, data, type) tuples to send - dataname: Name of the payload - data: The actual data to send - - type: Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" broker_url: URL of the NATS server fileserver_url: URL of the HTTP file server for large payloads fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status", @@ -514,14 +535,21 @@ async def smartsend( >>> data2 = [1, 2, 3, 4, 5] >>> env, env_json_str = await smartsend( ... "my.subject", - ... [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")] + ... [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")] ... ) >>> >>> # Send a large array using fileserver upload >>> data = list(range(10_000_000)) # ~80 MB >>> env, env_json_str = await smartsend( ... "large.data", - ... [("large_table", data, "table")] + ... [("large_table", data, "arrowtable")] + ... ) + >>> + >>> # Send jsontable (JSON format for human-readable tabular data) + >>> users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] + >>> env, env_json_str = await smartsend( + ... "json.data", + ... [("users", users, "jsontable")] ... ) >>> >>> # Mixed content (e.g., chat with text and image)