From 809aea454bfe18dbc899c9a5ed3b4d9a2676a8cb Mon Sep 17 00:00:00 2001 From: narawat Date: Thu, 14 May 2026 13:16:13 +0700 Subject: [PATCH] update --- AI_prompt.md | 11 ++++ Cargo.lock | 17 ++++++ Cargo.toml | 2 +- src/natsbridge.rs | 141 +++++++++++++++++++++++++++++++++++----------- 4 files changed, 137 insertions(+), 34 deletions(-) diff --git a/AI_prompt.md b/AI_prompt.md index e9951e8..8560444 100644 --- a/AI_prompt.md +++ b/AI_prompt.md @@ -189,3 +189,14 @@ Check the following files: I would like to expand this package (NATSBRIDGE) to include Rust support. Now help me update Rust implementation of this package at ./src/natsbridge.rs. + + + + +I want to build a client-side-rendering Dioxus-based chat webapp. +Dioxus version 0.7+ should be great. +I already populate the current folder for the project. +my server REST API endpoint is sommpanion.yiem.cc/agent-fronent/api/v1/chat but I didn't run the server yet. A message format is JSON string. +I just placed my custom package for encode and decode message at ./src/natsbridge.rs. smartsend() is for encoding and smartreceive() is for decoding. +you may also check the file /home/ton/docker-apps/sommpanion/NATSBridge/docs/walkthrough.md for more info about my package. +You can test whether Dioxus webapp can be build using this command "dx bundle --web --release --debug-symbols=false" diff --git a/Cargo.lock b/Cargo.lock index 59c6af1..a8bdfc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -692,6 +692,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "1.2.0" @@ -909,6 +919,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", @@ -1357,6 +1368,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 1f9bcf8..e803e68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ path = "src/natsbridge.rs" serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.12", features = ["json", "stream"] } +reqwest = { version = "0.12", features = ["json", "stream", "multipart"] } uuid = { version = "1", features = ["v4", "serde"] } base64 = "0.22" chrono = { version = "0.4", features = ["serde"] } diff --git a/src/natsbridge.rs b/src/natsbridge.rs index 5a59020..dfc194b 100644 --- a/src/natsbridge.rs +++ b/src/natsbridge.rs @@ -25,6 +25,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::collections::HashMap; use std::fmt; +use std::path::Path; +use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; use uuid::Uuid; @@ -239,15 +241,15 @@ impl MsgPayloadV1 { _ => "none".to_string(), }; MsgPayloadV1 { - id: Uuid::new_v4().to_string(), - dataname, - payload_type, - transport: "link".to_string(), - encoding, - size, - data: url, - metadata: Some(HashMap::new()), - } + id: Uuid::new_v4().to_string(), + dataname, + payload_type, + transport: "link".to_string(), + encoding, + size, + data: url, + metadata: None, + } } } @@ -332,7 +334,7 @@ pub struct SmartsendOptions { /// HTTP file server URL for large payloads pub fileserver_url: String, /// Custom file server upload handler (optional, uses Plik by default) - pub fileserver_upload_handler: Option>, + pub fileserver_upload_handler: Option>, /// Size threshold in bytes for switching from direct to link transport pub size_threshold: usize, /// Correlation ID for distributed tracing (auto-generated if empty) @@ -378,7 +380,7 @@ impl Default for SmartsendOptions { /// Options for the `smartreceive` function pub struct SmartreceiveOptions { /// Custom file server download handler (optional, uses exponential backoff by default) - pub fileserver_download_handler: Option>, + pub fileserver_download_handler: Option>, /// Maximum retry attempts for fetching a URL pub max_retries: u32, /// Initial delay for exponential backoff in milliseconds @@ -502,14 +504,20 @@ impl FileUploadHandler for PlikOneshotUploadHandler { )); } - // Step 2: Upload the file + // Step 2: Upload the file as multipart/form-data let upload_url = format!("{}/file/{}", file_server_url, uploadid); + let form = reqwest::multipart::Form::new() + .part( + "file", + reqwest::multipart::Part::bytes(data.to_vec()) + .file_name(dataname.to_string()) + .mime_str("application/octet-stream") + .map_err(|e| NatSBridgeError::UploadFailed(format!("Invalid MIME type: {}", e)))?, + ); let resp = client .post(&upload_url) .header("X-UploadToken", &uploadtoken) - .body(data.to_vec()) - .header("Content-Type", "application/octet-stream") - .header("Filename", dataname) + .multipart(form) .send() .await .map_err(|e| NatSBridgeError::UploadFailed(format!("Upload request failed: {}", e)))?; @@ -530,8 +538,8 @@ impl FileUploadHandler for PlikOneshotUploadHandler { let fileid = upload_json["id"].as_str().unwrap_or("").to_string(); let url = format!( - "{}/file/{}/{}", - file_server_url, uploadid, dataname + "{}/file/{}/{}/{}", + file_server_url, uploadid, fileid, dataname ); Ok(UploadResult { @@ -762,9 +770,9 @@ pub async fn smartsend( let mut payloads: Vec = Vec::new(); - // Determine the upload handler to use - let _custom_upload = options.fileserver_upload_handler.is_some(); - let upload_handler: Box = Box::new(PlikOneshotUploadHandler); + // Determine the upload handler to use (custom or default Plik) + let upload_handler: Arc = options.fileserver_upload_handler.clone() + .unwrap_or_else(|| Arc::new(PlikOneshotUploadHandler)); for (dataname, payload, payload_type) in data { // Use the explicitly provided payload_type from the tuple, @@ -845,6 +853,27 @@ pub async fn smartsend( Ok((env, env_json_str)) } +// ============================================================================ +// Helper: store deserialized data back into MsgPayloadV1 +// ============================================================================ + +/// Store deserialized Payload data back into a MsgPayloadV1's data field. +/// After smartreceive(), payload.data contains the deserialized content as a string +/// (decoded text, JSON string, or base64 for binary types). +fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> MsgPayloadV1 { + let mut p = payload.clone(); + match deserialized { + Payload::Text(s) => p.data = s.clone(), + Payload::Dictionary(v) => p.data = serde_json::to_string(v).unwrap_or_default(), + Payload::JsonTable(v) => p.data = serde_json::to_string(v).unwrap_or_default(), + Payload::ArrowTable(b) => p.data = BASE64.encode(b), + Payload::Image(b) | Payload::Audio(b) | Payload::Video(b) | Payload::Binary(b) => { + p.data = BASE64.encode(b); + } + } + p +} + // ============================================================================ // Public API: smartreceive // ============================================================================ @@ -905,12 +934,12 @@ pub async fn smartreceive( let correlation_id = env.correlation_id.clone(); log_trace(&correlation_id, "Processing received message"); - // Determine the download handler to use - let _custom_download = options.fileserver_download_handler.is_some(); - let download_handler: Box = Box::new(BackoffDownloadHandler); + // Determine the download handler to use (custom or default backoff) + let download_handler: Arc = options.fileserver_download_handler.clone() + .unwrap_or_else(|| Arc::new(BackoffDownloadHandler)); // Process each payload - let mut deserialized_payloads: Vec = Vec::new(); + let mut updated_payloads: Vec = Vec::new(); for payload in &env.payloads { let transport = payload.transport.as_str(); @@ -929,15 +958,15 @@ pub async fn smartreceive( "Base64 decode failed for '{}': {}", dataname, e )))?; - // Deserialize based on type - let _deserialized = deserialize_data( + // Deserialize based on type and store result back into payload + let deserialized = deserialize_data( &payload_bytes, &payload_type, &correlation_id, )?; + let updated = store_deserialized_data(payload, &deserialized); - // Keep the original payload structure (with base64 data) - deserialized_payloads.push(payload.clone()); + updated_payloads.push(updated); } "link" => { let url = payload.data.clone(); @@ -956,15 +985,15 @@ pub async fn smartreceive( ) .await?; - // Deserialize based on type - let _deserialized = deserialize_data( + // Deserialize based on type and store result back into payload + let deserialized = deserialize_data( &downloaded_data, &payload_type, &correlation_id, )?; + let updated = store_deserialized_data(payload, &deserialized); - // Keep the original payload structure (with URL) - deserialized_payloads.push(payload.clone()); + updated_payloads.push(updated); } unknown => { return Err(NatSBridgeError::UnknownTransport(format!( @@ -975,7 +1004,7 @@ pub async fn smartreceive( } } - env.payloads = deserialized_payloads; + env.payloads = updated_payloads; Ok(env) } @@ -1037,6 +1066,52 @@ pub async fn send_binary( .await } +// ============================================================================ +// Plik File Upload from Disk +// ============================================================================ + +/// Upload a file from disk to a Plik server in one-shot mode. +/// +/// Reads the file at `filepath`, extracts its filename, and uploads it +/// using the Plik one-shot upload protocol (multipart/form-data). +/// +/// # Arguments +/// - `file_server_url`: Base URL of the Plik server (e.g., `"http://localhost:8080"`) +/// - `filepath`: Full path to the local file to upload +/// +/// # Returns +/// - `Result` with uploadid, fileid, and download URL +/// +/// # Example +/// ```no_run +/// use natsbridge::plik_upload_file; +/// +/// # async fn example() -> Result<(), Box> { +/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").await?; +/// println!("Uploaded to: {}", result.url); +/// # Ok(()) +/// # } +/// ``` +pub async fn plik_upload_file( + file_server_url: &str, + filepath: &str, +) -> Result { + // Read the file from disk + let data = tokio::fs::read(filepath).await + .map_err(|e| NatSBridgeError::IoError(format!( + "Failed to read file '{}': {}", filepath, e + )))?; + + // Extract filename from path + let dataname = Path::new(filepath) + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_default(); + + // Upload using the Plik one-shot handler + PlikOneshotUploadHandler.upload(file_server_url, &dataname, &data).await +} + // ============================================================================ // Module Exports // ============================================================================