diff --git a/Plik_API_document.md b/Plik_API_document.md deleted file mode 100644 index bfef4a4..0000000 --- a/Plik_API_document.md +++ /dev/null @@ -1,194 +0,0 @@ -### API -Plik server expose a REST-full API to manage uploads and get files : - -Get and create upload : - - - **POST** /upload - - Params (json object in request body) : - - oneshot (bool) - - stream (bool) - - removable (bool) - - ttl (int) - - login (string) - - password (string) - - files (see below) - - Return : - JSON formatted upload object. - Important fields : - - id (required to upload files) - - uploadToken (required to upload/remove files) - - files (see below) - - For stream mode you need to know the file id before the upload starts as it will block. - File size and/or file type also need to be known before the upload starts as they have to be printed - in HTTP response headers. - To get the file ids pass a "files" json object with each file you are about to upload. - Fill the reference field with an arbitrary string to avoid matching file ids using the fileName field. - This is also used to notify of MISSING files when file upload is not yet finished or has failed. - ``` - "files" : [ - { - "fileName": "file.txt", - "fileSize": 12345, - "fileType": "text/plain", - "reference": "0" - },... - ] - ``` - - - **GET** /upload/:uploadid: - - Get upload metadata (files list, upload date, ttl,...) - -Upload file : - - - **POST** /$mode/:uploadid:/:fileid:/:filename: - - Request body must be a multipart request with a part named "file" containing file data. - - - **POST** /file/:uploadid: - - Same as above without passing file id, won't work for stream mode. - - - **POST** /: - - Quick mode, automatically create an upload with default parameters and add the file to it. - -Get file : - - - **HEAD** /$mode/:uploadid:/:fileid:/:filename: - - Returns only HTTP headers. Useful to know Content-Type and Content-Length without downloading the file. Especially if upload has OneShot option enabled. - - - **GET** /$mode/:uploadid:/:fileid:/:filename: - - Download file. Filename **MUST** match. A browser, might try to display the file if it's a jpeg for example. You may try to force download with ?dl=1 in url. - - - **GET** /archive/:uploadid:/:filename: - - Download uploaded files in a zip archive. :filename: must end with .zip - -Remove file : - - - **DELETE** /$mode/:uploadid:/:fileid:/:filename: - - Delete file. Upload **MUST** have "removable" option enabled. - -Show server details : - - - **GET** /version - - Show plik server version, and some build information (build host, date, git revision,...) - - - **GET** /config - - Show plik server configuration (ttl values, max file size, ...) - - - **GET** /stats - - Get server statistics ( upload/file count, user count, total size used ) - - Admin only - -User authentication : - - - - Plik can authenticate users using Google and/or OVH third-party API. - The /auth API is designed for the Plik web application nevertheless if you want to automatize it be sure to provide a valid - Referrer HTTP header and forward all session cookies. - Plik session cookies have the "secure" flag set, so they can only be transmitted over secure HTTPS connections. - To avoid CSRF attacks the value of the plik-xsrf cookie MUST be copied in the X-XSRFToken HTTP header of each - authenticated request. - Once authenticated a user can generate upload tokens. Those tokens can be used in the X-PlikToken HTTP header used to link - an upload to the user account. It can be put in the ~/.plikrc file of the Plik command line client. - - - **Local** : - - You'll need to create users using the server command line - - - **Google** : - - You'll need to create a new application in the [Google Developper Console](https://console.developers.google.com) - - You'll be handed a Google API ClientID and a Google API ClientSecret that you'll need to put in the plikd.cfg file - - Do not forget to whitelist valid origin and redirect url ( https://yourdomain/auth/google/callback ) for your domain - - - **OVH** : - - You'll need to create a new application in the OVH API : https://eu.api.ovh.com/createApp/ - - You'll be handed an OVH application key and an OVH application secret key that you'll need to put in the plikd.cfg file - - - **GET** /auth/google/login - - Get Google user consent URL. User have to visit this URL to authenticate - - - **GET** /auth/google/callback - - Callback of the user consent dialog - - The user will be redirected back to the web application with a Plik session cookie at the end of this call - - - **GET** /auth/ovh/login - - Get OVH user consent URL. User have to visit this URL to authenticate - - The response will contain a temporary session cookie to forward the API endpoint and OVH consumer key to the callback - - - **GET** /auth/ovh/callback - - Callback of the user consent dialog. - - The user will be redirected back to the web application with a Plik session cookie at the end of this call - - - **POST** /auth/local/login - - Params : - - login : user login - - password : user password - - - **GET** /auth/logout - - Invalidate Plik session cookies - - - **GET** /me - - Return basic user info ( ID, name, email ) and tokens - - - **DELETE** /me - - Remove user account. - - - **GET** /me/token - - List user tokens - - This call use pagination - - - **POST** /me/token - - Create a new upload token - - A comment can be passed in the json body - - - **DELETE** /me/token/{token} - - Revoke an upload token - - - **GET** /me/uploads - - List user uploads - - Params : - - token : filter by token - - This call use pagination - - - **DELETE** /me/uploads - - Remove all uploads linked to a user account - - Params : - - token : filter by token - - - **GET** /me/stats - - Get user statistics ( upload/file count, total size used ) - - - **GET** /users - - List all users - - This call use pagination - - Admin only - -QRCode : - - - **GET** /qrcode - - Generate a QRCode image from an url - - Params : - - url : The url you want to store in the QRCode - - size : The size of the generated image in pixels (default: 250, max: 1000) - - -$mode can be "file" or "stream" depending if stream mode is enabled. See FAQ for more details. - -Examples : -```sh -Create an upload (in the json response, you'll have upload id and upload token) -$ curl -X POST http://127.0.0.1:8080/upload - -Create a OneShot upload -$ curl -X POST -d '{ "OneShot" : true }' http://127.0.0.1:8080/upload - -Upload a file to upload -$ curl -X POST --header "X-UploadToken: M9PJftiApG1Kqr81gN3Fq1HJItPENMhl" -F "file=@test.txt" http://127.0.0.1:8080/file/IsrIPIsDskFpN12E - -Get headers -$ curl -I http://127.0.0.1:8080/file/IsrIPIsDskFpN12E/sFjIeokH23M35tN4/test.txt -HTTP/1.1 200 OK -Content-Disposition: filename=test.txt -Content-Length: 3486 -Content-Type: text/plain; charset=utf-8 -Date: Fri, 15 May 2015 09:16:20 GMT - -``` \ No newline at end of file diff --git a/docs/IMPLEMENTATION.md b/docs/implementation.md similarity index 60% rename from docs/IMPLEMENTATION.md rename to docs/implementation.md index 2523360..ba3ff2b 100644 --- a/docs/IMPLEMENTATION.md +++ b/docs/implementation.md @@ -10,24 +10,26 @@ The implementation uses a **standardized list-of-tuples format** for all payload **API Standard:** ```julia -# Input format for smartsend (always a list of tuples) -[(dataname1, data1), (dataname2, data2), ...] +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] -# Output format for smartreceive (always returns a list of tuples) -[(dataname1, data1), (dataname2, data2), ...] +# Output format for smartreceive (always returns a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] ``` +Where `type` can be: `"text"`, `"dictionary"`, `"table"`, `"image"`, `"audio"`, `"video"`, `"binary"` + **Examples:** ```julia -# Single payload - still wrapped in a list -smartsend("/test", [(dataname1, data1)], ...) +# Single payload - still wrapped in a list (type is required as third element) +smartsend("/test", [(dataname1, data1, "text")], ...) -# Multiple payloads in one message -smartsend("/test", [(dataname1, data1), (dataname2, data2)], ...) +# Multiple payloads in one message (each payload has its own type) +smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...) -# Receive always returns a list +# Receive always returns a list with type info payloads = smartreceive(msg, ...) -# payloads = [(dataname1, data1), (dataname2, data2), ...] +# payloads = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...] ``` ## Architecture @@ -139,26 +141,26 @@ node test/scenario3_julia_to_julia.js ```julia using NATSBridge -# Send multiple payloads in one message +# Send multiple payloads in one message (type is required per payload) smartsend( "/test", - [("dataname1", data1), ("dataname2", data2)], + [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], nats_url="nats://localhost:4222", - fileserver_url="http://localhost:8080/upload", + fileserver_url="http://localhost:8080", metadata=Dict("custom_key" => "custom_value") ) -# Even single payload must be wrapped in a list -smartsend("/test", [("single_data", mydata)]) +# Even single payload must be wrapped in a list with type +smartsend("/test", [("single_data", mydata, "dictionary")]) ``` #### Julia (Receiver) ```julia using NATSBridge -# Receive returns a list of payloads -payloads = smartreceive(msg, "http://localhost:8080/upload") -# payloads = [(dataname1, data1), (dataname2, data2), ...] +# Receive returns a list of payloads with type info +payloads = smartreceive(msg, "http://localhost:8080") +# payloads = [(dataname1, data1, "dictionary"), (dataname2, data2, "table"), ...] ``` ### Scenario 1: Command & Control (Small JSON) @@ -229,8 +231,8 @@ df = DataFrame( category = rand(["A", "B", "C"], 10_000_000) ) -# Send via SmartSend - wrapped in a list -await SmartSend("analysis_results", [("table_data", df)], "table"); +# Send via SmartSend - wrapped in a list (type is part of each tuple) +await SmartSend("analysis_results", [("table_data", df, "table")]); ``` #### JavaScript (Receiver) @@ -287,9 +289,9 @@ end using NATSBridge function publish_health_status(nats_url) - # Send status wrapped in a list + # Send status wrapped in a list (type is part of each tuple) status = Dict("cpu" => rand(), "memory" => rand()) - smartsend("health", [("status", status)], "json", nats_url=nats_url) + smartsend("health", [("status", status, "dictionary")], nats_url=nats_url) sleep(5) # Every 5 seconds end ``` @@ -317,6 +319,145 @@ for await (const msg of consumer) { } ``` +### Scenario 5: Selection (Low Bandwidth) + +**Focus:** Small Arrow tables, Julia to JavaScript. The Action: Julia wants to send a small DataFrame to show on a JavaScript dashboard for the user to choose. + +**Julia (Sender):** +```julia +using NATSBridge +using DataFrames + +# Create small DataFrame (e.g., 50KB - 500KB) +options_df = DataFrame( + id = 1:10, + name = ["Option A", "Option B", "Option C", "Option D", "Option E", + "Option F", "Option G", "Option H", "Option I", "Option J"], + description = ["Description A", "Description B", "Description C", "Description D", "Description E", + "Description F", "Description G", "Description H", "Description I", "Description J"] +) + +# Convert to Arrow IPC stream +# Check payload size (< 1MB threshold) +# Publish directly to NATS with Base64-encoded payload +# Include metadata for dashboard selection context +smartsend( + "dashboard.selection", + [("options_table", options_df, "table")], + nats_url="nats://localhost:4222", + metadata=Dict("context" => "user_selection") +) +``` + +**JavaScript (Receiver):** +```javascript +const { SmartReceive } = require('./js_bridge'); + +// Receive NATS message with direct transport +const result = await SmartReceive(msg); + +// Decode Base64 payload +// Parse Arrow IPC with zero-copy +// Load into selection UI component (e.g., dropdown, table) +const table = result[2]; // Get the DataFrame from the tuple + +// User makes selection +const selection = uiComponent.getSelectedOption(); + +// Send selection back to Julia +await SmartSend("dashboard.response", [ + ("selected_option", selection, "dictionary") +]); +``` + +**Use Case:** Julia server generates a list of available options (e.g., file selections, configuration presets) as a small DataFrame and sends to JavaScript dashboard for user selection. The selection is then sent back to Julia for processing. + +### Scenario 6: Chat System + +**Focus:** Every conversational message is composed of any number and any combination of components, spanning the full spectrum from small to large. This includes text, images, audio, video, tables, and files—specifically accommodating everything from brief snippets to high-resolution images, large audio files, extensive tables, and massive documents. Support for claim-check delivery and full bi-directional messaging. + +**Multi-Payload Support:** The system supports mixed-payload messages where a single message can contain multiple payloads with different transport strategies. The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. + +**Julia (Sender/Receiver):** +```julia +using NATSBridge +using DataFrames + +# Build chat message with mixed payloads: +# - Text: direct transport (Base64) +# - Small images: direct transport (Base64) +# - Large images: link transport (HTTP URL) +# - Audio/video: link transport (HTTP URL) +# - Tables: direct or link depending on size +# - Files: link transport (HTTP URL) +# +# Each payload uses appropriate transport strategy: +# - Size < 1MB → direct (NATS + Base64) +# - Size >= 1MB → link (HTTP upload + NATS URL) +# +# Include claim-check metadata for delivery tracking +# Support bidirectional messaging with replyTo fields + +# Example: Chat with text, small image, and large file +chat_message = [ + ("message_text", "Hello, this is a test message!", "text"), + ("user_avatar", image_bytes, "image"), # Small image, direct transport + ("large_document", large_file_bytes, "binary") # Large file, link transport +] + +smartsend( + "chat.room123", + chat_message, + nats_url="nats://localhost:4222", + msg_purpose="chat", + reply_to="chat.room123.responses" +) +``` + +**JavaScript (Sender/Receiver):** +```javascript +const { SmartSend, SmartReceive } = require('./js_bridge'); + +// Build chat message with mixed content: +// - User input text: direct transport +// - Selected image: check size, use appropriate transport +// - Audio recording: link transport for large files +// - File attachment: link transport +// +// Parse received message: +// - Direct payloads: decode Base64 +// - Link payloads: fetch from HTTP with exponential backoff +// - Deserialize all payloads appropriately +// +// Render mixed content in chat interface +// Support bidirectional reply with claim-check delivery confirmation + +// Example: Send chat with mixed content +const message = [ + { + dataname: "text", + data: "Hello from JavaScript!", + type: "text" + }, + { + dataname: "image", + data: selectedImageBuffer, // Small image + type: "image" + }, + { + dataname: "audio", + data: audioUrl, // Large audio, link transport + type: "audio" + } +]; + +await SmartSend("chat.room123", message); +``` + +**Use Case:** Full-featured chat system supporting rich media. User can send text, small images directly, or upload large files that get uploaded to HTTP server and referenced via URLs. Claim-check pattern ensures reliable delivery tracking for all message components. + +**Implementation Note:** The `smartreceive` function iterates through all payloads in the envelope and processes each according to its transport type. See the standard API format in Section 1: `msgEnvelope_v1` supports `AbstractArray{msgPayload_v1}` for multiple payloads. + ## Configuration ### Environment Variables @@ -324,7 +465,7 @@ for await (const msg of consumer) { | Variable | Default | Description | |----------|---------|-------------| | `NATS_URL` | `nats://localhost:4222` | NATS server URL | -| `FILESERVER_URL` | `http://localhost:8080/upload` | HTTP file server URL | +| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL (base URL without `/upload` suffix) | | `SIZE_THRESHOLD` | `1_000_000` | Size threshold in bytes (1MB) | ### Message Envelope Schema diff --git a/etc.jl b/etc.jl index 7542b76..85c5745 100644 --- a/etc.jl +++ b/etc.jl @@ -12,3 +12,10 @@ Can you write me the following test files: +1. create a tutorial file "tutorial_julia.md" for NATSBridge.jl +2. create a walkthrough file "walkthrough_julia.md" for NATSBridge.jl + +You may consult architecture.md for more info. + + + diff --git a/test/test_julia_to_julia_mix_sender.jl b/test/test_julia_to_julia_mix_payload_sender.jl similarity index 71% rename from test/test_julia_to_julia_mix_sender.jl rename to test/test_julia_to_julia_mix_payload_sender.jl index bfb1471..00a3951 100644 --- a/test/test_julia_to_julia_mix_sender.jl +++ b/test/test_julia_to_julia_mix_payload_sender.jl @@ -83,7 +83,7 @@ function create_sample_data() ) # Table data (DataFrame - small - direct transport) - table_data = DataFrame( + table_data_small = DataFrame( id = 1:10, message = ["msg_$i" for i in 1:10], sender = ["sender_$i" for i in 1:10], @@ -91,6 +91,16 @@ function create_sample_data() priority = rand(1:3, 10) ) + # Table data (DataFrame - large - link transport) + # ~1.5MB of data (150,000 rows) - should trigger link transport + table_data_large = DataFrame( + id = 1:150_000, + message = ["msg_$i" for i in 1:150_000], + sender = ["sender_$i" for i in 1:150_000], + timestamp = [string(Dates.now()) for i in 1:150_000], + priority = rand(1:3, 150_000) + ) + # Image data (small binary - direct transport) # Create a simple 10x10 pixel PNG-like data (128 bytes header + 100 pixels = 112 bytes) # Using simple RGB data (10*10*3 = 300 bytes of pixel data) @@ -104,25 +114,52 @@ function create_sample_data() push!(image_data, 0xFF, 0x00, 0x00) # Red pixel end + # Image data (large - link transport) + # Create a larger image (~1.5MB) to test link transport + large_image_width = 500 + large_image_height = 1000 + large_image_data = UInt8[] + # PNG header (simplified for 500x1000) + push!(large_image_data, 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A) + # RGB data (500*1000*3 = 1,500,000 bytes) + for i in 1:large_image_width*large_image_height + push!(large_image_data, rand(1:255), rand(1:255), rand(1:255)) # Random color pixels + end + # Audio data (small binary - direct transport) - # Create a simple audio-like data (100 bytes) audio_data = UInt8[rand(1:255) for _ in 1:100] + # Audio data (large - link transport) + # ~1.5MB of audio-like data + large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000] + # Video data (small binary - direct transport) - # Create a simple video-like data (150 bytes) video_data = UInt8[rand(1:255) for _ in 1:150] + # Video data (large - link transport) + # ~1.5MB of video-like data + large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000] + # Binary data (small - direct transport) binary_data = UInt8[rand(1:255) for _ in 1:200] + # Binary data (large - link transport) + # ~1.5MB of binary data + large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000] + return ( text_data, dict_data, - table_data, + table_data_small, + table_data_large, image_data, + large_image_data, audio_data, + large_audio_data, video_data, - binary_data + large_video_data, + binary_data, + large_binary_data ) end @@ -130,17 +167,22 @@ end # Sender: Send mixed content via smartsend function test_mix_send() # Create sample data - (text_data, dict_data, table_data, image_data, audio_data, video_data, binary_data) = create_sample_data() + (text_data, dict_data, table_data_small, table_data_large, image_data, large_image_data, audio_data, large_audio_data, video_data, large_video_data, binary_data, large_binary_data) = create_sample_data() - # Create payloads list - mixed content with different types + # Create payloads list - mixed content with both small and large data + # Small data uses direct transport, large data uses link transport payloads = [ + # Small data (direct transport) - text, dictionary, small table ("chat_text", text_data, "text"), ("chat_json", dict_data, "dictionary"), - ("chat_table", table_data, "table"), - ("user_image", image_data, "image"), - ("audio_clip", audio_data, "audio"), - ("video_clip", video_data, "video"), - ("binary_file", binary_data, "binary") + ("chat_table_small", table_data_small, "table"), + + # Large data (link transport) - large table, large image, large audio, large video, large binary + ("chat_table_large", table_data_large, "table"), + ("user_image_large", large_image_data, "image"), + ("audio_clip_large", large_audio_data, "audio"), + ("video_clip_large", large_video_data, "video"), + ("binary_file_large", large_binary_data, "binary") ] # Use smartsend with mixed content diff --git a/test/test_julia_to_julia_mix_receiver.jl b/test/test_julia_to_julia_mix_payloads_receiver.jl similarity index 91% rename from test/test_julia_to_julia_mix_receiver.jl rename to test/test_julia_to_julia_mix_payloads_receiver.jl index 312c2fe..e5c693a 100644 --- a/test/test_julia_to_julia_mix_receiver.jl +++ b/test/test_julia_to_julia_mix_payloads_receiver.jl @@ -193,6 +193,21 @@ function test_mix_receive() log_trace("Audio payloads: $audio_count") log_trace("Video payloads: $video_count") log_trace("Binary payloads: $binary_count") + + # Print transport type info for each payload if available + println("\n=== Payload Details ===") + for (dataname, data, data_type) in result + if data_type in ["image", "audio", "video", "binary"] + log_trace("$dataname: $(length(data)) bytes (binary)") + elseif data_type == "table" + data = DataFrame(data) + log_trace("$dataname: $(size(data, 1)) rows x $(size(data, 2)) columns (DataFrame)") + elseif data_type == "dictionary" + log_trace("$dataname: $(length(JSON.json(data))) bytes (Dict)") + elseif data_type == "text" + log_trace("$dataname: $(length(data)) characters (String)") + end + end end # Keep listening for 2 minutes diff --git a/tutorial_julia.md b/tutorial_julia.md new file mode 100644 index 0000000..2e395df --- /dev/null +++ b/tutorial_julia.md @@ -0,0 +1,634 @@ +# NATSBridge.jl Tutorial + +A comprehensive tutorial for learning how to use NATSBridge.jl for bi-directional communication between Julia and JavaScript services using NATS. + +## Table of Contents + +1. [What is NATSBridge.jl?](#what-is-natsbridgejl) +2. [Key Concepts](#key-concepts) +3. [Installation](#installation) +4. [Basic Usage](#basic-usage) +5. [Payload Types](#payload-types) +6. [Transport Strategies](#transport-strategies) +7. [Advanced Features](#advanced-features) +8. [Complete Examples](#complete-examples) + +--- + +## What is NATSBridge.jl? + +NATSBridge.jl is a Julia module that provides a high-level API for sending and receiving data across network boundaries using NATS as the message bus. It implements the **Claim-Check pattern** for handling large payloads efficiently. + +### Core Features + +- **Bi-directional communication**: Julia ↔ JavaScript +- **Smart transport selection**: Automatic direct vs link transport based on payload size +- **Multi-payload support**: Send multiple payloads of different types in a single message +- **Claim-check pattern**: Upload large files to HTTP server, send only URLs via NATS +- **Type-aware serialization**: Different serialization strategies for different data types + +--- + +## Key Concepts + +### 1. msgEnvelope_v1 (Message Envelope) + +The `msgEnvelope_v1` structure provides a comprehensive message format for bidirectional communication: + +```julia +struct msgEnvelope_v1 + correlationId::String # Unique identifier to track messages + msgId::String # This message id + timestamp::String # Message published timestamp + + sendTo::String # Topic/subject the sender sends to + msgPurpose::String # Purpose (ACK | NACK | updateStatus | shutdown | chat) + senderName::String # Sender name (e.g., "agent-wine-web-frontend") + senderId::String # Sender id (uuid4) + receiverName::String # Message receiver name (e.g., "agent-backend") + receiverId::String # Message receiver id (uuid4 or nothing for broadcast) + replyTo::String # Topic to reply to + replyToMsgId::String # Message id this message is replying to + brokerURL::String # NATS server address + + metadata::Dict{String, Any} + payloads::AbstractArray{msgPayload_v1} # Multiple payloads stored here +end +``` + +### 2. msgPayload_v1 (Payload Structure) + +The `msgPayload_v1` structure provides flexible payload handling: + +```julia +struct msgPayload_v1 + id::String # Id of this payload (e.g., "uuid4") + dataname::String # Name of this payload (e.g., "login_image") + type::String # "text | dictionary | table | image | audio | video | binary" + transport::String # "direct | link" + encoding::String # "none | json | base64 | arrow-ipc" + size::Integer # Data size in bytes + data::Any # Payload data in case of direct transport or a URL in case of link + metadata::Dict{String, Any} # Dict("checksum" => "sha256_hash", ...) +end +``` + +### 3. Standard API Format + +The system uses a **standardized list-of-tuples format** for all payload operations: + +```julia +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] + +# Output format for smartreceive (always returns a list of tuples) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] +``` + +**Important**: Even when sending a single payload, you must wrap it in a list. + +--- + +## Installation + +```julia +using Pkg +Pkg.add("NATS") +Pkg.add("JSON") +Pkg.add("Arrow") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") +Pkg.add("Base64") +Pkg.add("PrettyPrinting") +Pkg.add("DataFrames") +``` + +Then include the NATSBridge module: + +```julia +include("NATSBridge.jl") +using .NATSBridge +``` + +--- + +## Basic Usage + +### Sending Data (smartsend) + +```julia +using NATSBridge + +# Send a simple dictionary +data = Dict("key" => "value") +env = NATSBridge.smartsend("my.subject", [("dataname1", data, "dictionary")]) +``` + +### Receiving Data (smartreceive) + +```julia +using NATSBridge + +# Subscribe to a NATS subject +NATS.subscribe("my.subject") do msg + # Process the message + result = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + # result is a list of (dataname, data, type) tuples + for (dataname, data, type) in result + println("Received $dataname of type $type") + println("Data: $data") + end +end +``` + +--- + +## Payload Types + +NATSBridge.jl supports the following payload types: + +| Type | Description | Serialization | +|------|-------------|---------------| +| `text` | Plain text | UTF-8 encoding | +| `dictionary` | JSON-serializable data (Dict, NamedTuple) | JSON | +| `table` | Tabular data (DataFrame, array of structs) | Apache Arrow IPC | +| `image` | Image data (Bitmap, PNG/JPG bytes) | Binary | +| `audio` | Audio data (WAV, MP3 bytes) | Binary | +| `video` | Video data (MP4, AVI bytes) | Binary | +| `binary` | Generic binary data | Binary | + +--- + +## Transport Strategies + +NATSBridge.jl automatically selects the appropriate transport strategy based on payload size: + +### Direct Transport (< 1MB) + +Small payloads are encoded as Base64 and sent directly over NATS. + +```julia +# Small data (< 1MB) - uses direct transport +small_data = rand(1000) # ~8KB +env = NATSBridge.smartsend("small", [("data", small_data, "table")]) +``` + +### Link Transport (≥ 1MB) + +Large payloads are uploaded to an HTTP file server, and only the URL is sent via NATS. + +```julia +# Large data (≥ 1MB) - uses link transport +large_data = rand(10_000_000) # ~80MB +env = NATSBridge.smartsend("large", [("data", large_data, "table")]) +``` + +--- + +## Complete Examples + +### Example 1: Text Message + +**Sender:** +```julia +using NATSBridge +using UUIDs + +const SUBJECT = "/NATSBridge_text_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function test_text_send() + small_text = "Hello, this is a small text message." + large_text = join(["Line $i: " for i in 1:50000], "") + + data1 = ("small_text", small_text, "text") + data2 = ("large_text", large_text, "text") + + env = NATSBridge.smartsend( + SUBJECT, + [data1, data2], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "text_sender" + ) +end +``` + +**Receiver:** +```julia +using NATSBridge + +const SUBJECT = "/NATSBridge_text_test" +const NATS_URL = "nats://localhost:4222" + +function test_text_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + result = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in result + if data_type == "text" + println("Received text: $data") + write("./received_$dataname.txt", data) + end + end + end + sleep(120) + NATS.drain(conn) +end +``` + +### Example 2: Dictionary (JSON) Message + +**Sender:** +```julia +using NATSBridge +using UUIDs + +const SUBJECT = "/NATSBridge_dict_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function test_dict_send() + small_dict = Dict("name" => "Alice", "age" => 30) + large_dict = Dict("ids" => collect(1:50000), "names" => ["User_$i" for i in 1:50000]) + + data1 = ("small_dict", small_dict, "dictionary") + data2 = ("large_dict", large_dict, "dictionary") + + env = NATSBridge.smartsend( + SUBJECT, + [data1, data2], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat" + ) +end +``` + +**Receiver:** +```julia +using NATSBridge + +const SUBJECT = "/NATSBridge_dict_test" +const NATS_URL = "nats://localhost:4222" + +function test_dict_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + result = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in result + if data_type == "dictionary" + println("Received dictionary: $data") + write("./received_$dataname.json", JSON.json(data, 2)) + end + end + end + sleep(120) + NATS.drain(conn) +end +``` + +### Example 3: DataFrame (Table) Message + +**Sender:** +```julia +using NATSBridge +using DataFrames +using UUIDs + +const SUBJECT = "/NATSBridge_table_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function test_table_send() + small_df = DataFrame(id = 1:10, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) + large_df = DataFrame(id = 1:50000, name = ["User_$i" for i in 1:50000], score = rand(1:100, 50000)) + + data1 = ("small_table", small_df, "table") + data2 = ("large_table", large_df, "table") + + env = NATSBridge.smartsend( + SUBJECT, + [data1, data2], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat" + ) +end +``` + +**Receiver:** +```julia +using NATSBridge +using DataFrames + +const SUBJECT = "/NATSBridge_table_test" +const NATS_URL = "nats://localhost:4222" + +function test_table_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + result = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in result + if data_type == "table" + data = DataFrame(data) + println("Received DataFrame with $(size(data, 1)) rows") + display(data[1:min(5, size(data, 1)), :]) + end + end + end + sleep(120) + NATS.drain(conn) +end +``` + +### Example 4: Mixed Content (Chat with Text, Image, Audio) + +**Sender:** +```julia +using NATSBridge +using DataFrames +using UUIDs + +const SUBJECT = "/NATSBridge_mix_test" +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function test_mix_send() + # Text data + text_data = "Hello! This is a test chat message. 🎉" + + # Dictionary data + dict_data = Dict("type" => "chat", "sender" => "serviceA") + + # Small table data + table_data_small = DataFrame(id = 1:10, name = ["msg_$i" for i in 1:10]) + + # Large table data (link transport) + table_data_large = DataFrame(id = 1:150_000, name = ["msg_$i" for i in 1:150_000]) + + # Small image data (direct transport) + image_data = UInt8[rand(1:255) for _ in 1:100] + + # Large image data (link transport) + large_image_data = UInt8[rand(1:255) for _ in 1:1_500_000] + + # Small audio data (direct transport) + audio_data = UInt8[rand(1:255) for _ in 1:100] + + # Large audio data (link transport) + large_audio_data = UInt8[rand(1:255) for _ in 1:1_500_000] + + # Small video data (direct transport) + video_data = UInt8[rand(1:255) for _ in 1:150] + + # Large video data (link transport) + large_video_data = UInt8[rand(1:255) for _ in 1:1_500_000] + + # Small binary data (direct transport) + binary_data = UInt8[rand(1:255) for _ in 1:200] + + # Large binary data (link transport) + large_binary_data = UInt8[rand(1:255) for _ in 1:1_500_000] + + # Create payloads list - mixed content + payloads = [ + # Small data (direct transport) + ("chat_text", text_data, "text"), + ("chat_json", dict_data, "dictionary"), + ("chat_table_small", table_data_small, "table"), + + # Large data (link transport) + ("chat_table_large", table_data_large, "table"), + ("user_image_large", large_image_data, "image"), + ("audio_clip_large", large_audio_data, "audio"), + ("video_clip_large", large_video_data, "video"), + ("binary_file_large", large_binary_data, "binary") + ] + + env = NATSBridge.smartsend( + SUBJECT, + payloads, + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "mix_sender" + ) +end +``` + +**Receiver:** +```julia +using NATSBridge +using DataFrames + +const SUBJECT = "/NATSBridge_mix_test" +const NATS_URL = "nats://localhost:4222" + +function test_mix_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + result = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + println("Received $(length(result)) payloads") + + for (dataname, data, data_type) in result + println("\n=== Payload: $dataname (type: $data_type) ===") + + if data_type == "text" + println(" Type: String") + println(" Length: $(length(data)) characters") + + elseif data_type == "dictionary" + println(" Type: JSON Object") + println(" Keys: $(keys(data))") + + elseif data_type == "table" + data = DataFrame(data) + println(" Type: DataFrame") + println(" Dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns") + + elseif data_type == "image" + println(" Type: Vector{UInt8}") + println(" Size: $(length(data)) bytes") + write("./received_$dataname.bin", data) + + elseif data_type == "audio" + println(" Type: Vector{UInt8}") + println(" Size: $(length(data)) bytes") + write("./received_$dataname.bin", data) + + elseif data_type == "video" + println(" Type: Vector{UInt8}") + println(" Size: $(length(data)) bytes") + write("./received_$dataname.bin", data) + + elseif data_type == "binary" + println(" Type: Vector{UInt8}") + println(" Size: $(length(data)) bytes") + write("./received_$dataname.bin", data) + end + end + end + sleep(120) + NATS.drain(conn) +end +``` + +--- + +## Best Practices + +1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]` +2. **Use appropriate transport** - Let NATSBridge handle size-based routing (default 1MB threshold) +3. **Customize size threshold** - Use `size_threshold` parameter to adjust the direct/link split +4. **Provide fileserver handler** - Implement `fileserverUploadHandler` for link transport +5. **Include correlation IDs** - Track messages across distributed systems +6. **Handle errors** - Implement proper error handling for network failures +7. **Close connections** - Ensure NATS connections are properly closed using `NATS.drain()` + +--- + +## Conclusion + +NATSBridge.jl provides a powerful abstraction for bi-directional communication between Julia and JavaScript services. By understanding the key concepts and following the best practices, you can build robust, scalable applications that leverage the full power of NATS messaging. + +For more information, see: +- [`docs/architecture.md`](./architecture.md) - Detailed architecture documentation +- [`docs/implementation.md`](./implementation.md) - Implementation details \ No newline at end of file diff --git a/walkthrough_julia.md b/walkthrough_julia.md new file mode 100644 index 0000000..d7081dc --- /dev/null +++ b/walkthrough_julia.md @@ -0,0 +1,939 @@ +# NATSBridge.jl Walkthrough: Building a Chat System + +A step-by-step guided walkthrough for building a real-time chat system using NATSBridge.jl with mixed content support (text, images, audio, video, and files). + +## Prerequisites + +- Julia 1.7+ +- NATS server running +- HTTP file server (Plik) running + +## Step 1: Understanding the Chat System Architecture + +### System Components + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Chat System │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ NATS ┌──────────────┐ │ +│ │ Julia │◄───────┬───────► │ JavaScript │ │ +│ │ Service │ │ │ Client │ │ +│ │ │ │ │ │ │ +│ │ - Text │ │ │ - Text │ │ +│ │ - Images │ │ │ - Images │ │ +│ │ - Audio │ ▼ │ - Audio │ │ +│ │ - Video │ NATSBridge.jl │ - Files │ │ +│ │ - Files │ │ │ - Tables │ │ +│ └──────────────┘ │ └──────────────┘ │ +│ │ │ +│ ┌───────┴───────┐ │ +│ │ NATS │ │ +│ │ Server │ │ +│ └─────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ + +For large payloads (> 1MB): +┌─────────────────────────────────────────────────────────────────────────────┐ +│ File Server (Plik) │ +│ │ +│ Julia Service ──► Upload ──► File Server ──► Download ◄── JavaScript Client│ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### Message Format + +Each chat message is an envelope containing multiple payloads: + +```json +{ + "correlationId": "uuid4", + "msgId": "uuid4", + "timestamp": "2024-01-15T10:30:00Z", + "sendTo": "/chat/room1", + "msgPurpose": "chat", + "senderName": "user-1", + "senderId": "uuid4", + "receiverName": "user-2", + "receiverId": "uuid4", + "brokerURL": "nats://localhost:4222", + "payloads": [ + { + "id": "uuid4", + "dataname": "message_text", + "type": "text", + "transport": "direct", + "encoding": "base64", + "size": 256, + "data": "SGVsbG8gV29ybGQh", + "metadata": {} + }, + { + "id": "uuid4", + "dataname": "user_image", + "type": "image", + "transport": "link", + "encoding": "none", + "size": 15433, + "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/image.jpg", + "metadata": {} + } + ] +} +``` + +## Step 2: Setting Up the Environment + +### 1. Start NATS Server + +```bash +# Using Docker +docker run -d -p 4222:4222 -p 8222:8222 --name nats-server nats:latest + +# Or download from https://github.com/nats-io/nats-server/releases +./nats-server +``` + +### 2. Start HTTP File Server (Plik) + +```bash +# Using Docker +docker run -d -p 8080:8080 --name plik plik/plik:latest + +# Or download from https://github.com/arnaud-lb/plik/releases +./plikd -d +``` + +### 3. Install Julia Dependencies + +```julia +using Pkg +Pkg.add("NATS") +Pkg.add("JSON") +Pkg.add("Arrow") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") +Pkg.add("Base64") +Pkg.add("PrettyPrinting") +Pkg.add("DataFrames") +``` + +## Step 3: Basic Text-Only Chat + +### Sender (User 1) + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +# Include the bridge module +include("NATSBridge.jl") +using .NATSBridge + +# Configuration +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +# File upload handler for plik server +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +# Send a simple text message +function send_text_message() + message_text = "Hello, how are you today?" + + env = NATSBridge.smartsend( + SUBJECT, + [("message", message_text, "text")], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Sent text message with correlation ID: $(env.correlationId)") +end + +send_text_message() +``` + +### Receiver (User 2) + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +# Include the bridge module +include("NATSBridge.jl") +using .NATSBridge + +# Configuration +const NATS_URL = "nats://localhost:4222" +const SUBJECT = "/chat/room1" + +# Message handler +function message_handler(msg::NATS.Msg) + payloads = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + # Extract the text message + for (dataname, data, data_type) in payloads + if data_type == "text" + println("Received message: $data") + # Save to file + write("./received_$dataname.txt", data) + end + end +end + +# Subscribe to the chat room +NATS.subscribe(SUBJECT) do msg + message_handler(msg) +end + +# Keep the program running +while true + sleep(1) +end +``` + +## Step 4: Adding Image Support + +### Sending an Image + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_image() + # Read image file + image_data = read("screenshot.png", Vector{UInt8}) + + # Send with text message + env = NATSBridge.smartsend( + SUBJECT, + [ + ("text", "Check out this screenshot!", "text"), + ("screenshot", image_data, "image") + ], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Sent image with correlation ID: $(env.correlationId)") +end + +send_image() +``` + +### Receiving an Image + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const SUBJECT = "/chat/room1" + +function message_handler(msg::NATS.Msg) + payloads = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in payloads + if data_type == "text" + println("Text: $data") + elseif data_type == "image" + # Save image to file + filename = "received_$dataname.bin" + write(filename, data) + println("Saved image: $filename") + end + end +end + +NATS.subscribe(SUBJECT) do msg + message_handler(msg) +end +``` + +## Step 5: Handling Large Files with Link Transport + +### Automatic Transport Selection + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_large_file() + # Create a large file (> 1MB triggers link transport) + large_data = rand(10_000_000) # ~80MB + + env = NATSBridge.smartsend( + SUBJECT, + [("large_file", large_data, "binary")], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Uploaded large file to: $(env.payloads[1].data)") + println("Correlation ID: $(env.correlationId)") +end + +send_large_file() +``` + +## Step 6: Audio and Video Support + +### Sending Audio + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_audio() + # Read audio file (WAV, MP3, etc.) + audio_data = read("voice_message.mp3", Vector{UInt8}) + + env = NATSBridge.smartsend( + SUBJECT, + [("voice_message", audio_data, "audio")], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Sent audio message: $(env.correlationId)") +end + +send_audio() +``` + +### Sending Video + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_video() + # Read video file (MP4, AVI, etc.) + video_data = read("video_message.mp4", Vector{UInt8}) + + env = NATSBridge.smartsend( + SUBJECT, + [("video_message", video_data, "video")], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Sent video message: $(env.correlationId)") +end + +send_video() +``` + +## Step 7: Table/Data Exchange + +### Sending Tabular Data + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_table() + # Create a DataFrame + df = DataFrame( + id = 1:5, + name = ["Alice", "Bob", "Charlie", "Diana", "Eve"], + score = [95, 88, 92, 98, 85], + grade = ['A', 'B', 'A', 'B', 'B'] + ) + + env = NATSBridge.smartsend( + SUBJECT, + [("student_scores", df, "table")], + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Sent table with $(nrow(df)) rows") +end + +send_table() +``` + +### Receiving and Using Tables + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const SUBJECT = "/chat/room1" + +function message_handler(msg::NATS.Msg) + payloads = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in payloads + if data_type == "table" + data = DataFrame(data) + println("Received table:") + show(data) + println("\nAverage score: $(mean(data.score))") + end + end +end + +NATS.subscribe(SUBJECT) do msg + message_handler(msg) +end +``` + +## Step 8: Bidirectional Communication + +### Request-Response Pattern + +```julia +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +include("NATSBridge.jl") +using .NATSBridge + +const NATS_URL = "nats://localhost:4222" +const SUBJECT = "/api/query" +const REPLY_SUBJECT = "/api/response" + +# Request +function send_request() + query_data = Dict("query" => "SELECT * FROM users") + + env = NATSBridge.smartsend( + SUBJECT, + [("sql_query", query_data, "dictionary")], + nats_url = NATS_URL, + fileserver_url = "http://localhost:8080", + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "request", + sender_name = "frontend", + receiver_name = "backend", + reply_to = REPLY_SUBJECT, + reply_to_msg_id = string(uuid4()) + ) + + println("Request sent: $(env.correlationId)") +end + +# Response handler +function response_handler(msg::NATS.Msg) + payloads = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + for (dataname, data, data_type) in payloads + if data_type == "table" + data = DataFrame(data) + println("Query results:") + show(data) + end + end +end + +NATS.subscribe(REPLY_SUBJECT) do msg + response_handler(msg) +end +``` + +## Step 9: Complete Chat Application + +### Full Chat System + +```julia +module ChatApp +using NATS +using JSON +using UUIDs +using Dates +using PrettyPrinting +using DataFrames +using Arrow +using HTTP +using Base64 + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const NATS_URL = "nats://localhost:4222" +const FILESERVER_URL = "http://localhost:8080" +const SUBJECT = "/chat/room1" + +# File upload handler for plik server +function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} + url_getUploadID = "$fileserver_url/upload" + headers = ["Content-Type" => "application/json"] + body = """{ "OneShot" : true }""" + httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) + responseJson = JSON.parse(String(httpResponse.body)) + uploadid = responseJson["id"] + uploadtoken = responseJson["uploadToken"] + + file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") + url_upload = "$fileserver_url/file/$uploadid" + headers = ["X-UploadToken" => uploadtoken] + + form = HTTP.Form(Dict("file" => file_multipart)) + httpResponse = HTTP.post(url_upload, headers, form) + responseJson = JSON.parse(String(httpResponse.body)) + + fileid = responseJson["id"] + url = "$fileserver_url/file/$uploadid/$fileid/$dataname" + + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) +end + +function send_chat_message( + text::String, + image_path::Union{String, Nothing}=nothing, + audio_path::Union{String, Nothing}=nothing +) + # Build payloads list + payloads = [("message_text", text, "text")] + + if image_path !== nothing + image_data = read(image_path, Vector{UInt8}) + push!(payloads, ("user_image", image_data, "image")) + end + + if audio_path !== nothing + audio_data = read(audio_path, Vector{UInt8}) + push!(payloads, ("user_audio", audio_data, "audio")) + end + + env = NATSBridge.smartsend( + SUBJECT, + payloads, + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL, + fileserverUploadHandler = plik_upload_handler, + size_threshold = 1_000_000, + correlation_id = string(uuid4()), + msg_purpose = "chat", + sender_name = "user-1" + ) + + println("Message sent with correlation ID: $(env.correlationId)") +end + +function receive_chat_messages() + function message_handler(msg::NATS.Msg) + payloads = NATSBridge.smartreceive( + msg, + max_retries = 5, + base_delay = 100, + max_delay = 5000 + ) + + println("\n--- New Message ---") + for (dataname, data, data_type) in payloads + if data_type == "text" + println("Text: $data") + elseif data_type == "image" + filename = "received_$dataname.bin" + write(filename, data) + println("Image saved: $filename") + elseif data_type == "audio" + filename = "received_$dataname.bin" + write(filename, data) + println("Audio saved: $filename") + elseif data_type == "table" + println("Table received:") + data = DataFrame(data) + show(data) + end + end + end + + NATS.subscribe(SUBJECT) do msg + message_handler(msg) + end + println("Subscribed to: $SUBJECT") +end + +function run_interactive_chat() + println("\n=== Interactive Chat ===") + println("1. Send a message") + println("2. Join a chat room") + println("3. Exit") + + while true + print("\nSelect option (1-3): ") + choice = readline() + + if choice == "1" + print("Enter message text: ") + text = readline() + send_chat_message(text) + elseif choice == "2" + receive_chat_messages() + elseif choice == "3" + break + end + end +end + +end # module + +# Run the chat app +using .ChatApp +ChatApp.run_interactive_chat() +``` + +## Step 10: Testing the Chat System + +### Test Scenario 1: Text-Only Chat + +```bash +# Terminal 1: Start the chat receiver +julia test_julia_to_julia_text_receiver.jl + +# Terminal 2: Send a message +julia test_julia_to_julia_text_sender.jl +``` + +### Test Scenario 2: Image Chat + +```bash +# Terminal 1: Receive messages +julia test_julia_to_julia_mix_payloads_receiver.jl + +# Terminal 2: Send image +julia test_julia_to_julia_mix_payload_sender.jl +``` + +### Test Scenario 3: Large File Transfer + +```bash +# Terminal 2: Send large file +julia test_julia_to_julia_mix_payload_sender.jl +``` + +## Conclusion + +This walkthrough demonstrated how to build a chat system using NATSBridge.jl with support for: + +- Text messages +- Images (direct transport for small, link transport for large) +- Audio files +- Video files +- Tabular data (DataFrames) +- Bidirectional communication +- Mixed-content messages + +The key takeaways are: + +1. **Always wrap payloads in a list** - Even for single payloads: `[("dataname", data, "type")]` +2. **Use appropriate transport** - NATSBridge automatically handles size-based routing +3. **Support mixed content** - Multiple payloads of different types in one message +4. **Handle errors** - Implement proper error handling for network failures +5. **Use correlation IDs** - Track messages across distributed systems + +For more information, see: +- [`docs/architecture.md`](./docs/architecture.md) - Detailed architecture documentation +- [`docs/implementation.md`](./docs/implementation.md) - Implementation details \ No newline at end of file