Files
NATSBridge/docs/implementation.md
2026-03-09 15:41:24 +07:00

58 KiB

Cross-Platform Implementation Guide: Bi-Directional Data Bridge

Overview

This document describes the detailed implementation of the high-performance, bi-directional data bridge using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. The system is implemented across three platforms with high-level API parity while maintaining idiomatic implementations for each language.

Supported Platforms:

  • Julia - Ground truth implementation (reference)
  • JavaScript - Node.js and browser implementation
  • Python/MicroPython - Desktop and embedded implementation

Cross-Platform Compatibility Notes

1. Python Payload Type Naming

The Python implementation uses "table" as a single payload type for both Arrow and JSON table serialization, while Julia and JavaScript use separate types ("arrowtable" and "jsontable"):

Platform Table Types
Julia "arrowtable", "jsontable"
JavaScript "arrowtable", "jsontable"
Python "table" (single type)
MicroPython Not supported

Impact: When exchanging data between Python and Julia/JavaScript, the payload type will differ. Python code should use "table" while Julia/JavaScript code should use "arrowtable" or "jsontable".

2. Direct Transport Encoding Field

The encoding field in direct transport payloads differs between platforms:

Platform Encoding for Direct Transport
Julia Preserves original type: "base64", "json", or "arrow-ipc"
JavaScript Preserves original type: "base64", "json", or "arrow-ipc"
Python Always "base64" for all direct transport payloads
MicroPython Always "base64" for all direct transport payloads

Impact: The encoding field may not accurately reflect the original serialization format when using Python or MicroPython.

3. MicroPython Limitations

MicroPython has significant constraints that affect feature support:

Feature Desktop Platforms MicroPython
arrowtable (not supported - memory constraints)
jsontable (not supported - memory constraints)
table (not supported - memory constraints)
Async/await (synchronous only)
File upload/download ⚠️ (placeholder implementations)
MAX_PAYLOAD_SIZE 1MB+ 50KB (hard limit)
DEFAULT_SIZE_THRESHOLD 1MB 100KB

Impact: MicroPython should only be used for small payloads with direct transport. File server operations are not fully implemented.


Implementation Files

Language Implementation File Description
Julia src/NATSBridge.jl Full Julia implementation with Arrow IPC support
JavaScript src/natsbridge.js Node.js/browser implementation
Python src/natsbridge.py Desktop Python implementation
MicroPython src/natsbridge_mpy.py MicroPython implementation (limited features)

File Server Handler Architecture

The system uses handler functions to abstract file server operations, allowing support for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).

Handler Function Signatures

Julia

# Upload handler - uploads data to file server and returns URL
fileserver_upload_handler(
    fileserver_url::String,
    dataname::String,
    data::Vector{UInt8}
)::Dict{String, Any}

# Download handler - fetches data from file server URL with exponential backoff
fileserver_download_handler(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)::Vector{UInt8}

JavaScript

// Upload handler - async function
async function fileserver_upload_handler(
    fileserver_url,
    dataname,
    data  // Uint8Array
) {
    // Returns: { status, uploadid, fileid, url }
}

// Download handler - async function
async function fileserver_download_handler(
    url,
    max_retries,
    base_delay,
    max_delay,
    correlation_id
) {
    // Returns: Uint8Array
}

Python

# Upload handler - async function
async def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """
    Upload data to file server.
    
    Returns:
        Dict with keys: 'status', 'uploadid', 'fileid', 'url'
    """
    pass

# Download handler - async function
async def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """
    Download data from URL with exponential backoff.
    
    Returns:
        Downloaded bytes
    """
    pass

MicroPython

# Upload handler - synchronous (no async in MicroPython)
def fileserver_upload_handler(
    fileserver_url: str,
    dataname: str,
    data: bytearray
) -> Dict:
    """
    Upload data to file server (synchronous).
    
    Returns:
        Dict with keys: 'status', 'url'
    """
    pass

# Download handler - synchronous
def fileserver_download_handler(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytearray:
    """
    Download data from URL with exponential backoff (synchronous).
    
    Returns:
        Downloaded bytes
    """
    pass

Multi-Payload Support (Standard API)

The system uses a standardized list-of-tuples format for all payload operations across all platforms.

API Standard

# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]

# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
{
  "correlation_id": "...",
  "msg_id": "...",
  "timestamp": "...",
  "send_to": "...",
  "msg_purpose": "...",
  "sender_name": "...",
  "sender_id": "...",
  "receiver_name": "...",
  "receiver_id": "...",
  "reply_to": "...",
  "reply_to_msg_id": "...",
  "broker_url": "...",
  "metadata": {...},
  "payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}

Supported Types

Type Julia JavaScript Python MicroPython
text String string str str
dictionary Dict, NamedTuple Object, Array dict, list dict
arrowtable DataFrame, Arrow.Table Array<Object> (input) → Buffer (Arrow IPC) pandas.DataFrame, bytes (Arrow IPC) (not supported)
jsontable Vector{NamedTuple}, Vector{Dict} Array<Object> list[dict], list ⚠️ (limited)
table pandas.DataFrame, bytes (Arrow IPC)
image Vector{UInt8} Uint8Array, Buffer bytes bytearray
audio Vector{UInt8} Uint8Array, Buffer bytes bytearray
video Vector{UInt8} Uint8Array, Buffer bytes bytearray
binary Vector{UInt8}, IOBuffer Uint8Array, Buffer bytes, bytearray bytearray

Note: Python uses "table" as a single type for both Arrow and JSON table serialization. When exchanging data between Python and Julia/JavaScript, ensure the payload type is correctly translated ("table""arrowtable" or "jsontable").


Platform-Specific Implementations

Julia Implementation

Module Structure

module NATSBridge
    using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
    
    # Constants
    const DEFAULT_SIZE_THRESHOLD = 1_000_000  # 1MB
    const DEFAULT_BROKER_URL = "nats://localhost:4222"
    const DEFAULT_FILESERVER_URL = "http://localhost:8080"
    
    # Structs
    struct msg_payload_v1
        id::String
        dataname::String
        payload_type::String
        transport::String
        encoding::String
        size::Integer
        data::Any
        metadata::Dict{String, Any}
    end
    
    struct msg_envelope_v1
        correlation_id::String
        msg_id::String
        timestamp::String
        send_to::String
        msg_purpose::String
        sender_name::String
        sender_id::String
        receiver_name::String
        receiver_id::String
        reply_to::String
        reply_to_msg_id::String
        broker_url::String
        metadata::Dict{String, Any}
        payloads::Vector{msg_payload_v1}
    end
    
    # Main functions
    function smartsend(...) end
    function smartreceive(...) end
    
    # Utility functions
    function _serialize_data(...) end
    function _deserialize_data(...) end
    function envelope_to_json(...) end
    function log_trace(...) end
    
    # File server handlers
    function plik_oneshot_upload(...) end
    function _fetch_with_backoff(...) end
    function publish_message(...) end
    
    # Internal helpers
    function _get_payload_bytes(...) end
end

Multiple Dispatch Pattern

Julia leverages multiple dispatch for type-specific implementations:

# publish_message has two overloads based on argument types
function publish_message(broker_url::String, subject::String, message::String, correlation_id::String)
    conn = NATS.connect(broker_url)
    publish_message(conn, subject, message, correlation_id)
end

function publish_message(conn::NATS.Connection, subject::String, message::String, correlation_id::String)
    try
        NATS.publish(conn, subject, message)
        log_trace(correlation_id, "Message published to $subject")
    finally
        NATS.drain(conn)
    end
end

# Type-specific serialization
function _serialize_data(data::String, payload_type::String)
    # Text handling
    return Vector{UInt8}(data)
end

function _serialize_data(data::Dict, payload_type::String)
    # Dictionary handling
    json_str = JSON.json(data)
    return Vector{UInt8}(json_str)
end

function _serialize_data(data::DataFrame, payload_type::String)
    # Table handling - arrowtable
    io = IOBuffer()
    Arrow.write(io, data)
    return take!(io)
end

smartsend Implementation

function smartsend(
    subject::String,
    data::AbstractArray{Tuple{String, T1, String}, 1};
    broker_url::String = DEFAULT_BROKER_URL,
    fileserver_url = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler::Function = plik_oneshot_upload,
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::String = string(uuid4()),
    msg_purpose::String = "chat",
    sender_name::String = "NATSBridge",
    receiver_name::String = "",
    receiver_id::String = "",
    reply_to::String = "",
    reply_to_msg_id::String = "",
    is_publish::Bool = true,
    NATS_connection::Union{NATS.Connection, Nothing} = nothing,
    msg_id::String = string(uuid4()),
    sender_id::String = string(uuid4())
)::Tuple{msg_envelope_v1, String} where {T1<:Any}

    log_trace(correlation_id, "Starting smartsend for subject: $subject")
    
    # Process each payload in the list
    payloads = msg_payload_v1[]
    for (dataname, payload_data, payload_type) in data
        # Serialize data based on type
        payload_bytes = _serialize_data(payload_data, payload_type)
        
        payload_size = length(payload_bytes)
        log_trace(correlation_id, "Serialized payload '$dataname' size: $payload_size bytes")
        
        # Decision: Direct vs Link
        if payload_size < size_threshold
            # Direct path - Base64 encode and send via NATS
            payload_b64 = Base64.base64encode(payload_bytes)
            log_trace(correlation_id, "Using direct transport for $payload_size bytes")
            
            payload = msg_payload_v1(
                payload_b64,
                payload_type;
                id = string(uuid4()),
                dataname = dataname,
                transport = "direct",
                encoding = "base64",
                size = payload_size,
                metadata = Dict{String, Any}("payload_bytes" => payload_size)
            )
            push!(payloads, payload)
        else
            # Link path - Upload to HTTP server, send URL via NATS
            log_trace(correlation_id, "Using link transport, uploading to fileserver")
            
            response = fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            
            if response["status"] != 200
                error("Failed to upload data to fileserver: $(response["status"])")
            end
            
            url = response["url"]
            log_trace(correlation_id, "Uploaded to URL: $url")

            payload = msg_payload_v1(
                url,
                payload_type;
                id = string(uuid4()),
                dataname = dataname,
                transport = "link",
                encoding = "none",
                size = payload_size,
                metadata = Dict{String, Any}()
            )
            push!(payloads, payload)
        end
    end

    # Create msg_envelope_v1 with all payloads
    # Note: First positional argument is "send_to" (the NATS subject), not "subject"
    env = msg_envelope_v1(
        subject,  # send_to: NATS subject to publish to
        payloads;
        correlation_id = correlation_id,
        msg_id = msg_id,
        msg_purpose = msg_purpose,
        sender_name = sender_name,
        sender_id = sender_id,
        receiver_name = receiver_name,
        receiver_id = receiver_id,
        reply_to = reply_to,
        reply_to_msg_id = reply_to_msg_id,
        broker_url = broker_url,
        metadata = Dict{String, Any}(),
    )
    
    env_json_str = envelope_to_json(env)
    
    if is_publish == false
        # skip publish
    elseif is_publish == true && NATS_connection === nothing
        publish_message(broker_url, subject, env_json_str, correlation_id)
    elseif is_publish == true && NATS_connection !== nothing
        publish_message(NATS_connection, subject, env_json_str, correlation_id)
    end
    
    return (env, env_json_str)
end

smartreceive Implementation

function smartreceive(
    msg::NATS.Msg;
    fileserver_download_handler::Function = _fetch_with_backoff,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)::JSON.Object{String, Any}
    # Parse the JSON envelope
    env_json_obj = JSON.parse(String(msg.payload))
    log_trace(env_json_obj["correlation_id"], "Processing received message")
    
    # Process all payloads in the envelope
    payloads_list = Tuple{String, Any, String}[]
    
    num_payloads = length(env_json_obj["payloads"])
      
    for i in 1:num_payloads
        payload = env_json_obj["payloads"][i]
        transport = String(payload["transport"])
        dataname = String(payload["dataname"])
        
        if transport == "direct"
            log_trace(env_json_obj["correlation_id"], "Direct transport - decoding payload '$dataname'")
            
            # Extract base64 payload from the payload
            payload_b64 = String(payload["data"])
            
            # Decode Base64 payload
            payload_bytes = Base64.base64decode(payload_b64)
            
            # Deserialize based on type
            data_type = String(payload["payload_type"])
            data = _deserialize_data(payload_bytes, data_type, env_json_obj["correlation_id"])
            
            push!(payloads_list, (dataname, data, data_type))
        elseif transport == "link"
            # Extract download URL from the payload
            url = String(payload["data"])
            log_trace(env_json_obj["correlation_id"], "Link transport - fetching '$dataname' from URL: $url")
            
            # Fetch with exponential backoff using the download handler
            downloaded_data = fileserver_download_handler(url, max_retries, base_delay, max_delay, env_json_obj["correlation_id"])

            # Deserialize based on type
            data_type = String(payload["payload_type"])
            data = _deserialize_data(downloaded_data, data_type, env_json_obj["correlation_id"])
            
            push!(payloads_list, (dataname, data, data_type))
        else
            error("Unknown transport type for payload '$dataname': $(transport)")
        end
    end
    env_json_obj["payloads"] = payloads_list
    return env_json_obj
end

_serialize_data Implementation

function _serialize_data(data::Any, payload_type::String)
    if payload_type == "text"
        if isa(data, String)
            data_bytes = Vector{UInt8}(data)
            return data_bytes  
        else
            error("Text data must be a String")
        end
    elseif payload_type == "dictionary"
        json_str = JSON.json(data)
        json_str_bytes = Vector{UInt8}(json_str)
        return json_str_bytes 
    elseif payload_type == "arrowtable"
        # Serialize DataFrame to Arrow IPC format
        io = IOBuffer()
        Arrow.write(io, data)
        return take!(io)
    elseif payload_type == "jsontable"
        # Serialize to JSON
        # data is Vector{NamedTuple} or Vector{Dict}
        json_str = JSON.json(data)
        return Vector{UInt8}(json_str)
    elseif payload_type == "image"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Image data must be Vector{UInt8}")
        end
    elseif payload_type == "audio"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Audio data must be Vector{UInt8}")
        end
    elseif payload_type == "video"
        if isa(data, Vector{UInt8})
            return data
        else
            error("Video data must be Vector{UInt8}")
        end
    elseif payload_type == "binary"
        if isa(data, IOBuffer)
            return take!(data)
        elseif isa(data, Vector{UInt8})
            return data
        else
            error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
        end
    else
        error("Unknown payload_type: $payload_type")
    end
end

_deserialize_data Implementation

function _deserialize_data(
    data::Vector{UInt8},
    payload_type::String,
    correlation_id::String
)
    if payload_type == "text"
        return String(data)
    elseif payload_type == "dictionary"
        json_str = String(data)
        return JSON.parse(json_str)
    elseif payload_type == "arrowtable"
        # Deserialize from Arrow IPC format
        io = IOBuffer(data)
        arrow_table = Arrow.Table(io)
        return arrow_table
    elseif payload_type == "jsontable"
        # Deserialize from JSON format
        # Returns Vector{NamedTuple} or Vector{Dict}
        json_str = String(data)
        parsed = JSON.parse(json_str)
        return parsed
    elseif payload_type == "image"
        return data
    elseif payload_type == "audio"
        return data
    elseif payload_type == "video"
        return data
    elseif payload_type == "binary"
        return data
    else
        error("Unknown payload_type: $payload_type")
    end
end

_fetch_with_backoff Implementation

function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)
    delay = base_delay
    for attempt in 1:max_retries
        try
            response = HTTP.request("GET", url)
            if response.status == 200
                log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
                return response.body
            else
                error("Failed to fetch: $(response.status)")
            end
        catch e
            log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))")
            
            if attempt < max_retries
                sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
            end
        end
    end
    
    error("Failed to fetch data after $max_retries attempts")
end

plik_oneshot_upload Implementation

Overload 1: Upload from binary data

function plik_oneshot_upload(file_server_url::String, dataname::String, data::Vector{UInt8})
    # Get upload id
    url_getUploadID = "$file_server_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
    response_json = JSON.parse(http_response.body)
    uploadid = response_json["id"]
    uploadtoken = response_json["uploadToken"]

    # Upload file
    file_multipart = HTTP.Multipart(dataname, IOBuffer(data), "application/octet-stream")
    url_upload = "$file_server_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]

    form = HTTP.Form(Dict(
        "file" => file_multipart
    ))

    http_response = nothing
    try
        http_response = HTTP.post(url_upload, headers, form)
    catch e
        @error "Request failed" exception=e
    end
    response_json = JSON.parse(http_response.body)
    fileid = response_json["id"]

    url = "$file_server_url/file/$uploadid/$fileid/$dataname"

    return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

Overload 2: Upload from file path

function plik_oneshot_upload(file_server_url::String, filepath::String)
    # Get upload id
    filename = basename(filepath)
    url_getUploadID = "$file_server_url/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    http_response = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
    response_json = JSON.parse(http_response.body)

    uploadid = response_json["id"]
    uploadtoken = response_json["uploadToken"]

    # Upload file
    url_upload = "$file_server_url/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    http_response = open(filepath, "r") do file_stream
        form = HTTP.Form(Dict("file" => file_stream))
        
        # Adding status_exception=false prevents 4xx/5xx from triggering 'catch'
        HTTP.post(url_upload, headers, form; status_exception = false)
        end

    if !isnothing(http_response) && http_response.status == 200
        # Success - response already logged by caller
    else
        error("Failed to upload file: server returned status $(http_response.status)")
    end
    response_json = JSON.parse(http_response.body)
    fileid = response_json["id"]

    # url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    url = "$file_server_url/file/$uploadid/$fileid/$filename"

    return Dict("status" => http_response.status, "uploadid" => uploadid, "fileid" => fileid, "url" => url)
end

JavaScript Implementation

Module Structure

// natsbridge.js
const nats = require('nats');
const crypto = require('crypto');
const fetch = require('node-fetch');

// UUID generation using built-in crypto module
const uuidv4 = () => crypto.randomUUID();

const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

class NATSClient {
    constructor(url) {
        this.url = url;
        this.connection = null;
    }

    async connect() {
        this.connection = await nats.connect({ servers: this.url });
        return this.connection;
    }

    async publish(subject, message) {
        if (!this.connection) {
            await this.connect();
        }
        await this.connection.publish(subject, message);
    }

    async close() {
        if (this.connection) {
            this.connection.close();
        }
    }
}

async function smartsend(subject, data, options = {}) {
    // Implementation
}

async function smartreceive(msg, options = {}) {
    // Implementation
}

module.exports = {
    NATSClient,
    smartsend,
    smartreceive,
    plikOneshotUpload,
    fetchWithBackoff
};

smartsend Implementation

const nats = require('nats');
const crypto = require('crypto');
const fetch = require('node-fetch');
const arrow = require('apache-arrow');

// UUID generation using built-in crypto module
const uuidv4 = () => crypto.randomUUID();

const DEFAULT_SIZE_THRESHOLD = 1_000_000;
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';

async function smartsend(subject, data, options = {}) {
    const {
        broker_url = DEFAULT_BROKER_URL,
        fileserver_url = DEFAULT_FILESERVER_URL,
        fileserver_upload_handler = plikOneshotUpload,
        size_threshold = DEFAULT_SIZE_THRESHOLD,
        correlation_id = uuidv4(),
        msg_purpose = 'chat',
        sender_name = 'NATSBridge',
        receiver_name = '',
        receiver_id = '',
        reply_to = '',
        reply_to_msg_id = '',
        is_publish = true,
        nats_connection = null,
        msg_id = uuidv4(),
        sender_id = uuidv4()
    } = options;

    console.log(`[Correlation: ${correlation_id}] Starting smartsend for subject: ${subject}`);

    // Process payloads
    const payloads = [];
    for (const [dataname, payloadData, payloadType] of data) {
        const payloadBytes = await serializeData(payloadData, payloadType);
        const payloadSize = payloadBytes.byteLength;

        console.log(`[Correlation: ${correlation_id}] Serialized payload '${dataname}' (type: ${payloadType}) size: ${payloadSize} bytes`);

        if (payloadSize < size_threshold) {
            // Direct path
            const payloadB64 = bufferToBase64(payloadBytes);
            console.log(`[Correlation: ${correlation_id}] Using direct transport for ${payloadSize} bytes`);

            payloads.push({
                id: uuidv4(),
                dataname,
                payload_type: payloadType,
                transport: 'direct',
                encoding: 'base64',
                size: payloadSize,
                data: payloadB64,
                metadata: { payload_bytes: payloadSize }
            });
        } else {
            // Link path
            console.log(`[Correlation: ${correlation_id}] Using link transport, uploading to fileserver`);

            const response = await fileserver_upload_handler(fileserver_url, dataname, payloadBytes);

            if (response.status !== 200) {
                throw new Error(`Failed to upload data to fileserver: ${response.status}`);
            }

            console.log(`[Correlation: ${correlation_id}] Uploaded to URL: ${response.url}`);

            payloads.push({
                id: uuidv4(),
                dataname,
                payload_type: payloadType,
                transport: 'link',
                encoding: 'none',
                size: payloadSize,
                data: response.url,
                metadata: {}
            });
        }
    }

    // Build envelope
    const env = {
        correlation_id,
        msg_id,
        timestamp: new Date().toISOString(),
        send_to: subject,
        msg_purpose,
        sender_name,
        sender_id,
        receiver_name,
        receiver_id,
        reply_to,
        reply_to_msg_id,
        broker_url,
        metadata: {},
        payloads
    };

    const env_json_str = JSON.stringify(env);

    if (is_publish) {
        if (nats_connection) {
            await publishMessage(nats_connection, subject, env_json_str, correlation_id);
        } else {
            await publishMessage(broker_url, subject, env_json_str, correlation_id);
        }
    }

    return [env, env_json_str];
}

serializeData Implementation

const arrow = require('apache-arrow');

async function serializeData(data, payload_type) {
    if (payload_type === 'text') {
        if (typeof data === 'string') {
            return Buffer.from(data, 'utf8');
        } else {
            throw new Error('Text data must be a string');
        }
    } else if (payload_type === 'dictionary') {
        const jsonStr = JSON.stringify(data);
        return Buffer.from(jsonStr, 'utf8');
    } else if (payload_type === 'arrowtable') {
        // Convert Array<Object> to Arrow IPC
        if (!Array.isArray(data) || data.length === 0) {
            throw new Error('arrowtable data must be a non-empty array of objects');
        }
        
        // Create schema from first row
        const schemaFields = Object.keys(data[0]).map(key => 
            new arrow.Field(key, arrow.any())
        );
        const schema = new arrow.Schema(schemaFields);
        
        // Create writer
        const writer = new arrow.RecordBatchWriter([schema]);
        
        // Write rows
        for (const row of data) {
            const recordBatch = arrow.recordBatch.fromObjects([row], schema);
            writer.write(recordBatch);
        }
        await writer.close();
        
        // Read buffer
        return writer.toBuffer();
    } else if (payload_type === 'jsontable') {
        // Serialize directly to JSON
        const jsonStr = JSON.stringify(data);
        return Buffer.from(jsonStr, 'utf8');
    } else if (payload_type === 'image') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Image data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'audio') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Audio data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'video') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Video data must be Uint8Array or Buffer');
        }
    } else if (payload_type === 'binary') {
        if (data instanceof Uint8Array || Buffer.isBuffer(data)) {
            return Buffer.from(data);
        } else {
            throw new Error('Binary data must be Uint8Array or Buffer');
        }
    } else {
        throw new Error(`Unknown payload_type: ${payload_type}`);
    }
}

function bufferToBase64(buffer) {
    return buffer.toString('base64');
}

deserializeData Implementation

const arrow = require('apache-arrow');

async function deserializeData(data, payload_type, correlation_id) {
    if (payload_type === 'text') {
        return Buffer.from(data).toString('utf8');
    } else if (payload_type === 'dictionary') {
        const jsonStr = Buffer.from(data).toString('utf8');
        return JSON.parse(jsonStr);
    } else if (payload_type === 'arrowtable') {
        // Deserialize from Arrow IPC
        const buffer = Buffer.from(data);
        const table = arrow.tableFromRawBytes(buffer);
        return table;
    } else if (payload_type === 'jsontable') {
        // Deserialize from JSON - returns Array<Object>
        const jsonStr = Buffer.from(data).toString('utf8');
        return JSON.parse(jsonStr);
    } else if (payload_type === 'image') {
        return Buffer.from(data);
    } else if (payload_type === 'audio') {
        return Buffer.from(data);
    } else if (payload_type === 'video') {
        return Buffer.from(data);
    } else if (payload_type === 'binary') {
        return Buffer.from(data);
    } else {
        throw new Error(`Unknown payload_type: ${payload_type}`);
    }
}

fetchWithBackoff Implementation

async function fetchWithBackoff(url, max_retries, base_delay, max_delay, correlation_id) {
    let delay = base_delay;
    
    for (let attempt = 1; attempt <= max_retries; attempt++) {
        try {
            const response = await fetch(url);
            
            if (response.status === 200) {
                console.log(`[Correlation: ${correlation_id}] Successfully fetched data from ${url} on attempt ${attempt}`);
                return await response.arrayBuffer();
            } else {
                throw new Error(`Failed to fetch: ${response.status}`);
            }
        } catch (e) {
            console.log(`[Correlation: ${correlation_id}] Attempt ${attempt} failed: ${e.constructor.name}`);
            
            if (attempt < max_retries) {
                await new Promise(resolve => setTimeout(resolve, delay));
                delay = Math.min(delay * 2, max_delay);
            }
        }
    }
    
    throw new Error(`Failed to fetch data after ${max_retries} attempts`);
}

plikOneshotUpload Implementation

async function plikOneshotUpload(file_server_url, dataname, data) {
    // Get upload id
    const url_getUploadID = `${file_server_url}/upload`;
    const headers = { 'Content-Type': 'application/json' };
    const body = JSON.stringify({ OneShot: true });
    
    const http_response = await fetch(url_getUploadID, {
        method: 'POST',
        headers,
        body
    });
    
    const response_json = await http_response.json();
    const uploadid = response_json.id;
    const uploadtoken = response_json.uploadToken;

    // Upload file
    const url_upload = `${file_server_url}/file/${uploadid}`;
    const form = new FormData();
    const blob = new Blob([data]);
    form.append('file', blob, dataname);
    
    const upload_headers = {
        'X-UploadToken': uploadtoken
    };

    const upload_response = await fetch(url_upload, {
        method: 'POST',
        headers: upload_headers,
        body: form
    });
    
    const upload_json = await upload_response.json();
    const fileid = upload_json.id;

    const url = `${file_server_url}/file/${uploadid}/${fileid}/${dataname}`;

    return {
        status: upload_response.status,
        uploadid,
        fileid,
        url
    };
}

Python Implementation

Module Structure

# natsbridge.py
import asyncio
import base64
import json
import uuid
import time
from typing import Any, Dict, List, Tuple, Union, Callable
from dataclasses import dataclass, field
from datetime import datetime

try:
    import pyarrow as arrow
    import pyarrow.parquet as pq
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False

try:
    import aiohttp
    import nats
    from nats.aio.client import Client as NATSClient
    NATS_AVAILABLE = True
except ImportError:
    NATS_AVAILABLE = False


DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"


@dataclass
class MsgPayloadV1:
    """Message payload structure."""
    id: str
    dataname: str
    payload_type: str
    transport: str
    encoding: str
    size: int
    data: Union[str, bytes]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MsgEnvelopeV1:
    """Message envelope structure."""
    correlation_id: str
    msg_id: str
    timestamp: str
    send_to: str
    msg_purpose: str
    sender_name: str
    sender_id: str
    receiver_name: str
    receiver_id: str
    reply_to: str
    reply_to_msg_id: str
    broker_url: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    payloads: List[MsgPayloadV1] = field(default_factory=list)


class NATSBridge:
    """Cross-platform NATS bridge implementation."""
    
    def __init__(self, broker_url: str = None, fileserver_url: str = None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_client: NATSClient = None
    
    async def smartsend(self, subject: str, data: List[Tuple[str, Any, str]], **kwargs) -> Tuple[Dict, str]:
        """Send data via NATS."""
        pass
    
    async def smartreceive(self, msg: Any, **kwargs) -> Dict:
        """Receive and process NATS message."""
        pass

smartsend Implementation

import asyncio
import base64
import json
import uuid
from typing import Any, Dict, List, Tuple, Union, Callable
from datetime import datetime

DEFAULT_SIZE_THRESHOLD = 1_000_000
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"


async def smartsend(
    subject: str,
    data: List[Tuple[str, Any, str]],
    broker_url: str = DEFAULT_BROKER_URL,
    fileserver_url: str = DEFAULT_FILESERVER_URL,
    fileserver_upload_handler: Callable = plik_oneshot_upload,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    correlation_id: str = None,
    msg_purpose: str = "chat",
    sender_name: str = "NATSBridge",
    receiver_name: str = "",
    receiver_id: str = "",
    reply_to: str = "",
    reply_to_msg_id: str = "",
    is_publish: bool = True,
    nats_connection: Any = None,
    msg_id: str = None,
    sender_id: str = None
) -> Tuple[Dict, str]:
    """
    Send data via NATS with automatic transport selection.
    
    Args:
        subject: NATS subject to publish to
        data: List of (dataname, data, type) tuples
        **kwargs: Additional options
    
    Returns:
        Tuple of (env, env_json_str)
    """
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())
    if msg_id is None:
        msg_id = str(uuid.uuid4())
    if sender_id is None:
        sender_id = str(uuid.uuid4())
    
    print(f"[Correlation: {correlation_id}] Starting smartsend for subject: {subject}")
    
    # Process payloads
    payloads = []
    for dataname, payload_data, payload_type in data:
        payload_bytes = _serialize_data(payload_data, payload_type)
        payload_size = len(payload_bytes)
        
        print(f"[Correlation: {correlation_id}] Serialized payload '{dataname}' (type: {payload_type}) size: {payload_size} bytes")
        
        if payload_size < size_threshold:
            # Direct path
            payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
            print(f"[Correlation: {correlation_id}] Using direct transport for {payload_size} bytes")
            
            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'direct',
                'encoding': 'base64',
                'size': payload_size,
                'data': payload_b64,
                'metadata': {'payload_bytes': payload_size}
            })
        else:
            # Link path
            print(f"[Correlation: {correlation_id}] Using link transport, uploading to fileserver")
            
            response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
            
            if response['status'] != 200:
                raise Exception(f"Failed to upload data to fileserver: {response['status']}")
            
            print(f"[Correlation: {correlation_id}] Uploaded to URL: {response['url']}")
            
            payloads.append({
                'id': str(uuid.uuid4()),
                'dataname': dataname,
                'payload_type': payload_type,
                'transport': 'link',
                'encoding': 'none',
                'size': payload_size,
                'data': response['url'],
                'metadata': {}
            })
    
    # Build envelope
    env = {
        'correlation_id': correlation_id,
        'msg_id': msg_id,
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'send_to': subject,
        'msg_purpose': msg_purpose,
        'sender_name': sender_name,
        'sender_id': sender_id,
        'receiver_name': receiver_name,
        'receiver_id': receiver_id,
        'reply_to': reply_to,
        'reply_to_msg_id': reply_to_msg_id,
        'broker_url': broker_url,
        'metadata': {},
        'payloads': payloads
    }
    
    env_json_str = json.dumps(env)
    
    if is_publish:
        if nats_connection:
            await publish_message(nats_connection, subject, env_json_str, correlation_id)
        else:
            await publish_message(broker_url, subject, env_json_str, correlation_id)
    
    return env, env_json_str

serializeData Implementation

import base64
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.feather as feather
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _serialize_data(data: Any, payload_type: str) -> bytes:
    """
    Serialize data to bytes based on type.
    
    Note: Python uses "table" as a single type for both Arrow and JSON table
    serialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types.
    """
    if payload_type == 'text':
        if isinstance(data, str):
            return data.encode('utf-8')
        else:
            raise ValueError('Text data must be a string')
    elif payload_type == 'dictionary':
        json_str = json.dumps(data)
        return json_str.encode('utf-8')
    elif payload_type == 'table':
        # Python uses "table" for both arrowtable and jsontable
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table serialization')
        
        import io
        buf = io.BytesIO()
        import pandas as pd
        if isinstance(data, pd.DataFrame):
            # Serialize DataFrame to Arrow
            table = arrow.Table.from_pandas(data)
            sink = ipc.new_file(buf, table.schema)
            ipc.write_table(table, sink)
            sink.close()
            return buf.getvalue()
        elif isinstance(data, arrow.Table):
            sink = ipc.new_file(buf, data.schema)
            ipc.write_table(data, sink)
            sink.close()
            return buf.getvalue()
        else:
            raise ValueError('Table data must be a pandas DataFrame or pyarrow Table')
    elif payload_type in ('image', 'audio', 'video', 'binary'):
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        else:
            raise ValueError(f'{payload_type} data must be bytes')
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')

deserializeData Implementation

import base64
import json
from typing import Any

try:
    import pyarrow as arrow
    import pyarrow.feather as feather
    import pyarrow.ipc as ipc
    ARROW_AVAILABLE = True
except ImportError:
    ARROW_AVAILABLE = False


def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
    """
    Deserialize bytes to data based on type.
    
    Note: Python uses "table" as a single type for both Arrow and JSON table
    deserialization. Julia/JavaScript use separate "arrowtable" and "jsontable" types.
    """
    if payload_type == 'text':
        return data.decode('utf-8')
    elif payload_type == 'dictionary':
        json_str = data.decode('utf-8')
        return json.loads(json_str)
    elif payload_type == 'table':
        # Python uses "table" for both arrowtable and jsontable
        if not ARROW_AVAILABLE:
            raise RuntimeError('pyarrow not available for table deserialization')
        
        import io
        buf = io.BytesIO(data)
        reader = ipc.open_file(buf)
        return reader.read_all().to_pandas()
    elif payload_type in ('image', 'audio', 'video', 'binary'):
        return data
    else:
        raise ValueError(f'Unknown payload_type: {payload_type}')

fetchWithBackoff Implementation

import asyncio
import aiohttp
from typing import Callable


async def fetch_with_backoff(
    url: str,
    max_retries: int,
    base_delay: int,
    max_delay: int,
    correlation_id: str
) -> bytes:
    """Fetch URL with exponential backoff."""
    delay = base_delay
    
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        print(f"[Correlation: {correlation_id}] Successfully fetched data from {url} on attempt {attempt}")
                        return await response.read()
                    else:
                        raise Exception(f"Failed to fetch: {response.status}")
        except Exception as e:
            print(f"[Correlation: {correlation_id}] Attempt {attempt} failed: {type(e).__name__}")
            
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    
    raise Exception(f"Failed to fetch data after {max_retries} attempts")

plikOneshotUpload Implementation

import aiohttp
import json
from typing import Dict, Any


async def plik_oneshot_upload(
    file_server_url: str,
    dataname: str,
    data: bytes
) -> Dict[str, Any]:
    """Upload data to plik server in one-shot mode."""
    
    # Get upload id
    async with aiohttp.ClientSession() as session:
        url_getUploadID = f"{file_server_url}/upload"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"OneShot": True})
        
        async with session.post(url_getUploadID, headers=headers, data=body) as response:
            response_json = await response.json()
            uploadid = response_json['id']
            uploadtoken = response_json['uploadToken']

        # Upload file
        url_upload = f"{file_server_url}/file/{uploadid}"
        headers = {'X-UploadToken': uploadtoken}
        
        form = aiohttp.FormData()
        form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
        
        async with session.post(url_upload, headers=headers, data=form) as upload_response:
            upload_json = await upload_response.json()
            fileid = upload_json['id']

    url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"

    return {
        'status': upload_response.status,
        'uploadid': uploadid,
        'fileid': fileid,
        'url': url
    }

MicroPython Implementation

Limitations

MicroPython has significant constraints compared to desktop implementations:

Feature Desktop MicroPython
Memory Unlimited ~256KB - 1MB
Arrow IPC (not supported)
Async/Await (synchronous only)
Large payloads (>1MB) (enforced limit)
arrowtable (not supported)
jsontable (not supported)
Multiple payloads ⚠️ (limited)

Note: MicroPython does NOT support table types (arrowtable or jsontable) due to memory constraints.

Module Structure

# natsbridge_mpy.py (MicroPython)
import network
import time
import json
import base64
import uos
import struct
import random

# Constants
DEFAULT_SIZE_THRESHOLD = 100000  # 100KB for MicroPython
DEFAULT_BROKER_URL = "nats://localhost:4222"
DEFAULT_FILESERVER_URL = "http://localhost:8080"
MAX_PAYLOAD_SIZE = 50000  # Hard limit (lower than threshold for safety)

# Note: MicroPython does NOT support table types (arrowtable/jsontable)
# Only supports: text, dictionary, image, audio, video, binary


class NATSBridge:
    """MicroPython NATS bridge implementation."""
    
    def __init__(self, broker_url=None, fileserver_url=None):
        self.broker_url = broker_url or DEFAULT_BROKER_URL
        self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
        self._nats_conn = None
    
    def smartsend(self, subject, data, **kwargs):
        """Send data (synchronous)."""
        correlation_id = self._generate_uuid()
        msg_id = self._generate_uuid()
        sender_id = self._generate_uuid()
        
        print(f"[Correlation: {correlation_id}] Starting smartsend")
        
        payloads = []
        for dataname, payload_data, payload_type in data:
            payload_bytes = self._serialize_data(payload_data, payload_type)
            payload_size = len(payload_bytes)
            
            if payload_size > MAX_PAYLOAD_SIZE:
                raise MemoryError(f"Payload {dataname} exceeds max size {MAX_PAYLOAD_SIZE}")
            
            if payload_size < DEFAULT_SIZE_THRESHOLD:
                # Direct path
                payload_b64 = base64.b64encode(payload_bytes).decode('ascii')
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'direct',
                    'encoding': 'base64',
                    'size': payload_size,
                    'data': payload_b64
                })
            else:
                # Link path (limited support)
                response = self._sync_fileserver_upload(self.fileserver_url, dataname, payload_bytes)
                payloads.append({
                    'id': self._generate_uuid(),
                    'dataname': dataname,
                    'payload_type': payload_type,
                    'transport': 'link',
                    'encoding': 'none',
                    'size': payload_size,
                    'data': response['url']
                })
        
        env = {
            'correlation_id': correlation_id,
            'msg_id': msg_id,
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()),
            'send_to': subject,
            'msg_purpose': kwargs.get('msg_purpose', 'chat'),
            'sender_name': kwargs.get('sender_name', 'NATSBridge'),
            'sender_id': sender_id,
            'receiver_name': kwargs.get('receiver_name', ''),
            'receiver_id': kwargs.get('receiver_id', ''),
            'reply_to': kwargs.get('reply_to', ''),
            'reply_to_msg_id': kwargs.get('reply_to_msg_id', ''),
            'broker_url': self.broker_url,
            'metadata': {},
            'payloads': payloads
        }
        
        env_json_str = json.dumps(env)
        
        # Publish
        self._publish(subject, env_json_str, correlation_id)
        
        return env, env_json_str
    
    def smartreceive(self, msg, **kwargs):
        """Receive and process message (synchronous)."""
        env_json_obj = json.loads(msg.payload)
        correlation_id = env_json_obj['correlation_id']
        
        payloads_list = []
        for payload in env_json_obj['payloads']:
            transport = payload['transport']
            dataname = payload['dataname']
            
            if transport == 'direct':
                payload_b64 = payload['data']
                payload_bytes = base64.b64decode(payload_b64)
                data_type = payload['payload_type']
                data = self._deserialize_data(payload_bytes, data_type)
                payloads_list.append((dataname, data, data_type))
            elif transport == 'link':
                url = payload['data']
                downloaded_data = self._sync_fileserver_download(
                    url,
                    kwargs.get('max_retries', 3),
                    kwargs.get('base_delay', 100),
                    kwargs.get('max_delay', 1000),
                    correlation_id
                )
                data_type = payload['payload_type']
                data = self._deserialize_data(downloaded_data, data_type)
                payloads_list.append((dataname, data, data_type))
        
        env_json_obj['payloads'] = payloads_list
        return env_json_obj
    
    def _serialize_data(self, data, payload_type):
        """
        Serialize data (MicroPython version).
        
        Note: MicroPython does NOT support table types (arrowtable/jsontable).
        Only supports: text, dictionary, image, audio, video, binary
        """
        if payload_type == 'text':
            if isinstance(data, str):
                return data.encode('utf-8')
            else:
                raise ValueError('Text data must be a string')
        elif payload_type == 'dictionary':
            json_str = json.dumps(data)
            return json_str.encode('utf-8')
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            if isinstance(data, (bytes, bytearray, memoryview)):
                return bytes(data)
            else:
                raise ValueError(f'{payload_type} data must be bytes')
        else:
            raise ValueError(f'Unknown payload_type: {payload_type}')
    
    def _deserialize_data(self, data, payload_type):
        """
        Deserialize data (MicroPython version).
        
        Note: MicroPython does NOT support table types (arrowtable/jsontable).
        Only supports: text, dictionary, image, audio, video, binary
        """
        if payload_type == 'text':
            return data.decode('utf-8')
        elif payload_type == 'dictionary':
            json_str = data.decode('utf-8')
            return json.loads(json_str)
        elif payload_type in ('image', 'audio', 'video', 'binary'):
            return data
        else:
            raise ValueError(f'Unknown payload_type: {payload_type}')
    
    def _generate_uuid(self):
        """Generate simple UUID (MicroPython compatible)."""
        return 'mp-%04x%04x-%04x-%04x-%04x-%04x%04x%04x' % (
            time.time_ns() // (10**6) % 0xFFFFFFFF,
            time.time_ns() % 0xFFFFFFFF,
            time.time_ns() >> 32 & 0xFFFF,
            time.time_ns() >> 48 & 0xFFFF,
            time.time_ns() >> 64 & 0xFFFF,
            time.time_ns() >> 80 & 0xFFFF,
            time.time_ns() >> 96 & 0xFFFF,
            time.time_ns() >> 112 & 0xFFFF
        )
    
    def _sync_fileserver_upload(self, url, dataname, data):
        """Synchronous file upload (limited)."""
        # Simplified implementation for MicroPython
        # In practice, would use network.HTTP or similar
        raise NotImplementedError("File upload not implemented in MicroPython")
    
    def _sync_fileserver_download(self, url, max_retries, base_delay, max_delay, correlation_id):
        """Synchronous file download with backoff."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("File download not implemented in MicroPython")
    
    def _publish(self, subject, message, correlation_id):
        """Publish message to NATS."""
        # Simplified implementation for MicroPython
        raise NotImplementedError("NATS publishing not implemented in MicroPython")

Configuration

Environment Variables

Variable Default Description
NATS_URL nats://localhost:4222 NATS server URL
FILESERVER_URL http://localhost:8080 HTTP file server URL
SIZE_THRESHOLD 1000000 Size threshold in bytes (1MB)

MicroPython Configuration

# micropython.conf
NATS_URL = "nats://broker.local:4222"
FILESERVER_URL = "http://fileserver.local:8080"
SIZE_THRESHOLD = 100000  # Lower threshold for memory-constrained devices
MAX_PAYLOAD_SIZE = 50000  # Hard limit for MicroPython

Performance Considerations

Zero-Copy Reading

Platform Strategy
Julia Arrow.read() with memory-mapped files
JavaScript ArrayBuffer with DataView
Python pyarrow memory mapping
MicroPython Not available (streaming only)

Exponential Backoff

All platforms implement exponential backoff for HTTP downloads:

# Python
async def fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id):
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
        except Exception as e:
            if attempt < max_retries:
                await asyncio.sleep(delay / 1000.0)
                delay = min(delay * 2, max_delay)
    raise Exception("Failed to fetch after max retries")

Correlation ID Logging

All platforms use correlation IDs for distributed tracing:

[timestamp] [Correlation: abc123] Message published to subject

Serialization Performance

Format Use Case Pros Cons
arrowtable Large tabular data Fast, zero-copy, schema-preserving Binary format, requires Arrow library, not supported in MicroPython
jsontable Small/medium tabular data Human-readable, universal support, works in MicroPython Slower, larger size, no schema enforcement

Testing

Test File Organization

Platform Sender Tests Receiver Tests
Julia test/test_julia_*_sender.jl test/test_julia_*_receiver.jl
JavaScript test/test_js_*_sender.js test/test_js_*_receiver.js
Python test/test_py_*_sender.py test/test_py_*_receiver.py

Run Tests

# Julia
julia test/test_julia_text_sender.jl
julia test/test_julia_text_receiver.jl

# JavaScript (Node.js)
node test/test_js_text_sender.js
node test/test_js_text_receiver.js

# Python
python3 test/test_py_text_sender.py
python3 test/test_py_text_receiver.py

Troubleshooting

Common Issues

  1. NATS Connection Failed

    • Ensure NATS server is running
    • Check broker_url configuration
  2. HTTP Upload Failed

    • Ensure file server is running
    • Check fileserver_url configuration
    • Verify upload permissions
  3. Arrow IPC Deserialization Error

    • Ensure data is properly serialized to Arrow format
    • Check Arrow version compatibility
    • MicroPython doesn't support Arrow IPC
  4. Memory Constraints (MicroPython)

    • Reduce size_threshold
    • Use direct transport only (< 100KB)
    • Avoid large payloads
    • Use jsontable instead of arrowtable (arrowtable not supported)

Summary

This cross-platform NATS bridge provides:

  1. High-Level API Parity: Identical smartsend() and smartreceive() signatures across all platforms
  2. Idiomatic Implementations:
    • Julia: Multiple dispatch, struct-based design, native Arrow IPC
    • JavaScript: Async/await, prototype-based utilities, class-based NATS client
    • Python: Class-based design with dataclasses, type hints, async/await
    • MicroPython: Synchronous API, memory-constrained optimizations
  3. Message Format Consistency: Identical JSON schemas across all platforms
  4. Handler Abstraction: File server operations abstracted through configurable handlers
  5. Platform-Specific Optimizations:
    • Arrow IPC (arrowtable): Efficient binary format for large tabular data (not supported in MicroPython)
    • JSON (jsontable): Universal human-readable format for smaller tables (works in Julia, JavaScript, Python; NOT supported in MicroPython)

The Julia implementation in src/NATSBridge.jl serves as the ground truth for API design and behavior.

Datatype Summary

Datatype Serialization Use Case Encoding Supported Platforms
arrowtable Apache Arrow IPC Large tabular data, schema-preserving arrow-ipcbase64 Julia, JavaScript, Python
jsontable JSON Small/medium tabular data, human-readable jsonbase64 Julia, JavaScript, Python
table Apache Arrow IPC (Python only) Python's unified table type arrow-ipcbase64 Python