diff --git a/etc.jl b/etc.jl index 81fdc52..7542b76 100644 --- a/etc.jl +++ b/etc.jl @@ -1,21 +1,13 @@ -using JSON - -d = Dict( - "name"=>"ton", - "age"=> 20, - "metadata" => Dict( - "height"=> 155, - "wife"=> "jane" - ) -) - -json_str = JSON.json(d) -json_str_bytes = Vector{UInt8}(json_str) -json_str_2 = String(json_str_bytes) -json_obj = JSON.parse(json_str_2) - +Check architecture.jl, NATSBridge.jl and its test files: +- test_julia_to_julia_table_receiver.jl +- test_julia_to_julia_table_sender.jl. +Now I want to test sending a mix-content message from Julia serviceA to Julia serviceB, for example, a chat system. +The test message must show that any combination and any number and any data size of text | json | table | image | audio | video | binary can be send and receive. +Can you write me the following test files: +- test_julia_to_julia_mix_receiver.jl +- test_julia_to_julia_mix_sender.jl diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 36008d5..e2d49ad 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -589,10 +589,9 @@ function smartreceive( push!(payloads_list, (dataname, data, data_type)) elseif transport == "link" # Link transport - payload is at URL - log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling - # Extract download URL from the payload url = String(payload["data"]) + log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling # Fetch with exponential backoff using the download handler downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"]) diff --git a/src/NATSBridge.js b/src/NATSBridge.js deleted file mode 100644 index d604fbe..0000000 --- a/src/NATSBridge.js +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Bi-Directional Data Bridge - JavaScript Module - * Implements SmartSend and SmartReceive for NATS communication - */ - -const { v4: uuidv4 } = require('uuid'); -const { decode, encode } = require('base64-url'); -const Arrow = require('apache-arrow'); - -// Constants -const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB -const DEFAULT_NATS_URL = 'nats://localhost:4222'; -const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload'; - -// Supported payload types -const PAYLOAD_TYPES = ['text', 'dictionary', 'table', 'image', 'audio', 'video', 'binary']; - -// Logging helper -function logTrace(correlationId, message) { - const timestamp = new Date().toISOString(); - console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`); -} - -// Message Envelope Class -class MessageEnvelope { - constructor(options = {}) { - this.correlation_id = options.correlation_id || uuidv4(); - this.type = options.type || 'json'; - this.transport = options.transport || 'direct'; - this.payload = options.payload || null; - this.url = options.url || null; - this.metadata = options.metadata || {}; - } - - static fromJSON(jsonStr) { - const data = JSON.parse(jsonStr); - return new MessageEnvelope({ - correlation_id: data.correlation_id, - type: data.type, - transport: data.transport, - payload: data.payload || null, - url: data.url || null, - metadata: data.metadata || {} - }); - } - - toJSON() { - const obj = { - correlation_id: this.correlation_id, - type: this.type, - transport: this.transport - }; - - if (this.payload) { - obj.payload = this.payload; - } - - if (this.url) { - obj.url = this.url; - } - - if (Object.keys(this.metadata).length > 0) { - obj.metadata = this.metadata; - } - - return JSON.stringify(obj); - } -} - -// Helper: Serialize data based on type -function _serializeData(data, type, correlationId) { - if (type === 'json') { - const jsonStr = JSON.stringify(data); - return Buffer.from(jsonStr, 'utf8'); - } else if (type === 'table') { - // Table data - convert to Arrow IPC stream - const writer = new Arrow.Writer(); - writer.writeTable(data); - return writer.toByteArray(); - } else if (type === 'binary') { - // Binary data - treat as binary - if (data instanceof Buffer) { - return data; - } else if (Array.isArray(data)) { - return Buffer.from(data); - } else { - throw new Error('Binary data must be binary (Buffer or Array)'); - } - } else { - throw new Error(`Unknown type: ${type}`); - } -} - -// Helper: Upload data to fileserver (mock implementation) -async function uploadToServer(data, fileserverUrl, correlationId) { - // This is a placeholder - in real implementation, this would upload to the fileserver - // and return the URL. For now, we return a mock URL. - return `${fileserverUrl}/mock-upload-${Date.now()}`; -} - -// SmartSend for JavaScript - Handles transport selection based on payload size -// Input format: [(dataname, data, type), ...] -async function SmartSend(subject, data, options = {}) { - const { - natsUrl = DEFAULT_NATS_URL, - fileserverUrl = DEFAULT_FILESERVER_URL, - sizeThreshold = DEFAULT_SIZE_THRESHOLD, - correlationId = uuidv4() - } = options; - - logTrace(correlationId, `Starting SmartSend for subject: ${subject}`); - - // Process each payload in the list - const payloadResults = []; - - for (let i = 0; i < data.length; i++) { - const tuple = data[i]; - if (tuple.length !== 3) { - throw new Error(`Payload at index ${i} must be a tuple of [dataname, data, type]`); - } - - const [dataname, payload_data, payload_type] = tuple; - - // Validate type - if (!PAYLOAD_TYPES.includes(payload_type)) { - throw new Error(`Unknown payload type '${payload_type}' for payload '${dataname}'. Supported types: ${PAYLOAD_TYPES.join(', ')}`); - } - - // Serialize data based on type - const payloadBytes = _serializeData(payload_data, payload_type, correlationId); - const payloadSize = payloadBytes.length; - - logTrace(correlationId, `Serialized payload '${dataname}' (type: ${payload_type}) size: ${payloadSize} bytes`); - - // Decision: Direct vs Link - if (payloadSize < sizeThreshold) { - // Direct path - Base64 encode and send via NATS - const payloadBase64 = encode(payloadBytes); - logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`); - - payloadResults.push({ - dataname, - payload_type, - transport: 'direct', - payload: payloadBase64, - metadata: { - content_length: payloadSize.toString(), - format: 'arrow_ipc_stream' - } - }); - } else { - // Link path - Upload to HTTP server, send URL via NATS - logTrace(correlationId, `Using link transport, uploading to fileserver`); - - const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId); - - payloadResults.push({ - dataname, - payload_type, - transport: 'link', - url: url, - metadata: { - content_length: payloadSize.toString(), - format: 'arrow_ipc_stream' - } - }); - } - } - - // Build the final message with all payloads - const allPayloads = payloadResults.map(p => ({ - dataname: p.dataname, - type: p.payload_type, - transport: p.transport, - ...(p.transport === 'direct' ? { payload: p.payload } : { url: p.url }), - metadata: p.metadata - })); - - // Create envelope and publish - const env = { - correlation_id: correlationId, - type: allPayloads[0].type, // Use first payload's type as envelope type - transport: allPayloads[0].transport, - payload: allPayloads.length === 1 && allPayloads[0].transport === 'direct' ? allPayloads[0].payload : undefined, - url: allPayloads.length === 1 && allPayloads[0].transport === 'link' ? allPayloads[0].url : undefined, - metadata: {}, - _payloads: allPayloads // Internal storage for multiple payloads - }; - - await publishMessage(natsUrl, subject, JSON.stringify(env), correlationId); - return env; -} - -// Helper: Publish message to NATS -async function publishMessage(natsUrl, subject, message, correlationId) { - const { connect } = require('nats'); - - try { - const nc = await connect({ servers: [natsUrl] }); - await nc.publish(subject, message); - logTrace(correlationId, `Message published to ${subject}`); - nc.close(); - } catch (error) { - logTrace(correlationId, `Failed to publish message: ${error.message}`); - throw error; - } -} - -// SmartReceive for JavaScript - Handles both direct and link transport -async function SmartReceive(msg, options = {}) { - const { - fileserverUrl = DEFAULT_FILESERVER_URL, - maxRetries = 5, - baseDelay = 100, - maxDelay = 5000 - } = options; - - const env = MessageEnvelope.fromJSON(msg.data); - - logTrace(env.correlation_id, `Processing received message`); - - if (env.transport === 'direct') { - logTrace(env.correlation_id, `Direct transport - decoding payload`); - - const payloadBytes = decode(env.payload); - const data = _deserializeData(payloadBytes, env.type, env.correlation_id, env.metadata); - - return { data, envelope: env }; - } else if (env.transport === 'link') { - logTrace(env.correlation_id, `Link transport - fetching from URL`); - - const data = await _fetchWithBackoff(env.url, maxRetries, baseDelay, maxDelay, env.correlation_id); - const result = _deserializeData(data, env.type, env.correlation_id, env.metadata); - - return { data: result, envelope: env }; - } else { - throw new Error(`Unknown transport type: ${env.transport}`); - } -} - -// Helper: Fetch with exponential backoff -async function _fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) { - let delay = baseDelay; - - for (let attempt = 1; attempt <= maxRetries; attempt++) { - try { - const response = await fetch(url); - if (response.ok) { - const buffer = await response.arrayBuffer(); - logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`); - return new Uint8Array(buffer); - } else { - throw new Error(`Failed to fetch: ${response.status}`); - } - } catch (error) { - logTrace(correlationId, `Attempt ${attempt} failed: ${error.message}`); - - if (attempt < maxRetries) { - await new Promise(resolve => setTimeout(resolve, delay)); - delay = Math.min(delay * 2, maxDelay); - } - } - } - - throw new Error(`Failed to fetch data after ${maxRetries} attempts`); -} - -// Helper: Deserialize data based on type -async function _deserializeData(data, type, correlationId, metadata) { - if (type === 'json') { - const jsonStr = new TextDecoder().decode(data); - return JSON.parse(jsonStr); - } else if (type === 'table') { - // Deserialize Arrow IPC stream to Table - const table = Arrow.Table.from(data); - return table; - } else if (type === 'binary') { - // Return binary binary data - return data; - } else { - throw new Error(`Unknown type: ${type}`); - } -} - -// Export functions -module.exports = { - SmartSend, - SmartReceive, - MessageEnvelope -}; \ No newline at end of file diff --git a/test/test_julia_to_julia_mix_receiver.jl b/test/test_julia_to_julia_mix_receiver.jl new file mode 100644 index 0000000..312c2fe --- /dev/null +++ b/test/test_julia_to_julia_mix_receiver.jl @@ -0,0 +1,213 @@ +#!/usr/bin/env julia +# Test script for mixed-content message testing +# Tests receiving a mix of text, json, table, image, audio, video, and binary data +# from Julia serviceA to Julia serviceB using NATSBridge.jl smartreceive +# +# This test demonstrates that any combination and any number of mixed content +# can be sent and received correctly. + +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_mix_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + + +# ------------------------------------------------------------------------------------------------ # +# test mixed content transfer # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] $message") +end + + +# Receiver: Listen for messages and verify mixed content handling +function test_mix_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + + # Use NATSBridge.smartreceive to handle the data + # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) + result = NATSBridge.smartreceive( + msg; + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + log_trace("Received $(length(result)) payloads") + + # Result is a list of (dataname, data, data_type) tuples + for (dataname, data, data_type) in result + log_trace("\n=== Payload: $dataname (type: $data_type) ===") + + # Handle different data types + if data_type == "text" + # Text data - should be a String + if isa(data, String) + log_trace(" Type: String") + log_trace(" Length: $(length(data)) characters") + + # Display first 200 characters + if length(data) > 200 + log_trace(" First 200 chars: $(data[1:200])...") + else + log_trace(" Content: $data") + end + + # Save to file + output_path = "./received_$dataname.txt" + write(output_path, data) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected String, got $(typeof(data))") + end + + elseif data_type == "dictionary" + # Dictionary data - should be JSON object + if isa(data, JSON.Object{String, Any}) + log_trace(" Type: Dict") + log_trace(" Keys: $(keys(data))") + + # Display nested content + for (key, value) in data + log_trace(" $key => $value") + end + + # Save to JSON file + output_path = "./received_$dataname.json" + json_str = JSON.json(data, 2) + write(output_path, json_str) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Dict, got $(typeof(data))") + end + + elseif data_type == "table" + # Table data - should be a DataFrame + data = DataFrame(data) + if isa(data, DataFrame) + log_trace(" Type: DataFrame") + log_trace(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") + log_trace(" Columns: $(names(data))") + + # Display first few rows + log_trace(" First 5 rows:") + display(data[1:min(5, size(data, 1)), :]) + + # Save to Arrow file + output_path = "./received_$dataname.arrow" + io = IOBuffer() + Arrow.write(io, data) + write(output_path, take!(io)) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected DataFrame, got $(typeof(data))") + end + + elseif data_type == "image" + # Image data - should be Vector{UInt8} + if isa(data, Vector{UInt8}) + log_trace(" Type: Vector{UInt8} (binary)") + log_trace(" Size: $(length(data)) bytes") + + # Save to file + output_path = "./received_$dataname.bin" + write(output_path, data) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Vector{UInt8}, got $(typeof(data))") + end + + elseif data_type == "audio" + # Audio data - should be Vector{UInt8} + if isa(data, Vector{UInt8}) + log_trace(" Type: Vector{UInt8} (binary)") + log_trace(" Size: $(length(data)) bytes") + + # Save to file + output_path = "./received_$dataname.bin" + write(output_path, data) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Vector{UInt8}, got $(typeof(data))") + end + + elseif data_type == "video" + # Video data - should be Vector{UInt8} + if isa(data, Vector{UInt8}) + log_trace(" Type: Vector{UInt8} (binary)") + log_trace(" Size: $(length(data)) bytes") + + # Save to file + output_path = "./received_$dataname.bin" + write(output_path, data) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Vector{UInt8}, got $(typeof(data))") + end + + elseif data_type == "binary" + # Binary data - should be Vector{UInt8} + if isa(data, Vector{UInt8}) + log_trace(" Type: Vector{UInt8} (binary)") + log_trace(" Size: $(length(data)) bytes") + + # Save to file + output_path = "./received_$dataname.bin" + write(output_path, data) + log_trace(" Saved to: $output_path") + else + log_trace(" ERROR: Expected Vector{UInt8}, got $(typeof(data))") + end + + else + log_trace(" ERROR: Unknown data type '$data_type'") + end + end + + # Summary + println("\n=== Verification Summary ===") + text_count = count(x -> x[3] == "text", result) + dict_count = count(x -> x[3] == "dictionary", result) + table_count = count(x -> x[3] == "table", result) + image_count = count(x -> x[3] == "image", result) + audio_count = count(x -> x[3] == "audio", result) + video_count = count(x -> x[3] == "video", result) + binary_count = count(x -> x[3] == "binary", result) + + log_trace("Text payloads: $text_count") + log_trace("Dictionary payloads: $dict_count") + log_trace("Table payloads: $table_count") + log_trace("Image payloads: $image_count") + log_trace("Audio payloads: $audio_count") + log_trace("Video payloads: $video_count") + log_trace("Binary payloads: $binary_count") + end + + # Keep listening for 2 minutes + sleep(120) + NATS.drain(conn) +end + + +# Run the test +println("Starting mixed-content transport test...") +println("Note: This receiver will wait for messages from the sender.") +println("Run test_julia_to_julia_mix_sender.jl first to send test data.") + +# Run receiver +println("\ntesting smartreceive for mixed content") +test_mix_receive() + +println("\nTest completed.") \ No newline at end of file diff --git a/test/test_julia_to_julia_mix_sender.jl b/test/test_julia_to_julia_mix_sender.jl new file mode 100644 index 0000000..bfb1471 --- /dev/null +++ b/test/test_julia_to_julia_mix_sender.jl @@ -0,0 +1,196 @@ +#!/usr/bin/env julia +# Test script for mixed-content message testing +# Tests sending a mix of text, json, table, image, audio, video, and binary data +# from Julia serviceA to Julia serviceB using NATSBridge.jl smartsend +# +# This test demonstrates that any combination and any number of mixed content +# can be sent and received correctly. + +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_mix_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + + +# ------------------------------------------------------------------------------------------------ # +# test mixed content transfer # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# File upload handler for plik server +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + # Get upload ID + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + + +# Helper: Create sample data for each type +function create_sample_data() + # Text data (small - direct transport) + text_data = "Hello! This is a test chat message. 🎉\nHow are you doing today? 😊" + + # Dictionary/JSON data (medium - could be direct or link) + dict_data = Dict( + "type" => "chat", + "sender" => "serviceA", + "receiver" => "serviceB", + "metadata" => Dict( + "timestamp" => string(Dates.now()), + "priority" => "high", + "tags" => ["urgent", "chat", "test"] + ), + "content" => Dict( + "text" => "This is a JSON-formatted chat message with nested structure.", + "format" => "markdown", + "mentions" => ["user1", "user2"] + ) + ) + + # Table data (DataFrame - small - direct transport) + table_data = DataFrame( + id = 1:10, + message = ["msg_$i" for i in 1:10], + sender = ["sender_$i" for i in 1:10], + timestamp = [string(Dates.now()) for _ in 1:10], + priority = rand(1:3, 10) + ) + + # Image data (small binary - direct transport) + # Create a simple 10x10 pixel PNG-like data (128 bytes header + 100 pixels = 112 bytes) + # Using simple RGB data (10*10*3 = 300 bytes of pixel data) + image_width = 10 + image_height = 10 + image_data = UInt8[] + # PNG header (simplified) + push!(image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A) + # Simple RGB data (RGBRGBRGB...) + for i in 1:image_width*image_height + push!(image_data, 0xFF, 0x00, 0x00) # Red pixel + end + + # Audio data (small binary - direct transport) + # Create a simple audio-like data (100 bytes) + audio_data = UInt8[rand(1:255) for _ in 1:100] + + # Video data (small binary - direct transport) + # Create a simple video-like data (150 bytes) + video_data = UInt8[rand(1:255) for _ in 1:150] + + # Binary data (small - direct transport) + binary_data = UInt8[rand(1:255) for _ in 1:200] + + return ( + text_data, + dict_data, + table_data, + image_data, + audio_data, + video_data, + binary_data + ) +end + + +# Sender: Send mixed content via smartsend +function test_mix_send() + # Create sample data + (text_data, dict_data, table_data, image_data, audio_data, video_data, binary_data) = create_sample_data() + + # Create payloads list - mixed content with different types + payloads = [ + ("chat_text", text_data, "text"), + ("chat_json", dict_data, "dictionary"), + ("chat_table", table_data, "table"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio"), + ("video_clip", video_data, "video"), + ("binary_file", binary_data, "binary") + ] + + # Use smartsend with mixed content + env = NATSBridge.smartsend( + SUBJECT, + payloads, # List of (dataname, data, type) tuples + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, # 1MB threshold + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "mix_sender", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "" + ) + + log_trace("Sent message with $(length(env.payloads)) payloads") + + # Log transport type for each payload + for (i, payload) in enumerate(env.payloads) + log_trace("Payload $i ('$payload.dataname'):") + log_trace(" Transport: $(payload.transport)") + log_trace(" Type: $(payload.type)") + log_trace(" Size: $(payload.size) bytes") + log_trace(" Encoding: $(payload.encoding)") + + if payload.transport == "link" + log_trace(" URL: $(payload.data)") + end + end + + # Summary + println("\n--- Transport Summary ---") + direct_count = count(p -> p.transport == "direct", env.payloads) + link_count = count(p -> p.transport == "link", env.payloads) + log_trace("Direct transport: $direct_count payloads") + log_trace("Link transport: $link_count payloads") +end + + +# Run the test +println("Starting mixed-content transport test...") +println("Correlation ID: $correlation_id") + +# Run sender +println("start smartsend for mixed content") +test_mix_send() + +println("\nTest completed.") +println("Note: Run test_julia_to_julia_mix_receiver.jl to receive the messages.") \ No newline at end of file diff --git a/test/test_julia_to_julia_text_receiver.jl b/test/test_julia_to_julia_text_receiver.jl new file mode 100644 index 0000000..0d974b9 --- /dev/null +++ b/test/test_julia_to_julia_text_receiver.jl @@ -0,0 +1,83 @@ +#!/usr/bin/env julia +# Test script for text transport testing +# Tests receiving 1 large and 1 small text from Julia serviceA to Julia serviceB +# Uses NATSBridge.jl smartreceive with "text" type + +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_text_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + + +# ------------------------------------------------------------------------------------------------ # +# test text transfer # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] $message") +end + + +# Receiver: Listen for messages and verify text handling +function test_text_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + + # Use NATSBridge.smartreceive to handle the data + # API: smartreceive(msg, download_handler; max_retries, base_delay, max_delay) + result = NATSBridge.smartreceive( + msg; + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + # Result is a list of (dataname, data, data_type) tuples + for (dataname, data, data_type) in result + if isa(data, String) + log_trace("Received text '$dataname' of type $data_type") + log_trace(" Length: $(length(data)) characters") + + # Display first 100 characters + if length(data) > 100 + log_trace(" First 100 characters: $(data[1:100])...") + else + log_trace(" Content: $data") + end + + # Save to file + output_path = "./received_$dataname.txt" + write(output_path, data) + log_trace("Saved text to $output_path") + else + log_trace("Received unexpected data type for '$dataname': $(typeof(data))") + end + end + end + + # Keep listening for 10 seconds + sleep(120) + NATS.drain(conn) +end + + +# Run the test +println("Starting text transport test...") +println("Note: This receiver will wait for messages from the sender.") +println("Run test_julia_to_julia_text_sender.jl first to send test data.") + +# Run receiver +println("testing smartreceive for text") +test_text_receive() + +println("Test completed.") \ No newline at end of file diff --git a/test/test_julia_to_julia_text_sender.jl b/test/test_julia_to_julia_text_sender.jl new file mode 100644 index 0000000..83265be --- /dev/null +++ b/test/test_julia_to_julia_text_sender.jl @@ -0,0 +1,119 @@ +#!/usr/bin/env julia +# Test script for text transport testing +# Tests sending 1 large and 1 small text from Julia serviceA to Julia serviceB +# Uses NATSBridge.jl smartsend with "text" type + +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/NATSBridge_text_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + + +# ------------------------------------------------------------------------------------------------ # +# test text transfer # +# ------------------------------------------------------------------------------------------------ # + + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + + +# File upload handler for plik server +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + # Get upload ID + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + # Upload file + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + + +# Sender: Send text via smartsend +function test_text_send() + # Create a small text (will use direct transport) + small_text = "Hello, this is a small text message. Testing direct transport via NATS." + + # Create a large text (will use link transport if > 1MB) + # Generate a larger text (~2MB to ensure link transport) + large_text = join(["Line $i: This is a sample text line with some content to pad the size. " for i in 1:50000], "") + + # Test data 1: small text + data1 = ("small_text", small_text, "text") + + # Test data 2: large text + data2 = ("large_text", large_text, "text") + + # Use smartsend with text type + # For small text: will use direct transport (Base64 encoded UTF-8) + # For large text: will use link transport (uploaded to fileserver) + env = NATSBridge.smartsend( + SUBJECT, + [data1, data2], # List of (dataname, data, type) tuples + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, # 1MB threshold + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "text_sender", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "" + ) + + log_trace("Sent message with $(length(env.payloads)) payloads") + + # Log transport type for each payload + for (i, payload) in enumerate(env.payloads) + log_trace("Payload $i ('$payload.dataname'):") + log_trace(" Transport: $(payload.transport)") + log_trace(" Type: $(payload.type)") + log_trace(" Size: $(payload.size) bytes") + log_trace(" Encoding: $(payload.encoding)") + + if payload.transport == "link" + log_trace(" URL: $(payload.data)") + end + end +end + + +# Run the test +println("Starting text transport test...") +println("Correlation ID: $correlation_id") + +# Run sender +println("start smartsend for text") +test_text_send() + +println("Test completed.") \ No newline at end of file