first commit
This commit is contained in:
238
natsClient.js
Normal file
238
natsClient.js
Normal file
@@ -0,0 +1,238 @@
|
||||
import { connect, StringCodec } from "nats.ws";
|
||||
import { writable } from "svelte/store";
|
||||
import msghandlerCSR from './msghandler-csr.js';
|
||||
|
||||
export const connectionStatus = writable(null);
|
||||
|
||||
connectionStatus.subscribe(status => {
|
||||
console.log('[NATS] Status updated:', status);
|
||||
});
|
||||
|
||||
let nc = null;
|
||||
let sc = StringCodec();
|
||||
let subs = new Map();
|
||||
let reconnectTimer = null;
|
||||
let isDisconnecting = false;
|
||||
|
||||
export async function getNats(opts = {}) {
|
||||
const servers = opts.servers || (import.meta.env.VITE_NATS_URL) || "wss://nats.yiem.cc";
|
||||
console.log('[NATS] Connecting to:', servers);
|
||||
try {
|
||||
connectionStatus.set({ state: "connecting", message: `connecting to ${servers}` });
|
||||
console.log('[NATS] Status: connecting');
|
||||
nc = await connect({ servers });
|
||||
console.log('[NATS] Connected successfully!');
|
||||
connectionStatus.set({ state: "connected", message: `connected to ${servers}` });
|
||||
isDisconnecting = false;
|
||||
|
||||
nc.closed().then((err) => {
|
||||
if (isDisconnecting) return;
|
||||
if (err) {
|
||||
console.log('[NATS] Connection closed with error:', err);
|
||||
connectionStatus.set({ state: "error", message: String(err) });
|
||||
} else {
|
||||
console.log('[NATS] Connection closed normally');
|
||||
connectionStatus.set({ state: "disconnected", message: "connection closed" });
|
||||
}
|
||||
attemptReconnect(servers);
|
||||
});
|
||||
|
||||
return nc;
|
||||
} catch (e) {
|
||||
console.error('[NATS] Connection failed:', e);
|
||||
connectionStatus.set({ state: "error", message: String(e) });
|
||||
attemptReconnect(servers);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
async function attemptReconnect(servers, retryCount = 0) {
|
||||
const maxDelay = 30000;
|
||||
const baseDelay = Math.min(1000 * Math.pow(2, retryCount), maxDelay);
|
||||
|
||||
console.log(`[NATS] Reconnecting in ${baseDelay}ms (attempt ${retryCount + 1})...`);
|
||||
|
||||
reconnectTimer = setTimeout(async () => {
|
||||
try {
|
||||
connectionStatus.set({ state: "connecting", message: `reconnecting to ${servers}` });
|
||||
nc = await connect({ servers });
|
||||
console.log('[NATS] Reconnected successfully!');
|
||||
connectionStatus.set({ state: "connected", message: `connected to ${servers}` });
|
||||
isDisconnecting = false;
|
||||
|
||||
const subsCopy = Array.from(subs.entries());
|
||||
for (const [subject, onMessage] of subsCopy) {
|
||||
try {
|
||||
await subscribe(subject, onMessage);
|
||||
} catch (e) {
|
||||
console.warn(`[NATS] Failed to re-subscribe to ${subject}:`, e);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[NATS] Reconnection failed:', e);
|
||||
connectionStatus.set({ state: "error", message: String(e) });
|
||||
attemptReconnect(servers, retryCount + 1);
|
||||
}
|
||||
}, baseDelay);
|
||||
}
|
||||
|
||||
export async function subscribe(subject, onMessage) {
|
||||
console.log('[NATS] Subscribing to:', subject);
|
||||
if (!nc) await getNats();
|
||||
const sub = nc.subscribe(subject);
|
||||
subs.set(subject, sub);
|
||||
console.log('[NATS] Subscription created for:', subject);
|
||||
(async () => {
|
||||
for await (const m of sub) {
|
||||
let text = "";
|
||||
try {
|
||||
text = sc.decode(m.data);
|
||||
let parsed = {};
|
||||
try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; }
|
||||
console.log('[NATS] Received message on', subject, ':', parsed);
|
||||
onMessage(parsed);
|
||||
} catch (e) {
|
||||
console.warn("nats message parse error", e);
|
||||
}
|
||||
}
|
||||
})();
|
||||
return {
|
||||
unsubscribe: async () => {
|
||||
console.log('[NATS] Unsubscribing from:', subject);
|
||||
try { sub.unsubscribe(); } catch (e) {}
|
||||
subs.delete(subject);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export async function publish(subject, payload = {}, opts = {}) {
|
||||
console.log('[NATS] Publishing to:', subject, 'payload:', payload);
|
||||
if (!nc) await getNats(opts);
|
||||
const msg = typeof payload === "string" ? payload : JSON.stringify(payload);
|
||||
nc.publish(subject, sc.encode(msg));
|
||||
console.log('[NATS] Message published successfully');
|
||||
}
|
||||
|
||||
export async function closeNats() {
|
||||
console.log('[NATS] Closing connection...');
|
||||
connectionStatus.set({ state: "disconnected", message: "closing" });
|
||||
try {
|
||||
for (const s of subs.values()) {
|
||||
try { s.unsubscribe(); } catch (e) {}
|
||||
}
|
||||
subs.clear();
|
||||
if (nc) {
|
||||
await nc.close();
|
||||
nc = null;
|
||||
}
|
||||
console.log('[NATS] Connection closed');
|
||||
connectionStatus.set({ state: "disconnected", message: "closed" });
|
||||
} catch (e) {
|
||||
console.error('[NATS] Error closing connection:', e);
|
||||
connectionStatus.set({ state: "error", message: String(e) });
|
||||
}
|
||||
}
|
||||
|
||||
export async function connectNATS(brokerUrl, config) {
|
||||
console.log('[NATS] Connecting with broker URL:', brokerUrl);
|
||||
|
||||
let serverUrl = brokerUrl;
|
||||
|
||||
if (!serverUrl && config) {
|
||||
serverUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc';
|
||||
}
|
||||
|
||||
console.log('[NATS] Using server URL:', serverUrl);
|
||||
await getNats({ servers: serverUrl });
|
||||
}
|
||||
|
||||
export async function request(subject, payload, opts = {}) {
|
||||
console.log('[NATS] Requesting from:', subject, 'payload:', payload);
|
||||
if (!nc) await getNats(opts);
|
||||
|
||||
const msg = typeof payload === "string" ? payload : JSON.stringify(payload);
|
||||
const response = await nc.request(subject, sc.encode(msg));
|
||||
const responseText = sc.decode(response.data);
|
||||
let responseJson;
|
||||
try {
|
||||
responseJson = JSON.parse(responseText);
|
||||
} catch (e) {
|
||||
responseJson = { text: responseText };
|
||||
}
|
||||
console.log('[NATS] Request response:', responseJson);
|
||||
return responseJson;
|
||||
}
|
||||
|
||||
export async function reply(subject, handler, opts = {}) {
|
||||
console.log('[NATS] Setting up reply handler on:', subject);
|
||||
if (!nc) await getNats(opts);
|
||||
|
||||
const sub = nc.subscribe(subject);
|
||||
subs.set(subject, sub);
|
||||
|
||||
(async () => {
|
||||
for await (const m of sub) {
|
||||
let text = "";
|
||||
try {
|
||||
text = sc.decode(m.data);
|
||||
let parsed = {};
|
||||
try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; }
|
||||
console.log('[NATS] Received request on', subject, ':', parsed);
|
||||
|
||||
const response = handler(parsed, m);
|
||||
const responseMsg = typeof response === "string" ? response : JSON.stringify(response);
|
||||
nc.publish(m.subject, sc.encode(responseMsg));
|
||||
console.log('[NATS] Reply sent to:', m.subject);
|
||||
} catch (e) {
|
||||
console.warn("nats reply handler error", e);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
return {
|
||||
unsubscribe: async () => {
|
||||
console.log('[NATS] Unsubscribing from:', subject);
|
||||
try { sub.unsubscribe(); } catch (e) {}
|
||||
subs.delete(subject);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export async function sendMsghandlerRequest(subject, functioncall, args, config) {
|
||||
console.log('[NATS] Sending msghandler request:', functioncall);
|
||||
|
||||
const payload = {
|
||||
functioncall: functioncall,
|
||||
args: args
|
||||
};
|
||||
|
||||
const brokerUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc';
|
||||
|
||||
const [envelope, envelopeJsonStr] = await msghandlerCSR.smartpack(
|
||||
subject,
|
||||
[["db_request", payload, "dictionary"]],
|
||||
{ broker_url: brokerUrl }
|
||||
);
|
||||
|
||||
console.log('[NATS] Request envelope created, sending...');
|
||||
const natsReply = await request(subject, envelopeJsonStr);
|
||||
|
||||
console.log('[NATS] Response received, unpacking...');
|
||||
const unpackedEnvelope = await msghandlerCSR.smartunpack(natsReply);
|
||||
console.log('-----');
|
||||
console.log(unpackedEnvelope);
|
||||
console.log('===== 1');
|
||||
for (const [dataname, data, type] of unpackedEnvelope.payloads) {
|
||||
console.log('[NATS] Extracted payload:', dataname, type);
|
||||
if (type === "jsontable" || type === "dictionary") {
|
||||
console.log('[NATS] Request successful:', data);
|
||||
return data;
|
||||
}
|
||||
if (type === "text") {
|
||||
console.error('[NATS] Error response:', data);
|
||||
throw new Error(data);
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error('No valid response found');
|
||||
}
|
||||
Reference in New Issue
Block a user