Files
msghandler/src/msghandler.rs

1207 lines
43 KiB
Rust

// msghandler Rust Module
// Cross-platform bi-directional data bridge
// Implements smartpack and smartunpack for message transport
// with support for both direct payload transport and URL-based transport
// for larger payloads using the Claim-Check pattern.
//
// File Server Handler Architecture:
// The system uses handler functions to abstract file server operations,
// allowing support for different file server implementations
// (e.g., Plik, AWS S3, custom HTTP server).
//
// Multi-Payload Support (Standard API):
// The system uses a standardized tuple format for all payload operations.
// Each payload is (dataname, data, type) and can have a different type,
// enabling mixed-content messages (e.g., chat with text, images, audio).
//
// Supported types: "text", "dictionary", "arrowtable", "jsontable",
// "image", "audio", "video", "binary"
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
use chrono::Utc;
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fmt;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
// ============================================================================
// Constants
// ============================================================================
/// Payload size in bytes (500,000 B = 0.5 MB) at which transport switches
/// from direct (inline) to link (file-server upload).
pub const DEFAULT_SIZE_THRESHOLD: usize = 500_000;
/// Broker address used when the caller does not provide one.
pub const DEFAULT_BROKER_URL: &str = "localhost:4222";
/// Base URL of the HTTP file server used for link transport by default.
pub const DEFAULT_FILESERVER_URL: &str = "http://localhost:8080";
/// Default number of download attempts before a fetch is reported as failed.
pub const DEFAULT_MAX_RETRIES: u32 = 5;
/// Default initial exponential-backoff delay, in milliseconds.
pub const DEFAULT_BASE_DELAY: u64 = 100;
/// Default upper bound for the exponential-backoff delay, in milliseconds.
pub const DEFAULT_MAX_DELAY: u64 = 5_000;
// ============================================================================
// Error Types
// ============================================================================
/// Every failure that a msghandler operation can report.
#[derive(Debug)]
pub enum MsgHandlerError {
    /// The payload type string is not one of the supported types.
    UnknownPayloadType(String),
    /// Uploading to the file server failed; the string carries the detail.
    UploadFailed(String),
    /// Downloading from `url` failed after `retries` attempts.
    DownloadFailed { url: String, retries: u32 },
    /// The payload's transport field is neither "direct" nor "link".
    UnknownTransport(String),
    /// A connection could not be established.
    ConnectionFailed(String),
    /// Payload bytes could not be turned back into the declared type.
    DeserializationError(String),
    /// An HTTP request returned a failing status.
    HttpError { status: u16, message: String },
    /// A filesystem or other I/O operation failed.
    IoError(String),
    /// JSON encoding or decoding failed.
    JsonError(String),
    /// Base64 decoding failed.
    Base64Error(String),
    /// The payload is larger than the allowed maximum.
    SizeExceeded { size: usize, max: usize },
    /// The envelope could not be parsed or lacks required fields.
    InvalidEnvelope(String),
}
impl fmt::Display for MsgHandlerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MsgHandlerError::UnknownPayloadType(p) => write!(f, "Unknown payload_type: {}", p),
MsgHandlerError::UploadFailed(msg) => write!(f, "Failed to upload: {}", msg),
MsgHandlerError::DownloadFailed { url, retries } => {
write!(f, "Failed to fetch {} after {} attempts", url, retries)
}
MsgHandlerError::UnknownTransport(t) => write!(f, "Unknown transport type: {}", t),
MsgHandlerError::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
MsgHandlerError::DeserializationError(msg) => {
write!(f, "Deserialization error: {}", msg)
}
MsgHandlerError::HttpError { status, message } => {
write!(f, "HTTP error {}: {}", status, message)
}
MsgHandlerError::IoError(msg) => write!(f, "IO error: {}", msg),
MsgHandlerError::JsonError(msg) => write!(f, "JSON error: {}", msg),
MsgHandlerError::Base64Error(msg) => write!(f, "Base64 error: {}", msg),
MsgHandlerError::SizeExceeded { size, max } => {
write!(f, "Payload size {} exceeds max {}", size, max)
}
MsgHandlerError::InvalidEnvelope(msg) => write!(f, "Invalid envelope: {}", msg),
}
}
}
impl std::error::Error for MsgHandlerError {}
// ============================================================================
// Payload Enum - Type-safe payload data
// ============================================================================
/// Typed payload content handed to `smartpack` and produced when unpacking.
///
/// Each variant corresponds to one supported payload type string (see
/// `Payload::payload_type`) and carries the data in its natural in-memory form.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Payload {
    /// UTF-8 text.
    Text(String),
    /// Arbitrary JSON value.
    Dictionary(JsonValue),
    /// Arrow table carried as raw bytes.
    ArrowTable(Vec<u8>),
    /// Table represented as a JSON value.
    JsonTable(JsonValue),
    /// Image bytes.
    Image(Vec<u8>),
    /// Audio bytes.
    Audio(Vec<u8>),
    /// Video bytes.
    Video(Vec<u8>),
    /// Opaque binary bytes.
    Binary(Vec<u8>),
}
impl Payload {
    /// Return the wire-format type string for this variant
    /// ("text", "dictionary", "arrowtable", "jsontable", "image", "audio",
    /// "video" or "binary").
    pub fn payload_type(&self) -> &'static str {
        match self {
            Payload::Text(_) => "text",
            Payload::Dictionary(_) => "dictionary",
            Payload::ArrowTable(_) => "arrowtable",
            Payload::JsonTable(_) => "jsontable",
            Payload::Image(_) => "image",
            Payload::Audio(_) => "audio",
            Payload::Video(_) => "video",
            Payload::Binary(_) => "binary",
        }
    }

    /// Return the payload as an owned byte vector.
    ///
    /// Text is returned as its UTF-8 bytes, JSON variants are encoded with
    /// `serde_json::to_vec`, and byte variants are copied as-is. If JSON
    /// encoding fails the result is an empty vector.
    pub fn serialized_bytes(&self) -> Vec<u8> {
        match self {
            Payload::Text(s) => s.as_bytes().to_vec(),
            Payload::Dictionary(v) | Payload::JsonTable(v) => {
                serde_json::to_vec(v).unwrap_or_default()
            }
            Payload::ArrowTable(b)
            | Payload::Image(b)
            | Payload::Audio(b)
            | Payload::Video(b)
            | Payload::Binary(b) => b.clone(),
        }
    }

    /// Return the length in bytes of what `serialized_bytes` would produce.
    ///
    /// Text and byte variants report their length directly instead of cloning
    /// the whole buffer; only JSON variants have to be encoded to be measured
    /// (0 if encoding fails, matching the empty vector `serialized_bytes`
    /// returns in that case).
    pub fn size(&self) -> usize {
        match self {
            Payload::Text(s) => s.len(),
            Payload::Dictionary(v) | Payload::JsonTable(v) => {
                serde_json::to_vec(v).map_or(0, |encoded| encoded.len())
            }
            Payload::ArrowTable(b)
            | Payload::Image(b)
            | Payload::Audio(b)
            | Payload::Video(b)
            | Payload::Binary(b) => b.len(),
        }
    }
}
// ============================================================================
// Message Payload Structure (wire format)
// ============================================================================
/// One payload entry inside a message envelope (wire format).
///
/// The content is either carried inline (`transport == "direct"`) or
/// referenced by URL (`transport == "link"`).
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct MsgPayloadV1 {
    /// Unique payload identifier (UUID v4).
    pub id: String,
    /// Caller-chosen payload name (e.g., "login_image", "msg").
    pub dataname: String,
    /// One of "text", "dictionary", "arrowtable", "jsontable",
    /// "image", "audio", "video", "binary".
    pub payload_type: String,
    /// How the content travels: "direct" or "link".
    pub transport: String,
    /// Encoding label: "none", "json", "base64" or "arrow-ipc".
    pub encoding: String,
    /// Payload size in bytes.
    pub size: usize,
    /// Base64 string for direct transport, download URL for link transport.
    pub data: String,
    /// Optional per-payload metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, JsonValue>>,
}
impl MsgPayloadV1 {
    /// Build a payload whose content travels inline in the envelope.
    ///
    /// `data` is stored as given. `size` is recorded in the `size` field and
    /// again under the `payload_bytes` metadata key. The encoding label is
    /// "json" for "jsontable", "arrow-ipc" for "arrowtable" and "base64" for
    /// every other payload type. A fresh UUID v4 is assigned as `id`.
    pub fn new_direct(
        dataname: String,
        payload_type: String,
        data: String,
        size: usize,
    ) -> Self {
        let encoding = String::from(match payload_type.as_str() {
            "jsontable" => "json",
            "arrowtable" => "arrow-ipc",
            _ => "base64",
        });

        let mut metadata = HashMap::new();
        metadata.insert(
            String::from("payload_bytes"),
            JsonValue::Number(size.into()),
        );

        Self {
            id: Uuid::new_v4().to_string(),
            dataname,
            payload_type,
            transport: String::from("direct"),
            encoding,
            size,
            data,
            metadata: Some(metadata),
        }
    }

    /// Build a payload whose content lives on a file server.
    ///
    /// `url` is stored in the `data` field and no metadata is attached. The
    /// encoding label is "json" for "jsontable", "arrow-ipc" for "arrowtable"
    /// and "none" for every other payload type. A fresh UUID v4 is assigned
    /// as `id`.
    pub fn new_link(
        dataname: String,
        payload_type: String,
        url: String,
        size: usize,
    ) -> Self {
        let encoding = String::from(match payload_type.as_str() {
            "jsontable" => "json",
            "arrowtable" => "arrow-ipc",
            _ => "none",
        });

        Self {
            id: Uuid::new_v4().to_string(),
            dataname,
            payload_type,
            transport: String::from("link"),
            encoding,
            size,
            data: url,
            metadata: None,
        }
    }
}
// ============================================================================
// Message Envelope Structure (wire format)
// ============================================================================
/// Complete message envelope (wire format): routing and tracing fields plus
/// the list of payloads being carried.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct MsgEnvelopeV1 {
    /// Identifier used to follow a message across systems (UUID v4).
    pub correlation_id: String,
    /// Identifier of this message (UUID v4).
    pub msg_id: String,
    /// Publication time (ISO 8601, UTC).
    pub timestamp: String,
    /// Subject/topic the message is addressed to.
    pub send_to: String,
    /// Message purpose: "ACK", "NACK", "updateStatus", "shutdown",
    /// "chat", "command", "event".
    pub msg_purpose: String,
    /// Name of the sending application.
    pub sender_name: String,
    /// UUID (v4) of the sender.
    pub sender_id: String,
    /// Name of the receiving application (empty string = broadcast).
    pub receiver_name: String,
    /// UUID of the receiver (empty string = broadcast).
    pub receiver_id: String,
    /// Topic the receiver should reply on.
    pub reply_to: String,
    /// ID of the message this one answers.
    pub reply_to_msg_id: String,
    /// Broker URL.
    pub broker_url: String,
    /// Optional message-level metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, JsonValue>>,
    /// Payloads carried by this message.
    pub payloads: Vec<MsgPayloadV1>,
}
impl MsgEnvelopeV1 {
    /// Build an envelope addressed to `send_to` that carries `payloads`.
    ///
    /// Correlation, message and sender IDs are fresh UUID v4 values, the
    /// timestamp is the current UTC time in RFC 3339 form, the purpose is
    /// "chat", the sender name is "msghandler", the broker URL is
    /// `DEFAULT_BROKER_URL`, the receiver/reply fields are empty and the
    /// metadata map is present but empty.
    pub fn new(send_to: String, payloads: Vec<MsgPayloadV1>) -> Self {
        let fresh_uuid = || Uuid::new_v4().to_string();

        Self {
            correlation_id: fresh_uuid(),
            msg_id: fresh_uuid(),
            timestamp: Utc::now().to_rfc3339(),
            send_to,
            msg_purpose: String::from("chat"),
            sender_name: String::from("msghandler"),
            sender_id: fresh_uuid(),
            receiver_name: String::new(),
            receiver_id: String::new(),
            reply_to: String::new(),
            reply_to_msg_id: String::new(),
            broker_url: String::from(DEFAULT_BROKER_URL),
            metadata: Some(HashMap::new()),
            payloads,
        }
    }

    /// Serialize the envelope to a JSON string for transport.
    ///
    /// # Errors
    /// Returns `MsgHandlerError::JsonError` if serialization fails.
    pub fn to_json(&self) -> Result<String, MsgHandlerError> {
        match serde_json::to_string(self) {
            Ok(json) => Ok(json),
            Err(e) => Err(MsgHandlerError::JsonError(e.to_string())),
        }
    }
}
// ============================================================================
// Options Structures
// ============================================================================
/// Configuration consumed by `smartpack`.
pub struct smartpackOptions {
    /// Broker URL copied into the envelope.
    pub broker_url: String,
    /// Base URL of the HTTP file server used for large payloads.
    pub fileserver_url: String,
    /// Custom upload handler; when `None` the Plik one-shot handler is used.
    pub fileserver_upload_handler: Option<Arc<dyn FileUploadHandler>>,
    /// Size in bytes at which a payload switches from direct to link transport.
    pub size_threshold: usize,
    /// Correlation ID for distributed tracing (generated when empty).
    pub correlation_id: String,
    /// Purpose of the message.
    pub msg_purpose: String,
    /// Name of the sending application.
    pub sender_name: String,
    /// Name of the receiving application (empty = broadcast).
    pub receiver_name: String,
    /// UUID of the receiver (empty = broadcast).
    pub receiver_id: String,
    /// Topic the receiver should reply on.
    pub reply_to: String,
    /// ID of the message being replied to.
    pub reply_to_msg_id: String,
    /// Message ID (generated when empty).
    pub msg_id: String,
    /// Sender UUID (generated when empty).
    pub sender_id: String,
}
impl Default for smartpackOptions {
    /// Defaults: module-level default broker URL, file server URL and size
    /// threshold; no custom upload handler; purpose "chat"; sender name
    /// "msghandler"; every other string empty (IDs are then generated by
    /// `smartpack`).
    fn default() -> Self {
        Self {
            broker_url: String::from(DEFAULT_BROKER_URL),
            fileserver_url: String::from(DEFAULT_FILESERVER_URL),
            fileserver_upload_handler: None,
            size_threshold: DEFAULT_SIZE_THRESHOLD,
            correlation_id: String::new(),
            msg_purpose: String::from("chat"),
            sender_name: String::from("msghandler"),
            receiver_name: String::new(),
            receiver_id: String::new(),
            reply_to: String::new(),
            reply_to_msg_id: String::new(),
            msg_id: String::new(),
            sender_id: String::new(),
        }
    }
}
/// Configuration consumed by `smartunpack`.
pub struct smartunpackOptions {
    /// Custom download handler; when `None` the exponential-backoff handler is used.
    pub fileserver_download_handler: Option<Arc<dyn FileDownloadHandler>>,
    /// Maximum number of attempts when fetching a URL.
    pub max_retries: u32,
    /// Initial backoff delay, in milliseconds.
    pub base_delay: u64,
    /// Upper bound for the backoff delay, in milliseconds.
    pub max_delay: u64,
}
impl Default for smartunpackOptions {
    /// Defaults: no custom download handler and the module-level default
    /// retry count, base delay and maximum delay.
    fn default() -> Self {
        Self {
            fileserver_download_handler: None,
            max_retries: DEFAULT_MAX_RETRIES,
            base_delay: DEFAULT_BASE_DELAY,
            max_delay: DEFAULT_MAX_DELAY,
        }
    }
}
// ============================================================================
// File Server Handler Traits
// ============================================================================
/// Abstraction over "store these bytes on a file server".
///
/// Implementations must be `Send + Sync` so a handler can be shared through
/// an `Arc`.
pub trait FileUploadHandler: Send + Sync {
    /// Upload `data` under the name `dataname` to the server at
    /// `file_server_url`.
    ///
    /// On success returns an `UploadResult` with the upload ID, file ID and
    /// download URL; on failure returns a `MsgHandlerError`.
    fn upload(
        &self,
        file_server_url: &str,
        dataname: &str,
        data: &[u8],
    ) -> Result<UploadResult, MsgHandlerError>;
}
/// Outcome of a successful file server upload.
#[derive(Debug, Clone)]
pub struct UploadResult {
    /// HTTP status code of the upload response.
    pub status: u16,
    /// Identifier of the upload session.
    pub uploadid: String,
    /// Identifier of the file within that session.
    pub fileid: String,
    /// Full URL from which the file can be downloaded.
    pub url: String,
}
/// Abstraction over "fetch the bytes behind this URL".
///
/// Implementations must be `Send + Sync` so a handler can be shared through
/// an `Arc`.
pub trait FileDownloadHandler: Send + Sync {
    /// Download the content at `url`.
    ///
    /// `max_retries`, `base_delay` and `max_delay` (both delays in
    /// milliseconds) describe the retry policy the caller asks for;
    /// `correlation_id` is provided for trace logging.
    ///
    /// Returns the downloaded bytes or a `MsgHandlerError`.
    fn download(
        &self,
        url: &str,
        max_retries: u32,
        base_delay: u64,
        max_delay: u64,
        correlation_id: &str,
    ) -> Result<Vec<u8>, MsgHandlerError>;
}
// ============================================================================
// Plik One-Shot Upload Handler Implementation
// ============================================================================
/// Default file server upload handler using Plik one-shot mode.
///
/// Workflow:
/// 1. Creates a one-shot upload session (POST /upload with `{"OneShot": true}`)
/// 2. Reads the upload ID and upload token from the response
/// 3. Uploads the bytes as multipart form data, authenticated with the token
/// 4. Returns the identifiers and the download URL
pub struct PlikOneshotUploadHandler;

impl FileUploadHandler for PlikOneshotUploadHandler {
    /// Upload `data` as a file named `dataname` to the Plik server at
    /// `file_server_url`.
    ///
    /// A trailing `/` on `file_server_url` is ignored so the request paths and
    /// the returned URL never contain `//`.
    ///
    /// # Errors
    /// Returns `MsgHandlerError::UploadFailed` when a request cannot be sent,
    /// the server answers with a non-success status, a response body is not
    /// valid JSON, or the session/upload response lacks the expected `id` /
    /// `uploadToken` fields.
    fn upload(
        &self,
        file_server_url: &str,
        dataname: &str,
        data: &[u8],
    ) -> Result<UploadResult, MsgHandlerError> {
        // Normalise the base so "http://host/" and "http://host" behave alike.
        let base_url = file_server_url.trim_end_matches('/');
        let client = Client::new();

        // Step 1: Create one-shot upload session
        let session_body = serde_json::json!({"OneShot": true});
        let session_resp = client
            .post(format!("{}/upload", base_url))
            .header("Content-Type", "application/json")
            .json(&session_body)
            .send()
            .map_err(|e| {
                MsgHandlerError::UploadFailed(format!("Failed to create upload session: {}", e))
            })?;
        if !session_resp.status().is_success() {
            return Err(MsgHandlerError::UploadFailed(format!(
                "Session creation failed with status: {}",
                session_resp.status()
            )));
        }
        let session_json: JsonValue = session_resp.json().map_err(|e| {
            MsgHandlerError::UploadFailed(format!("Failed to parse session response: {}", e))
        })?;
        let uploadid = session_json["id"].as_str().unwrap_or("").to_string();
        let uploadtoken = session_json["uploadToken"]
            .as_str()
            .unwrap_or("")
            .to_string();
        if uploadid.is_empty() || uploadtoken.is_empty() {
            return Err(MsgHandlerError::UploadFailed(
                "Missing uploadid or uploadToken in session response".to_string(),
            ));
        }

        // Step 2: Upload the file as multipart/form-data
        let upload_url = format!("{}/file/{}", base_url, uploadid);
        let file_part = reqwest::blocking::multipart::Part::bytes(data.to_vec())
            .file_name(dataname.to_string())
            .mime_str("application/octet-stream")
            .map_err(|e| MsgHandlerError::UploadFailed(format!("Invalid MIME type: {}", e)))?;
        let form = reqwest::blocking::multipart::Form::new().part("file", file_part);
        let resp = client
            .post(&upload_url)
            .header("X-UploadToken", &uploadtoken)
            .multipart(form)
            .send()
            .map_err(|e| MsgHandlerError::UploadFailed(format!("Upload request failed: {}", e)))?;
        if !resp.status().is_success() {
            return Err(MsgHandlerError::UploadFailed(format!(
                "Upload failed with status: {}",
                resp.status()
            )));
        }
        let status_code = resp.status().as_u16();
        let upload_json: JsonValue = resp.json().map_err(|e| {
            MsgHandlerError::UploadFailed(format!("Failed to parse upload response: {}", e))
        })?;

        // Without a file id the download URL below would be unusable, so
        // report the problem here instead of handing out a broken link.
        let fileid = upload_json["id"].as_str().unwrap_or("").to_string();
        if fileid.is_empty() {
            return Err(MsgHandlerError::UploadFailed(
                "Missing file id in upload response".to_string(),
            ));
        }

        // NOTE(review): `dataname` is inserted into the URL verbatim; names
        // with spaces or reserved characters may need percent-encoding.
        let url = format!("{}/file/{}/{}/{}", base_url, uploadid, fileid, dataname);
        Ok(UploadResult {
            status: status_code,
            uploadid,
            fileid,
            url,
        })
    }
}
// ============================================================================
// Exponential Backoff Download Handler Implementation
// ============================================================================
/// Default download handler using exponential backoff retry logic.
///
/// Workflow:
/// 1. Attempts to fetch data from the URL
/// 2. On failure (request error, non-success status, or a body that cannot
///    be read), retries with an exponentially increasing delay
/// 3. The delay between attempts never exceeds `max_delay`
/// 4. Returns `MsgHandlerError::DownloadFailed` once `max_retries` attempts
///    have been used up
pub struct BackoffDownloadHandler;

impl FileDownloadHandler for BackoffDownloadHandler {
    /// Fetch `url`, making at most `max_retries` attempts.
    ///
    /// `base_delay` and `max_delay` are in milliseconds. The wait starts at
    /// `min(base_delay, max_delay)` and doubles after each failed attempt,
    /// capped at `max_delay`. Every attempt is reported through `log_trace`
    /// using `correlation_id`.
    ///
    /// # Errors
    /// Returns `MsgHandlerError::DownloadFailed` when no attempt succeeds
    /// (including when `max_retries` is 0, in which case no request is made).
    fn download(
        &self,
        url: &str,
        max_retries: u32,
        base_delay: u64,
        max_delay: u64,
        correlation_id: &str,
    ) -> Result<Vec<u8>, MsgHandlerError> {
        let client = Client::new();
        // Respect the cap from the very first wait, even if base_delay > max_delay.
        let mut delay = base_delay.min(max_delay);

        for attempt in 1..=max_retries {
            match client.get(url).send() {
                Ok(response) if response.status().is_success() => match response.bytes() {
                    Ok(body) => {
                        log_trace(
                            correlation_id,
                            &format!("Successfully fetched {} on attempt {}", url, attempt),
                        );
                        return Ok(body.to_vec());
                    }
                    // A connection dropped mid-body is as transient as a failed
                    // request, so fall through to the retry path.
                    Err(e) => {
                        log_trace(
                            correlation_id,
                            &format!(
                                "Attempt {} failed reading response body: {}: {}",
                                attempt, e, url
                            ),
                        );
                    }
                },
                Ok(response) => {
                    log_trace(
                        correlation_id,
                        &format!(
                            "Attempt {} failed with status {}: {}",
                            attempt,
                            response.status(),
                            url
                        ),
                    );
                }
                Err(e) => {
                    log_trace(
                        correlation_id,
                        &format!("Attempt {} failed: {}: {}", attempt, e, url),
                    );
                }
            }

            // No point sleeping after the final attempt.
            if attempt < max_retries {
                std::thread::sleep(Duration::from_millis(delay));
                // saturating_mul keeps a huge delay from overflowing u64.
                delay = delay.saturating_mul(2).min(max_delay);
            }
        }

        Err(MsgHandlerError::DownloadFailed {
            url: url.to_string(),
            retries: max_retries,
        })
    }
}
// ============================================================================
// Serialization
// ============================================================================
/// Turn a `Payload` into the raw bytes that will be transported.
///
/// Text becomes its UTF-8 bytes, JSON variants are encoded with
/// `serde_json::to_vec`, and byte variants are copied unchanged.
///
/// # Errors
/// Returns `MsgHandlerError::DeserializationError` when a JSON variant
/// cannot be encoded.
fn serialize_data(payload: &Payload) -> Result<Vec<u8>, MsgHandlerError> {
    match payload {
        Payload::Text(text) => Ok(text.as_bytes().to_owned()),
        Payload::Dictionary(json) => match serde_json::to_vec(json) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(MsgHandlerError::DeserializationError(format!(
                "Dictionary serialization failed: {}",
                e
            ))),
        },
        Payload::JsonTable(json) => match serde_json::to_vec(json) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(MsgHandlerError::DeserializationError(format!(
                "JsonTable serialization failed: {}",
                e
            ))),
        },
        Payload::ArrowTable(raw)
        | Payload::Image(raw)
        | Payload::Audio(raw)
        | Payload::Video(raw)
        | Payload::Binary(raw) => Ok(raw.clone()),
    }
}
// ============================================================================
// Deserialization
// ============================================================================
/// Deserialize raw bytes back into a `Payload` according to `payload_type`.
///
/// The bytes are the already-decoded content: base64-decoded for direct
/// transport, or the downloaded body for link transport.
///
/// - "text": bytes must be valid UTF-8
/// - "dictionary" / "jsontable": bytes must be valid UTF-8 containing JSON
/// - "arrowtable", "image", "audio", "video", "binary": bytes are kept as-is
///
/// `_correlation_id` is accepted for call-site symmetry and is currently unused.
///
/// # Errors
/// Returns `MsgHandlerError::DeserializationError` for invalid UTF-8 or JSON,
/// and `MsgHandlerError::UnknownPayloadType` for any other type string.
fn deserialize_data(
    payload_bytes: &[u8],
    payload_type: &str,
    _correlation_id: &str,
) -> Result<Payload, MsgHandlerError> {
    match payload_type {
        "text" => {
            // Validate in place first; only valid input is copied into a String.
            let text = std::str::from_utf8(payload_bytes).map_err(|e| {
                MsgHandlerError::DeserializationError(format!("Invalid UTF-8 for text: {}", e))
            })?;
            Ok(Payload::Text(text.to_owned()))
        }
        "dictionary" => {
            // Parse straight from the borrowed slice; no intermediate String copy.
            let json_str = std::str::from_utf8(payload_bytes).map_err(|e| {
                MsgHandlerError::DeserializationError(format!(
                    "Invalid UTF-8 for dictionary: {}",
                    e
                ))
            })?;
            let value: JsonValue = serde_json::from_str(json_str).map_err(|e| {
                MsgHandlerError::DeserializationError(format!(
                    "Invalid JSON for dictionary: {}",
                    e
                ))
            })?;
            Ok(Payload::Dictionary(value))
        }
        "arrowtable" => Ok(Payload::ArrowTable(payload_bytes.to_vec())),
        "jsontable" => {
            let json_str = std::str::from_utf8(payload_bytes).map_err(|e| {
                MsgHandlerError::DeserializationError(format!(
                    "Invalid UTF-8 for jsontable: {}",
                    e
                ))
            })?;
            let value: JsonValue = serde_json::from_str(json_str).map_err(|e| {
                MsgHandlerError::DeserializationError(format!(
                    "Invalid JSON for jsontable: {}",
                    e
                ))
            })?;
            Ok(Payload::JsonTable(value))
        }
        "image" => Ok(Payload::Image(payload_bytes.to_vec())),
        "audio" => Ok(Payload::Audio(payload_bytes.to_vec())),
        "video" => Ok(Payload::Video(payload_bytes.to_vec())),
        "binary" => Ok(Payload::Binary(payload_bytes.to_vec())),
        _ => Err(MsgHandlerError::UnknownPayloadType(payload_type.to_string())),
    }
}
// ============================================================================
// Logging
// ============================================================================
/// Write one trace line to stderr, prefixed with the current UTC timestamp
/// (millisecond precision) and the correlation ID.
///
/// The call is synchronous: the line is formatted and emitted with
/// `eprintln!` on the calling thread.
pub fn log_trace(correlation_id: &str, message: &str) {
    let stamp = Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
    eprintln!("[{}] [Correlation: {}] {}", stamp, correlation_id, message);
}
// ============================================================================
// Public API: smartpack
// ============================================================================
/// Pack payloads into a message envelope, choosing the transport per payload.
///
/// Every payload is serialized to bytes. A payload strictly smaller than
/// `options.size_threshold` is Base64-encoded and embedded as a "direct"
/// `MsgPayloadV1`; any other payload is uploaded through the configured
/// upload handler (Plik one-shot by default) and embedded as a "link"
/// `MsgPayloadV1` that carries the download URL.
///
/// Payloads in one call may have different types, which allows mixed-content
/// messages (e.g., chat with text, images, audio).
///
/// This function does not publish anything: it returns the envelope together
/// with its JSON string and leaves sending to the caller.
///
/// # Arguments
/// - `subject`: Subject/topic the message is addressed to
/// - `data`: Slice of (dataname, payload, payload_type) tuples; an empty
///   `payload_type` string means "derive it from the `Payload` variant"
/// - `options`: Configuration options; empty `correlation_id`, `msg_id` and
///   `sender_id` are replaced by fresh UUIDs
///
/// # Returns
/// - `Result<(MsgEnvelopeV1, String), MsgHandlerError>` containing the envelope and JSON string
///
/// # Errors
/// Propagates serialization errors, upload handler errors, and
/// `MsgHandlerError::JsonError` if the envelope cannot be rendered as JSON.
///
/// # Example
/// ```no_run
/// use msghandler::{smartpack, Payload, smartpackOptions};
///
/// let (envelope, json_str) = smartpack(
///     "/agent/wine/api/v1/prompt",
///     &[
///         ("msg".to_string(), Payload::Text("Hello!".to_string()), "text".to_string()),
///         ("data".to_string(), Payload::Binary(vec![1, 2, 3]), "binary".to_string()),
///     ],
///     &smartpackOptions::default(),
/// ).unwrap();
///
/// // Caller publishes via their preferred transport
/// ```
pub fn smartpack(
    subject: &str,
    data: &[(String, Payload, String)],
    options: &smartpackOptions,
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
    // An empty ID in the options means "generate one".
    let or_fresh_uuid = |configured: &str| -> String {
        if configured.is_empty() {
            Uuid::new_v4().to_string()
        } else {
            configured.to_string()
        }
    };
    let correlation_id = or_fresh_uuid(&options.correlation_id);
    let msg_id = or_fresh_uuid(&options.msg_id);
    let sender_id = or_fresh_uuid(&options.sender_id);

    log_trace(
        &correlation_id,
        &format!("Starting smartpack for subject: {}", subject),
    );

    // Custom upload handler if supplied, otherwise the Plik one-shot default.
    let upload_handler: Arc<dyn FileUploadHandler> = match &options.fileserver_upload_handler {
        Some(custom) => Arc::clone(custom),
        None => Arc::new(PlikOneshotUploadHandler),
    };

    let mut payloads: Vec<MsgPayloadV1> = Vec::with_capacity(data.len());
    for (dataname, payload, payload_type) in data {
        // The explicit type string wins; fall back to the enum variant's own name.
        let ptype = if payload_type.is_empty() {
            payload.payload_type().to_string()
        } else {
            payload_type.clone()
        };

        let payload_bytes = serialize_data(payload)?;
        let payload_size = payload_bytes.len();
        log_trace(
            &correlation_id,
            &format!(
                "Serialized payload '{}' (type: {}) size: {} bytes",
                dataname, ptype, payload_size
            ),
        );

        let msg_payload = if payload_size < options.size_threshold {
            // Direct transport: Base64 text embedded in the envelope.
            let payload_b64 = BASE64.encode(&payload_bytes);
            log_trace(
                &correlation_id,
                &format!("Using direct transport for {} bytes", payload_size),
            );
            MsgPayloadV1::new_direct(dataname.clone(), ptype, payload_b64, payload_size)
        } else {
            // Link transport: upload the bytes and embed only the URL.
            log_trace(&correlation_id, "Using link transport, uploading to fileserver");
            let upload_result =
                upload_handler.upload(&options.fileserver_url, dataname, &payload_bytes)?;
            log_trace(
                &correlation_id,
                &format!("Uploaded to URL: {}", upload_result.url),
            );
            MsgPayloadV1::new_link(dataname.clone(), ptype, upload_result.url, payload_size)
        };
        payloads.push(msg_payload);
    }

    let env = MsgEnvelopeV1 {
        correlation_id: correlation_id.clone(),
        msg_id,
        timestamp: Utc::now().to_rfc3339(),
        send_to: subject.to_string(),
        msg_purpose: options.msg_purpose.clone(),
        sender_name: options.sender_name.clone(),
        sender_id,
        receiver_name: options.receiver_name.clone(),
        receiver_id: options.receiver_id.clone(),
        reply_to: options.reply_to.clone(),
        reply_to_msg_id: options.reply_to_msg_id.clone(),
        broker_url: options.broker_url.clone(),
        metadata: Some(HashMap::new()),
        payloads,
    };
    let env_json_str = env.to_json()?;
    log_trace(&correlation_id, "Envelope created successfully");
    Ok((env, env_json_str))
}
// ============================================================================
// Helper: store deserialized data back into MsgPayloadV1
// ============================================================================
/// Store deserialized Payload data back into a MsgPayloadV1's data field.
/// After smartunpack(), payload.data contains the deserialized content as a string
/// (decoded text, JSON string, or base64 for binary types).
fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> MsgPayloadV1 {
let mut p = payload.clone();
match deserialized {
Payload::Text(s) => p.data = s.clone(),
Payload::Dictionary(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
Payload::JsonTable(v) => p.data = serde_json::to_string(v).unwrap_or_default(),
Payload::ArrowTable(b) => p.data = BASE64.encode(b),
Payload::Image(b) | Payload::Audio(b) | Payload::Video(b) | Payload::Binary(b) => {
p.data = BASE64.encode(b);
}
}
p
}
// ============================================================================
// Public API: smartunpack
// ============================================================================
/// Parse a received envelope and resolve every payload to its content.
///
/// For "direct" payloads the `data` field is Base64-decoded; for "link"
/// payloads the content is fetched from the URL in `data` through the
/// configured download handler (exponential backoff by default). The bytes
/// are then deserialized according to `payload_type`, and the result is
/// written back into the payload's `data` field: text and JSON as plain
/// strings, byte-oriented types as Base64. The `transport` field is left
/// as received.
///
/// # Arguments
/// - `msg_json_str`: JSON string of the received envelope
/// - `options`: Configuration options (download handler and retry policy)
///
/// # Returns
/// - `Result<MsgEnvelopeV1, MsgHandlerError>` with deserialized payloads
///
/// # Errors
/// - `InvalidEnvelope` if the JSON cannot be parsed into an envelope
/// - `Base64Error` if a direct payload is not valid Base64
/// - `UnknownTransport` if a payload's transport is neither "direct" nor "link"
/// - any error from the download handler or from deserialization
///
/// # Example
/// ```no_run
/// use msghandler::{smartunpack, smartunpackOptions};
///
/// let msg_json_str = r#"{"correlation_id":"abc123","msg_id":"msg-uuid",
///     "timestamp":"2026-01-01T00:00:00Z","send_to":"/test",
///     "msg_purpose":"chat","sender_name":"test","sender_id":"sender-uuid",
///     "receiver_name":"","receiver_id":"","reply_to":"","reply_to_msg_id":"",
///     "broker_url":"localhost:4222","payloads":[{
///         "id":"payload-uuid","dataname":"msg","payload_type":"text",
///         "transport":"direct","encoding":"base64","size":5,
///         "data":"SGVsbG8=","metadata":{"payload_bytes":5}
///     }]}"#;
///
/// let envelope = smartunpack(msg_json_str, &smartunpackOptions::default()).unwrap();
///
/// // After unpacking, `data` already holds the decoded content
/// // (here the text "Hello"), whatever the original transport was.
/// for payload in &envelope.payloads {
///     println!("{}: {}", payload.dataname, payload.data);
/// }
/// ```
pub fn smartunpack(
    msg_json_str: &str,
    options: &smartunpackOptions,
) -> Result<MsgEnvelopeV1, MsgHandlerError> {
    let mut env: MsgEnvelopeV1 = serde_json::from_str(msg_json_str).map_err(|e| {
        MsgHandlerError::InvalidEnvelope(format!("Failed to parse envelope JSON: {}", e))
    })?;
    let correlation_id = env.correlation_id.clone();
    log_trace(&correlation_id, "Processing received message");

    // Custom download handler if supplied, otherwise exponential backoff.
    let download_handler: Arc<dyn FileDownloadHandler> =
        match &options.fileserver_download_handler {
            Some(custom) => Arc::clone(custom),
            None => Arc::new(BackoffDownloadHandler),
        };

    let mut updated_payloads: Vec<MsgPayloadV1> = Vec::with_capacity(env.payloads.len());
    for payload in &env.payloads {
        let dataname = &payload.dataname;

        // Step 1: obtain the raw content bytes for this payload's transport.
        let raw_bytes: Vec<u8> = match payload.transport.as_str() {
            "direct" => {
                log_trace(
                    &correlation_id,
                    &format!("Direct transport - decoding payload '{}'", dataname),
                );
                BASE64.decode(&payload.data).map_err(|e| {
                    MsgHandlerError::Base64Error(format!(
                        "Base64 decode failed for '{}': {}",
                        dataname, e
                    ))
                })?
            }
            "link" => {
                // NOTE(review): the URL comes from the received message; when
                // messages are untrusted, consider a download handler that
                // restricts which hosts may be fetched.
                let url = &payload.data;
                log_trace(
                    &correlation_id,
                    &format!("Link transport - fetching '{}' from URL: {}", dataname, url),
                );
                download_handler.download(
                    url,
                    options.max_retries,
                    options.base_delay,
                    options.max_delay,
                    &correlation_id,
                )?
            }
            unknown => {
                return Err(MsgHandlerError::UnknownTransport(format!(
                    "Unknown transport type '{}' for payload '{}'",
                    unknown, dataname
                )));
            }
        };

        // Step 2: interpret the bytes and write the content back into the payload.
        let deserialized = deserialize_data(&raw_bytes, &payload.payload_type, &correlation_id)?;
        updated_payloads.push(store_deserialized_data(payload, &deserialized));
    }

    env.payloads = updated_payloads;
    Ok(env)
}
// ============================================================================
// Convenience Functions
// ============================================================================
/// Pack a single text payload named "text" for `subject`.
///
/// Thin wrapper around `smartpack`; returns the envelope and its JSON string.
pub fn send_text(
    subject: &str,
    text: &str,
    options: &smartpackOptions,
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
    let parts = [(
        String::from("text"),
        Payload::Text(text.to_owned()),
        String::from("text"),
    )];
    smartpack(subject, &parts, options)
}
/// Pack a single JSON payload named "dictionary" for `subject`.
///
/// Thin wrapper around `smartpack`; `data` is cloned into the payload.
/// Returns the envelope and its JSON string.
pub fn send_dictionary(
    subject: &str,
    data: &JsonValue,
    options: &smartpackOptions,
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
    let parts = [(
        String::from("dictionary"),
        Payload::Dictionary(data.clone()),
        String::from("dictionary"),
    )];
    smartpack(subject, &parts, options)
}
/// Pack a single binary payload named "binary" for `subject`.
///
/// Thin wrapper around `smartpack`; `data` is copied into the payload.
/// Returns the envelope and its JSON string.
pub fn send_binary(
    subject: &str,
    data: &[u8],
    options: &smartpackOptions,
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
    let parts = [(
        String::from("binary"),
        Payload::Binary(data.to_vec()),
        String::from("binary"),
    )];
    smartpack(subject, &parts, options)
}
// ============================================================================
// Plik File Upload from Disk
// ============================================================================
/// Read a local file and upload it to a Plik server in one-shot mode.
///
/// The whole file at `filepath` is read into memory and uploaded through
/// `PlikOneshotUploadHandler` under its final path component as the file
/// name (an empty name if the path has no final component).
///
/// # Arguments
/// - `file_server_url`: Base URL of the Plik server (e.g., `"http://localhost:8080"`)
/// - `filepath`: Path of the local file to upload
///
/// # Returns
/// - `Result<UploadResult, MsgHandlerError>` with uploadid, fileid, and download URL
///
/// # Errors
/// Returns `MsgHandlerError::IoError` if the file cannot be read, or the
/// upload handler's error if the upload fails.
///
/// # Example
/// ```no_run
/// use msghandler::plik_upload_file;
///
/// let result = plik_upload_file("http://localhost:8080", "./large_file.zip").unwrap();
/// println!("Uploaded to: {}", result.url);
/// ```
pub fn plik_upload_file(
    file_server_url: &str,
    filepath: &str,
) -> Result<UploadResult, MsgHandlerError> {
    let contents = match std::fs::read(filepath) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(MsgHandlerError::IoError(format!(
                "Failed to read file '{}': {}",
                filepath, e
            )));
        }
    };

    // The upload is named after the last path component.
    let dataname = match Path::new(filepath).file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };

    PlikOneshotUploadHandler.upload(file_server_url, &dataname, &contents)
}
// ============================================================================
// Module Exports
// ============================================================================
// All public types are already exported via `pub` on their definitions.
// Key types:
// - `smartpack`, `smartunpack` - main API functions
// - `Payload` - type-safe payload enum
// - `MsgEnvelopeV1`, `MsgPayloadV1` - wire format structs
// - `smartpackOptions`, `smartunpackOptions` - configuration
// - `FileUploadHandler`, `FileDownloadHandler` - trait abstractions
// - `PlikOneshotUploadHandler`, `BackoffDownloadHandler` - default implementations
// - `MsgHandlerError` - error type
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload variants report the expected type tag and byte size.
    #[test]
    fn test_payload_serialization() {
        let text_payload = Payload::Text(String::from("Hello"));
        assert_eq!(text_payload.payload_type(), "text");
        assert_eq!(text_payload.size(), 5);

        let dict_payload = Payload::Dictionary(serde_json::json!({"key": "value"}));
        assert_eq!(dict_payload.payload_type(), "dictionary");

        let binary_payload = Payload::Binary(vec![1, 2, 3]);
        assert_eq!(binary_payload.payload_type(), "binary");
        assert_eq!(binary_payload.size(), 3);
    }

    /// Text survives a serialize/deserialize round trip.
    #[test]
    fn test_serialize_deserialize_text() {
        let original = Payload::Text(String::from("Hello, World!"));
        let encoded = serialize_data(&original).unwrap();
        let restored = deserialize_data(&encoded, "text", "test-corr").unwrap();
        if let Payload::Text(s) = restored {
            assert_eq!(s, "Hello, World!");
        } else {
            panic!("Expected Text payload");
        }
    }

    /// A JSON dictionary survives a serialize/deserialize round trip.
    #[test]
    fn test_serialize_deserialize_dictionary() {
        let expected = serde_json::json!({"name": "Alice", "age": 30});
        let original = Payload::Dictionary(expected.clone());
        let encoded = serialize_data(&original).unwrap();
        let restored = deserialize_data(&encoded, "dictionary", "test-corr").unwrap();
        if let Payload::Dictionary(v) = restored {
            assert_eq!(v, expected);
        } else {
            panic!("Expected Dictionary payload");
        }
    }

    /// Raw bytes survive a serialize/deserialize round trip.
    #[test]
    fn test_serialize_deserialize_binary() {
        let expected = vec![0u8, 1, 2, 255, 128];
        let original = Payload::Binary(expected.clone());
        let encoded = serialize_data(&original).unwrap();
        let restored = deserialize_data(&encoded, "binary", "test-corr").unwrap();
        if let Payload::Binary(b) = restored {
            assert_eq!(b, expected);
        } else {
            panic!("Expected Binary payload");
        }
    }

    /// An envelope rendered to JSON parses back with the same subject and payload.
    #[test]
    fn test_envelope_json_roundtrip() {
        // "SGVsbG8=" is "Hello" in base64.
        let direct = MsgPayloadV1::new_direct(
            String::from("msg"),
            String::from("text"),
            String::from("SGVsbG8="),
            5,
        );
        let envelope = MsgEnvelopeV1::new(String::from("/test/subject"), vec![direct]);
        let rendered = envelope.to_json().unwrap();
        let reparsed = serde_json::from_str::<MsgEnvelopeV1>(&rendered).unwrap();

        assert_eq!(reparsed.send_to, "/test/subject");
        assert_eq!(reparsed.payloads.len(), 1);
        assert_eq!(reparsed.payloads[0].dataname, "msg");
    }

    /// The shared base64 engine decodes what it encodes.
    #[test]
    fn test_base64_encoding() {
        let raw = b"Hello, msghandler!";
        let as_text = BASE64.encode(raw);
        let round_tripped = BASE64.decode(&as_text).unwrap();
        assert_eq!(round_tripped, raw.to_vec());
    }

    /// Error display output includes the identifying detail of each variant.
    #[test]
    fn test_error_display() {
        let unknown_type = MsgHandlerError::UnknownPayloadType(String::from("custom_type"));
        assert!(unknown_type.to_string().contains("custom_type"));

        let download_failure = MsgHandlerError::DownloadFailed {
            url: String::from("http://example.com/file"),
            retries: 5,
        };
        assert!(download_failure.to_string().contains("example.com"));
    }

    /// Default option structs pick up the module-level default constants.
    #[test]
    fn test_default_options() {
        let pack_defaults = smartpackOptions::default();
        assert_eq!(pack_defaults.size_threshold, DEFAULT_SIZE_THRESHOLD);
        assert_eq!(pack_defaults.broker_url, DEFAULT_BROKER_URL);
        assert_eq!(pack_defaults.fileserver_url, DEFAULT_FILESERVER_URL);

        let unpack_defaults = smartunpackOptions::default();
        assert_eq!(unpack_defaults.max_retries, DEFAULT_MAX_RETRIES);
        assert_eq!(unpack_defaults.base_delay, DEFAULT_BASE_DELAY);
        assert_eq!(unpack_defaults.max_delay, DEFAULT_MAX_DELAY);
    }
}