13 Commits

Author SHA1 Message Date
6d327967b1 update readme 2026-05-23 20:37:01 +07:00
4e8b7faead update readme 2026-05-23 20:10:37 +07:00
efd77937a2 update readme 2026-05-23 14:29:10 +07:00
9c9493eaa0 update readme 2026-05-23 13:47:29 +07:00
f06fbce486 update 2026-05-23 13:07:37 +07:00
c919f9585d update readme 2026-05-23 12:29:18 +07:00
bfb7acd55b update readme 2026-05-23 12:20:17 +07:00
e74d0a3301 update readme 2026-05-23 12:15:27 +07:00
a4f450386c update 2026-05-23 12:02:18 +07:00
15a7f1c178 update readme 2026-05-23 10:57:01 +07:00
31727f3337 update 2026-05-23 06:17:13 +07:00
cc52ef1bda update 2026-05-23 06:16:16 +07:00
ton
a1fcd86a74 Merge pull request 'v1.0.0' (#1) from v1.0.0 into main
Reviewed-on: #1
2026-05-22 22:16:16 +00:00
6 changed files with 455 additions and 122 deletions

View File

@@ -2,7 +2,7 @@
julia_version = "1.12.6"
manifest_format = "2.0"
project_hash = "6757ef801c2fba25b1829ffc7ce99f19563e7dc4"
project_hash = "ec31595f278190cb6cfb8b50156867ebf16234d0"
[[deps.AliasTables]]
deps = ["PtrArrays", "Random"]
@@ -829,7 +829,7 @@ uuid = "a9144af2-ca23-56d9-984f-0d03f7b5ccf8"
version = "1.0.21+0"
[[deps.msghandler]]
deps = ["Arrow", "Base64", "DataFrames", "Dates", "HTTP", "JSON", "NATS", "PrettyPrinting", "Revise", "UUIDs"]
deps = ["Arrow", "Base64", "DataFrames", "Dates", "GeneralUtils", "HTTP", "JSON", "NATS", "PrettyPrinting", "Revise", "UUIDs"]
path = "."
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.5.6"

View File

@@ -19,5 +19,5 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
[compat]
Base64 = "1.11.0"
Dates = "1.11.0"
GeneralUtils = "0.3.1"
GeneralUtils = "0.3 - 0.3.1"
JSON = "1.4.0"

533
README.md
View File

@@ -1,9 +1,8 @@
# msghandler - Cross-Platform Bi-Directional Data Bridge
# msghandler - Cross-Platform Communication Layer
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, **Python**, and **MicroPython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
A high-performance, **transport-agnostic** communication layer for **Julia**, **JavaScript**, **Python**, and **MicroPython** applications. Implements the Claim-Check pattern for efficient payload transport (direct for small payloads, URL-based for large payloads).
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io)
---
@@ -14,7 +13,6 @@ A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, **
- [Features](#features)
- [Quick Start](#quick-start)
- [API Reference](#api-reference)
- [Payload Types](#payload-types)
- [Cross-Platform Examples](#cross-platform-examples)
- [Testing](#testing)
- [Documentation](#documentation)
@@ -22,14 +20,81 @@ A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, **
---
## Quick Data Format Reference
All platforms use the same input format for `smartpack()`:
### Format: `[(dataname, data, type), ...]`
| Element | Type | Description |
|---------|------|-------------|
| `dataname` | String | Name for the payload (e.g., `"message"`, `"config.json"`) |
| `data` | Any | The actual data (type depends on the `type` parameter below) |
| `type` | String | Payload type identifier |
### Supported Payload Types
| Type | Julia Data | JavaScript Data | Python Data | Description |
|------|------------|-----------------|-------------|-------------|
| `"text"` | `String` | `string` | `str` | Plain text |
| `"dictionary"` | `Dict` | `Object` | `dict` | JSON object |
| `"arrowtable"` | `DataFrame` | ❌ | `pandas.DataFrame` | Arrow IPC table (Desktop only) |
| `"jsontable"` | `DataFrame` | `Array<Object>` | `list[dict]` | JSON table |
| `"image"` | `Vector{UInt8}` | `Uint8Array` | `bytes` | Image data |
| `"audio"` | `Vector{UInt8}` | `Uint8Array` | `bytes` | Audio data |
| `"video"` | `Vector{UInt8}` | `Uint8Array` | `bytes` | Video data |
| `"binary"` | `Vector{UInt8}` | `Uint8Array` | `bytes` | Binary data |
### Examples
**Sending multiple payloads:**
```julia
# Julia
data = [
("message", "Hello!", "text"),
("config", Dict("key" => "value"), "dictionary"),
("file", file_bytes, "binary")
]
env, json_str = msghandler.smartpack("subject", data, options...)
```
```python
# Python
data = [
("message", "Hello!", "text"),
("config", {"key": "value"}, "dictionary"),
("file", file_bytes, "binary")
]
env, json_str = await smartpack("subject", data, **options)
```
```javascript
// JavaScript
const data = [
["message", "Hello!", "text"],
["config", {key: "value"}, "dictionary"],
["file", fileBytes, "binary"]
];
const [env, json_str] = await smartpack("subject", data, options);
```
**Important Notes:**
- Always wrap payloads in a list `[]`, even for single payloads
- Type must be one of the supported types above
- Large payloads (≥500KB) automatically use link transport
- Browser only supports: `"text"`, `"dictionary"`, `"jsontable"`, `"image"`, `"audio"`, `"video"`, `"binary"`
(No Arrow IPC due to browser incompatibility)
---
## Overview
msghandler enables seamless communication across multiple platforms through NATS, with intelligent transport selection based on payload size:
msghandler provides a **transport-agnostic** communication layer with intelligent payload transport selection:
| Transport | Payload Size | Method |
|-----------|--------------|--------|
| **Direct** | < 500KB | Sent directly via NATS (Base64 encoded) |
| **Link** | ≥ 500KB | Uploaded to HTTP file server, URL sent via NATS |
| **Direct** | < 500KB | Sent directly via chosen transport (Base64 encoded) |
| **Link** | ≥ 500KB | Uploaded to HTTP file server, URL sent via chosen transport |
### Use Cases
@@ -37,6 +102,14 @@ msghandler enables seamless communication across multiple platforms through NATS
- **File Transfer**: Efficient transfer of large files using claim-check pattern
- **IoT/Embedded**: Sensor data, telemetry, and analytics pipelines (MicroPython)
- **Cross-Platform Communication**: Interoperability between Julia, JavaScript, Python, and MicroPython systems
- **Any Transport**: Works with NATS, HTTP, WebSockets, WebRTC, or any custom transport mechanism
### Key Design Principles
- **Transport Agnostic**: Core API (`smartpack`/`smartunpack`) is decoupled from transport
- **Claim-Check Pattern**: Efficient handling of large payloads via URL references
- **Type System**: Rich payload types (text, dictionary, arrowtable, jsontable, binary)
- **Cross-Platform**: Unified API across Julia, JavaScript, Python, and MicroPython
---
@@ -46,7 +119,7 @@ msghandler enables seamless communication across multiple platforms through NATS
|----------|----------------|----------|
| **Julia** | [`src/msghandler.jl`](src/msghandler.jl) | Full feature set, Arrow IPC, multiple dispatch |
| **JavaScript (Node.js)** | [`src/msghandler_ssr.js`](src/msghandler_ssr.js) | Node.js, async/await, Arrow IPC |
| **JavaScript (Browser)** | [`src/msghandler_csr.js`](src/msghandler_csr.js) | Browser, WebSocket NATS, async/await, JSON table only |
| **JavaScript (Browser)** | [`src/msghandler_csr.js`](src/msghandler_csr.js) | Browser, async/await, JSON table only |
| **Python** | [`src/msghandler.py`](src/msghandler.py) | Desktop Python, asyncio, type hints, Arrow IPC |
| **MicroPython** | [`src/msghandler_mpy.py`](src/msghandler_mpy.py) | Memory-constrained, synchronous API |
@@ -63,23 +136,21 @@ msghandler enables seamless communication across multiple platforms through NATS
| Link Transport | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ |
| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | ✅ |
| WebSocket NATS | ❌ | ❌ | ✅ | ❌ | ❌ |
---
## Features
-**Cross-platform messaging** for Julia, JavaScript, Python, and MicroPython applications
-**Bi-directional messaging** with request-reply patterns
-**Multi-payload support** - send multiple payloads with different types in one message
-**Automatic transport selection** - direct vs link based on payload size
-**Claim-Check pattern** for payloads ≥ 500KB
-**Apache Arrow IPC** support for tabular data (Desktop: Julia/Python/Node.js)
-**JSON Table** support for tabular data (All platforms including Browser)
-**Exponential backoff** for reliable file server downloads
-**Correlation ID tracking** for message tracing
-**Reply-to support** for request-response patterns
-**Handler function abstraction** - pluggable file server implementations (Plik, AWS S3, custom)
-**Transport-agnostic** - Core API works with any communication channel (NATS, HTTP, WebSockets, etc.)
-**Cross-platform** - Unified API for Julia, JavaScript, Python, and MicroPython
-**Bi-directional** - Request-reply patterns with `reply_to` support
-**Multi-payload** - Send multiple payloads of different types in one message
-**Automatic transport selection** - Direct vs link based on payload size (threshold: 500KB)
-**Claim-Check pattern** - Efficient large payload handling via URL references
-**Rich payload types** - Text, dictionary, Arrow IPC, JSON table, image, audio, video, binary
-**Exponential backoff** - Reliable file server downloads with retry logic
-**Correlation ID** - End-to-end message tracing
-**Handler abstraction** - Pluggable file server implementations (Plik, AWS S3, custom)
---
@@ -87,17 +158,17 @@ msghandler enables seamless communication across multiple platforms through NATS
### Prerequisites
1. **NATS Server** - Install and run a NATS server:
```bash
docker run -p 4222:4222 nats:latest
```
1. **Transport Channel** - Choose your communication channel:
- **NATS** (recommended): `docker run -p 4222:4222 nats:latest`
- **HTTP/WebSocket**: Any HTTP server or WebSocket endpoint
- **Custom**: Implement your own transport mechanism
2. **HTTP File Server** (optional, for large payloads) - Install and run a file server:
2. **HTTP File Server** (optional, for payloads ≥ 500KB) - Install and run:
```bash
# Using Plik
docker run -p 8080:8080 -v /tmp/fileserver:/var/lib/plik -e PLIK_ADMIN_PASSWORD=admin plik/plik
# OR using simple Python HTTP server
# OR using Python HTTP server
mkdir -p /tmp/fileserver
python3 -m http.server 8080 --directory /tmp/fileserver
```
@@ -107,11 +178,38 @@ msghandler enables seamless communication across multiple platforms through NATS
#### Julia
```julia
using msghandler
using msghandler, NATS
data = [("message", "Hello World", "text")]
env, env_json_str = smartpack("/chat/room1", data; broker_url="nats://localhost:4222")
println("Message sent!")
# Data format: [(dataname, data, type), ...]
# Each tuple contains:
# 1. dataname: String name for the payload (e.g., "message")
# 2. data: The actual data (String, bytes, DataFrame, etc.)
# 3. type: String type indicator (e.g., "text", "binary", "dictionary")
# Supported types: "text", "dictionary", "arrowtable", "jsontable",
# "image", "audio", "video", "binary"
payload_1 = ("test_message", "Hello World", "text")
file_path_large_image = "./test/large_image.png"
file_data_large_image = read(file_path_large_image)
filename_large_image = basename(file_path_large_image)
payload_2 = (filename_large_image, file_data_large_image, "binary")
payloads = [payload_1, payload_2] # List of tuples
# Step 1: Create the message envelope (transport-agnostic)
envelope, envelope_json_str = msghandler.smartpack("test.topic",
payloads;
broker_url="nats.yiem.cc",
fileserver_url="http://192.168.88.104:8080")
# Step 2: Send via your chosen transport (NATS in this example)
conn = NATS.connect("nats.yiem.cc")
NATS.publish(conn, "test.topic", envelope_json_str; reply_to="test.replytopic")
NATS.drain(conn)
# OR
reply = NATS.request(conn, "test.topic", envelope_json_str, timeout=10)
```
#### JavaScript (Node.js)
@@ -147,13 +245,24 @@ console.log("Message sent!");
```python
from msghandler import smartpack
# Data format: [(dataname, data, type), ...]
# Each tuple contains:
# 1. dataname: String name for the payload (e.g., "message")
# 2. data: The actual data (str, bytes, dict, list, etc.)
# 3. type: String type indicator (e.g., "text", "binary", "dictionary")
# Supported types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
data = [("message", "Hello World", "text")]
env, env_json_str = await smartpack(
"/chat/room1",
data,
broker_url="nats://localhost:4222"
broker_url="nats://localhost:4222",
fileserver_url="http://localhost:8080"
)
print("Message sent!")
print("Envelope created:", env)
# To send via transport (e.g., NATS, HTTP, WebSocket):
# transport_send_function("/chat/room1", env_json_str)
```
#### MicroPython
@@ -166,9 +275,37 @@ env, env_json_str = smartpack(
"/chat/room1",
data,
broker_url="nats://localhost:4222",
size_threshold=100000 # 100KB for MicroPython
fileserver_url="http://localhost:8080",
size_threshold=100000
)
print("Message sent!")
print("Envelope created:", env)
# To send via transport (e.g., HTTP POST, WebSocket):
# transport_send_function("/chat/room1", env_json_str)
```
### Receive Your First Message
#### Julia
```julia
using msghandler, NATS
# Step 1: Receive message from your transport (NATS in this example)
conn = NATS.connect("nats.yiem.cc")
NATS.subscribe(conn, "test.topic") do msg
println("Received message on $(msg.subject)")
envelope_json_str = String(msg.payload)
# Step 2: Unpack the envelope (transport-agnostic)
envelope = msghandler.smartunpack(
envelope_json_str;
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
println(envelope.payloads[1])
end
```
---
@@ -177,14 +314,77 @@ print("Message sent!")
### Unified API Standard
All platforms use the same input/output format for payloads:
All platforms use the same input/output format for payloads.
**Input format for `smartpack`:**
```
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
#### Input Format for `smartpack`
**Format:** `[(dataname1, data1, type1), (dataname2, data2, type2), ...]`
**Each tuple contains 3 elements:**
1. **`dataname`** (String) - Name for the payload (e.g., `"message"`, `"config.json"`, `"avatar.png"`)
2. **`data`** (Any) - The actual data to send (type depends on `type`):
- `text` → String
- `dictionary` → Dict/Object
- `arrowtable` → DataFrame/Arrow.Table
- `jsontable` → List of dicts/Vector{Dict}
- `image/audio/video/binary` → bytes/Uint8Array/Vector{UInt8}
3. **`type`** (String) - Payload type (required):
- `"text"` - Plain text string
- `"dictionary"` - JSON-serializable dictionary
- `"arrowtable"` - Apache Arrow IPC table (Desktop only)
- `"jsontable"` - JSON table (all platforms)
- `"image"` - Image data (PNG, JPG)
- `"audio"` - Audio data (WAV, MP3)
- `"video"` - Video data (MP4, AVI)
- `"binary"` - Generic binary data
**Important:**
- Always wrap payloads in a list, even for single payloads
- Type must be one of the supported types above
- Large payloads (≥500KB by default) automatically use link transport
**Example:**
```julia
# Julia
data = [
("message", "Hello World", "text"),
("config", Dict("key" => "value"), "dictionary"),
("file", file_bytes, "binary")
]
```
**Output format for `smartunpack`:**
```python
# Python
data = [
("message", "Hello World", "text"),
("config", {"key": "value"}, "dictionary"),
("file", file_bytes, "binary")
]
```
```javascript
// JavaScript
const data = [
["message", "Hello World", "text"],
["config", {key: "value"}, "dictionary"],
["file", fileBytes, "binary"]
];
```
### Common Payload Examples
| Use Case | Data Format | Type |
|----------|-------------|------|
| Simple text message | `("message", "Hello World", "text")` | `"text"` |
| Configuration JSON | `("config", Dict("key" => "value"), "dictionary")` | `"dictionary"` |
| Table as Arrow | `("data", dataframe, "arrowtable")` | `"arrowtable"` |
| Table as JSON | `("data", [{"col": 1}], "jsontable")` | `"jsontable"` |
| Image file | `("avatar.png", read("file.png"), "image")` | `"image"` |
| Audio file | `("audio.mp3", audio_bytes, "audio")` | `"audio"` |
| Video file | `("video.mp4", video_bytes, "video")` | `"video"` |
| Binary file | `("data.bin", file_bytes, "binary")` | `"binary"` |
#### Output Format for `smartunpack`
```json
{
"correlation_id": "...",
@@ -206,7 +406,7 @@ All platforms use the same input/output format for payloads:
### smartpack
Sends data either directly via NATS or via a fileserver URL, depending on payload size.
Sends data via your chosen transport mechanism (NATS, HTTP, WebSocket, etc.) with intelligent transport selection (direct vs URL-based) based on payload size.
#### Julia
@@ -323,16 +523,16 @@ env, env_json_str = await msghandler.smartpack(
#### MicroPython
```python
from msghandler import msghandler
from msghandler import smartpack
# Limited to direct transport (< 100KB threshold)
env, env_json_str = msghandler.smartpack(
subject,
data, # List of (dataname, data, type) tuples
env, env_json_str = smartpack(
"/device/config",
[("config", config, "dictionary")],
broker_url="nats://localhost:4222",
size_threshold=100000 # Lower threshold for memory constraints
size_threshold=100000
)
# Returns: Tuple[Dict, str]
print("Message sent!")
```
### smartunpack
@@ -406,35 +606,20 @@ env = await msghandler.smartunpack(
#### MicroPython
```python
from msghandler import msghandler
from msghandler import smartunpack
env = msghandler.smartunpack(
env = smartunpack(
msg,
fileserver_download_handler=_sync_fileserver_download,
max_retries=3,
base_delay=100,
max_delay=1000
)
# Returns: Dict with "payloads" key
print("Received payloads:", env["payloads"])
```
---
## Payload Types
| Type | Julia | JavaScript | Python | MicroPython | Description |
|------|-------|------------|--------|-------------|-------------|
| `text` | `String` | `string` | `str` | `str` | Plain text strings |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries |
| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | Generic binary data |
---
## Cross-Platform Examples
### Example 1: Chat with Mixed Content
@@ -480,33 +665,63 @@ import msghandler from './src/msghandler_csr.js';
const data = [
["message_text", "Hello!", "text"],
["user_avatar", imageData, "image"],
["large_document", largeFileData, "binary"]
["user_avatar", image_data, "image"]
];
const [env, env_json_str] = await msghandler.smartpack(
"/chat/room1",
data,
{ broker_url: 'ws://localhost:4222', fileserver_url: 'http://localhost:8080' }
{ broker_url: 'ws://localhost:4222' }
);
console.log("Message sent!");
```
#### Python
```python
from msghandler import msghandler
import asyncio
from msghandler import smartpack
data = [
async def main():
image_data = open("user_avatar.png", "rb").read()
large_file_data = open("large_document.pdf", "rb").read()
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_data, "image"),
("large_document", large_file_data, "binary")
]
]
env, env_json_str = await msghandler.smartpack(
env, env_json_str = await smartpack(
"/chat/room1",
data,
fileserver_url="http://localhost:8080"
)
print("Message sent!")
if __name__ == "__main__":
asyncio.run(main())
```
#### MicroPython
```python
from msghandler import smartpack
# Note: MicroPython only supports direct transport (< 100KB threshold)
# Large files must be uploaded via file server first
data = [
("message_text", "Hello!", "text"),
("user_avatar", image_bytes, "image")
]
env, env_json_str = smartpack(
"/chat/room1",
data,
broker_url="nats://localhost:4222",
size_threshold=100000
)
print("Message sent!")
```
### Example 2: Dictionary Exchange
@@ -543,12 +758,34 @@ const [env, env_json_str] = await msghandler.smartpack(
"/device/config",
[["config", config, "dictionary"]]
);
console.log("Message sent!");
```
#### Python
```python
from msghandler import msghandler
import asyncio
from msghandler import smartpack
async def main():
config = {
"wifi_ssid": "MyNetwork",
"wifi_password": "password123",
"update_interval": 60
}
data = [("config", config, "dictionary")]
env, env_json_str = await smartpack("/device/config", data)
print("Message sent!")
if __name__ == "__main__":
asyncio.run(main())
```
#### MicroPython
```python
from msghandler import smartpack
config = {
"wifi_ssid": "MyNetwork",
@@ -557,7 +794,13 @@ config = {
}
data = [("config", config, "dictionary")]
env, env_json_str = await msghandler.smartpack("/device/config", data)
env, env_json_str = smartpack(
"/device/config",
data,
broker_url="nats://localhost:4222",
size_threshold=100000
)
print("Message sent!")
```
### Example 3: Table Data (Arrow IPC)
@@ -618,8 +861,6 @@ env, env_json_str = await msghandler.smartpack("/data/analysis", data)
```javascript
import msghandler from './src/msghandler_csr.js';
// Browser uses jsontable (JSON array of objects) instead of arrowtable
// Apache Arrow is not compatible with browsers
const df = [
{ id: 1, name: "Alice", score: 95 },
{ id: 2, name: "Bob", score: 88 },
@@ -628,9 +869,33 @@ const df = [
const [env, env_json_str] = await msghandler.smartpack(
"/data/analysis",
[["students", df, "jsontable"]], // Use jsontable for browser
[["students", df, "jsontable"]],
{ broker_url: 'ws://localhost:4222' }
);
console.log("Message sent!");
```
#### MicroPython
```python
from msghandler import smartpack
# Note: MicroPython only supports direct transport (< 100KB threshold)
# MicroPython doesn't support Arrow IPC, only jsontable
df = [
{"id": 1, "name": "Alice", "score": 95},
{"id": 2, "name": "Bob", "score": 88},
{"id": 3, "name": "Charlie", "score": 92}
]
data = [("students", df, "jsontable")]
env, env_json_str = smartpack(
"/data/analysis",
data,
broker_url="nats://localhost:4222",
size_threshold=100000
)
print("Message sent!")
```
### Example 4: Request-Response Pattern
@@ -640,32 +905,44 @@ Bi-directional communication with reply-to support.
#### Julia
```julia
using msghandler
using msghandler, NATS
# Requester
env, env_json_str = smartpack(
env, env_json_str = msghandler.smartpack(
"/device/command",
[("command", Dict("action" => "read_sensor"), "dictionary")];
broker_url="nats://localhost:4222",
reply_to="/device/response"
)
conn = NATS.connect("nats://localhost:4222")
NATS.publish(conn, "/device/command", env_json_str)
NATS.drain(conn)
# Receiver (in separate application)
msg = NATS.subscription.next()
env = smartunpack(msg)
# Process request and send response
response_env, response_json = smartpack(
conn = NATS.connect("nats://localhost:4222")
NATS.subscribe(conn, "/device/command") do msg
env = msghandler.smartunpack(msg)
println("Received command: ", env["payloads"])
result = Dict("value" => 42)
response_env, response_json = msghandler.smartpack(
"/device/response",
[("result", Dict("value" => 42), "dictionary")],
[("result", result, "dictionary")],
reply_to="/device/command",
reply_to_msg_id=env["msg_id"]
)
)
NATS.publish(conn, "/device/response", response_json)
NATS.drain(conn)
end
```
#### JavaScript (Node.js)
```javascript
import msghandler from './src/msghandler_ssr.js';
import { connect } from 'nats';
// Requester
const [env, env_json_str] = await msghandler.smartpack(
@@ -673,11 +950,76 @@ const [env, env_json_str] = await msghandler.smartpack(
[["command", { action: "read_sensor" }, "dictionary"]],
{ broker_url: 'nats://localhost:4222', reply_to: '/device/response' }
);
const nc = await connect({ port: 4222 });
nc.publish("/device/command", env_json_str);
await nc.flush();
// Receiver (in separate application)
const nc = await connect({ port: 4222 });
const sub = nc.subscribe("/device/command");
for await (const msg of sub) {
const env = await msghandler.smartunpack(msg);
console.log("Received command:", env.payloads);
const response_env, response_json = await msghandler.smartpack(
"/device/response",
[["result", { value: 42 }, "dictionary"]],
{ reply_to: '/device/command', reply_to_msg_id: env.msg_id }
);
nc.publish("/device/response", response_json);
await nc.flush();
}
```
#### Python
```python
import asyncio
from msghandler import smartpack
async def main():
# Requester
env, env_json_str = await smartpack(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
broker_url="nats://localhost:4222",
reply_to="/device/response"
)
print("Request sent!")
# Receiver (in separate application)
# await nats_consumer.next()
# env = await smartunpack(msg)
# Process request and send response
# response_env, response_json = await smartpack(
# "/device/response",
# [("result", {"value": 42}, "dictionary")],
# reply_to="/device/command",
# reply_to_msg_id=env["msg_id"]
# )
if __name__ == "__main__":
asyncio.run(main())
```
#### JavaScript (Browser)
```javascript
import msghandler from './src/msghandler_csr.js';
// Requester
const [env, env_json_str] = await msghandler.smartpack(
"/device/command",
[["command", { action: "read_sensor" }, "dictionary"]],
{ broker_url: 'ws://localhost:4222', reply_to: '/device/response' }
);
console.log("Request sent!");
// Receiver (in separate application)
// const msg = await natsConsumer.next();
// const env = await msghandler.smartunpack(msg);
// Process request and send response
// console.log("Received command:", env.payloads);
// const response_env, response_json = await msghandler.smartpack(
// "/device/response",
// [["result", { value: 42 }, "dictionary"]],
@@ -685,29 +1027,20 @@ const [env, env_json_str] = await msghandler.smartpack(
// );
```
#### Python
#### MicroPython
```python
from msghandler import msghandler
from msghandler import smartpack
# Requester
env, env_json_str = await msghandler.smartpack(
env, env_json_str = smartpack(
"/device/command",
[("command", {"action": "read_sensor"}, "dictionary")],
broker_url="nats://localhost:4222",
reply_to="/device/response"
reply_to="/device/response",
size_threshold=100000
)
# Receiver (in separate application)
# msg = await nats_consumer.next()
# env = await msghandler.smartunpack(msg)
# Process request and send response
# response_env, response_json = await msghandler.smartpack(
# "/device/response",
# [("result", {"value": 42}, "dictionary")],
# reply_to="/device/command",
# reply_to_msg_id=env["msg_id"]
# )
print("Request sent!")
```
---

View File

@@ -421,7 +421,7 @@ env, msg_json = smartpack("chat.subject", [
"""
function smartpack(
subject::String, # smartunpack's subject
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use data = Tuple{String, Any, String}[] for empty payloads
broker_url::String = DEFAULT_BROKER_URL, # Broker URL
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver

View File

@@ -9,7 +9,7 @@
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
# Include the bridge module
include("../src/msghandler.jl")
include("/home/ton/docker-apps/sommpanion/msghandler/src/msghandler.jl")
using .msghandler
# Configuration
@@ -39,7 +39,7 @@ function test_mix_receive()
# Use msghandler.smartunpack to handle the data
# API: smartunpack(msg, download_handler; max_retries, base_delay, max_delay)
result = msghandler.smartunpack(
msg;
env_json_str;
max_retries = 5,
base_delay = 100,
max_delay = 5000

View File

@@ -14,7 +14,7 @@
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
# Include the bridge module
include("../src/msghandler.jl")
include("/home/ton/docker-apps/sommpanion/msghandler/src/msghandler.jl")
using .msghandler
# Configuration
@@ -204,7 +204,7 @@ function test_mix_send()
]
# Use smartpack with mixed content
sendinfo = msghandler.smartpack(
env, env_json_str = msghandler.smartpack(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
broker_url = NATS_URL,
@@ -218,10 +218,8 @@ function test_mix_send()
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
is_publish = true # Publish the message to NATS
)
env, env_json_str = sendinfo
log_trace("Sent message with $(length(env.payloads)) payloads")
# Log transport type for each payload
@@ -243,6 +241,8 @@ function test_mix_send()
link_count = count(p -> p.transport == "link", env.payloads)
log_trace("Direct transport: $direct_count payloads")
log_trace("Link transport: $link_count payloads")
return env_json_str
end
@@ -252,7 +252,7 @@ println("Correlation ID: $correlation_id")
# Run sender
println("start smartpack for mixed content")
test_mix_send()
env_json_str = test_mix_send()
println("\nTest completed.")
println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.")