reduce docs
1045
docs/architecture.md
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
This document describes the implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language.
|
This document describes the detailed implementation of the high-performance, bi-directional data bridge using **NATS (Core & JetStream)**, implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with **high-level API parity** while maintaining **idiomatic implementations** for each language.
|
||||||
|
|
||||||
**Supported Platforms:**
|
**Supported Platforms:**
|
||||||
- **Julia** - Ground truth implementation (reference)
|
- **Julia** - Ground truth implementation (reference)
|
||||||
@@ -11,6 +11,52 @@ This document describes the implementation of the high-performance, bi-direction
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## Cross-Platform Compatibility Notes
|
||||||
|
|
||||||
|
### 1. Python Payload Type Naming
|
||||||
|
|
||||||
|
The Python implementation uses `"table"` as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types (`"arrowtable"` and `"jsontable"`):
|
||||||
|
|
||||||
|
| Platform | Table Types |
|
||||||
|
|----------|-------------|
|
||||||
|
| Julia | `"arrowtable"`, `"jsontable"` |
|
||||||
|
| JavaScript | `"arrowtable"`, `"jsontable"` |
|
||||||
|
| Python | `"table"` (single type) |
|
||||||
|
| MicroPython | Not supported |
|
||||||
|
|
||||||
|
**Impact:** When exchanging data between Python and Julia/JavaScript, the payload type will differ. Python code should use `"table"` while Julia/JavaScript code should use `"arrowtable"` or `"jsontable"`.
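
The translation described above can be sketched as a pair of small helpers. This is a minimal illustration, not part of NATSBridge: the function names `to_python_type` and `from_python_type` and the `prefer` parameter are hypothetical. Because `"table"` does not say which split type was meant, the caller has to choose one when translating away from Python.

```python
# Hypothetical helpers; these names are illustrative and not part of NATSBridge.
PYTHON_TABLE_TYPE = "table"
SPLIT_TABLE_TYPES = ("arrowtable", "jsontable")


def to_python_type(payload_type: str) -> str:
    """Map a Julia/JavaScript payload type to the name Python expects."""
    return PYTHON_TABLE_TYPE if payload_type in SPLIT_TABLE_TYPES else payload_type


def from_python_type(payload_type: str, prefer: str = "arrowtable") -> str:
    """Map a Python payload type to a Julia/JavaScript name.

    "table" is ambiguous, so the caller chooses which split type to use.
    """
    if prefer not in SPLIT_TABLE_TYPES:
        raise ValueError(f"prefer must be one of {SPLIT_TABLE_TYPES}")
    return prefer if payload_type == PYTHON_TABLE_TYPE else payload_type
```

Types other than the table types pass through unchanged in both directions.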
|
||||||
|
|
||||||
|
### 2. Direct Transport Encoding Field
|
||||||
|
|
||||||
|
The encoding field in direct transport payloads differs between platforms:
|
||||||
|
|
||||||
|
| Platform | Encoding for Direct Transport |
|
||||||
|
|----------|-------------------------------|
|
||||||
|
| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` |
|
||||||
|
| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` |
|
||||||
|
| Python | Always `"base64"` for all direct transport payloads |
|
||||||
|
| MicroPython | Always `"base64"` for all direct transport payloads |
|
||||||
|
|
||||||
|
**Impact:** The encoding field may not accurately reflect the original serialization format when using Python or MicroPython.
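
One way a receiver might cope with this is to trust a specific encoding value when it is present and otherwise infer the format from the payload type. The sketch below is an assumption-laden illustration: `effective_format` and `FORMAT_BY_TYPE` are hypothetical names, and the type-to-format mapping is assumed rather than taken from the library.

```python
# Assumed mapping from payload type to serialization format; illustrative only.
FORMAT_BY_TYPE = {
    "dictionary": "json",
    "jsontable": "json",
    "arrowtable": "arrow-ipc",
    "table": "arrow-ipc",  # assumption: Python's "table" is written as Arrow IPC
}


def effective_format(payload_type: str, encoding: str) -> str:
    """Return the format to deserialize with.

    A specific encoding ("json", "arrow-ipc") is trusted as-is. The generic
    "base64" value may hide the real format, so in that case the format is
    inferred from the payload type, defaulting to raw "base64" bytes.
    """
    if encoding != "base64":
        return encoding
    return FORMAT_BY_TYPE.get(payload_type, "base64")
```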
|
||||||
|
|
||||||
|
### 3. MicroPython Limitations
|
||||||
|
|
||||||
|
MicroPython has significant constraints that affect feature support:
|
||||||
|
|
||||||
|
| Feature | Desktop Platforms | MicroPython |
|
||||||
|
|---------|-------------------|-------------|
|
||||||
|
| `arrowtable` | ✅ | ❌ (not supported - memory constraints) |
|
||||||
|
| `jsontable` | ✅ | ❌ (not supported - memory constraints) |
|
||||||
|
| `table` | ✅ | ❌ (not supported - memory constraints) |
|
||||||
|
| Async/await | ✅ | ❌ (synchronous only) |
|
||||||
|
| File upload/download | ✅ | ⚠️ (placeholder implementations) |
|
||||||
|
| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) |
|
||||||
|
| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB |
|
||||||
|
|
||||||
|
**Impact:** MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented.
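
A sender targeting MicroPython could guard against these limits before calling `smartsend`. The check below is a sketch only: `check_micropython_payload` is a hypothetical helper, and the byte value assumes 50KB means 50,000 bytes.

```python
# Limit taken from the table above; assumes decimal kilobytes (1KB = 1000 bytes).
MICROPYTHON_MAX_PAYLOAD_SIZE = 50000
UNSUPPORTED_TYPES = ("arrowtable", "jsontable", "table")


def check_micropython_payload(payload_type, data):
    """Return the payload size in bytes, or raise ValueError if it cannot be sent.

    `data` is expected to be bytes-like (bytes or bytearray).
    """
    if payload_type in UNSUPPORTED_TYPES:
        raise ValueError("payload type %r is not supported on MicroPython" % payload_type)
    size = len(data)
    if size > MICROPYTHON_MAX_PAYLOAD_SIZE:
        raise ValueError(
            "payload is %d bytes; limit is %d" % (size, MICROPYTHON_MAX_PAYLOAD_SIZE)
        )
    return size
```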
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Implementation Files
|
## Implementation Files
|
||||||
|
|
||||||
| Language | Implementation File | Description |
|
| Language | Implementation File | Description |
|
||||||
@@ -178,579 +224,14 @@ The system uses a **standardized list-of-tuples format** for all payload operati
|
|||||||
| `text` | `String` | `string` | `str` | `str` |
|
| `text` | `String` | `string` | `str` | `str` |
|
||||||
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` |
|
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` |
|
||||||
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) |
|
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array<Object>` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) |
|
||||||
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` | `list` |
|
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array<Object>` | `list[dict]`, `list` | ⚠️ (limited) |
|
||||||
|
| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ |
|
||||||
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
||||||
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
||||||
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
|
||||||
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` |
|
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` |
|
||||||
|
|
||||||
### Cross-Platform Examples
|
**Note:** Python uses `"table"` as a single type for both Arrow and JSON table serialization. When exchanging data between Python and Julia/JavaScript, ensure the payload type is correctly translated (`"table"` ↔ `"arrowtable"` or `"jsontable"`).
|
||||||
|
|
||||||
#### Julia
|
|
||||||
|
|
||||||
```julia
|
|
||||||
using NATSBridge
|
|
||||||
|
|
||||||
# Single payload - still wrapped in a list
|
|
||||||
env, env_json_str = smartsend(
|
|
||||||
"/test",
|
|
||||||
[("dataname1", data1, "dictionary")],
|
|
||||||
broker_url="nats://localhost:4222",
|
|
||||||
fileserver_upload_handler=plik_oneshot_upload
|
|
||||||
)
|
|
||||||
|
|
||||||
# Multiple payloads with different types
|
|
||||||
env, env_json_str = smartsend(
|
|
||||||
"/test",
|
|
||||||
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
|
|
||||||
broker_url="nats://localhost:4222"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mixed content (chat with text, image, audio)
|
|
||||||
env, env_json_str = smartsend(
|
|
||||||
"/chat",
|
|
||||||
[
|
|
||||||
("message_text", "Hello!", "text"),
|
|
||||||
("user_image", image_data, "image"),
|
|
||||||
("audio_clip", audio_data, "audio")
|
|
||||||
],
|
|
||||||
broker_url="nats://localhost:4222"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Receive returns a JSON.Object{String, Any} envelope
|
|
||||||
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff)
|
|
||||||
# env is a JSON.Object{String, Any} with "payloads" field containing Vector{Tuple{String, Any, String}}
|
|
||||||
# Access payloads: env["payloads"] which is a Vector of tuples
|
|
||||||
for (dataname, data, type) in env["payloads"]
|
|
||||||
println("$dataname: $data (type: $type)")
|
|
||||||
end
|
|
||||||
```
|
|
||||||
|
|
||||||
#### JavaScript
|
|
||||||
|
|
||||||
```javascript
|
|
||||||
const NATSBridge = require('natsbridge');
|
|
||||||
|
|
||||||
// Single payload
|
|
||||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
||||||
"/test",
|
|
||||||
[["dataname1", data1, "dictionary"]],
|
|
||||||
{
|
|
||||||
broker_url: "nats://localhost:4222",
|
|
||||||
fileserver_upload_handler: plikOneshotUpload
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// Multiple payloads
|
|
||||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
||||||
"/test",
|
|
||||||
[
|
|
||||||
["dataname1", data1, "dictionary"],
|
|
||||||
["dataname2", data2, "arrowtable"]
|
|
||||||
],
|
|
||||||
{ broker_url: "nats://localhost:4222" }
|
|
||||||
);
|
|
||||||
|
|
||||||
// Mixed content
|
|
||||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
|
||||||
"/chat",
|
|
||||||
[
|
|
||||||
["message_text", "Hello!", "text"],
|
|
||||||
["user_image", imageData, "image"],
|
|
||||||
["audio_clip", audioData, "audio"]
|
|
||||||
],
|
|
||||||
{ broker_url: "nats://localhost:4222" }
|
|
||||||
);
|
|
||||||
|
|
||||||
// Receive
|
|
||||||
const env = await NATSBridge.smartreceive(msg, {
|
|
||||||
fileserver_download_handler: fetchWithBackoff
|
|
||||||
});
|
|
||||||
// env is an object with "payloads" field containing Array of arrays
|
|
||||||
// Access payloads: env.payloads which is an Array of [dataname, data, type] arrays
|
|
||||||
for (const [dataname, data, type] of env.payloads) {
|
|
||||||
console.log(`${dataname}: ${data} (type: ${type})`);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Python
|
|
||||||
|
|
||||||
```python
|
|
||||||
from natsbridge import NATSBridge
|
|
||||||
|
|
||||||
# Single payload
|
|
||||||
env, env_json_str = await NATSBridge.smartsend(
|
|
||||||
"/test",
|
|
||||||
[("dataname1", data1, "dictionary")],
|
|
||||||
broker_url="nats://localhost:4222",
|
|
||||||
fileserver_upload_handler=plik_oneshot_upload
|
|
||||||
)
|
|
||||||
|
|
||||||
# Multiple payloads
|
|
||||||
env, env_json_str = await NATSBridge.smartsend(
|
|
||||||
"/test",
|
|
||||||
[("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")],
|
|
||||||
broker_url="nats://localhost:4222"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mixed content
|
|
||||||
env, env_json_str = await NATSBridge.smartsend(
|
|
||||||
"/chat",
|
|
||||||
[
|
|
||||||
("message_text", "Hello!", "text"),
|
|
||||||
("user_image", image_data, "image"),
|
|
||||||
("audio_clip", audio_data, "audio")
|
|
||||||
],
|
|
||||||
broker_url="nats://localhost:4222"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Receive
|
|
||||||
env = await NATSBridge.smartreceive(
|
|
||||||
msg,
|
|
||||||
fileserver_download_handler=fetch_with_backoff
|
|
||||||
)
|
|
||||||
# env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
|
||||||
# Access payloads: env["payloads"] which is a list of tuples
|
|
||||||
for dataname, data, type_ in env["payloads"]:
|
|
||||||
print(f"{dataname}: {data} (type: {type_})")
|
|
||||||
```
|
|
||||||
|
|
||||||
#### MicroPython
|
|
||||||
|
|
||||||
```python
|
|
||||||
from natsbridge import NATSBridge
|
|
||||||
|
|
||||||
# Limited to text and binary (no tables due to memory constraints)
|
|
||||||
env, env_json_str = NATSBridge.smartsend(
|
|
||||||
"/chat",
|
|
||||||
[
|
|
||||||
("message_text", "Hello!", "text"),
|
|
||||||
("binary_data", data_bytes, "binary")
|
|
||||||
],
|
|
||||||
broker_url="nats://localhost:4222",
|
|
||||||
size_threshold=100000 # Lower threshold for memory constraints
|
|
||||||
)
|
|
||||||
# Note: MicroPython uses synchronous handlers
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Mixed-Content Transport Examples
|
|
||||||
|
|
||||||
The NATSBridge.jl library supports sending mixed content types in a single message, with automatic transport selection based on payload size. Small payloads (< 1MB threshold) use direct transport (Base64-encoded in NATS message), while large payloads (≥ 1MB) use link transport (uploaded to file server, URL sent via NATS).
|
|
||||||
|
|
||||||
#### Julia Mixed-Content Example
|
|
||||||
|
|
||||||
```julia
|
|
||||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
|
||||||
include("../src/NATSBridge.jl")
|
|
||||||
using .NATSBridge
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/natsbridge"
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
const FILESERVER_URL = "http://localhost:8080"
|
|
||||||
|
|
||||||
# Create sample data - mix of small and large payloads
|
|
||||||
text_data = "Hello! This is a test chat message. 🎉"
|
|
||||||
|
|
||||||
dict_data = Dict(
|
|
||||||
"type" => "chat",
|
|
||||||
"sender" => "serviceA",
|
|
||||||
"content" => Dict("text" => "JSON-formatted message", "format" => "markdown")
|
|
||||||
)
|
|
||||||
|
|
||||||
# Small arrow table (< 1MB) - direct transport
|
|
||||||
arrow_table_small = DataFrame(
|
|
||||||
id = 1:10,
|
|
||||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
|
||||||
score = rand(50:100, 10),
|
|
||||||
active = rand([true, false], 10)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Large arrow table (≥ 1MB) - link transport
|
|
||||||
arrow_table_large = DataFrame(
|
|
||||||
id = 1:200_000,
|
|
||||||
name = ["user_$i" for i in 1:200_000],
|
|
||||||
score = rand(50:100, 200_000),
|
|
||||||
active = rand([true, false], 200_000)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Small jsontable (< 1MB) - direct transport
|
|
||||||
json_table_small = DataFrame(
|
|
||||||
id = 1:10,
|
|
||||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
|
||||||
score = rand(50:100, 10)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Large jsontable (≥ 1MB) - link transport
|
|
||||||
json_table_large = DataFrame(
|
|
||||||
id = 1:1_500_000,
|
|
||||||
name = ["user_$i" for i in 1:1_500_000],
|
|
||||||
score = rand(50:100, 1_500_000)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Binary data (small) - direct transport
|
|
||||||
binary_data_small = UInt8[rand(1:255) for _ in 1:200]
|
|
||||||
|
|
||||||
# Binary data (large) - link transport
|
|
||||||
binary_data_large = UInt8[rand(1:255) for _ in 1:1_500_000]
|
|
||||||
|
|
||||||
# Read image file
|
|
||||||
file_path_small_image = "./test/small_image.jpg"
|
|
||||||
file_data_small_image = read(file_path_small_image)
|
|
||||||
filename_small_image = basename(file_path_small_image)
|
|
||||||
|
|
||||||
file_path_large_image = "./test/large_image.png"
|
|
||||||
file_data_large_image = read(file_path_large_image)
|
|
||||||
filename_large_image = basename(file_path_large_image)
|
|
||||||
|
|
||||||
# Create payloads list - mixed content with both small and large data
|
|
||||||
payloads = [
|
|
||||||
# Small data (direct transport)
|
|
||||||
("chat_text", text_data, "text"),
|
|
||||||
("chat_json", dict_data, "dictionary"),
|
|
||||||
("arrow_table_small", arrow_table_small, "arrowtable"),
|
|
||||||
("json_table_small", json_table_small, "jsontable"),
|
|
||||||
(filename_small_image, file_data_small_image, "binary"),
|
|
||||||
|
|
||||||
# Large data (link transport)
|
|
||||||
("arrow_table_large", arrow_table_large, "arrowtable"),
|
|
||||||
("json_table_large", json_table_large, "jsontable"),
|
|
||||||
(filename_large_image, file_data_large_image, "binary"),
|
|
||||||
("binary_file_large", binary_data_large, "binary")
|
|
||||||
]
|
|
||||||
|
|
||||||
# Use smartsend with mixed content
|
|
||||||
correlation_id = string(uuid4())
|
|
||||||
sendinfo = NATSBridge.smartsend(
|
|
||||||
SUBJECT,
|
|
||||||
payloads;
|
|
||||||
broker_url = NATS_URL,
|
|
||||||
fileserver_url = FILESERVER_URL,
|
|
||||||
fileserver_upload_handler = plik_oneshot_upload,
|
|
||||||
size_threshold = 1_000_000, # 1MB threshold
|
|
||||||
correlation_id = correlation_id,
|
|
||||||
msg_purpose = "chat",
|
|
||||||
sender_name = "mix_sender",
|
|
||||||
receiver_name = "",
|
|
||||||
receiver_id = "",
|
|
||||||
reply_to = "",
|
|
||||||
reply_to_msg_id = "",
|
|
||||||
is_publish = true
|
|
||||||
)
|
|
||||||
|
|
||||||
env, env_json_str = sendinfo
|
|
||||||
|
|
||||||
# Log transport type for each payload
|
|
||||||
for (i, payload) in enumerate(env.payloads)
|
|
||||||
println("Payload $i ('$(payload.dataname)'):")
|
|
||||||
println(" Transport: $(payload.transport)")
|
|
||||||
println(" Type: $(payload.payload_type)")
|
|
||||||
println(" Size: $(payload.size) bytes")
|
|
||||||
println(" Encoding: $(payload.encoding)")
|
|
||||||
|
|
||||||
if payload.transport == "link"
|
|
||||||
println(" URL: $(payload.data)")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Summary
|
|
||||||
direct_count = count(p -> p.transport == "direct", env.payloads)
|
|
||||||
link_count = count(p -> p.transport == "link", env.payloads)
|
|
||||||
println("\n--- Transport Summary ---")
|
|
||||||
println("Direct transport: $direct_count payloads")
|
|
||||||
println("Link transport: $link_count payloads")
|
|
||||||
```
|
|
||||||
|
|
||||||
**Expected Output:**
|
|
||||||
```
|
|
||||||
Payload 1 ('chat_text'):
|
|
||||||
Transport: direct
|
|
||||||
Type: text
|
|
||||||
Size: 38 bytes
|
|
||||||
Encoding: base64
|
|
||||||
|
|
||||||
Payload 2 ('chat_json'):
|
|
||||||
Transport: direct
|
|
||||||
Type: dictionary
|
|
||||||
Size: 156 bytes
|
|
||||||
Encoding: json
|
|
||||||
|
|
||||||
Payload 3 ('arrow_table_small'):
|
|
||||||
Transport: direct
|
|
||||||
Type: arrowtable
|
|
||||||
Size: 1245 bytes
|
|
||||||
Encoding: arrow-ipc
|
|
||||||
|
|
||||||
Payload 4 ('json_table_small'):
|
|
||||||
Transport: direct
|
|
||||||
Type: jsontable
|
|
||||||
Size: 892 bytes
|
|
||||||
Encoding: json
|
|
||||||
|
|
||||||
Payload 5 ('small_image.jpg'):
|
|
||||||
Transport: direct
|
|
||||||
Type: binary
|
|
||||||
Size: 73269 bytes
|
|
||||||
Encoding: base64
|
|
||||||
|
|
||||||
Payload 6 ('arrow_table_large'):
|
|
||||||
Transport: link
|
|
||||||
Type: arrowtable
|
|
||||||
Size: 5242880 bytes
|
|
||||||
Encoding: arrow-ipc
|
|
||||||
URL: http://localhost:8080/file/ABC123/DEF456/arrow_table_large.arrow
|
|
||||||
|
|
||||||
Payload 7 ('json_table_large'):
|
|
||||||
Transport: link
|
|
||||||
Type: jsontable
|
|
||||||
Size: 45678900 bytes
|
|
||||||
Encoding: json
|
|
||||||
URL: http://localhost:8080/file/GHI789/JKL012/json_table_large.json
|
|
||||||
|
|
||||||
Payload 8 ('large_image.png'):
|
|
||||||
Transport: link
|
|
||||||
Type: binary
|
|
||||||
Size: 1168437 bytes
|
|
||||||
Encoding: base64
|
|
||||||
URL: http://localhost:8080/file/MNO345/PQR678/large_image.png
|
|
||||||
|
|
||||||
Payload 9 ('binary_file_large'):
|
|
||||||
Transport: link
|
|
||||||
Type: binary
|
|
||||||
Size: 1500000 bytes
|
|
||||||
Encoding: base64
|
|
||||||
URL: http://localhost:8080/file/STU901/VWX234/binary_file_large.bin
|
|
||||||
|
|
||||||
--- Transport Summary ---
|
|
||||||
Direct transport: 5 payloads
|
|
||||||
Link transport: 4 payloads
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Receiver Example
|
|
||||||
|
|
||||||
```julia
|
|
||||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
|
||||||
include("../src/NATSBridge.jl")
|
|
||||||
using .NATSBridge
|
|
||||||
|
|
||||||
const SUBJECT = "/natsbridge"
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
|
|
||||||
conn = NATS.connect(NATS_URL)
|
|
||||||
NATS.subscribe(conn, SUBJECT) do msg
|
|
||||||
# Use NATSBridge.smartreceive to handle the data
|
|
||||||
result = NATSBridge.smartreceive(
|
|
||||||
msg;
|
|
||||||
max_retries = 5,
|
|
||||||
base_delay = 100,
|
|
||||||
max_delay = 5000
|
|
||||||
)
|
|
||||||
|
|
||||||
println("Received $(length(result["payloads"])) payloads")
|
|
||||||
|
|
||||||
# Process each payload
|
|
||||||
for (dataname, data, data_type) in result["payloads"]
|
|
||||||
println("\n=== Payload: $dataname (type: $data_type) ===")
|
|
||||||
|
|
||||||
if data_type == "arrowtable"
|
|
||||||
if isa(data, Arrow.Table)
|
|
||||||
df = DataFrame(data)
|
|
||||||
println(" Type: Arrow.Table")
|
|
||||||
println(" Size: $(size(df, 1)) rows x $(size(df, 2)) columns")
|
|
||||||
end
|
|
||||||
elseif data_type == "jsontable"
|
|
||||||
if isa(data, Vector{Any})
|
|
||||||
df = DataFrame(data)
|
|
||||||
println(" Type: Vector{Dict}")
|
|
||||||
println(" Size: $(length(data)) rows")
|
|
||||||
end
|
|
||||||
elseif data_type == "text"
|
|
||||||
if isa(data, String)
|
|
||||||
println(" Type: String")
|
|
||||||
println(" Length: $(length(data)) characters")
|
|
||||||
end
|
|
||||||
elseif data_type in ["image", "audio", "video", "binary"]
|
|
||||||
if isa(data, Vector{UInt8})
|
|
||||||
println(" Type: Vector{UInt8} (binary)")
|
|
||||||
println(" Size: $(length(data)) bytes")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Keep listening
|
|
||||||
sleep(120)
|
|
||||||
NATS.drain(conn)
|
|
||||||
```
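
The receiver above passes `max_retries`, `base_delay`, and `max_delay` to `smartreceive`. How the library schedules retries is not shown here, so the following is only a sketch of one common interpretation: exponential backoff that doubles the delay on each attempt and caps it at `max_delay`. The millisecond unit and the doubling factor are assumptions, and `backoff_delays` is a hypothetical name.

```python
def backoff_delays(max_retries=5, base_delay=100, max_delay=5000):
    """Return the wait before each retry, assuming delays in milliseconds.

    The delay starts at base_delay, doubles on every attempt, and never
    exceeds max_delay.
    """
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


print(backoff_delays())  # [100, 200, 400, 800, 1600]
```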
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
### Cross-Platform Claim-Check Pattern
|
|
||||||
|
|
||||||
```mermaid
|
|
||||||
flowchart TD
|
|
||||||
A[SmartSend Function] --> B{Is payload size < 1MB?}
|
|
||||||
B -->|Yes | C[Direct Path<br/><small>< 1MB</small>]
|
|
||||||
B -->|No | D[Link Path<br/><small>>= 1MB</small>]
|
|
||||||
|
|
||||||
C --> C1[Serialize to Buffer]
|
|
||||||
C1 --> C2[Base64/JSON encode]
|
|
||||||
C2 --> C3[Publish to NATS]
|
|
||||||
|
|
||||||
D --> D1[Serialize to Buffer]
|
|
||||||
D1 --> D2[Upload to HTTP Server]
|
|
||||||
D2 --> D3[Publish to NATS with URL]
|
|
||||||
|
|
||||||
style A fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
|
|
||||||
style B fill:#fff4e1,stroke:#cc6600,stroke-width:2px
|
|
||||||
style C fill:#e8f5e9,stroke:#008000,stroke-width:2px
|
|
||||||
style D fill:#e8f5e9,stroke:#008000,stroke-width:2px
|
|
||||||
style C1 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
style C2 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
style C3 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
style D1 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
style D2 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
style D3 fill:#f5f5f5,stroke:#666,stroke-width:1px
|
|
||||||
```
|
|
||||||
|
|
||||||
**Claim-Check Pattern Overview:**
|
|
||||||
- **Direct Path** (< 1MB): Payload is serialized, Base64-encoded, and published directly to NATS
|
|
||||||
- **Link Path** (≥ 1MB): Payload is serialized, uploaded to an HTTP file server, and only the URL is published to NATS (claim-check pattern)
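
The two paths above can be sketched as a single size check. This is a simplified illustration rather than the library's implementation: `build_payload_entry` and the `upload` callback signature are hypothetical, while the entry field names follow the payload fields used elsewhere in this document.

```python
import base64
from typing import Callable, Dict

DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB, matching the size_threshold used in this document


def build_payload_entry(
    dataname: str,
    raw: bytes,
    payload_type: str,
    upload: Callable[[str, bytes], str],  # hypothetical: takes (name, bytes), returns a URL
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
) -> Dict[str, object]:
    """Build one payload entry, choosing direct or link transport by size."""
    entry: Dict[str, object] = {
        "dataname": dataname,
        "payload_type": payload_type,
        "size": len(raw),
    }
    if len(raw) < size_threshold:
        # Direct path: embed the bytes, Base64-encoded, in the NATS message.
        entry.update(transport="direct", data=base64.b64encode(raw).decode("ascii"))
    else:
        # Link path: upload the bytes and publish only the URL (the claim check).
        entry.update(transport="link", data=upload(dataname, raw))
    return entry
```

Passing the uploader as a callback keeps the size decision independent of any particular file server.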
|
|
||||||
|
|
||||||
### smartsend Return Value
|
|
||||||
|
|
||||||
All platforms return a tuple/array containing both the envelope and JSON string:
|
|
||||||
|
|
||||||
#### Julia
|
|
||||||
|
|
||||||
```julia
|
|
||||||
env, env_json_str = smartsend(...)
|
|
||||||
# Returns: ::Tuple{msg_envelope_v1, String}
|
|
||||||
# env::msg_envelope_v1 - The envelope object with all metadata and payloads
|
|
||||||
# env_json_str::String - JSON string for publishing to NATS
|
|
||||||
```
|
|
||||||
|
|
||||||
#### JavaScript
|
|
||||||
|
|
||||||
```javascript
|
|
||||||
const [env, env_json_str] = await smartsend(...);
|
|
||||||
// Returns: Promise<[env, env_json_str]>
|
|
||||||
// env: Object with all metadata and payloads
|
|
||||||
// env_json_str: String for publishing to NATS
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Python
|
|
||||||
|
|
||||||
```python
|
|
||||||
env, env_json_str = await smartsend(...)
|
|
||||||
# Returns: Tuple[Dict, str]
|
|
||||||
# env: Dict with all metadata and payloads
|
|
||||||
# env_json_str: String for publishing to NATS
|
|
||||||
```
|
|
||||||
|
|
||||||
#### MicroPython
|
|
||||||
|
|
||||||
```python
|
|
||||||
env, env_json_str = NATSBridge.smartsend(...)
|
|
||||||
# Returns: Tuple[Dict, str]
|
|
||||||
# Note: MicroPython returns plain dicts (no structured envelope object)
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Installation
|
|
||||||
|
|
||||||
### Julia Dependencies
|
|
||||||
|
|
||||||
```julia
|
|
||||||
using Pkg
|
|
||||||
Pkg.add("NATS")
|
|
||||||
Pkg.add("Arrow")
|
|
||||||
Pkg.add("JSON")
|
|
||||||
Pkg.add("HTTP")
|
|
||||||
Pkg.add("UUIDs")
|
|
||||||
Pkg.add("Dates")
|
|
||||||
Pkg.add("PrettyPrinting")
|
|
||||||
Pkg.add("DataFrames")
|
|
||||||
```
|
|
||||||
|
|
||||||
### JavaScript Dependencies (Node.js)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
npm install nats apache-arrow node-fetch
|
|
||||||
# or
|
|
||||||
yarn add nats apache-arrow node-fetch
|
|
||||||
```
|
|
||||||
|
|
||||||
**Note:** Node.js has a built-in `crypto` module for UUID generation, so no external `uuid` package is needed.
|
|
||||||
|
|
||||||
### JavaScript Dependencies (Browser)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
npm install nats apache-arrow
|
|
||||||
# or use CDN:
|
|
||||||
# https://unpkg.com/nats-js/dist/bundle/nats.min.js
|
|
||||||
# https://unpkg.com/apache-arrow/arrow.min.js
|
|
||||||
```
|
|
||||||
|
|
||||||
**Note:** For browser UUID generation, use the built-in `crypto.randomUUID()` API (available in modern browsers) or a lightweight alternative such as the `uuidv4` package.
|
|
||||||
|
|
||||||
### Python Dependencies (Desktop)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
pip install nats-py aiohttp pyarrow pandas python-dateutil
|
|
||||||
```
|
|
||||||
|
|
||||||
### MicroPython Dependencies
|
|
||||||
|
|
||||||
MicroPython uses built-in modules:
|
|
||||||
- `network` - NATS connection (custom implementation)
|
|
||||||
- `time` - Timestamps
|
|
||||||
- `uos` - File operations
|
|
||||||
- `base64` - Base64 encoding
|
|
||||||
- `json` - JSON parsing
|
|
||||||
- `struct` - Binary data handling
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Usage Tutorial
|
|
||||||
|
|
||||||
### Step 1: Start NATS Server
|
|
||||||
|
|
||||||
```bash
|
|
||||||
docker run -p 4222:4222 nats:latest
|
|
||||||
```
|
|
||||||
|
|
||||||
### Step 2: Start HTTP File Server (optional)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Create a directory for file uploads
|
|
||||||
mkdir -p /tmp/fileserver
|
|
||||||
|
|
||||||
# Use any HTTP server that supports POST for file uploads
|
|
||||||
# Example: Python's built-in server (serves downloads only; it does not accept POST uploads)
|
|
||||||
python3 -m http.server 8080 --directory /tmp/fileserver
|
|
||||||
```
|
|
||||||
|
|
||||||
### Step 3: Run Test Scenarios
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Julia tests
|
|
||||||
julia test/test_julia_to_julia_text_sender.jl
|
|
||||||
julia test/test_julia_to_julia_text_receiver.jl
|
|
||||||
|
|
||||||
# JavaScript tests (Node.js)
|
|
||||||
node test/test_js_text_sender.js
|
|
||||||
node test/test_js_text_receiver.js
|
|
||||||
|
|
||||||
# Python tests
|
|
||||||
python3 test/test_py_text_sender.py
|
|
||||||
python3 test/test_py_text_receiver.py
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -1236,17 +717,6 @@ function plik_oneshot_upload(file_server_url::String, filepath::String)
|
|||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
**Usage Examples:**
|
|
||||||
|
|
||||||
```julia
|
|
||||||
# Upload binary data
|
|
||||||
data = read("./test_image.jpg")
|
|
||||||
result = plik_oneshot_upload("http://localhost:8080", "my_image.jpg", data)
|
|
||||||
|
|
||||||
# Upload file directly from disk
|
|
||||||
result = plik_oneshot_upload("http://localhost:8080", "./test_image.jpg")
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### JavaScript Implementation
|
### JavaScript Implementation
|
||||||
@@ -1846,18 +1316,24 @@ except ImportError:
|
|||||||
|
|
||||||
|
|
||||||
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
def _serialize_data(data: Any, payload_type: str) -> bytes:
|
||||||
"""Serialize data to bytes based on type."""
|
"""
|
||||||
|
Serialize data to bytes based on type.
|
||||||
|
|
||||||
|
Note: Python uses "table" as a single type for both Arrow and JSON table
|
||||||
|
serialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types.
|
||||||
|
"""
|
||||||
if payload_type == 'text':
|
if payload_type == 'text':
|
||||||
if isinstance(data, str):
|
if isinstance(data, str):
|
||||||
return data.encode('utf-8')
|
return data.encode('utf-8')
|
||||||
else:
|
else:
|
||||||
raise Error('Text data must be a string')
|
raise ValueError('Text data must be a string')
|
||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
json_str = json.dumps(data)
|
json_str = json.dumps(data)
|
||||||
return json_str.encode('utf-8')
|
return json_str.encode('utf-8')
|
||||||
elif payload_type == 'arrowtable':
|
elif payload_type == 'table':
|
||||||
|
# Python uses "table" for both arrowtable and jsontable
|
||||||
if not ARROW_AVAILABLE:
|
if not ARROW_AVAILABLE:
|
||||||
raise Error('pyarrow not available for table serialization')
|
raise RuntimeError('pyarrow not available for table serialization')
|
||||||
|
|
||||||
import io
|
import io
|
||||||
buf = io.BytesIO()
|
buf = io.BytesIO()
|
||||||
@@ -1865,38 +1341,24 @@ def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|||||||
if isinstance(data, pd.DataFrame):
|
if isinstance(data, pd.DataFrame):
|
||||||
# Serialize DataFrame to Arrow
|
# Serialize DataFrame to Arrow
|
||||||
table = arrow.Table.from_pandas(data)
|
table = arrow.Table.from_pandas(data)
|
||||||
sink = arrow.ipc.new_file(buf)
|
sink = ipc.new_file(buf, table.schema)
|
||||||
arrow.ipc.write_table(table, sink)
|
sink.write_table(table)
|
||||||
|
sink.close()
|
||||||
|
return buf.getvalue()
|
||||||
|
elif isinstance(data, arrow.Table):
|
||||||
|
sink = ipc.new_file(buf, data.schema)
|
||||||
|
sink.write_table(data)
|
||||||
sink.close()
|
sink.close()
|
||||||
return buf.getvalue()
|
return buf.getvalue()
|
||||||
else:
|
else:
|
||||||
raise Error('arrowtable data must be a pandas DataFrame')
|
raise ValueError('Table data must be a pandas DataFrame or pyarrow Table')
|
||||||
elif payload_type == 'jsontable':
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||||
# Serialize directly to JSON
|
|
||||||
json_str = json.dumps(data)
|
|
||||||
return json_str.encode('utf-8')
|
|
||||||
elif payload_type == 'image':
|
|
||||||
if isinstance(data, (bytes, bytearray)):
|
if isinstance(data, (bytes, bytearray)):
|
||||||
return bytes(data)
|
return bytes(data)
|
||||||
else:
|
else:
|
||||||
raise Error('Image data must be bytes')
|
raise ValueError(f'{payload_type} data must be bytes')
|
||||||
elif payload_type == 'audio':
|
|
||||||
if isinstance(data, (bytes, bytearray)):
|
|
||||||
return bytes(data)
|
|
||||||
else:
|
|
||||||
raise Error('Audio data must be bytes')
|
|
||||||
elif payload_type == 'video':
|
|
||||||
if isinstance(data, (bytes, bytearray)):
|
|
||||||
return bytes(data)
|
|
||||||
else:
|
|
||||||
raise Error('Video data must be bytes')
|
|
||||||
elif payload_type == 'binary':
|
|
||||||
if isinstance(data, (bytes, bytearray)):
|
|
||||||
return bytes(data)
|
|
||||||
else:
|
|
||||||
raise Error('Binary data must be bytes')
|
|
||||||
else:
|
else:
|
||||||
raise Error(f'Unknown payload_type: {payload_type}')
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||||
```
|
```
|
||||||
|
|
||||||
#### deserializeData Implementation
|
#### deserializeData Implementation
|
||||||
@@ -1916,34 +1378,30 @@ except ImportError:
|
|||||||
|
|
||||||
|
|
||||||
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
|
||||||
"""Deserialize bytes to data based on type."""
|
"""
|
||||||
|
Deserialize bytes to data based on type.
|
||||||
|
|
||||||
|
Note: Python uses "table" as a single type for both Arrow and JSON table
|
||||||
|
deserialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types.
|
||||||
|
"""
|
||||||
if payload_type == 'text':
|
if payload_type == 'text':
|
||||||
return data.decode('utf-8')
|
return data.decode('utf-8')
|
||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
json_str = data.decode('utf-8')
|
json_str = data.decode('utf-8')
|
||||||
return json.loads(json_str)
|
return json.loads(json_str)
|
||||||
elif payload_type == 'arrowtable':
|
elif payload_type == 'table':
|
||||||
|
# Python uses "table" for both arrowtable and jsontable
|
||||||
if not ARROW_AVAILABLE:
|
if not ARROW_AVAILABLE:
|
||||||
raise Error('pyarrow not available for table deserialization')
|
raise RuntimeError('pyarrow not available for table deserialization')
|
||||||
|
|
||||||
import io
|
import io
|
||||||
buf = io.BytesIO(data)
|
buf = io.BytesIO(data)
|
||||||
reader = arrow.ipc.open_file(buf)
|
reader = ipc.open_file(buf)
|
||||||
return reader.read_all().to_pandas()
|
return reader.read_all().to_pandas()
|
||||||
elif payload_type == 'jsontable':
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||||
# Deserialize from JSON - returns list[dict]
|
|
||||||
json_str = data.decode('utf-8')
|
|
||||||
return json.loads(json_str)
|
|
||||||
elif payload_type == 'image':
|
|
||||||
return data
|
|
||||||
elif payload_type == 'audio':
|
|
||||||
return data
|
|
||||||
elif payload_type == 'video':
|
|
||||||
return data
|
|
||||||
elif payload_type == 'binary':
|
|
||||||
return data
|
return data
|
||||||
else:
|
else:
|
||||||
raise Error(f'Unknown payload_type: {payload_type}')
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||||
```
|
```
|
||||||
|
|
||||||
#### fetchWithBackoff Implementation
|
#### fetchWithBackoff Implementation
|
||||||
@@ -2032,9 +1490,9 @@ async def plik_oneshot_upload(
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## MicroPython Implementation
|
### MicroPython Implementation
|
||||||
|
|
||||||
### Limitations
|
#### Limitations
|
||||||
|
|
||||||
MicroPython has significant constraints compared to desktop implementations:
|
MicroPython has significant constraints compared to desktop implementations:
|
||||||
|
|
||||||
@@ -2042,13 +1500,15 @@ MicroPython has significant constraints compared to desktop implementations:
|
|||||||
|---------|---------|-------------|
|
|---------|---------|-------------|
|
||||||
| Memory | Unlimited | ~256KB - 1MB |
|
| Memory | Unlimited | ~256KB - 1MB |
|
||||||
| Arrow IPC | ✅ | ❌ (not supported) |
|
| Arrow IPC | ✅ | ❌ (not supported) |
|
||||||
| Async/Await | ✅ | ⚠️ (uasyncio only) |
|
| Async/Await | ✅ | ❌ (synchronous only) |
|
||||||
| Large payloads (>1MB) | ✅ | ❌ (enforced limit) |
|
| Large payloads (>1MB) | ✅ | ❌ (enforced limit) |
|
||||||
| arrowtable | ✅ | ❌ |
|
| arrowtable | ✅ | ❌ (not supported) |
|
||||||
| jsontable | ⚠️ (limited) | ⚠️ (limited) |
|
| jsontable | ✅ | ❌ (not supported) |
|
||||||
| Multiple payloads | ✅ | ⚠️ (limited) |
|
| Multiple payloads | ✅ | ⚠️ (limited) |
|
||||||
|
|
||||||
### MicroPython Module Structure
|
**Note:** MicroPython does NOT support table types (`arrowtable` or `jsontable`) due to memory constraints.
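
The restriction above can be sketched as a small pre-send guard. This is a minimal illustration, not the library's code: `check_payload` and `SUPPORTED_TYPES` are hypothetical names, and `MAX_PAYLOAD_SIZE` simply copies the value shown in the module constants in this diff.

```python
# Value copied from the MicroPython module constants shown in this diff.
MAX_PAYLOAD_SIZE = 50000

# Hypothetical name: the payload types the MicroPython notes list as supported.
SUPPORTED_TYPES = ("text", "dictionary", "image", "audio", "video", "binary")


def check_payload(payload_type, payload_bytes):
    """Hypothetical helper: reject table types, unknown types, and oversized payloads."""
    if payload_type in ("arrowtable", "jsontable", "table"):
        raise ValueError("Table types are not supported on MicroPython: " + payload_type)
    if payload_type not in SUPPORTED_TYPES:
        raise ValueError("Unknown payload_type: " + payload_type)
    if len(payload_bytes) > MAX_PAYLOAD_SIZE:
        raise ValueError("Payload too large: %d bytes" % len(payload_bytes))
    return True
```

Calling a guard like this before serialization would surface an unsupported type as an explicit error rather than a failure deeper in the send path.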
|
||||||
|
|
||||||
|
#### Module Structure
|
||||||
|
|
||||||
```python
|
```python
|
||||||
# natsbridge_mpy.py (MicroPython)
|
# natsbridge_mpy.py (MicroPython)
|
||||||
@@ -2058,15 +1518,16 @@ import json
|
|||||||
import base64
|
import base64
|
||||||
import uos
|
import uos
|
||||||
import struct
|
import struct
|
||||||
|
import random
|
||||||
|
|
||||||
# Constants
|
# Constants
|
||||||
DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython
|
DEFAULT_SIZE_THRESHOLD = 100000 # 100KB for MicroPython
|
||||||
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
DEFAULT_BROKER_URL = "nats://localhost:4222"
|
||||||
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
DEFAULT_FILESERVER_URL = "http://localhost:8080"
|
||||||
MAX_PAYLOAD_SIZE = 50000 # Hard limit
|
MAX_PAYLOAD_SIZE = 50000 # Hard limit (lower than threshold for safety)
|
||||||
|
|
||||||
# Note: MicroPython uses list[list] for jsontable
|
# Note: MicroPython does NOT support table types (arrowtable/jsontable)
|
||||||
# No DataFrame support
|
# Only supports: text, dictionary, image, audio, video, binary
|
||||||
|
|
||||||
|
|
||||||
class NATSBridge:
|
class NATSBridge:
|
||||||
@@ -2175,32 +1636,44 @@ class NATSBridge:
|
|||||||
return env_json_obj
|
return env_json_obj
|
||||||
|
|
||||||
def _serialize_data(self, data, payload_type):
|
def _serialize_data(self, data, payload_type):
|
||||||
"""Serialize data (MicroPython version - no arrowtable support)."""
|
"""
|
||||||
|
Serialize data (MicroPython version).
|
||||||
|
|
||||||
|
Note: MicroPython does NOT support table types (arrowtable/jsontable).
|
||||||
|
Only supports: text, dictionary, image, audio, video, binary
|
||||||
|
"""
|
||||||
if payload_type == 'text':
|
if payload_type == 'text':
|
||||||
return data.encode('utf-8')
|
if isinstance(data, str):
|
||||||
|
return data.encode('utf-8')
|
||||||
|
else:
|
||||||
|
raise ValueError('Text data must be a string')
|
||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
return json.dumps(data).encode('utf-8')
|
json_str = json.dumps(data)
|
||||||
elif payload_type == 'jsontable':
|
return json_str.encode('utf-8')
|
||||||
# Serialize list of lists to JSON
|
|
||||||
return json.dumps(data).encode('utf-8')
|
|
||||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||||
return bytes(data)
|
if isinstance(data, (bytes, bytearray, memoryview)):
|
||||||
|
return bytes(data)
|
||||||
|
else:
|
||||||
|
raise ValueError(f'{payload_type} data must be bytes')
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown payload_type: {payload_type}")
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||||
|
|
||||||
def _deserialize_data(self, data, payload_type):
|
def _deserialize_data(self, data, payload_type):
|
||||||
"""Deserialize data (MicroPython version)."""
|
"""
|
||||||
|
Deserialize data (MicroPython version).
|
||||||
|
|
||||||
|
Note: MicroPython does NOT support table types (arrowtable/jsontable).
|
||||||
|
Only supports: text, dictionary, image, audio, video, binary
|
||||||
|
"""
|
||||||
if payload_type == 'text':
|
if payload_type == 'text':
|
||||||
return data.decode('utf-8')
|
return data.decode('utf-8')
|
||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
return json.loads(data.decode('utf-8'))
|
json_str = data.decode('utf-8')
|
||||||
elif payload_type == 'jsontable':
|
return json.loads(json_str)
|
||||||
# Returns list of lists
|
|
||||||
return json.loads(data.decode('utf-8'))
|
|
||||||
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
elif payload_type in ('image', 'audio', 'video', 'binary'):
|
||||||
return data
|
return data
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown payload_type: {payload_type}")
|
raise ValueError(f'Unknown payload_type: {payload_type}')
|
||||||
|
|
||||||
def _generate_uuid(self):
|
def _generate_uuid(self):
|
||||||
"""Generate simple UUID (MicroPython compatible)."""
|
"""Generate simple UUID (MicroPython compatible)."""
|
||||||
@@ -2372,8 +1845,8 @@ This cross-platform NATS bridge provides:
|
|||||||
3. **Message Format Consistency**: Identical JSON schemas across all platforms
|
3. **Message Format Consistency**: Identical JSON schemas across all platforms
|
||||||
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
|
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
|
||||||
5. **Platform-Specific Optimizations**:
|
5. **Platform-Specific Optimizations**:
|
||||||
- **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython)
|
- **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython)
|
||||||
- **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in all platforms)
|
- **JSON** (`jsontable`): Universal human-readable format for smaller tables (works in Julia, JavaScript, Python; NOT supported in MicroPython)
|
||||||
|
|
||||||
The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior.
|
The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as the ground truth for API design and behavior.
|
||||||
|
|
||||||
@@ -2382,4 +1855,5 @@ The Julia implementation in [`src/NATSBridge.jl`](src/NATSBridge.jl:1) serves as
|
|||||||
| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|
| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|
||||||
|----------|---------------|----------|----------|---------------------|
|
|----------|---------------|----------|----------|---------------------|
|
||||||
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python |
|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript, Python |
|
||||||
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python, MicroPython |
|
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript, Python |
|
||||||
|
| `table` | Apache Arrow IPC (Python only) | Python's unified table type | `arrow-ipc` → `base64` | Python |
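
As a rough illustration of the `json` → `base64` row, the sketch below serializes a list of row dicts to JSON bytes and wraps them in base64 text. The helper names `encode_jsontable` and `decode_jsontable` are hypothetical, and the assumption that the wire value is plain base64 text is a reading of the Encoding column, not something this diff confirms.

```python
import base64
import json


def encode_jsontable(rows):
    """Hypothetical helper: list of dicts -> JSON bytes -> base64 text."""
    # Same shape check as the jsontable branch of _serialize_data in this diff.
    if not (isinstance(rows, list) and all(isinstance(row, dict) for row in rows)):
        raise ValueError("JSON table data must be a list of dicts")
    payload_bytes = json.dumps(rows).encode("utf-8")
    return base64.b64encode(payload_bytes).decode("ascii")


def decode_jsontable(data):
    """Hypothetical inverse: base64 text -> JSON bytes -> list of dicts."""
    return json.loads(base64.b64decode(data).decode("utf-8"))
```

A round trip through both helpers should return the original rows, which makes this form convenient for small, human-inspectable tables.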
|
||||||
|
|||||||
@@ -33,18 +33,28 @@ smartsend(subject, [(dataname, data, type), ...], options)
|
|||||||
env = smartreceive(msg, options)
|
env = smartreceive(msg, options)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Important Platform Differences:**
|
||||||
|
|
||||||
|
1. **Encoding field:** Julia and JavaScript preserve the original serialization format in the encoding field (`"base64"`, `"json"`, or `"arrow-ipc"`), while Python and MicroPython always use `"base64"` for all direct transport payloads.
|
||||||
|
|
||||||
|
2. **Async vs Sync:** JavaScript and Python desktop use async/await, while MicroPython uses synchronous API.
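
One way a receiver might cope with the encoding difference in point 1 is to accept all three labels and decode the same way. This is a sketch under a stated assumption: that the `data` field of a direct-transport payload is base64 text whichever label the sender writes. `decode_direct_data` and `KNOWN_ENCODINGS` are hypothetical names, not part of the bridge API.

```python
import base64

# The three labels named in the platform-difference note.
KNOWN_ENCODINGS = ("base64", "json", "arrow-ipc")


def decode_direct_data(payload):
    """Hypothetical helper: return raw bytes for a direct-transport payload dict."""
    encoding = payload.get("encoding")
    if encoding not in KNOWN_ENCODINGS:
        raise ValueError("Unexpected encoding: %r" % (encoding,))
    # Assumption: the data is base64 text regardless of the label, so the
    # label only describes the format of the bytes underneath.
    return base64.b64decode(payload["data"])
```

After this step the receiver would still dispatch on `payload_type` to turn the raw bytes into a value.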
|
||||||
|
|
||||||
### Supported Payload Types
|
### Supported Payload Types
|
||||||
|
|
||||||
| Type | Julia | JavaScript | Python | MicroPython |
|
| Type | Julia | JavaScript | Python | MicroPython |
|
||||||
|------|-------|------------|--------|-------------|
|
|------|-------|------------|--------|-------------|
|
||||||
| `text` | `String` | `string` | `str` | `str` |
|
| `text` | `String` | `string` | `str` | `str` |
|
||||||
| `dictionary` | `Dict` | `Object` | `dict` | `dict` |
|
| `dictionary` | `Dict` | `Object` | `dict` | `dict` |
|
||||||
| `table` | `DataFrame` | `Array<Object>` | `DataFrame` | ❌ |
|
| `arrowtable` | `DataFrame` | `Array<Object>` | `pandas.DataFrame` | ❌ |
|
||||||
|
| `jsontable` | `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ❌ |
|
||||||
|
| `table` | ❌ | ❌ | `pandas.DataFrame` | ❌ |
|
||||||
| `image` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
| `image` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||||
| `audio` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
| `audio` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||||
| `video` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
| `video` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||||
| `binary` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
| `binary` | `Vector{UInt8}` | `Uint8Array` | `bytes` | `bytearray` |
|
||||||
|
|
||||||
|
**Note on MicroPython:** MicroPython does not support table types (`arrowtable`, `jsontable`, or `table`) due to memory constraints. Use `dictionary` or `binary` instead.
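
The workaround in the note can be sketched as follows: tabular rows are reshaped into a single column-oriented dict and sent under the `dictionary` type. `rows_to_dictionary` is a hypothetical helper, and the column-oriented layout is only one possible convention that sender and receiver would need to agree on.

```python
def rows_to_dictionary(rows):
    """Hypothetical helper: turn a list of row dicts into one column-oriented dict."""
    columns = {}
    for index, row in enumerate(rows):
        for key, value in row.items():
            # A column first seen at a later row is back-filled with None.
            columns.setdefault(key, [None] * index).append(value)
        for key in columns:
            # Columns missing from this row get a None placeholder.
            if len(columns[key]) <= index:
                columns[key].append(None)
    return columns


rows = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
# Tuple in the (dataname, data, type) shape that smartsend takes in this document.
data = [("students", rows_to_dictionary(rows), "dictionary")]
```

The result is a plain `dict` of lists, which the `dictionary` branch shown in this diff serializes with `json.dumps`.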
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Prerequisites
|
## Prerequisites
|
||||||
@@ -645,7 +655,7 @@ df = DataFrame(
|
|||||||
score = [95, 88, 92]
|
score = [95, 88, 92]
|
||||||
)
|
)
|
||||||
|
|
||||||
data = [("students", df, "table")]
|
data = [("students", df, "arrowtable")]
|
||||||
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
|
env, env_json_str = smartsend("/data/students", data, broker_url="nats://localhost:4222")
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -661,7 +671,7 @@ const table_data = [
|
|||||||
{ id: 3, name: "Charlie", score: 92 }
|
{ id: 3, name: "Charlie", score: 92 }
|
||||||
];
|
];
|
||||||
|
|
||||||
const data = [["students", table_data, "table"]];
|
const data = [["students", table_data, "arrowtable"]];
|
||||||
const [env, env_json_str] = await NATSBridge.smartsend(
|
const [env, env_json_str] = await NATSBridge.smartsend(
|
||||||
"/data/students",
|
"/data/students",
|
||||||
data,
|
data,
|
||||||
|
|||||||
@@ -959,25 +959,14 @@ function send_batch(sender::SensorSender, readings::Vector{SensorReading})
|
|||||||
|
|
||||||
arrow_data = take!(buf)
|
arrow_data = take!(buf)
|
||||||
|
|
||||||
# Send based on size
|
# Send based on size (auto-selected by smartsend)
|
||||||
if length(arrow_data) < 1048576 # < 1MB
|
data = [("batch", arrow_data, "arrowtable")]
|
||||||
data = [("batch", arrow_data, "table")]
|
smartsend(
|
||||||
smartsend(
|
"/sensors/batch",
|
||||||
"/sensors/batch",
|
data,
|
||||||
data,
|
broker_url=sender.broker_url,
|
||||||
broker_url=sender.broker_url,
|
fileserver_url=sender.fileserver_url
|
||||||
fileserver_url=sender.fileserver_url
|
)
|
||||||
)
|
|
||||||
else
|
|
||||||
# Upload to file server
|
|
||||||
data = [("batch", arrow_data, "table")]
|
|
||||||
smartsend(
|
|
||||||
"/sensors/batch",
|
|
||||||
data,
|
|
||||||
broker_url=sender.broker_url,
|
|
||||||
fileserver_url=sender.fileserver_url
|
|
||||||
)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -1037,28 +1026,16 @@ class SensorSender {
|
|||||||
const buffer = arrow.tableFromBatches([recordBatch]).toBuffer();
|
const buffer = arrow.tableFromBatches([recordBatch]).toBuffer();
|
||||||
const arrow_data = new Uint8Array(buffer);
|
const arrow_data = new Uint8Array(buffer);
|
||||||
|
|
||||||
// Send based on size
|
// Send based on size (auto-selected by smartsend)
|
||||||
if (arrow_data.length < 1048576) {
|
const data = [["batch", arrow_data, "arrowtable"]];
|
||||||
const data = [["batch", arrow_data, "table"]];
|
await NATSBridge.smartsend(
|
||||||
await NATSBridge.smartsend(
|
"/sensors/batch",
|
||||||
"/sensors/batch",
|
data,
|
||||||
data,
|
{
|
||||||
{
|
broker_url: this.broker_url,
|
||||||
broker_url: this.broker_url,
|
fileserver_url: this.fileserver_url
|
||||||
fileserver_url: this.fileserver_url
|
}
|
||||||
}
|
);
|
||||||
);
|
|
||||||
} else {
|
|
||||||
const data = [["batch", arrow_data, "table"]];
|
|
||||||
await NATSBridge.smartsend(
|
|
||||||
"/sensors/batch",
|
|
||||||
data,
|
|
||||||
{
|
|
||||||
broker_url: this.broker_url,
|
|
||||||
fileserver_url: this.fileserver_url
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1109,7 +1086,7 @@ class SensorSender:
|
|||||||
arrow_data = buf.getvalue()
|
arrow_data = buf.getvalue()
|
||||||
|
|
||||||
# Send based on size (auto-selected by smartsend)
|
# Send based on size (auto-selected by smartsend)
|
||||||
data = [("batch", arrow_data, "table")]
|
data = [("batch", arrow_data, "arrowtable")]
|
||||||
await smartsend(
|
await smartsend(
|
||||||
"/sensors/batch",
|
"/sensors/batch",
|
||||||
data,
|
data,
|
||||||
@@ -1152,7 +1129,7 @@ function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String
|
|||||||
# Send as single message
|
# Send as single message
|
||||||
smartsend(
|
smartsend(
|
||||||
"/sensors/batch",
|
"/sensors/batch",
|
||||||
[("batch", arrow_data, "table")],
|
[("batch", arrow_data, "arrowtable")],
|
||||||
broker_url=sender.broker_url
|
broker_url=sender.broker_url
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
@@ -1193,7 +1170,7 @@ async function sendBatchReadings(sender, readings) {
|
|||||||
const arrow_data = new Uint8Array(buffer);
|
const arrow_data = new Uint8Array(buffer);
|
||||||
|
|
||||||
// Send as single message
|
// Send as single message
|
||||||
const data = [["batch", arrow_data, "table"]];
|
const data = [["batch", arrow_data, "arrowtable"]];
|
||||||
await NATSBridge.smartsend(
|
await NATSBridge.smartsend(
|
||||||
"/sensors/batch",
|
"/sensors/batch",
|
||||||
data,
|
data,
|
||||||
|
|||||||
@@ -71,8 +71,9 @@ def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
data: Data to serialize (string for "text", JSON-serializable for "dictionary",
|
data: Data to serialize (string for "text", JSON-serializable for "dictionary",
|
||||||
table-like for "table", binary for "image", "audio", "video", "binary")
|
table-like for "arrowtable"/"jsontable", binary for "image", "audio", "video", "binary")
|
||||||
payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
payload_type: Target format: "text", "dictionary", "arrowtable", "jsontable",
|
||||||
|
"image", "audio", "video", "binary"
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Binary representation of the serialized data
|
Binary representation of the serialized data
|
||||||
@@ -80,7 +81,8 @@ def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|||||||
Raises:
|
Raises:
|
||||||
Error: If payload_type is not one of the supported types
|
Error: If payload_type is not one of the supported types
|
||||||
Error: If payload_type is "image", "audio", or "video" but data is not bytes
|
Error: If payload_type is "image", "audio", or "video" but data is not bytes
|
||||||
Error: If payload_type is "table" but data is not a pandas DataFrame or pyarrow Table
|
Error: If payload_type is "arrowtable" but data is not a pandas DataFrame or pyarrow Table
|
||||||
|
Error: If payload_type is "jsontable" but data is not a list of dicts
|
||||||
"""
|
"""
|
||||||
if payload_type == 'text':
|
if payload_type == 'text':
|
||||||
if isinstance(data, str):
|
if isinstance(data, str):
|
||||||
@@ -90,9 +92,9 @@ def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
json_str = json.dumps(data)
|
json_str = json.dumps(data)
|
||||||
return json_str.encode('utf-8')
|
return json_str.encode('utf-8')
|
||||||
elif payload_type == 'table':
|
elif payload_type == 'arrowtable':
|
||||||
if not ARROW_AVAILABLE:
|
if not ARROW_AVAILABLE:
|
||||||
raise RuntimeError('pyarrow not available for table serialization')
|
raise RuntimeError('pyarrow not available for arrowtable serialization')
|
||||||
|
|
||||||
import io
|
import io
|
||||||
buf = io.BytesIO()
|
buf = io.BytesIO()
|
||||||
@@ -110,7 +112,14 @@ def _serialize_data(data: Any, payload_type: str) -> bytes:
|
|||||||
sink.close()
|
sink.close()
|
||||||
return buf.getvalue()
|
return buf.getvalue()
|
||||||
else:
|
else:
|
||||||
raise ValueError('Table data must be a pandas DataFrame or pyarrow Table')
|
raise ValueError('Arrow table data must be a pandas DataFrame or pyarrow Table')
|
||||||
|
elif payload_type == 'jsontable':
|
||||||
|
# Serialize list of dicts to JSON format
|
||||||
|
if isinstance(data, list) and all(isinstance(row, dict) for row in data):
|
||||||
|
json_str = json.dumps(data)
|
||||||
|
return json_str.encode('utf-8')
|
||||||
|
else:
|
||||||
|
raise ValueError('JSON table data must be a list of dicts')
|
||||||
elif payload_type == 'image':
|
elif payload_type == 'image':
|
||||||
if isinstance(data, (bytes, bytearray)):
|
if isinstance(data, (bytes, bytearray)):
|
||||||
return bytes(data)
|
return bytes(data)
|
||||||
@@ -141,12 +150,13 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
data: Serialized data as bytes
|
data: Serialized data as bytes
|
||||||
payload_type: Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
payload_type: Data type ("text", "dictionary", "arrowtable", "jsontable",
|
||||||
|
"image", "audio", "video", "binary")
|
||||||
correlation_id: Correlation ID for logging
|
correlation_id: Correlation ID for logging
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary",
|
Deserialized data (String for "text", DataFrame for "arrowtable",
|
||||||
bytes for "image", "audio", "video", "binary")
|
Vector{Dict} for "jsontable"/"dictionary", bytes for "image", "audio", "video", "binary")
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
Error: If payload_type is not one of the supported types
|
Error: If payload_type is not one of the supported types
|
||||||
@@ -156,14 +166,18 @@ def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> An
|
|||||||
elif payload_type == 'dictionary':
|
elif payload_type == 'dictionary':
|
||||||
json_str = data.decode('utf-8')
|
json_str = data.decode('utf-8')
|
||||||
return json.loads(json_str)
|
return json.loads(json_str)
|
||||||
elif payload_type == 'table':
|
elif payload_type == 'arrowtable':
|
||||||
if not ARROW_AVAILABLE:
|
if not ARROW_AVAILABLE:
|
||||||
raise RuntimeError('pyarrow not available for table deserialization')
|
raise RuntimeError('pyarrow not available for arrowtable deserialization')
|
||||||
|
|
||||||
import io
|
import io
|
||||||
buf = io.BytesIO(data)
|
buf = io.BytesIO(data)
|
||||||
reader = ipc.open_file(buf)
|
reader = ipc.open_file(buf)
|
||||||
return reader.read_all().to_pandas()
|
return reader.read_all().to_pandas()
|
||||||
|
elif payload_type == 'jsontable':
|
||||||
|
# Deserialize JSON to list of dicts
|
||||||
|
json_str = data.decode('utf-8')
|
||||||
|
return json.loads(json_str)
|
||||||
elif payload_type == 'image':
|
elif payload_type == 'image':
|
||||||
return data
|
return data
|
||||||
elif payload_type == 'audio':
|
elif payload_type == 'audio':
|
||||||
@@ -317,7 +331,7 @@ class NATSClient:
|
|||||||
self._client = nats.connect(self.url)
|
self._client = nats.connect(self.url)
|
||||||
await self._client
|
await self._client
|
||||||
else:
|
else:
|
||||||
raise Error('nats-py not available')
|
raise RuntimeError('nats-py not available')
|
||||||
return self._client
|
return self._client
|
||||||
|
|
||||||
async def publish(self, subject: str, message: str, correlation_id: str = "") -> None:
|
async def publish(self, subject: str, message: str, correlation_id: str = "") -> None:
|
||||||
@@ -397,12 +411,19 @@ def _build_payload(
|
|||||||
Returns:
|
Returns:
|
||||||
Payload object
|
Payload object
|
||||||
"""
|
"""
|
||||||
|
# Determine encoding based on payload type (matching Julia/JS implementation)
|
||||||
|
encoding = 'base64'
|
||||||
|
if payload_type == 'jsontable':
|
||||||
|
encoding = 'json'
|
||||||
|
elif payload_type == 'arrowtable':
|
||||||
|
encoding = 'arrow-ipc'
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'id': str(uuid.uuid4()),
|
'id': str(uuid.uuid4()),
|
||||||
'dataname': dataname,
|
'dataname': dataname,
|
||||||
'payload_type': payload_type,
|
'payload_type': payload_type,
|
||||||
'transport': transport,
|
'transport': transport,
|
||||||
'encoding': 'base64' if transport == 'direct' else 'none',
|
'encoding': encoding,
|
||||||
'size': len(payload_bytes),
|
'size': len(payload_bytes),
|
||||||
'data': data,
|
'data': data,
|
||||||
'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {}
|
'metadata': {'payload_bytes': len(payload_bytes)} if transport == 'direct' else {}
|
||||||
@@ -476,7 +497,7 @@ async def smartsend(
|
|||||||
data: List of (dataname, data, type) tuples to send
|
data: List of (dataname, data, type) tuples to send
|
||||||
- dataname: Name of the payload
|
- dataname: Name of the payload
|
||||||
- data: The actual data to send
|
- data: The actual data to send
|
||||||
- type: Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
- type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||||
broker_url: URL of the NATS server
|
broker_url: URL of the NATS server
|
||||||
fileserver_url: URL of the HTTP file server for large payloads
|
fileserver_url: URL of the HTTP file server for large payloads
|
||||||
fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status",
|
fileserver_upload_handler: Function to handle fileserver uploads (must return Dict with "status",
|
||||||
@@ -514,14 +535,21 @@ async def smartsend(
|
|||||||
>>> data2 = [1, 2, 3, 4, 5]
|
>>> data2 = [1, 2, 3, 4, 5]
|
||||||
>>> env, env_json_str = await smartsend(
|
>>> env, env_json_str = await smartsend(
|
||||||
... "my.subject",
|
... "my.subject",
|
||||||
... [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")]
|
... [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")]
|
||||||
... )
|
... )
|
||||||
>>>
|
>>>
|
||||||
>>> # Send a large array using fileserver upload
|
>>> # Send a large array using fileserver upload
|
||||||
>>> data = list(range(10_000_000)) # ~80 MB
|
>>> data = list(range(10_000_000)) # ~80 MB
|
||||||
>>> env, env_json_str = await smartsend(
|
>>> env, env_json_str = await smartsend(
|
||||||
... "large.data",
|
... "large.data",
|
||||||
... [("large_table", data, "table")]
|
... [("large_table", data, "arrowtable")]
|
||||||
|
... )
|
||||||
|
>>>
|
||||||
|
>>> # Send jsontable (JSON format for human-readable tabular data)
|
||||||
|
>>> users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
|
||||||
|
>>> env, env_json_str = await smartsend(
|
||||||
|
... "json.data",
|
||||||
|
... [("users", users, "jsontable")]
|
||||||
... )
|
... )
|
||||||
>>>
|
>>>
|
||||||
>>> # Mixed content (e.g., chat with text and image)
|
>>> # Mixed content (e.g., chat with text and image)
|
||||||
|
|||||||