1207 lines
43 KiB
Rust
1207 lines
43 KiB
Rust
// msghandler Rust Module
|
|
// Cross-platform bi-directional data bridge
|
|
// Implements smartpack and smartunpack for message transport
|
|
// with support for both direct payload transport and URL-based transport
|
|
// for larger payloads using the Claim-Check pattern.
|
|
//
|
|
// File Server Handler Architecture:
|
|
// The system uses handler functions to abstract file server operations,
|
|
// allowing support for different file server implementations
|
|
// (e.g., Plik, AWS S3, custom HTTP server).
|
|
//
|
|
// Multi-Payload Support (Standard API):
|
|
// The system uses a standardized tuple format for all payload operations.
|
|
// Each payload is (dataname, data, type) and can have a different type,
|
|
// enabling mixed-content messages (e.g., chat with text, images, audio).
|
|
//
|
|
// Supported types: "text", "dictionary", "arrowtable", "jsontable",
|
|
// "image", "audio", "video", "binary"
|
|
|
|
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
|
use chrono::Utc;
|
|
use reqwest::blocking::Client;
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_json::Value as JsonValue;
|
|
use std::collections::HashMap;
|
|
use std::fmt;
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use uuid::Uuid;
|
|
|
|
// ============================================================================
|
|
// Constants
|
|
// ============================================================================
|
|
|
|
/// Broker address written into envelopes when the caller does not supply one.
pub const DEFAULT_BROKER_URL: &str = "localhost:4222";

/// Base URL of the HTTP file server used for link (claim-check) transport.
pub const DEFAULT_FILESERVER_URL: &str = "http://localhost:8080";

/// Serialized payload size, in bytes, at or above which `smartpack` uploads the
/// payload and sends a link instead of embedding it directly (500,000 bytes, ~0.5 MB).
pub const DEFAULT_SIZE_THRESHOLD: usize = 500_000;

/// Default number of fetch attempts made when downloading a linked payload.
pub const DEFAULT_MAX_RETRIES: u32 = 5;

/// Initial wait between download attempts, in milliseconds (doubled after each failure).
pub const DEFAULT_BASE_DELAY: u64 = 100;

/// Upper bound on the wait between download attempts, in milliseconds.
pub const DEFAULT_MAX_DELAY: u64 = 5_000;
|
|
|
|
// ============================================================================
|
|
// Error Types
|
|
// ============================================================================
|
|
|
|
/// Every failure the msghandler pack/unpack pipeline can report.
#[derive(Debug)]
pub enum MsgHandlerError {
    /// A `payload_type` string that is not one of the supported types.
    UnknownPayloadType(String),
    /// Uploading a payload to the file server did not succeed.
    UploadFailed(String),
    /// Fetching a linked payload failed; `retries` is the attempt count reported.
    DownloadFailed { url: String, retries: u32 },
    /// A `transport` value other than the supported ones.
    UnknownTransport(String),
    /// A connection could not be established.
    ConnectionFailed(String),
    /// Payload bytes could not be turned back into a value.
    DeserializationError(String),
    /// An HTTP request returned an error status.
    HttpError { status: u16, message: String },
    /// An I/O operation failed.
    IoError(String),
    /// JSON encoding or decoding failed.
    JsonError(String),
    /// Base64 decoding failed.
    Base64Error(String),
    /// A payload was larger than the permitted maximum.
    SizeExceeded { size: usize, max: usize },
    /// The envelope was malformed or missing required fields.
    InvalidEnvelope(String),
}

impl fmt::Display for MsgHandlerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnknownPayloadType(kind) => write!(f, "Unknown payload_type: {}", kind),
            Self::UploadFailed(reason) => write!(f, "Failed to upload: {}", reason),
            Self::DownloadFailed { url, retries } => {
                write!(f, "Failed to fetch {} after {} attempts", url, retries)
            }
            Self::UnknownTransport(transport) => {
                write!(f, "Unknown transport type: {}", transport)
            }
            Self::ConnectionFailed(reason) => write!(f, "Connection failed: {}", reason),
            Self::DeserializationError(reason) => write!(f, "Deserialization error: {}", reason),
            Self::HttpError { status, message } => write!(f, "HTTP error {}: {}", status, message),
            Self::IoError(reason) => write!(f, "IO error: {}", reason),
            Self::JsonError(reason) => write!(f, "JSON error: {}", reason),
            Self::Base64Error(reason) => write!(f, "Base64 error: {}", reason),
            Self::SizeExceeded { size, max } => {
                write!(f, "Payload size {} exceeds max {}", size, max)
            }
            Self::InvalidEnvelope(reason) => write!(f, "Invalid envelope: {}", reason),
        }
    }
}

impl std::error::Error for MsgHandlerError {}
|
|
|
|
// ============================================================================
|
|
// Payload Enum - Type-safe payload data
|
|
// ============================================================================
|
|
|
|
/// Type-safe payload data for sending. Each variant represents a supported
|
|
/// payload type with its corresponding data representation.
|
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
|
pub enum Payload {
|
|
Text(String),
|
|
Dictionary(JsonValue),
|
|
ArrowTable(Vec<u8>),
|
|
JsonTable(JsonValue),
|
|
Image(Vec<u8>),
|
|
Audio(Vec<u8>),
|
|
Video(Vec<u8>),
|
|
Binary(Vec<u8>),
|
|
}
|
|
|
|
impl Payload {
|
|
/// Get the payload type string for this variant
|
|
pub fn payload_type(&self) -> &'static str {
|
|
match self {
|
|
Payload::Text(_) => "text",
|
|
Payload::Dictionary(_) => "dictionary",
|
|
Payload::ArrowTable(_) => "arrowtable",
|
|
Payload::JsonTable(_) => "jsontable",
|
|
Payload::Image(_) => "image",
|
|
Payload::Audio(_) => "audio",
|
|
Payload::Video(_) => "video",
|
|
Payload::Binary(_) => "binary",
|
|
}
|
|
}
|
|
|
|
/// Get the serialized bytes for this payload
|
|
pub fn serialized_bytes(&self) -> Vec<u8> {
|
|
match self {
|
|
Payload::Text(s) => s.as_bytes().to_vec(),
|
|
Payload::Dictionary(v) => serde_json::to_vec(v).unwrap_or_default(),
|
|
Payload::ArrowTable(b) => b.clone(),
|
|
Payload::JsonTable(v) => serde_json::to_vec(v).unwrap_or_default(),
|
|
Payload::Image(b) => b.clone(),
|
|
Payload::Audio(b) => b.clone(),
|
|
Payload::Video(b) => b.clone(),
|
|
Payload::Binary(b) => b.clone(),
|
|
}
|
|
}
|
|
|
|
/// Get the size of the serialized bytes
|
|
pub fn size(&self) -> usize {
|
|
self.serialized_bytes().len()
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Message Payload Structure (wire format)
|
|
// ============================================================================
|
|
|
|
/// Represents a single payload within a message envelope.
|
|
/// Supports both direct transport (base64-encoded data) and link transport (URL-based).
|
|
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
|
pub struct MsgPayloadV1 {
|
|
/// Unique identifier for this payload (UUID v4)
|
|
pub id: String,
|
|
/// Name of the payload (e.g., "login_image", "msg")
|
|
pub dataname: String,
|
|
/// Payload type: "text", "dictionary", "arrowtable", "jsontable",
|
|
/// "image", "audio", "video", "binary"
|
|
pub payload_type: String,
|
|
/// Transport method: "direct" or "link"
|
|
pub transport: String,
|
|
/// Encoding method: "none", "json", "base64", "arrow-ipc"
|
|
pub encoding: String,
|
|
/// Size of the payload in bytes
|
|
pub size: usize,
|
|
/// Payload data (base64 string for direct transport, URL for link transport)
|
|
pub data: String,
|
|
/// Optional payload-level metadata
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub metadata: Option<HashMap<String, JsonValue>>,
|
|
}
|
|
|
|
impl MsgPayloadV1 {
|
|
/// Create a new direct transport payload
|
|
pub fn new_direct(
|
|
dataname: String,
|
|
payload_type: String,
|
|
data: String,
|
|
size: usize,
|
|
) -> Self {
|
|
let encoding = match payload_type.as_str() {
|
|
"jsontable" => "json".to_string(),
|
|
"arrowtable" => "arrow-ipc".to_string(),
|
|
_ => "base64".to_string(),
|
|
};
|
|
MsgPayloadV1 {
|
|
id: Uuid::new_v4().to_string(),
|
|
dataname,
|
|
payload_type,
|
|
transport: "direct".to_string(),
|
|
encoding,
|
|
size,
|
|
data,
|
|
metadata: Some({
|
|
let mut m = HashMap::new();
|
|
m.insert(
|
|
"payload_bytes".to_string(),
|
|
JsonValue::Number(size.into()),
|
|
);
|
|
m
|
|
}),
|
|
}
|
|
}
|
|
|
|
/// Create a new link transport payload
|
|
pub fn new_link(
|
|
dataname: String,
|
|
payload_type: String,
|
|
url: String,
|
|
size: usize,
|
|
) -> Self {
|
|
let encoding = match payload_type.as_str() {
|
|
"jsontable" => "json".to_string(),
|
|
"arrowtable" => "arrow-ipc".to_string(),
|
|
_ => "none".to_string(),
|
|
};
|
|
MsgPayloadV1 {
|
|
id: Uuid::new_v4().to_string(),
|
|
dataname,
|
|
payload_type,
|
|
transport: "link".to_string(),
|
|
encoding,
|
|
size,
|
|
data: url,
|
|
metadata: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Message Envelope Structure (wire format)
|
|
// ============================================================================
|
|
|
|
/// Represents a complete message envelope containing multiple payloads
|
|
/// with metadata for routing, tracing, and message context.
|
|
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
|
pub struct MsgEnvelopeV1 {
|
|
/// Unique identifier to track messages across systems (UUID v4)
|
|
pub correlation_id: String,
|
|
/// Unique message identifier (UUID v4)
|
|
pub msg_id: String,
|
|
/// Message publication timestamp (ISO 8601 UTC)
|
|
pub timestamp: String,
|
|
|
|
/// Subject/topic to send the message to
|
|
pub send_to: String,
|
|
/// Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown",
|
|
/// "chat", "command", "event"
|
|
pub msg_purpose: String,
|
|
/// Sender application name
|
|
pub sender_name: String,
|
|
/// Sender UUID (UUID v4)
|
|
pub sender_id: String,
|
|
/// Receiver application name (empty string = broadcast)
|
|
pub receiver_name: String,
|
|
/// Receiver UUID (empty string = broadcast)
|
|
pub receiver_id: String,
|
|
|
|
/// Topic where receiver should reply
|
|
pub reply_to: String,
|
|
/// Message ID this message is replying to
|
|
pub reply_to_msg_id: String,
|
|
/// Broker URL
|
|
pub broker_url: String,
|
|
|
|
/// Optional message-level metadata
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub metadata: Option<HashMap<String, JsonValue>>,
|
|
/// List of payloads
|
|
pub payloads: Vec<MsgPayloadV1>,
|
|
}
|
|
|
|
impl MsgEnvelopeV1 {
|
|
/// Create a new message envelope
|
|
pub fn new(send_to: String, payloads: Vec<MsgPayloadV1>) -> Self {
|
|
MsgEnvelopeV1 {
|
|
correlation_id: Uuid::new_v4().to_string(),
|
|
msg_id: Uuid::new_v4().to_string(),
|
|
timestamp: Utc::now().to_rfc3339(),
|
|
send_to,
|
|
msg_purpose: "chat".to_string(),
|
|
sender_name: "msghandler".to_string(),
|
|
sender_id: Uuid::new_v4().to_string(),
|
|
receiver_name: String::new(),
|
|
receiver_id: String::new(),
|
|
reply_to: String::new(),
|
|
reply_to_msg_id: String::new(),
|
|
broker_url: DEFAULT_BROKER_URL.to_string(),
|
|
metadata: Some(HashMap::new()),
|
|
payloads,
|
|
}
|
|
}
|
|
|
|
/// Convert the envelope to a JSON string for transport
|
|
pub fn to_json(&self) -> Result<String, MsgHandlerError> {
|
|
serde_json::to_string(self).map_err(|e| MsgHandlerError::JsonError(e.to_string()))
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Options Structures
|
|
// ============================================================================
|
|
|
|
/// Options for the `smartpack` function
|
|
pub struct smartpackOptions {
|
|
/// Broker URL
|
|
pub broker_url: String,
|
|
/// HTTP file server URL for large payloads
|
|
pub fileserver_url: String,
|
|
/// Custom file server upload handler (optional, uses Plik by default)
|
|
pub fileserver_upload_handler: Option<Arc<dyn FileUploadHandler>>,
|
|
/// Size threshold in bytes for switching from direct to link transport
|
|
pub size_threshold: usize,
|
|
/// Correlation ID for distributed tracing (auto-generated if empty)
|
|
pub correlation_id: String,
|
|
/// Purpose of the message
|
|
pub msg_purpose: String,
|
|
/// Sender application name
|
|
pub sender_name: String,
|
|
/// Receiver application name (empty = broadcast)
|
|
pub receiver_name: String,
|
|
/// Receiver UUID (empty = broadcast)
|
|
pub receiver_id: String,
|
|
/// Topic to reply to
|
|
pub reply_to: String,
|
|
/// Message ID being replied to
|
|
pub reply_to_msg_id: String,
|
|
/// Message ID (auto-generated if empty)
|
|
pub msg_id: String,
|
|
/// Sender UUID (auto-generated if empty)
|
|
pub sender_id: String,
|
|
}
|
|
|
|
impl Default for smartpackOptions {
|
|
fn default() -> Self {
|
|
smartpackOptions {
|
|
broker_url: DEFAULT_BROKER_URL.to_string(),
|
|
fileserver_url: DEFAULT_FILESERVER_URL.to_string(),
|
|
fileserver_upload_handler: None,
|
|
size_threshold: DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id: String::new(),
|
|
msg_purpose: "chat".to_string(),
|
|
sender_name: "msghandler".to_string(),
|
|
receiver_name: String::new(),
|
|
receiver_id: String::new(),
|
|
reply_to: String::new(),
|
|
reply_to_msg_id: String::new(),
|
|
msg_id: String::new(),
|
|
sender_id: String::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Options for the `smartunpack` function
|
|
pub struct smartunpackOptions {
|
|
/// Custom file server download handler (optional, uses exponential backoff by default)
|
|
pub fileserver_download_handler: Option<Arc<dyn FileDownloadHandler>>,
|
|
/// Maximum retry attempts for fetching a URL
|
|
pub max_retries: u32,
|
|
/// Initial delay for exponential backoff in milliseconds
|
|
pub base_delay: u64,
|
|
/// Maximum delay for exponential backoff in milliseconds
|
|
pub max_delay: u64,
|
|
}
|
|
|
|
impl Default for smartunpackOptions {
|
|
fn default() -> Self {
|
|
smartunpackOptions {
|
|
fileserver_download_handler: None,
|
|
max_retries: DEFAULT_MAX_RETRIES,
|
|
base_delay: DEFAULT_BASE_DELAY,
|
|
max_delay: DEFAULT_MAX_DELAY,
|
|
}
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// File Server Handler Traits
|
|
// ============================================================================
|
|
|
|
/// Trait for uploading data to a file server
|
|
pub trait FileUploadHandler: Send + Sync {
|
|
/// Upload data to the file server
|
|
/// Returns upload ID, file ID, and download URL
|
|
fn upload(
|
|
&self,
|
|
file_server_url: &str,
|
|
dataname: &str,
|
|
data: &[u8],
|
|
) -> Result<UploadResult, MsgHandlerError>;
|
|
}
|
|
|
|
/// Outcome of a successful file server upload.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly
/// (e.g. in tests or when de-duplicating uploads).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadResult {
    /// HTTP status code of the upload response.
    pub status: u16,
    /// Upload session identifier.
    pub uploadid: String,
    /// File identifier within the session.
    pub fileid: String,
    /// Full download URL for the uploaded data.
    pub url: String,
}
|
|
|
|
/// Trait for downloading data from a file server
|
|
pub trait FileDownloadHandler: Send + Sync {
|
|
/// Download data from a URL with retry logic
|
|
fn download(
|
|
&self,
|
|
url: &str,
|
|
max_retries: u32,
|
|
base_delay: u64,
|
|
max_delay: u64,
|
|
correlation_id: &str,
|
|
) -> Result<Vec<u8>, MsgHandlerError>;
|
|
}
|
|
|
|
// ============================================================================
|
|
// Plik One-Shot Upload Handler Implementation
|
|
// ============================================================================
|
|
|
|
/// Default file server upload handler using Plik one-shot mode.
|
|
///
|
|
/// Workflow:
|
|
/// 1. Creates a one-shot upload session (POST /upload with `{"OneShot": true}`)
|
|
/// 2. Retrieves upload ID and token from response
|
|
/// 3. Uploads binary data as multipart form data with the token
|
|
/// 4. Returns identifiers and download URL
|
|
pub struct PlikOneshotUploadHandler;
|
|
|
|
impl FileUploadHandler for PlikOneshotUploadHandler {
|
|
fn upload(
|
|
&self,
|
|
file_server_url: &str,
|
|
dataname: &str,
|
|
data: &[u8],
|
|
) -> Result<UploadResult, MsgHandlerError> {
|
|
let client = Client::new();
|
|
|
|
// Step 1: Create one-shot upload session
|
|
let session_body = serde_json::json!({"OneShot": true});
|
|
let session_resp = client
|
|
.post(format!("{}/upload", file_server_url))
|
|
.header("Content-Type", "application/json")
|
|
.json(&session_body)
|
|
.send()
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to create upload session: {}", e)))?;
|
|
|
|
if !session_resp.status().is_success() {
|
|
return Err(MsgHandlerError::UploadFailed(format!(
|
|
"Session creation failed with status: {}",
|
|
session_resp.status()
|
|
)));
|
|
}
|
|
|
|
let session_json: JsonValue = session_resp
|
|
.json()
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to parse session response: {}", e)))?;
|
|
|
|
let uploadid = session_json["id"]
|
|
.as_str()
|
|
.unwrap_or("")
|
|
.to_string();
|
|
let uploadtoken = session_json["uploadToken"]
|
|
.as_str()
|
|
.unwrap_or("")
|
|
.to_string();
|
|
|
|
if uploadid.is_empty() || uploadtoken.is_empty() {
|
|
return Err(MsgHandlerError::UploadFailed(
|
|
"Missing uploadid or uploadToken in session response".to_string(),
|
|
));
|
|
}
|
|
|
|
// Step 2: Upload the file as multipart/form-data
|
|
let upload_url = format!("{}/file/{}", file_server_url, uploadid);
|
|
let form = reqwest::blocking::multipart::Form::new()
|
|
.part(
|
|
"file",
|
|
reqwest::blocking::multipart::Part::bytes(data.to_vec())
|
|
.file_name(dataname.to_string())
|
|
.mime_str("application/octet-stream")
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Invalid MIME type: {}", e)))?,
|
|
);
|
|
let resp = client
|
|
.post(&upload_url)
|
|
.header("X-UploadToken", &uploadtoken)
|
|
.multipart(form)
|
|
.send()
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Upload request failed: {}", e)))?;
|
|
|
|
if !resp.status().is_success() {
|
|
return Err(MsgHandlerError::UploadFailed(format!(
|
|
"Upload failed with status: {}",
|
|
resp.status()
|
|
)));
|
|
}
|
|
|
|
let status_code = resp.status().as_u16();
|
|
let upload_json: JsonValue = resp
|
|
.json()
|
|
.map_err(|e| MsgHandlerError::UploadFailed(format!("Failed to parse upload response: {}", e)))?;
|
|
|
|
let fileid = upload_json["id"].as_str().unwrap_or("").to_string();
|
|
|
|
let url = format!(
|
|
"{}/file/{}/{}/{}",
|
|
file_server_url, uploadid, fileid, dataname
|
|
);
|
|
|
|
Ok(UploadResult {
|
|
status: status_code,
|
|
uploadid,
|
|
fileid,
|
|
url,
|
|
})
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Exponential Backoff Download Handler Implementation
|
|
// ============================================================================
|
|
|
|
/// Default download handler using exponential backoff retry logic.
|
|
///
|
|
/// Workflow:
|
|
/// 1. Attempts to fetch data from URL
|
|
/// 2. On failure, retries with exponentially increasing delay
|
|
/// 3. Capped at max_delay between retries
|
|
/// 4. Throws error after max_retries are exhausted
|
|
pub struct BackoffDownloadHandler;
|
|
|
|
impl FileDownloadHandler for BackoffDownloadHandler {
|
|
fn download(
|
|
&self,
|
|
url: &str,
|
|
max_retries: u32,
|
|
base_delay: u64,
|
|
max_delay: u64,
|
|
correlation_id: &str,
|
|
) -> Result<Vec<u8>, MsgHandlerError> {
|
|
let client = Client::new();
|
|
let mut delay = base_delay;
|
|
|
|
for attempt in 1..=max_retries {
|
|
match client.get(url).send() {
|
|
Ok(response) if response.status().is_success() => {
|
|
log_trace(correlation_id, &format!(
|
|
"Successfully fetched {} on attempt {}",
|
|
url, attempt
|
|
));
|
|
let bytes = response
|
|
.bytes()
|
|
.map(|b| b.to_vec())
|
|
.map_err(|_e| MsgHandlerError::DownloadFailed {
|
|
url: url.to_string(),
|
|
retries: max_retries,
|
|
})?;
|
|
return Ok(bytes);
|
|
}
|
|
Ok(response) => {
|
|
log_trace(correlation_id, &format!(
|
|
"Attempt {} failed with status {}: {}",
|
|
attempt,
|
|
response.status(),
|
|
url
|
|
));
|
|
}
|
|
Err(e) => {
|
|
log_trace(correlation_id, &format!(
|
|
"Attempt {} failed: {}: {}",
|
|
attempt,
|
|
e,
|
|
url
|
|
));
|
|
}
|
|
}
|
|
|
|
if attempt < max_retries {
|
|
std::thread::sleep(Duration::from_millis(delay));
|
|
delay = (delay * 2).min(max_delay);
|
|
}
|
|
}
|
|
|
|
Err(MsgHandlerError::DownloadFailed {
|
|
url: url.to_string(),
|
|
retries: max_retries,
|
|
})
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Serialization
|
|
// ============================================================================
|
|
|
|
/// Serialize payload data according to the specified payload type.
|
|
/// Returns the raw bytes for the serialized data.
|
|
fn serialize_data(payload: &Payload) -> Result<Vec<u8>, MsgHandlerError> {
|
|
match payload {
|
|
Payload::Text(s) => Ok(s.as_bytes().to_vec()),
|
|
Payload::Dictionary(v) => serde_json::to_vec(v)
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Dictionary serialization failed: {}", e))),
|
|
Payload::ArrowTable(b) => Ok(b.clone()),
|
|
Payload::JsonTable(v) => serde_json::to_vec(v)
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("JsonTable serialization failed: {}", e))),
|
|
Payload::Image(b) => Ok(b.clone()),
|
|
Payload::Audio(b) => Ok(b.clone()),
|
|
Payload::Video(b) => Ok(b.clone()),
|
|
Payload::Binary(b) => Ok(b.clone()),
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Deserialization
|
|
// ============================================================================
|
|
|
|
/// Deserialize bytes back to a Payload based on the payload type.
|
|
/// Handles direct transport (base64 decoded) and link transport (fetched bytes).
|
|
fn deserialize_data(
|
|
payload_bytes: &[u8],
|
|
payload_type: &str,
|
|
_correlation_id: &str,
|
|
) -> Result<Payload, MsgHandlerError> {
|
|
match payload_type {
|
|
"text" => {
|
|
let text = String::from_utf8(payload_bytes.to_vec())
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for text: {}", e)))?;
|
|
Ok(Payload::Text(text))
|
|
}
|
|
"dictionary" => {
|
|
let json_str = String::from_utf8(payload_bytes.to_vec())
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for dictionary: {}", e)))?;
|
|
let value: JsonValue = serde_json::from_str(&json_str)
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid JSON for dictionary: {}", e)))?;
|
|
Ok(Payload::Dictionary(value))
|
|
}
|
|
"arrowtable" => {
|
|
Ok(Payload::ArrowTable(payload_bytes.to_vec()))
|
|
}
|
|
"jsontable" => {
|
|
let json_str = String::from_utf8(payload_bytes.to_vec())
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for jsontable: {}", e)))?;
|
|
let value: JsonValue = serde_json::from_str(&json_str)
|
|
.map_err(|e| MsgHandlerError::DeserializationError(format!("Invalid JSON for jsontable: {}", e)))?;
|
|
Ok(Payload::JsonTable(value))
|
|
}
|
|
"image" => Ok(Payload::Image(payload_bytes.to_vec())),
|
|
"audio" => Ok(Payload::Audio(payload_bytes.to_vec())),
|
|
"video" => Ok(Payload::Video(payload_bytes.to_vec())),
|
|
"binary" => Ok(Payload::Binary(payload_bytes.to_vec())),
|
|
_ => Err(MsgHandlerError::UnknownPayloadType(payload_type.to_string())),
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Logging
|
|
// ============================================================================
|
|
|
|
/// Log a trace message with correlation ID and timestamp.
|
|
/// Uses tokio::task::spawn_blocking to avoid blocking the async runtime.
|
|
pub fn log_trace(correlation_id: &str, message: &str) {
|
|
let ts = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ");
|
|
eprintln!("[{}] [Correlation: {}] {}", ts, correlation_id, message);
|
|
}
|
|
|
|
// ============================================================================
|
|
// Public API: smartpack
|
|
// ============================================================================
|
|
|
|
/// Send data with automatic transport selection.
|
|
///
|
|
/// This function intelligently routes data delivery based on payload size.
|
|
/// If the serialized payload is smaller than `size_threshold`, it encodes the
|
|
/// data as Base64 and constructs a "direct" MsgPayloadV1. Otherwise, it uploads
|
|
/// the data to a file server and constructs a "link" MsgPayloadV1 with the URL.
|
|
///
|
|
/// Each payload in the list can have a different type, enabling mixed-content
|
|
/// messages (e.g., chat with text, images, audio).
|
|
///
|
|
/// Transport publishing is the caller's responsibility. This function returns the
|
|
/// envelope and its JSON string representation.
|
|
///
|
|
/// # Arguments
|
|
/// - `subject`: Subject/topic to send the message to
|
|
/// - `data`: Slice of (dataname, payload, payload_type) tuples
|
|
/// - `options`: Configuration options
|
|
///
|
|
/// # Returns
|
|
/// - `Result<(MsgEnvelopeV1, String), MsgHandlerError>` containing the envelope and JSON string
|
|
///
|
|
/// # Example
|
|
/// ```no_run
|
|
/// use msghandler::{smartpack, Payload, smartpackOptions};
|
|
///
|
|
/// let (envelope, json_str) = smartpack(
|
|
/// "/agent/wine/api/v1/prompt",
|
|
/// &[
|
|
/// ("msg".to_string(), Payload::Text("Hello!".to_string()), "text".to_string()),
|
|
/// ("data".to_string(), Payload::Binary(vec![1, 2, 3]), "binary".to_string()),
|
|
/// ],
|
|
/// &smartpackOptions::default(),
|
|
/// ).unwrap();
|
|
///
|
|
/// // Caller publishes via their preferred transport
|
|
/// ```
|
|
pub fn smartpack(
|
|
subject: &str,
|
|
data: &[(String, Payload, String)],
|
|
options: &smartpackOptions,
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
let correlation_id = if options.correlation_id.is_empty() {
|
|
Uuid::new_v4().to_string()
|
|
} else {
|
|
options.correlation_id.clone()
|
|
};
|
|
|
|
let msg_id = if options.msg_id.is_empty() {
|
|
Uuid::new_v4().to_string()
|
|
} else {
|
|
options.msg_id.clone()
|
|
};
|
|
|
|
let sender_id = if options.sender_id.is_empty() {
|
|
Uuid::new_v4().to_string()
|
|
} else {
|
|
options.sender_id.clone()
|
|
};
|
|
|
|
log_trace(&correlation_id, &format!(
|
|
"Starting smartpack for subject: {}", subject
|
|
));
|
|
|
|
let mut payloads: Vec<MsgPayloadV1> = Vec::new();
|
|
|
|
// Determine the upload handler to use (custom or default Plik)
|
|
let upload_handler: Arc<dyn FileUploadHandler> = options.fileserver_upload_handler.clone()
|
|
.unwrap_or_else(|| Arc::new(PlikOneshotUploadHandler));
|
|
|
|
for (dataname, payload, payload_type) in data {
|
|
// Use the explicitly provided payload_type from the tuple,
|
|
// or derive from the Payload enum
|
|
let ptype = if payload_type.is_empty() {
|
|
payload.payload_type().to_string()
|
|
} else {
|
|
payload_type.clone()
|
|
};
|
|
|
|
// Serialize the payload data
|
|
let payload_bytes = serialize_data(payload)?;
|
|
let payload_size = payload_bytes.len();
|
|
|
|
log_trace(&correlation_id, &format!(
|
|
"Serialized payload '{}' (type: {}) size: {} bytes",
|
|
dataname, ptype, payload_size
|
|
));
|
|
|
|
if payload_size < options.size_threshold {
|
|
// Direct transport: Base64 encode and include in message envelope
|
|
let payload_b64 = BASE64.encode(&payload_bytes);
|
|
log_trace(&correlation_id, &format!(
|
|
"Using direct transport for {} bytes", payload_size
|
|
));
|
|
|
|
let msg_payload = MsgPayloadV1::new_direct(
|
|
dataname.clone(),
|
|
ptype,
|
|
payload_b64,
|
|
payload_size,
|
|
);
|
|
payloads.push(msg_payload);
|
|
} else {
|
|
// Link transport: Upload to file server, include URL in message envelope
|
|
log_trace(&correlation_id, "Using link transport, uploading to fileserver");
|
|
|
|
let upload_result = upload_handler
|
|
.upload(&options.fileserver_url, dataname, &payload_bytes)?;
|
|
|
|
log_trace(&correlation_id, &format!(
|
|
"Uploaded to URL: {}", upload_result.url
|
|
));
|
|
|
|
let msg_payload = MsgPayloadV1::new_link(
|
|
dataname.clone(),
|
|
ptype,
|
|
upload_result.url,
|
|
payload_size,
|
|
);
|
|
payloads.push(msg_payload);
|
|
}
|
|
}
|
|
|
|
// Build the message envelope
|
|
let env = MsgEnvelopeV1 {
|
|
correlation_id: correlation_id.clone(),
|
|
msg_id,
|
|
timestamp: Utc::now().to_rfc3339(),
|
|
send_to: subject.to_string(),
|
|
msg_purpose: options.msg_purpose.clone(),
|
|
sender_name: options.sender_name.clone(),
|
|
sender_id,
|
|
receiver_name: options.receiver_name.clone(),
|
|
receiver_id: options.receiver_id.clone(),
|
|
reply_to: options.reply_to.clone(),
|
|
reply_to_msg_id: options.reply_to_msg_id.clone(),
|
|
broker_url: options.broker_url.clone(),
|
|
metadata: Some(HashMap::new()),
|
|
payloads,
|
|
};
|
|
|
|
let env_json_str = env.to_json()?;
|
|
|
|
log_trace(&correlation_id, "Envelope created successfully");
|
|
|
|
Ok((env, env_json_str))
|
|
}
|
|
|
|
// ============================================================================
|
|
// Helper: store deserialized data back into MsgPayloadV1
|
|
// ============================================================================
|
|
|
|
/// Store deserialized Payload data back into a MsgPayloadV1's data field.
|
|
/// After smartunpack(), payload.data contains the deserialized content as a string
|
|
/// (decoded text, JSON string, or base64 for binary types).
|
|
fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> MsgPayloadV1 {
|
|
let mut p = payload.clone();
|
|
match deserialized {
|
|
Payload::Text(s) => p.data = s.clone(),
|
|
Payload::Dictionary(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
|
|
Payload::JsonTable(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
|
|
Payload::ArrowTable(b) => p.data = BASE64.encode(b),
|
|
Payload::Image(b) | Payload::Audio(b) | Payload::Video(b) | Payload::Binary(b) => {
|
|
p.data = BASE64.encode(b);
|
|
}
|
|
}
|
|
p
|
|
}
|
|
|
|
// ============================================================================
|
|
// Public API: smartunpack
|
|
// ============================================================================
|
|
|
|
/// Receive and process messages.
|
|
///
|
|
/// This function processes incoming messages, handling both direct transport
|
|
/// (base64 decoded payloads) and link transport (URL-based payloads).
|
|
/// It deserializes the data based on the payload type and returns the envelope
|
|
/// with deserialized payloads.
|
|
///
|
|
/// # Arguments
|
|
/// - `msg_json_str`: JSON string from the message payload
|
|
/// - `options`: Configuration options
|
|
///
|
|
/// # Returns
|
|
/// - `Result<MsgEnvelopeV1, MsgHandlerError>` with deserialized payloads
|
|
///
|
|
/// # Example
|
|
/// ```no_run
|
|
/// use msghandler::{smartunpack, smartunpackOptions};
|
|
/// use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
|
///
|
|
/// let msg_json_str = r#"{"correlation_id":"abc123","msg_id":"msg-uuid",
|
|
/// "timestamp":"2026-01-01T00:00:00Z","send_to":"/test",
|
|
/// "msg_purpose":"chat","sender_name":"test","sender_id":"sender-uuid",
|
|
/// "receiver_name":"","receiver_id":"","reply_to":"","reply_to_msg_id":"",
|
|
/// "broker_url":"localhost:4222","payloads":[{
|
|
/// "id":"payload-uuid","dataname":"msg","payload_type":"text",
|
|
/// "transport":"direct","encoding":"base64","size":5,
|
|
/// "data":"SGVsbG8=","metadata":{"payload_bytes":5}
|
|
/// }]}"#;
|
|
///
|
|
/// let envelope = smartunpack(msg_json_str, &smartunpackOptions::default()).unwrap();
|
|
///
|
|
/// for payload in &envelope.payloads {
|
|
/// if payload.transport == "direct" {
|
|
/// let decoded = BASE64.decode(&payload.data).unwrap();
|
|
/// println!("{}: {}", payload.dataname, String::from_utf8_lossy(&decoded));
|
|
/// } else {
|
|
/// println!("{}: URL={}", payload.dataname, payload.data);
|
|
/// }
|
|
/// }
|
|
/// ```
|
|
pub fn smartunpack(
|
|
msg_json_str: &str,
|
|
options: &smartunpackOptions,
|
|
) -> Result<MsgEnvelopeV1, MsgHandlerError> {
|
|
// Parse the JSON envelope
|
|
let mut env: MsgEnvelopeV1 = serde_json::from_str(msg_json_str)
|
|
.map_err(|e| MsgHandlerError::InvalidEnvelope(format!(
|
|
"Failed to parse envelope JSON: {}", e
|
|
)))?;
|
|
|
|
let correlation_id = env.correlation_id.clone();
|
|
log_trace(&correlation_id, "Processing received message");
|
|
|
|
// Determine the download handler to use (custom or default backoff)
|
|
let download_handler: Arc<dyn FileDownloadHandler> = options.fileserver_download_handler.clone()
|
|
.unwrap_or_else(|| Arc::new(BackoffDownloadHandler));
|
|
|
|
// Process each payload
|
|
let mut updated_payloads: Vec<MsgPayloadV1> = Vec::new();
|
|
|
|
for payload in &env.payloads {
|
|
let transport = payload.transport.as_str();
|
|
let dataname = payload.dataname.clone();
|
|
let payload_type = payload.payload_type.clone();
|
|
|
|
match transport {
|
|
"direct" => {
|
|
log_trace(&correlation_id, &format!(
|
|
"Direct transport - decoding payload '{}'", dataname
|
|
));
|
|
|
|
// Decode Base64 payload
|
|
let payload_bytes = BASE64.decode(&payload.data)
|
|
.map_err(|e| MsgHandlerError::Base64Error(format!(
|
|
"Base64 decode failed for '{}': {}", dataname, e
|
|
)))?;
|
|
|
|
// Deserialize based on type and store result back into payload
|
|
let deserialized = deserialize_data(
|
|
&payload_bytes,
|
|
&payload_type,
|
|
&correlation_id,
|
|
)?;
|
|
let updated = store_deserialized_data(payload, &deserialized);
|
|
|
|
updated_payloads.push(updated);
|
|
}
|
|
"link" => {
|
|
let url = payload.data.clone();
|
|
log_trace(&correlation_id, &format!(
|
|
"Link transport - fetching '{}' from URL: {}", dataname, url
|
|
));
|
|
|
|
// Fetch with exponential backoff
|
|
let downloaded_data = download_handler
|
|
.download(
|
|
&url,
|
|
options.max_retries,
|
|
options.base_delay,
|
|
options.max_delay,
|
|
&correlation_id,
|
|
)?;
|
|
|
|
// Deserialize based on type and store result back into payload
|
|
let deserialized = deserialize_data(
|
|
&downloaded_data,
|
|
&payload_type,
|
|
&correlation_id,
|
|
)?;
|
|
let updated = store_deserialized_data(payload, &deserialized);
|
|
|
|
updated_payloads.push(updated);
|
|
}
|
|
unknown => {
|
|
return Err(MsgHandlerError::UnknownTransport(format!(
|
|
"Unknown transport type '{}' for payload '{}'",
|
|
unknown, dataname
|
|
)));
|
|
}
|
|
}
|
|
}
|
|
|
|
env.payloads = updated_payloads;
|
|
Ok(env)
|
|
}
|
|
|
|
// ============================================================================
|
|
// Convenience Functions
|
|
// ============================================================================
|
|
|
|
/// Send a single text payload
|
|
pub fn send_text(
|
|
subject: &str,
|
|
text: &str,
|
|
options: &smartpackOptions,
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
smartpack(
|
|
subject,
|
|
&[(
|
|
"text".to_string(),
|
|
Payload::Text(text.to_string()),
|
|
"text".to_string(),
|
|
)],
|
|
options,
|
|
)
|
|
}
|
|
|
|
/// Send a single dictionary payload
|
|
pub fn send_dictionary(
|
|
subject: &str,
|
|
data: &JsonValue,
|
|
options: &smartpackOptions,
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
smartpack(
|
|
subject,
|
|
&[(
|
|
"dictionary".to_string(),
|
|
Payload::Dictionary(data.clone()),
|
|
"dictionary".to_string(),
|
|
)],
|
|
options,
|
|
)
|
|
}
|
|
|
|
/// Send a single binary payload
|
|
pub fn send_binary(
|
|
subject: &str,
|
|
data: &[u8],
|
|
options: &smartpackOptions,
|
|
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
|
smartpack(
|
|
subject,
|
|
&[(
|
|
"binary".to_string(),
|
|
Payload::Binary(data.to_vec()),
|
|
"binary".to_string(),
|
|
)],
|
|
options,
|
|
)
|
|
}
|
|
|
|
// ============================================================================
|
|
// Plik File Upload from Disk
|
|
// ============================================================================
|
|
|
|
/// Upload a file from disk to a Plik server in one-shot mode.
|
|
///
|
|
/// Reads the file at `filepath`, extracts its filename, and uploads it
|
|
/// using the Plik one-shot upload protocol (multipart/form-data).
|
|
///
|
|
/// # Arguments
|
|
/// - `file_server_url`: Base URL of the Plik server (e.g., `"http://localhost:8080"`)
|
|
/// - `filepath`: Full path to the local file to upload
|
|
///
|
|
/// # Returns
|
|
/// - `Result<UploadResult, MsgHandlerError>` with uploadid, fileid, and download URL
|
|
///
|
|
/// # Example
|
|
/// ```no_run
|
|
/// use msghandler::plik_upload_file;
|
|
///
|
|
/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").unwrap();
|
|
/// println!("Uploaded to: {}", result.url);
|
|
/// ```
|
|
pub fn plik_upload_file(
|
|
file_server_url: &str,
|
|
filepath: &str,
|
|
) -> Result<UploadResult, MsgHandlerError> {
|
|
// Read the file from disk
|
|
let data = std::fs::read(filepath)
|
|
.map_err(|e| MsgHandlerError::IoError(format!(
|
|
"Failed to read file '{}': {}", filepath, e
|
|
)))?;
|
|
|
|
// Extract filename from path
|
|
let dataname = Path::new(filepath)
|
|
.file_name()
|
|
.map(|n| n.to_string_lossy().to_string())
|
|
.unwrap_or_default();
|
|
|
|
// Upload using the Plik one-shot handler
|
|
PlikOneshotUploadHandler.upload(file_server_url, &dataname, &data)
|
|
}
|
|
|
|
// ============================================================================
|
|
// Module Exports
|
|
// ============================================================================
|
|
|
|
// All public types are already exported via `pub` on their definitions.
|
|
// Key types:
|
|
// - `smartpack`, `smartunpack` - main API functions
|
|
// - `Payload` - type-safe payload enum
|
|
// - `MsgEnvelopeV1`, `MsgPayloadV1` - wire format structs
|
|
// - `smartpackOptions`, `smartunpackOptions` - configuration
|
|
// - `FileUploadHandler`, `FileDownloadHandler` - trait abstractions
|
|
// - `PlikOneshotUploadHandler`, `BackoffDownloadHandler` - default implementations
|
|
// - `MsgHandlerError` - error type
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `payload`, then deserialize the bytes back as `payload_type`.
    fn roundtrip(payload: &Payload, payload_type: &str) -> Payload {
        let encoded = serialize_data(payload).unwrap();
        deserialize_data(&encoded, payload_type, "test-corr").unwrap()
    }

    /// Each payload variant reports its type tag, and sized variants report their size.
    #[test]
    fn test_payload_serialization() {
        let cases = [
            (Payload::Text(String::from("Hello")), "text", Some(5)),
            (
                Payload::Dictionary(serde_json::json!({ "key": "value" })),
                "dictionary",
                None,
            ),
            (Payload::Binary(vec![1, 2, 3]), "binary", Some(3)),
        ];

        for (payload, expected_type, expected_size) in &cases {
            assert_eq!(payload.payload_type(), *expected_type);
            if let Some(size) = expected_size {
                assert_eq!(payload.size(), *size);
            }
        }
    }

    #[test]
    fn test_serialize_deserialize_text() {
        let original = Payload::Text(String::from("Hello, World!"));
        if let Payload::Text(s) = roundtrip(&original, "text") {
            assert_eq!(s, "Hello, World!");
        } else {
            panic!("Expected Text payload");
        }
    }

    #[test]
    fn test_serialize_deserialize_dictionary() {
        let expected = serde_json::json!({ "name": "Alice", "age": 30 });
        let original = Payload::Dictionary(expected.clone());
        if let Payload::Dictionary(v) = roundtrip(&original, "dictionary") {
            assert_eq!(v, expected);
        } else {
            panic!("Expected Dictionary payload");
        }
    }

    #[test]
    fn test_serialize_deserialize_binary() {
        let expected = vec![0u8, 1, 2, 255, 128];
        let original = Payload::Binary(expected.clone());
        if let Payload::Binary(b) = roundtrip(&original, "binary") {
            assert_eq!(b, expected);
        } else {
            panic!("Expected Binary payload");
        }
    }

    #[test]
    fn test_envelope_json_roundtrip() {
        // "SGVsbG8=" is "Hello" in base64.
        let body = MsgPayloadV1::new_direct(
            String::from("msg"),
            String::from("text"),
            String::from("SGVsbG8="),
            5,
        );
        let envelope = MsgEnvelopeV1::new(String::from("/test/subject"), vec![body]);

        let serialized = envelope.to_json().unwrap();
        let restored: MsgEnvelopeV1 = serde_json::from_str(&serialized).unwrap();

        assert_eq!(restored.send_to, "/test/subject");
        assert_eq!(restored.payloads.len(), 1);
        assert_eq!(restored.payloads[0].dataname, "msg");
    }

    #[test]
    fn test_base64_encoding() {
        let raw: &[u8] = b"Hello, msghandler!";
        let restored = BASE64.decode(BASE64.encode(raw)).unwrap();
        assert_eq!(restored, raw.to_vec());
    }

    #[test]
    fn test_error_display() {
        let unknown = MsgHandlerError::UnknownPayloadType(String::from("custom_type"));
        assert!(unknown.to_string().contains("custom_type"));

        let failed = MsgHandlerError::DownloadFailed {
            url: String::from("http://example.com/file"),
            retries: 5,
        };
        assert!(failed.to_string().contains("example.com"));
    }

    #[test]
    fn test_default_options() {
        let pack = smartpackOptions::default();
        assert_eq!(pack.size_threshold, DEFAULT_SIZE_THRESHOLD);
        assert_eq!(pack.broker_url, DEFAULT_BROKER_URL);
        assert_eq!(pack.fileserver_url, DEFAULT_FILESERVER_URL);

        let unpack = smartunpackOptions::default();
        assert_eq!(unpack.max_retries, DEFAULT_MAX_RETRIES);
        assert_eq!(unpack.base_delay, DEFAULT_BASE_DELAY);
        assert_eq!(unpack.max_delay, DEFAULT_MAX_DELAY);
    }
}
|