This commit is contained in:
2026-06-05 11:27:33 +07:00
parent 0d1bf11b21
commit 97821f83e5
2 changed files with 610 additions and 258 deletions

View File

@@ -526,36 +526,15 @@ async function smartunpack(msg, options = {}) {
max_delay = 5000
} = options;
// Handle both raw JSON strings and transport message objects
let payload;
if (typeof msg === 'string') {
payload = msg;
} else if (msg !== null && typeof msg === 'object') {
if (msg.data !== undefined) {
payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data);
} else if (msg.payload !== undefined) {
payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload);
} else {
throw new Error('Message has neither data nor payload property');
}
let envJsonObj;
// NATS's Javascript lib already return JSON object
if (msg !== null && typeof msg === 'object') {
envJsonObj = msg;
} else {
throw new Error('Invalid message format: expected JSON string or message object');
}
logTrace('smartunpack', `smartunpack: raw payload length=${payload.length}`);
// Debug: Show first 200 chars of payload
const payloadPreview = payload.substring(0, 200);
logTrace('smartunpack', `smartunpack: payload preview: ${payloadPreview}`);
let envJsonObj;
try {
envJsonObj = JSON.parse(payload);
} catch (e) {
logTrace('smartunpack', `smartunpack: JSON parse failed: ${e.message}`);
throw e;
}
logTrace(envJsonObj.correlation_id, 'Processing received message');
logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`);

View File

@@ -1,259 +1,632 @@
#!/usr/bin/env julia
# Test script for mixed-content message testing
# Tests sending a mix of text, dictionary, arrowtable, jsontable, image, audio, video, and binary data
# from Julia serviceA to Julia serviceB using msghandler.jl smartpack
#
# This test demonstrates that any combination and any number of mixed content
# can be sent and received correctly.
#
# Key concept: DataFrames are the main table representation in Julia.
# The msghandler.jl library handles serialization:
# - For "arrowtable" type: DataFrame is serialized to Arrow IPC format
# - For "jsontable" type: DataFrame is converted to Vector{Dict} and then to JSON
/**
* msghandler - Cross-Platform Bi-Directional Data Bridge
* Browser-Compatible Implementation (Client-Side Rendering)
*
* This module provides functionality for sending and receiving data across network boundaries
* with support for both direct payload transport and URL-based transport for larger payloads.
*
* Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints.
* Use "jsontable" for tabular data in browser applications.
*
* Browser requirements:
* - Modern browser with ES module support (or use module bundler)
* - Web Crypto API for UUID generation
* - Fetch API for HTTP requests
*
* Browser-compatible version uses:
* - Web Crypto API for UUID generation
* - Uint8Array instead of Buffer
* - fetch API for file server communication
*
* @module msghandlerCSR
*/
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
// Use native fetch available in browsers
# Include the bridge module
include("/home/ton/docker-apps/sommpanion/msghandler/src/msghandler.jl")
using .msghandler
// ---------------------------------------------- Constants ---------------------------------------------- //
# Configuration
const SUBJECT = "/msghandler"
const NATS_URL = "nats.yiem.cc"
# const FILESERVER_URL = "http://192.168.88.104:8080"
const FILESERVER_URL = "https://fileserver.yiem.cc"
/**
* Default size threshold for switching from direct to link transport (0.5MB)
*/
const DEFAULT_SIZE_THRESHOLD = 500_000;
# Create correlation ID for tracing
correlation_id = string(uuid4())
/**
* Default broker URL
*/
const DEFAULT_BROKER_URL = 'localhost:4222';
/**
* Default HTTP file server URL for link transport
*/
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
# ------------------------------------------------------------------------------------------------ #
# test mixed content transfer #
# ------------------------------------------------------------------------------------------------ #
// ---------------------------------------------- Utility Functions ---------------------------------------------- //
/**
* Convert Uint8Array to Base64 string
* @param {Uint8Array} data - Data to encode
* @returns {string} Base64 encoded string
*/
function bufferToBase64(data) {
const bytes = new Uint8Array(data);
const binary = String.fromCharCode(...bytes);
return btoa(binary);
}
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
/**
* Convert Base64 string to Uint8Array
* @param {string} base64 - Base64 encoded string
* @returns {Uint8Array} Decoded binary data
*/
function base64ToBuffer(base64) {
const binary = atob(base64);
const len = binary.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binary.charCodeAt(i);
}
return bytes;
}
/**
* Generate UUID v4 using Web Crypto API
* @returns {string} UUID string
*/
function uuidv4() {
const array = new Uint8Array(16);
crypto.getRandomValues(array);
array[6] = (array[6] & 0x0f) | 0x40;
array[8] = (array[8] & 0x3f) | 0x80;
return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join('');
}
# File upload handler for plik server
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
# Get upload ID
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
/**
* Log a trace message with correlation ID and timestamp
* @param {string} correlationId - Correlation ID for tracing
* @param {string} message - Message content to log
*/
function logTrace(correlationId, message) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}
# Upload file
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
// ---------------------------------------------- Serialization Functions ---------------------------------------------- //
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
/**
* Serialize data according to specified format
* @param {any} data - Data to serialize
* @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* @returns {Uint8Array} Binary representation of the serialized data
*/
async function serializeData(data, payloadType) {
if (payloadType === 'text') {
if (typeof data === 'string') {
return new Uint8Array(new TextEncoder().encode(data));
} else {
throw new Error('Text data must be a string');
}
} else if (payloadType === 'dictionary') {
const jsonStr = JSON.stringify(data);
return new Uint8Array(new TextEncoder().encode(jsonStr));
} else if (payloadType === 'jsontable') {
// Serialize array of objects to JSON format
if (!Array.isArray(data)) {
throw new Error('JSON table data must be an array');
}
const jsonStr = JSON.stringify(data);
return new Uint8Array(new TextEncoder().encode(jsonStr));
} else if (payloadType === 'image') {
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
return new Uint8Array(data);
} else {
throw new Error('Image data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
}
} else if (payloadType === 'audio') {
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
return new Uint8Array(data);
} else {
throw new Error('Audio data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
}
} else if (payloadType === 'video') {
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
return new Uint8Array(data);
} else {
throw new Error('Video data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
}
} else if (payloadType === 'binary') {
if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
return new Uint8Array(data);
} else {
throw new Error('Binary data must be Uint8Array, ArrayBuffer, or ArrayBuffer view');
}
} else {
throw new Error(`Unknown payload_type: ${payloadType}`);
}
}
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
/**
* Deserialize bytes to data based on type
* @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes
* @param {string} payloadType - Data type
* @param {string} correlationId - Correlation ID for logging
* @returns {any} Deserialized data
*/
async function deserializeData(data, payloadType, correlationId) {
const buffer = data instanceof Uint8Array ? data : new Uint8Array(data);
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);
// Debug: Show first 20 bytes in hex for binary data
if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
const hexPreview = [];
for (let i = 0; i < Math.min(20, buffer.length); i++) {
hexPreview.push(buffer[i].toString(16).padStart(2, '0'));
}
logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview.join(' ')}`);
}
# Helper: Create sample data for each type
function create_sample_data()
# Text data (small - direct transport)
text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊"
if (payloadType === 'text') {
const result = new TextDecoder().decode(buffer);
logTrace(correlationId, `deserializeData: text result length=${result.length}`);
return result;
} else if (payloadType === 'dictionary') {
const jsonStr = new TextDecoder().decode(buffer);
const result = JSON.parse(jsonStr);
logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`);
return result;
} else if (payloadType === 'jsontable') {
const jsonStr = new TextDecoder().decode(buffer);
const result = JSON.parse(jsonStr);
logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`);
return result;
} else if (payloadType === 'image') {
logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`);
return buffer;
} else if (payloadType === 'audio') {
logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`);
return buffer;
} else if (payloadType === 'video') {
logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`);
return buffer;
} else if (payloadType === 'binary') {
logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`);
return buffer;
} else {
throw new Error(`Unknown payload_type: ${payloadType}`);
}
}
# Dictionary/JSON data (medium - could be direct or link)
dict_data = Dict(
"type" => "chat",
"sender" => "serviceA",
"receiver" => "serviceB",
"metadata" => Dict(
"timestamp" => string(Dates.now()),
"priority" => "high",
"tags" => ["urgent", "chat", "test"]
),
"content" => Dict(
"text" => "This is a JSON-formatted chat message with nested structure.",
"format" => "markdown",
"mentions" => ["user1", "user2"]
)
)
// ---------------------------------------------- File Server Handlers ---------------------------------------------- //
# Arrow table data (DataFrame - small - direct transport)
# Uses Arrow IPC format for efficient binary serialization
# msghandler.jl handles serialization: DataFrame -> Arrow IPC
arrow_table_small = DataFrame(
id = 1:10,
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
score = rand(50:100, 10),
active = rand([true, false], 10)
)
/**
* Upload data to plik server in one-shot mode
* @param {string} fileServerUrl - Base URL of the plik server
* @param {string} dataname - Name of the file being uploaded
* @param {Uint8Array} data - Raw byte data of the file content
* @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>}
*/
async function plikOneshotUpload(fileServerUrl, dataname, data) {
const buffer = data instanceof Uint8Array ? data : new Uint8Array(data);
# Arrow table data (DataFrame - large - link transport)
# ~1.5MB of Arrow data (200,000 rows) - should trigger link transport
# msghandler.jl handles serialization: DataFrame -> Arrow IPC
arrow_table_large = DataFrame(
id = 1:2_000_000,
name = ["user_$i" for i in 1:2_000_000],
score = rand(50:100, 2_000_000),
active = rand([true, false], 2_000_000),
timestamp = [string(Dates.now()) for _ in 1:2_000_000]
)
// Get upload id
const urlGetUploadID = `${fileServerUrl}/upload`;
const headers = { 'Content-Type': 'application/json' };
const body = JSON.stringify({ OneShot: true });
# Json table data (DataFrame - small - direct transport)
# Uses JSON format for human-readable tabular data
# msghandler.jl handles serialization: DataFrame -> Vector{Dict} -> JSON
json_table_small = DataFrame(
id = 1:10,
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
score = rand(50:100, 10),
active = rand([true, false], 10)
)
const httpResponse = await fetch(urlGetUploadID, {
method: 'POST',
headers,
body
});
# Json table data (DataFrame - large - link transport)
# ~1.5MB of JSON data (150,000 rows) - should trigger link transport
# msghandler.jl handles serialization: DataFrame -> Vector{Dict} -> JSON
json_table_large = DataFrame(
id = 1:2_000_000,
name = ["user_$i" for i in 1:2_000_000],
score = rand(50:100, 2_000_000),
active = rand([true, false], 2_000_000)
)
const responseJson = await httpResponse.json();
const uploadid = responseJson.id;
const uploadtoken = responseJson.uploadToken;
# Audio data (small binary - direct transport)
audio_data = UInt8[rand(1:255) for _ in 1:100]
// Upload file
const urlUpload = `${fileServerUrl}/file/${uploadid}`;
const form = new FormData();
const blob = new Blob([buffer], { type: 'application/octet-stream' });
form.append('file', blob, dataname);
# Audio data (large - link transport)
# ~1.5MB of audio-like data
large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000]
const uploadHeaders = {
'X-UploadToken': uploadtoken
};
# Video data (small binary - direct transport)
video_data = UInt8[rand(1:255) for _ in 1:150]
const uploadResponse = await fetch(urlUpload, {
method: 'POST',
headers: uploadHeaders,
body: form
});
# Video data (large - link transport)
# ~1.5MB of video-like data
large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000]
const uploadJson = await uploadResponse.json();
const fileid = uploadJson.id;
# Binary data (small - direct transport)
binary_data = UInt8[rand(1:255) for _ in 1:200]
const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`;
# Binary data (large - link transport)
# ~1.5MB of binary data
large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000]
return {
status: uploadResponse.status,
uploadid,
fileid,
url
};
}
return (
text_data,
dict_data,
arrow_table_small,
arrow_table_large,
json_table_small,
json_table_large,
audio_data,
large_audio_data,
video_data,
large_video_data,
binary_data,
large_binary_data
)
end
/**
* Fetch data from URL with exponential backoff
* @param {string} url - URL to fetch from
* @param {number} maxRetries - Maximum number of retry attempts
* @param {number} baseDelay - Initial delay in milliseconds
* @param {number} maxDelay - Maximum delay in milliseconds
* @param {string} correlationId - Correlation ID for logging
* @returns {Promise<Uint8Array>} Fetched data as bytes
*/
async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
let delay = baseDelay;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url);
# Sender: Send mixed content via smartpack
function test_mix_send()
# Create sample data
(text_data, dict_data, arrow_table_small, arrow_table_large, json_table_small, json_table_large, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data()
if (response.status === 200) {
logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
const arrayBuffer = await response.arrayBuffer();
return new Uint8Array(arrayBuffer);
} else {
throw new Error(`Failed to fetch: ${response.status}`);
}
} catch (e) {
logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`);
# Read image files from disk (following test_julia_file_sender.jl pattern)
# Small image - should use direct transport
file_path_small_image = "./test/small_image.jpg"
file_data_small_image = read(file_path_small_image)
filename_small_image = basename(file_path_small_image)
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, delay));
delay = Math.min(delay * 2, maxDelay);
}
}
}
# Large image - should use link transport
file_path_large_image = "./test/large_image.png"
file_data_large_image = read(file_path_large_image)
filename_large_image = basename(file_path_large_image)
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
}
# Create payloads list - mixed content with both small and large data
# Small data uses direct transport, large data uses link transport
# Key: Pass DataFrame directly and specify type as "arrowtable" or "jsontable"
# msghandler.jl handles the serialization internally
payloads = [
# Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image
("chat_text", text_data, "text"),
("chat_json", dict_data, "dictionary"),
# ("arrow_table_small", arrow_table_small, "arrowtable"),
("json_table_small", json_table_small, "jsontable"),
(filename_small_image, file_data_small_image, "binary"),
// ---------------------------------------------- Core Functions ---------------------------------------------- //
# Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary
# ("arrow_table_large", arrow_table_large, "arrowtable"),
("json_table_large", json_table_large, "jsontable"),
(filename_large_image, file_data_large_image, "binary"),
("audio_clip_large", large_audio_data, "audio"),
("video_clip_large", large_video_data, "video"),
("binary_file_large", large_binary_data, "binary")
]
/**
* Build message envelope from payloads and metadata
* @param {string} subject - Subject/topic
* @param {Array} payloads - Array of payload objects
* @param {Object} options - Envelope metadata options
* @returns {Object} Envelope object
*/
function buildEnvelope(subject, payloads, options) {
return {
correlation_id: options.correlation_id,
msg_id: options.msg_id,
timestamp: new Date().toISOString(),
send_to: subject,
msg_purpose: options.msg_purpose,
sender_name: options.sender_name,
sender_id: options.sender_id,
receiver_name: options.receiver_name,
receiver_id: options.receiver_id,
reply_to: options.reply_to,
reply_to_msg_id: options.reply_to_msg_id,
broker_url: options.broker_url,
metadata: options.metadata || {},
payloads: payloads
};
}
# Use smartpack with mixed content
env, env_json_str = msghandler.smartpack(
SUBJECT,
payloads; # List of (dataname, data, type) tuples
broker_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserver_upload_handler = plik_upload_handler,
size_threshold = 1_000_000, # 1MB threshold
correlation_id = correlation_id,
msg_purpose = "chat",
sender_name = "mix_sender",
receiver_name = "",
receiver_id = "",
reply_to = "",
reply_to_msg_id = "",
)
/**
* Build payload object from serialized data
* @param {string} dataname - Name of the payload
* @param {string} payloadType - Type of the payload
* @param {Uint8Array} payloadBytes - Serialized payload bytes
* @param {string} transport - Transport type ("direct" or "link")
* @param {string} data - Data (base64 for direct, URL for link)
* @returns {Object} Payload object
*/
function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
// Determine encoding based on payload type (matching Julia implementation)
let encoding = 'base64';
if (payloadType === 'jsontable') {
encoding = 'json';
}
log_trace("Sent message with $(length(env.payloads)) payloads")
return {
id: uuidv4(),
dataname,
payload_type: payloadType,
transport,
encoding,
size: payloadBytes.byteLength,
data,
metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {}
};
}
# Log transport type for each payload
for (i, payload) in enumerate(env.payloads)
log_trace("Payload $i ('$payload.dataname'):")
log_trace(" Transport: $(payload.transport)")
log_trace(" Type: $(payload.payload_type)")
log_trace(" Size: $(payload.size) bytes")
log_trace(" Encoding: $(payload.encoding)")
/**
* Send data with automatic transport selection
*
* This function intelligently routes data delivery based on payload size.
* If the serialized payload is smaller than size_threshold, it encodes the data as Base64
* into a "direct" payload. Otherwise, it uploads the data to a fileserver
* and creates a "link" payload with the URL.
*
* Transport publishing is the caller's responsibility. This function returns the
* envelope and its JSON string representation.
*
* @param {string} subject - Subject/topic to send the message to
* @param {Array} data - List of [dataname, data, type] tuples to send
* - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data)
* @param {Object} options - Optional configuration
* @param {string} [options.broker_url=DEFAULT_BROKER_URL] - Broker URL (for envelope metadata)
* @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server
* @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads
* @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport
* @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing
* @param {string} [options.msg_purpose="chat"] - Purpose of the message
* @param {string} [options.sender_name="msghandler"] - Name of the sender
* @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast)
* @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast)
* @param {string} [options.reply_to=""] - Topic to reply to
* @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to
* @param {string} [options.msg_id=uuidv4()] - Message ID
* @param {string} [options.sender_id=uuidv4()] - Sender ID
* @returns {Promise<[Object, string]>} Tuple of [env, env_json_str]
*
* @example
* // Send a single payload
* const [env, envJsonStr] = await msghandlerCSR.smartpack(
* "/test",
* [["dataname1", data1, "dictionary"]]
* );
*
* // Send multiple payloads (use jsontable instead of arrowtable for browser)
* const [env, envJsonStr] = await msghandlerCSR.smartpack(
* "/test",
* [
* ["dataname1", data1, "dictionary"],
* ["dataname2", tableData, "jsontable"]
* ]
* );
*
* // Publish via your transport (NATS, MQTT, HTTP, etc.)
* // await myNatsClient.publish("/test", envJsonStr);
*/
async function smartpack(subject, data, options = {}) {
const {
broker_url = DEFAULT_BROKER_URL,
fileserver_url = DEFAULT_FILESERVER_URL,
fileserver_upload_handler = plikOneshotUpload,
size_threshold = DEFAULT_SIZE_THRESHOLD,
correlation_id = uuidv4(),
msg_purpose = 'chat',
sender_name = 'msghandler',
receiver_name = '',
receiver_id = '',
reply_to = '',
reply_to_msg_id = '',
msg_id = uuidv4(),
sender_id = uuidv4()
} = options;
if payload.transport == "link"
log_trace(" URL: $(payload.data)")
end
end
logTrace(correlation_id, `Starting smartpack for subject: ${subject}`);
logTrace(correlation_id, `smartpack: data array length=${data.length}`);
# Summary
println("\n--- Transport Summary ---")
direct_count = count(p -> p.transport == "direct", env.payloads)
link_count = count(p -> p.transport == "link", env.payloads)
log_trace("Direct transport: $direct_count payloads")
log_trace("Link transport: $link_count payloads")
// Debug: Log input data structure
for (let i = 0; i < data.length; i++) {
const [dataname, payloadData, payloadType] = data[i];
logTrace(correlation_id, `smartpack: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
}
return env_json_str
end
// Process payloads
const payloads = [];
for (const [dataname, payloadData, payloadType] of data) {
logTrace(correlation_id, `smartpack: Processing payload '${dataname}' type=${payloadType}`);
logTrace(correlation_id, `smartpack: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`);
const payloadBytes = await serializeData(payloadData, payloadType);
const payloadSize = payloadBytes.byteLength;
# Run the test
println("Starting mixed-content transport test...")
println("Correlation ID: $correlation_id")
logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);
# Run sender
println("start smartpack for mixed content")
env_json_str = test_mix_send()
// Debug: Show first 20 bytes of serialized data for table type
if (payloadType === 'table') {
const hexPreview = [];
for (let i = 0; i < Math.min(20, payloadBytes.length); i++) {
hexPreview.push(payloadBytes[i].toString(16).padStart(2, '0'));
}
logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview.join(' ')}`);
}
println("\nTest completed.")
println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.")
if (payloadSize < size_threshold) {
// Direct path
const payloadB64 = bufferToBase64(payloadBytes);
logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`);
const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64);
payloads.push(payload);
} else {
// Link path
logTrace(correlation_id, `Using link transport, uploading to fileserver`);
const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);
if (response.status !== 200) {
throw new Error(`Failed to upload data to fileserver: ${response.status}`);
}
logTrace(correlation_id, `Uploaded to URL: ${response.url}`);
const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url);
payloads.push(payload);
}
}
// Build envelope
const env = buildEnvelope(subject, payloads, {
correlation_id,
msg_id,
msg_purpose,
sender_name,
sender_id,
receiver_name,
receiver_id,
reply_to,
reply_to_msg_id,
broker_url
});
const env_json_str = JSON.stringify(env);
return [env, env_json_str];
}
/**
* Receive and process messages
*
* This function processes incoming messages, handling both direct transport
* (base64 decoded payloads) and link transport (URL-based payloads).
* It deserializes the data based on the transport type and returns the result.
*
* @param {string|Object} msg - Message payload. Accepts either a JSON string directly,
* or an object with a `data` or `payload` property containing the JSON string.
* @param {Object} options - Optional configuration
* @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads
* @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL
* @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms
* @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms
* @returns {Promise<Object>} Envelope object with processed payloads
*
* @example
* // Receive from JSON string directly
* const env = await msghandlerCSR.smartunpack(jsonString, {
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff,
* max_retries: 5,
* base_delay: 100,
* max_delay: 5000
* });
*
* // Receive from transport message object (e.g., NATS, MQTT)
* const env = await msghandlerCSR.smartunpack(natsMsg, {
* fileserver_download_handler: msghandlerCSR.fetchWithBackoff
* });
* // env.payloads is an Array of [dataname, data, type] arrays
* for (const [dataname, data, type] of env.payloads) {
* console.log(`${dataname}: ${data} (type: ${type})`);
* }
*/
async function smartunpack(msg, options = {}) {
const {
fileserver_download_handler = fetchWithBackoff,
max_retries = 5,
base_delay = 100,
max_delay = 5000
} = options;
let envJsonObj;
// NATS's Javascript lib already return JSON object
if (msg !== null && typeof msg === 'object') {
envJsonObj = msg;
} else {
throw new Error('Invalid message format: expected JSON string or message object');
}
logTrace(envJsonObj.correlation_id, 'Processing received message');
logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`);
// Process all payloads in the envelope
const payloadsList = [];
const numPayloads = envJsonObj.payloads.length;
logTrace(envJsonObj.correlation_id, `smartunpack: Processing ${numPayloads} payloads`);
for (let i = 0; i < numPayloads; i++) {
const payloadObj = envJsonObj.payloads[i];
const transport = payloadObj.transport;
const dataname = payloadObj.dataname;
const payloadType = payloadObj.payload_type;
logTrace(envJsonObj.correlation_id, `smartunpack: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`);
if (transport === 'direct') {
logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`);
// Extract base64 payload from the payload
const payloadB64 = payloadObj.data;
logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`);
// Decode Base64 payload
const payloadBytes = base64ToBuffer(payloadB64);
logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`);
// Deserialize based on type
const dataType = payloadObj.payload_type;
const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id);
logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`);
payloadsList.push([dataname, data, dataType]);
} else if (transport === 'link') {
// Extract download URL from the payload
const url = payloadObj.data;
logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`);
// Fetch with exponential backoff using the download handler
const downloadedData = await fileserver_download_handler(
url,
max_retries,
base_delay,
max_delay,
envJsonObj.correlation_id
);
// Deserialize based on type
const dataType = payloadObj.payload_type;
const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id);
payloadsList.push([dataname, data, dataType]);
} else {
throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`);
}
}
logTrace(envJsonObj.correlation_id, `smartunpack: Successfully processed all ${payloadsList.length} payloads`);
envJsonObj.payloads = payloadsList;
return envJsonObj;
}
// ---------------------------------------------- Module Exports ---------------------------------------------- //
const msghandlerCSR = {
/**
* Send data with automatic transport selection
*/
smartpack,
/**
* Receive and process messages
*/
smartunpack,
/**
* Upload data to plik server in one-shot mode
*/
plikOneshotUpload,
/**
* Fetch data from URL with exponential backoff
*/
fetchWithBackoff,
/**
* Default constants
*/
DEFAULT_SIZE_THRESHOLD,
DEFAULT_BROKER_URL,
DEFAULT_FILESERVER_URL
};
export default msghandlerCSR;