update
This commit is contained in:
172
test/test_js_table_receiver.js
Normal file
172
test/test_js_table_receiver.js
Normal file
@@ -0,0 +1,172 @@
|
||||
/**
|
||||
* JavaScript Table Receiver Test
|
||||
* Tests the smartreceive function with table (Arrow IPC) payloads
|
||||
*/
|
||||
|
||||
const NATSBridge = require('../src/natbridge.js');
|
||||
|
||||
const TEST_BROKER_URL = process.env.NATS_URL || 'nats://localhost:4222';
|
||||
const TEST_FILESERVER_URL = process.env.FILESERVER_URL || 'http://localhost:8080';
|
||||
|
||||
async function runTest() {
|
||||
console.log('=== JavaScript Table Receiver Test ===\n');
|
||||
|
||||
// Create a mock NATS message with table payload
|
||||
const tableData = [
|
||||
{ id: 1, name: 'Alice', age: 30, active: true },
|
||||
{ id: 2, name: 'Bob', age: 25, active: false },
|
||||
{ id: 3, name: 'Charlie', age: 35, active: true }
|
||||
];
|
||||
|
||||
// Convert to Arrow IPC format
|
||||
const arrow = require('apache-arrow');
|
||||
const fields = [
|
||||
new arrow.Field('id', arrow.Int64, true),
|
||||
new arrow.Field('name', arrow.Utf8, true),
|
||||
new arrow.Field('age', arrow.Int64, true),
|
||||
new arrow.Field('active', arrow.Boolean, true)
|
||||
];
|
||||
const schema = new arrow.Schema(fields);
|
||||
const batches = [];
|
||||
for (const row of tableData) {
|
||||
const batch = arrow.recordBatch.fromObjects([row], schema);
|
||||
batches.push(batch);
|
||||
}
|
||||
const buffers = arrow.ipc.recordBatchesToMessage(batches, schema).buffers;
|
||||
const combined = new Uint8Array(buffers.reduce((acc, b) => acc + b.byteLength, 0));
|
||||
let offset = 0;
|
||||
for (const buf of buffers) {
|
||||
combined.set(new Uint8Array(buf), offset);
|
||||
offset += buf.byteLength;
|
||||
}
|
||||
const arrowBuffer = Buffer.from(combined);
|
||||
|
||||
const testData = {
|
||||
correlation_id: 'js-table-receiver-' + Date.now(),
|
||||
msg_id: 'msg-' + Date.now(),
|
||||
timestamp: new Date().toISOString(),
|
||||
send_to: '/test/table',
|
||||
msg_purpose: 'test',
|
||||
sender_name: 'js-table-test',
|
||||
sender_id: 'sender-' + Date.now(),
|
||||
receiver_name: 'js-receiver',
|
||||
receiver_id: 'receiver-' + Date.now(),
|
||||
reply_to: '',
|
||||
reply_to_msg_id: '',
|
||||
broker_url: TEST_BROKER_URL,
|
||||
metadata: {},
|
||||
payloads: [
|
||||
{
|
||||
id: 'payload-1',
|
||||
dataname: 'users_table',
|
||||
payload_type: 'table',
|
||||
transport: 'direct',
|
||||
encoding: 'base64',
|
||||
size: arrowBuffer.length,
|
||||
data: arrowBuffer.toString('base64'),
|
||||
metadata: { payload_bytes: arrowBuffer.length }
|
||||
}
|
||||
]
|
||||
};
|
||||
|
||||
const mockMsg = {
|
||||
payload: JSON.stringify(testData)
|
||||
};
|
||||
|
||||
console.log('Mock Message Created:');
|
||||
console.log(` Correlation ID: ${testData.correlation_id}`);
|
||||
console.log(` Payloads: ${testData.payloads.length}`);
|
||||
console.log(` Payload type: ${testData.payloads[0].payload_type}`);
|
||||
console.log(` Transport: ${testData.payloads[0].transport}\n`);
|
||||
|
||||
try {
|
||||
// Receive and process the message
|
||||
console.log('Receiving and processing message...');
|
||||
const env = await NATSBridge.smartreceive(
|
||||
mockMsg,
|
||||
{
|
||||
max_retries: 3,
|
||||
base_delay: 100,
|
||||
max_delay: 1000
|
||||
}
|
||||
);
|
||||
|
||||
console.log('\n=== Received Envelope ===');
|
||||
console.log(`Correlation ID: ${env.correlation_id}`);
|
||||
console.log(`Message ID: ${env.msg_id}`);
|
||||
console.log(`Timestamp: ${env.timestamp}`);
|
||||
console.log(`Subject: ${env.send_to}`);
|
||||
console.log(`Payloads: ${env.payloads.length}\n`);
|
||||
|
||||
// Validate received data
|
||||
console.log('=== Validation ===');
|
||||
let passed = true;
|
||||
|
||||
if (!env.correlation_id) {
|
||||
console.log('❌ correlation_id is missing');
|
||||
passed = false;
|
||||
} else {
|
||||
console.log('✅ correlation_id present');
|
||||
}
|
||||
|
||||
if (env.payloads.length !== 1) {
|
||||
console.log(`❌ Expected 1 payload, got ${env.payloads.length}`);
|
||||
passed = false;
|
||||
} else {
|
||||
console.log('✅ Correct number of payloads');
|
||||
}
|
||||
|
||||
const payload = env.payloads[0];
|
||||
if (payload[0] !== 'users_table') {
|
||||
console.log(`❌ Expected dataname 'users_table', got '${payload[0]}'`);
|
||||
passed = false;
|
||||
} else {
|
||||
console.log('✅ Correct dataname');
|
||||
}
|
||||
|
||||
if (payload[2] !== 'table') {
|
||||
console.log(`❌ Expected type 'table', got '${payload[2]}'`);
|
||||
passed = false;
|
||||
} else {
|
||||
console.log('✅ Correct type');
|
||||
}
|
||||
|
||||
// Verify table data is a Buffer (Arrow IPC format)
|
||||
if (payload[1] instanceof Buffer || payload[1] instanceof Uint8Array) {
|
||||
console.log('✅ Table data is Arrow IPC buffer');
|
||||
console.log(` Buffer size: ${payload[1].length} bytes`);
|
||||
} else {
|
||||
console.log(`❌ Expected Buffer/Uint8Array, got ${typeof payload[1]}`);
|
||||
passed = false;
|
||||
}
|
||||
|
||||
// Test round-trip with Arrow deserialization
|
||||
console.log('\n=== Arrow Deserialization Test ===');
|
||||
try {
|
||||
const table = arrow.tableFromRawBytes(payload[1]);
|
||||
console.log(`✅ Arrow table deserialized successfully`);
|
||||
console.log(` Schema: ${table.schema.fields.map(f => f.name).join(', ')}`);
|
||||
console.log(` Num rows: ${table.numRows}`);
|
||||
} catch (e) {
|
||||
console.log('❌ Arrow deserialization failed:', e.message);
|
||||
passed = false;
|
||||
}
|
||||
|
||||
// Final result
|
||||
console.log('\n=== Test Result ===');
|
||||
if (passed) {
|
||||
console.log('✅ ALL TESTS PASSED');
|
||||
process.exit(0);
|
||||
} else {
|
||||
console.log('❌ SOME TESTS FAILED');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('❌ Test failed with error:', error.message);
|
||||
console.error(error.stack);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
runTest();
|
||||
Reference in New Issue
Block a user