This commit is contained in:
2026-05-14 13:16:13 +07:00
parent c5a70edd57
commit 809aea454b
4 changed files with 137 additions and 34 deletions

View File

@@ -189,3 +189,14 @@ Check the following files:
I would like to expand this package (NATSBRIDGE) to include Rust support.
Now help me update Rust implementation of this package at ./src/natsbridge.rs.
I want to build a client-side-rendering Dioxus-based chat webapp.
Dioxus version 0.7+ should be great.
I already populate the current folder for the project.
my server REST API endpoint is sommpanion.yiem.cc/agent-fronent/api/v1/chat but I didn't run the server yet. A message format is JSON string.
I just placed my custom package for encode and decode message at ./src/natsbridge.rs. smartsend() is for encoding and smartreceive() is for decoding.
you may also check the file /home/ton/docker-apps/sommpanion/NATSBridge/docs/walkthrough.md for more info about my package.
You can test whether Dioxus webapp can be build using this command "dx bundle --web --release --debug-symbols=false"

17
Cargo.lock generated
View File

@@ -692,6 +692,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "mio"
version = "1.2.0"
@@ -909,6 +919,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
@@ -1357,6 +1368,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-ident"
version = "1.0.24"

View File

@@ -12,7 +12,7 @@ path = "src/natsbridge.rs"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
reqwest = { version = "0.12", features = ["json", "stream", "multipart"] }
uuid = { version = "1", features = ["v4", "serde"] }
base64 = "0.22"
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -25,6 +25,8 @@ use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
@@ -239,15 +241,15 @@ impl MsgPayloadV1 {
_ => "none".to_string(),
};
MsgPayloadV1 {
id: Uuid::new_v4().to_string(),
dataname,
payload_type,
transport: "link".to_string(),
encoding,
size,
data: url,
metadata: Some(HashMap::new()),
}
id: Uuid::new_v4().to_string(),
dataname,
payload_type,
transport: "link".to_string(),
encoding,
size,
data: url,
metadata: None,
}
}
}
@@ -332,7 +334,7 @@ pub struct SmartsendOptions {
/// HTTP file server URL for large payloads
pub fileserver_url: String,
/// Custom file server upload handler (optional, uses Plik by default)
pub fileserver_upload_handler: Option<Box<dyn FileUploadHandler>>,
pub fileserver_upload_handler: Option<Arc<dyn FileUploadHandler>>,
/// Size threshold in bytes for switching from direct to link transport
pub size_threshold: usize,
/// Correlation ID for distributed tracing (auto-generated if empty)
@@ -378,7 +380,7 @@ impl Default for SmartsendOptions {
/// Options for the `smartreceive` function
pub struct SmartreceiveOptions {
/// Custom file server download handler (optional, uses exponential backoff by default)
pub fileserver_download_handler: Option<Box<dyn FileDownloadHandler>>,
pub fileserver_download_handler: Option<Arc<dyn FileDownloadHandler>>,
/// Maximum retry attempts for fetching a URL
pub max_retries: u32,
/// Initial delay for exponential backoff in milliseconds
@@ -502,14 +504,20 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
));
}
// Step 2: Upload the file
// Step 2: Upload the file as multipart/form-data
let upload_url = format!("{}/file/{}", file_server_url, uploadid);
let form = reqwest::multipart::Form::new()
.part(
"file",
reqwest::multipart::Part::bytes(data.to_vec())
.file_name(dataname.to_string())
.mime_str("application/octet-stream")
.map_err(|e| NatSBridgeError::UploadFailed(format!("Invalid MIME type: {}", e)))?,
);
let resp = client
.post(&upload_url)
.header("X-UploadToken", &uploadtoken)
.body(data.to_vec())
.header("Content-Type", "application/octet-stream")
.header("Filename", dataname)
.multipart(form)
.send()
.await
.map_err(|e| NatSBridgeError::UploadFailed(format!("Upload request failed: {}", e)))?;
@@ -530,8 +538,8 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
let fileid = upload_json["id"].as_str().unwrap_or("").to_string();
let url = format!(
"{}/file/{}/{}",
file_server_url, uploadid, dataname
"{}/file/{}/{}/{}",
file_server_url, uploadid, fileid, dataname
);
Ok(UploadResult {
@@ -762,9 +770,9 @@ pub async fn smartsend(
let mut payloads: Vec<MsgPayloadV1> = Vec::new();
// Determine the upload handler to use
let _custom_upload = options.fileserver_upload_handler.is_some();
let upload_handler: Box<dyn FileUploadHandler> = Box::new(PlikOneshotUploadHandler);
// Determine the upload handler to use (custom or default Plik)
let upload_handler: Arc<dyn FileUploadHandler> = options.fileserver_upload_handler.clone()
.unwrap_or_else(|| Arc::new(PlikOneshotUploadHandler));
for (dataname, payload, payload_type) in data {
// Use the explicitly provided payload_type from the tuple,
@@ -845,6 +853,27 @@ pub async fn smartsend(
Ok((env, env_json_str))
}
// ============================================================================
// Helper: store deserialized data back into MsgPayloadV1
// ============================================================================
/// Store deserialized Payload data back into a MsgPayloadV1's data field.
/// After smartreceive(), payload.data contains the deserialized content as a string
/// (decoded text, JSON string, or base64 for binary types).
fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> MsgPayloadV1 {
let mut p = payload.clone();
match deserialized {
Payload::Text(s) => p.data = s.clone(),
Payload::Dictionary(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
Payload::JsonTable(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
Payload::ArrowTable(b) => p.data = BASE64.encode(b),
Payload::Image(b) | Payload::Audio(b) | Payload::Video(b) | Payload::Binary(b) => {
p.data = BASE64.encode(b);
}
}
p
}
// ============================================================================
// Public API: smartreceive
// ============================================================================
@@ -905,12 +934,12 @@ pub async fn smartreceive(
let correlation_id = env.correlation_id.clone();
log_trace(&correlation_id, "Processing received message");
// Determine the download handler to use
let _custom_download = options.fileserver_download_handler.is_some();
let download_handler: Box<dyn FileDownloadHandler> = Box::new(BackoffDownloadHandler);
// Determine the download handler to use (custom or default backoff)
let download_handler: Arc<dyn FileDownloadHandler> = options.fileserver_download_handler.clone()
.unwrap_or_else(|| Arc::new(BackoffDownloadHandler));
// Process each payload
let mut deserialized_payloads: Vec<MsgPayloadV1> = Vec::new();
let mut updated_payloads: Vec<MsgPayloadV1> = Vec::new();
for payload in &env.payloads {
let transport = payload.transport.as_str();
@@ -929,15 +958,15 @@ pub async fn smartreceive(
"Base64 decode failed for '{}': {}", dataname, e
)))?;
// Deserialize based on type
let _deserialized = deserialize_data(
// Deserialize based on type and store result back into payload
let deserialized = deserialize_data(
&payload_bytes,
&payload_type,
&correlation_id,
)?;
let updated = store_deserialized_data(payload, &deserialized);
// Keep the original payload structure (with base64 data)
deserialized_payloads.push(payload.clone());
updated_payloads.push(updated);
}
"link" => {
let url = payload.data.clone();
@@ -956,15 +985,15 @@ pub async fn smartreceive(
)
.await?;
// Deserialize based on type
let _deserialized = deserialize_data(
// Deserialize based on type and store result back into payload
let deserialized = deserialize_data(
&downloaded_data,
&payload_type,
&correlation_id,
)?;
let updated = store_deserialized_data(payload, &deserialized);
// Keep the original payload structure (with URL)
deserialized_payloads.push(payload.clone());
updated_payloads.push(updated);
}
unknown => {
return Err(NatSBridgeError::UnknownTransport(format!(
@@ -975,7 +1004,7 @@ pub async fn smartreceive(
}
}
env.payloads = deserialized_payloads;
env.payloads = updated_payloads;
Ok(env)
}
@@ -1037,6 +1066,52 @@ pub async fn send_binary(
.await
}
// ============================================================================
// Plik File Upload from Disk
// ============================================================================
/// Upload a file from disk to a Plik server in one-shot mode.
///
/// Reads the file at `filepath`, extracts its filename, and uploads it
/// using the Plik one-shot upload protocol (multipart/form-data).
///
/// # Arguments
/// - `file_server_url`: Base URL of the Plik server (e.g., `"http://localhost:8080"`)
/// - `filepath`: Full path to the local file to upload
///
/// # Returns
/// - `Result<UploadResult, NatSBridgeError>` with uploadid, fileid, and download URL
///
/// # Example
/// ```no_run
/// use natsbridge::plik_upload_file;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").await?;
/// println!("Uploaded to: {}", result.url);
/// # Ok(())
/// # }
/// ```
pub async fn plik_upload_file(
file_server_url: &str,
filepath: &str,
) -> Result<UploadResult, NatSBridgeError> {
// Read the file from disk
let data = tokio::fs::read(filepath).await
.map_err(|e| NatSBridgeError::IoError(format!(
"Failed to read file '{}': {}", filepath, e
)))?;
// Extract filename from path
let dataname = Path::new(filepath)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
// Upload using the Plik one-shot handler
PlikOneshotUploadHandler.upload(file_server_url, &dataname, &data).await
}
// ============================================================================
// Module Exports
// ============================================================================