remove arrow support for natsbridge_csr.js

This commit is contained in:
2026-03-15 11:58:08 +07:00
parent fb315a0525
commit 49d7898720
5 changed files with 69 additions and 86 deletions

View File

@@ -6,7 +6,9 @@
* using NATS as the message bus, with support for both direct payload transport and
* URL-based transport for larger payloads.
*
* Supported payload types: "text", "dictionary", "arrowtable", "jsontable", "image", "audio", "video", "binary"
* Supported payload types: "text", "dictionary", "jsontable", "image", "audio", "video", "binary"
* Note: Browser version does NOT support Apache Arrow IPC (arrowtable) due to browser compatibility constraints.
* Use "jsontable" for tabular data in browser applications.
*
* Browser-compatible version uses:
* - nats.ws for WebSocket-based NATS connections
@@ -21,7 +23,6 @@
import * as nats from 'nats.ws';
// Use native fetch available in browsers
import { tableFromArrays, tableToIPC } from 'apache-arrow/browser';
// ---------------------------------------------- Constants ---------------------------------------------- //
@@ -111,13 +112,6 @@ async function serializeData(data, payloadType) {
} else if (payloadType === 'dictionary') {
const jsonStr = JSON.stringify(data);
return new Uint8Array(new TextEncoder().encode(jsonStr));
} else if (payloadType === 'arrowtable') {
// Convert array of objects to Arrow IPC format
if (!Array.isArray(data) || data.length === 0) {
throw new Error('Arrow table data must be a non-empty array of objects');
}
return serializeArrowTable(data);
} else if (payloadType === 'jsontable') {
// Serialize array of objects to JSON format
if (!Array.isArray(data)) {
@@ -154,49 +148,6 @@ async function serializeData(data, payloadType) {
}
}
/**
* Helper function to properly serialize table data to Arrow IPC
* @param {Array<Object>} data - Array of objects representing table rows
* @returns {Uint8Array} Arrow IPC formatted buffer
*/
function serializeArrowTable(data) {
if (!Array.isArray(data) || data.length === 0) {
throw new Error('Table data must be a non-empty array of objects');
}
logTrace('serializeArrowTable', `Serializing table with ${data.length} rows`);
// Convert array of objects to a key-value format expected by tableFromArrays
const columns = {};
const keys = Object.keys(data[0]);
for (const key of keys) {
columns[key] = data.map(row => row[key]);
}
logTrace('serializeArrowTable', `Columns: ${Object.keys(columns).join(', ')}`);
const table = tableFromArrays(columns);
logTrace('serializeArrowTable', `Arrow table created with ${table.numRows} rows, ${table.numCols} cols`);
// Convert to IPC format
const ipcBuffer = tableToIPC(table);
logTrace('serializeArrowTable', `IPC buffer type: ${typeof ipcBuffer}, byteLength: ${ipcBuffer.byteLength}`);
const resultBuffer = new Uint8Array(ipcBuffer);
logTrace('serializeArrowTable', `Result buffer: ${resultBuffer.length} bytes`);
// Debug: Show first 20 bytes in hex
const hexPreview = [];
for (let i = 0; i < Math.min(20, resultBuffer.length); i++) {
hexPreview.push(resultBuffer[i].toString(16).padStart(2, '0'));
}
logTrace('serializeArrowTable', `First 20 bytes (hex): ${hexPreview.join(' ')}`);
return resultBuffer;
}
/**
* Deserialize bytes to data based on type
* @param {Uint8Array|ArrayBuffer} data - Serialized data as bytes
@@ -210,7 +161,7 @@ async function deserializeData(data, payloadType, correlationId) {
logTrace(correlationId, `deserializeData: type=${payloadType}, bufferLength=${buffer.length}`);
// Debug: Show first 20 bytes in hex for binary data
if (payloadType === 'arrowtable' || payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
if (payloadType === 'jsontable' || payloadType === 'image' || payloadType === 'binary') {
const hexPreview = [];
for (let i = 0; i < Math.min(20, buffer.length); i++) {
hexPreview.push(buffer[i].toString(16).padStart(2, '0'));
@@ -227,18 +178,6 @@ async function deserializeData(data, payloadType, correlationId) {
const result = JSON.parse(jsonStr);
logTrace(correlationId, `deserializeData: dictionary keys=${Object.keys(result).join(', ')}`);
return result;
} else if (payloadType === 'arrowtable') {
logTrace(correlationId, `deserializeData: Attempting Arrow table deserialization`);
try {
// Try tableFromIPC (browser API)
const table = tableFromIPC(buffer);
logTrace(correlationId, `deserializeData: Arrow table from IPC - rows=${table.numRows}, cols=${table.numCols}`);
return table;
} catch (e) {
logTrace(correlationId, `deserializeData: tableFromIPC failed: ${e.message}`);
throw new Error(`Unable to deserialize Arrow table: ${e.message}`);
}
} else if (payloadType === 'jsontable') {
const jsonStr = new TextDecoder().decode(buffer);
const result = JSON.parse(jsonStr);
@@ -478,8 +417,6 @@ function buildPayload(dataname, payloadType, payloadBytes, transport, data) {
let encoding = 'base64';
if (payloadType === 'jsontable') {
encoding = 'json';
} else if (payloadType === 'arrowtable') {
encoding = 'arrow-ipc';
}
return {