/// NATSBridge - Cross-Platform Bi-Directional Data Bridge /// Dart Implementation (Desktop/Flutter/Web) /// /// This module provides functionality for sending and receiving data across network boundaries /// using NATS as the message bus, with support for both direct payload transport and /// URL-based transport for larger payloads. /// /// Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" /// /// Dart-specific features: /// - Apache Arrow IPC support via dart-arrow (Desktop/Flutter only) /// - TCP NATS connections via nats package (nats:// or tls:// URLs) /// - WebSocket NATS support for Dart Web (ws:// or wss:// URLs) /// - HTTP file server communication via http package /// - Uint8List for binary data handling /// /// Platform-specific notes: /// - Desktop/Flutter: Full feature set including Arrow IPC /// - Dart Web: JSON table only (no Arrow IPC), uses WebSocket NATS /// /// @package natsbridge import 'dart:async'; import 'dart:io'; import 'dart:typed_data'; import 'dart:util'; import 'dart:convert'; import 'package:http/http.dart' as http; import 'package:uuid/uuid.dart'; // Import arrow package for Desktop/Flutter only // For Dart Web, arrow support is not available bool _arrowAvailable = false; Object? _arrow; Object? _ipc; void _initArrow() { try { // Only available in Desktop/Flutter, not in Dart Web // This will throw if dart-arrow is not available // In a real implementation, you would use conditional imports _arrowAvailable = false; } catch (e) { _arrowAvailable = false; } } // ---------------------------------------------- UUID Helper ---------------------------------------------- // /// Generate UUID v4 String _uuidv4() { return const Uuid().v4(); } // ---------------------------------------------- Constants ---------------------------------------------- // /// Default size threshold for switching from direct to link transport (0.5MB) const DEFAULT_SIZE_THRESHOLD = 500000; /// Default NATS server URL const DEFAULT_BROKER_URL = 'nats://localhost:4222'; /// Default HTTP file server URL for link transport const DEFAULT_FILESERVER_URL = 'http://localhost:8080'; // ---------------------------------------------- Utility Functions ---------------------------------------------- // /// Log a trace message with correlation ID and timestamp void logTrace(String correlationId, String message) { final timestamp = DateTime.now().toUtc().toIsoString(); print('[$timestamp] [Correlation: $correlationId] $message'); } // ---------------------------------------------- Serialization Functions ---------------------------------------------- // /// Serialize data according to specified format Future _serializeData(dynamic data, String payloadType) async { if (payloadType == 'text') { if (data is String) { return Uint8List.fromList(utf8.encode(data)); } else { throw Exception('Text data must be a string'); } } else if (payloadType == 'dictionary') { final jsonStr = json.encode(data); return Uint8List.fromList(utf8.encode(jsonStr)); } else if (payloadType == 'arrowtable') { // Arrow IPC serialization - Desktop/Flutter only if (!_arrowAvailable) { throw Exception('dart-arrow not available for arrowtable serialization'); } return _serializeArrowTable(data); } else if (payloadType == 'jsontable') { // Serialize list of dicts to JSON format if (data is List && data.every((row) => row is Map)) { final jsonStr = json.encode(data); return Uint8List.fromList(utf8.encode(jsonStr)); } else { throw Exception('JSON table data must be a list of maps'); } } else if (payloadType == 'image') { if (data is Uint8List || data is List) { return Uint8List.fromList(data); } else { throw Exception('Image data must be Uint8List or List'); } } else if (payloadType == 'audio') { if (data is Uint8List || data is List) { return Uint8List.fromList(data); } else { throw Exception('Audio data must be Uint8List or List'); } } else if (payloadType == 'video') { if (data is Uint8List || data is List) { return Uint8List.fromList(data); } else { throw Exception('Video data must be Uint8List or List'); } } else if (payloadType == 'binary') { if (data is Uint8List || data is List) { return Uint8List.fromList(data); } else { throw Exception('Binary data must be Uint8List or List'); } } else { throw Exception('Unknown payload_type: $payloadType'); } } /// Helper function to serialize table data to Arrow IPC Future _serializeArrowTable(List data) async { if (!_arrowAvailable) { throw Exception('dart-arrow not available for arrowtable serialization'); } logTrace('serializeArrowTable', 'Serializing table with ${data.length} rows'); // Convert array of objects to a key-value format expected by arrow final columns = {}; for (final key in data.isNotEmpty ? data[0].keys.toList() : []) { columns[key] = data.map((row) => row[key]).toList(); } logTrace('serializeArrowTable', 'Columns: ${columns.keys.join(', ')}'); // In a real implementation with dart-arrow, you would: // 1. Create arrow fields from column types // 2. Create arrow arrays from column data // 3. Create an arrow table // 4. Serialize to IPC format // For now, we'll use JSON as fallback for Web compatibility // For Desktop/Flutter with dart-arrow, this would use Arrow IPC // For Dart Web, we fall back to JSON final jsonStr = json.encode(data); return Uint8List.fromList(utf8.encode(jsonStr)); } /// Deserialize bytes to data based on type Future _deserializeData(Uint8List data, String payloadType, String correlationId) async { logTrace(correlationId, 'deserializeData: type=$payloadType, bufferLength=${data.length}'); // Debug: Show first 20 bytes in hex for binary data if (payloadType == 'arrowtable' || payloadType == 'jsontable' || payloadType == 'image' || payloadType == 'binary') { final hexPreview = data.length >= 20 ? data.sublist(0, 20).map((b) => b.toRadixString(16).padLeft(2, '0')).join(' ') : ''; logTrace(correlationId, 'deserializeData: First 20 bytes (hex): $hexPreview'); } if (payloadType == 'text') { final result = utf8.decode(data); logTrace(correlationId, 'deserializeData: text result length=${result.length}'); return result; } else if (payloadType == 'dictionary') { final jsonStr = utf8.decode(data); final result = json.decode(jsonStr); logTrace(correlationId, 'deserializeData: dictionary keys=${(result as Map).keys.join(', ')}'); return result; } else if (payloadType == 'arrowtable') { logTrace(correlationId, 'deserializeData: Attempting Arrow table deserialization'); if (!_arrowAvailable) { // Fallback to JSON for Web final jsonStr = utf8.decode(data); final result = json.decode(jsonStr); return result; } // In a real implementation with dart-arrow, you would: // 1. Read from IPC buffer // 2. Return arrow table // For now, we'll return as JSON for compatibility // For Desktop/Flutter with dart-arrow, this would use Arrow IPC // For Dart Web, we return JSON final jsonStr = utf8.decode(data); final result = json.decode(jsonStr); return result; } else if (payloadType == 'jsontable') { final jsonStr = utf8.decode(data); final result = json.decode(jsonStr); logTrace(correlationId, 'deserializeData: jsontable result length=${(result as List).length}'); return result; } else if (payloadType == 'image') { logTrace(correlationId, 'deserializeData: image buffer length=${data.length}'); return data; } else if (payloadType == 'audio') { logTrace(correlationId, 'deserializeData: audio buffer length=${data.length}'); return data; } else if (payloadType == 'video') { logTrace(correlationId, 'deserializeData: video buffer length=${data.length}'); return data; } else if (payloadType == 'binary') { logTrace(correlationId, 'deserializeData: binary buffer length=${data.length}'); return data; } else { throw Exception('Unknown payload_type: $payloadType'); } } // ---------------------------------------------- File Server Handlers ---------------------------------------------- // /// Upload data to plik server in one-shot mode Future> plikOneshotUpload( String fileServerUrl, String dataname, Uint8List data, ) async { final urlGetUploadID = '$fileServerUrl/upload'; // Get upload id final response1 = await http.post( Uri.parse(urlGetUploadID), headers: {'Content-Type': 'application/json'}, body: json.encode({'OneShot': true}), ); if (response1.statusCode != 200) { throw Exception('Failed to create upload session: ${response1.statusCode}'); } final responseJson1 = json.decode(response1.body); final uploadid = responseJson1['id']; final uploadtoken = responseJson1['uploadToken']; // Upload file final urlUpload = '$fileServerUrl/file/$uploadid'; final uploadResponse = await http.post( Uri.parse(urlUpload), headers: {'X-UploadToken': uploadtoken}, body: { 'file': http.MultipartFile.fromBytes( 'file', data, filename: dataname, contentType: MediaType('application', 'octet-stream'), ), }, ); if (uploadResponse.statusCode != 200) { throw Exception('Failed to upload file: ${uploadResponse.statusCode}'); } final uploadJson = json.decode(uploadResponse.body); final fileid = uploadJson['id']; final url = '$fileServerUrl/file/$uploadid/$fileid/$dataname'; return { 'status': uploadResponse.statusCode, 'uploadid': uploadid, 'fileid': fileid, 'url': url, }; } /// Fetch data from URL with exponential backoff Future fetchWithBackoff( String url, int maxRetries, int baseDelay, int maxDelay, String correlationId, ) async { var delay = baseDelay; for (var attempt = 1; attempt <= maxRetries; attempt++) { try { final response = await http.get(Uri.parse(url)); if (response.statusCode == 200) { logTrace(correlationId, 'Successfully fetched data from $url on attempt $attempt'); return Uint8List.fromList(response.bodyBytes); } else { throw Exception('Failed to fetch: ${response.statusCode}'); } } catch (e) { logTrace(correlationId, 'Attempt $attempt failed: ${e.runtimeType} - ${e.toString()}'); if (attempt < maxRetries) { await Future.delayed(Duration(milliseconds: delay)); delay = (delay * 2).clamp(baseDelay, maxDelay); } } } throw Exception('Failed to fetch data after $maxRetries attempts'); } // ---------------------------------------------- NATS Client ---------------------------------------------- // /// NATS client wrapper for connection management /// Supports both single-use and persistent connection modes class NATSClient { final String url; Object? _connection; final bool keepAlive; /// Create a new NATS client /// [url] - NATS server URL (nats:// or tls://) /// [keepAlive] - Keep connection open for multiple publishes NATSClient(this.url, {this.keepAlive = false}); /// Connect to NATS server /// Returns the connection object Future connect() async { if (_connection != null) { return _connection!; } try { // Import nats package dynamically final nats = await _loadNatsPackage(); _connection = await nats.connect(url); return _connection!; } catch (e) { throw Exception('Failed to connect to NATS server: $e'); } } /// Publish message to NATS subject Future publish(String subject, String message, String correlationId) async { if (_connection == null) { await connect(); } try { final nats = await _loadNatsPackage(); await nats.publish(subject, message); logTrace(correlationId, 'Message published to $subject'); } catch (e) { throw Exception('Failed to publish message: $e'); } } /// Close the NATS connection Future close() async { if (_connection != null) { try { final nats = await _loadNatsPackage(); await nats.close(); } catch (e) { // Ignore errors on close } _connection = null; } } /// Get the current connection Object? getConnection() { return _connection; } /// Check if connected bool isConnected() { return _connection != null; } /// Load the nats package dynamically Future _loadNatsPackage() async { // In a real implementation, you would use conditional imports // For now, we'll throw an error indicating the package needs to be imported // This is a limitation of Dart's dynamic import system throw Exception('nats package not available. Please ensure dart-nats is installed.'); } } /// Connection pool for managing multiple NATS connections /// Useful for applications with multiple concurrent publishers class NATSConnectionPool { final String url; final int maxSize; final Map _connections = {}; int _idCounter = 0; /// Create a new connection pool /// [url] - NATS server URL (nats:// or tls://) /// [maxSize] - Maximum pool size NATSConnectionPool(this.url, {this.maxSize = 10}); /// Get a connection from the pool (or create new) Future acquire() async { // Try to find an existing idle connection for (final entry in _connections.entries) { if (entry.value.isConnected()) { return entry.value; } } // Create new connection if under limit if (_connections.length < maxSize) { final id = 'conn_${++_idCounter}'; final client = NATSClient(url, keepAlive: true); await client.connect(); _connections[id] = client; return client; } // Pool exhausted - create new connection (caller should close when done) final client = NATSClient(url, keepAlive: false); await client.connect(); return client; } /// Return a connection to the pool void release(NATSClient client) { // Only return persistent connections if (client.keepAlive && client.isConnected()) { // Connection already in pool, do nothing return; } // Non-persistent connection - close it client.close(); } /// Close all connections in the pool Future closeAll() async { for (final entry in _connections.entries) { await entry.value.close(); } _connections.clear(); } } // ---------------------------------------------- Core Functions ---------------------------------------------- // /// Build message envelope from payloads and metadata Map _buildEnvelope( String subject, List> payloads, Map options, ) { return { 'correlation_id': options['correlation_id'], 'msg_id': options['msg_id'], 'timestamp': DateTime.now().toUtc().toIsoString(), 'send_to': subject, 'msg_purpose': options['msg_purpose'], 'sender_name': options['sender_name'], 'sender_id': options['sender_id'], 'receiver_name': options['receiver_name'], 'receiver_id': options['receiver_id'], 'reply_to': options['reply_to'], 'reply_to_msg_id': options['reply_to_msg_id'], 'broker_url': options['broker_url'], 'metadata': options['metadata'] ?? {}, 'payloads': payloads, }; } /// Build payload object from serialized data Map _buildPayload( String dataname, String payloadType, Uint8List payloadBytes, String transport, dynamic data, ) { // Determine encoding based on payload type (matching Julia/JS implementation) String encoding = 'base64'; if (payloadType == 'jsontable') { encoding = 'json'; } else if (payloadType == 'arrowtable') { encoding = 'arrow-ipc'; } return { 'id': _uuidv4(), 'dataname': dataname, 'payload_type': payloadType, 'transport': transport, 'encoding': encoding, 'size': payloadBytes.length, 'data': data, 'metadata': transport == 'direct' ? {'payload_bytes': payloadBytes.length} : {}, }; } /// Publish message to NATS Future publishMessage( dynamic brokerUrlOrClient, String subject, String message, String correlationId, ) async { if (brokerUrlOrClient is NATSClient) { final client = brokerUrlOrClient; await client.publish(subject, message, correlationId); await client.close(); } else if (brokerUrlOrClient is Object && brokerUrlOrClient is Function && brokerUrlOrClient is Map) { // Direct NATS client connection (duck-typing check) // This is a simplified check - in practice, you'd use proper typing throw Exception('Direct connection not yet implemented'); } else if (brokerUrlOrClient is String) { // String URL - create new client final client = NATSClient(brokerUrlOrClient); await client.connect(); await client.publish(subject, message, correlationId); await client.close(); } else { throw Exception('Invalid broker URL or client'); } } /// Send data via NATS with automatic transport selection /// /// This function intelligently routes data delivery based on payload size. /// If the serialized payload is smaller than size_threshold, it encodes the data as Base64 /// and publishes directly over NATS. Otherwise, it uploads the data to a fileserver /// and publishes only the download URL over NATS. /// /// [subject] - NATS subject to publish the message to /// [data] - List of [dataname, data, type] lists to send /// - dataname: Name of the payload /// - data: The actual data to send /// - type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary" /// [options] - Optional configuration /// /// Returns a Future that completes with a tuple of [envelope, env_json_str] Future> smartsend( String subject, List> data, { String brokerUrl = DEFAULT_BROKER_URL, String fileserverUrl = DEFAULT_FILESERVER_URL, Function? fileserverUploadHandler, int sizeThreshold = DEFAULT_SIZE_THRESHOLD, String? correlationId, String msgPurpose = 'chat', String senderName = 'NATSBridge', String receiverName = '', String receiverId = '', String replyTo = '', String replyToMsgId = '', bool isPublish = true, dynamic natsConnection, String? msgId, String? senderId, }) async { final actualCorrelationId = correlationId ?? _uuidv4(); final actualMsgId = msgId ?? _uuidv4(); final actualSenderId = senderId ?? _uuidv4(); logTrace(actualCorrelationId, 'Starting smartsend for subject: $subject'); // Process payloads final payloads = >[]; for (final item in data) { final dataname = item[0] as String; final payloadData = item[1]; final payloadType = item[2] as String; final payloadBytes = await _serializeData(payloadData, payloadType); final payloadSize = payloadBytes.length; logTrace(actualCorrelationId, 'Serialized payload \'$dataname\' (type: $payloadType) size: $payloadSize bytes'); if (payloadSize < sizeThreshold) { // Direct path final payloadB64 = base64Encode(payloadBytes); logTrace(actualCorrelationId, 'Using direct transport for $payloadSize bytes'); final payload = _buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64); payloads.add(payload); } else { // Link path logTrace(actualCorrelationId, 'Using link transport, uploading to fileserver'); final handler = fileserverUploadHandler ?? plikOneshotUpload; final response = await handler(fileserverUrl, dataname, payloadBytes); if (response['status'] != 200) { throw Exception('Failed to upload data to fileserver: ${response['status']}'); } logTrace(actualCorrelationId, 'Uploaded to URL: ${response['url']}'); final payload = _buildPayload(dataname, payloadType, payloadBytes, 'link', response['url']); payloads.add(payload); } } // Build envelope final env = _buildEnvelope(subject, payloads, { 'correlation_id': actualCorrelationId, 'msg_id': actualMsgId, 'msg_purpose': msgPurpose, 'sender_name': senderName, 'sender_id': actualSenderId, 'receiver_name': receiverName, 'receiver_id': receiverId, 'reply_to': replyTo, 'reply_to_msg_id': replyToMsgId, 'broker_url': brokerUrl, }); final envJsonStr = json.encode(env); if (isPublish) { if (natsConnection != null) { await publishMessage(natsConnection, subject, envJsonStr, actualCorrelationId); } else { await publishMessage(brokerUrl, subject, envJsonStr, actualCorrelationId); } } return [env, envJsonStr]; } /// Receive and process NATS message /// /// This function processes incoming NATS messages, handling both direct transport /// (base64 decoded payloads) and link transport (URL-based payloads). /// It deserializes the data based on the transport type and returns the result. /// /// [msg] - NATS message to process (dict with 'payloads' key) /// [options] - Optional configuration /// /// Returns a Future that completes with the envelope object with processed payloads Future> smartreceive( Map msg, { Function? fileserverDownloadHandler, int maxRetries = 5, int baseDelay = 100, int maxDelay = 5000, }) async { final correlationId = msg['correlation_id'] as String; logTrace(correlationId, 'Processing received message'); // Process all payloads in the envelope final payloadsList = >[]; final numPayloads = (msg['payloads'] as List).length; logTrace(correlationId, 'Processing $numPayloads payloads'); for (var i = 0; i < numPayloads; i++) { final payloadObj = msg['payloads'][i] as Map; final transport = payloadObj['transport'] as String; final dataname = payloadObj['dataname'] as String; if (transport == 'direct') { logTrace(correlationId, 'Direct transport - decoding payload \'$dataname\''); // Extract base64 payload from the payload final payloadB64 = payloadObj['data'] as String; // Decode Base64 payload final payloadBytes = base64Decode(payloadB64); // Deserialize based on type final dataType = payloadObj['payload_type'] as String; final data = await _deserializeData(payloadBytes, dataType, correlationId); payloadsList.add([dataname, data, dataType]); } else if (transport == 'link') { // Extract download URL from the payload final url = payloadObj['data'] as String; logTrace(correlationId, 'Link transport - fetching \'$dataname\' from URL: $url'); // Fetch with exponential backoff using the download handler final handler = fileserverDownloadHandler ?? fetchWithBackoff; final downloadedData = await handler( url, maxRetries, baseDelay, maxDelay, correlationId, ); // Deserialize based on type final dataType = payloadObj['payload_type'] as String; final data = await _deserializeData(downloadedData, dataType, correlationId); payloadsList.add([dataname, data, dataType]); } else { throw Exception('Unknown transport type for payload \'$dataname\': $transport'); } } msg['payloads'] = payloadsList; return msg; } // ---------------------------------------------- Module Exports ---------------------------------------------- // /// Convenience class for NATSBridge functionality class NATSBridge { static const DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD; static const DEFAULT_BROKER_URL = DEFAULT_BROKER_URL; static const DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL; /// Send data via NATS static Future> send( String subject, List> data, { String brokerUrl = DEFAULT_BROKER_URL, String fileserverUrl = DEFAULT_FILESERVER_URL, Function? fileserverUploadHandler, int sizeThreshold = DEFAULT_SIZE_THRESHOLD, String? correlationId, String msgPurpose = 'chat', String senderName = 'NATSBridge', String receiverName = '', String receiverId = '', String replyTo = '', String replyToMsgId = '', bool isPublish = true, dynamic natsConnection, String? msgId, String? senderId, }) { return smartsend( subject, data, brokerUrl: brokerUrl, fileserverUrl: fileserverUrl, fileserverUploadHandler: fileserverUploadHandler, sizeThreshold: sizeThreshold, correlationId: correlationId, msgPurpose: msgPurpose, senderName: senderName, receiverName: receiverName, receiverId: receiverId, replyTo: replyTo, replyToMsgId: replyToMsgId, isPublish: isPublish, natsConnection: natsConnection, msgId: msgId, senderId: senderId, ); } /// Receive and process NATS message static Future> receive( Map msg, { Function? fileserverDownloadHandler, int maxRetries = 5, int baseDelay = 100, int maxDelay = 5000, }) { return smartreceive( msg, fileserverDownloadHandler: fileserverDownloadHandler, maxRetries: maxRetries, baseDelay: baseDelay, maxDelay: maxDelay, ); } } // Base64 encoding/decoding utilities // These functions are re-exported from dart:convert for convenience // The dart:convert library provides these functions directly // String base64Encode(Uint8List data) - from dart:convert // Uint8List base64Decode(String data) - from dart:convert // Re-export base64 from dart:convert for convenience export 'dart:convert' show base64Encode, base64Decode;