using JSON3, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames
using GeneralUtils
using Base.Threads

# ---------------------------------------------- 100 --------------------------------------------- #

"""
Expected incoming MQTT message format for this service:
{
    "msgMeta": {
        "msgPurpose": "updateStatus",
        "requestresponse": "request",
        "timestamp": "2024-03-29T05:8:48.362",
        "replyToMsgId": null,
        "receiverId": null,
        "getpost": "get",
        "msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
        "acknowledgestatus": null,
        "sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
        "receiverName": "agent-wine-backend",
        "replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
        "senderName": "agent-wine-frontend-chat",
        "senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
    },
    "payload" : {
        "text": "hello"
    }
}
"""

# load config
config = copy(JSON3.read("../mountvolume/config.json"))


"""
    executeSQL(sql, params=Any[])

Open a new database connection from the `:db` section of the global `config`, run `sql`
and return the `LibPQ` result.

When `params` is non-empty the values are sent separately from the statement text and
bound to its positional placeholders, so caller-supplied values never become part of the
SQL string. The connection is closed in a `finally` clause, so it is released even when
the statement fails.
"""
function executeSQL(sql::T, params::AbstractVector=Any[]) where {T<:AbstractString}
    db = config[:db]
    DBconnection = LibPQ.Connection(
        "host=$(db[:host]) port=$(db[:port]) dbname=$(db[:dbname]) " *
        "user=$(db[:user]) password=$(db[:password])",
    )
    try
        return if isempty(params)
            LibPQ.execute(DBconnection, sql)
        else
            LibPQ.execute(DBconnection, sql, params)
        end
    finally
        LibPQ.close(DBconnection)
    end
end


# Convert a query response into the vector-of-dict form produced by
# `GeneralUtils.dfToVectorDict`, or `nothing` when the response has no rows.
function _rowsAsVectorDict(response)
    df = DataFrame(response)
    if size(df, 1) != 0
        return GeneralUtils.dfToVectorDict(df)
    else
        return nothing
    end
end


# Convert a query response into a `DataFrame`, or `nothing` when it has no rows.
function _rowsAsDataFrame(response)
    df = DataFrame(response)
    if size(df, 1) != 0
        return df
    else
        return nothing
    end
end


"""
    listAllTableColumns(tablename::String)::Vector

Return the column names of `tablename` in the `public` schema as a vector of `Symbol`s,
read from `information_schema.columns`.
"""
function listAllTableColumns(tablename::String)::Vector
    sql =
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public'
        AND table_name = \$1;
        """
    response = executeSQL(sql, [tablename])
    df = DataFrame(response)
    return Symbol.(df[:, 1])
end


"""
    load_winetable(args::Dict)

Return every row of the `wine` table as a `DataFrame`, or `nothing` when the table is
empty. `args` is accepted for a uniform handler signature but is not read.
"""
function load_winetable(args::Dict)
    tablename = "wine"
    sql =
        """
        SELECT *
        FROM $tablename;
        """
    response = executeSQL(sql)
    return _rowsAsDataFrame(response)
end


"""
    insert_masterWineDB(args::Dict)

Insert a row into the `wine` table using the statement produced by
`GeneralUtils.generateInsertSQL`. Return the rows yielded by the statement as a vector of
dicts, or `nothing` when it yields no rows.
"""
function insert_masterWineDB(args::Dict)
    tablename = "wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): the statement text is assembled by GeneralUtils from `args`; whether
    # it escapes values cannot be seen from this file -- confirm before exposing further.
    sql = GeneralUtils.generateInsertSQL(tablename, columnToUpdate, args)
    println("insert_masterWineDB() SQL: $sql")
    response = executeSQL(sql)
    return _rowsAsVectorDict(response)
end


"""
    delete_masterWineDB(args::Dict)

Delete the row of the `wine` table whose `wine_id` equals `args[:wine_id]`. The id is
passed as a bound parameter. Return the rows yielded by the statement as a vector of
dicts, or `nothing` when it yields no rows.
"""
function delete_masterWineDB(args::Dict)
    tablename = "wine"
    sql =
        """
        DELETE FROM $tablename
        WHERE wine_id = \$1;
        """
    println("delete_masterWineDB() SQL: $sql")
    response = executeSQL(sql, [args[:wine_id]])
    return _rowsAsVectorDict(response)
end


"""
    edit_masterWineDB(args::Dict)

Update a row of the `wine` table using the statement produced by
`GeneralUtils.generateUpdateSQL`, with `:wine_id` passed as the key column list. Return
the rows yielded by the statement as a vector of dicts, or `nothing` when it yields none.
"""
function edit_masterWineDB(args::Dict)
    tablename = "wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): statement text is assembled by GeneralUtils from `args` -- see the
    # same remark in insert_masterWineDB().
    sql = GeneralUtils.generateUpdateSQL(tablename, columnToUpdate, args, [:wine_id])
    println("")
    println("edit_masterWineDB() SQL: $sql")
    response = executeSQL(sql)
    return _rowsAsVectorDict(response)
end


"""
    search_masterWineTable(args::Dict)

Search the `wine` table.

Reads `args[:searchkeyword]` and `args[:searchcolumn]`. The keyword `"*"` returns the
whole table; any other keyword is matched case-insensitively as a substring of the given
column (at most 1000 rows).

Returns a vector of dicts, `nothing` when no row matches, or an error message `String`
when the arguments are rejected.

The keyword is sent as a bound parameter, so it may contain quote characters. The column
name cannot be bound as a parameter, so it must be a plain SQL identifier (letters,
digits and underscores, not starting with a digit); anything else is rejected.
"""
function search_masterWineTable(args::Dict)
    tablename = "wine"
    searchkeyword = args[:searchkeyword]
    columnname = args[:searchcolumn]

    # check parameters
    if searchkeyword == ""
        println("Error, search keyword is empty")
        return "Error, search keyword is empty"
    elseif columnname == ""
        println("Error, search column name is empty")
        return "Error, search column name is empty"
    elseif isa(columnname, Number)
        println("Error, search search column name must be string")
        return "Error, search search column name must be string"
    elseif !occursin(r"^[A-Za-z_][A-Za-z0-9_]*$", string(columnname))
        println("Error, search column name is not a valid identifier")
        return "Error, search column name is not a valid identifier"
    end

    response = if searchkeyword == "*"
        sql =
            """
            SELECT *
            FROM $tablename;
            """
        println("~~~ sql ", sql)
        executeSQL(sql)
    else
        sql =
            """
            SELECT *
            FROM $tablename
            WHERE $columnname ILIKE \$1
            LIMIT 1000;
            """
        println("~~~ sql ", sql)
        executeSQL(sql, ["%$(searchkeyword)%"])
    end
    return _rowsAsVectorDict(response)
end


"""
    load_retailerWineInventory(args::Dict)

Return the wines listed for the retailer `args[:retailerid]` (wine columns joined with
the retailer's price and currency) as a `DataFrame`. Return `nothing` when the id is
empty or when the retailer has no rows.
"""
function load_retailerWineInventory(args::Dict)
    retailer_id = args[:retailerid]
    if length(retailer_id) == 0
        println("No retailer_id provided ", @__FILE__, " ", @__LINE__)
        return nothing
    end

    sql =
        """
        SELECT
            w.wine_id,
            w.winery,
            w.wine_name,
            w.vintage,
            w.grape,
            w.wine_type,
            w.region,
            w.country,
            w.created_time,
            w.updated_time,
            rw.price,
            rw.currency
        FROM
            wine w
        JOIN
            retailer_wine rw ON w.wine_id = rw.wine_id
        WHERE
            rw.retailer_id = \$1;
        """
    response = executeSQL(sql, [retailer_id])
    return _rowsAsDataFrame(response)
end


"""
    insert_retailerWineInventory(args::Dict)

Insert a row into the `retailer_wine` table using the statement produced by
`GeneralUtils.generateInsertSQL`. Return the rows yielded by the statement as a vector of
dicts, or `nothing` when it yields no rows.
"""
function insert_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): statement text is assembled by GeneralUtils from `args` -- confirm
    # that values are escaped there.
    sql = GeneralUtils.generateInsertSQL(tablename, columnToUpdate, args)
    println("~~ sql ", sql)
    response = executeSQL(sql)
    return _rowsAsVectorDict(response)
end


"""
    delete_retailerWineInventory(args::Dict)

Delete the `retailer_wine` row identified by `args[:retailer_id]` and `args[:wine_id]`.
Both ids are passed as bound parameters. Return the rows yielded by the statement as a
vector of dicts, or `nothing` when it yields no rows.
"""
function delete_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    sql =
        """
        DELETE FROM $tablename
        WHERE retailer_id = \$1 AND wine_id = \$2;
        """
    println("~~ sql ", sql)
    response = executeSQL(sql, [args[:retailer_id], args[:wine_id]])
    return _rowsAsVectorDict(response)
end


"""
    edit_retailerWineInventory(args::Dict)

Update a `retailer_wine` row using the statement produced by
`GeneralUtils.generateUpdateSQL`, with `:retailer_id` and `:wine_id` passed as the key
column list. Return the rows yielded by the statement as a vector of dicts, or `nothing`
when it yields none.
"""
function edit_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): statement text is assembled by GeneralUtils from `args` -- confirm
    # that values are escaped there.
    sql = GeneralUtils.generateUpdateSQL(tablename, columnToUpdate, args,
        [:retailer_id, :wine_id])
    println("~~ sql ", sql)
    response = executeSQL(sql)
    return _rowsAsVectorDict(response)
end


# ---------------------------------------------- 100 --------------------------------------------- #

# Run `dbfunction` on the request's `:args` through `GeneralUtils.timeout` (30 is passed
# as its limit) and send the outcome back under `:result`.
function _replyWithTimedCall(dbfunction, incomingPayload, msgMeta)
    result = GeneralUtils.timeout(dbfunction, 30; fargs=incomingPayload[:args])
    outgoingMsg = Dict(
        :msgMeta => msgMeta,
        :payload => Dict(
            :functioncall => incomingPayload[:functioncall],
            :result => result,
        ),
    )
    _ = GeneralUtils.sendMqttMsg(outgoingMsg)
    return nothing
end


# Same as `_replyWithTimedCall`, but only when both `:retailer_id` and `:wine_id` in the
# request's `:args` are non-empty; otherwise the call is skipped and nothing is sent.
function _replyIfRetailerWineKeyed(dbfunction, incomingPayload, msgMeta)
    retailer_id = incomingPayload[:args][:retailer_id]
    wine_id = incomingPayload[:args][:wine_id]
    if length(retailer_id) != 0 && length(wine_id) != 0
        _replyWithTimedCall(dbfunction, incomingPayload, msgMeta)
    else
        println("Skipped call, insufficient args for $(incomingPayload[:functioncall])() retailer_id: $retailer_id wine_id: $wine_id")
    end
    return nothing
end


# Answer a table-load request through `GeneralUtils.dataTransferOverMQTT_sender`.
# A request without `:dataTransferSessionID` loads the table with `loader`, splits it
# with `disintegrate_vectorDict(vd, 100)` and hands it to the sender as `data`; a request
# that carries the key is passed to the sender without data. The key/value pairs the
# sender returns are merged into the reply payload next to `:functioncall`.
# `debug=true` additionally prints the loaded data's type and the first reply.
function _replyWithDataTransfer(loader, workDict, incomingMsg, msgMeta; debug::Bool=false)
    incomingPayload = incomingMsg[:payload]
    isfirstrequest = !haskey(incomingPayload, :dataTransferSessionID)

    result = if isfirstrequest
        df = loader(incomingPayload[:args])
        # NOTE(review): `loader` returns `nothing` for an empty table; how
        # dfToVectorDict() treats `nothing` is not visible here -- confirm.
        vd = GeneralUtils.dfToVectorDict(df)
        debug && println(typeof(vd))
        disvd = GeneralUtils.disintegrate_vectorDict(vd, 100)
        GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg; data=disvd)
    else
        GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg)
    end

    outgoingMsg = Dict(
        :msgMeta => msgMeta,
        :payload => Dict{Symbol, Any}(
            :functioncall => incomingPayload[:functioncall],
        ),
    )
    # merge dictionary so that keys are on the same level
    for (k, v) in result
        outgoingMsg[:payload][k] = v
    end
    _ = GeneralUtils.sendMqttMsg(outgoingMsg)
    debug && isfirstrequest && pprintln(outgoingMsg)
    return nothing
end


# Dispatch one incoming request to the DB function named in its payload's
# `:functioncall` and send the reply to the request's `replyTopic`.
function _handleServiceRequest(workDict, incomingMsg, config::Dict)
    incomingPayload = incomingMsg[:payload]
    functioncall = incomingPayload[:functioncall]

    # reply goes back to the sender, i.e. LINE
    msgMeta = GeneralUtils.generate_msgMeta(
        incomingMsg[:msgMeta][:replyTopic];
        senderName="wine_assistant_backend_db",
        senderId=GeneralUtils.uuid4snakecase(),
        replyToMsgId=incomingMsg[:msgMeta][:msgId],
        mqttBrokerAddress=config[:mqttServerInfo][:broker],
        mqttBrokerPort=config[:mqttServerInfo][:port],
    )

    # add other DB call handlers here
    if functioncall == "search_masterWineTable"
        _replyWithTimedCall(search_masterWineTable, incomingPayload, msgMeta)
    elseif functioncall == "load_retailerWineInventory"
        println("load_retailerWineInventory()")
        _replyWithDataTransfer(load_retailerWineInventory, workDict, incomingMsg, msgMeta)
    elseif functioncall == "insert_retailerWineInventory"
        println("insert_retailerWineInventory()")
        _replyWithTimedCall(insert_retailerWineInventory, incomingPayload, msgMeta)
    elseif functioncall == "delete_retailerWineInventory"
        println("delete_retailerWineInventory()")
        _replyIfRetailerWineKeyed(delete_retailerWineInventory, incomingPayload, msgMeta)
    elseif functioncall == "edit_retailerWineInventory"
        println("edit_retailerWineInventory()")
        _replyIfRetailerWineKeyed(edit_retailerWineInventory, incomingPayload, msgMeta)
    elseif functioncall == "load_winetable"
        println("load_winetable()")
        _replyWithDataTransfer(load_winetable, workDict, incomingMsg, msgMeta; debug=true)
    elseif functioncall == "insert_masterWineDB"
        println("insert_masterWineDB()")
        _replyWithTimedCall(insert_masterWineDB, incomingPayload, msgMeta)
    elseif functioncall == "delete_masterWineDB"
        _replyWithTimedCall(delete_masterWineDB, incomingPayload, msgMeta)
    elseif functioncall == "edit_masterWineDB"
        println("edit_masterWineDB()")
        _replyWithTimedCall(edit_masterWineDB, incomingPayload, msgMeta)
    else
        println("")
        println("~~~ The requested function is not defined ", @__FILE__, " ", @__LINE__)
    end
    return nothing
end


"""
    runServiceInstance(receiveUserMsgChannel, outputchannel, config, timeout)

Serve one session: repeatedly take a request from `receiveUserMsgChannel`, run the DB
function it names and send the reply over MQTT.

# Arguments
- `receiveUserMsgChannel::Channel`: requests for this session.
- `outputchannel::Channel`: receives `Dict(:exitreason => "timeout", :timestamp => ...)`
  just before the loop ends.
- `config::Dict`: service configuration; `config[:mqttServerInfo]` supplies the broker
  address and port placed in reply metadata.
- `timeout::Int64`: the loop ends once the time since the last request, measured in
  minutes by `GeneralUtils.timedifference`, exceeds this value.

A request that raises an exception is logged and dropped; the session keeps running so
later requests are still served.
"""
function runServiceInstance(
    receiveUserMsgChannel::Channel,
    outputchannel::Channel,
    config::Dict,
    timeout::Int64,
    )

    workDict = Dict()
    latestUserMsgTimeStamp::DateTime = Dates.now()

    while true
        # check for new user message
        if isready(receiveUserMsgChannel)
            incomingMsg = take!(receiveUserMsgChannel)
            latestUserMsgTimeStamp = Dates.now()
            println("")
            println("<-- incomingMsg ", @__FILE__, " ", @__LINE__)
            println(incomingMsg)

            try
                _handleServiceRequest(workDict, incomingMsg, config)
            catch err
                err isa InterruptException && rethrow()
                # one bad request must not take the whole session down
                @error "request handling failed" exception = (err, catch_backtrace())
            end
        end

        # self terminate if too long inactivity
        timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
        if timediff > timeout
            result = Dict(:exitreason=>"timeout", :timestamp=>Dates.now())
            put!(outputchannel, result)
            break
        else
            sleep(1) # yield so the MQTT callback and other tasks get a chance to run
        end
    end
end


sessionDict = Dict{String,Any}()
mqttMsgReceiveChannel = (ch1=Channel(8),) # stores messages arriving on the service topic
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)

# Define the callback for receiving messages.
"""
    onMsgCallback_1(topic, payload)

MQTT receive callback. Parse `payload` as JSON, convert it to a Julia dictionary and
route it by topic: topics containing `"db"` go to `mqttMsgReceiveChannel[:ch1]`, topics
containing `"keepalive"` go to `keepaliveChannel`. Any other topic is only reported on
stdout.
"""
function onMsgCallback_1(topic, payload)
    jobj = JSON3.read(String(payload))
    incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively

    if occursin("db", topic)
        put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
    elseif occursin("keepalive", topic)
        put!(keepaliveChannel, incomingMqttMsg)
    else
        println("undefined condition ", @__FILE__, " ", @__LINE__)
    end
end

mqttInstance = GeneralUtils.mqttClientInstance_v2(
    config[:mqttServerInfo][:broker],
    config[:servicetopic][:mqtttopic],
    mqttMsgReceiveChannel,
    keepaliveChannel,
    onMsgCallback_1
)

println("ready!")

# ------------------------------------------------------------------------------------------------ #
#                                      this service main loop                                      #
# ------------------------------------------------------------------------------------------------ #

# Spawn a `runServiceInstance` task for `sessionId`, register its channels and task in
# `sessionDict`, and return the new entry.
function _startSession(sessionId, sessiontimeout)
    inputch = Channel{Dict}(8)
    outputch = Channel{Dict}(8)
    process = @spawn runServiceInstance(inputch, outputch, config, sessiontimeout)
    println("~~ instantiate session success")

    # keep the task and its channels so the session can be checked on later
    sessionDict[sessionId] = Dict(
        :inputchannel => inputch,
        :outputchannel => outputch,
        :process => process,
    )
    return sessionDict[sessionId]
end


"""
    main()

Service main loop. Never returns.

Each iteration (about once per second):
1. calls `GeneralUtils.checkMqttConnection!` on the MQTT client;
2. takes at most one message from `mqttMsgReceiveChannel[:ch1]` and forwards it to the
   session keyed by the message's `senderId`, starting a session when none is running;
3. every `checkSessionTimeout` minutes, removes sessions that reported a `"timeout"`
   exit on their output channel.

A message is forwarded only to a session whose task is still running; an entry whose
task has ended is discarded first, so its input channel can never fill up and block this
loop. A message that has no running session and repeats the `msgId` that created the
most recent session is treated as a duplicate delivery and dropped.
"""
function main()
    sessiontimeout = 1*1*10 # timeout in minutes
    checkSessionTimeout = 10 # minutes
    clearedSessionTimestamp = Dates.now()
    lastMsgId = nothing

    while true
        # check if mqtt connection is still up
        _ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)

        # check for new message
        if isready(mqttMsgReceiveChannel[:ch1])
            msg = take!(mqttMsgReceiveChannel[:ch1])

            # one backend session per frontend session, keyed by the frontend's id;
            # "-" is replaced by "_" to keep the existing key format
            sessionId = replace(msg[:msgMeta][:senderId], "-" => "_")
            msgId = msg[:msgMeta][:msgId]

            session = get(sessionDict, sessionId, nothing)
            if session !== nothing && istaskdone(session[:process])
                # the worker has ended (timeout or failure): nobody reads its input
                # channel any more, so forget it and let a fresh one be started
                delete!(sessionDict, sessionId)
                session = nothing
            end

            if session !== nothing
                put!(session[:inputchannel], msg)
            elseif msgId != lastMsgId
                lastMsgId = msgId
                session = _startSession(sessionId, sessiontimeout)
                put!(session[:inputchannel], msg)
            else
                println("duplicate msgId $(msgId) without a running session, message dropped")
            end
        end

        # periodically remove sessions that reported a timeout exit
        timediff = GeneralUtils.timedifference(clearedSessionTimestamp, Dates.now(), "minutes")
        if timediff > checkSessionTimeout
            # iterate over a snapshot of the keys because entries are deleted in the loop
            for sessionId in collect(keys(sessionDict))
                outputch = sessionDict[sessionId][:outputchannel]
                if isready(outputch)
                    result = take!(outputch)
                    if result[:exitreason] == "timeout"
                        println("sessionId $(sessionId) has been deleted because it is timed out")
                        delete!(sessionDict, sessionId)
                    end
                end
            end
            clearedSessionTimestamp = Dates.now()
        end

        # sleep is needed because MQTTClient uses async; a tight `while true` loop would
        # leave no chance for control to switch to the message callback
        sleep(1)
    end
end

main()