668 lines
27 KiB
Julia
668 lines
27 KiB
Julia
# Bi-Directional Data Bridge - Julia Module
|
|
# Implements smartsend and smartreceive for NATS communication
|
|
# This module provides functionality for sending and receiving data across network boundaries
|
|
# using NATS as the message bus, with support for both direct payload transport and
|
|
# URL-based transport for larger payloads.
|
|
|
|
module NATSBridge
|
|
|
|
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
|
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
|
|
|
# Constants
|
|
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
|
|
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
|
|
const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP file server URL for link transport
|
|
|
|
|
|
""" Struct for the unified JSON envelope
|
|
This struct represents a standardized message format that can carry either
|
|
direct payload data or a URL reference, allowing flexible transport strategies
|
|
based on payload size and requirements.
|
|
"""
|
|
struct MessageEnvelope
|
|
correlation_id::String # Unique identifier to track messages across systems
|
|
type::String # Data type indicator (e.g., "json", "table", "binary")
|
|
transport::String # Transport strategy: "direct" (base64 encoded bytes) or "link" (URL reference)
|
|
payload::Union{String, Nothing} # Base64-encoded payload for direct transport
|
|
url::Union{String, Nothing} # URL reference for link transport
|
|
metadata::Dict{String, Any} # Additional metadata about the payload
|
|
end
|
|
|
|
""" Constructor for MessageEnvelope with keyword arguments and defaults
|
|
This constructor provides a convenient way to create an envelope using keyword arguments,
|
|
automatically generating a correlation ID if not provided, and defaulting to "json" type
|
|
and "direct" transport.
|
|
"""
|
|
function MessageEnvelope(
|
|
; correlation_id::String = string(uuid4()), # Generate unique ID if not provided
|
|
type::String = "json", # Default data type
|
|
transport::String = "direct", # Default transport method
|
|
payload::Union{String, Nothing} = nothing, # No payload by default
|
|
url::Union{String, Nothing} = nothing, # No URL by default
|
|
metadata::Dict{String, Any} = Dict{String, Any}() # Empty metadata by default
|
|
)
|
|
MessageEnvelope(correlation_id, type, transport, payload, url, metadata)
|
|
end
|
|
|
|
""" Constructor for MessageEnvelope from JSON string
|
|
This constructor parses a JSON string and reconstructs a MessageEnvelope struct.
|
|
It handles the metadata field specially by converting the JSON object to a Julia Dict,
|
|
extracting values from the JSON structure for all other fields.
|
|
"""
|
|
function MessageEnvelope(json_str::String)
|
|
data = JSON.parse(json_str) # Parse JSON string into Julia data structure
|
|
metadata = Dict{String, Any}()
|
|
if haskey(data, :metadata) # Check if metadata exists in JSON
|
|
metadata = Dict(String(k) => v for (k, v) in data.metadata) # Convert JSON keys to strings and store in Dict
|
|
end
|
|
|
|
MessageEnvelope(
|
|
correlation_id = String(data.correlation_id), # Extract correlation_id from JSON data
|
|
type = String(data.type), # Extract type from JSON data
|
|
transport = String(data.transport), # Extract transport from JSON data
|
|
payload = haskey(data, :payload) ? String(data.payload) : nothing, # Extract payload if present
|
|
url = haskey(data, :url) ? String(data.url) : nothing, # Extract URL if present
|
|
metadata = metadata # Use the parsed metadata
|
|
)
|
|
end
|
|
|
|
|
|
""" Convert MessageEnvelope to JSON string
|
|
This function converts the MessageEnvelope struct to a JSON string representation.
|
|
It only includes fields in the JSON output if they have non-nothing values,
|
|
making the JSON output cleaner and more efficient.
|
|
"""
|
|
function envelope_to_json(env::MessageEnvelope)
|
|
obj = Dict{String, Any}(
|
|
"correlation_id" => env.correlation_id, # Always include correlation_id
|
|
"type" => env.type, # Always include type
|
|
"transport" => env.transport # Always include transport
|
|
)
|
|
|
|
if env.payload !== nothing # Only include payload if it exists
|
|
obj["payload"] = env.payload
|
|
end
|
|
|
|
if env.url !== nothing # Only include URL if it exists
|
|
obj["url"] = env.url
|
|
end
|
|
|
|
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
|
|
obj["metadata"] = env.metadata
|
|
end
|
|
|
|
JSON.json(obj) # Convert Dict to JSON string
|
|
end
|
|
|
|
|
|
""" Log a trace message with correlation ID and timestamp
|
|
This function logs information messages with a correlation ID for tracing purposes,
|
|
making it easier to track message flow across distributed systems.
|
|
"""
|
|
function log_trace(correlation_id::String, message::String)
|
|
timestamp = Dates.now() # Get current timestamp
|
|
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
|
end
|
|
2112
|
|
|
|
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
|
|
|
This function intelligently routes data delivery based on payload size relative to a threshold.
|
|
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS.
|
|
Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS.
|
|
|
|
The function workflow:
|
|
1. Serializes the provided data according to the specified format (`type`)
|
|
2. Compares the serialized size against `size_threshold`
|
|
3. For small payloads: encodes as Base64, constructs a "direct" MessageEnvelope, and publishes to NATS
|
|
4. For large payloads: uploads to the fileserver, constructs a "link" MessageEnvelope with the URL, and publishes to NATS
|
|
|
|
# Arguments:
|
|
- `subject::String` - NATS subject to publish the message to
|
|
- `data::Any` - Data payload to send (any Julia object)
|
|
- `type::String = "json"` - Serialization format: `"json"` or `"arrow"`
|
|
|
|
# Keyword Arguments:
|
|
- `dataname::String = string(UUIDs.uuid4())` - Filename to use when uploading to fileserver (auto-generated UUID if not provided)
|
|
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
|
|
- `fileserver_url::String = DEFAULT_FILESERVER_URL` - Base URL of the fileserver (e.g., `"http://localhost:8080"`)
|
|
- `fileServerUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must match signature of `plik_oneshot_upload`)
|
|
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
|
|
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
|
|
|
|
# Return:
|
|
- A `MessageEnvelope` object containing metadata and transport information:
|
|
- `correlation_id::String` - Unique identifier for this message exchange
|
|
- `type::String` - Serialization type used (`"json"` or `"arrow"`)
|
|
- `transport::String` - Either `"direct"` or `"link"`
|
|
- `payload::Union{String, Nothing}` - Base64-encoded data for direct transport, `nothing` for link transport
|
|
- `url::Union{String, Nothing}` - Download URL for link transport, `nothing` for direct transport
|
|
- `metadata::Dict` - Additional metadata (e.g., `"content_length"`, `"format"`)
|
|
|
|
# Example
|
|
```julia
|
|
using UUIDs
|
|
|
|
# Send a small struct directly via NATS
|
|
data = Dict("key" => "value")
|
|
env = smartsend("my.subject", data, "json")
|
|
|
|
# Send a large array using fileserver upload
|
|
data = rand(10_000_000) # ~80 MB
|
|
env = smartsend("large.data", data, "arrow")
|
|
|
|
# In another process, retrieve and deserialize:
|
|
# msg = subscribe(nats_url, "my.subject")
|
|
# env = json_to_envelope(msg.data)
|
|
# data = _deserialize_data(Base64.decode(env.payload), env.type)
|
|
```
|
|
"""
|
|
function smartsend(
|
|
subject::String, # smartreceive's subject
|
|
data::Any,
|
|
type::String = "json";
|
|
dataname=string(UUIDs.uuid4()),
|
|
nats_url::String = DEFAULT_NATS_URL,
|
|
fileserver_url::String = DEFAULT_FILESERVER_URL,
|
|
fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
|
|
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
|
|
correlation_id::Union{String, Nothing} = nothing
|
|
)
|
|
# Generate correlation ID if not provided
|
|
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
|
|
|
|
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
|
|
|
|
# Serialize data based on type
|
|
payload_bytes = _serialize_data(data, type) # Convert data to bytes based on type
|
|
|
|
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
|
log_trace(cid, "Serialized payload size: $payload_size bytes") # Log payload size
|
|
|
|
# Decision: Direct vs Link
|
|
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
|
# Direct path - Base64 encode and send via NATS
|
|
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
|
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
|
|
|
env = MessageEnvelope( # Create envelope for direct transport
|
|
correlation_id = cid,
|
|
type = type,
|
|
transport = "direct",
|
|
payload = payload_b64,
|
|
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream")
|
|
)
|
|
|
|
msg_json = envelope_to_json(env) # Convert envelope to JSON
|
|
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
|
|
|
|
return env # Return the envelope for tracking
|
|
else
|
|
# Link path - Upload to HTTP server, send URL via NATS
|
|
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
|
|
|
|
# Upload to HTTP server
|
|
response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
|
|
|
|
if response[:status] != 200 # Check if upload was successful
|
|
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
|
|
end
|
|
|
|
url = response[:url] # URL for the uploaded data
|
|
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
|
|
|
|
env = MessageEnvelope( # Create envelope for link transport
|
|
correlation_id = cid,
|
|
type = type,
|
|
transport = "link",
|
|
url = url,
|
|
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream")
|
|
)
|
|
|
|
msg_json = envelope_to_json(env) # Convert envelope to JSON
|
|
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
|
|
|
|
return env # Return the envelope for tracking
|
|
end
|
|
end
|
|
|
|
|
|
""" _serialize_data - Serialize data according to specified format
|
|
|
|
This function serializes arbitrary Julia data into a binary representation based on the specified format.
|
|
It supports three serialization formats:
|
|
- `"json"`: Serializes data as JSON and returns the UTF-8 byte representation
|
|
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
|
|
- `"binary"`: Expects already-binary data (either `IOBuffer` or `Vector{UInt8}`) and returns it as bytes
|
|
|
|
The function handles format-specific serialization logic:
|
|
1. For `"json"`: Converts Julia data to JSON string, then encodes to bytes
|
|
2. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer
|
|
3. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly
|
|
|
|
# Arguments:
|
|
- `data::Any` - Data to serialize (JSON-serializable for `"json"`, table-like for `"table"`, binary for `"binary"`)
|
|
- `type::String` - Target format: `"json"`, `"table"`, or `"binary"`
|
|
|
|
# Return:
|
|
- `Vector{UInt8}` - Binary representation of the serialized data
|
|
|
|
# Throws:
|
|
- `Error` if `type` is not one of `"json"`, `"table"`, or `"binary"`
|
|
- `Error` if `type == "binary"` but `data` is neither `IOBuffer` nor `Vector{UInt8}`
|
|
|
|
# Example
|
|
```julia
|
|
using JSON, Arrow, DataFrames
|
|
|
|
# JSON serialization
|
|
json_data = Dict("name" => "Alice", "age" => 30)
|
|
json_bytes = _serialize_data(json_data, "json")
|
|
|
|
# Table serialization with a DataFrame (recommended for tabular data)
|
|
df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
|
|
table_bytes = _serialize_data(df, "table")
|
|
|
|
# Table serialization with named tuple of vectors (also supported)
|
|
nt = (id = [1, 2, 3], name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
|
|
table_bytes_nt = _serialize_data(nt, "table")
|
|
|
|
# Binary data (IOBuffer)
|
|
buf = IOBuffer()
|
|
write(buf, "hello")
|
|
binary_bytes = _serialize_data(buf, "binary")
|
|
|
|
# Binary data (already bytes)
|
|
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
|
|
```
|
|
"""
|
|
function _serialize_data(data::Any, type::String)
|
|
if type == "json" # JSON data - serialize directly
|
|
json_str = JSON.json(data) # Convert Julia data to JSON string
|
|
return bytes(json_str) # Convert JSON string to bytes
|
|
elseif type == "table" # Table data - convert to Arrow IPC stream
|
|
io = IOBuffer() # Create in-memory buffer
|
|
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
|
return take!(io) # Return the buffer contents as bytes
|
|
elseif type == "binary" # Binary data - treat as binary
|
|
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
|
return take!(data) # Return buffer contents as bytes
|
|
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
|
return data # Return binary data directly
|
|
else # Unsupported binary data type
|
|
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
|
end
|
|
else # Unknown type
|
|
error("Unknown type: $type")
|
|
end
|
|
end
|
|
|
|
|
|
""" Publish message to NATS
|
|
This internal function publishes a message to a NATS subject with proper
|
|
connection management and logging.
|
|
|
|
Arguments:
|
|
- `nats_url::String` - NATS server URL
|
|
- `subject::String` - NATS subject to publish to
|
|
- `message::String` - JSON message to publish
|
|
- `correlation_id::String` - Correlation ID for logging
|
|
"""
|
|
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
|
|
conn = NATS.connect(nats_url) # Create NATS connection
|
|
try
|
|
NATS.publish(conn, subject, message) # Publish message to NATS
|
|
log_trace(correlation_id, "Message published to $subject") # Log successful publish
|
|
finally
|
|
NATS.drain(conn) # Ensure connection is closed properly
|
|
end
|
|
end
|
|
|
|
|
|
""" smartreceive - Receive and process messages from NATS
|
|
This function processes incoming NATS messages, handling both direct transport
|
|
(base64 decoded payloads) and link transport (URL-based payloads).
|
|
It deserializes the data based on the transport type and returns the result.
|
|
A HTTP file server is required along with its upload function.
|
|
|
|
Arguments:
|
|
- `msg::NATS.Message` - NATS message to process
|
|
|
|
Keyword Arguments:
|
|
- `fileserver_url::String` - HTTP file server URL for link transport (default: DEFAULT_FILESERVER_URL)
|
|
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5)
|
|
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100)
|
|
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
|
|
|
|
Return:
|
|
- Tuple `(data = deserialized_data, envelope = MessageEnvelope)` - Data and envelope
|
|
"""
|
|
function smartreceive(
|
|
msg::NATS.Msg;
|
|
fileserver_url::String = DEFAULT_FILESERVER_URL,
|
|
max_retries::Int = 5,
|
|
base_delay::Int = 100,
|
|
max_delay::Int = 5000
|
|
)
|
|
# Parse the envelope
|
|
env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
|
|
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
|
|
|
# Check transport type
|
|
if env.transport == "direct" # Direct transport - payload is in the message
|
|
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
|
|
|
|
# Decode Base64 payload
|
|
payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
|
|
|
|
# Deserialize based on type
|
|
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
|
|
|
|
return (data = data, envelope = env) # Return data and envelope as tuple
|
|
elseif env.transport == "link" # Link transport - payload is at URL
|
|
log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling
|
|
|
|
# Fetch with exponential backoff
|
|
data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL
|
|
|
|
# Deserialize based on type
|
|
result = _deserialize_data(data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
|
|
|
|
return (data = result, envelope = env) # Return data and envelope as tuple
|
|
else # Unknown transport type
|
|
error("Unknown transport type: $(env.transport)") # Throw error for unknown transport
|
|
end
|
|
end
|
|
|
|
|
|
""" Fetch data from URL with exponential backoff
|
|
This internal function retrieves data from a URL with retry logic using
|
|
exponential backoff to handle transient failures.
|
|
|
|
Arguments:
|
|
- `url::String` - URL to fetch from
|
|
- `max_retries::Int` - Maximum number of retry attempts
|
|
- `base_delay::Int` - Initial delay in milliseconds
|
|
- `max_delay::Int` - Maximum delay in milliseconds
|
|
- `correlation_id::String` - Correlation ID for logging
|
|
|
|
Return:
|
|
- Vector{UInt8} - Fetched data as bytes
|
|
"""
|
|
function _fetch_with_backoff(
|
|
url::String,
|
|
max_retries::Int,
|
|
base_delay::Int,
|
|
max_delay::Int,
|
|
correlation_id::String
|
|
)
|
|
delay = base_delay # Initialize delay with base delay value
|
|
for attempt in 1:max_retries # Attempt to fetch data up to max_retries times
|
|
try
|
|
response = HTTP.request("GET", url) # Make HTTP GET request to URL
|
|
if response.status == 200 # Check if request was successful
|
|
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success
|
|
return response.body # Return response body as bytes
|
|
else # Request failed
|
|
error("Failed to fetch: $(response.status)") # Throw error for non-200 status
|
|
end
|
|
catch e # Handle exceptions during fetch
|
|
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure
|
|
|
|
if attempt < max_retries # Only sleep if not the last attempt
|
|
sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms)
|
|
delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay
|
|
end
|
|
end
|
|
end
|
|
|
|
error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed
|
|
end
|
|
|
|
|
|
""" Deserialize bytes to data based on type
|
|
This internal function converts serialized bytes back to Julia data based on type.
|
|
It handles "json" (JSON deserialization), "table" (Arrow IPC deserialization),
|
|
and "binary" (binary data).
|
|
|
|
Arguments:
|
|
- `data::Vector{UInt8}` - Serialized data as bytes
|
|
- `type::String` - Data type ("json", "table", "binary")
|
|
- `correlation_id::String` - Correlation ID for logging
|
|
- `metadata::Dict{String, Any}` - Metadata about the data
|
|
|
|
Return:
|
|
- Deserialized data (DataFrame for "table", JSON data for "json", bytes for "binary")
|
|
"""
|
|
function _deserialize_data(
|
|
data::Vector{UInt8},
|
|
type::String,
|
|
correlation_id::String,
|
|
metadata::Dict{String, Any}
|
|
)
|
|
if type == "json" # JSON data - deserialize
|
|
json_str = String(data) # Convert bytes to string
|
|
return JSON.parse(json_str) # Parse JSON string to Julia data structure
|
|
elseif type == "table" # Table data - deserialize Arrow IPC stream
|
|
io = IOBuffer(data) # Create buffer from bytes
|
|
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
|
return df # Return DataFrame
|
|
elseif type == "binary" # Binary data - return binary
|
|
return data # Return bytes directly
|
|
else # Unknown type
|
|
error("Unknown type: $type") # Throw error for unknown type
|
|
end
|
|
end
|
|
|
|
|
|
# """ Decode base64 string to bytes
|
|
# This internal function decodes a base64-encoded string back to binary data.
|
|
# It's a wrapper around Base64.decode for consistency in the module.
|
|
|
|
# Arguments:
|
|
# - `str::String` - Base64-encoded string to decode
|
|
|
|
# Return:
|
|
# - Vector{UInt8} - Decoded binary data
|
|
# """
|
|
# function base64decode(str::String)
|
|
# return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
|
|
# end
|
|
|
|
|
|
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
|
|
|
|
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
|
|
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
|
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
|
|
|
The function workflow:
|
|
1. Obtains an upload ID and token from the server
|
|
2. Uploads the provided binary data as a file using the `X-UploadToken` header
|
|
3. Returns identifiers and download URL for the uploaded file
|
|
|
|
# Arguments:
|
|
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
|
- `filename::String` - Name of the file being uploaded
|
|
- `data::Vector{UInt8}` - Raw byte data of the file content
|
|
|
|
# Return:
|
|
- A named tuple with fields:
|
|
- `status::Integer` - HTTP server response status
|
|
- `uploadid::String` - ID of the one-shot upload session
|
|
- `fileid::String` - ID of the uploaded file within the session
|
|
- `url::String` - Full URL to download the uploaded file
|
|
|
|
# Example
|
|
```jldoctest
|
|
using HTTP, JSON
|
|
|
|
fileServerURL = "http://localhost:8080"
|
|
filepath = "./test.zip"
|
|
filename = basename(filepath)
|
|
filebytes = read(filepath) # read(filepath) output is raw bytes of the file
|
|
|
|
# Upload to local plik server
|
|
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filename, filebytes)
|
|
|
|
# to download an uploaded file
|
|
curl -L -O "url"
|
|
```
|
|
""" #[x]
|
|
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
|
|
|
|
# ----------------------------------------- get upload id ---------------------------------------- #
|
|
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
|
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
|
|
headers = ["Content-Type" => "application/json"]
|
|
body = """{ "OneShot" : true }"""
|
|
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
|
responseJson = JSON.parse(httpResponse.body)
|
|
uploadid = responseJson["id"]
|
|
uploadtoken = responseJson["uploadToken"]
|
|
|
|
# ------------------------------------------ upload file ----------------------------------------- #
|
|
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
|
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
|
|
url_upload = "$fileServerURL/file/$uploadid"
|
|
headers = ["X-UploadToken" => uploadtoken]
|
|
|
|
# Create the multipart form data
|
|
form = HTTP.Form(Dict(
|
|
"file" => file_multipart
|
|
))
|
|
|
|
# Execute the POST request
|
|
httpResponse = nothing
|
|
try
|
|
httpResponse = HTTP.post(url_upload, headers, form)
|
|
# println("Status: ", httpResponse.status)
|
|
responseJson = JSON.parse(httpResponse.body)
|
|
catch e
|
|
@error "Request failed" exception=e
|
|
end
|
|
|
|
fileid=responseJson["id"]
|
|
|
|
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
|
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
|
|
|
|
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url)
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
|
|
|
|
Upload a single file to a plik server using one-shot mode.
|
|
|
|
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
|
|
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
|
|
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
|
|
|
|
The function workflow:
|
|
1. Obtains an upload ID and token from the server
|
|
2. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
|
|
3. Returns identifiers and download URL for the uploaded file
|
|
|
|
# Arguments:
|
|
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
|
|
- `filepath::String` - Full path to the local file to upload
|
|
|
|
# Return:
|
|
- A named tuple with fields:
|
|
- `status::Integer` - HTTP server response status
|
|
- `uploadid::String` - ID of the one-shot upload session
|
|
- `fileid::String` - ID of the uploaded file within the session
|
|
- `url::String` - Full URL to download the uploaded file
|
|
|
|
# Example
|
|
```julia
|
|
using HTTP, JSON
|
|
|
|
fileServerURL = "http://localhost:8080"
|
|
filepath = "./test.zip"
|
|
|
|
# Upload to local plik server
|
|
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filepath)
|
|
|
|
# To download the uploaded file later (via curl as example):
|
|
curl -L -O "url"
|
|
```
|
|
""" #[x]
|
|
function plik_oneshot_upload(fileServerURL::String, filepath::String)
|
|
|
|
# ----------------------------------------- get upload id ---------------------------------------- #
|
|
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
|
|
filename = basename(filepath)
|
|
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
|
|
headers = ["Content-Type" => "application/json"]
|
|
body = """{ "OneShot" : true }"""
|
|
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
|
responseJson = JSON.parse(httpResponse.body)
|
|
|
|
uploadid = responseJson["id"]
|
|
uploadtoken = responseJson["uploadToken"]
|
|
println("uploadid = ", uploadid)
|
|
|
|
# ------------------------------------------ upload file ----------------------------------------- #
|
|
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
|
|
file_multipart = open(filepath, "r")
|
|
url_upload = "$fileServerURL/file/$uploadid"
|
|
headers = ["X-UploadToken" => uploadtoken]
|
|
|
|
# Create the multipart form data
|
|
form = HTTP.Form(Dict(
|
|
"file" => file_multipart
|
|
))
|
|
|
|
# Execute the POST request
|
|
httpResponse = nothing
|
|
try
|
|
httpResponse = HTTP.post(url_upload, headers, form)
|
|
# println("Status: ", httpResponse.status)
|
|
responseJson = JSON.parse(httpResponse.body)
|
|
catch e
|
|
@error "Request failed" exception=e
|
|
end
|
|
|
|
fileid=responseJson["id"]
|
|
|
|
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
|
|
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
|
|
|
|
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url)
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end # module |