using JSON3, MQTTClient, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames
using GeneralUtils
using Base.Threads

# ---------------------------------------------- 100 --------------------------------------------- #

#=
Expected incoming MQTT message format for this service:
{
    "msgMeta": {
        "msgPurpose": "updateStatus",
        "requestresponse": "request",
        "timestamp": "2024-03-29T05:8:48.362",
        "replyToMsgId": null,
        "receiverId": null,
        "getpost": "get",
        "msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
        "acknowledgestatus": null,
        "sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
        "receiverName": "agent-wine-backend",
        "replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
        "senderName": "agent-wine-frontend-chat",
        "senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
    },
    "payload" : {
        "text": "hello"
    }
}

NOTE(review): the request handlers below read `payload[:functioncall]` and `payload[:args]`
(plus an optional `payload[:dataTransferSessionID]`), not `payload[:text]`.
=#

# load config
config = copy(JSON3.read("config.json"))

# ------------------------------------------------------------------------------------------------ #
#                                         database access                                          #
# ------------------------------------------------------------------------------------------------ #

# NOTE(review): credentials are hard-coded in source. The connection string can be overridden
# with the WINE_DB_CONNINFO environment variable; the literal below is only the fallback that
# keeps existing deployments working. Consider moving it out of the repository.
const DEFAULT_DB_CONNINFO = "host=192.168.88.12 port=10201 dbname=wineDB user=yiemtechnologies password=yiemtechnologies@Postgres_0.0"

"""
    executeSQL(sql, params=nothing)

Open a new PostgreSQL connection, run `sql` and return the `LibPQ` result.

# Arguments
- `sql`: statement text. Use `\$1`, `\$2`, ... placeholders for values.
- `params`: optional vector/tuple of values bound to the placeholders. When `nothing`
  (the default) the statement is executed as-is.

The connection is always closed, including when the statement throws.
"""
function executeSQL(sql::T, params=nothing) where {T<:AbstractString}
    DBconnection = LibPQ.Connection(get(ENV, "WINE_DB_CONNINFO", DEFAULT_DB_CONNINFO))
    try
        if params === nothing
            return LibPQ.execute(DBconnection, sql)
        else
            return LibPQ.execute(DBconnection, sql, params)
        end
    finally
        # close even when execute() throws, otherwise the connection leaks
        LibPQ.close(DBconnection)
    end
end

# Return the result rows as a DataFrame, or `nothing` when the result has no rows.
function _resultToDataFrame(response)
    df = DataFrame(response)
    return size(df, 1) == 0 ? nothing : df
end

# Return the result rows via GeneralUtils.dfToVectorDict, or `nothing` when there are no rows.
function _resultToVectorDict(response)
    df = DataFrame(response)
    return size(df, 1) == 0 ? nothing : GeneralUtils.dfToVectorDict(df)
end

#=
Disabled: SQL vector DB helpers (kept for reference, not compiled).

function executeSQLVectorDB(sql)
    DBconnection = LibPQ.Connection("host=192.168.88.12 port=5433 dbname=SQLVectorDB user=yiemtechnologies@gmail.com password=yiem@Postgres_0.0")
    result = LibPQ.execute(DBconnection, sql)
    LibPQ.close(DBconnection)
    return result
end

function addSQLVectorDB(state)
    # get embedding of the query
    query = [state[:thoughtHistory][:question]]
    msgMeta = GeneralUtils.generate_msgMeta(
        config[:externalservice][:text2textinstruct][:mqtttopic];
        msgPurpose= "embedding",
        senderName= "yiemagent",
        senderId= string(uuid4()),
        receiverName= "text2textinstruct",
        mqttBrokerAddress= config[:mqttServerInfo][:broker],
        mqttBrokerPort= config[:mqttServerInfo][:port],
    )

    outgoingMsg = Dict(
        :msgMeta=> msgMeta,
        :payload=> Dict(
            :text=> query
            )
        )
    response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
    embedding = response[:response][:embeddings][1]

    # check whether there is close enough vector already store in vectorDB. if no, add, else skip
    sql =
    """
    SELECT *, embedding <-> '$embedding' as distance
    FROM sql_statement_repository
    ORDER BY distance LIMIT 1;
    """
    response = executeSQLVectorDB(sql)
    df = DataFrame(response)
    row, col = size(df)
    distance = row == 0 ? Inf : df[1, :distance]
    if row == 0 || distance > 1 # no close enough SQL stored in the database
        latestKey, _ = GeneralUtils.findHighestIndexKey(state[:thoughtHistory], :action_input)
        _sqlStatement = state[:thoughtHistory][latestKey]
        if occursin("SELECT", _sqlStatement) # make sure it is an SQL statement before adding into DB
            sqlStatementBase64 = base64encode(_sqlStatement)
            sqlStatement = replace(_sqlStatement, "'"=>"")
            sql =
            """
            INSERT INTO sql_statement_repository (question, sql_statement, sql_statement_base64, embedding) VALUES ('$query', '$sqlStatement', '$sqlStatementBase64', '$embedding');
            """
            _ = executeSQLVectorDB(sql)
            println("~~ added new SQL statement to vectorDB ", @__FILE__, " ", @__LINE__)
            println(sqlStatement)
        end
    end
end

function querySQLVectorDB(state)
    # provide similarSQL at the first time thinking only
    latestKey, _ = GeneralUtils.findHighestIndexKey(state[:thoughtHistory], :action_input)
    if latestKey === nothing
        # get embedding of the query
        query = [state[:thoughtHistory][:question]]
        msgMeta = GeneralUtils.generate_msgMeta(
            config[:externalservice][:text2textinstruct][:mqtttopic];
            msgPurpose= "embedding",
            senderName= "yiemagent",
            senderId= string(uuid4()),
            receiverName= "text2textinstruct",
            mqttBrokerAddress= config[:mqttServerInfo][:broker],
            mqttBrokerPort= config[:mqttServerInfo][:port],
        )

        outgoingMsg = Dict(
            :msgMeta=> msgMeta,
            :payload=> Dict(
                :text=> query
                )
            )
        response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
        embedding = response[:response][:embeddings][1]

        # check whether there is close enough vector already store in vectorDB. if no, add, else skip
        sql =
        """
        SELECT *, embedding <-> '$embedding' as distance
        FROM sql_statement_repository
        ORDER BY distance LIMIT 1;
        """
        response = executeSQLVectorDB(sql)
        df = DataFrame(response)
        row, col = size(df)
        distance = row == 0 ? Inf : df[1, :distance]
        if row != 0 && distance < 100
            # if there is usable SQL, return it.
            sqlStatementBase64 = df[1, :sql_statement_base64]
            sqlStatement = String(base64decode(sqlStatementBase64))
            println("~~~ getting SQL statement from vectorDB ", @__FILE__, " ", @__LINE__)
            println(sqlStatement)
            return sqlStatement
        else
            return nothing
        end
    end
    return nothing
end
=#

"""
    listAllTableColumns(tablename::String)

Return the column names of `tablename` (schema `public`) as a vector of `Symbol`s, read from
`information_schema.columns`.
"""
function listAllTableColumns(tablename::String)
    sql = """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = \$1;
        """
    df = DataFrame(executeSQL(sql, [tablename]))
    return Symbol.(df[:, 1])
end

"""
    load_winetable(args::Dict)

Return every row of the `wine` table as a `DataFrame`, or `nothing` when the table is empty.
`args` is accepted for interface uniformity and is not read.
"""
function load_winetable(args::Dict)
    tablename = "wine"
    sql = "SELECT * FROM $tablename;"
    return _resultToDataFrame(executeSQL(sql))
end

"""
    insert_masterWineDB(args::Dict)

Insert a row into the `wine` table using the SQL produced by `GeneralUtils.generateInsertSQL`.
Returns the rows produced by the statement (via `GeneralUtils.dfToVectorDict`) or `nothing`
when the statement yields no rows.
"""
function insert_masterWineDB(args::Dict)
    tablename = "wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): SQL text is generated by an external helper; confirm it escapes values.
    sql = GeneralUtils.generateInsertSQL(tablename, columnToUpdate, args)
    println("insert_masterWineDB() SQL: $sql")
    return _resultToVectorDict(executeSQL(sql))
end

"""
    delete_masterWineDB(args::Dict)

Delete the row of the `wine` table whose `wine_id` equals `args[:wine_id]`.
Returns the rows produced by the statement or `nothing` when there are none (the statement
has no `RETURNING` clause).
"""
function delete_masterWineDB(args::Dict)
    tablename = "wine"
    sql = "DELETE FROM $tablename WHERE wine_id = \$1;"
    println("delete_masterWineDB() SQL: $sql")
    # the id comes from an MQTT message, so it is bound as a parameter, never interpolated
    return _resultToVectorDict(executeSQL(sql, [string(args[:wine_id])]))
end

"""
    edit_masterWineDB(args::Dict)

Update a row of the `wine` table, keyed by `:wine_id`, using the SQL produced by
`GeneralUtils.generateUpdateSQL`. Returns the rows produced by the statement or `nothing`.
"""
function edit_masterWineDB(args::Dict)
    tablename = "wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): SQL text is generated by an external helper; confirm it escapes values.
    sql = GeneralUtils.generateUpdateSQL(tablename, columnToUpdate, args, [:wine_id])
    println("")
    println("edit_masterWineDB() SQL: $sql")
    return _resultToVectorDict(executeSQL(sql))
end

"""
    search_masterWineTable(args::Dict)

Search the `wine` table.

- `args[:searchkeyword] == "*"` returns every row.
- Otherwise returns up to 100 rows whose `args[:searchcolumn]` matches the keyword with a
  case-insensitive substring match (`ILIKE '%keyword%'`).

Returns the rows via `GeneralUtils.dfToVectorDict`, or `nothing` when nothing matched.

# Throws
- `ArgumentError` when `args[:searchcolumn]` is not a column of the `wine` table.
"""
function search_masterWineTable(args::Dict)
    tablename = "wine"
    searchkeyword = string(args[:searchkeyword])

    if searchkeyword == "*"
        sql = "SELECT * FROM $tablename;"
        println("~~~ sql ", sql)
        return _resultToVectorDict(executeSQL(sql))
    end

    # An identifier cannot be bound as a parameter, so the requested column is checked
    # against the table's real columns and the name read from the database is used.
    requested = lowercase(strip(string(args[:searchcolumn])))
    columns = listAllTableColumns(tablename)
    idx = findfirst(c -> lowercase(String(c)) == requested, columns)
    if idx === nothing
        throw(ArgumentError("unknown search column: $(args[:searchcolumn])"))
    end
    columnname = replace(String(columns[idx]), "\"" => "\"\"")

    # The keyword is bound as a parameter, so keywords containing ' are searched as typed.
    sql = """
        SELECT * FROM $tablename
        WHERE "$columnname" ILIKE \$1
        LIMIT 100;
        """
    println("~~~ sql ", sql)
    return _resultToVectorDict(executeSQL(sql, ["%$(searchkeyword)%"]))
end

"""
    load_retailerWineInventory(args::Dict)

Return the wines stocked by retailer `args[:retailerid]` (join of `wine` and `retailer_wine`)
as a `DataFrame`. Returns `nothing` when no retailer id is given or the retailer has no wines.

NOTE(review): this function reads `:retailerid` while the delete/edit functions read
`:retailer_id`; kept as-is because callers depend on it.
"""
function load_retailerWineInventory(args::Dict)
    retailer_id = args[:retailerid]
    if length(retailer_id) == 0
        println("No retailer_id provided ", @__FILE__, " ", @__LINE__)
        return nothing
    end

    sql = """
        SELECT
            w.wine_id, w.winery, w.wine_name, w.vintage, w.grape, w.wine_type,
            w.region, w.country, w.created_time, w.updated_time,
            rw.price, rw.currency
        FROM wine w
        JOIN retailer_wine rw ON w.wine_id = rw.wine_id
        WHERE rw.retailer_id = \$1;
        """

    #=
    Alternative column list (disabled, kept for reference):
        SELECT
            w.wine_id, w.wine_name, w.winery, w.region, w.country, w.wine_type, w.grape,
            w.serving_temperature, w.intensity, w.sweetness, w.tannin, w.acidity,
            w.fizziness, w.tasting_notes, w.note, w.other_attributes, w.created_time,
            w.updated_time, w.description, rw.vintage, rw.price, rw.currency
        FROM wine w
        JOIN retailer_wine rw ON w.wine_id = rw.wine_id
        WHERE rw.retailer_id = '$retailer_id';
    =#

    return _resultToDataFrame(executeSQL(sql, [string(retailer_id)]))
end

"""
    insert_retailerWineInventory(args::Dict)

Insert a row into `retailer_wine` using the SQL produced by `GeneralUtils.generateInsertSQL`.
Returns the rows produced by the statement or `nothing`.
"""
function insert_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): SQL text is generated by an external helper; confirm it escapes values.
    sql = GeneralUtils.generateInsertSQL(tablename, columnToUpdate, args)
    println("~~ sql ", sql)
    return _resultToVectorDict(executeSQL(sql))
end

"""
    delete_retailerWineInventory(args::Dict)

Delete the `retailer_wine` row identified by `args[:retailer_id]` and `args[:wine_id]`.
Returns the rows produced by the statement or `nothing` when there are none (the statement
has no `RETURNING` clause).
"""
function delete_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    sql = "DELETE FROM $tablename WHERE retailer_id = \$1 AND wine_id = \$2;"
    println("~~ sql ", sql)
    params = [string(args[:retailer_id]), string(args[:wine_id])]
    return _resultToVectorDict(executeSQL(sql, params))
end

"""
    edit_retailerWineInventory(args::Dict)

Update a `retailer_wine` row, keyed by `:retailer_id` and `:wine_id`, using the SQL produced
by `GeneralUtils.generateUpdateSQL`. Returns the rows produced by the statement or `nothing`.
"""
function edit_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    columnToUpdate = listAllTableColumns(tablename)
    # NOTE(review): SQL text is generated by an external helper; confirm it escapes values.
    sql = GeneralUtils.generateUpdateSQL(tablename, columnToUpdate, args, [:retailer_id, :wine_id])
    println("~~ sql ", sql)
    return _resultToVectorDict(executeSQL(sql))
end

# ---------------------------------------------- 100 --------------------------------------------- #

# Calls answered with one reply message carrying `:result`. Add other DB call functions here.
const singleReplyHandlers = Dict{String,Function}(
    "search_masterWineTable" => search_masterWineTable,
    "insert_retailerWineInventory" => insert_retailerWineInventory,
    "delete_retailerWineInventory" => delete_retailerWineInventory,
    "edit_retailerWineInventory" => edit_retailerWineInventory,
    "insert_masterWineDB" => insert_masterWineDB,
    "delete_masterWineDB" => delete_masterWineDB,
    "edit_masterWineDB" => edit_masterWineDB,
)

# Calls whose data is handed to GeneralUtils.dataTransferOverMQTT_sender.
const dataTransferHandlers = Dict{String,Function}(
    "load_retailerWineInventory" => load_retailerWineInventory,
    "load_winetable" => load_winetable,
)

# Calls that are skipped unless both args[:retailer_id] and args[:wine_id] are non-empty.
const retailerWineKeyedCalls = ("delete_retailerWineInventory", "edit_retailerWineInventory")

# Build the reply payload (`:functioncall` plus every pair in `fields`) and publish it.
function _sendReply(msgMeta, functioncall, fields)
    payload = Dict{Symbol,Any}(:functioncall => functioncall)
    for (k, v) in fields
        payload[k] = v
    end
    outgoingMsg = Dict(:msgMeta => msgMeta, :payload => payload)
    _ = GeneralUtils.sendMqttMsg(outgoingMsg)
    return outgoingMsg
end

# Serve one step of a data-transfer exchange and publish the sender's reply fields.
function _replyWithDataTransfer(loader, workDict, incomingMsg, msgMeta)
    incomingPayload = incomingMsg[:payload]
    result = if !haskey(incomingPayload, :dataTransferSessionID)
        # no :dataTransferSessionID means the requester is asking for metadata: load the data
        # NOTE(review): `loader` returns `nothing` when there are no rows; confirm that
        # GeneralUtils.dfToVectorDict accepts `nothing`.
        df = loader(incomingPayload[:args])
        vd = GeneralUtils.dfToVectorDict(df)
        disvd = GeneralUtils.disintegrate_vectorDict(vd, 100)
        GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg; data=disvd)
    else
        GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg)
    end
    return _sendReply(msgMeta, incomingPayload[:functioncall], result)
end

# Route one incoming request to its DB function and publish the reply to the sender.
function _handleRequest(workDict, incomingMsg, config)
    incomingPayload = incomingMsg[:payload]
    functioncall = incomingPayload[:functioncall]

    # reply goes back to the sender, i.e. LINE
    msgMeta = GeneralUtils.generate_msgMeta(
        incomingMsg[:msgMeta][:replyTopic];
        senderName="wine_assistant_backend_db",
        senderId=GeneralUtils.uuid4snakecase(),
        replyToMsgId=incomingMsg[:msgMeta][:msgId],
        mqttBrokerAddress=config[:mqttServerInfo][:broker],
        mqttBrokerPort=config[:mqttServerInfo][:port],
    )

    if haskey(dataTransferHandlers, functioncall)
        println("$(functioncall)()")
        _replyWithDataTransfer(dataTransferHandlers[functioncall], workDict, incomingMsg, msgMeta)
    elseif haskey(singleReplyHandlers, functioncall)
        println("$(functioncall)()")
        args = incomingPayload[:args]
        if functioncall in retailerWineKeyedCalls
            retailer_id = args[:retailer_id]
            wine_id = args[:wine_id]
            if length(retailer_id) == 0 || length(wine_id) == 0
                println("Skipped call, insufficient args for $(functioncall)() retailer_id: $retailer_id wine_id: $wine_id")
                return nothing
            end
        end
        result = GeneralUtils.timeout(singleReplyHandlers[functioncall], 30; fargs=args)
        _sendReply(msgMeta, functioncall, (:result => result,))
    else
        println("")
        println("~~~ The requested function is not defined ", @__FILE__, " ", @__LINE__)
    end
    return nothing
end

"""
    runServiceInstance(receiveUserMsgChannel, outputchannel, config, timeout)

Serve one session: take requests from `receiveUserMsgChannel`, run the requested DB function
and publish the reply over MQTT.

# Arguments
- `receiveUserMsgChannel`: channel of incoming request messages for this session.
- `outputchannel`: receives `Dict(:exitreason=>"timeout", :timestamp=>...)` when the
  instance stops.
- `config`: service configuration; `config[:mqttServerInfo][:broker]` and `[:port]` are read.
- `timeout`: idle time in minutes after which the instance terminates itself.

A request that throws is logged and the loop keeps serving later requests.
"""
function runServiceInstance(
    receiveUserMsgChannel::Channel,
    outputchannel::Channel,
    config::Dict,
    timeout::Int64,
    )

    workDict = Dict()
    latestUserMsgTimeStamp::DateTime = Dates.now()

    while true
        # check for new user message
        if isready(receiveUserMsgChannel)
            incomingMsg = take!(receiveUserMsgChannel)
            latestUserMsgTimeStamp = Dates.now()

            println("")
            println("<-- incomingMsg ", @__FILE__, " ", @__LINE__)
            println(incomingMsg)

            try
                _handleRequest(workDict, incomingMsg, config)
            catch e
                e isa InterruptException && rethrow()
                # one bad request must not kill the session task
                @error "request handling failed" exception = (e, catch_backtrace())
            end
        end

        # self terminate if too long inactivity
        timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
        if timediff > timeout
            result = Dict(:exitreason=>"timeout", :timestamp=>Dates.now())
            put!(outputchannel, result)
            # This function holds no MQTT client of its own, so there is nothing to disconnect.
            break
        else
            sleep(1) # allowing on_msg_2, asyncmove above and other process to run
        end
    end
end

sessionDict = Dict{String,Any}()
mqttMsgReceiveChannel = (ch1=Channel(8),) # store msg that coming into servicetopic
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)

"""
    onMsgCallback_1(topic, payload)

MQTT receive callback. Parses `payload` as JSON and routes it by topic: topics containing
"db" go to `mqttMsgReceiveChannel[:ch1]`, topics containing "keepalive" go to
`keepaliveChannel`; anything else is reported and dropped.
"""
function onMsgCallback_1(topic, payload)
    jobj = JSON3.read(String(payload))
    incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively

    if occursin("db", topic)
        put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
    elseif occursin("keepalive", topic)
        put!(keepaliveChannel, incomingMqttMsg)
    else
        println("undefined condition ", @__FILE__, " ", @__LINE__)
    end
end

mqttInstance = GeneralUtils.mqttClientInstance_v2(
    config[:mqttServerInfo][:broker],
    config[:servicetopic][:mqtttopic],
    mqttMsgReceiveChannel,
    keepaliveChannel,
    onMsgCallback_1
)

println("ready!")

# ------------------------------------------------------------------------------------------------ #
#                                      this service main loop                                      #
# ------------------------------------------------------------------------------------------------ #

# Spawn a service instance for `sessionId` and register it in `sessionDict`.
function _startSession!(sessionDict, sessionId, config, sessiontimeout)
    inputch = Channel{Dict}(8)
    outputch = Channel{Dict}(8)
    process = @spawn runServiceInstance(inputch, outputch, config, sessiontimeout)
    println("~~ instantiate session success")

    # keep the channels and the task to be able to check on the session later
    sessionDict[sessionId] = Dict(
        :inputchannel=> inputch,
        :outputchannel=> outputch,
        :process=> process,
    )
    return sessionDict[sessionId]
end

# Remove every session that has reported a "timeout" exit on its output channel.
function _clearTimedOutSessions!(sessionDict)
    # iterate over a snapshot of the keys: deleting from a Dict while iterating it is unsafe
    for sessionId in collect(keys(sessionDict))
        v = sessionDict[sessionId]
        if isready(v[:outputchannel])
            result = take!(v[:outputchannel])
            if result[:exitreason] == "timeout"
                println("sessionId $(sessionId) has been deleted because it is timed out")
                delete!(sessionDict, sessionId)
            end
        end
    end
    return sessionDict
end

"""
    main()

Service main loop. Keeps the MQTT connection alive, creates one service instance per sender
id (1 backend instance per 1 frontend session), forwards incoming messages to the matching
instance and periodically removes instances that reported a timeout. Never returns.
"""
function main()
    sessiontimeout = 1*1*10 # timeout in minutes
    checkSessionTimeout = 10 # minutes
    clearedSessionTimestamp = Dates.now()
    lastMsgId = nothing

    while true
        # check if mqtt connection is still up
        _ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)

        # check for new message
        if isready(mqttMsgReceiveChannel[:ch1])
            msg = take!(mqttMsgReceiveChannel[:ch1])

            # use the frontend's sender id as session id, with "-" replaced by "_"
            sessionId = replace(msg[:msgMeta][:senderId], "-" => "_")
            msgId = msg[:msgMeta][:msgId]

            if haskey(sessionDict, sessionId)
                put!(sessionDict[sessionId][:inputchannel], msg)
            elseif msgId != lastMsgId
                lastMsgId = msgId
                session = _startSession!(sessionDict, sessionId, config, sessiontimeout)
                put!(session[:inputchannel], msg)
            else
                # repeated msgId for a session that does not exist: previously a KeyError
                println("~~ dropped duplicate msgId $(msgId) for unknown session $(sessionId)")
            end
        end

        # periodically delete sessions that terminated themselves after inactivity
        timediff = GeneralUtils.timedifference(clearedSessionTimestamp, Dates.now(), "minutes")
        if timediff > checkSessionTimeout
            _clearTimedOutSessions!(sessionDict)
            clearedSessionTimestamp = Dates.now()
        end

        # sleep is needed because MQTTClient use async. while true loop leave no
        # chance for control to switch to on_msg()
        sleep(1)
    end
end

main()