diff --git a/AI_prompt.txt b/AI_prompt.txt index 8e66928..6a0f3f3 100644 --- a/AI_prompt.txt +++ b/AI_prompt.txt @@ -12,3 +12,4 @@ Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement +Create a walkthrough for Julia service-A sending a mixed-content chat message to Julia service-B. The chat message must include \ No newline at end of file diff --git a/docs/architecture.md b/docs/architecture.md index 0daa156..4fa65fc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -26,33 +26,55 @@ The system uses a **standardized list-of-tuples format** for all payload operati **API Standard:** ```julia -# Input format for smartsend (always a list of tuples) -[(dataname1, data1), (dataname2, data2), ...] +# Input format for smartsend (always a list of tuples with type info) +[(dataname1, data1, type1), (dataname2, data2, type2), ...] # Output format for smartreceive (always returns a list of tuples) [(dataname1, data1), (dataname2, data2), ...] ``` +**Supported Types:** +- `"text"` - Plain text +- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple) +- `"table"` - Tabular data (DataFrame, array of structs) +- `"image"` - Image data (Bitmap, PNG/JPG bytes) +- `"audio"` - Audio data (WAV, MP3 bytes) +- `"video"` - Video data (MP4, AVI bytes) +- `"binary"` - Generic binary data (Vector{UInt8}) + +This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message. 
+ **Examples:** ```julia # Single payload - still wrapped in a list smartsend( "/test", - [("dataname1", data1)], # List with one tuple + [("dataname1", data1, "dictionary")], # List with one tuple (dataname, data, type) nats_url="nats://localhost:4222", fileserverUploadHandler=plik_oneshot_upload, metadata=user_provided_envelope_level_metadata ) -# Multiple payloads in one message +# Multiple payloads in one message with different types smartsend( "/test", - [("dataname1", data1), ("dataname2", data2)], + [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")], nats_url="nats://localhost:4222", fileserverUploadHandler=plik_oneshot_upload ) +# Mixed content (e.g., chat with text, image, audio) +smartsend( + "/chat", + [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") + ], + nats_url="nats://localhost:4222" +) + # Receive always returns a list payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay) # payloads = [("dataname1", data1), ("dataname2", data2), ...] 
@@ -174,7 +196,7 @@ The `msgPayload_v1` structure provides flexible payload handling for various dat struct msgPayload_v1 id::String # Id of this payload (e.g., "uuid4") dataname::String # Name of this payload (e.g., "login_image") - type::String # "text | json | table | image | audio | video | binary" + type::String # "text | dictionary | table | image | audio | video | binary" transport::String # "direct | link" encoding::String # "none | json | base64 | arrow-ipc" size::Integer # Data size in bytes @@ -184,7 +206,7 @@ end ``` **Key Features:** -- Supports multiple data types: text, json, table, image, audio, video, binary +- Supports multiple data types: text, dictionary, table, image, audio, video, binary - Flexible transport: "direct" (NATS) or "link" (HTTP fileserver) - Multiple payloads per message (essential for chat with mixed content) - Per-payload and per-envelope metadata support @@ -194,28 +216,32 @@ end ``` ┌─────────────────────────────────────────────────────────────┐ │ smartsend Function │ -│ Accepts: [(dataname1, data1), (dataname2, data2), ...] │ +│ Accepts: [(dataname1, data1, type1), ...] │ +│ (No standalone type parameter - type per payload) │ └─────────────────────────────────────────────────────────────┘ - │ - ▼ + │ + ▼ ┌─────────────────────────────────────────────────────────────┐ -│ Is payload size < 1MB? │ +│ For each payload: │ +│ 1. Extract type from tuple │ +│ 2. Serialize based on type │ +│ 3. 
Check payload size │ └─────────────────────────────────────────────────────────────┘ - │ - ┌────────────────┴─-────────────────┐ - ▼ ▼ - ┌─────────────────┐ ┌─────────────────┐ - │ Direct Path │ │ Link Path │ - │ (< 1MB) │ │ (> 1MB) │ - │ │ │ │ - │ • Serialize to │ │ • Serialize to │ - │ IOBuffer │ │ IOBuffer │ - │ • Base64 encode │ │ • Upload to │ - │ • Publish to │ │ HTTP Server │ - │ NATS │ │ • Publish to │ - │ (with payload │ │ NATS with URL │ - │ in envelope) │ │ (in envelope) │ - └─────────────────┘ └─────────────────┘ + │ + ┌────────────────┴─-────────────────┐ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (> 1MB) │ + │ │ │ │ + │ • Serialize to │ │ • Serialize to │ + │ IOBuffer │ │ IOBuffer │ + │ • Base64 encode │ │ • Upload to │ + │ • Publish to │ │ HTTP Server │ + │ NATS │ │ • Publish to │ + │ (with payload │ │ NATS with URL │ + │ in envelope) │ │ (in envelope) │ + └─────────────────┘ └─────────────────┘ ``` ### 4. Julia Module Architecture @@ -271,22 +297,22 @@ graph TD ```julia function smartsend( - subject::String, - data::AbstractArray{Tuple{String, Any}}, - type::String = "json"; - nats_url::String = "nats://localhost:4222", - fileserverUploadHandler::Function = plik_oneshot_upload, - size_threshold::Int = 1_000_000 # 1MB + subject::String, + data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter + nats_url::String = "nats://localhost:4222", + fileserverUploadHandler::Function = plik_oneshot_upload, + size_threshold::Int = 1_000_000 # 1MB ) ``` **Input Format:** -- `data::AbstractArray{Tuple{String, Any}}` - **Must be a list of tuples**: `[("dataname1", data1), ("dataname2", data2), ...]` -- Even for single payloads: `[(dataname1, data1)]` +- `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]` +- Even for single payloads: `[(dataname1, data1, "type1")]` +- 
Each payload can have a different type, enabling mixed-content messages **Flow:** -1. Iterate through the list of `("dataname", data)` tuples -2. For each payload: serialize to Arrow IPC stream (if table) or JSON +1. Iterate through the list of `(dataname, data, type)` tuples +2. For each payload: extract the type from the tuple and serialize accordingly 3. Check payload size 4. If < threshold: publish directly to NATS with Base64-encoded payload 5. If >= threshold: upload to HTTP server, publish NATS with URL @@ -295,19 +321,19 @@ function smartsend( ```julia function smartreceive( - msg::NATS.Message; - fileserverDownloadHandler::Function, - max_retries::Int = 5, - base_delay::Int = 100, - max_delay::Int = 5000 + msg::NATS.Message; + fileserverDownloadHandler::Function, + max_retries::Int = 5, + base_delay::Int = 100, + max_delay::Int = 5000 ) - # Parse envelope - # Iterate through all payloads - # For each payload: check transport type - # If direct: decode Base64 payload - # If link: fetch from URL with exponential backoff using fileserverDownloadHandler - # Deserialize payload based on type - # Return list of (dataname, data) tuples + # Parse envelope + # Iterate through all payloads + # For each payload: check transport type + # If direct: decode Base64 payload + # If link: fetch from URL with exponential backoff using fileserverDownloadHandler + # Deserialize payload based on type + # Return list of (dataname, data) tuples end ``` @@ -322,7 +348,7 @@ end - Determine transport type (`direct` or `link`) - If `direct`: decode Base64 data from the message - If `link`: fetch data from URL using exponential backoff - - Deserialize based on payload type (`json`, `table`, `binary`, etc.) + - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) 4. 
Return list of `(dataname, data)` tuples ### JavaScript Implementation @@ -335,17 +361,26 @@ end #### smartsend Function ```javascript -async function smartsend(subject, data, type = 'json', options = {}) +async function smartsend(subject, data, options = {}) + // data format: [(dataname, data, type), ...] // options object should include: - // - fileserverUploadHandler: function to upload data to file server - // - fileserver_url: base URL of the file server + // - natsUrl: NATS server URL + // - fileserverUrl: base URL of the file server + // - sizeThreshold: threshold in bytes for transport selection + // - correlationId: optional correlation ID for tracing ``` +**Input Format:** +- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]` +- Even for single payloads: `[(dataname1, data1, "type1")]` +- Each payload can have a different type, enabling mixed-content messages + **Flow:** -1. Serialize data to Arrow IPC buffer (if table) -2. Check payload size -3. If < threshold: publish directly to NATS -4. If >= threshold: upload to HTTP server, publish NATS with URL +1. Iterate through the list of (dataname, data, type) tuples +2. For each payload: extract the type from the tuple and serialize accordingly +3. Check payload size +4. If < threshold: publish directly to NATS +5. If >= threshold: upload to HTTP server, publish NATS with URL #### smartreceive Handler @@ -366,12 +401,12 @@ async function smartreceive(msg, options = {}) - Determine transport type (`direct` or `link`) - If `direct`: decode Base64 data from the message - If `link`: fetch data from URL using exponential backoff - - Deserialize based on payload type (`json`, `table`, `binary`, etc.) + - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.) 4. 
Return list of `(dataname, data)` tuples ## Scenario Implementations -### Scenario 1: Command & Control (Small JSON) +### Scenario 1: Command & Control (Small Dictionary) **Julia (Receiver):** ```julia @@ -383,8 +418,8 @@ async function smartreceive(msg, options = {}) **JavaScript (Sender):** ```javascript -// Create small JSON config -// Send via smartsend with type="json" +// Create small dictionary config +// Send via smartsend with type="dictionary" ``` ### Scenario 2: Deep Dive Analysis (Large Arrow Table) diff --git a/etc.jl b/etc.jl index e69de29..532d5d1 100644 --- a/etc.jl +++ b/etc.jl @@ -0,0 +1,15 @@ +the user will provide data in this form: [("dataname1", data1, "datatype1"), ("dataname2", data2, "datatype2"), ...] +For example: +[ + ("name", "ton", "text"), + ("age", 15, "Integer"), + ("school_info", Dict("schoolname"=> "Bodin", "classmates_number"=> 52), "dictionary"), + ("classmate_names", Dataframe_data, "table"), + ("ton_image", image_data, "image"), + ("ton_audio", audio_data, "audio"), + ("ton_birthday_video", video_data, "video"), + ("achievement.zip", file_data, "binary"), +] + + + diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl index 333c8e9..f20c50e 100644 --- a/src/NATSBridge.jl +++ b/src/NATSBridge.jl @@ -3,6 +3,35 @@ # This module provides functionality for sending and receiving data across network boundaries # using NATS as the message bus, with support for both direct payload transport and # URL-based transport for larger payloads. +# +# File Server Handler Architecture: +# The system uses handler functions to abstract file server operations, allowing support +# for different file server implementations (e.g., Plik, AWS S3, custom HTTP server). 
+# +# Handler Function Signatures: +# +# ```julia +# # Upload handler - uploads data to file server and returns URL +# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any} +# +# # Download handler - fetches data from file server URL +# fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8} +# ``` +# +# Multi-Payload Support (Standard API): +# The system uses a standardized list-of-tuples format for all payload operations. +# Even when sending a single payload, the user must wrap it in a list. +# +# API Standard: +# ```julia +# # Input format for smartsend (always a list of tuples with type info) +# [(dataname1, data1, type1), (dataname2, data2, type2), ...] +# +# # Output format for smartreceive (always returns a list of tuples) +# [(dataname1, data1), (dataname2, data2), ...] +# ``` +# +# Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary" module NATSBridge @@ -18,7 +47,7 @@ const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP fi struct msgPayload_v1 id::String # id of this payload e.g. "uuid4" dataname::String # name of this payload e.g. "login_image" - type::String # this payload type. Can be "text | json | table | image | audio | video | binary" + type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary" transport::String # "direct | link" encoding::String # "none | json | base64 | arrow-ipc" size::Integer # data size in bytes e.g. 
15433 @@ -127,7 +156,7 @@ function envelope_to_json(env::msgEnvelope_v1) ) if !isempty(env.metadata) # Only include metadata if it exists and is not empty - obj["metadata"] = Dict(String(k) => v for (k, v) in env.metadata) + obj["metadata"] = Dict(String(k) => v for (k, v) in env.metadata) end # Convert payloads to JSON array @@ -164,15 +193,18 @@ function envelope_to_json(env::msgEnvelope_v1) end """ Helper function to get payload bytes from data +This function is kept for compatibility but is no longer used. +All serialization is now handled by `_serialize_data`. """ function _get_payload_bytes(data::Any) - # This is a placeholder - actual implementation depends on data type + # This function is kept for compatibility but is no longer used. + # All serialization is now handled by `_serialize_data`. if isa(data, Vector{UInt8}) return data elseif isa(data, String) return bytes(data) else - return String(data) + return bytes(data) end end @@ -192,22 +224,27 @@ This function intelligently routes data delivery based on payload size relative If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS. Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS. +The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually. +Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio). + The function workflow: -1. Serializes the provided data according to the specified format (`type`) -2. Compares the serialized size against `size_threshold` -3. For small payloads: encodes as Base64, constructs a "direct" msgEnvelope_v1, and publishes to NATS -4. For large payloads: uploads to the fileserver, constructs a "link" msgEnvelope_v1 with the URL, and publishes to NATS +1. Iterates through the list of (dataname, data, type) tuples +2. 
For each payload: extracts the type from the tuple and serializes accordingly +3. Compares the serialized size against `size_threshold` +4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1 +5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL # Arguments: - `subject::String` - NATS subject to publish the message to - - `data::Any` - Data payload to send (any Julia object) - - `type::String = "json"` - Serialization format: `"json"` or `"arrow"` + - `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send + - `dataname::String` - Name of the payload + - `data::Any` - The actual data to send + - `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary" + - No standalone `type` parameter - type is specified per payload # Keyword Arguments: - - `dataname::String = string(UUIDs.uuid4())` - Filename to use when uploading to fileserver (auto-generated UUID if not provided) - `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server - - `fileserver_url::String = DEFAULT_FILESERVER_URL` - Base URL of the fileserver (e.g., `"http://localhost:8080"`) - - `fileServerUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must match signature of `plik_oneshot_upload`) + - `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys) - `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport - `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated - `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. 
@@ -224,23 +261,32 @@ The function workflow: ```julia using UUIDs -# Send a small struct directly via NATS +# Send a single payload (still wrapped in a list) data = Dict("key" => "value") -env = smartsend("my.subject", data, "json") +env = smartsend("my.subject", [("dataname1", data, "dictionary")]) + +# Send multiple payloads in one message with different types +data1 = Dict("key1" => "value1") +data2 = rand(10_000) # Small array +env = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")]) # Send a large array using fileserver upload data = rand(10_000_000) # ~80 MB -env = smartsend("large.data", data, "arrow") +env = smartsend("large.data", [("large_table", data, "table")]) + +# Mixed content (e.g., chat with text and image) +env = smartsend("chat.subject", [ + ("message_text", "Hello!", "text"), + ("user_image", image_data, "image"), + ("audio_clip", audio_data, "audio") +]) ``` """ function smartsend( subject::String, # smartreceive's subject - data::Any, - type::String = "json"; - dataname="NA", + data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples nats_url::String = DEFAULT_NATS_URL, - fileserver_url::String = DEFAULT_FILESERVER_URL, - fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver + fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::Union{String, Nothing} = nothing, msg_purpose::String = "chat", @@ -255,147 +301,147 @@ function smartsend( log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation - # Serialize data based on type - payload_bytes = _serialize_data(data, type) # Convert data to bytes based on type + # Generate message metadata + msg_id = string(uuid4()) + timestamp = string(Dates.now()) - payload_size = length(payload_bytes) # Calculate 
payload size in bytes - log_trace(cid, "Serialized payload size: $payload_size bytes") # Log payload size - - # Generate unique IDs - msg_id = string(uuid4()) - timestamp = string(Dates.now()) - - # Decision: Direct vs Link - if payload_size < size_threshold # Check if payload is small enough for direct transport - # Direct path - Base64 encode and send via NATS - payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string - log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice - - # Create msgPayload_v1 for direct transport - payload = msgPayload_v1( - id = string(uuid4()), - dataname = dataname, - type = type, - transport = "direct", - encoding = "base64", - size = payload_size, - data = payload_b64, - metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") - ) - - # Create msgEnvelope_v1 with all fields populated - env = msgEnvelope_v1( - correlationId = cid, - msgId = msg_id, - timestamp = timestamp, - sendTo = subject, - msgPurpose = msg_purpose, - senderName = sender_name, - senderId = string(uuid4()), - receiverName = receiver_name, - receiverId = receiver_id, - replyTo = reply_to, - replyToMsgId = reply_to_msg_id, - brokerURL = nats_url, - metadata = Dict(), - payloads = [payload] - ) - - msg_json = envelope_to_json(env) # Convert envelope to JSON - publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS - - return env # Return the envelope for tracking - else - # Link path - Upload to HTTP server, send URL via NATS - log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice - - # Upload to HTTP server - response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes) - - if response[:status] != 200 # Check if upload was successful - error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed - end - - url = response[:url] # URL for the uploaded data - 
log_trace(cid, "Uploaded to URL: $url") # Log successful upload + # Process each payload in the list + payloads = msgPayload_v1[] + for (dataname, payload_data, payload_type) in data + # Serialize data based on type + payload_bytes = _serialize_data(payload_data, payload_type) + + payload_size = length(payload_bytes) # Calculate payload size in bytes + log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size + + # Decision: Direct vs Link + if payload_size < size_threshold # Check if payload is small enough for direct transport + # Direct path - Base64 encode and send via NATS + payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string + log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice + + # Create msgPayload_v1 for direct transport + payload = msgPayload_v1( + id = string(uuid4()), + dataname = dataname, + type = payload_type, + transport = "direct", + encoding = "base64", + size = payload_size, + data = payload_b64, + metadata = Dict("content_length" => payload_size) + ) + push!(payloads, payload) + else + # Link path - Upload to HTTP server, send URL via NATS + log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice + + # Upload to HTTP server + response = fileserverUploadHandler(DEFAULT_FILESERVER_URL, dataname, payload_bytes) + + if response[:status] != 200 # Check if upload was successful + error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed + end + + url = response[:url] # URL for the uploaded data + log_trace(cid, "Uploaded to URL: $url") # Log successful upload - # Create msgPayload_v1 for link transport - payload = msgPayload_v1( - id = string(uuid4()), - dataname = dataname, - type = type, - transport = "link", - encoding = "none", - size = payload_size, - data = url, - metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => 
"arrow_ipc_stream") - ) - - # Create msgEnvelope_v1 with all fields populated - env = msgEnvelope_v1( - correlationId = cid, - msgId = msg_id, - timestamp = timestamp, - sendTo = subject, - msgPurpose = msg_purpose, - senderName = sender_name, - senderId = string(uuid4()), - receiverName = receiver_name, - receiverId = receiver_id, - replyTo = reply_to, - replyToMsgId = reply_to_msg_id, - brokerURL = nats_url, - metadata = Dict(), - payloads = [payload] - ) - - msg_json = envelope_to_json(env) # Convert envelope to JSON - publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS - - return env # Return the envelope for tracking - end + # Create msgPayload_v1 for link transport + payload = msgPayload_v1( + id = string(uuid4()), + dataname = dataname, + type = payload_type, + transport = "link", + encoding = "none", + size = payload_size, + data = url, + metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") + ) + push!(payloads, payload) + end + end + + # Create msgEnvelope_v1 with all payloads + env = msgEnvelope_v1( + correlationId = cid, + msgId = msg_id, + timestamp = timestamp, + sendTo = subject, + msgPurpose = msg_purpose, + senderName = sender_name, + senderId = string(uuid4()), + receiverName = receiver_name, + receiverId = receiver_id, + replyTo = reply_to, + replyToMsgId = reply_to_msg_id, + brokerURL = nats_url, + metadata = Dict(), + payloads = payloads + ) + + msg_json = envelope_to_json(env) # Convert envelope to JSON + publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS + + return env # Return the envelope for tracking end """ _serialize_data - Serialize data according to specified format This function serializes arbitrary Julia data into a binary representation based on the specified format. 
-It supports three serialization formats: -- `"json"`: Serializes data as JSON and returns the UTF-8 byte representation +It supports multiple serialization formats: +- `"text"`: Treats data as text and converts to UTF-8 bytes +- `"dictionary"`: Serializes data as JSON and returns the UTF-8 byte representation - `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream -- `"binary"`: Expects already-binary data (either `IOBuffer` or `Vector{UInt8}`) and returns it as bytes +- `"image"`: Expects binary image data (Vector{UInt8}) and returns it as bytes +- `"audio"`: Expects binary audio data (Vector{UInt8}) and returns it as bytes +- `"video"`: Expects binary video data (Vector{UInt8}) and returns it as bytes +- `"binary"`: Generic binary data (Vector{UInt8} or IOBuffer) and returns bytes The function handles format-specific serialization logic: -1. For `"json"`: Converts Julia data to JSON string, then encodes to bytes -2. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer -3. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly +1. For `"text"`: Converts string to UTF-8 bytes +2. For `"dictionary"`: Converts Julia data to JSON string, then encodes to bytes +3. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer +4. For `"image"`, `"audio"`, `"video"`: Treats data as binary (Vector{UInt8}) +5. 
For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly # Arguments: - - `data::Any` - Data to serialize (JSON-serializable for `"json"`, table-like for `"table"`, binary for `"binary"`) - - `type::String` - Target format: `"json"`, `"table"`, or `"binary"` + - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`); `type::String` - one of the supported type strings listed above # Return: - `Vector{UInt8}` - Binary representation of the serialized data # Throws: - - `Error` if `type` is not one of `"json"`, `"table"`, or `"binary"` - - `Error` if `type == "binary"` but `data` is neither `IOBuffer` nor `Vector{UInt8}` + - `Error` if `type` is not one of the supported types + - `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}` # Example ```julia using JSON, Arrow, DataFrames +# Text serialization +text_data = "Hello, World!" +text_bytes = _serialize_data(text_data, "text") + # JSON serialization json_data = Dict("name" => "Alice", "age" => 30) -json_bytes = _serialize_data(json_data, "json") +json_bytes = _serialize_data(json_data, "dictionary") # Table serialization with a DataFrame (recommended for tabular data) df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) table_bytes = _serialize_data(df, "table") -# Table serialization with named tuple of vectors (also supported) -nt = (id = [1, 2, 3], name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) -table_bytes_nt = _serialize_data(nt, "table") +# Image data (Vector{UInt8}) +image_bytes = UInt8[1, 2, 3] # Image bytes +image_serialized = _serialize_data(image_bytes, "image") + +# Audio data (Vector{UInt8}) +audio_bytes = UInt8[1, 2, 3] # Audio bytes +audio_serialized = _serialize_data(audio_bytes, "audio") + +# Video data (Vector{UInt8}) +video_bytes = UInt8[1, 2, 3] # Video bytes +video_serialized = _serialize_data(video_bytes, "video") # Binary data (IOBuffer) 
buf = IOBuffer() @@ -407,13 +453,37 @@ binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary") ``` """ function _serialize_data(data::Any, type::String) - if type == "json" # JSON data - serialize directly + if type == "text" # Text data - convert to UTF-8 bytes + if isa(data, String) + return bytes(data) # Convert string to UTF-8 bytes + else + error("Text data must be a String") + end + elseif type == "dictionary" # JSON data - serialize directly json_str = JSON.json(data) # Convert Julia data to JSON string return bytes(json_str) # Convert JSON string to bytes elseif type == "table" # Table data - convert to Arrow IPC stream io = IOBuffer() # Create in-memory buffer Arrow.write(io, data) # Write data as Arrow IPC stream to buffer return take!(io) # Return the buffer contents as bytes + elseif type == "image" # Image data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Image data must be Vector{UInt8}") + end + elseif type == "audio" # Audio data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Audio data must be Vector{UInt8}") + end + elseif type == "video" # Video data - treat as binary + if isa(data, Vector{UInt8}) + return data # Return binary data directly + else + error("Video data must be Vector{UInt8}") + end elseif type == "binary" # Binary data - treat as binary if isa(data, IOBuffer) # Check if data is an IOBuffer return take!(data) # Return buffer contents as bytes @@ -457,19 +527,27 @@ A HTTP file server is required along with its upload function. 
Arguments: - `msg::NATS.Message` - NATS message to process + - `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs Keyword Arguments: - - `fileserver_url::String` - HTTP file server URL for link transport (default: DEFAULT_FILESERVER_URL) - `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5) - `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100) - `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000) Return: - - Tuple `(data = deserialized_data, envelope = msgEnvelope_v1)` - Data and envelope + - `AbstractArray{Tuple{String, Any}}` - List of (dataname, data) tuples + +# Example +```julia +# Receive and process message +msg = nats_message # NATS message +payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay) +# payloads = [("dataname1", data1), ("dataname2", data2), ...] +``` """ function smartreceive( - msg::NATS.Msg; - fileserver_url::String = DEFAULT_FILESERVER_URL, + msg::NATS.Message, + fileserverDownloadHandler::Function; max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000 @@ -477,77 +555,53 @@ function smartreceive( # Parse the JSON envelope json_data = JSON.parse(String(msg.payload)) - # Get transport from the first payload - transport = String(json_data["payloads"][1]["transport"]) log_trace(json_data["correlationId"], "Processing received message") # Log message processing start - if transport == "direct" # Direct transport - payload is in the message - log_trace(json_data["correlationId"], "Direct transport - decoding payload") # Log direct transport handling + # Process all payloads in the envelope + payloads_list = Tuple{String, Any}[] + + # Get number of payloads + num_payloads = length(json_data["payloads"]) + + for i in 1:num_payloads + payload_data = json_data["payloads"][i] + transport = String(payload_data["transport"]) + dataname = String(payload_data["dataname"]) - # Extract 
base64 payload from the first payload - payload_b64 = String(json_data["payloads"][1]["data"]) - - # Decode Base64 payload - payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes - - # Deserialize based on type - data_type = String(json_data["payloads"][1]["type"]) - data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"], Dict{String, Any}()) - - # Create msgEnvelope_v1 from parsed data - env = msgEnvelope_v1( - correlationId = json_data["correlationId"], - msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "", - timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "", - sendTo = json_data["sendTo"], - msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "", - senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "", - senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "", - receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "", - receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "", - replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "", - replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "", - brokerURL = haskey(json_data, "brokerURL") ? 
String(json_data["brokerURL"]) : DEFAULT_NATS_URL, - metadata = Dict{String, Any}(), - payloads = msgPayload_v1[] - ) - - return (data = data, envelope = env) # Return data and envelope as tuple - elseif transport == "link" # Link transport - payload is at URL - log_trace(json_data["correlationId"], "Link transport - fetching from URL") # Log link transport handling - - # Extract URL from the first payload - url = String(json_data["payloads"][1]["data"]) - - # Fetch with exponential backoff - downloaded_data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, json_data["correlationId"]) # Fetch data from URL - - # Deserialize based on type - data_type = String(json_data["payloads"][1]["type"]) - data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"], Dict{String, Any}()) - - # Create msgEnvelope_v1 from parsed data - env = msgEnvelope_v1( - correlationId = json_data["correlationId"], - msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "", - timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "", - sendTo = json_data["sendTo"], - msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "", - senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "", - senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "", - receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "", - receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "", - replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "", - replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "", - brokerURL = haskey(json_data, "brokerURL") ? 
String(json_data["brokerURL"]) : DEFAULT_NATS_URL, - metadata = Dict{String, Any}(), - payloads = msgPayload_v1[] - ) - - return (data = data, envelope = env) # Return data and envelope as tuple - else # Unknown transport type - error("Unknown transport type: $(transport)") # Throw error for unknown transport + if transport == "direct" # Direct transport - payload is in the message + log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling + + # Extract base64 payload from the payload + payload_b64 = String(payload_data["data"]) + + # Decode Base64 payload + payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes + + # Deserialize based on type + data_type = String(payload_data["type"]) + data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"]) + + push!(payloads_list, (dataname, data)) + elseif transport == "link" # Link transport - payload is at URL + log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling + + # Extract URL from the payload + url = String(payload_data["data"]) + + # Fetch with exponential backoff using the download handler + downloaded_data = fileserverDownloadHandler(DEFAULT_FILESERVER_URL, url, max_retries, base_delay, max_delay) + + # Deserialize based on type + data_type = String(payload_data["type"]) + data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"]) + + push!(payloads_list, (dataname, data)) + else # Unknown transport type + error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport + end end + + return payloads_list # Return list of (dataname, data) tuples end @@ -598,31 +652,37 @@ end """ Deserialize bytes to data based on type This internal function converts serialized bytes back to Julia data based on type. 
-It handles "json" (JSON deserialization), "table" (Arrow IPC deserialization), -and "binary" (binary data). +It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization), +"image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data). Arguments: - `data::Vector{UInt8}` - Serialized data as bytes - - `type::String` - Data type ("json", "table", "binary") + - `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary") - `correlation_id::String` - Correlation ID for logging - - `metadata::Dict{String, Any}` - Metadata about the data Return: - - Deserialized data (DataFrame for "table", JSON data for "json", bytes for "binary") + - Deserialized data (String for "text", Arrow.Table for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary") """ function _deserialize_data( data::Vector{UInt8}, type::String, - correlation_id::String, - metadata::Dict{String, Any} + correlation_id::String ) - if type == "json" # JSON data - deserialize + if type == "text" # Text data - convert to string + return String(data) # Convert bytes to string + elseif type == "dictionary" # JSON data - deserialize json_str = String(data) # Convert bytes to string return JSON.parse(json_str) # Parse JSON string to Julia data structure elseif type == "table" # Table data - deserialize Arrow IPC stream io = IOBuffer(data) # Create buffer from bytes df = Arrow.Table(io) # Read Arrow IPC format from buffer return df # Return DataFrame + elseif type == "image" # Image data - return binary + return data # Return bytes directly + elseif type == "audio" # Audio data - return binary + return data # Return bytes directly + elseif type == "video" # Video data - return binary + return data # Return bytes directly elseif type == "binary" # Binary data - return binary return data # Return bytes directly else # Unknown type @@ -648,26 +708,25 @@ The function workflow: - 
`data::Vector{UInt8}` - Raw byte data of the file content # Return: - - A named tuple with fields: - - `status::Integer` - HTTP server response status - - `uploadid::String` - ID of the one-shot upload session - - `fileid::String` - ID of the uploaded file within the session - - `url::String` - Full URL to download the uploaded file + - A Dict with keys: + - `"status"` - HTTP server response status + - `"uploadid"` - ID of the one-shot upload session + - `"fileid"` - ID of the uploaded file within the session + - `"url"` - Full URL to download the uploaded file # Example -```jldoctest +```julia using HTTP, JSON fileServerURL = "http://localhost:8080" -filepath = "./test.zip" -filename = basename(filepath) -filebytes = read(filepath) # read(filepath) output is raw bytes of the file +filename = "test.txt" +data = Vector{UInt8}("hello world") # Upload to local plik server -status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filename, filebytes) +result = plik_oneshot_upload(fileServerURL, filename, data) -# to download an uploaded file -curl -L -O "url" +# Access the result as a Dict +# result["status"], result["uploadid"], result["fileid"], result["url"] ``` """ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8}) @@ -697,18 +756,17 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect httpResponse = nothing try httpResponse = HTTP.post(url_upload, headers, form) - # println("Status: ", httpResponse.status) responseJson = JSON.parse(httpResponse.body) catch e @error "Request failed" exception=e end - fileid=responseJson["id"] + fileid = responseJson["id"] # url of the uploaded data e.g. 
"http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" url = "$fileServerURL/file/$uploadid/$fileid/$filename" - return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url) + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end @@ -730,11 +788,11 @@ The function workflow: - `filepath::String` - Full path to the local file to upload # Return: - - A named tuple with fields: - - `status::Integer` - HTTP server response status - - `uploadid::String` - ID of the one-shot upload session - - `fileid::String` - ID of the uploaded file within the session - - `url::String` - Full URL to download the uploaded file + - A Dict with keys: + - `"status"` - HTTP server response status + - `"uploadid"` - ID of the one-shot upload session + - `"fileid"` - ID of the uploaded file within the session + - `"url"` - Full URL to download the uploaded file # Example ```julia @@ -744,10 +802,10 @@ fileServerURL = "http://localhost:8080" filepath = "./test.zip" # Upload to local plik server -status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filepath) +result = plik_oneshot_upload(fileServerURL, filepath) -# To download the uploaded file later (via curl as example): -curl -L -O "url" +# Access the result as a Dict +# result["status"], result["uploadid"], result["fileid"], result["url"] ``` """ function plik_oneshot_upload(fileServerURL::String, filepath::String) @@ -763,7 +821,6 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String) uploadid = responseJson["id"] uploadtoken = responseJson["uploadToken"] - println("uploadid = ", uploadid) # ------------------------------------------ upload file ----------------------------------------- # # Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID @@ -780,19 +837,18 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String) httpResponse = nothing 
try httpResponse = HTTP.post(url_upload, headers, form) - # println("Status: ", httpResponse.status) responseJson = JSON.parse(httpResponse.body) catch e @error "Request failed" exception=e end - fileid=responseJson["id"] + fileid = responseJson["id"] # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" url = "$fileServerURL/file/$uploadid/$fileid/$filename" - return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url) + return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end -end # module \ No newline at end of file +end # module diff --git a/src/NATSBridge.js b/src/NATSBridge.js index eda1329..d604fbe 100644 --- a/src/NATSBridge.js +++ b/src/NATSBridge.js @@ -12,6 +12,9 @@ const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB const DEFAULT_NATS_URL = 'nats://localhost:4222'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload'; +// Supported payload types +const PAYLOAD_TYPES = ['text', 'dictionary', 'table', 'image', 'audio', 'video', 'binary']; + // Logging helper function logTrace(correlationId, message) { const timestamp = new Date().toISOString(); @@ -64,64 +67,6 @@ class MessageEnvelope { } } -// SmartSend for JavaScript - Handles transport selection based on payload size -async function SmartSend(subject, data, type = 'json', options = {}) { - const { - natsUrl = DEFAULT_NATS_URL, - fileserverUrl = DEFAULT_FILESERVER_URL, - sizeThreshold = DEFAULT_SIZE_THRESHOLD, - correlationId = uuidv4() - } = options; - - logTrace(correlationId, `Starting SmartSend for subject: ${subject}`); - - // Serialize data based on type - const payloadBytes = _serializeData(data, type, correlationId); - const payloadSize = payloadBytes.length; - - logTrace(correlationId, `Serialized payload size: ${payloadSize} bytes`); - - // Decision: Direct vs Link - if (payloadSize < sizeThreshold) { - // Direct path - Base64 encode and send via NATS - const payloadBase64 = 
encode(payloadBytes); - logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`); - - const env = new MessageEnvelope({ - correlation_id: correlationId, - type: type, - transport: 'direct', - payload: payloadBase64, - metadata: { - content_length: payloadSize.toString(), - format: 'arrow_ipc_stream' - } - }); - - await publishMessage(natsUrl, subject, env.toJSON(), correlationId); - return env; - } else { - // Link path - Upload to HTTP server, send URL via NATS - logTrace(correlationId, `Using link transport, uploading to fileserver`); - - const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId); - - const env = new MessageEnvelope({ - correlation_id: correlationId, - type: type, - transport: 'link', - url: url, - metadata: { - content_length: payloadSize.toString(), - format: 'arrow_ipc_stream' - } - }); - - await publishMessage(natsUrl, subject, env.toJSON(), correlationId); - return env; - } -} - // Helper: Serialize data based on type function _serializeData(data, type, correlationId) { if (type === 'json') { @@ -146,6 +91,106 @@ function _serializeData(data, type, correlationId) { } } +// Helper: Upload data to fileserver (mock implementation) +async function uploadToServer(data, fileserverUrl, correlationId) { + // This is a placeholder - in real implementation, this would upload to the fileserver + // and return the URL. For now, we return a mock URL. + return `${fileserverUrl}/mock-upload-${Date.now()}`; +} + +// SmartSend for JavaScript - Handles transport selection based on payload size +// Input format: [(dataname, data, type), ...] 
+async function SmartSend(subject, data, options = {}) { + const { + natsUrl = DEFAULT_NATS_URL, + fileserverUrl = DEFAULT_FILESERVER_URL, + sizeThreshold = DEFAULT_SIZE_THRESHOLD, + correlationId = uuidv4() + } = options; + + logTrace(correlationId, `Starting SmartSend for subject: ${subject}`); + + // Process each payload in the list + const payloadResults = []; + + for (let i = 0; i < data.length; i++) { + const tuple = data[i]; + if (tuple.length !== 3) { + throw new Error(`Payload at index ${i} must be a tuple of [dataname, data, type]`); + } + + const [dataname, payload_data, payload_type] = tuple; + + // Validate type + if (!PAYLOAD_TYPES.includes(payload_type)) { + throw new Error(`Unknown payload type '${payload_type}' for payload '${dataname}'. Supported types: ${PAYLOAD_TYPES.join(', ')}`); + } + + // Serialize data based on type + const payloadBytes = _serializeData(payload_data, payload_type, correlationId); + const payloadSize = payloadBytes.length; + + logTrace(correlationId, `Serialized payload '${dataname}' (type: ${payload_type}) size: ${payloadSize} bytes`); + + // Decision: Direct vs Link + if (payloadSize < sizeThreshold) { + // Direct path - Base64 encode and send via NATS + const payloadBase64 = encode(payloadBytes); + logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`); + + payloadResults.push({ + dataname, + payload_type, + transport: 'direct', + payload: payloadBase64, + metadata: { + content_length: payloadSize.toString(), + format: 'arrow_ipc_stream' + } + }); + } else { + // Link path - Upload to HTTP server, send URL via NATS + logTrace(correlationId, `Using link transport, uploading to fileserver`); + + const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId); + + payloadResults.push({ + dataname, + payload_type, + transport: 'link', + url: url, + metadata: { + content_length: payloadSize.toString(), + format: 'arrow_ipc_stream' + } + }); + } + } + + // Build the final message with all 
payloads + const allPayloads = payloadResults.map(p => ({ + dataname: p.dataname, + type: p.payload_type, + transport: p.transport, + ...(p.transport === 'direct' ? { payload: p.payload } : { url: p.url }), + metadata: p.metadata + })); + + // Create envelope and publish + const env = { + correlation_id: correlationId, + type: allPayloads[0].type, // Use first payload's type as envelope type + transport: allPayloads[0].transport, + payload: allPayloads.length === 1 && allPayloads[0].transport === 'direct' ? allPayloads[0].payload : undefined, + url: allPayloads.length === 1 && allPayloads[0].transport === 'link' ? allPayloads[0].url : undefined, + metadata: {}, + _payloads: allPayloads // Internal storage for multiple payloads + }; + + await publishMessage(natsUrl, subject, JSON.stringify(env), correlationId); + return env; +} + // Helper: Publish message to NATS async function publishMessage(natsUrl, subject, message, correlationId) { const { connect } = require('nats');