This commit is contained in:
2026-02-15 21:02:22 +07:00
parent f0df169689
commit 6b49fa68c0
5 changed files with 510 additions and 358 deletions

View File

@@ -12,3 +12,4 @@ Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement
Create a walkthrough for Julia service-A service sending a mix-content chat message to Julia service-B. the chat message must includes

View File

@@ -26,33 +26,55 @@ The system uses a **standardized list-of-tuples format** for all payload operati
**API Standard:** **API Standard:**
```julia ```julia
# Input format for smartsend (always a list of tuples) # Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1), (dataname2, data2), ...] [(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (always returns a list of tuples) # Output format for smartreceive (always returns a list of tuples)
[(dataname1, data1), (dataname2, data2), ...] [(dataname1, data1), (dataname2, data2), ...]
``` ```
**Supported Types:**
- `"text"` - Plain text
- `"dictionary"` - JSON-serializable dictionaries (Dict, NamedTuple)
- `"table"` - Tabular data (DataFrame, array of structs)
- `"image"` - Image data (Bitmap, PNG/JPG bytes)
- `"audio"` - Audio data (WAV, MP3 bytes)
- `"video"` - Video data (MP4, AVI bytes)
- `"binary"` - Generic binary data (Vector{UInt8})
This design allows per-payload type specification, enabling **mixed-content messages** where different payloads can use different serialization formats in a single message.
**Examples:** **Examples:**
```julia ```julia
# Single payload - still wrapped in a list # Single payload - still wrapped in a list
smartsend( smartsend(
"/test", "/test",
[("dataname1", data1)], # List with one tuple [("dataname1", data1, "dictionary")], # List with one tuple (data, type)
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload, fileserverUploadHandler=plik_oneshot_upload,
metadata=user_provided_envelope_level_metadata metadata=user_provided_envelope_level_metadata
) )
# Multiple payloads in one message # Multiple payloads in one message with different types
smartsend( smartsend(
"/test", "/test",
[("dataname1", data1), ("dataname2", data2)], [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")],
nats_url="nats://localhost:4222", nats_url="nats://localhost:4222",
fileserverUploadHandler=plik_oneshot_upload fileserverUploadHandler=plik_oneshot_upload
) )
# Mixed content (e.g., chat with text, image, audio)
smartsend(
"/chat",
[
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
],
nats_url="nats://localhost:4222"
)
# Receive always returns a list # Receive always returns a list
payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay) payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# payloads = [("dataname1", data1), ("dataname2", data2), ...] # payloads = [("dataname1", data1), ("dataname2", data2), ...]
@@ -174,7 +196,7 @@ The `msgPayload_v1` structure provides flexible payload handling for various dat
struct msgPayload_v1 struct msgPayload_v1
id::String # Id of this payload (e.g., "uuid4") id::String # Id of this payload (e.g., "uuid4")
dataname::String # Name of this payload (e.g., "login_image") dataname::String # Name of this payload (e.g., "login_image")
type::String # "text | json | table | image | audio | video | binary" type::String # "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link" transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc" encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # Data size in bytes size::Integer # Data size in bytes
@@ -184,7 +206,7 @@ end
``` ```
**Key Features:** **Key Features:**
- Supports multiple data types: text, json, table, image, audio, video, binary - Supports multiple data types: text, dictionary, table, image, audio, video, binary
- Flexible transport: "direct" (NATS) or "link" (HTTP fileserver) - Flexible transport: "direct" (NATS) or "link" (HTTP fileserver)
- Multiple payloads per message (essential for chat with mixed content) - Multiple payloads per message (essential for chat with mixed content)
- Per-payload and per-envelope metadata support - Per-payload and per-envelope metadata support
@@ -194,12 +216,16 @@ end
``` ```
┌─────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────┐
│ smartsend Function │ │ smartsend Function │
│ Accepts: [(dataname1, data1), (dataname2, data2), ...] │ Accepts: [(dataname1, data1, type1), ...]
│ (No standalone type parameter - type per payload) │
└─────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────┐
Is payload size < 1MB? For each payload:
│ 1. Extract type from tuple │
│ 2. Serialize based on type │
│ 3. Check payload size │
└─────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────┘
┌────────────────┴─-────────────────┐ ┌────────────────┴─-────────────────┐
@@ -272,8 +298,7 @@ graph TD
```julia ```julia
function smartsend( function smartsend(
subject::String, subject::String,
data::AbstractArray{Tuple{String, Any}}, data::AbstractArray{Tuple{String, Any, String}}; # No standalone type parameter
type::String = "json";
nats_url::String = "nats://localhost:4222", nats_url::String = "nats://localhost:4222",
fileserverUploadHandler::Function = plik_oneshot_upload, fileserverUploadHandler::Function = plik_oneshot_upload,
size_threshold::Int = 1_000_000 # 1MB size_threshold::Int = 1_000_000 # 1MB
@@ -281,12 +306,13 @@ function smartsend(
``` ```
**Input Format:** **Input Format:**
- `data::AbstractArray{Tuple{String, Any}}` - **Must be a list of tuples**: `[("dataname1", data1), ("dataname2", data2), ...]` - `data::AbstractArray{Tuple{String, Any, String}}` - **Must be a list of (dataname, data, type) tuples**: `[("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1)]` - Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
**Flow:** **Flow:**
1. Iterate through the list of `("dataname", data)` tuples 1. Iterate through the list of `(dataname, data, type)` tuples
2. For each payload: serialize to Arrow IPC stream (if table) or JSON 2. For each payload: extract the type from the tuple and serialize accordingly
3. Check payload size 3. Check payload size
4. If < threshold: publish directly to NATS with Base64-encoded payload 4. If < threshold: publish directly to NATS with Base64-encoded payload
5. If >= threshold: upload to HTTP server, publish NATS with URL 5. If >= threshold: upload to HTTP server, publish NATS with URL
@@ -322,7 +348,7 @@ end
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message - If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff - If `link`: fetch data from URL using exponential backoff
- Deserialize based on payload type (`json`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return list of `(dataname, data)` tuples 4. Return list of `(dataname, data)` tuples
### JavaScript Implementation ### JavaScript Implementation
@@ -335,17 +361,26 @@ end
#### smartsend Function #### smartsend Function
```javascript ```javascript
async function smartsend(subject, data, type = 'json', options = {}) async function smartsend(subject, data, options = {})
// data format: [(dataname, data, type), ...]
// options object should include: // options object should include:
// - fileserverUploadHandler: function to upload data to file server // - natsUrl: NATS server URL
// - fileserver_url: base URL of the file server // - fileserverUrl: base URL of the file server
// - sizeThreshold: threshold in bytes for transport selection
// - correlationId: optional correlation ID for tracing
``` ```
**Input Format:**
- `data` - **Must be a list of (dataname, data, type) tuples**: `[(dataname1, data1, "type1"), (dataname2, data2, "type2"), ...]`
- Even for single payloads: `[(dataname1, data1, "type1")]`
- Each payload can have a different type, enabling mixed-content messages
**Flow:** **Flow:**
1. Serialize data to Arrow IPC buffer (if table) 1. Iterate through the list of (dataname, data, type) tuples
2. Check payload size 2. For each payload: extract the type from the tuple and serialize accordingly
3. If < threshold: publish directly to NATS 3. Check payload size
4. If >= threshold: upload to HTTP server, publish NATS with URL 4. If < threshold: publish directly to NATS
5. If >= threshold: upload to HTTP server, publish NATS with URL
#### smartreceive Handler #### smartreceive Handler
@@ -366,12 +401,12 @@ async function smartreceive(msg, options = {})
- Determine transport type (`direct` or `link`) - Determine transport type (`direct` or `link`)
- If `direct`: decode Base64 data from the message - If `direct`: decode Base64 data from the message
- If `link`: fetch data from URL using exponential backoff - If `link`: fetch data from URL using exponential backoff
- Deserialize based on payload type (`json`, `table`, `binary`, etc.) - Deserialize based on payload type (`dictionary`, `table`, `binary`, etc.)
4. Return list of `(dataname, data)` tuples 4. Return list of `(dataname, data)` tuples
## Scenario Implementations ## Scenario Implementations
### Scenario 1: Command & Control (Small JSON) ### Scenario 1: Command & Control (Small Dictionary)
**Julia (Receiver):** **Julia (Receiver):**
```julia ```julia
@@ -383,8 +418,8 @@ async function smartreceive(msg, options = {})
**JavaScript (Sender):** **JavaScript (Sender):**
```javascript ```javascript
// Create small JSON config // Create small dictionary config
// Send via smartsend with type="json" // Send via smartsend with type="dictionary"
``` ```
### Scenario 2: Deep Dive Analysis (Large Arrow Table) ### Scenario 2: Deep Dive Analysis (Large Arrow Table)

15
etc.jl
View File

@@ -0,0 +1,15 @@
the user will provide data in this form: [("dataname1", data1, "datatype1"), ("dataname2", data2, "datatype2"), ...]
For example:
[
("name", "ton", "text"),
("age", 15, "Integer"),
("school_info", Dict("schoolname"=> "Bodin", "classmates_number"=> 52), "dictionary"),
("classmate_names", Dataframe_data, "table"),
("ton_image", image_data, "image"),
("ton_audio", audio_data, "audio"),
("ton_birthday_video", video_data, "video"),
("achievement.zip", file_data, "binary"),
]

View File

@@ -3,6 +3,35 @@
# This module provides functionality for sending and receiving data across network boundaries # This module provides functionality for sending and receiving data across network boundaries
# using NATS as the message bus, with support for both direct payload transport and # using NATS as the message bus, with support for both direct payload transport and
# URL-based transport for larger payloads. # URL-based transport for larger payloads.
#
# File Server Handler Architecture:
# The system uses handler functions to abstract file server operations, allowing support
# for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
#
# Handler Function Signatures:
#
# ```julia
# # Upload handler - uploads data to file server and returns URL
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
#
# # Download handler - fetches data from file server URL
# fileserverDownloadHandler(fileserver_url::String, url::String, max_retries::Int, base_delay::Int, max_delay::Int)::Vector{UInt8}
# ```
#
# Multi-Payload Support (Standard API):
# The system uses a standardized list-of-tuples format for all payload operations.
# Even when sending a single payload, the user must wrap it in a list.
#
# API Standard:
# ```julia
# # Input format for smartsend (always a list of tuples with type info)
# [(dataname1, data1, type1), (dataname2, data2, type2), ...]
#
# # Output format for smartreceive (always returns a list of tuples)
# [(dataname1, data1), (dataname2, data2), ...]
# ```
#
# Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
module NATSBridge module NATSBridge
@@ -18,7 +47,7 @@ const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP fi
struct msgPayload_v1 struct msgPayload_v1
id::String # id of this payload e.g. "uuid4" id::String # id of this payload e.g. "uuid4"
dataname::String # name of this payload e.g. "login_image" dataname::String # name of this payload e.g. "login_image"
type::String # this payload type. Can be "text | json | table | image | audio | video | binary" type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary"
transport::String # "direct | link" transport::String # "direct | link"
encoding::String # "none | json | base64 | arrow-ipc" encoding::String # "none | json | base64 | arrow-ipc"
size::Integer # data size in bytes e.g. 15433 size::Integer # data size in bytes e.g. 15433
@@ -164,15 +193,18 @@ function envelope_to_json(env::msgEnvelope_v1)
end end
""" Helper function to get payload bytes from data """ Helper function to get payload bytes from data
This function is kept for compatibility but is no longer used.
All serialization is now handled by `_serialize_data`.
""" """
function _get_payload_bytes(data::Any) function _get_payload_bytes(data::Any)
# This is a placeholder - actual implementation depends on data type # This function is kept for compatibility but is no longer used.
# All serialization is now handled by `_serialize_data`.
if isa(data, Vector{UInt8}) if isa(data, Vector{UInt8})
return data return data
elseif isa(data, String) elseif isa(data, String)
return bytes(data) return bytes(data)
else else
return String(data) return bytes(data)
end end
end end
@@ -192,22 +224,27 @@ This function intelligently routes data delivery based on payload size relative
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS. If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS.
Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS. Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS.
The function accepts a list of (dataname, data, type) tuples as input and processes each payload individually.
Each payload can have a different type, enabling mixed-content messages (e.g., chat with text, images, audio).
The function workflow: The function workflow:
1. Serializes the provided data according to the specified format (`type`) 1. Iterates through the list of (dataname, data, type) tuples
2. Compares the serialized size against `size_threshold` 2. For each payload: extracts the type from the tuple and serializes accordingly
3. For small payloads: encodes as Base64, constructs a "direct" msgEnvelope_v1, and publishes to NATS 3. Compares the serialized size against `size_threshold`
4. For large payloads: uploads to the fileserver, constructs a "link" msgEnvelope_v1 with the URL, and publishes to NATS 4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1
5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL
# Arguments: # Arguments:
- `subject::String` - NATS subject to publish the message to - `subject::String` - NATS subject to publish the message to
- `data::Any` - Data payload to send (any Julia object) - `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send
- `type::String = "json"` - Serialization format: `"json"` or `"arrow"` - `dataname::String` - Name of the payload
- `data::Any` - The actual data to send
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
- No standalone `type` parameter - type is specified per payload
# Keyword Arguments: # Keyword Arguments:
- `dataname::String = string(UUIDs.uuid4())` - Filename to use when uploading to fileserver (auto-generated UUID if not provided)
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server - `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
- `fileserver_url::String = DEFAULT_FILESERVER_URL` - Base URL of the fileserver (e.g., `"http://localhost:8080"`) - `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
- `fileServerUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must match signature of `plik_oneshot_upload`)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport - `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated - `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc. - `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
@@ -224,23 +261,32 @@ The function workflow:
```julia ```julia
using UUIDs using UUIDs
# Send a small struct directly via NATS # Send a single payload (still wrapped in a list)
data = Dict("key" => "value") data = Dict("key" => "value")
env = smartsend("my.subject", data, "json") env = smartsend("my.subject", [("dataname1", data, "dictionary")])
# Send multiple payloads in one message with different types
data1 = Dict("key1" => "value1")
data2 = rand(10_000) # Small array
env = smartsend("my.subject", [("dataname1", data1, "dictionary"), ("dataname2", data2, "table")])
# Send a large array using fileserver upload # Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", data, "arrow") env = smartsend("large.data", [("large_table", data, "table")])
# Mixed content (e.g., chat with text and image)
env = smartsend("chat.subject", [
("message_text", "Hello!", "text"),
("user_image", image_data, "image"),
("audio_clip", audio_data, "audio")
])
``` ```
""" """
function smartsend( function smartsend(
subject::String, # smartreceive's subject subject::String, # smartreceive's subject
data::Any, data::AbstractArray{Tuple{String, Any, String}}; # List of (dataname, data, type) tuples
type::String = "json";
dataname="NA",
nats_url::String = DEFAULT_NATS_URL, nats_url::String = DEFAULT_NATS_URL,
fileserver_url::String = DEFAULT_FILESERVER_URL, fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD, size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing, correlation_id::Union{String, Nothing} = nothing,
msg_purpose::String = "chat", msg_purpose::String = "chat",
@@ -255,16 +301,19 @@ function smartsend(
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Serialize data based on type # Generate message metadata
payload_bytes = _serialize_data(data, type) # Convert data to bytes based on type
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload size: $payload_size bytes") # Log payload size
# Generate unique IDs
msg_id = string(uuid4()) msg_id = string(uuid4())
timestamp = string(Dates.now()) timestamp = string(Dates.now())
# Process each payload in the list
payloads = msgPayload_v1[]
for (dataname, payload_data, payload_type) in data
# Serialize data based on type
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link # Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS # Direct path - Base64 encode and send via NATS
@@ -275,42 +324,20 @@ function smartsend(
payload = msgPayload_v1( payload = msgPayload_v1(
id = string(uuid4()), id = string(uuid4()),
dataname = dataname, dataname = dataname,
type = type, type = payload_type,
transport = "direct", transport = "direct",
encoding = "base64", encoding = "base64",
size = payload_size, size = payload_size,
data = payload_b64, data = payload_b64,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") metadata = Dict("content_length" => payload_size)
) )
push!(payloads, payload)
# Create msgEnvelope_v1 with all fields populated
env = msgEnvelope_v1(
correlationId = cid,
msgId = msg_id,
timestamp = timestamp,
sendTo = subject,
msgPurpose = msg_purpose,
senderName = sender_name,
senderId = string(uuid4()),
receiverName = receiver_name,
receiverId = receiver_id,
replyTo = reply_to,
replyToMsgId = reply_to_msg_id,
brokerURL = nats_url,
metadata = Dict(),
payloads = [payload]
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
else else
# Link path - Upload to HTTP server, send URL via NATS # Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server # Upload to HTTP server
response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes) response = fileserverUploadHandler(DEFAULT_FILESERVER_URL, dataname, payload_bytes)
if response[:status] != 200 # Check if upload was successful if response[:status] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
@@ -323,15 +350,18 @@ function smartsend(
payload = msgPayload_v1( payload = msgPayload_v1(
id = string(uuid4()), id = string(uuid4()),
dataname = dataname, dataname = dataname,
type = type, type = payload_type,
transport = "link", transport = "link",
encoding = "none", encoding = "none",
size = payload_size, size = payload_size,
data = url, data = url,
metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream") metadata = Dict("dataname" => dataname, "content_length" => payload_size, "format" => "arrow_ipc_stream")
) )
push!(payloads, payload)
end
end
# Create msgEnvelope_v1 with all fields populated # Create msgEnvelope_v1 with all payloads
env = msgEnvelope_v1( env = msgEnvelope_v1(
correlationId = cid, correlationId = cid,
msgId = msg_id, msgId = msg_id,
@@ -346,56 +376,72 @@ function smartsend(
replyToMsgId = reply_to_msg_id, replyToMsgId = reply_to_msg_id,
brokerURL = nats_url, brokerURL = nats_url,
metadata = Dict(), metadata = Dict(),
payloads = [payload] payloads = payloads
) )
msg_json = envelope_to_json(env) # Convert envelope to JSON msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking return env # Return the envelope for tracking
end
end end
""" _serialize_data - Serialize data according to specified format """ _serialize_data - Serialize data according to specified format
This function serializes arbitrary Julia data into a binary representation based on the specified format. This function serializes arbitrary Julia data into a binary representation based on the specified format.
It supports three serialization formats: It supports multiple serialization formats:
- `"json"`: Serializes data as JSON and returns the UTF-8 byte representation - `"text"`: Treats data as text and converts to UTF-8 bytes
- `"dictionary"`: Serializes data as JSON and returns the UTF-8 byte representation
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream - `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
- `"binary"`: Expects already-binary data (either `IOBuffer` or `Vector{UInt8}`) and returns it as bytes - `"image"`: Expects binary image data (Vector{UInt8}) and returns it as bytes
- `"audio"`: Expects binary audio data (Vector{UInt8}) and returns it as bytes
- `"video"`: Expects binary video data (Vector{UInt8}) and returns it as bytes
- `"binary"`: Generic binary data (Vector{UInt8} or IOBuffer) and returns bytes
The function handles format-specific serialization logic: The function handles format-specific serialization logic:
1. For `"json"`: Converts Julia data to JSON string, then encodes to bytes 1. For `"text"`: Converts string to UTF-8 bytes
2. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer 2. For `"dictionary"`: Converts Julia data to JSON string, then encodes to bytes
3. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly 3. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer
4. For `"image"`, `"audio"`, `"video"`: Treats data as binary (Vector{UInt8})
5. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly
# Arguments: # Arguments:
- `data::Any` - Data to serialize (JSON-serializable for `"json"`, table-like for `"table"`, binary for `"binary"`) - `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
- `type::String` - Target format: `"json"`, `"table"`, or `"binary"`
# Return: # Return:
- `Vector{UInt8}` - Binary representation of the serialized data - `Vector{UInt8}` - Binary representation of the serialized data
# Throws: # Throws:
- `Error` if `type` is not one of `"json"`, `"table"`, or `"binary"` - `Error` if `type` is not one of the supported types
- `Error` if `type == "binary"` but `data` is neither `IOBuffer` nor `Vector{UInt8}` - `Error` if `type` is `"image"`, `"audio"`, or `"video"` but `data` is not `Vector{UInt8}`
# Example # Example
```julia ```julia
using JSON, Arrow, DataFrames using JSON, Arrow, DataFrames
# Text serialization
text_data = "Hello, World!"
text_bytes = _serialize_data(text_data, "text")
# JSON serialization # JSON serialization
json_data = Dict("name" => "Alice", "age" => 30) json_data = Dict("name" => "Alice", "age" => 30)
json_bytes = _serialize_data(json_data, "json") json_bytes = _serialize_data(json_data, "dictionary")
# Table serialization with a DataFrame (recommended for tabular data) # Table serialization with a DataFrame (recommended for tabular data)
df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
table_bytes = _serialize_data(df, "table") table_bytes = _serialize_data(df, "table")
# Table serialization with named tuple of vectors (also supported) # Image data (Vector{UInt8})
nt = (id = [1, 2, 3], name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92]) image_bytes = UInt8[1, 2, 3] # Image bytes
table_bytes_nt = _serialize_data(nt, "table") image_serialized = _serialize_data(image_bytes, "image")
# Audio data (Vector{UInt8})
audio_bytes = UInt8[1, 2, 3] # Audio bytes
audio_serialized = _serialize_data(audio_bytes, "audio")
# Video data (Vector{UInt8})
video_bytes = UInt8[1, 2, 3] # Video bytes
video_serialized = _serialize_data(video_bytes, "video")
# Binary data (IOBuffer) # Binary data (IOBuffer)
buf = IOBuffer() buf = IOBuffer()
@@ -407,13 +453,37 @@ binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
``` ```
""" """
function _serialize_data(data::Any, type::String) function _serialize_data(data::Any, type::String)
if type == "json" # JSON data - serialize directly if type == "text" # Text data - convert to UTF-8 bytes
if isa(data, String)
return bytes(data) # Convert string to UTF-8 bytes
else
error("Text data must be a String")
end
elseif type == "dictionary" # JSON data - serialize directly
json_str = JSON.json(data) # Convert Julia data to JSON string json_str = JSON.json(data) # Convert Julia data to JSON string
return bytes(json_str) # Convert JSON string to bytes return bytes(json_str) # Convert JSON string to bytes
elseif type == "table" # Table data - convert to Arrow IPC stream elseif type == "table" # Table data - convert to Arrow IPC stream
io = IOBuffer() # Create in-memory buffer io = IOBuffer() # Create in-memory buffer
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
return take!(io) # Return the buffer contents as bytes return take!(io) # Return the buffer contents as bytes
elseif type == "image" # Image data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Image data must be Vector{UInt8}")
end
elseif type == "audio" # Audio data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Audio data must be Vector{UInt8}")
end
elseif type == "video" # Video data - treat as binary
if isa(data, Vector{UInt8})
return data # Return binary data directly
else
error("Video data must be Vector{UInt8}")
end
elseif type == "binary" # Binary data - treat as binary elseif type == "binary" # Binary data - treat as binary
if isa(data, IOBuffer) # Check if data is an IOBuffer if isa(data, IOBuffer) # Check if data is an IOBuffer
return take!(data) # Return buffer contents as bytes return take!(data) # Return buffer contents as bytes
@@ -457,19 +527,27 @@ A HTTP file server is required along with its upload function.
Arguments: Arguments:
- `msg::NATS.Message` - NATS message to process - `msg::NATS.Message` - NATS message to process
- `fileserverDownloadHandler::Function` - Function to handle downloading data from file server URLs
Keyword Arguments: Keyword Arguments:
- `fileserver_url::String` - HTTP file server URL for link transport (default: DEFAULT_FILESERVER_URL)
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5) - `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5)
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100) - `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100)
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000) - `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
Return: Return:
- Tuple `(data = deserialized_data, envelope = msgEnvelope_v1)` - Data and envelope - `AbstractArray{Tuple{String, Any}}` - List of (dataname, data) tuples
# Example
```julia
# Receive and process message
msg = nats_message # NATS message
payloads = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
# payloads = [("dataname1", data1), ("dataname2", data2), ...]
```
""" """
function smartreceive( function smartreceive(
msg::NATS.Msg; msg::NATS.Message,
fileserver_url::String = DEFAULT_FILESERVER_URL, fileserverDownloadHandler::Function;
max_retries::Int = 5, max_retries::Int = 5,
base_delay::Int = 100, base_delay::Int = 100,
max_delay::Int = 5000 max_delay::Int = 5000
@@ -477,77 +555,53 @@ function smartreceive(
# Parse the JSON envelope # Parse the JSON envelope
json_data = JSON.parse(String(msg.payload)) json_data = JSON.parse(String(msg.payload))
# Get transport from the first payload
transport = String(json_data["payloads"][1]["transport"])
log_trace(json_data["correlationId"], "Processing received message") # Log message processing start log_trace(json_data["correlationId"], "Processing received message") # Log message processing start
if transport == "direct" # Direct transport - payload is in the message # Process all payloads in the envelope
log_trace(json_data["correlationId"], "Direct transport - decoding payload") # Log direct transport handling payloads_list = Tuple{String, Any}[]
# Extract base64 payload from the first payload # Get number of payloads
payload_b64 = String(json_data["payloads"][1]["data"]) num_payloads = length(json_data["payloads"])
for i in 1:num_payloads
payload_data = json_data["payloads"][i]
transport = String(payload_data["transport"])
dataname = String(payload_data["dataname"])
if transport == "direct" # Direct transport - payload is in the message
log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
# Extract base64 payload from the payload
payload_b64 = String(payload_data["data"])
# Decode Base64 payload # Decode Base64 payload
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
# Deserialize based on type # Deserialize based on type
data_type = String(json_data["payloads"][1]["type"]) data_type = String(payload_data["type"])
data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"], Dict{String, Any}()) data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"])
# Create msgEnvelope_v1 from parsed data push!(payloads_list, (dataname, data))
env = msgEnvelope_v1(
correlationId = json_data["correlationId"],
msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "",
timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "",
sendTo = json_data["sendTo"],
msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "",
senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "",
senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "",
receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "",
receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "",
replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "",
replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "",
brokerURL = haskey(json_data, "brokerURL") ? String(json_data["brokerURL"]) : DEFAULT_NATS_URL,
metadata = Dict{String, Any}(),
payloads = msgPayload_v1[]
)
return (data = data, envelope = env) # Return data and envelope as tuple
elseif transport == "link" # Link transport - payload is at URL elseif transport == "link" # Link transport - payload is at URL
log_trace(json_data["correlationId"], "Link transport - fetching from URL") # Log link transport handling log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL") # Log link transport handling
# Extract URL from the first payload # Extract URL from the payload
url = String(json_data["payloads"][1]["data"]) url = String(payload_data["data"])
# Fetch with exponential backoff # Fetch with exponential backoff using the download handler
downloaded_data = _fetch_with_backoff(url, max_retries, base_delay, max_delay, json_data["correlationId"]) # Fetch data from URL downloaded_data = fileserverDownloadHandler(DEFAULT_FILESERVER_URL, url, max_retries, base_delay, max_delay)
# Deserialize based on type # Deserialize based on type
data_type = String(json_data["payloads"][1]["type"]) data_type = String(payload_data["type"])
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"], Dict{String, Any}()) data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
# Create msgEnvelope_v1 from parsed data push!(payloads_list, (dataname, data))
env = msgEnvelope_v1(
correlationId = json_data["correlationId"],
msgId = haskey(json_data, "msgId") ? String(json_data["msgId"]) : "",
timestamp = haskey(json_data, "timestamp") ? String(json_data["timestamp"]) : "",
sendTo = json_data["sendTo"],
msgPurpose = haskey(json_data, "msgPurpose") ? String(json_data["msgPurpose"]) : "",
senderName = haskey(json_data, "senderName") ? String(json_data["senderName"]) : "",
senderId = haskey(json_data, "senderId") ? String(json_data["senderId"]) : "",
receiverName = haskey(json_data, "receiverName") ? String(json_data["receiverName"]) : "",
receiverId = haskey(json_data, "receiverId") ? String(json_data["receiverId"]) : "",
replyTo = haskey(json_data, "replyTo") ? String(json_data["replyTo"]) : "",
replyToMsgId = haskey(json_data, "replyToMsgId") ? String(json_data["replyToMsgId"]) : "",
brokerURL = haskey(json_data, "brokerURL") ? String(json_data["brokerURL"]) : DEFAULT_NATS_URL,
metadata = Dict{String, Any}(),
payloads = msgPayload_v1[]
)
return (data = data, envelope = env) # Return data and envelope as tuple
else # Unknown transport type else # Unknown transport type
error("Unknown transport type: $(transport)") # Throw error for unknown transport error("Unknown transport type for payload '$dataname': $(transport)") # Throw error for unknown transport
end end
end
return payloads_list # Return list of (dataname, data) tuples
end end
@@ -598,31 +652,37 @@ end
""" Deserialize bytes to data based on type """ Deserialize bytes to data based on type
This internal function converts serialized bytes back to Julia data based on type. This internal function converts serialized bytes back to Julia data based on type.
It handles "json" (JSON deserialization), "table" (Arrow IPC deserialization), It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow IPC deserialization),
and "binary" (binary data). "image" (binary data), "audio" (binary data), "video" (binary data), and "binary" (binary data).
Arguments: Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes - `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("json", "table", "binary") - `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
- `correlation_id::String` - Correlation ID for logging - `correlation_id::String` - Correlation ID for logging
- `metadata::Dict{String, Any}` - Metadata about the data
Return: Return:
- Deserialized data (DataFrame for "table", JSON data for "json", bytes for "binary") - Deserialized data (String for "text", DataFrame for "table", JSON data for "dictionary", bytes for "image", "audio", "video", "binary")
""" """
function _deserialize_data( function _deserialize_data(
data::Vector{UInt8}, data::Vector{UInt8},
type::String, type::String,
correlation_id::String, correlation_id::String
metadata::Dict{String, Any}
) )
if type == "json" # JSON data - deserialize if type == "text" # Text data - convert to string
return String(data) # Convert bytes to string
elseif type == "dictionary" # JSON data - deserialize
json_str = String(data) # Convert bytes to string json_str = String(data) # Convert bytes to string
return JSON.parse(json_str) # Parse JSON string to Julia data structure return JSON.parse(json_str) # Parse JSON string to Julia data structure
elseif type == "table" # Table data - deserialize Arrow IPC stream elseif type == "table" # Table data - deserialize Arrow IPC stream
io = IOBuffer(data) # Create buffer from bytes io = IOBuffer(data) # Create buffer from bytes
df = Arrow.Table(io) # Read Arrow IPC format from buffer df = Arrow.Table(io) # Read Arrow IPC format from buffer
return df # Return DataFrame return df # Return DataFrame
elseif type == "image" # Image data - return binary
return data # Return bytes directly
elseif type == "audio" # Audio data - return binary
return data # Return bytes directly
elseif type == "video" # Video data - return binary
return data # Return bytes directly
elseif type == "binary" # Binary data - return binary elseif type == "binary" # Binary data - return binary
return data # Return bytes directly return data # Return bytes directly
else # Unknown type else # Unknown type
@@ -648,26 +708,25 @@ The function workflow:
- `data::Vector{UInt8}` - Raw byte data of the file content - `data::Vector{UInt8}` - Raw byte data of the file content
# Return: # Return:
- A named tuple with fields: - A Dict with keys:
- `status::Integer` - HTTP server response status - `"status"` - HTTP server response status
- `uploadid::String` - ID of the one-shot upload session - `"uploadid"` - ID of the one-shot upload session
- `fileid::String` - ID of the uploaded file within the session - `"fileid"` - ID of the uploaded file within the session
- `url::String` - Full URL to download the uploaded file - `"url"` - Full URL to download the uploaded file
# Example # Example
```jldoctest ```julia
using HTTP, JSON using HTTP, JSON
fileServerURL = "http://localhost:8080" fileServerURL = "http://localhost:8080"
filepath = "./test.zip" filename = "test.txt"
filename = basename(filepath) data = UInt8["hello world"]
filebytes = read(filepath) # read(filepath) output is raw bytes of the file
# Upload to local plik server # Upload to local plik server
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filename, filebytes) result = plik_oneshot_upload(fileServerURL, filename, data)
# to download an uploaded file # Access the result as a Dict
curl -L -O "url" # result["status"], result["uploadid"], result["fileid"], result["url"]
``` ```
""" """
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8}) function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
@@ -697,18 +756,17 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect
httpResponse = nothing httpResponse = nothing
try try
httpResponse = HTTP.post(url_upload, headers, form) httpResponse = HTTP.post(url_upload, headers, form)
# println("Status: ", httpResponse.status)
responseJson = JSON.parse(httpResponse.body) responseJson = JSON.parse(httpResponse.body)
catch e catch e
@error "Request failed" exception=e @error "Request failed" exception=e
end end
fileid=responseJson["id"] fileid = responseJson["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename" url = "$fileServerURL/file/$uploadid/$fileid/$filename"
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url) return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end end
@@ -730,11 +788,11 @@ The function workflow:
- `filepath::String` - Full path to the local file to upload - `filepath::String` - Full path to the local file to upload
# Return: # Return:
- A named tuple with fields: - A Dict with keys:
- `status::Integer` - HTTP server response status - `"status"` - HTTP server response status
- `uploadid::String` - ID of the one-shot upload session - `"uploadid"` - ID of the one-shot upload session
- `fileid::String` - ID of the uploaded file within the session - `"fileid"` - ID of the uploaded file within the session
- `url::String` - Full URL to download the uploaded file - `"url"` - Full URL to download the uploaded file
# Example # Example
```julia ```julia
@@ -744,10 +802,10 @@ fileServerURL = "http://localhost:8080"
filepath = "./test.zip" filepath = "./test.zip"
# Upload to local plik server # Upload to local plik server
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filepath) result = plik_oneshot_upload(fileServerURL, filepath)
# To download the uploaded file later (via curl as example): # Access the result as a Dict
curl -L -O "url" # result["status"], result["uploadid"], result["fileid"], result["url"]
``` ```
""" """
function plik_oneshot_upload(fileServerURL::String, filepath::String) function plik_oneshot_upload(fileServerURL::String, filepath::String)
@@ -763,7 +821,6 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String)
uploadid = responseJson["id"] uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"] uploadtoken = responseJson["uploadToken"]
println("uploadid = ", uploadid)
# ------------------------------------------ upload file ----------------------------------------- # # ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID # Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
@@ -780,18 +837,17 @@ function plik_oneshot_upload(fileServerURL::String, filepath::String)
httpResponse = nothing httpResponse = nothing
try try
httpResponse = HTTP.post(url_upload, headers, form) httpResponse = HTTP.post(url_upload, headers, form)
# println("Status: ", httpResponse.status)
responseJson = JSON.parse(httpResponse.body) responseJson = JSON.parse(httpResponse.body)
catch e catch e
@error "Request failed" exception=e @error "Request failed" exception=e
end end
fileid=responseJson["id"] fileid = responseJson["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename" url = "$fileServerURL/file/$uploadid/$fileid/$filename"
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url) return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end end

View File

@@ -12,6 +12,9 @@ const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB
const DEFAULT_NATS_URL = 'nats://localhost:4222'; const DEFAULT_NATS_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload';
// Supported payload types
const PAYLOAD_TYPES = ['text', 'dictionary', 'table', 'image', 'audio', 'video', 'binary'];
// Logging helper // Logging helper
function logTrace(correlationId, message) { function logTrace(correlationId, message) {
const timestamp = new Date().toISOString(); const timestamp = new Date().toISOString();
@@ -64,64 +67,6 @@ class MessageEnvelope {
} }
} }
// SmartSend for JavaScript - Handles transport selection based on payload size
async function SmartSend(subject, data, type = 'json', options = {}) {
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuidv4()
} = options;
logTrace(correlationId, `Starting SmartSend for subject: ${subject}`);
// Serialize data based on type
const payloadBytes = _serializeData(data, type, correlationId);
const payloadSize = payloadBytes.length;
logTrace(correlationId, `Serialized payload size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
// Direct path - Base64 encode and send via NATS
const payloadBase64 = encode(payloadBytes);
logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`);
const env = new MessageEnvelope({
correlation_id: correlationId,
type: type,
transport: 'direct',
payload: payloadBase64,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
await publishMessage(natsUrl, subject, env.toJSON(), correlationId);
return env;
} else {
// Link path - Upload to HTTP server, send URL via NATS
logTrace(correlationId, `Using link transport, uploading to fileserver`);
const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId);
const env = new MessageEnvelope({
correlation_id: correlationId,
type: type,
transport: 'link',
url: url,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
await publishMessage(natsUrl, subject, env.toJSON(), correlationId);
return env;
}
}
// Helper: Serialize data based on type // Helper: Serialize data based on type
function _serializeData(data, type, correlationId) { function _serializeData(data, type, correlationId) {
if (type === 'json') { if (type === 'json') {
@@ -146,6 +91,106 @@ function _serializeData(data, type, correlationId) {
} }
} }
// Helper: Upload data to fileserver (mock implementation)
async function uploadToServer(data, fileserverUrl, correlationId) {
// This is a placeholder - in real implementation, this would upload to the fileserver
// and return the URL. For now, we return a mock URL.
return `${fileserverUrl}/mock-upload-${Date.now()}`;
}
// SmartSend for JavaScript - Handles transport selection based on payload size
// Input format: [(dataname, data, type), ...]
async function SmartSend(subject, data, options = {}) {
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuidv4()
} = options;
logTrace(correlationId, `Starting SmartSend for subject: ${subject}`);
// Process each payload in the list
const payloadResults = [];
for (let i = 0; i < data.length; i++) {
const tuple = data[i];
if (tuple.length !== 3) {
throw new Error(`Payload at index ${i} must be a tuple of [dataname, data, type]`);
}
const [dataname, payload_data, payload_type] = tuple;
// Validate type
if (!PAYLOAD_TYPES.includes(payload_type)) {
throw new Error(`Unknown payload type '${payload_type}' for payload '${dataname}'. Supported types: ${PAYLOAD_TYPES.join(', ')}`);
}
// Serialize data based on type
const payloadBytes = _serializeData(payload_data, payload_type, correlationId);
const payloadSize = payloadBytes.length;
logTrace(correlationId, `Serialized payload '${dataname}' (type: ${payload_type}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
// Direct path - Base64 encode and send via NATS
const payloadBase64 = encode(payloadBytes);
logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`);
payloadResults.push({
dataname,
payload_type,
transport: 'direct',
payload: payloadBase64,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
} else {
// Link path - Upload to HTTP server, send URL via NATS
logTrace(correlationId, `Using link transport, uploading to fileserver`);
const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId);
payloadResults.push({
dataname,
payload_type,
transport: 'link',
url: url,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
}
}
// Build the final message with all payloads
const allPayloads = payloadResults.map(p => ({
dataname: p.dataname,
type: p.payload_type,
transport: p.transport,
...(p.transport === 'direct' ? { payload: p.payload } : { url: p.url }),
metadata: p.metadata
}));
// Create envelope and publish
const env = {
correlation_id: correlationId,
type: allPayloads[0].type, // Use first payload's type as envelope type
transport: allPayloads[0].transport,
payload: allPayloads.length === 1 && allPayloads[0].transport === 'direct' ? allPayloads[0].payload : undefined,
url: allPayloads.length === 1 && allPayloads[0].transport === 'link' ? allPayloads[0].url : undefined,
metadata: {},
_payloads: allPayloads // Internal storage for multiple payloads
};
await publishMessage(natsUrl, subject, JSON.stringify(env), correlationId);
return env;
}
// Helper: Publish message to NATS // Helper: Publish message to NATS
async function publishMessage(natsUrl, subject, message, correlationId) { async function publishMessage(natsUrl, subject, message, correlationId) {
const { connect } = require('nats'); const { connect } = require('nats');