module utils

export makeSummary, sendReceivePrompt, chunktext, extractStepFromPlan,
    checkTotalTaskInPlan, detectCharacters, findDetectedCharacter, extract_number,
    toolNameBeingCalled, isUsePlans, conversationSummary, checkReasonableness,
    replaceHeaders, addShortMem!, splittext, dictToString, removeHeaders, keepOnlyKeys,
    experience, messagesToString, messagesToString_nomark, removeTrailingCharacters,
    shortMemLatestTask

using UUIDs, Dates, DataStructures
using CommUtils, GeneralUtils
using ..type

#------------------------------------------------------------------------------------------------100

"""
    makeSummary(a::agent, input::AbstractString)

Ask the LLM behind agent `a` whether `input` can be summarised and, when it answers
"{Yes}", ask it for a concise summary.

# Returns
The summary text (one leading newline removed), or the placeholder `"Nothing."` when the
LLM declines or no reply is received.
"""
function makeSummary(a::T1, input::T2) where {T1<:agent, T2<:AbstractString}
    summary = "Nothing."

    prompt = """
    <|im_start|>system
    Input text: $input
    Your job is to determine now whether you can make a summary of the input text by choosing one of following choices:
    If you cannot make a summary say, "{No}".
    If you can make a summary say, "{Yes}".
    <|im_end|>
    """
    # `sendReceivePrompt` returns `nothing` on timeout, so guard before parsing
    verdict = sendReceivePrompt(a, prompt)
    if verdict !== nothing
        verdict = GeneralUtils.getStringBetweenCharacters(verdict, "{", "}")
    end

    if verdict == "Yes"
        # separate summary part
        prompt = """
        <|im_start|>system
        Input text: $input
        Your job is to make a concise summary of the input text.
        <|im_end|>
        """
        result = sendReceivePrompt(a, prompt)
        if result !== nothing
            # `startswith` is safe on an empty reply, unlike `result[1:1]`
            summary = startswith(result, "\n") ? result[2:end] : result
        end
    end

    input_summary = input
    @show input_summary
    @show summary
    return summary
end

"""
    sendReceivePrompt(a::agent, prompt; max_tokens=256, timeout=120, temperature=0.2)

Send `prompt` through the agent's MQTT client and wait for the matching reply.

A fresh UUID is stored in `a.msgMeta[:msgId]`, the request is sent with
`CommUtils.request`, and `CommUtils.mqttRun` is polled until a payload whose
`[:msgMeta][:repondToMsgId]` equals that id arrives or `timeout` seconds have passed.

# Keywords
- `max_tokens`: forwarded in the request under `:max_tokens`.
- `timeout::Int`: maximum waiting time in seconds.
- `temperature::Real`: forwarded in the request under `:temperature`.

# Returns
The reply's `:txt` field, or `nothing` when the reply has no `:txt` field or the wait
timed out (a message is printed in the latter case).
"""
function sendReceivePrompt(a::T, prompt::AbstractString; max_tokens=256, timeout::Int=120,
    temperature::Real=0.2) where {T<:agent}

    a.msgMeta[:msgId] = "$(uuid4())"  # new msg id for each msg
    msg = Dict(
        :msgMeta => a.msgMeta,
        :txt => String(prompt),
        :max_tokens => max_tokens,
        :temperature => temperature,
    )
    payloadChannel = Channel(1)

    # send prompt
    CommUtils.request(a.mqttClient, msg)
    starttime = Dates.now()
    result = nothing

    while true
        CommUtils.mqttRun(a.mqttClient, payloadChannel)
        if isready(payloadChannel)
            _, payload = take!(payloadChannel)
            # NOTE: `:repondToMsgId` is the key spelling used by the incoming payload
            if payload[:msgMeta][:repondToMsgId] == msg[:msgMeta][:msgId]
                result = haskey(payload, :txt) ? payload[:txt] : nothing
                break
            end
        end

        # Checked on every iteration, including after an unrelated message, so a busy
        # topic cannot keep the loop alive past the deadline.
        timepass = (Dates.now() - starttime).value / 1000.0
        if timepass > timeout
            println("sendReceivePrompt timeout $timepass/$timeout")
            break
        end
    end

    return result
end

"""
    detectCharacters(text::AbstractString, characters::Vector{<:AbstractString})

Find every occurrence of each string of `characters` in `text`.

# Returns
A `Vector{Any}` of named tuples `(char, start, stop)` ordered by start position (and by
the order of `characters` for equal positions). `start` and `stop` are the string indices
of the first and last matched character.

# Examples
```julia
julia> text = "I like to eat apples and use utensils."

julia> characters = ["eat", "use", "i"]

julia> detectCharacters(text, characters)
4-element Vector{Any}:
 (char = "i", start = 4, stop = 4)
 (char = "eat", start = 11, stop = 13)
 (char = "use", start = 26, stop = 28)
 (char = "i", start = 35, stop = 35)
```
"""
function detectCharacters(text::T1, characters::Vector{T2}) where {T1<:AbstractString, T2<:AbstractString}
    result = []
    # `eachindex` yields valid character indices only, and `startswith` compares whole
    # characters, so text containing multi-byte characters needs no try/catch.
    for i in eachindex(text)
        tail = SubString(text, i)
        for char in characters
            if startswith(tail, char)
                # index of the last matched character (== i + length - 1 for ASCII)
                stop = prevind(text, i + ncodeunits(char))
                push!(result, (char=char, start=i, stop=stop))
            end
        end
    end
    return result
end

"""
    chunktext(text::AbstractString, headers::AbstractVector)

Chunk `text` into pieces keyed by header. `headers` is the output of
[`detectCharacters`](@ref); the body of a header is the text between the end of that
header and the start of the next one (or the end of `text` for the last header).

# Returns
An `OrderedDict{String, Any}` mapping each header to its body. A header that appears more
than once keeps only its last body.

# Examples
```julia
julia> text = "Thought: I should search. Act: wikisearch"

julia> headers = detectCharacters(text, ["Thought:", "Act:"])
2-element Vector{Any}:
 (char = "Thought:", start = 1, stop = 8)
 (char = "Act:", start = 27, stop = 30)

julia> chunktext(text, headers)
OrderedDict{String, Any} with 2 entries:
  "Thought:" => " I should search. "
  "Act:"     => " wikisearch"
```
"""
function chunktext(text::T1, headers::T2) where {T1<:AbstractString, T2<:AbstractVector}
    result = OrderedDict{String, Any}()
    for (i, v) in enumerate(headers)
        # nextind/prevind instead of +1/-1 so bodies bordering multi-byte characters
        # do not raise StringIndexError
        bodystart = nextind(text, v[:stop])
        bodystop = i < length(headers) ? prevind(text, headers[i+1][:start]) :
                   lastindex(text)
        result[v[:char]] = text[bodystart:bodystop]
    end
    return result
end

"""
    extractStepFromPlan(a::agent, plan::AbstractString, step::Int)

Ask the LLM behind agent `a` to copy step number `step` out of `plan`.

# Returns
The raw reply of [`sendReceivePrompt`](@ref) (`nothing` when no reply was received).
"""
function extractStepFromPlan(a::agent, plan::T, step::Int) where {T<:AbstractString}
    prompt = """
    <|im_start|>system
    You are a helpful assistant.
    Your job is to extract step $step in the user plan.
    Use the following format only:
    {copy the step and put it here}
    <|im_end|>
    <|im_start|>user
    $plan
    <|im_end|>
    <|im_start|>assistant
    """
    return sendReceivePrompt(a, prompt)
end

"""
    checkTotalTaskInPlan(a::agent)

Ask the LLM how many steps the latest plan stored in `a.memory[:shortterm]` contains.
The latest plan is the last key containing "Plan" (plans are numbered, e.g. "Plan 3:").

# Returns
The first integer found in the LLM reply.

# Throws
- `KeyError` when the short-term memory holds no "Plan" entry.
- `ErrorException` when the LLM does not reply or the reply contains no number.
"""
function checkTotalTaskInPlan(a::agent)
    shortterm = a.memory[:shortterm]
    headers = collect(keys(shortterm))

    # Plan will have number e.g. Plan 3: so take the last key that mentions a plan
    idx = findlast(k -> occursin("Plan", k), headers)
    idx === nothing && throw(KeyError("Plan"))

    p = shortterm[headers[idx]]
    plan = "Plan: $p"
    prompt = """
    <|im_start|>system
    You are a helpful assistant.
    Your job is to determine how many steps in a user plan.
    Use the following format to answer:
    Total step number is {}
    <|im_end|>
    <|im_start|>user
    $plan
    <|im_end|>
    <|im_start|>assistant
    """
    response = sendReceivePrompt(a, prompt)
    response === nothing && error("checkTotalTaskInPlan: no response received from LLM")
    return extract_number(response)
end

"""
    findDetectedCharacter(detectedCharacters, character)

Return the indices inside `detectedCharacters` (the output of [`detectCharacters`](@ref))
whose first field equals `character`.

# Examples
```julia
julia> a = [
           (char = "i", start = 4, stop = 4),
           (char = "eat", start = 11, stop = 13),
           (char = "use", start = 26, stop = 28),
           (char = "i", start = 35, stop = 35),
       ]

julia> findDetectedCharacter(a, "i")
2-element Vector{Int64}:
 1
 4
```
"""
function findDetectedCharacter(detectedCharacters, character)
    return findall(d -> isequal(d[1], character), detectedCharacters)
end

"""
    extract_number(text::AbstractString)

Return the first run of digits in `text` parsed as an `Int`.

# Throws
`ErrorException` when `text` contains no digit.

# Examples
```jldoctest
julia> extract_number("Total step number is 3")
3
```
"""
function extract_number(text::T) where {T<:AbstractString}
    found = match(r"\d+", text)  # first run of one or more digits
    if found === nothing
        error("No number found in the text $(@__LINE__)")
    end
    return parse(Int, found.match)
end

"""
    toolNameBeingCalled(text::AbstractString, tools::AbstractDict)

Extract the name of the tool mentioned in `text`. The keys of `tools` (strings or
symbols) are the candidate names.

When several names occur in `text`, the one that occurs first wins, and for names
starting at the same position the longest one wins, so the answer does not depend on the
iteration order of `tools` (e.g. "internetsearch" is preferred over "search").

# Returns
The tool name as a `String`, or `nothing` when no tool name occurs in `text`.

# Examples
```julia
julia> tools = Dict(
           :internetsearch => Dict(:name => "internetsearch"),
           :chatbox => Dict(:name => "chatbox"),
       )

julia> toolNameBeingCalled(" internetsearch\\n", tools)
"internetsearch"
```
"""
function toolNameBeingCalled(text::T, tools::AbstractDict) where {T<:AbstractString}
    found = nothing
    foundrank = nothing
    for k in keys(tools)
        toolname = String(k)
        hit = findfirst(toolname, text)
        hit === nothing && continue
        # smaller rank == earlier in text, then longer name
        rank = (first(hit), -length(toolname))
        if foundrank === nothing || rank < foundrank
            found = toolname
            foundrank = rank
        end
    end
    return found
end

"""
    isUsePlans(a::agentReflex)

Decide whether the agent should enter its planning loop for the current conversation.

Returns `true` immediately when `a.memory[:shortterm]` is not empty (a plan is already in
progress). Otherwise the LLM is shown the tool list (without "chatbox") and the
conversation, and the result is `true` when its reply contains "Yes".

# Returns
`Bool`; `false` when the LLM does not reply.
"""
function isUsePlans(a::agentReflex)
    # An ongoing plan always continues; the LLM verdict could not change the result.
    if length(a.memory[:shortterm]) != 0
        return true
    end

    toollines = ""
    for (toolname, v) in a.tools
        # LLM will always use chatbox. Compare as strings so Symbol keys are filtered too.
        if string(toolname) != "chatbox"
            toollines *= "$toolname: $(v[:description]) $(v[:input]) $(v[:output])\n"
        end
    end

    conversation = messagesToString(a.messages)

    prompt = """
    <|system|>
    You are a helpful assistant.
    You have access to the following tools:
    $toollines
    Your task is to decide whether you need think thoroughly in order to respond to the user according to your conversation with the user and tools you have.
    So for instance the following:
    user: Hello!. How are you?
    assistant: {No}, the user is greeting me, I could respond right away.
    user: "I want a bottle of wine."
    assistant: {Yes}, I need to think thoroughly about the user stimulus.
    $conversation
    <|assistant|>
    """

    response = sendReceivePrompt(a, prompt, temperature=0.2, max_tokens=64)
    response === nothing && return false

    # keep only the first turn of the reply
    response = split(response, "<|assistant|>")[1]
    response = split(response, "<|user|>")[1]
    return occursin("Yes", response)
end

"""
    conversationSummary(a::agent)

Ask the LLM for a bullet summary, written from the assistant's perspective, of all
messages in `a.messages` except the last one.

# Returns
The summary text up to the first "<|im_end|>" with one leading newline removed; `"N/A"`
when no reply was received; `""` when `a.messages` is empty.

# Throws
`ErrorException` when a message role is neither "user" nor "assistant".

# Examples
```julia
julia> agent.messages = [
           Dict(:role => "user", :content => "I would like to get a bottle of wine"),
           Dict(:role => "assistant", :content => "What is your preferred type of wine?"),
           Dict(:role => "user", :content => "I like dry white wine."),
       ]

julia> summary = conversationSummary(agent)
```
"""
function conversationSummary(a::T) where {T<:agent}
    prompt = """
    <|im_start|>system
    You talked with a user earlier.
    Now you make a detailed bullet summary of the conversation from your perspective.
    You must refers to yourself by "I" in the summary.
    Here are the conversation:
    {conversation}
    <|im_end|>
    """
    summary = ""

    if length(a.messages) != 0
        io = IOBuffer()
        for msg in a.messages[1:end-1]
            role = msg[:role]
            content = msg[:content]
            if role == "user"
                print(io, "$role: $content\n")
            elseif role == "assistant"
                print(io, "I: $content\n")
            else
                error("undefied condition role = $role $(@__LINE__)")
            end
        end
        conversation = String(take!(io))

        prompt = replace(prompt, "{conversation}" => conversation)
        result = sendReceivePrompt(a, prompt)
        summary = result === nothing ? "N/A" : result
        summary = split(summary, "<|im_end|>")[1]
        # `startswith` is safe when the reply is empty, unlike `summary[1:1]`
        if startswith(summary, "\n")
            summary = summary[2:end]
        end
    end

    @show summary
    return summary
end

"""
    messagesToString(messages::AbstractVector{<:AbstractDict}; addressAIas="assistant")

Convert a vector of message dicts (keys `:role` and `:content`) into one continuous
string. Each message becomes a `<|role|>` marker line followed by its content with
trailing spaces and newlines removed; assistant messages use `addressAIas` as the role.

# Returns
The conversation string, or `"N/A"` when `messages` is empty.

# Throws
`ErrorException` when a message role is neither "user" nor "assistant".

# Examples
```julia
julia> messages = [
           Dict(:role => "user", :content => "Hi there."),
           Dict(:role => "assistant", :content => "Hello! How can I assist you today?"),
       ]

julia> messagesToString(messages)
"<|user|>\\n Hi there.\\n<|assistant|>\\n Hello! How can I assist you today?\\n"
```
"""
function messagesToString(messages::AbstractVector{T}; addressAIas="assistant") where {T<:AbstractDict}
    isempty(messages) && return "N/A"

    io = IOBuffer()
    for msg in messages
        role = msg[:role]
        content = removeTrailingCharacters(msg[:content])
        if role == "user"
            print(io, "<|$role|>\n $content\n")
        elseif role == "assistant"
            print(io, "<|$addressAIas|>\n $content\n")
        else
            error("undefied condition role = $role $(@__LINE__)")
        end
    end
    return String(take!(io))
end

"""
    messagesToString_nomark(messages::AbstractVector{<:AbstractDict}; addressAIas="assistant")

Convert a vector of message dicts (keys `:role` and `:content`) into one continuous
string of `role: content` lines without chat-template markers. Trailing spaces and
newlines of each content are removed; assistant messages use `addressAIas` as the role.

# Returns
The conversation string, or `"N/A"` when `messages` is empty.

# Throws
`ErrorException` when a message role is neither "user" nor "assistant".

# Examples
```julia
julia> messages = [
           Dict(:role => "user", :content => "Hi there."),
           Dict(:role => "assistant", :content => "Hello! How can I assist you today?"),
       ]

julia> messagesToString_nomark(messages)
"user: Hi there.\\nassistant: Hello! How can I assist you today?\\n"
```
"""
function messagesToString_nomark(messages::AbstractVector{T}; addressAIas="assistant") where {T<:AbstractDict}
    isempty(messages) && return "N/A"

    io = IOBuffer()
    for msg in messages
        role = msg[:role]
        content = removeTrailingCharacters(msg[:content])
        if role == "user"
            print(io, "$role: $content\n")
        elseif role == "assistant"
            print(io, "$addressAIas: $content\n")
        else
            error("undefied condition role = $role $(@__LINE__)")
        end
    end
    return String(take!(io))
end

"""
    dictToString(shortMemory::AbstractDict; skiplist=[""])

Render a dictionary as text, one `key value` line per entry, in iteration order. Trailing
spaces and newlines of each value are removed. Entries whose key is in `skiplist` are
left out.
"""
function dictToString(shortMemory::T;
    skiplist::Union{Vector{String}, Vector{Symbol}}=[""]) where {T<:AbstractDict}

    io = IOBuffer()
    for (k, v) in shortMemory
        k ∈ skiplist && continue
        print(io, k, ' ', removeTrailingCharacters(v), '\n')
    end
    return String(take!(io))
end

"""
    removeTrailingCharacters(text; charTobeRemoved=['\\n', ' '])

Remove trailing characters from `text`.

# Arguments
- `text`: text you want to remove trailing characters from.

# Keywords
- `charTobeRemoved`: the characters to be removed.

# Returns
`text` without the trailing run of characters contained in `charTobeRemoved`.

# Examples
```jldoctest
julia> removeTrailingCharacters("Hello! How can I assist you today?\\n\\n ")
"Hello! How can I assist you today?"
```
"""
function removeTrailingCharacters(text; charTobeRemoved::AbstractVector{T}=['\n', ' ',]) where {T<:Char}
    return _dropTrailing(text, charTobeRemoved)
end

# Strings: `rstrip` works on whole characters, so a multi-byte character in front of the
# trailing run (e.g. "café\n") no longer raises StringIndexError.
function _dropTrailing(text::AbstractString, chars)
    kept = rstrip(text, chars)
    return text[firstindex(text):lastindex(kept)]
end

# Any other indexable collection: count matching trailing elements and slice them off.
function _dropTrailing(items, chars)
    nouse = 0
    for i in reverse(items)
        i ∈ chars || break
        nouse += 1
    end
    return items[1:end-nouse]
end

#TODO
"""
    checkReasonableness(userMsg::String, context::String, tools)

Ask an LLM whether the assignment in `userMsg` can be answered with the available tools.

NOTE(review): work in progress. `llm` and `pyconvert` are not imported by this file and
must be provided elsewhere before this function can run - confirm. The `context` argument
is overwritten by a hard-coded tool description and `tools` is not used.

# Returns
The `"text"` field of the first choice of the LLM output.
"""
function checkReasonableness(userMsg::String, context::String, tools)
    # Ref: https://www.youtube.com/watch?v=XV4IBaZqbps

    prompt = """
    <|im_start|>system
    You are a helpful assistant. Your job is to check the reasonableness of user assignments.
    If the user assignment can be answered given the tools available say, "This is a reasonable assignment".
    If the user assignment cannot be answered then provide some feedback to the user that may improve their assignment.
    Here is the context for the assignment:
    {context}
    <|im_end|>
    <|im_start|>user
    {assignment}
    <|im_end|>
    <|im_start|>assistant
    """

    # NOTE(review): replaces the caller-supplied `context`
    context = "You have access to the following tools: WineStock: useful for when you need to find info about wine by matching your description, price, name or ID. Input should be a search query with as much details as possible."
    prompt = replace(prompt, "{assignment}" => userMsg)
    prompt = replace(prompt, "{context}" => context)

    output_py = llm(
        prompt,
        max_tokens=512,
        temperature=0.1,
        # top_p=top_p,
        echo=false,
        stop=["", "<>", ],  # NOTE(review): stop tokens look truncated - confirm
    )
    _output_jl = pyconvert(Dict, output_py);
    output = pyconvert(Dict, _output_jl["choices"][1]);
    return output["text"]
end

"""
    addShortMem!(shortMem::OrderedDict{String, Any}, chunkedtext::AbstractDict)

Add chunked text to the short-term memory of a chat agent. Every entry of `chunkedtext`
is stored in `shortMem`, overwriting entries with the same key.

# Returns
The updated `shortMem`.

# Examples
```julia
julia> chunkedtext = OrderedDict{String, String}(
           "Thought 1:" => " I should always think about...",
           "Act 1:" => " wikisearch",
           "Actinput 1:" => " latest AMD GPU",
       )

julia> shortMem = OrderedDict{String, Any}()

julia> addShortMem!(shortMem, chunkedtext)
OrderedDict{String, Any} with 3 entries:
  "Thought 1:"  => " I should always think about..."
  "Act 1:"      => " wikisearch"
  "Actinput 1:" => " latest AMD GPU"
```
"""
function addShortMem!(shortMem::OrderedDict{String, Any}, chunkedtext::T) where {T<:AbstractDict}
    return merge!(shortMem, chunkedtext)
end

"""
    splittext(text, list)

Cut `text` at the first occurrence of each keyword in `list`, one keyword after another,
keeping the part on the left every time.

# Arguments
- `text`: a text you want to split.
- `list`: a list of keywords you want to split at.

# Returns
The leftmost text after all splits.

# Examples
```julia
julia> text = "Consider the type of food, occasion and temperature at the serving location."

julia> splittext(text, ["at", "and"])
"Consider the type of food, occasion "
```
"""
function splittext(text, list)
    return foldl((acc, keyword) -> first(split(acc, keyword)), list; init=text)
end

# Insert ` label` in front of the ":" of every header (a named tuple with field `char`)
# that occurs in `text`.
function _numberHeaders(text, headers, label)
    newtext = text
    for h in headers
        name = h[:char]
        if occursin(name, newtext)
            newtext = replace(newtext, name => replace(name, ":" => " $label:"))
        end
    end
    return newtext
end

"""
    addStepNumber(text, headers, step::Int)
    addStepNumber(text, headers, step::Int, substep::Int)

Add a step number to the headers in `text`: "Thought:" becomes "Thought 2:" (or
"Thought 2-1:" when `substep` is given). `headers` is a collection of named tuples with a
`char` field, such as the output of [`detectCharacters`](@ref).

# Returns
The text with numbered headers.
"""
function addStepNumber(text::T, headers, step::Int) where {T<:AbstractString}
    return _numberHeaders(text, headers, "$step")
end

function addStepNumber(text::T, headers, step::Int, substep::Int) where {T<:AbstractString}
    return _numberHeaders(text, headers, "$step-$substep")
end

"""
    replaceHeaders(text::AbstractString, headers, step::Int)

Renumber headers in `text`. For each header such as "Thought:", the span from the first
occurrence of the header name up to the next ":" (e.g. "Thought 3:") is replaced
everywhere in the text by "Thought step:".

# Arguments
- `text`: the text whose headers are renumbered.
- `headers`: header strings including their trailing ":".
- `step`: the step number to write into the headers.

# Returns
The text with renumbered headers. A header that does not occur, or that is never followed
by ":", is left untouched.

# Examples
```jldoctest
julia> replaceHeaders("Thought 3: try again Act 3: chatbox", ["Thought:", "Act:"], 1)
"Thought 1: try again Act 1: chatbox"
```
"""
function replaceHeaders(text::T, headers, step::Int) where {T<:AbstractString}
    newtext = text
    for i in headers
        header = String(chop(i))  # not include ":"
        isempty(header) && continue

        found = findfirst(header, newtext)
        found === nothing && continue

        startind = first(found)
        colon = findnext(":", newtext, nextind(newtext, startind))
        # header text present but no ":" after it: nothing to renumber
        colon === nothing && continue

        word = newtext[startind:last(colon)]
        newtext = replace(newtext, word => "$header $step:")
    end
    return newtext
end

"""
    removeHeaders(shortMemory::OrderedDict, step::Int, skipHeaders=nothing)

Remove the entries of a specific step from a short-term memory.

# Arguments
- `shortMemory`: a short-term memory of a ChatAgent's agent.
- `step`: the step number you want to remove. It is matched as a whole number, so step 1
  does not match "Plan 10:".
- `skipHeaders`: strings or symbols; entries of `step` whose key contains one of them are
  kept.

# Returns
A new short-term memory; `shortMemory` itself is not modified.

# Examples
```julia
julia> shortMemory = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
           "Plan 2:" => "I'm meeting my wife this afternoon",
           "Thought 2:" => "I also want it for my wife",
           "Act 2:" => "chatbox",
       )

julia> removeHeaders(shortMemory, 2, ["Plan"])
OrderedDict{String, String} with 4 entries:
  "user:"      => "May I try this one?"
  "Plan 1:"    => "testing a small portion of icecream"
  "Thought 1:" => "I like it."
  "Plan 2:"    => "I'm meeting my wife this afternoon"
```
"""
function removeHeaders(shortMemory::OrderedDict, step::Int,
    skipHeaders::Union{Array{String}, Array{Symbol}, Nothing}=nothing)

    newdict = empty(shortMemory)
    # digits of `step` not preceded or followed by another digit
    steppattern = Regex("(?<!\\d)$(step)(?!\\d)")
    # normalise to strings so Symbol headers work with `occursin`
    protected = skipHeaders === nothing ? String[] : [String(h) for h in skipHeaders]

    for (k, v) in shortMemory
        if !occursin(steppattern, k) || any(h -> occursin(h, k), protected)
            newdict[k] = v
        end
    end
    return newdict
end

"""
    keepOnlyKeys(dict::AbstractDict, keys::AbstractVector)

Keep only specified keys in a dictionary. All non-specified keys are removed.

# Arguments
- `dict`: a dictionary.
- `keys`: keys you want to keep.

# Returns
A new dictionary of the same type holding only the entries whose key is in `keys`.

# Examples
```julia
julia> dict = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
       )

julia> keepOnlyKeys(dict, ["user:"])
OrderedDict{String, String} with 1 entry:
  "user:" => "May I try this one?"
```
"""
function keepOnlyKeys(dict::T1, keys::T2) where {T1<:AbstractDict, T2<:AbstractVector}
    # `empty` (unlike `similar`) is defined for every AbstractDict, including `Dict`
    newdict = empty(dict)
    for (k, v) in dict
        if k ∈ keys
            newdict[k] = v
        end
    end
    return newdict
end

"""
    experience(dict::AbstractDict)

Convert an experience dictionary into one string for the LLM to use.

# Arguments
- `dict`: a dictionary containing past experience.

# Returns
All values concatenated in iteration order, without the keys.

# Examples
```julia
julia> dict = OrderedDict{String, Any}(
           "lesson 1" => "Gather accurate information about the user's preferences.\\n",
           "lesson 2" => "Ask about the budget early.\\n",
       )

julia> experience(dict)
"Gather accurate information about the user's preferences.\\nAsk about the budget early.\\n"
```
"""
function experience(dict::T) where {T<:AbstractDict}
    return join(values(dict))
end

"""
    shortMemLatestTask(dict::AbstractDict)

Get the latest step number of a short-term memory, i.e. the first number in the last key
of `dict` ("Plan 12:" gives 12, "Thought 3-1:" gives 3).

# Throws
`ArgumentError` when `dict` is empty or its last key contains no number.

# Examples
```julia
julia> dict = OrderedDict(
           "Plan 1:" => "1. Ask about the type of food that will be served.",
       )

julia> shortMemLatestTask(dict)
1
```
"""
function shortMemLatestTask(dict::T) where {T<:AbstractDict}
    isempty(dict) && throw(ArgumentError("short term memory is empty"))

    lastkey = last(collect(keys(dict)))
    found = match(r"\d+", lastkey)
    if found === nothing
        throw(ArgumentError("no step number found in key \"$lastkey\""))
    end
    return parse(Int, found.match)
end

end # end module