9 Commits

Author SHA1 Message Date
94fde6cea9 update 2026-02-10 14:11:07 +07:00
dcf79c92d1 fix base64encode 2026-02-10 13:48:44 +07:00
eaaebc247c update 2026-02-10 13:34:17 +07:00
14fc30696a add test file 2026-02-10 13:29:41 +07:00
8e9464210a update 2026-02-10 11:38:33 +07:00
4019b35574 update 2026-02-10 08:19:10 +07:00
eb99df02c9 update 2026-02-10 07:13:56 +07:00
8d4384ae3f rename 2026-02-10 07:12:16 +07:00
28158a284c rename test file 2026-02-10 07:11:07 +07:00
4 changed files with 50 additions and 33 deletions

View File

@@ -6,7 +6,7 @@
module NATSBridge module NATSBridge
using NATS, JSON, Arrow, HTTP, UUIDs, Dates using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
# ---------------------------------------------- 100 --------------------------------------------- # # ---------------------------------------------- 100 --------------------------------------------- #
# Constants # Constants
@@ -104,7 +104,7 @@ function log_trace(correlation_id::String, message::String)
timestamp = Dates.now() # Get current timestamp timestamp = Dates.now() # Get current timestamp
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message @info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
end end
2112
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size """ smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
@@ -155,7 +155,7 @@ env = smartsend("large.data", data, "arrow")
# In another process, retrieve and deserialize: # In another process, retrieve and deserialize:
# msg = subscribe(nats_url, "my.subject") # msg = subscribe(nats_url, "my.subject")
# env = json_to_envelope(msg.data) # env = json_to_envelope(msg.data)
# data = _deserialize_data(base64decode(env.payload), env.type) # data = _deserialize_data(Base64.decode(env.payload), env.type)
``` ```
""" """
function smartsend( function smartsend(
@@ -183,7 +183,7 @@ function smartsend(
# Decision: Direct vs Link # Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS # Direct path - Base64 encode and send via NATS
payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string payload_b64 = Base64.base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
env = MessageEnvelope( # Create envelope for direct transport env = MessageEnvelope( # Create envelope for direct transport
@@ -346,8 +346,7 @@ function smartreceive(
max_delay::Int = 5000 max_delay::Int = 5000
) )
# Parse the envelope # Parse the envelope
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope env = MessageEnvelope(String(msg.payload)) # Parse NATS message data as JSON envelope
log_trace(env.correlation_id, "Processing received message") # Log message processing start log_trace(env.correlation_id, "Processing received message") # Log message processing start
# Check transport type # Check transport type
@@ -355,7 +354,7 @@ function smartreceive(
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
# Decode Base64 payload # Decode Base64 payload
payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes payload_bytes = Base64.base64decode(env.payload) # Decode base64 payload to bytes
# Deserialize based on type # Deserialize based on type
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
@@ -457,19 +456,19 @@ function _deserialize_data(
end end
""" Decode base64 string to bytes # """ Decode base64 string to bytes
This internal function decodes a base64-encoded string back to binary data. # This internal function decodes a base64-encoded string back to binary data.
It's a wrapper around Base64.decode for consistency in the module. # It's a wrapper around Base64.decode for consistency in the module.
Arguments: # Arguments:
- `str::String` - Base64-encoded string to decode # - `str::String` - Base64-encoded string to decode
Return: # Return:
- Vector{UInt8} - Decoded binary data # - Vector{UInt8} - Decoded binary data
""" # """
function base64decode(str::String) # function base64decode(str::String)
return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module # return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
end # end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode """ plik_oneshot_upload - Upload a single file to a plik server using one-shot mode

View File

@@ -17,7 +17,8 @@ const FILESERVER_URL = "http://192.168.88.104:8080"
correlation_id = string(uuid4()) correlation_id = string(uuid4())
# File path for large binary payload test # File path for large binary payload test
const LARGE_FILE_PATH = "./test.zip" const LARGE_FILE_PATH = "./testFile_small.zip"
const filename = basename(LARGE_FILE_PATH)
# Helper: Log with correlation ID # Helper: Log with correlation ID
function log_trace(message) function log_trace(message)
@@ -43,7 +44,7 @@ function test_large_binary_send()
"binary", "binary",
nats_url = NATS_URL, nats_url = NATS_URL,
fileserver_url = FILESERVER_URL; fileserver_url = FILESERVER_URL;
dataname="test.zip" dataname=filename
) )
log_trace("Sent message with transport: $(env.transport)") log_trace("Sent message with transport: $(env.transport)")
@@ -65,17 +66,16 @@ function test_large_binary_receive()
conn = NATS.connect(NATS_URL) conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)") log_trace("Received message on $(msg.subject)")
log_trace("Received message:\n$msg")
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Use NATSBridge.smartreceive to handle the data
result = NATSBridge.smartreceive(msg)
# println("envelope----- ", result.envelope)
# Check transport type # Check transport type
if result.envelope.transport == "direct" if result.envelope.transport == "direct"
log_trace("Received direct transport with $(length(result.data)) bytes") log_trace("Received direct transport with ---- bytes")
else else
# For link transport, result.data is the URL # For link transport, result.data is the URL
log_trace("Received link transport at $(result.data)") log_trace("Received link transport at ---")
end end
# Verify the received data matches the original # Verify the received data matches the original
@@ -85,7 +85,9 @@ function test_large_binary_receive()
log_trace("Received $(file_size) bytes of binary data") log_trace("Received $(file_size) bytes of binary data")
# Save received data to a test file # Save received data to a test file
output_path = "test_output.bin" #[WORKING] add dataname so I know it is a file
filename = basename(result.envelope.url)
output_path = "./new_$filename"
write(output_path, result.data) write(output_path, result.data)
log_trace("Saved received data to $output_path") log_trace("Saved received data to $output_path")
@@ -101,7 +103,7 @@ function test_large_binary_receive()
end end
# Keep listening for 10 seconds # Keep listening for 10 seconds
sleep(10) sleep(60)
NATS.drain(conn) NATS.drain(conn)
end end
@@ -110,12 +112,28 @@ println("Starting large binary payload test...")
println("Correlation ID: $correlation_id") println("Correlation ID: $correlation_id")
println("Large file: $LARGE_FILE_PATH") println("Large file: $LARGE_FILE_PATH")
# Run sender first # # Run sender first
println("start smartsend") # println("start smartsend")
test_large_binary_send() # test_large_binary_send()
# Run receiver # Run receiver
println("testing smartreceive") println("testing smartreceive")
test_large_binary_receive() test_large_binary_receive()
println("Test completed.") println("Test completed.")

BIN
testFile_small.zip Normal file

Binary file not shown.