Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 94fde6cea9 | |||
| dcf79c92d1 | |||
| eaaebc247c | |||
| 14fc30696a | |||
| 8e9464210a | |||
| 4019b35574 | |||
| eb99df02c9 | |||
| 8d4384ae3f | |||
| 28158a284c |
@@ -6,7 +6,7 @@
|
||||
|
||||
module NATSBridge
|
||||
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates
|
||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
|
||||
# ---------------------------------------------- 100 --------------------------------------------- #
|
||||
|
||||
# Constants
|
||||
@@ -104,7 +104,7 @@ function log_trace(correlation_id::String, message::String)
|
||||
timestamp = Dates.now() # Get current timestamp
|
||||
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
||||
end
|
||||
|
||||
2112
|
||||
|
||||
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
||||
|
||||
@@ -155,7 +155,7 @@ env = smartsend("large.data", data, "arrow")
|
||||
# In another process, retrieve and deserialize:
|
||||
# msg = subscribe(nats_url, "my.subject")
|
||||
# env = json_to_envelope(msg.data)
|
||||
# data = _deserialize_data(base64decode(env.payload), env.type)
|
||||
# data = _deserialize_data(Base64.decode(env.payload), env.type)
|
||||
```
|
||||
"""
|
||||
function smartsend(
|
||||
@@ -183,7 +183,7 @@ function smartsend(
|
||||
# Decision: Direct vs Link
|
||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||
# Direct path - Base64 encode and send via NATS
|
||||
payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string
|
||||
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||
|
||||
env = MessageEnvelope( # Create envelope for direct transport
|
||||
@@ -346,8 +346,7 @@ function smartreceive(
|
||||
max_delay::Int = 5000
|
||||
)
|
||||
# Parse the envelope
|
||||
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope
|
||||
|
||||
env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
|
||||
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
||||
|
||||
# Check transport type
|
||||
@@ -355,7 +354,7 @@ function smartreceive(
|
||||
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
|
||||
|
||||
# Decode Base64 payload
|
||||
payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes
|
||||
payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
|
||||
|
||||
# Deserialize based on type
|
||||
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
|
||||
@@ -457,19 +456,19 @@ function _deserialize_data(
|
||||
end
|
||||
|
||||
|
||||
""" Decode base64 string to bytes
|
||||
This internal function decodes a base64-encoded string back to binary data.
|
||||
It's a wrapper around Base64.decode for consistency in the module.
|
||||
# """ Decode base64 string to bytes
|
||||
# This internal function decodes a base64-encoded string back to binary data.
|
||||
# It's a wrapper around Base64.decode for consistency in the module.
|
||||
|
||||
Arguments:
|
||||
- `str::String` - Base64-encoded string to decode
|
||||
# Arguments:
|
||||
# - `str::String` - Base64-encoded string to decode
|
||||
|
||||
Return:
|
||||
- Vector{UInt8} - Decoded binary data
|
||||
"""
|
||||
function base64decode(str::String)
|
||||
return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
|
||||
end
|
||||
# Return:
|
||||
# - Vector{UInt8} - Decoded binary data
|
||||
# """
|
||||
# function base64decode(str::String)
|
||||
# return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
|
||||
# end
|
||||
|
||||
|
||||
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
|
||||
|
||||
@@ -17,7 +17,8 @@ const FILESERVER_URL = "http://192.168.88.104:8080"
|
||||
correlation_id = string(uuid4())
|
||||
|
||||
# File path for large binary payload test
|
||||
const LARGE_FILE_PATH = "./test.zip"
|
||||
const LARGE_FILE_PATH = "./testFile_small.zip"
|
||||
const filename = basename(LARGE_FILE_PATH)
|
||||
|
||||
# Helper: Log with correlation ID
|
||||
function log_trace(message)
|
||||
@@ -43,7 +44,7 @@ function test_large_binary_send()
|
||||
"binary",
|
||||
nats_url = NATS_URL,
|
||||
fileserver_url = FILESERVER_URL;
|
||||
dataname="test.zip"
|
||||
dataname=filename
|
||||
)
|
||||
|
||||
log_trace("Sent message with transport: $(env.transport)")
|
||||
@@ -65,17 +66,16 @@ function test_large_binary_receive()
|
||||
conn = NATS.connect(NATS_URL)
|
||||
NATS.subscribe(conn, SUBJECT) do msg
|
||||
log_trace("Received message on $(msg.subject)")
|
||||
log_trace("Received message:\n$msg")
|
||||
|
||||
# Use SmartReceive to handle the data
|
||||
result = SmartReceive(msg)
|
||||
|
||||
# Use NATSBridge.smartreceive to handle the data
|
||||
result = NATSBridge.smartreceive(msg)
|
||||
# println("envelope----- ", result.envelope)
|
||||
# Check transport type
|
||||
if result.envelope.transport == "direct"
|
||||
log_trace("Received direct transport with $(length(result.data)) bytes")
|
||||
log_trace("Received direct transport with ---- bytes")
|
||||
else
|
||||
# For link transport, result.data is the URL
|
||||
log_trace("Received link transport at $(result.data)")
|
||||
log_trace("Received link transport at ---")
|
||||
end
|
||||
|
||||
# Verify the received data matches the original
|
||||
@@ -85,7 +85,9 @@ function test_large_binary_receive()
|
||||
log_trace("Received $(file_size) bytes of binary data")
|
||||
|
||||
# Save received data to a test file
|
||||
output_path = "test_output.bin"
|
||||
#[WORKING] add dataname so I know it is a file
|
||||
filename = basename(result.envelope.url)
|
||||
output_path = "./new_$filename"
|
||||
write(output_path, result.data)
|
||||
log_trace("Saved received data to $output_path")
|
||||
|
||||
@@ -101,7 +103,7 @@ function test_large_binary_receive()
|
||||
end
|
||||
|
||||
# Keep listening for 10 seconds
|
||||
sleep(10)
|
||||
sleep(60)
|
||||
NATS.drain(conn)
|
||||
end
|
||||
|
||||
@@ -110,12 +112,28 @@ println("Starting large binary payload test...")
|
||||
println("Correlation ID: $correlation_id")
|
||||
println("Large file: $LARGE_FILE_PATH")
|
||||
|
||||
# Run sender first
|
||||
println("start smartsend")
|
||||
test_large_binary_send()
|
||||
# # Run sender first
|
||||
# println("start smartsend")
|
||||
# test_large_binary_send()
|
||||
|
||||
# Run receiver
|
||||
println("testing smartreceive")
|
||||
test_large_binary_receive()
|
||||
|
||||
println("Test completed.")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
BIN
testFile_small.zip
Normal file
BIN
testFile_small.zip
Normal file
Binary file not shown.
Reference in New Issue
Block a user