This commit is contained in:
2026-02-19 15:58:22 +07:00
parent 782a935d3d
commit a260def38d
7 changed files with 620 additions and 308 deletions

View File

@@ -1,290 +0,0 @@
/**
* Bi-Directional Data Bridge - JavaScript Module
* Implements SmartSend and SmartReceive for NATS communication
*/
const { v4: uuidv4 } = require('uuid');
const { decode, encode } = require('base64-url');
const Arrow = require('apache-arrow');
// Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB
const DEFAULT_NATS_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload';
// Supported payload types
const PAYLOAD_TYPES = ['text', 'dictionary', 'table', 'image', 'audio', 'video', 'binary'];
// Logging helper
function logTrace(correlationId, message) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}
// Message Envelope Class
class MessageEnvelope {
constructor(options = {}) {
this.correlation_id = options.correlation_id || uuidv4();
this.type = options.type || 'json';
this.transport = options.transport || 'direct';
this.payload = options.payload || null;
this.url = options.url || null;
this.metadata = options.metadata || {};
}
static fromJSON(jsonStr) {
const data = JSON.parse(jsonStr);
return new MessageEnvelope({
correlation_id: data.correlation_id,
type: data.type,
transport: data.transport,
payload: data.payload || null,
url: data.url || null,
metadata: data.metadata || {}
});
}
toJSON() {
const obj = {
correlation_id: this.correlation_id,
type: this.type,
transport: this.transport
};
if (this.payload) {
obj.payload = this.payload;
}
if (this.url) {
obj.url = this.url;
}
if (Object.keys(this.metadata).length > 0) {
obj.metadata = this.metadata;
}
return JSON.stringify(obj);
}
}
// Helper: Serialize data based on type
function _serializeData(data, type, correlationId) {
if (type === 'json') {
const jsonStr = JSON.stringify(data);
return Buffer.from(jsonStr, 'utf8');
} else if (type === 'table') {
// Table data - convert to Arrow IPC stream
const writer = new Arrow.Writer();
writer.writeTable(data);
return writer.toByteArray();
} else if (type === 'binary') {
// Binary data - treat as binary
if (data instanceof Buffer) {
return data;
} else if (Array.isArray(data)) {
return Buffer.from(data);
} else {
throw new Error('Binary data must be binary (Buffer or Array)');
}
} else {
throw new Error(`Unknown type: ${type}`);
}
}
// Helper: Upload data to fileserver (mock implementation)
async function uploadToServer(data, fileserverUrl, correlationId) {
// This is a placeholder - in real implementation, this would upload to the fileserver
// and return the URL. For now, we return a mock URL.
return `${fileserverUrl}/mock-upload-${Date.now()}`;
}
// SmartSend for JavaScript - Handles transport selection based on payload size
// Input format: [(dataname, data, type), ...]
async function SmartSend(subject, data, options = {}) {
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuidv4()
} = options;
logTrace(correlationId, `Starting SmartSend for subject: ${subject}`);
// Process each payload in the list
const payloadResults = [];
for (let i = 0; i < data.length; i++) {
const tuple = data[i];
if (tuple.length !== 3) {
throw new Error(`Payload at index ${i} must be a tuple of [dataname, data, type]`);
}
const [dataname, payload_data, payload_type] = tuple;
// Validate type
if (!PAYLOAD_TYPES.includes(payload_type)) {
throw new Error(`Unknown payload type '${payload_type}' for payload '${dataname}'. Supported types: ${PAYLOAD_TYPES.join(', ')}`);
}
// Serialize data based on type
const payloadBytes = _serializeData(payload_data, payload_type, correlationId);
const payloadSize = payloadBytes.length;
logTrace(correlationId, `Serialized payload '${dataname}' (type: ${payload_type}) size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
// Direct path - Base64 encode and send via NATS
const payloadBase64 = encode(payloadBytes);
logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`);
payloadResults.push({
dataname,
payload_type,
transport: 'direct',
payload: payloadBase64,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
} else {
// Link path - Upload to HTTP server, send URL via NATS
logTrace(correlationId, `Using link transport, uploading to fileserver`);
const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId);
payloadResults.push({
dataname,
payload_type,
transport: 'link',
url: url,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
}
}
// Build the final message with all payloads
const allPayloads = payloadResults.map(p => ({
dataname: p.dataname,
type: p.payload_type,
transport: p.transport,
...(p.transport === 'direct' ? { payload: p.payload } : { url: p.url }),
metadata: p.metadata
}));
// Create envelope and publish
const env = {
correlation_id: correlationId,
type: allPayloads[0].type, // Use first payload's type as envelope type
transport: allPayloads[0].transport,
payload: allPayloads.length === 1 && allPayloads[0].transport === 'direct' ? allPayloads[0].payload : undefined,
url: allPayloads.length === 1 && allPayloads[0].transport === 'link' ? allPayloads[0].url : undefined,
metadata: {},
_payloads: allPayloads // Internal storage for multiple payloads
};
await publishMessage(natsUrl, subject, JSON.stringify(env), correlationId);
return env;
}
// Helper: Publish message to NATS
async function publishMessage(natsUrl, subject, message, correlationId) {
const { connect } = require('nats');
try {
const nc = await connect({ servers: [natsUrl] });
await nc.publish(subject, message);
logTrace(correlationId, `Message published to ${subject}`);
nc.close();
} catch (error) {
logTrace(correlationId, `Failed to publish message: ${error.message}`);
throw error;
}
}
// SmartReceive for JavaScript - Handles both direct and link transport
async function SmartReceive(msg, options = {}) {
const {
fileserverUrl = DEFAULT_FILESERVER_URL,
maxRetries = 5,
baseDelay = 100,
maxDelay = 5000
} = options;
const env = MessageEnvelope.fromJSON(msg.data);
logTrace(env.correlation_id, `Processing received message`);
if (env.transport === 'direct') {
logTrace(env.correlation_id, `Direct transport - decoding payload`);
const payloadBytes = decode(env.payload);
const data = _deserializeData(payloadBytes, env.type, env.correlation_id, env.metadata);
return { data, envelope: env };
} else if (env.transport === 'link') {
logTrace(env.correlation_id, `Link transport - fetching from URL`);
const data = await _fetchWithBackoff(env.url, maxRetries, baseDelay, maxDelay, env.correlation_id);
const result = _deserializeData(data, env.type, env.correlation_id, env.metadata);
return { data: result, envelope: env };
} else {
throw new Error(`Unknown transport type: ${env.transport}`);
}
}
// Helper: Fetch with exponential backoff
async function _fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
let delay = baseDelay;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url);
if (response.ok) {
const buffer = await response.arrayBuffer();
logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
return new Uint8Array(buffer);
} else {
throw new Error(`Failed to fetch: ${response.status}`);
}
} catch (error) {
logTrace(correlationId, `Attempt ${attempt} failed: ${error.message}`);
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, delay));
delay = Math.min(delay * 2, maxDelay);
}
}
}
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
}
// Helper: Deserialize data based on type
async function _deserializeData(data, type, correlationId, metadata) {
if (type === 'json') {
const jsonStr = new TextDecoder().decode(data);
return JSON.parse(jsonStr);
} else if (type === 'table') {
// Deserialize Arrow IPC stream to Table
const table = Arrow.Table.from(data);
return table;
} else if (type === 'binary') {
// Return binary binary data
return data;
} else {
throw new Error(`Unknown type: ${type}`);
}
}
// Export functions
module.exports = {
SmartSend,
SmartReceive,
MessageEnvelope
};