239 lines
7.6 KiB
JavaScript
239 lines
7.6 KiB
JavaScript
import { connect, StringCodec } from "nats.ws";
|
|
import { writable } from "svelte/store";
|
|
import msghandlerCSR from './msghandler-csr.js';
|
|
|
|
export const connectionStatus = writable(null);
|
|
|
|
let config = null;
|
|
|
|
export function setConfig(cfg) {
|
|
config = cfg;
|
|
}
|
|
|
|
connectionStatus.subscribe(status => {
|
|
console.log('[NATS] Status updated:', status);
|
|
});
|
|
|
|
let nc = null;
|
|
let sc = StringCodec();
|
|
let subs = new Map();
|
|
let reconnectTimer = null;
|
|
let isDisconnecting = false;
|
|
|
|
export async function getNats(opts = {}) {
|
|
const servers = opts.servers || (import.meta.env.VITE_NATS_URL) || (config?.nats_server_info?.url) || "wss://nats.yiem.cc";
|
|
console.log('[NATS] Connecting to:', servers);
|
|
try {
|
|
connectionStatus.set({ state: "connecting", message: `connecting to ${servers}` });
|
|
console.log('[NATS] Status: connecting');
|
|
nc = await connect({ servers });
|
|
console.log('[NATS] Connected successfully!');
|
|
connectionStatus.set({ state: "connected", message: `connected to ${servers}` });
|
|
isDisconnecting = false;
|
|
|
|
nc.closed().then((err) => {
|
|
if (isDisconnecting) return;
|
|
if (err) {
|
|
console.log('[NATS] Connection closed with error:', err);
|
|
connectionStatus.set({ state: "error", message: String(err) });
|
|
} else {
|
|
console.log('[NATS] Connection closed normally');
|
|
connectionStatus.set({ state: "disconnected", message: "connection closed" });
|
|
}
|
|
attemptReconnect(servers);
|
|
});
|
|
|
|
return nc;
|
|
} catch (e) {
|
|
console.error('[NATS] Connection failed:', e);
|
|
connectionStatus.set({ state: "error", message: String(e) });
|
|
attemptReconnect(servers);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
async function attemptReconnect(servers, retryCount = 0) {
|
|
const maxDelay = 30000;
|
|
const baseDelay = Math.min(1000 * Math.pow(2, retryCount), maxDelay);
|
|
|
|
console.log(`[NATS] Reconnecting in ${baseDelay}ms (attempt ${retryCount + 1})...`);
|
|
|
|
reconnectTimer = setTimeout(async () => {
|
|
try {
|
|
connectionStatus.set({ state: "connecting", message: `reconnecting to ${servers}` });
|
|
nc = await connect({ servers });
|
|
console.log('[NATS] Reconnected successfully!');
|
|
connectionStatus.set({ state: "connected", message: `connected to ${servers}` });
|
|
isDisconnecting = false;
|
|
|
|
const subsCopy = Array.from(subs.entries());
|
|
for (const [subject, onMessage] of subsCopy) {
|
|
try {
|
|
await subscribe(subject, onMessage);
|
|
} catch (e) {
|
|
console.warn(`[NATS] Failed to re-subscribe to ${subject}:`, e);
|
|
}
|
|
}
|
|
} catch (e) {
|
|
console.error('[NATS] Reconnection failed:', e);
|
|
connectionStatus.set({ state: "error", message: String(e) });
|
|
attemptReconnect(servers, retryCount + 1);
|
|
}
|
|
}, baseDelay);
|
|
}
|
|
|
|
export async function subscribe(subject, onMessage) {
|
|
console.log('[NATS] Subscribing to:', subject);
|
|
if (!nc) await getNats();
|
|
const sub = nc.subscribe(subject);
|
|
subs.set(subject, sub);
|
|
console.log('[NATS] Subscription created for:', subject);
|
|
(async () => {
|
|
for await (const m of sub) {
|
|
let text = "";
|
|
try {
|
|
text = sc.decode(m.data);
|
|
let parsed = {};
|
|
try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; }
|
|
console.log('[NATS] Received message on', subject, ':', parsed);
|
|
onMessage(parsed);
|
|
} catch (e) {
|
|
console.warn("nats message parse error", e);
|
|
}
|
|
}
|
|
})();
|
|
return {
|
|
unsubscribe: async () => {
|
|
console.log('[NATS] Unsubscribing from:', subject);
|
|
try { sub.unsubscribe(); } catch (e) {}
|
|
subs.delete(subject);
|
|
}
|
|
};
|
|
}
|
|
|
|
export async function publish(subject, payload = {}, opts = {}) {
|
|
console.log('[NATS] Publishing to:', subject, 'payload:', payload);
|
|
if (!nc) await getNats(opts);
|
|
const msg = typeof payload === "string" ? payload : JSON.stringify(payload);
|
|
nc.publish(subject, sc.encode(msg));
|
|
console.log('[NATS] Message published successfully');
|
|
}
|
|
|
|
export async function closeNats() {
|
|
console.log('[NATS] Closing connection...');
|
|
connectionStatus.set({ state: "disconnected", message: "closing" });
|
|
try {
|
|
for (const s of subs.values()) {
|
|
try { s.unsubscribe(); } catch (e) {}
|
|
}
|
|
subs.clear();
|
|
if (nc) {
|
|
await nc.close();
|
|
nc = null;
|
|
}
|
|
console.log('[NATS] Connection closed');
|
|
connectionStatus.set({ state: "disconnected", message: "closed" });
|
|
} catch (e) {
|
|
console.error('[NATS] Error closing connection:', e);
|
|
connectionStatus.set({ state: "error", message: String(e) });
|
|
}
|
|
}
|
|
|
|
export async function connectNATS(brokerUrl, cfg) {
|
|
console.log('[NATS] Connecting with broker URL:', brokerUrl);
|
|
|
|
let serverUrl = brokerUrl;
|
|
|
|
if (!serverUrl && cfg) {
|
|
serverUrl = cfg?.nats_server_info?.url || 'wss://nats.yiem.cc';
|
|
}
|
|
|
|
console.log('[NATS] Using server URL:', serverUrl);
|
|
setConfig(cfg);
|
|
await getNats({ servers: serverUrl });
|
|
}
|
|
|
|
export async function request(subject, payload, opts = {}) {
|
|
console.log('[NATS] Requesting from:', subject, 'payload:', payload);
|
|
if (!nc) await getNats(opts);
|
|
|
|
const msg = typeof payload === "string" ? payload : JSON.stringify(payload);
|
|
const response = await nc.request(subject, sc.encode(msg));
|
|
const responseText = sc.decode(response.data);
|
|
let responseJson;
|
|
try {
|
|
responseJson = JSON.parse(responseText);
|
|
} catch (e) {
|
|
responseJson = { text: responseText };
|
|
}
|
|
console.log('[NATS] Request response:', responseJson);
|
|
return responseJson;
|
|
}
|
|
|
|
export async function reply(subject, handler, opts = {}) {
|
|
console.log('[NATS] Setting up reply handler on:', subject);
|
|
if (!nc) await getNats(opts);
|
|
|
|
const sub = nc.subscribe(subject);
|
|
subs.set(subject, sub);
|
|
|
|
(async () => {
|
|
for await (const m of sub) {
|
|
let text = "";
|
|
try {
|
|
text = sc.decode(m.data);
|
|
let parsed = {};
|
|
try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; }
|
|
console.log('[NATS] Received request on', subject, ':', parsed);
|
|
|
|
const response = handler(parsed, m);
|
|
const responseMsg = typeof response === "string" ? response : JSON.stringify(response);
|
|
nc.publish(m.subject, sc.encode(responseMsg));
|
|
console.log('[NATS] Reply sent to:', m.subject);
|
|
} catch (e) {
|
|
console.warn("nats reply handler error", e);
|
|
}
|
|
}
|
|
})();
|
|
|
|
return {
|
|
unsubscribe: async () => {
|
|
console.log('[NATS] Unsubscribing from:', subject);
|
|
try { sub.unsubscribe(); } catch (e) {}
|
|
subs.delete(subject);
|
|
}
|
|
};
|
|
}
|
|
|
|
export async function sendMsghandlerRequest(subject, functioncall, args, config) {
|
|
console.log('[NATS] Sending msghandler request:', functioncall);
|
|
|
|
const payload = {
|
|
functioncall: functioncall,
|
|
args: args
|
|
};
|
|
|
|
const brokerUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc';
|
|
const fileserverUrl = config?.file_server_url || 'http://localhost:8080';
|
|
|
|
const [envelope, envelopeJsonStr] = await msghandlerCSR.smartpack(
|
|
subject,
|
|
[["db_request", payload, "dictionary"]],
|
|
{ broker_url: brokerUrl, fileserver_url: fileserverUrl }
|
|
);
|
|
|
|
console.log('[NATS] Request envelope created, sending...');
|
|
const natsReply = await request(subject, envelopeJsonStr);
|
|
|
|
console.log('[NATS] Response received, unpacking...');
|
|
const unpackedEnvelope = await msghandlerCSR.smartunpack(natsReply);
|
|
console.log('-----');
|
|
console.log(unpackedEnvelope);
|
|
console.log('===== 1');
|
|
for (const [dataname, data, type] of unpackedEnvelope.payloads) {
|
|
console.log('[NATS] Extracted payload:', dataname, type);
|
|
if (type === "jsontable" || type === "dictionary") {
|
|
console.log('[NATS] Request successful:', data);
|
|
return data;
|
|
}
|