/**
 * JavaScript Table Receiver Test
 *
 * Tests the smartreceive function with table (Arrow IPC) payloads.
 */
// Module under test, plus Node's built-in crypto module (used for
// crypto.randomUUID() when generating message and correlation identifiers).
const NATSBridge = require('../src/natsbridge.js');
const crypto = require('crypto');
// Endpoints used by the test. Each one can be overridden through the
// environment (NATS_URL / FILESERVER_URL); an unset or empty variable falls
// back to the local default.
const TEST_BROKER_URL = process.env.NATS_URL || 'nats://localhost:4222';
// NOTE(review): TEST_FILESERVER_URL is not referenced by the rest of this
// script as shown — confirm whether it is still needed.
const TEST_FILESERVER_URL = process.env.FILESERVER_URL || 'http://localhost:8080';
/**
 * Serializes row objects into an Arrow IPC stream.
 *
 * Uses the public Arrow JS helpers (`tableFromArrays` / `tableToIPC`). The
 * integer columns are passed as BigInt64Array so they are stored as 64-bit
 * integers; the `name` and `active` columns are passed as plain arrays and
 * their Arrow types are inferred by the library.
 *
 * @param {object} arrow - The loaded `apache-arrow` module.
 * @param {Array<{id: number, name: string, age: number, active: boolean}>} rows
 *   Rows to encode.
 * @returns {Buffer} The Arrow IPC stream bytes.
 */
function buildArrowIpcBuffer(arrow, rows) {
  const table = arrow.tableFromArrays({
    id: BigInt64Array.from(rows, (row) => BigInt(row.id)),
    name: rows.map((row) => row.name),
    age: BigInt64Array.from(rows, (row) => BigInt(row.age)),
    active: rows.map((row) => row.active),
  });
  return Buffer.from(arrow.tableToIPC(table, 'stream'));
}

/**
 * Builds the message envelope that is handed to smartreceive, carrying a
 * single base64-encoded, direct-transport table payload.
 *
 * @param {Buffer} arrowBuffer - Arrow IPC bytes to embed as the payload data.
 * @returns {object} The envelope object (not yet JSON-serialized).
 */
function buildTestEnvelope(arrowBuffer) {
  return {
    correlation_id: `js-table-receiver-${crypto.randomUUID()}`,
    msg_id: `msg-${crypto.randomUUID()}`,
    timestamp: new Date().toISOString(),
    send_to: '/test/table',
    msg_purpose: 'test',
    sender_name: 'js-table-test',
    sender_id: `sender-${crypto.randomUUID()}`,
    receiver_name: 'js-receiver',
    receiver_id: `receiver-${crypto.randomUUID()}`,
    reply_to: '',
    reply_to_msg_id: '',
    broker_url: TEST_BROKER_URL,
    metadata: {},
    payloads: [
      {
        id: 'payload-1',
        dataname: 'users_table',
        payload_type: 'table',
        transport: 'direct',
        encoding: 'base64',
        size: arrowBuffer.length,
        data: arrowBuffer.toString('base64'),
        metadata: { payload_bytes: arrowBuffer.length },
      },
    ],
  };
}

/**
 * Runs every check against the received envelope and logs one line per check.
 *
 * The first payload is read positionally: index 0 = dataname, 1 = data,
 * 2 = payload type. NOTE(review): that layout is taken from how this test
 * indexes the payload — confirm against NATSBridge.smartreceive.
 *
 * @param {object} arrow - The loaded `apache-arrow` module.
 * @param {object} env - Envelope returned by smartreceive.
 * @param {string[]} expectedColumns - Column names the decoded table must have.
 * @param {number} expectedRowCount - Row count the decoded table must have.
 * @returns {boolean} True when every check passed.
 */
function validateEnvelope(arrow, env, expectedColumns, expectedRowCount) {
  console.log('=== Validation ===');
  let passed = true;

  // Logs the outcome of one check and folds it into the overall result.
  const report = (ok, passMessage, failMessage) => {
    console.log(ok ? `✅ ${passMessage}` : `❌ ${failMessage}`);
    passed = passed && ok;
  };

  report(Boolean(env.correlation_id), 'correlation_id present', 'correlation_id is missing');
  report(
    env.payloads.length === 1,
    'Correct number of payloads',
    `Expected 1 payload, got ${env.payloads.length}`,
  );

  // Default to an empty payload so a missing entry is reported as failed
  // checks instead of crashing on an index into undefined.
  const payload = env.payloads[0] ?? [];
  const dataname = payload[0];
  const data = payload[1];
  const type = payload[2];

  report(
    dataname === 'users_table',
    'Correct dataname',
    `Expected dataname 'users_table', got '${dataname}'`,
  );
  report(type === 'table', 'Correct type', `Expected type 'table', got '${type}'`);

  // Verify table data is a Buffer (Arrow IPC format)
  const isByteBuffer = data instanceof Buffer || data instanceof Uint8Array;
  report(
    isByteBuffer,
    'Table data is Arrow IPC buffer',
    `Expected Buffer/Uint8Array, got ${typeof data}`,
  );
  if (isByteBuffer) {
    console.log(` Buffer size: ${data.length} bytes`);
  }

  // Test round-trip with Arrow deserialization
  console.log('\n=== Arrow Deserialization Test ===');
  try {
    const table = arrow.tableFromIPC(data);
    const columnNames = table.schema.fields.map((field) => field.name);
    console.log('✅ Arrow table deserialized successfully');
    console.log(` Schema: ${columnNames.join(', ')}`);
    console.log(` Num rows: ${table.numRows}`);

    report(
      table.numRows === expectedRowCount,
      'Correct number of rows',
      `Expected ${expectedRowCount} rows, got ${table.numRows}`,
    );
    report(
      columnNames.join(',') === expectedColumns.join(','),
      'Correct column names',
      `Expected columns '${expectedColumns.join(', ')}', got '${columnNames.join(', ')}'`,
    );
  } catch (e) {
    console.log('❌ Arrow deserialization failed:', e.message);
    passed = false;
  }

  return passed;
}

/**
 * Entry point of the table receiver test.
 *
 * Encodes a small table as Arrow IPC, wraps it in a mock message, passes it
 * through NATSBridge.smartreceive and validates the resulting envelope.
 * Terminates the process with exit code 0 when every check passes and 1 when
 * a check fails or any step throws.
 *
 * @returns {Promise<void>}
 */
async function runTest() {
  console.log('=== JavaScript Table Receiver Test ===\n');

  // Create a mock NATS message with table payload
  const tableData = [
    { id: 1, name: 'Alice', age: 30, active: true },
    { id: 2, name: 'Bob', age: 25, active: false },
    { id: 3, name: 'Charlie', age: 35, active: true },
  ];

  try {
    // Setup lives inside the try block so that a missing dependency or an
    // encoding failure is reported like any other test failure.
    const arrow = require('apache-arrow');

    // Convert to Arrow IPC format
    const arrowBuffer = buildArrowIpcBuffer(arrow, tableData);
    const testData = buildTestEnvelope(arrowBuffer);
    const mockMsg = {
      payload: JSON.stringify(testData),
    };

    console.log('Mock Message Created:');
    console.log(` Correlation ID: ${testData.correlation_id}`);
    console.log(` Payloads: ${testData.payloads.length}`);
    console.log(` Payload type: ${testData.payloads[0].payload_type}`);
    console.log(` Transport: ${testData.payloads[0].transport}\n`);

    // Receive and process the message
    console.log('Receiving and processing message...');
    const env = await NATSBridge.smartreceive(mockMsg, {
      max_retries: 3,
      base_delay: 100,
      max_delay: 1000,
    });

    console.log('\n=== Received Envelope ===');
    console.log(`Correlation ID: ${env.correlation_id}`);
    console.log(`Message ID: ${env.msg_id}`);
    console.log(`Timestamp: ${env.timestamp}`);
    console.log(`Subject: ${env.send_to}`);
    console.log(`Payloads: ${env.payloads.length}\n`);

    // Validate received data
    const passed = validateEnvelope(arrow, env, Object.keys(tableData[0]), tableData.length);

    // Final result
    console.log('\n=== Test Result ===');
    if (passed) {
      console.log('✅ ALL TESTS PASSED');
      process.exit(0);
    } else {
      console.log('❌ SOME TESTS FAILED');
      process.exit(1);
    }
  } catch (error) {
    console.error('❌ Test failed with error:', error.message);
    console.error(error.stack);
    process.exit(1);
  }
}
// Report any rejection that escapes runTest() and exit non-zero, instead of
// leaving a floating promise that ends in an unhandled rejection.
runTest().catch((error) => {
  console.error('❌ Test failed with error:', error.message);
  console.error(error.stack);
  process.exit(1);
});