# Bi-Directional Data Bridge - Julia Module
#
# Implements `smartsend` and `smartreceive` for NATS communication. Data is serialized
# ("json", "table" via Arrow IPC, or raw "binary") and either embedded in the NATS message as
# Base64 ("direct" transport) or, when it is too large, uploaded to an HTTP file server with
# only the download URL sent over NATS ("link" transport).

module NATSBridge

using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64

# ------------------------------------------------------------------------------------------ #
#                                          Constants                                         #
# ------------------------------------------------------------------------------------------ #

# Payload size in bytes (1 MB) at or above which `smartsend` switches from direct to link
# transport.
const DEFAULT_SIZE_THRESHOLD = 1_000_000

# Default NATS server URL.
const DEFAULT_NATS_URL = "nats://localhost:4222"

# Default *base* URL of the HTTP file server used for link transport. The default upload
# handler (`plik_oneshot_upload`) appends `/upload` and `/file/...` itself, so this must be
# the server root; a value ending in `/upload` would produce `.../upload/upload`.
const DEFAULT_FILESERVER_URL = "http://localhost:8080"

# ------------------------------------------------------------------------------------------ #
#                                          Envelope                                          #
# ------------------------------------------------------------------------------------------ #

"""
    MessageEnvelope

Unified JSON envelope exchanged over NATS.

An envelope carries either the payload itself (Base64-encoded, `transport == "direct"`) or a
URL from which the payload can be downloaded (`transport == "link"`), so the transport
strategy can be chosen per message based on payload size.

# Fields
- `correlation_id::String`: identifier used to trace one message across systems.
- `type::String`: serialization format of the payload (`"json"`, `"table"` or `"binary"`).
- `transport::String`: `"direct"` (Base64 payload) or `"link"` (URL reference).
- `payload::Union{String, Nothing}`: Base64-encoded bytes for direct transport.
- `url::Union{String, Nothing}`: download URL for link transport.
- `metadata::Dict{String, Any}`: additional information about the payload.
"""
struct MessageEnvelope
    correlation_id::String
    type::String
    transport::String
    payload::Union{String, Nothing}
    url::Union{String, Nothing}
    metadata::Dict{String, Any}
end

"""
    MessageEnvelope(; correlation_id = string(uuid4()), type = "json", transport = "direct",
                      payload = nothing, url = nothing, metadata = Dict{String, Any}())

Keyword constructor with defaults. A fresh UUID correlation ID is generated when none is
given. `metadata` may be any `AbstractDict` whose keys convert to `String`; it is converted
to `Dict{String, Any}` (without copying when it already has that type).
"""
function MessageEnvelope(;
    correlation_id::String = string(uuid4()),
    type::String = "json",
    transport::String = "direct",
    payload::Union{String, Nothing} = nothing,
    url::Union{String, Nothing} = nothing,
    metadata::AbstractDict = Dict{String, Any}(),
)
    return MessageEnvelope(
        correlation_id, type, transport, payload, url, convert(Dict{String, Any}, metadata),
    )
end

"""
    MessageEnvelope(json_str::String)

Parse an envelope from its JSON representation (as produced by `envelope_to_json`).

The keys `correlation_id`, `type` and `transport` are required. The optional keys `payload`
and `url` become `nothing` when absent or `null`. `metadata` becomes an empty
`Dict{String, Any}` when absent or `null`.

Keys are looked up by string, so this works with the dictionary type returned by
`JSON.parse`.
"""
function MessageEnvelope(json_str::String)
    data = JSON.parse(json_str)

    # Optional string field: absent or JSON `null` both map to `nothing`.
    function optional_string(key::String)
        value = get(data, key, nothing)
        return value === nothing ? nothing : String(value)
    end

    # Always build a Dict{String, Any}: an untyped comprehension would narrow the value type
    # (e.g. Dict{String, Int}) and fail the struct's field type.
    metadata = Dict{String, Any}()
    raw_metadata = get(data, "metadata", nothing)
    if raw_metadata !== nothing
        for (key, value) in pairs(raw_metadata)
            metadata[String(key)] = value
        end
    end

    return MessageEnvelope(;
        correlation_id = String(data["correlation_id"]),
        type = String(data["type"]),
        transport = String(data["transport"]),
        payload = optional_string("payload"),
        url = optional_string("url"),
        metadata = metadata,
    )
end

"""
    envelope_to_json(env::MessageEnvelope) -> String

Serialize `env` to a JSON string. `correlation_id`, `type` and `transport` are always
written; `payload` and `url` only when not `nothing`; `metadata` only when non-empty.
"""
function envelope_to_json(env::MessageEnvelope)
    obj = Dict{String, Any}(
        "correlation_id" => env.correlation_id,
        "type" => env.type,
        "transport" => env.transport,
    )
    env.payload !== nothing && (obj["payload"] = env.payload)
    env.url !== nothing && (obj["url"] = env.url)
    isempty(env.metadata) || (obj["metadata"] = env.metadata)
    return JSON.json(obj)
end

"""
    log_trace(correlation_id::String, message::String)

Emit an `@info` log line prefixed with the current timestamp and `correlation_id`, so one
message can be followed across distributed components.
"""
function log_trace(correlation_id::String, message::String)
    timestamp = Dates.now()
    @info "[$timestamp] [Correlation: $correlation_id] $message"
end

# ------------------------------------------------------------------------------------------ #
#                                           Sending                                          #
# ------------------------------------------------------------------------------------------ #

# Wire-format label recorded in envelope metadata for each serialization `type`.
function _payload_format(type::String)
    type == "json" && return "json"
    type == "table" && return "arrow_ipc_stream"
    type == "binary" && return "binary"
    return type
end

"""
    smartsend(subject, data, type = "json"; dataname = "NA", nats_url = DEFAULT_NATS_URL,
              fileserver_url = DEFAULT_FILESERVER_URL,
              fileServerUploadHandler = plik_oneshot_upload,
              size_threshold = DEFAULT_SIZE_THRESHOLD, correlation_id = nothing)

Send `data` to `subject`, either directly over NATS or via a file-server URL, depending on
the serialized size.

Workflow:
1. Serialize `data` according to `type` (see `_serialize_data`).
2. If the serialized size is below `size_threshold`, Base64-encode it into a `"direct"`
   envelope.
3. Otherwise upload the bytes with `fileServerUploadHandler` and put the returned download
   URL into a `"link"` envelope.
4. Publish the envelope as JSON to `subject`.

# Arguments
- `subject::String`: NATS subject to publish to (the receiver's subject).
- `data::Any`: value to send.
- `type::String = "json"`: serialization format: `"json"`, `"table"` (Arrow IPC stream) or
  `"binary"` (`Vector{UInt8}` or `IOBuffer`).

# Keywords
- `dataname = "NA"`: file name used when uploading; also stored in the metadata.
- `nats_url::String = DEFAULT_NATS_URL`: NATS server URL.
- `fileserver_url::String = DEFAULT_FILESERVER_URL`: base URL of the file server
  (e.g. `"http://localhost:8080"`).
- `fileServerUploadHandler::Function = plik_oneshot_upload`: called as
  `handler(fileserver_url, dataname, bytes)`. It must return something indexable by
  `:status` and `:url`.
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD`: byte size at or above which link
  transport is used.
- `correlation_id::Union{String, Nothing} = nothing`: tracing ID; a UUID is generated when
  `nothing`.

# Returns
The published `MessageEnvelope`. Its metadata holds `"dataname"`, `"content_length"`
(serialized size in bytes) and `"format"`.

# Throws
- `ErrorException` if `type` is unsupported or the upload status is not 200.

# Example
```julia
# Small dictionary -> sent inline (direct transport)
env = smartsend("my.subject", Dict("key" => "value"), "json")

# Large table -> uploaded to the file server (link transport)
env = smartsend("large.data", (x = rand(1_000_000),), "table")

# Receiving side, inside a NATS subscription callback:
# result = smartreceive(msg)   # result.data, result.envelope
```
"""
function smartsend(
    subject::String,
    data::Any,
    type::String = "json";
    dataname = "NA",
    nats_url::String = DEFAULT_NATS_URL,
    fileserver_url::String = DEFAULT_FILESERVER_URL,
    fileServerUploadHandler::Function = plik_oneshot_upload,
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::Union{String, Nothing} = nothing,
)
    cid = correlation_id === nothing ? string(uuid4()) : correlation_id
    log_trace(cid, "Starting smartsend for subject: $subject")

    payload_bytes = _serialize_data(data, type)
    payload_size = length(payload_bytes)
    log_trace(cid, "Serialized payload size: $payload_size bytes")

    metadata = Dict{String, Any}(
        "dataname" => dataname,
        "content_length" => payload_size,
        "format" => _payload_format(type),
    )

    env = if payload_size < size_threshold
        log_trace(cid, "Using direct transport for $payload_size bytes")
        MessageEnvelope(;
            correlation_id = cid,
            type = type,
            transport = "direct",
            payload = Base64.base64encode(payload_bytes),
            metadata = metadata,
        )
    else
        log_trace(cid, "Using link transport, uploading to fileserver")
        response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
        if response[:status] != 200
            error("Failed to upload data to fileserver: $(response[:status])")
        end
        url = response[:url]
        log_trace(cid, "Uploaded to URL: $url")
        MessageEnvelope(;
            correlation_id = cid,
            type = type,
            transport = "link",
            url = url,
            metadata = metadata,
        )
    end

    publish_message(nats_url, subject, envelope_to_json(env), cid)
    return env
end

"""
    _serialize_data(data, type::String) -> Vector{UInt8}

Serialize `data` to bytes according to `type`:
- `"json"`: `JSON.json(data)` encoded as UTF-8 bytes.
- `"table"`: `Arrow.write` of `data` (any Tables.jl source, e.g. a `DataFrame` or a named
  tuple of vectors) into an in-memory Arrow IPC stream.
- `"binary"`: `data` must already be binary. A `Vector{UInt8}` is returned as-is. An
  `IOBuffer` is drained with `take!`, which empties the caller's buffer.

# Throws
- `ErrorException` for an unknown `type`, or for `"binary"` data that is neither a
  `Vector{UInt8}` nor an `IOBuffer`.

# Example
```julia
json_bytes = _serialize_data(Dict("name" => "Alice", "age" => 30), "json")
table_bytes = _serialize_data((id = [1, 2, 3], score = [95, 88, 92]), "table")
binary_bytes = _serialize_data(UInt8[1, 2, 3], "binary")
```
"""
function _serialize_data(data::Any, type::String)
    if type == "json"
        # `Vector{UInt8}(str)` copies the UTF-8 code units into a plain byte vector, which is
        # what the upload handlers (`::Vector{UInt8}`) expect.
        return Vector{UInt8}(JSON.json(data))
    elseif type == "table"
        io = IOBuffer()
        Arrow.write(io, data)
        return take!(io)
    elseif type == "binary"
        if isa(data, IOBuffer)
            return take!(data)
        elseif isa(data, Vector{UInt8})
            return data
        else
            error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
        end
    else
        error("Unknown type: $type")
    end
end

"""
    publish_message(nats_url, subject, message, correlation_id)

Open a NATS connection to `nats_url`, publish `message` to `subject`, log the publish under
`correlation_id`, and drain the connection even if publishing throws.
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
    conn = NATS.connect(nats_url)
    try
        NATS.publish(conn, subject, message)
        log_trace(correlation_id, "Message published to $subject")
    finally
        NATS.drain(conn)
    end
end

# ------------------------------------------------------------------------------------------ #
#                                          Receiving                                         #
# ------------------------------------------------------------------------------------------ #

"""
    smartreceive(msg::NATS.Msg; fileserver_url = DEFAULT_FILESERVER_URL, max_retries = 5,
                 base_delay = 100, max_delay = 5000)

Decode a message produced by `smartsend`.

The handling depends on the envelope's transport:
- `"direct"`: the payload is Base64-decoded from the message.
- `"link"`: the payload is downloaded from the envelope URL, with exponential backoff.

The bytes are then deserialized according to the envelope's `type`.

# Keywords
- `fileserver_url::String`: currently unused; kept for API compatibility.
- `max_retries::Int = 5`: maximum download attempts for link transport.
- `base_delay::Int = 100`: initial backoff delay in milliseconds.
- `max_delay::Int = 5000`: maximum backoff delay in milliseconds.

# Returns
A named tuple `(data = deserialized_data, envelope = MessageEnvelope)`.

# Throws
- `ErrorException` for an unknown transport, a direct envelope without `payload`, a link
  envelope without `url`, or when every download attempt fails.
"""
function smartreceive(
    msg::NATS.Msg;
    fileserver_url::String = DEFAULT_FILESERVER_URL,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000,
)
    env = MessageEnvelope(String(msg.payload))
    cid = env.correlation_id
    log_trace(cid, "Processing received message")

    payload_bytes = if env.transport == "direct"
        log_trace(cid, "Direct transport - decoding payload")
        payload = env.payload
        payload === nothing && error("Direct transport message has no payload")
        Base64.base64decode(payload)
    elseif env.transport == "link"
        log_trace(cid, "Link transport - fetching from URL")
        url = env.url
        url === nothing && error("Link transport message has no url")
        _fetch_with_backoff(url, max_retries, base_delay, max_delay, cid)
    else
        error("Unknown transport type: $(env.transport)")
    end

    data = _deserialize_data(payload_bytes, env.type, cid, env.metadata)
    return (data = data, envelope = env)
end

"""
    _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id) -> Vector{UInt8}

GET `url`, retrying up to `max_retries` times. The delay between attempts starts at
`base_delay` ms and doubles after each failure, capped at `max_delay` ms. No sleep follows
the final attempt. An `InterruptException` is rethrown immediately instead of being counted
as a failed attempt.

# Throws
- `ErrorException` when all attempts fail.
"""
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String,
)
    delay = base_delay
    for attempt in 1:max_retries
        try
            response = HTTP.request("GET", url)
            if response.status == 200
                log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
                return response.body
            end
            error("Failed to fetch: $(response.status)")
        catch e
            # Let Ctrl-C / task interruption stop the loop instead of being retried.
            e isa InterruptException && rethrow()
            log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))")
            if attempt < max_retries
                sleep(delay / 1000.0)  # delay is in milliseconds
                delay = min(delay * 2, max_delay)
            end
        end
    end
    error("Failed to fetch data after $max_retries attempts")
end

"""
    _deserialize_data(data::Vector{UInt8}, type, correlation_id, metadata)

Convert serialized bytes back to a Julia value:
- `"json"`: `JSON.parse` of the UTF-8 text.
- `"table"`: an `Arrow.Table` read from the Arrow IPC stream. Wrap it, e.g. with
  `DataFrame(...)`, if a DataFrame is needed.
- `"binary"`: the bytes unchanged.

`correlation_id` and `metadata` are accepted for interface symmetry but are not used.

# Throws
- `ErrorException` for an unknown `type`.
"""
function _deserialize_data(
    data::Vector{UInt8},
    type::String,
    correlation_id::String,
    metadata::Dict{String, Any},
)
    if type == "json"
        return JSON.parse(String(data))
    elseif type == "table"
        return Arrow.Table(IOBuffer(data))
    elseif type == "binary"
        return data
    else
        error("Unknown type: $type")
    end
end

# ------------------------------------------------------------------------------------------ #
#                                     plik file-server upload                                #
# ------------------------------------------------------------------------------------------ #

# Create a one-shot plik upload session; returns `(uploadid, uploadtoken)`.
function _plik_create_oneshot_session(fileServerURL::String)
    # Equivalent curl: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    httpResponse = HTTP.request("POST", "$fileServerURL/upload", headers, body; body_is_form=false)
    responseJson = JSON.parse(String(httpResponse.body))
    return (responseJson["id"], responseJson["uploadToken"])
end

# Upload one multipart `content` part into an existing plik session; returns
# `(status, fileid)`. Failures are logged and rethrown rather than swallowed.
function _plik_upload_to_session(fileServerURL::String, uploadid, uploadtoken, content)
    # Equivalent curl:
    #   curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" \
    #        http://localhost:8080/file/UPLOAD_ID
    url_upload = "$fileServerURL/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => content))
    try
        httpResponse = HTTP.post(url_upload, headers, form)
        fileid = JSON.parse(String(httpResponse.body))["id"]
        return (httpResponse.status, fileid)
    catch e
        @error "Request failed" exception = (e, catch_backtrace())
        rethrow()
    end
end

"""
    plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})

Upload an in-memory byte array to a plik server in one-shot mode.

Workflow:
1. Create a one-shot session (`POST /upload` with `{"OneShot": true}`) to obtain an upload
   ID and token.
2. Upload `data` as the multipart file `filename`, authenticated with the `X-UploadToken`
   header.
3. Return the identifiers and the download URL.

# Arguments
- `fileServerURL::String`: base URL of the plik server (e.g. `"http://localhost:8080"`).
- `filename::String`: name under which the file is stored.
- `data::Vector{UInt8}`: file content.

# Returns
A named tuple `(status, uploadid, fileid, url)`:
- `status` is the HTTP status of the file upload.
- `url` has the form `fileServerURL/file/<uploadid>/<fileid>/<filename>`.

# Throws
- Any HTTP or JSON error from either request; errors from the upload request are also
  logged.

# Example
```julia
fileServerURL = "http://localhost:8080"
filepath = "./test.zip"
filebytes = read(filepath)   # raw bytes of the file

status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, basename(filepath), filebytes)
# Download later, e.g. from a shell: curl -L -O <url>
```
"""
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
    uploadid, uploadtoken = _plik_create_oneshot_session(fileServerURL)
    # Plik does not accept a raw-bytes body, so wrap the bytes as a multipart file part.
    file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream")
    status, fileid = _plik_upload_to_session(fileServerURL, uploadid, uploadtoken, file_multipart)
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"
    return (status = status, uploadid = uploadid, fileid = fileid, url = url)
end

"""
    plik_oneshot_upload(fileServerURL::String, filepath::String)

Upload the file at `filepath` to a plik server in one-shot mode, stored under
`basename(filepath)`.

The workflow is the same as the byte-array method. The local file is opened only for the
duration of the upload and is always closed afterwards.

# Returns
A named tuple `(status, uploadid, fileid, url)`; `url` has the form
`fileServerURL/file/<uploadid>/<fileid>/<basename(filepath)>`.

# Example
```julia
status, uploadid, fileid, url = plik_oneshot_upload("http://localhost:8080", "./test.zip")
# Download later, e.g. from a shell: curl -L -O <url>
```
"""
function plik_oneshot_upload(fileServerURL::String, filepath::String)
    filename = basename(filepath)
    uploadid, uploadtoken = _plik_create_oneshot_session(fileServerURL)
    @debug "plik upload session created" uploadid
    # The do-block guarantees the file handle is closed even if the upload throws.
    status, fileid = open(filepath, "r") do io
        _plik_upload_to_session(fileServerURL, uploadid, uploadtoken, io)
    end
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"
    return (status = status, uploadid = uploadid, fileid = fileid, url = url)
end

end # module NATSBridge