update
This commit is contained in:
@@ -12,10 +12,10 @@
|
|||||||
#
|
#
|
||||||
# ```jldoctest
|
# ```jldoctest
|
||||||
# # Upload handler - uploads data to file server and returns URL
|
# # Upload handler - uploads data to file server and returns URL
|
||||||
# fileserverUploadHandler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
# fileserver_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
||||||
#
|
#
|
||||||
# # Download handler - fetches data from file server URL with exponential backoff
|
# # Download handler - fetches data from file server URL with exponential backoff
|
||||||
# fileserverDownloadHandler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
|
# fileserver_download_handler(url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String)::Vector{UInt8}
|
||||||
# ```
|
# ```
|
||||||
#
|
#
|
||||||
# Multi-Payload Support (Standard API):
|
# Multi-Payload Support (Standard API):
|
||||||
@@ -35,24 +35,23 @@
|
|||||||
|
|
||||||
module NATSBridge
|
module NATSBridge
|
||||||
|
|
||||||
using Revise
|
|
||||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
|
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64, PrettyPrinting
|
||||||
# ---------------------------------------------- 100 --------------------------------------------- #
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
||||||
|
|
||||||
# Constants
|
# Constants
|
||||||
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
|
||||||
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
|
const DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL
|
||||||
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
|
const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL for link transport
|
||||||
|
|
||||||
|
|
||||||
""" msgPayload_v1 - Internal message payload structure
|
""" msg_payload_v1 - Internal message payload structure
|
||||||
This structure represents a single payload within a NATS message envelope.
|
This structure represents a single payload within a NATS message envelope.
|
||||||
It supports both direct transport (base64-encoded data) and link transport (URL-based).
|
It supports both direct transport (base64-encoded data) and link transport (URL-based).
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
|
- `id::String` - Unique identifier for this payload (e.g., "uuid4")
|
||||||
- `dataname::String` - Name of the payload (e.g., "login_image")
|
- `dataname::String` - Name of the payload (e.g., "login_image")
|
||||||
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
- `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||||
- `transport::String` - Transport method: "direct" or "link"
|
- `transport::String` - Transport method: "direct" or "link"
|
||||||
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
|
- `encoding::String` - Encoding method: "none", "json", "base64", "arrow-ipc"
|
||||||
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
|
- `size::Integer` - Size of the payload in bytes (e.g., 15433)
|
||||||
@@ -68,14 +67,14 @@ It supports both direct transport (base64-encoded data) and link transport (URL-
|
|||||||
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
|
- `metadata::Dict{String, T} = Dict{String, Any}()` - Metadata dictionary
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- A msgPayload_v1 struct instance
|
- A msg_payload_v1 struct instance
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
using UUIDs
|
using UUIDs
|
||||||
|
|
||||||
# Create a direct transport payload
|
# Create a direct transport payload
|
||||||
payload = msgPayload_v1(
|
payload = msg_payload_v1(
|
||||||
"Hello World",
|
"Hello World",
|
||||||
"text";
|
"text";
|
||||||
id = string(uuid4()),
|
id = string(uuid4()),
|
||||||
@@ -87,7 +86,7 @@ payload = msgPayload_v1(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Create a link transport payload
|
# Create a link transport payload
|
||||||
payload = msgPayload_v1(
|
payload = msg_payload_v1(
|
||||||
"http://example.com/file.zip",
|
"http://example.com/file.zip",
|
||||||
"binary";
|
"binary";
|
||||||
id = string(uuid4()),
|
id = string(uuid4()),
|
||||||
@@ -98,10 +97,10 @@ payload = msgPayload_v1(
|
|||||||
)
|
)
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
struct msgPayload_v1
|
struct msg_payload_v1
|
||||||
id::String # id of this payload e.g. "uuid4"
|
id::String # id of this payload e.g. "uuid4"
|
||||||
dataname::String # name of this payload e.g. "login_image"
|
dataname::String # name of this payload e.g. "login_image"
|
||||||
type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary"
|
payload_type::String # this payload type. Can be "text | dictionary | table | image | audio | video | binary"
|
||||||
transport::String # "direct | link"
|
transport::String # "direct | link"
|
||||||
encoding::String # "none | json | base64 | arrow-ipc"
|
encoding::String # "none | json | base64 | arrow-ipc"
|
||||||
size::Integer # data size in bytes e.g. 15433
|
size::Integer # data size in bytes e.g. 15433
|
||||||
@@ -110,9 +109,9 @@ struct msgPayload_v1
|
|||||||
end
|
end
|
||||||
|
|
||||||
# constructor
|
# constructor
|
||||||
function msgPayload_v1(
|
function msg_payload_v1(
|
||||||
data::Any,
|
data::Any,
|
||||||
type::String;
|
payload_type::String;
|
||||||
id::String = "",
|
id::String = "",
|
||||||
dataname::String = string(uuid4()),
|
dataname::String = string(uuid4()),
|
||||||
transport::String = "direct",
|
transport::String = "direct",
|
||||||
@@ -120,10 +119,10 @@ function msgPayload_v1(
|
|||||||
size::Integer = 0,
|
size::Integer = 0,
|
||||||
metadata::Dict{String, T} = Dict{String, Any}()
|
metadata::Dict{String, T} = Dict{String, Any}()
|
||||||
) where {T<:Any}
|
) where {T<:Any}
|
||||||
return msgPayload_v1(
|
return msg_payload_v1(
|
||||||
id,
|
id,
|
||||||
dataname,
|
dataname,
|
||||||
type,
|
payload_type,
|
||||||
transport,
|
transport,
|
||||||
encoding,
|
encoding,
|
||||||
size,
|
size,
|
||||||
@@ -133,101 +132,101 @@ function msgPayload_v1(
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
""" msgEnvelope_v1 - Internal message envelope structure
|
""" msg_envelope_v1 - Internal message envelope structure
|
||||||
This structure represents a complete NATS message envelope containing multiple payloads
|
This structure represents a complete NATS message envelope containing multiple payloads
|
||||||
with metadata for routing, tracing, and message context.
|
with metadata for routing, tracing, and message context.
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `sendTo::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
|
- `send_to::String` - NATS subject/topic to publish the message to (e.g., "/agent/wine/api/v1/prompt")
|
||||||
- `payloads::AbstractArray{msgPayload_v1}` - List of payloads to include in the message
|
- `payloads::AbstractArray{msg_payload_v1}` - List of payloads to include in the message
|
||||||
|
|
||||||
# Keyword Arguments:
|
# Keyword Arguments:
|
||||||
- `correlationId::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
|
- `correlation_id::String = ""` - Unique identifier to track messages across systems; auto-generated if empty
|
||||||
- `msgId::String = ""` - Unique message identifier; auto-generated if empty
|
- `msg_id::String = ""` - Unique message identifier; auto-generated if empty
|
||||||
- `timestamp::String = string(Dates.now())` - Message publication timestamp
|
- `timestamp::String = string(Dates.now())` - Message publication timestamp
|
||||||
- `msgPurpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
- `msg_purpose::String = ""` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||||
- `senderName::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
|
- `sender_name::String = ""` - Name of the sender (e.g., "agent-wine-web-frontend")
|
||||||
- `senderId::String = ""` - UUID of the sender; auto-generated if empty
|
- `sender_id::String = ""` - UUID of the sender; auto-generated if empty
|
||||||
- `receiverName::String = ""` - Name of the receiver (empty string means broadcast)
|
- `receiver_name::String = ""` - Name of the receiver (empty string means broadcast)
|
||||||
- `receiverId::String = ""` - UUID of the receiver (empty string means broadcast)
|
- `receiver_id::String = ""` - UUID of the receiver (empty string means broadcast)
|
||||||
- `replyTo::String = ""` - Topic where receiver should reply (empty string if no reply expected)
|
- `reply_to::String = ""` - Topic where receiver should reply (empty string if no reply expected)
|
||||||
- `replyToMsgId::String = ""` - Message ID this message is replying to
|
- `reply_to_msg_id::String = ""` - Message ID this message is replying to
|
||||||
- `brokerURL::String = DEFAULT_NATS_URL` - NATS broker URL
|
- `broker_url::String = DEFAULT_BROKER_URL` - NATS broker URL
|
||||||
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
|
- `metadata::Dict{String, Any} = Dict{String, Any}()` - Optional message-level metadata
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- A msgEnvelope_v1 struct instance
|
- A msg_envelope_v1 struct instance
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
using UUIDs, NATSBridge
|
using UUIDs, NATSBridge
|
||||||
|
|
||||||
# Create payloads for the message
|
# Create payloads for the message
|
||||||
payload1 = msgPayload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
|
payload1 = msg_payload_v1("Hello", "text"; dataname="message", transport="direct", encoding="base64")
|
||||||
payload2 = msgPayload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
|
payload2 = msg_payload_v1("http://example.com/file.zip", "binary"; dataname="file", transport="link")
|
||||||
|
|
||||||
# Create message envelope
|
# Create message envelope
|
||||||
env = msgEnvelope_v1(
|
env = msg_envelope_v1(
|
||||||
"my.subject",
|
"my.subject",
|
||||||
[payload1, payload2];
|
[payload1, payload2];
|
||||||
correlationId = string(uuid4()),
|
correlation_id = string(uuid4()),
|
||||||
msgPurpose = "chat",
|
msg_purpose = "chat",
|
||||||
senderName = "my-app",
|
sender_name = "my-app",
|
||||||
receiverName = "receiver-app",
|
receiver_name = "receiver-app",
|
||||||
replyTo = "reply.subject"
|
reply_to = "reply.subject"
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
struct msgEnvelope_v1
|
struct msg_envelope_v1
|
||||||
correlationId::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
|
correlation_id::String # Unique identifier to track messages across systems. Many senders can talk about the same topic.
|
||||||
msgId::String # this message id
|
msg_id::String # this message id
|
||||||
timestamp::String # message published timestamp. string(Dates.now())
|
timestamp::String # message published timestamp. string(Dates.now())
|
||||||
|
|
||||||
sendTo::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
|
send_to::String # topic/subject the sender sends to e.g. "/agent/wine/api/v1/prompt"
|
||||||
msgPurpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
|
msg_purpose::String # purpose of this message e.g. "ACK | NACK | updateStatus | shutdown | ..."
|
||||||
senderName::String # sender name (String) e.g. "agent-wine-web-frontend"
|
sender_name::String # sender name (String) e.g. "agent-wine-web-frontend"
|
||||||
senderId::String # sender id e.g. uuid4snakecase()
|
sender_id::String # sender id e.g. uuid4snakecase()
|
||||||
receiverName::String # msg receiver name (String) e.g. "agent-backend"
|
receiver_name::String # msg receiver name (String) e.g. "agent-backend"
|
||||||
receiverId::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
|
receiver_id::String # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
|
||||||
|
|
||||||
replyTo::String # sender ask receiver to reply to this topic
|
reply_to::String # sender ask receiver to reply to this topic
|
||||||
replyToMsgId::String # the message id this message is replying to
|
reply_to_msg_id::String # the message id this message is replying to
|
||||||
brokerURL::String # mqtt/NATS server address
|
broker_url::String # mqtt/NATS server address
|
||||||
|
|
||||||
metadata::Dict{String, Any}
|
metadata::Dict{String, Any}
|
||||||
payloads::AbstractArray{msgPayload_v1} # multiple payload store here
|
payloads::AbstractArray{msg_payload_v1} # multiple payload store here
|
||||||
end
|
end
|
||||||
|
|
||||||
# constructor
|
# constructor
|
||||||
function msgEnvelope_v1(
|
function msg_envelope_v1(
|
||||||
sendTo::String,
|
send_to::String,
|
||||||
payloads::AbstractArray{msgPayload_v1};
|
payloads::AbstractArray{msg_payload_v1};
|
||||||
correlationId::String = "",
|
correlation_id::String = "",
|
||||||
msgId::String = "",
|
msg_id::String = "",
|
||||||
timestamp::String = string(Dates.now()),
|
timestamp::String = string(Dates.now()),
|
||||||
msgPurpose::String = "",
|
msg_purpose::String = "",
|
||||||
senderName::String = "",
|
sender_name::String = "",
|
||||||
senderId::String = "",
|
sender_id::String = "",
|
||||||
receiverName::String = "",
|
receiver_name::String = "",
|
||||||
receiverId::String = "",
|
receiver_id::String = "",
|
||||||
replyTo::String = "",
|
reply_to::String = "",
|
||||||
replyToMsgId::String = "",
|
reply_to_msg_id::String = "",
|
||||||
brokerURL::String = DEFAULT_NATS_URL,
|
broker_url::String = DEFAULT_BROKER_URL,
|
||||||
metadata::Dict{String, Any} = Dict{String, Any}()
|
metadata::Dict{String, Any} = Dict{String, Any}()
|
||||||
)
|
)
|
||||||
return msgEnvelope_v1(
|
return msg_envelope_v1(
|
||||||
correlationId,
|
correlation_id,
|
||||||
msgId,
|
msg_id,
|
||||||
timestamp,
|
timestamp,
|
||||||
sendTo,
|
send_to,
|
||||||
msgPurpose,
|
msg_purpose,
|
||||||
senderName,
|
sender_name,
|
||||||
senderId,
|
sender_id,
|
||||||
receiverName,
|
receiver_name,
|
||||||
receiverId,
|
receiver_id,
|
||||||
replyTo,
|
reply_to,
|
||||||
replyToMsgId,
|
reply_to_msg_id,
|
||||||
brokerURL,
|
broker_url,
|
||||||
metadata,
|
metadata,
|
||||||
payloads
|
payloads
|
||||||
)
|
)
|
||||||
@@ -235,19 +234,19 @@ end
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
""" envelope_to_json - Convert msgEnvelope_v1 to JSON string
|
""" envelope_to_json - Convert msg_envelope_v1 to JSON string
|
||||||
This function converts the msgEnvelope_v1 struct to a JSON string representation,
|
This function converts the msg_envelope_v1 struct to a JSON string representation,
|
||||||
preserving all metadata and payload information for NATS message publishing.
|
preserving all metadata and payload information for NATS message publishing.
|
||||||
|
|
||||||
# Function Workflow:
|
# Function Workflow:
|
||||||
1. Creates a dictionary with envelope metadata (correlationId, msgId, timestamp, etc.)
|
1. Creates a dictionary with envelope metadata (correlation_id, msg_id, timestamp, etc.)
|
||||||
2. Conditionally includes metadata dictionary if not empty
|
2. Conditionally includes metadata dictionary if not empty
|
||||||
3. Iterates through payloads and converts each to JSON-compatible dictionary
|
3. Iterates through payloads and converts each to JSON-compatible dictionary
|
||||||
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
|
4. Handles direct transport payloads (Base64 encoding) and link transport payloads (URL)
|
||||||
5. Returns final JSON string representation
|
5. Returns final JSON string representation
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `env::msgEnvelope_v1` - The msgEnvelope_v1 struct to convert to JSON
|
- `env::msg_envelope_v1` - The msg_envelope_v1 struct to convert to JSON
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- `String` - JSON string representation of the envelope
|
- `String` - JSON string representation of the envelope
|
||||||
@@ -257,27 +256,27 @@ preserving all metadata and payload information for NATS message publishing.
|
|||||||
using UUIDs
|
using UUIDs
|
||||||
|
|
||||||
# Create an envelope with payloads
|
# Create an envelope with payloads
|
||||||
payload = msgPayload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
|
payload = msg_payload_v1("Hello", "text"; dataname="msg", transport="direct", encoding="base64")
|
||||||
env = msgEnvelope_v1("my.subject", [payload])
|
env = msg_envelope_v1("my.subject", [payload])
|
||||||
|
|
||||||
# Convert to JSON for publishing
|
# Convert to JSON for publishing
|
||||||
json_msg = envelope_to_json(env)
|
json_msg = envelope_to_json(env)
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function envelope_to_json(env::msgEnvelope_v1)
|
function envelope_to_json(env::msg_envelope_v1)
|
||||||
obj = Dict{String, Any}(
|
obj = Dict{String, Any}(
|
||||||
"correlationId" => env.correlationId,
|
"correlation_id" => env.correlation_id,
|
||||||
"msgId" => env.msgId,
|
"msg_id" => env.msg_id,
|
||||||
"timestamp" => env.timestamp,
|
"timestamp" => env.timestamp,
|
||||||
"sendTo" => env.sendTo,
|
"send_to" => env.send_to,
|
||||||
"msgPurpose" => env.msgPurpose,
|
"msg_purpose" => env.msg_purpose,
|
||||||
"senderName" => env.senderName,
|
"sender_name" => env.sender_name,
|
||||||
"senderId" => env.senderId,
|
"sender_id" => env.sender_id,
|
||||||
"receiverName" => env.receiverName,
|
"receiver_name" => env.receiver_name,
|
||||||
"receiverId" => env.receiverId,
|
"receiver_id" => env.receiver_id,
|
||||||
"replyTo" => env.replyTo,
|
"reply_to" => env.reply_to,
|
||||||
"replyToMsgId" => env.replyToMsgId,
|
"reply_to_msg_id" => env.reply_to_msg_id,
|
||||||
"brokerURL" => env.brokerURL
|
"broker_url" => env.broker_url
|
||||||
)
|
)
|
||||||
|
|
||||||
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
|
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
|
||||||
@@ -291,7 +290,7 @@ function envelope_to_json(env::msgEnvelope_v1)
|
|||||||
payload_obj = Dict{String, Any}(
|
payload_obj = Dict{String, Any}(
|
||||||
"id" => payload.id,
|
"id" => payload.id,
|
||||||
"dataname" => payload.dataname,
|
"dataname" => payload.dataname,
|
||||||
"type" => payload.type,
|
"payload_type" => payload.type,
|
||||||
"transport" => payload.transport,
|
"transport" => payload.transport,
|
||||||
"encoding" => payload.encoding,
|
"encoding" => payload.encoding,
|
||||||
"size" => payload.size,
|
"size" => payload.size,
|
||||||
@@ -359,8 +358,8 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
|||||||
1. Iterates through the list of (dataname, data, type) tuples
|
1. Iterates through the list of (dataname, data, type) tuples
|
||||||
2. For each payload: extracts the type from the tuple and serializes accordingly
|
2. For each payload: extracts the type from the tuple and serializes accordingly
|
||||||
3. Compares the serialized size against `size_threshold`
|
3. Compares the serialized size against `size_threshold`
|
||||||
4. For small payloads: encodes as Base64, constructs a "direct" msgPayload_v1
|
4. For small payloads: encodes as Base64, constructs a "direct" msg_payload_v1
|
||||||
5. For large payloads: uploads to the fileserver, constructs a "link" msgPayload_v1 with the URL
|
5. For large payloads: uploads to the fileserver, constructs a "link" msg_payload_v1 with the URL
|
||||||
6. Converts envelope to JSON string and optionally publishes to NATS
|
6. Converts envelope to JSON string and optionally publishes to NATS
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
@@ -368,13 +367,13 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
|||||||
- `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send
|
- `data::AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples to send
|
||||||
- `dataname::String` - Name of the payload
|
- `dataname::String` - Name of the payload
|
||||||
- `data::Any` - The actual data to send
|
- `data::Any` - The actual data to send
|
||||||
- `type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
- `payload_type::String` - Payload type: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||||
- No standalone `type` parameter - type is specified per payload
|
- No standalone `type` parameter - type is specified per payload
|
||||||
|
|
||||||
# Keyword Arguments:
|
# Keyword Arguments:
|
||||||
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
|
- `broker_url::String = DEFAULT_BROKER_URL` - URL of the NATS server
|
||||||
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
- `fileserver_url = DEFAULT_FILESERVER_URL` - URL of the HTTP file server for large payloads
|
||||||
- `fileserverUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
- `fileserver_upload_handler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must return Dict with "status", "uploadid", "fileid", "url" keys)
|
||||||
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
||||||
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
||||||
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
- `msg_purpose::String = "chat"` - Purpose of the message: "ACK", "NACK", "updateStatus", "shutdown", "chat", etc.
|
||||||
@@ -387,7 +386,7 @@ Each payload can have a different type, enabling mixed-content messages (e.g., c
|
|||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- A tuple `(env, env_json_str)` where:
|
- A tuple `(env, env_json_str)` where:
|
||||||
- `env::msgEnvelope_v1` - The envelope object containing all metadata and payloads
|
- `env::msg_envelope_v1` - The envelope object containing all metadata and payloads
|
||||||
- `env_json_str::String` - JSON string representation of the envelope for publishing
|
- `env_json_str::String` - JSON string representation of the envelope for publishing
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
@@ -415,15 +414,15 @@ env, msg_json = smartsend("chat.subject", [
|
|||||||
])
|
])
|
||||||
|
|
||||||
# Publish the JSON string directly using NATS request-reply pattern
|
# Publish the JSON string directly using NATS request-reply pattern
|
||||||
# reply = NATS.request(nats_url, subject, env_json_str; reply_to=reply_to_topic)
|
# reply = NATS.request(broker_url, subject, env_json_str; reply_to=reply_to_topic)
|
||||||
```
|
```
|
||||||
""" #[PENDING]
|
"""
|
||||||
function smartsend(
|
function smartsend(
|
||||||
subject::String, # smartreceive's subject
|
subject::String, # smartreceive's subject
|
||||||
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
|
data::AbstractArray{Tuple{String, T1, String}, 1}; # List of (dataname, data, type) tuples. Use Tuple{String, Any, String}[] for empty payloads
|
||||||
nats_url::String = DEFAULT_NATS_URL,
|
broker_url::String = DEFAULT_BROKER_URL, # NATS server URL
|
||||||
fileserver_url = DEFAULT_FILESERVER_URL,
|
fileserver_url = DEFAULT_FILESERVER_URL,
|
||||||
fileserverUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
fileserver_upload_handler::Function = plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
||||||
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
||||||
correlation_id::Union{String, Nothing} = nothing,
|
correlation_id::Union{String, Nothing} = nothing,
|
||||||
msg_purpose::String = "chat",
|
msg_purpose::String = "chat",
|
||||||
@@ -444,13 +443,13 @@ function smartsend(
|
|||||||
msg_id = string(uuid4())
|
msg_id = string(uuid4())
|
||||||
|
|
||||||
# Process each payload in the list
|
# Process each payload in the list
|
||||||
payloads = msgPayload_v1[]
|
payloads = msg_payload_v1[]
|
||||||
for (dataname, payload_data, payload_type) in data
|
for (dataname, payload_data, payload_type) in data
|
||||||
# Serialize data based on type
|
# Serialize data based on type
|
||||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
payload_bytes = _serialize_data(payload_data, payload_type)
|
||||||
|
|
||||||
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
||||||
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size
|
log_trace(cid, "Serialized payload '$dataname' (payload_type: $payload_type) size: $payload_size bytes") # Log payload size
|
||||||
|
|
||||||
# Decision: Direct vs Link
|
# Decision: Direct vs Link
|
||||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||||
@@ -458,8 +457,8 @@ function smartsend(
|
|||||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||||
|
|
||||||
# Create msgPayload_v1 for direct transport
|
# Create msg_payload_v1 for direct transport
|
||||||
payload = msgPayload_v1(
|
payload = msg_payload_v1(
|
||||||
payload_b64,
|
payload_b64,
|
||||||
payload_type;
|
payload_type;
|
||||||
id = string(uuid4()),
|
id = string(uuid4()),
|
||||||
@@ -474,18 +473,18 @@ function smartsend(
|
|||||||
# Link path - Upload to HTTP server, send URL via NATS
|
# Link path - Upload to HTTP server, send URL via NATS
|
||||||
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
||||||
|
|
||||||
# Upload to HTTP server
|
# Upload to HTTP server
|
||||||
response = fileserverUploadHandler(fileserver_url, dataname, payload_bytes)
|
response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
|
||||||
|
|
||||||
if response["status"] != 200 # Check if upload was successful
|
if response["status"] != 200 # Check if upload was successful
|
||||||
error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed
|
error("Failed to upload data to fileserver: $(response["status"])") # Throw error if upload failed
|
||||||
end
|
end
|
||||||
|
|
||||||
url = response["url"] # URL for the uploaded data
|
url = response["url"] # URL for the uploaded data
|
||||||
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
||||||
|
|
||||||
# Create msgPayload_v1 for link transport
|
# Create msg_payload_v1 for link transport
|
||||||
payload = msgPayload_v1(
|
payload = msg_payload_v1(
|
||||||
url,
|
url,
|
||||||
payload_type;
|
payload_type;
|
||||||
id = string(uuid4()),
|
id = string(uuid4()),
|
||||||
@@ -499,26 +498,26 @@ function smartsend(
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Create msgEnvelope_v1 with all payloads
|
# Create msg_envelope_v1 with all payloads
|
||||||
env = msgEnvelope_v1(
|
env = msg_envelope_v1(
|
||||||
subject,
|
subject,
|
||||||
payloads;
|
payloads;
|
||||||
correlationId = cid,
|
correlation_id = cid,
|
||||||
msgId = msg_id,
|
msg_id = msg_id,
|
||||||
msgPurpose = msg_purpose,
|
msg_purpose = msg_purpose,
|
||||||
senderName = sender_name,
|
sender_name = sender_name,
|
||||||
senderId = string(uuid4()),
|
sender_id = string(uuid4()),
|
||||||
receiverName = receiver_name,
|
receiver_name = receiver_name,
|
||||||
receiverId = receiver_id,
|
receiver_id = receiver_id,
|
||||||
replyTo = reply_to,
|
reply_to = reply_to,
|
||||||
replyToMsgId = reply_to_msg_id,
|
reply_to_msg_id = reply_to_msg_id,
|
||||||
brokerURL = nats_url,
|
broker_url = broker_url,
|
||||||
metadata = Dict{String, Any}(),
|
metadata = Dict{String, Any}(),
|
||||||
)
|
)
|
||||||
|
|
||||||
env_json_str = envelope_to_json(env) # Convert envelope to JSON
|
env_json_str = envelope_to_json(env) # Convert envelope to JSON
|
||||||
if is_publish
|
if is_publish
|
||||||
publish_message(nats_url, subject, env_json_str, cid) # Publish message to NATS
|
publish_message(broker_url, subject, env_json_str, cid) # Publish message to NATS
|
||||||
end
|
end
|
||||||
|
|
||||||
return (env, env_json_str)
|
return (env, env_json_str)
|
||||||
@@ -539,7 +538,7 @@ It supports multiple serialization formats for different data types.
|
|||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
|
- `data::Any` - Data to serialize (string for `"text"`, JSON-serializable for `"dictionary"`, table-like for `"table"`, binary for `"image"`, `"audio"`, `"video"`, `"binary"`)
|
||||||
- `type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
- `payload_type::String` - Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- `Vector{UInt8}` - Binary representation of the serialized data
|
- `Vector{UInt8}` - Binary representation of the serialized data
|
||||||
@@ -585,7 +584,7 @@ binary_bytes = _serialize_data(buf, "binary")
|
|||||||
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
|
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function _serialize_data(data::Any, type::String)
|
function _serialize_data(data::Any, payload_type::String)
|
||||||
""" Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object
|
""" Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object
|
||||||
d = Dict(
|
d = Dict(
|
||||||
"name"=>"ton",
|
"name"=>"ton",
|
||||||
@@ -602,40 +601,40 @@ function _serialize_data(data::Any, type::String)
|
|||||||
json_obj = JSON.parse(json_str_2)
|
json_obj = JSON.parse(json_str_2)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if type == "text" # Text data - convert to UTF-8 bytes
|
if payload_type == "text" # Text data - convert to UTF-8 bytes
|
||||||
if isa(data, String)
|
if isa(data, String)
|
||||||
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
|
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
|
||||||
return data_bytes
|
return data_bytes
|
||||||
else
|
else
|
||||||
error("Text data must be a String")
|
error("Text data must be a String")
|
||||||
end
|
end
|
||||||
elseif type == "dictionary" # JSON data - serialize directly
|
elseif payload_type == "dictionary" # JSON data - serialize directly
|
||||||
json_str = JSON.json(data) # Convert Julia data to JSON string
|
json_str = JSON.json(data) # Convert Julia data to JSON string
|
||||||
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
|
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
|
||||||
return json_str_bytes
|
return json_str_bytes
|
||||||
elseif type == "table" # Table data - convert to Arrow IPC stream
|
elseif payload_type == "table" # Table data - convert to Arrow IPC stream
|
||||||
io = IOBuffer() # Create in-memory buffer
|
io = IOBuffer() # Create in-memory buffer
|
||||||
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
||||||
return take!(io) # Return the buffer contents as bytes
|
return take!(io) # Return the buffer contents as bytes
|
||||||
elseif type == "image" # Image data - treat as binary
|
elseif payload_type == "image" # Image data - treat as binary
|
||||||
if isa(data, Vector{UInt8})
|
if isa(data, Vector{UInt8})
|
||||||
return data # Return binary data directly
|
return data # Return binary data directly
|
||||||
else
|
else
|
||||||
error("Image data must be Vector{UInt8}")
|
error("Image data must be Vector{UInt8}")
|
||||||
end
|
end
|
||||||
elseif type == "audio" # Audio data - treat as binary
|
elseif payload_type == "audio" # Audio data - treat as binary
|
||||||
if isa(data, Vector{UInt8})
|
if isa(data, Vector{UInt8})
|
||||||
return data # Return binary data directly
|
return data # Return binary data directly
|
||||||
else
|
else
|
||||||
error("Audio data must be Vector{UInt8}")
|
error("Audio data must be Vector{UInt8}")
|
||||||
end
|
end
|
||||||
elseif type == "video" # Video data - treat as binary
|
elseif payload_type == "video" # Video data - treat as binary
|
||||||
if isa(data, Vector{UInt8})
|
if isa(data, Vector{UInt8})
|
||||||
return data # Return binary data directly
|
return data # Return binary data directly
|
||||||
else
|
else
|
||||||
error("Video data must be Vector{UInt8}")
|
error("Video data must be Vector{UInt8}")
|
||||||
end
|
end
|
||||||
elseif type == "binary" # Binary data - treat as binary
|
elseif payload_type == "binary" # Binary data - treat as binary
|
||||||
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
||||||
return take!(data) # Return buffer contents as bytes
|
return take!(data) # Return buffer contents as bytes
|
||||||
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
||||||
@@ -644,7 +643,7 @@ function _serialize_data(data::Any, type::String)
|
|||||||
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
||||||
end
|
end
|
||||||
else # Unknown type
|
else # Unknown type
|
||||||
error("Unknown type: $type")
|
error("Unknown payload_type: $type")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -654,7 +653,7 @@ This internal function publishes a message to a NATS subject with proper
|
|||||||
connection management and logging.
|
connection management and logging.
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `nats_url::String` - NATS server URL (e.g., "nats://localhost:4222")
|
- `broker_url::String` - NATS server URL (e.g., "nats://localhost:4222")
|
||||||
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
|
- `subject::String` - NATS subject to publish to (e.g., "/agent/wine/api/v1/prompt")
|
||||||
- `message::String` - JSON message to publish
|
- `message::String` - JSON message to publish
|
||||||
- `correlation_id::String` - Correlation ID for tracing and logging
|
- `correlation_id::String` - Correlation ID for tracing and logging
|
||||||
@@ -663,18 +662,18 @@ connection management and logging.
|
|||||||
- `nothing` - This function performs publishing but returns nothing
|
- `nothing` - This function performs publishing but returns nothing
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
using NATS
|
using NATS
|
||||||
|
|
||||||
# Prepare JSON message
|
# Prepare JSON message
|
||||||
message = "{\"correlationId\":\"abc123\",\"payload\":\"test\"}"
|
message = "{\"correlation_id\":\"abc123\",\"payload\":\"test\"}"
|
||||||
|
|
||||||
# Publish to NATS
|
# Publish to NATS
|
||||||
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
|
publish_message("nats://localhost:4222", "my.subject", message, "abc123")
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
|
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
|
||||||
conn = NATS.connect(nats_url) # Create NATS connection
|
conn = NATS.connect(broker_url) # Create NATS connection
|
||||||
try
|
try
|
||||||
NATS.publish(conn, subject, message) # Publish message to NATS
|
NATS.publish(conn, subject, message) # Publish message to NATS
|
||||||
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
||||||
@@ -701,7 +700,7 @@ A HTTP file server is required along with its download function.
|
|||||||
- `msg::NATS.Msg` - NATS message to process
|
- `msg::NATS.Msg` - NATS message to process
|
||||||
|
|
||||||
# Keyword Arguments:
|
# Keyword Arguments:
|
||||||
- `fileserverDownloadHandler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
|
- `fileserver_download_handler::Function = _fetch_with_backoff` - Function to handle downloading data from file server URLs
|
||||||
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
|
- `max_retries::Int = 5` - Maximum retry attempts for fetching URL
|
||||||
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
|
- `base_delay::Int = 100` - Initial delay for exponential backoff in ms
|
||||||
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
|
- `max_delay::Int = 5000` - Maximum delay for exponential backoff in ms
|
||||||
@@ -710,23 +709,23 @@ A HTTP file server is required along with its download function.
|
|||||||
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
|
- `AbstractArray{Tuple{String, Any, String}}` - List of (dataname, data, type) tuples
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
# Receive and process message
|
# Receive and process message
|
||||||
msg = nats_message # NATS message
|
msg = nats_message # NATS message
|
||||||
payloads = smartreceive(msg; fileserverDownloadHandler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
|
payloads = smartreceive(msg; fileserver_download_handler=_fetch_with_backoff, max_retries=5, base_delay=100, max_delay=5000)
|
||||||
# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
|
# payloads = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function smartreceive(
|
function smartreceive(
|
||||||
msg::NATS.Msg;
|
msg::NATS.Msg;
|
||||||
fileserverDownloadHandler::Function=_fetch_with_backoff,
|
fileserver_download_handler::Function=_fetch_with_backoff,
|
||||||
max_retries::Int = 5,
|
max_retries::Int = 5,
|
||||||
base_delay::Int = 100,
|
base_delay::Int = 100,
|
||||||
max_delay::Int = 5000
|
max_delay::Int = 5000
|
||||||
)
|
)
|
||||||
# Parse the JSON envelope
|
# Parse the JSON envelope
|
||||||
json_data = JSON.parse(String(msg.payload))
|
json_data = JSON.parse(String(msg.payload))
|
||||||
log_trace(json_data["correlationId"], "Processing received message") # Log message processing start
|
log_trace(json_data["correlation_id"], "Processing received message") # Log message processing start
|
||||||
|
|
||||||
# Process all payloads in the envelope
|
# Process all payloads in the envelope
|
||||||
payloads_list = Tuple{String, Any, String}[]
|
payloads_list = Tuple{String, Any, String}[]
|
||||||
@@ -740,7 +739,7 @@ function smartreceive(
|
|||||||
dataname = String(payload["dataname"])
|
dataname = String(payload["dataname"])
|
||||||
|
|
||||||
if transport == "direct" # Direct transport - payload is in the message
|
if transport == "direct" # Direct transport - payload is in the message
|
||||||
log_trace(json_data["correlationId"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
log_trace(json_data["correlation_id"], "Direct transport - decoding payload '$dataname'") # Log direct transport handling
|
||||||
|
|
||||||
# Extract base64 payload from the payload
|
# Extract base64 payload from the payload
|
||||||
payload_b64 = String(payload["data"])
|
payload_b64 = String(payload["data"])
|
||||||
@@ -749,21 +748,21 @@ function smartreceive(
|
|||||||
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
|
payload_bytes = Base64.base64decode(payload_b64) # Decode base64 payload to bytes
|
||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data_type = String(payload["type"])
|
data_type = String(payload["payload_type"])
|
||||||
data = _deserialize_data(payload_bytes, data_type, json_data["correlationId"])
|
data = _deserialize_data(payload_bytes, data_type, json_data["correlation_id"])
|
||||||
|
|
||||||
push!(payloads_list, (dataname, data, data_type))
|
push!(payloads_list, (dataname, data, data_type))
|
||||||
elseif transport == "link" # Link transport - payload is at URL
|
elseif transport == "link" # Link transport - payload is at URL
|
||||||
# Extract download URL from the payload
|
# Extract download URL from the payload
|
||||||
url = String(payload["data"])
|
url = String(payload["data"])
|
||||||
log_trace(json_data["correlationId"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
log_trace(json_data["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Log link transport handling
|
||||||
|
|
||||||
# Fetch with exponential backoff using the download handler
|
# Fetch with exponential backoff using the download handler
|
||||||
downloaded_data = fileserverDownloadHandler(url, max_retries, base_delay, max_delay, json_data["correlationId"])
|
downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, json_data["correlation_id"])
|
||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data_type = String(payload["type"])
|
data_type = String(payload["payload_type"])
|
||||||
data = _deserialize_data(downloaded_data, data_type, json_data["correlationId"])
|
data = _deserialize_data(downloaded_data, data_type, json_data["correlation_id"])
|
||||||
|
|
||||||
push!(payloads_list, (dataname, data, data_type))
|
push!(payloads_list, (dataname, data, data_type))
|
||||||
else # Unknown transport type
|
else # Unknown transport type
|
||||||
@@ -851,7 +850,7 @@ It handles "text" (string), "dictionary" (JSON deserialization), "table" (Arrow
|
|||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `data::Vector{UInt8}` - Serialized data as bytes
|
- `data::Vector{UInt8}` - Serialized data as bytes
|
||||||
- `type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
- `payload_type::String` - Data type ("text", "dictionary", "table", "image", "audio", "video", "binary")
|
||||||
- `correlation_id::String` - Correlation ID for logging
|
- `correlation_id::String` - Correlation ID for logging
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
@@ -877,28 +876,28 @@ table_data = _deserialize_data(arrow_bytes, "table", "correlation123")
|
|||||||
"""
|
"""
|
||||||
function _deserialize_data(
|
function _deserialize_data(
|
||||||
data::Vector{UInt8},
|
data::Vector{UInt8},
|
||||||
type::String,
|
payload_type::String,
|
||||||
correlation_id::String
|
correlation_id::String
|
||||||
)
|
)
|
||||||
if type == "text" # Text data - convert to string
|
if payload_type == "text" # Text data - convert to string
|
||||||
return String(data) # Convert bytes to string
|
return String(data) # Convert bytes to string
|
||||||
elseif type == "dictionary" # JSON data - deserialize
|
elseif payload_type == "dictionary" # JSON data - deserialize
|
||||||
json_str = String(data) # Convert bytes to string
|
json_str = String(data) # Convert bytes to string
|
||||||
return JSON.parse(json_str) # Parse JSON string to JSON object
|
return JSON.parse(json_str) # Parse JSON string to JSON object
|
||||||
elseif type == "table" # Table data - deserialize Arrow IPC stream
|
elseif payload_type == "table" # Table data - deserialize Arrow IPC stream
|
||||||
io = IOBuffer(data) # Create buffer from bytes
|
io = IOBuffer(data) # Create buffer from bytes
|
||||||
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
||||||
return df # Return DataFrame
|
return df # Return DataFrame
|
||||||
elseif type == "image" # Image data - return binary
|
elseif payload_type == "image" # Image data - return binary
|
||||||
return data # Return bytes directly
|
return data # Return bytes directly
|
||||||
elseif type == "audio" # Audio data - return binary
|
elseif payload_type == "audio" # Audio data - return binary
|
||||||
return data # Return bytes directly
|
return data # Return bytes directly
|
||||||
elseif type == "video" # Video data - return binary
|
elseif payload_type == "video" # Video data - return binary
|
||||||
return data # Return bytes directly
|
return data # Return bytes directly
|
||||||
elseif type == "binary" # Binary data - return binary
|
elseif payload_type == "binary" # Binary data - return binary
|
||||||
return data # Return bytes directly
|
return data # Return bytes directly
|
||||||
else # Unknown type
|
else # Unknown type
|
||||||
error("Unknown type: $type") # Throw error for unknown type
|
error("Unknown payload_type: $payload_type") # Throw error for unknown type
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -915,9 +914,9 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
4. Returns identifiers and download URL for the uploaded file
|
4. Returns identifiers and download URL for the uploaded file
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
||||||
- `filename::String` - Name of the file being uploaded
|
- `filename::String` - Name of the file being uploaded
|
||||||
- `data::Vector{UInt8}` - Raw byte data of the file content
|
- `data::Vector{UInt8}` - Raw byte data of the file content
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
- `Dict{String, Any}` - Dictionary with keys:
|
- `Dict{String, Any}` - Dictionary with keys:
|
||||||
@@ -927,36 +926,36 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
- `"url"` - Full URL to download the uploaded file
|
- `"url"` - Full URL to download the uploaded file
|
||||||
|
|
||||||
# Example
|
# Example
|
||||||
```jldoctest
|
```jldoctest
|
||||||
using HTTP, JSON
|
using HTTP, JSON
|
||||||
|
|
||||||
fileServerURL = "http://localhost:8080"
|
file_server_url = "http://localhost:8080"
|
||||||
filename = "test.txt"
|
filename = "test.txt"
|
||||||
data = UInt8["hello world"]
|
data = UInt8["hello world"]
|
||||||
|
|
||||||
# Upload to local plik server
|
# Upload to local plik server
|
||||||
result = plik_oneshot_upload(fileServerURL, filename, data)
|
result = plik_oneshot_upload(file_server_url, filename, data)
|
||||||
|
|
||||||
# Access the result as a Dict
|
# Access the result as a Dict
|
||||||
# result["status"], result["uploadid"], result["fileid"], result["url"]
|
# result["status"], result["uploadid"], result["fileid"], result["url"]
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
|
function plik_oneshot_upload(file_server_url::String, filename::String, data::Vector{UInt8})
|
||||||
|
|
||||||
# ----------------------------------------- get upload id ---------------------------------------- #
|
# ----------------------------------------- get upload id ---------------------------------------- #
|
||||||
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
||||||
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
|
url_getUploadID = "$file_server_url/upload" # URL to get upload ID
|
||||||
headers = ["Content-Type" => "application/json"]
|
headers = ["Content-Type" => "application/json"]
|
||||||
body = """{ "OneShot" : true }"""
|
body = """{ "OneShot" : true }"""
|
||||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||||
responseJson = JSON.parse(httpResponse.body)
|
response_json = JSON.parse(http_response.body)
|
||||||
uploadid = responseJson["id"]
|
uploadid = response_json["id"]
|
||||||
uploadtoken = responseJson["uploadToken"]
|
uploadtoken = response_json["uploadToken"]
|
||||||
|
|
||||||
# ------------------------------------------ upload file ----------------------------------------- #
|
# ------------------------------------------ upload file ----------------------------------------- #
|
||||||
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
||||||
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
|
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
|
||||||
url_upload = "$fileServerURL/file/$uploadid"
|
url_upload = "$file_server_url/file/$uploadid"
|
||||||
headers = ["X-UploadToken" => uploadtoken]
|
headers = ["X-UploadToken" => uploadtoken]
|
||||||
|
|
||||||
# Create the multipart form data
|
# Create the multipart form data
|
||||||
@@ -965,24 +964,24 @@ function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vect
|
|||||||
))
|
))
|
||||||
|
|
||||||
# Execute the POST request
|
# Execute the POST request
|
||||||
httpResponse = nothing
|
http_response = nothing
|
||||||
try
|
try
|
||||||
httpResponse = HTTP.post(url_upload, headers, form)
|
http_response = HTTP.post(url_upload, headers, form)
|
||||||
responseJson = JSON.parse(httpResponse.body)
|
response_json = JSON.parse(http_response.body)
|
||||||
catch e
|
catch e
|
||||||
@error "Request failed" exception=e
|
@error "Request failed" exception=e
|
||||||
end
|
end
|
||||||
|
|
||||||
fileid = responseJson["id"]
|
fileid = response_json["id"]
|
||||||
|
|
||||||
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
||||||
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
|
url = "$file_server_url/file/$uploadid/$fileid/$filename"
|
||||||
|
|
||||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
|
""" plik_oneshot_upload(file_server_url::String, filepath::String)
|
||||||
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
|
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
|
||||||
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
||||||
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
||||||
@@ -994,7 +993,7 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
4. Returns identifiers and download URL for the uploaded file
|
4. Returns identifiers and download URL for the uploaded file
|
||||||
|
|
||||||
# Arguments:
|
# Arguments:
|
||||||
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
- `file_server_url::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
||||||
- `filepath::String` - Full path to the local file to upload
|
- `filepath::String` - Full path to the local file to upload
|
||||||
|
|
||||||
# Return:
|
# Return:
|
||||||
@@ -1008,56 +1007,54 @@ retrieves an upload ID and token, then uploads the file data as multipart form d
|
|||||||
```jldoctest
|
```jldoctest
|
||||||
using HTTP, JSON
|
using HTTP, JSON
|
||||||
|
|
||||||
fileServerURL = "http://localhost:8080"
|
file_server_url = "http://localhost:8080"
|
||||||
filepath = "./test.zip"
|
filepath = "./test.zip"
|
||||||
|
|
||||||
# Upload to local plik server
|
# Upload to local plik server
|
||||||
result = plik_oneshot_upload(fileServerURL, filepath)
|
result = plik_oneshot_upload(file_server_url, filepath)
|
||||||
|
|
||||||
# Access the result as a Dict
|
# Access the result as a Dict
|
||||||
# result["status"], result["uploadid"], result["fileid"], result["url"]
|
# result["status"], result["uploadid"], result["fileid"], result["url"]
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function plik_oneshot_upload(fileServerURL::String, filepath::String)
|
function plik_oneshot_upload(file_server_url::String, filepath::String)
|
||||||
|
|
||||||
# ----------------------------------------- get upload id ---------------------------------------- #
|
# ----------------------------------------- get upload id ---------------------------------------- #
|
||||||
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
||||||
filename = basename(filepath)
|
filename = basename(filepath)
|
||||||
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
|
url_getUploadID = "$file_server_url/upload" # URL to get upload ID
|
||||||
headers = ["Content-Type" => "application/json"]
|
headers = ["Content-Type" => "application/json"]
|
||||||
body = """{ "OneShot" : true }"""
|
body = """{ "OneShot" : true }"""
|
||||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
||||||
responseJson = JSON.parse(httpResponse.body)
|
response_json = JSON.parse(http_response.body)
|
||||||
|
|
||||||
uploadid = responseJson["id"]
|
uploadid = response_json["id"]
|
||||||
uploadtoken = responseJson["uploadToken"]
|
uploadtoken = response_json["uploadToken"]
|
||||||
|
|
||||||
# ------------------------------------------ upload file ----------------------------------------- #
|
# ------------------------------------------ upload file ----------------------------------------- #
|
||||||
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
||||||
file_multipart = open(filepath, "r")
|
url_upload = "$file_server_url/file/$uploadid"
|
||||||
url_upload = "$fileServerURL/file/$uploadid"
|
|
||||||
headers = ["X-UploadToken" => uploadtoken]
|
headers = ["X-UploadToken" => uploadtoken]
|
||||||
|
response = open(filepath, "r") do file_stream
|
||||||
|
form = HTTP.Form(Dict("file" => file_stream))
|
||||||
|
|
||||||
# Create the multipart form data
|
# Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
|
||||||
form = HTTP.Form(Dict(
|
HTTP.post(url_upload, headers, form; status_exception = false)
|
||||||
"file" => file_multipart
|
end
|
||||||
))
|
|
||||||
|
|
||||||
# Execute the POST request
|
if !isnothing(response) && response.status == 200
|
||||||
httpResponse = nothing
|
println("Success!")
|
||||||
try
|
else
|
||||||
httpResponse = HTTP.post(url_upload, headers, form)
|
println("Server returned an error code: ", response.status)
|
||||||
responseJson = JSON.parse(httpResponse.body)
|
|
||||||
catch e
|
|
||||||
@error "Request failed" exception=e
|
|
||||||
end
|
end
|
||||||
|
response_json = JSON.parse(response.body)
|
||||||
|
|
||||||
fileid = responseJson["id"]
|
fileid = response_json["id"]
|
||||||
|
|
||||||
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
||||||
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
|
url = "$file_server_url/file/$uploadid/$fileid/$filename"
|
||||||
|
|
||||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user