add example

This commit is contained in:
narawat lamaiin
2025-06-17 12:52:00 +07:00
parent e0dc7d29b2
commit 5d552d96c4
4 changed files with 1367 additions and 0 deletions

706
example/main.jl Normal file
View File

@@ -0,0 +1,706 @@
using JSON, JSON3, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames, DataStructures
using YiemAgent, GeneralUtils
using Base.Threads
# ---------------------------------------------- 100 --------------------------------------------- #
""" Expected incomming MQTT message format for this service:
{
"msgMeta": {
"msgPurpose": "updateStatus",
"requestresponse": "request",
"timestamp": "2024-03-29T05:8:48.362",
"replyToMsgId": null,
"receiverId": null,
"getpost": "get",
"msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
"acknowledgestatus": null,
"sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
"receiverName": "agent-wine-backend",
"replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
"senderName": "agent-wine-frontend-chat",
"senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
},
"payload" : {
"text": "hello"
}
}
"""
# load config
config = copy(JSON3.read("../mountvolume/config/config.json"))
""" Instantiate an agent. One need to specify startmessage and one of gpu location info,
Mqtt or Rest. start message must be comply with GeneralUtils's message format
Arguments\n
-----
channel::Channel
communication channel
sessionId::String
sesstion ID of the agent
agentName::String
Name of the agent
mqttBroker::String
mqtt broker e.g. "tcp://127.0.0.1:1883"
agentConfigTopic::String
main communication topic for an agent to ask for config
timeout::Int64
inactivity timeout in minutes. If timeout is reached, an agent will be terminated.
Return\n
-----
a task represent an agent
Example\n
-----
```jldoctest
julia> using YiemAgent, GeneralUtils
julia> msg = GeneralUtils.generate_msgMeta("/agent")
julia> incoming_msg = msg # assuming 1st msg was sent from other app
julia> agentConfigTopic = "/agent/wine/backend/config"
julia> task = runAgentInstance(incoming_msg, mqttBroker, agentConfigTopic, 60)
```
TODO\n
-----
[] update docstringLAMA_CONTEXT_LENGTH=40960 since the default size is 2048 as you can see in your debug log:
[] change how to get result of YiemAgent from let YiemAgent send msg directly to frontend,
to
response = YiemAgent.conversation()
then send response to frontend
Signature\n
-----
"""
function runAgentInstance(
receiveUserMsgChannel::Channel,
outputchannel::Channel,
sessionId::String,
config::Dict,
timeout::Int64,
)
function executeSQL(sql::T) where {T<:AbstractString}
host = config[:externalservice][:wineDB][:host]
port = config[:externalservice][:wineDB][:port]
dbname = config[:externalservice][:wineDB][:dbname]
user = config[:externalservice][:wineDB][:user]
password = config[:externalservice][:wineDB][:password]
DBconnection = LibPQ.Connection("host=$host port=$port dbname=$dbname user=$user password=$password")
result = LibPQ.execute(DBconnection, sql)
close(DBconnection)
return result
end
function executeSQLVectorDB(sql)
host = config[:externalservice][:SQLVectorDB][:host]
port = config[:externalservice][:SQLVectorDB][:port]
dbname = config[:externalservice][:SQLVectorDB][:dbname]
user = config[:externalservice][:SQLVectorDB][:user]
password = config[:externalservice][:SQLVectorDB][:password]
DBconnection = LibPQ.Connection("host=$host port=$port dbname=$dbname user=$user password=$password")
result = LibPQ.execute(DBconnection, sql)
close(DBconnection)
return result
end
function text2textInstructLLM(prompt::String; maxattempt::Integer=3, modelsize::String="medium",
senderId=GeneralUtils.uuid4snakecase(), timeout=180,
llmkwargs=Dict(
:num_ctx => 32768,
:temperature => 0.5,
))
msgMeta = GeneralUtils.generate_msgMeta(
config[:externalservice][:loadbalancer][:mqtttopic];
msgPurpose="inference",
senderName="yiemagent",
senderId=senderId,
receiverName="text2textinstruct_$modelsize",
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => Dict(
:text => prompt,
:kwargs => llmkwargs
)
)
response = nothing
for attempts in 1:maxattempt
_response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg; timeout=timeout, maxattempt=maxattempt)
payload = _response[:response]
if _response[:success] && payload[:text] !== nothing
response = _response[:response][:text]
break
else
println("\n<text2textInstructLLM()> attempt $attempts/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg)
println("</text2textInstructLLM()> attempt $attempts/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
sleep(3)
end
end
return response
end
# get text embedding from a LLM service
function getEmbedding(text::T) where {T<:AbstractString}
msgMeta = GeneralUtils.generate_msgMeta(
config[:externalservice][:loadbalancer][:mqtttopic];
msgPurpose="embedding",
senderName="yiemagent",
senderId=sessionId,
receiverName="textembedding",
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => Dict(
:text => [text] # must be a vector of string
)
)
response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg; timeout=120, maxattempt=3)
embedding = response[:response][:embeddings]
return embedding
end
function findSimilarTextFromVectorDB(text::T1, tablename::T2, embeddingColumnName::T3,
vectorDB::Function; limit::Integer=1
)::DataFrame where {T1<:AbstractString, T2<:AbstractString, T3<:AbstractString}
# get embedding from LLM service
embedding = getEmbedding(text)[1]
# check whether there is close enough vector already store in vectorDB. if no, add, else skip
sql = """
SELECT *, $embeddingColumnName <-> '$embedding' as distance
FROM $tablename
ORDER BY distance LIMIT $limit;
"""
response = vectorDB(sql)
df = DataFrame(response)
return df
end
function similarSQLVectorDB(query; maxdistance::Integer=100)
tablename = "sqlllm_decision_repository"
# get embedding of the query
df = findSimilarTextFromVectorDB(query, tablename,
"function_input_embedding", executeSQLVectorDB)
# println(df[1, [:id, :function_output]])
row, col = size(df)
distance = row == 0 ? Inf : df[1, :distance]
# distance = 100 # CHANGE this is for testing only
if row != 0 && distance < maxdistance
# if there is usable SQL, return it.
output_b64 = df[1, :function_output_base64] # pick the closest match
output_str = String(base64decode(output_b64))
rowid = df[1, :id]
println("\n~~~ found similar sql. row id $rowid, distance $distance ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
return (dict=output_str, distance=distance)
else
println("\n~~~ similar sql not found, max distance $maxdistance ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
return (dict=nothing, distance=nothing)
end
end
function insertSQLVectorDB(query::T1, SQL::T2; maxdistance::Integer=3) where {T1<:AbstractString, T2<:AbstractString}
tablename = "sqlllm_decision_repository"
# get embedding of the query
# query = state[:thoughtHistory][:question]
df = findSimilarTextFromVectorDB(query, tablename,
"function_input_embedding", executeSQLVectorDB)
row, col = size(df)
distance = row == 0 ? Inf : df[1, :distance]
if row == 0 || distance > maxdistance # no close enough SQL stored in the database
query_embedding = getEmbedding(query)[1]
query = replace(query, "'" => "")
sql_base64 = base64encode(SQL)
sql_ = replace(SQL, "'" => "")
sql = """
INSERT INTO $tablename (function_input, function_output, function_output_base64, function_input_embedding) VALUES ('$query', '$sql_', '$sql_base64', '$query_embedding');
"""
# println("\n~~~ added new decision to vectorDB ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
# println(sql)
_ = executeSQLVectorDB(sql)
end
end
function similarSommelierDecision(recentevents::T1; maxdistance::Integer=3
)::Union{AbstractDict, Nothing} where {T1<:AbstractString}
tablename = "sommelier_decision_repository"
# find similar
println("\n~~~ search vectorDB for this: $recentevents ", @__FILE__, " ", @__LINE__)
df = findSimilarTextFromVectorDB(recentevents, tablename,
"function_input_embedding", executeSQLVectorDB)
row, col = size(df)
distance = row == 0 ? Inf : df[1, :distance]
if row != 0 && distance < maxdistance
# if there is usable decision, return it.
rowid = df[1, :id]
println("\n~~~ found similar decision. row id $rowid, distance $distance ", @__FILE__, " ", @__LINE__)
output_b64 = df[1, :function_output_base64] # pick the closest match
_output_str = String(base64decode(output_b64))
output = copy(JSON3.read(_output_str))
return output
else
println("\n~~~ similar decision not found, max distance $maxdistance ", @__FILE__, " ", @__LINE__)
return nothing
end
end
function insertSommelierDecision(recentevents::T1, decision::T2; maxdistance::Integer=5
) where {T1<:AbstractString, T2<:AbstractDict}
tablename = "sommelier_decision_repository"
# find similar
df = findSimilarTextFromVectorDB(recentevents, tablename,
"function_input_embedding", executeSQLVectorDB)
row, col = size(df)
distance = row == 0 ? Inf : df[1, :distance]
if row == 0 || distance > maxdistance # no close enough SQL stored in the database
recentevents_embedding = getEmbedding(recentevents)[1]
recentevents = replace(recentevents, "'" => "")
decision_json = JSON3.write(decision)
decision_base64 = base64encode(decision_json)
decision = replace(decision_json, "'" => "")
sql =
"""
INSERT INTO $tablename (function_input, function_output, function_output_base64, function_input_embedding) VALUES ('$recentevents', '$decision', '$decision_base64', '$recentevents_embedding');
"""
println("\n~~~ added new decision to vectorDB ", @__FILE__, " ", @__LINE__)
println(sql)
_ = executeSQLVectorDB(sql)
else
println("~~~ similar decision previously cached, distance $distance ", @__FILE__, " ", @__LINE__)
end
end
# keepaliveChannel_2::Channel{Dict} = Channel{Dict}(8)
latestUserMsgTimeStamp::DateTime = Dates.now()
externalFunction = (
getEmbedding=getEmbedding,
text2textInstructLLM=text2textInstructLLM,
executeSQL=executeSQL,
similarSQLVectorDB=similarSQLVectorDB,
insertSQLVectorDB=insertSQLVectorDB,
similarSommelierDecision=similarSommelierDecision,
insertSommelierDecision=insertSommelierDecision,
)
agent = YiemAgent.sommelier(
externalFunction;
name="Jane",
id=sessionId, # agent instance id
retailername="Yiem",
llmFormatName="qwen3"
)
# user chat loop
while true
# check for new user message
if isready(receiveUserMsgChannel)
incomingMsg = take!(receiveUserMsgChannel)
incoming_msgMeta = incomingMsg[:msgMeta]
incomingPayload = incomingMsg[:payload]
latestUserMsgTimeStamp = Dates.now()
# make sure the message has :text key because YiemAgent use this key for incoming user msg
if haskey(incomingPayload, :text)
# skip, msg already has correct key name
elseif haskey(incomingPayload, :txt)
# change key name to text
incomingPayload[:text] = incomingPayload[:txt]
else
error("\n no :txt or :text key in the message.")
end
# reset agent
if occursin("newtopic", incomingPayload[:text]) ||
occursin("Newtopic", incomingPayload[:text]) ||
occursin("New topic", incomingPayload[:text]) ||
occursin("new topic", incomingPayload[:text])
# YiemAgent.clearhistory(agent)
agent = YiemAgent.sommelier(
externalFunction;
name="Janie",
id=sessionId, # agent instance id
retailername="Yiem",
)
# sending msg back to sender i.e. LINE
msgMeta = GeneralUtils.generate_msgMeta(
incomingMsg[:msgMeta][:replyTopic];
senderName="wine_assistant_backend",
senderId=sessionId,
replyToMsgId=incomingMsg[:msgMeta][:msgId],
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => Dict(
:alias => agent.name, # will be shown in frontend as agent name
:text => "Okay. What shall we talk about?"
)
)
_ = GeneralUtils.sendMqttMsg(outgoingMsg)
println("--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg)
else
usermsg = incomingPayload
if incoming_msgMeta[:msgPurpose] == "initialize"
println("\n-- Initializing... ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
end
# send prompt
result = YiemAgent.conversation(agent;
userinput=usermsg,
maximumMsg=50)
# Ken's bot use [br] for newline character '\n'
# result = replace(result, '\n'=>"[br]")
if incoming_msgMeta[:msgPurpose] == "initialize"
println("\n-- Initialized. Ready! waiting for request at:\n$(config[:servicetopic][:mqtttopic]) ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
continue
end
msgMeta = GeneralUtils.generate_msgMeta(
incomingMsg[:msgMeta][:replyTopic];
senderName="wine_assistant_backend",
senderId=string(uuid4()),
replyToMsgId=incomingMsg[:msgMeta][:msgId],
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => Dict(
:alias => agent.name, # will be shown in frontend as agent name
:text => result
)
)
_ = GeneralUtils.sendMqttMsg(outgoingMsg)
println("\n--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg)
# jpg_as_juliaStr = nothing
# prompt = nothing
# if haskey(payload, "img")
# url_or_base64 = payload["img"]
# if startswith(url_or_base64, "http")
# # img in http
# julia_rgb_img, cv2_bgr_img = ImageUtils.url_to_cv2_image(url_or_base64)
# _, buffer = cv2.imencode(".jpg", cv2_bgr_img)
# jpg_as_pyStr = base64.b64encode(buffer).decode("utf-8")
# jpg_as_juliaStr = pyconvert(String, jpg_as_pyStr)
# else
# # img in base64
# cv2_bgr_img = payload["img"]
# jpg_as_juliaStr = pyconvert(String, jpg_as_pyStr)
# end
# end
end
else
# println("\n no msg")
end
if haskey(agent.memory[:events][end], :thought)
lastAssistantAction = agent.memory[:events][end][:thought][:action_name]
if lastAssistantAction == "ENDCONVERSATION" # store thoughtDict
# save a.memory[:shortmem][:decisionlog] to disk using JSON3
println("\nsaving agent.memory[:shortmem][:decisionlog] to disk")
filename = "agent_decision_log_$(Dates.now())_$(agent.id).json"
filepath = "/appfolder/app/log/$filename"
open(filepath, "w") do io
JSON3.pretty(io, agent.memory[:shortmem][:decisionlog])
end
# for (i, event) in enumerate(agent.memory[:events])
# if event[:subject] == "assistant"
# # create timeline of the last 3 conversation except the last one.
# # The former will be used as caching key and the latter will be the caching target
# # in vector database
# all_recapkeys = keys(agent.memory[:recap]) #[TESTING] recap as caching
# all_recapkeys_vec = [r for r in all_recapkeys] # convert to a vector
# # select from 1 to 2nd-to-lase event (i.e. excluding the latest which is assistant's response)
# _recapkeys_vec = all_recapkeys_vec[1:i-1]
# # select only previous 3 recaps
# recapkeys_vec =
# if length(_recapkeys_vec) <= 3 # 1st message is a user's hello msg
# _recapkeys_vec # choose all
# else
# _recapkeys_vec[end-2:end]
# end
# #[PENDING] if there is specific data such as number, donot store in database
# tempmem = DataStructures.OrderedDict()
# for k in recapkeys_vec
# tempmem[k] = agent.memory[:recap][k]
# end
# recap = GeneralUtils.dictToString_noKey(tempmem)
# thoughtDict = agent.memory[:events][i][:thought] # latest assistant thoughtDict
# insertSommelierDecision(recap, thoughtDict)
# else
# # skip
# end
# end
println("\nCaching conversation process done")
break
end
end
# self terminate if too long inactivity
timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
if timediff > timeout
result = Dict(:exitreason => "timeout", :timestamp => Dates.now())
put!(outputchannel, result)
println("Agent ID $(agent.id) timeout has been reached $timediff/$timeout minutes Send delete session msg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
# send "delete session" message to inform the main loop that this session can be deleted
sendto =
if typeof(config[:servicetopic][:mqtttopic]) <: Array
config[:servicetopic][:mqtttopic][1]
else
config[:servicetopic][:mqtttopic]
end
msgMeta = GeneralUtils.generate_msgMeta(
sendto;
senderName="session",
senderId=sessionId,
msgPurpose="delete session",
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => nothing
)
_ = GeneralUtils.sendMqttMsg(outgoingMsg)
try disconnect(agent.mqttClient) catch end
break
end
sleep(1) # allowing on_msg_2, asyncmove above and other process to run
end
end
sessionDict = Dict{String,Any}()
incomingMsgChannel = (ch1=Channel(8),) # store msg that coming into servicetopic
# incommingInternalMsg = [] # st ore msg that coming into servicetopic internal management
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)
# Define the callback for receiving messages.
function onMsgCallback_1(topic, payload)
jobj = JSON3.read(String(payload))
incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
if occursin("keepalive", topic)
put!(keepaliveChannel, incomingMqttMsg)
else
put!(incomingMsgChannel[:ch1], incomingMqttMsg)
end
end
mqttInstance = GeneralUtils.mqttClientInstance_v2(
config[:mqttServerInfo][:broker],
config[:servicetopic][:mqtttopic],
incomingMsgChannel,
keepaliveChannel,
onMsgCallback_1
)
# ------------------------------------------------------------------------------------------------ #
# this service main loop #
# ------------------------------------------------------------------------------------------------ #
function main()
sessiontimeout = 1 * 1 * 60 # timeout in minute for each instance (day * hour * minute)
initializing = false
while true
# check if mqtt connection is still up
_ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)
# initialize session 0
if initializing == false # send init msg
sendto =
if typeof(config[:servicetopic][:mqtttopic]) <: Array
config[:servicetopic][:mqtttopic][1]
else
config[:servicetopic][:mqtttopic]
end
msgMeta = GeneralUtils.generate_msgMeta(
sendto;
msgPurpose="initialize",
senderName="initializer",
senderId="0",
msgId= "initMsg",
replyTopic=sendto,
mqttBrokerAddress=config[:mqttServerInfo][:broker],
mqttBrokerPort=config[:mqttServerInfo][:port],
)
outgoingMsg = Dict(
:msgMeta => msgMeta,
:payload => Dict( # will be shown in frontend as agent name
:text => "Do you have full-bodied red wines under 100 USD. I don't have any other preferences."
)
)
_ = GeneralUtils.sendMqttMsg(outgoingMsg)
initializing = true
println("\n--> Initializing msg sent ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
end
# check for new message
if !isempty(incomingMsgChannel[:ch1])
msg = popfirst!(incomingMsgChannel[:ch1])
println("\n<-- incomingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(msg)
# @spawn new runAgentInstance and store it in sessionDict
# use agent's frontend id because 1 backend agent per 1 frontend session
sessionId = msg[:msgMeta][:senderId]
sessionId = replace(sessionId, "-" => "_") # julia can't use "-" in a dict key
# check for delete session msg
if msg[:msgMeta][:msgPurpose] == "delete session"
delete!(sessionDict, sessionId)
println("sessionId $(sessionId) has been terminated ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
# no session yet, create new session
elseif sessionId keys(sessionDict)
inputch = Channel{Dict}(8)
outputch = Channel{Dict}(8)
process = @spawn runAgentInstance(inputch, outputch, sessionId, config, sessiontimeout)
# process = runAgentInstance(inputch, outputch, sessionId, config, sessiontimeout) #XXX use spawn version
println("\ninstantiate agent success ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
# call runAgentInstance() and store it in sessionDict to be able to check on it later
sessionDict[sessionId] = Dict(
:inputchannel => inputch,
:outputchannel => outputch,
:process => process,
)
put!(sessionDict[sessionId][:inputchannel], msg)
# ongoing session
else
println("sessionId $(sessionId) existing session ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
put!(sessionDict[sessionId][:inputchannel], msg)
end
end
# sleep is needed because MQTTClient use async. "while true" loop leave no
# chance for control to switch to on_msg()
sleep(1)
end
end
main()