# NATSBridge.jl Tutorial

A comprehensive tutorial for learning how to use NATSBridge.jl for bi-directional communication between Julia and JavaScript services using NATS.

## Table of Contents

1. [What is NATSBridge.jl?](#what-is-natsbridgejl)
2. [Key Concepts](#key-concepts)
3. [Installation](#installation)
4. [Basic Usage](#basic-usage)
5. [Payload Types](#payload-types)
6. [Transport Strategies](#transport-strategies)
7. [Complete Examples](#complete-examples)
8. [Best Practices](#best-practices)
9. [Conclusion](#conclusion)

---

## What is NATSBridge.jl?

NATSBridge.jl is a Julia module that provides a high-level API for sending and receiving data across network boundaries using NATS as the message bus. It implements the **Claim-Check pattern** for handling large payloads efficiently.

### Core Features

- **Bi-directional communication**: Julia ↔ JavaScript
- **Smart transport selection**: Automatic direct vs link transport based on payload size
- **Multi-payload support**: Send multiple payloads of different types in a single message
- **Claim-check pattern**: Upload large files to HTTP server, send only URLs via NATS
- **Type-aware serialization**: Different serialization strategies for different data types

---

## Key Concepts

### 1. msgEnvelope_v1 (Message Envelope)

The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication:

```julia
struct msgEnvelope_v1
    correlationId::String      # Unique identifier to track messages
    msgId::String              # This message id
    timestamp::String          # Message published timestamp

    sendTo::String             # Topic/subject the sender sends to
    msgPurpose::String         # Purpose (ACK | NACK | updateStatus | shutdown | chat)
    senderName::String         # Sender name (e.g., "agent-wine-web-frontend")
    senderId::String           # Sender id (uuid4)
    receiverName::String       # Message receiver name (e.g., "agent-backend")
    receiverId::String         # Message receiver id (uuid4 or nothing for broadcast)

    replyTo::String            # Topic to reply to
    replyToMsgId::String       # Message id this message is replying to
    brokerURL::String          # NATS server address

    metadata::Dict{String, Any}
    payloads::AbstractArray{msgPayload_v1}  # Multiple payloads stored here
end
```

### 2. msgPayload_v1 (Payload Structure)

The `msgPayload_v1` structure provides flexible payload handling:

```julia
struct msgPayload_v1
    id::String                   # Id of this payload (e.g., "uuid4")
    dataname::String             # Name of this payload (e.g., "login_image")
    type::String                 # "text | dictionary | table | image | audio | video | binary"
    transport::String            # "direct | link"
    encoding::String             # "none | json | base64 | arrow-ipc"
    size::Integer                # Data size in bytes
    data::Any                    # Payload data in case of direct transport or a URL in case of link
    metadata::Dict{String, Any}  # Dict("checksum" => "sha256_hash", ...)
end
```

### 3. Standard API Format

The system uses a **standardized list-of-tuples format** for all payload operations:

```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
```

**Important**: Even when sending a single payload, you must wrap it in a list.
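To make the list-of-tuples rule concrete, here is a minimal sketch of a pre-send check. The helper `check_payloads` and the constant `PAYLOAD_TYPES` are hypothetical names introduced for illustration; they are not part of NATSBridge.jl, and the library may validate its input differently.

```julia
# Hypothetical helper (not part of NATSBridge.jl): checks that the input
# follows the (dataname, data, type) list-of-tuples format described above.
const PAYLOAD_TYPES = ("text", "dictionary", "table", "image", "audio", "video", "binary")

function check_payloads(payloads::AbstractVector)
    for p in payloads
        p isa Tuple && length(p) == 3 ||
            throw(ArgumentError("each payload must be a (dataname, data, type) tuple"))
        dataname, _, ptype = p
        dataname isa AbstractString ||
            throw(ArgumentError("dataname must be a string"))
        ptype in PAYLOAD_TYPES ||
            throw(ArgumentError("unknown payload type: $ptype"))
    end
    return payloads
end

# A single payload is still wrapped in a list
single = [("greeting", "hello", "text")]

# Several payloads of different types in one list
mixed = [("greeting", "hello", "text"), ("meta", Dict("k" => 1), "dictionary")]

check_payloads(single)
check_payloads(mixed)
```

Running a check like this before calling `smartsend` would surface a bare tuple or a misspelled type string early, before anything is published.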
---

## Installation

```julia
using Pkg
Pkg.add("NATS")
Pkg.add("JSON")
Pkg.add("Arrow")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
Pkg.add("Base64")
Pkg.add("PrettyPrinting")
Pkg.add("DataFrames")
```

Then include the NATSBridge module:

```julia
include("NATSBridge.jl")
using .NATSBridge
```

---

## Basic Usage

### Sending Data (smartsend)

```julia
using NATSBridge

# Send a simple dictionary
data = Dict("key" => "value")
env = NATSBridge.smartsend("my.subject", [("dataname1", data, "dictionary")])
```

### Receiving Data (smartreceive)

```julia
using NATS
using NATSBridge

# Connect to the NATS server and subscribe to a subject
conn = NATS.connect("nats://localhost:4222")
NATS.subscribe(conn, "my.subject") do msg
    # Process the message
    result = NATSBridge.smartreceive(
        msg,
        max_retries = 5,
        base_delay = 100,
        max_delay = 5000
    )

    # result is a list of (dataname, data, type) tuples
    for (dataname, data, type) in result
        println("Received $dataname of type $type")
        println("Data: $data")
    end
end
```

---

## Payload Types

NATSBridge.jl supports the following payload types:

| Type | Description | Serialization |
|------|-------------|---------------|
| `text` | Plain text | UTF-8 encoding |
| `dictionary` | JSON-serializable data (Dict, NamedTuple) | JSON |
| `table` | Tabular data (DataFrame, array of structs) | Apache Arrow IPC |
| `image` | Image data (Bitmap, PNG/JPG bytes) | Binary |
| `audio` | Audio data (WAV, MP3 bytes) | Binary |
| `video` | Video data (MP4, AVI bytes) | Binary |
| `binary` | Generic binary data | Binary |

---

## Transport Strategies

NATSBridge.jl automatically selects the appropriate transport strategy based on payload size:

### Direct Transport (< 1MB)

Small payloads are encoded as Base64 and sent directly over NATS.

```julia
# Small data (< 1MB) - uses direct transport
small_data = rand(1000)  # ~8KB
env = NATSBridge.smartsend("small", [("data", small_data, "table")])
```

### Link Transport (≥ 1MB)

Large payloads are uploaded to an HTTP file server, and only the URL is sent via NATS.
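The size rule behind the two strategies can be sketched in a few lines. This is only an illustration under stated assumptions: `choose_transport` and `DEFAULT_SIZE_THRESHOLD` are hypothetical names, the threshold is taken to be 1,000,000 bytes (the value passed as `size_threshold` in the examples of this tutorial), and the real selection logic inside NATSBridge.jl may differ.

```julia
using Base64

# Hypothetical names; NATSBridge.jl's internals may differ.
# Assumed default: 1_000_000 bytes, the size_threshold used in this tutorial.
const DEFAULT_SIZE_THRESHOLD = 1_000_000

# Pick "direct" below the threshold and "link" at or above it
choose_transport(bytes::Vector{UInt8}; size_threshold = DEFAULT_SIZE_THRESHOLD) =
    length(bytes) < size_threshold ? "direct" : "link"

small = rand(UInt8, 8_000)       # ~8KB
large = rand(UInt8, 1_500_000)   # ~1.5MB

choose_transport(small)   # "direct"
choose_transport(large)   # "link"

# Direct payloads are Base64-encoded, so they grow by roughly a third
encoded = base64encode(small)
length(encoded)           # 10668 characters for 8000 input bytes
```

The Base64 overhead is one reason to keep a threshold at all: a payload close to the limit takes noticeably more space inside the NATS message than its raw size suggests.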
```julia
# Large data (≥ 1MB) - uses link transport
large_data = rand(10_000_000)  # ~80MB
env = NATSBridge.smartsend("large", [("data", large_data, "table")])
```

---

## Complete Examples

### Example 1: Text Message

**Sender:**

```julia
using NATSBridge
using HTTP
using JSON
using UUIDs

const SUBJECT = "/NATSBridge_text_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"

# Upload handler for a Plik file server: request an upload id, then post the file
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    url_getUploadID = "$fileserver_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileserver_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))
    httpResponse = HTTP.post(url_upload, headers, form)
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]
    url = "$fileserver_url/file/$uploadid/$fileid/$dataname"

    return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

function test_text_send()
    small_text = "Hello, this is a small text message."
    large_text = join(["Line $i: " for i in 1:50000], "")

    data1 = ("small_text", small_text, "text")
    data2 = ("large_text", large_text, "text")

    env = NATSBridge.smartsend(
        SUBJECT,
        [data1, data2],
        nats_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserverUploadHandler = plik_upload_handler,
        size_threshold = 1_000_000,
        correlation_id = string(uuid4()),
        msg_purpose = "chat",
        sender_name = "text_sender"
    )
end
```

**Receiver:**

```julia
using NATS
using NATSBridge

const SUBJECT = "/NATSBridge_text_test"
const NATS_URL = "nats://localhost:4222"

function test_text_receive()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        result = NATSBridge.smartreceive(
            msg,
            max_retries = 5,
            base_delay = 100,
            max_delay = 5000
        )

        for (dataname, data, data_type) in result
            if data_type == "text"
                println("Received text: $data")
                write("./received_$dataname.txt", data)
            end
        end
    end

    # Keep the subscription alive for two minutes, then drain the connection
    sleep(120)
    NATS.drain(conn)
end
```

### Example 2: Dictionary (JSON) Message

**Sender:**

```julia
using NATSBridge
using HTTP
using JSON
using UUIDs

const SUBJECT = "/NATSBridge_dict_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"

# Upload handler for a Plik file server: request an upload id, then post the file
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    url_getUploadID = "$fileserver_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileserver_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))
    httpResponse = HTTP.post(url_upload, headers, form)
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]
    url = "$fileserver_url/file/$uploadid/$fileid/$dataname"

    return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

function test_dict_send()
    small_dict = Dict("name" => "Alice", "age" => 30)
    large_dict = Dict("ids" => collect(1:50000), "names" => ["User_$i" for i in 1:50000])

    data1 = ("small_dict", small_dict, "dictionary")
    data2 = ("large_dict", large_dict, "dictionary")

    env = NATSBridge.smartsend(
        SUBJECT,
        [data1, data2],
        nats_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserverUploadHandler = plik_upload_handler,
        size_threshold = 1_000_000,
        correlation_id = string(uuid4()),
        msg_purpose = "chat"
    )
end
```

**Receiver:**

```julia
using NATS
using NATSBridge
using JSON

const SUBJECT = "/NATSBridge_dict_test"
const NATS_URL = "nats://localhost:4222"

function test_dict_receive()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        result = NATSBridge.smartreceive(
            msg,
            max_retries = 5,
            base_delay = 100,
            max_delay = 5000
        )

        for (dataname, data, data_type) in result
            if data_type == "dictionary"
                println("Received dictionary: $data")
                write("./received_$dataname.json", JSON.json(data, 2))
            end
        end
    end

    # Keep the subscription alive for two minutes, then drain the connection
    sleep(120)
    NATS.drain(conn)
end
```

### Example 3: DataFrame (Table) Message

**Sender:**

```julia
using NATSBridge
using DataFrames
using HTTP
using JSON
using UUIDs

const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"

# Upload handler for a Plik file server: request an upload id, then post the file
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    url_getUploadID = "$fileserver_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileserver_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))
    httpResponse = HTTP.post(url_upload, headers, form)
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]
    url = "$fileserver_url/file/$uploadid/$fileid/$dataname"

    return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

function test_table_send()
    # All columns must have the same length, so the small table has three rows
    small_df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
    large_df = DataFrame(id = 1:50000, name = ["User_$i" for i in 1:50000], score = rand(1:100, 50000))

    data1 = ("small_table", small_df, "table")
    data2 = ("large_table", large_df, "table")

    env = NATSBridge.smartsend(
        SUBJECT,
        [data1, data2],
        nats_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserverUploadHandler = plik_upload_handler,
        size_threshold = 1_000_000,
        correlation_id = string(uuid4()),
        msg_purpose = "chat"
    )
end
```

**Receiver:**

```julia
using NATS
using NATSBridge
using DataFrames

const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats://localhost:4222"

function test_table_receive()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        result = NATSBridge.smartreceive(
            msg,
            max_retries = 5,
            base_delay = 100,
            max_delay = 5000
        )

        for (dataname, data, data_type) in result
            if data_type == "table"
                data = DataFrame(data)
                println("Received DataFrame with $(size(data, 1)) rows")
                display(data[1:min(5, size(data, 1)), :])
            end
        end
    end

    # Keep the subscription alive for two minutes, then drain the connection
    sleep(120)
    NATS.drain(conn)
end
```

### Example 4: Mixed Content (Chat with Text, Image, Audio)

**Sender:**

```julia
using NATSBridge
using DataFrames
using HTTP
using JSON
using UUIDs

const SUBJECT = "/NATSBridge_mix_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"

# Upload handler for a Plik file server: request an upload id, then post the file
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    url_getUploadID = "$fileserver_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    httpResponse = HTTP.request("POST",
        url_getUploadID, headers, body; body_is_form=false)
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileserver_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))
    httpResponse = HTTP.post(url_upload, headers, form)
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]
    url = "$fileserver_url/file/$uploadid/$fileid/$dataname"

    return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

function test_mix_send()
    # Text data
    text_data = "Hello! This is a test chat message. 🎉"

    # Dictionary data
    dict_data = Dict("type" => "chat", "sender" => "serviceA")

    # Small table data
    table_data_small = DataFrame(id = 1:10, name = ["msg_$i" for i in 1:10])

    # Large table data (link transport)
    table_data_large = DataFrame(id = 1:150_000, name = ["msg_$i" for i in 1:150_000])

    # Small image data (direct transport)
    image_data = UInt8[rand(1:255) for _ in 1:100]

    # Large image data (link transport)
    large_image_data = UInt8[rand(1:255) for _ in 1:1_500_000]

    # Small audio data (direct transport)
    audio_data = UInt8[rand(1:255) for _ in 1:100]

    # Large audio data (link transport)
    large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000]

    # Small video data (direct transport)
    video_data = UInt8[rand(1:255) for _ in 1:150]

    # Large video data (link transport)
    large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000]

    # Small binary data (direct transport)
    binary_data = UInt8[rand(1:255) for _ in 1:200]

    # Large binary data (link transport)
    large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000]

    # Create payloads list - mixed content
    payloads = [
        # Small data (direct transport)
        ("chat_text", text_data, "text"),
        ("chat_json", dict_data, "dictionary"),
        ("chat_table_small", table_data_small, "table"),

        # Large data (link transport)
        ("chat_table_large", table_data_large, "table"),
        ("user_image_large", large_image_data, "image"),
        ("audio_clip_large", large_audio_data, "audio"),
        ("video_clip_large", large_video_data, "video"),
        ("binary_file_large", large_binary_data, "binary")
    ]

    env = NATSBridge.smartsend(
        SUBJECT,
        payloads,
        nats_url = NATS_URL,
        fileserver_url = FILESERVER_URL,
        fileserverUploadHandler = plik_upload_handler,
        size_threshold = 1_000_000,
        correlation_id = string(uuid4()),
        msg_purpose = "chat",
        sender_name = "mix_sender"
    )
end
```

**Receiver:**

```julia
using NATS
using NATSBridge
using DataFrames

const SUBJECT = "/NATSBridge_mix_test"
const NATS_URL = "nats://localhost:4222"

function test_mix_receive()
    conn = NATS.connect(NATS_URL)
    NATS.subscribe(conn, SUBJECT) do msg
        result = NATSBridge.smartreceive(
            msg,
            max_retries = 5,
            base_delay = 100,
            max_delay = 5000
        )

        println("Received $(length(result)) payloads")

        for (dataname, data, data_type) in result
            println("\n=== Payload: $dataname (type: $data_type) ===")

            if data_type == "text"
                println("  Type: String")
                println("  Length: $(length(data)) characters")
            elseif data_type == "dictionary"
                println("  Type: JSON Object")
                println("  Keys: $(keys(data))")
            elseif data_type == "table"
                data = DataFrame(data)
                println("  Type: DataFrame")
                println("  Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
            elseif data_type == "image"
                println("  Type: Vector{UInt8}")
                println("  Size: $(length(data)) bytes")
                write("./received_$dataname.bin", data)
            elseif data_type == "audio"
                println("  Type: Vector{UInt8}")
                println("  Size: $(length(data)) bytes")
                write("./received_$dataname.bin", data)
            elseif data_type == "video"
                println("  Type: Vector{UInt8}")
                println("  Size: $(length(data)) bytes")
                write("./received_$dataname.bin", data)
            elseif data_type == "binary"
                println("  Type: Vector{UInt8}")
                println("  Size: $(length(data)) bytes")
                write("./received_$dataname.bin", data)
            end
        end
    end

    # Keep the subscription alive for two minutes, then drain the connection
    sleep(120)
    NATS.drain(conn)
end
```

---

## Best Practices

1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]`
2. **Use appropriate transport** - Let NATSBridge handle size-based routing (default 1MB threshold)
3. **Customize size threshold** - Use `size_threshold` parameter to adjust the direct/link split
4. **Provide fileserver handler** - Implement `fileserverUploadHandler` for link transport
5. **Include correlation IDs** - Track messages across distributed systems
6. **Handle errors** - Implement proper error handling for network failures
7. **Close connections** - Ensure NATS connections are properly closed using `NATS.drain()`

---

## Conclusion

NATSBridge.jl provides a powerful abstraction for bi-directional communication between Julia and JavaScript services. By understanding the key concepts and following the best practices, you can build robust, scalable applications that leverage the full power of NATS messaging.

For more information, see:

- [`docs/architecture.md`](./architecture.md) - Detailed architecture documentation
- [`docs/implementation.md`](./implementation.md) - Implementation details