diff --git a/src/msghandler-csr.js b/src/msghandler-csr.js index 5b075ce..7ecf45c 100644 --- a/src/msghandler-csr.js +++ b/src/msghandler-csr.js @@ -526,36 +526,15 @@ async function smartunpack(msg, options = {}) { max_delay = 5000 } = options; - // Handle both raw JSON strings and transport message objects - let payload; - if (typeof msg === 'string') { - payload = msg; - } else if (msg !== null && typeof msg === 'object') { - if (msg.data !== undefined) { - payload = typeof msg.data === 'string' ? msg.data : new TextDecoder().decode(msg.data); - } else if (msg.payload !== undefined) { - payload = typeof msg.payload === 'string' ? msg.payload : new TextDecoder().decode(msg.payload); - } else { - throw new Error('Message has neither data nor payload property'); - } + let envJsonObj; + + // NATS's Javascript lib already return JSON object + if (msg !== null && typeof msg === 'object') { + envJsonObj = msg; } else { throw new Error('Invalid message format: expected JSON string or message object'); } - - logTrace('smartunpack', `smartunpack: raw payload length=${payload.length}`); - - // Debug: Show first 200 chars of payload - const payloadPreview = payload.substring(0, 200); - logTrace('smartunpack', `smartunpack: payload preview: ${payloadPreview}`); - - let envJsonObj; - try { - envJsonObj = JSON.parse(payload); - } catch (e) { - logTrace('smartunpack', `smartunpack: JSON parse failed: ${e.message}`); - throw e; - } - + logTrace(envJsonObj.correlation_id, 'Processing received message'); logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`); diff --git a/test/test_julia_mix_payloads_sender.jl b/test/test_julia_mix_payloads_sender.jl index 9da2d47..7ecf45c 100644 --- a/test/test_julia_mix_payloads_sender.jl +++ b/test/test_julia_mix_payloads_sender.jl @@ -1,259 +1,632 @@ -#!/usr/bin/env julia -# Test script for mixed-content message testing -# Tests sending a mix of text, dictionary, arrowtable, jsontable, image, audio, video, and binary data -# from Julia serviceA to Julia serviceB using msghandler.jl smartpack -# -# This test demonstrates that any combination and any number of mixed content -# can be sent and received correctly. -# -# Key concept: DataFrames are the main table representation in Julia. -# The msghandler.jl library handles serialization: -# - For "arrowtable" type: DataFrame is serialized to Arrow IPC format -# - For "jsontable" type: DataFrame is converted to Vector{Dict} and then to JSON +/** + * msghandler - Cross-Platform Bi-Directional Data Bridge + * Browser-Compatible Implementation (Client-Side Rendering) + * + * This module provides functionality for sending and receiving data across network boundaries + * with support for both direct payload transport and URL-based transport for larger payloads. + * + * Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints. + * Use "jsontable" for tabular data in browser applications. + * + * Browser requirements: + * - Modern browser with ES module support (or use module bundler) + * - Web Crypto API for UUID generation + * - Fetch API for HTTP requests + * + * Browser-compatible version uses: + * - Web Crypto API for UUID generation + * - Uint8Array instead of Buffer + * - fetch API for file server communication + * + * @module msghandlerCSR + */ -using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 +// Use native fetch available in browsers -# Include the bridge module -include("/home/ton/docker-apps/sommpanion/msghandler/src/msghandler.jl") -using .msghandler +// ---------------------------------------------- Constants ---------------------------------------------- // -# Configuration -const SUBJECT = "/msghandler" -const NATS_URL = "nats.yiem.cc" -# const FILESERVER_URL = "http://192.168.88.104:8080" -const FILESERVER_URL = "https://fileserver.yiem.cc" +/** + * Default size threshold for switching from direct to link transport (0.5MB) + */ +const DEFAULT_SIZE_THRESHOLD = 500_000; -# Create correlation ID for tracing -correlation_id = string(uuid4()) +/** + * Default broker URL + */ +const DEFAULT_BROKER_URL = 'localhost:4222'; +/** + * Default HTTP file server URL for link transport + */ +const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; -# ------------------------------------------------------------------------------------------------ # -# test mixed content transfer # -# ------------------------------------------------------------------------------------------------ # +// ---------------------------------------------- Utility Functions ---------------------------------------------- // +/** + * Convert Uint8Array to Base64 string + * @param {Uint8Array} data - Data to encode + * @returns {string} Base64 encoded string + */ +function bufferToBase64(data) { + const bytes = new Uint8Array(data); + const binary = String.fromCharCode(...bytes); + return btoa(binary); +} -# Helper: Log with correlation ID -function log_trace(message) - timestamp = Dates.now() - println("[$timestamp] [Correlation: $correlation_id] $message") -end +/** + * Convert Base64 string to Uint8Array + * @param {string} base64 - Base64 encoded string + * @returns {Uint8Array} Decoded binary data + */ +function base64ToBuffer(base64) { + const binary = atob(base64); + const len = binary.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} +/** + * Generate UUID v4 using Web Crypto API + * @returns {string} UUID string + */ +function uuidv4() { + const array = new Uint8Array(16); + crypto.getRandomValues(array); + array[6] = (array[6] & 0x0f) | 0x40; + array[8] = (array[8] & 0x3f) | 0x80; + return Array.from(array, (val) => val.toString(16).padStart(2, '0').toUpperCase()).join(''); +} -# File upload handler for plik server -function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} - # Get upload ID - url_getUploadID = "$fileserver_url/upload" - headers = ["Content-Type" => "application/json"] - body = """{ "OneShot" : true }""" - httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) - responseJson = JSON.parse(String(httpResponse.body)) - uploadid = responseJson["id"] - uploadtoken = responseJson["uploadToken"] +/** + * Log a trace message with correlation ID and timestamp + * @param {string} correlationId - Correlation ID for tracing + * @param {string} message - Message content to log + */ +function logTrace(correlationId, message) { + const timestamp = new Date().toISOString(); + console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`); +} + +// ---------------------------------------------- Serialization Functions ---------------------------------------------- // + +/** + * Serialize data according to specified format + * @param {any} data - Data to serialize + * @param {string} payloadType - Target format: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * @returns {Uint8Array} Binary representation of the serialized data + */ +async function serializeData(data, payloadType) { + if (payloadType === 'text') { + if (typeof data === 'string') { + return new Uint8Array(new TextEncoder().encode(data)); + } else { + throw new Error('Text data must be a string'); + } + } else if (payloadType === 'dictionary') { + const jsonStr = JSON.stringify(data); + return new Uint8Array(new TextEncoder().encode(jsonStr)); + } else if (payloadType === 'jsontable') { + // Serialize array of objects to JSON format + if (!Array.isArray(data)) { + throw new Error('JSON table data must be an array'); + } + const jsonStr = JSON.stringify(data); + return new Uint8Array(new TextEncoder().encode(jsonStr)); + } else if (payloadType === 'image') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Image data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'audio') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Audio data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'video') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Video data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else if (payloadType === 'binary') { + if (data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return new Uint8Array(data); + } else { + throw new Error('Binary data must be Uint8Array, ArrayBuffer, or ArrayBuffer view'); + } + } else { + throw new Error(`Unknown payload_type: ${payloadType}`); + } +} + +/** + * Deserialize bytes to data based on type + * @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes + * @param {string} payloadType - Data type + * @param {string} correlationId - Correlation ID for logging + * @returns {any} Deserialized data + */ +async function deserializeData(data, payloadType, correlationId) { + const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); - # Upload file - file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") - url_upload = "$fileserver_url/file/$uploadid" - headers = ["X-UploadToken" => uploadtoken] + logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`); - form = HTTP.Form(Dict("file" => file_multipart)) - httpResponse = HTTP.post(url_upload, headers, form) - responseJson = JSON.parse(String(httpResponse.body)) + // Debug: Show first 20 bytes in hex for binary data + if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') { + const hexPreview = []; + for (let i = 0; i < Math.min(20, buffer.length); i++) { + hexPreview.push(buffer[i].toString(16).padStart(2, '0')); + } + logTrace(correlationId, `deserializeData: First 20 bytes (hex): ${hexPreview.join(' ')}`); + } - fileid = responseJson["id"] - url = "$fileserver_url/file/$uploadid/$fileid/$dataname" - - return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) -end + if (payloadType === 'text') { + const result = new TextDecoder().decode(buffer); + logTrace(correlationId, `deserializeData: text result length=${result.length}`); + return result; + } else if (payloadType === 'dictionary') { + const jsonStr = new TextDecoder().decode(buffer); + const result = JSON.parse(jsonStr); + logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`); + return result; + } else if (payloadType === 'jsontable') { + const jsonStr = new TextDecoder().decode(buffer); + const result = JSON.parse(jsonStr); + logTrace(correlationId, `deserializeData: jsontable result length=${Array.isArray(result) ? result.length : 'N/A'}`); + return result; + } else if (payloadType === 'image') { + logTrace(correlationId, `deserializeData: image buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'audio') { + logTrace(correlationId, `deserializeData: audio buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'video') { + logTrace(correlationId, `deserializeData: video buffer length=${buffer.length}`); + return buffer; + } else if (payloadType === 'binary') { + logTrace(correlationId, `deserializeData: binary buffer length=${buffer.length}`); + return buffer; + } else { + throw new Error(`Unknown payload_type: ${payloadType}`); + } +} +// ---------------------------------------------- File Server Handlers ---------------------------------------------- // -# Helper: Create sample data for each type -function create_sample_data() - # Text data (small - direct transport) - text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊" +/** + * Upload data to plik server in one-shot mode + * @param {string} fileServerUrl - Base URL of the plik server + * @param {string} dataname - Name of the file being uploaded + * @param {Uint8Array} data - Raw byte data of the file content + * @returns {Promise<{status: number, uploadid: string, fileid: string, url: string}>} + */ +async function plikOneshotUpload(fileServerUrl, dataname, data) { + const buffer = data instanceof Uint8Array ? data : new Uint8Array(data); - # Dictionary/JSON data (medium - could be direct or link) - dict_data = Dict( - "type" => "chat", - "sender" => "serviceA", - "receiver" => "serviceB", - "metadata" => Dict( - "timestamp" => string(Dates.now()), - "priority" => "high", - "tags" => ["urgent", "chat", "test"] - ), - "content" => Dict( - "text" => "This is a JSON-formatted chat message with nested structure.", - "format" => "markdown", - "mentions" => ["user1", "user2"] - ) - ) + // Get upload id + const urlGetUploadID = `${fileServerUrl}/upload`; + const headers = { 'Content-Type': 'application/json' }; + const body = JSON.stringify({ OneShot: true }); - # Arrow table data (DataFrame - small - direct transport) - # Uses Arrow IPC format for efficient binary serialization - # msghandler.jl handles serialization: DataFrame -> Arrow IPC - arrow_table_small = DataFrame( - id = 1:10, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], - score = rand(50:100, 10), - active = rand([true, false], 10) - ) + const httpResponse = await fetch(urlGetUploadID, { + method: 'POST', + headers, + body + }); - # Arrow table data (DataFrame - large - link transport) - # ~1.5MB of Arrow data (200,000 rows) - should trigger link transport - # msghandler.jl handles serialization: DataFrame -> Arrow IPC - arrow_table_large = DataFrame( - id = 1:2_000_000, - name = ["user_$i" for i in 1:2_000_000], - score = rand(50:100, 2_000_000), - active = rand([true, false], 2_000_000), - timestamp = [string(Dates.now()) for _ in 1:2_000_000] - ) - - # Json table data (DataFrame - small - direct transport) - # Uses JSON format for human-readable tabular data - # msghandler.jl handles serialization: DataFrame -> Vector{Dict} -> JSON - json_table_small = DataFrame( - id = 1:10, - name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], - score = rand(50:100, 10), - active = rand([true, false], 10) - ) - - # Json table data (DataFrame - large - link transport) - # ~1.5MB of JSON data (150,000 rows) - should trigger link transport - # msghandler.jl handles serialization: DataFrame -> Vector{Dict} -> JSON - json_table_large = DataFrame( - id = 1:2_000_000, - name = ["user_$i" for i in 1:2_000_000], - score = rand(50:100, 2_000_000), - active = rand([true, false], 2_000_000) - ) - - # Audio data (small binary - direct transport) - audio_data = UInt8[rand(1:255) for _ in 1:100] - - # Audio data (large - link transport) - # ~1.5MB of audio-like data - large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000] - - # Video data (small binary - direct transport) - video_data = UInt8[rand(1:255) for _ in 1:150] - - # Video data (large - link transport) - # ~1.5MB of video-like data - large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000] - - # Binary data (small - direct transport) - binary_data = UInt8[rand(1:255) for _ in 1:200] - - # Binary data (large - link transport) - # ~1.5MB of binary data - large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000] - - return ( - text_data, - dict_data, - arrow_table_small, - arrow_table_large, - json_table_small, - json_table_large, - audio_data, - large_audio_data, - video_data, - large_video_data, - binary_data, - large_binary_data - ) -end + const responseJson = await httpResponse.json(); + const uploadid = responseJson.id; + const uploadtoken = responseJson.uploadToken; - -# Sender: Send mixed content via smartpack -function test_mix_send() - # Create sample data - (text_data, dict_data, arrow_table_small, arrow_table_large, json_table_small, json_table_large, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data() - - # Read image files from disk (following test_julia_file_sender.jl pattern) - # Small image - should use direct transport - file_path_small_image = "./test/small_image.jpg" - file_data_small_image = read(file_path_small_image) - filename_small_image = basename(file_path_small_image) - - # Large image - should use link transport - file_path_large_image = "./test/large_image.png" - file_data_large_image = read(file_path_large_image) - filename_large_image = basename(file_path_large_image) - - # Create payloads list - mixed content with both small and large data - # Small data uses direct transport, large data uses link transport - # Key: Pass DataFrame directly and specify type as "arrowtable" or "jsontable" - # msghandler.jl handles the serialization internally - payloads = [ - # Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image - ("chat_text", text_data, "text"), - ("chat_json", dict_data, "dictionary"), - # ("arrow_table_small", arrow_table_small, "arrowtable"), - ("json_table_small", json_table_small, "jsontable"), - (filename_small_image, file_data_small_image, "binary"), - - # Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary - # ("arrow_table_large", arrow_table_large, "arrowtable"), - ("json_table_large", json_table_large, "jsontable"), - (filename_large_image, file_data_large_image, "binary"), - ("audio_clip_large", large_audio_data, "audio"), - ("video_clip_large", large_video_data, "video"), - ("binary_file_large", large_binary_data, "binary") - ] + // Upload file + const urlUpload = `${fileServerUrl}/file/${uploadid}`; + const form = new FormData(); + const blob = new Blob([buffer], { type: 'application/octet-stream' }); + form.append('file', blob, dataname); - # Use smartpack with mixed content - env, env_json_str = msghandler.smartpack( - SUBJECT, - payloads; # List of (dataname, data, type) tuples - broker_url = NATS_URL, - fileserver_url = FILESERVER_URL, - fileserver_upload_handler = plik_upload_handler, - size_threshold = 1_000_000, # 1MB threshold - correlation_id = correlation_id, - msg_purpose = "chat", - sender_name = "mix_sender", - receiver_name = "", - receiver_id = "", - reply_to = "", - reply_to_msg_id = "", - ) + const uploadHeaders = { + 'X-UploadToken': uploadtoken + }; - log_trace("Sent message with $(length(env.payloads)) payloads") + const uploadResponse = await fetch(urlUpload, { + method: 'POST', + headers: uploadHeaders, + body: form + }); - # Log transport type for each payload - for (i, payload) in enumerate(env.payloads) - log_trace("Payload $i ('$payload.dataname'):") - log_trace(" Transport: $(payload.transport)") - log_trace(" Type: $(payload.payload_type)") - log_trace(" Size: $(payload.size) bytes") - log_trace(" Encoding: $(payload.encoding)") + const uploadJson = await uploadResponse.json(); + const fileid = uploadJson.id; + + const url = `${fileServerUrl}/file/${uploadid}/${fileid}/${dataname}`; + + return { + status: uploadResponse.status, + uploadid, + fileid, + url + }; +} + +/** + * Fetch data from URL with exponential backoff + * @param {string} url - URL to fetch from + * @param {number} maxRetries - Maximum number of retry attempts + * @param {number} baseDelay - Initial delay in milliseconds + * @param {number} maxDelay - Maximum delay in milliseconds + * @param {string} correlationId - Correlation ID for logging + * @returns {Promise} Fetched data as bytes + */ +async function fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) { + let delay = baseDelay; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + const response = await fetch(url); + + if (response.status === 200) { + logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`); + const arrayBuffer = await response.arrayBuffer(); + return new Uint8Array(arrayBuffer); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (e) { + logTrace(correlationId, `Attempt ${attempt} failed: ${e.constructor.name} - ${e.message}`); + + if (attempt < maxRetries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, maxDelay); + } + } + } + + throw new Error(`Failed to fetch data after ${maxRetries} attempts`); +} + +// ---------------------------------------------- Core Functions ---------------------------------------------- // + +/** + * Build message envelope from payloads and metadata + * @param {string} subject - Subject/topic + * @param {Array} payloads - Array of payload objects + * @param {Object} options - Envelope metadata options + * @returns {Object} Envelope object + */ +function buildEnvelope(subject, payloads, options) { + return { + correlation_id: options.correlation_id, + msg_id: options.msg_id, + timestamp: new Date().toISOString(), + send_to: subject, + msg_purpose: options.msg_purpose, + sender_name: options.sender_name, + sender_id: options.sender_id, + receiver_name: options.receiver_name, + receiver_id: options.receiver_id, + reply_to: options.reply_to, + reply_to_msg_id: options.reply_to_msg_id, + broker_url: options.broker_url, + metadata: options.metadata || {}, + payloads: payloads + }; +} + +/** + * Build payload object from serialized data + * @param {string} dataname - Name of the payload + * @param {string} payloadType - Type of the payload + * @param {Uint8Array} payloadBytes - Serialized payload bytes + * @param {string} transport - Transport type ("direct" or "link") + * @param {string} data - Data (base64 for direct, URL for link) + * @returns {Object} Payload object + */ +function buildPayload(dataname, payloadType, payloadBytes, transport, data) { + // Determine encoding based on payload type (matching Julia implementation) + let encoding = 'base64'; + if (payloadType === 'jsontable') { + encoding = 'json'; + } + + return { + id: uuidv4(), + dataname, + payload_type: payloadType, + transport, + encoding, + size: payloadBytes.byteLength, + data, + metadata: transport === 'direct' ? { payload_bytes: payloadBytes.byteLength } : {} + }; +} + +/** + * Send data with automatic transport selection + * + * This function intelligently routes data delivery based on payload size. + * If the serialized payload is smaller than size_threshold, it encodes the data as Base64 + * into a "direct" payload. Otherwise, it uploads the data to a fileserver + * and creates a "link" payload with the URL. + * + * Transport publishing is the caller's responsibility. This function returns the + * envelope and its JSON string representation. + * + * @param {string} subject - Subject/topic to send the message to + * @param {Array} data - List of [dataname, data, type] tuples to send + * - type: "text", "dictionary", "jsontable", "image", "audio", "video", "binary" + * - Note: "arrowtable" is NOT supported in browser (use "jsontable" for tabular data) + * @param {Object} options - Optional configuration + * @param {string} [options.broker_url=DEFAULT_BROKER_URL] - Broker URL (for envelope metadata) + * @param {string} [options.fileserver_url=DEFAULT_FILESERVER_URL] - URL of the HTTP file server + * @param {Function} [options.fileserver_upload_handler=plikOneshotUpload] - Function to handle fileserver uploads + * @param {number} [options.size_threshold=DEFAULT_SIZE_THRESHOLD] - Threshold separating direct vs link transport + * @param {string} [options.correlation_id=uuidv4()] - Correlation ID for tracing + * @param {string} [options.msg_purpose="chat"] - Purpose of the message + * @param {string} [options.sender_name="msghandler"] - Name of the sender + * @param {string} [options.receiver_name=""] - Name of the receiver (empty means broadcast) + * @param {string} [options.receiver_id=""] - UUID of the receiver (empty means broadcast) + * @param {string} [options.reply_to=""] - Topic to reply to + * @param {string} [options.reply_to_msg_id=""] - Message ID this message is replying to + * @param {string} [options.msg_id=uuidv4()] - Message ID + * @param {string} [options.sender_id=uuidv4()] - Sender ID + * @returns {Promise<[Object, string]>} Tuple of [env, env_json_str] + * + * @example + * // Send a single payload + * const [env, envJsonStr] = await msghandlerCSR.smartpack( + * "/test", + * [["dataname1", data1, "dictionary"]] + * ); + * + * // Send multiple payloads (use jsontable instead of arrowtable for browser) + * const [env, envJsonStr] = await msghandlerCSR.smartpack( + * "/test", + * [ + * ["dataname1", data1, "dictionary"], + * ["dataname2", tableData, "jsontable"] + * ] + * ); + * + * // Publish via your transport (NATS, MQTT, HTTP, etc.) + * // await myNatsClient.publish("/test", envJsonStr); + */ +async function smartpack(subject, data, options = {}) { + const { + broker_url = DEFAULT_BROKER_URL, + fileserver_url = DEFAULT_FILESERVER_URL, + fileserver_upload_handler = plikOneshotUpload, + size_threshold = DEFAULT_SIZE_THRESHOLD, + correlation_id = uuidv4(), + msg_purpose = 'chat', + sender_name = 'msghandler', + receiver_name = '', + receiver_id = '', + reply_to = '', + reply_to_msg_id = '', + msg_id = uuidv4(), + sender_id = uuidv4() + } = options; + + logTrace(correlation_id, `Starting smartpack for subject: ${subject}`); + logTrace(correlation_id, `smartpack: data array length=${data.length}`); + + // Debug: Log input data structure + for (let i = 0; i < data.length; i++) { + const [dataname, payloadData, payloadType] = data[i]; + logTrace(correlation_id, `smartpack: payload[${i}] dataname=${dataname}, type=${payloadType}, data type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); + } + + // Process payloads + const payloads = []; + for (const [dataname, payloadData, payloadType] of data) { + logTrace(correlation_id, `smartpack: Processing payload '${dataname}' type=${payloadType}`); + logTrace(correlation_id, `smartpack: payloadData type=${typeof payloadData}, constructor=${payloadData?.constructor?.name}`); - if payload.transport == "link" - log_trace(" URL: $(payload.data)") - end - end - - # Summary - println("\n--- Transport Summary ---") - direct_count = count(p -> p.transport == "direct", env.payloads) - link_count = count(p -> p.transport == "link", env.payloads) - log_trace("Direct transport: $direct_count payloads") - log_trace("Link transport: $link_count payloads") + const payloadBytes = await serializeData(payloadData, payloadType); + const payloadSize = payloadBytes.byteLength; - return env_json_str -end + logTrace(correlation_id, `Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); + // Debug: Show first 20 bytes of serialized data for table type + if (payloadType === 'table') { + const hexPreview = []; + for (let i = 0; i < Math.min(20, payloadBytes.length); i++) { + hexPreview.push(payloadBytes[i].toString(16).padStart(2, '0')); + } + logTrace(correlation_id, `Serialized table data first 20 bytes (hex): ${hexPreview.join(' ')}`); + } -# Run the test -println("Starting mixed-content transport test...") -println("Correlation ID: $correlation_id") + if (payloadSize < size_threshold) { + // Direct path + const payloadB64 = bufferToBase64(payloadBytes); + logTrace(correlation_id, `Using direct transport for ${payloadSize} bytes, base64 length=${payloadB64.length}`); -# Run sender -println("start smartpack for mixed content") -env_json_str = test_mix_send() + const payload = buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64); + payloads.push(payload); + } else { + // Link path + logTrace(correlation_id, `Using link transport, uploading to fileserver`); -println("\nTest completed.") -println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.") + const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); + + if (response.status !== 200) { + throw new Error(`Failed to upload data to fileserver: ${response.status}`); + } + + logTrace(correlation_id, `Uploaded to URL: ${response.url}`); + + const payload = buildPayload(dataname, payloadType, payloadBytes, 'link', response.url); + payloads.push(payload); + } + } + + // Build envelope + const env = buildEnvelope(subject, payloads, { + correlation_id, + msg_id, + msg_purpose, + sender_name, + sender_id, + receiver_name, + receiver_id, + reply_to, + reply_to_msg_id, + broker_url + }); + + const env_json_str = JSON.stringify(env); + + return [env, env_json_str]; +} + +/** + * Receive and process messages + * + * This function processes incoming messages, handling both direct transport + * (base64 decoded payloads) and link transport (URL-based payloads). + * It deserializes the data based on the transport type and returns the result. + * + * @param {string|Object} msg - Message payload. Accepts either a JSON string directly, + * or an object with a `data` or `payload` property containing the JSON string. + * @param {Object} options - Optional configuration + * @param {Function} [options.fileserver_download_handler=fetchWithBackoff] - Function to handle fileserver downloads + * @param {number} [options.max_retries=5] - Maximum retry attempts for fetching URL + * @param {number} [options.base_delay=100] - Initial delay for exponential backoff in ms + * @param {number} [options.max_delay=5000] - Maximum delay for exponential backoff in ms + * @returns {Promise} Envelope object with processed payloads + * + * @example + * // Receive from JSON string directly + * const env = await msghandlerCSR.smartunpack(jsonString, { + * fileserver_download_handler: msghandlerCSR.fetchWithBackoff, + * max_retries: 5, + * base_delay: 100, + * max_delay: 5000 + * }); + * + * // Receive from transport message object (e.g., NATS, MQTT) + * const env = await msghandlerCSR.smartunpack(natsMsg, { + * fileserver_download_handler: msghandlerCSR.fetchWithBackoff + * }); + * // env.payloads is an Array of [dataname, data, type] arrays + * for (const [dataname, data, type] of env.payloads) { + * console.log(`${dataname}: ${data} (type: ${type})`); + * } + */ +async function smartunpack(msg, options = {}) { + const { + fileserver_download_handler = fetchWithBackoff, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + } = options; + + let envJsonObj; + + // NATS's Javascript lib already return JSON object + if (msg !== null && typeof msg === 'object') { + envJsonObj = msg; + } else { + throw new Error('Invalid message format: expected JSON string or message object'); + } + + logTrace(envJsonObj.correlation_id, 'Processing received message'); + logTrace(envJsonObj.correlation_id, `smartunpack: envelope has ${envJsonObj.payloads.length} payloads`); + + // Process all payloads in the envelope + const payloadsList = []; + const numPayloads = envJsonObj.payloads.length; + + logTrace(envJsonObj.correlation_id, `smartunpack: Processing ${numPayloads} payloads`); + + for (let i = 0; i < numPayloads; i++) { + const payloadObj = envJsonObj.payloads[i]; + const transport = payloadObj.transport; + const dataname = payloadObj.dataname; + const payloadType = payloadObj.payload_type; + + logTrace(envJsonObj.correlation_id, `smartunpack: Processing payload ${i + 1}/${numPayloads}: dataname=${dataname}, type=${payloadType}, transport=${transport}`); + + if (transport === 'direct') { + logTrace(envJsonObj.correlation_id, `Direct transport - decoding payload '${dataname}'`); + + // Extract base64 payload from the payload + const payloadB64 = payloadObj.data; + logTrace(envJsonObj.correlation_id, `Direct transport: base64 length=${payloadB64?.length}`); + + // Decode Base64 payload + const payloadBytes = base64ToBuffer(payloadB64); + logTrace(envJsonObj.correlation_id, `Direct transport: decoded bytes=${payloadBytes.length}`); + + // Deserialize based on type + const dataType = payloadObj.payload_type; + const data = await deserializeData(payloadBytes, dataType, envJsonObj.correlation_id); + logTrace(envJsonObj.correlation_id, `Direct transport: deserialized data type=${typeof data}, constructor=${data?.constructor?.name}`); + + payloadsList.push([dataname, data, dataType]); + } else if (transport === 'link') { + // Extract download URL from the payload + const url = payloadObj.data; + logTrace(envJsonObj.correlation_id, `Link transport - fetching '${dataname}' from URL: ${url}`); + + // Fetch with exponential backoff using the download handler + const downloadedData = await fileserver_download_handler( + url, + max_retries, + base_delay, + max_delay, + envJsonObj.correlation_id + ); + + // Deserialize based on type + const dataType = payloadObj.payload_type; + const data = await deserializeData(downloadedData, dataType, envJsonObj.correlation_id); + + payloadsList.push([dataname, data, dataType]); + } else { + throw new Error(`Unknown transport type for payload '${dataname}': ${transport}`); + } + } + + logTrace(envJsonObj.correlation_id, `smartunpack: Successfully processed all ${payloadsList.length} payloads`); + envJsonObj.payloads = payloadsList; + return envJsonObj; +} + +// ---------------------------------------------- Module Exports ---------------------------------------------- // + +const msghandlerCSR = { + /** + * Send data with automatic transport selection + */ + smartpack, + + /** + * Receive and process messages + */ + smartunpack, + + /** + * Upload data to plik server in one-shot mode + */ + plikOneshotUpload, + + /** + * Fetch data from URL with exponential backoff + */ + fetchWithBackoff, + + /** + * Default constants + */ + DEFAULT_SIZE_THRESHOLD, + DEFAULT_BROKER_URL, + DEFAULT_FILESERVER_URL +}; + +export default msghandlerCSR;