diff --git a/docs/architecture.md b/docs/architecture.md index d496e92..8b57af8 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -55,7 +55,8 @@ All three platforms expose the same high-level API: |------|-------|------------|-------------------| | `text` | `String` | `string` | `str` | | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | -| `table` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | +| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | +| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | @@ -236,13 +237,23 @@ flowchart TB }, { "id": "uuid4", - "dataname": "large_table", - "payload_type": "table", + "dataname": "large_arrow_table", + "payload_type": "arrowtable", "transport": "link", - "encoding": "none", + "encoding": "arrow-ipc", "size": 524288, "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", "metadata": {} + }, + { + "id": "uuid4", + "dataname": "json_table", + "payload_type": "jsontable", + "transport": "direct", + "encoding": "json", + "size": 1024, + "data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]", + "metadata": {} } ] } @@ -255,11 +266,11 @@ flowchart TB { "id": "uuid4", "dataname": "login_image", - "payload_type": "image | dictionary | table | text | audio | video | binary", + "payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary", "transport": "direct | link", "encoding": "none | json | base64 | arrow-ipc", "size": 15433, - "data": "base64-encoded-string | http-url", + "data": "base64-encoded-string | http-url | json-string", "metadata": { "checksum": "sha256_hash" } @@ -278,25 +289,25 @@ flowchart TB ▼ ┌─────────────────────────────────────────────────────────────┐ │ For each payload: │ -│ 1. Extract type from tuple/array │ +│ 1. Extract type from tuple/array │ │ 2. Serialize based on type │ │ 3. Check payload size │ └─────────────────────────────────────────────────────────────┘ │ - ┌───────────┴────────────┐ - ▼ ▼ - ┌──────────────┐ ┌──────────────┐ - │ Direct Path │ │ Link Path │ - │ (< 1MB) │ │ (>= 1MB) │ - │ │ │ │ - │ • Serialize │ │ • Serialize │ - │ to buffer │ │ to buffer │ - │ • Base64 │ │ • Upload to │ - │ encode │ │ HTTP Server│ - │ • Publish to │ │ • Publish to │ - │ NATS │ │ NATS with │ - │ (in msg) │ │ URL │ - └──────────────┘ └──────────────┘ + ┌───────────┴────────────┐ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (>= 1MB) │ + │ │ │ │ + │ • Serialize │ │ • Serialize │ + │ to buffer │ │ to buffer │ + │ • Base64/JSON│ │ • Upload to │ + │ encode │ │ HTTP Server│ + │ • Publish to │ │ • Publish to │ + │ NATS │ │ NATS with │ + │ (in msg) │ │ URL │ + └──────────────┘ └──────────────┘ ``` --- @@ -422,6 +433,41 @@ function smartreceive( )::JSON.Object{String, Any} ``` +#### Serialization Logic for Tables + +```julia +# Serialize table data based on payload_type +function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8} + if payload_type == "arrowtable" + # Serialize to Apache Arrow IPC format + buffer = IOBuffer() + Arrow.write(buffer, data) + return take!(buffer) + elseif payload_type == "jsontable" + # Serialize to JSON format + json_str = JSON.json(data) + return Vector{UInt8}(json_str) + else + throw(ArgumentError("Unknown payload_type: $payload_type")) + end +end + +# Deserialize table data based on payload_type +function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any + if payload_type == "arrowtable" + # Deserialize from Apache Arrow IPC format + buffer = Buffer(data) + return Arrow.read(buffer) + elseif payload_type == "jsontable" + # Deserialize from JSON format + json_str = String(data) + return JSON.parse(json_str) + else + throw(ArgumentError("Unknown payload_type: $payload_type")) + end +end +``` + --- ### JavaScript Implementation @@ -541,7 +587,7 @@ class NATSClient { | Package | Purpose | |---------|---------| | `nats` | Core NATS functionality (nats.js) | -| `uuid` | UUID generation | +| `crypto` (built-in) | UUID generation (Node.js) | | `node-fetch` or `axios` | HTTP client for file server | | `apache-arrow` | Arrow IPC serialization | @@ -550,7 +596,7 @@ class NATSClient { | Package | Purpose | |---------|---------| | `nats` | Browser-compatible NATS client | -| `uuid` | UUID generation | +| `crypto` (built-in) | UUID generation (browser) | | `fetch` (native) | HTTP client for file server | | `apache-arrow` | Arrow IPC serialization | @@ -615,6 +661,43 @@ async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correla } ``` +#### Serialization Logic for Tables + +```javascript +// Serialize table data based on payload_type +async function serializeTableData(data, payload_type) { + if (payload_type === "arrowtable") { + // Serialize to Apache Arrow IPC format + const schema = new arrow.Schema([...]); // Define schema + const arr = arrow.tableToArrowTable(data, schema); + const buffer = arrow.RecordBatch.from(arr).toBuffer(); + return new Uint8Array(buffer); + } else if (payload_type === "jsontable") { + // Serialize to JSON format + const jsonStr = JSON.stringify(data); + return new TextEncoder().encode(jsonStr); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} + +// Deserialize table data based on payload_type +async function deserializeTableData(data, payload_type) { + if (payload_type === "arrowtable") { + // Deserialize from Apache Arrow IPC format + const buffer = arrow.arrayBufferToBuffer(data.buffer); + const batch = arrow.RecordBatch.deserialize(buffer); + return arrow.tableFromBatch(batch); + } else if (payload_type === "jsontable") { + // Deserialize from JSON format + const jsonStr = new TextDecoder().decode(data); + return JSON.parse(jsonStr); + } else { + throw new Error(`Unknown payload_type: ${payload_type}`); + } +} +``` + --- ### Python/MicroPython Implementation @@ -906,6 +989,56 @@ async def fetch_with_backoff( pass ``` +#### Serialization Logic for Tables + +```python +# Serialize table data based on payload_type +def serialize_table_data(data: Any, payload_type: str) -> bytes: + if payload_type == "arrowtable": + # Serialize to Apache Arrow IPC format + import pyarrow as pa + import pyarrow.feather as feather + import io + + if isinstance(data, pd.DataFrame): + table = pa.Table.from_pandas(data) + buffer = io.BytesIO() + feather.write_feather(table, buffer) + return buffer.getvalue() + else: + raise TypeError("Expected pandas DataFrame for arrowtable") + + elif payload_type == "jsontable": + # Serialize to JSON format + if isinstance(data, list) and all(isinstance(row, dict) for row in data): + return json.dumps(data).encode('utf-8') + else: + raise TypeError("Expected list of dicts for jsontable") + + else: + raise ValueError(f"Unknown payload_type: {payload_type}") + +# Deserialize table data based on payload_type +def deserialize_table_data(data: bytes, payload_type: str) -> Any: + if payload_type == "arrowtable": + # Deserialize from Apache Arrow IPC format + import pyarrow as pa + import pyarrow.feather as feather + import io + + buffer = io.BytesIO(data) + table = feather.read_table(buffer) + return table.to_pandas() + + elif payload_type == "jsontable": + # Deserialize from JSON format + json_str = data.decode('utf-8') + return json.loads(json_str) + + else: + raise ValueError(f"Unknown payload_type: {payload_type}") +``` + --- ## Platform Comparison Matrix @@ -917,6 +1050,9 @@ async def fetch_with_backoff( | **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | | **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | | **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | +| **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) | +| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ | +| **jsontable Support** | ✅ | ✅ | ✅ | ✅ | | **Direct Transport** | ✅ | ✅ | ✅ | ✅ | | **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | | **Handler Functions** | ✅ | ✅ | ✅ | ✅ | @@ -948,7 +1084,11 @@ function _serialize_data(data::Dict, payload_type::String) end function _serialize_data(data::DataFrame, payload_type::String) - # Table handling + # Table handling - arrowtable +end + +function _serialize_data(data::Vector{NamedTuple}, payload_type::String) + # Table handling - jsontable end ``` @@ -979,7 +1119,7 @@ function generateUUID() { } async function serializeData(data, payload_type) { - // Serialization logic + // Serialization logic for arrowtable and jsontable } ``` @@ -1028,9 +1168,9 @@ def smartreceive(msg, **kwargs): | Platform | Code | |----------|------| -| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table", df, "table")])``` | -| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
[env, env_json_str] = await smartsend("analysis", [["table", df, "table"]]);``` | -| **Python** | ```python
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table", df, "table")])``` | +| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])``` | +| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
[env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);``` | +| **Python** | ```python
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])``` | ### Scenario 3: Chat System (Multi-Payload) @@ -1040,6 +1180,29 @@ def smartreceive(msg, **kwargs): | **JavaScript** | ```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
[env, env_json_str] = await smartsend("chat", chat);``` | | **Python** | ```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)``` | +### Scenario 4: JSON Table Transfer (Cross-Platform) + +| Platform | Code | +|----------|------| +| **Julia** | ```julia
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
env, env_json_str = smartsend("data", [("users", rows, "jsontable")])``` | +| **JavaScript** | ```javascript
const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];
[env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);``` | +| **Python** | ```python
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
env, env_json_str = await smartsend("data", [("users", users, "jsontable")])``` | + +### Scenario 5: Smart Transport Selection + +The `smartsend` function automatically selects the transport method based on payload size: + +- **Direct Transport (< 1MB)**: Payload is serialized and embedded directly in the NATS message + - `arrowtable`: Serialized to Arrow IPC, base64 encoded + - `jsontable`: Serialized to JSON, base64 encoded + - `dictionary`: Serialized to JSON, base64 encoded + - `text`: Serialized to UTF-8, base64 encoded + - `image/audio/video/binary`: Base64 encoded + +- **Link Transport (>= 1MB)**: Payload is uploaded to HTTP file server, URL embedded in message + - All types supported + - Receiver downloads from URL and deserializes + --- ## Performance Considerations (Cross-Platform) @@ -1080,6 +1243,13 @@ All platforms use correlation IDs for distributed tracing: [timestamp] [Correlation: abc123] Message published to subject ``` +### Serialization Performance Comparison + +| Format | Use Case | Pros | Cons | +|--------|----------|------|------| +| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | +| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | + --- ## Testing Strategy (Cross-Platform) @@ -1092,12 +1262,15 @@ All platforms use correlation IDs for distributed tracing: | **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` | | **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` | | **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` | +| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` | ### Integration Tests - NATS server communication - File server upload/download - Cross-platform message exchange +- Arrow table serialization/deserialization +- JSON table serialization/deserialization --- @@ -1134,6 +1307,16 @@ This cross-platform NATS bridge provides: - Python: Class-based design with type hints 3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas 4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: Arrow IPC support in desktop platforms, streaming support in MicroPython +5. **Platform-Specific Optimizations**: + - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data + - **JSON** (`jsontable`): Universal human-readable format for smaller tables + - Streaming support in MicroPython -The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. \ No newline at end of file +The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. + +### Datatype Summary + +| Datatype | Serialization | Use Case | Encoding | +|----------|---------------|----------|----------| +| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | diff --git a/docs/implementation.md b/docs/implementation.md index 49b9c1f..ed21567 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -177,7 +177,8 @@ The system uses a **standardized list-of-tuples format** for all payload operati |------|-------|------------|--------|-------------| | `text` | `String` | `string` | `str` | `str` | | `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | -| `table` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | +| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | +| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | `list` | | `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | | `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | | `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | @@ -201,7 +202,7 @@ env, env_json_str = smartsend( # Multiple payloads with different types env, env_json_str = smartsend( "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], + [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], broker_url="nats://localhost:4222" ) @@ -245,7 +246,7 @@ const [env, env_json_str] = await NATSBridge.smartsend( "/test", [ ["dataname1", data1, "dictionary"], - ["dataname2", data2, "table"] + ["dataname2", data2, "arrowtable"] ], { broker_url: "nats://localhost:4222" } ); @@ -288,7 +289,7 @@ env, env_json_str = await NATSBridge.smartsend( # Multiple payloads env, env_json_str = await NATSBridge.smartsend( "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], + [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], broker_url="nats://localhost:4222" ) @@ -334,6 +335,160 @@ env, env_json_str = NATSBridge.smartsend( --- +## Row-Oriented vs Column-Oriented Data Structures + +Different platforms use different internal representations for tabular data. Understanding these differences is crucial for proper serialization/deserialization when using `jsontable` and `arrowtable` datatypes. + +### Data Structure Comparison + +| Platform | Table Structure | Orientation | +|----------|-----------------|-------------| +| **Julia (DataFrame)** | `Dict{String, Vector}` | Column-oriented | +| **Python (pandas)** | `dict[str, list]` | Column-oriented | +| **JavaScript** | `Array` | Row-oriented | +| **MicroPython** | `list[list]` | Row-oriented | + +### Column-Oriented (Julia DataFrame, Python pandas) + +In column-oriented structures, each column is stored as a separate array/vector: + +**Julia Example:** +```julia +# Create dictionary with column vectors +dict = Dict("customer age" => [15, 20, 25], + "first name" => ["Rohit", "Rahul", "Akshat"]) + +# Convert to DataFrame +df = DataFrame(dict) +println(df) +# Output: +# 3×2 DataFrame +# Row ┆ customer age ┆ first name +# ┆ Int64 ┆ String +# ─────┼──────────────┼──────────── +# 1 ┆ 15 ┆ "Rohit" +# 2 ┆ 20 ┆ "Rahul" +# 3 ┆ 25 ┆ "Akshat" +``` + +**Python Example:** +```python +# Create dictionary with column lists +data = { + "Name": ["Alice", "Bob", "Charlie"], + "Age": [25, 30, 35], + "Score": [88.5, 92.0, 79.5] +} + +# Convert to DataFrame +df = pd.DataFrame(data) +print(df) +# Output: +# Name Age Score +# 0 Alice 25 88.5 +# 1 Bob 30 92.0 +# 2 Charlie 35 79.5 +``` + +### Row-Oriented (JavaScript, MicroPython) + +In row-oriented structures, each row is stored as a separate object/array: + +**JavaScript Example:** +```javascript +// Array of objects (row-oriented) +const users = [ + { Name: "Alice", Age: 25, Score: 88.5 }, + { Name: "Bob", Age: 30, Score: 92.0 }, + { Name: "Charlie", Age: 35, Score: 79.5 } +]; +``` + +**MicroPython Example:** +```python +# List of lists (row-oriented) +users = [ + ["Alice", 25, 88.5], + ["Bob", 30, 92.0], + ["Charlie", 35, 79.5] +] +``` + +### Cross-Platform Conversion for jsontable + +When sending `jsontable` across platforms, the system performs automatic conversion between row-oriented and column-oriented formats: + +**Sending from Julia/Python (column-oriented) to JS/MicroPython (row-oriented):** +1. Convert column-oriented dict to row-oriented array of objects +2. Serialize to JSON +3. Send with `payload_type = "jsontable"` + +**Receiving from JS/MicroPython (row-oriented) to Julia/Python (column-oriented):** +1. Deserialize JSON to row-oriented array of objects +2. Convert to column-oriented dict +3. Create DataFrame from column-oriented dict + +**Example: Julia to JavaScript** +```julia +# Julia side - column-oriented DataFrame +df = DataFrame( + "Name" => ["Alice", "Bob", "Charlie"], + "Age" => [25, 30, 35], + "Score" => [88.5, 92.0, 79.5] +) + +# smartsend automatically converts to row-oriented JSON +env, env_json_str = smartsend( + "/data", + [("users", df, "jsontable")] +) +# JSON sent: [{"Name":"Alice","Age":25,"Score":88.5}, ...] +``` + +```javascript +// JavaScript side - receives row-oriented array +const [env, env_json_str] = await NATSBridge.smartsend( + "/data", + [["users", users, "jsontable"]] +); +// users is already row-oriented: [{Name: "Alice", Age: 25, ...}, ...] +``` + +**Example: JavaScript to Julia** +```javascript +// JavaScript side - row-oriented array +const users = [ + { Name: "Alice", Age: 25, Score: 88.5 }, + { Name: "Bob", Age: 30, Score: 92.0 } +]; + +const [env, env_json_str] = await NATSBridge.smartsend( + "/data", + [["users", users, "jsontable"]] +); +``` + +```julia +# Julia side - receives and converts to column-oriented DataFrame +env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) +# The jsontable is automatically converted to DataFrame +for (dataname, data, type) in env["payloads"] + if type == "jsontable" + # data is now a DataFrame with column-oriented structure + println(data) + # Output: + # 2×3 DataFrame + # Row ┆ Name ┆ Age ┆ Score + # ┆ String ┆ Int64 ┆ Float64 + # ─────┼────────┼──────┼─────── + # 1 ┆ Alice ┆ 25 ┆ 88.5 + # 2 ┆ Bob ┆ 30 ┆ 92.0 + end +end +``` + +--- + ## Architecture ### Cross-Platform Claim-Check Pattern @@ -345,7 +500,7 @@ flowchart TD B -->|No | D[Link Path
>= 1MB] C --> C1[Serialize to Buffer] - C1 --> C2[Base64 encode] + C1 --> C2[Base64/JSON encode] C2 --> C3[Publish to NATS] D --> D1[Serialize to Buffer] @@ -426,20 +581,24 @@ Pkg.add("Dates") ### JavaScript Dependencies (Node.js) ```bash -npm install nats uuid apache-arrow node-fetch +npm install nats apache-arrow node-fetch # or -yarn add nats uuid apache-arrow node-fetch +yarn add nats apache-arrow node-fetch ``` +**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed. + ### JavaScript Dependencies (Browser) ```bash -npm install nats uuid apache-arrow +npm install nats apache-arrow # or use CDN: # https://unpkg.com/nats-js/dist/bundle/nats.min.js # https://unpkg.com/apache-arrow/arrow.min.js ``` +**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative like `uuidv4` package. + ### Python Dependencies (Desktop) ```bash @@ -592,7 +751,7 @@ function _serialize_data(data::Dict, payload_type::String) end function _serialize_data(data::DataFrame, payload_type::String) - # Table handling + # Table handling - arrowtable io = IOBuffer() Arrow.write(io, data) return take!(io) @@ -784,10 +943,16 @@ function _serialize_data(data::Any, payload_type::String) json_str = JSON.json(data) json_str_bytes = Vector{UInt8}(json_str) return json_str_bytes - elseif payload_type == "table" + elseif payload_type == "arrowtable" + # Serialize DataFrame to Arrow IPC format io = IOBuffer() Arrow.write(io, data) return take!(io) + elseif payload_type == "jsontable" + # Convert column-oriented to row-oriented JSON + # data is Vector{NamedTuple} or Vector{Dict} + json_str = JSON.json(data) + return Vector{UInt8}(json_str) elseif payload_type == "image" if isa(data, Vector{UInt8}) return data @@ -833,10 +998,17 @@ function _deserialize_data( elseif payload_type == "dictionary" json_str = String(data) return JSON.parse(json_str) - elseif payload_type == "table" + elseif payload_type == "arrowtable" + # Deserialize from Arrow IPC format io = IOBuffer(data) - df = Arrow.Table(io) - return df + arrow_table = Arrow.Table(io) + return arrow_table + elseif payload_type == "jsontable" + # Deserialize from JSON format + # Returns Vector{NamedTuple} (column-oriented compatible) + json_str = String(data) + parsed = JSON.parse(json_str) + return parsed elseif payload_type == "image" return data elseif payload_type == "audio" @@ -931,9 +1103,12 @@ end ```javascript // natsbridge.js const nats = require('nats'); -const { v4: uuidv4 } = require('uuid'); +const crypto = require('crypto'); const fetch = require('node-fetch'); +// UUID generation using built-in crypto module +const uuidv4 = () => crypto.randomUUID(); + const DEFAULT_SIZE_THRESHOLD = 1_000_000; const DEFAULT_BROKER_URL = 'nats://localhost:4222'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; @@ -984,10 +1159,13 @@ module.exports = { ```javascript const nats = require('nats'); -const { v4: uuidv4 } = require('uuid'); +const crypto = require('crypto'); const fetch = require('node-fetch'); const arrow = require('apache-arrow'); +// UUID generation using built-in crypto module +const uuidv4 = () => crypto.randomUUID(); + const DEFAULT_SIZE_THRESHOLD = 1_000_000; const DEFAULT_BROKER_URL = 'nats://localhost:4222'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; @@ -1108,21 +1286,36 @@ async function serializeData(data, payload_type) { } else if (payload_type === 'dictionary') { const jsonStr = JSON.stringify(data); return Buffer.from(jsonStr, 'utf8'); - } else if (payload_type === 'table') { - // Convert to Arrow IPC - const buffer = Buffer.alloc(1024 * 1024); // Pre-allocate buffer - const writer = new arrow.RecordBatchWriter([ - new arrow.Schema(Object.keys(data[0]).map(key => new arrow.Field(key, arrow.any()))) - ]); + } else if (payload_type === 'arrowtable') { + // Convert Array to Arrow IPC + // data is row-oriented: [{id: 1, name: "Alice"}, ...] + if (!Array.isArray(data) || data.length === 0) { + throw new Error('arrowtable data must be a non-empty array of objects'); + } + // Create schema from first row + const schemaFields = Object.keys(data[0]).map(key => + new arrow.Field(key, arrow.any()) + ); + const schema = new arrow.Schema(schemaFields); + + // Create writer + const writer = new arrow.RecordBatchWriter([schema]); + + // Write rows for (const row of data) { - const recordBatch = arrow.recordBatch.fromObjects([row], writer.schema); + const recordBatch = arrow.recordBatch.fromObjects([row], schema); writer.write(recordBatch); } await writer.close(); - // Read from the underlying buffer - return buffer; + // Read buffer + return writer.toBuffer(); + } else if (payload_type === 'jsontable') { + // data is already row-oriented Array + // Serialize directly to JSON + const jsonStr = JSON.stringify(data); + return Buffer.from(jsonStr, 'utf8'); } else if (payload_type === 'image') { if (data instanceof Uint8Array || Buffer.isBuffer(data)) { return Buffer.from(data); @@ -1168,10 +1361,15 @@ async function deserializeData(data, payload_type, correlation_id) { } else if (payload_type === 'dictionary') { const jsonStr = Buffer.from(data).toString('utf8'); return JSON.parse(jsonStr); - } else if (payload_type === 'table') { + } else if (payload_type === 'arrowtable') { + // Deserialize from Arrow IPC const buffer = Buffer.from(data); const table = arrow.tableFromRawBytes(buffer); return table; + } else if (payload_type === 'jsontable') { + // Deserialize from JSON - returns Array (row-oriented) + const jsonStr = Buffer.from(data).toString('utf8'); + return JSON.parse(jsonStr); } else if (payload_type === 'image') { return Buffer.from(data); } else if (payload_type === 'audio') { @@ -1489,7 +1687,8 @@ from typing import Any try: import pyarrow as arrow - import pyarrow.parquet as pq + import pyarrow.feather as feather + import pyarrow.ipc as ipc ARROW_AVAILABLE = True except ImportError: ARROW_AVAILABLE = False @@ -1505,22 +1704,27 @@ def _serialize_data(data: Any, payload_type: str) -> bytes: elif payload_type == 'dictionary': json_str = json.dumps(data) return json_str.encode('utf-8') - elif payload_type == 'table': + elif payload_type == 'arrowtable': if not ARROW_AVAILABLE: raise Error('pyarrow not available for table serialization') - # Convert DataFrame to Arrow import io buf = io.BytesIO() import pandas as pd if isinstance(data, pd.DataFrame): + # Column-oriented DataFrame to Arrow table = arrow.Table.from_pandas(data) sink = arrow.ipc.new_file(buf) arrow.ipc.write_table(table, sink) sink.close() return buf.getvalue() else: - raise Error('Table data must be a pandas DataFrame') + raise Error('arrowtable data must be a pandas DataFrame') + elif payload_type == 'jsontable': + # data is list[dict] or list (row-oriented) + # Serialize directly to JSON + json_str = json.dumps(data) + return json_str.encode('utf-8') elif payload_type == 'image': if isinstance(data, (bytes, bytearray)): return bytes(data) @@ -1554,6 +1758,8 @@ from typing import Any try: import pyarrow as arrow + import pyarrow.feather as feather + import pyarrow.ipc as ipc ARROW_AVAILABLE = True except ImportError: ARROW_AVAILABLE = False @@ -1566,7 +1772,7 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An elif payload_type == 'dictionary': json_str = data.decode('utf-8') return json.loads(json_str) - elif payload_type == 'table': + elif payload_type == 'arrowtable': if not ARROW_AVAILABLE: raise Error('pyarrow not available for table deserialization') @@ -1574,6 +1780,10 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An buf = io.BytesIO(data) reader = arrow.ipc.open_file(buf) return reader.read_all().to_pandas() + elif payload_type == 'jsontable': + # Deserialize from JSON - returns list[dict] (row-oriented) + json_str = data.decode('utf-8') + return json.loads(json_str) elif payload_type == 'image': return data elif payload_type == 'audio': @@ -1684,7 +1894,8 @@ MicroPython has significant constraints compared to desktop implementations: | Arrow IPC | ✅ | ❌ (not supported) | | Async/Await | ✅ | ⚠️ (uasyncio only) | | Large payloads (>1MB) | ✅ | ❌ (enforced limit) | -| Table type | ✅ | ❌ | +| arrowtable | ✅ | ❌ | +| jsontable | ⚠️ (limited) | ⚠️ (limited) | | Multiple payloads | ✅ | ⚠️ (limited) | ### MicroPython Module Structure @@ -1704,6 +1915,9 @@ DEFAULT_BROKER_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" MAX_PAYLOAD_SIZE = 50000 # Hard limit +# Note: MicroPython uses list[list] for jsontable (row-oriented) +# No DataFrame support - data is always row-oriented + class NATSBridge: """MicroPython NATS bridge implementation.""" @@ -1811,11 +2025,14 @@ class NATSBridge: return env_json_obj def _serialize_data(self, data, payload_type): - """Serialize data (MicroPython version - no table support).""" + """Serialize data (MicroPython version - no arrowtable support).""" if payload_type == 'text': return data.encode('utf-8') elif payload_type == 'dictionary': return json.dumps(data).encode('utf-8') + elif payload_type == 'jsontable': + # data is list[list] (row-oriented) + return json.dumps(data).encode('utf-8') elif payload_type in ('image', 'audio', 'video', 'binary'): return bytes(data) else: @@ -1827,6 +2044,9 @@ class NATSBridge: return data.decode('utf-8') elif payload_type == 'dictionary': return json.loads(data.decode('utf-8')) + elif payload_type == 'jsontable': + # Returns list[list] (row-oriented) + return json.loads(data.decode('utf-8')) elif payload_type in ('image', 'audio', 'video', 'binary'): return data else: @@ -1926,6 +2146,13 @@ All platforms use correlation IDs for distributed tracing: [timestamp] [Correlation: abc123] Message published to subject ``` +### Serialization Performance + +| Format | Use Case | Pros | Cons | +|--------|----------|------|------| +| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython | +| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement | + --- ## Testing @@ -1978,6 +2205,12 @@ python3 test/test_py_text_receiver.py - Reduce `size_threshold` - Use direct transport only (< 100KB) - Avoid large payloads + - Use `jsontable` instead of `arrowtable` (arrowtable not supported) + +5. **Row-Oriented vs Column-Oriented Conversion Issues** + - Julia/Python: DataFrames are column-oriented; when sending `jsontable`, they are converted to row-oriented JSON + - JavaScript/MicroPython: Data is natively row-oriented + - When receiving `jsontable` in Julia/Python, JSON is automatically converted back to column-oriented DataFrame --- @@ -1993,6 +2226,16 @@ This cross-platform NATS bridge provides: - **MicroPython**: Synchronous API, memory-constrained optimizations 3. **Message Format Consistency**: Identical JSON schemas across all platforms 4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: Arrow IPC in desktop platforms, streaming support in MicroPython +5. **Platform-Specific Optimizations**: + - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) + - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms) +6. **Row-Oriented ↔ Column-Oriented Conversion**: Automatic conversion between row-oriented (JS, MicroPython) and column-oriented (Julia DataFrame, Python pandas) formats when using `jsontable` -The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. \ No newline at end of file +The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. + +### Datatype Summary + +| Datatype | Serialization | Use Case | Encoding | Supported Platforms | +|----------|---------------|----------|----------|---------------------| +| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | +| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython | diff --git a/docs/updated_architecture.md b/docs/updated_architecture.md deleted file mode 100644 index 8b57af8..0000000 --- a/docs/updated_architecture.md +++ /dev/null @@ -1,1322 +0,0 @@ -# Cross-Platform Architecture Documentation: Bi-Directional Data Bridge - -## Overview - -This document describes the architecture for a high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. - -**Supported Platforms:** -- **Julia** - Ground truth implementation with full feature set -- **JavaScript** - Node.js and browser-compatible implementation -- **Python/MicroPython** - Desktop and embedded-compatible implementation - -### Cross-Platform Design Principles - -1. **High-Level API Parity**: All three platforms expose the same `smartsend()` and `smartreceive()` functions with identical signatures and behavior -2. **Idiomatic Implementations**: Each platform uses its native patterns (multiple dispatch in Julia, async/prototype in JS, class-based in Python) -3. **Message Format Consistency**: The `msg_envelope_v1` and `msg_payload_v1` JSON schemas are identical across all platforms -4. **Handler Function Abstraction**: File server operations are abstracted through handler functions for backend flexibility - ---- - -## High-Level API Standard (Cross-Platform) - -### Unified API Signature - -All three platforms expose the same high-level API: - -**Input Format (smartsend):** -``` -[(dataname1, data1, type1), (dataname2, data2, type2), ...] -``` - -**Output Format (smartreceive):** -``` -{ - "correlation_id": "...", - "msg_id": "...", - "timestamp": "...", - "send_to": "...", - "msg_purpose": "...", - "sender_name": "...", - "sender_id": "...", - "receiver_name": "...", - "receiver_id": "...", - "reply_to": "...", - "reply_to_msg_id": "...", - "broker_url": "...", - "metadata": {...}, - "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] -} -``` - -### Supported Payload Types - -| Type | Julia | JavaScript | Python/MicroPython | -|------|-------|------------|-------------------| -| `text` | `String` | `string` | `str` | -| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | -| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | -| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | -| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray`, `io.BytesIO` | - -### Cross-Platform API Examples - -**Julia:** -```julia -using NATSBridge - -# Send -env, env_json_str = smartsend( - "/chat", - [("message", "Hello!", "text"), ("image", image_bytes, "image")], - broker_url="nats://localhost:4222" -) - -# Receive - returns JSON.Object{String, Any} -env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} -# Access payloads: for (dataname, data, type) in env["payloads] -``` - -**JavaScript:** -```javascript -const NATSBridge = require('natsbridge'); - -// Send -const [env, env_json_str] = await NATSBridge.smartsend( - "/chat", - [ - ["message", "Hello!", "text"], - ["image", imageBuffer, "image"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Receive - returns Promise -const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: fetchWithBackoff -}); -// env is an object with "payloads" field containing Array of arrays -// Access payloads: for (const [dataname, data, type] of env.payloads) -``` - -**Python:** -```python -from natsbridge import NATSBridge - -# Send -env, env_json_str = NATSBridge.smartsend( - "/chat", - [("message", "Hello!", "text"), ("image", image_bytes, "image")], - broker_url="nats://localhost:4222" -) - -# Receive - returns Tuple[Dict, str] -env = NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff -) -# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] -# Access payloads: for dataname, data, type_ in env["payloads"] -``` - -**MicroPython:** -```python -from natsbridge import NATSBridge - -# Send (limited to direct transport due to memory constraints) -env, env_json_str = NATSBridge.smartsend( - "/chat", - [("message", "Hello!", "text")], - broker_url="nats://localhost:4222" -) -``` - ---- - -## Architecture Diagram (Cross-Platform) - -```mermaid -flowchart TB - subgraph JuliaApp["Julia Application"] - JuliaAppCode[App Code] - JuliaBridge[NATSBridge.jl] - JuliaNATS[NATS.jl] - end - - subgraph JSApp["JavaScript Application"] - JSAppCode[App Code] - JSBridge[NATSBridge.js] - JSNATS[nats.js] - end - - subgraph PythonApp["Python/MicroPython Application"] - PythonAppCode[App Code] - PythonBridge[NATSBridge.py] - PythonNATS[nats.py] - end - - subgraph Infrastructure["Infrastructure"] - NATS[NATS Server
Message Broker] - FileServer[HTTP File Server
Upload/Download] - end - - JuliaAppCode --> JuliaBridge - JuliaBridge --> JuliaNATS - JSAppCode --> JSBridge - JSBridge --> JSNATS - PythonAppCode --> PythonBridge - PythonBridge --> PythonNATS - - JuliaNATS --> NATS - JSNATS --> NATS - PythonNATS --> NATS - - NATS --> JuliaNATS - NATS --> JSNATS - NATS --> PythonNATS - - JuliaBridge -.->|HTTP POST upload| FileServer - JSBridge -.->|HTTP POST upload| FileServer - PythonBridge -.->|HTTP POST upload| FileServer - - FileServer -.->|HTTP GET download| JuliaBridge - FileServer -.->|HTTP GET download| JSBridge - FileServer -.->|HTTP GET download| PythonBridge - - style JuliaApp fill:#c5e1a5 - style JSApp fill:#bbdefb - style PythonApp fill:#f8bbd0 - style NATS fill:#fff3e0 - style FileServer fill:#f3e5f5 -``` - ---- - -## System Components - -### 1. msg_envelope_v1 - Message Envelope - -**JSON Schema (Identical Across All Platforms):** -```json -{ - "correlation_id": "uuid-v4-string", - "msg_id": "uuid-v4-string", - "timestamp": "2024-01-15T10:30:00Z", - - "send_to": "topic/subject", - "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat", - "sender_name": "agent-wine-web-frontend", - "sender_id": "uuid4", - "receiver_name": "agent-backend", - "receiver_id": "uuid4", - "reply_to": "topic", - "reply_to_msg_id": "uuid4", - "broker_url": "nats://localhost:4222", - - "metadata": { - "content_type": "application/octet-stream", - "content_length": 123456 - }, - - "payloads": [ - { - "id": "uuid4", - "dataname": "login_image", - "payload_type": "image", - "transport": "direct", - "encoding": "base64", - "size": 15433, - "data": "base64-encoded-string", - "metadata": { - "checksum": "sha256_hash" - } - }, - { - "id": "uuid4", - "dataname": "large_arrow_table", - "payload_type": "arrowtable", - "transport": "link", - "encoding": "arrow-ipc", - "size": 524288, - "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow", - "metadata": {} - }, - { - "id": "uuid4", - "dataname": "json_table", - "payload_type": "jsontable", - "transport": "direct", - "encoding": "json", - "size": 1024, - "data": "[{\"id\": 1, \"name\": \"Alice\"}, {\"id\": 2, \"name\": \"Bob\"}]", - "metadata": {} - } - ] -} -``` - -### 2. msg_payload_v1 - Payload Structure - -**JSON Schema (Identical Across All Platforms):** -```json -{ - "id": "uuid4", - "dataname": "login_image", - "payload_type": "image | dictionary | arrowtable | jsontable | text | audio | video | binary", - "transport": "direct | link", - "encoding": "none | json | base64 | arrow-ipc", - "size": 15433, - "data": "base64-encoded-string | http-url | json-string", - "metadata": { - "checksum": "sha256_hash" - } -} -``` - -### 3. Transport Strategy Decision Logic (Cross-Platform) - -``` -┌─────────────────────────────────────────────────────────────┐ -│ smartsend Function (All Platforms) │ -│ Accepts: [(dataname1, data1, type1), ...] │ -│ (Type is per payload, not standalone) │ -└─────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ For each payload: │ -│ 1. Extract type from tuple/array │ -│ 2. Serialize based on type │ -│ 3. Check payload size │ -└─────────────────────────────────────────────────────────────┘ - │ - ┌───────────┴────────────┐ - ▼ ▼ - ┌──────────────┐ ┌──────────────┐ - │ Direct Path │ │ Link Path │ - │ (< 1MB) │ │ (>= 1MB) │ - │ │ │ │ - │ • Serialize │ │ • Serialize │ - │ to buffer │ │ to buffer │ - │ • Base64/JSON│ │ • Upload to │ - │ encode │ │ HTTP Server│ - │ • Publish to │ │ • Publish to │ - │ NATS │ │ NATS with │ - │ (in msg) │ │ URL │ - └──────────────┘ └──────────────┘ -``` - ---- - -## Platform-Specific Implementations - -### Julia Implementation - -#### Architecture Patterns - -**Multiple Dispatch:** Julia's core strength is leveraged through function overloading: - -```julia -# publish_message has two overloads based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(broker_url) - publish_message(conn, subject, message, correlation_id) -end - -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - try - NATS.publish(conn, subject, message) - log_trace(correlation_id, "Message published to $subject") - finally - NATS.drain(conn) - end -end -``` - -**Struct-Based Data Models:** -```julia -struct msg_payload_v1 - id::String - dataname::String - payload_type::String - transport::String - encoding::String - size::Integer - data::Any - metadata::Dict{String, Any} -end - -struct msg_envelope_v1 - correlation_id::String - msg_id::String - timestamp::String - send_to::String - msg_purpose::String - sender_name::String - sender_id::String - receiver_name::String - receiver_id::String - reply_to::String - reply_to_msg_id::String - broker_url::String - metadata::Dict{String, Any} - payloads::Vector{msg_payload_v1} -end -``` - -#### Dependencies - -| Package | Purpose | -|---------|---------| -| `NATS.jl` | Core NATS functionality | -| `Arrow.jl` | Arrow IPC serialization | -| `JSON.jl` | JSON parsing | -| `HTTP.jl` | HTTP client for file server | -| `UUIDs.jl` | UUID generation | -| `Dates.jl` | Timestamps | -| `Base64` | Base64 encoding | - -#### File Server Handler Signatures - -```julia -# Upload handler -fileserver_upload_handler( - fileserver_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} - -# Download handler -fileserver_download_handler( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} -``` - -#### Key Functions - -```julia -# Main send/receive functions -function smartsend( - subject::String, - data::AbstractArray{Tuple{String, Any, String}, 1}; - broker_url::String = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::String = string(uuid4()), - msg_purpose::String = "chat", - sender_name::String = "NATSBridge", - receiver_name::String = "", - receiver_id::String = "", - reply_to::String = "", - reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, - msg_id::String = string(uuid4()), - sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} - -function smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function = _fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 -)::JSON.Object{String, Any} -``` - -#### Serialization Logic for Tables - -```julia -# Serialize table data based on payload_type -function _serialize_table_data(data::Any, payload_type::String)::Vector{UInt8} - if payload_type == "arrowtable" - # Serialize to Apache Arrow IPC format - buffer = IOBuffer() - Arrow.write(buffer, data) - return take!(buffer) - elseif payload_type == "jsontable" - # Serialize to JSON format - json_str = JSON.json(data) - return Vector{UInt8}(json_str) - else - throw(ArgumentError("Unknown payload_type: $payload_type")) - end -end - -# Deserialize table data based on payload_type -function _deserialize_table_data(data::Vector{UInt8}, payload_type::String)::Any - if payload_type == "arrowtable" - # Deserialize from Apache Arrow IPC format - buffer = Buffer(data) - return Arrow.read(buffer) - elseif payload_type == "jsontable" - # Deserialize from JSON format - json_str = String(data) - return JSON.parse(json_str) - else - throw(ArgumentError("Unknown payload_type: $payload_type")) - end -end -``` - ---- - -### JavaScript Implementation - -#### Architecture Patterns - -**Async/Await Pattern:** JavaScript uses async/await for non-blocking I/O: - -```javascript -// smartsend is async and returns a Promise -async function smartsend(subject, data, options = {}) { - const { - broker_url = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler = plikOneshotUpload, - size_threshold = DEFAULT_SIZE_THRESHOLD, - correlation_id = generateUUID(), - msg_purpose = "chat", - sender_name = "NATSBridge", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - is_publish = true, - nats_connection = null, - msg_id = generateUUID(), - sender_id = generateUUID() - } = options; - - // Process payloads - const payloads = []; - for (const [dataname, payloadData, payloadType] of data) { - const payloadBytes = await serializeData(payloadData, payloadType); - const payloadSize = payloadBytes.byteLength; - - if (payloadSize < size_threshold) { - // Direct path - const payloadB64 = base64Encode(payloadBytes); - payloads.push({ - id: generateUUID(), - dataname, - payload_type: payloadType, - transport: "direct", - encoding: "base64", - size: payloadSize, - data: payloadB64 - }); - } else { - // Link path - const response = await fileserver_upload_handler( - fileserver_url, dataname, payloadBytes - ); - payloads.push({ - id: generateUUID(), - dataname, - payload_type: payloadType, - transport: "link", - encoding: "none", - size: payloadSize, - data: response.url - }); - } - } - - const env = buildEnvelope(subject, payloads, { - correlation_id, msg_id, msg_purpose, - sender_name, sender_id, receiver_name, - receiver_id, reply_to, reply_to_msg_id, - broker_url - }); - - const env_json_str = JSON.stringify(env); - - if (is_publish) { - if (nats_connection) { - await publishMessage(nats_connection, subject, env_json_str, correlation_id); - } else { - await publishMessage(broker_url, subject, env_json_str, correlation_id); - } - } - - return [env, env_json_str]; -} -``` - -**Prototype-Based Utilities:** -```javascript -// NATS client wrapper (prototype-based) -class NATSClient { - constructor(url) { - this.url = url; - this.connection = null; - } - - async connect() { - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } - - async publish(subject, message) { - if (!this.connection) { - await this.connect(); - } - await this.connection.publish(subject, message); - } - - async close() { - if (this.connection) { - this.connection.close(); - } - } -} -``` - -#### Dependencies (Node.js) - -| Package | Purpose | -|---------|---------| -| `nats` | Core NATS functionality (nats.js) | -| `crypto` (built-in) | UUID generation (Node.js) | -| `node-fetch` or `axios` | HTTP client for file server | -| `apache-arrow` | Arrow IPC serialization | - -#### Dependencies (Browser) - -| Package | Purpose | -|---------|---------| -| `nats` | Browser-compatible NATS client | -| `crypto` (built-in) | UUID generation (browser) | -| `fetch` (native) | HTTP client for file server | -| `apache-arrow` | Arrow IPC serialization | - -#### Dependencies (MicroPython) - -| Module | Purpose | -|--------|---------| -| `nats` (custom) | MicroPython NATS client | -| `time` | Timestamps | -| `uos` | File operations | -| `base64` | Base64 encoding | - -#### File Server Handler Signatures - -```javascript -// Upload handler - async function returning Promise -async function fileserver_upload_handler( - fileserver_url, - dataname, - data // Uint8Array -) { - // Returns: { status, uploadid, fileid, url } -} - -// Download handler - async function returning Promise -async function fileserver_download_handler( - url, - max_retries, - base_delay, - max_delay, - correlation_id -) { - // Returns: Uint8Array -} -``` - -#### Key Functions - -```javascript -// Main send/receive functions -async function smartsend(subject, data, options = {}) { - // data: Array of [dataname, data, type] tuples - // Returns: Promise<[env, env_json_str]> -} - -async function smartreceive(msg, options = {}) { - // msg: NATS message object - // Returns: Promise -} - -// Utility functions -async function serializeData(data, payload_type) { - // Returns: Uint8Array -} - -async function deserializeData(data, payload_type) { - // Returns: deserialized data -} - -async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { - // Returns: Uint8Array -} -``` - -#### Serialization Logic for Tables - -```javascript -// Serialize table data based on payload_type -async function serializeTableData(data, payload_type) { - if (payload_type === "arrowtable") { - // Serialize to Apache Arrow IPC format - const schema = new arrow.Schema([...]); // Define schema - const arr = arrow.tableToArrowTable(data, schema); - const buffer = arrow.RecordBatch.from(arr).toBuffer(); - return new Uint8Array(buffer); - } else if (payload_type === "jsontable") { - // Serialize to JSON format - const jsonStr = JSON.stringify(data); - return new TextEncoder().encode(jsonStr); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} - -// Deserialize table data based on payload_type -async function deserializeTableData(data, payload_type) { - if (payload_type === "arrowtable") { - // Deserialize from Apache Arrow IPC format - const buffer = arrow.arrayBufferToBuffer(data.buffer); - const batch = arrow.RecordBatch.deserialize(buffer); - return arrow.tableFromBatch(batch); - } else if (payload_type === "jsontable") { - // Deserialize from JSON format - const jsonStr = new TextDecoder().decode(data); - return JSON.parse(jsonStr); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} -``` - ---- - -### Python/MicroPython Implementation - -#### Architecture Patterns - -**Class-Based Design:** Python uses classes for stateful operations: - -```python -class NATSBridge: - """Cross-platform NATS bridge implementation.""" - - DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - DEFAULT_BROKER_URL = "nats://localhost:4222" - DEFAULT_FILESERVER_URL = "http://localhost:8080" - - def __init__(self, broker_url=None, fileserver_url=None): - self.broker_url = broker_url or self.DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL - self._nats_client = None - - async def smartsend(self, subject, data, **kwargs): - """ - Send data via NATS with automatic transport selection. - - Args: - subject: NATS subject to publish to - data: List of (dataname, data, type) tuples - **kwargs: Additional options (broker_url, fileserver_url, etc.) - - Returns: - Tuple of (env, env_json_str) - """ - # Extract options with defaults - options = self._merge_options(kwargs) - - # Process payloads - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = self._serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - if payload_size < options['size_threshold']: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') - payloads.append({ - 'id': uuid.uuid4().hex, - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64 - }) - else: - # Link path - response = await options['fileserver_upload_handler']( - options['fileserver_url'], dataname, payload_bytes - ) - payloads.append({ - 'id': uuid.uuid4().hex, - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 'size': payload_size, - 'data': response['url'] - }) - - # Build envelope - env = self._build_envelope(subject, payloads, options) - env_json_str = json.dumps(env) - - if options['is_publish']: - await self._publish_message( - subject, env_json_str, options['correlation_id'], - nats_connection=options.get('nats_connection') - ) - - return env, env_json_str - - async def smartreceive(self, msg, **kwargs): - """ - Receive and process NATS message. - - Args: - msg: NATS message object - **kwargs: Additional options (fileserver_download_handler, etc.) - - Returns: - Dict with envelope metadata and payloads - """ - # Parse envelope - env_json_obj = json.loads(msg.payload) - - # Process payloads - payloads_list = [] - for payload in env_json_obj['payloads']: - transport = payload['transport'] - dataname = payload['dataname'] - - if transport == 'direct': - payload_b64 = payload['data'] - payload_bytes = base64.b64decode(payload_b64) - data_type = payload['payload_type'] - data = self._deserialize_data(payload_bytes, data_type) - payloads_list.append((dataname, data, data_type)) - elif transport == 'link': - url = payload['data'] - downloaded_data = await options['fileserver_download_handler']( - url, - options['max_retries'], - options['base_delay'], - options['max_delay'], - env_json_obj['correlation_id'] - ) - data_type = payload['payload_type'] - data = self._deserialize_data(downloaded_data, data_type) - payloads_list.append((dataname, data, data_type)) - - env_json_obj['payloads'] = payloads_list - return env_json_obj -``` - -**Dataclass for Type Safety:** -```python -from dataclasses import dataclass, field -from typing import Any, Dict, List, Tuple, Union - -@dataclass -class MsgPayloadV1: - """Message payload structure.""" - id: str - dataname: str - payload_type: str - transport: str - encoding: str - size: int - data: Union[str, bytes] # URL for link, base64 for direct - metadata: Dict[str, Any] = field(default_factory=dict) - -@dataclass -class MsgEnvelopeV1: - """Message envelope structure.""" - correlation_id: str - msg_id: str - timestamp: str - send_to: str - msg_purpose: str - sender_name: str - sender_id: str - receiver_name: str - receiver_id: str - reply_to: str - reply_to_msg_id: str - broker_url: str - metadata: Dict[str, Any] = field(default_factory=dict) - payloads: List[MsgPayloadV1] = field(default_factory=list) -``` - -#### Dependencies (Desktop Python) - -| Package | Purpose | -|---------|---------| -| `nats-py` | Core NATS functionality | -| `uuid` | UUID generation (stdlib) | -| `aiohttp` or `requests` | HTTP client for file server | -| `pyarrow` | Arrow IPC serialization | -| `pandas` | DataFrame support (optional) | -| `python-dateutil` | Timestamps | -| `base64` | Base64 encoding (stdlib) | - -#### Dependencies (MicroPython) - -| Module | Purpose | -|--------|---------| -| `network` | NATS connection (custom) | -| `time` | Timestamps | -| `uos` | File operations | -| `base64` | Base64 encoding | -| `json` | JSON parsing | -| `struct` | Binary data handling | - -**MicroPython Limitations:** -- No Arrow IPC support (memory constraints) -- Only direct transport (< 1MB threshold enforced) -- Simplified UUID generation -- No async/await (use callbacks or uasyncio) - -#### File Server Handler Signatures - -```python -# Upload handler - async function -async def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """ - Upload data to file server. - - Args: - fileserver_url: Base URL of file server - dataname: Name of the file - data: Binary data - - Returns: - Dict with keys: 'status', 'uploadid', 'fileid', 'url' - """ - pass - -# Download handler - async function -async def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """ - Download data from URL with exponential backoff. - - Args: - url: URL to download from - max_retries: Maximum retry attempts - base_delay: Initial delay in ms - max_delay: Maximum delay in ms - correlation_id: Correlation ID for logging - - Returns: - Downloaded bytes - """ - pass -``` - -#### Key Functions - -```python -# Main send/receive functions (standalone or class methods) -async def smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - broker_url: str = DEFAULT_BROKER_URL, - fileserver_url: str = DEFAULT_FILESERVER_URL, - fileserver_upload_handler: Callable = plik_oneshot_upload, - size_threshold: int = DEFAULT_SIZE_THRESHOLD, - correlation_id: str = None, - msg_purpose: str = "chat", - sender_name: str = "NATSBridge", - receiver_name: str = "", - receiver_id: str = "", - reply_to: str = "", - reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, - msg_id: str = None, - sender_id: str = None -) -> Tuple[Dict, str]: - """Send data via NATS.""" - pass - -async def smartreceive( - msg: Any, - fileserver_download_handler: Callable = fetch_with_backoff, - max_retries: int = 5, - base_delay: int = 100, - max_delay: int = 5000 -) -> Dict: - """Receive and process NATS message.""" - pass - -# Utility functions -def _serialize_data(data: Any, payload_type: str) -> bytes: - """Serialize data to bytes.""" - pass - -def _deserialize_data(data: bytes, payload_type: str) -> Any: - """Deserialize bytes to data.""" - pass - -async def fetch_with_backoff( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """Fetch URL with exponential backoff.""" - pass -``` - -#### Serialization Logic for Tables - -```python -# Serialize table data based on payload_type -def serialize_table_data(data: Any, payload_type: str) -> bytes: - if payload_type == "arrowtable": - # Serialize to Apache Arrow IPC format - import pyarrow as pa - import pyarrow.feather as feather - import io - - if isinstance(data, pd.DataFrame): - table = pa.Table.from_pandas(data) - buffer = io.BytesIO() - feather.write_feather(table, buffer) - return buffer.getvalue() - else: - raise TypeError("Expected pandas DataFrame for arrowtable") - - elif payload_type == "jsontable": - # Serialize to JSON format - if isinstance(data, list) and all(isinstance(row, dict) for row in data): - return json.dumps(data).encode('utf-8') - else: - raise TypeError("Expected list of dicts for jsontable") - - else: - raise ValueError(f"Unknown payload_type: {payload_type}") - -# Deserialize table data based on payload_type -def deserialize_table_data(data: bytes, payload_type: str) -> Any: - if payload_type == "arrowtable": - # Deserialize from Apache Arrow IPC format - import pyarrow as pa - import pyarrow.feather as feather - import io - - buffer = io.BytesIO(data) - table = feather.read_table(buffer) - return table.to_pandas() - - elif payload_type == "jsontable": - # Deserialize from JSON format - json_str = data.decode('utf-8') - return json.loads(json_str) - - else: - raise ValueError(f"Unknown payload_type: {payload_type}") -``` - ---- - -## Platform Comparison Matrix - -| Feature | Julia | JavaScript | Python | MicroPython | -|---------|-------|------------|--------|-------------| -| **Multiple Dispatch** | ✅ Native | ❌ (Prototypes) | ❌ (Overload via `@overload`) | ❌ | -| **Async/Await** | ❌ (Tasks) | ✅ Native | ✅ Native | ⚠️ (uasyncio) | -| **Type Safety** | ✅ Strong | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ | -| **Memory Management** | ✅ GC | ✅ GC | ✅ GC | ⚠️ (Manual) | -| **Arrow IPC** | ✅ Native | ✅ (arrow package) | ✅ (pyarrow) | ❌ | -| **JSON Serialization** | ✅ (JSON.jl) | ✅ (native) | ✅ (json) | ✅ (json) | -| **arrowtable Support** | ✅ | ✅ | ✅ | ❌ | -| **jsontable Support** | ✅ | ✅ | ✅ | ✅ | -| **Direct Transport** | ✅ | ✅ | ✅ | ✅ | -| **Link Transport** | ✅ | ✅ | ✅ | ⚠️ (Limited) | -| **Handler Functions** | ✅ | ✅ | ✅ | ✅ | -| **Cross-Platform API** | ✅ | ✅ | ✅ | ✅ | - ---- - -## Implementation Details by Platform - -### Julia: Multiple Dispatch Pattern - -```julia -# Function overloading based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - # Creates new connection -end - -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - # Uses pre-existing connection -end - -# Type-specific serialization -function _serialize_data(data::String, payload_type::String) - # Text handling -end - -function _serialize_data(data::Dict, payload_type::String) - # Dictionary handling -end - -function _serialize_data(data::DataFrame, payload_type::String) - # Table handling - arrowtable -end - -function _serialize_data(data::Vector{NamedTuple}, payload_type::String) - # Table handling - jsontable -end -``` - -### JavaScript: Prototype + Async Pattern - -```javascript -// Class-based NATS client -class NATSClient { - constructor(url) { - this.url = url; - } - - async connect() { - // Connection logic - } - - async publish(subject, message) { - // Publish logic - } -} - -// Module-level utility functions -function generateUUID() { - return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => { - const r = Math.random() * 16 | 0; - return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16); - }); -} - -async function serializeData(data, payload_type) { - // Serialization logic for arrowtable and jsontable -} -``` - -### Python: Class-Based Pattern - -```python -class NATSBridge: - """Main bridge class.""" - - def __init__(self, broker_url=None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - - async def smartsend(self, subject, data, **kwargs): - """Send data.""" - pass - - async def smartreceive(self, msg, **kwargs): - """Receive message.""" - pass - -# Module-level convenience functions -def smartsend(subject, data, **kwargs): - """Convenience function using default NATSBridge instance.""" - bridge = NATSBridge() - return await bridge.smartsend(subject, data, **kwargs) - -def smartreceive(msg, **kwargs): - """Convenience function using default NATSBridge instance.""" - bridge = NATSBridge() - return await bridge.smartreceive(msg, **kwargs) -``` - ---- - -## Scenario Implementations (Cross-Platform) - -### Scenario 1: Command & Control (Small Dictionary) - -| Platform | Code | -|----------|------| -| **Julia** | ```julia
config = Dict("step_size" => 0.01)
env, env_json_str = smartsend("control", [("config", config, "dictionary")])``` | -| **JavaScript** | ```javascript
const config = { step_size: 0.01 };
[env, env_json_str] = await smartsend("control", [["config", config, "dictionary"]]);``` | -| **Python** | ```python
config = {"step_size": 0.01}
env, env_json_str = await smartsend("control", [("config", config, "dictionary")])``` | - -### Scenario 2: Deep Dive Analysis (Large Arrow Table) - -| Platform | Code | -|----------|------| -| **Julia** | ```julia
df = DataFrame(id=1:1000000, value=rand(1000000))
env, env_json_str = smartsend("analysis", [("table_data", df, "arrowtable")])``` | -| **JavaScript** | ```javascript
const df = [{ id: 1, value: 0.5 }, ...];
[env, env_json_str] = await smartsend("analysis", [["table_data", df, "arrowtable"]]);``` | -| **Python** | ```python
import pandas as pd
df = pd.DataFrame({"id": range(1000000), "value": np.random.rand(1000000)})
env, env_json_str = await smartsend("analysis", [("table_data", df, "arrowtable")])``` | - -### Scenario 3: Chat System (Multi-Payload) - -| Platform | Code | -|----------|------| -| **Julia** | ```julia
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = smartsend("chat", chat)``` | -| **JavaScript** | ```javascript
const chat = [["text", "Hello!", "text"], ["image", imgBuffer, "image"]];
[env, env_json_str] = await smartsend("chat", chat);``` | -| **Python** | ```python
chat = [("text", "Hello!", "text"), ("image", img_bytes, "image")]
env, env_json_str = await smartsend("chat", chat)``` | - -### Scenario 4: JSON Table Transfer (Cross-Platform) - -| Platform | Code | -|----------|------| -| **Julia** | ```julia
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
env, env_json_str = smartsend("data", [("users", rows, "jsontable")])``` | -| **JavaScript** | ```javascript
const users = [{ id: 1, name: "Alice" }, { id: 2, name: "Bob" }];
[env, env_json_str] = await smartsend("data", [["users", users, "jsontable"]]);``` | -| **Python** | ```python
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
env, env_json_str = await smartsend("data", [("users", users, "jsontable")])``` | - -### Scenario 5: Smart Transport Selection - -The `smartsend` function automatically selects the transport method based on payload size: - -- **Direct Transport (< 1MB)**: Payload is serialized and embedded directly in the NATS message - - `arrowtable`: Serialized to Arrow IPC, base64 encoded - - `jsontable`: Serialized to JSON, base64 encoded - - `dictionary`: Serialized to JSON, base64 encoded - - `text`: Serialized to UTF-8, base64 encoded - - `image/audio/video/binary`: Base64 encoded - -- **Link Transport (>= 1MB)**: Payload is uploaded to HTTP file server, URL embedded in message - - All types supported - - Receiver downloads from URL and deserializes - ---- - -## Performance Considerations (Cross-Platform) - -### Zero-Copy Reading - -| Platform | Strategy | -|----------|----------| -| **Julia** | `Arrow.read()` with memory-mapped files | -| **JavaScript** | `ArrayBuffer` with `DataView` | -| **Python** | `pyarrow` memory mapping | -| **MicroPython** | Not available (streaming only) | - -### Exponential Backoff - -```python -# Python/MicroPython -async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): - delay = base_delay - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - return await response.read() - except Exception as e: - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - raise Exception("Failed to fetch after max retries") -``` - -### Correlation ID Logging - -All platforms use correlation IDs for distributed tracing: - -``` -[timestamp] [Correlation: abc123] Message published to subject -``` - -### Serialization Performance Comparison - -| Format | Use Case | Pros | Cons | -|--------|----------|------|------| -| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library | -| `jsontable` | Small/medium tabular data | Human-readable, universal support | Slower, larger size, no schema | - ---- - -## Testing Strategy (Cross-Platform) - -### Unit Tests - -| Test Type | Julia | JavaScript | Python | -|-----------|-------|------------|--------| -| **Serialization** | `test/test_julia_text_sender.jl` | `test/test_js_text_sender.js` | `test/test_py_text_sender.py` | -| **Deserialization** | `test/test_julia_text_receiver.jl` | `test/test_js_text_receiver.js` | `test/test_py_text_receiver.py` | -| **Large Payload** | `test/test_julia_file_sender.jl` | `test/test_js_file_sender.js` | `test/test_py_file_sender.py` | -| **Multi-Payload** | `test/test_julia_mix_payloads_sender.jl` | `test/test_js_mix_payloads_sender.js` | `test/test_py_mix_payloads_sender.py` | -| **Arrow Table** | `test/test_julia_table_sender.jl` | `test/test_js_table_sender.js` | `test/test_py_table_sender.py` | - -### Integration Tests - -- NATS server communication -- File server upload/download -- Cross-platform message exchange -- Arrow table serialization/deserialization -- JSON table serialization/deserialization - ---- - -## Configuration - -### Environment Variables - -| Variable | Default | Description | -|----------|---------|-------------| -| `NATS_URL` | `nats://localhost:4222` | NATS server URL | -| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | - -### MicroPython-Specific Configuration - -```python -# micropython.conf -NATS_URL = "nats://broker.local:4222" -FILESERVER_URL = "http://fileserver.local:8080" -SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices -MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython -``` - ---- - -## Summary - -This cross-platform NATS bridge provides: - -1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across Julia, JavaScript, and Python/MicroPython -2. **Idiomatic Implementations**: - - Julia: Multiple dispatch and struct-based design - - JavaScript: Async/await and prototype-based utilities - - Python: Class-based design with type hints -3. **Message Format Consistency**: Identical `msg_envelope_v1` and `msg_payload_v1` JSON schemas -4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: - - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data - - **JSON** (`jsontable`): Universal human-readable format for smaller tables - - Streaming support in MicroPython - -The Julia implementation serves as the **ground truth** for API design and behavior, while JavaScript and Python implementations maintain interface parity while leveraging their respective language idioms. - -### Datatype Summary - -| Datatype | Serialization | Use Case | Encoding | -|----------|---------------|----------|----------| -| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | diff --git a/docs/updated_implementation.md b/docs/updated_implementation.md deleted file mode 100644 index ed21567..0000000 --- a/docs/updated_implementation.md +++ /dev/null @@ -1,2241 +0,0 @@ -# Cross-Platform Implementation Guide: Bi-Directional Data Bridge - -## Overview - -This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language. - -**Supported Platforms:** -- **Julia** - Ground truth implementation (reference) -- **JavaScript** - Node.js and browser implementation -- **Python/MicroPython** - Desktop and embedded implementation - ---- - -## Implementation Files - -| Language | Implementation File | Description | -|----------|---------------------|-------------| -| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support | -| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation | -| **Python** | `src/natsbridge.py` | Desktop Python implementation | -| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) | - ---- - -## File Server Handler Architecture - -The system uses **handler functions** to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). - -### Handler Function Signatures - -#### Julia - -```julia -# Upload handler - uploads data to file server and returns URL -fileserver_upload_handler( - fileserver_url::String, - dataname::String, - data::Vector{UInt8} -)::Dict{String, Any} - -# Download handler - fetches data from file server URL with exponential backoff -fileserver_download_handler( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -)::Vector{UInt8} -``` - -#### JavaScript - -```javascript -// Upload handler - async function -async function fileserver_upload_handler( - fileserver_url, - dataname, - data // Uint8Array -) { - // Returns: { status, uploadid, fileid, url } -} - -// Download handler - async function -async function fileserver_download_handler( - url, - max_retries, - base_delay, - max_delay, - correlation_id -) { - // Returns: Uint8Array -} -``` - -#### Python - -```python -# Upload handler - async function -async def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """ - Upload data to file server. - - Returns: - Dict with keys: 'status', 'uploadid', 'fileid', 'url' - """ - pass - -# Download handler - async function -async def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """ - Download data from URL with exponential backoff. - - Returns: - Downloaded bytes - """ - pass -``` - -#### MicroPython - -```python -# Upload handler - synchronous (no async in MicroPython) -def fileserver_upload_handler( - fileserver_url: str, - dataname: str, - data: bytearray -) -> Dict: - """ - Upload data to file server (synchronous). - - Returns: - Dict with keys: 'status', 'url' - """ - pass - -# Download handler - synchronous -def fileserver_download_handler( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytearray: - """ - Download data from URL with exponential backoff (synchronous). - - Returns: - Downloaded bytes - """ - pass -``` - ---- - -## Multi-Payload Support (Standard API) - -The system uses a **standardized list-of-tuples format** for all payload operations across all platforms. - -### API Standard - -``` -# Input format for smartsend (always a list of tuples with type info) -[(dataname1, data1, type1), (dataname2, data2, type2), ...] - -# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples) -{ - "correlation_id": "...", - "msg_id": "...", - "timestamp": "...", - "send_to": "...", - "msg_purpose": "...", - "sender_name": "...", - "sender_id": "...", - "receiver_name": "...", - "receiver_id": "...", - "reply_to": "...", - "reply_to_msg_id": "...", - "broker_url": "...", - "metadata": {...}, - "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...] -} -``` - -### Supported Types - -| Type | Julia | JavaScript | Python | MicroPython | -|------|-------|------------|--------|-------------| -| `text` | `String` | `string` | `str` | `str` | -| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | -| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) | -| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | `list` | -| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | -| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | - -### Cross-Platform Examples - -#### Julia - -```julia -using NATSBridge - -# Single payload - still wrapped in a list -env, env_json_str = smartsend( - "/test", - [("dataname1", data1, "dictionary")], - broker_url="nats://localhost:4222", - fileserver_upload_handler=plik_oneshot_upload -) - -# Multiple payloads with different types -env, env_json_str = smartsend( - "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], - broker_url="nats://localhost:4222" -) - -# Mixed content (chat with text, image, audio) -env, env_json_str = smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("user_image", image_data, "image"), - ("audio_clip", audio_data, "audio") - ], - broker_url="nats://localhost:4222" -) - -# Receive returns a JSON.Object{String, Any} envelope -env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}} -# Access payloads: env["payloads"] which is a Vector of tuples -for (dataname, data, type) in env["payloads"] - println("$dataname: $data (type: $type)") -end -``` - -#### JavaScript - -```javascript -const NATSBridge = require('natsbridge'); - -// Single payload -const [env, env_json_str] = await NATSBridge.smartsend( - "/test", - [["dataname1", data1, "dictionary"]], - { - broker_url: "nats://localhost:4222", - fileserver_upload_handler: plikOneshotUpload - } -); - -// Multiple payloads -const [env, env_json_str] = await NATSBridge.smartsend( - "/test", - [ - ["dataname1", data1, "dictionary"], - ["dataname2", data2, "arrowtable"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Mixed content -const [env, env_json_str] = await NATSBridge.smartsend( - "/chat", - [ - ["message_text", "Hello!", "text"], - ["user_image", imageData, "image"], - ["audio_clip", audioData, "audio"] - ], - { broker_url: "nats://localhost:4222" } -); - -// Receive -const env = await NATSBridge.smartreceive(msg, { - fileserver_download_handler: fetchWithBackoff -}); -// env is an object with "payloads" field containing Array of arrays -// Access payloads: env.payloads which is an Array of [dataname, data, type] arrays -for (const [dataname, data, type] of env.payloads) { - console.log(`${dataname}: ${data} (type: ${type})`); -} -``` - -#### Python - -```python -from natsbridge import NATSBridge - -# Single payload -env, env_json_str = await NATSBridge.smartsend( - "/test", - [("dataname1", data1, "dictionary")], - broker_url="nats://localhost:4222", - fileserver_upload_handler=plik_oneshot_upload -) - -# Multiple payloads -env, env_json_str = await NATSBridge.smartsend( - "/test", - [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")], - broker_url="nats://localhost:4222" -) - -# Mixed content -env, env_json_str = await NATSBridge.smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("user_image", image_data, "image"), - ("audio_clip", audio_data, "audio") - ], - broker_url="nats://localhost:4222" -) - -# Receive -env = await NATSBridge.smartreceive( - msg, - fileserver_download_handler=fetch_with_backoff -) -# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]] -# Access payloads: env["payloads"] which is a list of tuples -for dataname, data, type_ in env["payloads"]: - print(f"{dataname}: {data} (type: {type_})") -``` - -#### MicroPython - -```python -from natsbridge import NATSBridge - -# Limited to text and binary (no tables due to memory constraints) -env, env_json_str = NATSBridge.smartsend( - "/chat", - [ - ("message_text", "Hello!", "text"), - ("binary_data", data_bytes, "binary") - ], - broker_url="nats://localhost:4222", - size_threshold=100000 # Lower threshold for memory constraints -) -# Note: MicroPython uses synchronous handlers -``` - ---- - -## Row-Oriented vs Column-Oriented Data Structures - -Different platforms use different internal representations for tabular data. Understanding these differences is crucial for proper serialization/deserialization when using `jsontable` and `arrowtable` datatypes. - -### Data Structure Comparison - -| Platform | Table Structure | Orientation | -|----------|-----------------|-------------| -| **Julia (DataFrame)** | `Dict{String, Vector}` | Column-oriented | -| **Python (pandas)** | `dict[str, list]` | Column-oriented | -| **JavaScript** | `Array` | Row-oriented | -| **MicroPython** | `list[list]` | Row-oriented | - -### Column-Oriented (Julia DataFrame, Python pandas) - -In column-oriented structures, each column is stored as a separate array/vector: - -**Julia Example:** -```julia -# Create dictionary with column vectors -dict = Dict("customer age" => [15, 20, 25], - "first name" => ["Rohit", "Rahul", "Akshat"]) - -# Convert to DataFrame -df = DataFrame(dict) -println(df) -# Output: -# 3×2 DataFrame -# Row ┆ customer age ┆ first name -# ┆ Int64 ┆ String -# ─────┼──────────────┼──────────── -# 1 ┆ 15 ┆ "Rohit" -# 2 ┆ 20 ┆ "Rahul" -# 3 ┆ 25 ┆ "Akshat" -``` - -**Python Example:** -```python -# Create dictionary with column lists -data = { - "Name": ["Alice", "Bob", "Charlie"], - "Age": [25, 30, 35], - "Score": [88.5, 92.0, 79.5] -} - -# Convert to DataFrame -df = pd.DataFrame(data) -print(df) -# Output: -# Name Age Score -# 0 Alice 25 88.5 -# 1 Bob 30 92.0 -# 2 Charlie 35 79.5 -``` - -### Row-Oriented (JavaScript, MicroPython) - -In row-oriented structures, each row is stored as a separate object/array: - -**JavaScript Example:** -```javascript -// Array of objects (row-oriented) -const users = [ - { Name: "Alice", Age: 25, Score: 88.5 }, - { Name: "Bob", Age: 30, Score: 92.0 }, - { Name: "Charlie", Age: 35, Score: 79.5 } -]; -``` - -**MicroPython Example:** -```python -# List of lists (row-oriented) -users = [ - ["Alice", 25, 88.5], - ["Bob", 30, 92.0], - ["Charlie", 35, 79.5] -] -``` - -### Cross-Platform Conversion for jsontable - -When sending `jsontable` across platforms, the system performs automatic conversion between row-oriented and column-oriented formats: - -**Sending from Julia/Python (column-oriented) to JS/MicroPython (row-oriented):** -1. Convert column-oriented dict to row-oriented array of objects -2. Serialize to JSON -3. Send with `payload_type = "jsontable"` - -**Receiving from JS/MicroPython (row-oriented) to Julia/Python (column-oriented):** -1. Deserialize JSON to row-oriented array of objects -2. Convert to column-oriented dict -3. Create DataFrame from column-oriented dict - -**Example: Julia to JavaScript** -```julia -# Julia side - column-oriented DataFrame -df = DataFrame( - "Name" => ["Alice", "Bob", "Charlie"], - "Age" => [25, 30, 35], - "Score" => [88.5, 92.0, 79.5] -) - -# smartsend automatically converts to row-oriented JSON -env, env_json_str = smartsend( - "/data", - [("users", df, "jsontable")] -) -# JSON sent: [{"Name":"Alice","Age":25,"Score":88.5}, ...] -``` - -```javascript -// JavaScript side - receives row-oriented array -const [env, env_json_str] = await NATSBridge.smartsend( - "/data", - [["users", users, "jsontable"]] -); -// users is already row-oriented: [{Name: "Alice", Age: 25, ...}, ...] -``` - -**Example: JavaScript to Julia** -```javascript -// JavaScript side - row-oriented array -const users = [ - { Name: "Alice", Age: 25, Score: 88.5 }, - { Name: "Bob", Age: 30, Score: 92.0 } -]; - -const [env, env_json_str] = await NATSBridge.smartsend( - "/data", - [["users", users, "jsontable"]] -); -``` - -```julia -# Julia side - receives and converts to column-oriented DataFrame -env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff) -# The jsontable is automatically converted to DataFrame -for (dataname, data, type) in env["payloads"] - if type == "jsontable" - # data is now a DataFrame with column-oriented structure - println(data) - # Output: - # 2×3 DataFrame - # Row ┆ Name ┆ Age ┆ Score - # ┆ String ┆ Int64 ┆ Float64 - # ─────┼────────┼──────┼─────── - # 1 ┆ Alice ┆ 25 ┆ 88.5 - # 2 ┆ Bob ┆ 30 ┆ 92.0 - end -end -``` - ---- - -## Architecture - -### Cross-Platform Claim-Check Pattern - -```mermaid -flowchart TD - A[SmartSend Function] --> B{Is payload size < 1MB?} - B -->|Yes | C[Direct Path
< 1MB] - B -->|No | D[Link Path
>= 1MB] - - C --> C1[Serialize to Buffer] - C1 --> C2[Base64/JSON encode] - C2 --> C3[Publish to NATS] - - D --> D1[Serialize to Buffer] - D1 --> D2[Upload to HTTP Server] - D2 --> D3[Publish to NATS with URL] - - style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px - style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px - style C fill:#e8f5e9,stroke:#008000,stroke-width:2px - style D fill:#e8f5e9,stroke:#008000,stroke-width:2px - style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px - style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px - style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px - style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px -``` - -**Claim-Check Pattern Overview:** -- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS -- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern) - -### smartsend Return Value - -All platforms return a tuple/array containing both the envelope and JSON string: - -#### Julia - -```julia -env, env_json_str = smartsend(...) -# Returns: ::Tuple{msg_envelope_v1, String} -# env::msg_envelope_v1 - The envelope object with all metadata and payloads -# env_json_str::String - JSON string for publishing to NATS -``` - -#### JavaScript - -```javascript -const [env, env_json_str] = await smartsend(...); -// Returns: Promise<[env, env_json_str]> -// env: Object with all metadata and payloads -// env_json_str: String for publishing to NATS -``` - -#### Python - -```python -env, env_json_str = await smartsend(...) -# Returns: Tuple[Dict, str] -# env: Dict with all metadata and payloads -# env_json_str: String for publishing to NATS -``` - -#### MicroPython - -```python -env, env_json_str = NATSBridge.smartsend(...) -# Returns: Tuple[Dict, str] -# Note: MicroPython returns plain dicts (no structured envelope object) -``` - ---- - -## Installation - -### Julia Dependencies - -```julia -using Pkg -Pkg.add("NATS") -Pkg.add("Arrow") -Pkg.add("JSON3") -Pkg.add("HTTP") -Pkg.add("UUIDs") -Pkg.add("Dates") -``` - -### JavaScript Dependencies (Node.js) - -```bash -npm install nats apache-arrow node-fetch -# or -yarn add nats apache-arrow node-fetch -``` - -**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed. - -### JavaScript Dependencies (Browser) - -```bash -npm install nats apache-arrow -# or use CDN: -# https://unpkg.com/nats-js/dist/bundle/nats.min.js -# https://unpkg.com/apache-arrow/arrow.min.js -``` - -**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative like `uuidv4` package. - -### Python Dependencies (Desktop) - -```bash -pip install nats-py aiohttp pyarrow pandas python-dateutil -``` - -### MicroPython Dependencies - -MicroPython uses built-in modules: -- `network` - NATS connection (custom implementation) -- `time` - Timestamps -- `uos` - File operations -- `base64` - Base64 encoding -- `json` - JSON parsing -- `struct` - Binary data handling - ---- - -## Usage Tutorial - -### Step 1: Start NATS Server - -```bash -docker run -p 4222:4222 nats:latest -``` - -### Step 2: Start HTTP File Server (optional) - -```bash -# Create a directory for file uploads -mkdir -p /tmp/fileserver - -# Use any HTTP server that supports POST for file uploads -# Example: Python's built-in server -python3 -m http.server 8080 --directory /tmp/fileserver -``` - -### Step 3: Run Test Scenarios - -```bash -# Julia tests -julia test/test_julia_to_julia_text_sender.jl -julia test/test_julia_to_julia_text_receiver.jl - -# JavaScript tests (Node.js) -node test/test_js_text_sender.js -node test/test_js_text_receiver.js - -# Python tests -python3 test/test_py_text_sender.py -python3 test/test_py_text_receiver.py -``` - ---- - -## Platform-Specific Implementations - -### Julia Implementation - -#### Module Structure - -```julia -module NATSBridge - using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 - - # Constants - const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - const DEFAULT_BROKER_URL = "nats://localhost:4222" - const DEFAULT_FILESERVER_URL = "http://localhost:8080" - - # Structs - struct msg_payload_v1 - id::String - dataname::String - payload_type::String - transport::String - encoding::String - size::Integer - data::Any - metadata::Dict{String, Any} - end - - struct msg_envelope_v1 - correlation_id::String - msg_id::String - timestamp::String - send_to::String - msg_purpose::String - sender_name::String - sender_id::String - receiver_name::String - receiver_id::String - reply_to::String - reply_to_msg_id::String - broker_url::String - metadata::Dict{String, Any} - payloads::Vector{msg_payload_v1} - end - - # Main functions - function smartsend(...) end - function smartreceive(...) end - - # Utility functions - function _serialize_data(...) end - function _deserialize_data(...) end - function envelope_to_json(...) end - function log_trace(...) end - - # File server handlers - function plik_oneshot_upload(...) end - function _fetch_with_backoff(...) end - function publish_message(...) end - - # Internal helpers - function _get_payload_bytes(...) end -end -``` - -#### Multiple Dispatch Pattern - -Julia leverages multiple dispatch for type-specific implementations: - -```julia -# publish_message has two overloads based on argument types -function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) - conn = NATS.connect(broker_url) - publish_message(conn, subject, message, correlation_id) -end - -function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) - try - NATS.publish(conn, subject, message) - log_trace(correlation_id, "Message published to $subject") - finally - NATS.drain(conn) - end -end - -# Type-specific serialization -function _serialize_data(data::String, payload_type::String) - # Text handling - return Vector{UInt8}(data) -end - -function _serialize_data(data::Dict, payload_type::String) - # Dictionary handling - json_str = JSON.json(data) - return Vector{UInt8}(json_str) -end - -function _serialize_data(data::DataFrame, payload_type::String) - # Table handling - arrowtable - io = IOBuffer() - Arrow.write(io, data) - return take!(io) -end -``` - -#### smartsend Implementation - -```julia -function smartsend( - subject::String, - data::AbstractArray{Tuple{String, T1, String}, 1}; - broker_url::String = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - correlation_id::String = string(uuid4()), - msg_purpose::String = "chat", - sender_name::String = "NATSBridge", - receiver_name::String = "", - receiver_id::String = "", - reply_to::String = "", - reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, - msg_id::String = string(uuid4()), - sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} where {T1<:Any} - - log_trace(correlation_id, "Starting smartsend for subject: $subject") - - # Process each payload in the list - payloads = msg_payload_v1[] - for (dataname, payload_data, payload_type) in data - # Serialize data based on type - payload_bytes = _serialize_data(payload_data, payload_type) - - payload_size = length(payload_bytes) - log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") - - # Decision: Direct vs Link - if payload_size < size_threshold - # Direct path - Base64 encode and send via NATS - payload_b64 = Base64.base64encode(payload_bytes) - log_trace(correlation_id, "Using direct transport for $payload_size bytes") - - payload = msg_payload_v1( - payload_b64, - payload_type; - id = string(uuid4()), - dataname = dataname, - transport = "direct", - encoding = "base64", - size = payload_size, - metadata = Dict{String, Any}("payload_bytes" => payload_size) - ) - push!(payloads, payload) - else - # Link path - Upload to HTTP server, send URL via NATS - log_trace(correlation_id, "Using link transport, uploading to fileserver") - - response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - - if response["status"] != 200 - error("Failed to upload data to fileserver: $(response["status"])") - end - - url = response["url"] - log_trace(correlation_id, "Uploaded to URL: $url") - - payload = msg_payload_v1( - url, - payload_type; - id = string(uuid4()), - dataname = dataname, - transport = "link", - encoding = "none", - size = payload_size, - metadata = Dict{String, Any}() - ) - push!(payloads, payload) - end - end - - # Create msg_envelope_v1 with all payloads - # Note: First positional argument is "send_to" (the NATS subject), not "subject" - env = msg_envelope_v1( - subject, # send_to: NATS subject to publish to - payloads; - correlation_id = correlation_id, - msg_id = msg_id, - msg_purpose = msg_purpose, - sender_name = sender_name, - sender_id = sender_id, - receiver_name = receiver_name, - receiver_id = receiver_id, - reply_to = reply_to, - reply_to_msg_id = reply_to_msg_id, - broker_url = broker_url, - metadata = Dict{String, Any}(), - ) - - env_json_str = envelope_to_json(env) - - if is_publish == false - # skip publish - elseif is_publish == true && NATS_connection === nothing - publish_message(broker_url, subject, env_json_str, correlation_id) - elseif is_publish == true && NATS_connection !== nothing - publish_message(NATS_connection, subject, env_json_str, correlation_id) - end - - return (env, env_json_str) -end -``` - -#### smartreceive Implementation - -```julia -function smartreceive( - msg::NATS.Msg; - fileserver_download_handler::Function = _fetch_with_backoff, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 -)::JSON.Object{String, Any} - # Parse the JSON envelope - env_json_obj = JSON.parse(String(msg.payload)) - log_trace(env_json_obj["correlation_id"], "Processing received message") - - # Process all payloads in the envelope - payloads_list = Tuple{String, Any, String}[] - - num_payloads = length(env_json_obj["payloads"]) - - for i in 1:num_payloads - payload = env_json_obj["payloads"][i] - transport = String(payload["transport"]) - dataname = String(payload["dataname"]) - - if transport == "direct" - log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") - - # Extract base64 payload from the payload - payload_b64 = String(payload["data"]) - - # Decode Base64 payload - payload_bytes = Base64.base64decode(payload_b64) - - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - elseif transport == "link" - # Extract download URL from the payload - url = String(payload["data"]) - log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") - - # Fetch with exponential backoff using the download handler - downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"]) - - # Deserialize based on type - data_type = String(payload["payload_type"]) - data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) - - push!(payloads_list, (dataname, data, data_type)) - else - error("Unknown transport type for payload '$dataname': $(transport)") - end - end - env_json_obj["payloads"] = payloads_list - return env_json_obj -end -``` - -#### _serialize_data Implementation - -```julia -function _serialize_data(data::Any, payload_type::String) - if payload_type == "text" - if isa(data, String) - data_bytes = Vector{UInt8}(data) - return data_bytes - else - error("Text data must be a String") - end - elseif payload_type == "dictionary" - json_str = JSON.json(data) - json_str_bytes = Vector{UInt8}(json_str) - return json_str_bytes - elseif payload_type == "arrowtable" - # Serialize DataFrame to Arrow IPC format - io = IOBuffer() - Arrow.write(io, data) - return take!(io) - elseif payload_type == "jsontable" - # Convert column-oriented to row-oriented JSON - # data is Vector{NamedTuple} or Vector{Dict} - json_str = JSON.json(data) - return Vector{UInt8}(json_str) - elseif payload_type == "image" - if isa(data, Vector{UInt8}) - return data - else - error("Image data must be Vector{UInt8}") - end - elseif payload_type == "audio" - if isa(data, Vector{UInt8}) - return data - else - error("Audio data must be Vector{UInt8}") - end - elseif payload_type == "video" - if isa(data, Vector{UInt8}) - return data - else - error("Video data must be Vector{UInt8}") - end - elseif payload_type == "binary" - if isa(data, IOBuffer) - return take!(data) - elseif isa(data, Vector{UInt8}) - return data - else - error("Binary data must be binary (Vector{UInt8} or IOBuffer)") - end - else - error("Unknown payload_type: $payload_type") - end -end -``` - -#### _deserialize_data Implementation - -```julia -function _deserialize_data( - data::Vector{UInt8}, - payload_type::String, - correlation_id::String -) - if payload_type == "text" - return String(data) - elseif payload_type == "dictionary" - json_str = String(data) - return JSON.parse(json_str) - elseif payload_type == "arrowtable" - # Deserialize from Arrow IPC format - io = IOBuffer(data) - arrow_table = Arrow.Table(io) - return arrow_table - elseif payload_type == "jsontable" - # Deserialize from JSON format - # Returns Vector{NamedTuple} (column-oriented compatible) - json_str = String(data) - parsed = JSON.parse(json_str) - return parsed - elseif payload_type == "image" - return data - elseif payload_type == "audio" - return data - elseif payload_type == "video" - return data - elseif payload_type == "binary" - return data - else - error("Unknown payload_type: $payload_type") - end -end -``` - -#### _fetch_with_backoff Implementation - -```julia -function _fetch_with_backoff( - url::String, - max_retries::Int, - base_delay::Int, - max_delay::Int, - correlation_id::String -) - delay = base_delay - for attempt in 1:max_retries - try - response = HTTP.request("GET", url) - if response.status == 200 - log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") - return response.body - else - error("Failed to fetch: $(response.status)") - end - catch e - log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") - - if attempt < max_retries - sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - end - end - end - - error("Failed to fetch data after $max_retries attempts") -end -``` - -#### plik_oneshot_upload Implementation - -```julia -function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) - # Get upload id - url_getUploadID = "$file_server_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - response_json = JSON.parse(http_response.body) - uploadid = response_json["id"] - uploadtoken = response_json["uploadToken"] - - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$file_server_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] - - form = HTTP.Form(Dict( - "file" => file_multipart - )) - - http_response = nothing - try - http_response = HTTP.post(url_upload, headers, form) - catch e - @error "Request failed" exception=e - end - response_json = JSON.parse(http_response.body) - fileid = response_json["id"] - - url = "$file_server_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end -``` - ---- - -### JavaScript Implementation - -#### Module Structure - -```javascript -// natsbridge.js -const nats = require('nats'); -const crypto = require('crypto'); -const fetch = require('node-fetch'); - -// UUID generation using built-in crypto module -const uuidv4 = () => crypto.randomUUID(); - -const DEFAULT_SIZE_THRESHOLD = 1_000_000; -const DEFAULT_BROKER_URL = 'nats://localhost:4222'; -const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; - -class NATSClient { - constructor(url) { - this.url = url; - this.connection = null; - } - - async connect() { - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } - - async publish(subject, message) { - if (!this.connection) { - await this.connect(); - } - await this.connection.publish(subject, message); - } - - async close() { - if (this.connection) { - this.connection.close(); - } - } -} - -async function smartsend(subject, data, options = {}) { - // Implementation -} - -async function smartreceive(msg, options = {}) { - // Implementation -} - -module.exports = { - NATSClient, - smartsend, - smartreceive, - plikOneshotUpload, - fetchWithBackoff -}; -``` - -#### smartsend Implementation - -```javascript -const nats = require('nats'); -const crypto = require('crypto'); -const fetch = require('node-fetch'); -const arrow = require('apache-arrow'); - -// UUID generation using built-in crypto module -const uuidv4 = () => crypto.randomUUID(); - -const DEFAULT_SIZE_THRESHOLD = 1_000_000; -const DEFAULT_BROKER_URL = 'nats://localhost:4222'; -const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; - -async function smartsend(subject, data, options = {}) { - const { - broker_url = DEFAULT_BROKER_URL, - fileserver_url = DEFAULT_FILESERVER_URL, - fileserver_upload_handler = plikOneshotUpload, - size_threshold = DEFAULT_SIZE_THRESHOLD, - correlation_id = uuidv4(), - msg_purpose = 'chat', - sender_name = 'NATSBridge', - receiver_name = '', - receiver_id = '', - reply_to = '', - reply_to_msg_id = '', - is_publish = true, - nats_connection = null, - msg_id = uuidv4(), - sender_id = uuidv4() - } = options; - - console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`); - - // Process payloads - const payloads = []; - for (const [dataname, payloadData, payloadType] of data) { - const payloadBytes = await serializeData(payloadData, payloadType); - const payloadSize = payloadBytes.byteLength; - - console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); - - if (payloadSize < size_threshold) { - // Direct path - const payloadB64 = bufferToBase64(payloadBytes); - console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`); - - payloads.push({ - id: uuidv4(), - dataname, - payload_type: payloadType, - transport: 'direct', - encoding: 'base64', - size: payloadSize, - data: payloadB64, - metadata: { payload_bytes: payloadSize } - }); - } else { - // Link path - console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`); - - const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); - - if (response.status !== 200) { - throw new Error(`Failed to upload data to fileserver: ${response.status}`); - } - - console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`); - - payloads.push({ - id: uuidv4(), - dataname, - payload_type: payloadType, - transport: 'link', - encoding: 'none', - size: payloadSize, - data: response.url, - metadata: {} - }); - } - } - - // Build envelope - const env = { - correlation_id, - msg_id, - timestamp: new Date().toISOString(), - send_to: subject, - msg_purpose, - sender_name, - sender_id, - receiver_name, - receiver_id, - reply_to, - reply_to_msg_id, - broker_url, - metadata: {}, - payloads - }; - - const env_json_str = JSON.stringify(env); - - if (is_publish) { - if (nats_connection) { - await publishMessage(nats_connection, subject, env_json_str, correlation_id); - } else { - await publishMessage(broker_url, subject, env_json_str, correlation_id); - } - } - - return [env, env_json_str]; -} -``` - -#### serializeData Implementation - -```javascript -const arrow = require('apache-arrow'); - -async function serializeData(data, payload_type) { - if (payload_type === 'text') { - if (typeof data === 'string') { - return Buffer.from(data, 'utf8'); - } else { - throw new Error('Text data must be a string'); - } - } else if (payload_type === 'dictionary') { - const jsonStr = JSON.stringify(data); - return Buffer.from(jsonStr, 'utf8'); - } else if (payload_type === 'arrowtable') { - // Convert Array to Arrow IPC - // data is row-oriented: [{id: 1, name: "Alice"}, ...] - if (!Array.isArray(data) || data.length === 0) { - throw new Error('arrowtable data must be a non-empty array of objects'); - } - - // Create schema from first row - const schemaFields = Object.keys(data[0]).map(key => - new arrow.Field(key, arrow.any()) - ); - const schema = new arrow.Schema(schemaFields); - - // Create writer - const writer = new arrow.RecordBatchWriter([schema]); - - // Write rows - for (const row of data) { - const recordBatch = arrow.recordBatch.fromObjects([row], schema); - writer.write(recordBatch); - } - await writer.close(); - - // Read buffer - return writer.toBuffer(); - } else if (payload_type === 'jsontable') { - // data is already row-oriented Array - // Serialize directly to JSON - const jsonStr = JSON.stringify(data); - return Buffer.from(jsonStr, 'utf8'); - } else if (payload_type === 'image') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Image data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'audio') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Audio data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'video') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Video data must be Uint8Array or Buffer'); - } - } else if (payload_type === 'binary') { - if (data instanceof Uint8Array || Buffer.isBuffer(data)) { - return Buffer.from(data); - } else { - throw new Error('Binary data must be Uint8Array or Buffer'); - } - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} - -function bufferToBase64(buffer) { - return buffer.toString('base64'); -} -``` - -#### deserializeData Implementation - -```javascript -const arrow = require('apache-arrow'); - -async function deserializeData(data, payload_type, correlation_id) { - if (payload_type === 'text') { - return Buffer.from(data).toString('utf8'); - } else if (payload_type === 'dictionary') { - const jsonStr = Buffer.from(data).toString('utf8'); - return JSON.parse(jsonStr); - } else if (payload_type === 'arrowtable') { - // Deserialize from Arrow IPC - const buffer = Buffer.from(data); - const table = arrow.tableFromRawBytes(buffer); - return table; - } else if (payload_type === 'jsontable') { - // Deserialize from JSON - returns Array (row-oriented) - const jsonStr = Buffer.from(data).toString('utf8'); - return JSON.parse(jsonStr); - } else if (payload_type === 'image') { - return Buffer.from(data); - } else if (payload_type === 'audio') { - return Buffer.from(data); - } else if (payload_type === 'video') { - return Buffer.from(data); - } else if (payload_type === 'binary') { - return Buffer.from(data); - } else { - throw new Error(`Unknown payload_type: ${payload_type}`); - } -} -``` - -#### fetchWithBackoff Implementation - -```javascript -async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { - let delay = base_delay; - - for (let attempt = 1; attempt <= max_retries; attempt++) { - try { - const response = await fetch(url); - - if (response.status === 200) { - console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`); - return await response.arrayBuffer(); - } else { - throw new Error(`Failed to fetch: ${response.status}`); - } - } catch (e) { - console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`); - - if (attempt < max_retries) { - await new Promise(resolve => setTimeout(resolve, delay)); - delay = Math.min(delay * 2, max_delay); - } - } - } - - throw new Error(`Failed to fetch data after ${max_retries} attempts`); -} -``` - -#### plikOneshotUpload Implementation - -```javascript -async function plikOneshotUpload(file_server_url, dataname, data) { - // Get upload id - const url_getUploadID = `${file_server_url}/upload`; - const headers = { 'Content-Type': 'application/json' }; - const body = JSON.stringify({ OneShot: true }); - - const http_response = await fetch(url_getUploadID, { - method: 'POST', - headers, - body - }); - - const response_json = await http_response.json(); - const uploadid = response_json.id; - const uploadtoken = response_json.uploadToken; - - // Upload file - const url_upload = `${file_server_url}/file/${uploadid}`; - const form = new FormData(); - const blob = new Blob([data]); - form.append('file', blob, dataname); - - const upload_headers = { - 'X-UploadToken': uploadtoken - }; - - const upload_response = await fetch(url_upload, { - method: 'POST', - headers: upload_headers, - body: form - }); - - const upload_json = await upload_response.json(); - const fileid = upload_json.id; - - const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; - - return { - status: upload_response.status, - uploadid, - fileid, - url - }; -} -``` - ---- - -### Python Implementation - -#### Module Structure - -```python -# natsbridge.py -import asyncio -import base64 -import json -import uuid -import time -from typing import Any, Dict, List, Tuple, Union, Callable -from dataclasses import dataclass, field -from datetime import datetime - -try: - import pyarrow as arrow - import pyarrow.parquet as pq - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - -try: - import aiohttp - import nats - from nats.aio.client import Client as NATSClient - NATS_AVAILABLE = True -except ImportError: - NATS_AVAILABLE = False - - -DEFAULT_SIZE_THRESHOLD = 1_000_000 -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" - - -@dataclass -class MsgPayloadV1: - """Message payload structure.""" - id: str - dataname: str - payload_type: str - transport: str - encoding: str - size: int - data: Union[str, bytes] - metadata: Dict[str, Any] = field(default_factory=dict) - - -@dataclass -class MsgEnvelopeV1: - """Message envelope structure.""" - correlation_id: str - msg_id: str - timestamp: str - send_to: str - msg_purpose: str - sender_name: str - sender_id: str - receiver_name: str - receiver_id: str - reply_to: str - reply_to_msg_id: str - broker_url: str - metadata: Dict[str, Any] = field(default_factory=dict) - payloads: List[MsgPayloadV1] = field(default_factory=list) - - -class NATSBridge: - """Cross-platform NATS bridge implementation.""" - - def __init__(self, broker_url: str = None, fileserver_url: str = None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL - self._nats_client: NATSClient = None - - async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]: - """Send data via NATS.""" - pass - - async def smartreceive(self, msg: Any, **kwargs) -> Dict: - """Receive and process NATS message.""" - pass -``` - -#### smartsend Implementation - -```python -import asyncio -import base64 -import json -import uuid -from typing import Any, Dict, List, Tuple, Union, Callable -from datetime import datetime - -DEFAULT_SIZE_THRESHOLD = 1_000_000 -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" - - -async def smartsend( - subject: str, - data: List[Tuple[str, Any, str]], - broker_url: str = DEFAULT_BROKER_URL, - fileserver_url: str = DEFAULT_FILESERVER_URL, - fileserver_upload_handler: Callable = plik_oneshot_upload, - size_threshold: int = DEFAULT_SIZE_THRESHOLD, - correlation_id: str = None, - msg_purpose: str = "chat", - sender_name: str = "NATSBridge", - receiver_name: str = "", - receiver_id: str = "", - reply_to: str = "", - reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, - msg_id: str = None, - sender_id: str = None -) -> Tuple[Dict, str]: - """ - Send data via NATS with automatic transport selection. - - Args: - subject: NATS subject to publish to - data: List of (dataname, data, type) tuples - **kwargs: Additional options - - Returns: - Tuple of (env, env_json_str) - """ - if correlation_id is None: - correlation_id = str(uuid.uuid4()) - if msg_id is None: - msg_id = str(uuid.uuid4()) - if sender_id is None: - sender_id = str(uuid.uuid4()) - - print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}") - - # Process payloads - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = _serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") - - if payload_size < size_threshold: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') - print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes") - - payloads.append({ - 'id': str(uuid.uuid4()), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64, - 'metadata': {'payload_bytes': payload_size} - }) - else: - # Link path - print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver") - - response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - - if response['status'] != 200: - raise Exception(f"Failed to upload data to fileserver: {response['status']}") - - print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}") - - payloads.append({ - 'id': str(uuid.uuid4()), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 'size': payload_size, - 'data': response['url'], - 'metadata': {} - }) - - # Build envelope - env = { - 'correlation_id': correlation_id, - 'msg_id': msg_id, - 'timestamp': datetime.utcnow().isoformat() + 'Z', - 'send_to': subject, - 'msg_purpose': msg_purpose, - 'sender_name': sender_name, - 'sender_id': sender_id, - 'receiver_name': receiver_name, - 'receiver_id': receiver_id, - 'reply_to': reply_to, - 'reply_to_msg_id': reply_to_msg_id, - 'broker_url': broker_url, - 'metadata': {}, - 'payloads': payloads - } - - env_json_str = json.dumps(env) - - if is_publish: - if nats_connection: - await publish_message(nats_connection, subject, env_json_str, correlation_id) - else: - await publish_message(broker_url, subject, env_json_str, correlation_id) - - return env, env_json_str -``` - -#### serializeData Implementation - -```python -import base64 -import json -from typing import Any - -try: - import pyarrow as arrow - import pyarrow.feather as feather - import pyarrow.ipc as ipc - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - - -def _serialize_data(data: Any, payload_type: str) -> bytes: - """Serialize data to bytes based on type.""" - if payload_type == 'text': - if isinstance(data, str): - return data.encode('utf-8') - else: - raise Error('Text data must be a string') - elif payload_type == 'dictionary': - json_str = json.dumps(data) - return json_str.encode('utf-8') - elif payload_type == 'arrowtable': - if not ARROW_AVAILABLE: - raise Error('pyarrow not available for table serialization') - - import io - buf = io.BytesIO() - import pandas as pd - if isinstance(data, pd.DataFrame): - # Column-oriented DataFrame to Arrow - table = arrow.Table.from_pandas(data) - sink = arrow.ipc.new_file(buf) - arrow.ipc.write_table(table, sink) - sink.close() - return buf.getvalue() - else: - raise Error('arrowtable data must be a pandas DataFrame') - elif payload_type == 'jsontable': - # data is list[dict] or list (row-oriented) - # Serialize directly to JSON - json_str = json.dumps(data) - return json_str.encode('utf-8') - elif payload_type == 'image': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Image data must be bytes') - elif payload_type == 'audio': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Audio data must be bytes') - elif payload_type == 'video': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Video data must be bytes') - elif payload_type == 'binary': - if isinstance(data, (bytes, bytearray)): - return bytes(data) - else: - raise Error('Binary data must be bytes') - else: - raise Error(f'Unknown payload_type: {payload_type}') -``` - -#### deserializeData Implementation - -```python -import base64 -import json -from typing import Any - -try: - import pyarrow as arrow - import pyarrow.feather as feather - import pyarrow.ipc as ipc - ARROW_AVAILABLE = True -except ImportError: - ARROW_AVAILABLE = False - - -def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any: - """Deserialize bytes to data based on type.""" - if payload_type == 'text': - return data.decode('utf-8') - elif payload_type == 'dictionary': - json_str = data.decode('utf-8') - return json.loads(json_str) - elif payload_type == 'arrowtable': - if not ARROW_AVAILABLE: - raise Error('pyarrow not available for table deserialization') - - import io - buf = io.BytesIO(data) - reader = arrow.ipc.open_file(buf) - return reader.read_all().to_pandas() - elif payload_type == 'jsontable': - # Deserialize from JSON - returns list[dict] (row-oriented) - json_str = data.decode('utf-8') - return json.loads(json_str) - elif payload_type == 'image': - return data - elif payload_type == 'audio': - return data - elif payload_type == 'video': - return data - elif payload_type == 'binary': - return data - else: - raise Error(f'Unknown payload_type: {payload_type}') -``` - -#### fetchWithBackoff Implementation - -```python -import asyncio -import aiohttp -from typing import Callable - - -async def fetch_with_backoff( - url: str, - max_retries: int, - base_delay: int, - max_delay: int, - correlation_id: str -) -> bytes: - """Fetch URL with exponential backoff.""" - delay = base_delay - - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}") - return await response.read() - else: - raise Exception(f"Failed to fetch: {response.status}") - except Exception as e: - print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}") - - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - - raise Exception(f"Failed to fetch data after {max_retries} attempts") -``` - -#### plikOneshotUpload Implementation - -```python -import aiohttp -import json -from typing import Dict, Any - - -async def plik_oneshot_upload( - file_server_url: str, - dataname: str, - data: bytes -) -> Dict[str, Any]: - """Upload data to plik server in one-shot mode.""" - - # Get upload id - async with aiohttp.ClientSession() as session: - url_getUploadID = f"{file_server_url}/upload" - headers = {'Content-Type': 'application/json'} - body = json.dumps({"OneShot": True}) - - async with session.post(url_getUploadID, headers=headers, data=body) as response: - response_json = await response.json() - uploadid = response_json['id'] - uploadtoken = response_json['uploadToken'] - - # Upload file - url_upload = f"{file_server_url}/file/{uploadid}" - headers = {'X-UploadToken': uploadtoken} - - form = aiohttp.FormData() - form.add_field('file', data, filename=dataname, content_type='application/octet-stream') - - async with session.post(url_upload, headers=headers, data=form) as upload_response: - upload_json = await upload_response.json() - fileid = upload_json['id'] - - url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}" - - return { - 'status': upload_response.status, - 'uploadid': uploadid, - 'fileid': fileid, - 'url': url - } -``` - ---- - -## MicroPython Implementation - -### Limitations - -MicroPython has significant constraints compared to desktop implementations: - -| Feature | Desktop | MicroPython | -|---------|---------|-------------| -| Memory | Unlimited | ~256KB - 1MB | -| Arrow IPC | ✅ | ❌ (not supported) | -| Async/Await | ✅ | ⚠️ (uasyncio only) | -| Large payloads (>1MB) | ✅ | ❌ (enforced limit) | -| arrowtable | ✅ | ❌ | -| jsontable | ⚠️ (limited) | ⚠️ (limited) | -| Multiple payloads | ✅ | ⚠️ (limited) | - -### MicroPython Module Structure - -```python -# natsbridge_mpy.py (MicroPython) -import network -import time -import json -import base64 -import uos -import struct - -# Constants -DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython -DEFAULT_BROKER_URL = "nats://localhost:4222" -DEFAULT_FILESERVER_URL = "http://localhost:8080" -MAX_PAYLOAD_SIZE = 50000 # Hard limit - -# Note: MicroPython uses list[list] for jsontable (row-oriented) -# No DataFrame support - data is always row-oriented - - -class NATSBridge: - """MicroPython NATS bridge implementation.""" - - def __init__(self, broker_url=None, fileserver_url=None): - self.broker_url = broker_url or DEFAULT_BROKER_URL - self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL - self._nats_conn = None - - def smartsend(self, subject, data, **kwargs): - """Send data (synchronous).""" - correlation_id = self._generate_uuid() - msg_id = self._generate_uuid() - sender_id = self._generate_uuid() - - print(f"[Correlation: {correlation_id}] Starting smartsend") - - payloads = [] - for dataname, payload_data, payload_type in data: - payload_bytes = self._serialize_data(payload_data, payload_type) - payload_size = len(payload_bytes) - - if payload_size > MAX_PAYLOAD_SIZE: - raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}") - - if payload_size < DEFAULT_SIZE_THRESHOLD: - # Direct path - payload_b64 = base64.b64encode(payload_bytes).decode('ascii') - payloads.append({ - 'id': self._generate_uuid(), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'direct', - 'encoding': 'base64', - 'size': payload_size, - 'data': payload_b64 - }) - else: - # Link path (limited support) - response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes) - payloads.append({ - 'id': self._generate_uuid(), - 'dataname': dataname, - 'payload_type': payload_type, - 'transport': 'link', - 'encoding': 'none', - 'size': payload_size, - 'data': response['url'] - }) - - env = { - 'correlation_id': correlation_id, - 'msg_id': msg_id, - 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), - 'send_to': subject, - 'msg_purpose': kwargs.get('msg_purpose', 'chat'), - 'sender_name': kwargs.get('sender_name', 'NATSBridge'), - 'sender_id': sender_id, - 'receiver_name': kwargs.get('receiver_name', ''), - 'receiver_id': kwargs.get('receiver_id', ''), - 'reply_to': kwargs.get('reply_to', ''), - 'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''), - 'broker_url': self.broker_url, - 'metadata': {}, - 'payloads': payloads - } - - env_json_str = json.dumps(env) - - # Publish - self._publish(subject, env_json_str, correlation_id) - - return env, env_json_str - - def smartreceive(self, msg, **kwargs): - """Receive and process message (synchronous).""" - env_json_obj = json.loads(msg.payload) - correlation_id = env_json_obj['correlation_id'] - - payloads_list = [] - for payload in env_json_obj['payloads']: - transport = payload['transport'] - dataname = payload['dataname'] - - if transport == 'direct': - payload_b64 = payload['data'] - payload_bytes = base64.b64decode(payload_b64) - data_type = payload['payload_type'] - data = self._deserialize_data(payload_bytes, data_type) - payloads_list.append((dataname, data, data_type)) - elif transport == 'link': - url = payload['data'] - downloaded_data = self._sync_fileserver_download( - url, - kwargs.get('max_retries', 3), - kwargs.get('base_delay', 100), - kwargs.get('max_delay', 1000), - correlation_id - ) - data_type = payload['payload_type'] - data = self._deserialize_data(downloaded_data, data_type) - payloads_list.append((dataname, data, data_type)) - - env_json_obj['payloads'] = payloads_list - return env_json_obj - - def _serialize_data(self, data, payload_type): - """Serialize data (MicroPython version - no arrowtable support).""" - if payload_type == 'text': - return data.encode('utf-8') - elif payload_type == 'dictionary': - return json.dumps(data).encode('utf-8') - elif payload_type == 'jsontable': - # data is list[list] (row-oriented) - return json.dumps(data).encode('utf-8') - elif payload_type in ('image', 'audio', 'video', 'binary'): - return bytes(data) - else: - raise ValueError(f"Unknown payload_type: {payload_type}") - - def _deserialize_data(self, data, payload_type): - """Deserialize data (MicroPython version).""" - if payload_type == 'text': - return data.decode('utf-8') - elif payload_type == 'dictionary': - return json.loads(data.decode('utf-8')) - elif payload_type == 'jsontable': - # Returns list[list] (row-oriented) - return json.loads(data.decode('utf-8')) - elif payload_type in ('image', 'audio', 'video', 'binary'): - return data - else: - raise ValueError(f"Unknown payload_type: {payload_type}") - - def _generate_uuid(self): - """Generate simple UUID (MicroPython compatible).""" - return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % ( - time.time_ns() // (10**6) % 0xFFFFFFFF, - time.time_ns() % 0xFFFFFFFF, - time.time_ns() >> 32 & 0xFFFF, - time.time_ns() >> 48 & 0xFFFF, - time.time_ns() >> 64 & 0xFFFF, - time.time_ns() >> 80 & 0xFFFF, - time.time_ns() >> 96 & 0xFFFF, - time.time_ns() >> 112 & 0xFFFF - ) - - def _sync_fileserver_upload(self, url, dataname, data): - """Synchronous file upload (limited).""" - # Simplified implementation for MicroPython - # In practice, would use network.HTTP or similar - raise NotImplementedError("File upload not implemented in MicroPython") - - def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id): - """Synchronous file download with backoff.""" - # Simplified implementation for MicroPython - raise NotImplementedError("File download not implemented in MicroPython") - - def _publish(self, subject, message, correlation_id): - """Publish message to NATS.""" - # Simplified implementation for MicroPython - raise NotImplementedError("NATS publishing not implemented in MicroPython") -``` - ---- - -## Configuration - -### Environment Variables - -| Variable | Default | Description | -|----------|---------|-------------| -| `NATS_URL` | `nats://localhost:4222` | NATS server URL | -| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) | - -### MicroPython Configuration - -```python -# micropython.conf -NATS_URL = "nats://broker.local:4222" -FILESERVER_URL = "http://fileserver.local:8080" -SIZE_THRESHOLD = 100000 # Lower threshold for memory-constrained devices -MAX_PAYLOAD_SIZE = 50000 # Hard limit for MicroPython -``` - ---- - -## Performance Considerations - -### Zero-Copy Reading - -| Platform | Strategy | -|----------|----------| -| **Julia** | `Arrow.read()` with memory-mapped files | -| **JavaScript** | `ArrayBuffer` with `DataView` | -| **Python** | `pyarrow` memory mapping | -| **MicroPython** | Not available (streaming only) | - -### Exponential Backoff - -All platforms implement exponential backoff for HTTP downloads: - -```python -# Python -async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id): - delay = base_delay - for attempt in range(1, max_retries + 1): - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - return await response.read() - except Exception as e: - if attempt < max_retries: - await asyncio.sleep(delay / 1000.0) - delay = min(delay * 2, max_delay) - raise Exception("Failed to fetch after max retries") -``` - -### Correlation ID Logging - -All platforms use correlation IDs for distributed tracing: - -``` -[timestamp] [Correlation: abc123] Message published to subject -``` - -### Serialization Performance - -| Format | Use Case | Pros | Cons | -|--------|----------|------|------| -| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython | -| `jsontable` | Small/medium tabular data | Human-readable, universal support, works in MicroPython | Slower, larger size, no schema enforcement | - ---- - -## Testing - -### Test File Organization - -| Platform | Sender Tests | Receiver Tests | -|----------|--------------|----------------| -| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` | -| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` | -| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` | - -### Run Tests - -```bash -# Julia -julia test/test_julia_text_sender.jl -julia test/test_julia_text_receiver.jl - -# JavaScript (Node.js) -node test/test_js_text_sender.js -node test/test_js_text_receiver.js - -# Python -python3 test/test_py_text_sender.py -python3 test/test_py_text_receiver.py -``` - ---- - -## Troubleshooting - -### Common Issues - -1. **NATS Connection Failed** - - Ensure NATS server is running - - Check `broker_url` configuration - -2. **HTTP Upload Failed** - - Ensure file server is running - - Check `fileserver_url` configuration - - Verify upload permissions - -3. **Arrow IPC Deserialization Error** - - Ensure data is properly serialized to Arrow format - - Check Arrow version compatibility - - MicroPython doesn't support Arrow IPC - -4. **Memory Constraints (MicroPython)** - - Reduce `size_threshold` - - Use direct transport only (< 100KB) - - Avoid large payloads - - Use `jsontable` instead of `arrowtable` (arrowtable not supported) - -5. **Row-Oriented vs Column-Oriented Conversion Issues** - - Julia/Python: DataFrames are column-oriented; when sending `jsontable`, they are converted to row-oriented JSON - - JavaScript/MicroPython: Data is natively row-oriented - - When receiving `jsontable` in Julia/Python, JSON is automatically converted back to column-oriented DataFrame - ---- - -## Summary - -This cross-platform NATS bridge provides: - -1. **High-Level API Parity**: Identical `smartsend()` and `smartreceive()` signatures across all platforms -2. **Idiomatic Implementations**: - - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC - - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client - - **Python**: Class-based design with dataclasses, type hints, async/await - - **MicroPython**: Synchronous API, memory-constrained optimizations -3. **Message Format Consistency**: Identical JSON schemas across all platforms -4. **Handler Abstraction**: File server operations abstracted through configurable handlers -5. **Platform-Specific Optimizations**: - - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython) - - **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms) -6. **Row-Oriented ↔ Column-Oriented Conversion**: Automatic conversion between row-oriented (JS, MicroPython) and column-oriented (Julia DataFrame, Python pandas) formats when using `jsontable` - -The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior. - -### Datatype Summary - -| Datatype | Serialization | Use Case | Encoding | Supported Platforms | -|----------|---------------|----------|----------|---------------------| -| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python | -| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython | diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index e9912e6..d8a9050 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -31,7 +31,15 @@ # [(dataname1, data1, type1), (dataname2, data2, type2), ...] # ``` # -# Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" +# Supported types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" +# +# Table Datatypes: +# - `arrowtable`: Apache Arrow IPC format for efficient binary serialization +# - Input: DataFrame, Arrow.Table +# - Encoding: arrow-ipc +# - `jsontable`: JSON format for human-readable tabular data +# - Input: Vector{NamedTuple}, Vector{Dict} (column-oriented compatible) +# - Encoding: json module NATSBridge @@ -51,7 +59,7 @@ It supports both direct transport (base64-encoded data) and link transport (URL- # Arguments: - `id::String` - Unique identifier for this payload (e.g., "uuid4") - `dataname::String` - Name of the payload (e.g., "login_image") - - `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `payload_type::String` - Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" - `transport::String` - Transport method: "direct" or "link" - `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc" - `size::Integer` - Size of the payload in bytes (e.g., 15433) @@ -100,7 +108,7 @@ payload = msg_payload_v1( struct msg_payload_v1 id::String # id of this payload e.g. "uuid4" dataname::String # name of this payload e.g. "login_image" - payload_type::String # this payload type. Can be "text", "dictionary", "table", "image", "audio", "video", "binary" + payload_type::String # this payload type. Can be "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" transport::String # transport method: "direct" or "link" encoding::String # encoding method: "none", "json", "base64", "arrow-ipc" size::Integer # data size in bytes e.g. 15433 @@ -363,7 +371,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c - `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send - `dataname::String` - Name of the payload - `data::Any` - The actual data to send - - `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `payload_type::String` - Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" - No standalone `type` parameter - type is specified per payload # Keyword Arguments: @@ -399,11 +407,15 @@ env, msg_json = smartsend("my.subject", [("dataname1", data, "dictionary")]) # Send multiple payloads in one message with different types data1 = Dict("key1" => "value1") data2 = rand(10_000) # Small array -env, msg_json = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")]) +env, msg_json = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")]) # Send a large array using fileserver upload data = rand(10_000_000) # ~80 MB -env, msg_json = smartsend("large.data", [("large_table", data, "table")]) +env, msg_json = smartsend("large.data", [("large_arrow_table", data, "arrowtable")]) + +# Send jsontable (JSON format) +rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")] +env, msg_json = smartsend("json.data", [("users", rows, "jsontable")]) # Mixed content (e.g., chat with text and image) env, msg_json = smartsend("chat.subject", [ @@ -424,13 +436,12 @@ function smartsend( fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, - #= - Generate a globally unique identifier (UUID) at the start of the request. - This ID must remain constant and immutable as it propagates through every - stage of the execution pipeline. It serves as the end-to-end ID for - distributed tracing, enabling the correlation of all logs, metrics, and - errors across the system back to this specific request instance. - =# + # Generate a globally unique identifier (UUID) at the start of the request. + # This ID must remain constant and immutable as it propagates through every + # stage of the execution pipeline. It serves as the end-to-end ID for + # distributed tracing, enabling the correlation of all logs, metrics, and + # errors across the system back to this specific request instance. + correlation_id::String = string(uuid4()), msg_purpose::String = "chat", @@ -463,6 +474,14 @@ function smartsend( payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string log_trace(correlation_id, "Using direct transport for $payload_size bytes") # Log transport choice + # Determine encoding based on payload_type + encoding = "base64" + if payload_type == "jsontable" + encoding = "json" + elseif payload_type == "arrowtable" + encoding = "arrow-ipc" + end + # Create msg_payload_v1 for direct transport payload = msg_payload_v1( payload_b64, @@ -470,7 +489,7 @@ function smartsend( id = string(uuid4()), dataname = dataname, transport = "direct", - encoding = "base64", + encoding = encoding, size = payload_size, metadata = Dict{String, Any}("payload_bytes" => payload_size) ) @@ -481,7 +500,7 @@ function smartsend( # Upload to HTTP server response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) - + if response["status"] != 200 # Check if upload was successful error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed end @@ -489,6 +508,14 @@ function smartsend( url = response["url"] # URL for the uploaded data log_trace(correlation_id, "Uploaded to URL: $url") # Log successful upload + # Determine encoding based on payload_type + encoding = "none" + if payload_type == "jsontable" + encoding = "json" + elseif payload_type == "arrowtable" + encoding = "arrow-ipc" + end + # Create msg_payload_v1 for link transport payload = msg_payload_v1( url, @@ -496,7 +523,7 @@ function smartsend( id = string(uuid4()), dataname = dataname, transport = "link", - encoding = "none", + encoding = encoding, size = payload_size, metadata = Dict{String, Any}() ) @@ -543,12 +570,13 @@ It supports multiple serialization formats for different data types. 2. Converts data to binary representation according to format rules 3. For text: converts string to UTF-8 bytes 4. For dictionary: serializes as JSON then converts to bytes -5. For table: uses Arrow.jl to write as IPC stream -6. For image/audio/video/binary: returns binary data directly +5. For arrowtable: uses Arrow.jl to write as IPC stream +6. For jsontable: converts to JSON then to bytes +7. For image/audio/video/binary: returns binary data directly # Arguments: - - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`) - - `payload_type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary" + - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"arrowtable"`, Vector{NamedTuple}/Vector{Dict} for `"jsontable"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`) + - `payload_type::String` - Target format: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" # Return: - `Vector{UInt8}` - Binary representation of the serialized data @@ -569,9 +597,13 @@ text_bytes = _serialize_data(text_data, "text") json_data = Dict("name" => "Alice", "age" => 30) json_bytes = _serialize_data(json_data, "dictionary") -# Table serialization with a DataFrame (recommended for tabular data) +# Arrow table serialization with a DataFrame (recommended for tabular data) df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) -table_bytes = _serialize_data(df, "table") +arrow_bytes = _serialize_data(df, "arrowtable") + +# JSON table serialization - Vector{NamedTuple} or Vector{Dict} +rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")] +json_bytes = _serialize_data(rows, "jsontable") # Image data (Vector{UInt8}) image_bytes = UInt8[1, 2, 3] # Image bytes @@ -622,10 +654,30 @@ function _serialize_data(data::Any, payload_type::String) json_str = JSON.json(data) # Convert Julia data to JSON string json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes return json_str_bytes - elseif payload_type == "table" # Table data - convert to Arrow IPC stream + elseif payload_type == "arrowtable" # Arrow table data - convert to Arrow IPC stream io = IOBuffer() # Create in-memory buffer Arrow.write(io, data) # Write data as Arrow IPC stream to buffer return take!(io) # Return the buffer contents as bytes + elseif payload_type == "jsontable" # JSON table data - convert to JSON + # data can be Vector{NamedTuple}, Vector{Dict}, or DataFrame + # If DataFrame, convert to Vector{Dict} first + if isa(data, DataFrame) + # Convert DataFrame to Vector{Dict} (row-oriented) + rows = [] + for i in 1:nrow(data) + row_dict = Dict() + for col in names(data) + row_dict[String(col)] = data[i, col] + end + push!(rows, row_dict) + end + json_str = JSON.json(rows) + return Vector{UInt8}(json_str) + else + # Already Vector{NamedTuple} or Vector{Dict} + json_str = JSON.json(data) + return Vector{UInt8}(json_str) + end elseif payload_type == "image" # Image data - treat as binary if isa(data, Vector{UInt8}) return data # Return binary data directly @@ -881,24 +933,25 @@ end """ _deserialize_data - Deserialize bytes to data based on type This internal function converts serialized bytes back to Julia data based on type. -It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization), -"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data). +It handles "text" (string), "dictionary" (JSON deserialization), "arrowtable" (Arrow IPC deserialization), +"jsontable" (JSON deserialization), "image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data). # Function Workflow: 1. Validates the data type against supported formats 2. Converts bytes to appropriate Julia data type based on format 3. For text: converts bytes to string 4. For dictionary: converts bytes to JSON string then parses to Julia object -5. For table: reads Arrow IPC format and returns DataFrame -6. For image/audio/video/binary: returns bytes directly +5. For arrowtable: reads Arrow IPC format and returns Arrow.Table +6. For jsontable: converts bytes to JSON string then parses to Vector{Dict} +7. For image/audio/video/binary: returns bytes directly # Arguments: - `data::Vector{UInt8}` - Serialized data as bytes - - `payload_type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") + - `payload_type::String` - Data type ("text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary") - `correlation_id::String` - Correlation ID for logging # Return: - - Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") + - Deserialized data (String for "text", Arrow.Table for "arrowtable", Vector{Dict} for "jsontable", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") # Throws: - `Error` if `payload_type` is not one of the supported types @@ -913,9 +966,13 @@ text_data = _deserialize_data(text_bytes, "text", "correlation123") json_bytes = UInt8[123, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 125] # {"name":"Alice"} json_data = _deserialize_data(json_bytes, "dictionary", "correlation123") -# Arrow IPC data (table) +# Arrow IPC data (arrowtable) arrow_bytes = Vector{UInt8}([1, 2, 3]) # Arrow IPC bytes -table_data = _deserialize_data(arrow_bytes, "table", "correlation123") +arrow_table = _deserialize_data(arrow_bytes, "arrowtable", "correlation123") + +# JSON table data (jsontable) +json_table_bytes = UInt8[91, 123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, 65, 108, 105, 99, 101, 34, 125] # [{"id":1,"name":"Alice"}] +json_table = _deserialize_data(json_table_bytes, "jsontable", "correlation123") ``` """ function _deserialize_data( @@ -928,10 +985,13 @@ function _deserialize_data( elseif payload_type == "dictionary" # JSON data - deserialize json_str = String(data) # Convert bytes to string return JSON.parse(json_str) # Parse JSON string to JSON object - elseif payload_type == "table" # Table data - deserialize Arrow IPC stream + elseif payload_type == "arrowtable" # Arrow table data - deserialize Arrow IPC stream io = IOBuffer(data) # Create buffer from bytes - df = Arrow.Table(io) # Read Arrow IPC format from buffer - return df # Return DataFrame + table = Arrow.Table(io) # Read Arrow IPC format from buffer + return table # Return Arrow.Table + elseif payload_type == "jsontable" # JSON table data - deserialize JSON + json_str = String(data) # Convert bytes to string + return JSON.parse(json_str) # Parse JSON string to Vector{Dict} elseif payload_type == "image" # Image data - return binary return data # Return bytes directly elseif payload_type == "audio" # Audio data - return binary @@ -945,6 +1005,16 @@ function _deserialize_data( end end +function rows_to_columns_dict(rows::Vector{Dict{Symbol,Any}}) + # Ensure rows is not empty + isempty(rows) && return Dict{Symbol,Vector{Any}}() + + # Build column-oriented dictionary + return Dict( + key => [get(row, key, missing) for row in rows] + for key in keys(rows[1]) + ) +end """ plik_oneshot_upload - Upload a single file to a plik server using one-shot mode This function uploads a raw byte array to a plik server in one-shot mode (no upload session). @@ -970,19 +1040,19 @@ retrieves an upload ID and token, then uploads the file data as multipart form d - `"url"` - Full URL to download the uploaded file # Example - ```jldoctest - using HTTP, JSON + ```jldoctest + using HTTP, JSON - fileserver_url = "http://localhost:8080" - dataname = "test.txt" - data = Vector{UInt8}("hello world") + fileserver_url = "http://localhost:8080" + dataname = "test.txt" + data = Vector{UInt8}("hello world") - # Upload to local plik server - result = plik_oneshot_upload(file_server_url, dataname, data) + # Upload to local plik server + result = plik_oneshot_upload(file_server_url, dataname, data) - # Access the result as a Dict - # result["status"], result["uploadid"], result["fileid"], result["url"] - ``` + # Access the result as a Dict + # result["status"], result["uploadid"], result["fileid"], result["url"] + ``` """ function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) @@ -1106,18 +1176,4 @@ end - - - - - - - - - - - - - - end # module