This commit is contained in:
2026-02-22 21:55:18 +07:00
parent 69f2173f75
commit def75d8f86
5 changed files with 541 additions and 127 deletions

634
examples/tutorial_julia.md Normal file
View File

@@ -0,0 +1,634 @@
# NATSBridge.jl Tutorial
A comprehensive tutorial for learning how to use NATSBridge.jl for bi-directional communication between Julia and JavaScript services using NATS.
## Table of Contents
1. [What is NATSBridge.jl?](#what-is-natsbridgejl)
2. [Key Concepts](#key-concepts)
3. [Installation](#installation)
4. [Basic Usage](#basic-usage)
5. [Payload Types](#payload-types)
6. [Transport Strategies](#transport-strategies)
7. [Advanced Features](#advanced-features)
8. [Complete Examples](#complete-examples)
---
## What is NATSBridge.jl?
NATSBridge.jl is a Julia module that provides a high-level API for sending and receiving data across network boundaries using NATS as the message bus. It implements the **Claim-Check pattern** for handling large payloads efficiently.
### Core Features
- **Bi-directional communication**: Julia ↔ JavaScript
- **Smart transport selection**: Automatic direct vs link transport based on payload size
- **Multi-payload support**: Send multiple payloads of different types in a single message
- **Claim-check pattern**: Upload large files to HTTP server, send only URLs via NATS
- **Type-aware serialization**: Different serialization strategies for different data types
---
## Key Concepts
### 1. msgEnvelope_v1 (Message Envelope)
The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication:
```julia
struct msgEnvelope_v1
correlationId::String # Unique identifier to track messages
msgId::String # This message id
timestamp::String # Message published timestamp
sendTo::String # Topic/subject the sender sends to
msgPurpose::String # Purpose (ACK | NACK | updateStatus | shutdown | chat)
senderName::String # Sender name (e.g., "agent-wine-web-frontend")
senderId::String # Sender id (uuid4)
receiverName::String # Message receiver name (e.g., "agent-backend")
receiverId::String # Message receiver id (uuid4 or nothing for broadcast)
replyTo::String # Topic to reply to
replyToMsgId::String # Message id this message is replying to
brokerURL::String # NATS server address
metadata::Dict{String, Any}
payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here
end
```
### 2. msgPayload_v1 (Payload Structure)
The `msgPayload_v1` structure provides flexible payload handling:
```julia
struct msgPayload_v1
id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes
data::Any # Payload data in case of direct transport or a URL in case of link
metadata::Dict{String, Any} # Dict("checksum" => "sha256_hash", ...)
end
```
### 3. Standard API Format
The system uses a **standardized list-of-tuples format** for all payload operations:
```julia
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
```
**Important**: Even when sending a single payload, you must wrap it in a list.
---
## Installation
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("JSON")
Pkg.add("Arrow")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
Pkg.add("Base64")
Pkg.add("PrettyPrinting")
Pkg.add("DataFrames")
```
Then include the NATSBridge module:
```julia
include("NATSBridge.jl")
using .NATSBridge
```
---
## Basic Usage
### Sending Data (smartsend)
```julia
using NATSBridge
# Send a simple dictionary
data = Dict("key" => "value")
env = NATSBridge.smartsend("my.subject", [("dataname1", data, "dictionary")])
```
### Receiving Data (smartreceive)
```julia
using NATSBridge
# Subscribe to a NATS subject
NATS.subscribe("my.subject") do msg
# Process the message
result = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
# result is a list of (dataname, data, type) tuples
for (dataname, data, type) in result
println("Received $dataname of type $type")
println("Data: $data")
end
end
```
---
## Payload Types
NATSBridge.jl supports the following payload types:
| Type | Description | Serialization |
|------|-------------|---------------|
| `text` | Plain text | UTF-8 encoding |
| `dictionary` | JSON-serializable data (Dict, NamedTuple) | JSON |
| `table` | Tabular data (DataFrame, array of structs) | Apache Arrow IPC |
| `image` | Image data (Bitmap, PNG/JPG bytes) | Binary |
| `audio` | Audio data (WAV, MP3 bytes) | Binary |
| `video` | Video data (MP4, AVI bytes) | Binary |
| `binary` | Generic binary data | Binary |
---
## Transport Strategies
NATSBridge.jl automatically selects the appropriate transport strategy based on payload size:
### Direct Transport (< 1MB)
Small payloads are encoded as Base64 and sent directly over NATS.
```julia
# Small data (< 1MB) - uses direct transport
small_data = rand(1000) # ~8KB
env = NATSBridge.smartsend("small", [("data", small_data, "table")])
```
### Link Transport (≥ 1MB)
Large payloads are uploaded to an HTTP file server, and only the URL is sent via NATS.
```julia
# Large data (≥ 1MB) - uses link transport
large_data = rand(10_000_000) # ~80MB
env = NATSBridge.smartsend("large", [("data", large_data, "table")])
```
---
## Complete Examples
### Example 1: Text Message
**Sender:**
```julia
using NATSBridge
using UUIDs
const SUBJECT = "/NATSBridge_text_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function test_text_send()
small_text = "Hello, this is a small text message."
large_text = join(["Line $i: " for i in 1:50000], "")
data1 = ("small_text", small_text, "text")
data2 = ("large_text", large_text, "text")
env = NATSBridge.smartsend(
SUBJECT,
[data1, data2],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "text_sender"
)
end
```
**Receiver:**
```julia
using NATSBridge
const SUBJECT = "/NATSBridge_text_test"
const NATS_URL = "nats://localhost:4222"
function test_text_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
result = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in result
if data_type == "text"
println("Received text: $data")
write("./received_$dataname.txt", data)
end
end
end
sleep(120)
NATS.drain(conn)
end
```
### Example 2: Dictionary (JSON) Message
**Sender:**
```julia
using NATSBridge
using UUIDs
const SUBJECT = "/NATSBridge_dict_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function test_dict_send()
small_dict = Dict("name" => "Alice", "age" => 30)
large_dict = Dict("ids" => collect(1:50000), "names" => ["User_$i" for i in 1:50000])
data1 = ("small_dict", small_dict, "dictionary")
data2 = ("large_dict", large_dict, "dictionary")
env = NATSBridge.smartsend(
SUBJECT,
[data1, data2],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat"
)
end
```
**Receiver:**
```julia
using NATSBridge
const SUBJECT = "/NATSBridge_dict_test"
const NATS_URL = "nats://localhost:4222"
function test_dict_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
result = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in result
if data_type == "dictionary"
println("Received dictionary: $data")
write("./received_$dataname.json", JSON.json(data, 2))
end
end
end
sleep(120)
NATS.drain(conn)
end
```
### Example 3: DataFrame (Table) Message
**Sender:**
```julia
using NATSBridge
using DataFrames
using UUIDs
const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function test_table_send()
small_df = DataFrame(id = 1:10, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
large_df = DataFrame(id = 1:50000, name = ["User_$i" for i in 1:50000], score = rand(1:100, 50000))
data1 = ("small_table", small_df, "table")
data2 = ("large_table", large_df, "table")
env = NATSBridge.smartsend(
SUBJECT,
[data1, data2],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat"
)
end
```
**Receiver:**
```julia
using NATSBridge
using DataFrames
const SUBJECT = "/NATSBridge_table_test"
const NATS_URL = "nats://localhost:4222"
function test_table_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
result = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in result
if data_type == "table"
data = DataFrame(data)
println("Received DataFrame with $(size(data, 1)) rows")
display(data[1:min(5, size(data, 1)), :])
end
end
end
sleep(120)
NATS.drain(conn)
end
```
### Example 4: Mixed Content (Chat with Text, Image, Audio)
**Sender:**
```julia
using NATSBridge
using DataFrames
using UUIDs
const SUBJECT = "/NATSBridge_mix_test"
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function test_mix_send()
# Text data
text_data = "Hello! This is a test chat message. 🎉"
# Dictionary data
dict_data = Dict("type" => "chat", "sender" => "serviceA")
# Small table data
table_data_small = DataFrame(id = 1:10, name = ["msg_$i" for i in 1:10])
# Large table data (link transport)
table_data_large = DataFrame(id = 1:150_000, name = ["msg_$i" for i in 1:150_000])
# Small image data (direct transport)
image_data = UInt8[rand(1:255) for _ in 1:100]
# Large image data (link transport)
large_image_data = UInt8[rand(1:255) for _ in 1:1_500_000]
# Small audio data (direct transport)
audio_data = UInt8[rand(1:255) for _ in 1:100]
# Large audio data (link transport)
large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000]
# Small video data (direct transport)
video_data = UInt8[rand(1:255) for _ in 1:150]
# Large video data (link transport)
large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000]
# Small binary data (direct transport)
binary_data = UInt8[rand(1:255) for _ in 1:200]
# Large binary data (link transport)
large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000]
# Create payloads list - mixed content
payloads = [
# Small data (direct transport)
("chat_text", text_data, "text"),
("chat_json", dict_data, "dictionary"),
("chat_table_small", table_data_small, "table"),
# Large data (link transport)
("chat_table_large", table_data_large, "table"),
("user_image_large", large_image_data, "image"),
("audio_clip_large", large_audio_data, "audio"),
("video_clip_large", large_video_data, "video"),
("binary_file_large", large_binary_data, "binary")
]
env = NATSBridge.smartsend(
SUBJECT,
payloads,
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "mix_sender"
)
end
```
**Receiver:**
```julia
using NATSBridge
using DataFrames
const SUBJECT = "/NATSBridge_mix_test"
const NATS_URL = "nats://localhost:4222"
function test_mix_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
result = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
println("Received $(length(result)) payloads")
for (dataname, data, data_type) in result
println("\n=== Payload: $dataname (type: $data_type) ===")
if data_type == "text"
println(" Type: String")
println(" Length: $(length(data)) characters")
elseif data_type == "dictionary"
println(" Type: JSON Object")
println(" Keys: $(keys(data))")
elseif data_type == "table"
data = DataFrame(data)
println(" Type: DataFrame")
println(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
elseif data_type == "image"
println(" Type: Vector{UInt8}")
println(" Size: $(length(data)) bytes")
write("./received_$dataname.bin", data)
elseif data_type == "audio"
println(" Type: Vector{UInt8}")
println(" Size: $(length(data)) bytes")
write("./received_$dataname.bin", data)
elseif data_type == "video"
println(" Type: Vector{UInt8}")
println(" Size: $(length(data)) bytes")
write("./received_$dataname.bin", data)
elseif data_type == "binary"
println(" Type: Vector{UInt8}")
println(" Size: $(length(data)) bytes")
write("./received_$dataname.bin", data)
end
end
end
sleep(120)
NATS.drain(conn)
end
```
---
## Best Practices
1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]`
2. **Use appropriate transport** - Let NATSBridge handle size-based routing (default 1MB threshold)
3. **Customize size threshold** - Use `size_threshold` parameter to adjust the direct/link split
4. **Provide fileserver handler** - Implement `fileserverUploadHandler` for link transport
5. **Include correlation IDs** - Track messages across distributed systems
6. **Handle errors** - Implement proper error handling for network failures
7. **Close connections** - Ensure NATS connections are properly closed using `NATS.drain()`
---
## Conclusion
NATSBridge.jl provides a powerful abstraction for bi-directional communication between Julia and JavaScript services. By understanding the key concepts and following the best practices, you can build robust, scalable applications that leverage the full power of NATS messaging.
For more information, see:
- [`docs/architecture.md`](./architecture.md) - Detailed architecture documentation
- [`docs/implementation.md`](./implementation.md) - Implementation details

View File

@@ -0,0 +1,939 @@
# NATSBridge.jl Walkthrough: Building a Chat System
A step-by-step guided walkthrough for building a real-time chat system using NATSBridge.jl with mixed content support (text, images, audio, video, and files).
## Prerequisites
- Julia 1.7+
- NATS server running
- HTTP file server (Plik) running
## Step 1: Understanding the Chat System Architecture
### System Components
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Chat System │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ NATS ┌──────────────┐ │
│ │ Julia │◄───────┬───────► │ JavaScript │ │
│ │ Service │ │ │ Client │ │
│ │ │ │ │ │ │
│ │ - Text │ │ │ - Text │ │
│ │ - Images │ │ │ - Images │ │
│ │ - Audio │ ▼ │ - Audio │ │
│ │ - Video │ NATSBridge.jl │ - Files │ │
│ │ - Files │ │ │ - Tables │ │
│ └──────────────┘ │ └──────────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ NATS │ │
│ │ Server │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
For large payloads (> 1MB):
┌─────────────────────────────────────────────────────────────────────────────┐
│ File Server (Plik) │
│ │
│ Julia Service ──► Upload ──► File Server ──► Download ◄── JavaScript Client│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Message Format
Each chat message is an envelope containing multiple payloads:
```json
{
"correlationId": "uuid4",
"msgId": "uuid4",
"timestamp": "2024-01-15T10:30:00Z",
"sendTo": "/chat/room1",
"msgPurpose": "chat",
"senderName": "user-1",
"senderId": "uuid4",
"receiverName": "user-2",
"receiverId": "uuid4",
"brokerURL": "nats://localhost:4222",
"payloads": [
{
"id": "uuid4",
"dataname": "message_text",
"type": "text",
"transport": "direct",
"encoding": "base64",
"size": 256,
"data": "SGVsbG8gV29ybGQh",
"metadata": {}
},
{
"id": "uuid4",
"dataname": "user_image",
"type": "image",
"transport": "link",
"encoding": "none",
"size": 15433,
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/image.jpg",
"metadata": {}
}
]
}
```
## Step 2: Setting Up the Environment
### 1. Start NATS Server
```bash
# Using Docker
docker run -d -p 4222:4222 -p 8222:8222 --name nats-server nats:latest
# Or download from https://github.com/nats-io/nats-server/releases
./nats-server
```
### 2. Start HTTP File Server (Plik)
```bash
# Using Docker
docker run -d -p 8080:8080 --name plik plik/plik:latest
# Or download from https://github.com/arnaud-lb/plik/releases
./plikd -d
```
### 3. Install Julia Dependencies
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("JSON")
Pkg.add("Arrow")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
Pkg.add("Base64")
Pkg.add("PrettyPrinting")
Pkg.add("DataFrames")
```
## Step 3: Basic Text-Only Chat
### Sender (User 1)
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
# Include the bridge module
include("NATSBridge.jl")
using .NATSBridge
# Configuration
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
# File upload handler for plik server
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
# Send a simple text message
function send_text_message()
message_text = "Hello, how are you today?"
env = NATSBridge.smartsend(
SUBJECT,
[("message", message_text, "text")],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Sent text message with correlation ID: $(env.correlationId)")
end
send_text_message()
```
### Receiver (User 2)
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
# Include the bridge module
include("NATSBridge.jl")
using .NATSBridge
# Configuration
const NATS_URL = "nats://localhost:4222"
const SUBJECT = "/chat/room1"
# Message handler
function message_handler(msg::NATS.Msg)
payloads = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
# Extract the text message
for (dataname, data, data_type) in payloads
if data_type == "text"
println("Received message: $data")
# Save to file
write("./received_$dataname.txt", data)
end
end
end
# Subscribe to the chat room
NATS.subscribe(SUBJECT) do msg
message_handler(msg)
end
# Keep the program running
while true
sleep(1)
end
```
## Step 4: Adding Image Support
### Sending an Image
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_image()
# Read image file
image_data = read("screenshot.png", Vector{UInt8})
# Send with text message
env = NATSBridge.smartsend(
SUBJECT,
[
("text", "Check out this screenshot!", "text"),
("screenshot", image_data, "image")
],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Sent image with correlation ID: $(env.correlationId)")
end
send_image()
```
### Receiving an Image
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const SUBJECT = "/chat/room1"
function message_handler(msg::NATS.Msg)
payloads = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in payloads
if data_type == "text"
println("Text: $data")
elseif data_type == "image"
# Save image to file
filename = "received_$dataname.bin"
write(filename, data)
println("Saved image: $filename")
end
end
end
NATS.subscribe(SUBJECT) do msg
message_handler(msg)
end
```
## Step 5: Handling Large Files with Link Transport
### Automatic Transport Selection
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_large_file()
# Create a large file (> 1MB triggers link transport)
large_data = rand(10_000_000) # ~80MB
env = NATSBridge.smartsend(
SUBJECT,
[("large_file", large_data, "binary")],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Uploaded large file to: $(env.payloads[1].data)")
println("Correlation ID: $(env.correlationId)")
end
send_large_file()
```
## Step 6: Audio and Video Support
### Sending Audio
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_audio()
# Read audio file (WAV, MP3, etc.)
audio_data = read("voice_message.mp3", Vector{UInt8})
env = NATSBridge.smartsend(
SUBJECT,
[("voice_message", audio_data, "audio")],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Sent audio message: $(env.correlationId)")
end
send_audio()
```
### Sending Video
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_video()
# Read video file (MP4, AVI, etc.)
video_data = read("video_message.mp4", Vector{UInt8})
env = NATSBridge.smartsend(
SUBJECT,
[("video_message", video_data, "video")],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Sent video message: $(env.correlationId)")
end
send_video()
```
## Step 7: Table/Data Exchange
### Sending Tabular Data
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_table()
# Create a DataFrame
df = DataFrame(
id = 1:5,
name = ["Alice", "Bob", "Charlie", "Diana", "Eve"],
score = [95, 88, 92, 98, 85],
grade = ['A', 'B', 'A', 'B', 'B']
)
env = NATSBridge.smartsend(
SUBJECT,
[("student_scores", df, "table")],
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Sent table with $(nrow(df)) rows")
end
send_table()
```
### Receiving and Using Tables
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const SUBJECT = "/chat/room1"
function message_handler(msg::NATS.Msg)
payloads = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in payloads
if data_type == "table"
data = DataFrame(data)
println("Received table:")
show(data)
println("\nAverage score: $(mean(data.score))")
end
end
end
NATS.subscribe(SUBJECT) do msg
message_handler(msg)
end
```
## Step 8: Bidirectional Communication
### Request-Response Pattern
```julia
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
include("NATSBridge.jl")
using .NATSBridge
const NATS_URL = "nats://localhost:4222"
const SUBJECT = "/api/query"
const REPLY_SUBJECT = "/api/response"
# Request
function send_request()
query_data = Dict("query" => "SELECT * FROM users")
env = NATSBridge.smartsend(
SUBJECT,
[("sql_query", query_data, "dictionary")],
nats_url = NATS_URL,
fileserver_url = "http://localhost:8080",
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "request",
sender_name = "frontend",
receiver_name = "backend",
reply_to = REPLY_SUBJECT,
reply_to_msg_id = string(uuid4())
)
println("Request sent: $(env.correlationId)")
end
# Response handler
function response_handler(msg::NATS.Msg)
payloads = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
for (dataname, data, data_type) in payloads
if data_type == "table"
data = DataFrame(data)
println("Query results:")
show(data)
end
end
end
NATS.subscribe(REPLY_SUBJECT) do msg
response_handler(msg)
end
```
## Step 9: Complete Chat Application
### Full Chat System
```julia
module ChatApp
using NATS
using JSON
using UUIDs
using Dates
using PrettyPrinting
using DataFrames
using Arrow
using HTTP
using Base64
# Include the bridge module
include("../src/NATSBridge.jl")
using .NATSBridge
# Configuration
const NATS_URL = "nats://localhost:4222"
const FILESERVER_URL = "http://localhost:8080"
const SUBJECT = "/chat/room1"
# File upload handler for plik server
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
url_getUploadID = "$fileserver_url/upload"
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(String(httpResponse.body))
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
url_upload = "$fileserver_url/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
form = HTTP.Form(Dict("file" => file_multipart))
httpResponse = HTTP.post(url_upload, headers, form)
responseJson = JSON.parse(String(httpResponse.body))
fileid = responseJson["id"]
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end
function send_chat_message(
text::String,
image_path::Union{String, Nothing}=nothing,
audio_path::Union{String, Nothing}=nothing
)
# Build payloads list
payloads = [("message_text", text, "text")]
if image_path !== nothing
image_data = read(image_path, Vector{UInt8})
push!(payloads, ("user_image", image_data, "image"))
end
if audio_path !== nothing
audio_data = read(audio_path, Vector{UInt8})
push!(payloads, ("user_audio", audio_data, "audio"))
end
env = NATSBridge.smartsend(
SUBJECT,
payloads,
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL,
fileserverUploadHandler = plik_upload_handler,
size_threshold = 1_000_000,
correlation_id = string(uuid4()),
msg_purpose = "chat",
sender_name = "user-1"
)
println("Message sent with correlation ID: $(env.correlationId)")
end
function receive_chat_messages()
function message_handler(msg::NATS.Msg)
payloads = NATSBridge.smartreceive(
msg,
max_retries = 5,
base_delay = 100,
max_delay = 5000
)
println("\n--- New Message ---")
for (dataname, data, data_type) in payloads
if data_type == "text"
println("Text: $data")
elseif data_type == "image"
filename = "received_$dataname.bin"
write(filename, data)
println("Image saved: $filename")
elseif data_type == "audio"
filename = "received_$dataname.bin"
write(filename, data)
println("Audio saved: $filename")
elseif data_type == "table"
println("Table received:")
data = DataFrame(data)
show(data)
end
end
end
NATS.subscribe(SUBJECT) do msg
message_handler(msg)
end
println("Subscribed to: $SUBJECT")
end
function run_interactive_chat()
println("\n=== Interactive Chat ===")
println("1. Send a message")
println("2. Join a chat room")
println("3. Exit")
while true
print("\nSelect option (1-3): ")
choice = readline()
if choice == "1"
print("Enter message text: ")
text = readline()
send_chat_message(text)
elseif choice == "2"
receive_chat_messages()
elseif choice == "3"
break
end
end
end
end # module
# Run the chat app
using .ChatApp
ChatApp.run_interactive_chat()
```
## Step 10: Testing the Chat System
### Test Scenario 1: Text-Only Chat
```bash
# Terminal 1: Start the chat receiver
julia test_julia_to_julia_text_receiver.jl
# Terminal 2: Send a message
julia test_julia_to_julia_text_sender.jl
```
### Test Scenario 2: Image Chat
```bash
# Terminal 1: Receive messages
julia test_julia_to_julia_mix_payloads_receiver.jl
# Terminal 2: Send image
julia test_julia_to_julia_mix_payload_sender.jl
```
### Test Scenario 3: Large File Transfer
```bash
# Terminal 2: Send large file
julia test_julia_to_julia_mix_payload_sender.jl
```
## Conclusion
This walkthrough demonstrated how to build a chat system using NATSBridge.jl with support for:
- Text messages
- Images (direct transport for small, link transport for large)
- Audio files
- Video files
- Tabular data (DataFrames)
- Bidirectional communication
- Mixed-content messages
The key takeaways are:
1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]`
2. **Use appropriate transport** - NATSBridge automatically handles size-based routing
3. **Support mixed content** - Multiple payloads of different types in one message
4. **Handle errors** - Implement proper error handling for network failures
5. **Use correlation IDs** - Track messages across distributed systems
For more information, see:
- [`docs/architecture.md`](./docs/architecture.md) - Detailed architecture documentation
- [`docs/implementation.md`](./docs/implementation.md) - Implementation details