diff --git a/docs/implementation.md b/docs/implementation.md index b8d3ceb..18a8105 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -335,6 +335,258 @@ env, env_json_str = NATSBridge.smartsend( --- +### Mixed-Content Transport Examples + +The NATSBridge.jl library supports sending mixed content types in a single message, with automatic transport selection based on payload size. Small payloads (< 1MB threshold) use direct transport (Base64-encoded in NATS message), while large payloads (≥ 1MB) use link transport (uploaded to file server, URL sent via NATS). + +#### Julia Mixed-Content Example + +```julia +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/natsbridge" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +# Create sample data - mix of small and large payloads +text_data = "Hello! This is a test chat message. 🎉" + +dict_data = Dict( + "type" => "chat", + "sender" => "serviceA", + "content" => Dict("text" => "JSON-formatted message", "format" => "markdown") +) + +# Small arrow table (< 1MB) - direct transport +arrow_table_small = DataFrame( + id = 1:10, + name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], + score = rand(50:100, 10), + active = rand([true, false], 10) +) + +# Large arrow table (≥ 1MB) - link transport +arrow_table_large = DataFrame( + id = 1:200_000, + name = ["user_$i" for i in 1:200_000], + score = rand(50:100, 200_000), + active = rand([true, false], 200_000) +) + +# Small jsontable (< 1MB) - direct transport +json_table_small = DataFrame( + id = 1:10, + name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"], + score = rand(50:100, 10) +) + +# Large jsontable (≥ 1MB) - link transport +json_table_large = DataFrame( + id = 1:1_500_000, + name = ["user_$i" for i in 1:1_500_000], + score = rand(50:100, 1_500_000) +) + +# Binary data (small) - direct transport +binary_data_small = UInt8[rand(1:255) for _ in 1:200] + +# Binary data (large) - link transport +binary_data_large = UInt8[rand(1:255) for _ in 1:1_500_000] + +# Read image file +file_path_small_image = "./test/small_image.jpg" +file_data_small_image = read(file_path_small_image) +filename_small_image = basename(file_path_small_image) + +file_path_large_image = "./test/large_image.png" +file_data_large_image = read(file_path_large_image) +filename_large_image = basename(file_path_large_image) + +# Create payloads list - mixed content with both small and large data +payloads = [ + # Small data (direct transport) + ("chat_text", text_data, "text"), + ("chat_json", dict_data, "dictionary"), + ("arrow_table_small", arrow_table_small, "arrowtable"), + ("json_table_small", json_table_small, "jsontable"), + (filename_small_image, file_data_small_image, "binary"), + + # Large data (link transport) + ("arrow_table_large", arrow_table_large, "arrowtable"), + ("json_table_large", json_table_large, "jsontable"), + (filename_large_image, file_data_large_image, "binary"), + ("binary_file_large", binary_data_large, "binary") +] + +# Use smartsend with mixed content +correlation_id = string(uuid4()) +sendinfo = NATSBridge.smartsend( + SUBJECT, + payloads; + broker_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserver_upload_handler = plik_oneshot_upload, + size_threshold = 1_000_000, # 1MB threshold + correlation_id = correlation_id, + msg_purpose = "chat", + sender_name = "mix_sender", + receiver_name = "", + receiver_id = "", + reply_to = "", + reply_to_msg_id = "", + is_publish = true +) + +env, env_json_str = sendinfo + +# Log transport type for each payload +for (i, payload) in enumerate(env.payloads) + println("Payload $i ('$payload.dataname'):") + println(" Transport: $(payload.transport)") + println(" Type: $(payload.payload_type)") + println(" Size: $(payload.size) bytes") + println(" Encoding: $(payload.encoding)") + + if payload.transport == "link" + println(" URL: $(payload.data)") + end +end + +# Summary +direct_count = count(p -> p.transport == "direct", env.payloads) +link_count = count(p -> p.transport == "link", env.payloads) +println("\n--- Transport Summary ---") +println("Direct transport: $direct_count payloads") +println("Link transport: $link_count payloads") +``` + +**Expected Output:** +``` +Payload 1 ('chat_text'): + Transport: direct + Type: text + Size: 38 bytes + Encoding: base64 + +Payload 2 ('chat_json'): + Transport: direct + Type: dictionary + Size: 156 bytes + Encoding: json + +Payload 3 ('arrow_table_small'): + Transport: direct + Type: arrowtable + Size: 1245 bytes + Encoding: arrow-ipc + +Payload 4 ('json_table_small'): + Transport: direct + Type: jsontable + Size: 892 bytes + Encoding: json + +Payload 5 ('small_image.jpg'): + Transport: direct + Type: binary + Size: 73269 bytes + Encoding: base64 + +Payload 6 ('arrow_table_large'): + Transport: link + Type: arrowtable + Size: 5242880 bytes + Encoding: arrow-ipc + URL: http://localhost:8080/file/ABC123/DEF456/arrow_table_large.arrow + +Payload 7 ('json_table_large'): + Transport: link + Type: jsontable + Size: 45678900 bytes + Encoding: json + URL: http://localhost:8080/file/GHI789/JKL012/json_table_large.json + +Payload 8 ('large_image.png'): + Transport: link + Type: binary + Size: 1168437 bytes + Encoding: base64 + URL: http://localhost:8080/file/MNO345/PQR678/large_image.png + +Payload 9 ('binary_file_large'): + Transport: link + Type: binary + Size: 1500000 bytes + Encoding: base64 + URL: http://localhost:8080/file/STU901/VWX234/binary_file_large.bin + +--- Transport Summary --- +Direct transport: 5 payloads +Link transport: 4 payloads +``` + +#### Receiver Example + +```julia +using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64 +include("../src/NATSBridge.jl") +using .NATSBridge + +const SUBJECT = "/natsbridge" +const NATS_URL = "nats://localhost:4222" + +conn = NATS.connect(NATS_URL) +NATS.subscribe(conn, SUBJECT) do msg + # Use NATSBridge.smartreceive to handle the data + result = NATSBridge.smartreceive( + msg; + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + println("Received $(length(result["payloads"])) payloads") + + # Process each payload + for (dataname, data, data_type) in result["payloads"] + println("\n=== Payload: $dataname (type: $data_type) ===") + + if data_type == "arrowtable" + if isa(data, Arrow.Table) + df = DataFrame(data) + println(" Type: Arrow.Table") + println(" Size: $(size(df, 1)) rows x $(size(df, 2)) columns") + end + elseif data_type == "jsontable" + if isa(data, Vector{Any}) + df = DataFrame(data) + println(" Type: Vector{Dict}") + println(" Size: $(length(data)) rows") + end + elseif data_type == "text" + if isa(data, String) + println(" Type: String") + println(" Length: $(length(data)) characters") + end + elseif data_type in ["image", "audio", "video", "binary"] + if isa(data, Vector{UInt8}) + println(" Type: Vector{UInt8} (binary)") + println(" Size: $(length(data)) bytes") + end + end + end +end + +# Keep listening +sleep(120) +NATS.drain(conn) +``` + +--- + ## Architecture ### Cross-Platform Claim-Check Pattern @@ -418,10 +670,12 @@ env, env_json_str = NATSBridge.smartsend(...) using Pkg Pkg.add("NATS") Pkg.add("Arrow") -Pkg.add("JSON3") +Pkg.add("JSON") Pkg.add("HTTP") Pkg.add("UUIDs") Pkg.add("Dates") +Pkg.add("PrettyPrinting") +Pkg.add("DataFrames") ``` ### JavaScript Dependencies (Node.js) @@ -905,6 +1159,8 @@ end #### plik_oneshot_upload Implementation +**Overload 1: Upload from binary data** + ```julia function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) # Get upload id @@ -940,6 +1196,57 @@ function plik_oneshot_upload(file_server_url::String, dataname::String, data::Ve end ``` +**Overload 2: Upload from file path** + +```julia +function plik_oneshot_upload(file_server_url::String, filepath::String) + # Get upload id + filename = basename(filepath) + url_getUploadID = "$file_server_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + response_json = JSON.parse(http_response.body) + + uploadid = response_json["id"] + uploadtoken = response_json["uploadToken"] + + # Upload file + url_upload = "$file_server_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + http_response = open(filepath, "r") do file_stream + form = HTTP.Form(Dict("file" => file_stream)) + + # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' + HTTP.post(url_upload, headers, form; status_exception = false) + end + + if !isnothing(http_response) && http_response.status == 200 + # Success - response already logged by caller + else + error("Failed to upload file: server returned status $(http_response.status)") + end + response_json = JSON.parse(http_response.body) + fileid = response_json["id"] + + # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" + url = "$file_server_url/file/$uploadid/$fileid/$filename" + + return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end +``` + +**Usage Examples:** + +```julia +# Upload binary data +data = read("./test_image.jpg") +result = plik_oneshot_upload("http://localhost:8080", "my_image.jpg", data) + +# Upload file directly from disk +result = plik_oneshot_upload("http://localhost:8080", "./test_image.jpg") +``` + --- ### JavaScript Implementation