|
|
|
|
@@ -17,10 +17,9 @@
|
|
|
|
|
// Supported types: "text", "dictionary", "arrowtable", "jsontable",
|
|
|
|
|
// "image", "audio", "video", "binary"
|
|
|
|
|
|
|
|
|
|
use async_trait::async_trait;
|
|
|
|
|
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
|
|
|
|
use chrono::Utc;
|
|
|
|
|
use reqwest::Client;
|
|
|
|
|
use reqwest::blocking::Client;
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use serde_json::Value as JsonValue;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
@@ -28,7 +27,6 @@ use std::fmt;
|
|
|
|
|
use std::path::Path;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
use tokio::time::sleep;
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
@@ -59,7 +57,7 @@ pub const DEFAULT_MAX_DELAY: u64 = 5_000;
|
|
|
|
|
|
|
|
|
|
/// Errors that can occur during msghandler operations
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub enum msghandlerError {
|
|
|
|
|
pub enum MsgHandlerError {
|
|
|
|
|
/// Unsupported or unknown payload type
|
|
|
|
|
UnknownPayloadType(String),
|
|
|
|
|
/// File server upload failed
|
|
|
|
|
@@ -86,34 +84,34 @@ pub enum msghandlerError {
|
|
|
|
|
InvalidEnvelope(String),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl fmt::Display for msghandlerError {
|
|
|
|
|
impl fmt::Display for MsgHandlerError {
|
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
|
match self {
|
|
|
|
|
msghandlerError::UnknownPayloadType(p) => write!(f, "Unknown payload_type: {}", p),
|
|
|
|
|
msghandlerError::UploadFailed(msg) => write!(f, "Failed to upload: {}", msg),
|
|
|
|
|
msghandlerError::DownloadFailed { url, retries } => {
|
|
|
|
|
MsgHandlerError::UnknownPayloadType(p) => write!(f, "Unknown payload_type: {}", p),
|
|
|
|
|
MsgHandlerError::UploadFailed(msg) => write!(f, "Failed to upload: {}", msg),
|
|
|
|
|
MsgHandlerError::DownloadFailed { url, retries } => {
|
|
|
|
|
write!(f, "Failed to fetch {} after {} attempts", url, retries)
|
|
|
|
|
}
|
|
|
|
|
msghandlerError::UnknownTransport(t) => write!(f, "Unknown transport type: {}", t),
|
|
|
|
|
msghandlerError::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
|
|
|
|
|
msghandlerError::DeserializationError(msg) => {
|
|
|
|
|
MsgHandlerError::UnknownTransport(t) => write!(f, "Unknown transport type: {}", t),
|
|
|
|
|
MsgHandlerError::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
|
|
|
|
|
MsgHandlerError::DeserializationError(msg) => {
|
|
|
|
|
write!(f, "Deserialization error: {}", msg)
|
|
|
|
|
}
|
|
|
|
|
msghandlerError::HttpError { status, message } => {
|
|
|
|
|
MsgHandlerError::HttpError { status, message } => {
|
|
|
|
|
write!(f, "HTTP error {}: {}", status, message)
|
|
|
|
|
}
|
|
|
|
|
msghandlerError::IoError(msg) => write!(f, "IO error: {}", msg),
|
|
|
|
|
msghandlerError::JsonError(msg) => write!(f, "JSON error: {}", msg),
|
|
|
|
|
msghandlerError::Base64Error(msg) => write!(f, "Base64 error: {}", msg),
|
|
|
|
|
msghandlerError::SizeExceeded { size, max } => {
|
|
|
|
|
MsgHandlerError::IoError(msg) => write!(f, "IO error: {}", msg),
|
|
|
|
|
MsgHandlerError::JsonError(msg) => write!(f, "JSON error: {}", msg),
|
|
|
|
|
MsgHandlerError::Base64Error(msg) => write!(f, "Base64 error: {}", msg),
|
|
|
|
|
MsgHandlerError::SizeExceeded { size, max } => {
|
|
|
|
|
write!(f, "Payload size {} exceeds max {}", size, max)
|
|
|
|
|
}
|
|
|
|
|
msghandlerError::InvalidEnvelope(msg) => write!(f, "Invalid envelope: {}", msg),
|
|
|
|
|
MsgHandlerError::InvalidEnvelope(msg) => write!(f, "Invalid envelope: {}", msg),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl std::error::Error for msghandlerError {}
|
|
|
|
|
impl std::error::Error for MsgHandlerError {}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
// Payload Enum - Type-safe payload data
|
|
|
|
|
@@ -318,8 +316,8 @@ impl MsgEnvelopeV1 {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Convert the envelope to a JSON string for transport
|
|
|
|
|
pub fn to_json(&self) -> Result<String, msghandlerError> {
|
|
|
|
|
serde_json::to_string(self).map_err(|e| msghandlerError::JsonError(e.to_string()))
|
|
|
|
|
pub fn to_json(&self) -> Result<String, MsgHandlerError> {
|
|
|
|
|
serde_json::to_string(self).map_err(|e| MsgHandlerError::JsonError(e.to_string()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -405,16 +403,15 @@ impl Default for SmartreceiveOptions {
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
/// Trait for uploading data to a file server
|
|
|
|
|
#[async_trait]
|
|
|
|
|
pub trait FileUploadHandler: Send + Sync {
|
|
|
|
|
/// Upload data to the file server
|
|
|
|
|
/// Returns upload ID, file ID, and download URL
|
|
|
|
|
async fn upload(
|
|
|
|
|
fn upload(
|
|
|
|
|
&self,
|
|
|
|
|
file_server_url: &str,
|
|
|
|
|
dataname: &str,
|
|
|
|
|
data: &[u8],
|
|
|
|
|
) -> Result<UploadResult, msghandlerError>;
|
|
|
|
|
) -> Result<UploadResult, MsgHandlerError>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Result of a file server upload
|
|
|
|
|
@@ -431,17 +428,16 @@ pub struct UploadResult {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Trait for downloading data from a file server
|
|
|
|
|
#[async_trait]
|
|
|
|
|
pub trait FileDownloadHandler: Send + Sync {
|
|
|
|
|
/// Download data from a URL with retry logic
|
|
|
|
|
async fn download(
|
|
|
|
|
fn download(
|
|
|
|
|
&self,
|
|
|
|
|
url: &str,
|
|
|
|
|
max_retries: u32,
|
|
|
|
|
base_delay: u64,
|
|
|
|
|
max_delay: u64,
|
|
|
|
|
correlation_id: &str,
|
|
|
|
|
) -> Result<Vec<u8>, msghandlerError>;
|
|
|
|
|
) -> Result<Vec<u8>, MsgHandlerError>;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
@@ -457,14 +453,13 @@ pub trait FileDownloadHandler: Send + Sync {
|
|
|
|
|
/// 4. Returns identifiers and download URL
|
|
|
|
|
pub struct PlikOneshotUploadHandler;
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
async fn upload(
|
|
|
|
|
fn upload(
|
|
|
|
|
&self,
|
|
|
|
|
file_server_url: &str,
|
|
|
|
|
dataname: &str,
|
|
|
|
|
data: &[u8],
|
|
|
|
|
) -> Result<UploadResult, msghandlerError> {
|
|
|
|
|
) -> Result<UploadResult, MsgHandlerError> {
|
|
|
|
|
let client = Client::new();
|
|
|
|
|
|
|
|
|
|
// Step 1: Create one-shot upload session
|
|
|
|
|
@@ -474,11 +469,10 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
.header("Content-Type", "application/json")
|
|
|
|
|
.json(&session_body)
|
|
|
|
|
.send()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| msghandlerError::UploadFailed(format!("Failed to create upload session: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to create upload session: {}", e)))?;
|
|
|
|
|
|
|
|
|
|
if !session_resp.status().is_success() {
|
|
|
|
|
return Err(msghandlerError::UploadFailed(format!(
|
|
|
|
|
return Err(MsgHandlerError::UploadFailed(format!(
|
|
|
|
|
"Session creation failed with status: {}",
|
|
|
|
|
session_resp.status()
|
|
|
|
|
)));
|
|
|
|
|
@@ -486,8 +480,7 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
|
|
|
|
|
let session_json: JsonValue = session_resp
|
|
|
|
|
.json()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| msghandlerError::UploadFailed(format!("Failed to parse session response: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to parse session response: {}", e)))?;
|
|
|
|
|
|
|
|
|
|
let uploadid = session_json["id"]
|
|
|
|
|
.as_str()
|
|
|
|
|
@@ -499,31 +492,30 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
.to_string();
|
|
|
|
|
|
|
|
|
|
if uploadid.is_empty() || uploadtoken.is_empty() {
|
|
|
|
|
return Err(msghandlerError::UploadFailed(
|
|
|
|
|
return Err(MsgHandlerError::UploadFailed(
|
|
|
|
|
"Missing uploadid or uploadToken in session response".to_string(),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 2: Upload the file as multipart/form-data
|
|
|
|
|
let upload_url = format!("{}/file/{}", file_server_url, uploadid);
|
|
|
|
|
let form = reqwest::multipart::Form::new()
|
|
|
|
|
let form = reqwest::blocking::multipart::Form::new()
|
|
|
|
|
.part(
|
|
|
|
|
"file",
|
|
|
|
|
reqwest::multipart::Part::bytes(data.to_vec())
|
|
|
|
|
reqwest::blocking::multipart::Part::bytes(data.to_vec())
|
|
|
|
|
.file_name(dataname.to_string())
|
|
|
|
|
.mime_str("application/octet-stream")
|
|
|
|
|
.map_err(|e| msghandlerError::UploadFailed(format!("Invalid MIME type: {}", e)))?,
|
|
|
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Invalid MIME type: {}", e)))?,
|
|
|
|
|
);
|
|
|
|
|
let resp = client
|
|
|
|
|
.post(&upload_url)
|
|
|
|
|
.header("X-UploadToken", &uploadtoken)
|
|
|
|
|
.multipart(form)
|
|
|
|
|
.send()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| msghandlerError::UploadFailed(format!("Upload request failed: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Upload request failed: {}", e)))?;
|
|
|
|
|
|
|
|
|
|
if !resp.status().is_success() {
|
|
|
|
|
return Err(msghandlerError::UploadFailed(format!(
|
|
|
|
|
return Err(MsgHandlerError::UploadFailed(format!(
|
|
|
|
|
"Upload failed with status: {}",
|
|
|
|
|
resp.status()
|
|
|
|
|
)));
|
|
|
|
|
@@ -532,8 +524,7 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
let status_code = resp.status().as_u16();
|
|
|
|
|
let upload_json: JsonValue = resp
|
|
|
|
|
.json()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| msghandlerError::UploadFailed(format!("Failed to parse upload response: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to parse upload response: {}", e)))?;
|
|
|
|
|
|
|
|
|
|
let fileid = upload_json["id"].as_str().unwrap_or("").to_string();
|
|
|
|
|
|
|
|
|
|
@@ -564,29 +555,29 @@ impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
|
|
|
/// 4. Throws error after max_retries are exhausted
|
|
|
|
|
pub struct BackoffDownloadHandler;
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl FileDownloadHandler for BackoffDownloadHandler {
|
|
|
|
|
async fn download(
|
|
|
|
|
fn download(
|
|
|
|
|
&self,
|
|
|
|
|
url: &str,
|
|
|
|
|
max_retries: u32,
|
|
|
|
|
base_delay: u64,
|
|
|
|
|
max_delay: u64,
|
|
|
|
|
correlation_id: &str,
|
|
|
|
|
) -> Result<Vec<u8>, msghandlerError> {
|
|
|
|
|
) -> Result<Vec<u8>, MsgHandlerError> {
|
|
|
|
|
let client = Client::new();
|
|
|
|
|
let mut delay = base_delay;
|
|
|
|
|
|
|
|
|
|
for attempt in 1..=max_retries {
|
|
|
|
|
match client.get(url).send().await {
|
|
|
|
|
match client.get(url).send() {
|
|
|
|
|
Ok(response) if response.status().is_success() => {
|
|
|
|
|
log_trace(correlation_id, &format!(
|
|
|
|
|
"Successfully fetched {} on attempt {}",
|
|
|
|
|
url, attempt
|
|
|
|
|
));
|
|
|
|
|
let bytes = response.bytes().await
|
|
|
|
|
let bytes = response
|
|
|
|
|
.bytes()
|
|
|
|
|
.map(|b| b.to_vec())
|
|
|
|
|
.map_err(|_e| msghandlerError::DownloadFailed {
|
|
|
|
|
.map_err(|_e| MsgHandlerError::DownloadFailed {
|
|
|
|
|
url: url.to_string(),
|
|
|
|
|
retries: max_retries,
|
|
|
|
|
})?;
|
|
|
|
|
@@ -611,12 +602,12 @@ impl FileDownloadHandler for BackoffDownloadHandler {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if attempt < max_retries {
|
|
|
|
|
sleep(Duration::from_millis(delay)).await;
|
|
|
|
|
std::thread::sleep(Duration::from_millis(delay));
|
|
|
|
|
delay = (delay * 2).min(max_delay);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Err(msghandlerError::DownloadFailed {
|
|
|
|
|
Err(MsgHandlerError::DownloadFailed {
|
|
|
|
|
url: url.to_string(),
|
|
|
|
|
retries: max_retries,
|
|
|
|
|
})
|
|
|
|
|
@@ -629,14 +620,14 @@ impl FileDownloadHandler for BackoffDownloadHandler {
|
|
|
|
|
|
|
|
|
|
/// Serialize payload data according to the specified payload type.
|
|
|
|
|
/// Returns the raw bytes for the serialized data.
|
|
|
|
|
fn serialize_data(payload: &Payload) -> Result<Vec<u8>, msghandlerError> {
|
|
|
|
|
fn serialize_data(payload: &Payload) -> Result<Vec<u8>, MsgHandlerError> {
|
|
|
|
|
match payload {
|
|
|
|
|
Payload::Text(s) => Ok(s.as_bytes().to_vec()),
|
|
|
|
|
Payload::Dictionary(v) => serde_json::to_vec(v)
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Dictionary serialization failed: {}", e))),
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Dictionary serialization failed: {}", e))),
|
|
|
|
|
Payload::ArrowTable(b) => Ok(b.clone()),
|
|
|
|
|
Payload::JsonTable(v) => serde_json::to_vec(v)
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("JsonTable serialization failed: {}", e))),
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("JsonTable serialization failed: {}", e))),
|
|
|
|
|
Payload::Image(b) => Ok(b.clone()),
|
|
|
|
|
Payload::Audio(b) => Ok(b.clone()),
|
|
|
|
|
Payload::Video(b) => Ok(b.clone()),
|
|
|
|
|
@@ -654,18 +645,18 @@ fn deserialize_data(
|
|
|
|
|
payload_bytes: &[u8],
|
|
|
|
|
payload_type: &str,
|
|
|
|
|
_correlation_id: &str,
|
|
|
|
|
) -> Result<Payload, msghandlerError> {
|
|
|
|
|
) -> Result<Payload, MsgHandlerError> {
|
|
|
|
|
match payload_type {
|
|
|
|
|
"text" => {
|
|
|
|
|
let text = String::from_utf8(payload_bytes.to_vec())
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Invalid UTF-8 for text: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for text: {}", e)))?;
|
|
|
|
|
Ok(Payload::Text(text))
|
|
|
|
|
}
|
|
|
|
|
"dictionary" => {
|
|
|
|
|
let json_str = String::from_utf8(payload_bytes.to_vec())
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Invalid UTF-8 for dictionary: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for dictionary: {}", e)))?;
|
|
|
|
|
let value: JsonValue = serde_json::from_str(&json_str)
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Invalid JSON for dictionary: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid JSON for dictionary: {}", e)))?;
|
|
|
|
|
Ok(Payload::Dictionary(value))
|
|
|
|
|
}
|
|
|
|
|
"arrowtable" => {
|
|
|
|
|
@@ -673,16 +664,16 @@ fn deserialize_data(
|
|
|
|
|
}
|
|
|
|
|
"jsontable" => {
|
|
|
|
|
let json_str = String::from_utf8(payload_bytes.to_vec())
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Invalid UTF-8 for jsontable: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for jsontable: {}", e)))?;
|
|
|
|
|
let value: JsonValue = serde_json::from_str(&json_str)
|
|
|
|
|
.map_err(|e| msghandlerError::DeserializationError(format!("Invalid JSON for jsontable: {}", e)))?;
|
|
|
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid JSON for jsontable: {}", e)))?;
|
|
|
|
|
Ok(Payload::JsonTable(value))
|
|
|
|
|
}
|
|
|
|
|
"image" => Ok(Payload::Image(payload_bytes.to_vec())),
|
|
|
|
|
"audio" => Ok(Payload::Audio(payload_bytes.to_vec())),
|
|
|
|
|
"video" => Ok(Payload::Video(payload_bytes.to_vec())),
|
|
|
|
|
"binary" => Ok(Payload::Binary(payload_bytes.to_vec())),
|
|
|
|
|
_ => Err(msghandlerError::UnknownPayloadType(payload_type.to_string())),
|
|
|
|
|
_ => Err(MsgHandlerError::UnknownPayloadType(payload_type.to_string())),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -720,13 +711,12 @@ pub fn log_trace(correlation_id: &str, message: &str) {
|
|
|
|
|
/// - `options`: Configuration options
|
|
|
|
|
///
|
|
|
|
|
/// # Returns
|
|
|
|
|
/// - `Result<(MsgEnvelopeV1, String), msghandlerError>` containing the envelope and JSON string
|
|
|
|
|
/// - `Result<(MsgEnvelopeV1, String), MsgHandlerError>` containing the envelope and JSON string
|
|
|
|
|
///
|
|
|
|
|
/// # Example
|
|
|
|
|
/// ```no_run
|
|
|
|
|
/// use msghandler::{smartsend, Payload, SmartsendOptions};
|
|
|
|
|
///
|
|
|
|
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
/// let (envelope, json_str) = smartsend(
|
|
|
|
|
/// "/agent/wine/api/v1/prompt",
|
|
|
|
|
/// &[
|
|
|
|
|
@@ -734,17 +724,15 @@ pub fn log_trace(correlation_id: &str, message: &str) {
|
|
|
|
|
/// ("data".to_string(), Payload::Binary(vec![1, 2, 3]), "binary".to_string()),
|
|
|
|
|
/// ],
|
|
|
|
|
/// &SmartsendOptions::default(),
|
|
|
|
|
/// ).await?;
|
|
|
|
|
/// ).unwrap();
|
|
|
|
|
///
|
|
|
|
|
/// // Caller publishes via their preferred transport
|
|
|
|
|
/// # Ok(())
|
|
|
|
|
/// # }
|
|
|
|
|
/// ```
|
|
|
|
|
pub async fn smartsend(
|
|
|
|
|
pub fn smartsend(
|
|
|
|
|
subject: &str,
|
|
|
|
|
data: &[(String, Payload, String)],
|
|
|
|
|
options: &SmartsendOptions,
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), msghandlerError> {
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
|
|
|
let correlation_id = if options.correlation_id.is_empty() {
|
|
|
|
|
Uuid::new_v4().to_string()
|
|
|
|
|
} else {
|
|
|
|
|
@@ -810,8 +798,7 @@ pub async fn smartsend(
|
|
|
|
|
log_trace(&correlation_id, "Using link transport, uploading to fileserver");
|
|
|
|
|
|
|
|
|
|
let upload_result = upload_handler
|
|
|
|
|
.upload(&options.fileserver_url, dataname, &payload_bytes)
|
|
|
|
|
.await?;
|
|
|
|
|
.upload(&options.fileserver_url, dataname, &payload_bytes)?;
|
|
|
|
|
|
|
|
|
|
log_trace(&correlation_id, &format!(
|
|
|
|
|
"Uploaded to URL: {}", upload_result.url
|
|
|
|
|
@@ -889,14 +876,13 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
|
|
|
|
/// - `options`: Configuration options
|
|
|
|
|
///
|
|
|
|
|
/// # Returns
|
|
|
|
|
/// - `Result<MsgEnvelopeV1, msghandlerError>` with deserialized payloads
|
|
|
|
|
/// - `Result<MsgEnvelopeV1, MsgHandlerError>` with deserialized payloads
|
|
|
|
|
///
|
|
|
|
|
/// # Example
|
|
|
|
|
/// ```no_run
|
|
|
|
|
/// use msghandler::{smartreceive, SmartreceiveOptions};
|
|
|
|
|
/// use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
|
|
|
|
///
|
|
|
|
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
/// let msg_json_str = r#"{"correlation_id":"abc123","msg_id":"msg-uuid",
|
|
|
|
|
/// "timestamp":"2026-01-01T00:00:00Z","send_to":"/test",
|
|
|
|
|
/// "msg_purpose":"chat","sender_name":"test","sender_id":"sender-uuid",
|
|
|
|
|
@@ -907,26 +893,24 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
|
|
|
|
/// "data":"SGVsbG8=","metadata":{"payload_bytes":5}
|
|
|
|
|
/// }]}"#;
|
|
|
|
|
///
|
|
|
|
|
/// let envelope = smartreceive(msg_json_str, &SmartreceiveOptions::default()).await?;
|
|
|
|
|
/// let envelope = smartreceive(msg_json_str, &SmartreceiveOptions::default()).unwrap();
|
|
|
|
|
///
|
|
|
|
|
/// for payload in &envelope.payloads {
|
|
|
|
|
/// if payload.transport == "direct" {
|
|
|
|
|
/// let decoded = BASE64.decode(&payload.data)?;
|
|
|
|
|
/// let decoded = BASE64.decode(&payload.data).unwrap();
|
|
|
|
|
/// println!("{}: {}", payload.dataname, String::from_utf8_lossy(&decoded));
|
|
|
|
|
/// } else {
|
|
|
|
|
/// println!("{}: URL={}", payload.dataname, payload.data);
|
|
|
|
|
/// }
|
|
|
|
|
/// }
|
|
|
|
|
/// # Ok(())
|
|
|
|
|
/// # }
|
|
|
|
|
/// ```
|
|
|
|
|
pub async fn smartreceive(
|
|
|
|
|
pub fn smartreceive(
|
|
|
|
|
msg_json_str: &str,
|
|
|
|
|
options: &SmartreceiveOptions,
|
|
|
|
|
) -> Result<MsgEnvelopeV1, msghandlerError> {
|
|
|
|
|
) -> Result<MsgEnvelopeV1, MsgHandlerError> {
|
|
|
|
|
// Parse the JSON envelope
|
|
|
|
|
let mut env: MsgEnvelopeV1 = serde_json::from_str(msg_json_str)
|
|
|
|
|
.map_err(|e| msghandlerError::InvalidEnvelope(format!(
|
|
|
|
|
.map_err(|e| MsgHandlerError::InvalidEnvelope(format!(
|
|
|
|
|
"Failed to parse envelope JSON: {}", e
|
|
|
|
|
)))?;
|
|
|
|
|
|
|
|
|
|
@@ -953,7 +937,7 @@ pub async fn smartreceive(
|
|
|
|
|
|
|
|
|
|
// Decode Base64 payload
|
|
|
|
|
let payload_bytes = BASE64.decode(&payload.data)
|
|
|
|
|
.map_err(|e| msghandlerError::Base64Error(format!(
|
|
|
|
|
.map_err(|e| MsgHandlerError::Base64Error(format!(
|
|
|
|
|
"Base64 decode failed for '{}': {}", dataname, e
|
|
|
|
|
)))?;
|
|
|
|
|
|
|
|
|
|
@@ -981,8 +965,7 @@ pub async fn smartreceive(
|
|
|
|
|
options.base_delay,
|
|
|
|
|
options.max_delay,
|
|
|
|
|
&correlation_id,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
// Deserialize based on type and store result back into payload
|
|
|
|
|
let deserialized = deserialize_data(
|
|
|
|
|
@@ -995,7 +978,7 @@ pub async fn smartreceive(
|
|
|
|
|
updated_payloads.push(updated);
|
|
|
|
|
}
|
|
|
|
|
unknown => {
|
|
|
|
|
return Err(msghandlerError::UnknownTransport(format!(
|
|
|
|
|
return Err(MsgHandlerError::UnknownTransport(format!(
|
|
|
|
|
"Unknown transport type '{}' for payload '{}'",
|
|
|
|
|
unknown, dataname
|
|
|
|
|
)));
|
|
|
|
|
@@ -1012,11 +995,11 @@ pub async fn smartreceive(
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
/// Send a single text payload
|
|
|
|
|
pub async fn send_text(
|
|
|
|
|
pub fn send_text(
|
|
|
|
|
subject: &str,
|
|
|
|
|
text: &str,
|
|
|
|
|
options: &SmartsendOptions,
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), msghandlerError> {
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
|
|
|
smartsend(
|
|
|
|
|
subject,
|
|
|
|
|
&[(
|
|
|
|
|
@@ -1026,15 +1009,14 @@ pub async fn send_text(
|
|
|
|
|
)],
|
|
|
|
|
options,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send a single dictionary payload
|
|
|
|
|
pub async fn send_dictionary(
|
|
|
|
|
pub fn send_dictionary(
|
|
|
|
|
subject: &str,
|
|
|
|
|
data: &JsonValue,
|
|
|
|
|
options: &SmartsendOptions,
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), msghandlerError> {
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
|
|
|
smartsend(
|
|
|
|
|
subject,
|
|
|
|
|
&[(
|
|
|
|
|
@@ -1044,15 +1026,14 @@ pub async fn send_dictionary(
|
|
|
|
|
)],
|
|
|
|
|
options,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send a single binary payload
|
|
|
|
|
pub async fn send_binary(
|
|
|
|
|
pub fn send_binary(
|
|
|
|
|
subject: &str,
|
|
|
|
|
data: &[u8],
|
|
|
|
|
options: &SmartsendOptions,
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), msghandlerError> {
|
|
|
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
|
|
|
smartsend(
|
|
|
|
|
subject,
|
|
|
|
|
&[(
|
|
|
|
|
@@ -1062,7 +1043,6 @@ pub async fn send_binary(
|
|
|
|
|
)],
|
|
|
|
|
options,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
@@ -1079,25 +1059,22 @@ pub async fn send_binary(
|
|
|
|
|
/// - `filepath`: Full path to the local file to upload
|
|
|
|
|
///
|
|
|
|
|
/// # Returns
|
|
|
|
|
/// - `Result<UploadResult, msghandlerError>` with uploadid, fileid, and download URL
|
|
|
|
|
/// - `Result<UploadResult, MsgHandlerError>` with uploadid, fileid, and download URL
|
|
|
|
|
///
|
|
|
|
|
/// # Example
|
|
|
|
|
/// ```no_run
|
|
|
|
|
/// use msghandler::plik_upload_file;
|
|
|
|
|
///
|
|
|
|
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").await?;
|
|
|
|
|
/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").unwrap();
|
|
|
|
|
/// println!("Uploaded to: {}", result.url);
|
|
|
|
|
/// # Ok(())
|
|
|
|
|
/// # }
|
|
|
|
|
/// ```
|
|
|
|
|
pub async fn plik_upload_file(
|
|
|
|
|
pub fn plik_upload_file(
|
|
|
|
|
file_server_url: &str,
|
|
|
|
|
filepath: &str,
|
|
|
|
|
) -> Result<UploadResult, msghandlerError> {
|
|
|
|
|
) -> Result<UploadResult, MsgHandlerError> {
|
|
|
|
|
// Read the file from disk
|
|
|
|
|
let data = tokio::fs::read(filepath).await
|
|
|
|
|
.map_err(|e| msghandlerError::IoError(format!(
|
|
|
|
|
let data = std::fs::read(filepath)
|
|
|
|
|
.map_err(|e| MsgHandlerError::IoError(format!(
|
|
|
|
|
"Failed to read file '{}': {}", filepath, e
|
|
|
|
|
)))?;
|
|
|
|
|
|
|
|
|
|
@@ -1108,7 +1085,7 @@ pub async fn plik_upload_file(
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
|
|
|
|
// Upload using the Plik one-shot handler
|
|
|
|
|
PlikOneshotUploadHandler.upload(file_server_url, &dataname, &data).await
|
|
|
|
|
PlikOneshotUploadHandler.upload(file_server_url, &dataname, &data)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================
|
|
|
|
|
@@ -1123,7 +1100,7 @@ pub async fn plik_upload_file(
|
|
|
|
|
// - `SmartsendOptions`, `SmartreceiveOptions` - configuration
|
|
|
|
|
// - `FileUploadHandler`, `FileDownloadHandler` - trait abstractions
|
|
|
|
|
// - `PlikOneshotUploadHandler`, `BackoffDownloadHandler` - default implementations
|
|
|
|
|
// - `msghandlerError` - error type
|
|
|
|
|
// - `MsgHandlerError` - error type
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
@@ -1204,10 +1181,10 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_error_display() {
|
|
|
|
|
let err = msghandlerError::UnknownPayloadType("custom_type".to_string());
|
|
|
|
|
let err = MsgHandlerError::UnknownPayloadType("custom_type".to_string());
|
|
|
|
|
assert!(format!("{}", err).contains("custom_type"));
|
|
|
|
|
|
|
|
|
|
let err = msghandlerError::DownloadFailed {
|
|
|
|
|
let err = MsgHandlerError::DownloadFailed {
|
|
|
|
|
url: "http://example.com/file".to_string(),
|
|
|
|
|
retries: 5,
|
|
|
|
|
};
|
|
|
|
|
|