NATSBridge/docs/architecture.md
2026-02-25 20:27:51 +07:00

Architecture Documentation: Bi-Directional Data Bridge

Overview

This document describes the architecture for a high-performance, bi-directional data bridge for Julia applications using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.

The system enables Julia applications to exchange messages over NATS, including payloads too large to send through the broker directly.

File Server Handler Architecture

The system uses handler functions to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).

Handler Function Signatures:

# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url::String, dataname::String, data::Vector{UInt8})
# Returns: Dict{String, Any} with keys: "status", "uploadid", "fileid", "url"
fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}

# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)
# Returns: Vector{UInt8} (the downloaded data)
fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}

This design allows the system to support multiple file server backends without changing the core messaging logic.
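Because the handlers are plain functions, a backend only has to follow these two signatures. The sketch below shows a hypothetical local-disk backend, which can be useful for tests when no Plik or S3 server is available. Several parts of it are assumptions, not part of NATSBridge: the names localdisk_upload and localdisk_download, the reading of fileserver_url as a directory path, the file:// URL scheme, and the integer "status" value.

```julia
using UUIDs

# Hypothetical upload handler following the documented contract:
# (fileserver_url, dataname, data) -> Dict with "status", "uploadid", "fileid", "url".
# Assumption: fileserver_url is treated as a local directory path.
function localdisk_upload(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    uploadid = string(uuid4())
    fileid = string(uuid4())
    dir = joinpath(fileserver_url, uploadid, fileid)
    mkpath(dir)                      # create the upload directory tree
    path = joinpath(dir, dataname)
    write(path, data)                # store the raw bytes
    return Dict{String, Any}(
        "status" => 200,             # assumed status convention
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => "file://" * path,
    )
end

# Hypothetical download handler following the documented contract.
# Local reads need no retries, but the extra arguments are accepted so the
# function can be passed as fileserver_download_handler.
function localdisk_download(url::String, max_retries::Int, base_delay::Int,
                            max_delay::Int, correlation_id::String)::Vector{UInt8}
    startswith(url, "file://") || error("[$correlation_id] unsupported URL: $url")
    return read(url[length("file://")+1:end])   # read the whole file as bytes
end
```

Assuming smartsend forwards its fileserver_url keyword to the handler, as the signature suggests, passing a directory there together with fileserver_upload_handler=localdisk_upload would send link payloads to disk.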

Multi-Payload Support (Standard API)

The system uses a standardized list-of-tuples format for all payload operations. Even when sending a single payload, the user must wrap it in a list.

API Standard:

# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive: a Dict-like object holding the envelope metadata
# plus a "payloads" field of type Vector{Tuple{String, Any, String}}
# {
#   "correlation_id": "...",
#   "msg_id": "...",
#   "timestamp": "...",
#   "send_to": "...",
#   "msg_purpose": "...",
#   "sender_name": "...",
#   "sender_id": "...",
#   "receiver_name": "...",
#   "receiver_id": "...",
#   "reply_to": "...",
#   "reply_to_msg_id": "...",
#   "broker_url": "...",
#   "metadata": {...},
#   "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# }

Supported Types:

  • "text" - Plain text
  • "dictionary" - JSON-serializable dictionaries (Dict, NamedTuple)
  • "table" - Tabular data (DataFrame, array of structs)
  • "image" - Image data (Bitmap, PNG/JPG bytes)
  • "audio" - Audio data (WAV, MP3 bytes)
  • "video" - Video data (MP4, AVI bytes)
  • "binary" - Generic binary data (Vector{UInt8})

This design allows per-payload type specification, enabling mixed-content messages where different payloads can use different serialization formats in a single message.

Examples:

# Single payload - still wrapped in a list
smartsend(
    "/test",
    [("dataname1", data1, "dictionary")],  # List with one (dataname, data, type) tuple
    broker_url="nats://localhost:4222",
    fileserver_upload_handler=plik_oneshot_upload
)

# Multiple payloads in one message with different types
smartsend(
    "/test",
    [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
    broker_url="nats://localhost:4222",
    fileserver_upload_handler=plik_oneshot_upload
)

# Mixed content (e.g., chat with text, image, audio)
smartsend(
    "/chat",
    [
        ("message_text", "Hello!", "text"),
        ("user_image", image_data, "image"),
        ("audio_clip", audio_data, "audio")
    ],
    broker_url="nats://localhost:4222"
)

# Receive returns a dictionary envelope with all metadata and deserialized payloads
env = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
# env["correlation_id"], env["msg_id"], etc.
# env is a dictionary containing envelope metadata and payloads field
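Each payload is a (dataname, data, type) tuple, so a receiver typically destructures it and branches on the type string. The sketch below illustrates this. The hand-built env stands in for a real smartreceive result and its values are made up. describe_payload is a hypothetical helper.

```julia
# Stand-in for the Dict returned by smartreceive (illustrative values only).
env = Dict{String, Any}(
    "correlation_id" => "c0ffee",
    "msg_purpose" => "chat",
    "payloads" => Tuple{String, Any, String}[
        ("message_text", "Hello!", "text"),
        ("settings", Dict("mode" => "fast"), "dictionary"),
        ("raw", UInt8[0x01, 0x02], "binary"),
    ],
)

# Hypothetical helper: summarize one payload according to its declared type.
function describe_payload(dataname::String, data, payload_type::String)::String
    if payload_type == "text"
        return "$dataname: text ($(length(data)) chars)"
    elseif payload_type == "dictionary"
        return "$dataname: dictionary with keys $(sort(collect(keys(data))))"
    elseif payload_type == "binary"
        return "$dataname: $(length(data)) bytes"
    else
        return "$dataname: unhandled type $payload_type"
    end
end

# Destructure each (dataname, data, type) tuple directly in the loop header.
for (dataname, data, payload_type) in env["payloads"]
    println(describe_payload(dataname, data, payload_type))
end
```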

Architecture Diagram

flowchart TD
    subgraph Client
        App[Julia Application]
    end

    subgraph Server
        Julia[Julia Service]
        NATS[NATS Server]
        FileServer[HTTP File Server]
    end

    App -->|NATS| NATS
    NATS -->|NATS| Julia
    Julia -->|NATS| NATS
    Julia -->|HTTP POST| FileServer

    style App fill:#e8f5e9
    style Julia fill:#e8f5e9
    style NATS fill:#fff3e0
    style FileServer fill:#f3e5f5

System Components

1. msg_envelope_v1 - Message Envelope

The msg_envelope_v1 structure provides a comprehensive message format for bidirectional communication in Julia applications.

Julia Structure:

struct msg_envelope_v1
  correlation_id::String       # Unique identifier to track messages across systems
  msg_id::String               # This message id
  timestamp::String            # Message published timestamp
  
  send_to::String              # Topic/subject the sender sends to
  msg_purpose::String          # Purpose of this message (ACK | NACK | updateStatus | shutdown | ...)
  sender_name::String          # Sender name (e.g., "agent-wine-web-frontend")
  sender_id::String            # Sender id (uuid4)
  receiver_name::String        # Message receiver name (e.g., "agent-backend")
  receiver_id::String          # Message receiver id (uuid4, or empty string "" for broadcast)
  reply_to::String             # Topic to reply to
  reply_to_msg_id::String      # Message id this message is replying to
  broker_url::String           # NATS server address
  
  metadata::Dict{String, Any}
  payloads::Vector{msg_payload_v1}  # Multiple payloads stored here
end

JSON Schema:

{
  "correlation_id": "uuid-v4-string",
  "msg_id": "uuid-v4-string",
  "timestamp": "2024-01-15T10:30:00Z",
  
  "send_to": "topic/subject",
  "msg_purpose": "ACK | NACK | updateStatus | shutdown | chat",
  "sender_name": "agent-wine-web-frontend",
  "sender_id": "uuid4",
  "receiver_name": "agent-backend",
  "receiver_id": "uuid4",
  "reply_to": "topic",
  "reply_to_msg_id": "uuid4",
  "broker_url": "nats://localhost:4222",
  
  "metadata": {

  },
  
  "payloads": [
    {
      "id": "uuid4",
      "dataname": "login_image",
      "payload_type": "image",
      "transport": "direct",
      "encoding": "base64",
      "size": 15433,
      "data": "base64-encoded-string",
      "metadata": {

      }
    },
    {
      "id": "uuid4",
      "dataname": "large_data",
      "payload_type": "table",
      "transport": "link",
      "encoding": "none",
      "size": 524288,
      "data": "http://localhost:8080/file/UPLOAD_ID/FILE_ID/data.arrow",
      "metadata": {

      }
    }
  ]
}
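The identifier and timestamp fields can be produced with Julia's standard library alone. This minimal sketch assumes UUIDv4 strings and second-precision UTC timestamps in the format shown in the schema. new_envelope_header is a hypothetical helper, not a NATSBridge function.

```julia
using Dates, UUIDs

# Hypothetical helper: build the identity/timestamp part of an envelope.
# Reusing an incoming correlation_id keeps a conversation traceable.
function new_envelope_header(; correlation_id::Union{String, Nothing} = nothing)
    cid = correlation_id === nothing ? string(uuid4()) : correlation_id
    return Dict{String, Any}(
        "correlation_id" => cid,
        "msg_id" => string(uuid4()),  # every message gets a fresh id
        # \T is a literal 'T'; seconds precision, UTC marked with a trailing Z
        "timestamp" => Dates.format(now(UTC), "yyyy-mm-dd\\THH:MM:SS") * "Z",
    )
end
```

When replying, the sender would pass the incoming correlation_id here and set reply_to_msg_id to the incoming msg_id.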

2. msg_payload_v1 - Payload Structure

The msg_payload_v1 structure provides flexible payload handling for various data types.

Julia Structure:

struct msg_payload_v1
  id::String                    # Id of this payload (e.g., "uuid4")
  dataname::String              # Name of this payload (e.g., "login_image")
  payload_type::String          # "text | dictionary | table | image | audio | video | binary"
  transport::String             # "direct | link"
  encoding::String              # "none | json | base64 | arrow-ipc"
  size::Integer                 # Data size in bytes
  data::Any                     # Payload data in case of direct transport or a URL in case of link
  metadata::Dict{String, Any}   # Dict("checksum" => "sha256_hash", ...)
end

Key Features:

  • Supports multiple data types: text, dictionary, table, image, audio, video, binary
  • Flexible transport: "direct" (NATS) or "link" (HTTP fileserver)
  • Multiple payloads per message (essential for chat with mixed content)
  • Per-payload and per-envelope metadata support
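For direct transport, a payload entry is essentially the Base64 text of the serialized bytes plus bookkeeping fields. This sketch builds a plain Dict shaped like the JSON schema rather than a msg_payload_v1. It also makes two assumptions: that size counts the raw bytes before Base64, and that the checksum is a hex SHA-256 digest, following the struct comment. direct_payload and decode_direct are hypothetical helpers.

```julia
using Base64, SHA, UUIDs

# Hypothetical helper: build a direct-transport payload entry (JSON-schema shaped).
function direct_payload(dataname::String, payload_type::String, bytes::Vector{UInt8})
    return Dict{String, Any}(
        "id" => string(uuid4()),
        "dataname" => dataname,
        "payload_type" => payload_type,
        "transport" => "direct",
        "encoding" => "base64",
        "size" => length(bytes),                  # assumed: raw size before Base64
        "data" => base64encode(bytes),
        "metadata" => Dict{String, Any}("checksum" => bytes2hex(sha256(bytes))),
    )
end

# Hypothetical receiver-side counterpart: decode, then verify the checksum.
function decode_direct(p::Dict{String, Any})::Vector{UInt8}
    bytes = base64decode(p["data"])
    bytes2hex(sha256(bytes)) == p["metadata"]["checksum"] ||
        error("checksum mismatch for $(p["dataname"])")
    return bytes
end
```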

3. Transport Strategy Decision Logic

┌─────────────────────────────────────────────────────────────┐
│                     smartsend Function                      │
│  Accepts: [(dataname1, data1, type1), ...]                  │
│  (Type is per payload, not standalone)                      │
└─────────────────────────────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────┐
│  For each payload:                                          │
│  1. Extract type from tuple                                 │
│  2. Serialize based on type                                 │
│  3. Check payload size                                      │
└─────────────────────────────────────────────────────────────┘
                             │
            ┌────────────────┴──────────────────┐
            ▼                                   ▼
      ┌─────────────────┐                 ┌─────────────────┐
      │  Direct Path    │                 │  Link Path      │
      │  (< 1MB)        │                 │  (>= 1MB)       │
      │                 │                 │                 │
      │ • Serialize to  │                 │ • Serialize to  │
      │   IOBuffer      │                 │   IOBuffer      │
      │ • Base64 encode │                 │ • Upload to     │
      │ • Publish to    │                 │   HTTP Server   │
      │   NATS          │                 │ • Publish to    │
      │   (with payload │                 │   NATS with URL │
      │    in envelope) │                 │   (in envelope) │
      └─────────────────┘                 └─────────────────┘
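The size check reduces to a single comparison. This sketch assumes "1MB" means 1_000_000 bytes. The real cutoff is DEFAULT_SIZE_THRESHOLD, which smartsend lets callers override through size_threshold. choose_transport and base64_length are hypothetical helpers.

```julia
# Assumption: "1MB" read as 1_000_000 bytes; NATSBridge's DEFAULT_SIZE_THRESHOLD may differ.
const ASSUMED_SIZE_THRESHOLD = 1_000_000

# Direct below the threshold, link at or above it, matching the Size Check diagram.
choose_transport(nbytes::Integer; size_threshold::Integer = ASSUMED_SIZE_THRESHOLD) =
    nbytes < size_threshold ? "direct" : "link"

# Length of the Base64 text (with padding) produced from `nbytes` raw bytes.
base64_length(nbytes::Integer) = 4 * cld(nbytes, 3)
```

One consequence is worth checking if the threshold applies to the raw serialized size. Base64 inflates data by about 4/3, and the NATS server's default max_payload is 1 MB. A raw payload just under the threshold therefore becomes roughly 1.33 MB once encoded, before envelope overhead, and can exceed the broker's limit. Comparing the encoded size, or lowering size_threshold, would avoid this.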

4. Julia Module Architecture

graph TD
    subgraph JuliaModule
        JuliaSmartSend[smartsend]
        SizeCheck[Size Check]
        DirectPath[Direct Path]
        LinkPath[Link Path]
        HTTPClient[HTTP Client]
    end

    JuliaSmartSend --> SizeCheck
    SizeCheck -->|< 1MB| DirectPath
    SizeCheck -->|>= 1MB| LinkPath
    LinkPath --> HTTPClient

    style JuliaModule fill:#c5e1a5

Implementation Details

Julia Implementation

Dependencies

  • NATS.jl - Core NATS functionality
  • Arrow.jl - Arrow IPC serialization
  • JSON3.jl - JSON parsing
  • HTTP.jl - HTTP client for file server
  • Dates.jl - Timestamps for logging

smartsend Function

function smartsend(
  subject::String,
  data::AbstractVector{<:Tuple{String, Any, String}};  # (dataname, data, type) tuples; <: because Julia's parametric types are invariant
  broker_url::String = DEFAULT_BROKER_URL,  # NATS server URL
  fileserver_url = DEFAULT_FILESERVER_URL,
  fileserver_upload_handler::Function = plik_oneshot_upload,
  size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
  correlation_id::Union{String, Nothing} = nothing,
  msg_purpose::String = "chat",
  sender_name::String = "NATSBridge",
  receiver_name::String = "",
  receiver_id::String = "",
  reply_to::String = "",
  reply_to_msg_id::String = "",
  is_publish::Bool = true,   # Whether to automatically publish to NATS
  NATS_connection::Union{NATS.Connection, Nothing} = nothing  # Pre-existing NATS connection (optional, saves connection overhead)
)

Keyword Parameter - NATS_connection:

  • NATS_connection::Union{NATS.Connection, Nothing} = nothing - Pre-existing NATS connection. When provided, smartsend uses this connection instead of creating a new one, avoiding the overhead of connection establishment. This is useful for high-frequency publishing scenarios where connection reuse provides performance benefits.

Connection Handling Logic:

if !is_publish
  # Skip publishing; the caller can publish env_json_str manually
elseif NATS_connection === nothing
  publish_message(broker_url, subject, env_json_str, cid)  # Creates a new connection
else
  publish_message(NATS_connection, subject, env_json_str, cid)  # Uses the provided connection
end

Return Value:

  • Returns a tuple (env, env_json_str) where:
    • env::msg_envelope_v1 - The envelope object containing all metadata and payloads
    • env_json_str::String - JSON string representation of the envelope for publishing

Options:

  • is_publish::Bool = true - When true (default), the message is automatically published to NATS. When false, the function returns the envelope and JSON string without publishing, allowing manual publishing via NATS request-reply pattern.

The envelope object can be accessed directly for programmatic use, while the JSON string can be published directly to NATS using the request-reply pattern.

Input Format:

  • data - Must be a vector of (dataname, data, type) tuples, each a Tuple{String, Any, String}: [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
  • Even for single payloads: [(dataname1, data1, "type1")]
  • Each payload can have a different type, enabling mixed-content messages

Flow:

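  1. Iterate through the list of (dataname, data, type) tuples
  2. For each payload: extract the type from the tuple and serialize accordingly
  3. Check the serialized payload size against size_threshold
  4. If < threshold: Base64-encode the bytes and embed them in the envelope (direct transport)
  5. If >= threshold: upload the bytes with fileserver_upload_handler and embed the returned URL in the envelope (link transport)
  6. Publish the envelope to NATS as a single message (when is_publish is true)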
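The send-side steps above can be sketched as a loop over the tuples. This is not NATSBridge's implementation. serialize_payload handles only "text" and "binary", the entries are plain Dicts rather than msg_payload_v1 values, and the 1_000_000-byte default threshold is an assumption. The upload handler follows the documented (fileserver_url, dataname, data) contract.

```julia
using Base64

# Hypothetical serializer covering only two of the documented types;
# dictionary, table, image, audio and video are left out of this sketch.
function serialize_payload(data, payload_type::String)::Vector{UInt8}
    payload_type == "text" && return Vector{UInt8}(data)
    payload_type == "binary" && return data
    error("serializer for \"$payload_type\" is not included in this sketch")
end

# Hypothetical helper: serialize, size-check, then embed inline or upload.
function build_payload_entries(payloads, upload_handler::Function;
                               fileserver_url::String = "", size_threshold::Int = 1_000_000)
    entries = Dict{String, Any}[]
    for (dataname, data, payload_type) in payloads
        bytes = serialize_payload(data, payload_type)
        entry = Dict{String, Any}("dataname" => dataname, "payload_type" => payload_type,
                                  "size" => length(bytes))
        if length(bytes) < size_threshold
            entry["transport"] = "direct"
            entry["encoding"] = "base64"
            entry["data"] = base64encode(bytes)
        else
            result = upload_handler(fileserver_url, dataname, bytes)
            entry["transport"] = "link"
            entry["data"] = result["url"]  # claim check: only the URL travels over NATS
        end
        push!(entries, entry)
    end
    return entries
end
```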

smartreceive Handler

function smartreceive(
    msg::NATS.Msg;
    fileserver_download_handler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)
    # Parse envelope
    # Iterate through all payloads
    # For each payload: check transport type
    #   If direct: decode Base64 payload
    #   If link: fetch from URL with exponential backoff using fileserver_download_handler
    # Deserialize payload based on type
    # Return envelope dictionary with all metadata and deserialized payloads
end

Output Format:

  • Returns a dictionary (parsed from the JSON envelope) containing all envelope fields:
    • correlation_id, msg_id, timestamp, send_to, msg_purpose, sender_name, sender_id, receiver_name, receiver_id, reply_to, reply_to_msg_id, broker_url
    • metadata - Message-level metadata dictionary
    • payloads - List of tuples, each containing (dataname, data, type) with deserialized payload data

Process Flow:

  1. Parse the JSON envelope to extract all fields
  2. Iterate through each payload in payloads
  3. For each payload:
    • Determine transport type (direct or link)
    • If direct: decode Base64 data from the message
    • If link: fetch data from URL using exponential backoff (via fileserver_download_handler)
    • Deserialize based on payload type (dictionary, table, binary, etc.)
  4. Return envelope dictionary with payloads field containing list of (dataname, data, type) tuples

Note: The fileserver_download_handler receives (url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String) and returns Vector{UInt8}.
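On the receive side, the same branching runs in reverse. This minimal sketch works over JSON-schema-shaped payload entries. fetch_payload_bytes is hypothetical, and it stops at raw bytes, whereas the real smartreceive goes on to deserialize each payload according to its type.

```julia
using Base64

# Hypothetical receive-side loop. download_handler follows the documented contract:
# (url, max_retries, base_delay, max_delay, correlation_id) -> Vector{UInt8}.
function fetch_payload_bytes(entries, download_handler::Function, correlation_id::String;
                             max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000)
    out = Tuple{String, Vector{UInt8}, String}[]
    for p in entries
        bytes = if p["transport"] == "direct"
            base64decode(p["data"])                 # inline payload
        elseif p["transport"] == "link"
            download_handler(p["data"], max_retries, base_delay, max_delay, correlation_id)
        else
            error("unknown transport: $(p["transport"])")
        end
        # Type-specific deserialization (JSON, Arrow IPC, ...) would follow here.
        push!(out, (p["dataname"], bytes, p["payload_type"]))
    end
    return out
end
```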

publish_message Function

The publish_message function provides two overloads for publishing messages to NATS:

Overload 1 - URL-based publishing (creates new connection):

function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
  conn = NATS.connect(broker_url)  # Create a connection owned by this call
  try
    publish_message(conn, subject, message, correlation_id)
  finally
    NATS.drain(conn)  # Close the connection this overload created
  end
end

Overload 2 - Connection-based publishing (uses pre-existing connection):

function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
  NATS.publish(conn, subject, message)  # Publish message to NATS
  log_trace(correlation_id, "Message published to $subject")  # Log successful publish
  # No drain here: the caller owns `conn` and may reuse it for further publishes
end

Use Case: Use the connection-based overload when you already have an established NATS connection and want to publish multiple messages without creating a new connection for each one. Julia selects the overload through multiple dispatch on the type of the first argument (String vs. NATS.Connection).
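The two overloads follow an ownership rule: whoever creates the connection closes it. That rule can be exercised without a NATS server. In this sketch, MockConnection, mock_publish and mock_drain are hypothetical stand-ins for NATS.Connection, NATS.publish and NATS.drain, and @debug stands in for log_trace.

```julia
# Hypothetical stand-in for NATS.Connection.
mutable struct MockConnection
    url::String
    published::Vector{Tuple{String, String}}
    drained::Bool
end
MockConnection(url::String) = MockConnection(url, Tuple{String, String}[], false)

mock_publish(conn::MockConnection, subject::String, msg::String) = push!(conn.published, (subject, msg))
mock_drain(conn::MockConnection) = (conn.drained = true; nothing)

# Connection overload: the caller owns `conn`, so it stays open for reuse.
function publish_message(conn::MockConnection, subject::String, message::String, correlation_id::String)
    mock_publish(conn, subject, message)
    @debug "Message published" subject correlation_id
    return conn
end

# URL overload: it creates the connection, so it is responsible for draining it.
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    conn = MockConnection(broker_url)
    try
        publish_message(conn, subject, message, correlation_id)
    finally
        mock_drain(conn)
    end
    return conn
end
```

Draining only in the URL overload is what keeps a connection passed through NATS_connection usable for later smartsend calls.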

Integration with smartsend:

# When NATS_connection is provided to smartsend, it uses the connection-based publish_message
env, env_json_str = smartsend(
    "my.subject",
    [("data", payload_data, "type")],
    NATS_connection=my_connection,  # Pre-existing connection
    is_publish=true
)
# Uses: publish_message(NATS_connection, subject, env_json_str, cid)

# When NATS_connection is not provided, it uses the URL-based publish_message
env, env_json_str = smartsend(
    "my.subject",
    [("data", payload_data, "type")],
    broker_url="nats://localhost:4222",
    is_publish=true
)
# Uses: publish_message(broker_url, subject, env_json_str, cid)

Scenario Implementations

Scenario 1: Command & Control (Small Dictionary)

Julia (Sender/Receiver):

# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment

Scenario 2: Deep Dive Analysis (Large Arrow Table)

Julia (Sender/Receiver):

# Create large DataFrame
# Convert to Arrow IPC stream
# Check size (> 1MB)
# Upload to HTTP server
# Publish NATS with URL

Scenario 3: Live Audio Processing

Julia (Sender/Receiver):

# Receive audio data
# Perform FFT or AI transcription
# Send results back (JSON + Arrow table)

Scenario 4: Catch-Up (JetStream)

Julia (Producer/Consumer):

# Publish to JetStream
# Include metadata for temporal tracking

Scenario 5: Selection (Low Bandwidth)

Focus: small Arrow tables. Action: Julia sends a small DataFrame to a receiving application, which displays it so the user can make a selection.

Julia (Sender/Receiver):

# Create small DataFrame (e.g., 50KB - 500KB)
# Convert to Arrow IPC stream
# Check payload size (< 1MB threshold)
# Publish directly to NATS with Base64-encoded payload
# Include metadata for dashboard selection context

Scenario 6: Chat System

Focus: each conversational message can contain any number and any combination of components: text, images, audio, video, tables, and files. Component sizes range from brief snippets to high-resolution images, large audio files, extensive tables, and massive documents. Large components use claim-check delivery, and messaging is fully bi-directional.

Multi-Payload Support: The system supports mixed-payload messages where a single message can contain multiple payloads with different transport strategies. The smartreceive function iterates through all payloads in the envelope and processes each according to its transport type.

Julia (Sender/Receiver):

# Build chat message with mixed payloads:
# - Text: direct transport (Base64)
# - Small images: direct transport (Base64)
# - Large images: link transport (HTTP URL)
# - Audio/video: link transport (HTTP URL)
# - Tables: direct or link depending on size
# - Files: link transport (HTTP URL)
# 
# Each payload uses appropriate transport strategy:
# - Size < 1MB → direct (NATS + Base64)
# - Size >= 1MB → link (HTTP upload + NATS URL)
# 
# Include claim-check metadata for delivery tracking
# Support bidirectional messaging with the reply_to / reply_to_msg_id fields

Use Case: a full-featured chat system supporting rich media. Users can send text and small images inline, while large files are uploaded to the HTTP file server and referenced by URL. The claim-check pattern keeps each NATS message small, and the correlation_id lets every component of a message be tracked through delivery.

Implementation Note: msg_envelope_v1 (System Components, section 1) stores its payloads as Vector{msg_payload_v1}, which is what allows a single chat message to carry all of these components.

Performance Considerations

Zero-Copy Reading

  • Use Arrow's memory-mapped file reading
  • Avoid unnecessary data copying during deserialization
  • Use Apache Arrow's native IPC reader

Exponential Backoff

  • Implement exponential backoff for HTTP link fetching
  • Maximum retry count: 5
  • Base delay: 100ms, max delay: 5000ms
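With these parameters, the delay doubles on each attempt and is capped at max_delay. The sketch below shows that schedule together with a generic retry wrapper. backoff_delay and with_backoff are hypothetical names, and treating max_retries as the total number of attempts is an assumption.

```julia
# Delay in ms before retrying after failed attempt `attempt` (1-based):
# doubles each time, capped at max_delay.
backoff_delay(attempt::Int; base_delay::Int = 100, max_delay::Int = 5000) =
    min(base_delay * 2^(attempt - 1), max_delay)

# Hypothetical retry wrapper: calls f() until it succeeds or max_retries attempts fail.
function with_backoff(f::Function; max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000)
    for attempt in 1:max_retries
        try
            return f()
        catch
            attempt == max_retries && rethrow()   # out of attempts: surface the error
            ms = backoff_delay(attempt; base_delay = base_delay, max_delay = max_delay)
            sleep(ms / 1000)                      # sleep takes seconds
        end
    end
    return nothing                                # only reached when max_retries < 1
end
```

Under this reading, five attempts wait 100 + 200 + 400 + 800 ms in total before giving up. The 5000 ms cap therefore only comes into play once max_retries is raised to 8 or more.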

Correlation ID Logging

  • Log correlation_id at every stage
  • Include: send, receive, serialize, deserialize
  • Use structured logging format
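In Julia, structured logging maps naturally onto the key-value pairs accepted by the standard logging macros. Below is a minimal sketch of a hypothetical three-argument variant of log_trace. It records correlation_id and stage as separate fields instead of embedding them in the message text.

```julia
using Logging

# Hypothetical log_trace variant: correlation_id and stage travel as structured
# key-value fields attached to the log record.
function log_trace(correlation_id::String, stage::String, message::String)
    @info message correlation_id stage
end
```

Because the fields stay separate, a custom logger can filter all records belonging to a single correlation_id.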

Testing Strategy

Unit Tests

  • Test smartsend with various payload sizes
  • Test smartreceive with direct and link transport
  • Test Arrow IPC serialization/deserialization

Integration Tests

  • Test full flow with NATS server
  • Test large data transfer (> 100MB)
  • Test audio processing pipeline

Performance Tests

  • Measure throughput for small payloads
  • Measure throughput for large payloads