18 KiB
18 KiB
NATSBridge Walkthrough
A comprehensive guide to building real-world applications with NATSBridge.
Table of Contents
- Introduction
- Architecture Overview
- Building a Chat Application
- Building a File Transfer System
- Building a Streaming Data Pipeline
- Performance Optimization
- Best Practices
Introduction
This walkthrough will guide you through building several real-world applications using NATSBridge. We'll cover:
- Chat applications with rich media support
- File transfer systems using the claim-check pattern
- Streaming data pipelines
Each section builds on the previous one, gradually increasing in complexity.
Architecture Overview
System Components
┌─────────────────────────────────────────────────────────────────┐
│ NATSBridge Architecture │
├─────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Julia │ │ NATS │ │
│ │ (NATS.jl) │◄──►│ Server │ │
│ └──────────────┘ └──────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ File Server │ │
│ │ (HTTP Upload) │ │
│ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Message Flow
- Sender creates a message envelope with payloads
- NATSBridge serializes and encodes payloads
- Transport Decision: Small payloads go directly to NATS; large payloads are uploaded to the file server
- NATS routes messages to subscribers
- Receiver fetches payloads (from NATS or file server)
- NATSBridge deserializes and decodes payloads
Building a Chat Application
Let's build a full-featured chat application that supports text, images, and file attachments.
Step 1: Set Up the Project
# Create project directory
# (-p also creates the nested src/ folder and does not fail if it already exists)
mkdir -p chat-app/src
cd chat-app
# Create configuration file
# The quoted 'EOF' delimiter stops the shell from expanding anything inside the heredoc,
# so the JSON below is written to config.json verbatim.
# NOTE(review): size_threshold is presumably in bytes (1048576 = 1024 * 1024) — confirm.
cat > config.json << 'EOF'
{
"nats_url": "nats://localhost:4222",
"fileserver_url": "http://localhost:8080",
"size_threshold": 1048576
}
EOF
Step 2: Create the Chat Interface (Julia)
# src/chat_ui.jl
using NATSBridge, NATS
"""
    ChatUI

In-memory state of the chat view: the list of displayed messages (one `Dict` per
message) and the name of the room currently shown.
"""
struct ChatUI
    messages::Vector{Dict}
    current_room::String
end

# Convenience constructor: start with no messages and no room selected.
ChatUI() = ChatUI(Dict[], "")
"""
    send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String})

Build the payload list for one outgoing chat message and return it.

Each payload is a `(name, data, type)` tuple. A non-empty `message_input` contributes a
`"text"` payload; a selected file contributes an `"attachment"` payload holding the
file's bytes, typed by `get_file_type`. `ui` is not used by this function.
"""
function send_message(ui::ChatUI, message_input::String, selected_file::Union{Nothing, String})
    payloads = []

    # Text part (skipped when the input box is empty)
    isempty(message_input) || push!(payloads, ("text", message_input, "text"))

    # Attachment part (skipped when no file was picked)
    if !isnothing(selected_file)
        push!(payloads, ("attachment", read(selected_file), get_file_type(selected_file)))
    end

    return payloads
end
"""
    get_file_type(filename::AbstractString)::String

Classify `filename` by its extension and return the payload type to send it as:
`"image"`, `"audio"`, `"video"`, or `"binary"` for anything unrecognised.

The match is case-insensitive, so `PHOTO.PNG` and `photo.png` are treated alike.
"""
function get_file_type(filename::AbstractString)::String
    # Compare on a lowercased copy so that upper-case extensions are recognised too.
    name = lowercase(filename)
    kinds = (
        "image" => (".png", ".jpg", ".jpeg"),
        "audio" => (".mp3", ".wav"),
        "video" => (".mp4", ".avi"),
    )
    for (kind, extensions) in kinds
        any(ext -> endswith(name, ext), extensions) && return kind
    end
    return "binary"
end
"""
    add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict})

Append one message entry (keys `"user"`, `"text"`, `"attachment"`) to `ui.messages`
and return the updated message list.
"""
function add_message(ui::ChatUI, user::String, text::String, attachment::Union{Nothing, Dict})
    entry = Dict(
        "user" => user,
        "text" => text,
        "attachment" => attachment
    )
    return push!(ui.messages, entry)
end
Step 3: Create the Message Handler
# src/chat_handler.jl
using Base64
using NATSBridge, NATS
"""
    ChatHandler

Pairs a NATS connection with the `ChatUI` that incoming messages are rendered into.
"""
struct ChatHandler
    nats::NATS.Connection
    ui::ChatUI
end

# Convenience constructor: wrap a connection together with a fresh, empty UI.
ChatHandler(nats_connection::NATS.Connection) = ChatHandler(nats_connection, ChatUI())
"""
    start(handler::ChatHandler)

Subscribe `handler` to the `general`, `tech` and `random` chat rooms, routing every
message received on `/chat/<room>` to `handle_message`.
"""
function start(handler::ChatHandler)
    for room in ("general", "tech", "random")
        on_message = msg -> handle_message(handler, msg)
        NATS.subscribe(on_message, handler.nats, "/chat/$room")
    end
    println("Chat handler started")
end
"""
    handle_message(handler::ChatHandler, msg::NATS.Msg)

Decode an incoming NATS message with `smartreceive` and add one UI entry per payload.

- `"text"` payloads are shown as plain messages.
- `"image"` payloads are embedded as a base64 `data:` URL.
- Any other payload type is attached as-is under its own type name.

The displayed sender is the envelope's `"sender_name"`, or `"Anonymous"` when absent.
"""
function handle_message(handler::ChatHandler, msg::NATS.Msg)
    env = smartreceive(msg, fileserver_download_handler=_fetch_with_backoff)

    # Extract sender info from envelope
    sender = get(env, "sender_name", "Anonymous")

    # Each payload is a (name, data, type) tuple; the name is not needed for display.
    for (_, data, kind) in env["payloads"]
        if kind == "text"
            add_message(handler.ui, sender, data, nothing)
        elseif kind == "image"
            # `base64encode` comes from the Base64 standard library (loaded at file level).
            # NOTE(review): the MIME type is fixed to image/png although .jpg files are
            # also tagged "image" by the sender — confirm this is acceptable for display.
            encoded = base64encode(data)
            attachment = Dict(
                "type" => "image",
                "data" => "data:image/png;base64,$encoded"
            )
            add_message(handler.ui, sender, "", attachment)
        else
            # Other types are passed through unchanged (the already-received payload data).
            attachment = Dict("type" => kind, "data" => data)
            add_message(handler.ui, sender, "", attachment)
        end
    end
end
# Placeholder for a file-server download handler. It takes the target URL, the retry
# settings (max_retries, base_delay, max_delay) and a correlation id.
# NOTE(review): the body is empty, so the function evaluates to `nothing`, which cannot
# be converted to the declared `Vector{UInt8}` return type — calling it as written throws.
# NOTE(review): `handle_message` passes `_fetch_with_backoff` to `smartreceive`, not this
# function — confirm which one is meant to be the download handler.
function download_file(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# Implement exponential backoff for file server downloads
# Return downloaded data as Vector{UInt8}
end
Step 4: Run the Application
# Start NATS
# (no -d flag: the container stays attached to this terminal, so run the next steps
# in separate terminals)
docker run -p 4222:4222 nats:latest
# Start file server
# NOTE(review): python3's built-in http.server serves files for download only; it does
# not accept uploads — confirm which server is expected to receive large payloads.
mkdir -p /tmp/fileserver
python3 -m http.server 8080 --directory /tmp/fileserver
# Run chat app
# NOTE(review): as listed in this guide, both scripts only define types and functions,
# and chat_handler.jl uses ChatUI without including chat_ui.jl — presumably a driver
# that loads both, connects to NATS and calls start(...) is still needed.
julia src/chat_ui.jl
julia src/chat_handler.jl
Building a File Transfer System
Let's build a file transfer system that handles large files efficiently.
Step 1: File Upload Service (Julia)
# src/file_upload_service.jl
using NATSBridge, HTTP
"""
    FileUploadService(broker_url::String, fileserver_url::String)

Connection settings used when sending files: the NATS broker URL and the URL of the
file server.
"""
struct FileUploadService
    broker_url::String
    fileserver_url::String
end

# No outer constructor is declared on purpose: Julia already generates
# `FileUploadService(broker_url::String, fileserver_url::String)`. Defining another
# method with that exact signature whose body calls `FileUploadService(...)` replaces
# the generated one and then calls itself until the stack overflows.
"""
    upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict

Read the file at `file_path` and send it to `recipient` on subject
`/files/<recipient>` as a single `"binary"` payload. Returns the envelope produced by
`smartsend`.

The payload is named after the file (`basename(file_path)`) so the receiving side can
tell files apart instead of seeing every transfer as `"file"`.
"""
function upload_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    file_data = read(file_path)
    file_name = basename(file_path)

    # (name, data, type) — the name travels with the payload to the receiver.
    data = [(file_name, file_data, "binary")]

    # `smartsend` returns the envelope and its JSON string; only the envelope is needed.
    env, _ = smartsend(
        "/files/$recipient",
        data,
        broker_url=service.broker_url,
        fileserver_url=service.fileserver_url
    )
    return env
end
"""
    upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict

Send a file, choosing the upload path by size: files larger than 100 MiB go through
`stream_upload`, everything else through `upload_file`.
"""
function upload_large_file(service::FileUploadService, file_path::String, recipient::String)::Dict
    direct_limit = 100 * 1024 * 1024  # 100MB

    # Common case first: the file is small enough for a direct upload.
    if filesize(file_path) <= direct_limit
        return upload_file(service, file_path, recipient)
    end

    println("File too large for direct upload, using streaming...")
    return stream_upload(service, file_path, recipient)
end
# Streaming upload entry point.
# A real streaming implementation would need a more capable file server; until then
# this simply delegates to the standard `upload_file` path.
stream_upload(service::FileUploadService, file_path::String, recipient::String)::Dict =
    upload_file(service, file_path, recipient)
Step 2: File Download Service (Julia)
# src/file_download_service.jl
using NATSBridge
"""
    FileDownloadService(nats_url::String)

Settings for the receiving side of a file transfer: the NATS server URL.
"""
struct FileDownloadService
    nats_url::String
end

# No outer constructor is declared on purpose: Julia already generates
# `FileDownloadService(nats_url::String)`. Redefining that exact signature with a body
# that calls `FileDownloadService(nats_url)` would replace the generated method and
# recurse until the stack overflows.
"""
    download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String)

Decode an already-received file message and save each `"binary"` payload under
`/downloads/`, named after the payload.

The payload name comes from the remote sender, so only its final path component is
used; names that resolve to nothing usable (`""`, `"."`, `".."`) are skipped. This
keeps a crafted name such as `../../etc/passwd` from escaping the download folder.
`service`, `sender` and `download_id` are not used by this function.
"""
function download_file(service::FileDownloadService, msg::NATS.Msg, sender::String, download_id::String)
    env = smartreceive(msg, fileserver_download_handler=fetch_from_url)

    # Process each payload
    for (dataname, data, type) in env["payloads"]
        type == "binary" || continue

        # Drop any directory part supplied by the sender.
        safe_name = basename(string(dataname))
        if safe_name in ("", ".", "..")
            println("Skipping payload with unusable name: $dataname")
            continue
        end

        file_path = "/downloads/$safe_name"
        write(file_path, data)
        println("File saved to $file_path")
    end
end
# Placeholder download handler passed to `smartreceive` by the file download service.
# It takes the URL to fetch, the retry settings (max_retries, base_delay, max_delay)
# and a correlation id.
# NOTE(review): the body is empty, so the function evaluates to `nothing`, which cannot
# be converted to the declared `Vector{UInt8}` return type — calling it as written throws.
function fetch_from_url(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
# Fetch data from URL with exponential backoff
# Return downloaded data as Vector{UInt8}
end
Step 3: File Transfer CLI (Julia)
# src/cli.jl
using NATSBridge, Readlines, FileIO
"""
    main()

Entry point of the file transfer CLI: load `config.json`, show the menu, read one
choice from standard input and run the matching action.

Surrounding whitespace in the choice is ignored. Option 3 and unrecognised input
produce a message instead of exiting silently.
"""
function main()
    config = JSON3.read(read("config.json", String))

    println("File Transfer System")
    println("====================")
    println("1. Upload file")
    println("2. Download file")
    println("3. List pending downloads")
    print("Enter choice: ")

    choice = strip(readline())
    if choice == "1"
        upload_file_cli(config)
    elseif choice == "2"
        download_file_cli(config)
    elseif choice == "3"
        # The menu offers this option, but no listing function exists yet.
        println("Listing pending downloads is not implemented yet.")
    else
        println("Invalid choice: $choice")
    end
end
"""
    upload_file_cli(config)

Prompt for a file path and a recipient, then upload the file using the NATS and file
server URLs from `config`. Prints the outcome; upload errors are reported, not rethrown.
"""
function upload_file_cli(config)
    # Small prompt helper: show the label, then read one line of input.
    ask(label) = (print(label); readline())

    file_path = ask("Enter file path: ")
    recipient = ask("Enter recipient: ")
    file_service = FileUploadService(config.nats_url, config.fileserver_url)

    try
        env = upload_file(file_service, file_path, recipient)
        println("Upload successful!")
        println("File ID: $(env["payloads"][1][1])")
    catch err
        println("Upload failed: $(err)")
    end
end
"""
    download_file_cli(config)

Prompt for a sender name and start a download using the NATS URL from `config`.
Prints the outcome; download errors are reported, not rethrown.
"""
function download_file_cli(config)
    print("Enter sender: ")
    sender = readline()
    file_service = FileDownloadService(config.nats_url)

    # NOTE(review): the `download_file` method shown for FileDownloadService takes
    # (service, msg, sender, download_id); this two-argument call matches no method in
    # this guide, so it presumably always ends in the failure branch — confirm.
    try
        download_file(file_service, sender)
        println("Download complete!")
    catch err
        println("Download failed: $(err)")
    end
end
main()
Building a Streaming Data Pipeline
Let's build a data pipeline that processes streaming data from sensors.
Step 1: Sensor Data Model (Julia)
# src/sensor_data.jl
using Dates, DataFrames
"""
    SensorReading(sensor_id, value, unit[, metadata])

One measurement from a sensor.

# Fields
- `sensor_id::String`: identifier of the sensor.
- `timestamp::String`: creation time as `yyyy-mm-ddTHH:MM:SS`.
- `value::Float64`: the measured value.
- `unit::String`: unit of `value`.
- `metadata::Dict{String, Any}`: free-form extra information.

The convenience constructor stamps the reading with the current local time
(second precision) and defaults `metadata` to an empty dictionary.
"""
struct SensorReading
    sensor_id::String
    timestamp::String
    value::Float64
    unit::String
    metadata::Dict{String, Any}
end

function SensorReading(
    sensor_id::String,
    value::Float64,
    unit::String,
    # The default must be a `Dict{String, Any}`: a bare `Dict()` is `Dict{Any, Any}`
    # and would not match this parameter's type.
    metadata::Dict{String, Any}=Dict{String, Any}(),
)
    timestamp = Dates.format(now(), dateformat"yyyy-mm-ddTHH:MM:SS")
    return SensorReading(sensor_id, timestamp, value, unit, metadata)
end
"""
    SensorBatch

A growable collection of `SensorReading`s that are sent together.
"""
struct SensorBatch
    readings::Vector{SensorReading}
end

# Start with an empty batch.
SensorBatch() = SensorBatch(SensorReading[])

# Append `reading` to `batch`; returns the batch's reading list.
add_reading(batch::SensorBatch, reading::SensorReading) = push!(batch.readings, reading)
"""
    to_dataframe(batch::SensorBatch)::DataFrame

Convert the readings in `batch` into a `DataFrame` with one row per reading and the
columns `sensor_id`, `timestamp`, `value` and `unit` (metadata is not included).
"""
function to_dataframe(batch::SensorBatch)::DataFrame
    readings = batch.readings
    columns = Dict{String, Any}(
        "sensor_id" => [r.sensor_id for r in readings],
        "timestamp" => [r.timestamp for r in readings],
        "value" => [r.value for r in readings],
        "unit" => [r.unit for r in readings],
    )
    return DataFrame(columns)
end
Step 2: Sensor Sender (Julia)
# src/sensor_sender.jl
using NATSBridge, Dates, Random
"""
    SensorSender(broker_url::String, fileserver_url::String)

Connection settings used when publishing sensor data: the NATS broker URL and the URL
of the file server.
"""
struct SensorSender
    broker_url::String
    fileserver_url::String
end

# No outer constructor is declared on purpose: Julia already generates
# `SensorSender(broker_url::String, fileserver_url::String)`. Redefining that exact
# signature with a body that calls `SensorSender(...)` would replace the generated
# method and recurse until the stack overflows.
"""
    send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String)

Create a timestamped reading and publish it on `/sensors/<sensor_id>` as a single
`"dictionary"` payload containing the sensor id, timestamp, value, unit and metadata.
Returns whatever `smartsend` returns.
"""
function send_reading(sender::SensorSender, sensor_id::String, value::Float64, unit::String)
    reading = SensorReading(sensor_id, value, unit)

    # Send the whole reading; `reading.metadata` alone is empty for a fresh reading and
    # would not carry the measurement.
    payload = Dict{String, Any}(
        "sensor_id" => reading.sensor_id,
        "timestamp" => reading.timestamp,
        "value" => reading.value,
        "unit" => reading.unit,
        "metadata" => reading.metadata,
    )
    data = [("reading", payload, "dictionary")]

    # Default: is_publish=true (automatically publishes to NATS)
    return smartsend(
        "/sensors/$sensor_id",
        data,
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url
    )
end
"""
    prepare_message_only(sender::SensorSender, sensor_id::String, value::Float64, unit::String)

Prepare a sensor-reading message without publishing it (`is_publish=false`).

Returns `(env, env_json_str)` so the caller can publish the JSON manually, for example
with a NATS request-reply call.
"""
function prepare_message_only(sender::SensorSender, sensor_id::String, value::Float64, unit::String)
    reading = SensorReading(sensor_id, value, unit)

    # Send the whole reading; `reading.metadata` alone is empty for a fresh reading and
    # would not carry the measurement.
    payload = Dict{String, Any}(
        "sensor_id" => reading.sensor_id,
        "timestamp" => reading.timestamp,
        "value" => reading.value,
        "unit" => reading.unit,
        "metadata" => reading.metadata,
    )
    data = [("reading", payload, "dictionary")]

    # With is_publish=false, smartsend returns (env, env_json_str) without publishing
    env, env_json_str = smartsend(
        "/sensors/$sensor_id/prepare",
        data,
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url,
        is_publish=false
    )

    # The caller can now publish `env_json_str` itself, e.g. via a request-reply call.
    return env, env_json_str
end
# Loaded at top level: Julia rejects `import`/`using` inside a function body.
# (`IOBuffer` is part of Base and needs no import.)
using Arrow

"""
    send_batch(sender::SensorSender, readings::Vector{SensorReading})

Serialize `readings` to Arrow IPC bytes and publish them on `/sensors/batch` as one
`"table"` payload. Returns whatever `smartsend` returns.

The same `smartsend` call is used for every batch size, with both the broker and the
file server URL supplied.
"""
function send_batch(sender::SensorSender, readings::Vector{SensorReading})
    batch = SensorBatch()
    for reading in readings
        add_reading(batch, reading)
    end
    df = to_dataframe(batch)

    # `Arrow.write` accepts any Tables.jl source, so the DataFrame is written directly.
    buf = IOBuffer()
    Arrow.write(buf, df)
    arrow_data = take!(buf)

    return smartsend(
        "/sensors/batch",
        [("batch", arrow_data, "table")],
        broker_url=sender.broker_url,
        fileserver_url=sender.fileserver_url
    )
end
Step 3: Sensor Receiver (Julia)
# src/sensor_receiver.jl
using NATSBridge, Arrow, DataFrames, IOBuffer
"""
    SensorReceiver(fileserver_download_handler::Function)

Receiving side of the sensor pipeline. Holds the function used to download payloads
that were stored on the file server.
"""
struct SensorReceiver
    fileserver_download_handler::Function
end

# No outer constructor is declared on purpose: Julia already generates
# `SensorReceiver(fileserver_download_handler::Function)`. Redefining that exact
# signature with a body that calls `SensorReceiver(...)` would replace the generated
# method and recurse until the stack overflows.
"""
    process_reading(receiver::SensorReceiver, msg::NATS.Msg)

Decode an incoming sensor message and print its payloads.

- `"dictionary"` payloads are printed as `name = value`.
- `"table"` payloads are read as Arrow IPC bytes into a `DataFrame`, whose row count
  and contents are printed.

Payloads of any other type are ignored.
"""
function process_reading(receiver::SensorReceiver, msg::NATS.Msg)
    # Pass the handler by keyword, matching the other `smartreceive` calls in this guide.
    env = smartreceive(msg, fileserver_download_handler=receiver.fileserver_download_handler)

    for (dataname, data, data_type) in env["payloads"]
        if data_type == "dictionary"
            println("Received: $dataname = $data")
        elseif data_type == "table"
            # Arrow.jl reads IPC data through `Arrow.Table`.
            table = Arrow.Table(IOBuffer(data))
            df = DataFrame(table)
            println("Received batch with $(nrow(df)) readings")
            println(df)
        end
    end
end
Performance Optimization
1. Batch Processing
# Batch multiple readings into a single message
# Loaded at top level: Julia rejects `import`/`using` inside a function body.
# (`IOBuffer` is part of Base and needs no import.)
using Arrow

"""
    send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}})

Turn `(sensor_id, value, unit)` tuples into timestamped readings, serialize them to
Arrow IPC bytes and publish them on `/sensors/batch` as one `"table"` payload.
Returns whatever `smartsend` returns.
"""
function send_batch_readings(sender::SensorSender, readings::Vector{Tuple{String, Float64, String}})
    batch = SensorBatch()
    for (sensor_id, value, unit) in readings
        add_reading(batch, SensorReading(sensor_id, value, unit))
    end
    df = to_dataframe(batch)

    # `Arrow.write` accepts any Tables.jl source, so the DataFrame is written directly.
    buf = IOBuffer()
    Arrow.write(buf, df)
    arrow_data = take!(buf)

    # Send as single message
    return smartsend(
        "/sensors/batch",
        [("batch", arrow_data, "table")],
        broker_url=sender.broker_url
    )
end
2. Connection Reuse
# Reuse NATS connections
"""
    create_connection_pool()

Create a small cache of NATS connections keyed by server URL.

Returns a named tuple with two closures:
- `get_connection(nats_url)`: return the cached connection for `nats_url`, connecting
  first if there is none yet.
- `close_all()`: drain every cached connection and empty the cache.
"""
function create_connection_pool()
    pool = Dict{String, NATS.Connection}()

    function get_connection(nats_url::String)::NATS.Connection
        # Connect only on the first request for this URL; reuse afterwards.
        return get!(() -> NATS.connect(nats_url), pool, nats_url)
    end

    function close_all()
        foreach(NATS.drain, values(pool))
        return empty!(pool)
    end

    return (get_connection=get_connection, close_all=close_all)
end
3. Caching
# Cache file server responses
using Base.Threads
# Downloaded payloads keyed by URL, shared by every caller in the process.
const file_cache = Dict{String, Vector{UInt8}}()

# Guards `file_cache`: a plain Dict must not be read and written from several threads
# at once.
const file_cache_lock = ReentrantLock()

"""
    fetch_with_caching(url, max_retries, base_delay, max_delay, correlation_id)::Vector{UInt8}

Download handler that remembers results: return the cached bytes for `url` when
present, otherwise fetch them with `_fetch_with_backoff`, store them and return them.

The lock is held only while the cache is touched, not during the download, so a slow
fetch does not block other callers. Two callers asking for the same uncached URL at
the same time may both download it; the last result stored wins.
"""
function fetch_with_caching(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
    cached = lock(file_cache_lock) do
        get(file_cache, url, nothing)
    end
    cached === nothing || return cached

    # Fetch from file server
    data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)

    # Cache the result
    lock(file_cache_lock) do
        file_cache[url] = data
    end
    return data
end
Best Practices
1. Error Handling
"""
    safe_smartsend(subject::String, data::AbstractVector, options...; kwargs...)

Call `smartsend(subject, data; ...)` and return its result, or print the error and
return `nothing` if sending fails.

Options for `smartsend` are normally given as keywords
(`safe_smartsend(subject, data; broker_url=url)`). Positional `:name => value` pairs
are still accepted and forwarded as keywords as well.

`data` may be any vector of payload tuples; it is not restricted to the exact type
`Vector{Tuple}`, which ordinary literals such as `[("text", "hi", "text")]` do not have.
"""
function safe_smartsend(subject::String, data::AbstractVector, options...; kwargs...)
    try
        return smartsend(subject, data; options..., kwargs...)
    catch err  # named `err` so Base's `error` function is not shadowed
        println("Failed to send message: $(err)")
        return nothing
    end
end
2. Logging
using Logging
"""
    log_send(subject::String, data::AbstractVector, correlation_id::String)

Log, at info level, the subject, the number of payloads in `data` and the correlation
id of an outgoing message.

`data` may be any vector of payload tuples; the exact type `Vector{Tuple}` is not
required, since ordinary literals such as `[("text", "hi", "text")]` do not have it.
"""
function log_send(subject::String, data::AbstractVector, correlation_id::String)
    @info "Sending to $subject: $(length(data)) payloads, correlation_id=$correlation_id"
end
# Log, at info level, how many payloads an incoming message carried and its correlation id.
log_receive(correlation_id::String, num_payloads::Int) =
    @info "Received message: $num_payloads payloads, correlation_id=$correlation_id"
3. Rate Limiting
# `Collections` is not a loadable package, so no `Deque` type is available from it.
# A plain `Vector{Float64}` from Base provides the `push!`/`popfirst!` queue needed here.
using Dates

"""
    RateLimiter(max_requests::Int, time_window::Float64)

Sliding-window rate limiter: at most `max_requests` calls are allowed within any
`time_window` seconds. `requests` holds the timestamps (seconds, from `time()`) of
the calls still inside the window, oldest first.
"""
struct RateLimiter
    max_requests::Int
    time_window::Float64
    requests::Vector{Float64}
end

# Convenience constructor: start with an empty request history.
function RateLimiter(max_requests::Int, time_window::Float64)
    return RateLimiter(max_requests, time_window, Float64[])
end

"""
    allow(limiter::RateLimiter)::Bool

Return `true` and record the call if it fits within the limit, `false` otherwise.
Timestamps older than the window are discarded first.
"""
function allow(limiter::RateLimiter)::Bool
    # Named `current` rather than `now` so `Dates.now` is not shadowed.
    current = time()
    cutoff = current - limiter.time_window

    # Remove old requests
    while !isempty(limiter.requests) && first(limiter.requests) < cutoff
        popfirst!(limiter.requests)
    end

    length(limiter.requests) >= limiter.max_requests && return false

    push!(limiter.requests, current)
    return true
end
Conclusion
This walkthrough covered:
- Building a chat application with rich media support
- Building a file transfer system using the claim-check pattern
- Building a streaming data pipeline for sensor data
For more information, check the API documentation and test examples.
License
MIT