add_mix_content_capability #2
176
etc.jl
176
etc.jl
@@ -1,168 +1,18 @@
|
|||||||
|
using JSON
|
||||||
|
|
||||||
|
d = Dict(
|
||||||
|
"name"=>"ton",
|
||||||
|
"age"=> 20,
|
||||||
|
"metadata" => Dict(
|
||||||
|
"height"=> 155,
|
||||||
|
"wife"=> "jane"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
using Revise
|
json_str = JSON.json(d)
|
||||||
using NATS, JSON, UUIDs, Dates
|
json_str_bytes = Vector{UInt8}(json_str)
|
||||||
using HTTP
|
json_str_2 = String(json_str_bytes)
|
||||||
|
json_obj = JSON.parse(json_str_2)
|
||||||
|
|
||||||
# workdir =
|
|
||||||
|
|
||||||
# Include the bridge module
|
|
||||||
include("./src/NATSBridge.jl")
|
|
||||||
using .NATSBridge
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/NATSBridge_test"
|
|
||||||
const NATS_URL = "nats.yiem.cc"
|
|
||||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
|
||||||
|
|
||||||
# Create correlation ID for tracing
|
|
||||||
correlation_id = string(uuid4())
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
# test file transfer #
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
|
|
||||||
# File path for large binary payload test
|
|
||||||
const FILE_PATH = "./testFile_small.zip"
|
|
||||||
const filename = basename(FILE_PATH)
|
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
|
||||||
function log_trace(message)
|
|
||||||
timestamp = Dates.now()
|
|
||||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
function _serialize_data(data::Any, type::String)
|
|
||||||
if type == "text" # Text data - convert to UTF-8 bytes
|
|
||||||
if isa(data, String)
|
|
||||||
return bytes(data) # Convert string to UTF-8 bytes
|
|
||||||
else
|
|
||||||
error("Text data must be a String")
|
|
||||||
end
|
|
||||||
elseif type == "dictionary" # JSON data - serialize directly
|
|
||||||
json_str = JSON.json(data) # Convert Julia data to JSON string
|
|
||||||
return bytes(json_str) # Convert JSON string to bytes
|
|
||||||
elseif type == "table" # Table data - convert to Arrow IPC stream
|
|
||||||
io = IOBuffer() # Create in-memory buffer
|
|
||||||
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
|
||||||
return take!(io) # Return the buffer contents as bytes
|
|
||||||
elseif type == "image" # Image data - treat as binary
|
|
||||||
if isa(data, Vector{UInt8})
|
|
||||||
return data # Return binary data directly
|
|
||||||
else
|
|
||||||
error("Image data must be Vector{UInt8}")
|
|
||||||
end
|
|
||||||
elseif type == "audio" # Audio data - treat as binary
|
|
||||||
if isa(data, Vector{UInt8})
|
|
||||||
return data # Return binary data directly
|
|
||||||
else
|
|
||||||
error("Audio data must be Vector{UInt8}")
|
|
||||||
end
|
|
||||||
elseif type == "video" # Video data - treat as binary
|
|
||||||
if isa(data, Vector{UInt8})
|
|
||||||
return data # Return binary data directly
|
|
||||||
else
|
|
||||||
error("Video data must be Vector{UInt8}")
|
|
||||||
end
|
|
||||||
elseif type == "binary" # Binary data - treat as binary
|
|
||||||
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
|
||||||
return take!(data) # Return buffer contents as bytes
|
|
||||||
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
|
||||||
return data # Return binary data directly
|
|
||||||
else # Unsupported binary data type
|
|
||||||
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
|
||||||
end
|
|
||||||
else # Unknown type
|
|
||||||
error("Unknown type: $type")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# File upload handler for plik server
|
|
||||||
function plik_upload_handler(fileserver_url::String, dataname::String, data::Vector{UInt8})::Dict{String, Any}
|
|
||||||
# Get upload ID
|
|
||||||
url_getUploadID = "$fileserver_url/upload"
|
|
||||||
headers = ["Content-Type" => "application/json"]
|
|
||||||
body = """{ "OneShot" : true }"""
|
|
||||||
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
|
|
||||||
responseJson = JSON.parse(String(httpResponse.body))
|
|
||||||
uploadid = responseJson["id"]
|
|
||||||
uploadtoken = responseJson["uploadToken"]
|
|
||||||
|
|
||||||
# Upload file
|
|
||||||
file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
|
|
||||||
url_upload = "$fileserver_url/file/$uploadid"
|
|
||||||
headers = ["X-UploadToken" => uploadtoken]
|
|
||||||
|
|
||||||
form = HTTP.Form(Dict("file" => file_multipart))
|
|
||||||
httpResponse = HTTP.post(url_upload, headers, form)
|
|
||||||
responseJson = JSON.parse(String(httpResponse.body))
|
|
||||||
|
|
||||||
fileid = responseJson["id"]
|
|
||||||
url = "$fileserver_url/file/$uploadid/$fileid/$dataname"
|
|
||||||
|
|
||||||
return Dict("status" => httpResponse.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
|
|
||||||
end
|
|
||||||
|
|
||||||
function log_trace(correlation_id::String, message::String)
|
|
||||||
timestamp = Dates.now() # Get current timestamp
|
|
||||||
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
|
||||||
end
|
|
||||||
|
|
||||||
file_data = read(FILE_PATH)
|
|
||||||
file_size = length(file_data)
|
|
||||||
|
|
||||||
|
|
||||||
subject::String=SUBJECT
|
|
||||||
data::AbstractArray{Tuple{String, Array{UInt8, 1}, String}, 1}=[(filename, file_data, "binary")]
|
|
||||||
nats_url::String=DEFAULT_NATS_URL
|
|
||||||
fileserver_url::String=DEFAULT_FILESERVER_URL
|
|
||||||
fileserverUploadHandler::Function=plik_upload_handler
|
|
||||||
size_threshold::Int=1_000_000
|
|
||||||
correlation_id::Union{String, Nothing}=correlation_id
|
|
||||||
msg_purpose::String="chat"
|
|
||||||
sender_name::String="sender"
|
|
||||||
receiver_name::String="receiver_name"
|
|
||||||
receiver_id::String="receiver_id"
|
|
||||||
reply_to::String="reply_to"
|
|
||||||
reply_to_msg_id::String="reply_to_msg_id"
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
(dataname, payload_data, payload_type) = data[1]
|
|
||||||
payload_bytes = _serialize_data(payload_data, payload_type)
|
|
||||||
|
|
||||||
payload_size = length(payload_bytes) # Calculate payload size in bytes
|
|
||||||
log_trace(cid, "Serialized payload '$dataname' (type: $payload_type) size: $payload_size bytes") # Log payload size
|
|
||||||
|
|
||||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
|
||||||
log_trace(cid, "Using direct transport for $payload_size bytes")
|
|
||||||
|
|
||||||
payload = msgPayload_v1(
|
|
||||||
id = string(uuid4()),
|
|
||||||
dataname = dataname,
|
|
||||||
type = payload_type,
|
|
||||||
transport = "direct",
|
|
||||||
encoding = "base64",
|
|
||||||
size = payload_size,
|
|
||||||
data = payload_b64,
|
|
||||||
metadata = Dict("payload_bytes" => payload_size)
|
|
||||||
)
|
|
||||||
|
|
||||||
push!(payloads, payload)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -457,48 +457,66 @@ binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
|
|||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function _serialize_data(data::Any, type::String)
|
function _serialize_data(data::Any, type::String)
|
||||||
if type == "text" # Text data - convert to UTF-8 bytes
|
""" Example on how JSON.jl convert: dictionary -> json string -> json string bytes -> json string -> json object
|
||||||
if isa(data, String)
|
d = Dict(
|
||||||
return bytes(data) # Convert string to UTF-8 bytes
|
"name"=>"ton",
|
||||||
else
|
"age"=> 20,
|
||||||
error("Text data must be a String")
|
"metadata" => Dict(
|
||||||
end
|
"height"=> 155,
|
||||||
elseif type == "dictionary" # JSON data - serialize directly
|
"wife"=> "jane"
|
||||||
json_str = JSON.json(data) # Convert Julia data to JSON string
|
)
|
||||||
return bytes(json_str) # Convert JSON string to bytes
|
)
|
||||||
elseif type == "table" # Table data - convert to Arrow IPC stream
|
|
||||||
io = IOBuffer() # Create in-memory buffer
|
json_str = JSON.json(d)
|
||||||
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
json_str_bytes = Vector{UInt8}(json_str)
|
||||||
return take!(io) # Return the buffer contents as bytes
|
json_str_2 = String(json_str_bytes)
|
||||||
elseif type == "image" # Image data - treat as binary
|
json_obj = JSON.parse(json_str_2)
|
||||||
if isa(data, Vector{UInt8})
|
"""
|
||||||
return data # Return binary data directly
|
|
||||||
else
|
if type == "text" # Text data - convert to UTF-8 bytes
|
||||||
error("Image data must be Vector{UInt8}")
|
if isa(data, String)
|
||||||
end
|
data_bytes = Vector{UInt8}(data) # Convert string to UTF-8 bytes
|
||||||
elseif type == "audio" # Audio data - treat as binary
|
return data_bytes
|
||||||
if isa(data, Vector{UInt8})
|
else
|
||||||
return data # Return binary data directly
|
error("Text data must be a String")
|
||||||
else
|
end
|
||||||
error("Audio data must be Vector{UInt8}")
|
elseif type == "dictionary" # JSON data - serialize directly
|
||||||
end
|
json_str = JSON.json(data) # Convert Julia data to JSON string
|
||||||
elseif type == "video" # Video data - treat as binary
|
json_str_bytes = Vector{UInt8}(json_str) # Convert JSON string to bytes
|
||||||
if isa(data, Vector{UInt8})
|
return json_str_bytes
|
||||||
return data # Return binary data directly
|
elseif type == "table" # Table data - convert to Arrow IPC stream
|
||||||
else
|
io = IOBuffer() # Create in-memory buffer
|
||||||
error("Video data must be Vector{UInt8}")
|
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
|
||||||
end
|
return take!(io) # Return the buffer contents as bytes
|
||||||
elseif type == "binary" # Binary data - treat as binary
|
elseif type == "image" # Image data - treat as binary
|
||||||
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
if isa(data, Vector{UInt8})
|
||||||
return take!(data) # Return buffer contents as bytes
|
return data # Return binary data directly
|
||||||
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
else
|
||||||
return data # Return binary data directly
|
error("Image data must be Vector{UInt8}")
|
||||||
else # Unsupported binary data type
|
end
|
||||||
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
elseif type == "audio" # Audio data - treat as binary
|
||||||
end
|
if isa(data, Vector{UInt8})
|
||||||
else # Unknown type
|
return data # Return binary data directly
|
||||||
error("Unknown type: $type")
|
else
|
||||||
end
|
error("Audio data must be Vector{UInt8}")
|
||||||
|
end
|
||||||
|
elseif type == "video" # Video data - treat as binary
|
||||||
|
if isa(data, Vector{UInt8})
|
||||||
|
return data # Return binary data directly
|
||||||
|
else
|
||||||
|
error("Video data must be Vector{UInt8}")
|
||||||
|
end
|
||||||
|
elseif type == "binary" # Binary data - treat as binary
|
||||||
|
if isa(data, IOBuffer) # Check if data is an IOBuffer
|
||||||
|
return take!(data) # Return buffer contents as bytes
|
||||||
|
elseif isa(data, Vector{UInt8}) # Check if data is already binary
|
||||||
|
return data # Return binary data directly
|
||||||
|
else # Unsupported binary data type
|
||||||
|
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
|
||||||
|
end
|
||||||
|
else # Unknown type
|
||||||
|
error("Unknown type: $type")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
@@ -675,7 +693,7 @@ function _deserialize_data(
|
|||||||
return String(data) # Convert bytes to string
|
return String(data) # Convert bytes to string
|
||||||
elseif type == "dictionary" # JSON data - deserialize
|
elseif type == "dictionary" # JSON data - deserialize
|
||||||
json_str = String(data) # Convert bytes to string
|
json_str = String(data) # Convert bytes to string
|
||||||
return JSON.parse(json_str) # Parse JSON string to Julia data structure
|
return JSON.parse(json_str) # Parse JSON string to JSON object
|
||||||
elseif type == "table" # Table data - deserialize Arrow IPC stream
|
elseif type == "table" # Table data - deserialize Arrow IPC stream
|
||||||
io = IOBuffer(data) # Create buffer from bytes
|
io = IOBuffer(data) # Create buffer from bytes
|
||||||
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
df = Arrow.Table(io) # Read Arrow IPC format from buffer
|
||||||
|
|||||||
@@ -1,97 +0,0 @@
|
|||||||
#!/usr/bin/env julia
|
|
||||||
# Test script for dictionary transfer from Julia serviceA to Julia serviceB
|
|
||||||
# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge
|
|
||||||
#
|
|
||||||
# This is serviceB - the receiver that receives a dummy dictionary from serviceA
|
|
||||||
|
|
||||||
using UUIDs
|
|
||||||
using JSON
|
|
||||||
using Dates
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("../src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/NATSBridge_dict_test"
|
|
||||||
const NATS_URL = "nats.yiem.cc"
|
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
|
||||||
function log_trace(correlation_id::String, message::String)
|
|
||||||
timestamp = Dates.now()
|
|
||||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Receiver: Receive and process dictionary from serviceA
|
|
||||||
function receive_dictionary()
|
|
||||||
# Connect to NATS
|
|
||||||
conn = NATS.connect(NATS_URL)
|
|
||||||
|
|
||||||
# Subscribe to the subject
|
|
||||||
subscription = NATS.subscribe(conn, SUBJECT)
|
|
||||||
|
|
||||||
println("Listening for dictionary messages on '$SUBJECT'...")
|
|
||||||
println("Press Ctrl+C to stop listening.")
|
|
||||||
|
|
||||||
# Listen for messages
|
|
||||||
while true
|
|
||||||
# Wait for a message with a 1-second timeout
|
|
||||||
msg = NATS.waitfor(subscription, 1.0)
|
|
||||||
|
|
||||||
if msg !== nothing
|
|
||||||
# Extract correlation ID for logging
|
|
||||||
json_data = JSON.parse(String(msg.payload))
|
|
||||||
cid = json_data["correlationId"]
|
|
||||||
|
|
||||||
log_trace(cid, "Received message from $(json_data["senderName"])")
|
|
||||||
|
|
||||||
# Process the message using smartreceive
|
|
||||||
payloads = NATSBridge.smartreceive(
|
|
||||||
msg;
|
|
||||||
fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) ->
|
|
||||||
NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid),
|
|
||||||
max_retries = 5,
|
|
||||||
base_delay = 100,
|
|
||||||
max_delay = 5000
|
|
||||||
)
|
|
||||||
|
|
||||||
log_trace(cid, "Processed $(length(payloads)) payload(s)")
|
|
||||||
|
|
||||||
# Process each payload
|
|
||||||
for (dataname, data, payload_type) in payloads
|
|
||||||
log_trace(cid, "Payload '$dataname' type: $payload_type")
|
|
||||||
|
|
||||||
# Handle dictionary type
|
|
||||||
if payload_type == "dictionary"
|
|
||||||
println("\nReceived dictionary:")
|
|
||||||
println(JSON.json(data, 2))
|
|
||||||
|
|
||||||
# Extract and display specific fields
|
|
||||||
if isa(data, Dict)
|
|
||||||
command = get(data, "command", "unknown")
|
|
||||||
println("\nCommand: $command")
|
|
||||||
|
|
||||||
# Optionally send acknowledgment
|
|
||||||
reply_to = get(json_data, "replyTo", "")
|
|
||||||
if !isempty(reply_to)
|
|
||||||
log_trace(cid, "Reply to: $reply_to")
|
|
||||||
# Could send ACK here
|
|
||||||
end
|
|
||||||
end
|
|
||||||
else
|
|
||||||
println("\nReceived non-dictionary payload: $dataname (type: $payload_type)")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Run the receiver
|
|
||||||
println("Starting dictionary receiver...")
|
|
||||||
println("Subject: $SUBJECT")
|
|
||||||
println("NATS URL: $NATS_URL")
|
|
||||||
println("="^50)
|
|
||||||
|
|
||||||
# Run receiver (this will block and listen for messages)
|
|
||||||
receive_dictionary()
|
|
||||||
@@ -1,90 +0,0 @@
|
|||||||
#!/usr/bin/env julia
|
|
||||||
# Test script for dictionary transfer from Julia serviceA to Julia serviceB
|
|
||||||
# Demonstrates the "Command & Control" scenario (small dictionary) using NATSBridge
|
|
||||||
#
|
|
||||||
# This is serviceA - the sender that sends a dummy dictionary to serviceB
|
|
||||||
|
|
||||||
using UUIDs
|
|
||||||
using JSON
|
|
||||||
using Dates
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("../src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/NATSBridge_dict_test"
|
|
||||||
const NATS_URL = "nats.yiem.cc"
|
|
||||||
const FILESERVER_URL = "http://192.168.88.104:8080"
|
|
||||||
|
|
||||||
# Create correlation ID for tracing
|
|
||||||
correlation_id = string(uuid4())
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
# dictionary sender #
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
|
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
|
||||||
function log_trace(message)
|
|
||||||
timestamp = Dates.now()
|
|
||||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Sender: Send a dummy dictionary to serviceB
|
|
||||||
function send_dictionary()
|
|
||||||
# Create a dummy dictionary to send
|
|
||||||
dummy_dict = Dict(
|
|
||||||
"command" => "start_simulation",
|
|
||||||
"simulation_id" => string(uuid4()),
|
|
||||||
"duration_seconds" => 60,
|
|
||||||
"parameters" => Dict(
|
|
||||||
"temperature" => 25.5,
|
|
||||||
"pressure" => 101.3,
|
|
||||||
"active" => true,
|
|
||||||
"tags" => ["test", "simulation", "julia_to_julia"]
|
|
||||||
),
|
|
||||||
"metadata" => Dict(
|
|
||||||
"sender" => "serviceA",
|
|
||||||
"timestamp" => string(Dates.now())
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Send the dictionary using smartsend with type="dictionary"
|
|
||||||
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
|
|
||||||
env = NATSBridge.smartsend(
|
|
||||||
SUBJECT,
|
|
||||||
[("dummy_dict", dummy_dict, "dictionary")], # List of (dataname, data, type) tuples
|
|
||||||
nats_url = NATS_URL,
|
|
||||||
fileserver_url = FILESERVER_URL,
|
|
||||||
size_threshold = 1_000_000, # 1MB threshold - dictionary will use direct transport
|
|
||||||
correlation_id = correlation_id,
|
|
||||||
msg_purpose = "chat",
|
|
||||||
sender_name = "serviceA",
|
|
||||||
receiver_name = "serviceB",
|
|
||||||
reply_to = "",
|
|
||||||
reply_to_msg_id = ""
|
|
||||||
)
|
|
||||||
|
|
||||||
log_trace("Sent dictionary via $(env.payloads[1].transport) transport")
|
|
||||||
log_trace("Payload type: $(env.payloads[1].type)")
|
|
||||||
log_trace("Envelope correlationId: $(env.correlationId)")
|
|
||||||
|
|
||||||
# Display the sent dictionary
|
|
||||||
println("\nSent dictionary content:")
|
|
||||||
println(JSON.json(dummy_dict, 2))
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Run the test
|
|
||||||
println("Starting dictionary transfer test...")
|
|
||||||
println("Correlation ID: $correlation_id")
|
|
||||||
println("Subject: $SUBJECT")
|
|
||||||
println("NATS URL: $NATS_URL")
|
|
||||||
|
|
||||||
# Run sender
|
|
||||||
println("\n--- Sending dictionary ---")
|
|
||||||
send_dictionary()
|
|
||||||
|
|
||||||
println("\nTest completed.")
|
|
||||||
@@ -1,131 +0,0 @@
|
|||||||
using NATSBridge
|
|
||||||
using UUIDs
|
|
||||||
using JSON
|
|
||||||
using DataFrames
|
|
||||||
using Dates
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Constants
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
const FILESERVER_URL = "http://localhost:8080"
|
|
||||||
|
|
||||||
# Main chat receiver function for scenario 6
|
|
||||||
function chat_receiver(
|
|
||||||
subject::String = "/chat/test";
|
|
||||||
nats_url::String = NATS_URL,
|
|
||||||
fileserver_url::String = FILESERVER_URL,
|
|
||||||
duration::Int = 60, # Duration in seconds to listen for messages
|
|
||||||
max_messages::Int = 100 # Maximum number of messages to receive
|
|
||||||
)
|
|
||||||
println("\n=== Chat Receiver (ServiceB) ===")
|
|
||||||
println("Subject: $subject")
|
|
||||||
println("NATS URL: $nats_url")
|
|
||||||
println("Fileserver URL: $fileserver_url")
|
|
||||||
println("Listening duration: $(duration)s")
|
|
||||||
println("Max messages: $max_messages")
|
|
||||||
println("="^50)
|
|
||||||
|
|
||||||
# Create a handler for the fileserver download
|
|
||||||
# This will be passed to smartreceive as fileserverDownloadHandler parameter
|
|
||||||
fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, correlation_id) ->
|
|
||||||
NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)
|
|
||||||
|
|
||||||
# Connect to NATS and subscribe to the chat subject
|
|
||||||
conn = NATS.connect(nats_url)
|
|
||||||
|
|
||||||
# Track received messages
|
|
||||||
message_count = 0
|
|
||||||
total_payloads = 0
|
|
||||||
|
|
||||||
# Subscribe to the subject
|
|
||||||
subscription = NATS.subscribe(conn, subject)
|
|
||||||
|
|
||||||
@info "Listening for chat messages on '$subject'..."
|
|
||||||
|
|
||||||
# Listen for messages for the specified duration
|
|
||||||
timeout = time() + duration
|
|
||||||
|
|
||||||
while time() < timeout && message_count < max_messages
|
|
||||||
# Wait for a message with a short timeout
|
|
||||||
msg = NATS.waitfor(subscription, 1.0) # 1 second timeout
|
|
||||||
|
|
||||||
if msg !== nothing
|
|
||||||
message_count += 1
|
|
||||||
println("\n--- Message $(message_count) Received ---")
|
|
||||||
|
|
||||||
# Process the message using smartreceive
|
|
||||||
payloads = NATSBridge.smartreceive(
|
|
||||||
msg;
|
|
||||||
fileserverDownloadHandler = fileserverDownloadHandler,
|
|
||||||
max_retries = 5,
|
|
||||||
base_delay = 100,
|
|
||||||
max_delay = 5000
|
|
||||||
)
|
|
||||||
|
|
||||||
println("Payloads received: $(length(payloads))")
|
|
||||||
total_payloads += length(payloads)
|
|
||||||
|
|
||||||
# Process each payload
|
|
||||||
for (dataname, data, payload_type) in payloads
|
|
||||||
println(" - $dataname (type: $payload_type)")
|
|
||||||
|
|
||||||
# Handle different types differently for display
|
|
||||||
if payload_type == "text"
|
|
||||||
println(" Text content: $(String(data))")
|
|
||||||
elseif payload_type == "dictionary"
|
|
||||||
println(" Dictionary content: $(JSON.json(data, 2))")
|
|
||||||
elseif payload_type == "table"
|
|
||||||
println(" Table content: $(size(data, 1)) rows, $(size(data, 2)) columns")
|
|
||||||
if size(data, 1) <= 10
|
|
||||||
println(" Sample: $(DataFrames.show(data))")
|
|
||||||
end
|
|
||||||
elseif payload_type == "image"
|
|
||||||
println(" Image: $(length(data)) bytes")
|
|
||||||
elseif payload_type == "audio"
|
|
||||||
println(" Audio: $(length(data)) bytes")
|
|
||||||
elseif payload_type == "video"
|
|
||||||
println(" Video: $(length(data)) bytes")
|
|
||||||
elseif payload_type == "binary"
|
|
||||||
println(" Binary: $(length(data)) bytes")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Extract correlation ID from the message
|
|
||||||
json_data = JSON.parse(String(msg.payload))
|
|
||||||
println(" Correlation ID: $(json_data["correlationId"])")
|
|
||||||
println(" Message ID: $(json_data["msgId"])")
|
|
||||||
|
|
||||||
# Optional: Send ACK reply
|
|
||||||
reply_to = get(json_data, "replyTo", "")
|
|
||||||
if !isempty(reply_to)
|
|
||||||
println(" Reply to: $reply_to")
|
|
||||||
# Could send an ACK message here
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
println("\n=== Chat Receiver Summary ===")
|
|
||||||
println("Total messages received: $message_count")
|
|
||||||
println("Total payloads processed: $total_payloads")
|
|
||||||
println("Average payloads per message: $(round(total_payloads / max(message_count, 1), digits=2))")
|
|
||||||
println("="^50)
|
|
||||||
|
|
||||||
# Cleanup
|
|
||||||
NATS.drain(conn)
|
|
||||||
|
|
||||||
return message_count
|
|
||||||
end
|
|
||||||
|
|
||||||
# Example usage
|
|
||||||
if abspath(PROGRAM_FILE) == @__FILE__
|
|
||||||
# Parse command line arguments
|
|
||||||
if length(ARGS) >= 1
|
|
||||||
subject = ARGS[1]
|
|
||||||
else
|
|
||||||
subject = "/chat/test"
|
|
||||||
end
|
|
||||||
|
|
||||||
chat_receiver(subject)
|
|
||||||
end
|
|
||||||
@@ -1,219 +0,0 @@
|
|||||||
using NATSBridge
|
|
||||||
using UUIDs
|
|
||||||
using JSON
|
|
||||||
using DataFrames
|
|
||||||
using Random
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Constants
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
const FILESERVER_URL = "http://localhost:8080"
|
|
||||||
|
|
||||||
# Chat message types for scenario 6
|
|
||||||
const CHAT_TYPES = ["text", "dictionary", "table", "image", "audio", "video", "binary"]
|
|
||||||
|
|
||||||
# Helper function to create sample text data
|
|
||||||
function create_text_payload()
|
|
||||||
texts = [
|
|
||||||
"Hello!",
|
|
||||||
"How are you doing today?",
|
|
||||||
"This is a test message.",
|
|
||||||
"Chat with mixed content is fun!",
|
|
||||||
"Short text payload."
|
|
||||||
]
|
|
||||||
return (rand(texts), "text")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample dictionary data
|
|
||||||
function create_dictionary_payload()
|
|
||||||
dictionaries = [
|
|
||||||
Dict("greeting" => "Hello", "status" => "active", "count" => 42),
|
|
||||||
Dict("user" => "alice", "message_id" => string(uuid4()), "timestamp" => Dates.now().iso8601),
|
|
||||||
Dict("config" => Dict("theme" => "dark", "notifications" => true))
|
|
||||||
]
|
|
||||||
return (rand(dictionaries), "dictionary")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample table data (DataFrame)
|
|
||||||
function create_table_payload()
|
|
||||||
# Small DataFrame
|
|
||||||
df_small = DataFrame(
|
|
||||||
id = 1:5,
|
|
||||||
name = ["Alice", "Bob", "Charlie", "Diana", "Eve"],
|
|
||||||
score = [95, 88, 92, 78, 85],
|
|
||||||
status = ["active", "active", "inactive", "active", "pending"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Large DataFrame (> 1MB)
|
|
||||||
df_large = DataFrame(
|
|
||||||
id = 1:50000,
|
|
||||||
name = ["User_$i" for i in 1:50000],
|
|
||||||
value = rand(50000) .* 100,
|
|
||||||
status = ["active", "inactive", "pending"][rand(1:3, 50000)]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Randomly choose small or large
|
|
||||||
return (rand([df_small, df_large]), "table")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample image data (Vector{UInt8})
|
|
||||||
function create_image_payload()
|
|
||||||
# Create random image bytes (small)
|
|
||||||
small_image = rand(UInt8, 100_000) # ~100KB
|
|
||||||
# Large image (> 1MB)
|
|
||||||
large_image = rand(UInt8, 2_000_000) # ~2MB
|
|
||||||
|
|
||||||
return (rand([small_image, large_image]), "image")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample audio data (Vector{UInt8})
|
|
||||||
function create_audio_payload()
|
|
||||||
# Create random audio bytes (small)
|
|
||||||
small_audio = rand(UInt8, 150_000) # ~150KB
|
|
||||||
# Large audio (> 1MB)
|
|
||||||
large_audio = rand(UInt8, 3_000_000) # ~3MB
|
|
||||||
|
|
||||||
return (rand([small_audio, large_audio]), "audio")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample video data (Vector{UInt8})
|
|
||||||
function create_video_payload()
|
|
||||||
# Create random video bytes (small)
|
|
||||||
small_video = rand(UInt8, 200_000) # ~200KB
|
|
||||||
# Large video (> 1MB)
|
|
||||||
large_video = rand(UInt8, 5_000_000) # ~5MB
|
|
||||||
|
|
||||||
return (rand([small_video, large_video]), "video")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Helper function to create sample binary data (Vector{UInt8})
|
|
||||||
function create_binary_payload()
|
|
||||||
# Create random binary bytes (small)
|
|
||||||
small_binary = rand(UInt8, 50_000) # ~50KB
|
|
||||||
# Large binary (> 1MB)
|
|
||||||
large_binary = rand(UInt8, 1_500_000) # ~1.5MB
|
|
||||||
|
|
||||||
return (rand([small_binary, large_binary]), "binary")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Main chat sender function for scenario 6
|
|
||||||
function chat_sender(
|
|
||||||
subject::String = "/chat/test",
|
|
||||||
num_messages::Int = 10;
|
|
||||||
nats_url::String = NATS_URL,
|
|
||||||
fileserver_url::String = FILESERVER_URL
|
|
||||||
)
|
|
||||||
println("\n=== Chat Sender (ServiceA) ===")
|
|
||||||
println("Subject: $subject")
|
|
||||||
println("Number of messages: $num_messages")
|
|
||||||
println("NATS URL: $nats_url")
|
|
||||||
println("Fileserver URL: $fileserver_url")
|
|
||||||
println("="^50)
|
|
||||||
|
|
||||||
# Create a handler for the fileserver upload
|
|
||||||
# This will be passed to smartsend as fileserverUploadHandler parameter
|
|
||||||
fileserverUploadHandler = (url, dataname, data) -> NATSBridge.plik_oneshot_upload(url, dataname, data)
|
|
||||||
|
|
||||||
for i in 1:num_messages
|
|
||||||
# Generate random chat message with mixed content
|
|
||||||
# Each message can have 1-5 payloads with different types
|
|
||||||
num_payloads = rand(1:5)
|
|
||||||
|
|
||||||
# Create payloads list
|
|
||||||
payloads = Tuple{String, Any, String}[]
|
|
||||||
|
|
||||||
# Track if we need to include text (required for chat)
|
|
||||||
has_text = false
|
|
||||||
|
|
||||||
# Create random payloads
|
|
||||||
for j in 1:num_payloads
|
|
||||||
# Randomly select a payload type
|
|
||||||
payload_type = rand(CHAT_TYPES)
|
|
||||||
|
|
||||||
# Create the payload based on type
|
|
||||||
payload_data, payload_type = if payload_type == "text"
|
|
||||||
create_text_payload()
|
|
||||||
elseif payload_type == "dictionary"
|
|
||||||
create_dictionary_payload()
|
|
||||||
elseif payload_type == "table"
|
|
||||||
create_table_payload()
|
|
||||||
elseif payload_type == "image"
|
|
||||||
create_image_payload()
|
|
||||||
elseif payload_type == "audio"
|
|
||||||
create_audio_payload()
|
|
||||||
elseif payload_type == "video"
|
|
||||||
create_video_payload()
|
|
||||||
elseif payload_type == "binary"
|
|
||||||
create_binary_payload()
|
|
||||||
end
|
|
||||||
|
|
||||||
# Ensure at least one text payload
|
|
||||||
if payload_type == "text"
|
|
||||||
has_text = true
|
|
||||||
end
|
|
||||||
|
|
||||||
push!(payloads, ("payload_$j", payload_data, payload_type))
|
|
||||||
end
|
|
||||||
|
|
||||||
# Ensure at least one text payload exists
|
|
||||||
if !has_text
|
|
||||||
text_data, text_type = create_text_payload()
|
|
||||||
push!(payloads, ("message_text", text_data, text_type))
|
|
||||||
end
|
|
||||||
|
|
||||||
# Generate chat message metadata
|
|
||||||
chat_metadata = Dict(
|
|
||||||
"message_index" => i,
|
|
||||||
"timestamp" => Dates.now().iso8601,
|
|
||||||
"sender" => "serviceA",
|
|
||||||
"payload_count" => length(payloads)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Send the chat message with mixed content
|
|
||||||
println("\n--- Message $i ---")
|
|
||||||
println("Payloads: $(length(payloads))")
|
|
||||||
for (dataname, data, type) in payloads
|
|
||||||
println(" - $dataname (type: $type)")
|
|
||||||
end
|
|
||||||
|
|
||||||
env = NATSBridge.smartsend(
|
|
||||||
subject,
|
|
||||||
payloads;
|
|
||||||
nats_url = nats_url,
|
|
||||||
fileserver_url = fileserver_url,
|
|
||||||
fileserverUploadHandler = fileserverUploadHandler,
|
|
||||||
size_threshold = 1_000_000, # 1MB threshold
|
|
||||||
correlation_id = string(uuid4()),
|
|
||||||
msg_purpose = "chat",
|
|
||||||
sender_name = "serviceA",
|
|
||||||
receiver_name = "serviceB",
|
|
||||||
reply_to = "/chat/reply",
|
|
||||||
reply_to_msg_id = ""
|
|
||||||
)
|
|
||||||
|
|
||||||
println("Envelope created with correlationId: $(env.correlationId)")
|
|
||||||
println("Message published successfully!")
|
|
||||||
|
|
||||||
# Wait a bit between messages
|
|
||||||
sleep(rand(0.1:0.3))
|
|
||||||
end
|
|
||||||
|
|
||||||
println("\n=== Chat Sender Complete ===")
|
|
||||||
return true
|
|
||||||
end
|
|
||||||
|
|
||||||
# Example usage
|
|
||||||
if abspath(PROGRAM_FILE) == @__FILE__
|
|
||||||
# Parse command line arguments
|
|
||||||
if length(ARGS) >= 2
|
|
||||||
subject = ARGS[1]
|
|
||||||
num_messages = parse(Int, ARGS[2])
|
|
||||||
else
|
|
||||||
subject = "/chat/test"
|
|
||||||
num_messages = 5
|
|
||||||
end
|
|
||||||
|
|
||||||
chat_sender(subject, num_messages)
|
|
||||||
end
|
|
||||||
@@ -1,99 +0,0 @@
|
|||||||
#!/usr/bin/env julia
|
|
||||||
# Test script for DataFrame transfer from Julia serviceA to Julia serviceB
|
|
||||||
# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge
|
|
||||||
#
|
|
||||||
# This is serviceB - the receiver that receives a dummy DataFrame from serviceA
|
|
||||||
|
|
||||||
using NATSBridge
|
|
||||||
using UUIDs
|
|
||||||
using DataFrames
|
|
||||||
using JSON
|
|
||||||
using Dates
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/NATSBridge_table_test"
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
|
||||||
function log_trace(correlation_id::String, message::String)
|
|
||||||
timestamp = Dates.now()
|
|
||||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Receiver: Receive and process DataFrame from serviceA
|
|
||||||
function receive_dataframe()
|
|
||||||
# Connect to NATS
|
|
||||||
conn = NATS.connect(NATS_URL)
|
|
||||||
|
|
||||||
# Subscribe to the subject
|
|
||||||
subscription = NATS.subscribe(conn, SUBJECT)
|
|
||||||
|
|
||||||
println("Listening for DataFrame messages on '$SUBJECT'...")
|
|
||||||
println("Press Ctrl+C to stop listening.")
|
|
||||||
|
|
||||||
# Listen for messages
|
|
||||||
while true
|
|
||||||
# Wait for a message with a 1-second timeout
|
|
||||||
msg = NATS.waitfor(subscription, 1.0)
|
|
||||||
|
|
||||||
if msg !== nothing
|
|
||||||
# Extract correlation ID for logging
|
|
||||||
json_data = JSON.parse(String(msg.payload))
|
|
||||||
cid = json_data["correlationId"]
|
|
||||||
|
|
||||||
log_trace(cid, "Received message from $(json_data["senderName"])")
|
|
||||||
|
|
||||||
# Process the message using smartreceive
|
|
||||||
payloads = NATSBridge.smartreceive(
|
|
||||||
msg;
|
|
||||||
fileserverDownloadHandler = (url, max_retries, base_delay, max_delay, cid) ->
|
|
||||||
NATSBridge._fetch_with_backoff(url, max_retries, base_delay, max_delay, cid),
|
|
||||||
max_retries = 5,
|
|
||||||
base_delay = 100,
|
|
||||||
max_delay = 5000
|
|
||||||
)
|
|
||||||
|
|
||||||
log_trace(cid, "Processed $(length(payloads)) payload(s)")
|
|
||||||
|
|
||||||
# Process each payload
|
|
||||||
for (dataname, data, payload_type) in payloads
|
|
||||||
log_trace(cid, "Payload '$dataname' type: $payload_type")
|
|
||||||
|
|
||||||
# Handle table (DataFrame) type
|
|
||||||
if payload_type == "table"
|
|
||||||
println("\nReceived DataFrame:")
|
|
||||||
println(data)
|
|
||||||
|
|
||||||
# Display DataFrame dimensions
|
|
||||||
println("\nDataFrame dimensions: $(size(data, 1)) rows x $(size(data, 2)) columns")
|
|
||||||
|
|
||||||
# Display column names
|
|
||||||
println("Column names: $(names(data))")
|
|
||||||
|
|
||||||
# Optionally send acknowledgment
|
|
||||||
reply_to = get(json_data, "replyTo", "")
|
|
||||||
if !isempty(reply_to)
|
|
||||||
log_trace(cid, "Reply to: $reply_to")
|
|
||||||
# Could send ACK here
|
|
||||||
end
|
|
||||||
else
|
|
||||||
println("\nReceived non-table payload: $dataname (type: $payload_type)")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Run the receiver
|
|
||||||
println("Starting DataFrame receiver...")
|
|
||||||
println("Subject: $SUBJECT")
|
|
||||||
println("NATS URL: $NATS_URL")
|
|
||||||
println("="^50)
|
|
||||||
|
|
||||||
# Run receiver (this will block and listen for messages)
|
|
||||||
receive_dataframe()
|
|
||||||
@@ -1,97 +0,0 @@
|
|||||||
#!/usr/bin/env julia
|
|
||||||
# Test script for DataFrame transfer from Julia serviceA to Julia serviceB
|
|
||||||
# Demonstrates the "Selection" scenario (small Arrow table) using NATSBridge
|
|
||||||
#
|
|
||||||
# This is serviceA - the sender that sends a dummy DataFrame to serviceB
|
|
||||||
|
|
||||||
using NATSBridge
|
|
||||||
using UUIDs
|
|
||||||
using DataFrames
|
|
||||||
using JSON
|
|
||||||
using Dates
|
|
||||||
|
|
||||||
# Include the NATSBridge module
|
|
||||||
include("src/NATSBridge.jl")
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
const SUBJECT = "/NATSBridge_table_test"
|
|
||||||
const NATS_URL = "nats://localhost:4222"
|
|
||||||
const FILESERVER_URL = "http://localhost:8080"
|
|
||||||
|
|
||||||
# Create correlation ID for tracing
|
|
||||||
correlation_id = string(uuid4())
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
# DataFrame sender #
|
|
||||||
# ------------------------------------------------------------------------------------------------ #
|
|
||||||
|
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
|
||||||
function log_trace(message)
|
|
||||||
timestamp = Dates.now()
|
|
||||||
println("[$timestamp] [Correlation: $correlation_id] $message")
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Sender: Send a dummy DataFrame to serviceB
|
|
||||||
function send_dataframe()
|
|
||||||
# Create a dummy DataFrame (table) to send
|
|
||||||
# This simulates a selection scenario where Julia server generates options for user selection
|
|
||||||
dummy_df = DataFrame(
|
|
||||||
id = 1:10,
|
|
||||||
name = ["Option A", "Option B", "Option C", "Option D", "Option E",
|
|
||||||
"Option F", "Option G", "Option H", "Option I", "Option J"],
|
|
||||||
score = [95, 88, 92, 78, 85, 90, 87, 93, 89, 91],
|
|
||||||
category = ["A", "B", "A", "C", "B", "A", "C", "A", "B", "C"],
|
|
||||||
active = [true, true, false, true, true, false, true, true, true, false]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Calculate approximate size
|
|
||||||
df_size = sizeof(dummy_df)
|
|
||||||
log_trace("DataFrame size: $(df_size / 1024) KB")
|
|
||||||
|
|
||||||
# Check if DataFrame is small enough for direct transport (< 1MB)
|
|
||||||
if df_size < 1_000_000
|
|
||||||
log_trace("Using direct transport (size < 1MB)")
|
|
||||||
else
|
|
||||||
log_trace("Using link transport (size >= 1MB)")
|
|
||||||
end
|
|
||||||
|
|
||||||
# Send the DataFrame using smartsend with type="table"
|
|
||||||
# API: smartsend(subject, [(dataname, data, type), ...]; keywords...)
|
|
||||||
env = NATSBridge.smartsend(
|
|
||||||
SUBJECT,
|
|
||||||
[("selection_table", dummy_df, "table")], # List of (dataname, data, type) tuples
|
|
||||||
nats_url = NATS_URL,
|
|
||||||
fileserver_url = FILESERVER_URL,
|
|
||||||
size_threshold = 1_000_000, # 1MB threshold
|
|
||||||
correlation_id = correlation_id,
|
|
||||||
msg_purpose = "chat",
|
|
||||||
sender_name = "serviceA",
|
|
||||||
receiver_name = "serviceB",
|
|
||||||
reply_to = "",
|
|
||||||
reply_to_msg_id = ""
|
|
||||||
)
|
|
||||||
|
|
||||||
log_trace("Sent DataFrame via $(env.payloads[1].transport) transport")
|
|
||||||
log_trace("Payload type: $(env.payloads[1].type)")
|
|
||||||
log_trace("Envelope correlationId: $(env.correlationId)")
|
|
||||||
|
|
||||||
# Display the sent DataFrame
|
|
||||||
println("\nSent DataFrame content:")
|
|
||||||
println(dummy_df)
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
# Run the test
|
|
||||||
println("Starting DataFrame transfer test...")
|
|
||||||
println("Correlation ID: $correlation_id")
|
|
||||||
println("Subject: $SUBJECT")
|
|
||||||
println("NATS URL: $NATS_URL")
|
|
||||||
|
|
||||||
# Run sender
|
|
||||||
println("\n--- Sending DataFrame ---")
|
|
||||||
send_dataframe()
|
|
||||||
|
|
||||||
println("\nTest completed.")
|
|
||||||
Reference in New Issue
Block a user