707 lines
23 KiB
Julia
707 lines
23 KiB
Julia
using JSON, JSON3, Dates, UUIDs, PrettyPrinting, LibPQ, Base64, DataFrames, DataStructures
|
|
using YiemAgent, GeneralUtils
|
|
using Base.Threads
|
|
|
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
|
|
|
|
|
""" Expected incoming MQTT message format for this service:
|
|
{
|
|
"msgMeta": {
|
|
"msgPurpose": "updateStatus",
|
|
"requestresponse": "request",
|
|
"timestamp": "2024-03-29T05:8:48.362",
|
|
"replyToMsgId": null,
|
|
"receiverId": null,
|
|
"getpost": "get",
|
|
"msgId": "e5c09bd8-7100-4e4e-bb43-05bee589a22c",
|
|
"acknowledgestatus": null,
|
|
"sendTopic": "/agent/wine/backend/chat/api/v1/prompt",
|
|
"receiverName": "agent-wine-backend",
|
|
"replyTopic": "/agent/wine/frontend/chat/api/v1/txt/receive",
|
|
"senderName": "agent-wine-frontend-chat",
|
|
"senderId": "0938a757-e0ee-40a9-8355-5e24906a87cd"
|
|
},
|
|
"payload" : {
|
|
"text": "hello"
|
|
}
|
|
|
|
}
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
# load config
|
|
# Service configuration, read once at start-up. `copy` converts the parsed JSON3 object
# into a plain Julia dictionary; the rest of the file indexes it with Symbol keys
# (:externalservice, :mqttServerInfo, :servicetopic).
config = copy(JSON3.read("../mountvolume/config/config.json"))
|
|
|
|
""" Instantiate an agent. One needs to specify a start message and one of the GPU location infos,
Mqtt or Rest. The start message must comply with GeneralUtils's message format.
|
|
|
|
Arguments\n
|
|
-----
|
|
receiveUserMsgChannel::Channel
    communication channel from which incoming user messages are taken
outputchannel::Channel
    channel that receives a Dict with :exitreason and :timestamp when the agent times out
sessionId::String
    session ID of the agent
config::Dict
    service configuration; the :externalservice, :mqttServerInfo and :servicetopic
    entries are read (MQTT broker e.g. "tcp://127.0.0.1:1883")
timeout::Int64
    inactivity timeout in minutes. If timeout is reached, an agent will be terminated.
|
|
|
|
Return\n
|
|
-----
|
|
a task represent an agent
|
|
|
|
Example\n
|
|
-----
|
|
```jldoctest
|
|
julia> using YiemAgent, GeneralUtils
|
|
julia> msg = GeneralUtils.generate_msgMeta("/agent")
|
|
julia> incoming_msg = msg # assuming 1st msg was sent from other app
|
|
julia> agentConfigTopic = "/agent/wine/backend/config"
|
|
julia> task = runAgentInstance(incoming_msg, mqttBroker, agentConfigTopic, 60)
|
|
```
|
|
|
|
TODO\n
|
|
-----
|
|
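[] update docstring
[] NOTE: stray pasted text was fused into the line above, apparently
   `OLLAMA_CONTEXT_LENGTH=40960 since the default size is 2048 as you can see in your debug log`
   (confirm whether this note is still relevant)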
|
|
[] change how to get result of YiemAgent from let YiemAgent send msg directly to frontend,
|
|
to
|
|
response = YiemAgent.conversation()
|
|
then send response to frontend
|
|
|
|
Signature\n
|
|
-----
|
|
"""
|
|
function runAgentInstance(
|
|
receiveUserMsgChannel::Channel,
|
|
outputchannel::Channel,
|
|
sessionId::String,
|
|
config::Dict,
|
|
timeout::Int64,
|
|
)
|
|
|
|
# Run `sql` against the wine database configured under
# config[:externalservice][:wineDB] and return the LibPQ result.
#
# A fresh connection is opened for every call. It is closed in a `finally`
# clause so that a failing statement cannot leak the connection.
function executeSQL(sql::T) where {T<:AbstractString}
    db = config[:externalservice][:wineDB]
    DBconnection = LibPQ.Connection(
        "host=$(db[:host]) port=$(db[:port]) dbname=$(db[:dbname]) " *
        "user=$(db[:user]) password=$(db[:password])"
    )
    try
        return LibPQ.execute(DBconnection, sql)
    finally
        close(DBconnection)
    end
end
|
|
|
|
# Run `sql` against the vector database configured under
# config[:externalservice][:SQLVectorDB] and return the LibPQ result.
#
# A fresh connection is opened for every call. It is closed in a `finally`
# clause so that a failing statement cannot leak the connection.
function executeSQLVectorDB(sql)
    db = config[:externalservice][:SQLVectorDB]
    DBconnection = LibPQ.Connection(
        "host=$(db[:host]) port=$(db[:port]) dbname=$(db[:dbname]) " *
        "user=$(db[:user]) password=$(db[:password])"
    )
    try
        return LibPQ.execute(DBconnection, sql)
    finally
        close(DBconnection)
    end
end
|
|
|
|
# Send `prompt` to the text2text-instruct LLM service through the load balancer topic
# and return the generated text, or `nothing` when every attempt failed.
#
# Keywords
#   maxattempt  number of rounds tried here; also forwarded to sendReceiveMqttMsg
#   modelsize   appended to the receiver name ("text2textinstruct_<modelsize>")
#   senderId    sender id placed in the message meta
#   timeout     forwarded to sendReceiveMqttMsg
#   llmkwargs   sent to the service as the payload's :kwargs
function text2textInstructLLM(prompt::String; maxattempt::Integer=3, modelsize::String="medium",
    senderId=GeneralUtils.uuid4snakecase(), timeout=180,
    llmkwargs=Dict(
        :num_ctx => 32768,
        :temperature => 0.5,
    ))
    meta = GeneralUtils.generate_msgMeta(
        config[:externalservice][:loadbalancer][:mqtttopic];
        msgPurpose="inference",
        senderName="yiemagent",
        senderId=senderId,
        receiverName="text2textinstruct_$modelsize",
        mqttBrokerAddress=config[:mqttServerInfo][:broker],
        mqttBrokerPort=config[:mqttServerInfo][:port],
    )
    body = Dict(
        :text => prompt,
        :kwargs => llmkwargs
    )
    request = Dict(
        :msgMeta => meta,
        :payload => body
    )

    for attempt in 1:maxattempt
        reply = GeneralUtils.sendReceiveMqttMsg(request; timeout=timeout, maxattempt=maxattempt)
        answer = reply[:response]
        if reply[:success] && answer[:text] !== nothing
            return answer[:text]
        end

        # this round failed: dump the request for debugging, then pause before retrying
        println("\n<text2textInstructLLM()> attempt $attempt/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
        pprintln(request)
        println("</text2textInstructLLM()> attempt $attempt/$maxattempt failed ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
        sleep(3)
    end

    return nothing
end
|
|
|
|
# Ask the text-embedding service (via the load balancer topic) to embed `text`.
# Returns the :embeddings entry of the service response; callers in this file take
# element [1] of it because a single text is sent.
function getEmbedding(text::T) where {T<:AbstractString}
    meta = GeneralUtils.generate_msgMeta(
        config[:externalservice][:loadbalancer][:mqtttopic];
        msgPurpose="embedding",
        senderName="yiemagent",
        senderId=sessionId,
        receiverName="textembedding",
        mqttBrokerAddress=config[:mqttServerInfo][:broker],
        mqttBrokerPort=config[:mqttServerInfo][:port],
    )

    # the service expects a vector of strings, hence the one-element vector
    request = Dict(
        :msgMeta => meta,
        :payload => Dict(:text => [text])
    )

    reply = GeneralUtils.sendReceiveMqttMsg(request; timeout=120, maxattempt=3)
    return reply[:response][:embeddings]
end
|
|
|
|
# Embed `text` and return, as a DataFrame, the `limit` rows of `tablename` whose
# `embeddingColumnName` is nearest to that embedding (`<->` operator). The result has
# every table column plus a `distance` column and is ordered by ascending distance.
# `vectorDB` is the function used to execute the SQL.
function findSimilarTextFromVectorDB(text::T1, tablename::T2, embeddingColumnName::T3,
    vectorDB::Function; limit::Integer=1
)::DataFrame where {T1<:AbstractString, T2<:AbstractString, T3<:AbstractString}
    queryvector = getEmbedding(text)[1]
    nearestsql = """
    SELECT *, $embeddingColumnName <-> '$queryvector' as distance
    FROM $tablename
    ORDER BY distance LIMIT $limit;
    """
    return DataFrame(vectorDB(nearestsql))
end
|
|
|
|
# Look up the stored SQL whose input is nearest to `query` in the
# "sqlllm_decision_repository" table.
# Returns (dict=<decoded stored output>, distance=<distance>) when the nearest row is
# closer than `maxdistance`, otherwise (dict=nothing, distance=nothing).
function similarSQLVectorDB(query; maxdistance::Integer=100)
    repository = "sqlllm_decision_repository"
    candidates = findSimilarTextFromVectorDB(query, repository,
        "function_input_embedding", executeSQLVectorDB)
    nfound = size(candidates, 1)
    nearest = nfound == 0 ? Inf : candidates[1, :distance]

    # nothing stored, or the closest entry is still too far away
    if nfound == 0 || !(nearest < maxdistance)
        println("\n~~~ similar sql not found, max distance $maxdistance ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
        return (dict=nothing, distance=nothing)
    end

    # the closest match is usable: the stored output is kept base64-encoded
    stored = String(base64decode(candidates[1, :function_output_base64]))
    matchid = candidates[1, :id]
    println("\n~~~ found similar sql. row id $matchid, distance $nearest ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
    return (dict=stored, distance=nearest)
end
|
|
|
|
# Store the pair (`query`, `SQL`) in the "sqlllm_decision_repository" table unless an
# entry within `maxdistance` of `query` is already there.
# Single quotes are stripped from the plain-text columns; the unmodified SQL is kept in
# the base64 column. Returns the insert result, or `nothing` when nothing was inserted.
function insertSQLVectorDB(query::T1, SQL::T2; maxdistance::Integer=3) where {T1<:AbstractString, T2<:AbstractString}
    repository = "sqlllm_decision_repository"
    neighbours = findSimilarTextFromVectorDB(query, repository,
        "function_input_embedding", executeSQLVectorDB)
    nfound = size(neighbours, 1)
    nearest = nfound == 0 ? Inf : neighbours[1, :distance]

    # a close enough entry is already stored: nothing to do
    nfound == 0 || nearest > maxdistance || return nothing

    embedding = getEmbedding(query)[1]
    querytext = replace(query, "'" => "")
    sqlb64 = base64encode(SQL)
    sqltext = replace(SQL, "'" => "")

    insertsql = """
    INSERT INTO $repository (function_input, function_output, function_output_base64, function_input_embedding) VALUES ('$querytext', '$sqltext', '$sqlb64', '$embedding');
    """
    return executeSQLVectorDB(insertsql)
end
|
|
|
|
|
|
# Look up a cached sommelier decision for `recentevents` in the
# "sommelier_decision_repository" table.
# Returns the stored decision (base64-decoded, then parsed from JSON) when the nearest
# row is closer than `maxdistance`, otherwise `nothing`.
function similarSommelierDecision(recentevents::T1; maxdistance::Integer=3
)::Union{AbstractDict, Nothing} where {T1<:AbstractString}
    repository = "sommelier_decision_repository"
    println("\n~~~ search vectorDB for this: $recentevents ", @__FILE__, " ", @__LINE__)
    candidates = findSimilarTextFromVectorDB(recentevents, repository,
        "function_input_embedding", executeSQLVectorDB)
    nfound = size(candidates, 1)
    nearest = nfound == 0 ? Inf : candidates[1, :distance]

    # nothing stored, or the closest entry is still too far away
    if nfound == 0 || !(nearest < maxdistance)
        println("\n~~~ similar decision not found, max distance $maxdistance ", @__FILE__, " ", @__LINE__)
        return nothing
    end

    matchid = candidates[1, :id]
    println("\n~~~ found similar decision. row id $matchid, distance $nearest ", @__FILE__, " ", @__LINE__)
    decisionjson = String(base64decode(candidates[1, :function_output_base64]))
    return copy(JSON3.read(decisionjson))
end
|
|
|
|
|
|
# Cache `decision` for the situation described by `recentevents` in the
# "sommelier_decision_repository" table unless an entry within `maxdistance` is
# already there.
# The decision is stored as JSON twice: once with single quotes stripped and once
# base64-encoded. Returns the insert result, or `nothing` when nothing was inserted.
function insertSommelierDecision(recentevents::T1, decision::T2; maxdistance::Integer=5
) where {T1<:AbstractString, T2<:AbstractDict}
    repository = "sommelier_decision_repository"
    neighbours = findSimilarTextFromVectorDB(recentevents, repository,
        "function_input_embedding", executeSQLVectorDB)
    nfound = size(neighbours, 1)
    nearest = nfound == 0 ? Inf : neighbours[1, :distance]

    # a close enough decision is already cached
    if !(nfound == 0 || nearest > maxdistance)
        println("~~~ similar decision previously cached, distance $nearest ", @__FILE__, " ", @__LINE__)
        return nothing
    end

    embedding = getEmbedding(recentevents)[1]
    eventstext = replace(recentevents, "'" => "")
    decisionjson = JSON3.write(decision)
    decisionb64 = base64encode(decisionjson)
    decisiontext = replace(decisionjson, "'" => "")

    insertsql =
        """
        INSERT INTO $repository (function_input, function_output, function_output_base64, function_input_embedding) VALUES ('$eventstext', '$decisiontext', '$decisionb64', '$embedding');
        """
    println("\n~~~ added new decision to vectorDB ", @__FILE__, " ", @__LINE__)
    println(insertsql)
    return executeSQLVectorDB(insertsql)
end
|
|
|
|
# keepaliveChannel_2::Channel{Dict} = Channel{Dict}(8)
|
|
# time of the most recent user message; the chat loop compares it with `timeout`
latestUserMsgTimeStamp::DateTime = Dates.now()

# helper closures handed to the agent, bundled as a NamedTuple keyed by function name
externalFunction = (;
    getEmbedding,
    text2textInstructLLM,
    executeSQL,
    similarSQLVectorDB,
    insertSQLVectorDB,
    similarSommelierDecision,
    insertSommelierDecision,
)

# the agent serving this session; sessionId doubles as the agent instance id
agent = YiemAgent.sommelier(externalFunction;
    name="Jane", id=sessionId, retailername="Yiem", llmFormatName="qwen3")
|
|
|
|
# user chat loop
# Each pass (about once per second):
#   1. handles at most one pending user message (reset on "new topic", otherwise
#      forwards it to the agent and publishes the answer to the sender's reply topic),
#   2. stops when the agent's latest event is an ENDCONVERSATION action,
#   3. stops when no user message arrived for more than `timeout` minutes.
while true
    # check for new user message
    if isready(receiveUserMsgChannel)
        incomingMsg = take!(receiveUserMsgChannel)
        incoming_msgMeta = incomingMsg[:msgMeta]
        incomingPayload = incomingMsg[:payload]
        latestUserMsgTimeStamp = Dates.now()

        # YiemAgent reads the user's message from the :text key; accept :txt as an alias
        if !haskey(incomingPayload, :text)
            if haskey(incomingPayload, :txt)
                incomingPayload[:text] = incomingPayload[:txt]
            else
                # NOTE(review): this exception ends the whole session task — confirm
                # that is the intended reaction to a malformed message
                error("\n no :txt or :text key in the message.")
            end
        end

        # matches exactly "newtopic", "Newtopic", "new topic" and "New topic"
        if occursin(r"[Nn]ew ?topic", incomingPayload[:text])
            # reset agent: start over with a fresh instance.
            # llmFormatName is passed here as well so the rebuilt agent is configured
            # like the one this session starts with.
            # NOTE(review): the name differs from the initial agent ("Jane") — confirm
            # whether that is intended.
            agent = YiemAgent.sommelier(
                externalFunction;
                name="Janie",
                id=sessionId, # agent instance id
                retailername="Yiem",
                llmFormatName="qwen3",
            )

            # sending msg back to sender i.e. LINE
            msgMeta = GeneralUtils.generate_msgMeta(
                incoming_msgMeta[:replyTopic];
                senderName="wine_assistant_backend",
                senderId=sessionId,
                replyToMsgId=incoming_msgMeta[:msgId],
                mqttBrokerAddress=config[:mqttServerInfo][:broker],
                mqttBrokerPort=config[:mqttServerInfo][:port],
            )
            outgoingMsg = Dict(
                :msgMeta => msgMeta,
                :payload => Dict(
                    :alias => agent.name, # will be shown in frontend as agent name
                    :text => "Okay. What shall we talk about?"
                )
            )
            _ = GeneralUtils.sendMqttMsg(outgoingMsg)
            println("--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            pprintln(outgoingMsg)
        else
            # an "initialize" message runs through the agent once, but its answer is
            # not published
            isInitMsg = incoming_msgMeta[:msgPurpose] == "initialize"
            if isInitMsg
                println("\n-- Initializing... ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            end

            # send prompt (the whole payload is handed to the agent as user input)
            result = YiemAgent.conversation(agent;
                userinput=incomingPayload,
                maximumMsg=50)

            if isInitMsg
                println("\n-- Initialized. Ready! waiting for request at:\n$(config[:servicetopic][:mqtttopic]) ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                continue
            end

            msgMeta = GeneralUtils.generate_msgMeta(
                incoming_msgMeta[:replyTopic];
                senderName="wine_assistant_backend",
                senderId=string(uuid4()),
                replyToMsgId=incoming_msgMeta[:msgId],
                mqttBrokerAddress=config[:mqttServerInfo][:broker],
                mqttBrokerPort=config[:mqttServerInfo][:port],
            )
            outgoingMsg = Dict(
                :msgMeta => msgMeta,
                :payload => Dict(
                    :alias => agent.name, # will be shown in frontend as agent name
                    :text => result
                )
            )
            _ = GeneralUtils.sendMqttMsg(outgoingMsg)
            println("\n--> outgoingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            pprintln(outgoingMsg)
        end
    end

    # Stop once the agent has ended the conversation. The event list is checked for
    # emptiness first: indexing [end] on an empty list would throw.
    events = agent.memory[:events]
    if !isempty(events) && haskey(events[end], :thought)
        lastAssistantAction = events[end][:thought][:action_name]
        if lastAssistantAction == "ENDCONVERSATION"
            # save agent.memory[:shortmem][:decisionlog] to disk using JSON3
            println("\nsaving agent.memory[:shortmem][:decisionlog] to disk")
            filename = "agent_decision_log_$(Dates.now())_$(agent.id).json"
            filepath = "/appfolder/app/log/$filename"
            open(filepath, "w") do io
                JSON3.pretty(io, agent.memory[:shortmem][:decisionlog])
            end

            # TODO: cache decisions in the vector DB — for every assistant event, use
            # the recap of up to the 3 preceding events as the key and the event's
            # thought as the value, stored through insertSommelierDecision.
            println("\nCaching conversation process done")

            # NOTE(review): unlike the timeout path below, no "delete session" message
            # is sent here — confirm how the main loop learns that this session ended.
            break
        end
    end

    # self terminate if too long inactivity
    timediff = GeneralUtils.timedifference(latestUserMsgTimeStamp, Dates.now(), "minutes")
    if timediff > timeout
        exitinfo = Dict(:exitreason => "timeout", :timestamp => Dates.now())
        put!(outputchannel, exitinfo)
        println("Agent ID $(agent.id) timeout has been reached $timediff/$timeout minutes Send delete session msg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")

        # send "delete session" message to inform the main loop that this session can
        # be deleted; when several service topics are configured the first one is used
        servicetopic = config[:servicetopic][:mqtttopic]
        sendto = servicetopic isa Array ? servicetopic[1] : servicetopic

        msgMeta = GeneralUtils.generate_msgMeta(
            sendto;
            senderName="session",
            senderId=sessionId,
            msgPurpose="delete session",
            mqttBrokerAddress=config[:mqttServerInfo][:broker],
            mqttBrokerPort=config[:mqttServerInfo][:port],
        )
        outgoingMsg = Dict(
            :msgMeta => msgMeta,
            :payload => nothing
        )
        _ = GeneralUtils.sendMqttMsg(outgoingMsg)

        # best effort: a failure to disconnect must not prevent the session from ending
        try
            disconnect(agent.mqttClient)
        catch
        end
        break
    end

    sleep(1) # yield so that the MQTT callbacks and other tasks get a chance to run
end
|
|
end
|
|
|
|
# Running sessions: sessionId => Dict(:inputchannel, :outputchannel, :process), filled in main()
sessionDict = Dict{String,Any}()
|
|
# NamedTuple of channels filled by onMsgCallback_1 and drained by main()
incomingMsgChannel = (ch1=Channel(8),) # stores messages arriving on the service topic
|
|
# incommingInternalMsg = [] # st ore msg that coming into servicetopic internal management
|
|
# Receives the messages whose topic contains "keepalive" (see onMsgCallback_1)
keepaliveChannel::Channel{Dict} = Channel{Dict}(8)
|
|
|
|
# Callback for received MQTT messages.
# Parses the JSON payload into a Julia dictionary and queues it: messages on a topic
# containing "keepalive" go to keepaliveChannel, everything else to
# incomingMsgChannel[:ch1].
function onMsgCallback_1(topic, payload)
    # copy() converts the JSON object into a Julia dictionary recursively
    incomingMqttMsg = copy(JSON3.read(String(payload)))
    destination = occursin("keepalive", topic) ? keepaliveChannel : incomingMsgChannel[:ch1]
    return put!(destination, incomingMqttMsg)
end
|
|
|
|
# MQTT client of this service, built from the configured broker and service topic(s)
# and handed the two message channels plus the receive callback.
mqttInstance = GeneralUtils.mqttClientInstance_v2(
    config[:mqttServerInfo][:broker], config[:servicetopic][:mqtttopic],
    incomingMsgChannel, keepaliveChannel, onMsgCallback_1,
)
|
|
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
# this service main loop #
|
|
# ------------------------------------------------------------------------------------------------ #
|
|
|
|
# Service main loop. Never returns under normal operation.
#
# Each pass (about once per second):
#   * checks the MQTT connection,
#   * on the first pass, sends an "initialize" message to the service topic, which
#     arrives back here like any other message and runs a first session,
#   * takes at most one message from incomingMsgChannel[:ch1] and routes it by
#     msgMeta senderId: "delete session" removes the session, an unknown sender gets a
#     new runAgentInstance task, a known sender's message is forwarded to its task.
function main()
    sessiontimeout = 1 * 1 * 60 # timeout in minute for each instance (day * hour * minute)
    initializing = false
    while true
        # check if mqtt connection is still up
        _ = GeneralUtils.checkMqttConnection!(mqttInstance; keepaliveCheckInterval=30)

        # initialize session 0
        if initializing == false # send init msg
            servicetopic = config[:servicetopic][:mqtttopic]
            sendto = servicetopic isa Array ? servicetopic[1] : servicetopic

            msgMeta = GeneralUtils.generate_msgMeta(
                sendto;
                msgPurpose="initialize",
                senderName="initializer",
                senderId="0",
                msgId= "initMsg",
                replyTopic=sendto,
                mqttBrokerAddress=config[:mqttServerInfo][:broker],
                mqttBrokerPort=config[:mqttServerInfo][:port],
            )
            outgoingMsg = Dict(
                :msgMeta => msgMeta,
                :payload => Dict(
                    :text => "Do you have full-bodied red wines under 100 USD. I don't have any other preferences."
                )
            )
            _ = GeneralUtils.sendMqttMsg(outgoingMsg)
            initializing = true
            println("\n--> Initializing msg sent ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
        end

        # check for new message
        if isready(incomingMsgChannel[:ch1])
            # take! is the Channel API for removing the next item
            msg = take!(incomingMsgChannel[:ch1])
            println("\n<-- incomingMsg ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            pprintln(msg)

            # use agent's frontend id because 1 backend agent per 1 frontend session
            sessionId = replace(msg[:msgMeta][:senderId], "-" => "_")

            if msg[:msgMeta][:msgPurpose] == "delete session"
                delete!(sessionDict, sessionId)
                println("sessionId $(sessionId) has been terminated ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
            else
                # A session whose task has already finished (conversation ended or the
                # task failed) no longer reads its input channel. Forwarding to it would
                # lose the message and, once the channel is full, block this loop
                # forever, so such a session is dropped and replaced below.
                if haskey(sessionDict, sessionId) &&
                   istaskdone(sessionDict[sessionId][:process])
                    delete!(sessionDict, sessionId)
                    println("sessionId $(sessionId) task already finished, session removed ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                end

                if !haskey(sessionDict, sessionId)
                    # no session yet, create new session
                    inputch = Channel{Dict}(8)
                    outputch = Channel{Dict}(8)
                    process = @spawn runAgentInstance(inputch, outputch, sessionId, config, sessiontimeout)
                    println("\ninstantiate agent success ", @__FILE__, ":", @__LINE__, " $(Dates.now())")

                    # keep the task and its channels to be able to check on it later
                    sessionDict[sessionId] = Dict(
                        :inputchannel => inputch,
                        :outputchannel => outputch,
                        :process => process,
                    )
                else
                    # ongoing session
                    println("sessionId $(sessionId) existing session ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
                end
                put!(sessionDict[sessionId][:inputchannel], msg)
            end
        end

        # sleep is needed because MQTTClient use async. "while true" loop leave no
        # chance for control to switch to on_msg()
        sleep(1)
    end
end
|
|
|
|
# start the service; main() runs its `while true` loop and is not expected to return
main()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|