module utils

export makeSummary, sendReceivePrompt, chunktext, extractStepFromPlan, checkTotalStepInPlan,
    detectCharacters, findDetectedCharacter, extract_number, toolNameBeingCalled,
    isUseTools, conversationSummary, checkReasonableness, replaceHeaders, addShortMem!,
    splittext, dictToString, removeHeaders, keepOnlyKeys, experience

using UUIDs, Dates, DataStructures
using CommUtils, GeneralUtils
using ..type

#------------------------------------------------------------------------------------------------100

"""
    makeSummary(a, input)

Ask the LLM behind agent `a` whether `input` can be summarized and, if it answers "Yes",
ask it for a concise summary.

Arguments:
    a = an agent used to reach the LLM (see `sendReceivePrompt`)
    input = the text to summarize

Return:
    the summary text, or "Nothing." when the LLM declines or does not answer in time
"""
function makeSummary(a::T1, input::T2) where {T1<:agent, T2<:AbstractString}
    summary = "Nothing."

    # `input` is interpolated directly, so no placeholder substitution is needed afterwards
    prompt =
        """
        <|im_start|>system
        Input text: $input

        Your job is to determine now whether you can make a summary of the input text by choosing one of following choices:
        If you cannot make a summary say, "{No}".
        If you can make a summary say, "{Yes}".
        <|im_end|>
        """
    verdict = sendReceivePrompt(a, prompt)

    # `verdict` is `nothing` when the request timed out
    if verdict !== nothing &&
        GeneralUtils.getStringBetweenCharacters(verdict, "{", "}") == "Yes"
        # separate summary part
        prompt =
            """
            <|im_start|>system
            Input text: $input
            Your job is to make a concise summary of the input text.
            <|im_end|>
            """
        result = sendReceivePrompt(a, prompt)
        if result !== nothing
            # drop a single leading newline; safe on an empty reply
            summary = startswith(result, "\n") ? result[2:end] : result
        end
    end

    input_summary = input
    @show input_summary
    @show summary

    return summary
end

"""
    sendReceivePrompt(a, prompt; max_tokens=256, timeout=120, temperature=0.2)

Send `prompt` through the agent's MQTT client and wait for the matching reply.

A fresh message id is written into `a.msgMeta[:msgId]` for every call. Incoming payloads
are accepted only when their `:msgMeta => :repondToMsgId` equals that id.

Arguments:
    a = an agent holding `msgMeta` and `mqttClient`
    prompt = the text to send
    max_tokens = forwarded in the request message
    timeout = how long to wait for the reply, in seconds
    temperature = forwarded in the request message

Return:
    the `:txt` field of the matching reply, or `nothing` when the reply has no `:txt`
    field or no matching reply arrived within `timeout` seconds
"""
function sendReceivePrompt(a::T, prompt::String; max_tokens=256, timeout::Int=120,
    temperature::AbstractFloat=0.2) where {T<:agent}
    a.msgMeta[:msgId] = "$(uuid4())" # new msg id for each msg
    msg = Dict(
        :msgMeta=> a.msgMeta,
        :txt=> prompt,
        :max_tokens=> max_tokens,
        :temperature=> temperature,
    )
    payloadChannel = Channel(1)

    # send prompt
    CommUtils.request(a.mqttClient, msg)
    starttime = Dates.now()

    while true
        timepass = (Dates.now() - starttime).value / 1000.0
        CommUtils.mqttRun(a.mqttClient, payloadChannel)

        if isready(payloadChannel)
            _, payload = take!(payloadChannel)
            if payload[:msgMeta][:repondToMsgId] == msg[:msgMeta][:msgId]
                return haskey(payload, :txt) ? payload[:txt] : nothing
            end
        end

        # Checked on every pass, so unrelated replies cannot keep the loop alive forever.
        if timepass > timeout
            println("sendReceivePrompt timeout $timepass/$timeout")
            return nothing
        end
    end
end

"""
    detectCharacters(text, characters)

Detect every occurrence of the given keywords in `text`.

Output is a list of named tuples `(char, start, stop)`, ordered by position in the text.
`start` is the index of the first character of the match and `stop` the index of its last
character, so both are valid string indices even when the text is not ASCII.

# Example
```jldoctest
julia> text = "I like to eat apples and use utensils."
julia> characters = ["eat", "use", "i"]
julia> result = detectCharacters(text, characters)
4-element Vector{Any}:
 (char = "i", start = 4, stop = 4)
 (char = "eat", start = 11, stop = 13)
 (char = "use", start = 26, stop = 28)
 (char = "i", start = 35, stop = 35)
```
"""
function detectCharacters(text::T1, characters::Vector{T2}) where {T1<:AbstractString, T2<:AbstractString}
    result = []
    for i in eachindex(text)
        tail = SubString(text, i)
        for char in characters
            if startswith(tail, char)
                # index of the last character of the match
                stop = prevind(text, i + ncodeunits(char))
                push!(result, (char=char, start=i, stop=stop))
            end
        end
    end
    return result
end

"""
    chunktext(text, headers)

Chunk a text into smaller pieces by header.

`headers` is the output of `detectCharacters` on the same text. Each header is mapped to
the text found between the end of that header and the start of the next one; the last
header gets everything up to the end of the text. When the same header appears more than
once, the later occurrence overwrites the earlier one.

# Example
```jldoctest
julia> text = "Thought 1: I should always think about... Act 1: wikisearch Actinput 1: latest AMD GPU"
julia> headers = detectCharacters(text, ["Thought 1:", "Act 1:", "Actinput 1:"])
julia> chunkedtext = chunktext(text, headers)
OrderedDict{String, Any} with 3 entries:
  "Thought 1:"  => " I should always think about... "
  "Act 1:"      => " wikisearch "
  "Actinput 1:" => " latest AMD GPU"
```
"""
function chunktext(text::T1, headers::T2) where {T1<:AbstractString, T2<:AbstractVector}
    result = OrderedDict{String, Any}()
    for (i, header) in enumerate(headers)
        # nextind/prevind keep the slice on character boundaries for non-ASCII text;
        # the clamp keeps a header that points past the text yielding an empty body
        bodystart = nextind(text, clamp(header[:stop], 0, ncodeunits(text)))
        body = if i < length(headers)
            text[bodystart:prevind(text, headers[i+1][:start])]
        else
            text[bodystart:end]
        end
        result[header[:char]] = body
    end
    return result
end

"""
    extractStepFromPlan(a, plan, step)

Ask the LLM behind agent `a` to copy step number `step` out of `plan`.

Return:
    the raw LLM response, or `nothing` when no reply arrived in time
"""
function extractStepFromPlan(a::agent, plan::T, step::Int) where {T<:AbstractString}
    prompt =
        """
        <|im_start|>system
        You are a helpful assistant.
        Your job is to extract step $step in the user plan.

        Use the following format only:
        {copy the step and put it here}

        <|im_end|>

        <|im_start|>user
        $plan
        <|im_end|>
        <|im_start|>assistant

        """

    response = sendReceivePrompt(a, prompt)

    return response
end

"""
    checkTotalStepInPlan(a)

Ask the LLM how many steps the latest plan in the agent's short term memory contains.

The latest plan is the last key of `a.memory[:shortterm]` that contains "Plan".

Return:
    the first number found in the LLM response

Throws an error when the memory holds no plan, when the LLM does not answer in time, or
when the answer contains no number.
"""
function checkTotalStepInPlan(a::agent)
    shortterm = a.memory[:shortterm]

    # Plan will have number e.g. Plan 3: so walk backwards to find the latest Plan
    header = nothing
    for k in reverse(collect(keys(shortterm)))
        if occursin("Plan", k)
            header = k
            break
        end
    end
    header === nothing && error("No plan found in short term memory $(@__LINE__)")

    p = shortterm[header]
    plan = "Plan: $p"
    prompt =
        """
        <|im_start|>system
        You are a helpful assistant.
        Your job is to determine how many steps in a user plan.

        Use the following format to answer:
        Total step number is {}
        <|im_end|>

        <|im_start|>user
        $plan
        <|im_end|>
        <|im_start|>assistant

        """
    response = sendReceivePrompt(a, prompt)
    response === nothing && error("No response received for the plan $(@__LINE__)")

    return extract_number(response)
end

"""
    findDetectedCharacter(detectedCharacters, character)

Find a given character from a vector of named tuples.
Output is the list of indices inside `detectedCharacters` whose first field equals
`character`.

# Example
```jldoctest
julia> a = [
           (char = "i", start = 4, stop = 4),
           (char = "eat", start = 11, stop = 13),
           (char = "use", start = 26, stop = 28),
           (char = "i", start = 35, stop = 35),
       ]
julia> findDetectedCharacter(a, "i")
2-element Vector{Int64}:
 1
 4
```
"""
function findDetectedCharacter(detectedCharacters, character)
    return findall(d -> isequal(d[1], character), detectedCharacters)
end

"""
    extract_number(text)

Return the first run of digits found in `text` as an `Int`.
Throws an error when the text contains no digit.
"""
function extract_number(text::T) where {T<:AbstractString}
    m = match(r"\d+", text) # first run of one or more digits
    m === nothing && error("No number found in the text $(@__LINE__)")
    return parse(Int, m.match)
end

"""
    toolNameBeingCalled(text, tools)

Extract toolname from text.

Return the name (as a `String`) of a tool whose key occurs in `text`, or `nothing` when
no tool name occurs. When several names occur, which one is returned depends on the
iteration order of `tools`.

# Example
```julia
julia> text = " internetsearch\\n"
julia> tools = Dict(
           :internetsearch=>Dict(
               :name => "internetsearch",
               :description => "Useful for when you need to search the Internet",
               :input => "Input should be a search query.",
               :output => "",
           ),
           :chatbox=>Dict(
               :name => "chatbox",
               :description => "Useful for when you need to ask a customer what you need to know or to talk with them.",
               :input => "Input should be a conversation to customer.",
               :output => "",
           ),
       )
julia> toolname = toolNameBeingCalled(text, tools)
"internetsearch"
```
"""
function toolNameBeingCalled(text::T, tools::Dict) where {T<:AbstractString}
    for k in keys(tools)
        toolname = String(k)
        if contains(text, toolname)
            return toolname
        end
    end
    return nothing
end

# function chooseThinkingMode(a::agentReflex, usermsg::String)
#     thinkingmode = nothing
#     if length(a.memory[:log]) != 0
#         thinkingmode = :continue_thinking
#     else
#         prompt =
#         """
#         <|im_start|>system
#         {systemMsg}
#         You always use tools if there is a chance to impove your response.
#         You have access to the following tools:
#         {tools}
#         Your job is to determine whether you will use tools or actions to response.
#         Choose one of the following choices:
#         Choice 1: If you don't need to use tools or actions to response to the stimulus say, "{no}".
#         Choice 2: If you think the user want to get wine say, "{yes}".
#         <|im_end|>
#         <|im_start|>user
#         {input}
#         <|im_end|>
#         <|im_start|>assistant
#         """
#         toollines = ""
#         for (toolname, v) in a.tools
#             if toolname ∉ ["chatbox", "nothing"]
#                 toolline = "$toolname: $(v[:description]) $(v[:input]) $(v[:output])\n"
#                 toollines *= toolline
#             end
#         end
#         prompt = replace(prompt, "{systemMsg}" => a.roles[a.role])
#         prompt = replace(prompt, "{tools}" => toollines)
#         prompt = replace(prompt, "{input}" => usermsg)
#         result = sendReceivePrompt(a, prompt)
#         willusetools = GeneralUtils.getStringBetweenCharacters(result, "{", "}")
#         thinkingmode = willusetools == "yes" ? :new_thinking : :no_thinking
#     end
#     return thinkingmode
# end

"""
    isUseTools(a, usermsg)

Ask the LLM whether a tool is needed before responding to `usermsg`.

The prompt lists every tool of the agent except `:chatbox`.

Return:
    `true` when the LLM response contains "Yes"; `false` otherwise, including when no
    response arrived in time
"""
function isUseTools(a::agentReflex, usermsg::String)
    prompt =
        """
        <|im_start|>system
        {systemMsg}
        You have access to the following tools:
        {tools}

        User's message:
        {input}

        From the user's message, Do you need to any tools before responseing? Answer: {Yes/No/Not sure}. What will the you do?
        <|im_end|>
        <|im_start|>assistant
        Answer:
        """
    toollines = ""
    for (toolname, v) in a.tools
        if toolname != :chatbox
            toollines *= "$toolname: $(v[:description]) $(v[:input]) $(v[:output])\n"
        end
    end

    prompt = replace(prompt, "{systemMsg}" => a.roles[a.role])
    prompt = replace(prompt, "{tools}" => toollines)
    prompt = replace(prompt, "{input}" => usermsg)

    result = sendReceivePrompt(a, prompt, temperature=0.2)

    # `result` is `nothing` on timeout; treat a missing answer as "no tools"
    return result !== nothing && occursin("Yes", result)
end

"""
    conversationSummary(a)

Make a conversation summary from the agent's message history.

Every message of `a.messages` except the last one is rendered as one line ("user: ..."
for the user, "I: ..." for the assistant) and sent to the LLM, which is asked for a
bullet summary written from the assistant's perspective. Any other role raises an error.

Return:
    the summary text up to the first "<|im_end|>" marker, with one leading newline
    removed; "nothing" when there are no messages or the LLM did not answer in time

# Example
```julia
julia> a.messages = [
           Dict(:role=> "user", :content=> "I would like to get a bottle of wine", :timestamp=> Dates.now()),
           Dict(:role=> "assistant", :content=> "What kind of Thai dishes are you having?", :timestamp=> Dates.now()),
           Dict(:role=> "user", :content=> "It a pad thai.", :timestamp=> Dates.now()),
       ]
julia> summary = conversationSummary(a)
```
"""
function conversationSummary(a::T) where {T<:agent}
    prompt =
        """
        <|im_start|>system
        You talked with a user earlier.
        Now you make a detailed bullet summary of the conversation from your perspective.
        You must refers to yourself by "I" in the summary.

        Here are the conversation:
        {conversation}
        <|im_end|>

        """
    summary = "nothing"
    if !isempty(a.messages)
        io = IOBuffer()
        for msg in a.messages[1:end-1]
            role = msg[:role]
            content = msg[:content]

            if role == "user"
                print(io, "$role: $content\n")
            elseif role == "assistant"
                print(io, "I: $content\n")
            else
                error("undefied condition role = $role $(@__LINE__)")
            end
        end
        conversation = String(take!(io))

        prompt = replace(prompt, "{conversation}" => conversation)
        result = sendReceivePrompt(a, prompt)
        summary = result === nothing ? "nothing" : result
        summary = first(split(summary, "<|im_end|>"))
        # drop a single leading newline; safe on an empty reply
        if startswith(summary, "\n")
            summary = summary[2:end]
        end
    end
    @show summary
    return summary
end

#TODO
"""
    checkReasonableness(userMsg, context, tools)

Ask an LLM whether the user assignment can be answered with the available tools.

NOTE(review): this function is unfinished. `llm` and `pyconvert` are not brought into
scope by this module, the `context` argument is overwritten by a hard-coded tool
description, and `tools` is unused. Confirm the intended backend before relying on it.

Return:
    the "text" field of the first choice returned by `llm`
"""
function checkReasonableness(userMsg::String, context::String, tools)
    # Ref: https://www.youtube.com/watch?v=XV4IBaZqbps

    prompt =
        """
        <|im_start|>system
        You are a helpful assistant. Your job is to check the reasonableness of user assignments.
        If the user assignment can be answered given the tools available say, "This is a reasonable assignment".
        If the user assignment cannot be answered then provide some feedback to the user that may improve
        their assignment.

        Here is the context for the assignment:
        {context}

        <|im_end|>

        <|im_start|>user
        {assignment}
        <|im_end|>
        <|im_start|>assistant

        """

    # NOTE(review): this overwrites the caller-supplied context
    context = "You have access to the following tools: WineStock: useful for when you need to find info about wine by matching your description, price, name or ID. Input should be a search query with as much details as possible."
    prompt = replace(prompt, "{assignment}" => userMsg)
    prompt = replace(prompt, "{context}" => context)

    output_py = llm(
        prompt,
        max_tokens=512,
        temperature=0.1,
        # top_p=top_p,
        echo=false,
        stop=["", "<>", ],
    )
    _output_jl = pyconvert(Dict, output_py);
    output = pyconvert(Dict, _output_jl["choices"][1]);
    return output["text"]
end

"""
    addShortMem!(shortMem, chunkedtext)

Add chunked text to a short term memory of a chat agent.

Arguments:
    shortMem = short memory of a chat agent,
    chunkedtext = a dict contains text

Return:
    `shortMem`, after every pair of `chunkedtext` has been written into it

# Example
```jldoctest
julia> chunkedtext = OrderedDict{String, String}(
           "Thought 1:" => " I should always think about...",
           "Act 1:" => " wikisearch",
           "Actinput 1:" => " latest AMD GPU",)
julia> shortMem = OrderedDict{String, Any}()
julia> addShortMem!(shortMem, chunkedtext)
OrderedDict{String, Any} with 3 entries:
  "Thought 1:"  => " I should always think about..."
  "Act 1:"      => " wikisearch"
  "Actinput 1:" => " latest AMD GPU"
```
"""
function addShortMem!(shortMem::OrderedDict{String, Any}, chunkedtext::T) where {T<:AbstractDict}
    for (k, v) in chunkedtext
        shortMem[k] = v
    end
    return shortMem
end

"""
    splittext(text, list)

Cut `text` at the keywords in `list`. The keywords are applied one after another, in list
order, and each one keeps only what precedes its first occurrence in the remaining text.

Arguments:
    text = a text you want to split
    list = a list of keywords you want to split

Return:
    the leftmost text after all splits

# Example
```jldoctest
julia> text = "Consider the type of food, occasion and temperature at the serving location."
julia> list = ["at", "and"]
julia> splittext(text, list)
"Consider the type of food, occasion "
```
"""
function splittext(text, list)
    newtext = text
    for keyword in list
        newtext = first(split(newtext, keyword))
    end
    return newtext
end

# Insert `label` before the ":" of every header (named tuples with a `:char` field) found in `text`.
function _addHeaderLabel(text::AbstractString, headers, label::AbstractString)
    newtext = text
    for h in headers
        if occursin(h[:char], newtext)
            labelled = replace(h[:char], ":" => " $label:")
            newtext = replace(newtext, h[:char] => labelled)
        end
    end
    return newtext
end

"""
    addStepNumber(text, headers, step)
    addStepNumber(text, headers, step, substep)

Add step number to header in a text.

Every header in `headers` (named tuples with a `:char` field, e.g. the output of
`detectCharacters`) that occurs in `text` gets the step number, or "step-substep",
inserted before its colon: "Thought:" becomes "Thought 2:" or "Thought 2-1:".
"""
function addStepNumber(text::T, headers, step::Int) where {T<:AbstractString}
    return _addHeaderLabel(text, headers, "$step")
end

function addStepNumber(text::T, headers, step::Int, substep::Int) where {T<:AbstractString}
    return _addHeaderLabel(text, headers, "$step-$substep")
end

"""
    replaceHeaders(text, headers, step)

Renumber headers in a text.

For each header, the span from its first occurrence up to the next ":" is replaced,
everywhere in the text, by the header name followed by the step number and a colon.
Headers that do not occur, or that are not followed by a ":", are left untouched.

Arguments:
    text = a text containing headers
    headers = a list of headers, each ending with ":"
    step = the step number to write into the headers

Return:
    the text with renumbered headers

# Example
```jldoctest
julia> text = "Thought 1: I like it. Act 1: chatbox"
julia> headers = ["Thought:", "Act:"]
julia> replaceHeaders(text, headers, 3)
"Thought 3: I like it. Act 3: chatbox"
```
"""
function replaceHeaders(text::T, headers, step::Int) where {T<:AbstractString}
    newtext = text
    for i in headers
        header = chop(i) # not include ":"
        found = findfirst(header, newtext)
        found === nothing && continue

        startind = first(found)
        colon = findnext(":", newtext, nextind(newtext, startind))
        colon === nothing && continue # header without a closing ":", nothing to replace

        word = newtext[startind:last(colon)]
        newtext = replace(newtext, word => "$header $step:")
    end

    return newtext
end

"""
    dictToString(dict, skiplist)

Convert a dictionary (e.g. a short term memory) into 1 continuous string.

Arguments:
    dict = a dictionary, e.g. the short term memory of a ChatAgent's agent
    skiplist = a list of keys you want to skip

Return:
    one "key value" line per entry, each terminated by exactly one newline

# Example
```jldoctest
julia> shortMemory = OrderedDict(
           "user:" => "Umm",
           "Thought 1:" => "I like it.",
           "Act 1:" => "chatbox",
           "Actinput 1:" => "I get this one.",
       )
julia> headers = ["user:"]
julia> dictToString(shortMemory, headers)
"Thought 1: I like it.\\nAct 1: chatbox\\nActinput 1: I get this one.\\n"
```
"""
function dictToString(dict::T, skiplist::Union{Array{String}, Array{Symbol}}) where {T<:AbstractDict}
    io = IOBuffer()
    for (k, v) in dict
        k ∈ skiplist && continue
        line = "$k $v"
        print(io, line)
        # ensure a newline separates each sentence, without doubling an existing one
        endswith(line, "\n") || print(io, "\n")
    end
    return String(take!(io))
end

"""
    shortMemoryToString(shortMemory, skiplist)

Convert short term memory into 1 continuous string. Same output as `dictToString`.
"""
function shortMemoryToString(shortMemory::OrderedDict, skiplist::Union{Array{String}, Array{Symbol}})
    return dictToString(shortMemory, skiplist)
end

#WORKING
"""
    removeHeaders(shortMemory, step, skipHeaders=nothing)

Remove headers of specific step from memory.

A key belongs to `step` when it contains that number as a whole number, so step 1 does
not match "Plan 10:". Keys of that step are dropped unless they contain one of
`skipHeaders`. The input is not modified.

Arguments:
    shortMemory = a short term memory of a ChatAgent's agent
    step = a step number you want to remove
    skipHeaders = a list of keys in memory you want to skip

Return:
    a new short term memory

# Example
```jldoctest
julia> shortMemory = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
           "Act 1:" => "chatbox",
           "Actinput 1:" => "I get this one.",
           "Plan 2:" => "I'm meeting my wife this afternoon",
           "Thought 2:" => "I also want it for my wife",
           "Act 2:" => "chatbox",
           "Actinput 2:" => "I would like to get 2 more",
       )
julia> skipHeaders = ["Plan"]
julia> step = 2
julia> removeHeaders(shortMemory, step, skipHeaders)
OrderedDict(
    "user:" => "May I try this one?",
    "Plan 1:" => "testing a small portion of icecream",
    "Thought 1:" => "I like it.",
    "Act 1:" => "chatbox",
    "Actinput 1:" => "I get this one.",
    "Plan 2:" => "I'm meeting my wife this afternoon",
)
```
"""
function removeHeaders(shortMemory::OrderedDict, step::Int,
    skipHeaders::Union{Array{String}, Array{Symbol}, Nothing}=nothing)
    newdict = empty(shortMemory)
    # the step number must not be glued to other digits
    steppattern = Regex("(?<!\\d)$(step)(?!\\d)")

    for (k, v) in shortMemory
        key = string(k)
        if occursin(steppattern, key)
            # copy only the headers the caller asked to keep
            if skipHeaders !== nothing && any(h -> occursin(string(h), key), skipHeaders)
                newdict[k] = v
            end
        else
            newdict[k] = v
        end
    end

    return newdict
end

"""
    keepOnlyKeys(dict, keys)

Keep only specified keys in a dictionary. All non-specified keys will be removed.
The input is not modified.

Arguments:
    dict = a dictionary
    keys = keys you want to keep in a dict

Return:
    a new dict of the same type with all non-specified keys removed

# Example
```jldoctest
julia> dict = OrderedDict(
           "user:" => "May I try this one?",
           "Plan 1:" => "testing a small portion of icecream",
           "Thought 1:" => "I like it.",
           "Act 1:" => "chatbox",
           "Actinput 1:" => "I get this one.",
       )
julia> keys = ["user:"]
julia> keepOnlyKeys(dict, keys)
OrderedDict(
    "user:" => "May I try this one?",
)
```
"""
function keepOnlyKeys(dict::T1, keys::T2) where {T1<:AbstractDict, T2<:AbstractVector}
    newdict = empty(dict)
    for (k, v) in dict
        if k ∈ keys
            newdict[k] = v
        end
    end
    return newdict
end

"""
    experience(dict)

Convert experience dict into 1 string for LLM to use.

Arguments:
    dict = a dictionary contain past experience

Return:
    An experience in 1 string without context keys: all values concatenated in
    iteration order.

# Example
```jldoctest
julia> dict = OrderedDict{String, Any}(
           " This lesson can be applied to various situations" => " Gathering accurate and relevant information about the user's preferences, budget, and event details is crucial for providing personalized recommendations.\\n"
       )
julia> experience(dict)
```
"""
function experience(dict::T) where {T<:AbstractDict}
    return join(values(dict))
end

end # end module