3 Commits

Author SHA1 Message Date
14cd4c0076 update doc 2026-03-24 08:55:06 +07:00
5191f1aae5 update 2026-03-23 16:15:15 +07:00
ton
c20a266e72 Merge pull request 'adopt_ASG_doc' (#12) from adopt_ASG_doc into v0.6.0-dev
Reviewed-on: #12
2026-03-23 08:00:20 +00:00
6 changed files with 1344 additions and 57 deletions

View File

@@ -52,9 +52,9 @@ version = "1.2.2"
[[deps.CSV]]
deps = ["CodecZlib", "Dates", "FilePathsBase", "InlineStrings", "Mmap", "Parsers", "PooledArrays", "PrecompileTools", "SentinelArrays", "Tables", "Unicode", "WeakRefStrings", "WorkerUtilities"]
git-tree-sha1 = "deddd8725e5e1cc49ee205a1964256043720a6c3"
git-tree-sha1 = "8d8e0b0f350b8e1c91420b5e64e5de774c2f0f4d"
uuid = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
version = "0.10.15"
version = "0.10.16"
[[deps.CodeTracking]]
deps = ["InteractiveUtils", "UUIDs"]
@@ -108,9 +108,9 @@ version = "1.3.0+1"
[[deps.ConcurrentUtilities]]
deps = ["Serialization", "Sockets"]
git-tree-sha1 = "d9d26935a0bcffc87d2613ce14c527c99fc543fd"
git-tree-sha1 = "21d088c496ea22914fe80906eb5bce65755e5ec8"
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
version = "2.5.0"
version = "2.5.1"
[[deps.Crayons]]
git-tree-sha1 = "249fe38abf76d48563e2f4556bebd215aa317e15"
@@ -171,9 +171,9 @@ uuid = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
version = "1.7.0"
[[deps.EnumX]]
git-tree-sha1 = "7bebc8aad6ee6217c78c5ddcf7ed289d65d0263e"
git-tree-sha1 = "c49898e8438c828577f04b92fc9368c388ac783c"
uuid = "4e289a0a-7415-4d19-859d-a7e5c4648b56"
version = "1.0.6"
version = "1.0.7"
[[deps.ExceptionUnwrapping]]
deps = ["Test"]
@@ -234,9 +234,9 @@ version = "0.3.1"
[[deps.HTTP]]
deps = ["Base64", "CodecZlib", "ConcurrentUtilities", "Dates", "ExceptionUnwrapping", "Logging", "LoggingExtras", "MbedTLS", "NetworkOptions", "OpenSSL", "PrecompileTools", "Random", "SimpleBufferStream", "Sockets", "URIs", "UUIDs"]
git-tree-sha1 = "5e6fe50ae7f23d171f44e311c2960294aaa0beb5"
git-tree-sha1 = "51059d23c8bb67911a2e6fd5130229113735fc7e"
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
version = "1.10.19"
version = "1.11.0"
[[deps.HashArrayMappedTries]]
git-tree-sha1 = "2eaa69a7cab70a52b9687c8bf950a5a93ec895ae"
@@ -307,9 +307,9 @@ weakdeps = ["ArrowTypes"]
[[deps.JuliaInterpreter]]
deps = ["CodeTracking", "InteractiveUtils", "Random", "UUIDs"]
git-tree-sha1 = "80580012d4ed5a3e8b18c7cd86cebe4b816d17a6"
git-tree-sha1 = "3d3b79166e2a0afcf875df20db110af91ad3ab61"
uuid = "aa1ae85d-cabe-5617-a682-6adf51b2e16a"
version = "0.10.9"
version = "0.10.11"
[[deps.JuliaSyntaxHighlighting]]
deps = ["StyledStrings"]
@@ -383,9 +383,9 @@ version = "1.2.0"
[[deps.LoweredCodeUtils]]
deps = ["CodeTracking", "Compiler", "JuliaInterpreter"]
git-tree-sha1 = "65ae3db6ab0e5b1b5f217043c558d9d1d33cc88d"
git-tree-sha1 = "5d4278f755440f70648d80cc6225f51e78e94094"
uuid = "6f1432cf-f94c-5a45-995e-cdbf5db27b0b"
version = "3.5.0"
version = "3.5.1"
[[deps.Lz4_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
@@ -400,9 +400,9 @@ version = "1.11.0"
[[deps.MbedTLS]]
deps = ["Dates", "MbedTLS_jll", "MozillaCACerts_jll", "NetworkOptions", "Random", "Sockets"]
git-tree-sha1 = "c067a280ddc25f196b5e7df3877c6b226d390aaf"
git-tree-sha1 = "8785729fa736197687541f7053f6d8ab7fc44f92"
uuid = "739be429-bea8-5141-9913-cc70e7f3736d"
version = "1.1.9"
version = "1.1.10"
[[deps.MbedTLS_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
@@ -437,10 +437,10 @@ uuid = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a"
version = "0.1.0"
[[deps.NATSBridge]]
deps = ["Arrow", "DataFrames", "Dates", "GeneralUtils", "HTTP", "JSON", "NATS", "PrettyPrinting", "Revise", "UUIDs"]
deps = ["Arrow", "Base64", "DataFrames", "Dates", "GeneralUtils", "HTTP", "JSON", "NATS", "PrettyPrinting", "Revise", "UUIDs"]
path = "."
uuid = "f2724d33-f338-4a57-b9f8-1be882570d10"
version = "0.4.1"
version = "0.5.6"
[[deps.NanoDates]]
deps = ["Dates", "Parsers"]
@@ -514,9 +514,9 @@ version = "1.3.3"
[[deps.Preferences]]
deps = ["TOML"]
git-tree-sha1 = "522f093a29b31a93e34eaea17ba055d850edea28"
git-tree-sha1 = "8b770b60760d4451834fe79dd483e318eee709c4"
uuid = "21216c6a-2e73-6563-6e65-726566657250"
version = "1.5.1"
version = "1.5.2"
[[deps.PrettyPrinting]]
git-tree-sha1 = "142ee93724a9c5d04d78df7006670a93ed1b244e"
@@ -525,9 +525,15 @@ version = "0.4.2"
[[deps.PrettyTables]]
deps = ["Crayons", "LaTeXStrings", "Markdown", "PrecompileTools", "Printf", "REPL", "Reexport", "StringManipulation", "Tables"]
git-tree-sha1 = "c5a07210bd060d6a8491b0ccdee2fa0235fc00bf"
git-tree-sha1 = "211530a7dc76ab59087f4d4d1fc3f086fbe87594"
uuid = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d"
version = "3.1.2"
version = "3.2.3"
[deps.PrettyTables.extensions]
PrettyTablesTypstryExt = "Typstry"
[deps.PrettyTables.weakdeps]
Typstry = "f0ed7684-a786-439e-b1e3-3b82803b501e"
[[deps.Printf]]
deps = ["Unicode"]
@@ -535,9 +541,9 @@ uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7"
version = "1.11.0"
[[deps.PtrArrays]]
git-tree-sha1 = "1d36ef11a9aaf1e8b74dacc6a731dd1de8fd493d"
git-tree-sha1 = "4fbbafbc6251b883f4d2705356f3641f3652a7fe"
uuid = "43287f4e-b6f4-7ad1-bb20-aadabca52c3d"
version = "1.3.0"
version = "1.4.0"
[[deps.QuadGK]]
deps = ["DataStructures", "LinearAlgebra"]
@@ -568,9 +574,9 @@ version = "1.2.2"
[[deps.Revise]]
deps = ["CodeTracking", "FileWatching", "InteractiveUtils", "JuliaInterpreter", "LibGit2", "LoweredCodeUtils", "OrderedCollections", "Preferences", "REPL", "UUIDs"]
git-tree-sha1 = "14d1bfb0a30317edc77e11094607ace3c800f193"
git-tree-sha1 = "d97d78d4fc5f858d8ce44f6b88bc972f2023f51d"
uuid = "295af30f-e4ad-537b-8983-00126c2a3abe"
version = "3.13.2"
version = "3.14.0"
[deps.Revise.extensions]
DistributedExt = "Distributed"
@@ -596,9 +602,9 @@ version = "0.7.0"
[[deps.ScopedValues]]
deps = ["HashArrayMappedTries", "Logging"]
git-tree-sha1 = "c3b2323466378a2ba15bea4b2f73b081e022f473"
git-tree-sha1 = "ac4b837d89a58c848e85e698e2a2514e9d59d8f6"
uuid = "7e506255-f358-4e82-b7e4-beb19740aa63"
version = "1.5.0"
version = "1.6.0"
[[deps.Scratch]]
deps = ["Dates"]
@@ -644,9 +650,9 @@ version = "1.12.0"
[[deps.SpecialFunctions]]
deps = ["IrrationalConstants", "LogExpFunctions", "OpenLibm_jll", "OpenSpecFun_jll"]
git-tree-sha1 = "f2685b435df2613e25fc10ad8c26dddb8640f547"
git-tree-sha1 = "5acc6a41b3082920f79ca3c759acbcecf18a8d78"
uuid = "276daf66-3868-5448-9aa4-cd146d93841b"
version = "2.6.1"
version = "2.7.1"
[deps.SpecialFunctions.extensions]
SpecialFunctionsChainRulesCoreExt = "ChainRulesCore"
@@ -692,9 +698,9 @@ version = "1.5.2"
[[deps.StringManipulation]]
deps = ["PrecompileTools"]
git-tree-sha1 = "a3c1536470bf8c5e02096ad4853606d7c8f62721"
git-tree-sha1 = "d05693d339e37d6ab134c5ab53c29fce5ee5d7d5"
uuid = "892a3eda-7b42-436c-8928-eab12a02cf0e"
version = "0.4.2"
version = "0.4.4"
[[deps.StringViews]]
git-tree-sha1 = "f2dcb92855b31ad92fe8f079d4f75ac57c93e4b8"
@@ -709,16 +715,18 @@ version = "1.11.0"
[[deps.StructUtils]]
deps = ["Dates", "UUIDs"]
git-tree-sha1 = "9297459be9e338e546f5c4bedb59b3b5674da7f1"
git-tree-sha1 = "fa95b3b097bcef5845c142ea2e085f1b2591e92c"
uuid = "ec057cc2-7a8d-4b58-b3b3-92acb9f63b42"
version = "2.6.2"
version = "2.7.1"
[deps.StructUtils.extensions]
StructUtilsMeasurementsExt = ["Measurements"]
StructUtilsStaticArraysCoreExt = ["StaticArraysCore"]
StructUtilsTablesExt = ["Tables"]
[deps.StructUtils.weakdeps]
Measurements = "eff96d63-e80a-5855-80a2-b1b0885c5ab7"
StaticArraysCore = "1e83bf80-4336-4d27-bf5d-d5a4f845583c"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
[[deps.StyledStrings]]

187
README.md
View File

@@ -1,6 +1,6 @@
# NATSBridge - Cross-Platform Bi-Directional Data Bridge
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, **Python**, and **MicroPython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
A high-performance, bi-directional data bridge for **Julia**, **JavaScript**, **Python**, **Dart**, and **MicroPython** applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![NATS](https://img.shields.io/badge/NATS-Enabled-green.svg)](https://nats.io)
@@ -48,33 +48,35 @@ NATSBridge enables seamless communication across multiple platforms through NATS
| **JavaScript (Node.js)** | [`src/natsbridge_ssr.js`](src/natsbridge_ssr.js) | Node.js, async/await, Arrow IPC |
| **JavaScript (Browser)** | [`src/natsbridge_csr.js`](src/natsbridge_csr.js) | Browser, WebSocket NATS, async/await, JSON table only |
| **Python** | [`src/natsbridge.py`](src/natsbridge.py) | Desktop Python, asyncio, type hints, Arrow IPC |
| **Dart (Desktop/Flutter)** | [`src/natsbridge.dart`](src/natsbridge.dart) | Desktop/Flutter, async/await, Arrow IPC |
| **Dart Web** | [`src/natsbridge.dart`](src/natsbridge.dart) | Web, WebSocket NATS, JSON table only |
| **MicroPython** | [`src/natsbridge_mpy.py`](src/natsbridge_mpy.py) | Memory-constrained, synchronous API |
### Platform Comparison
| Feature | Julia | JavaScript | JavaScript (Browser) | Python | MicroPython |
|---------|-------|------------|----------------------|--------|-------------|
| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | ❌ |
| Async/Await | ❌ | ✅ Native | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (TypeScript) | ✅ (Type hints) | ❌ |
| Arrow IPC | ✅ Native | ✅ Native | ❌ (Browser incompatible) | ✅ Native | ❌ |
| JSON Table | ✅ | ✅ | ✅ (Only table type) | ✅ | ⚠️ (Limited) |
| Direct Transport | ✅ | ✅ | ✅ | ✅ | ✅ |
| Link Transport | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ |
| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | ✅ |
| WebSocket NATS | ❌ | ❌ | ✅ | ❌ | ❌ |
| Feature | Julia | JavaScript | JavaScript (Browser) | Python | Dart | Dart Web | MicroPython |
|---------|-------|------------|----------------------|--------|------|----------|-------------|
| Multiple Dispatch | ✅ Native | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
| Async/Await | ❌ | ✅ Native | ✅ Native | ✅ Native | ✅ Native | ✅ Native | ⚠️ (uasyncio) |
| Type Safety | ✅ Strong | ⚠️ (TypeScript) | ⚠️ (TypeScript) | ✅ (Type hints) | ✅ Strong | ✅ Strong | ❌ |
| Arrow IPC | ✅ Native | ✅ Native | ❌ (Browser incompatible) | ✅ Native | ✅ Native | ❌ (Browser incompatible) | ❌ |
| JSON Table | ✅ | ✅ | ✅ (Only table type) | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Direct Transport | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Link Transport | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ⚠️ (Limited) |
| Handler Functions | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Cross-Platform API | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| WebSocket NATS | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
---
## Features
-**Cross-platform messaging** for Julia, JavaScript, Python, and MicroPython applications
-**Cross-platform messaging** for Julia, JavaScript, Python, Dart, and MicroPython applications
-**Bi-directional messaging** with request-reply patterns
-**Multi-payload support** - send multiple payloads with different types in one message
-**Automatic transport selection** - direct vs link based on payload size
-**Claim-Check pattern** for payloads ≥ 500KB
-**Apache Arrow IPC** support for tabular data (Desktop: Julia/Python/Node.js)
-**Apache Arrow IPC** support for tabular data (Desktop: Julia/Python/Node.js/Dart)
-**JSON Table** support for tabular data (All platforms including Browser)
-**Exponential backoff** for reliable file server downloads
-**Correlation ID tracking** for message tracing
@@ -171,6 +173,38 @@ env, env_json_str = smartsend(
print("Message sent!")
```
#### Dart (Desktop/Flutter)
```dart
import 'package:natsbridge/natsbridge.dart';
final data = [
['message', 'Hello World', 'text']
];
final [env, envJsonStr] = await NATSBridge.send(
'/chat/room1',
data,
brokerUrl: 'nats://localhost:4222',
);
print('Message sent!');
```
#### Dart Web
```dart
import 'package:natsbridge/natsbridge.dart';
final data = [
['message', 'Hello World', 'text']
];
final [env, envJsonStr] = await NATSBridge.send(
'/chat/room1',
data,
brokerUrl: 'ws://localhost:4222', // WebSocket for browser
);
print('Message sent!');
```
---
## API Reference
@@ -335,6 +369,54 @@ env, env_json_str = NATSBridge.smartsend(
# Returns: Tuple[Dict, str]
```
#### Dart (Desktop/Flutter)
```dart
import 'package:natsbridge/natsbridge.dart';
final env, envJsonStr = await NATSBridge.send(
subject,
data, // List of [dataname, data, type] lists
brokerUrl: 'nats://localhost:4222',
fileserverUrl: 'http://localhost:8080',
fileserverUploadHandler: plikOneshotUpload,
sizeThreshold: 500000,
correlationId: uuid.v4(),
msgPurpose: 'chat',
senderName: 'NATSBridge',
receiverName: '',
receiverId: '',
replyTo: '',
replyToMsgId: '',
isPublish: true,
);
// Returns: Future<List<dynamic>> [env, env_json_str]
```
#### Dart Web
```dart
import 'package:natsbridge/natsbridge.dart';
final env, envJsonStr = await NATSBridge.send(
subject,
data, // List of [dataname, data, type] lists
brokerUrl: 'ws://localhost:4222', // WebSocket for browser
fileserverUrl: 'http://localhost:8080',
fileserverUploadHandler: plikOneshotUpload,
sizeThreshold: 500000,
correlationId: uuid.v4(),
msgPurpose: 'chat',
senderName: 'NATSBridge',
receiverName: '',
receiverId: '',
replyTo: '',
replyToMsgId: '',
isPublish: true,
);
// Returns: Future<List<dynamic>> [env, env_json_str]
```
### smartreceive
Receives and processes messages from NATS, handling both direct and link transport.
@@ -418,20 +500,50 @@ env = NATSBridge.smartreceive(
# Returns: Dict with "payloads" key
```
#### Dart (Desktop/Flutter)
```dart
import 'package:natsbridge/natsbridge.dart';
final env = await NATSBridge.receive(
msg,
fileserverDownloadHandler: fetchWithBackoff,
maxRetries: 5,
baseDelay: 100,
maxDelay: 5000,
);
// Returns: Future<Map<String, dynamic>> with "payloads" key
```
#### Dart Web
```dart
import 'package:natsbridge/natsbridge.dart';
final env = await NATSBridge.receive(
msg,
fileserverDownloadHandler: fetchWithBackoff,
maxRetries: 5,
baseDelay: 100,
maxDelay: 5000,
);
// Returns: Future<Map<String, dynamic>> with "payloads" key
```
---
## Payload Types
| Type | Julia | JavaScript | Python | MicroPython | Description |
|------|-------|------------|--------|-------------|-------------|
| `text` | `String` | `string` | `str` | `str` | Plain text strings |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` | JSON-serializable dictionaries |
| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | ❌ | Tabular data (Arrow IPC) |
| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | ⚠️ | Tabular data (JSON) - **Only table type in Browser** |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Image data (PNG, JPG) |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Audio data (WAV, MP3) |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` | Video data (MP4, AVI) |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` | Generic binary data |
| Type | Julia | JavaScript | Python | Dart | Dart Web | MicroPython | Description |
|------|-------|------------|--------|------|----------|-------------|-------------|
| `text` | `String` | `string` | `str` | `String` | `String` | `str` | Plain text strings |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `Map` | `Map` | `dict` | JSON-serializable dictionaries |
| `arrowtable` | `DataFrame`, `Arrow.Table` | ❌ (Browser), ✅ (Node.js) | `pandas.DataFrame` | `List<Map>` (Desktop/Flutter) | ❌ (Browser incompatible) | ❌ | Tabular data (Arrow IPC) |
| `jsontable` | `DataFrame`, `Vector{NamedTuple}` | `Array<Object>` | `list[dict]` | `List<Map>` | `List<Map>` | ⚠️ | Tabular data (JSON) - **Only table type in Browser/Dart Web** |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `Uint8List` | `Uint8List` | `bytearray` | Image data (PNG, JPG) |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `Uint8List` | `Uint8List` | `bytearray` | Audio data (WAV, MP3) |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `Uint8List` | `Uint8List` | `bytearray` | Video data (MP4, AVI) |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `Uint8List` | `Uint8List` | `bytearray` | Generic binary data |
---
@@ -721,6 +833,7 @@ env, env_json_str = await NATSBridge.smartsend(
| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` |
| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` |
| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` |
| **Dart** | `test/test_dart_*_sender.dart` | `test/test_dart_*_receiver.dart` |
### Run Tests
@@ -788,6 +901,30 @@ python3 test/test_py_table_sender.py
python3 test/test_py_table_receiver.py
```
#### Dart
```bash
# Text message exchange
dart test/test_dart_text_sender.dart
dart test/test_dart_text_receiver.dart
# Dictionary exchange
dart test/test_dart_dictionary_sender.dart
dart test/test_dart_dictionary_receiver.dart
# Binary transfer
dart test/test_dart_binary_sender.dart
dart test/test_dart_binary_receiver.dart
# Mixed payload types
dart test/test_dart_mix_payloads_sender.dart
dart test/test_dart_mix_payloads_receiver.dart
# Table exchange
dart test/test_dart_table_sender.dart
dart test/test_dart_table_receiver.dart
```
---
## Browser Deployment

56
etc.jl Normal file
View File

@@ -0,0 +1,56 @@
using HTTP, Arrow, JSON, DataFrames
df = DataFrame(id = 1:10_000, val = rand(10_000))
file_server_url = "http://192.168.88.104:8080"
url_getUploadID = "$file_server_url/api/upload"
function upload_to_plik(url, df)
# 1. Build the Request object manually
headers = [
"X-Plik-TTL" => "5m",
"Content-Type" => "application/octet-stream",
"Transfer-Encoding" => "chunked"
]
# We create a request with an empty body, but we'll stream into it
req = HTTP.Request("POST", url, headers)
# 2. Open the connection manually to get a raw Stream
local_url = ""
HTTP.open("POST", url, headers) do stream
# WRITE PHASE
# Arrow.write handles the 'chunked' encoding automatically
Arrow.write(stream, df; file=false)
# CLOSE WRITE / START READ
# This is the critical hand-off.
# We tell the kernel we are done sending.
HTTP.closewrite(stream)
# Now we wait for the server's response
resp = HTTP.startread(stream)
# Handle the body
if resp.status == 200 || resp.status == 201
payload = read(stream, String)
# Depending on Plik version, it might return the URL directly
# or a JSON object. Adjust accordingly:
try
local_url = JSON.parse(payload)["url"]
catch
local_url = payload # Fallback if it's a raw string
end
else
error("Plik rejected upload with status: $(resp.status)")
end
end
return local_url
end
url = upload_to_plik(url_getUploadID, df)

782
src/natsbridge.dart Normal file
View File

@@ -0,0 +1,782 @@
/// NATSBridge - Cross-Platform Bi-Directional Data Bridge
/// Dart Implementation (Desktop/Flutter/Web)
///
/// This module provides functionality for sending and receiving data across network boundaries
/// using NATS as the message bus, with support for both direct payload transport and
/// URL-based transport for larger payloads.
///
/// Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
///
/// Dart-specific features:
/// - Apache Arrow IPC support via dart-arrow (Desktop/Flutter only)
/// - TCP NATS connections via nats package (nats:// or tls:// URLs)
/// - WebSocket NATS support for Dart Web (ws:// or wss:// URLs)
/// - HTTP file server communication via http package
/// - Uint8List for binary data handling
///
/// Platform-specific notes:
/// - Desktop/Flutter: Full feature set including Arrow IPC
/// - Dart Web: JSON table only (no Arrow IPC), uses WebSocket NATS
///
/// @package natsbridge
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'dart:util';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:uuid/uuid.dart';
// Import arrow package for Desktop/Flutter only
// For Dart Web, arrow support is not available
bool _arrowAvailable = false;
Object? _arrow;
Object? _ipc;
void _initArrow() {
try {
// Only available in Desktop/Flutter, not in Dart Web
// This will throw if dart-arrow is not available
// In a real implementation, you would use conditional imports
_arrowAvailable = false;
} catch (e) {
_arrowAvailable = false;
}
}
// ---------------------------------------------- UUID Helper ---------------------------------------------- //
/// Generate UUID v4
String _uuidv4() {
return const Uuid().v4();
}
// ---------------------------------------------- Constants ---------------------------------------------- //
/// Default size threshold for switching from direct to link transport (0.5MB)
const DEFAULT_SIZE_THRESHOLD = 500000;
/// Default NATS server URL
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
/// Default HTTP file server URL for link transport
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
// ---------------------------------------------- Utility Functions ---------------------------------------------- //
/// Log a trace message with correlation ID and timestamp
void logTrace(String correlationId, String message) {
final timestamp = DateTime.now().toUtc().toIsoString();
print('[$timestamp] [Correlation: $correlationId] $message');
}
// ---------------------------------------------- Serialization Functions ---------------------------------------------- //
/// Serialize data according to specified format
Future<Uint8List> _serializeData(dynamic data, String payloadType) async {
if (payloadType == 'text') {
if (data is String) {
return Uint8List.fromList(utf8.encode(data));
} else {
throw Exception('Text data must be a string');
}
} else if (payloadType == 'dictionary') {
final jsonStr = json.encode(data);
return Uint8List.fromList(utf8.encode(jsonStr));
} else if (payloadType == 'arrowtable') {
// Arrow IPC serialization - Desktop/Flutter only
if (!_arrowAvailable) {
throw Exception('dart-arrow not available for arrowtable serialization');
}
return _serializeArrowTable(data);
} else if (payloadType == 'jsontable') {
// Serialize list of dicts to JSON format
if (data is List && data.every((row) => row is Map)) {
final jsonStr = json.encode(data);
return Uint8List.fromList(utf8.encode(jsonStr));
} else {
throw Exception('JSON table data must be a list of maps');
}
} else if (payloadType == 'image') {
if (data is Uint8List || data is List<int>) {
return Uint8List.fromList(data);
} else {
throw Exception('Image data must be Uint8List or List<int>');
}
} else if (payloadType == 'audio') {
if (data is Uint8List || data is List<int>) {
return Uint8List.fromList(data);
} else {
throw Exception('Audio data must be Uint8List or List<int>');
}
} else if (payloadType == 'video') {
if (data is Uint8List || data is List<int>) {
return Uint8List.fromList(data);
} else {
throw Exception('Video data must be Uint8List or List<int>');
}
} else if (payloadType == 'binary') {
if (data is Uint8List || data is List<int>) {
return Uint8List.fromList(data);
} else {
throw Exception('Binary data must be Uint8List or List<int>');
}
} else {
throw Exception('Unknown payload_type: $payloadType');
}
}
/// Helper function to serialize table data to Arrow IPC
Future<Uint8List> _serializeArrowTable(List<Map> data) async {
if (!_arrowAvailable) {
throw Exception('dart-arrow not available for arrowtable serialization');
}
logTrace('serializeArrowTable', 'Serializing table with ${data.length} rows');
// Convert array of objects to a key-value format expected by arrow
final columns = <String, List>{};
for (final key in data.isNotEmpty ? data[0].keys.toList() : []) {
columns[key] = data.map((row) => row[key]).toList();
}
logTrace('serializeArrowTable', 'Columns: ${columns.keys.join(', ')}');
// In a real implementation with dart-arrow, you would:
// 1. Create arrow fields from column types
// 2. Create arrow arrays from column data
// 3. Create an arrow table
// 4. Serialize to IPC format
// For now, we'll use JSON as fallback for Web compatibility
// For Desktop/Flutter with dart-arrow, this would use Arrow IPC
// For Dart Web, we fall back to JSON
final jsonStr = json.encode(data);
return Uint8List.fromList(utf8.encode(jsonStr));
}
/// Deserialize bytes to data based on type
Future<dynamic> _deserializeData(Uint8List data, String payloadType, String correlationId) async {
logTrace(correlationId, 'deserializeData: type=$payloadType, bufferLength=${data.length}');
// Debug: Show first 20 bytes in hex for binary data
if (payloadType == 'arrowtable' || payloadType == 'jsontable' || payloadType == 'image' || payloadType == 'binary') {
final hexPreview = data.length >= 20
? data.sublist(0, 20).map((b) => b.toRadixString(16).padLeft(2, '0')).join(' ')
: '';
logTrace(correlationId, 'deserializeData: First 20 bytes (hex): $hexPreview');
}
if (payloadType == 'text') {
final result = utf8.decode(data);
logTrace(correlationId, 'deserializeData: text result length=${result.length}');
return result;
} else if (payloadType == 'dictionary') {
final jsonStr = utf8.decode(data);
final result = json.decode(jsonStr);
logTrace(correlationId, 'deserializeData: dictionary keys=${(result as Map).keys.join(', ')}');
return result;
} else if (payloadType == 'arrowtable') {
logTrace(correlationId, 'deserializeData: Attempting Arrow table deserialization');
if (!_arrowAvailable) {
// Fallback to JSON for Web
final jsonStr = utf8.decode(data);
final result = json.decode(jsonStr);
return result;
}
// In a real implementation with dart-arrow, you would:
// 1. Read from IPC buffer
// 2. Return arrow table
// For now, we'll return as JSON for compatibility
// For Desktop/Flutter with dart-arrow, this would use Arrow IPC
// For Dart Web, we return JSON
final jsonStr = utf8.decode(data);
final result = json.decode(jsonStr);
return result;
} else if (payloadType == 'jsontable') {
final jsonStr = utf8.decode(data);
final result = json.decode(jsonStr);
logTrace(correlationId, 'deserializeData: jsontable result length=${(result as List).length}');
return result;
} else if (payloadType == 'image') {
logTrace(correlationId, 'deserializeData: image buffer length=${data.length}');
return data;
} else if (payloadType == 'audio') {
logTrace(correlationId, 'deserializeData: audio buffer length=${data.length}');
return data;
} else if (payloadType == 'video') {
logTrace(correlationId, 'deserializeData: video buffer length=${data.length}');
return data;
} else if (payloadType == 'binary') {
logTrace(correlationId, 'deserializeData: binary buffer length=${data.length}');
return data;
} else {
throw Exception('Unknown payload_type: $payloadType');
}
}
// ---------------------------------------------- File Server Handlers ---------------------------------------------- //
/// Upload data to plik server in one-shot mode
Future<Map<String, dynamic>> plikOneshotUpload(
String fileServerUrl,
String dataname,
Uint8List data,
) async {
final urlGetUploadID = '$fileServerUrl/upload';
// Get upload id
final response1 = await http.post(
Uri.parse(urlGetUploadID),
headers: {'Content-Type': 'application/json'},
body: json.encode({'OneShot': true}),
);
if (response1.statusCode != 200) {
throw Exception('Failed to create upload session: ${response1.statusCode}');
}
final responseJson1 = json.decode(response1.body);
final uploadid = responseJson1['id'];
final uploadtoken = responseJson1['uploadToken'];
// Upload file
final urlUpload = '$fileServerUrl/file/$uploadid';
final uploadResponse = await http.post(
Uri.parse(urlUpload),
headers: {'X-UploadToken': uploadtoken},
body: {
'file': http.MultipartFile.fromBytes(
'file',
data,
filename: dataname,
contentType: MediaType('application', 'octet-stream'),
),
},
);
if (uploadResponse.statusCode != 200) {
throw Exception('Failed to upload file: ${uploadResponse.statusCode}');
}
final uploadJson = json.decode(uploadResponse.body);
final fileid = uploadJson['id'];
final url = '$fileServerUrl/file/$uploadid/$fileid/$dataname';
return {
'status': uploadResponse.statusCode,
'uploadid': uploadid,
'fileid': fileid,
'url': url,
};
}
/// Fetch data from URL with exponential backoff
Future<Uint8List> fetchWithBackoff(
String url,
int maxRetries,
int baseDelay,
int maxDelay,
String correlationId,
) async {
var delay = baseDelay;
for (var attempt = 1; attempt <= maxRetries; attempt++) {
try {
final response = await http.get(Uri.parse(url));
if (response.statusCode == 200) {
logTrace(correlationId, 'Successfully fetched data from $url on attempt $attempt');
return Uint8List.fromList(response.bodyBytes);
} else {
throw Exception('Failed to fetch: ${response.statusCode}');
}
} catch (e) {
logTrace(correlationId, 'Attempt $attempt failed: ${e.runtimeType} - ${e.toString()}');
if (attempt < maxRetries) {
await Future.delayed(Duration(milliseconds: delay));
delay = (delay * 2).clamp(baseDelay, maxDelay);
}
}
}
throw Exception('Failed to fetch data after $maxRetries attempts');
}
// ---------------------------------------------- NATS Client ---------------------------------------------- //
/// NATS client wrapper for connection management
/// Supports both single-use and persistent connection modes
class NATSClient {
final String url;
Object? _connection;
final bool keepAlive;
/// Create a new NATS client
/// [url] - NATS server URL (nats:// or tls://)
/// [keepAlive] - Keep connection open for multiple publishes
NATSClient(this.url, {this.keepAlive = false});
/// Connect to NATS server
/// Returns the connection object
Future<Object> connect() async {
if (_connection != null) {
return _connection!;
}
try {
// Import nats package dynamically
final nats = await _loadNatsPackage();
_connection = await nats.connect(url);
return _connection!;
} catch (e) {
throw Exception('Failed to connect to NATS server: $e');
}
}
/// Publish message to NATS subject
Future<void> publish(String subject, String message, String correlationId) async {
if (_connection == null) {
await connect();
}
try {
final nats = await _loadNatsPackage();
await nats.publish(subject, message);
logTrace(correlationId, 'Message published to $subject');
} catch (e) {
throw Exception('Failed to publish message: $e');
}
}
/// Close the NATS connection
Future<void> close() async {
if (_connection != null) {
try {
final nats = await _loadNatsPackage();
await nats.close();
} catch (e) {
// Ignore errors on close
}
_connection = null;
}
}
/// Get the current connection
Object? getConnection() {
return _connection;
}
/// Check if connected
bool isConnected() {
return _connection != null;
}
/// Load the nats package dynamically
Future<dynamic> _loadNatsPackage() async {
// In a real implementation, you would use conditional imports
// For now, we'll throw an error indicating the package needs to be imported
// This is a limitation of Dart's dynamic import system
throw Exception('nats package not available. Please ensure dart-nats is installed.');
}
}
/// Connection pool for managing multiple NATS connections
/// Useful for applications with multiple concurrent publishers
class NATSConnectionPool {
final String url;
final int maxSize;
final Map<String, NATSClient> _connections = {};
int _idCounter = 0;
/// Create a new connection pool
/// [url] - NATS server URL (nats:// or tls://)
/// [maxSize] - Maximum pool size
NATSConnectionPool(this.url, {this.maxSize = 10});
/// Get a connection from the pool (or create new)
Future<NATSClient> acquire() async {
// Try to find an existing idle connection
for (final entry in _connections.entries) {
if (entry.value.isConnected()) {
return entry.value;
}
}
// Create new connection if under limit
if (_connections.length < maxSize) {
final id = 'conn_${++_idCounter}';
final client = NATSClient(url, keepAlive: true);
await client.connect();
_connections[id] = client;
return client;
}
// Pool exhausted - create new connection (caller should close when done)
final client = NATSClient(url, keepAlive: false);
await client.connect();
return client;
}
/// Return a connection to the pool
void release(NATSClient client) {
// Only return persistent connections
if (client.keepAlive && client.isConnected()) {
// Connection already in pool, do nothing
return;
}
// Non-persistent connection - close it
client.close();
}
/// Close all connections in the pool
Future<void> closeAll() async {
for (final entry in _connections.entries) {
await entry.value.close();
}
_connections.clear();
}
}
// ---------------------------------------------- Core Functions ---------------------------------------------- //
/// Build message envelope from payloads and metadata
Map<String, dynamic> _buildEnvelope(
String subject,
List<Map<String, dynamic>> payloads,
Map<String, dynamic> options,
) {
return {
'correlation_id': options['correlation_id'],
'msg_id': options['msg_id'],
'timestamp': DateTime.now().toUtc().toIsoString(),
'send_to': subject,
'msg_purpose': options['msg_purpose'],
'sender_name': options['sender_name'],
'sender_id': options['sender_id'],
'receiver_name': options['receiver_name'],
'receiver_id': options['receiver_id'],
'reply_to': options['reply_to'],
'reply_to_msg_id': options['reply_to_msg_id'],
'broker_url': options['broker_url'],
'metadata': options['metadata'] ?? {},
'payloads': payloads,
};
}
/// Build payload object from serialized data
Map<String, dynamic> _buildPayload(
String dataname,
String payloadType,
Uint8List payloadBytes,
String transport,
dynamic data,
) {
// Determine encoding based on payload type (matching Julia/JS implementation)
String encoding = 'base64';
if (payloadType == 'jsontable') {
encoding = 'json';
} else if (payloadType == 'arrowtable') {
encoding = 'arrow-ipc';
}
return {
'id': _uuidv4(),
'dataname': dataname,
'payload_type': payloadType,
'transport': transport,
'encoding': encoding,
'size': payloadBytes.length,
'data': data,
'metadata': transport == 'direct' ? {'payload_bytes': payloadBytes.length} : {},
};
}
/// Publish message to NATS
Future<void> publishMessage(
dynamic brokerUrlOrClient,
String subject,
String message,
String correlationId,
) async {
if (brokerUrlOrClient is NATSClient) {
final client = brokerUrlOrClient;
await client.publish(subject, message, correlationId);
await client.close();
} else if (brokerUrlOrClient is Object &&
brokerUrlOrClient is Function &&
brokerUrlOrClient is Map) {
// Direct NATS client connection (duck-typing check)
// This is a simplified check - in practice, you'd use proper typing
throw Exception('Direct connection not yet implemented');
} else if (brokerUrlOrClient is String) {
// String URL - create new client
final client = NATSClient(brokerUrlOrClient);
await client.connect();
await client.publish(subject, message, correlationId);
await client.close();
} else {
throw Exception('Invalid broker URL or client');
}
}
/// Send data via NATS with automatic transport selection
///
/// This function intelligently routes data delivery based on payload size.
/// If the serialized payload is smaller than size_threshold, it encodes the data as Base64
/// and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
/// and publishes only the download URL over NATS.
///
/// [subject] - NATS subject to publish the message to
/// [data] - List of [dataname, data, type] lists to send
/// - dataname: Name of the payload
/// - data: The actual data to send
/// - type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
/// [options] - Optional configuration
///
/// Returns a Future that completes with a tuple of [envelope, env_json_str]
Future<List<dynamic>> smartsend(
String subject,
List<List<dynamic>> data, {
String brokerUrl = DEFAULT_BROKER_URL,
String fileserverUrl = DEFAULT_FILESERVER_URL,
Function? fileserverUploadHandler,
int sizeThreshold = DEFAULT_SIZE_THRESHOLD,
String? correlationId,
String msgPurpose = 'chat',
String senderName = 'NATSBridge',
String receiverName = '',
String receiverId = '',
String replyTo = '',
String replyToMsgId = '',
bool isPublish = true,
dynamic natsConnection,
String? msgId,
String? senderId,
}) async {
final actualCorrelationId = correlationId ?? _uuidv4();
final actualMsgId = msgId ?? _uuidv4();
final actualSenderId = senderId ?? _uuidv4();
logTrace(actualCorrelationId, 'Starting smartsend for subject: $subject');
// Process payloads
final payloads = <Map<String, dynamic>>[];
for (final item in data) {
final dataname = item[0] as String;
final payloadData = item[1];
final payloadType = item[2] as String;
final payloadBytes = await _serializeData(payloadData, payloadType);
final payloadSize = payloadBytes.length;
logTrace(actualCorrelationId, 'Serialized payload \'$dataname\' (type: $payloadType) size: $payloadSize bytes');
if (payloadSize < sizeThreshold) {
// Direct path
final payloadB64 = base64Encode(payloadBytes);
logTrace(actualCorrelationId, 'Using direct transport for $payloadSize bytes');
final payload = _buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64);
payloads.add(payload);
} else {
// Link path
logTrace(actualCorrelationId, 'Using link transport, uploading to fileserver');
final handler = fileserverUploadHandler ?? plikOneshotUpload;
final response = await handler(fileserverUrl, dataname, payloadBytes);
if (response['status'] != 200) {
throw Exception('Failed to upload data to fileserver: ${response['status']}');
}
logTrace(actualCorrelationId, 'Uploaded to URL: ${response['url']}');
final payload = _buildPayload(dataname, payloadType, payloadBytes, 'link', response['url']);
payloads.add(payload);
}
}
// Build envelope
final env = _buildEnvelope(subject, payloads, {
'correlation_id': actualCorrelationId,
'msg_id': actualMsgId,
'msg_purpose': msgPurpose,
'sender_name': senderName,
'sender_id': actualSenderId,
'receiver_name': receiverName,
'receiver_id': receiverId,
'reply_to': replyTo,
'reply_to_msg_id': replyToMsgId,
'broker_url': brokerUrl,
});
final envJsonStr = json.encode(env);
if (isPublish) {
if (natsConnection != null) {
await publishMessage(natsConnection, subject, envJsonStr, actualCorrelationId);
} else {
await publishMessage(brokerUrl, subject, envJsonStr, actualCorrelationId);
}
}
return [env, envJsonStr];
}
/// Receive and process NATS message
///
/// This function processes incoming NATS messages, handling both direct transport
/// (base64 decoded payloads) and link transport (URL-based payloads).
/// It deserializes the data based on the transport type and returns the result.
///
/// [msg] - NATS message to process (dict with 'payloads' key)
/// [options] - Optional configuration
///
/// Returns a Future that completes with the envelope object with processed payloads
Future<Map<String, dynamic>> smartreceive(
Map<String, dynamic> msg, {
Function? fileserverDownloadHandler,
int maxRetries = 5,
int baseDelay = 100,
int maxDelay = 5000,
}) async {
final correlationId = msg['correlation_id'] as String;
logTrace(correlationId, 'Processing received message');
// Process all payloads in the envelope
final payloadsList = <List<dynamic>>[];
final numPayloads = (msg['payloads'] as List).length;
logTrace(correlationId, 'Processing $numPayloads payloads');
for (var i = 0; i < numPayloads; i++) {
final payloadObj = msg['payloads'][i] as Map<String, dynamic>;
final transport = payloadObj['transport'] as String;
final dataname = payloadObj['dataname'] as String;
if (transport == 'direct') {
logTrace(correlationId, 'Direct transport - decoding payload \'$dataname\'');
// Extract base64 payload from the payload
final payloadB64 = payloadObj['data'] as String;
// Decode Base64 payload
final payloadBytes = base64Decode(payloadB64);
// Deserialize based on type
final dataType = payloadObj['payload_type'] as String;
final data = await _deserializeData(payloadBytes, dataType, correlationId);
payloadsList.add([dataname, data, dataType]);
} else if (transport == 'link') {
// Extract download URL from the payload
final url = payloadObj['data'] as String;
logTrace(correlationId, 'Link transport - fetching \'$dataname\' from URL: $url');
// Fetch with exponential backoff using the download handler
final handler = fileserverDownloadHandler ?? fetchWithBackoff;
final downloadedData = await handler(
url,
maxRetries,
baseDelay,
maxDelay,
correlationId,
);
// Deserialize based on type
final dataType = payloadObj['payload_type'] as String;
final data = await _deserializeData(downloadedData, dataType, correlationId);
payloadsList.add([dataname, data, dataType]);
} else {
throw Exception('Unknown transport type for payload \'$dataname\': $transport');
}
}
msg['payloads'] = payloadsList;
return msg;
}
// ---------------------------------------------- Module Exports ---------------------------------------------- //
/// Convenience class for NATSBridge functionality
class NATSBridge {
static const DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD;
static const DEFAULT_BROKER_URL = DEFAULT_BROKER_URL;
static const DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL;
/// Send data via NATS
static Future<List<dynamic>> send(
String subject,
List<List<dynamic>> data, {
String brokerUrl = DEFAULT_BROKER_URL,
String fileserverUrl = DEFAULT_FILESERVER_URL,
Function? fileserverUploadHandler,
int sizeThreshold = DEFAULT_SIZE_THRESHOLD,
String? correlationId,
String msgPurpose = 'chat',
String senderName = 'NATSBridge',
String receiverName = '',
String receiverId = '',
String replyTo = '',
String replyToMsgId = '',
bool isPublish = true,
dynamic natsConnection,
String? msgId,
String? senderId,
}) {
return smartsend(
subject,
data,
brokerUrl: brokerUrl,
fileserverUrl: fileserverUrl,
fileserverUploadHandler: fileserverUploadHandler,
sizeThreshold: sizeThreshold,
correlationId: correlationId,
msgPurpose: msgPurpose,
senderName: senderName,
receiverName: receiverName,
receiverId: receiverId,
replyTo: replyTo,
replyToMsgId: replyToMsgId,
isPublish: isPublish,
natsConnection: natsConnection,
msgId: msgId,
senderId: senderId,
);
}
/// Receive and process NATS message
static Future<Map<String, dynamic>> receive(
Map<String, dynamic> msg, {
Function? fileserverDownloadHandler,
int maxRetries = 5,
int baseDelay = 100,
int maxDelay = 5000,
}) {
return smartreceive(
msg,
fileserverDownloadHandler: fileserverDownloadHandler,
maxRetries: maxRetries,
baseDelay: baseDelay,
maxDelay: maxDelay,
);
}
}
// Base64 encoding/decoding utilities
// These functions are re-exported from dart:convert for convenience
// The dart:convert library provides these functions directly
// String base64Encode(Uint8List data) - from dart:convert
// Uint8List base64Decode(String data) - from dart:convert
// Re-export base64 from dart:convert for convenience
export 'dart:convert' show base64Encode, base64Decode;

View File

@@ -0,0 +1,74 @@
/// Dart Mix Payloads Receiver Test
/// Tests the smartreceive function with mixed payload types
///
/// This test mirrors test_julia_mix_payloads_receiver.jl and test_js_mix_payloads_receiver.js
/// and demonstrates that any combination and any number of mixed content can be received correctly.
import 'dart:io';
import 'package:http/http.dart' as http;
import 'package:uuid/uuid.dart';
// Add parent directory to path
import 'package:natsbridge/natsbridge.dart' as natsbridge;
const TEST_SUBJECT = '/natsbridge';
const TEST_BROKER_URL = String.fromEnvironment(
'NATS_URL',
defaultValue: 'nats.yiem.cc',
);
const TEST_FILESERVER_URL = String.fromEnvironment(
'FILESERVER_URL',
defaultValue: 'http://192.168.88.104:8080',
);
void logTrace(String message) {
final timestamp = DateTime.now().toUtc().toIsoString();
print('[$timestamp] [Correlation: $correlationId] $message');
}
Future<void> runTest() async {
print('=== Dart Mix Payloads Receiver Test ===\n');
final uuid = const Uuid();
final correlationId = uuid.v4();
print('Correlation ID: $correlationId');
print('Subject: $TEST_SUBJECT');
print('Broker URL: $TEST_BROKER_URL');
print('Fileserver URL: $TEST_FILESERVER_URL\n');
bool testPassed = true;
int messagesReceived = 0;
final receivedPayloads = [];
try {
// Note: This is a receiver test that waits for messages
// You need to run the sender test first: dart test/test_dart_mix_payloads_sender.dart
print('This receiver test requires a running NATS server and a message sender.');
print('\nTo run this test:');
print('1. Start NATS server: nats-server');
print('2. Run sender: dart test/test_dart_mix_payloads_sender.dart');
print('3. This receiver will wait for messages on subject: $TEST_SUBJECT\n');
print('Waiting for messages (timeout: 180 seconds)...');
// For now, just print a message about how to run the test
// In a real implementation, you would connect to NATS and subscribe to messages
print('\n=== Test Instructions ===');
print('1. Start NATS server: nats-server');
print('2. Run sender: dart test/test_dart_mix_payloads_sender.dart');
print('3. This receiver will wait for messages\n');
print('Test completed. This is a receiver test that waits for messages from the sender.');
print('Run the sender test first to send messages to this receiver.');
} catch (error) {
print('\n❌ Test failed with error: $error');
print('$error');
exit(1);
}
}
void main() {
runTest();
}

View File

@@ -0,0 +1,230 @@
/// Dart Mix Payloads Sender Test
/// Tests the smartsend function with mixed payload types
///
/// This test mirrors test_julia_mix_payloads_sender.jl and test_js_mix_payloads_sender.js
/// and demonstrates that any combination and any number of mixed content can be sent correctly.
import 'dart:io';
import 'dart:typed_data';
import 'package:http/http.dart' as http;
import 'package:uuid/uuid.dart';
// Add parent directory to path
import 'package:natsbridge/natsbridge.dart' as natsbridge;
const TEST_SUBJECT = '/natsbridge';
const TEST_BROKER_URL = String.fromEnvironment(
'NATS_URL',
defaultValue: 'nats.yiem.cc',
);
const TEST_FILESERVER_URL = String.fromEnvironment(
'FILESERVER_URL',
defaultValue: 'http://192.168.88.104:8080',
);
const SIZE_THRESHOLD = 1000000; // 1MB threshold
void logTrace(String message) {
final timestamp = DateTime.now().toUtc().toIsoString();
print('[$timestamp] [Correlation: $correlationId] $message');
}
Future<void> runTest() async {
print('=== Dart Mix Payloads Sender Test ===\n');
final uuid = const Uuid();
final correlationId = uuid.v4();
print('Correlation ID: $correlationId');
print('Subject: $TEST_SUBJECT');
print('Broker URL: $TEST_BROKER_URL');
print('Fileserver URL: $TEST_FILESERVER_URL');
print('Size Threshold: $SIZE_THRESHOLD bytes (1MB)\n');
// Create sample data for each type (mirroring Julia test)
final textData = 'Hello! This is a test chat message. 🎉\nHow are you doing today? 😊';
final dictData = {
'type': 'chat',
'sender': 'serviceA',
'receiver': 'serviceB',
'metadata': {
'timestamp': DateTime.now().toUtc().toIsoString(),
'priority': 'high',
'tags': ['urgent', 'chat', 'test']
},
'content': {
'text': 'This is a JSON-formatted chat message with nested structure.',
'format': 'markdown',
'mentions': ['user1', 'user2']
}
};
// Arrow table data (small - direct transport)
final arrowTableSmall = [
{'id': 1, 'name': 'Alice', 'score': 95, 'active': true},
{'id': 2, 'name': 'Bob', 'score': 88, 'active': false},
{'id': 3, 'name': 'Charlie', 'score': 92, 'active': true},
{'id': 4, 'name': 'Diana', 'score': 78, 'active': true},
{'id': 5, 'name': 'Eve', 'score': 85, 'active': false},
{'id': 6, 'name': 'Frank', 'score': 91, 'active': true},
{'id': 7, 'name': 'Grace', 'score': 89, 'active': true},
{'id': 8, 'name': 'Henry', 'score': 76, 'active': false},
{'id': 9, 'name': 'Ivy', 'score': 94, 'active': true},
{'id': 10, 'name': 'Jack', 'score': 82, 'active': true}
];
// Json table data (small - direct transport)
final jsonTableSmall = [
{'id': 1, 'name': 'Alice', 'score': 95, 'active': true},
{'id': 2, 'name': 'Bob', 'score': 88, 'active': false},
{'id': 3, 'name': 'Charlie', 'score': 92, 'active': true},
{'id': 4, 'name': 'Diana', 'score': 78, 'active': true},
{'id': 5, 'name': 'Eve', 'score': 85, 'active': false},
{'id': 6, 'name': 'Frank', 'score': 91, 'active': true},
{'id': 7, 'name': 'Grace', 'score': 89, 'active': true},
{'id': 8, 'name': 'Henry', 'score': 76, 'active': false},
{'id': 9, 'name': 'Ivy', 'score': 94, 'active': true},
{'id': 10, 'name': 'Jack', 'score': 82, 'active': true}
];
// Audio data (small binary - direct transport)
final audioData = Uint8List(100);
for (int i = 0; i < 100; i++) {
audioData[i] = (Random().nextRange(1, 255)).toInt();
}
// Video data (small binary - direct transport)
final videoData = Uint8List(150);
for (int i = 0; i < 150; i++) {
videoData[i] = (Random().nextRange(1, 255)).toInt();
}
// Binary data (small - direct transport)
final binaryData = Uint8List(200);
for (int i = 0; i < 200; i++) {
binaryData[i] = (Random().nextRange(1, 255)).toInt();
}
// Large data for link transport testing
final largeArrowTable = List.generate(20000, (i) => {
'id': i + 1,
'name': 'user_${i + 1}',
'score': (Random().nextRange(50, 100)).toInt(),
'active': Random().nextBool(),
'timestamp': DateTime.now().toUtc().toIsoString()
});
final largeJsonTable = List.generate(50000, (i) => {
'id': i + 1,
'name': 'user_${i + 1}',
'score': (Random().nextRange(50, 100)).toInt(),
'active': Random().nextBool()
});
final largeAudioData = Uint8List(1500000);
for (int i = 0; i < 1500000; i++) {
largeAudioData[i] = (Random().nextRange(1, 255)).toInt();
}
final largeVideoData = Uint8List(1500000);
for (int i = 0; i < 1500000; i++) {
largeVideoData[i] = (Random().nextRange(1, 255)).toInt();
}
final largeBinaryData = Uint8List(1500000);
for (int i = 0; i < 1500000; i++) {
largeBinaryData[i] = (Random().nextRange(1, 255)).toInt();
}
// Read image files from disk (following Julia test pattern)
final filePathSmallImage = './test/small_image.jpg';
final fileDataSmallImage = File(filePathSmallImage).readAsBytesSync();
final filenameSmallImage = filePathSmallImage.split('/').last;
final filePathLargeImage = './test/large_image.png';
final fileDataLargeImage = File(filePathLargeImage).readAsBytesSync();
final filenameLargeImage = filePathLargeImage.split('/').last;
logTrace('Creating payloads list with mixed content');
// Create payloads list - mixed content with both small and large data
// Small data uses direct transport, large data uses link transport
final payloads = [
// Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image
['chat_text', textData, 'text'],
['chat_json', dictData, 'dictionary'],
// ['arrow_table_small', arrowTableSmall, 'arrowtable'],
['json_table_small', jsonTableSmall, 'jsontable'],
[filenameSmallImage, fileDataSmallImage, 'binary'],
// Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary
// ['arrow_table_large', largeArrowTable, 'arrowtable'],
['json_table_large', largeJsonTable, 'jsontable'],
[filenameLargeImage, fileDataLargeImage, 'binary'],
// ['audio_clip_large', largeAudioData, 'audio'],
// ['video_clip_large', largeVideoData, 'video'],
// ['binary_file_large', largeBinaryData, 'binary']
];
logTrace('Total payloads: ${payloads.length}');
try {
// Send the message
print('Sending mixed payloads...\n');
final (env, envJsonStr) = await natsbridge.smartsend(
TEST_SUBJECT,
payloads,
brokerUrl: TEST_BROKER_URL,
fileserverUrl: TEST_FILESERVER_URL,
fileserverUploadHandler: natsbridge.plikOneshotUpload,
sizeThreshold: SIZE_THRESHOLD,
correlationId: correlationId,
msgPurpose: 'chat',
senderName: 'dart-mix-test',
receiverName: '',
receiverId: '',
replyTo: '',
replyToMsgId: '',
isPublish: true,
);
print('\n=== Envelope Created ===');
print('Correlation ID: ${env['correlation_id']}');
print('Message ID: ${env['msg_id']}');
print('Timestamp: ${env['timestamp']}');
print('Subject: ${env['send_to']}');
print('Purpose: ${env['msg_purpose']}');
print('Sender: ${env['sender_name']}');
print('Payloads: ${env['payloads'].length}\n');
// Log transport type for each payload
for (int i = 0; i < env['payloads'].length; i++) {
final payload = env['payloads'][i];
logTrace('Payload ${i + 1} (${payload['dataname']}):');
logTrace(' Transport: ${payload['transport']}');
logTrace(' Type: ${payload['payload_type']}');
logTrace(' Size: ${payload['size']} bytes');
logTrace(' Encoding: ${payload['encoding']}');
if (payload['transport'] == 'link') {
logTrace(' URL: ${payload['data']}');
}
}
// Summary
print('\n--- Transport Summary ---');
final directCount = env['payloads'].where((p) => p['transport'] == 'direct').length;
final linkCount = env['payloads'].where((p) => p['transport'] == 'link').length;
logTrace('Direct transport: $directCount payloads');
logTrace('Link transport: $linkCount payloads');
print('\nTest completed.');
} catch (error) {
print('\n❌ Test failed with error: $error');
print('$error');
exit(1);
}
}
void main() {
runTest();
}