# Cross-Platform Implementation Guide: Bi-Directional Data Bridge

## Overview

This document describes the implementation of a high-performance, bi-directional data bridge built on **NATS (Core & JetStream)**. Large payloads are handled with the Claim-Check pattern: they are uploaded to a file server, and only a URL travels over NATS.

The system is implemented across three platforms with **high-level API parity**, while each implementation stays **idiomatic** for its language.

**Supported Platforms:**
- **Julia** - Ground truth implementation (reference)
- **JavaScript** - Node.js and browser implementation
- **Python/MicroPython** - Desktop and embedded implementation

---

## Cross-Platform Compatibility Notes

### 1. Python Payload Type Naming

The Python implementation uses `"table"` as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types (`"arrowtable"` and `"jsontable"`):

| Platform | Table Types |
|----------|-------------|
| Julia | `"arrowtable"`, `"jsontable"` |
| JavaScript | `"arrowtable"`, `"jsontable"` |
| Python | `"table"` (single type) |
| MicroPython | Not supported |

**Impact:** When exchanging data between Python and Julia/JavaScript, the `payload_type` strings do not match: Python code uses `"table"`, while Julia/JavaScript code uses `"arrowtable"` or `"jsontable"`. A receiver that does not recognize the sender's type rejects the payload with an "Unknown payload_type" error, so the type must be translated at the boundary.

### 2. Direct Transport Encoding Field

The `encoding` field in direct transport payloads differs between platforms:

| Platform | Encoding for Direct Transport |
|----------|-------------------------------|
| Julia | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` |
| JavaScript | Preserves original type: `"base64"`, `"json"`, or `"arrow-ipc"` |
| Python | Always `"base64"` for all direct transport payloads |
| MicroPython | Always `"base64"` for all direct transport payloads |

**Impact:** For payloads produced by Python or MicroPython, the `encoding` field does not indicate the original serialization format. Receivers should rely on `payload_type` to choose a deserializer.

### 3. MicroPython Limitations

MicroPython has significant constraints that affect feature support:

| Feature | Desktop Platforms | MicroPython |
|---------|-------------------|-------------|
| `arrowtable` | ✅ | ❌ (not supported - memory constraints) |
| `jsontable` | ✅ | ❌ (not supported - memory constraints) |
| `table` | ✅ (Python only) | ❌ (not supported - memory constraints) |
| Async/await | ✅ | ❌ (synchronous only) |
| File upload/download | ✅ | ⚠️ (placeholder implementations) |
| MAX_PAYLOAD_SIZE | 1MB+ | 50KB (hard limit) |
| DEFAULT_SIZE_THRESHOLD | 1MB | 100KB |

**Impact:** Use MicroPython only for small payloads sent with direct transport, because its file server operations are not fully implemented.

---

## Implementation Files

| Language | Implementation File | Description |
|----------|---------------------|-------------|
| **Julia** | [`src/NATSBridge.jl`](../src/NATSBridge.jl) | Full Julia implementation with Arrow IPC support |
| **JavaScript** | `src/natsbridge.js` | Node.js/browser implementation |
| **Python** | `src/natsbridge.py` | Desktop Python implementation |
| **MicroPython** | `src/natsbridge_mpy.py` | MicroPython implementation (limited features) |

---

## File Server Handler Architecture

The system uses **handler functions** to abstract file server operations, so that different file server implementations (e.g., Plik, AWS S3, a custom HTTP server) can be plugged in. The upload handler is passed to `smartsend` as `fileserver_upload_handler`, and the download handler to `smartreceive` as `fileserver_download_handler`.
### Handler Function Signatures

#### Julia

```julia
# Upload handler - uploads data to the file server and returns a Dict
# with keys "status", "uploadid", "fileid", "url"
fileserver_upload_handler(
    fileserver_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}

# Download handler - fetches data from a file server URL with exponential backoff
fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}
```

#### JavaScript

```javascript
// Upload handler - async function
async function fileserver_upload_handler(
  fileserver_url,
  dataname,
  data // Uint8Array
) {
  // Returns: { status, uploadid, fileid, url }
}

// Download handler - async function
async function fileserver_download_handler(
  url,
  max_retries,
  base_delay,
  max_delay,
  correlation_id
) {
  // Returns: Uint8Array
}
```

#### Python

```python
from typing import Any, Dict

# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.

    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass

# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.

    Returns:
        Downloaded bytes
    """
    pass
```

#### MicroPython

```python
# Upload handler - synchronous (this implementation does not use async)
def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytearray
) -> dict:
    """
    Upload data to file server (synchronous).

    Returns:
        dict with keys: 'status', 'url'
    """
    pass

# Download handler - synchronous
def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytearray:
    """
    Download data from URL with exponential backoff (synchronous).

    Returns:
        Downloaded bytes
    """
    pass
```

---

## Multi-Payload Support (Standard API)

All platforms use a **standardized list-of-tuples format** for payload operations. JavaScript has no tuple type, so there each entry is a three-element array `[dataname, data, type]`.

### API Standard

```
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive (a dictionary whose "payloads" field is a list of tuples)
{
  "correlation_id": "...",
  "msg_id": "...",
  "timestamp": "...",
  "send_to": "...",
  "msg_purpose": "...",
  "sender_name": "...",
  "sender_id": "...",
  "receiver_name": "...",
  "receiver_id": "...",
  "reply_to": "...",
  "reply_to_msg_id": "...",
  "broker_url": "...",
  "metadata": {...},
  "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```

### Supported Types

| Type | Julia | JavaScript | Python | MicroPython |
|------|-------|------------|--------|-------------|
| `text` | `String` | `string` | `str` | `str` |
| `dictionary` | `Dict`, `NamedTuple` | `Object`, `Array` | `dict`, `list` | `dict` |
| `arrowtable` | `DataFrame`, `Arrow.Table` | `Array` (input) → `Buffer` (Arrow IPC) | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ (not supported) |
| `jsontable` | `Vector{NamedTuple}`, `Vector{Dict}` | `Array` | `list[dict]`, `list` | ⚠️ (limited) |
| `table` | ❌ | ❌ | `pandas.DataFrame`, `bytes` (Arrow IPC) | ❌ |
| `image` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `audio` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `video` | `Vector{UInt8}` | `Uint8Array`, `Buffer` | `bytes` | `bytearray` |
| `binary` | `Vector{UInt8}`, `IOBuffer` | `Uint8Array`, `Buffer` | `bytes`, `bytearray` | `bytearray` |

**Note:** Python uses `"table"` as a single type for both Arrow and JSON table serialization. When exchanging data between Python and Julia/JavaScript, ensure the payload type is correctly translated (`"table"` ↔ `"arrowtable"` or `"jsontable"`).
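The type translation described in the note above can be applied to the serialized envelope, since `payload_type` is a plain string field on each payload. The following is a minimal Python sketch under two stated assumptions: a Python `"table"` payload carries Arrow IPC bytes (which is what the Python `_serialize_data` in this document writes), so it maps to `"arrowtable"`; and an incoming `"jsontable"` is routed to Python's `"dictionary"` branch, which parses the JSON into a list of dicts. The mapping names and `translate_payload_types` are hypothetical helpers, not part of NATSBridge.

```python
import json
from typing import Dict

# Hypothetical mappings; the jsontable -> dictionary choice is an assumption.
PY_TO_PEER: Dict[str, str] = {"table": "arrowtable"}
PEER_TO_PY: Dict[str, str] = {"arrowtable": "table", "jsontable": "dictionary"}


def translate_payload_types(env_json_str: str, mapping: Dict[str, str]) -> str:
    """Return a serialized envelope whose payload_type values are renamed via mapping.

    Types not present in the mapping (text, image, ...) are left unchanged.
    """
    env = json.loads(env_json_str)
    for payload in env.get("payloads", []):
        ptype = payload.get("payload_type")
        payload["payload_type"] = mapping.get(ptype, ptype)
    return json.dumps(env)
```

Renaming alone does not make the bytes compatible: for Arrow payloads both sides must also agree on IPC framing (stream vs. file format). On the sending side, one option is to call Python `smartsend` with `is_publish=False`, translate the returned JSON string with `PY_TO_PEER`, and publish it yourself; on the receiving side, `PEER_TO_PY` would be applied to the raw message body before deserializing.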
--- ## Platform-Specific Implementations ### Julia Implementation #### Module Structure ```julia module NATSBridge using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64 # Constants const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB const DEFAULT_BROKER_URL = "nats://localhost:4222" const DEFAULT_FILESERVER_URL = "http://localhost:8080" # Structs struct msg_payload_v1 id::String dataname::String payload_type::String transport::String encoding::String size::Integer data::Any metadata::Dict{String, Any} end struct msg_envelope_v1 correlation_id::String msg_id::String timestamp::String send_to::String msg_purpose::String sender_name::String sender_id::String receiver_name::String receiver_id::String reply_to::String reply_to_msg_id::String broker_url::String metadata::Dict{String, Any} payloads::Vector{msg_payload_v1} end # Main functions function smartsend(...) end function smartreceive(...) end # Utility functions function _serialize_data(...) end function _deserialize_data(...) end function envelope_to_json(...) end function log_trace(...) end # File server handlers function plik_oneshot_upload(...) end function _fetch_with_backoff(...) end function publish_message(...) end # Internal helpers function _get_payload_bytes(...) 
end end ``` #### Multiple Dispatch Pattern Julia leverages multiple dispatch for type-specific implementations: ```julia # publish_message has two overloads based on argument types function publish_message(broker_url::String, subject::String, message::String, correlation_id::String) conn = NATS.connect(broker_url) publish_message(conn, subject, message, correlation_id) end function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String) try NATS.publish(conn, subject, message) log_trace(correlation_id, "Message published to $subject") finally NATS.drain(conn) end end # Type-specific serialization function _serialize_data(data::String, payload_type::String) # Text handling return Vector{UInt8}(data) end function _serialize_data(data::Dict, payload_type::String) # Dictionary handling json_str = JSON.json(data) return Vector{UInt8}(json_str) end function _serialize_data(data::DataFrame, payload_type::String) # Table handling - arrowtable io = IOBuffer() Arrow.write(io, data) return take!(io) end ``` #### smartsend Implementation ```julia function smartsend( subject::String, data::AbstractArray{Tuple{String, T1, String}, 1}; broker_url::String = DEFAULT_BROKER_URL, fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::String = string(uuid4()), msg_purpose::String = "chat", sender_name::String = "NATSBridge", receiver_name::String = "", receiver_id::String = "", reply_to::String = "", reply_to_msg_id::String = "", is_publish::Bool = true, NATS_connection::Union{NATS.Connection, Nothing} = nothing, msg_id::String = string(uuid4()), sender_id::String = string(uuid4()) )::Tuple{msg_envelope_v1, String} where {T1<:Any} log_trace(correlation_id, "Starting smartsend for subject: $subject") # Process each payload in the list payloads = msg_payload_v1[] for (dataname, payload_data, payload_type) in data # Serialize data based on 
type payload_bytes = _serialize_data(payload_data, payload_type) payload_size = length(payload_bytes) log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes") # Decision: Direct vs Link if payload_size < size_threshold # Direct path - Base64 encode and send via NATS payload_b64 = Base64.base64encode(payload_bytes) log_trace(correlation_id, "Using direct transport for $payload_size bytes") payload = msg_payload_v1( payload_b64, payload_type; id = string(uuid4()), dataname = dataname, transport = "direct", encoding = "base64", size = payload_size, metadata = Dict{String, Any}("payload_bytes" => payload_size) ) push!(payloads, payload) else # Link path - Upload to HTTP server, send URL via NATS log_trace(correlation_id, "Using link transport, uploading to fileserver") response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes) if response["status"] != 200 error("Failed to upload data to fileserver: $(response["status"])") end url = response["url"] log_trace(correlation_id, "Uploaded to URL: $url") payload = msg_payload_v1( url, payload_type; id = string(uuid4()), dataname = dataname, transport = "link", encoding = "none", size = payload_size, metadata = Dict{String, Any}() ) push!(payloads, payload) end end # Create msg_envelope_v1 with all payloads # Note: First positional argument is "send_to" (the NATS subject), not "subject" env = msg_envelope_v1( subject, # send_to: NATS subject to publish to payloads; correlation_id = correlation_id, msg_id = msg_id, msg_purpose = msg_purpose, sender_name = sender_name, sender_id = sender_id, receiver_name = receiver_name, receiver_id = receiver_id, reply_to = reply_to, reply_to_msg_id = reply_to_msg_id, broker_url = broker_url, metadata = Dict{String, Any}(), ) env_json_str = envelope_to_json(env) if is_publish == false # skip publish elseif is_publish == true && NATS_connection === nothing publish_message(broker_url, subject, env_json_str, correlation_id) elseif is_publish == true 
&& NATS_connection !== nothing publish_message(NATS_connection, subject, env_json_str, correlation_id) end return (env, env_json_str) end ``` #### smartreceive Implementation ```julia function smartreceive( msg::NATS.Msg; fileserver_download_handler::Function = _fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, max_delay::Int = 5000 )::JSON.Object{String, Any} # Parse the JSON envelope env_json_obj = JSON.parse(String(msg.payload)) log_trace(env_json_obj["correlation_id"], "Processing received message") # Process all payloads in the envelope payloads_list = Tuple{String, Any, String}[] num_payloads = length(env_json_obj["payloads"]) for i in 1:num_payloads payload = env_json_obj["payloads"][i] transport = String(payload["transport"]) dataname = String(payload["dataname"]) if transport == "direct" log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'") # Extract base64 payload from the payload payload_b64 = String(payload["data"]) # Decode Base64 payload payload_bytes = Base64.base64decode(payload_b64) # Deserialize based on type data_type = String(payload["payload_type"]) data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"]) push!(payloads_list, (dataname, data, data_type)) elseif transport == "link" # Extract download URL from the payload url = String(payload["data"]) log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url") # Fetch with exponential backoff using the download handler downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"]) # Deserialize based on type data_type = String(payload["payload_type"]) data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"]) push!(payloads_list, (dataname, data, data_type)) else error("Unknown transport type for payload '$dataname': $(transport)") end end env_json_obj["payloads"] = payloads_list return env_json_obj 
end ``` #### _serialize_data Implementation ```julia function _serialize_data(data::Any, payload_type::String) if payload_type == "text" if isa(data, String) data_bytes = Vector{UInt8}(data) return data_bytes else error("Text data must be a String") end elseif payload_type == "dictionary" json_str = JSON.json(data) json_str_bytes = Vector{UInt8}(json_str) return json_str_bytes elseif payload_type == "arrowtable" # Serialize DataFrame to Arrow IPC format io = IOBuffer() Arrow.write(io, data) return take!(io) elseif payload_type == "jsontable" # Serialize to JSON # data is Vector{NamedTuple} or Vector{Dict} json_str = JSON.json(data) return Vector{UInt8}(json_str) elseif payload_type == "image" if isa(data, Vector{UInt8}) return data else error("Image data must be Vector{UInt8}") end elseif payload_type == "audio" if isa(data, Vector{UInt8}) return data else error("Audio data must be Vector{UInt8}") end elseif payload_type == "video" if isa(data, Vector{UInt8}) return data else error("Video data must be Vector{UInt8}") end elseif payload_type == "binary" if isa(data, IOBuffer) return take!(data) elseif isa(data, Vector{UInt8}) return data else error("Binary data must be binary (Vector{UInt8} or IOBuffer)") end else error("Unknown payload_type: $payload_type") end end ``` #### _deserialize_data Implementation ```julia function _deserialize_data( data::Vector{UInt8}, payload_type::String, correlation_id::String ) if payload_type == "text" return String(data) elseif payload_type == "dictionary" json_str = String(data) return JSON.parse(json_str) elseif payload_type == "arrowtable" # Deserialize from Arrow IPC format io = IOBuffer(data) arrow_table = Arrow.Table(io) return arrow_table elseif payload_type == "jsontable" # Deserialize from JSON format # Returns Vector{NamedTuple} or Vector{Dict} json_str = String(data) parsed = JSON.parse(json_str) return parsed elseif payload_type == "image" return data elseif payload_type == "audio" return data elseif payload_type == 
"video" return data elseif payload_type == "binary" return data else error("Unknown payload_type: $payload_type") end end ``` #### _fetch_with_backoff Implementation ```julia function _fetch_with_backoff( url::String, max_retries::Int, base_delay::Int, max_delay::Int, correlation_id::String ) delay = base_delay for attempt in 1:max_retries try response = HTTP.request("GET", url) if response.status == 200 log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") return response.body else error("Failed to fetch: $(response.status)") end catch e log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") if attempt < max_retries sleep(delay / 1000.0) delay = min(delay * 2, max_delay) end end end error("Failed to fetch data after $max_retries attempts") end ``` #### plik_oneshot_upload Implementation **Overload 1: Upload from binary data** ```julia function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8}) # Get upload id url_getUploadID = "$file_server_url/upload" headers = ["Content-Type" => "application/json"] body = """{ "OneShot" : true }""" http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) response_json = JSON.parse(http_response.body) uploadid = response_json["id"] uploadtoken = response_json["uploadToken"] # Upload file file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream") url_upload = "$file_server_url/file/$uploadid" headers = ["X-UploadToken" => uploadtoken] form = HTTP.Form(Dict( "file" => file_multipart )) http_response = nothing try http_response = HTTP.post(url_upload, headers, form) catch e @error "Request failed" exception=e end response_json = JSON.parse(http_response.body) fileid = response_json["id"] url = "$file_server_url/file/$uploadid/$fileid/$dataname" return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end ``` **Overload 2: Upload from file path** ```julia 
function plik_oneshot_upload(file_server_url::String, filepath::String) # Get upload id filename = basename(filepath) url_getUploadID = "$file_server_url/upload" headers = ["Content-Type" => "application/json"] body = """{ "OneShot" : true }""" http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false) response_json = JSON.parse(http_response.body) uploadid = response_json["id"] uploadtoken = response_json["uploadToken"] # Upload file url_upload = "$file_server_url/file/$uploadid" headers = ["X-UploadToken" => uploadtoken] http_response = open(filepath, "r") do file_stream form = HTTP.Form(Dict("file" => file_stream)) # Adding status_exception=false prevents 4xx/5xx from triggering 'catch' HTTP.post(url_upload, headers, form; status_exception = false) end if !isnothing(http_response) && http_response.status == 200 # Success - response already logged by caller else error("Failed to upload file: server returned status $(http_response.status)") end response_json = JSON.parse(http_response.body) fileid = response_json["id"] # url of the uploaded data e.g. 
"http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip" url = "$file_server_url/file/$uploadid/$fileid/$filename" return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url) end ``` --- ### JavaScript Implementation #### Module Structure ```javascript // natsbridge.js const nats = require('nats'); const crypto = require('crypto'); const fetch = require('node-fetch'); // UUID generation using built-in crypto module const uuidv4 = () => crypto.randomUUID(); const DEFAULT_SIZE_THRESHOLD = 1_000_000; const DEFAULT_BROKER_URL = 'nats://localhost:4222'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; class NATSClient { constructor(url) { this.url = url; this.connection = null; } async connect() { this.connection = await nats.connect({ servers: this.url }); return this.connection; } async publish(subject, message) { if (!this.connection) { await this.connect(); } await this.connection.publish(subject, message); } async close() { if (this.connection) { this.connection.close(); } } } async function smartsend(subject, data, options = {}) { // Implementation } async function smartreceive(msg, options = {}) { // Implementation } module.exports = { NATSClient, smartsend, smartreceive, plikOneshotUpload, fetchWithBackoff }; ``` #### smartsend Implementation ```javascript const nats = require('nats'); const crypto = require('crypto'); const fetch = require('node-fetch'); const arrow = require('apache-arrow'); // UUID generation using built-in crypto module const uuidv4 = () => crypto.randomUUID(); const DEFAULT_SIZE_THRESHOLD = 1_000_000; const DEFAULT_BROKER_URL = 'nats://localhost:4222'; const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; async function smartsend(subject, data, options = {}) { const { broker_url = DEFAULT_BROKER_URL, fileserver_url = DEFAULT_FILESERVER_URL, fileserver_upload_handler = plikOneshotUpload, size_threshold = DEFAULT_SIZE_THRESHOLD, correlation_id = uuidv4(), msg_purpose = 'chat', sender_name = 
'NATSBridge', receiver_name = '', receiver_id = '', reply_to = '', reply_to_msg_id = '', is_publish = true, nats_connection = null, msg_id = uuidv4(), sender_id = uuidv4() } = options; console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`); // Process payloads const payloads = []; for (const [dataname, payloadData, payloadType] of data) { const payloadBytes = await serializeData(payloadData, payloadType); const payloadSize = payloadBytes.byteLength; console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`); if (payloadSize < size_threshold) { // Direct path const payloadB64 = bufferToBase64(payloadBytes); console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`); payloads.push({ id: uuidv4(), dataname, payload_type: payloadType, transport: 'direct', encoding: 'base64', size: payloadSize, data: payloadB64, metadata: { payload_bytes: payloadSize } }); } else { // Link path console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`); const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes); if (response.status !== 200) { throw new Error(`Failed to upload data to fileserver: ${response.status}`); } console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`); payloads.push({ id: uuidv4(), dataname, payload_type: payloadType, transport: 'link', encoding: 'none', size: payloadSize, data: response.url, metadata: {} }); } } // Build envelope const env = { correlation_id, msg_id, timestamp: new Date().toISOString(), send_to: subject, msg_purpose, sender_name, sender_id, receiver_name, receiver_id, reply_to, reply_to_msg_id, broker_url, metadata: {}, payloads }; const env_json_str = JSON.stringify(env); if (is_publish) { if (nats_connection) { await publishMessage(nats_connection, subject, env_json_str, correlation_id); } else { await 
publishMessage(broker_url, subject, env_json_str, correlation_id); } } return [env, env_json_str]; } ``` #### serializeData Implementation ```javascript const arrow = require('apache-arrow'); async function serializeData(data, payload_type) { if (payload_type === 'text') { if (typeof data === 'string') { return Buffer.from(data, 'utf8'); } else { throw new Error('Text data must be a string'); } } else if (payload_type === 'dictionary') { const jsonStr = JSON.stringify(data); return Buffer.from(jsonStr, 'utf8'); } else if (payload_type === 'arrowtable') { // Convert Array to Arrow IPC if (!Array.isArray(data) || data.length === 0) { throw new Error('arrowtable data must be a non-empty array of objects'); } // Create schema from first row const schemaFields = Object.keys(data[0]).map(key => new arrow.Field(key, arrow.any()) ); const schema = new arrow.Schema(schemaFields); // Create writer const writer = new arrow.RecordBatchWriter([schema]); // Write rows for (const row of data) { const recordBatch = arrow.recordBatch.fromObjects([row], schema); writer.write(recordBatch); } await writer.close(); // Read buffer return writer.toBuffer(); } else if (payload_type === 'jsontable') { // Serialize directly to JSON const jsonStr = JSON.stringify(data); return Buffer.from(jsonStr, 'utf8'); } else if (payload_type === 'image') { if (data instanceof Uint8Array || Buffer.isBuffer(data)) { return Buffer.from(data); } else { throw new Error('Image data must be Uint8Array or Buffer'); } } else if (payload_type === 'audio') { if (data instanceof Uint8Array || Buffer.isBuffer(data)) { return Buffer.from(data); } else { throw new Error('Audio data must be Uint8Array or Buffer'); } } else if (payload_type === 'video') { if (data instanceof Uint8Array || Buffer.isBuffer(data)) { return Buffer.from(data); } else { throw new Error('Video data must be Uint8Array or Buffer'); } } else if (payload_type === 'binary') { if (data instanceof Uint8Array || Buffer.isBuffer(data)) { return 
Buffer.from(data); } else { throw new Error('Binary data must be Uint8Array or Buffer'); } } else { throw new Error(`Unknown payload_type: ${payload_type}`); } } function bufferToBase64(buffer) { return buffer.toString('base64'); } ``` #### deserializeData Implementation ```javascript const arrow = require('apache-arrow'); async function deserializeData(data, payload_type, correlation_id) { if (payload_type === 'text') { return Buffer.from(data).toString('utf8'); } else if (payload_type === 'dictionary') { const jsonStr = Buffer.from(data).toString('utf8'); return JSON.parse(jsonStr); } else if (payload_type === 'arrowtable') { // Deserialize from Arrow IPC const buffer = Buffer.from(data); const table = arrow.tableFromRawBytes(buffer); return table; } else if (payload_type === 'jsontable') { // Deserialize from JSON - returns Array const jsonStr = Buffer.from(data).toString('utf8'); return JSON.parse(jsonStr); } else if (payload_type === 'image') { return Buffer.from(data); } else if (payload_type === 'audio') { return Buffer.from(data); } else if (payload_type === 'video') { return Buffer.from(data); } else if (payload_type === 'binary') { return Buffer.from(data); } else { throw new Error(`Unknown payload_type: ${payload_type}`); } } ``` #### fetchWithBackoff Implementation ```javascript async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) { let delay = base_delay; for (let attempt = 1; attempt <= max_retries; attempt++) { try { const response = await fetch(url); if (response.status === 200) { console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`); return await response.arrayBuffer(); } else { throw new Error(`Failed to fetch: ${response.status}`); } } catch (e) { console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`); if (attempt < max_retries) { await new Promise(resolve => setTimeout(resolve, delay)); delay = Math.min(delay * 2, 
max_delay); } } } throw new Error(`Failed to fetch data after ${max_retries} attempts`); } ``` #### plikOneshotUpload Implementation ```javascript async function plikOneshotUpload(file_server_url, dataname, data) { // Get upload id const url_getUploadID = `${file_server_url}/upload`; const headers = { 'Content-Type': 'application/json' }; const body = JSON.stringify({ OneShot: true }); const http_response = await fetch(url_getUploadID, { method: 'POST', headers, body }); const response_json = await http_response.json(); const uploadid = response_json.id; const uploadtoken = response_json.uploadToken; // Upload file const url_upload = `${file_server_url}/file/${uploadid}`; const form = new FormData(); const blob = new Blob([data]); form.append('file', blob, dataname); const upload_headers = { 'X-UploadToken': uploadtoken }; const upload_response = await fetch(url_upload, { method: 'POST', headers: upload_headers, body: form }); const upload_json = await upload_response.json(); const fileid = upload_json.id; const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`; return { status: upload_response.status, uploadid, fileid, url }; } ``` --- ### Python Implementation #### Module Structure ```python # natsbridge.py import asyncio import base64 import json import uuid import time from typing import Any, Dict, List, Tuple, Union, Callable from dataclasses import dataclass, field from datetime import datetime try: import pyarrow as arrow import pyarrow.parquet as pq ARROW_AVAILABLE = True except ImportError: ARROW_AVAILABLE = False try: import aiohttp import nats from nats.aio.client import Client as NATSClient NATS_AVAILABLE = True except ImportError: NATS_AVAILABLE = False DEFAULT_SIZE_THRESHOLD = 1_000_000 DEFAULT_BROKER_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" @dataclass class MsgPayloadV1: """Message payload structure.""" id: str dataname: str payload_type: str transport: str encoding: str size: int data: Union[str, 
bytes] metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class MsgEnvelopeV1: """Message envelope structure.""" correlation_id: str msg_id: str timestamp: str send_to: str msg_purpose: str sender_name: str sender_id: str receiver_name: str receiver_id: str reply_to: str reply_to_msg_id: str broker_url: str metadata: Dict[str, Any] = field(default_factory=dict) payloads: List[MsgPayloadV1] = field(default_factory=list) class NATSBridge: """Cross-platform NATS bridge implementation.""" def __init__(self, broker_url: str = None, fileserver_url: str = None): self.broker_url = broker_url or DEFAULT_BROKER_URL self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL self._nats_client: NATSClient = None async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]: """Send data via NATS.""" pass async def smartreceive(self, msg: Any, **kwargs) -> Dict: """Receive and process NATS message.""" pass ``` #### smartsend Implementation ```python import asyncio import base64 import json import uuid from typing import Any, Dict, List, Tuple, Union, Callable from datetime import datetime DEFAULT_SIZE_THRESHOLD = 1_000_000 DEFAULT_BROKER_URL = "nats://localhost:4222" DEFAULT_FILESERVER_URL = "http://localhost:8080" async def smartsend( subject: str, data: List[Tuple[str, Any, str]], broker_url: str = DEFAULT_BROKER_URL, fileserver_url: str = DEFAULT_FILESERVER_URL, fileserver_upload_handler: Callable = plik_oneshot_upload, size_threshold: int = DEFAULT_SIZE_THRESHOLD, correlation_id: str = None, msg_purpose: str = "chat", sender_name: str = "NATSBridge", receiver_name: str = "", receiver_id: str = "", reply_to: str = "", reply_to_msg_id: str = "", is_publish: bool = True, nats_connection: Any = None, msg_id: str = None, sender_id: str = None ) -> Tuple[Dict, str]: """ Send data via NATS with automatic transport selection. 
Args: subject: NATS subject to publish to data: List of (dataname, data, type) tuples **kwargs: Additional options Returns: Tuple of (env, env_json_str) """ if correlation_id is None: correlation_id = str(uuid.uuid4()) if msg_id is None: msg_id = str(uuid.uuid4()) if sender_id is None: sender_id = str(uuid.uuid4()) print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}") # Process payloads payloads = [] for dataname, payload_data, payload_type in data: payload_bytes = _serialize_data(payload_data, payload_type) payload_size = len(payload_bytes) print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes") if payload_size < size_threshold: # Direct path payload_b64 = base64.b64encode(payload_bytes).decode('utf-8') print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes") payloads.append({ 'id': str(uuid.uuid4()), 'dataname': dataname, 'payload_type': payload_type, 'transport': 'direct', 'encoding': 'base64', 'size': payload_size, 'data': payload_b64, 'metadata': {'payload_bytes': payload_size} }) else: # Link path print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver") response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes) if response['status'] != 200: raise Exception(f"Failed to upload data to fileserver: {response['status']}") print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}") payloads.append({ 'id': str(uuid.uuid4()), 'dataname': dataname, 'payload_type': payload_type, 'transport': 'link', 'encoding': 'none', 'size': payload_size, 'data': response['url'], 'metadata': {} }) # Build envelope env = { 'correlation_id': correlation_id, 'msg_id': msg_id, 'timestamp': datetime.utcnow().isoformat() + 'Z', 'send_to': subject, 'msg_purpose': msg_purpose, 'sender_name': sender_name, 'sender_id': sender_id, 'receiver_name': receiver_name, 'receiver_id': receiver_id, 
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url,
        'metadata': {},
        'payloads': payloads
    }

    env_json_str = json.dumps(env)

    if is_publish:
        # publish_message is not defined in this section; it is called with
        # either an open connection or the broker URL.
        if nats_connection:
            await publish_message(nats_connection, subject, env_json_str, correlation_id)
        else:
            await publish_message(broker_url, subject, env_json_str, correlation_id)

    return env, env_json_str
```

#### serializeData Implementation

```python
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _serialize_data(data: Any, payload_type: str) -> bytes:
    """
    Serialize data to bytes based on type.

    Note: Python uses "table" as its single table type and serializes it as
    Arrow IPC. Julia/JavaScript use separate "arrowtable" and "jsontable" types.
    """
    if payload_type == 'text':
        if isinstance(data, str):
            return data.encode('utf-8')
        else:
            raise ValueError('Text data must be a string')

    elif payload_type == 'dictionary':
        json_str = json.dumps(data)
        return json_str.encode('utf-8')

    elif payload_type == 'table':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table serialization')

        if isinstance(data, arrow.Table):
            table = data
        else:
            import pandas as pd  # only needed for DataFrame input
            if not isinstance(data, pd.DataFrame):
                raise ValueError('Table data must be a pandas DataFrame or pyarrow Table')
            table = arrow.Table.from_pandas(data)

        # Write the Arrow IPC file format into an in-memory buffer
        sink = arrow.BufferOutputStream()
        with ipc.new_file(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.getvalue().to_pybytes()

    elif payload_type in ('image', 'audio', 'video', 'binary'):
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise ValueError(f'{payload_type} data must be bytes')

    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
```

#### deserializeData Implementation

```python
import io
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """
    Deserialize bytes to data based on type.

    Note: Python's single "table" type is read back from Arrow IPC (file format)
    into a pandas DataFrame. Julia/JavaScript use separate "arrowtable" and
    "jsontable" types.
    """
    if payload_type == 'text':
        return data.decode('utf-8')

    elif payload_type == 'dictionary':
        json_str = data.decode('utf-8')
        return json.loads(json_str)

    elif payload_type == 'table':
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table deserialization')
        reader = ipc.open_file(io.BytesIO(data))
        return reader.read_all().to_pandas()

    elif payload_type in ('image', 'audio', 'video', 'binary'):
        return data

    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')
```

#### fetchWithBackoff Implementation

```python
import asyncio
import aiohttp


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff (base_delay and max_delay in milliseconds)."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}")
                        return await response.read()
                    else:
                        raise Exception(f"Failed to fetch: {response.status}")
        except Exception as e:
            print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}")
            if attempt < max_retries:
                # Wait, then double the delay, capped at max_delay
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)

    raise Exception(f"Failed to fetch data after {max_retries} attempts")
```

#### plikOneshotUpload Implementation

```python
import aiohttp
import json
from typing import Any, Dict


async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """Upload data to plik server in one-shot mode."""
    async with aiohttp.ClientSession() as session:
        # Get upload id: create a one-shot upload and read its id and upload token
        url_getUploadID = f"{file_server_url}/upload"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"OneShot": True})
        async with session.post(url_getUploadID, headers=headers, data=body) as response:
            response_json = await response.json()
            uploadid = response_json['id']
            uploadtoken = response_json['uploadToken']

        # Upload file as multipart form data, authenticated by the upload token
        url_upload = f"{file_server_url}/file/{uploadid}"
        headers = {'X-UploadToken': uploadtoken}
        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
        async with session.post(url_upload, headers=headers, data=form) as upload_response:
            upload_json = await upload_response.json()
            fileid = upload_json['id']
            url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
            return {
                'status': upload_response.status,
                'uploadid': uploadid,
                'fileid': fileid,
                'url': url
            }
```

---

### MicroPython Implementation

#### Limitations

MicroPython has significant constraints compared to desktop implementations:

| Feature | Desktop | MicroPython |
|---------|---------|-------------|
| Memory | Limited only by host RAM | ~256KB - 1MB |
| Arrow IPC | ✅ | ❌ (not supported) |
| Async/Await | ✅ | ❌ (synchronous only) |
| Payloads above `MAX_PAYLOAD_SIZE` (50KB) | ✅ | ❌ (rejected with `MemoryError`) |
| arrowtable | ✅ | ❌ (not supported) |
| jsontable | ✅ | ❌ (not supported) |
| Multiple payloads | ✅ | ⚠️ (limited) |

**Note:** MicroPython does NOT support table types (`arrowtable` or `jsontable`) due to memory constraints.
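A MicroPython receiver raises `ValueError` for table types, and the MicroPython sender refuses any payload above `MAX_PAYLOAD_SIZE`, so a desktop sender targeting such a device may want to check its payload list before publishing. The sketch below is a hypothetical pre-flight helper (`mpy_payload_problems` is not part of any bridge implementation). It assumes the MicroPython limits listed in this guide (`MAX_PAYLOAD_SIZE = 50000` and the six supported types) and estimates sizes with CPython's `json.dumps`, which may differ slightly from MicroPython's output.

```python
import json
from typing import Any, List, Tuple

# Limits copied from the MicroPython constants described in this guide (assumption:
# the device uses the defaults unchanged).
MPY_MAX_PAYLOAD_SIZE = 50_000
MPY_SUPPORTED_TYPES = {"text", "dictionary", "image", "audio", "video", "binary"}


def mpy_payload_problems(data: List[Tuple[str, Any, str]]) -> List[str]:
    """Hypothetical check: list reasons a MicroPython peer would reject each payload.

    Takes the same (dataname, value, payload_type) tuples as smartsend and
    returns human-readable problems; an empty list means none were found.
    """
    problems = []
    for dataname, value, payload_type in data:
        if payload_type not in MPY_SUPPORTED_TYPES:
            problems.append(f"{dataname}: payload_type '{payload_type}' is not supported on MicroPython")
            continue
        # Estimate the serialized size the way the bridge serializes each type
        if payload_type == "text":
            if not isinstance(value, str):
                problems.append(f"{dataname}: text data must be a string")
                continue
            size = len(value.encode("utf-8"))
        elif payload_type == "dictionary":
            size = len(json.dumps(value).encode("utf-8"))
        else:
            if not isinstance(value, (bytes, bytearray, memoryview)):
                problems.append(f"{dataname}: {payload_type} data must be bytes")
                continue
            size = len(bytes(value))
        if size > MPY_MAX_PAYLOAD_SIZE:
            problems.append(
                f"{dataname}: {size} bytes exceeds MAX_PAYLOAD_SIZE ({MPY_MAX_PAYLOAD_SIZE})"
            )
    return problems
```

Running a check like this on the desktop side surfaces unsupported types and oversized payloads before the message leaves the sender, instead of as an exception on the device.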
#### Module Structure

```python
# natsbridge_mpy.py (MicroPython)
import network
import time
import json
import binascii  # stock MicroPython firmware ships binascii rather than base64
import uos
import struct
import random

# Constants
DEFAULT_SIZE_THRESHOLD = 100000  # 100KB for MicroPython
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
MAX_PAYLOAD_SIZE = 50000  # Hard limit (lower than threshold for safety)

# Note: MicroPython does NOT support table types (arrowtable/jsontable)
# Only supports: text, dictionary, image, audio, video, binary


class NATSBridge:
    """MicroPython NATS bridge implementation."""

    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_conn = None

    def smartsend(self, subject, data, **kwargs):
        """Send data (synchronous)."""
        correlation_id = self._generate_uuid()
        msg_id = self._generate_uuid()
        sender_id = self._generate_uuid()

        print(f"[Correlation: {correlation_id}] Starting smartsend")

        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)

            if payload_size > MAX_PAYLOAD_SIZE:
                raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}")

            # Because MAX_PAYLOAD_SIZE (50KB) < DEFAULT_SIZE_THRESHOLD (100KB), every
            # payload that passes the check above takes the direct path; the link
            # branch is only reachable if these constants are changed.
            if payload_size < DEFAULT_SIZE_THRESHOLD:
                # Direct path (b2a_base64 appends a trailing newline, so strip it)
                payload_b64 = binascii.b2a_base64(payload_bytes).decode('ascii').strip()
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path (limited support)
                response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes)
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })

        env = {
            'correlation_id': correlation_id,
            'msg_id': msg_id,
            # MicroPython's time module has no strftime(); format the UTC tuple directly
            'timestamp': '%04d-%02d-%02dT%02d:%02d:%02dZ' % time.gmtime()[:6],
            'send_to': subject,
            'msg_purpose': kwargs.get('msg_purpose', 'chat'),
            'sender_name': kwargs.get('sender_name', 'NATSBridge'),
            'sender_id': sender_id,
            'receiver_name': kwargs.get('receiver_name', ''),
            'receiver_id': kwargs.get('receiver_id', ''),
            'reply_to': kwargs.get('reply_to', ''),
            'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''),
            'broker_url': self.broker_url,
            'metadata': {},
            'payloads': payloads
        }

        env_json_str = json.dumps(env)

        # Publish
        self._publish(subject, env_json_str, correlation_id)

        return env, env_json_str

    def smartreceive(self, msg, **kwargs):
        """Receive and process message (synchronous)."""
        env_json_obj = json.loads(msg.payload)
        correlation_id = env_json_obj['correlation_id']
        payloads_list = []

        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']

            if transport == 'direct':
                payload_b64 = payload['data']
                payload_bytes = binascii.a2b_base64(payload_b64)
                data_type = payload['payload_type']
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = self._sync_fileserver_download(
                    url,
                    kwargs.get('max_retries', 3),
                    kwargs.get('base_delay', 100),
                    kwargs.get('max_delay', 1000),
                    correlation_id
                )
                data_type = payload['payload_type']
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))

        env_json_obj['payloads'] = payloads_list
        return env_json_obj

    def _serialize_data(self, data, payload_type):
        """
        Serialize data (MicroPython version).

        Note: MicroPython does NOT support table types (arrowtable/jsontable).
        Only supports: text, dictionary, image, audio, video, binary
        """
        if payload_type == 'text':
            if isinstance(data, str):
                return data.encode('utf-8')
            else:
                raise ValueError('Text data must be a string')
        elif payload_type == 'dictionary':
            json_str = json.dumps(data)
            return json_str.encode('utf-8')
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            if isinstance(data, (bytes, bytearray, memoryview)):
                return bytes(data)
            else:
                raise ValueError(f'{payload_type} data must be bytes')
        else:
            raise ValueError(f'Unknown payload_type: {payload_type}')

    def _deserialize_data(self, data, payload_type):
        """
        Deserialize data (MicroPython version).

        Note: MicroPython does NOT support table types (arrowtable/jsontable).
        Only supports: text, dictionary, image, audio, video, binary
        """
        if payload_type == 'text':
            return data.decode('utf-8')
        elif payload_type == 'dictionary':
            json_str = data.decode('utf-8')
            return json.loads(json_str)
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return data
        else:
            raise ValueError(f'Unknown payload_type: {payload_type}')

    def _generate_uuid(self):
        """Generate a random UUID-shaped identifier (MicroPython compatible)."""
        # random.getrandbits(16) yields at most four hex digits per %04x group
        return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % tuple(
            random.getrandbits(16) for _ in range(8)
        )

    def _sync_fileserver_upload(self, url, dataname, data):
        """Synchronous file upload (limited)."""
        # Simplified implementation for MicroPython
        # In practice, an HTTP client such as micropython-lib's `requests` could be used
        raise NotImplementedError("File upload not implemented in MicroPython")

    def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id):
        """Synchronous file download with backoff."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("File download not implemented in MicroPython")

    def _publish(self,
                 subject, message, correlation_id):
        """Publish message to NATS."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("NATS publishing not implemented in MicroPython")
```

---

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes (1MB) |

### MicroPython Configuration

```python
# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000  # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000  # Hard limit for MicroPython
```

---

## Performance Considerations

### Zero-Copy Reading

| Platform | Strategy |
|----------|----------|
| **Julia** | `Arrow.read()` with memory-mapped files |
| **JavaScript** | `ArrayBuffer` with `DataView` |
| **Python** | `pyarrow` memory mapping |
| **MicroPython** | Not available (streaming only) |

### Exponential Backoff

The desktop implementations use exponential backoff for HTTP downloads (the MicroPython download helper is currently a placeholder):

```python
# Python
import asyncio
import aiohttp


async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    delay = base_delay  # milliseconds
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
                    # Treat a non-200 status as a failed attempt so the backoff applies
                    raise Exception(f"HTTP {response.status}")
        except Exception:
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise Exception("Failed to fetch after max retries")
```

### Correlation ID Logging

All platforms use correlation IDs for distributed tracing:

```
[timestamp] [Correlation: abc123] Message published to subject
```

### Serialization Performance

| Format | Use Case | Pros | Cons |
|--------|----------|------|------|
| `arrowtable` | Large tabular data | Fast, zero-copy, schema-preserving | Binary format, requires Arrow library, not supported in MicroPython |
| `jsontable` | Small/medium tabular data | Human-readable, no Arrow dependency | Slower, larger size, no schema enforcement, not supported in MicroPython |

---

## Testing

### Test File Organization

| Platform | Sender Tests | Receiver Tests |
|----------|--------------|----------------|
| **Julia** | `test/test_julia_*_sender.jl` | `test/test_julia_*_receiver.jl` |
| **JavaScript** | `test/test_js_*_sender.js` | `test/test_js_*_receiver.js` |
| **Python** | `test/test_py_*_sender.py` | `test/test_py_*_receiver.py` |

### Run Tests

```bash
# Julia
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl

# JavaScript (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Python
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py
```

---

## Troubleshooting

### Common Issues

1. **NATS Connection Failed**
   - Ensure NATS server is running
   - Check `broker_url` configuration

2. **HTTP Upload Failed**
   - Ensure file server is running
   - Check `fileserver_url` configuration
   - Verify upload permissions

3. **Arrow IPC Deserialization Error**
   - Ensure data is properly serialized to Arrow format
   - Check Arrow version compatibility
   - MicroPython doesn't support Arrow IPC

4. **Memory Constraints (MicroPython)**
   - Avoid large payloads: keep each serialized payload within `MAX_PAYLOAD_SIZE` (50KB); larger payloads raise `MemoryError`
   - Use direct transport only; the file server upload/download helpers are placeholders
   - Avoid table payloads: neither `arrowtable` nor `jsontable` is supported

---

## Summary

This cross-platform NATS bridge provides:

1. **High-Level API Parity**: Equivalent `smartsend()` and `smartreceive()` entry points on every platform, adapted to each language (for example, asynchronous on desktop Python, synchronous on MicroPython)
2. **Idiomatic Implementations**:
   - **Julia**: Multiple dispatch, struct-based design, native Arrow IPC
   - **JavaScript**: Async/await, prototype-based utilities, class-based NATS client
   - **Python**: Class-based design with dataclasses, type hints, async/await
   - **MicroPython**: Synchronous API, memory-constrained optimizations
3. **Message Format Consistency**: A shared JSON envelope schema across platforms, subject to the `payload_type` naming and direct-transport `encoding` differences listed under Cross-Platform Compatibility Notes
4. **Handler Abstraction**: File server operations abstracted through configurable handlers
5. **Platform-Specific Optimizations**:
   - **Arrow IPC** (`arrowtable`): Efficient binary format for large tabular data (not supported in MicroPython)
   - **JSON** (`jsontable`): Human-readable format for smaller tables (Julia and JavaScript; Python uses its Arrow-based `table` type instead; not supported in MicroPython)

The Julia implementation in [`src/NATSBridge.jl`](../src/NATSBridge.jl) serves as the ground truth for API design and behavior.

### Datatype Summary

| Datatype | Serialization | Use Case | Encoding | Supported Platforms |
|----------|---------------|----------|----------|---------------------|
| `arrowtable` | Apache Arrow IPC | Large tabular data, schema-preserving | `arrow-ipc` → `base64` | Julia, JavaScript (Python uses `table`) |
| `jsontable` | JSON | Small/medium tabular data, human-readable | `json` → `base64` | Julia, JavaScript |
| `table` | Apache Arrow IPC (Python only) | Python's unified table type | `arrow-ipc` → `base64` | Python |
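Because Python and MicroPython always label direct payloads `"base64"` while Julia and JavaScript record the original format, a receiver accepting messages from any platform cannot rely on the `encoding` label alone. The sketch below is a hypothetical helper (`decode_direct_payload` is not part of the bridge's API). It assumes, as the Encoding column above suggests, that `data` is always base64 text on direct transport, and that `size` is the byte count before encoding, as in the Python `smartsend`.

```python
import base64
import binascii
from typing import Any, Dict


def decode_direct_payload(payload: Dict[str, Any]) -> bytes:
    """Hypothetical helper: return the raw bytes of a direct-transport payload.

    Assumes 'data' is base64 text whatever the 'encoding' label says, and that
    'size' (when present) is the byte count before base64 encoding.
    """
    if payload.get("transport") != "direct":
        raise ValueError(f"expected transport 'direct', got {payload.get('transport')!r}")
    name = payload.get("dataname")
    try:
        # validate=True rejects characters outside the base64 alphabet
        raw = base64.b64decode(payload["data"], validate=True)
    except binascii.Error as exc:
        raise ValueError(f"payload {name!r}: data is not valid base64") from exc
    # Cross-check the decoded length against the sender's recorded size
    expected = payload.get("size")
    if expected is not None and expected != len(raw):
        raise ValueError(f"payload {name!r}: size {expected} does not match decoded length {len(raw)}")
    return raw
```

The returned bytes can then be passed to the platform's deserializer (for example `_deserialize_data` in Python), selected by `payload_type`.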