/**
 * JavaScript Mix Payloads Receiver Test
 * Tests the smartunpack function with mixed payload types
 *
 * This test mirrors test_julia_mix_payloads_receiver.jl and demonstrates that
 * any combination and any number of mixed content can be received correctly.
 */

const msghandler = require('../src/msghandler-csr.js');
const nats = require('nats');
const crypto = require('crypto');
const fs = require('fs');
const path = require('path');

const TEST_SUBJECT = '/msghandler';
const TEST_BROKER_URL = process.env.NATS_URL || 'nats.yiem.cc';
const TEST_FILESERVER_URL = process.env.FILESERVER_URL || 'http://192.168.88.104:8080';

// How long to wait for the first message: 180 seconds (matches Julia test).
const MESSAGE_TIMEOUT_MS = 180000;

/**
 * Build the local output path for a received payload.
 *
 * The payload name comes from the sender, so it is reduced to its final path
 * component; a name containing separators can then neither leave the working
 * directory nor point into a directory that does not exist.
 *
 * @param {string} dataname - Payload name taken from the envelope.
 * @param {string} extension - File extension including the dot, or ''.
 * @returns {string} Path of the form `./received_<name><extension>`.
 */
function receivedPath(dataname, extension) {
  return `./received_${path.basename(String(dataname))}${extension}`;
}

/**
 * Write a payload to disk and log where it went.
 *
 * @param {string} dataname - Payload name taken from the envelope.
 * @param {string} extension - File extension including the dot, or ''.
 * @param {string|Buffer} contents - Data handed to fs.writeFileSync.
 */
function saveReceived(dataname, extension, contents) {
  const outputPath = receivedPath(dataname, extension);
  fs.writeFileSync(outputPath, contents);
  console.log(` Saved to: ${outputPath}`);
}

/**
 * Validate a 'text' payload (must be a string) and save it as .txt.
 *
 * @param {string} dataname - Payload name.
 * @param {*} data - Payload data.
 * @returns {boolean} true when the payload is valid.
 */
function validateText(dataname, data) {
  if (typeof data !== 'string') {
    console.log(`❌ Text data is not a string, got: ${typeof data}`);
    return false;
  }
  console.log(`✅ Text data received (${data.length} chars)`);
  console.log(` First 200 chars: "${data.substring(0, 200)}${data.length > 200 ? '...' : ''}"`);
  saveReceived(dataname, '.txt', data);
  return true;
}

/**
 * Validate a 'dictionary' payload (must be a non-null, non-array object) and
 * save it as pretty-printed JSON.
 *
 * @param {string} dataname - Payload name.
 * @param {*} data - Payload data.
 * @returns {boolean} true when the payload is valid.
 */
function validateDictionary(dataname, data) {
  if (typeof data !== 'object' || data === null || Array.isArray(data)) {
    console.log(`❌ Dictionary data is not an object, got: ${typeof data}`);
    return false;
  }
  console.log(`✅ Dictionary data received`);
  console.log(` Keys: ${Object.keys(data).join(', ')}`);
  saveReceived(dataname, '.json', JSON.stringify(data, null, 2));
  return true;
}

/**
 * Validate an 'arrowtable' payload. Any object is accepted; objects exposing
 * both numRows and numCols are reported with their dimensions, every other
 * object is reported as a non-standard format.
 *
 * @param {string} dataname - Payload name.
 * @param {*} data - Payload data.
 * @returns {boolean} true when the payload is valid.
 */
function validateArrowTable(dataname, data) {
  if (!data || typeof data !== 'object') {
    console.log(`❌ Arrow table data is not a valid object, got: ${typeof data}`);
    return false;
  }
  // `!= null` rejects both null and undefined. The previous
  // `!== undefined || !== null` test was always true, which made the
  // non-standard branch below unreachable.
  if (data.numRows != null && data.numCols != null) {
    console.log(`✅ Arrow table data received`);
    console.log(` Rows: ${data.numRows}, Columns: ${data.numCols}`);
    // Nothing is written here: Arrow IPC serialization would require the
    // apache-arrow library, so say so instead of claiming a file was saved.
    console.log(` Not saved (Arrow IPC serialization requires apache-arrow): ${receivedPath(dataname, '.arrow')}`);
  } else {
    // Some Arrow implementations may have different properties
    console.log(`✅ Arrow table data received (non-standard format)`);
    console.log(` Keys: ${Object.keys(data).join(', ')}`);
  }
  return true;
}

/**
 * Validate a 'jsontable' payload (must be an array of rows) and save it as
 * pretty-printed JSON.
 *
 * @param {string} dataname - Payload name.
 * @param {*} data - Payload data.
 * @returns {boolean} true when the payload is valid.
 */
function validateJsonTable(dataname, data) {
  if (!Array.isArray(data)) {
    console.log(`❌ JSON table data is not an array, got: ${typeof data}`);
    return false;
  }
  console.log(`✅ JSON table data received`);
  console.log(` Rows: ${data.length}`);
  if (data.length > 0) {
    console.log(` Columns: ${Object.keys(data[0]).join(', ')}`);
  }
  saveReceived(dataname, '.json', JSON.stringify(data, null, 2));
  return true;
}

/**
 * Create a validator for a raw-bytes payload type (image/audio/video/binary).
 * The payload must be a Buffer or Uint8Array and is written to disk as-is.
 *
 * @param {string} label - Capitalized type name used in log messages.
 * @param {string} extension - File extension including the dot, or ''.
 * @returns {function(string, *): boolean} Validator for that payload type.
 */
function makeBytesValidator(label, extension) {
  return (dataname, data) => {
    if (!(data instanceof Buffer || data instanceof Uint8Array)) {
      console.log(`❌ ${label} data is not a Buffer or Uint8Array, got: ${typeof data}`);
      return false;
    }
    const dataBuffer = Buffer.isBuffer(data) ? data : Buffer.from(data);
    console.log(`✅ ${label} data received (${dataBuffer.length} bytes)`);
    saveReceived(dataname, extension, dataBuffer);
    return true;
  };
}

// Known payload types in summary order. A Map (rather than a plain object)
// keeps a sender-supplied type such as 'constructor' from matching an
// inherited property.
const PAYLOAD_TYPES = new Map([
  ['text', { summaryLabel: 'Text', validate: validateText }],
  ['dictionary', { summaryLabel: 'Dictionary', validate: validateDictionary }],
  ['arrowtable', { summaryLabel: 'Arrow table', validate: validateArrowTable }],
  ['jsontable', { summaryLabel: 'JSON table', validate: validateJsonTable }],
  ['image', { summaryLabel: 'Image', validate: makeBytesValidator('Image', '.bin') }],
  ['audio', { summaryLabel: 'Audio', validate: makeBytesValidator('Audio', '.bin') }],
  ['video', { summaryLabel: 'Video', validate: makeBytesValidator('Video', '.bin') }],
  ['binary', { summaryLabel: 'Binary', validate: makeBytesValidator('Binary', '') }],
]);

/**
 * Unpack one NATS message with smartunpack, validate every payload in the
 * envelope and print a per-type summary.
 *
 * @param {object} msg - Message yielded by the NATS subscription.
 * @returns {Promise<boolean>} true when the envelope and all payloads are valid.
 * @throws Whatever smartunpack or the file writes throw; the caller reports it.
 */
async function processMessage(msg) {
  // Process the message using smartunpack
  const envelope = await msghandler.smartunpack(msg, {
    fileserver_download_handler: msghandler.fetchWithBackoff,
    max_retries: 5,
    base_delay: 100,
    max_delay: 5000,
  });

  console.log(`Correlation ID: ${envelope.correlation_id}`);
  console.log(`Message ID: ${envelope.msg_id}`);
  console.log(`Timestamp: ${envelope.timestamp}`);
  console.log(`Purpose: ${envelope.msg_purpose}`);
  console.log(`Sender: ${envelope.sender_name}`);
  console.log(`Number of payloads: ${envelope.payloads.length}`);

  let passed = true;

  // Validate envelope structure
  console.log('\n=== Envelope Validation ===');
  if (envelope.payloads.length < 1) {
    console.log(`❌ Expected at least 1 payload, got ${envelope.payloads.length}`);
    passed = false;
  } else {
    console.log(`✅ Correct number of payloads: ${envelope.payloads.length}`);
  }

  // Process all payloads in the envelope, counting each type for the summary
  console.log('\n=== Processing Payloads ===');
  const countsByType = new Map();
  for (const [index, payload] of envelope.payloads.entries()) {
    const [dataname, data, dataType] = payload;
    console.log(`\n--- Payload ${index + 1}: ${dataname} (type: ${dataType}) ---`);
    countsByType.set(dataType, (countsByType.get(dataType) ?? 0) + 1);

    const descriptor = PAYLOAD_TYPES.get(dataType);
    if (descriptor === undefined) {
      console.log(`❌ Unknown data type: ${dataType}`);
      passed = false;
    } else if (!descriptor.validate(dataname, data)) {
      passed = false;
    }
  }

  // Print summary
  console.log('\n=== Verification Summary ===');
  for (const [dataType, { summaryLabel }] of PAYLOAD_TYPES) {
    console.log(`${summaryLabel} payloads: ${countsByType.get(dataType) ?? 0}`);
  }

  return passed;
}

/**
 * Wait for the first message on the subscription, process it and report the
 * outcome. Resolves (never rejects) with:
 *   - status 'done'    – a message was processed (see `passed`)
 *   - status 'error'   – processing or the subscription itself failed
 *   - status 'timeout' – no message arrived within MESSAGE_TIMEOUT_MS
 *   - status 'closed'  – the subscription ended before any message arrived
 *
 * @param {AsyncIterable<object>} subscription - NATS subscription to consume.
 * @returns {Promise<{status: string, received: number, passed: boolean}>}
 */
function waitForFirstMessage(subscription) {
  return new Promise((resolve) => {
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      resolve({ status: 'timeout', received: 0, passed: false });
    }, MESSAGE_TIMEOUT_MS);

    const consume = async () => {
      for await (const msg of subscription) {
        if (timedOut) {
          // The test already gave up; do not process late arrivals.
          break;
        }
        clearTimeout(timer);
        console.log(`\n=== Message ${1} Received ===`);
        console.log(`Received message on ${msg.subject}`);
        try {
          const passed = await processMessage(msg);
          // Stop after the first message instead of consuming further ones
          // while the connection is being closed.
          return { status: 'done', received: 1, passed };
        } catch (error) {
          console.error(`❌ Error processing message: ${error.message}`);
          console.error(error.stack);
          return { status: 'error', received: 1, passed: false };
        }
      }
      return { status: 'closed', received: 0, passed: false };
    };

    consume()
      .catch((error) => {
        // A failing subscription iterator must fail the test, not surface as
        // an unhandled rejection.
        console.error(`❌ Subscription error: ${error.message}`);
        console.error(error.stack);
        return { status: 'error', received: 0, passed: false };
      })
      .then((outcome) => {
        clearTimeout(timer);
        resolve(outcome);
      });
  });
}

/**
 * Entry point: connect to NATS, wait for one mixed-payload message on
 * TEST_SUBJECT, validate it, and exit the process with code 0 on success or
 * 1 on any failure (no message, processing error, or failed validation).
 *
 * @returns {Promise<void>} Does not normally settle; the process exits first.
 */
async function runTest() {
  console.log('=== JavaScript Mix Payloads Receiver Test ===\n');

  const correlationId = crypto.randomUUID();
  console.log(`Correlation ID: ${correlationId}`);
  console.log(`Subject: ${TEST_SUBJECT}`);
  console.log(`Broker URL: ${TEST_BROKER_URL}`);
  console.log(`Fileserver URL: ${TEST_FILESERVER_URL}\n`);

  try {
    // Connect to NATS
    console.log('Connecting to NATS server...');
    const nc = await nats.connect({ servers: TEST_BROKER_URL });
    console.log('✅ Connected to NATS server\n');

    // Set up message subscription and start consuming before announcing it
    const subscription = nc.subscribe(TEST_SUBJECT);
    const pendingOutcome = waitForFirstMessage(subscription);
    console.log('Waiting for messages...\n');

    // Wait for message or timeout
    const outcome = await pendingOutcome;

    // Close NATS connection
    await nc.close();
    console.log('\n✅ NATS connection closed');

    // Final result
    console.log('\n=== Test Result ===');
    if (outcome.received === 0) {
      console.log('❌ NO MESSAGES RECEIVED');
      console.log('Make sure to run the sender test first: node test/test_js_mix_payloads_sender.js');
      process.exit(1);
    } else if (outcome.status === 'error') {
      console.log('❌ ERROR PROCESSING MESSAGES');
      process.exit(1);
    } else if (outcome.passed) {
      console.log('✅ ALL TESTS PASSED');
      process.exit(0);
    } else {
      console.log('❌ SOME TESTS FAILED');
      process.exit(1);
    }
  } catch (error) {
    console.error('❌ Test failed with error:', error.message);
    console.error(error.stack);
    process.exit(1);
  }
}

runTest();