Files
YiemAgent/src/util.jl
narawat lamaiin 0a1032c545 update
2025-07-18 07:54:59 +07:00

486 lines
14 KiB
Julia

module util
export clearhistory, addNewMessage, chatHistoryToText, eventdict, noises, createTimeline,
availableWineToText, createEventsLog, createChatLog, checkAgentResponse_JSON,
checkAgentResponse_text
using UUIDs, Dates, DataStructures, HTTP, JSON3
using GeneralUtils
using ..type
# ---------------------------------------------- 100 --------------------------------------------- #
""" Clear agent chat history.
# Arguments
- `a::agent`
an agent
# Return
- nothing
# Example
```jldoctest
julia> using YiemAgent, MQTTClient, GeneralUtils
julia> client, connection = MakeConnection("test.mosquitto.org", 1883)
julia> connect(client, connection)
julia> msgMeta = GeneralUtils.generate_msgMeta("testtopic")
julia> agentConfig = Dict(
:receiveprompt=>Dict(
:mqtttopic=> "testtopic/receive",
),
:receiveinternal=>Dict(
:mqtttopic=> "testtopic/internal",
),
:text2text=>Dict(
:mqtttopic=> "testtopic/text2text",
),
)
julia> a = YiemAgent.sommelier(
client,
msgMeta,
agentConfig,
)
julia> YiemAgent.addNewMessage(a, "user", "hello")
julia> YiemAgent.clearhistory(a)
```
# TODO
- [PENDING] clear memory
# Signature
"""
function clearhistory(a::T) where {T<:agent}
empty!(a.chathistory)
empty!(a.memory[:shortmem])
empty!(a.memory[:events])
a.memory[:chatbox] = ""
end
""" Add new message to agent.
Arguments\n
-----
a::agent
an agent
role::String
message sender role i.e. system, user or assistant
text::String
message text
Return\n
-----
nothing
Example\n
-----
```jldoctest
julia> using YiemAgent, MQTTClient, GeneralUtils
julia> client, connection = MakeConnection("test.mosquitto.org", 1883)
julia> connect(client, connection)
julia> msgMeta = GeneralUtils.generate_msgMeta("testtopic")
julia> agentConfig = Dict(
:receiveprompt=>Dict(
:mqtttopic=> "testtopic/receive",
),
:receiveinternal=>Dict(
:mqtttopic=> "testtopic/internal",
),
:text2text=>Dict(
:mqtttopic=> "testtopic/text2text",
),
)
julia> a = YiemAgent.sommelier(
client,
msgMeta,
agentConfig,
)
julia> YiemAgent.addNewMessage(a, "user", "hello")
```
Signature\n
-----
"""
function addNewMessage(a::T1, name::String, text::T2;
maximumMsg::Integer=30) where {T1<:agent, T2<:AbstractString}
if name ["system", "user", "assistant"] # guard against typo
error("name is not in agent.availableRole $(@__LINE__)")
end
#[PENDING] summarize the oldest 10 message
if length(a.chathistory) > maximumMsg
summarize(a.chathistory)
else
d = Dict(:name=> name, :text=> text, :timestamp=> Dates.now())
push!(a.chathistory, d)
end
end
""" Converts a vector of dictionaries to a formatted string.
This function takes in a vector of dictionaries and outputs a single string where each dictionary's keys are prefixed by their values.
# Arguments
- `vecd::Vector`
A vector of dictionaries containing chat messages
- `withkey::Bool`
Whether to include the name as a prefix in the output text. Default is true
- `range::Union{Nothing,UnitRange,Int}`
Optional range of messages to include. If nothing, includes all messages
# Returns
A formatted string where each line contains either:
- If withkey=true: "name> message\n"
- If withkey=false: "message\n"
# Example
julia> using Revise
julia> using GeneralUtils
julia> vecd = [Dict(:name => "John", :text => "Hello"), Dict(:name => "Jane", :text => "Goodbye")]
julia> GeneralUtils.vectorOfDictToText(vecd, withkey=true)
"John> Hello\nJane> Goodbye\n"
```
"""
function chatHistoryToText(vecd::Vector; withkey=true, range=nothing)::String
# Initialize an empty string to hold the final text
text = ""
# Get the elements within the specified range, or all elements if no range provided
elements = isnothing(range) ? vecd : vecd[range]
# Determine whether to include the key in the output text or not
if withkey
# Loop through each dictionary in the input vector
for d in elements
# Extract the 'name' and 'text' keys from the dictionary
name = titlecase(d[:name])
_text = d[:text]
# Append the formatted string to the text variable
text *= "$name> $_text \n"
end
else
# Loop through each dictionary in the input vector
for d in elements
# Iterate over all key-value pairs in the dictionary
for (k, v) in d
# Append the formatted string to the text variable
text *= "$v \n"
end
end
end
# Return the final text
return text
end
function availableWineToText(vecd::Vector)::String
# Initialize an empty string to hold the final text
rowtext = ""
# Loop through each dictionary in the input vector
for (i, d) in enumerate(vecd)
# Iterate over all key-value pairs in the dictionary
temp = []
for (k, v) in d
# Append the formatted string to the text variable
t = "$k:$v"
push!(temp, t)
end
_rowtext = join(temp, ',')
rowtext *= "$i) $_rowtext "
end
return rowtext
end
""" Create a dictionary representing an event with optional details.
# Arguments
- `event_description::Union{String, Nothing}`
A description of the event
- `timestamp::Union{DateTime, Nothing}`
The time when the event occurred
- `subject::Union{String, Nothing}`
The subject or entity associated with the event
- `thought::Union{AbstractDict, Nothing}`
Any associated thoughts or metadata
- `actionname::Union{String, Nothing}`
The name of the action performed (e.g., "CHAT", "CHECKINVENTORY")
- `actioninput::Union{String, Nothing}`
Input or parameters for the action
- `location::Union{String, Nothing}`
Where the event took place
- `equipment_used::Union{String, Nothing}`
Equipment involved in the event
- `material_used::Union{String, Nothing}`
Materials used during the event
- `outcome::Union{String, Nothing}`
The result or consequence of the event after action execution
- `note::Union{String, Nothing}`
Additional notes or comments
# Returns
A dictionary with event details as symbol-keyed key-value pairs
"""
function eventdict(;
event_description::Union{String, Nothing}=nothing,
timestamp::Union{DateTime, Nothing}=nothing,
subject::Union{String, Nothing}=nothing,
thought::Union{AbstractDict, Nothing}=nothing,
actionname::Union{String, Nothing}=nothing, # "CHAT", "CHECKINVENTORY", "PRESENTBOX", etc
actioninput::Union{String, Nothing}=nothing,
location::Union{String, Nothing}=nothing,
equipment_used::Union{String, Nothing}=nothing,
material_used::Union{String, Nothing}=nothing,
observation::Union{String, Nothing}=nothing,
note::Union{String, Nothing}=nothing,
)
d = Dict{Symbol, Any}(
:event_description=> event_description,
:timestamp=> timestamp,
:subject=> subject,
:thought=> thought,
:actionname=> actionname,
:actioninput=> actioninput,
:location=> location,
:equipment_used=> equipment_used,
:material_used=> material_used,
:observation=> observation,
:note=> note,
)
return d
end
""" Create a formatted timeline string from a sequence of events.
# Arguments
- `events::T1`
Vector of event dictionaries containing subject, actioninput and optional outcome fields
Each event dictionary should have the following keys:
- :subject - The subject or entity performing the action
- :actioninput - The action or input performed by the subject
- :observation - (Optional) The result or outcome of the action
# Returns
- `timeline::String`
A formatted string representing the events with their subjects, actions, and optional outcomes
Format: "{index}) {subject}> {actioninput} {outcome}\n" for each event
# Example
events = [
Dict(:subject => "User", :actioninput => "Hello", :observation => nothing),
Dict(:subject => "Assistant", :actioninput => "Hi there!", :observation => "with a smile")
]
timeline = createTimeline(events)
# 1) User> Hello
# 2) Assistant> Hi there! with a smile
"""
function createTimeline(events::T1; eventindex::Union{UnitRange, Nothing}=nothing
) where {T1<:AbstractVector}
# Initialize empty timeline string
timeline = ""
# Determine which indices to use - either provided range or full length
ind =
if eventindex !== nothing
[eventindex...]
else
1:length(events)
end
# Iterate through events and format each one
for i in ind
event = events[i]
# If no outcome exists, format without outcome
# if event[:actionname] == "CHATBOX"
# timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput])\n"
# elseif event[:actionname] == "CHECKINVENTORY" && event[:observation] === nothing
# timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput]), observation: Not done yet.\n"
# If outcome exists, include it in formatting
if event[:actionname] == "CHECKWINE"
timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput]), observation: $(event[:observation])\n"
else
timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput])\n"
end
end
# Return formatted timeline string
return timeline
end
# function createTimeline(events::T1; eventindex::Union{UnitRange, Nothing}=nothing
# ) where {T1<:AbstractVector}
# # Initialize empty timeline string
# timeline = ""
# # Determine which indices to use - either provided range or full length
# ind =
# if eventindex !== nothing
# [eventindex...]
# else
# 1:length(events)
# end
# # Iterate through events and format each one
# for i in ind
# event = events[i]
# # If no outcome exists, format without outcome
# if event[:observation] === nothing
# timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput]), observation: Not done yet.\n"
# # If outcome exists, include it in formatting
# else
# timeline *= "Event_$i $(event[:subject])> actionname: $(event[:actionname]), actioninput: $(event[:actioninput]), observation: $(event[:observation])\n"
# end
# end
# # Return formatted timeline string
# return timeline
# end
function createEventsLog(events::T1; index::Union{UnitRange, Nothing}=nothing
) where {T1<:AbstractVector}
# Initialize empty log array
log = Dict{Symbol, String}[]
# Determine which indices to use - either provided range or full length
ind =
if index !== nothing
[index...]
else
1:length(events)
end
# Iterate through events and format each one
for i in ind
event = events[i]
# If no outcome exists, format without outcome
if event[:observation] === nothing
subject = event[:subject]
actionname = event[:actionname]
actioninput = event[:actioninput]
str = "actionname: $actionname, actioninput: $actioninput"
d = Dict{Symbol, String}(:name=>subject, :text=>str)
push!(log, d)
else
subject = event[:subject]
actionname = event[:actionname]
actioninput = event[:actioninput]
observation = event[:observation]
str = "actionname: $actionname, actioninput: $actioninput, observation: $observation"
d = Dict{Symbol, String}(:name=>subject, :text=>str)
push!(log, d)
end
end
return log
end
function createChatLog(chatdict::T1; index::Union{UnitRange, Nothing}=nothing
) where {T1<:AbstractVector}
# Initialize empty log array
log = Dict{Symbol, String}[]
# Determine which indices to use - either provided range or full length
ind =
if index !== nothing
[index...]
else
1:length(chatdict)
end
# Iterate through events and format each one
for i in ind
event = chatdict[i]
subject = event[:name]
text = event[:text]
d = Dict{Symbol, String}(:name=>subject, :text=>text)
push!(log, d)
end
return log
end
function checkAgentResponse_text(response::String, requiredHeader::T
)::Tuple where {T<:Array{String}}
detected_kw = GeneralUtils.detectKeywordVariation(requiredHeader, response)
missingkeys = [k for (k, v) in detected_kw if v === nothing]
ispass = false
errormsg = nothing
if !isempty(missingkeys)
errormsg = "$missingkeys are missing from your previous response"
ispass = false
elseif sum([length(i) for i in values(detected_kw)]) > length(requiredHeader)
errormsg = "Your previous attempt has duplicated points according to the required response format"
ispass = false
else
ispass = true
end
return (ispass, errormsg)
end
function checkAgentResponse_JSON(responsedict::Dict, requiredKeys::T
)::Tuple where {T<:Array{Symbol}}
_responsedictKey = keys(responsedict)
responsedictKey = [i for i in _responsedictKey] # convert into a list
is_requiredKeys_in_responsedictKey = [i responsedictKey for i in requiredKeys]
ispass = false
errormsg = nothing
if length(is_requiredKeys_in_responsedictKey) > length(requiredKeys)
errormsg = "Your previous attempt has duplicated points according to the required response format"
ispass = false
elseif !all(is_requiredKeys_in_responsedictKey)
zeroind = findall(x -> x == 0, is_requiredKeys_in_responsedictKey)
missingkeys = [requiredKeys[i] for i in zeroind]
errormsg = "$missingkeys are missing from your previous response"
ispass = false
else
ispass = true
end
return (ispass, errormsg)
end
end # module util