rename to smartpack n smartunpack
This commit is contained in:
@@ -378,13 +378,13 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
*
|
||||
* @example
|
||||
* // Send a single payload
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartsend(
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartpack(
|
||||
* "/test",
|
||||
* [["dataname1", data1, "dictionary"]]
|
||||
* );
|
||||
*
|
||||
* // Send multiple payloads (use jsontable instead of arrowtable for browser)
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartsend(
|
||||
* const [env, envJsonStr] = await msghandlerCSR.smartpack(
|
||||
* "/test",
|
||||
* [
|
||||
* ["dataname1", data1, "dictionary"],
|
||||
@@ -395,7 +395,7 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
* // Publish via your transport (NATS, MQTT, HTTP, etc.)
|
||||
* // await myNatsClient.publish("/test", envJsonStr);
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
async function smartpack(subject, data, options = {}) {
|
||||
const {
|
||||
broker_url = DEFAULT_BROKER_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
@@ -412,20 +412,20 @@ async function smartsend(subject, data, options = {}) {
|
||||
sender_id = uuidv4()
|
||||
} = options;
|
||||
|
||||
logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartsend: data array length=${data.length}`);
|
||||
logTrace(correlation_id, `Starting smartpack for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartpack: data array length=${data.length}`);
|
||||
|
||||
// Debug: Log input data structure
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const [dataname, payloadData, payloadType] = data[i];
|
||||
logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
logTrace(correlation_id, `smartpack: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
}
|
||||
|
||||
// Process payloads
|
||||
const payloads = [];
|
||||
for (const [dataname, payloadData, payloadType] of data) {
|
||||
logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
logTrace(correlation_id, `smartpack: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartpack: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
|
||||
const payloadBytes = await serializeData(payloadData, payloadType);
|
||||
const payloadSize = payloadBytes.byteLength;
|
||||
@@ -502,7 +502,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
*
|
||||
* @example
|
||||
* // Receive from JSON string directly
|
||||
* const env = await msghandlerCSR.smartreceive(jsonString, {
|
||||
* const env = await msghandlerCSR.smartunpack(jsonString, {
|
||||
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff,
|
||||
* max_retries: 5,
|
||||
* base_delay: 100,
|
||||
@@ -510,7 +510,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
* });
|
||||
*
|
||||
* // Receive from transport message object (e.g., NATS, MQTT)
|
||||
* const env = await msghandlerCSR.smartreceive(natsMsg, {
|
||||
* const env = await msghandlerCSR.smartunpack(natsMsg, {
|
||||
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff
|
||||
* });
|
||||
* // env.payloads is an Array of [dataname, data, type] arrays
|
||||
@@ -518,7 +518,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
* console.log(`${dataname}: ${data} (type: ${type})`);
|
||||
* }
|
||||
*/
|
||||
async function smartreceive(msg, options = {}) {
|
||||
async function smartunpack(msg, options = {}) {
|
||||
const {
|
||||
fileserver_download_handler = fetchWithBackoff,
|
||||
max_retries = 5,
|
||||
@@ -542,28 +542,28 @@ async function smartreceive(msg, options = {}) {
|
||||
throw new Error('Invalid message format: expected JSON string or message object');
|
||||
}
|
||||
|
||||
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
|
||||
logTrace('smartunpack', `smartunpack: raw payload length=${payload.length}`);
|
||||
|
||||
// Debug: Show first 200 chars of payload
|
||||
const payloadPreview = payload.substring(0, 200);
|
||||
logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`);
|
||||
logTrace('smartunpack', `smartunpack: payload preview: ${payloadPreview}`);
|
||||
|
||||
let envJsonObj;
|
||||
try {
|
||||
envJsonObj = JSON.parse(payload);
|
||||
} catch (e) {
|
||||
logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`);
|
||||
logTrace('smartunpack', `smartunpack: JSON parse failed: ${e.message}`);
|
||||
throw e;
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, 'Processing received message');
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
|
||||
// Process all payloads in the envelope
|
||||
const payloadsList = [];
|
||||
const numPayloads = envJsonObj.payloads.length;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Processing ${numPayloads} payloads`);
|
||||
|
||||
for (let i = 0; i < numPayloads; i++) {
|
||||
const payloadObj = envJsonObj.payloads[i];
|
||||
@@ -571,7 +571,7 @@ async function smartreceive(msg, options = {}) {
|
||||
const dataname = payloadObj.dataname;
|
||||
const payloadType = payloadObj.payload_type;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
|
||||
if (transport === 'direct') {
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
|
||||
@@ -614,7 +614,7 @@ async function smartreceive(msg, options = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Successfully processed all ${payloadsList.length} payloads`);
|
||||
envJsonObj.payloads = payloadsList;
|
||||
return envJsonObj;
|
||||
}
|
||||
@@ -625,12 +625,12 @@ const msghandlerCSR = {
|
||||
/**
|
||||
* Send data with automatic transport selection
|
||||
*/
|
||||
smartsend,
|
||||
smartpack,
|
||||
|
||||
/**
|
||||
* Receive and process messages
|
||||
*/
|
||||
smartreceive,
|
||||
smartunpack,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Bi-Directional Data Bridge - Julia Module
|
||||
# Implements smartsend and smartreceive for message transport
|
||||
# Implements smartpack and smartunpack for message transport
|
||||
# This module provides functionality for sending and receiving data across network boundaries
|
||||
# with support for both direct payload transport and
|
||||
# URL-based transport for larger payloads.
|
||||
@@ -24,10 +24,10 @@
|
||||
#
|
||||
# API Standard:
|
||||
# ```jldoctest
|
||||
# # Input format for smartsend (always a list of tuples with type info)
|
||||
# # Input format for smartpack (always a list of tuples with type info)
|
||||
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
#
|
||||
# # Output format for smartreceive (always returns a list of tuples)
|
||||
# # Output format for smartunpack (always returns a list of tuples)
|
||||
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
|
||||
# ```
|
||||
#
|
||||
@@ -337,7 +337,7 @@ function log_trace(correlation_id::String, message::String)
|
||||
end
|
||||
|
||||
|
||||
""" smartsend - Send data with automatic transport selection, depending on payload size
|
||||
""" smartpack - Send data with automatic transport selection, depending on payload size
|
||||
|
||||
This function intelligently routes data delivery based on payload size relative to a threshold.
|
||||
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and constructs a "direct" msg_payload_v1.
|
||||
@@ -392,23 +392,23 @@ using UUIDs
|
||||
|
||||
# Send a single payload (still wrapped in a list)
|
||||
data = Dict("key" => "value")
|
||||
env, msg_json = smartsend("my.subject", [("dataname1", data, "dictionary")])
|
||||
env, msg_json = smartpack("my.subject", [("dataname1", data, "dictionary")])
|
||||
|
||||
# Send multiple payloads in one message with different types
|
||||
data1 = Dict("key1" => "value1")
|
||||
data2 = rand(10_000) # Small array
|
||||
env, msg_json = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")])
|
||||
env, msg_json = smartpack("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "arrowtable")])
|
||||
|
||||
# Send a large array using fileserver upload
|
||||
data = rand(10_000_000) # ~80 MB
|
||||
env, msg_json = smartsend("large.data", [("large_arrow_table", data, "arrowtable")])
|
||||
env, msg_json = smartpack("large.data", [("large_arrow_table", data, "arrowtable")])
|
||||
|
||||
# Send jsontable (JSON format)
|
||||
rows = [Dict("id" => 1, "name" => "Alice"), Dict("id" => 2, "name" => "Bob")]
|
||||
env, msg_json = smartsend("json.data", [("users", rows, "jsontable")])
|
||||
env, msg_json = smartpack("json.data", [("users", rows, "jsontable")])
|
||||
|
||||
# Mixed content (e.g., chat with text and image)
|
||||
env, msg_json = smartsend("chat.subject", [
|
||||
env, msg_json = smartpack("chat.subject", [
|
||||
("message_text", "Hello!", "text"),
|
||||
("user_image", image_data, "image"),
|
||||
("audio_clip", audio_data, "audio")
|
||||
@@ -419,8 +419,8 @@ env, msg_json = smartsend("chat.subject", [
|
||||
# my_transport.publish(conn, subject, env_json_str)
|
||||
```
|
||||
"""
|
||||
function smartsend(
|
||||
subject::String, # smartreceive's subject
|
||||
function smartpack(
|
||||
subject::String, # smartunpack's subject
|
||||
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
|
||||
broker_url::String = DEFAULT_BROKER_URL, # Broker URL
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
@@ -446,7 +446,7 @@ function smartsend(
|
||||
)::Tuple{msg_envelope_v1, String} where {T1<:Any}
|
||||
|
||||
# Log start of send operation
|
||||
log_trace(correlation_id, "Starting smartsend for subject: $subject")
|
||||
log_trace(correlation_id, "Starting smartpack for subject: $subject")
|
||||
|
||||
# Process each payload in the list
|
||||
payloads = msg_payload_v1[]
|
||||
@@ -772,7 +772,7 @@ end
|
||||
# end
|
||||
|
||||
|
||||
""" smartreceive - Receive and process messages
|
||||
""" smartunpack - Receive and process messages
|
||||
This function processes incoming messages, handling both direct transport
|
||||
(base64 decoded payloads) and link transport (URL-based payloads).
|
||||
It deserializes the data based on the transport type and returns the result.
|
||||
@@ -801,11 +801,11 @@ A HTTP file server is required along with its download function.
|
||||
```jldoctest
|
||||
# Receive and process message
|
||||
msg_json_str = String(msg.payload)
|
||||
env = smartreceive(msg_json_str; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
|
||||
env = smartunpack(msg_json_str; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
|
||||
# env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
|
||||
```
|
||||
"""
|
||||
function smartreceive(
|
||||
function smartunpack(
|
||||
msg_json_str::String; # get it from String(nats_msg.payload)
|
||||
fileserver_download_handler::Function = _fetch_with_backoff,
|
||||
max_retries::Int = 5,
|
||||
|
||||
@@ -431,13 +431,13 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
*
|
||||
* @example
|
||||
* // Send a single payload
|
||||
* const [env, envJsonStr] = await smartsend(
|
||||
* const [env, envJsonStr] = await smartpack(
|
||||
* "/test",
|
||||
* [["dataname1", data1, "dictionary"]]
|
||||
* );
|
||||
*
|
||||
* // Send multiple payloads
|
||||
* const [env, envJsonStr] = await smartsend(
|
||||
* const [env, envJsonStr] = await smartpack(
|
||||
* "/test",
|
||||
* [
|
||||
* ["dataname1", data1, "dictionary"],
|
||||
@@ -448,7 +448,7 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
|
||||
* // Publish via your transport (NATS, MQTT, HTTP, etc.)
|
||||
* // await myNatsClient.publish("/test", envJsonStr);
|
||||
*/
|
||||
async function smartsend(subject, data, options = {}) {
|
||||
async function smartpack(subject, data, options = {}) {
|
||||
const {
|
||||
broker_url = DEFAULT_BROKER_URL,
|
||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||
@@ -465,20 +465,20 @@ async function smartsend(subject, data, options = {}) {
|
||||
sender_id = uuidv4()
|
||||
} = options;
|
||||
|
||||
logTrace(correlation_id, `Starting smartsend for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartsend: data array length=${data.length}`);
|
||||
logTrace(correlation_id, `Starting smartpack for subject: ${subject}`);
|
||||
logTrace(correlation_id, `smartpack: data array length=${data.length}`);
|
||||
|
||||
// Debug: Log input data structure
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const [dataname, payloadData, payloadType] = data[i];
|
||||
logTrace(correlation_id, `smartsend: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
logTrace(correlation_id, `smartpack: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
}
|
||||
|
||||
// Process payloads
|
||||
const payloads = [];
|
||||
for (const [dataname, payloadData, payloadType] of data) {
|
||||
logTrace(correlation_id, `smartsend: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartsend: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
logTrace(correlation_id, `smartpack: Processing payload '${dataname}' type=${payloadType}`);
|
||||
logTrace(correlation_id, `smartpack: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
|
||||
|
||||
const payloadBytes = await serializeData(payloadData, payloadType);
|
||||
const payloadSize = payloadBytes.byteLength;
|
||||
@@ -552,7 +552,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
*
|
||||
* @example
|
||||
* // Receive from JSON string directly
|
||||
* const env = await smartreceive(jsonString, {
|
||||
* const env = await smartunpack(jsonString, {
|
||||
* fileserver_download_handler: fetchWithBackoff,
|
||||
* max_retries: 5,
|
||||
* base_delay: 100,
|
||||
@@ -560,7 +560,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
* });
|
||||
*
|
||||
* // Receive from transport message object (e.g., NATS, MQTT)
|
||||
* const env = await smartreceive(natsMsg, {
|
||||
* const env = await smartunpack(natsMsg, {
|
||||
* fileserver_download_handler: fetchWithBackoff
|
||||
* });
|
||||
* // env.payloads is an Array of [dataname, data, type] arrays
|
||||
@@ -568,7 +568,7 @@ async function smartsend(subject, data, options = {}) {
|
||||
* console.log(`${dataname}: ${data} (type: ${type})`);
|
||||
* }
|
||||
*/
|
||||
async function smartreceive(msg, options = {}) {
|
||||
async function smartunpack(msg, options = {}) {
|
||||
const {
|
||||
fileserver_download_handler = fetchWithBackoff,
|
||||
max_retries = 5,
|
||||
@@ -592,28 +592,28 @@ async function smartreceive(msg, options = {}) {
|
||||
throw new Error('Invalid message format: expected JSON string or message object');
|
||||
}
|
||||
|
||||
logTrace('smartreceive', `smartreceive: raw payload length=${payload.length}`);
|
||||
logTrace('smartunpack', `smartunpack: raw payload length=${payload.length}`);
|
||||
|
||||
// Debug: Show first 200 chars of payload
|
||||
const payloadPreview = payload.substring(0, 200);
|
||||
logTrace('smartreceive', `smartreceive: payload preview: ${payloadPreview}`);
|
||||
logTrace('smartunpack', `smartunpack: payload preview: ${payloadPreview}`);
|
||||
|
||||
let envJsonObj;
|
||||
try {
|
||||
envJsonObj = JSON.parse(payload);
|
||||
} catch (e) {
|
||||
logTrace('smartreceive', `smartreceive: JSON parse failed: ${e.message}`);
|
||||
logTrace('smartunpack', `smartunpack: JSON parse failed: ${e.message}`);
|
||||
throw e;
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, 'Processing received message');
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`);
|
||||
|
||||
// Process all payloads in the envelope
|
||||
const payloadsList = [];
|
||||
const numPayloads = envJsonObj.payloads.length;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing ${numPayloads} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Processing ${numPayloads} payloads`);
|
||||
|
||||
for (let i = 0; i < numPayloads; i++) {
|
||||
const payloadObj = envJsonObj.payloads[i];
|
||||
@@ -621,7 +621,7 @@ async function smartreceive(msg, options = {}) {
|
||||
const dataname = payloadObj.dataname;
|
||||
const payloadType = payloadObj.payload_type;
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
|
||||
|
||||
if (transport === 'direct') {
|
||||
logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
|
||||
@@ -664,7 +664,7 @@ async function smartreceive(msg, options = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
logTrace(envJsonObj.correlation_id, `smartreceive: Successfully processed all ${payloadsList.length} payloads`);
|
||||
logTrace(envJsonObj.correlation_id, `smartunpack: Successfully processed all ${payloadsList.length} payloads`);
|
||||
envJsonObj.payloads = payloadsList;
|
||||
return envJsonObj;
|
||||
}
|
||||
@@ -675,12 +675,12 @@ const msghandler = {
|
||||
/**
|
||||
* Send data with automatic transport selection
|
||||
*/
|
||||
smartsend,
|
||||
smartpack,
|
||||
|
||||
/**
|
||||
* Receive and process messages
|
||||
*/
|
||||
smartreceive,
|
||||
smartunpack,
|
||||
|
||||
/**
|
||||
* Upload data to plik server in one-shot mode
|
||||
|
||||
@@ -372,7 +372,7 @@ def _build_payload(
|
||||
}
|
||||
|
||||
|
||||
async def smartsend(
|
||||
async def smartpack(
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
broker_url: str = DEFAULT_BROKER_URL,
|
||||
@@ -429,7 +429,7 @@ async def smartsend(
|
||||
Example:
|
||||
>>> # Send a single payload (still wrapped in a list)
|
||||
>>> data = {"key": "value"}
|
||||
>>> env, env_json_str = await smartsend(
|
||||
>>> env, env_json_str = await smartpack(
|
||||
... "my.subject",
|
||||
... [("dataname1", data, "dictionary")]
|
||||
... )
|
||||
@@ -444,7 +444,7 @@ async def smartsend(
|
||||
if sender_id is None:
|
||||
sender_id = str(uuid.uuid4())
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
log_trace(correlation_id, f"Starting smartpack for subject: {subject}")
|
||||
|
||||
# Process payloads
|
||||
payloads = []
|
||||
@@ -494,7 +494,7 @@ async def smartsend(
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
async def smartreceive(
|
||||
async def smartunpack(
|
||||
msg: Any,
|
||||
fileserver_download_handler: Callable = fetch_with_backoff,
|
||||
max_retries: int = 5,
|
||||
@@ -521,10 +521,10 @@ async def smartreceive(
|
||||
|
||||
Example:
|
||||
>>> # Receive from JSON string directly
|
||||
>>> env = await smartreceive(json_string)
|
||||
>>> env = await smartunpack(json_string)
|
||||
>>>
|
||||
>>> # Receive from transport message object (e.g., NATS, MQTT)
|
||||
>>> env = await smartreceive(nats_msg, fileserver_download_handler=fetch_with_backoff)
|
||||
>>> env = await smartunpack(nats_msg, fileserver_download_handler=fetch_with_backoff)
|
||||
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
||||
>>> for dataname, data, type_ in env["payloads"]:
|
||||
>>> print(f"{dataname}: {data} (type: {type_})")
|
||||
@@ -623,7 +623,7 @@ class msghandler:
|
||||
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
|
||||
|
||||
async def smartsend(
|
||||
async def smartpack(
|
||||
self,
|
||||
subject: str,
|
||||
data: List[Tuple[str, Any, str]],
|
||||
@@ -635,16 +635,16 @@ class msghandler:
|
||||
Args:
|
||||
subject: Subject/topic to send to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options passed to smartsend
|
||||
**kwargs: Additional options passed to smartpack
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url)
|
||||
kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url)
|
||||
return await smartsend(subject, data, **kwargs)
|
||||
return await smartpack(subject, data, **kwargs)
|
||||
|
||||
async def smartreceive(
|
||||
async def smartunpack(
|
||||
self,
|
||||
msg: Any,
|
||||
**kwargs
|
||||
@@ -654,12 +654,12 @@ class msghandler:
|
||||
|
||||
Args:
|
||||
msg: Message to process
|
||||
**kwargs: Additional options passed to smartreceive
|
||||
**kwargs: Additional options passed to smartunpack
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return await smartreceive(msg, **kwargs)
|
||||
return await smartunpack(msg, **kwargs)
|
||||
|
||||
|
||||
# Convenience functions for module-level usage
|
||||
@@ -679,7 +679,7 @@ def send(
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
return asyncio.run(smartsend(subject, data, **kwargs))
|
||||
return asyncio.run(smartpack(subject, data, **kwargs))
|
||||
|
||||
|
||||
def receive(
|
||||
@@ -696,12 +696,12 @@ def receive(
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return asyncio.run(smartreceive(msg, **kwargs))
|
||||
return asyncio.run(smartunpack(msg, **kwargs))
|
||||
|
||||
|
||||
__all__ = [
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
'smartpack',
|
||||
'smartunpack',
|
||||
'plik_oneshot_upload',
|
||||
'fetch_with_backoff',
|
||||
'msghandler',
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// msghandler Rust Module
|
||||
// Cross-platform bi-directional data bridge
|
||||
// Implements smartsend and smartreceive for message transport
|
||||
// Implements smartpack and smartunpack for message transport
|
||||
// with support for both direct payload transport and URL-based transport
|
||||
// for larger payloads using the Claim-Check pattern.
|
||||
//
|
||||
@@ -325,8 +325,8 @@ impl MsgEnvelopeV1 {
|
||||
// Options Structures
|
||||
// ============================================================================
|
||||
|
||||
/// Options for the `smartsend` function
|
||||
pub struct SmartsendOptions {
|
||||
/// Options for the `smartpack` function
|
||||
pub struct smartpackOptions {
|
||||
/// Broker URL
|
||||
pub broker_url: String,
|
||||
/// HTTP file server URL for large payloads
|
||||
@@ -355,9 +355,9 @@ pub struct SmartsendOptions {
|
||||
pub sender_id: String,
|
||||
}
|
||||
|
||||
impl Default for SmartsendOptions {
|
||||
impl Default for smartpackOptions {
|
||||
fn default() -> Self {
|
||||
SmartsendOptions {
|
||||
smartpackOptions {
|
||||
broker_url: DEFAULT_BROKER_URL.to_string(),
|
||||
fileserver_url: DEFAULT_FILESERVER_URL.to_string(),
|
||||
fileserver_upload_handler: None,
|
||||
@@ -375,8 +375,8 @@ impl Default for SmartsendOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Options for the `smartreceive` function
|
||||
pub struct SmartreceiveOptions {
|
||||
/// Options for the `smartunpack` function
|
||||
pub struct smartunpackOptions {
|
||||
/// Custom file server download handler (optional, uses exponential backoff by default)
|
||||
pub fileserver_download_handler: Option<Arc<dyn FileDownloadHandler>>,
|
||||
/// Maximum retry attempts for fetching a URL
|
||||
@@ -387,9 +387,9 @@ pub struct SmartreceiveOptions {
|
||||
pub max_delay: u64,
|
||||
}
|
||||
|
||||
impl Default for SmartreceiveOptions {
|
||||
impl Default for smartunpackOptions {
|
||||
fn default() -> Self {
|
||||
SmartreceiveOptions {
|
||||
smartunpackOptions {
|
||||
fileserver_download_handler: None,
|
||||
max_retries: DEFAULT_MAX_RETRIES,
|
||||
base_delay: DEFAULT_BASE_DELAY,
|
||||
@@ -689,7 +689,7 @@ pub fn log_trace(correlation_id: &str, message: &str) {
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Public API: smartsend
|
||||
// Public API: smartpack
|
||||
// ============================================================================
|
||||
|
||||
/// Send data with automatic transport selection.
|
||||
@@ -715,23 +715,23 @@ pub fn log_trace(correlation_id: &str, message: &str) {
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use msghandler::{smartsend, Payload, SmartsendOptions};
|
||||
/// use msghandler::{smartpack, Payload, smartpackOptions};
|
||||
///
|
||||
/// let (envelope, json_str) = smartsend(
|
||||
/// let (envelope, json_str) = smartpack(
|
||||
/// "/agent/wine/api/v1/prompt",
|
||||
/// &[
|
||||
/// ("msg".to_string(), Payload::Text("Hello!".to_string()), "text".to_string()),
|
||||
/// ("data".to_string(), Payload::Binary(vec![1, 2, 3]), "binary".to_string()),
|
||||
/// ],
|
||||
/// &SmartsendOptions::default(),
|
||||
/// &smartpackOptions::default(),
|
||||
/// ).unwrap();
|
||||
///
|
||||
/// // Caller publishes via their preferred transport
|
||||
/// ```
|
||||
pub fn smartsend(
|
||||
pub fn smartpack(
|
||||
subject: &str,
|
||||
data: &[(String, Payload, String)],
|
||||
options: &SmartsendOptions,
|
||||
options: &smartpackOptions,
|
||||
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
||||
let correlation_id = if options.correlation_id.is_empty() {
|
||||
Uuid::new_v4().to_string()
|
||||
@@ -752,7 +752,7 @@ pub fn smartsend(
|
||||
};
|
||||
|
||||
log_trace(&correlation_id, &format!(
|
||||
"Starting smartsend for subject: {}", subject
|
||||
"Starting smartpack for subject: {}", subject
|
||||
));
|
||||
|
||||
let mut payloads: Vec<MsgPayloadV1> = Vec::new();
|
||||
@@ -844,7 +844,7 @@ pub fn smartsend(
|
||||
// ============================================================================
|
||||
|
||||
/// Store deserialized Payload data back into a MsgPayloadV1's data field.
|
||||
/// After smartreceive(), payload.data contains the deserialized content as a string
|
||||
/// After smartunpack(), payload.data contains the deserialized content as a string
|
||||
/// (decoded text, JSON string, or base64 for binary types).
|
||||
fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> MsgPayloadV1 {
|
||||
let mut p = payload.clone();
|
||||
@@ -861,7 +861,7 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Public API: smartreceive
|
||||
// Public API: smartunpack
|
||||
// ============================================================================
|
||||
|
||||
/// Receive and process messages.
|
||||
@@ -880,7 +880,7 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use msghandler::{smartreceive, SmartreceiveOptions};
|
||||
/// use msghandler::{smartunpack, smartunpackOptions};
|
||||
/// use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
|
||||
///
|
||||
/// let msg_json_str = r#"{"correlation_id":"abc123","msg_id":"msg-uuid",
|
||||
@@ -893,7 +893,7 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
||||
/// "data":"SGVsbG8=","metadata":{"payload_bytes":5}
|
||||
/// }]}"#;
|
||||
///
|
||||
/// let envelope = smartreceive(msg_json_str, &SmartreceiveOptions::default()).unwrap();
|
||||
/// let envelope = smartunpack(msg_json_str, &smartunpackOptions::default()).unwrap();
|
||||
///
|
||||
/// for payload in &envelope.payloads {
|
||||
/// if payload.transport == "direct" {
|
||||
@@ -904,9 +904,9 @@ fn store_deserialized_data(payload: &MsgPayloadV1, deserialized: &Payload) -> Ms
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub fn smartreceive(
|
||||
pub fn smartunpack(
|
||||
msg_json_str: &str,
|
||||
options: &SmartreceiveOptions,
|
||||
options: &smartunpackOptions,
|
||||
) -> Result<MsgEnvelopeV1, MsgHandlerError> {
|
||||
// Parse the JSON envelope
|
||||
let mut env: MsgEnvelopeV1 = serde_json::from_str(msg_json_str)
|
||||
@@ -998,9 +998,9 @@ pub fn smartreceive(
|
||||
pub fn send_text(
|
||||
subject: &str,
|
||||
text: &str,
|
||||
options: &SmartsendOptions,
|
||||
options: &smartpackOptions,
|
||||
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
||||
smartsend(
|
||||
smartpack(
|
||||
subject,
|
||||
&[(
|
||||
"text".to_string(),
|
||||
@@ -1015,9 +1015,9 @@ pub fn send_text(
|
||||
pub fn send_dictionary(
|
||||
subject: &str,
|
||||
data: &JsonValue,
|
||||
options: &SmartsendOptions,
|
||||
options: &smartpackOptions,
|
||||
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
||||
smartsend(
|
||||
smartpack(
|
||||
subject,
|
||||
&[(
|
||||
"dictionary".to_string(),
|
||||
@@ -1032,9 +1032,9 @@ pub fn send_dictionary(
|
||||
pub fn send_binary(
|
||||
subject: &str,
|
||||
data: &[u8],
|
||||
options: &SmartsendOptions,
|
||||
options: &smartpackOptions,
|
||||
) -> Result<(MsgEnvelopeV1, String), MsgHandlerError> {
|
||||
smartsend(
|
||||
smartpack(
|
||||
subject,
|
||||
&[(
|
||||
"binary".to_string(),
|
||||
@@ -1094,10 +1094,10 @@ pub fn plik_upload_file(
|
||||
|
||||
// All public types are already exported via `pub` on their definitions.
|
||||
// Key types:
|
||||
// - `smartsend`, `smartreceive` - main API functions
|
||||
// - `smartpack`, `smartunpack` - main API functions
|
||||
// - `Payload` - type-safe payload enum
|
||||
// - `MsgEnvelopeV1`, `MsgPayloadV1` - wire format structs
|
||||
// - `SmartsendOptions`, `SmartreceiveOptions` - configuration
|
||||
// - `smartpackOptions`, `smartunpackOptions` - configuration
|
||||
// - `FileUploadHandler`, `FileDownloadHandler` - trait abstractions
|
||||
// - `PlikOneshotUploadHandler`, `BackoffDownloadHandler` - default implementations
|
||||
// - `MsgHandlerError` - error type
|
||||
@@ -1193,12 +1193,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_default_options() {
|
||||
let opts = SmartsendOptions::default();
|
||||
let opts = smartpackOptions::default();
|
||||
assert_eq!(opts.size_threshold, DEFAULT_SIZE_THRESHOLD);
|
||||
assert_eq!(opts.broker_url, DEFAULT_BROKER_URL);
|
||||
assert_eq!(opts.fileserver_url, DEFAULT_FILESERVER_URL);
|
||||
|
||||
let opts = SmartreceiveOptions::default();
|
||||
let opts = smartunpackOptions::default();
|
||||
assert_eq!(opts.max_retries, DEFAULT_MAX_RETRIES);
|
||||
assert_eq!(opts.base_delay, DEFAULT_BASE_DELAY);
|
||||
assert_eq!(opts.max_delay, DEFAULT_MAX_DELAY);
|
||||
|
||||
@@ -263,7 +263,7 @@ def _publish(subject, message, correlation_id):
|
||||
# Placeholder - actual implementation would publish via preferred transport
|
||||
|
||||
|
||||
def smartsend(subject, data, **kwargs):
|
||||
def smartpack(subject, data, **kwargs):
|
||||
"""
|
||||
Send data with automatic transport selection.
|
||||
|
||||
@@ -306,7 +306,7 @@ def smartsend(subject, data, **kwargs):
|
||||
|
||||
Example:
|
||||
>>> # Send text payload
|
||||
>>> env, env_json_str = smartsend(
|
||||
>>> env, env_json_str = smartpack(
|
||||
... "/chat",
|
||||
... [("message", "Hello!", "text")]
|
||||
... )
|
||||
@@ -330,7 +330,7 @@ def smartsend(subject, data, **kwargs):
|
||||
is_publish = kwargs.get('is_publish', True)
|
||||
fileserver_upload_handler = kwargs.get('fileserver_upload_handler', _sync_fileserver_upload)
|
||||
|
||||
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
|
||||
log_trace(correlation_id, f"Starting smartpack for subject: {subject}")
|
||||
|
||||
# Process payloads
|
||||
payloads = []
|
||||
@@ -390,7 +390,7 @@ def smartsend(subject, data, **kwargs):
|
||||
return env, env_json_str
|
||||
|
||||
|
||||
def smartreceive(msg, **kwargs):
|
||||
def smartunpack(msg, **kwargs):
|
||||
"""
|
||||
Receive and process messages.
|
||||
|
||||
@@ -414,10 +414,10 @@ def smartreceive(msg, **kwargs):
|
||||
|
||||
Example:
|
||||
>>> # Receive from JSON string
|
||||
>>> env = smartreceive(json_string)
|
||||
>>> env = smartunpack(json_string)
|
||||
>>>
|
||||
>>> # Receive from transport message object
|
||||
>>> env = smartreceive(transport_msg, fileserver_download_handler=_sync_fileserver_download)
|
||||
>>> env = smartunpack(transport_msg, fileserver_download_handler=_sync_fileserver_download)
|
||||
>>> # env is a Dict with "payloads" key containing List[Tuple[str, Any, str]]
|
||||
>>> for dataname, data, type_ in env["payloads"]:
|
||||
... print(f"{dataname}: {data} (type: {type_})")
|
||||
@@ -530,34 +530,34 @@ class msghandler:
|
||||
self.broker_url = broker_url or self.DEFAULT_BROKER_URL
|
||||
self.fileserver_url = fileserver_url or self.DEFAULT_FILESERVER_URL
|
||||
|
||||
def smartsend(self, subject, data, **kwargs):
|
||||
def smartpack(self, subject, data, **kwargs):
|
||||
"""
|
||||
Send data.
|
||||
|
||||
Args:
|
||||
subject: Subject/topic to send to
|
||||
data: List of (dataname, data, type) tuples
|
||||
**kwargs: Additional options passed to smartsend
|
||||
**kwargs: Additional options passed to smartpack
|
||||
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
kwargs['broker_url'] = kwargs.get('broker_url', self.broker_url)
|
||||
kwargs['fileserver_url'] = kwargs.get('fileserver_url', self.fileserver_url)
|
||||
return smartsend(subject, data, **kwargs)
|
||||
return smartpack(subject, data, **kwargs)
|
||||
|
||||
def smartreceive(self, msg, **kwargs):
|
||||
def smartunpack(self, msg, **kwargs):
|
||||
"""
|
||||
Receive and process message.
|
||||
|
||||
Args:
|
||||
msg: Message to process
|
||||
**kwargs: Additional options passed to smartreceive
|
||||
**kwargs: Additional options passed to smartunpack
|
||||
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return smartreceive(msg, **kwargs)
|
||||
return smartunpack(msg, **kwargs)
|
||||
|
||||
|
||||
# Convenience functions for module-level usage
|
||||
@@ -573,7 +573,7 @@ def send(subject, data, **kwargs):
|
||||
Returns:
|
||||
Tuple of (env, env_json_str)
|
||||
"""
|
||||
return smartsend(subject, data, **kwargs)
|
||||
return smartpack(subject, data, **kwargs)
|
||||
|
||||
|
||||
def receive(msg, **kwargs):
|
||||
@@ -587,12 +587,12 @@ def receive(msg, **kwargs):
|
||||
Returns:
|
||||
Dict with envelope metadata and payloads
|
||||
"""
|
||||
return smartreceive(msg, **kwargs)
|
||||
return smartunpack(msg, **kwargs)
|
||||
|
||||
|
||||
__all__ = [
|
||||
'smartsend',
|
||||
'smartreceive',
|
||||
'smartpack',
|
||||
'smartunpack',
|
||||
'msghandler',
|
||||
'send',
|
||||
'receive',
|
||||
|
||||
Reference in New Issue
Block a user