update
This commit is contained in:
@@ -335,6 +335,258 @@ env, env_json_str = NATSBridge.smartsend(
|
||||
|
||||
---
|
||||
|
||||
### Mixed-Content Transport Examples
|
||||
|
||||
The NATSBridge.jl library supports sending mixed content types in a single message, with automatic transport selection based on payload size. Small payloads (< 1MB threshold) use direct transport (Base64-encoded in NATS message), while large payloads (≥ 1MB) use link transport (uploaded to file server, URL sent via NATS).
|
||||
|
||||
#### Julia Mixed-Content Example
|
||||
|
||||
```julia
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
# Configuration
|
||||
const SUBJECT = "/natsbridge"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
const FILESERVER_URL = "http://localhost:8080"
|
||||
|
||||
# Create sample data - mix of small and large payloads
|
||||
text_data = "Hello! This is a test chat message. 🎉"
|
||||
|
||||
dict_data = Dict(
|
||||
"type" => "chat",
|
||||
"sender" => "serviceA",
|
||||
"content" => Dict("text" => "JSON-formatted message", "format" => "markdown")
|
||||
)
|
||||
|
||||
# Small arrow table (< 1MB) - direct transport
|
||||
arrow_table_small = DataFrame(
|
||||
id = 1:10,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = rand(50:100, 10),
|
||||
active = rand([true, false], 10)
|
||||
)
|
||||
|
||||
# Large arrow table (≥ 1MB) - link transport
|
||||
arrow_table_large = DataFrame(
|
||||
id = 1:200_000,
|
||||
name = ["user_$i" for i in 1:200_000],
|
||||
score = rand(50:100, 200_000),
|
||||
active = rand([true, false], 200_000)
|
||||
)
|
||||
|
||||
# Small jsontable (< 1MB) - direct transport
|
||||
json_table_small = DataFrame(
|
||||
id = 1:10,
|
||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"],
|
||||
score = rand(50:100, 10)
|
||||
)
|
||||
|
||||
# Large jsontable (≥ 1MB) - link transport
|
||||
json_table_large = DataFrame(
|
||||
id = 1:1_500_000,
|
||||
name = ["user_$i" for i in 1:1_500_000],
|
||||
score = rand(50:100, 1_500_000)
|
||||
)
|
||||
|
||||
# Binary data (small) - direct transport
|
||||
binary_data_small = UInt8[rand(1:255) for _ in 1:200]
|
||||
|
||||
# Binary data (large) - link transport
|
||||
binary_data_large = UInt8[rand(1:255) for _ in 1:1_500_000]
|
||||
|
||||
# Read image file
|
||||
file_path_small_image = "./test/small_image.jpg"
|
||||
file_data_small_image = read(file_path_small_image)
|
||||
filename_small_image = basename(file_path_small_image)
|
||||
|
||||
file_path_large_image = "./test/large_image.png"
|
||||
file_data_large_image = read(file_path_large_image)
|
||||
filename_large_image = basename(file_path_large_image)
|
||||
|
||||
# Create payloads list - mixed content with both small and large data
|
||||
payloads = [
|
||||
# Small data (direct transport)
|
||||
("chat_text", text_data, "text"),
|
||||
("chat_json", dict_data, "dictionary"),
|
||||
("arrow_table_small", arrow_table_small, "arrowtable"),
|
||||
("json_table_small", json_table_small, "jsontable"),
|
||||
(filename_small_image, file_data_small_image, "binary"),
|
||||
|
||||
# Large data (link transport)
|
||||
("arrow_table_large", arrow_table_large, "arrowtable"),
|
||||
("json_table_large", json_table_large, "jsontable"),
|
||||
(filename_large_image, file_data_large_image, "binary"),
|
||||
("binary_file_large", binary_data_large, "binary")
|
||||
]
|
||||
|
||||
# Use smartsend with mixed content
|
||||
correlation_id = string(uuid4())
|
||||
sendinfo = NATSBridge.smartsend(
|
||||
SUBJECT,
|
||||
payloads;
|
||||
broker_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL,
|
||||
fileserver_upload_handler = plik_oneshot_upload,
|
||||
size_threshold = 1_000_000, # 1MB threshold
|
||||
correlation_id = correlation_id,
|
||||
msg_purpose = "chat",
|
||||
sender_name = "mix_sender",
|
||||
receiver_name = "",
|
||||
receiver_id = "",
|
||||
reply_to = "",
|
||||
reply_to_msg_id = "",
|
||||
is_publish = true
|
||||
)
|
||||
|
||||
env, env_json_str = sendinfo
|
||||
|
||||
# Log transport type for each payload
|
||||
for (i, payload) in enumerate(env.payloads)
|
||||
println("Payload $i ('$payload.dataname'):")
|
||||
println(" Transport: $(payload.transport)")
|
||||
println(" Type: $(payload.payload_type)")
|
||||
println(" Size: $(payload.size) bytes")
|
||||
println(" Encoding: $(payload.encoding)")
|
||||
|
||||
if payload.transport == "link"
|
||||
println(" URL: $(payload.data)")
|
||||
end
|
||||
end
|
||||
|
||||
# Summary
|
||||
direct_count = count(p -> p.transport == "direct", env.payloads)
|
||||
link_count = count(p -> p.transport == "link", env.payloads)
|
||||
println("\n--- Transport Summary ---")
|
||||
println("Direct transport: $direct_count payloads")
|
||||
println("Link transport: $link_count payloads")
|
||||
```
|
||||
|
||||
**Expected Output:**
|
||||
```
|
||||
Payload 1 ('chat_text'):
|
||||
Transport: direct
|
||||
Type: text
|
||||
Size: 38 bytes
|
||||
Encoding: base64
|
||||
|
||||
Payload 2 ('chat_json'):
|
||||
Transport: direct
|
||||
Type: dictionary
|
||||
Size: 156 bytes
|
||||
Encoding: json
|
||||
|
||||
Payload 3 ('arrow_table_small'):
|
||||
Transport: direct
|
||||
Type: arrowtable
|
||||
Size: 1245 bytes
|
||||
Encoding: arrow-ipc
|
||||
|
||||
Payload 4 ('json_table_small'):
|
||||
Transport: direct
|
||||
Type: jsontable
|
||||
Size: 892 bytes
|
||||
Encoding: json
|
||||
|
||||
Payload 5 ('small_image.jpg'):
|
||||
Transport: direct
|
||||
Type: binary
|
||||
Size: 73269 bytes
|
||||
Encoding: base64
|
||||
|
||||
Payload 6 ('arrow_table_large'):
|
||||
Transport: link
|
||||
Type: arrowtable
|
||||
Size: 5242880 bytes
|
||||
Encoding: arrow-ipc
|
||||
URL: http://localhost:8080/file/ABC123/DEF456/arrow_table_large.arrow
|
||||
|
||||
Payload 7 ('json_table_large'):
|
||||
Transport: link
|
||||
Type: jsontable
|
||||
Size: 45678900 bytes
|
||||
Encoding: json
|
||||
URL: http://localhost:8080/file/GHI789/JKL012/json_table_large.json
|
||||
|
||||
Payload 8 ('large_image.png'):
|
||||
Transport: link
|
||||
Type: binary
|
||||
Size: 1168437 bytes
|
||||
Encoding: base64
|
||||
URL: http://localhost:8080/file/MNO345/PQR678/large_image.png
|
||||
|
||||
Payload 9 ('binary_file_large'):
|
||||
Transport: link
|
||||
Type: binary
|
||||
Size: 1500000 bytes
|
||||
Encoding: base64
|
||||
URL: http://localhost:8080/file/STU901/VWX234/binary_file_large.bin
|
||||
|
||||
--- Transport Summary ---
|
||||
Direct transport: 5 payloads
|
||||
Link transport: 4 payloads
|
||||
```
|
||||
|
||||
#### Receiver Example
|
||||
|
||||
```julia
|
||||
using NATS, JSON, UUIDs, Dates, PrettyPrinting, DataFrames, Arrow, HTTP, Base64
|
||||
include("../src/NATSBridge.jl")
|
||||
using .NATSBridge
|
||||
|
||||
const SUBJECT = "/natsbridge"
|
||||
const NATS_URL = "nats://localhost:4222"
|
||||
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
# Use NATSBridge.smartreceive to handle the data
|
||||
result = NATSBridge.smartreceive(
|
||||
msg;
|
||||
max_retries = 5,
|
||||
base_delay = 100,
|
||||
max_delay = 5000
|
||||
)
|
||||
|
||||
println("Received $(length(result["payloads"])) payloads")
|
||||
|
||||
# Process each payload
|
||||
for (dataname, data, data_type) in result["payloads"]
|
||||
println("\n=== Payload: $dataname (type: $data_type) ===")
|
||||
|
||||
if data_type == "arrowtable"
|
||||
if isa(data, Arrow.Table)
|
||||
df = DataFrame(data)
|
||||
println(" Type: Arrow.Table")
|
||||
println(" Size: $(size(df, 1)) rows x $(size(df, 2)) columns")
|
||||
end
|
||||
elseif data_type == "jsontable"
|
||||
if isa(data, Vector{Any})
|
||||
df = DataFrame(data)
|
||||
println(" Type: Vector{Dict}")
|
||||
println(" Size: $(length(data)) rows")
|
||||
end
|
||||
elseif data_type == "text"
|
||||
if isa(data, String)
|
||||
println(" Type: String")
|
||||
println(" Length: $(length(data)) characters")
|
||||
end
|
||||
elseif data_type in ["image", "audio", "video", "binary"]
|
||||
if isa(data, Vector{UInt8})
|
||||
println(" Type: Vector{UInt8} (binary)")
|
||||
println(" Size: $(length(data)) bytes")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Keep listening
|
||||
sleep(120)
|
||||
NATS.drain(conn)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Architecture
|
||||
|
||||
### Cross-Platform Claim-Check Pattern
|
||||
@@ -418,10 +670,12 @@ env, env_json_str = NATSBridge.smartsend(...)
|
||||
using Pkg
|
||||
Pkg.add("NATS")
|
||||
Pkg.add("Arrow")
|
||||
Pkg.add("JSON3")
|
||||
Pkg.add("JSON")
|
||||
Pkg.add("HTTP")
|
||||
Pkg.add("UUIDs")
|
||||
Pkg.add("Dates")
|
||||
Pkg.add("PrettyPrinting")
|
||||
Pkg.add("DataFrames")
|
||||
```
|
||||
|
||||
### JavaScript Dependencies (Node.js)
|
||||
@@ -905,6 +1159,8 @@ end
|
||||
|
||||
#### plik_oneshot_upload Implementation
|
||||
|
||||
**Overload 1: Upload from binary data**
|
||||
|
||||
```julia
|
||||
function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
|
||||
# Get upload id
|
||||
@@ -940,6 +1196,57 @@ function plik_oneshot_upload(file_server_url::String, dataname::String, data::Ve
|
||||
end
|
||||
```
|
||||
|
||||
**Overload 2: Upload from file path**
|
||||
|
||||
```julia
|
||||
function plik_oneshot_upload(file_server_url::String, filepath::String)
|
||||
# Get upload id
|
||||
filename = basename(filepath)
|
||||
url_getUploadID = "$file_server_url/upload"
|
||||
headers = ["Content-Type" => "application/json"]
|
||||
body = """{ "OneShot" : true }"""
|
||||
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||
response_json = JSON.parse(http_response.body)
|
||||
|
||||
uploadid = response_json["id"]
|
||||
uploadtoken = response_json["uploadToken"]
|
||||
|
||||
# Upload file
|
||||
url_upload = "$file_server_url/file/$uploadid"
|
||||
headers = ["X-UploadToken" => uploadtoken]
|
||||
http_response = open(filepath, "r") do file_stream
|
||||
form = HTTP.Form(Dict("file" => file_stream))
|
||||
|
||||
# Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
|
||||
HTTP.post(url_upload, headers, form; status_exception = false)
|
||||
end
|
||||
|
||||
if !isnothing(http_response) && http_response.status == 200
|
||||
# Success - response already logged by caller
|
||||
else
|
||||
error("Failed to upload file: server returned status $(http_response.status)")
|
||||
end
|
||||
response_json = JSON.parse(http_response.body)
|
||||
fileid = response_json["id"]
|
||||
|
||||
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
||||
url = "$file_server_url/file/$uploadid/$fileid/$filename"
|
||||
|
||||
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||
end
|
||||
```
|
||||
|
||||
**Usage Examples:**
|
||||
|
||||
```julia
|
||||
# Upload binary data
|
||||
data = read("./test_image.jpg")
|
||||
result = plik_oneshot_upload("http://localhost:8080", "my_image.jpg", data)
|
||||
|
||||
# Upload file directly from disk
|
||||
result = plik_oneshot_upload("http://localhost:8080", "./test_image.jpg")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### JavaScript Implementation
|
||||
|
||||
Reference in New Issue
Block a user