This commit is contained in:
2026-03-05 11:00:46 +07:00
parent e85eba4cea
commit 060c68cd05
5 changed files with 5353 additions and 0 deletions

784
src/natbridge.py Normal file
View File

@@ -0,0 +1,784 @@
"""
NATSBridge - Cross-Platform Bi-Directional Data Bridge
Python Implementation (Desktop Python)
This module provides functionality for sending and receiving data across network boundaries
using NATS as the message bus, with support for both direct payload transport and
URL-based transport for larger payloads.
File Server Handler Architecture:
The system uses handler functions to abstract file server operations, allowing support
for different file server implementations (e.g., Plik, AWS S3, custom HTTP server).
Handler Function Signatures:
```python
# Upload handler - uploads data to file server and returns URL
# The handler is passed to smartsend as fileserver_upload_handler parameter
# It receives: (fileserver_url, dataname, data)
# Returns: Coroutine[Dict[str, Any]]
async def fileserver_upload_handler(fileserver_url, dataname, data)
# Download handler - fetches data from file server URL with exponential backoff
# The handler is passed to smartreceive as fileserver_download_handler parameter
# It receives: (url, max_retries, base_delay, max_delay, correlation_id)
# Returns: Coroutine[bytes]
async def fileserver_download_handler(url, max_retries, base_delay, max_delay, correlation_id)
```
Multi-Payload Support (Standard API):
The system uses a standardized list-of-tuples format for all payload operations.
Even when sending a single payload, the user must wrap it in a list.
API Standard:
```python
# Input format for smartsend (always a list of tuples with type info)
[(dataname1, data1, type1), (dataname2, data2, type2), ...]
# Output format for smartreceive (returns a dictionary with payloads field containing list of tuples)
{
"correlation_id": "...",
"msg_id": "...",
"timestamp": "...",
"send_to": "...",
"msg_purpose": "...",
"sender_name": "...",
"sender_id": "...",
"receiver_name": "...",
"receiver_id": "...",
"reply_to": "...",
"reply_to_msg_id": "...",
"broker_url": "...",
"metadata": {...},
"payloads": [(dataname1, data1, type1), (dataname2, data2, type2), ...]
}
```
Supported types: "text", "dictionary", "table", "image", "audio", "video", "binary"
"""
import asyncio
import base64
import json
import uuid
import time
from typing import Any, Callable, Dict, List, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
# Optional dependencies
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
try:
import pyarrow as arrow
import pyarrow.parquet as pq
import pandas as pd
ARROW_AVAILABLE = True
except ImportError:
ARROW_AVAILABLE = False
try:
import nats
from nats.aio.client import Client as NATSClient
NATS_AVAILABLE = True
except ImportError:
NATS_AVAILABLE = False
# ============================================================================
# Constants
# ============================================================================
DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
DEFAULT_BROKER_URL = "nats://localhost:4222" # Default NATS server URL
DEFAULT_FILESERVER_URL = "http://localhost:8080" # Default HTTP file server URL
# ============================================================================
# Data Classes
# ============================================================================
@dataclass
class MsgPayloadV1:
"""Message payload structure."""
id: str
dataname: str
payload_type: str
transport: str
encoding: str
size: int
data: Union[str, bytes] # URL for link, base64 for direct
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MsgEnvelopeV1:
"""Message envelope structure."""
correlation_id: str
msg_id: str
timestamp: str
send_to: str
msg_purpose: str
sender_name: str
sender_id: str
receiver_name: str
receiver_id: str
reply_to: str
reply_to_msg_id: str
broker_url: str
metadata: Dict[str, Any] = field(default_factory=dict)
payloads: List[MsgPayloadV1] = field(default_factory=list)
# ============================================================================
# Utility Functions
# ============================================================================
def log_trace(correlation_id: str, message: str) -> None:
"""Log a trace message with correlation ID and timestamp."""
timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
print(f"[{timestamp}] [Correlation: {correlation_id}] {message}")
def generate_uuid() -> str:
"""Generate a UUID v4 string."""
return str(uuid.uuid4())
# ============================================================================
# Serialization Functions
# ============================================================================
def _serialize_data(data: Any, payload_type: str) -> bytes:
"""
Serialize data according to specified format.
Args:
data: Data to serialize
payload_type: Target format: "text", "dictionary", "table", "image", "audio", "video", "binary"
Returns:
Binary representation of the serialized data
Raises:
Error: If data type doesn't match payload_type
"""
if payload_type == 'text':
if isinstance(data, str):
return data.encode('utf-8')
else:
raise Error('Text data must be a string')
elif payload_type == 'dictionary':
json_str = json.dumps(data)
return json_str.encode('utf-8')
elif payload_type == 'table':
if not ARROW_AVAILABLE:
raise Error('pyarrow not available for table serialization. Install with: pip install pyarrow pandas')
# Convert DataFrame to Arrow IPC
import io
buf = io.BytesIO()
if isinstance(data, pd.DataFrame):
table = arrow.Table.from_pandas(data)
sink = arrow.ipc.new_file(buf)
arrow.ipc.write_table(table, sink)
sink.close()
return buf.getvalue()
else:
raise Error('Table data must be a pandas DataFrame')
elif payload_type == 'image':
if isinstance(data, (bytes, bytearray)):
return bytes(data)
else:
raise Error('Image data must be bytes')
elif payload_type == 'audio':
if isinstance(data, (bytes, bytearray)):
return bytes(data)
else:
raise Error('Audio data must be bytes')
elif payload_type == 'video':
if isinstance(data, (bytes, bytearray)):
return bytes(data)
else:
raise Error('Video data must be bytes')
elif payload_type == 'binary':
if isinstance(data, (bytes, bytearray)):
return bytes(data)
else:
raise Error('Binary data must be bytes')
else:
raise Error(f'Unknown payload_type: {payload_type}')
def _deserialize_data(data: bytes, payload_type: str, correlation_id: str) -> Any:
"""
Deserialize bytes to data based on type.
Args:
data: Serialized data as bytes
payload_type: Data type
correlation_id: Correlation ID for logging
Returns:
Deserialized data
"""
if payload_type == 'text':
return data.decode('utf-8')
elif payload_type == 'dictionary':
json_str = data.decode('utf-8')
return json.loads(json_str)
elif payload_type == 'table':
if not ARROW_AVAILABLE:
raise Error('pyarrow not available for table deserialization')
import io
buf = io.BytesIO(data)
reader = arrow.ipc.open_file(buf)
return reader.read_all().to_pandas()
elif payload_type == 'image':
return data
elif payload_type == 'audio':
return data
elif payload_type == 'video':
return data
elif payload_type == 'binary':
return data
else:
raise Error(f'Unknown payload_type: {payload_type}')
# ============================================================================
# File Server Handlers
# ============================================================================
async def plik_oneshot_upload(
file_server_url: str,
dataname: str,
data: bytes
) -> Dict[str, Any]:
"""
Upload a single file to a plik server using one-shot mode.
Args:
file_server_url: Base URL of the plik server
dataname: Name of the file being uploaded
data: Raw byte data of the file content
Returns:
Dict with keys: 'status', 'uploadid', 'fileid', 'url'
"""
if not AIOHTTP_AVAILABLE:
raise Error('aiohttp not available. Install with: pip install aiohttp')
async with aiohttp.ClientSession() as session:
# Get upload id
url_getUploadID = f"{file_server_url}/upload"
headers = {'Content-Type': 'application/json'}
body = json.dumps({"OneShot": True})
async with session.post(url_getUploadID, headers=headers, data=body) as response:
response_json = await response.json()
uploadid = response_json['id']
uploadtoken = response_json['uploadToken']
# Upload file
url_upload = f"{file_server_url}/file/{uploadid}"
headers = {'X-UploadToken': uploadtoken}
form = aiohttp.FormData()
form.add_field('file', data, filename=dataname, content_type='application/octet-stream')
async with session.post(url_upload, headers=headers, data=form) as upload_response:
upload_json = await upload_response.json()
fileid = upload_json['id']
url = f"{file_server_url}/file/{uploadid}/{fileid}/{dataname}"
return {
'status': upload_response.status,
'uploadid': uploadid,
'fileid': fileid,
'url': url
}
async def fetch_with_backoff(
url: str,
max_retries: int,
base_delay: int,
max_delay: int,
correlation_id: str
) -> bytes:
"""
Fetch data from URL with exponential backoff.
Args:
url: URL to fetch from
max_retries: Maximum retry attempts
base_delay: Initial delay in milliseconds
max_delay: Maximum delay in milliseconds
correlation_id: Correlation ID for logging
Returns:
Fetched data as bytes
Raises:
Error: If all retry attempts fail
"""
if not AIOHTTP_AVAILABLE:
raise Error('aiohttp not available. Install with: pip install aiohttp')
delay = base_delay
for attempt in range(1, max_retries + 1):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
log_trace(correlation_id, f"Successfully fetched data from {url} on attempt {attempt}")
return await response.read()
else:
raise Error(f"Failed to fetch: {response.status}")
except Exception as e:
log_trace(correlation_id, f"Attempt {attempt} failed: {type(e).__name__}")
if attempt < max_retries:
await asyncio.sleep(delay / 1000.0)
delay = min(delay * 2, max_delay)
raise Error(f"Failed to fetch data after {max_retries} attempts")
# ============================================================================
# NATS Publishing
# ============================================================================
async def publish_message(
broker_url_or_conn: Union[str, NATSClient],
subject: str,
message: str,
correlation_id: str,
nats_connection: NATSClient = None
) -> None:
"""
Publish message to NATS.
Args:
broker_url_or_conn: NATS server URL or pre-existing connection
subject: NATS subject to publish to
message: JSON message to publish
correlation_id: Correlation ID for tracing and logging
nats_connection: Optional pre-existing NATS connection
"""
if nats_connection:
# Use provided connection
try:
await nats_connection.publish(subject, message)
log_trace(correlation_id, f"Message published to {subject}")
finally:
# Note: In a real implementation, you might want to drain/close the connection
pass
elif isinstance(broker_url_or_conn, str):
# URL-based - create new connection
if not NATS_AVAILABLE:
raise Error('nats-py not available. Install with: pip install nats-py')
conn = await nats.connect(broker_url_or_conn)
try:
await conn.publish(subject, message)
log_trace(correlation_id, f"Message published to {subject}")
finally:
await conn.drain()
else:
raise Error('Invalid broker_url_or_conn type')
# ============================================================================
# Core Functions
# ============================================================================
async def smartsend(
subject: str,
data: List[Tuple[str, Any, str]],
broker_url: str = DEFAULT_BROKER_URL,
fileserver_url: str = DEFAULT_FILESERVER_URL,
fileserver_upload_handler: Callable = plik_oneshot_upload,
size_threshold: int = DEFAULT_SIZE_THRESHOLD,
correlation_id: str = None,
msg_purpose: str = "chat",
sender_name: str = "NATSBridge",
receiver_name: str = "",
receiver_id: str = "",
reply_to: str = "",
reply_to_msg_id: str = "",
is_publish: bool = True,
nats_connection: NATSClient = None,
msg_id: str = None,
sender_id: str = None
) -> Tuple[Dict, str]:
"""
Send data via NATS with automatic transport selection.
This function intelligently routes data delivery based on payload size relative to a threshold.
If the serialized payload is smaller than size_threshold, it encodes the data as Base64
and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
and publishes only the download URL over NATS.
Args:
subject: NATS subject to publish the message to
data: List of (dataname, data, type) tuples
broker_url: URL of the NATS server
fileserver_url: URL of the HTTP file server for large payloads
fileserver_upload_handler: Function to handle fileserver uploads
size_threshold: Threshold in bytes separating direct vs link transport
correlation_id: Correlation ID for tracing (auto-generated if not provided)
msg_purpose: Purpose of the message
sender_name: Name of the sender
receiver_name: Name of the receiver
receiver_id: UUID of the receiver
reply_to: Topic to reply to
reply_to_msg_id: Message ID this message is replying to
is_publish: Whether to automatically publish the message to NATS
nats_connection: Pre-existing NATS connection (if provided, uses this instead of creating a new one)
msg_id: Message ID (auto-generated if not provided)
sender_id: Sender ID (auto-generated if not provided)
Returns:
Tuple of (env, env_json_str) where:
- env: Dict with all metadata and payloads
- env_json_str: JSON string for publishing to NATS
Example:
```python
# Send a single payload (still wrapped in a list)
config = {"key": "value"}
env, env_json_str = await smartsend(
"my.subject",
[("config", config, "dictionary")],
broker_url="nats://localhost:4222"
)
# Send multiple payloads in one message with different types
data1 = {"key1": "value1"}
data2 = [1, 2, 3, 4, 5]
env, env_json_str = await smartsend(
"my.subject",
[("data1", data1, "dictionary"), ("data2", data2, "table")]
)
# Mixed content (e.g., chat with text and image)
env, env_json_str = await smartsend(
"chat.subject",
[
("message_text", "Hello!", "text"),
("user_image", image_bytes, "image"),
("audio_clip", audio_bytes, "audio")
]
)
```
"""
if correlation_id is None:
correlation_id = generate_uuid()
if msg_id is None:
msg_id = generate_uuid()
if sender_id is None:
sender_id = generate_uuid()
log_trace(correlation_id, f"Starting smartsend for subject: {subject}")
# Process each payload in the list
payloads = []
for dataname, payload_data, payload_type in data:
payload_bytes = _serialize_data(payload_data, payload_type)
payload_size = len(payload_bytes)
log_trace(correlation_id, f"Serialized payload '{dataname}' (payload_type: {payload_type}) size: {payload_size} bytes")
# Decision: Direct vs Link
if payload_size < size_threshold:
# Direct path - Base64 encode and send via NATS
payload_b64 = base64.b64encode(payload_bytes).decode('utf-8')
log_trace(correlation_id, f"Using direct transport for {payload_size} bytes")
payloads.append({
'id': generate_uuid(),
'dataname': dataname,
'payload_type': payload_type,
'transport': 'direct',
'encoding': 'base64',
'size': payload_size,
'data': payload_b64,
'metadata': {'payload_bytes': payload_size}
})
else:
# Link path - Upload to HTTP server, send URL via NATS
log_trace(correlation_id, "Using link transport, uploading to fileserver")
response = await fileserver_upload_handler(fileserver_url, dataname, payload_bytes)
if response['status'] != 200:
raise Error(f"Failed to upload data to fileserver: {response['status']}")
log_trace(correlation_id, f"Uploaded to URL: {response['url']}")
payloads.append({
'id': generate_uuid(),
'dataname': dataname,
'payload_type': payload_type,
'transport': 'link',
'encoding': 'none',
'size': payload_size,
'data': response['url'],
'metadata': {}
})
# Build envelope
env = {
'correlation_id': correlation_id,
'msg_id': msg_id,
'timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
'send_to': subject,
'msg_purpose': msg_purpose,
'sender_name': sender_name,
'sender_id': sender_id,
'receiver_name': receiver_name,
'receiver_id': receiver_id,
'reply_to': reply_to,
'reply_to_msg_id': reply_to_msg_id,
'broker_url': broker_url,
'metadata': {},
'payloads': payloads
}
env_json_str = json.dumps(env)
if is_publish:
if nats_connection:
await publish_message(broker_url, subject, env_json_str, correlation_id, nats_connection)
else:
await publish_message(broker_url, subject, env_json_str, correlation_id)
return env, env_json_str
async def smartreceive(
msg: Any,
fileserver_download_handler: Callable = fetch_with_backoff,
max_retries: int = 5,
base_delay: int = 100,
max_delay: int = 5000
) -> Dict:
"""
Receive and process messages from NATS.
This function processes incoming NATS messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
Args:
msg: NATS message object
fileserver_download_handler: Function to handle downloading data from file server URLs
max_retries: Maximum retry attempts for fetching URL
base_delay: Initial delay for exponential backoff in ms
max_delay: Maximum delay for exponential backoff in ms
Returns:
Dict with envelope metadata and payloads field containing list of tuples
Example:
```python
# Receive and process message
msg = await nats_connection.subscribe("my.subject")
env = await smartreceive(msg)
# env["payloads"] = [("dataname1", data1, "type1"), ("dataname2", data2, "type2"), ...]
```
"""
# Parse the JSON envelope
if isinstance(msg.payload, bytes):
payload = msg.payload.decode('utf-8')
else:
payload = msg.payload
env_json_obj = json.loads(payload)
log_trace(env_json_obj['correlation_id'], "Processing received message")
# Process all payloads in the envelope
payloads_list = []
for payload in env_json_obj['payloads']:
transport = payload['transport']
dataname = payload['dataname']
if transport == 'direct':
log_trace(env_json_obj['correlation_id'], f"Direct transport - decoding payload '{dataname}'")
# Extract base64 payload from the payload
payload_b64 = payload['data']
# Decode Base64 payload
payload_bytes = base64.b64decode(payload_b64)
# Deserialize based on type
data_type = payload['payload_type']
data = _deserialize_data(payload_bytes, data_type, env_json_obj['correlation_id'])
payloads_list.append((dataname, data, data_type))
elif transport == 'link':
# Extract download URL from the payload
url = payload['data']
log_trace(env_json_obj['correlation_id'], f"Link transport - fetching '{dataname}' from URL: {url}")
# Fetch with exponential backoff using the download handler
downloaded_data = await fileserver_download_handler(
url,
max_retries,
base_delay,
max_delay,
env_json_obj['correlation_id']
)
# Deserialize based on type
data_type = payload['payload_type']
data = _deserialize_data(downloaded_data, data_type, env_json_obj['correlation_id'])
payloads_list.append((dataname, data, data_type))
else:
raise Error(f"Unknown transport type for payload '{dataname}': {transport}")
env_json_obj['payloads'] = payloads_list
return env_json_obj
# ============================================================================
# NATS Client Wrapper
# ============================================================================
class NATSBridge:
"""
Cross-platform NATS bridge implementation.
Provides a class-based interface for NATSBridge functionality.
"""
def __init__(self, broker_url: str = None, fileserver_url: str = None):
"""
Initialize the NATSBridge client.
Args:
broker_url: NATS server URL (defaults to DEFAULT_BROKER_URL)
fileserver_url: HTTP file server URL (defaults to DEFAULT_FILESERVER_URL)
"""
self.broker_url = broker_url or DEFAULT_BROKER_URL
self.fileserver_url = fileserver_url or DEFAULT_FILESERVER_URL
self._nats_client: NATSClient = None
async def connect(self, broker_url: str = None) -> NATSClient:
"""
Connect to NATS server.
Args:
broker_url: NATS server URL (optional, uses instance broker_url if not provided)
Returns:
NATS connection client
"""
url = broker_url or self.broker_url
if not NATS_AVAILABLE:
raise Error('nats-py not available. Install with: pip install nats-py')
self._nats_client = await nats.connect(url)
return self._nats_client
async def smartsend(
self,
subject: str,
data: List[Tuple[str, Any, str]],
**kwargs
) -> Tuple[Dict, str]:
"""
Send data via NATS using instance configuration.
Args:
subject: NATS subject to publish to
data: List of (dataname, data, type) tuples
**kwargs: Additional options (broker_url, fileserver_url, etc.)
Returns:
Tuple of (env, env_json_str)
"""
# Merge instance config with kwargs
options = {
'broker_url': kwargs.get('broker_url', self.broker_url),
'fileserver_url': kwargs.get('fileserver_url', self.fileserver_url),
**kwargs
}
return await smartsend(subject, data, **options)
async def smartreceive(
self,
msg: Any,
**kwargs
) -> Dict:
"""
Receive and process NATS message using instance configuration.
Args:
msg: NATS message object
**kwargs: Additional options
Returns:
Dict with envelope metadata and payloads
"""
return await smartreceive(msg, **kwargs)
async def close(self):
"""Close the NATS connection."""
if self._nats_client:
await self._nats_client.close()
self._nats_client = None
# ============================================================================
# Module Exports
# ============================================================================
__all__ = [
# Core functions
'smartsend',
'smartreceive',
# Utility functions
'log_trace',
'generate_uuid',
'_serialize_data',
'_deserialize_data',
# File server handlers
'plik_oneshot_upload',
'fetch_with_backoff',
# NATS publishing
'publish_message',
# Data classes
'MsgPayloadV1',
'MsgEnvelopeV1',
# Client class
'NATSBridge',
# Constants
'DEFAULT_SIZE_THRESHOLD',
'DEFAULT_BROKER_URL',
'DEFAULT_FILESERVER_URL',
# Availability flags
'ARROW_AVAILABLE',
'AIOHTTP_AVAILABLE',
'NATS_AVAILABLE',
]