import { connect, StringCodec } from "nats.ws"; import { writable } from "svelte/store"; import msghandlerCSR from './msghandler-csr.js'; export const connectionStatus = writable(null); let config = null; export function setConfig(cfg) { config = cfg; } connectionStatus.subscribe(status => { console.log('[NATS] Status updated:', status); }); let nc = null; let sc = StringCodec(); let subs = new Map(); let reconnectTimer = null; let isDisconnecting = false; export async function getNats(opts = {}) { const servers = opts.servers || (import.meta.env.VITE_NATS_URL) || (config?.nats_server_info?.url) || "wss://nats.yiem.cc"; console.log('[NATS] Connecting to:', servers); try { connectionStatus.set({ state: "connecting", message: `connecting to ${servers}` }); console.log('[NATS] Status: connecting'); nc = await connect({ servers }); console.log('[NATS] Connected successfully!'); connectionStatus.set({ state: "connected", message: `connected to ${servers}` }); isDisconnecting = false; nc.closed().then((err) => { if (isDisconnecting) return; if (err) { console.log('[NATS] Connection closed with error:', err); connectionStatus.set({ state: "error", message: String(err) }); } else { console.log('[NATS] Connection closed normally'); connectionStatus.set({ state: "disconnected", message: "connection closed" }); } attemptReconnect(servers); }); return nc; } catch (e) { console.error('[NATS] Connection failed:', e); connectionStatus.set({ state: "error", message: String(e) }); attemptReconnect(servers); throw e; } } async function attemptReconnect(servers, retryCount = 0) { const maxDelay = 30000; const baseDelay = Math.min(1000 * Math.pow(2, retryCount), maxDelay); console.log(`[NATS] Reconnecting in ${baseDelay}ms (attempt ${retryCount + 1})...`); reconnectTimer = setTimeout(async () => { try { connectionStatus.set({ state: "connecting", message: `reconnecting to ${servers}` }); nc = await connect({ servers }); console.log('[NATS] Reconnected successfully!'); connectionStatus.set({ state: "connected", message: `connected to ${servers}` }); isDisconnecting = false; const subsCopy = Array.from(subs.entries()); for (const [subject, onMessage] of subsCopy) { try { await subscribe(subject, onMessage); } catch (e) { console.warn(`[NATS] Failed to re-subscribe to ${subject}:`, e); } } } catch (e) { console.error('[NATS] Reconnection failed:', e); connectionStatus.set({ state: "error", message: String(e) }); attemptReconnect(servers, retryCount + 1); } }, baseDelay); } export async function subscribe(subject, onMessage) { console.log('[NATS] Subscribing to:', subject); if (!nc) await getNats(); const sub = nc.subscribe(subject); subs.set(subject, sub); console.log('[NATS] Subscription created for:', subject); (async () => { for await (const m of sub) { let text = ""; try { text = sc.decode(m.data); let parsed = {}; try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; } console.log('[NATS] Received message on', subject, ':', parsed); onMessage(parsed); } catch (e) { console.warn("nats message parse error", e); } } })(); return { unsubscribe: async () => { console.log('[NATS] Unsubscribing from:', subject); try { sub.unsubscribe(); } catch (e) {} subs.delete(subject); } }; } export async function publish(subject, payload = {}, opts = {}) { console.log('[NATS] Publishing to:', subject, 'payload:', payload); if (!nc) await getNats(opts); const msg = typeof payload === "string" ? payload : JSON.stringify(payload); nc.publish(subject, sc.encode(msg)); console.log('[NATS] Message published successfully'); } export async function closeNats() { console.log('[NATS] Closing connection...'); connectionStatus.set({ state: "disconnected", message: "closing" }); try { for (const s of subs.values()) { try { s.unsubscribe(); } catch (e) {} } subs.clear(); if (nc) { await nc.close(); nc = null; } console.log('[NATS] Connection closed'); connectionStatus.set({ state: "disconnected", message: "closed" }); } catch (e) { console.error('[NATS] Error closing connection:', e); connectionStatus.set({ state: "error", message: String(e) }); } } export async function connectNATS(brokerUrl, cfg) { console.log('[NATS] Connecting with broker URL:', brokerUrl); let serverUrl = brokerUrl; if (!serverUrl && cfg) { serverUrl = cfg?.nats_server_info?.url || 'wss://nats.yiem.cc'; } console.log('[NATS] Using server URL:', serverUrl); setConfig(cfg); await getNats({ servers: serverUrl }); } export async function request(subject, payload, opts = {}) { console.log('[NATS] Requesting from:', subject, 'payload:', payload); if (!nc) await getNats(opts); const msg = typeof payload === "string" ? payload : JSON.stringify(payload); const response = await nc.request(subject, sc.encode(msg)); const responseText = sc.decode(response.data); let responseJson; try { responseJson = JSON.parse(responseText); } catch (e) { responseJson = { text: responseText }; } console.log('[NATS] Request response:', responseJson); return responseJson; } export async function reply(subject, handler, opts = {}) { console.log('[NATS] Setting up reply handler on:', subject); if (!nc) await getNats(opts); const sub = nc.subscribe(subject); subs.set(subject, sub); (async () => { for await (const m of sub) { let text = ""; try { text = sc.decode(m.data); let parsed = {}; try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; } console.log('[NATS] Received request on', subject, ':', parsed); const response = handler(parsed, m); const responseMsg = typeof response === "string" ? response : JSON.stringify(response); nc.publish(m.subject, sc.encode(responseMsg)); console.log('[NATS] Reply sent to:', m.subject); } catch (e) { console.warn("nats reply handler error", e); } } })(); return { unsubscribe: async () => { console.log('[NATS] Unsubscribing from:', subject); try { sub.unsubscribe(); } catch (e) {} subs.delete(subject); } }; } export async function sendMsghandlerRequest(subject, functioncall, args, config) { console.log('[NATS] Sending msghandler request:', functioncall); const payload = { functioncall: functioncall, args: args }; const brokerUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc'; const fileserverUrl = config?.file_server_url || 'http://localhost:8080'; const [envelope, envelopeJsonStr] = await msghandlerCSR.smartpack( subject, [["db_request", payload, "dictionary"]], { broker_url: brokerUrl, fileserver_url: fileserverUrl } ); console.log('[NATS] Request envelope created, sending...'); const natsReply = await request(subject, envelopeJsonStr); console.log('[NATS] Response received, unpacking...'); const unpackedEnvelope = await msghandlerCSR.smartunpack(natsReply); console.log('-----'); console.log(unpackedEnvelope); console.log('===== 1'); for (const [dataname, data, type] of unpackedEnvelope.payloads) { console.log('[NATS] Extracted payload:', dataname, type); if (type === "jsontable" || type === "dictionary") { console.log('[NATS] Request successful:', data); return data; }