From 11b317af2bc94392c8687c5494585b0e7c408293 Mon Sep 17 00:00:00 2001 From: narawat Date: Fri, 29 May 2026 12:05:57 +0700 Subject: [PATCH] first commit --- natsClient.js | 238 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 natsClient.js diff --git a/natsClient.js b/natsClient.js new file mode 100644 index 0000000..29b9616 --- /dev/null +++ b/natsClient.js @@ -0,0 +1,238 @@ +import { connect, StringCodec } from "nats.ws"; +import { writable } from "svelte/store"; +import msghandlerCSR from './msghandler-csr.js'; + +export const connectionStatus = writable(null); + +connectionStatus.subscribe(status => { + console.log('[NATS] Status updated:', status); +}); + +let nc = null; +let sc = StringCodec(); +let subs = new Map(); +let reconnectTimer = null; +let isDisconnecting = false; + +export async function getNats(opts = {}) { + const servers = opts.servers || (import.meta.env.VITE_NATS_URL) || "wss://nats.yiem.cc"; + console.log('[NATS] Connecting to:', servers); + try { + connectionStatus.set({ state: "connecting", message: `connecting to ${servers}` }); + console.log('[NATS] Status: connecting'); + nc = await connect({ servers }); + console.log('[NATS] Connected successfully!'); + connectionStatus.set({ state: "connected", message: `connected to ${servers}` }); + isDisconnecting = false; + + nc.closed().then((err) => { + if (isDisconnecting) return; + if (err) { + console.log('[NATS] Connection closed with error:', err); + connectionStatus.set({ state: "error", message: String(err) }); + } else { + console.log('[NATS] Connection closed normally'); + connectionStatus.set({ state: "disconnected", message: "connection closed" }); + } + attemptReconnect(servers); + }); + + return nc; + } catch (e) { + console.error('[NATS] Connection failed:', e); + connectionStatus.set({ state: "error", message: String(e) }); + attemptReconnect(servers); + throw e; + } +} + +async function attemptReconnect(servers, retryCount = 0) { + const maxDelay = 30000; + const baseDelay = Math.min(1000 * Math.pow(2, retryCount), maxDelay); + + console.log(`[NATS] Reconnecting in ${baseDelay}ms (attempt ${retryCount + 1})...`); + + reconnectTimer = setTimeout(async () => { + try { + connectionStatus.set({ state: "connecting", message: `reconnecting to ${servers}` }); + nc = await connect({ servers }); + console.log('[NATS] Reconnected successfully!'); + connectionStatus.set({ state: "connected", message: `connected to ${servers}` }); + isDisconnecting = false; + + const subsCopy = Array.from(subs.entries()); + for (const [subject, onMessage] of subsCopy) { + try { + await subscribe(subject, onMessage); + } catch (e) { + console.warn(`[NATS] Failed to re-subscribe to ${subject}:`, e); + } + } + } catch (e) { + console.error('[NATS] Reconnection failed:', e); + connectionStatus.set({ state: "error", message: String(e) }); + attemptReconnect(servers, retryCount + 1); + } + }, baseDelay); +} + +export async function subscribe(subject, onMessage) { + console.log('[NATS] Subscribing to:', subject); + if (!nc) await getNats(); + const sub = nc.subscribe(subject); + subs.set(subject, sub); + console.log('[NATS] Subscription created for:', subject); + (async () => { + for await (const m of sub) { + let text = ""; + try { + text = sc.decode(m.data); + let parsed = {}; + try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; } + console.log('[NATS] Received message on', subject, ':', parsed); + onMessage(parsed); + } catch (e) { + console.warn("nats message parse error", e); + } + } + })(); + return { + unsubscribe: async () => { + console.log('[NATS] Unsubscribing from:', subject); + try { sub.unsubscribe(); } catch (e) {} + subs.delete(subject); + } + }; +} + +export async function publish(subject, payload = {}, opts = {}) { + console.log('[NATS] Publishing to:', subject, 'payload:', payload); + if (!nc) await getNats(opts); + const msg = typeof payload === "string" ? payload : JSON.stringify(payload); + nc.publish(subject, sc.encode(msg)); + console.log('[NATS] Message published successfully'); +} + +export async function closeNats() { + console.log('[NATS] Closing connection...'); + connectionStatus.set({ state: "disconnected", message: "closing" }); + try { + for (const s of subs.values()) { + try { s.unsubscribe(); } catch (e) {} + } + subs.clear(); + if (nc) { + await nc.close(); + nc = null; + } + console.log('[NATS] Connection closed'); + connectionStatus.set({ state: "disconnected", message: "closed" }); + } catch (e) { + console.error('[NATS] Error closing connection:', e); + connectionStatus.set({ state: "error", message: String(e) }); + } +} + +export async function connectNATS(brokerUrl, config) { + console.log('[NATS] Connecting with broker URL:', brokerUrl); + + let serverUrl = brokerUrl; + + if (!serverUrl && config) { + serverUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc'; + } + + console.log('[NATS] Using server URL:', serverUrl); + await getNats({ servers: serverUrl }); +} + +export async function request(subject, payload, opts = {}) { + console.log('[NATS] Requesting from:', subject, 'payload:', payload); + if (!nc) await getNats(opts); + + const msg = typeof payload === "string" ? payload : JSON.stringify(payload); + const response = await nc.request(subject, sc.encode(msg)); + const responseText = sc.decode(response.data); + let responseJson; + try { + responseJson = JSON.parse(responseText); + } catch (e) { + responseJson = { text: responseText }; + } + console.log('[NATS] Request response:', responseJson); + return responseJson; +} + +export async function reply(subject, handler, opts = {}) { + console.log('[NATS] Setting up reply handler on:', subject); + if (!nc) await getNats(opts); + + const sub = nc.subscribe(subject); + subs.set(subject, sub); + + (async () => { + for await (const m of sub) { + let text = ""; + try { + text = sc.decode(m.data); + let parsed = {}; + try { parsed = JSON.parse(text); } catch (e) { parsed = { text }; } + console.log('[NATS] Received request on', subject, ':', parsed); + + const response = handler(parsed, m); + const responseMsg = typeof response === "string" ? response : JSON.stringify(response); + nc.publish(m.subject, sc.encode(responseMsg)); + console.log('[NATS] Reply sent to:', m.subject); + } catch (e) { + console.warn("nats reply handler error", e); + } + } + })(); + + return { + unsubscribe: async () => { + console.log('[NATS] Unsubscribing from:', subject); + try { sub.unsubscribe(); } catch (e) {} + subs.delete(subject); + } + }; +} + +export async function sendMsghandlerRequest(subject, functioncall, args, config) { + console.log('[NATS] Sending msghandler request:', functioncall); + + const payload = { + functioncall: functioncall, + args: args + }; + + const brokerUrl = config?.nats_server_info?.url || 'wss://nats.yiem.cc'; + + const [envelope, envelopeJsonStr] = await msghandlerCSR.smartpack( + subject, + [["db_request", payload, "dictionary"]], + { broker_url: brokerUrl } + ); + + console.log('[NATS] Request envelope created, sending...'); + const natsReply = await request(subject, envelopeJsonStr); + + console.log('[NATS] Response received, unpacking...'); + const unpackedEnvelope = await msghandlerCSR.smartunpack(natsReply); + console.log('-----'); + console.log(unpackedEnvelope); + console.log('===== 1'); + for (const [dataname, data, type] of unpackedEnvelope.payloads) { + console.log('[NATS] Extracted payload:', dataname, type); + if (type === "jsontable" || type === "dictionary") { + console.log('[NATS] Request successful:', data); + return data; + } + if (type === "text") { + console.error('[NATS] Error response:', data); + throw new Error(data); + } + } + + throw new Error('No valid response found'); +}