using JSON, JSON3, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames, DataStructures
using YiemAgent, GeneralUtils
using Base.Threads

# ---------------------------------------------- 100 --------------------------------------------- #

#=
Expected incoming MQTT message format for this service:

{
    "msgMeta": {
        "msgPurpose": "updateStatus",
        "requestresponse": "request",
        "timestamp": "2024-03-29T05:8:48.362",
        "replyToMsgId": null,
        "receiverId": null,
        "getpost": "get",
        "msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
        "acknowledgestatus": null,
        "sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
        "receiverName": "agent-wine-backend",
        "replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
        "senderName": "agent-wine-frontend-chat",
        "senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
    },
    "payload": {
        "text": "hello"
    }
}

The user text may also arrive under the payload key "txt"; it is copied to "text" before use.
=#

# load config (read the file explicitly instead of relying on JSON3.read's path detection)
config = copy(JSON3.read(read("../mountvolume/config/config.json", String)))

"""
    primaryServiceTopic(config)

Return the MQTT topic this service publishes its own control messages to ("initialize",
"delete session"). `config[:servicetopic][:mqtttopic]` may be a single topic or a list of
topics; for a list the first entry is used.
"""
function primaryServiceTopic(config)
    topic = config[:servicetopic][:mqtttopic]
    return topic isa AbstractVector ? first(topic) : topic
end

"""
    runAgentInstance(receiveUserMsgChannel, outputchannel, sessionId, config, timeout)

Run one sommelier chat session until the conversation ends or the session has been idle for
`timeout` minutes.

A `YiemAgent.sommelier` agent is created with access to the external services configured in
`config[:externalservice]` (wine database, SQL vector database, LLM load balancer over MQTT).
Every message taken from `receiveUserMsgChannel` is answered with `YiemAgent.conversation`, and
the answer is published over MQTT to the message's `replyTopic`.

Arguments
-----
    receiveUserMsgChannel::Channel
        incoming messages for this session, in the format described at the top of this file
    outputchannel::Channel
        receives `Dict(:exitreason => "timeout", :timestamp => ...)` when the session times out
    sessionId::String
        session ID; also used as the agent instance id
    config::Dict
        service configuration loaded from config.json
    timeout::Int64
        inactivity timeout in minutes. When it is reached, a "delete session" message is sent
        to the service topic and the function returns.

Return
-----
    `nothing`. Start it with `Threads.@spawn` to obtain a `Task` representing the agent.

Behaviour
-----
    - A message whose text contains "newtopic" or "new topic" (first letter in either case)
      replaces the agent with a fresh one and replies "Okay. What shall we talk about?".
    - A message whose `msgMeta.msgPurpose` is "initialize" is passed to the agent, but no reply
      is published.
    - A message without a `:text` or `:txt` payload key is logged and skipped.
    - When the agent's latest event is a thought whose action is "ENDCONVERSATION", its decision
      log is written to `/appfolder/app/log/` and the function returns.

Example
-----
```julia
julia> inputch, outputch = Channel{Dict}(8), Channel{Dict}(8)
julia> task = Threads.@spawn runAgentInstance(inputch, outputch, "session_1", config, 60)
julia> put!(inputch, incomingMsg)  # a message in the format described at the top of this file
```

Notes
-----
    NOTE(review): the LLM backend must be configured with a context length large enough for the
    `num_ctx` requested by `text2textInstructLLM` (an earlier TODO mentions setting
    LAMA_CONTEXT_LENGTH=40960 because the backend default of 2048 is too small). Confirm against
    the deployment.
"""
function runAgentInstance(
    receiveUserMsgChannel::Channel,
    outputchannel::Channel,
    sessionId::String,
    config::Dict,
    timeout::Int64,
)
    # Connect to the PostgreSQL database described by `dbconf` (keys :host, :port, :dbname,
    # :user, :password), run `sql`, and close the connection again, also when the query throws.
    function queryPostgres(dbconf, sql)
        host = dbconf[:host]
        port = dbconf[:port]
        dbname = dbconf[:dbname]
        user = dbconf[:user]
        password = dbconf[:password]
        DBconnection = LibPQ.Connection("host=$host port=$port dbname=$dbname user=$user password=$password")
        try
            return LibPQ.execute(DBconnection, sql)
        finally
            close(DBconnection)
        end
    end

    # Run `sql` against the wine database (handed to the agent as `executeSQL`).
    function executeSQL(sql::T) where {T<:AbstractString}
        return queryPostgres(config[:externalservice][:wineDB], sql)
    end

    # Run `sql` against the vector database holding cached SQL and sommelier decisions.
    function executeSQLVectorDB(sql)
        return queryPostgres(config[:externalservice][:SQLVectorDB], sql)
    end

    # Send `prompt` to the LLM load balancer over MQTT and return the generated text, or
    # `nothing` when all `maxattempt` attempts fail.
    # NOTE(review): `maxattempt` is also forwarded to `sendReceiveMqttMsg`, so up to
    # maxattempt^2 MQTT round trips can happen. Confirm this is intended.
    function text2textInstructLLM(prompt::String; maxattempt::Integer=3, modelsize::String="medium",
        senderId=GeneralUtils.uuid4snakecase(), timeout=180,
        llmkwargs=Dict(
            :num_ctx => 32768,
            :temperature => 0.5,
        ))

        msgMeta = GeneralUtils.generate_msgMeta(
            config[:externalservice][:loadbalancer][:mqtttopic];
            msgPurpose="inference",
            senderName="yiemagent",
            senderId=senderId,
            receiverName="text2textinstruct_$modelsize",
            mqttBrokerAddress=config[:mqttServerInfo][:broker],
            mqttBrokerPort=config[:mqttServerInfo][:port],
        )

        outgoingMsg = Dict(
            :msgMeta => msgMeta,
            :payload => Dict(
                :text => prompt,
                :kwargs => llmkwargs,
            ),
        )

        for attempts in 1:maxattempt
            _response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg; timeout=timeout, maxattempt=maxattempt)
            # Only inspect the payload of a successful reply. A reply without a usable
            # :response/:text counts as a failed attempt instead of throwing.
            succeeded = get(_response, :success, false) === true
            text = succeeded ? get(_response[:response], :text, nothing) : nothing
            text !== nothing && return text

            println("\n attempt $attempts/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            pprintln(outgoingMsg)
            println(" attempt $attempts/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
            attempts < maxattempt && sleep(3)  # no point in waiting after the final attempt
        end
        return nothing
    end

    # Get text embeddings from the LLM service. The service receives `[text]` and the value of
    # its :embeddings field is returned; callers use the first element.
    function getEmbedding(text::T) where {T<:AbstractString}
        msgMeta = GeneralUtils.generate_msgMeta(
            config[:externalservice][:loadbalancer][:mqtttopic];
            msgPurpose="embedding",
            senderName="yiemagent",
            senderId=sessionId,
            receiverName="textembedding",
            mqttBrokerAddress=config[:mqttServerInfo][:broker],
            mqttBrokerPort=config[:mqttServerInfo][:port],
        )

        outgoingMsg = Dict(
            :msgMeta => msgMeta,
            :payload => Dict(
                :text => [text], # must be a vector of string
            ),
        )

        response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg; timeout=120, maxattempt=3)
        embedding = response[:response][:embeddings]
        return embedding
    end

    # Return (as a DataFrame) the `limit` rows of `tablename` closest to `text`. Closeness uses
    # the SQL `<->` operator on `embeddingColumnName` (presumably pgvector's distance). The
    # result has an extra `distance` column and is ordered by increasing distance. Pass
    # `embedding` to reuse an embedding already computed for `text`.
    function findSimilarTextFromVectorDB(text::T1, tablename::T2, embeddingColumnName::T3,
        vectorDB::Function; limit::Integer=1, embedding=getEmbedding(text)[1],
    )::DataFrame where {T1<:AbstractString, T2<:AbstractString, T3<:AbstractString}

        # check whether a close enough vector is already stored in the vector DB
        sql = """
        SELECT *, $embeddingColumnName <-> '$embedding' as distance
        FROM $tablename
        ORDER BY distance
        LIMIT $limit;
        """
        response = vectorDB(sql)
        df = DataFrame(response)
        return df
    end

    # Distance of the closest row returned by findSimilarTextFromVectorDB (Inf when there is none).
    closestDistance(df::DataFrame) = nrow(df) == 0 ? Inf : df[1, :distance]

    # Look up previously generated SQL for a similar query. Returns `(dict=sql_string,
    # distance=d)` when the closest stored query is nearer than `maxdistance`, otherwise
    # `(dict=nothing, distance=nothing)`.
    function similarSQLVectorDB(query; maxdistance::Integer=100)
        tablename = "sqlllm_decision_repository"
        df = findSimilarTextFromVectorDB(query, tablename, "function_input_embedding", executeSQLVectorDB)
        distance = closestDistance(df)

        if distance < maxdistance
            # there is usable SQL: pick the closest match
            output_str = String(base64decode(df[1, :function_output_base64]))
            rowid = df[1, :id]
            println("\n~~~ found similar sql. row id $rowid, distance $distance ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            return (dict=output_str, distance=distance)
        else
            println("\n~~~ similar sql not found, max distance $maxdistance ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            return (dict=nothing, distance=nothing)
        end
    end

    # Store `SQL` as the answer for `query`, unless a stored query within `maxdistance` exists.
    # Returns the insert's result, or `nothing` when nothing was inserted.
    function insertSQLVectorDB(query::T1, SQL::T2; maxdistance::Integer=3) where {T1<:AbstractString, T2<:AbstractString}
        tablename = "sqlllm_decision_repository"
        query_embedding = getEmbedding(query)[1]
        df = findSimilarTextFromVectorDB(query, tablename, "function_input_embedding",
            executeSQLVectorDB; embedding=query_embedding)

        if closestDistance(df) > maxdistance  # no close enough SQL stored in the database
            # Single quotes are stripped so the values fit in the SQL string literals below.
            # The exact SQL text is kept in the base64 column.
            query_ = replace(query, "'" => "")
            sql_base64 = base64encode(SQL)
            sql_ = replace(SQL, "'" => "")
            sql = """
            INSERT INTO $tablename (function_input, function_output, function_output_base64, function_input_embedding)
            VALUES ('$query_', '$sql_', '$sql_base64', '$query_embedding');
            """
            return executeSQLVectorDB(sql)
        end
        return nothing
    end

    # Look up a cached sommelier decision for similar `recentevents`. Returns the decoded
    # decision dict when one lies within `maxdistance`, otherwise `nothing`.
    function similarSommelierDecision(recentevents::T1; maxdistance::Integer=3
        )::Union{AbstractDict, Nothing} where {T1<:AbstractString}
        tablename = "sommelier_decision_repository"
        println("\n~~~ search vectorDB for this: $recentevents ", @__FILE__, " ", @__LINE__)
        df = findSimilarTextFromVectorDB(recentevents, tablename, "function_input_embedding", executeSQLVectorDB)
        distance = closestDistance(df)

        if distance < maxdistance
            # there is a usable decision: pick the closest match
            rowid = df[1, :id]
            println("\n~~~ found similar decision. row id $rowid, distance $distance ", @__FILE__, " ", @__LINE__)
            _output_str = String(base64decode(df[1, :function_output_base64]))
            return copy(JSON3.read(_output_str))
        else
            println("\n~~~ similar decision not found, max distance $maxdistance ", @__FILE__, " ", @__LINE__)
            return nothing
        end
    end

    # Cache `decision` (stored as JSON) for `recentevents`, unless a similar entry within
    # `maxdistance` exists. Returns the insert's result, or `nothing` when nothing was inserted.
    function insertSommelierDecision(recentevents::T1, decision::T2; maxdistance::Integer=5
        ) where {T1<:AbstractString, T2<:AbstractDict}
        tablename = "sommelier_decision_repository"
        recentevents_embedding = getEmbedding(recentevents)[1]
        df = findSimilarTextFromVectorDB(recentevents, tablename, "function_input_embedding",
            executeSQLVectorDB; embedding=recentevents_embedding)
        distance = closestDistance(df)

        if distance > maxdistance  # no close enough decision stored in the database
            # quotes are stripped for the SQL string literals; the exact JSON is kept in base64
            recentevents_ = replace(recentevents, "'" => "")
            decision_json = JSON3.write(decision)
            decision_base64 = base64encode(decision_json)
            decision_ = replace(decision_json, "'" => "")
            sql = """
            INSERT INTO $tablename (function_input, function_output, function_output_base64, function_input_embedding)
            VALUES ('$recentevents_', '$decision_', '$decision_base64', '$recentevents_embedding');
            """
            println("\n~~~ added new decision to vectorDB ", @__FILE__, " ", @__LINE__)
            println(sql)
            return executeSQLVectorDB(sql)
        else
            println("~~~ similar decision previously cached, distance $distance ", @__FILE__, " ", @__LINE__)
            return nothing
        end
    end

    latestUserMsgTimeStamp::DateTime = Dates.now()

    externalFunction = (
        getEmbedding=getEmbedding,
        text2textInstructLLM=text2textInstructLLM,
        executeSQL=executeSQL,
        similarSQLVectorDB=similarSQLVectorDB,
        insertSQLVectorDB=insertSQLVectorDB,
        similarSommelierDecision=similarSommelierDecision,
        insertSommelierDecision=insertSommelierDecision,
    )

    # Build a sommelier agent. Used at start-up and on a "new topic" reset, so that both agents
    # get the same settings (including the LLM prompt format).
    newSommelier(name) = YiemAgent.sommelier(
        externalFunction;
        name=name,
        id=sessionId, # agent instance id
        retailername="Yiem",
        llmFormatName="qwen3",
    )

    # Publish `text` as an MQTT reply to `incomingMsg` on its replyTopic. `alias` is shown in the
    # frontend as the agent name. Returns the message that was sent.
    function sendReply(incomingMsg, alias, text; senderId)
        msgMeta = GeneralUtils.generate_msgMeta(
            incomingMsg[:msgMeta][:replyTopic];
            senderName="wine_assistant_backend",
            senderId=senderId,
            replyToMsgId=incomingMsg[:msgMeta][:msgId],
            mqttBrokerAddress=config[:mqttServerInfo][:broker],
            mqttBrokerPort=config[:mqttServerInfo][:port],
        )
        outgoingMsg = Dict(
            :msgMeta => msgMeta,
            :payload => Dict(
                :alias => alias, # will be shown in frontend as agent name
                :text => text,
            ),
        )
        _ = GeneralUtils.sendMqttMsg(outgoingMsg)
        return outgoingMsg
    end

    agent = newSommelier("Jane")

    # user chat loop
    while true
        # check for new user message
        if isready(receiveUserMsgChannel)
            incomingMsg = take!(receiveUserMsgChannel)
            incoming_msgMeta = incomingMsg[:msgMeta]
            incomingPayload = incomingMsg[:payload]
            latestUserMsgTimeStamp = Dates.now()

            # YiemAgent reads the user input from :text; some senders use :txt instead
            if !haskey(incomingPayload, :text)
                if haskey(incomingPayload, :txt)
                    incomingPayload[:text] = incomingPayload[:txt]
                else
                    # Throwing here would kill this session's task while the main loop keeps
                    # routing messages to it, so the malformed message is skipped instead.
                    @warn "no :txt or :text key in the message; message skipped" incomingMsg
                    continue
                end
            end

            # matches "newtopic", "Newtopic", "new topic" and "New topic"
            if occursin(r"[Nn]ew ?topic", incomingPayload[:text])
                # reset agent
                agent = newSommelier("Janie")

                # sending msg back to sender i.e. LINE
                outgoingMsg = sendReply(incomingMsg, agent.name, "Okay. What shall we talk about?"; senderId=sessionId)
                println("--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                pprintln(outgoingMsg)
            else
                isInitMsg = incoming_msgMeta[:msgPurpose] == "initialize"
                if isInitMsg
                    println("\n-- Initializing... ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                end

                # send prompt (the whole payload is passed as the user input)
                result = YiemAgent.conversation(agent; userinput=incomingPayload, maximumMsg=50)
                # Ken's bot uses [br] for the newline character '\n':
                # result = replace(result, '\n' => "[br]")

                if isInitMsg
                    # the initialize message gets no reply
                    println("\n-- Initialized. Ready! waiting for request at:\n$(config[:servicetopic][:mqtttopic]) ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                    continue
                end

                outgoingMsg = sendReply(incomingMsg, agent.name, result; senderId=string(uuid4()))
                println("\n--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                pprintln(outgoingMsg)
                # TODO: image input (payload "img" given as an http URL or as base64) is not handled yet.
            end
        end

        # Stop once the agent decided to end the conversation.
        # The isempty guard is there because a freshly created agent may have no events yet.
        events = agent.memory[:events]
        if !isempty(events) && haskey(events[end], :thought) &&
           events[end][:thought][:action_name] == "ENDCONVERSATION"
            # save agent.memory[:shortmem][:decisionlog] to disk using JSON3
            println("\nsaving agent.memory[:shortmem][:decisionlog] to disk")
            filename = "agent_decision_log_$(Dates.now())_$(agent.id).json"
            filepath = "/appfolder/app/log/$filename"
            open(filepath, "w") do io
                JSON3.pretty(io, agent.memory[:shortmem][:decisionlog])
            end

            # [TESTING] caching of sommelier decisions in the vector DB (currently disabled).
            # For each assistant event, the recap of up to 3 preceding turns is the caching key
            # and that event's thoughtDict is the cached value:
            #
            # for (i, event) in enumerate(agent.memory[:events])
            #     if event[:subject] == "assistant"
            #         all_recapkeys_vec = [r for r in keys(agent.memory[:recap])]
            #         # 1st to 2nd-to-last event (exclude the latest, the assistant's response)
            #         _recapkeys_vec = all_recapkeys_vec[1:i-1]
            #         # only the previous 3 recaps (1st message is a user's hello msg)
            #         recapkeys_vec = length(_recapkeys_vec) <= 3 ? _recapkeys_vec : _recapkeys_vec[end-2:end]
            #         # [PENDING] if there is specific data such as number, do not store in database
            #         tempmem = DataStructures.OrderedDict()
            #         for k in recapkeys_vec
            #             tempmem[k] = agent.memory[:recap][k]
            #         end
            #         recap = GeneralUtils.dictToString_noKey(tempmem)
            #         thoughtDict = agent.memory[:events][i][:thought] # latest assistant thoughtDict
            #         insertSommelierDecision(recap, thoughtDict)
            #     end
            # end
            println("\nCaching conversation process done")
            break
        end

        # self terminate if too long inactivity
        timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
        if timediff > timeout
            result = Dict(:exitreason => "timeout", :timestamp => Dates.now())
            put!(outputchannel, result)
            println("Agent ID $(agent.id) timeout has been reached $timediff/$timeout minutes Send delete session msg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")

            # send "delete session" message to inform the main loop that this session can be deleted
            msgMeta = GeneralUtils.generate_msgMeta(
                primaryServiceTopic(config);
                senderName="session",
                senderId=sessionId,
                msgPurpose="delete session",
                mqttBrokerAddress=config[:mqttServerInfo][:broker],
                mqttBrokerPort=config[:mqttServerInfo][:port],
            )
            outgoingMsg = Dict(
                :msgMeta => msgMeta,
                :payload => nothing,
            )
            _ = GeneralUtils.sendMqttMsg(outgoingMsg)

            try
                disconnect(agent.mqttClient)
            catch
                # best effort: any error while disconnecting is deliberately ignored
            end
            break
        end

        sleep(1) # allowing on_msg_2, asyncmove above and other process to run
    end
end

sessionDict = Dict{String,Any}()  # sessionId => Dict(:inputchannel, :outputchannel, :process)
incomingMsgChannel = (ch1=Channel(8),) # store msg that coming into servicetopic
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)

# Callback for received MQTT messages: keepalive messages go to keepaliveChannel, everything
# else to incomingMsgChannel[:ch1].
function onMsgCallback_1(topic, payload)
    jobj = JSON3.read(String(payload))
    incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively

    if occursin("keepalive", topic)
        put!(keepaliveChannel, incomingMqttMsg)
    else
        put!(incomingMsgChannel[:ch1], incomingMqttMsg)
    end
end

mqttInstance = GeneralUtils.mqttClientInstance_v2(
    config[:mqttServerInfo][:broker],
    config[:servicetopic][:mqtttopic],
    incomingMsgChannel,
    keepaliveChannel,
    onMsgCallback_1,
)

# ------------------------------------------------------------------------------------------------ #
#                                     this service main loop                                       #
# ------------------------------------------------------------------------------------------------ #

"""
    main()

Service main loop. It keeps the MQTT connection alive and sends one "initialize" message to the
service topic at start-up. It then routes every incoming message to a per-session agent task
(one `runAgentInstance` per sender id), starting a new task when a session has none or when
its task has already finished.
"""
function main()
    sessiontimeout = 1 * 1 * 60 # timeout in minute for each instance (day * hour * minute)
    initialized = false

    while true
        # check if mqtt connection is still up
        _ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)

        # Initialize session "0": send an "initialize" message to this service's own topic.
        # It is handled like a user message by session "0", but no reply is published.
        if !initialized
            sendto = primaryServiceTopic(config)
            msgMeta = GeneralUtils.generate_msgMeta(
                sendto;
                msgPurpose="initialize",
                senderName="initializer",
                senderId="0",
                msgId="initMsg",
                replyTopic=sendto,
                mqttBrokerAddress=config[:mqttServerInfo][:broker],
                mqttBrokerPort=config[:mqttServerInfo][:port],
            )
            outgoingMsg = Dict(
                :msgMeta => msgMeta,
                :payload => Dict(
                    :text => "Do you have full-bodied red wines under 100 USD. I don't have any other preferences.",
                ),
            )
            _ = GeneralUtils.sendMqttMsg(outgoingMsg)
            initialized = true
            println("\n--> Initializing msg sent ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
        end

        # check for new message
        if isready(incomingMsgChannel[:ch1])
            msg = take!(incomingMsgChannel[:ch1])
            println("\n<-- incomingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            pprintln(msg)

            # One backend agent per frontend session, keyed by the sender's id. The id is
            # normalised to use "_"; the session's own "delete session" message carries the
            # already-normalised id, and the replacement is idempotent.
            sessionId = replace(msg[:msgMeta][:senderId], "-" => "_")

            if msg[:msgMeta][:msgPurpose] == "delete session"
                delete!(sessionDict, sessionId)
                println("sessionId $(sessionId) has been terminated ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            else
                session = get(sessionDict, sessionId, nothing)

                if session !== nothing && istaskdone(session[:process])
                    # The agent task already returned (conversation ended, timed out or failed)
                    # but its session is still registered. Its channel would never be drained,
                    # and once full, put! would block this loop forever; start a new session.
                    println("sessionId $(sessionId) agent has stopped, starting a new session ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                    delete!(sessionDict, sessionId)
                    session = nothing
                end

                if session === nothing
                    # no session yet, create new session
                    inputch = Channel{Dict}(8)
                    outputch = Channel{Dict}(8)
                    # errormonitor prints the exception if the agent task fails
                    process = errormonitor(@spawn runAgentInstance(inputch, outputch, sessionId, config, sessiontimeout))
                    println("\ninstantiate agent success ", @__FILE__, ":", @__LINE__, " $(Dates.now())")

                    # keep the task in sessionDict to be able to check on it later
                    session = Dict(
                        :inputchannel => inputch,
                        :outputchannel => outputch,
                        :process => process,
                    )
                    sessionDict[sessionId] = session
                else
                    # ongoing session
                    println("sessionId $(sessionId) existing session ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                end

                put!(session[:inputchannel], msg)
            end
        end

        # sleep is needed because MQTTClient use async. "while true" loop leave no
        # chance for control to switch to on_msg()
        sleep(1)
    end
end

main()