Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 94fde6cea9 | |||
| dcf79c92d1 | |||
| eaaebc247c | |||
| 14fc30696a | |||
| 8e9464210a | |||
| 4019b35574 | |||
| eb99df02c9 | |||
| 8d4384ae3f | |||
| 28158a284c |
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
module NATSBridge
|
module NATSBridge
|
||||||
|
|
||||||
using NATS, JSON, Arrow, HTTP, UUIDs, Dates
|
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
|
||||||
# ---------------------------------------------- 100 --------------------------------------------- #
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
||||||
|
|
||||||
# Constants
|
# Constants
|
||||||
@@ -104,7 +104,7 @@ function log_trace(correlation_id::String, message::String)
|
|||||||
timestamp = Dates.now() # Get current timestamp
|
timestamp = Dates.now() # Get current timestamp
|
||||||
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
|
||||||
end
|
end
|
||||||
|
2112
|
||||||
|
|
||||||
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
|
||||||
|
|
||||||
@@ -155,7 +155,7 @@ env = smartsend("large.data", data, "arrow")
|
|||||||
# In another process, retrieve and deserialize:
|
# In another process, retrieve and deserialize:
|
||||||
# msg = subscribe(nats_url, "my.subject")
|
# msg = subscribe(nats_url, "my.subject")
|
||||||
# env = json_to_envelope(msg.data)
|
# env = json_to_envelope(msg.data)
|
||||||
# data = _deserialize_data(base64decode(env.payload), env.type)
|
# data = _deserialize_data(Base64.decode(env.payload), env.type)
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
function smartsend(
|
function smartsend(
|
||||||
@@ -183,7 +183,7 @@ function smartsend(
|
|||||||
# Decision: Direct vs Link
|
# Decision: Direct vs Link
|
||||||
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
if payload_size < size_threshold # Check if payload is small enough for direct transport
|
||||||
# Direct path - Base64 encode and send via NATS
|
# Direct path - Base64 encode and send via NATS
|
||||||
payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string
|
payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
|
||||||
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
|
||||||
|
|
||||||
env = MessageEnvelope( # Create envelope for direct transport
|
env = MessageEnvelope( # Create envelope for direct transport
|
||||||
@@ -346,8 +346,7 @@ function smartreceive(
|
|||||||
max_delay::Int = 5000
|
max_delay::Int = 5000
|
||||||
)
|
)
|
||||||
# Parse the envelope
|
# Parse the envelope
|
||||||
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope
|
env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
|
||||||
|
|
||||||
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
log_trace(env.correlation_id, "Processing received message") # Log message processing start
|
||||||
|
|
||||||
# Check transport type
|
# Check transport type
|
||||||
@@ -355,7 +354,7 @@ function smartreceive(
|
|||||||
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
|
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
|
||||||
|
|
||||||
# Decode Base64 payload
|
# Decode Base64 payload
|
||||||
payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes
|
payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
|
||||||
|
|
||||||
# Deserialize based on type
|
# Deserialize based on type
|
||||||
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
|
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
|
||||||
@@ -457,19 +456,19 @@ function _deserialize_data(
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
""" Decode base64 string to bytes
|
# """ Decode base64 string to bytes
|
||||||
This internal function decodes a base64-encoded string back to binary data.
|
# This internal function decodes a base64-encoded string back to binary data.
|
||||||
It's a wrapper around Base64.decode for consistency in the module.
|
# It's a wrapper around Base64.decode for consistency in the module.
|
||||||
|
|
||||||
Arguments:
|
# Arguments:
|
||||||
- `str::String` - Base64-encoded string to decode
|
# - `str::String` - Base64-encoded string to decode
|
||||||
|
|
||||||
Return:
|
# Return:
|
||||||
- Vector{UInt8} - Decoded binary data
|
# - Vector{UInt8} - Decoded binary data
|
||||||
"""
|
# """
|
||||||
function base64decode(str::String)
|
# function base64decode(str::String)
|
||||||
return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
|
# return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
|
||||||
end
|
# end
|
||||||
|
|
||||||
|
|
||||||
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
|
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
|
||||||
|
|||||||
@@ -17,7 +17,8 @@ const FILESERVER_URL = "http://192.168.88.104:8080"
|
|||||||
correlation_id = string(uuid4())
|
correlation_id = string(uuid4())
|
||||||
|
|
||||||
# File path for large binary payload test
|
# File path for large binary payload test
|
||||||
const LARGE_FILE_PATH = "./test.zip"
|
const LARGE_FILE_PATH = "./testFile_small.zip"
|
||||||
|
const filename = basename(LARGE_FILE_PATH)
|
||||||
|
|
||||||
# Helper: Log with correlation ID
|
# Helper: Log with correlation ID
|
||||||
function log_trace(message)
|
function log_trace(message)
|
||||||
@@ -43,7 +44,7 @@ function test_large_binary_send()
|
|||||||
"binary",
|
"binary",
|
||||||
nats_url = NATS_URL,
|
nats_url = NATS_URL,
|
||||||
fileserver_url = FILESERVER_URL;
|
fileserver_url = FILESERVER_URL;
|
||||||
dataname="test.zip"
|
dataname=filename
|
||||||
)
|
)
|
||||||
|
|
||||||
log_trace("Sent message with transport: $(env.transport)")
|
log_trace("Sent message with transport: $(env.transport)")
|
||||||
@@ -65,17 +66,16 @@ function test_large_binary_receive()
|
|||||||
conn = NATS.connect(NATS_URL)
|
conn = NATS.connect(NATS_URL)
|
||||||
NATS.subscribe(conn, SUBJECT) do msg
|
NATS.subscribe(conn, SUBJECT) do msg
|
||||||
log_trace("Received message on $(msg.subject)")
|
log_trace("Received message on $(msg.subject)")
|
||||||
log_trace("Received message:\n$msg")
|
|
||||||
|
|
||||||
# Use SmartReceive to handle the data
|
|
||||||
result = SmartReceive(msg)
|
|
||||||
|
|
||||||
|
# Use NATSBridge.smartreceive to handle the data
|
||||||
|
result = NATSBridge.smartreceive(msg)
|
||||||
|
# println("envelope----- ", result.envelope)
|
||||||
# Check transport type
|
# Check transport type
|
||||||
if result.envelope.transport == "direct"
|
if result.envelope.transport == "direct"
|
||||||
log_trace("Received direct transport with $(length(result.data)) bytes")
|
log_trace("Received direct transport with ---- bytes")
|
||||||
else
|
else
|
||||||
# For link transport, result.data is the URL
|
# For link transport, result.data is the URL
|
||||||
log_trace("Received link transport at $(result.data)")
|
log_trace("Received link transport at ---")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Verify the received data matches the original
|
# Verify the received data matches the original
|
||||||
@@ -85,7 +85,9 @@ function test_large_binary_receive()
|
|||||||
log_trace("Received $(file_size) bytes of binary data")
|
log_trace("Received $(file_size) bytes of binary data")
|
||||||
|
|
||||||
# Save received data to a test file
|
# Save received data to a test file
|
||||||
output_path = "test_output.bin"
|
#[WORKING] add dataname so I know it is a file
|
||||||
|
filename = basename(result.envelope.url)
|
||||||
|
output_path = "./new_$filename"
|
||||||
write(output_path, result.data)
|
write(output_path, result.data)
|
||||||
log_trace("Saved received data to $output_path")
|
log_trace("Saved received data to $output_path")
|
||||||
|
|
||||||
@@ -99,9 +101,9 @@ function test_large_binary_receive()
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Keep listening for 10 seconds
|
# Keep listening for 10 seconds
|
||||||
sleep(10)
|
sleep(60)
|
||||||
NATS.drain(conn)
|
NATS.drain(conn)
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -110,12 +112,28 @@ println("Starting large binary payload test...")
|
|||||||
println("Correlation ID: $correlation_id")
|
println("Correlation ID: $correlation_id")
|
||||||
println("Large file: $LARGE_FILE_PATH")
|
println("Large file: $LARGE_FILE_PATH")
|
||||||
|
|
||||||
# Run sender first
|
# # Run sender first
|
||||||
println("start smartsend")
|
# println("start smartsend")
|
||||||
test_large_binary_send()
|
# test_large_binary_send()
|
||||||
|
|
||||||
# Run receiver
|
# Run receiver
|
||||||
println("testing smartreceive")
|
println("testing smartreceive")
|
||||||
test_large_binary_receive()
|
test_large_binary_receive()
|
||||||
|
|
||||||
println("Test completed.")
|
println("Test completed.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
BIN
testFile_small.zip
Normal file
BIN
testFile_small.zip
Normal file
Binary file not shown.
Reference in New Issue
Block a user