# Files
# wine-db-api/app/main backup.jl
# 2026-01-26 07:14:29 +07:00
#
# 842 lines
# 23 KiB
# Julia

using JSON3, MQTTClient, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames
using GeneralUtils
using Base.Threads
# ---------------------------------------------- 100 --------------------------------------------- #
""" Expected incoming MQTT message format for this service:
{
"msgMeta": {
"msgPurpose": "updateStatus",
"requestresponse": "request",
"timestamp": "2024-03-29T05:8:48.362",
"replyToMsgId": null,
"receiverId": null,
"getpost": "get",
"msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
"acknowledgestatus": null,
"sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
"receiverName": "agent-wine-backend",
"replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
"senderName": "agent-wine-frontend-chat",
"senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
},
"payload" : {
"text": "hello"
}
}
"""
# Load the service configuration from "config.json" (path as given to JSON3.read).
# `copy` turns the JSON3 object into a plain Julia dictionary; the code below reads
# config[:mqttServerInfo][:broker], config[:mqttServerInfo][:port] and
# config[:servicetopic][:mqtttopic] from it.
config = copy(JSON3.read("config.json"))
"""
    executeSQL(sql::AbstractString)

Open a fresh LibPQ connection to the wine database, run `sql` on it and return the
LibPQ result. The connection is always closed before returning, including when the
statement fails and `LibPQ.execute` throws.

# Arguments
- `sql`: a complete SQL statement. It is executed as-is, so callers are responsible
  for escaping any untrusted values they splice into it.
"""
function executeSQL(sql::AbstractString)
    # NOTE(review): host and credentials are hard-coded here; consider moving them to
    # config.json or environment variables so they are not kept in source control.
    DBconnection = LibPQ.Connection("host=192.168.88.12 port=10201 dbname=wineDB user=yiemtechnologies password=yiemtechnologies@Postgres_0.0")
    try
        return LibPQ.execute(DBconnection, sql)
    finally
        # Runs on success and on error, so a failing statement no longer leaks the
        # connection.
        LibPQ.close(DBconnection)
    end
end
# function executeSQLVectorDB(sql)
# DBconnection = LibPQ.Connection("host=192.168.88.12 port=5433 dbname=SQLVectorDB user=yiemtechnologies@gmail.com password=yiem@Postgres_0.0")
# result = LibPQ.execute(DBconnection, sql)
# LibPQ.close(DBconnection)
# return result
# end
# function addSQLVectorDB(state)
# # get embedding of the query
# query = [state[:thoughtHistory][:question]]
# msgMeta = GeneralUtils.generate_msgMeta(
# config[:externalservice][:text2textinstruct][:mqtttopic];
# msgPurpose= "embedding",
# senderName= "yiemagent",
# senderId= string(uuid4()),
# receiverName= "text2textinstruct",
# mqttBrokerAddress= config[:mqttServerInfo][:broker],
# mqttBrokerPort= config[:mqttServerInfo][:port],
# )
# outgoingMsg = Dict(
# :msgMeta=> msgMeta,
# :payload=> Dict(
# :text=> query
# )
# )
# response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
# embedding = response[:response][:embeddings][1]
# # check whether there is close enough vector already store in vectorDB. if no, add, else skip
# sql =
# """
# SELECT *, embedding <-> '$embedding' as distance
# FROM sql_statement_repository
# ORDER BY distance LIMIT 1;
# """
# response = executeSQLVectorDB(sql)
# df = DataFrame(response)
# row, col = size(df)
# distance = row == 0 ? Inf : df[1, :distance]
# if row == 0 || distance > 1 # no close enough SQL stored in the database
# latestKey, _ = GeneralUtils.findHighestIndexKey(state[:thoughtHistory], :action_input)
# _sqlStatement = state[:thoughtHistory][latestKey]
# if occursin("SELECT", _sqlStatement) # make sure it is an SQL statement before adding into DB
# sqlStatementBase64 = base64encode(_sqlStatement)
# sqlStatement = replace(_sqlStatement, "'"=>"")
# sql =
# """
# INSERT INTO sql_statement_repository (question, sql_statement, sql_statement_base64, embedding) VALUES ('$query', '$sqlStatement', '$sqlStatementBase64', '$embedding');
# """
# _ = executeSQLVectorDB(sql)
# println("~~ added new SQL statement to vectorDB ", @__FILE__, " ", @__LINE__)
# println(sqlStatement)
# end
# end
# end
# function querySQLVectorDB(state)
# # provide similarSQL at the first time thinking only
# latestKey, _ = GeneralUtils.findHighestIndexKey(state[:thoughtHistory], :action_input)
# if latestKey === nothing
# # get embedding of the query
# query = [state[:thoughtHistory][:question]]
# msgMeta = GeneralUtils.generate_msgMeta(
# config[:externalservice][:text2textinstruct][:mqtttopic];
# msgPurpose= "embedding",
# senderName= "yiemagent",
# senderId= string(uuid4()),
# receiverName= "text2textinstruct",
# mqttBrokerAddress= config[:mqttServerInfo][:broker],
# mqttBrokerPort= config[:mqttServerInfo][:port],
# )
# outgoingMsg = Dict(
# :msgMeta=> msgMeta,
# :payload=> Dict(
# :text=> query
# )
# )
# response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
# embedding = response[:response][:embeddings][1]
# # check whether there is close enough vector already store in vectorDB. if no, add, else skip
# sql =
# """
# SELECT *, embedding <-> '$embedding' as distance
# FROM sql_statement_repository
# ORDER BY distance LIMIT 1;
# """
# response = executeSQLVectorDB(sql)
# df = DataFrame(response)
# row, col = size(df)
# distance = row == 0 ? Inf : df[1, :distance]
# if row != 0 && distance < 100
# # if there is usable SQL, return it.
# sqlStatementBase64 = df[1, :sql_statement_base64]
# sqlStatement = String(base64decode(sqlStatementBase64))
# println("~~~ getting SQL statement from vectorDB ", @__FILE__, " ", @__LINE__)
# println(sqlStatement)
# return sqlStatement
# else
# return nothing
# end
# end
# return nothing
# end
"""
    listAllTableColumns(tablename::String)

Return the column names of table `tablename` in the `public` schema as a vector of
`Symbol`s, looked up in `information_schema.columns`.
"""
function listAllTableColumns(tablename::String)
    query = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'public' AND table_name = '$tablename';
    """
    columns = DataFrame(executeSQL(query))
    # The first column of the result is `column_name`.
    return Symbol.(columns[:, 1])
end
"""
    load_winetable(args::Dict)

Load every row of the `wine` table. Return the rows as a `DataFrame`, or `nothing`
when the table is empty. `args` is accepted for a uniform call signature but is not
read.
"""
function load_winetable(args::Dict)
    table = "wine"
    query = """
    SELECT *
    FROM $table;
    """
    wines = DataFrame(executeSQL(query))
    return size(wines, 1) == 0 ? nothing : wines
end
"""
    insert_masterWineDB(args::Dict)

Build an INSERT statement for the `wine` table from `args` with
`GeneralUtils.generateInsertSQL`, execute it, and return the rows of the result
converted by `GeneralUtils.dfToVectorDict`, or `nothing` when the result has no rows.
"""
function insert_masterWineDB(args::Dict)
    table = "wine"
    tableColumns = listAllTableColumns(table)
    sql = GeneralUtils.generateInsertSQL(table, tableColumns, args)
    println("insert_masterWineDB() SQL: $sql")
    returned = DataFrame(executeSQL(sql))
    if size(returned, 1) == 0
        return nothing
    end
    return GeneralUtils.dfToVectorDict(returned)
end
"""
    delete_masterWineDB(args::Dict)

Delete the row of the `wine` table whose `wine_id` equals `args[:wine_id]`.

Returns the rows of the statement result converted by `GeneralUtils.dfToVectorDict`,
or `nothing` when the result has no rows.
NOTE(review): the statement has no RETURNING clause, so the result presumably never
has rows and this always returns `nothing` — confirm whether callers rely on that.
"""
function delete_masterWineDB(args::Dict)
    tablename = "wine"
    # `args` originates from an incoming MQTT payload. Double any single quote so the
    # value cannot terminate the SQL string literal (standard PostgreSQL escaping,
    # valid with the default standard_conforming_strings = on).
    wine_id = replace(string(args[:wine_id]), "'" => "''")
    sql = """
    DELETE FROM $tablename
    WHERE wine_id = '$wine_id';
    """
    println("delete_masterWineDB() SQL: $sql")
    df = DataFrame(executeSQL(sql))
    size(df, 1) == 0 && return nothing
    return GeneralUtils.dfToVectorDict(df)
end
"""
    edit_masterWineDB(args::Dict)

Build an UPDATE statement for the `wine` table from `args` with
`GeneralUtils.generateUpdateSQL` (passing `[:wine_id]` as the key columns), execute
it, and return the rows of the result converted by `GeneralUtils.dfToVectorDict`, or
`nothing` when the result has no rows.
"""
function edit_masterWineDB(args::Dict)
    table = "wine"
    tableColumns = listAllTableColumns(table)
    sql = GeneralUtils.generateUpdateSQL(table, tableColumns, args, [:wine_id])
    println("")
    println("edit_masterWineDB() SQL: $sql")
    returned = DataFrame(executeSQL(sql))
    if size(returned, 1) == 0
        return nothing
    end
    return GeneralUtils.dfToVectorDict(returned)
end
"""
    search_masterWineTable(args::Dict)

Search the `wine` table.

# Arguments
- `args[:searchkeyword]`: text to look for. The keyword is split on `'` and only the
  longest piece is used, because a keyword containing `'` cannot be embedded in the
  quoted SQL literal. The keyword `*` selects every row.
- `args[:searchcolumn]`: name of the column to match with `ILIKE '%keyword%'`
  (at most 100 rows are returned). Must be a plain identifier (letters, digits,
  underscore); it is ignored when the keyword is `*`.

# Returns
The matching rows converted by `GeneralUtils.dfToVectorDict`, or `nothing` when there
are no rows or when the column name is rejected.
"""
function search_masterWineTable(args::Dict)
    tablename = "wine"
    _searchkeyword = args[:searchkeyword]
    columnname = string(args[:searchcolumn])

    # Keep only the longest quote-free piece of the keyword.
    pieces = split(_searchkeyword, "'")
    _, longestIndex, _ = GeneralUtils.findMax(length.(pieces))
    searchkeyword = pieces[longestIndex]

    sql =
        if searchkeyword == "*"
            """
            SELECT *
            FROM $tablename;
            """
        else
            # The column name is spliced into the statement as an identifier and it
            # comes from an MQTT payload, so accept nothing but a bare identifier.
            if !occursin(r"^[A-Za-z_][A-Za-z0-9_]*\z", columnname)
                println("Rejected invalid search column ", repr(columnname), " ",
                    @__FILE__, " ", @__LINE__)
                return nothing
            end
            """
            SELECT *
            FROM $tablename
            WHERE $columnname ILIKE '%$searchkeyword%' LIMIT 100;
            """
        end
    println("~~~ sql ", sql)
    df = DataFrame(executeSQL(sql))
    size(df, 1) == 0 && return nothing
    return GeneralUtils.dfToVectorDict(df)
end
"""
    load_retailerWineInventory(args::Dict)

Load the wines stocked by one retailer: rows of `wine` joined with `retailer_wine` on
`wine_id`, filtered by `retailer_wine.retailer_id`.

# Arguments
- `args[:retailerid]`: the retailer id (note the key has no underscore).

# Returns
A `DataFrame` with the selected wine columns plus `price` and `currency`, or
`nothing` when the id is empty or the retailer has no rows.
"""
function load_retailerWineInventory(args::Dict)
    retailer_id = args[:retailerid]
    if length(retailer_id) == 0
        println("No retailer_id provided ", @__FILE__, " ", @__LINE__)
        return nothing
    end
    # The id comes from an MQTT payload. Double any single quote so the value cannot
    # terminate the SQL string literal (standard PostgreSQL escaping, valid with the
    # default standard_conforming_strings = on).
    escaped_id = replace(string(retailer_id), "'" => "''")
    sql = """
    SELECT
    w.wine_id,
    w.winery,
    w.wine_name,
    w.vintage,
    w.grape,
    w.wine_type,
    w.region,
    w.country,
    w.created_time,
    w.updated_time,
    rw.price,
    rw.currency
    FROM
    wine w
    JOIN
    retailer_wine rw ON w.wine_id = rw.wine_id
    WHERE
    rw.retailer_id = '$escaped_id';
    """
    df = DataFrame(executeSQL(sql))
    return size(df, 1) == 0 ? nothing : df
end
"""
    insert_retailerWineInventory(args::Dict)

Build an INSERT statement for the `retailer_wine` table from `args` with
`GeneralUtils.generateInsertSQL`, execute it, and return the rows of the result
converted by `GeneralUtils.dfToVectorDict`, or `nothing` when the result has no rows.
"""
function insert_retailerWineInventory(args::Dict)
    table = "retailer_wine"
    tableColumns = listAllTableColumns(table)
    sql = GeneralUtils.generateInsertSQL(table, tableColumns, args)
    println("~~ sql ", sql)
    returned = DataFrame(executeSQL(sql))
    if size(returned, 1) == 0
        return nothing
    end
    return GeneralUtils.dfToVectorDict(returned)
end
"""
    delete_retailerWineInventory(args::Dict)

Delete the `retailer_wine` row identified by `args[:retailer_id]` and
`args[:wine_id]`.

Returns the rows of the statement result converted by `GeneralUtils.dfToVectorDict`,
or `nothing` when the result has no rows.
NOTE(review): the statement has no RETURNING clause, so the result presumably never
has rows and this always returns `nothing` — confirm whether callers rely on that.
"""
function delete_retailerWineInventory(args::Dict)
    tablename = "retailer_wine"
    # Both ids come from an MQTT payload. Double any single quote so a value cannot
    # terminate its SQL string literal (standard PostgreSQL escaping, valid with the
    # default standard_conforming_strings = on).
    retailer_id = replace(string(args[:retailer_id]), "'" => "''")
    wine_id = replace(string(args[:wine_id]), "'" => "''")
    sql = """
    DELETE FROM $tablename
    WHERE retailer_id = '$retailer_id' AND wine_id = '$wine_id';
    """
    println("~~ sql ", sql)
    df = DataFrame(executeSQL(sql))
    size(df, 1) == 0 && return nothing
    return GeneralUtils.dfToVectorDict(df)
end
"""
    edit_retailerWineInventory(args::Dict)

Build an UPDATE statement for the `retailer_wine` table from `args` with
`GeneralUtils.generateUpdateSQL` (passing `[:retailer_id, :wine_id]` as the key
columns), execute it, and return the rows of the result converted by
`GeneralUtils.dfToVectorDict`, or `nothing` when the result has no rows.
"""
function edit_retailerWineInventory(args::Dict)
    table = "retailer_wine"
    tableColumns = listAllTableColumns(table)
    keyColumns = [:retailer_id, :wine_id]
    sql = GeneralUtils.generateUpdateSQL(table, tableColumns, args, keyColumns)
    println("~~ sql ", sql)
    returned = DataFrame(executeSQL(sql))
    if size(returned, 1) == 0
        return nothing
    end
    return GeneralUtils.dfToVectorDict(returned)
end
# ---------------------------------------------- 100 --------------------------------------------- #
# Run the DB function `f` on the request's `:args` through GeneralUtils.timeout
# (called with the literal 30 — presumably seconds, confirm against GeneralUtils) and
# publish `{functioncall, result}` as the reply.
function _replyWithFunctionResult(f, incomingPayload, msgMeta)
    result = GeneralUtils.timeout(f, 30; fargs=incomingPayload[:args])
    outgoingMsg = Dict(
        :msgMeta => msgMeta,
        :payload => Dict(
            :functioncall => incomingPayload[:functioncall],
            :result => result,
        ),
    )
    _ = GeneralUtils.sendMqttMsg(outgoingMsg)
    return nothing
end

# Serve one step of a chunked table transfer. A request without
# :dataTransferSessionID starts a transfer: the table is loaded with `loader`, cut
# into pieces with disintegrate_vectorDict(vd, 100) and handed to
# dataTransferOverMQTT_sender. A request that carries the key continues the transfer
# kept in `workDict`. Every key/value the sender returns is copied into the reply.
function _replyWithDataTransfer(loader, workDict, incomingMsg, msgMeta)
    incomingPayload = incomingMsg[:payload]
    transfer =
        if !haskey(incomingPayload, :dataTransferSessionID)
            # NOTE(review): `loader` returns `nothing` for an empty table; whether
            # dfToVectorDict accepts `nothing` is not visible here — confirm.
            df = loader(incomingPayload[:args])
            vd = GeneralUtils.dfToVectorDict(df)
            disvd = GeneralUtils.disintegrate_vectorDict(vd, 100)
            GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg; data=disvd)
        else
            GeneralUtils.dataTransferOverMQTT_sender(workDict, incomingMsg)
        end
    payload = Dict{Symbol, Any}(:functioncall => incomingPayload[:functioncall])
    for (k, v) in transfer
        payload[k] = v
    end
    outgoingMsg = Dict(:msgMeta => msgMeta, :payload => payload)
    _ = GeneralUtils.sendMqttMsg(outgoingMsg)
    return nothing
end

# Dispatch one incoming request to the DB function named in
# incomingMsg[:payload][:functioncall] and send the reply to the request's replyTopic.
function _handleServiceRequest(workDict, incomingMsg, config)
    incomingPayload = incomingMsg[:payload]
    functioncall = incomingPayload[:functioncall]

    # metadata of the reply that goes back to the sender
    msgMeta = GeneralUtils.generate_msgMeta(
        incomingMsg[:msgMeta][:replyTopic];
        senderName = "wine_assistant_backend_db",
        senderId = GeneralUtils.uuid4snakecase(),
        replyToMsgId = incomingMsg[:msgMeta][:msgId],
        mqttBrokerAddress = config[:mqttServerInfo][:broker],
        mqttBrokerPort = config[:mqttServerInfo][:port],
    )

    # add other DB call function here
    if functioncall == "search_masterWineTable"
        println("search_masterWineTable()")
        _replyWithFunctionResult(search_masterWineTable, incomingPayload, msgMeta)
    elseif functioncall == "load_retailerWineInventory"
        println("load_retailerWineInventory()")
        _replyWithDataTransfer(load_retailerWineInventory, workDict, incomingMsg, msgMeta)
    elseif functioncall == "insert_retailerWineInventory"
        println("insert_retailerWineInventory()")
        _replyWithFunctionResult(insert_retailerWineInventory, incomingPayload, msgMeta)
    elseif functioncall == "delete_retailerWineInventory" ||
           functioncall == "edit_retailerWineInventory"
        println(functioncall, "()")
        retailer_id = incomingPayload[:args][:retailer_id]
        wine_id = incomingPayload[:args][:wine_id]
        # Both keys are required; without them the statement would not target a
        # single row, so the call is skipped (no reply is sent in that case).
        if length(retailer_id) != 0 && length(wine_id) != 0
            handler = functioncall == "delete_retailerWineInventory" ?
                delete_retailerWineInventory : edit_retailerWineInventory
            _replyWithFunctionResult(handler, incomingPayload, msgMeta)
        else
            println("Skipped call, insufficient args for $(functioncall)() retailer_id: $retailer_id wine_id: $wine_id")
        end
    elseif functioncall == "load_winetable"
        println("load_winetable()")
        _replyWithDataTransfer(load_winetable, workDict, incomingMsg, msgMeta)
    elseif functioncall == "insert_masterWineDB"
        println("insert_masterWineDB()")
        _replyWithFunctionResult(insert_masterWineDB, incomingPayload, msgMeta)
    elseif functioncall == "delete_masterWineDB"
        println("delete_masterWineDB()")
        _replyWithFunctionResult(delete_masterWineDB, incomingPayload, msgMeta)
    elseif functioncall == "edit_masterWineDB"
        println("edit_masterWineDB()")
        _replyWithFunctionResult(edit_masterWineDB, incomingPayload, msgMeta)
    else
        println("")
        println("~~~ The requested function is not defined ", @__FILE__, " ", @__LINE__)
    end
    return nothing
end

"""
    runServiceInstance(receiveUserMsgChannel, outputchannel, config, timeout)

Serve the requests of one session until it has been idle for too long.

The loop polls `receiveUserMsgChannel` about once per second. Each message is
dispatched on `msg[:payload][:functioncall]` to the matching DB function and the
reply is published to `msg[:msgMeta][:replyTopic]`. A failure while handling one
message is logged and the loop keeps serving the session.

# Arguments
- `receiveUserMsgChannel`: channel the main loop puts this session's messages into.
- `outputchannel`: receives `Dict(:exitreason => "timeout", :timestamp => now)` just
  before the function returns.
- `config`: service configuration; `config[:mqttServerInfo][:broker]` and `[:port]`
  are used for the reply metadata.
- `timeout`: idle time, in minutes, after which the instance stops itself.

# Returns
`nothing`, once the idle timeout is exceeded.
"""
function runServiceInstance(
    receiveUserMsgChannel::Channel,
    outputchannel::Channel,
    config::Dict,
    timeout::Int64,
)
    workDict = Dict()   # state of ongoing chunked transfers, owned by GeneralUtils
    latestUserMsgTimeStamp::DateTime = Dates.now()
    while true
        # check for new user message
        if isready(receiveUserMsgChannel)
            incomingMsg = take!(receiveUserMsgChannel)
            latestUserMsgTimeStamp = Dates.now()
            println("")
            println("<-- incomingMsg ", @__FILE__, " ", @__LINE__)
            println(incomingMsg)
            try
                _handleServiceRequest(workDict, incomingMsg, config)
            catch err
                err isa InterruptException && rethrow()
                # This function runs as a spawned task that nobody fetches: an
                # uncaught error would end the task silently while the main loop
                # keeps queueing messages for it. Log and keep the session alive.
                @error "runServiceInstance could not handle a request" exception =
                    (err, catch_backtrace())
            end
        end

        # self terminate if too long inactivity
        timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
        if timediff > timeout
            put!(outputchannel, Dict(:exitreason => "timeout", :timestamp => Dates.now()))
            # The previous `disconnect(a.mqttClient)` referenced an undefined `a` and
            # raised UndefVarError here. This instance owns no MQTT client of its own,
            # so there is nothing to disconnect.
            break
        else
            sleep(1) # let the MQTT callback and other tasks run
        end
    end
    return nothing
end
# Live sessions, keyed by session id. main() stores one Dict per session with the
# keys :inputchannel, :outputchannel and :process.
sessionDict = Dict{String,Any}()
# ch1 receives every incoming message whose topic contains "db" (see onMsgCallback_1).
mqttMsgReceiveChannel = (ch1=Channel(8),) # store msg that coming into servicetopic
# Receives every incoming message whose topic contains "keepalive".
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)
# MQTT receive callback. Decodes the JSON payload into a Julia dictionary and routes
# it by topic: topics containing "db" go to mqttMsgReceiveChannel[:ch1], topics
# containing "keepalive" go to keepaliveChannel, anything else is only reported.
function onMsgCallback_1(topic, payload)
    # copy() converts the JSON object into a Julia dictionary recursively
    incomingMqttMsg = copy(JSON3.read(String(payload)))
    occursin("db", topic) && return put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
    occursin("keepalive", topic) && return put!(keepaliveChannel, incomingMqttMsg)
    return println("undefined condition ", @__FILE__, " ", @__LINE__)
end
# Create the service's MQTT client from the broker address and service topic in
# `config`, handing it the two receive channels and the message callback.
# NOTE(review): presumably this also connects and subscribes — confirm against
# GeneralUtils.mqttClientInstance_v2. main() passes this instance to
# GeneralUtils.checkMqttConnection! on every loop iteration.
mqttInstance = GeneralUtils.mqttClientInstance_v2(
config[:mqttServerInfo][:broker],
config[:servicetopic][:mqtttopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback_1
)
println("ready!")
# ------------------------------------------------------------------------------------------------ #
# this service main loop #
# ------------------------------------------------------------------------------------------------ #
"""
    main()

Main loop of the service; it never returns.

About once per second it
1. checks the MQTT connection with `GeneralUtils.checkMqttConnection!`,
2. takes at most one message from `mqttMsgReceiveChannel[:ch1]` and routes it to the
   session of its sender, spawning a new `runServiceInstance` task when the sender has
   no session yet,
3. every `checkSessionTimeout` minutes, removes the sessions whose task reported
   `:exitreason == "timeout"` on its output channel.
"""
function main()
    sessiontimeout = 1*1*10 # idle timeout of a session, in minutes
    checkSessionTimeout = 10 # interval between cleanup passes, in minutes
    clearedSessionTimestamp = Dates.now()
    lastMsgId = nothing
    while true
        # check if mqtt connection is still up
        _ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)

        # check for new message
        if isready(mqttMsgReceiveChannel[:ch1])
            msg = take!(mqttMsgReceiveChannel[:ch1])
            # one backend session per frontend sender; the sender id (with "-"
            # replaced by "_") is the session key
            sessionId = replace(msg[:msgMeta][:senderId], "-" => "_")
            msgId = msg[:msgMeta][:msgId]

            if haskey(sessionDict, sessionId)
                put!(sessionDict[sessionId][:inputchannel], msg)
            elseif msgId != lastMsgId
                lastMsgId = msgId
                inputch = Channel{Dict}(8)
                outputch = Channel{Dict}(8)
                process = @spawn runServiceInstance(inputch, outputch, config, sessiontimeout)
                println("~~ instantiate session success")
                # keep the channels and the task so the session can be checked later
                sessionDict[sessionId] = Dict(
                    :inputchannel => inputch,
                    :outputchannel => outputch,
                    :process => process,
                )
                put!(inputch, msg)
            else
                # Same msgId as the message that last opened a session, but that
                # session no longer exists. Previously this path indexed sessionDict
                # with a missing key and raised KeyError; drop the duplicate instead.
                println("Dropped duplicate msgId $(msgId) for unknown sessionId $(sessionId)")
            end
        end

        # remove the sessions that reported a timeout
        timediff = GeneralUtils.timedifference(clearedSessionTimestamp, Dates.now(), "minutes")
        if timediff > checkSessionTimeout
            # iterate over a snapshot of the keys: entries are deleted inside the loop
            for sessionId in collect(keys(sessionDict))
                outputch = sessionDict[sessionId][:outputchannel]
                if isready(outputch)
                    result = take!(outputch)
                    if result[:exitreason] == "timeout"
                        println("sessionId $(sessionId) has been deleted because it is timed out")
                        delete!(sessionDict, sessionId)
                    end
                end
            end
            clearedSessionTimestamp = Dates.now()
        end

        # sleep is needed because MQTTClient is asynchronous: a tight `while true`
        # loop would leave no chance for control to switch to the message callback
        sleep(1)
    end
end
main()