update
This commit is contained in:
939
walkthrough_julia.md
Normal file
939
walkthrough_julia.md
Normal file
@@ -0,0 +1,939 @@
|
||||
# NATSBridge.jl Walkthrough: Building a Chat System
|
||||
|
||||
A step-by-step guided walkthrough for building a real-time chat system using NATSBridge.jl with mixed content support (text, images, audio, video, and files).
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Julia 1.7+
|
||||
- NATS server running
|
||||
- HTTP file server (Plik) running
|
||||
|
||||
## Step 1: Understanding the Chat System Architecture
|
||||
|
||||
### System Components
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
||||
│ Chat System │
|
||||
├─────────────────────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────┐ NATS ┌──────────────┐ │
|
||||
│ │ Julia │◄───────┬───────► │ JavaScript │ │
|
||||
│ │ Service │ │ │ Client │ │
|
||||
│ │ │ │ │ │ │
|
||||
│ │ - Text │ │ │ - Text │ │
|
||||
│ │ - Images │ │ │ - Images │ │
|
||||
│ │ - Audio │ ▼ │ - Audio │ │
|
||||
│ │ - Video │ NATSBridge.jl │ - Files │ │
|
||||
│ │ - Files │ │ │ - Tables │ │
|
||||
│ └──────────────┘ │ └──────────────┘ │
|
||||
│ │ │
|
||||
│ ┌───────┴───────┐ │
|
||||
│ │ NATS │ │
|
||||
│ │ Server │ │
|
||||
│ └─────────────┘ │
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────────────────────┘
|
||||
|
||||
For large payloads (> 1MB):
|
||||
┌─────────────────────────────────────────────────────────────────────────────┐
|
||||
│ File Server (Plik) │
|
||||
│ │
|
||||
│ Julia Service ──► Upload ──► File Server ──► Download ◄── JavaScript Client│
|
||||
│ │
|
||||
└─────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Message Format
|
||||
|
||||
Each chat message is an envelope containing multiple payloads:
|
||||
|
||||
```json
|
||||
{
|
||||
"correlationId": "uuid4",
|
||||
"msgId": "uuid4",
|
||||
"timestamp": "2024-01-15T10:30:00Z",
|
||||
"sendTo": "/chat/room1",
|
||||
"msgPurpose": "chat",
|
||||
"senderName": "user-1",
|
||||
"senderId": "uuid4",
|
||||
"receiverName": "user-2",
|
||||
"receiverId": "uuid4",
|
||||
"brokerURL": "nats://localhost:4222",
|
||||
"payloads": [
|
||||
{
|
||||
"id": "uuid4",
|
||||
"dataname": "message_text",
|
||||
"type": "text",
|
||||
"transport": "direct",
|
||||
"encoding": "base64",
|
||||
"size": 256,
|
||||
"data": "SGVsbG8gV29ybGQh",
|
||||
"metadata": {}
|
||||
},
|
||||
{
|
||||
"id": "uuid4",
|
||||
"dataname": "user_image",
|
||||
"type": "image",
|
||||
"transport": "link",
|
||||
"encoding": "none",
|
||||
"size": 15433,
|
||||
"data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/image.jpg",
|
||||
"metadata": {}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Step 2: Setting Up the Environment
|
||||
|
||||
### 1. Start NATS Server
|
||||
|
||||
```bash
|
||||
# Using Docker
|
||||
docker run -d -p 4222:4222 -p 8222:8222 --name nats-server nats:latest
|
||||
|
||||
# Or download from https://github.com/nats-io/nats-server/releases
|
||||
./nats-server
|
||||
```
|
||||
|
||||
### 2. Start HTTP File Server (Plik)
|
||||
|
||||
```bash
|
||||
# Using Docker
|
||||
docker run -d -p 8080:8080 --name plik plik/plik:latest
|
||||
|
||||
# Or download from https://github.com/arnaud-lb/plik/releases
|
||||
./plikd -d
|
||||
```
|
||||
|
||||
### 3. Install Julia Dependencies
|
||||
|
||||
```julia
|
||||
using Pkg
|
||||
Pkg.add("NATS")
|
||||
Pkg.add("JSON")
|
||||
Pkg.add("Arrow")
|
||||
Pkg.add("HTTP")
|
||||
Pkg.add("UUIDs")
|
||||
Pkg.add("Dates")
|
||||
Pkg.add("Base64")
|
||||
Pkg.add("PrettyPrinting")
|
||||
Pkg.add("DataFrames")
|
||||
```
|
||||
|
||||
## Step 3: Basic Text-Only Chat
|
||||
|
||||
### Sender (User 1)
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
# Include the bridge module
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
# Send a simple text message
|
||||
function send_text_message()
|
||||
message_text = "Hello, how are you today?"
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("message", message_text, "text")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Sent text message with correlation ID: $(env.correlationId)")
|
||||
end
|
||||
|
||||
send_text_message()
|
||||
```
|
||||
|
||||
### Receiver (User 2)
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
# Include the bridge module
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
# Message handler
|
||||
function message_handler(msg::NATS.Msg)
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
# Extract the text message
|
||||
for (dataname, data, data_type) in payloads
|
||||
if data_type == "text"
|
||||
println("Received message: $data")
|
||||
# Save to file
|
||||
write("./received_$dataname.txt", data)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Subscribe to the chat room
|
||||
NATS.subscribe(SUBJECT) do msg
|
||||
message_handler(msg)
|
||||
end
|
||||
|
||||
# Keep the program running
|
||||
while true
|
||||
sleep(1)
|
||||
end
|
||||
```
|
||||
|
||||
## Step 4: Adding Image Support
|
||||
|
||||
### Sending an Image
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_image()
|
||||
# Read image file
|
||||
image_data = read("screenshot.png", Vector{UInt8})
|
||||
|
||||
# Send with text message
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[
|
||||
("text", "Check out this screenshot!", "text"),
|
||||
("screenshot", image_data, "image")
|
||||
],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Sent image with correlation ID: $(env.correlationId)")
|
||||
end
|
||||
|
||||
send_image()
|
||||
```
|
||||
|
||||
### Receiving an Image
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function message_handler(msg::NATS.Msg)
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
for (dataname, data, data_type) in payloads
|
||||
if data_type == "text"
|
||||
println("Text: $data")
|
||||
elseif data_type == "image"
|
||||
# Save image to file
|
||||
filename = "received_$dataname.bin"
|
||||
write(filename, data)
|
||||
println("Saved image: $filename")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
NATS.subscribe(SUBJECT) do msg
|
||||
message_handler(msg)
|
||||
end
|
||||
```
|
||||
|
||||
## Step 5: Handling Large Files with Link Transport
|
||||
|
||||
### Automatic Transport Selection
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_large_file()
|
||||
# Create a large file (> 1MB triggers link transport)
|
||||
large_data = rand(10_000_000) # ~80MB
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("large_file", large_data, "binary")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Uploaded large file to: $(env.payloads[1].data)")
|
||||
println("Correlation ID: $(env.correlationId)")
|
||||
end
|
||||
|
||||
send_large_file()
|
||||
```
|
||||
|
||||
## Step 6: Audio and Video Support
|
||||
|
||||
### Sending Audio
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_audio()
|
||||
# Read audio file (WAV, MP3, etc.)
|
||||
audio_data = read("voice_message.mp3", Vector{UInt8})
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("voice_message", audio_data, "audio")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Sent audio message: $(env.correlationId)")
|
||||
end
|
||||
|
||||
send_audio()
|
||||
```
|
||||
|
||||
### Sending Video
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_video()
|
||||
# Read video file (MP4, AVI, etc.)
|
||||
video_data = read("video_message.mp4", Vector{UInt8})
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("video_message", video_data, "video")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Sent video message: $(env.correlationId)")
|
||||
end
|
||||
|
||||
send_video()
|
||||
```
|
||||
|
||||
## Step 7: Table/Data Exchange
|
||||
|
||||
### Sending Tabular Data
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_table()
|
||||
# Create a DataFrame
|
||||
df = DataFrame(
|
||||
id = 1:5,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve"],
|
||||
score = [95, 88, 92, 98, 85],
|
||||
grade = ['A', 'B', 'A', 'B', 'B']
|
||||
)
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("student_scores", df, "table")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Sent table with $(nrow(df)) rows")
|
||||
end
|
||||
|
||||
send_table()
|
||||
```
|
||||
|
||||
### Receiving and Using Tables
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
function message_handler(msg::NATS.Msg)
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
for (dataname, data, data_type) in payloads
|
||||
if data_type == "table"
|
||||
data = DataFrame(data)
|
||||
println("Received table:")
|
||||
show(data)
|
||||
println("\nAverage score: $(mean(data.score))")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
NATS.subscribe(SUBJECT) do msg
|
||||
message_handler(msg)
|
||||
end
|
||||
```
|
||||
|
||||
## Step 8: Bidirectional Communication
|
||||
|
||||
### Request-Response Pattern
|
||||
|
||||
```julia
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
include("NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const SUBJECT = "/api/query"
|
||||
const REPLY_SUBJECT = "/api/response"
|
||||
|
||||
# Request
|
||||
function send_request()
|
||||
query_data = Dict("query" => "SELECT * FROM users")
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
[("sql_query", query_data, "dictionary")],
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = "http://localhost:8080",
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "request",
|
||||
sender_name = "frontend",
|
||||
receiver_name = "backend",
|
||||
reply_to = REPLY_SUBJECT,
|
||||
reply_to_msg_id = string(uuid4())
|
||||
)
|
||||
|
||||
println("Request sent: $(env.correlationId)")
|
||||
end
|
||||
|
||||
# Response handler
|
||||
function response_handler(msg::NATS.Msg)
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
for (dataname, data, data_type) in payloads
|
||||
if data_type == "table"
|
||||
data = DataFrame(data)
|
||||
println("Query results:")
|
||||
show(data)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
NATS.subscribe(REPLY_SUBJECT) do msg
|
||||
response_handler(msg)
|
||||
end
|
||||
```
|
||||
|
||||
## Step 9: Complete Chat Application
|
||||
|
||||
### Full Chat System
|
||||
|
||||
```julia
|
||||
module ChatApp
|
||||
using NATS
|
||||
using JSON
|
||||
using UUIDs
|
||||
using Dates
|
||||
using PrettyPrinting
|
||||
using DataFrames
|
||||
using Arrow
|
||||
using HTTP
|
||||
using Base64
|
||||
|
||||
# Include the bridge module
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
const SUBJECT = "/chat/room1"
|
||||
|
||||
# File upload handler for plik server
|
||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||
url_getUploadID = "$fileserver_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
uploadid = responseJson["id"]
|
||||
uploadtoken = responseJson["uploadToken"]
|
||||
|
||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
||||
url_upload = "$fileserver_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
|
||||
form = HTTP.Form(Dict("file" => file_multipart))
|
||||
httpResponse = HTTP.post(url_upload, headers, form)
|
||||
responseJson = JSON.parse(String(httpResponse.body))
|
||||
|
||||
fileid = responseJson["id"]
|
||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
||||
|
||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
|
||||
function send_chat_message(
|
||||
text::String,
|
||||
image_path::Union{String, Nothing}=nothing,
|
||||
audio_path::Union{String, Nothing}=nothing
|
||||
)
|
||||
# Build payloads list
|
||||
payloads = [("message_text", text, "text")]
|
||||
|
||||
if image_path !== nothing
|
||||
image_data = read(image_path, Vector{UInt8})
|
||||
push!(payloads, ("user_image", image_data, "image"))
|
||||
end
|
||||
|
||||
if audio_path !== nothing
|
||||
audio_data = read(audio_path, Vector{UInt8})
|
||||
push!(payloads, ("user_audio", audio_data, "audio"))
|
||||
end
|
||||
|
||||
env = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
payloads,
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserverUploadHandler = plik_upload_handler,
|
||||
size_threshold = 1_000_000,
|
||||
correlation_id = string(uuid4()),
|
||||
msg_purpose = "chat",
|
||||
sender_name = "user-1"
|
||||
)
|
||||
|
||||
println("Message sent with correlation ID: $(env.correlationId)")
|
||||
end
|
||||
|
||||
function receive_chat_messages()
|
||||
function message_handler(msg::NATS.Msg)
|
||||
payloads = NATSBridge.smartreceive(
|
||||
msg,
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
println("\n--- New Message ---")
|
||||
for (dataname, data, data_type) in payloads
|
||||
if data_type == "text"
|
||||
println("Text: $data")
|
||||
elseif data_type == "image"
|
||||
filename = "received_$dataname.bin"
|
||||
write(filename, data)
|
||||
println("Image saved: $filename")
|
||||
elseif data_type == "audio"
|
||||
filename = "received_$dataname.bin"
|
||||
write(filename, data)
|
||||
println("Audio saved: $filename")
|
||||
elseif data_type == "table"
|
||||
println("Table received:")
|
||||
data = DataFrame(data)
|
||||
show(data)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
NATS.subscribe(SUBJECT) do msg
|
||||
message_handler(msg)
|
||||
end
|
||||
println("Subscribed to: $SUBJECT")
|
||||
end
|
||||
|
||||
function run_interactive_chat()
|
||||
println("\n=== Interactive Chat ===")
|
||||
println("1. Send a message")
|
||||
println("2. Join a chat room")
|
||||
println("3. Exit")
|
||||
|
||||
while true
|
||||
print("\nSelect option (1-3): ")
|
||||
choice = readline()
|
||||
|
||||
if choice == "1"
|
||||
print("Enter message text: ")
|
||||
text = readline()
|
||||
send_chat_message(text)
|
||||
elseif choice == "2"
|
||||
receive_chat_messages()
|
||||
elseif choice == "3"
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end # module
|
||||
|
||||
# Run the chat app
|
||||
using .ChatApp
|
||||
ChatApp.run_interactive_chat()
|
||||
```
|
||||
|
||||
## Step 10: Testing the Chat System
|
||||
|
||||
### Test Scenario 1: Text-Only Chat
|
||||
|
||||
```bash
|
||||
# Terminal 1: Start the chat receiver
|
||||
julia test_julia_to_julia_text_receiver.jl
|
||||
|
||||
# Terminal 2: Send a message
|
||||
julia test_julia_to_julia_text_sender.jl
|
||||
```
|
||||
|
||||
### Test Scenario 2: Image Chat
|
||||
|
||||
```bash
|
||||
# Terminal 1: Receive messages
|
||||
julia test_julia_to_julia_mix_payloads_receiver.jl
|
||||
|
||||
# Terminal 2: Send image
|
||||
julia test_julia_to_julia_mix_payload_sender.jl
|
||||
```
|
||||
|
||||
### Test Scenario 3: Large File Transfer
|
||||
|
||||
```bash
|
||||
# Terminal 2: Send large file
|
||||
julia test_julia_to_julia_mix_payload_sender.jl
|
||||
```
|
||||
|
||||
## Conclusion
|
||||
|
||||
This walkthrough demonstrated how to build a chat system using NATSBridge.jl with support for:
|
||||
|
||||
- Text messages
|
||||
- Images (direct transport for small, link transport for large)
|
||||
- Audio files
|
||||
- Video files
|
||||
- Tabular data (DataFrames)
|
||||
- Bidirectional communication
|
||||
- Mixed-content messages
|
||||
|
||||
The key takeaways are:
|
||||
|
||||
1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]`
|
||||
2. **Use appropriate transport** - NATSBridge automatically handles size-based routing
|
||||
3. **Support mixed content** - Multiple payloads of different types in one message
|
||||
4. **Handle errors** - Implement proper error handling for network failures
|
||||
5. **Use correlation IDs** - Track messages across distributed systems
|
||||
|
||||
For more information, see:
|
||||
- [`docs/architecture.md`](./docs/architecture.md) - Detailed architecture documentation
|
||||
- [`docs/implementation.md`](./docs/implementation.md) - Implementation details
|
||||
Reference in New Issue
Block a user