rust version implemented
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
|
||||
## 1. Executive Summary
|
||||
|
||||
This document provides the **end-to-end trace** for NATSBridge - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, **Dart**, and **MicroPython** applications using NATS as the message bus.
|
||||
This document provides the **end-to-end trace** for NATSBridge - the cross-platform bi-directional data bridge that enables seamless communication between **Julia**, **JavaScript**, **Python**, **Dart**, **Rust**, and **MicroPython** applications using NATS as the message bus.
|
||||
|
||||
This walkthrough serves as the primary onboarding guide for new developers and explains:
|
||||
- **User scenarios** - Real-world use cases from developer perspective
|
||||
@@ -463,7 +463,155 @@ env, msg_json = smartsend(
|
||||
|
||||
---
|
||||
|
||||
## User Scenario 4: MicroPython Device
|
||||
## User Scenario 4: Rust Service with Type-Safe API
|
||||
|
||||
### Scenario Description
|
||||
|
||||
A Rust service needs to process messages from a Julia analytics pipeline and send typed results back. The Rust implementation leverages compile-time type safety via Rust enums and serde for serialization.
|
||||
|
||||
### Step-by-Step Flow
|
||||
|
||||
#### Step 1: Rust Service Receives Message
|
||||
|
||||
```rust
|
||||
// Rust service - using tokio async runtime
|
||||
use natsbridge::{smartreceive, MsgEnvelopeV1};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let conn = nats::connect("nats://localhost:4222").unwrap();
|
||||
|
||||
// Subscribe and receive messages
|
||||
let mut sub = conn.subscribe("/agent/wine/api/v1/analyze").unwrap();
|
||||
|
||||
for msg in sub.messages() {
|
||||
let envelope: MsgEnvelopeV1 = smartreceive(
|
||||
&String::from_utf8_lossy(&msg.payload),
|
||||
&Default::default(),
|
||||
).await.unwrap();
|
||||
|
||||
// Type-safe payload access
|
||||
for payload in &envelope.payloads {
|
||||
match &payload.data {
|
||||
Payload::ArrowTable(arrow_bytes) => {
|
||||
// Process Arrow IPC data using arrow2
|
||||
let table = arrow2::io::ipc::read::Reader::new(
|
||||
std::io::Cursor::new(arrow_bytes.clone()),
|
||||
);
|
||||
println!("Received {} rows", table.len());
|
||||
},
|
||||
Payload::Text(text) => {
|
||||
println!("Message: {}", text);
|
||||
},
|
||||
_ => println!("Received {} bytes of {} data",
|
||||
match &payload.data {
|
||||
Payload::Binary(b) => b.len(),
|
||||
_ => 0,
|
||||
},
|
||||
payload.payload_type),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Rationale**:
|
||||
- **Type-safe payloads**: Rust enum discriminates between payload types at compile time
|
||||
- **serde serialization**: Automatic JSON deserialization to `MsgEnvelopeV1`
|
||||
- **tokio runtime**: Efficient async I/O for NATS and HTTP operations
|
||||
- **arrow2 integration**: Direct Arrow IPC deserialization without intermediate format
|
||||
|
||||
#### Step 2: Rust Service Sends Processed Results
|
||||
|
||||
```rust
|
||||
// Rust service sends results back with mixed payload types
|
||||
use natsbridge::{smartsend, Payload, SmartsendOptions};
|
||||
|
||||
let results_df = /* processed Arrow table */;
|
||||
let result_bytes = /* serialize to Arrow IPC */;
|
||||
|
||||
let (envelope, json_str) = smartsend(
|
||||
"/agent/wine/api/v1/results",
|
||||
&[
|
||||
(
|
||||
"results".to_string(),
|
||||
Payload::ArrowTable(result_bytes),
|
||||
"arrowtable".to_string(),
|
||||
),
|
||||
(
|
||||
"summary".to_string(),
|
||||
Payload::Text("Analysis complete: 1500 rows processed".to_string()),
|
||||
"text".to_string(),
|
||||
),
|
||||
],
|
||||
&SmartsendOptions {
|
||||
broker_url: "nats://localhost:4222".to_string(),
|
||||
reply_to: "/python/worker/v1/results".to_string(),
|
||||
msg_purpose: "chat".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
).await?;
|
||||
|
||||
// Caller publishes to NATS
|
||||
conn.publish("/agent/wine/api/v1/results", &json_str)?;
|
||||
```
|
||||
|
||||
**Rationale**:
|
||||
- **Builder pattern**: `SmartsendOptions` provides clean configuration
|
||||
- **Enum-based payloads**: Type safety prevents sending incorrect data types
|
||||
- **Default options**: sensible defaults reduce boilerplate
|
||||
- **Result<T, E>**: idiomatic Rust error handling
|
||||
|
||||
#### Step 3: Python/Julia Receives Rust Response
|
||||
|
||||
```python
|
||||
# Python backend receives Rust response
|
||||
env = await smartreceive(str(nats_msg.payload))
|
||||
|
||||
# env["payloads"][0] is now:
|
||||
# ("results", arrow_table_data, "arrowtable")
|
||||
# env["payloads"][1] is now:
|
||||
# ("summary", "Analysis complete: 1500 rows processed", "text")
|
||||
```
|
||||
|
||||
**Rationale**:
|
||||
- **Cross-platform parity**: Rust envelope matches other platform envelopes exactly
|
||||
- **Same JSON wire format**: No protocol translation needed
|
||||
- **Type preservation**: Arrow IPC and text types preserved across all platforms
|
||||
|
||||
#### Step 4: Large File Transfer from Rust
|
||||
|
||||
```rust
|
||||
// Rust service sends large binary file via link transport
|
||||
let large_file_data: Vec<u8> = std::fs::read("/data/large_dataset.parquet")?;
|
||||
|
||||
let (envelope, json_str) = smartsend(
|
||||
"/agent/wine/api/v1/upload",
|
||||
&[
|
||||
(
|
||||
"dataset".to_string(),
|
||||
Payload::Binary(large_file_data),
|
||||
"binary".to_string(),
|
||||
),
|
||||
],
|
||||
&SmartsendOptions {
|
||||
broker_url: "nats://localhost:4222".to_string(),
|
||||
fileserver_url: "http://localhost:8080".to_string(),
|
||||
size_threshold: 500_000, // 0.5MB triggers link transport
|
||||
..Default::default()
|
||||
},
|
||||
).await?;
|
||||
```
|
||||
|
||||
**Rationale**:
|
||||
- **Automatic transport selection**: Same 0.5MB threshold as other desktop platforms
|
||||
- **reqwest integration**: Efficient HTTP client for file server upload/download
|
||||
- **Exponential backoff**: Built-in retry with configurable parameters
|
||||
- **Zero-copy where possible**: `Vec<u8>` passed directly without intermediate copies
|
||||
|
||||
---
|
||||
|
||||
## User Scenario 5: MicroPython Device
|
||||
|
||||
### Scenario Description
|
||||
|
||||
@@ -528,7 +676,7 @@ env = await smartreceive(str(nats_msg.payload))
|
||||
|
||||
---
|
||||
|
||||
## User Scenario 5: Cross-Platform Chat with Mixed Payloads
|
||||
## User Scenario 6: Cross-Platform Chat with Mixed Payloads
|
||||
|
||||
### Scenario Description
|
||||
|
||||
@@ -763,6 +911,7 @@ log_trace(correlation_id, "Published to NATS")
|
||||
| [`src/natsbridge_csr.js`](../src/natsbridge_csr.js) | Browser | JSON table only, WebSocket NATS | specification.md:2-19 (all sections) |
|
||||
| [`src/natsbridge.py`](../src/natsbridge.py) | Python | Arrow IPC, async/await | specification.md:2-19 (all sections) |
|
||||
| [`src/natsbridge.dart`](../src/natsbridge.dart) | Dart | Full feature set, Arrow IPC, async/await | specification.md:2-19 (all sections) |
|
||||
| [`src/natsbridge.rs`](../src/natsbridge.rs) | Rust | Full feature set, Arrow IPC, async/await, type-safe | specification.md:2-19 (all sections) |
|
||||
| [`src/natsbridge_mpy.py`](../src/natsbridge_mpy.py) | MicroPython | Limited to direct transport | specification.md:2-19 (all sections) |
|
||||
|
||||
---
|
||||
@@ -771,6 +920,9 @@ log_trace(correlation_id, "Published to NATS")
|
||||
|
||||
| Date | Version | Changes | Specification Reference |
|
||||
|------|---------|---------|------------------------|
|
||||
| 2026-05-13 | 1.3.0 | Added Rust support with tokio, serde, and arrow2 | All sections |
|
||||
| - | - | Added Rust user scenario (User Scenario 4) | specification.md:11 (Rust API) |
|
||||
| - | - | Updated scenario numbering (MicroPython → Scenario 5, Cross-Platform → Scenario 6) | All sections |
|
||||
| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/NATSBridge.jl) | All sections |
|
||||
| - | - | Updated smartreceive calls to use String(nats_msg.payload) pattern | All sections |
|
||||
| - | - | Removed NATSClient.publish() calls (caller responsible for NATS publishing) | All sections |
|
||||
|
||||
Reference in New Issue
Block a user