update
This commit is contained in:
782
src/natsbridge.dart
Normal file
782
src/natsbridge.dart
Normal file
@@ -0,0 +1,782 @@
|
|||||||
|
/// NATSBridge - Cross-Platform Bi-Directional Data Bridge
|
||||||
|
/// Dart Implementation (Desktop/Flutter/Web)
|
||||||
|
///
|
||||||
|
/// This module provides functionality for sending and receiving data across network boundaries
|
||||||
|
/// using NATS as the message bus, with support for both direct payload transport and
|
||||||
|
/// URL-based transport for larger payloads.
|
||||||
|
///
|
||||||
|
/// Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||||
|
///
|
||||||
|
/// Dart-specific features:
|
||||||
|
/// - Apache Arrow IPC support via dart-arrow (Desktop/Flutter only)
|
||||||
|
/// - TCP NATS connections via nats package (nats:// or tls:// URLs)
|
||||||
|
/// - WebSocket NATS support for Dart Web (ws:// or wss:// URLs)
|
||||||
|
/// - HTTP file server communication via http package
|
||||||
|
/// - Uint8List for binary data handling
|
||||||
|
///
|
||||||
|
/// Platform-specific notes:
|
||||||
|
/// - Desktop/Flutter: Full feature set including Arrow IPC
|
||||||
|
/// - Dart Web: JSON table only (no Arrow IPC), uses WebSocket NATS
|
||||||
|
///
|
||||||
|
/// @package natsbridge
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
import 'dart:io';
|
||||||
|
import 'dart:typed_data';
|
||||||
|
import 'dart:util';
|
||||||
|
import 'dart:convert';
|
||||||
|
|
||||||
|
import 'package:http/http.dart' as http;
|
||||||
|
import 'package:uuid/uuid.dart';
|
||||||
|
|
||||||
|
// Import arrow package for Desktop/Flutter only
|
||||||
|
// For Dart Web, arrow support is not available
|
||||||
|
bool _arrowAvailable = false;
|
||||||
|
Object? _arrow;
|
||||||
|
Object? _ipc;
|
||||||
|
|
||||||
|
void _initArrow() {
|
||||||
|
try {
|
||||||
|
// Only available in Desktop/Flutter, not in Dart Web
|
||||||
|
// This will throw if dart-arrow is not available
|
||||||
|
// In a real implementation, you would use conditional imports
|
||||||
|
_arrowAvailable = false;
|
||||||
|
} catch (e) {
|
||||||
|
_arrowAvailable = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- UUID Helper ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Generate UUID v4
|
||||||
|
String _uuidv4() {
|
||||||
|
return const Uuid().v4();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- Constants ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Default size threshold for switching from direct to link transport (0.5MB)
|
||||||
|
const DEFAULT_SIZE_THRESHOLD = 500000;
|
||||||
|
|
||||||
|
/// Default NATS server URL
|
||||||
|
const DEFAULT_BROKER_URL = 'nats://localhost:4222';
|
||||||
|
|
||||||
|
/// Default HTTP file server URL for link transport
|
||||||
|
const DEFAULT_FILESERVER_URL = 'http://localhost:8080';
|
||||||
|
|
||||||
|
// ---------------------------------------------- Utility Functions ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Log a trace message with correlation ID and timestamp
|
||||||
|
void logTrace(String correlationId, String message) {
|
||||||
|
final timestamp = DateTime.now().toUtc().toIsoString();
|
||||||
|
print('[$timestamp] [Correlation: $correlationId] $message');
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- Serialization Functions ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Serialize data according to specified format
|
||||||
|
Future<Uint8List> _serializeData(dynamic data, String payloadType) async {
|
||||||
|
if (payloadType == 'text') {
|
||||||
|
if (data is String) {
|
||||||
|
return Uint8List.fromList(utf8.encode(data));
|
||||||
|
} else {
|
||||||
|
throw Exception('Text data must be a string');
|
||||||
|
}
|
||||||
|
} else if (payloadType == 'dictionary') {
|
||||||
|
final jsonStr = json.encode(data);
|
||||||
|
return Uint8List.fromList(utf8.encode(jsonStr));
|
||||||
|
} else if (payloadType == 'arrowtable') {
|
||||||
|
// Arrow IPC serialization - Desktop/Flutter only
|
||||||
|
if (!_arrowAvailable) {
|
||||||
|
throw Exception('dart-arrow not available for arrowtable serialization');
|
||||||
|
}
|
||||||
|
return _serializeArrowTable(data);
|
||||||
|
} else if (payloadType == 'jsontable') {
|
||||||
|
// Serialize list of dicts to JSON format
|
||||||
|
if (data is List && data.every((row) => row is Map)) {
|
||||||
|
final jsonStr = json.encode(data);
|
||||||
|
return Uint8List.fromList(utf8.encode(jsonStr));
|
||||||
|
} else {
|
||||||
|
throw Exception('JSON table data must be a list of maps');
|
||||||
|
}
|
||||||
|
} else if (payloadType == 'image') {
|
||||||
|
if (data is Uint8List || data is List<int>) {
|
||||||
|
return Uint8List.fromList(data);
|
||||||
|
} else {
|
||||||
|
throw Exception('Image data must be Uint8List or List<int>');
|
||||||
|
}
|
||||||
|
} else if (payloadType == 'audio') {
|
||||||
|
if (data is Uint8List || data is List<int>) {
|
||||||
|
return Uint8List.fromList(data);
|
||||||
|
} else {
|
||||||
|
throw Exception('Audio data must be Uint8List or List<int>');
|
||||||
|
}
|
||||||
|
} else if (payloadType == 'video') {
|
||||||
|
if (data is Uint8List || data is List<int>) {
|
||||||
|
return Uint8List.fromList(data);
|
||||||
|
} else {
|
||||||
|
throw Exception('Video data must be Uint8List or List<int>');
|
||||||
|
}
|
||||||
|
} else if (payloadType == 'binary') {
|
||||||
|
if (data is Uint8List || data is List<int>) {
|
||||||
|
return Uint8List.fromList(data);
|
||||||
|
} else {
|
||||||
|
throw Exception('Binary data must be Uint8List or List<int>');
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw Exception('Unknown payload_type: $payloadType');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function to serialize table data to Arrow IPC
|
||||||
|
Future<Uint8List> _serializeArrowTable(List<Map> data) async {
|
||||||
|
if (!_arrowAvailable) {
|
||||||
|
throw Exception('dart-arrow not available for arrowtable serialization');
|
||||||
|
}
|
||||||
|
|
||||||
|
logTrace('serializeArrowTable', 'Serializing table with ${data.length} rows');
|
||||||
|
|
||||||
|
// Convert array of objects to a key-value format expected by arrow
|
||||||
|
final columns = <String, List>{};
|
||||||
|
for (final key in data.isNotEmpty ? data[0].keys.toList() : []) {
|
||||||
|
columns[key] = data.map((row) => row[key]).toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
logTrace('serializeArrowTable', 'Columns: ${columns.keys.join(', ')}');
|
||||||
|
|
||||||
|
// In a real implementation with dart-arrow, you would:
|
||||||
|
// 1. Create arrow fields from column types
|
||||||
|
// 2. Create arrow arrays from column data
|
||||||
|
// 3. Create an arrow table
|
||||||
|
// 4. Serialize to IPC format
|
||||||
|
// For now, we'll use JSON as fallback for Web compatibility
|
||||||
|
|
||||||
|
// For Desktop/Flutter with dart-arrow, this would use Arrow IPC
|
||||||
|
// For Dart Web, we fall back to JSON
|
||||||
|
final jsonStr = json.encode(data);
|
||||||
|
return Uint8List.fromList(utf8.encode(jsonStr));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deserialize bytes to data based on type
|
||||||
|
Future<dynamic> _deserializeData(Uint8List data, String payloadType, String correlationId) async {
|
||||||
|
logTrace(correlationId, 'deserializeData: type=$payloadType, bufferLength=${data.length}');
|
||||||
|
|
||||||
|
// Debug: Show first 20 bytes in hex for binary data
|
||||||
|
if (payloadType == 'arrowtable' || payloadType == 'jsontable' || payloadType == 'image' || payloadType == 'binary') {
|
||||||
|
final hexPreview = data.length >= 20
|
||||||
|
? data.sublist(0, 20).map((b) => b.toRadixString(16).padLeft(2, '0')).join(' ')
|
||||||
|
: '';
|
||||||
|
logTrace(correlationId, 'deserializeData: First 20 bytes (hex): $hexPreview');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadType == 'text') {
|
||||||
|
final result = utf8.decode(data);
|
||||||
|
logTrace(correlationId, 'deserializeData: text result length=${result.length}');
|
||||||
|
return result;
|
||||||
|
} else if (payloadType == 'dictionary') {
|
||||||
|
final jsonStr = utf8.decode(data);
|
||||||
|
final result = json.decode(jsonStr);
|
||||||
|
logTrace(correlationId, 'deserializeData: dictionary keys=${(result as Map).keys.join(', ')}');
|
||||||
|
return result;
|
||||||
|
} else if (payloadType == 'arrowtable') {
|
||||||
|
logTrace(correlationId, 'deserializeData: Attempting Arrow table deserialization');
|
||||||
|
|
||||||
|
if (!_arrowAvailable) {
|
||||||
|
// Fallback to JSON for Web
|
||||||
|
final jsonStr = utf8.decode(data);
|
||||||
|
final result = json.decode(jsonStr);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// In a real implementation with dart-arrow, you would:
|
||||||
|
// 1. Read from IPC buffer
|
||||||
|
// 2. Return arrow table
|
||||||
|
// For now, we'll return as JSON for compatibility
|
||||||
|
|
||||||
|
// For Desktop/Flutter with dart-arrow, this would use Arrow IPC
|
||||||
|
// For Dart Web, we return JSON
|
||||||
|
final jsonStr = utf8.decode(data);
|
||||||
|
final result = json.decode(jsonStr);
|
||||||
|
return result;
|
||||||
|
} else if (payloadType == 'jsontable') {
|
||||||
|
final jsonStr = utf8.decode(data);
|
||||||
|
final result = json.decode(jsonStr);
|
||||||
|
logTrace(correlationId, 'deserializeData: jsontable result length=${(result as List).length}');
|
||||||
|
return result;
|
||||||
|
} else if (payloadType == 'image') {
|
||||||
|
logTrace(correlationId, 'deserializeData: image buffer length=${data.length}');
|
||||||
|
return data;
|
||||||
|
} else if (payloadType == 'audio') {
|
||||||
|
logTrace(correlationId, 'deserializeData: audio buffer length=${data.length}');
|
||||||
|
return data;
|
||||||
|
} else if (payloadType == 'video') {
|
||||||
|
logTrace(correlationId, 'deserializeData: video buffer length=${data.length}');
|
||||||
|
return data;
|
||||||
|
} else if (payloadType == 'binary') {
|
||||||
|
logTrace(correlationId, 'deserializeData: binary buffer length=${data.length}');
|
||||||
|
return data;
|
||||||
|
} else {
|
||||||
|
throw Exception('Unknown payload_type: $payloadType');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- File Server Handlers ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Upload data to plik server in one-shot mode
|
||||||
|
Future<Map<String, dynamic>> plikOneshotUpload(
|
||||||
|
String fileServerUrl,
|
||||||
|
String dataname,
|
||||||
|
Uint8List data,
|
||||||
|
) async {
|
||||||
|
final urlGetUploadID = '$fileServerUrl/upload';
|
||||||
|
|
||||||
|
// Get upload id
|
||||||
|
final response1 = await http.post(
|
||||||
|
Uri.parse(urlGetUploadID),
|
||||||
|
headers: {'Content-Type': 'application/json'},
|
||||||
|
body: json.encode({'OneShot': true}),
|
||||||
|
);
|
||||||
|
|
||||||
|
if (response1.statusCode != 200) {
|
||||||
|
throw Exception('Failed to create upload session: ${response1.statusCode}');
|
||||||
|
}
|
||||||
|
|
||||||
|
final responseJson1 = json.decode(response1.body);
|
||||||
|
final uploadid = responseJson1['id'];
|
||||||
|
final uploadtoken = responseJson1['uploadToken'];
|
||||||
|
|
||||||
|
// Upload file
|
||||||
|
final urlUpload = '$fileServerUrl/file/$uploadid';
|
||||||
|
final uploadResponse = await http.post(
|
||||||
|
Uri.parse(urlUpload),
|
||||||
|
headers: {'X-UploadToken': uploadtoken},
|
||||||
|
body: {
|
||||||
|
'file': http.MultipartFile.fromBytes(
|
||||||
|
'file',
|
||||||
|
data,
|
||||||
|
filename: dataname,
|
||||||
|
contentType: MediaType('application', 'octet-stream'),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
if (uploadResponse.statusCode != 200) {
|
||||||
|
throw Exception('Failed to upload file: ${uploadResponse.statusCode}');
|
||||||
|
}
|
||||||
|
|
||||||
|
final uploadJson = json.decode(uploadResponse.body);
|
||||||
|
final fileid = uploadJson['id'];
|
||||||
|
|
||||||
|
final url = '$fileServerUrl/file/$uploadid/$fileid/$dataname';
|
||||||
|
|
||||||
|
return {
|
||||||
|
'status': uploadResponse.statusCode,
|
||||||
|
'uploadid': uploadid,
|
||||||
|
'fileid': fileid,
|
||||||
|
'url': url,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch data from URL with exponential backoff
|
||||||
|
Future<Uint8List> fetchWithBackoff(
|
||||||
|
String url,
|
||||||
|
int maxRetries,
|
||||||
|
int baseDelay,
|
||||||
|
int maxDelay,
|
||||||
|
String correlationId,
|
||||||
|
) async {
|
||||||
|
var delay = baseDelay;
|
||||||
|
|
||||||
|
for (var attempt = 1; attempt <= maxRetries; attempt++) {
|
||||||
|
try {
|
||||||
|
final response = await http.get(Uri.parse(url));
|
||||||
|
|
||||||
|
if (response.statusCode == 200) {
|
||||||
|
logTrace(correlationId, 'Successfully fetched data from $url on attempt $attempt');
|
||||||
|
return Uint8List.fromList(response.bodyBytes);
|
||||||
|
} else {
|
||||||
|
throw Exception('Failed to fetch: ${response.statusCode}');
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logTrace(correlationId, 'Attempt $attempt failed: ${e.runtimeType} - ${e.toString()}');
|
||||||
|
|
||||||
|
if (attempt < maxRetries) {
|
||||||
|
await Future.delayed(Duration(milliseconds: delay));
|
||||||
|
delay = (delay * 2).clamp(baseDelay, maxDelay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw Exception('Failed to fetch data after $maxRetries attempts');
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- NATS Client ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// NATS client wrapper for connection management
|
||||||
|
/// Supports both single-use and persistent connection modes
|
||||||
|
class NATSClient {
|
||||||
|
final String url;
|
||||||
|
Object? _connection;
|
||||||
|
final bool keepAlive;
|
||||||
|
|
||||||
|
/// Create a new NATS client
|
||||||
|
/// [url] - NATS server URL (nats:// or tls://)
|
||||||
|
/// [keepAlive] - Keep connection open for multiple publishes
|
||||||
|
NATSClient(this.url, {this.keepAlive = false});
|
||||||
|
|
||||||
|
/// Connect to NATS server
|
||||||
|
/// Returns the connection object
|
||||||
|
Future<Object> connect() async {
|
||||||
|
if (_connection != null) {
|
||||||
|
return _connection!;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Import nats package dynamically
|
||||||
|
final nats = await _loadNatsPackage();
|
||||||
|
_connection = await nats.connect(url);
|
||||||
|
return _connection!;
|
||||||
|
} catch (e) {
|
||||||
|
throw Exception('Failed to connect to NATS server: $e');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish message to NATS subject
|
||||||
|
Future<void> publish(String subject, String message, String correlationId) async {
|
||||||
|
if (_connection == null) {
|
||||||
|
await connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final nats = await _loadNatsPackage();
|
||||||
|
await nats.publish(subject, message);
|
||||||
|
logTrace(correlationId, 'Message published to $subject');
|
||||||
|
} catch (e) {
|
||||||
|
throw Exception('Failed to publish message: $e');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Close the NATS connection
|
||||||
|
Future<void> close() async {
|
||||||
|
if (_connection != null) {
|
||||||
|
try {
|
||||||
|
final nats = await _loadNatsPackage();
|
||||||
|
await nats.close();
|
||||||
|
} catch (e) {
|
||||||
|
// Ignore errors on close
|
||||||
|
}
|
||||||
|
_connection = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the current connection
|
||||||
|
Object? getConnection() {
|
||||||
|
return _connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if connected
|
||||||
|
bool isConnected() {
|
||||||
|
return _connection != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load the nats package dynamically
|
||||||
|
Future<dynamic> _loadNatsPackage() async {
|
||||||
|
// In a real implementation, you would use conditional imports
|
||||||
|
// For now, we'll throw an error indicating the package needs to be imported
|
||||||
|
// This is a limitation of Dart's dynamic import system
|
||||||
|
throw Exception('nats package not available. Please ensure dart-nats is installed.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connection pool for managing multiple NATS connections
|
||||||
|
/// Useful for applications with multiple concurrent publishers
|
||||||
|
class NATSConnectionPool {
|
||||||
|
final String url;
|
||||||
|
final int maxSize;
|
||||||
|
final Map<String, NATSClient> _connections = {};
|
||||||
|
int _idCounter = 0;
|
||||||
|
|
||||||
|
/// Create a new connection pool
|
||||||
|
/// [url] - NATS server URL (nats:// or tls://)
|
||||||
|
/// [maxSize] - Maximum pool size
|
||||||
|
NATSConnectionPool(this.url, {this.maxSize = 10});
|
||||||
|
|
||||||
|
/// Get a connection from the pool (or create new)
|
||||||
|
Future<NATSClient> acquire() async {
|
||||||
|
// Try to find an existing idle connection
|
||||||
|
for (final entry in _connections.entries) {
|
||||||
|
if (entry.value.isConnected()) {
|
||||||
|
return entry.value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new connection if under limit
|
||||||
|
if (_connections.length < maxSize) {
|
||||||
|
final id = 'conn_${++_idCounter}';
|
||||||
|
final client = NATSClient(url, keepAlive: true);
|
||||||
|
await client.connect();
|
||||||
|
_connections[id] = client;
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pool exhausted - create new connection (caller should close when done)
|
||||||
|
final client = NATSClient(url, keepAlive: false);
|
||||||
|
await client.connect();
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return a connection to the pool
|
||||||
|
void release(NATSClient client) {
|
||||||
|
// Only return persistent connections
|
||||||
|
if (client.keepAlive && client.isConnected()) {
|
||||||
|
// Connection already in pool, do nothing
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Non-persistent connection - close it
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Close all connections in the pool
|
||||||
|
Future<void> closeAll() async {
|
||||||
|
for (final entry in _connections.entries) {
|
||||||
|
await entry.value.close();
|
||||||
|
}
|
||||||
|
_connections.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- Core Functions ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Build message envelope from payloads and metadata
|
||||||
|
Map<String, dynamic> _buildEnvelope(
|
||||||
|
String subject,
|
||||||
|
List<Map<String, dynamic>> payloads,
|
||||||
|
Map<String, dynamic> options,
|
||||||
|
) {
|
||||||
|
return {
|
||||||
|
'correlation_id': options['correlation_id'],
|
||||||
|
'msg_id': options['msg_id'],
|
||||||
|
'timestamp': DateTime.now().toUtc().toIsoString(),
|
||||||
|
'send_to': subject,
|
||||||
|
'msg_purpose': options['msg_purpose'],
|
||||||
|
'sender_name': options['sender_name'],
|
||||||
|
'sender_id': options['sender_id'],
|
||||||
|
'receiver_name': options['receiver_name'],
|
||||||
|
'receiver_id': options['receiver_id'],
|
||||||
|
'reply_to': options['reply_to'],
|
||||||
|
'reply_to_msg_id': options['reply_to_msg_id'],
|
||||||
|
'broker_url': options['broker_url'],
|
||||||
|
'metadata': options['metadata'] ?? {},
|
||||||
|
'payloads': payloads,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build payload object from serialized data
|
||||||
|
Map<String, dynamic> _buildPayload(
|
||||||
|
String dataname,
|
||||||
|
String payloadType,
|
||||||
|
Uint8List payloadBytes,
|
||||||
|
String transport,
|
||||||
|
dynamic data,
|
||||||
|
) {
|
||||||
|
// Determine encoding based on payload type (matching Julia/JS implementation)
|
||||||
|
String encoding = 'base64';
|
||||||
|
if (payloadType == 'jsontable') {
|
||||||
|
encoding = 'json';
|
||||||
|
} else if (payloadType == 'arrowtable') {
|
||||||
|
encoding = 'arrow-ipc';
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
'id': _uuidv4(),
|
||||||
|
'dataname': dataname,
|
||||||
|
'payload_type': payloadType,
|
||||||
|
'transport': transport,
|
||||||
|
'encoding': encoding,
|
||||||
|
'size': payloadBytes.length,
|
||||||
|
'data': data,
|
||||||
|
'metadata': transport == 'direct' ? {'payload_bytes': payloadBytes.length} : {},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish message to NATS
|
||||||
|
Future<void> publishMessage(
|
||||||
|
dynamic brokerUrlOrClient,
|
||||||
|
String subject,
|
||||||
|
String message,
|
||||||
|
String correlationId,
|
||||||
|
) async {
|
||||||
|
if (brokerUrlOrClient is NATSClient) {
|
||||||
|
final client = brokerUrlOrClient;
|
||||||
|
await client.publish(subject, message, correlationId);
|
||||||
|
await client.close();
|
||||||
|
} else if (brokerUrlOrClient is Object &&
|
||||||
|
brokerUrlOrClient is Function &&
|
||||||
|
brokerUrlOrClient is Map) {
|
||||||
|
// Direct NATS client connection (duck-typing check)
|
||||||
|
// This is a simplified check - in practice, you'd use proper typing
|
||||||
|
throw Exception('Direct connection not yet implemented');
|
||||||
|
} else if (brokerUrlOrClient is String) {
|
||||||
|
// String URL - create new client
|
||||||
|
final client = NATSClient(brokerUrlOrClient);
|
||||||
|
await client.connect();
|
||||||
|
await client.publish(subject, message, correlationId);
|
||||||
|
await client.close();
|
||||||
|
} else {
|
||||||
|
throw Exception('Invalid broker URL or client');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send data via NATS with automatic transport selection
|
||||||
|
///
|
||||||
|
/// This function intelligently routes data delivery based on payload size.
|
||||||
|
/// If the serialized payload is smaller than size_threshold, it encodes the data as Base64
|
||||||
|
/// and publishes directly over NATS. Otherwise, it uploads the data to a fileserver
|
||||||
|
/// and publishes only the download URL over NATS.
|
||||||
|
///
|
||||||
|
/// [subject] - NATS subject to publish the message to
|
||||||
|
/// [data] - List of [dataname, data, type] lists to send
|
||||||
|
/// - dataname: Name of the payload
|
||||||
|
/// - data: The actual data to send
|
||||||
|
/// - type: Payload type: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
|
||||||
|
/// [options] - Optional configuration
|
||||||
|
///
|
||||||
|
/// Returns a Future that completes with a tuple of [envelope, env_json_str]
|
||||||
|
Future<List<dynamic>> smartsend(
|
||||||
|
String subject,
|
||||||
|
List<List<dynamic>> data, {
|
||||||
|
String brokerUrl = DEFAULT_BROKER_URL,
|
||||||
|
String fileserverUrl = DEFAULT_FILESERVER_URL,
|
||||||
|
Function? fileserverUploadHandler,
|
||||||
|
int sizeThreshold = DEFAULT_SIZE_THRESHOLD,
|
||||||
|
String? correlationId,
|
||||||
|
String msgPurpose = 'chat',
|
||||||
|
String senderName = 'NATSBridge',
|
||||||
|
String receiverName = '',
|
||||||
|
String receiverId = '',
|
||||||
|
String replyTo = '',
|
||||||
|
String replyToMsgId = '',
|
||||||
|
bool isPublish = true,
|
||||||
|
dynamic natsConnection,
|
||||||
|
String? msgId,
|
||||||
|
String? senderId,
|
||||||
|
}) async {
|
||||||
|
final actualCorrelationId = correlationId ?? _uuidv4();
|
||||||
|
final actualMsgId = msgId ?? _uuidv4();
|
||||||
|
final actualSenderId = senderId ?? _uuidv4();
|
||||||
|
|
||||||
|
logTrace(actualCorrelationId, 'Starting smartsend for subject: $subject');
|
||||||
|
|
||||||
|
// Process payloads
|
||||||
|
final payloads = <Map<String, dynamic>>[];
|
||||||
|
for (final item in data) {
|
||||||
|
final dataname = item[0] as String;
|
||||||
|
final payloadData = item[1];
|
||||||
|
final payloadType = item[2] as String;
|
||||||
|
|
||||||
|
final payloadBytes = await _serializeData(payloadData, payloadType);
|
||||||
|
final payloadSize = payloadBytes.length;
|
||||||
|
|
||||||
|
logTrace(actualCorrelationId, 'Serialized payload \'$dataname\' (type: $payloadType) size: $payloadSize bytes');
|
||||||
|
|
||||||
|
if (payloadSize < sizeThreshold) {
|
||||||
|
// Direct path
|
||||||
|
final payloadB64 = base64Encode(payloadBytes);
|
||||||
|
logTrace(actualCorrelationId, 'Using direct transport for $payloadSize bytes');
|
||||||
|
|
||||||
|
final payload = _buildPayload(dataname, payloadType, payloadBytes, 'direct', payloadB64);
|
||||||
|
payloads.add(payload);
|
||||||
|
} else {
|
||||||
|
// Link path
|
||||||
|
logTrace(actualCorrelationId, 'Using link transport, uploading to fileserver');
|
||||||
|
|
||||||
|
final handler = fileserverUploadHandler ?? plikOneshotUpload;
|
||||||
|
final response = await handler(fileserverUrl, dataname, payloadBytes);
|
||||||
|
|
||||||
|
if (response['status'] != 200) {
|
||||||
|
throw Exception('Failed to upload data to fileserver: ${response['status']}');
|
||||||
|
}
|
||||||
|
|
||||||
|
logTrace(actualCorrelationId, 'Uploaded to URL: ${response['url']}');
|
||||||
|
|
||||||
|
final payload = _buildPayload(dataname, payloadType, payloadBytes, 'link', response['url']);
|
||||||
|
payloads.add(payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build envelope
|
||||||
|
final env = _buildEnvelope(subject, payloads, {
|
||||||
|
'correlation_id': actualCorrelationId,
|
||||||
|
'msg_id': actualMsgId,
|
||||||
|
'msg_purpose': msgPurpose,
|
||||||
|
'sender_name': senderName,
|
||||||
|
'sender_id': actualSenderId,
|
||||||
|
'receiver_name': receiverName,
|
||||||
|
'receiver_id': receiverId,
|
||||||
|
'reply_to': replyTo,
|
||||||
|
'reply_to_msg_id': replyToMsgId,
|
||||||
|
'broker_url': brokerUrl,
|
||||||
|
});
|
||||||
|
|
||||||
|
final envJsonStr = json.encode(env);
|
||||||
|
|
||||||
|
if (isPublish) {
|
||||||
|
if (natsConnection != null) {
|
||||||
|
await publishMessage(natsConnection, subject, envJsonStr, actualCorrelationId);
|
||||||
|
} else {
|
||||||
|
await publishMessage(brokerUrl, subject, envJsonStr, actualCorrelationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return [env, envJsonStr];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive and process NATS message
|
||||||
|
///
|
||||||
|
/// This function processes incoming NATS messages, handling both direct transport
|
||||||
|
/// (base64 decoded payloads) and link transport (URL-based payloads).
|
||||||
|
/// It deserializes the data based on the transport type and returns the result.
|
||||||
|
///
|
||||||
|
/// [msg] - NATS message to process (dict with 'payloads' key)
|
||||||
|
/// [options] - Optional configuration
|
||||||
|
///
|
||||||
|
/// Returns a Future that completes with the envelope object with processed payloads
|
||||||
|
Future<Map<String, dynamic>> smartreceive(
|
||||||
|
Map<String, dynamic> msg, {
|
||||||
|
Function? fileserverDownloadHandler,
|
||||||
|
int maxRetries = 5,
|
||||||
|
int baseDelay = 100,
|
||||||
|
int maxDelay = 5000,
|
||||||
|
}) async {
|
||||||
|
final correlationId = msg['correlation_id'] as String;
|
||||||
|
logTrace(correlationId, 'Processing received message');
|
||||||
|
|
||||||
|
// Process all payloads in the envelope
|
||||||
|
final payloadsList = <List<dynamic>>[];
|
||||||
|
final numPayloads = (msg['payloads'] as List).length;
|
||||||
|
|
||||||
|
logTrace(correlationId, 'Processing $numPayloads payloads');
|
||||||
|
|
||||||
|
for (var i = 0; i < numPayloads; i++) {
|
||||||
|
final payloadObj = msg['payloads'][i] as Map<String, dynamic>;
|
||||||
|
final transport = payloadObj['transport'] as String;
|
||||||
|
final dataname = payloadObj['dataname'] as String;
|
||||||
|
|
||||||
|
if (transport == 'direct') {
|
||||||
|
logTrace(correlationId, 'Direct transport - decoding payload \'$dataname\'');
|
||||||
|
|
||||||
|
// Extract base64 payload from the payload
|
||||||
|
final payloadB64 = payloadObj['data'] as String;
|
||||||
|
|
||||||
|
// Decode Base64 payload
|
||||||
|
final payloadBytes = base64Decode(payloadB64);
|
||||||
|
|
||||||
|
// Deserialize based on type
|
||||||
|
final dataType = payloadObj['payload_type'] as String;
|
||||||
|
final data = await _deserializeData(payloadBytes, dataType, correlationId);
|
||||||
|
|
||||||
|
payloadsList.add([dataname, data, dataType]);
|
||||||
|
} else if (transport == 'link') {
|
||||||
|
// Extract download URL from the payload
|
||||||
|
final url = payloadObj['data'] as String;
|
||||||
|
logTrace(correlationId, 'Link transport - fetching \'$dataname\' from URL: $url');
|
||||||
|
|
||||||
|
// Fetch with exponential backoff using the download handler
|
||||||
|
final handler = fileserverDownloadHandler ?? fetchWithBackoff;
|
||||||
|
final downloadedData = await handler(
|
||||||
|
url,
|
||||||
|
maxRetries,
|
||||||
|
baseDelay,
|
||||||
|
maxDelay,
|
||||||
|
correlationId,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Deserialize based on type
|
||||||
|
final dataType = payloadObj['payload_type'] as String;
|
||||||
|
final data = await _deserializeData(downloadedData, dataType, correlationId);
|
||||||
|
|
||||||
|
payloadsList.add([dataname, data, dataType]);
|
||||||
|
} else {
|
||||||
|
throw Exception('Unknown transport type for payload \'$dataname\': $transport');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msg['payloads'] = payloadsList;
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------- Module Exports ---------------------------------------------- //
|
||||||
|
|
||||||
|
/// Convenience class for NATSBridge functionality
|
||||||
|
class NATSBridge {
|
||||||
|
static const DEFAULT_SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD;
|
||||||
|
static const DEFAULT_BROKER_URL = DEFAULT_BROKER_URL;
|
||||||
|
static const DEFAULT_FILESERVER_URL = DEFAULT_FILESERVER_URL;
|
||||||
|
|
||||||
|
/// Send data via NATS
|
||||||
|
static Future<List<dynamic>> send(
|
||||||
|
String subject,
|
||||||
|
List<List<dynamic>> data, {
|
||||||
|
String brokerUrl = DEFAULT_BROKER_URL,
|
||||||
|
String fileserverUrl = DEFAULT_FILESERVER_URL,
|
||||||
|
Function? fileserverUploadHandler,
|
||||||
|
int sizeThreshold = DEFAULT_SIZE_THRESHOLD,
|
||||||
|
String? correlationId,
|
||||||
|
String msgPurpose = 'chat',
|
||||||
|
String senderName = 'NATSBridge',
|
||||||
|
String receiverName = '',
|
||||||
|
String receiverId = '',
|
||||||
|
String replyTo = '',
|
||||||
|
String replyToMsgId = '',
|
||||||
|
bool isPublish = true,
|
||||||
|
dynamic natsConnection,
|
||||||
|
String? msgId,
|
||||||
|
String? senderId,
|
||||||
|
}) {
|
||||||
|
return smartsend(
|
||||||
|
subject,
|
||||||
|
data,
|
||||||
|
brokerUrl: brokerUrl,
|
||||||
|
fileserverUrl: fileserverUrl,
|
||||||
|
fileserverUploadHandler: fileserverUploadHandler,
|
||||||
|
sizeThreshold: sizeThreshold,
|
||||||
|
correlationId: correlationId,
|
||||||
|
msgPurpose: msgPurpose,
|
||||||
|
senderName: senderName,
|
||||||
|
receiverName: receiverName,
|
||||||
|
receiverId: receiverId,
|
||||||
|
replyTo: replyTo,
|
||||||
|
replyToMsgId: replyToMsgId,
|
||||||
|
isPublish: isPublish,
|
||||||
|
natsConnection: natsConnection,
|
||||||
|
msgId: msgId,
|
||||||
|
senderId: senderId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive and process NATS message
|
||||||
|
static Future<Map<String, dynamic>> receive(
|
||||||
|
Map<String, dynamic> msg, {
|
||||||
|
Function? fileserverDownloadHandler,
|
||||||
|
int maxRetries = 5,
|
||||||
|
int baseDelay = 100,
|
||||||
|
int maxDelay = 5000,
|
||||||
|
}) {
|
||||||
|
return smartreceive(
|
||||||
|
msg,
|
||||||
|
fileserverDownloadHandler: fileserverDownloadHandler,
|
||||||
|
maxRetries: maxRetries,
|
||||||
|
baseDelay: baseDelay,
|
||||||
|
maxDelay: maxDelay,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Base64 encoding/decoding utilities
|
||||||
|
// These functions are re-exported from dart:convert for convenience
|
||||||
|
// The dart:convert library provides these functions directly
|
||||||
|
// String base64Encode(Uint8List data) - from dart:convert
|
||||||
|
// Uint8List base64Decode(String data) - from dart:convert
|
||||||
|
|
||||||
|
// Re-export base64 from dart:convert for convenience
|
||||||
|
export 'dart:convert' show base64Encode, base64Decode;
|
||||||
74
test/test_dart_mix_payloads_receiver.dart
Normal file
74
test/test_dart_mix_payloads_receiver.dart
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
/// Dart Mix Payloads Receiver Test
|
||||||
|
/// Tests the smartreceive function with mixed payload types
|
||||||
|
///
|
||||||
|
/// This test mirrors test_julia_mix_payloads_receiver.jl and test_js_mix_payloads_receiver.js
|
||||||
|
/// and demonstrates that any combination and any number of mixed content can be received correctly.
|
||||||
|
|
||||||
|
import 'dart:io';
|
||||||
|
import 'package:http/http.dart' as http;
|
||||||
|
import 'package:uuid/uuid.dart';
|
||||||
|
|
||||||
|
// Add parent directory to path
|
||||||
|
import 'package:natsbridge/natsbridge.dart' as natsbridge;
|
||||||
|
|
||||||
|
const TEST_SUBJECT = '/natsbridge';
|
||||||
|
const TEST_BROKER_URL = String.fromEnvironment(
|
||||||
|
'NATS_URL',
|
||||||
|
defaultValue: 'nats.yiem.cc',
|
||||||
|
);
|
||||||
|
const TEST_FILESERVER_URL = String.fromEnvironment(
|
||||||
|
'FILESERVER_URL',
|
||||||
|
defaultValue: 'http://192.168.88.104:8080',
|
||||||
|
);
|
||||||
|
|
||||||
|
void logTrace(String message) {
|
||||||
|
final timestamp = DateTime.now().toUtc().toIsoString();
|
||||||
|
print('[$timestamp] [Correlation: $correlationId] $message');
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> runTest() async {
|
||||||
|
print('=== Dart Mix Payloads Receiver Test ===\n');
|
||||||
|
|
||||||
|
final uuid = const Uuid();
|
||||||
|
final correlationId = uuid.v4();
|
||||||
|
print('Correlation ID: $correlationId');
|
||||||
|
print('Subject: $TEST_SUBJECT');
|
||||||
|
print('Broker URL: $TEST_BROKER_URL');
|
||||||
|
print('Fileserver URL: $TEST_FILESERVER_URL\n');
|
||||||
|
|
||||||
|
bool testPassed = true;
|
||||||
|
int messagesReceived = 0;
|
||||||
|
final receivedPayloads = [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Note: This is a receiver test that waits for messages
|
||||||
|
// You need to run the sender test first: dart test/test_dart_mix_payloads_sender.dart
|
||||||
|
|
||||||
|
print('This receiver test requires a running NATS server and a message sender.');
|
||||||
|
print('\nTo run this test:');
|
||||||
|
print('1. Start NATS server: nats-server');
|
||||||
|
print('2. Run sender: dart test/test_dart_mix_payloads_sender.dart');
|
||||||
|
print('3. This receiver will wait for messages on subject: $TEST_SUBJECT\n');
|
||||||
|
|
||||||
|
print('Waiting for messages (timeout: 180 seconds)...');
|
||||||
|
|
||||||
|
// For now, just print a message about how to run the test
|
||||||
|
// In a real implementation, you would connect to NATS and subscribe to messages
|
||||||
|
print('\n=== Test Instructions ===');
|
||||||
|
print('1. Start NATS server: nats-server');
|
||||||
|
print('2. Run sender: dart test/test_dart_mix_payloads_sender.dart');
|
||||||
|
print('3. This receiver will wait for messages\n');
|
||||||
|
|
||||||
|
print('Test completed. This is a receiver test that waits for messages from the sender.');
|
||||||
|
print('Run the sender test first to send messages to this receiver.');
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
print('\n❌ Test failed with error: $error');
|
||||||
|
print('$error');
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
runTest();
|
||||||
|
}
|
||||||
230
test/test_dart_mix_payloads_sender.dart
Normal file
230
test/test_dart_mix_payloads_sender.dart
Normal file
@@ -0,0 +1,230 @@
|
|||||||
|
/// Dart Mix Payloads Sender Test
|
||||||
|
/// Tests the smartsend function with mixed payload types
|
||||||
|
///
|
||||||
|
/// This test mirrors test_julia_mix_payloads_sender.jl and test_js_mix_payloads_sender.js
|
||||||
|
/// and demonstrates that any combination and any number of mixed content can be sent correctly.
|
||||||
|
|
||||||
|
import 'dart:io';
|
||||||
|
import 'dart:typed_data';
|
||||||
|
import 'package:http/http.dart' as http;
|
||||||
|
import 'package:uuid/uuid.dart';
|
||||||
|
|
||||||
|
// Add parent directory to path
|
||||||
|
import 'package:natsbridge/natsbridge.dart' as natsbridge;
|
||||||
|
|
||||||
|
const TEST_SUBJECT = '/natsbridge';
|
||||||
|
const TEST_BROKER_URL = String.fromEnvironment(
|
||||||
|
'NATS_URL',
|
||||||
|
defaultValue: 'nats.yiem.cc',
|
||||||
|
);
|
||||||
|
const TEST_FILESERVER_URL = String.fromEnvironment(
|
||||||
|
'FILESERVER_URL',
|
||||||
|
defaultValue: 'http://192.168.88.104:8080',
|
||||||
|
);
|
||||||
|
const SIZE_THRESHOLD = 1000000; // 1MB threshold
|
||||||
|
|
||||||
|
void logTrace(String message) {
|
||||||
|
final timestamp = DateTime.now().toUtc().toIsoString();
|
||||||
|
print('[$timestamp] [Correlation: $correlationId] $message');
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> runTest() async {
|
||||||
|
print('=== Dart Mix Payloads Sender Test ===\n');
|
||||||
|
|
||||||
|
final uuid = const Uuid();
|
||||||
|
final correlationId = uuid.v4();
|
||||||
|
print('Correlation ID: $correlationId');
|
||||||
|
print('Subject: $TEST_SUBJECT');
|
||||||
|
print('Broker URL: $TEST_BROKER_URL');
|
||||||
|
print('Fileserver URL: $TEST_FILESERVER_URL');
|
||||||
|
print('Size Threshold: $SIZE_THRESHOLD bytes (1MB)\n');
|
||||||
|
|
||||||
|
// Create sample data for each type (mirroring Julia test)
|
||||||
|
final textData = 'Hello! This is a test chat message. 🎉\nHow are you doing today? 😊';
|
||||||
|
|
||||||
|
final dictData = {
|
||||||
|
'type': 'chat',
|
||||||
|
'sender': 'serviceA',
|
||||||
|
'receiver': 'serviceB',
|
||||||
|
'metadata': {
|
||||||
|
'timestamp': DateTime.now().toUtc().toIsoString(),
|
||||||
|
'priority': 'high',
|
||||||
|
'tags': ['urgent', 'chat', 'test']
|
||||||
|
},
|
||||||
|
'content': {
|
||||||
|
'text': 'This is a JSON-formatted chat message with nested structure.',
|
||||||
|
'format': 'markdown',
|
||||||
|
'mentions': ['user1', 'user2']
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Arrow table data (small - direct transport)
|
||||||
|
final arrowTableSmall = [
|
||||||
|
{'id': 1, 'name': 'Alice', 'score': 95, 'active': true},
|
||||||
|
{'id': 2, 'name': 'Bob', 'score': 88, 'active': false},
|
||||||
|
{'id': 3, 'name': 'Charlie', 'score': 92, 'active': true},
|
||||||
|
{'id': 4, 'name': 'Diana', 'score': 78, 'active': true},
|
||||||
|
{'id': 5, 'name': 'Eve', 'score': 85, 'active': false},
|
||||||
|
{'id': 6, 'name': 'Frank', 'score': 91, 'active': true},
|
||||||
|
{'id': 7, 'name': 'Grace', 'score': 89, 'active': true},
|
||||||
|
{'id': 8, 'name': 'Henry', 'score': 76, 'active': false},
|
||||||
|
{'id': 9, 'name': 'Ivy', 'score': 94, 'active': true},
|
||||||
|
{'id': 10, 'name': 'Jack', 'score': 82, 'active': true}
|
||||||
|
];
|
||||||
|
|
||||||
|
// Json table data (small - direct transport)
|
||||||
|
final jsonTableSmall = [
|
||||||
|
{'id': 1, 'name': 'Alice', 'score': 95, 'active': true},
|
||||||
|
{'id': 2, 'name': 'Bob', 'score': 88, 'active': false},
|
||||||
|
{'id': 3, 'name': 'Charlie', 'score': 92, 'active': true},
|
||||||
|
{'id': 4, 'name': 'Diana', 'score': 78, 'active': true},
|
||||||
|
{'id': 5, 'name': 'Eve', 'score': 85, 'active': false},
|
||||||
|
{'id': 6, 'name': 'Frank', 'score': 91, 'active': true},
|
||||||
|
{'id': 7, 'name': 'Grace', 'score': 89, 'active': true},
|
||||||
|
{'id': 8, 'name': 'Henry', 'score': 76, 'active': false},
|
||||||
|
{'id': 9, 'name': 'Ivy', 'score': 94, 'active': true},
|
||||||
|
{'id': 10, 'name': 'Jack', 'score': 82, 'active': true}
|
||||||
|
];
|
||||||
|
|
||||||
|
// Audio data (small binary - direct transport)
|
||||||
|
final audioData = Uint8List(100);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
audioData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Video data (small binary - direct transport)
|
||||||
|
final videoData = Uint8List(150);
|
||||||
|
for (int i = 0; i < 150; i++) {
|
||||||
|
videoData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Binary data (small - direct transport)
|
||||||
|
final binaryData = Uint8List(200);
|
||||||
|
for (int i = 0; i < 200; i++) {
|
||||||
|
binaryData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Large data for link transport testing
|
||||||
|
final largeArrowTable = List.generate(20000, (i) => {
|
||||||
|
'id': i + 1,
|
||||||
|
'name': 'user_${i + 1}',
|
||||||
|
'score': (Random().nextRange(50, 100)).toInt(),
|
||||||
|
'active': Random().nextBool(),
|
||||||
|
'timestamp': DateTime.now().toUtc().toIsoString()
|
||||||
|
});
|
||||||
|
|
||||||
|
final largeJsonTable = List.generate(50000, (i) => {
|
||||||
|
'id': i + 1,
|
||||||
|
'name': 'user_${i + 1}',
|
||||||
|
'score': (Random().nextRange(50, 100)).toInt(),
|
||||||
|
'active': Random().nextBool()
|
||||||
|
});
|
||||||
|
|
||||||
|
final largeAudioData = Uint8List(1500000);
|
||||||
|
for (int i = 0; i < 1500000; i++) {
|
||||||
|
largeAudioData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
final largeVideoData = Uint8List(1500000);
|
||||||
|
for (int i = 0; i < 1500000; i++) {
|
||||||
|
largeVideoData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
final largeBinaryData = Uint8List(1500000);
|
||||||
|
for (int i = 0; i < 1500000; i++) {
|
||||||
|
largeBinaryData[i] = (Random().nextRange(1, 255)).toInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read image files from disk (following Julia test pattern)
|
||||||
|
final filePathSmallImage = './test/small_image.jpg';
|
||||||
|
final fileDataSmallImage = File(filePathSmallImage).readAsBytesSync();
|
||||||
|
final filenameSmallImage = filePathSmallImage.split('/').last;
|
||||||
|
|
||||||
|
final filePathLargeImage = './test/large_image.png';
|
||||||
|
final fileDataLargeImage = File(filePathLargeImage).readAsBytesSync();
|
||||||
|
final filenameLargeImage = filePathLargeImage.split('/').last;
|
||||||
|
|
||||||
|
logTrace('Creating payloads list with mixed content');
|
||||||
|
|
||||||
|
// Create payloads list - mixed content with both small and large data
|
||||||
|
// Small data uses direct transport, large data uses link transport
|
||||||
|
final payloads = [
|
||||||
|
// Small data (direct transport) - text, dictionary, arrowtable, jsontable, small image
|
||||||
|
['chat_text', textData, 'text'],
|
||||||
|
['chat_json', dictData, 'dictionary'],
|
||||||
|
// ['arrow_table_small', arrowTableSmall, 'arrowtable'],
|
||||||
|
['json_table_small', jsonTableSmall, 'jsontable'],
|
||||||
|
[filenameSmallImage, fileDataSmallImage, 'binary'],
|
||||||
|
|
||||||
|
// Large data (link transport) - large arrowtable, large jsontable, large image, large audio, large video, large binary
|
||||||
|
// ['arrow_table_large', largeArrowTable, 'arrowtable'],
|
||||||
|
['json_table_large', largeJsonTable, 'jsontable'],
|
||||||
|
[filenameLargeImage, fileDataLargeImage, 'binary'],
|
||||||
|
// ['audio_clip_large', largeAudioData, 'audio'],
|
||||||
|
// ['video_clip_large', largeVideoData, 'video'],
|
||||||
|
// ['binary_file_large', largeBinaryData, 'binary']
|
||||||
|
];
|
||||||
|
|
||||||
|
logTrace('Total payloads: ${payloads.length}');
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Send the message
|
||||||
|
print('Sending mixed payloads...\n');
|
||||||
|
final (env, envJsonStr) = await natsbridge.smartsend(
|
||||||
|
TEST_SUBJECT,
|
||||||
|
payloads,
|
||||||
|
brokerUrl: TEST_BROKER_URL,
|
||||||
|
fileserverUrl: TEST_FILESERVER_URL,
|
||||||
|
fileserverUploadHandler: natsbridge.plikOneshotUpload,
|
||||||
|
sizeThreshold: SIZE_THRESHOLD,
|
||||||
|
correlationId: correlationId,
|
||||||
|
msgPurpose: 'chat',
|
||||||
|
senderName: 'dart-mix-test',
|
||||||
|
receiverName: '',
|
||||||
|
receiverId: '',
|
||||||
|
replyTo: '',
|
||||||
|
replyToMsgId: '',
|
||||||
|
isPublish: true,
|
||||||
|
);
|
||||||
|
|
||||||
|
print('\n=== Envelope Created ===');
|
||||||
|
print('Correlation ID: ${env['correlation_id']}');
|
||||||
|
print('Message ID: ${env['msg_id']}');
|
||||||
|
print('Timestamp: ${env['timestamp']}');
|
||||||
|
print('Subject: ${env['send_to']}');
|
||||||
|
print('Purpose: ${env['msg_purpose']}');
|
||||||
|
print('Sender: ${env['sender_name']}');
|
||||||
|
print('Payloads: ${env['payloads'].length}\n');
|
||||||
|
|
||||||
|
// Log transport type for each payload
|
||||||
|
for (int i = 0; i < env['payloads'].length; i++) {
|
||||||
|
final payload = env['payloads'][i];
|
||||||
|
logTrace('Payload ${i + 1} (${payload['dataname']}):');
|
||||||
|
logTrace(' Transport: ${payload['transport']}');
|
||||||
|
logTrace(' Type: ${payload['payload_type']}');
|
||||||
|
logTrace(' Size: ${payload['size']} bytes');
|
||||||
|
logTrace(' Encoding: ${payload['encoding']}');
|
||||||
|
|
||||||
|
if (payload['transport'] == 'link') {
|
||||||
|
logTrace(' URL: ${payload['data']}');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Summary
|
||||||
|
print('\n--- Transport Summary ---');
|
||||||
|
final directCount = env['payloads'].where((p) => p['transport'] == 'direct').length;
|
||||||
|
final linkCount = env['payloads'].where((p) => p['transport'] == 'link').length;
|
||||||
|
logTrace('Direct transport: $directCount payloads');
|
||||||
|
logTrace('Link transport: $linkCount payloads');
|
||||||
|
|
||||||
|
print('\nTest completed.');
|
||||||
|
} catch (error) {
|
||||||
|
print('\n❌ Test failed with error: $error');
|
||||||
|
print('$error');
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
runTest();
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user