module utils

export sendReceivePrompt, chunktext, extractStepFromPlan, checkTotalTaskInPlan,
    detectCharacters, findDetectedCharacter, extract_number, toolNameBeingCalled,
    isUsePlans, conversationSummary, checkReasonableness, replaceHeaders, addShortMem!,
    splittext, dictToString, removeHeaders, keepOnlyKeys, experience, messagesToString,
    messagesToString_nomark, removeTrailingCharacters, shortMemLatestTask,
    keywordMemoryUpdate!

using UUIDs, Dates, DataStructures
using CommUtils, GeneralUtils
using ..type

#------------------------------------------------------------------------------------------------100

"""
    sendReceivePrompt(a, prompt; max_tokens=256, timeout=120, temperature=0.2,
                      stopword=[], seed=nothing)

Send `prompt` through the agent's MQTT client and wait for the matching reply.

A fresh message id is written to `a.msgMeta[:msgId]`, the request is sent with
`CommUtils.request`, and `CommUtils.mqttRun` is polled until a payload whose
`[:msgMeta][:repondToMsgId]` equals that id arrives or `timeout` seconds have passed.

Arguments:
    a = an agent; its `msgMeta` and `mqttClient` fields are used
    prompt = text placed under `:txt` in the request
    max_tokens, temperature, stopword, seed = forwarded unchanged in the request
    timeout = maximum waiting time in seconds

Return:
    the `:txt` entry of the matching payload, or `nothing` when the payload has no
    `:txt` entry or the wait timed out

Example:
```julia
julia> response = sendReceivePrompt(agent, "Hello", max_tokens=64, timeout=30)
```
"""
function sendReceivePrompt(a::T, prompt::String; max_tokens=256, timeout::Int=120,
    temperature::AbstractFloat=0.2, stopword=[], seed=nothing) where {T<:agent}

    msgId = string(uuid4())  # new msg id for each msg
    a.msgMeta[:msgId] = msgId
    msg = Dict(
        :msgMeta => a.msgMeta,
        :txt => prompt,
        :max_tokens => max_tokens,
        :temperature => temperature,
        :stopword => stopword,
        :seed => seed,
    )
    payloadChannel = Channel(1)

    # send prompt
    CommUtils.request(a.mqttClient, msg)
    starttime = Dates.now()

    while true
        CommUtils.mqttRun(a.mqttClient, payloadChannel)
        if isready(payloadChannel)
            _, payload = take!(payloadChannel)
            if payload[:msgMeta][:repondToMsgId] == msgId
                return haskey(payload, :txt) ? payload[:txt] : nothing
            end
        end

        # The deadline is checked on every pass, including after a payload that answers
        # some other message, so unrelated traffic cannot keep this loop alive forever.
        timepass = (Dates.now() - starttime).value / 1000.0
        if timepass > timeout
            @warn "sendReceivePrompt timeout $timepass/$timeout"
            return nothing
        end
    end
end

"""
    detectCharacters(text, characters)

Find every occurrence of each keyword of `characters` in `text`.

The text is scanned from left to right; at each position the keywords are tried in the
order given. Matching is case sensitive.

Arguments:
    text = the text to scan
    characters = keywords to look for

Return:
    a `Vector{Any}` of named tuples `(char, start, stop)` where `start` and `stop` are
    the string indices of the first and last character of the match

Example:
```jldoctest
julia> text = "I like to eat apples and use utensils."
julia> characters = ["eat", "use", "i"]
julia> result = detectCharacters(text, characters)
4-element Vector{Any}:
 (char = "i", start = 4, stop = 4)
 (char = "eat", start = 11, stop = 13)
 (char = "use", start = 26, stop = 28)
 (char = "i", start = 35, stop = 35)
```
"""
function detectCharacters(text::T1, characters::Vector{T2}) where {T1<:AbstractString, T2<:AbstractString}
    result = []
    for i in eachindex(text)
        tail = SubString(text, i)
        for char in characters
            if startswith(tail, char)
                # i + ncodeunits(char) is the index right after the match; stepping back
                # one character gives a valid index even for multi-byte characters.
                stopInd = prevind(text, i + ncodeunits(char))
                push!(result, (char=char, start=i, stop=stopInd))
            end
        end
    end
    return result
end

"""
    chunktext(text, headers)

Chunk a text into smaller pieces by header.

Each header's body is the text between the end of that header and the start of the next
one; the body of the last header runs to the end of `text`. When the same header appears
more than once, the later body replaces the earlier one.

Arguments:
    text = the text to chunk
    headers = detected headers in text order, as returned by `detectCharacters`

Return:
    an `OrderedDict{String, Any}` mapping header => body

Example:
```julia
julia> text = "Thought: I should search. Act: wikisearch"
julia> headers = detectCharacters(text, ["Thought:", "Act:"])
julia> chunktext(text, headers)
OrderedDict{String, Any} with 2 entries:
  "Thought:" => " I should search. "
  "Act:"     => " wikisearch"
```
"""
function chunktext(text::T1, headers::T2) where {T1<:AbstractString, T2<:AbstractVector}
    result = OrderedDict{String, Any}()
    for (i, v) in enumerate(headers)
        # nextind/prevind instead of +1/-1 so bodies next to multi-byte characters
        # are sliced on valid string indices
        bodystart = nextind(text, min(v[:stop], ncodeunits(text)))
        bodystop = i < length(headers) ? prevind(text, headers[i+1][:start]) : lastindex(text)
        result[v[:char]] = text[bodystart:bodystop]
    end
    return result
end

"""
    extractStepFromPlan(a, plan, step)

Ask the LLM to copy step number `step` out of `plan`.

Arguments:
    a = an agent used to reach the LLM (see `sendReceivePrompt`)
    plan = the plan text
    step = the step number to extract

Return:
    whatever `sendReceivePrompt` returns for the extraction prompt
"""
function extractStepFromPlan(a::agent, plan::T, step::Int) where {T<:AbstractString}
    prompt =
        """
        <|im_start|>system
        You are a helpful assistant.
        Your job is to extract step $step in the user plan.

        Use the following format only:
        {copy the step and put it here}
        <|im_end|>

        <|im_start|>user
        $plan
        <|im_end|>
        <|im_start|>assistant
        """

    return sendReceivePrompt(a, prompt)
end

"""
    checkTotalTaskInPlan(a)

Ask the LLM how many steps the latest plan in the agent's short term memory has.

The latest plan is the last key of `a.memory[:shortterm]` that contains "Plan".

Return:
    the first integer found in the LLM reply

Throws an error when the memory holds no plan, when no reply arrives, or when the reply
contains no number.
"""
function checkTotalTaskInPlan(a::agent)
    shortterm = a.memory[:shortterm]
    headers = collect(keys(shortterm))

    # Plan will have number e.g. Plan 3: so the latest one is searched from the end
    idx = findlast(k -> occursin("Plan", k), headers)
    idx === nothing && error("no Plan entry found in short term memory")

    p = shortterm[headers[idx]]
    plan = "Plan: $p"
    prompt =
        """
        <|im_start|>system
        You are a helpful assistant.
        Your job is to determine how many steps in a user plan.

        Use the following format to answer:
        Total step number is {}
        <|im_end|>

        <|im_start|>user
        $plan
        <|im_end|>
        <|im_start|>assistant
        """

    response = sendReceivePrompt(a, prompt)
    response === nothing && error("checkTotalTaskInPlan received no response from the LLM")
    return extract_number(response)
end

"""
    findDetectedCharacter(detectedCharacters, character)

Find a given character from a vector of named tuple.

Arguments:
    detectedCharacters = detected keywords, as returned by `detectCharacters`
    character = the keyword to look for

Return:
    the positions inside `detectedCharacters` whose first field equals `character`

Example:
```jldoctest
julia> a = [
           (char = "i", start = 4, stop = 4),
           (char = "eat", start = 11, stop = 13),
           (char = "use", start = 26, stop = 28),
           (char = "i", start = 35, stop = 35),
       ]
julia> findDetectedCharacter(a, "i")
2-element Vector{Int64}:
 1
 4
```
"""
function findDetectedCharacter(detectedCharacters, character)
    allchar = [d[1] for d in detectedCharacters]
    return findall(isequal.(allchar, character))
end

"""
    extract_number(text)

Return the first run of digits in `text` as an `Int`.

Throws an error when `text` contains no digit.

Example:
```jldoctest
julia> extract_number("Total step number is 12")
12
```
"""
function extract_number(text::T) where {T<:AbstractString}
    m = match(r"\d+", text)  # first run of one or more digits
    m === nothing && error("No number found in the text $(@__LINE__)")
    return parse(Int, m.match)
end

"""
    toolNameBeingCalled(text, tools)

Extract toolname from text.

Every key of `tools` is treated as a tool name. When several names occur in `text`, the
one mentioned first wins; if two names start at the same position the longer one wins,
so the result does not depend on the iteration order of `tools`.

Arguments:
    text = text that may mention a tool, e.g. the body of an "Act:" section
    tools = a dict whose keys are tool names (`Symbol` or `String`)

Return:
    the tool name as a `String`, or `nothing` when no tool name occurs in `text`

Example:
```julia
julia> text = " internetsearch\\n"
julia> tools = Dict(
           :internetsearch => Dict(
               :name => "internetsearch",
               :description => "Useful for when you need to search the Internet",
               :input => "Input should be a search query.",
               :output => "",
           ),
           :chatbox => Dict(
               :name => "chatbox",
               :description => "Useful for when you need to ask a customer what you need to know or to talk with them.",
               :input => "Input should be a conversation to customer.",
               :output => "",
           ),
       )
julia> toolname = toolNameBeingCalled(text, tools)
"internetsearch"
```
"""
function toolNameBeingCalled(text::T, tools::Dict) where {T<:AbstractString}
    called = nothing
    calledpos = typemax(Int)
    for k in keys(tools)
        toolname = String(k)
        found = findfirst(toolname, text)
        found === nothing && continue
        pos = first(found)
        if pos < calledpos || (pos == calledpos && length(toolname) > length(called))
            called = toolname
            calledpos = pos
        end
    end
    return called
end

# function chooseThinkingMode(a::agentReflex, usermsg::String)
#     thinkingmode = nothing
#     if length(a.memory[:log]) != 0
#         thinkingmode = :continue_thinking
#     else
#         prompt =
#         """
#         <|im_start|>system
#         {systemMsg}
#         You always use tools if there is a chance to impove your response.
# You have access to the following tools: # {tools} # Your job is to determine whether you will use tools or actions to response. # Choose one of the following choices: # Choice 1: If you don't need to use tools or actions to response to the stimulus say, "{no}". # Choice 2: If you think the user want to get wine say, "{yes}". # <|im_end|> # <|im_start|>user # {input} # <|im_end|> # <|im_start|>assistant # """ # toollines = "" # for (toolname, v) in a.tools # if toolname ∉ ["chatbox", "nothing"] # toolline = "$toolname: $(v[:description]) $(v[:input]) $(v[:output])\n" # toollines *= toolline # end # end # prompt = replace(prompt, "{systemMsg}" => a.roles[a.role]) # prompt = replace(prompt, "{tools}" => toollines) # prompt = replace(prompt, "{input}" => usermsg) # result = sendReceivePrompt(a, prompt) # willusetools = GeneralUtils.getStringBetweenCharacters(result, "{", "}") # thinkingmode = willusetools == "yes" ? :new_thinking : :no_thinking # end # return thinkingmode # end """ Determine from a user message whether an assistant need to use tools. Arguments: a, one of ChatAgent's agent. Return: 1. true/false # is LLM going to use tools 2. objective # what LLM going to do """ function isUsePlans(a::agentReflex) toollines = "" for (toolname, v) in a.tools if toolname ∉ ["chatbox"] # LLM will always use chatbox toolline = "$toolname is $(v[:description])\n" toollines *= toolline end end conversation = messagesToString(a.messages) aboutYourself = """ Your name is $(a.agentName) $(a.roles[a.role]) """ prompt = """ <|system|> $aboutYourself $toollines $conversation Your job is to decide whether you need think thoroughly or use tools in order to respond to the user. Use the following format: Thought: Do you need to think thoroughly or use tools before responding to the user? user: Hello!. How are you? assistant: The user is greeting me, I don't need to think about it. user: "What's tomorrow weather like?" 
assistant: I will need to use weather tools to check for tomorrow's temperature. <|assistant|> """ isuseplan = false @show a.role if length(a.memory[:shortterm]) != 0 isuseplan = true elseif a.role == :assistant isuseplan = false else # if LLM mentions any tools, use Plan/Thought/Act loop response = sendReceivePrompt(a, prompt, temperature=0.2, max_tokens=64, stopword=["<|", " conversation = [ Dict(:role=> "user", :content=> "I would like to get a bottle of wine", :timestamp=> Dates.now()), Dict(:role=> "assistant", :content=> "What kind of Thai dishes are you having?", :timestamp=> Dates.now()), Dict(:role=> "user", :content=> "It a pad thai.", :timestamp=> Dates.now()), Dict(:role=> "assistant", :content=> "Is there any special occasion for this event?", :timestamp=> Dates.now()), Dict(:role=> "user", :content=> "We'll hold a wedding party at the beach.", :timestamp=> Dates.now()), Dict(:role=> "assistant", :content=> "What is your preferred type of wine?", :timestamp=> Dates.now()), Dict(:role=> "user", :content=> "I like dry white wine with medium tanins.", :timestamp=> Dates.now()), Dict(:role=> "assistant", :content=> "What is your preferred price range for this bottle of wine?", :timestamp=> Dates.now()), Dict(:role=> "user", :content=> "lower than 50 dollars.", :timestamp=> Dates.now()), Dict(:role=> "assistant", :content=> "Based on your preferences and our stock, I recommend the following two wines for you: 1. Pierre Girardin \"Murgers des Dents de Chien\" - Saint-Aubin 1er Cru (17 USD) 2. Etienne Sauzet'Les Perrieres' - Puligny Montrachet Premier Cru (22 USD) The first wine, Pierre Girardin \"Murgers des Dents de Chien\" - Saint-Aubin 1er Cru, is a great choice for its affordable price and refreshing taste. It pairs well with Thai dishes and will be perfect for your beach wedding party. 
"""
    conversationSummary(a)

Ask the LLM to paraphrase the agent's conversation from the agent's own perspective.

`a.messages` is rendered with `messagesToString_nomark` (the assistant is addressed as
"I"), wrapped in a paraphrase prompt and sent with `sendReceivePrompt`.

Arguments:
    a = an agent holding the conversation in `a.messages`

Return:
    the reply text up to, not including, the first "<|/s|>" marker

Throws an error when no reply is received.

Example:
```julia
julia> summary = conversationSummary(agent)
```
"""
function conversationSummary(a::T) where {T<:agent}
    conversation = messagesToString_nomark(a.messages, addressAIas="I")
    prompt =
        """
        <|system|>
        Your conversation with the user:
        $conversation
        Your job is to paraphrase a conversation from your perspective.
        You must refers to yourself by "I" in the summary.
        <|/s|>
        <|assistant|>
        Paraphrase:
        """

    result = sendReceivePrompt(a, prompt)
    result === nothing && error("conversationSummary received no response from the LLM")
    return first(split(result, "<|/s|>"))
end

"""
    messagesToString(messages; addressAIas="assistant")

Convert a vector of dict into 1-continous string with role markers.

Trailing newlines and spaces of every message are dropped. Only the roles "user" and
"assistant" are accepted; any other role raises an error.

Arguments:
    messages = a vector of dict with `:role` and `:content` entries
    addressAIas = the name written in the marker of assistant messages

Return:
    1-continous string, or "N/A" when `messages` is empty

Example:
```jldoctest
julia> messages = [Dict(:role=> "user", :content=> "Hi there."),
                   Dict(:role=> "assistant", :content=> "Hello! How can I assist you today?"),]
julia> messagesToString(messages)
"<|user|>\\n Hi there.\\n<|assistant|>\\n Hello! How can I assist you today?\\n"
```
"""
function messagesToString(messages::AbstractVector{T}; addressAIas="assistant") where {T<:AbstractDict}
    isempty(messages) && return "N/A"

    io = IOBuffer()
    for msg in messages
        role = msg[:role]
        content = removeTrailingCharacters(msg[:content])
        if role == "user"
            print(io, "<|", role, "|>\n ", content, "\n")
        elseif role == "assistant"
            print(io, "<|", addressAIas, "|>\n ", content, "\n")
        else
            error("undefied condition role = $role $(@__LINE__)")
        end
    end
    return String(take!(io))
end

"""
    messagesToString_nomark(messages; addressAIas="assistant")

Convert a vector of dict into 1-continous string without role markers.

Each message becomes one "role: content" line. Trailing newlines and spaces of every
message are dropped. Only the roles "user" and "assistant" are accepted; any other role
raises an error.

Arguments:
    messages = a vector of dict with `:role` and `:content` entries
    addressAIas = the name written in front of assistant messages

Return:
    1-continous string, or "N/A" when `messages` is empty

Example:
```jldoctest
julia> messages = [Dict(:role=> "user", :content=> "Hi there."),
                   Dict(:role=> "assistant", :content=> "Hello! How can I assist you today?"),]
julia> messagesToString_nomark(messages)
"user: Hi there.\\nassistant: Hello! How can I assist you today?\\n"
```
"""
function messagesToString_nomark(messages::AbstractVector{T}; addressAIas="assistant") where {T<:AbstractDict}
    isempty(messages) && return "N/A"

    io = IOBuffer()
    for msg in messages
        role = msg[:role]
        content = removeTrailingCharacters(msg[:content])
        if role == "user"
            print(io, role, ": ", content, "\n")
        elseif role == "assistant"
            print(io, addressAIas, ": ", content, "\n")
        else
            error("undefied condition role = $role $(@__LINE__)")
        end
    end
    return String(take!(io))
end

"""
    dictToString(shortMemory; skiplist=[""])

Convert a dict into 1-continous string, one "key value" line per entry.

Trailing newlines and spaces of every value are dropped before it is written.

Arguments:
    shortMemory = a dict, e.g. the short term memory of an agent
    skiplist = keys that must not be written

Return:
    1-continous string ("" when nothing is written)

Example:
```julia
julia> dictToString(OrderedDict("Thought 1:" => " I like it.\\n", "Act 1:" => " chatbox"))
"Thought 1:  I like it.\\nAct 1:  chatbox\\n"
```
"""
function dictToString(shortMemory::T;
    skiplist::Union{Vector{String}, Vector{Symbol}}=[""]) where {T<:AbstractDict}

    io = IOBuffer()
    for (k, v) in shortMemory
        k ∈ skiplist && continue
        print(io, k, " ", removeTrailingCharacters(v), "\n")
    end
    return String(take!(io))
end

"""
    removeTrailingCharacters(text; charTobeRemoved=['\\n', ' '])

Remove trailing characters from text.

Arguments:
    text = text you want to remove trailing characters
    charTobeRemoved = a list of characters to be removed

Return:
    text with specified trailing characters removed

Example:
```jldoctest
julia> text = "Hello! How can I assist you today?\\n\\n "
julia> removelist = ['\\n', ' ',]
julia> removeTrailingCharacters(text, charTobeRemoved=removelist)
"Hello! How can I assist you today?"
```
"""
function removeTrailingCharacters(text; charTobeRemoved::AbstractVector{T}=['\n', ' ',]) where {T<:Char}
    # Walk back one character at a time with prevind so the final slice always ends on a
    # valid index, even when the removed or remaining characters are multi-byte.
    stop = lastindex(text)
    while stop >= firstindex(text) && text[stop] ∈ charTobeRemoved
        stop = prevind(text, stop)
    end
    return text[firstindex(text):stop]
end

#TODO
"""
    checkReasonableness(userMsg, context, tools)

Ask an LLM whether a user assignment can be answered with the available tools.

Unfinished (marked TODO in the code).

NOTE(review): `llm` and `pyconvert` are not defined in this file; confirm where they are
expected to come from before calling this function.
NOTE(review): the `context` argument is overwritten by a hard-coded tool description and
`tools` is never read.

Arguments:
    userMsg = the user assignment
    context = currently ignored, see the note above
    tools = currently unused

Return:
    the "text" entry of the first choice in the LLM output

Ref: https://www.youtube.com/watch?v=XV4IBaZqbps
"""
function checkReasonableness(userMsg::String, context::String, tools)
    prompt =
        """
        <|im_start|>system
        You are a helpful assistant. Your job is to check the reasonableness of user assignments.
        If the user assignment can be answered given the tools available say, "This is a reasonable assignment".
        If the user assignment cannot be answered then provide some feedback to the user that may improve
        their assignment.

        Here is the context for the assignment:
        {context}
        <|im_end|>

        <|im_start|>user
        {assignment}
        <|im_end|>
        <|im_start|>assistant
        """

    context = "You have access to the following tools:\n" *
        "WineStock: useful for when you need to find info about wine by matching your " *
        "description, price, name or ID. Input should be a search query with as much " *
        "details as possible."
    prompt = replace(prompt, "{assignment}" => userMsg)
    prompt = replace(prompt, "{context}" => context)

    output_py = llm(
        prompt,
        max_tokens=512,
        temperature=0.1,
        # top_p=top_p,
        echo=false,
        # NOTE(review): these stop strings look damaged in the file under review;
        # confirm the intended stop markers.
        stop=["", "<>", ],
    )
    _output_jl = pyconvert(Dict, output_py)
    output = pyconvert(Dict, _output_jl["choices"][1])
    return output["text"]
end

"""
    addShortMem!(shortMem, chunkedtext)

Add chunked text to a short term memory of a chat agent.

Entries of `chunkedtext` are written into `shortMem`; a key that already exists is
overwritten.

Arguments:
    shortMem = short memory of a chat agent
    chunkedtext = a dict contains text

Return:
    `shortMem`, after the update

Example:
```jldoctest
julia> chunkedtext = OrderedDict{String, String}(
           "Thought 1:" => " I should always think about...",
           "Act 1:" => " wikisearch",
           "Actinput 1:" => " latest AMD GPU",)
julia> shortMem = OrderedDict{String, Any}()
julia> addShortMem!(shortMem, chunkedtext)
OrderedDict{String, Any} with 3 entries:
  "Thought 1:"  => " I should always think about..."
  "Act 1:"      => " wikisearch"
  "Actinput 1:" => " latest AMD GPU"
```
"""
function addShortMem!(shortMem::OrderedDict{String, Any}, chunkedtext::T) where {T<:AbstractDict}
    for (k, v) in chunkedtext
        shortMem[k] = v
    end
    return shortMem
end

"""
    splittext(text, list)

Split text using all keywords in a list and keep the leftmost piece.

The keywords are applied one after another: the text is cut at the first occurrence of
the keyword and only the part on the left is kept for the next keyword.

Arguments:
    text = a text you want to split
    list = a list of keywords you want to split

Return:
    a leftmost text after split

Example:
```jldoctest
julia> text = "Consider the type of food, occasion and temperature at the serving location."
julia> list = ["at", "and"]
julia> splittext(text, list)
"Consider the type of food, occasion "
```
"""
function splittext(text, list)
    newtext = text
    for keyword in list
        # only the piece before the first occurrence is needed
        newtext = first(split(newtext, keyword; limit=2))
    end
    return newtext
end

# Put `label` in front of the ":" of every detected header that occurs in `text`.
function _labelHeaders(text, headers, label)
    newtext = text
    for h in headers
        header = h[:char]
        if occursin(header, newtext)
            labelled = replace(header, ":" => " $label:")
            newtext = replace(newtext, header => labelled)
        end
    end
    return newtext
end

"""
    addStepNumber(text, headers, step)
    addStepNumber(text, headers, step, substep)

Add step number to header in a text.

Every header of `headers` (named tuples with a `:char` field, as returned by
`detectCharacters`) that occurs in `text` gets " step" or " step-substep" inserted in
front of its ":".

Return:
    the text with numbered headers

Example:
```julia
julia> headers = detectCharacters("Thought: hi", ["Thought:"])
julia> addStepNumber("Thought: hi", headers, 2)
"Thought 2: hi"
julia> addStepNumber("Thought: hi", headers, 2, 1)
"Thought 2-1: hi"
```
"""
function addStepNumber(text::T, headers, step::Int) where {T<:AbstractString}
    return _labelHeaders(text, headers, "$step")
end

function addStepNumber(text::T, headers, step::Int, substep::Int) where {T<:AbstractString}
    return _labelHeaders(text, headers, "$step-$substep")
end

"""
    replaceHeaders(text, headers, step)

Rewrite headers in a text so that they carry the given step number.

For every header, the span from its first occurrence up to the next ":" is looked up and
all occurrences of that span are replaced by "header step:". A header that does not
occur in the text, or that is not followed by a ":", is left alone.

Arguments:
    text = a text whose headers you want to renumber
    headers = a list of header strings; a trailing ":" is optional
    step = a number you want to add

Return:
    the text with renumbered headers

Example:
```jldoctest
julia> replaceHeaders("Thought 1: I like it. Act 1: chatbox", ["Thought:", "Act:"], 3)
"Thought 3: I like it. Act 3: chatbox"
```
"""
function replaceHeaders(text::T, headers, step::Int) where {T<:AbstractString}
    newtext = text
    for i in headers
        header = endswith(i, ":") ? chop(i) : i  # not include ":"
        isempty(header) && continue

        found = findfirst(header, newtext)
        found === nothing && continue

        colon = findnext(":", newtext, nextind(newtext, first(found)))
        colon === nothing && continue

        word = newtext[first(found):last(colon)]
        newtext = replace(newtext, word => "$header $step:")
    end
    return newtext
end

"""
    removeHeaders(shortMemory, step, skipHeaders=nothing)

Remove headers of specific step from memory.

A key belongs to `step` when it contains that number as a whole number, so step 1 does
not match "Plan 10:". Keys of the step that contain one of `skipHeaders` are kept.

Arguments:
    shortMemory = a short term memory of a ChatAgent's agent
    step = a step number you want to remove
    skipHeaders = a list of keys in memory you want to skip

Return:
    a new short term memory; `shortMemory` itself is not modified

Example:
```jldoctest
julia> shortMemory = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
           "Act 1:" => "chatbox",
           "Actinput 1:" => "I get this one.",
           "Plan 2:" => "I'm meeting my wife this afternoon",
           "Thought 2:" => "I also want it for my wife",
           "Act 2:" => "chatbox",
           "Actinput 2:" => "I would like to get 2 more",
       )
julia> skipHeaders = ["Plan"]
julia> step = 2
julia> removeHeaders(shortMemory, step, skipHeaders)
OrderedDict{String, String} with 6 entries:
  "user:"       => "May I try this one?"
  "Plan 1:"     => "testing a small portion of icecream"
  "Thought 1:"  => "I like it."
  "Act 1:"      => "chatbox"
  "Actinput 1:" => "I get this one."
  "Plan 2:"     => "I'm meeting my wife this afternoon"
```
"""
function removeHeaders(shortMemory::OrderedDict, step::Int,
    skipHeaders::Union{Array{String}, Array{Symbol}, Nothing}=nothing)

    newdict = empty(shortMemory)
    # the step number must not be glued to another digit on either side
    steppattern = Regex("(?<!\\d)$(step)(?!\\d)")
    skips = skipHeaders === nothing ? String[] : String.(skipHeaders)

    for (k, v) in shortMemory
        ofstep = occursin(steppattern, k)
        if !ofstep || any(h -> occursin(h, k), skips)
            newdict[k] = v
        end
    end
    return newdict
end

"""
    keepOnlyKeys(dict, keys)

Keep only specified keys in a dictionary. All non-specified keys will be removed.

Arguments:
    dict = a dictionary
    keys = keys you want to keep in a dict

Return:
    a new dict with all non-specified keys removed; `dict` itself is not modified

Example:
```jldoctest
julia> dict = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
       )
julia> keys = ["user:"]
julia> keepOnlyKeys(dict, keys)
OrderedDict{String, String} with 1 entry:
  "user:" => "May I try this one?"
```
"""
function keepOnlyKeys(dict::T1, keys::T2) where {T1<:AbstractDict, T2<:AbstractVector}
    newdict = empty(dict)
    for (k, v) in dict
        if k ∈ keys
            newdict[k] = v
        end
    end
    return newdict
end

"""
    experience(dict)

Convert experience dict into 1 string for LLM to use.

Arguments:
    dict = a dictionary contain past experience

Return:
    all values of `dict` concatenated in iteration order, without the keys

Example:
```julia
julia> dict = OrderedDict{String, Any}(
           "lesson 1" => "Ask about the budget first.\\n",
           "lesson 2" => "Confirm the event details.\\n",
       )
julia> experience(dict)
"Ask about the budget first.\\nConfirm the event details.\\n"
```
"""
function experience(dict::T) where {T<:AbstractDict}
    return join(values(dict))
end

"""
    shortMemLatestTask(dict)

Get the latest step number of short term memory.

The step number is the first number in the last key of `dict`, so "Plan 12:" gives 12
and "Thought 2-1:" gives 2.

Arguments:
    dict = a short term memory whose keys carry step numbers

Return:
    latest step number

Throws an `ArgumentError` when `dict` is empty or its last key contains no number.

Example:
```jldoctest
julia> dict = OrderedDict(
           "Plan 1:" => "1. Ask about the type of food that will be served at the wedding party.")
julia> shortMemLatestTask(dict)
1
```
"""
function shortMemLatestTask(dict::T) where {T<:AbstractDict}
    isempty(dict) && throw(ArgumentError("short term memory is empty"))
    latestkey = last(collect(keys(dict)))
    m = match(r"\d+", string(latestkey))
    m === nothing && throw(ArgumentError("no step number found in key \"$latestkey\""))
    return parse(Int, m.match)
end

"""
    keywordMemoryUpdate!(keywordmemory, newinfo)

Update a keyword memory.

Each key of `newinfo` is matched against the existing keys with `checkSimilarKey`; when
a similar key exists the value is stored under that key, otherwise under the new key.
A `nothing` value never replaces an existing value that is not `nothing`.

Arguments:
    keywordmemory = a dictionary contain previous info, updated in place
    newinfo = a dictionary contain new info

Return:
    an updated keyword memory

Example:
```julia
julia> newinfo = Dict("car type" => "SUV", "engine type" => "electric", "car color" => nothing)
julia> keywordmemory = Dict{String, Any}("car type" => "sedan", "car color" => "blue", "financing" => nothing)
julia> keywordMemoryUpdate!(keywordmemory, newinfo)
```
afterwards "car type" is "SUV", "engine type" is "electric" and "car color" is still "blue".
"""
function keywordMemoryUpdate!(keywordmemory::AbstractDict, newinfo::AbstractDict)
    for (rawkey, v) in newinfo
        k = String(rawkey)

        # some time input keyword is different than dict's key thus it skip update
        # e.x. input key "tannin level" => "low to medium"
        #      dict key "wine tannin level" => "low to medium"
        similar_k = checkSimilarKey(keywordmemory, k)
        k = similar_k === nothing ? k : similar_k

        # do not write value nothing if the key already has value
        if v === nothing && haskey(keywordmemory, k) && keywordmemory[k] !== nothing
            continue
        end

        @debug "keywordMemoryUpdate!" key=k before=get(keywordmemory, k, nothing) after=v
        keywordmemory[k] = v
    end
    return keywordmemory
end

"""
    checkSimilarKey(dict, key)

Find the key of `dict` that corresponds to `key`.

"_" and "-" in `key` are read as spaces. A key of `dict` equal to `key` (as given or
after that replacement) is preferred; otherwise the first key of `dict` that contains
the replaced `key` is returned.

Return:
    the matching key of `dict`, or `nothing` when there is none

Example:
```jldoctest
julia> checkSimilarKey(Dict("wine tannin level" => "low"), "tannin_level")
"wine tannin level"
```
"""
function checkSimilarKey(dict::AbstractDict, key::AbstractString)
    normalized = replace(replace(key, "_" => " "), "-" => " ")

    # an exact match wins over a substring match
    for k in keys(dict)
        name = String(k)
        if name == key || name == normalized
            return k
        end
    end

    for k in keys(dict)
        if occursin(normalized, String(k))
            return k
        end
    end
    return nothing
end

end # end module