# NATSBridge Walkthrough A comprehensive guide to building real-world applications with NATSBridge. ## Table of Contents 1. [Introduction](#introduction) 2. [Architecture Overview](#architecture-overview) 3. [Building a Chat Application](#building-a-chat-application) 4. [Building a File Transfer System](#building-a-file-transfer-system) 5. [Building a Streaming Data Pipeline](#building-a-streaming-data-pipeline) 6. [Performance Optimization](#performance-optimimization) 7. [Best Practices](#best-practices) --- ## Introduction This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover: - Chat applications with rich media support - File transfer systems with claim-check pattern - Streaming data pipelines Each section builds on the previous one, gradually increasing in complexity. --- ## Architecture Overview ### System Components ``` ┌─────────────────────────────────────────────────────────────────┐ │ NATSBridge Architecture │ ├─────────────────────────────────────────────────────────────────┤ │ ┌──────────────┐ ┌──────────────┐ │ │ │ Julia │ │ NATS │ │ │ │ (NATS.jl) │◄──►│ Server │ │ │ └──────────────┘ └──────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌──────────────────────────────────────┐ │ │ │ File Server │ │ │ │ (HTTP Upload) │ │ │ └──────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### Message Flow 1. **Sender** creates a message envelope with payloads 2. **NATSBridge** serializes and encodes payloads 3. **Transport Decision**: Small payloads go directly to NATS, large payloads are uploaded to file server 4. **NATS** routes messages to subscribers 5. **Receiver** fetches payloads (from NATS or file server) 6. **NATSBridge** deserializes and decodes payloads --- ## Building a Chat Application Let's build a full-featured chat application that supports text, images, and file attachments. ### Step 1: Set Up the Project ```bash # Create project directory mkdir -p chat-app/src cd chat-app # Create configuration file cat > config.json << 'EOF' { "nats_url": "nats://localhost:4222", "fileserver_url": "http://localhost:8080", "size_threshold": 1048576 } EOF ``` ### Step 2: Create the Chat Interface (Julia) ```julia # src/chat_ui.jl using NATSBridge, NATS struct ChatUI messages::Vector{Dict} current_room::String end function ChatUI() ChatUI(Dict[], "") end function send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String}) data = [] # Add text message if !isempty(message_input) push!(data, ("text", message_input, "text")) end # Add file if selected if selected_file !== nothing file_data = read(selected_file) file_type = get_file_type(selected_file) push!(data, ("attachment", file_data, file_type)) end return data end function get_file_type(filename::String)::String if endswith(filename, ".png") || endswith(filename, ".jpg") return "image" elseif endswith(filename, ".mp3") || endswith(filename, ".wav") return "audio" elseif endswith(filename, ".mp4") || endswith(filename, ".avi") return "video" else return "binary" end end function add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict}) push!(ui.messages, Dict( "user" => user, "text" => text, "attachment" => attachment )) end ``` ### Step 3: Create the Message Handler ```julia # src/chat_handler.jl using NATSBridge, NATS struct ChatHandler nats::NATS.Connection ui::ChatUI end function ChatHandler(nats_connection::NATS.Connection) ChatHandler(nats_connection, ChatUI()) end function start(handler::ChatHandler) # Subscribe to chat rooms rooms = ["general", "tech", "random"] for room in rooms NATS.subscribe(handler.nats, "/chat/$room") do msg handle_message(handler, msg) end end println("Chat handler started") end function handle_message(handler::ChatHandler, msg::NATS.Msg) env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff) # Extract sender info from envelope sender = get(env, "sender_name", "Anonymous") # Process each payload for (dataname, data, type) in env["payloads"] if type == "text" add_message(handler.ui, sender, data, nothing) elseif type == "image" # Convert to data URL for display base64_data = base64encode(data) attachment = Dict( "type" => "image", "data" => "data:image/png;base64,$base64_data" ) add_message(handler.ui, sender, "", attachment) else # For other types, use file server URL attachment = Dict("type" => type, "data" => data) add_message(handler.ui, sender, "", attachment) end end end function download_file(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} # Implement exponential backoff for file server downloads # Return downloaded data as Vector{UInt8} end ``` ### Step 4: Run the Application ```bash # Start NATS docker run -p 4222:4222 nats:latest # Start file server mkdir -p /tmp/fileserver python3 -m http.server 8080 --directory /tmp/fileserver # Run chat app julia src/chat_ui.jl julia src/chat_handler.jl ``` --- ## Building a File Transfer System Let's build a file transfer system that handles large files efficiently. ### Step 1: File Upload Service (Julia) ```julia # src/file_upload_service.jl using NATSBridge, HTTP struct FileUploadService broker_url::String fileserver_url::String end function FileUploadService(broker_url::String, fileserver_url::String) FileUploadService(broker_url, fileserver_url) end function upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict file_data = read(file_path) file_name = basename(file_path) data = [("file", file_data, "binary")] env, env_json_str = smartsend( "/files/$recipient", data, broker_url=service.broker_url, fileserver_url=service.fileserver_url ) return env end function upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict file_size = stat(file_path).size if file_size > 100 * 1024 * 1024 # > 100MB println("File too large for direct upload, using streaming...") return stream_upload(service, file_path, recipient) end return upload_file(service, file_path, recipient) end function stream_upload(service::FileUploadService, file_path::String, recipient::String)::Dict # Implement streaming upload to file server # This would require a more sophisticated file server # For now, we'll use the standard upload return upload_file(service, file_path, recipient) end ``` ### Step 2: File Download Service (Julia) ```julia # src/file_download_service.jl using NATSBridge struct FileDownloadService nats_url::String end function FileDownloadService(nats_url::String) FileDownloadService(nats_url) end function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String) # Subscribe to sender's file channel env = smartreceive(msg, fileserver_download_handler=fetch_from_url) # Process each payload for (dataname, data, type) in env["payloads"] if type == "binary" file_path = "/downloads/$dataname" write(file_path, data) println("File saved to $file_path") end end end function fetch_from_url(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} # Fetch data from URL with exponential backoff # Return downloaded data as Vector{UInt8} end ``` ### Step 3: File Transfer CLI (Julia) ```julia # src/cli.jl using NATSBridge, Readlines, FileIO function main() config = JSON3.read(read("config.json", String)) println("File Transfer System") println("====================") println("1. Upload file") println("2. Download file") println("3. List pending downloads") print("Enter choice: ") choice = readline() if choice == "1" upload_file_cli(config) elseif choice == "2" download_file_cli(config) end end function upload_file_cli(config) print("Enter file path: ") file_path = readline() print("Enter recipient: ") recipient = readline() file_service = FileUploadService(config.nats_url, config.fileserver_url) try env = upload_file(file_service, file_path, recipient) println("Upload successful!") println("File ID: $(env["payloads"][1][1])") catch error println("Upload failed: $(error)") end end function download_file_cli(config) print("Enter sender: ") sender = readline() file_service = FileDownloadService(config.nats_url) try download_file(file_service, sender) println("Download complete!") catch error println("Download failed: $(error)") end end main() ``` --- ## Building a Streaming Data Pipeline Let's build a data pipeline that processes streaming data from sensors. ### Step 1: Sensor Data Model (Julia) ```julia # src/sensor_data.jl using Dates, DataFrames struct SensorReading sensor_id::String timestamp::String value::Float64 unit::String metadata::Dict{String, Any} end function SensorReading(sensor_id::String, value::Float64, unit::String, metadata::Dict{String, Any}=Dict()) SensorReading( sensor_id, ISODateTime(now(), Dates.Second) |> string, value, unit, metadata ) end struct SensorBatch readings::Vector{SensorReading} end function SensorBatch() SensorBatch(SensorReading[]) end function add_reading(batch::SensorBatch, reading::SensorReading) push!(batch.readings, reading) end function to_dataframe(batch::SensorBatch)::DataFrame data = Dict{String, Any}() data["sensor_id"] = [r.sensor_id for r in batch.readings] data["timestamp"] = [r.timestamp for r in batch.readings] data["value"] = [r.value for r in batch.readings] data["unit"] = [r.unit for r in batch.readings] return DataFrame(data) end ``` ### Step 2: Sensor Sender (Julia) ```julia # src/sensor_sender.jl using NATSBridge, Dates, Random struct SensorSender broker_url::String fileserver_url::String end function SensorSender(broker_url::String, fileserver_url::String) SensorSender(broker_url, fileserver_url) end function send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String) reading = SensorReading(sensor_id, value, unit) data = [("reading", reading.metadata, "dictionary")] # Default: is_publish=True (automatically publishes to NATS) smartsend( "/sensors/$sensor_id", data, broker_url=sender.broker_url, fileserver_url=sender.fileserver_url ) end function prepare_message_only(sender::SensorSender, sensor_id::String, value::Float64, unit::String) """Prepare a message without publishing (is_publish=False).""" reading = SensorReading(sensor_id, value, unit) data = [("reading", reading.metadata, "dictionary")] # With is_publish=False, returns (env, env_json_str) without publishing env, env_json_str = smartsend( "/sensors/$sensor_id/prepare", data, broker_url=sender.broker_url, fileserver_url=sender.fileserver_url, is_publish=false ) # Now you can publish manually using NATS request-reply pattern # nc.request(subject, env_json_str, reply_to=reply_to_topic) return env, env_json_str end function send_batch(sender::SensorSender, readings::Vector{SensorReading}) batch = SensorBatch() for reading in readings add_reading(batch, reading) end df = to_dataframe(batch) # Convert to Arrow IPC format import Arrow table = Arrow.Table(df) # Serialize to Arrow IPC import IOBuffer buf = IOBuffer() Arrow.write(buf, table) arrow_data = take!(buf) # Send based on size if length(arrow_data) < 1048576 # < 1MB data = [("batch", arrow_data, "table")] smartsend( "/sensors/batch", data, broker_url=sender.broker_url, fileserver_url=sender.fileserver_url ) else # Upload to file server data = [("batch", arrow_data, "table")] smartsend( "/sensors/batch", data, broker_url=sender.broker_url, fileserver_url=sender.fileserver_url ) end end ``` ### Step 3: Sensor Receiver (Julia) ```julia # src/sensor_receiver.jl using NATSBridge, Arrow, DataFrames, IOBuffer struct SensorReceiver fileserver_download_handler::Function end function SensorReceiver(download_handler::Function) SensorReceiver(download_handler) end function process_reading(receiver::SensorReceiver, msg::NATS.Msg) env = smartreceive(msg, receiver.fileserver_download_handler) for (dataname, data, data_type) in env["payloads"] if data_type == "dictionary" # Process dictionary payload println("Received: $dataname = $data") elseif data_type == "table" # Deserialize Arrow IPC buf = IOBuffer(data) table = Arrow.read(buf) df = DataFrame(table) println("Received batch with $(nrow(df)) readings") println(df) end end end ``` --- ## Performance Optimization ### 1. Batch Processing ```julia # Batch multiple readings into a single message function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}}) batch = SensorBatch() for (sensor_id, value, unit) in readings reading = SensorReading(sensor_id, value, unit) add_reading(batch, reading) end df = to_dataframe(batch) # Convert to Arrow IPC import Arrow table = Arrow.Table(df) # Serialize to Arrow IPC import IOBuffer buf = IOBuffer() Arrow.write(buf, table) arrow_data = take!(buf) # Send as single message smartsend( "/sensors/batch", [("batch", arrow_data, "table")], broker_url=sender.broker_url ) end ``` ### 2. Connection Reuse ```julia # Reuse NATS connections function create_connection_pool() connections = Dict{String, NATS.Connection}() function get_connection(nats_url::String)::NATS.Connection if !haskey(connections, nats_url) connections[nats_url] = NATS.connect(nats_url) end return connections[nats_url] end function close_all() for conn in values(connections) NATS.drain(conn) end empty!(connections) end return (get_connection= get_connection, close_all=close_all) end ``` ### 3. Caching ```julia # Cache file server responses using Base.Threads const file_cache = Dict{String, Vector{UInt8}}() function fetch_with_caching(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8} if haskey(file_cache, url) return file_cache[url] end # Fetch from file server data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) # Cache the result file_cache[url] = data return data end ``` --- ## Best Practices ### 1. Error Handling ```julia function safe_smartsend(subject::String, data::Vector{Tuple}, kwargs...) try return smartsend(subject, data; kwargs...) catch error println("Failed to send message: $(error)") return nothing end end ``` ### 2. Logging ```julia using Logging function log_send(subject::String, data::Vector{Tuple}, correlation_id::String) @info "Sending to $subject: $(length(data)) payloads, correlation_id=$correlation_id" end function log_receive(correlation_id::String, num_payloads::Int) @info "Received message: $num_payloads payloads, correlation_id=$correlation_id" end ``` ### 3. Rate Limiting ```julia using Dates, Collections struct RateLimiter max_requests::Int time_window::Float64 requests::Deque{Float64} end function RateLimiter(max_requests::Int, time_window::Float64) RateLimiter(max_requests, time_window, Deque{Float64}()) end function allow(limiter::RateLimiter)::Bool now = time() # Remove old requests while !isempty(limiter.requests) && limiter.requests[1] < now - limiter.time_window popfirst!(limiter.requests) end if length(limiter.requests) >= limiter.max_requests return false end push!(limiter.requests, now) return true end ``` --- ## Conclusion This walkthrough covered: - Building a chat application with rich media support - Building a file transfer system with claim-check pattern - Building a streaming data pipeline for sensor data For more information, check the [API documentation](../src/README.md) and [test examples](../test/). --- ## License MIT