173 lines
5.6 KiB
Julia
173 lines
5.6 KiB
Julia
|
|
|
|
using Revise
|
|
using NATS, JSON, UUIDs, Dates
|
|
using HTTP
|
|
|
|
|
|
# workdir =
|
|
|
|
# Include the bridge module
|
|
include("./src/NATSBridge.jl")
|
|
using .NATSBridge
|
|
|
|
# ---- Configuration -----------------------------------------------------------------------------
# NATS subject used by this test script.
const SUBJECT = "/NATSBridge_test"
# Address of the NATS server.
const NATS_URL = "nats.yiem.cc"
# Base URL of the HTTP file server.
const FILESERVER_URL = "http://192.168.88.104:8080"

# One correlation ID per script run; it is embedded in every trace line.
correlation_id = string(uuid4())

# ------------------------------------------------------------------------------------------------ #
#                                        test file transfer                                        #
# ------------------------------------------------------------------------------------------------ #

# Path of the file whose bytes are sent as the binary payload.
const FILE_PATH = "./testFile_small.zip"
# Last path component of FILE_PATH (same value `basename` returns).
const filename = last(splitdir(FILE_PATH))
|
|
|
|
# Helper: Log with correlation ID
|
|
# Print one trace line to stdout: the current time, the script-level
# `correlation_id` global, and `message`.
function log_trace(message)
    timestamp = Dates.now()
    line = "[$timestamp] [Correlation: $correlation_id] $message"
    println(line)
    return nothing
end
|
|
|
|
|
|
"""
    _serialize_data(data, type::String) -> Vector{UInt8}

Turn `data` into raw bytes according to the payload `type` tag.

Supported tags:
- `"text"`: `data` must be a `String`; returns a copy of its UTF-8 code units.
- `"dictionary"`: `data` is passed to `JSON.json`; returns the UTF-8 bytes of the result.
- `"table"`: `data` is written with `Arrow.write` into an in-memory buffer.
- `"image"`, `"audio"`, `"video"`: `data` must be a `Vector{UInt8}`; returned as-is (no copy).
- `"binary"`: `data` must be a `Vector{UInt8}` (returned as-is) or an `IOBuffer`
  (its contents are taken with `take!`, which empties a writable buffer).

Throws an `ErrorException` when `data` does not match the tag or the tag is unknown.

NOTE(review): the "table" branch needs `Arrow` to be in scope; the visible part of this
file does not load it — confirm it is made available elsewhere.
"""
function _serialize_data(data::Any, type::String)
    if type == "text"
        isa(data, String) || error("Text data must be a String")
        # `bytes` does not exist in Julia 1.x; copy the string's code units instead.
        return Vector{UInt8}(codeunits(data))
    elseif type == "dictionary"
        json_str = JSON.json(data)
        return Vector{UInt8}(codeunits(json_str))
    elseif type == "table"
        io = IOBuffer()
        Arrow.write(io, data)
        return take!(io)
    elseif type in ("image", "audio", "video")
        # The three media tags share one rule; the message keeps the capitalised tag name.
        isa(data, Vector{UInt8}) ||
            error("$(uppercasefirst(type)) data must be Vector{UInt8}")
        return data
    elseif type == "binary"
        if isa(data, IOBuffer)
            return take!(data)
        elseif isa(data, Vector{UInt8})
            return data
        else
            error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
        end
    else
        error("Unknown type: $type")
    end
end
|
|
|
|
|
|
# File upload handler for plik server
|
|
"""
    plik_upload_handler(fileserver_url, dataname, data) -> Dict{String, Any}

Upload `data` to the file server at `fileserver_url` in two HTTP calls:

1. `POST <fileserver_url>/upload` with the JSON body `{ "OneShot" : true }`; the
   response's `"id"` and `"uploadToken"` fields are read.
2. `POST <fileserver_url>/file/<uploadid>` with a multipart form whose `"file"` part
   carries `data` under the name `dataname`; the token is sent in the
   `X-UploadToken` header and the response's `"id"` field is read as the file id.

Returns a dictionary with the keys `"status"` (status of the second response),
`"uploadid"`, `"fileid"` and `"url"` (`<fileserver_url>/file/<uploadid>/<fileid>/<dataname>`).
"""
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
    # Step 1: ask the server for a new upload.
    create_response = HTTP.request(
        "POST",
        "$fileserver_url/upload",
        ["Content-Type" => "application/json"],
        """{ "OneShot" : true }""";
        body_is_form=false
    )
    upload_info = JSON.parse(String(create_response.body))
    uploadid = upload_info["id"]
    uploadtoken = upload_info["uploadToken"]

    # Step 2: send the bytes as a multipart form to that upload.
    file_part = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    upload_response = HTTP.post(
        "$fileserver_url/file/$uploadid",
        ["X-UploadToken" => uploadtoken],
        HTTP.Form(Dict("file" => file_part))
    )
    fileid = JSON.parse(String(upload_response.body))["id"]

    return Dict(
        "status" => upload_response.status,
        "uploadid" => uploadid,
        "fileid" => fileid,
        "url" => "$fileserver_url/file/$uploadid/$fileid/$dataname"
    )
end
|
|
|
|
# Emit an `@info` log record containing the current time, the given
# `correlation_id`, and `message`.
function log_trace(correlation_id::String, message::String)
    timestamp = Dates.now()
    line = "[$timestamp] [Correlation: $correlation_id] $message"
    @info line
    return nothing
end
|
|
|
|
# ------------------------------------------------------------------------------------------------ #
# Direct-transport send path, written out as top-level statements so each step can be run by hand.
# It reads the test file, serializes the first entry of `data`, base64-encodes the bytes and
# wraps them in a `msgPayload_v1` that is collected in `payloads`.
# ------------------------------------------------------------------------------------------------ #

using Base64  # stdlib; provides `base64encode` used below

# Load the whole test file into memory as raw bytes.
file_data = read(FILE_PATH)
file_size = length(file_data)

# Inputs for the send path. Only `data` and the correlation ID are used by the statements below;
# the remaining values are declared but not read in this part of the script.
subject::String = SUBJECT
data::AbstractArray{Tuple{String, Array{UInt8, 1}, String}, 1} = [(filename, file_data, "binary")]
# Use the URLs from this script's own configuration section; `DEFAULT_NATS_URL` and
# `DEFAULT_FILESERVER_URL` are not defined anywhere in this file.
nats_url::String = NATS_URL
fileserver_url::String = FILESERVER_URL
fileserverUploadHandler::Function = plik_upload_handler
size_threshold::Int = 1_000_000
msg_purpose::String = "chat"
sender_name::String = "sender"
receiver_name::String = "receiver_name"
receiver_id::String = "receiver_id"
reply_to::String = "reply_to"
reply_to_msg_id::String = "reply_to_msg_id"

# `correlation_id` already holds a value (assigned near the top of the script), and Julia
# rejects adding a type declaration to a global that already has one, so it is not
# re-declared here. `cid` is the name the logging calls below use.
cid = correlation_id

# Serialize the first (and only) payload tuple: (name, raw data, type tag).
(dataname, payload_data, payload_type) = data[1]
payload_bytes = _serialize_data(payload_data, payload_type)

payload_size = length(payload_bytes)  # size in bytes
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes")

# Direct transport: the bytes travel inside the message as a base64 string.
payload_b64 = Base64.base64encode(payload_bytes)
log_trace(cid, "Using direct transport for $payload_size bytes")

payload = msgPayload_v1(
    id = string(uuid4()),
    dataname = dataname,
    type = payload_type,
    transport = "direct",
    encoding = "base64",
    size = payload_size,
    data = payload_b64,
    metadata = Dict("payload_bytes" => payload_size)
)

# `payloads` was never created before being pushed to; start the collection with this payload.
payloads = [payload]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|