This commit is contained in:
2024-09-13 15:07:19 +07:00
commit c8684dea31
28 changed files with 9143 additions and 0 deletions

640
src/communication.jl Normal file
View File

@@ -0,0 +1,640 @@
module communication
export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient
using ..util
# ---------------------------------------------- 100 --------------------------------------------- #
mutable struct mqttClientInstance
mqttBrokerAddress::String
mqttBrokerPort::Integer
subtopic::Vector{String}
receiveDataChannel::Channel
onMsgCallback::Function
qos::MQTTClient.QOS
client::MQTTClient.Client
connection::MQTTClient.Connection
keepalivetopic::String
keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::DateTime
end
function mqttClientInstance(
mqttBrokerAddress::String,
mqttBrokerPort::Integer,
subtopic::Vector{String},
receiveDataChannel::Channel,
keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function
;
keepalivetopic::String= "/keepalive/$(uuid4())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
)
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
MQTTClient.connect(client, connection)
for i in subtopic
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
end
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
instance = mqttClientInstance(
mqttBrokerAddress,
mqttBrokerPort,
subtopic,
receiveDataChannel,
onMsgCallback,
qos,
client,
connection,
keepalivetopic,
keepaliveChannel,
keepaliveCheckInterval,
Dates.now(),
)
return instance
end
""" Generate msgMeta to be including in a message. So the message receiver know
what to do with the message.
Arguments\n
-----
sendTopic::String
topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
Keyword Arguments\n
-----
msgPurpose::String
purpose of this message e.g. "updateStatus"
senderName::String
sender name (String) e.g. "agent-wine-web-frontend"
senderId::String
sender id e.g. string(uuid4())
receiverName::String
msg receiver name (String) e.g. "agent-backend"
receiverId::String
msg receiver id, nothing means everyone in the topic e.g. string(uuid4())
requestresponse::String
this message is a "request" or "response"
getpost::String
this message is a "get" or "post"
msgId::String
message ID e.g. string(uuid4())
timestamp::String
message published timestamp e.g. string(Dates.now())
senderselfnote::Any
help sender manage incoming reply msg, could be "for text inference", "img generation"
replyTopic::String
sender ask receiver to reply to this topic e.g. "/agent/frontend"
replyToMsgId::String
sender is responding to this msg ID e.g. string(uuid4())
acknowledgestatus::String
e.g. "acknowledged"
mqttBrokerAddress::String
MQTT broker URI. e.g. "test.mosquitto.org"
mqttBrokerPort::Integer
MQTT broker port
Return\n
-----
msgMeta::Dict{Symbol, Any}
Example\n
-----
```jldoctest
julia> using Revise
julia> using GeneralUtils
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/agent/frontend/wine/chat/api/v1/txt/receive",
msgPurpose="keepalive",
senderName="keepaliveservice")
```
Signature\n
-----
"""
function generate_msgMeta(
sendTopic::T1, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
;
msgPurpose::Union{T1, Nothing}= nothing, # purpose of this message e.g. "updateStatus"
senderName::Union{T1, Nothing}= nothing, # sender name (String) e.g. "agent-wine-web-frontend"
senderId::Union{T1, Nothing}= nothing, # sender id e.g. string(uuid4())
receiverName::Union{T1, Nothing}= nothing, # msg receiver name (String) e.g. "agent-backend"
receiverId::Union{T1, Nothing}= nothing, # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4())
requestresponse::Union{T1, Nothing}= nothing, # this message is a "request" or "response"
getpost::Union{T1, Nothing}= nothing, # this message is a "get" or "post"
msgId::Union{T1, Nothing}= string(uuid4()),
timestamp= string(Dates.now()), # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
senderselfnote::Any= nothing,
# sender ask receiver to reply to this topic
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
replyTopic::Union{T1, Nothing}= nothing,
replyToMsgId::Union{T1, Nothing}= nothing, # sender is responding to this msg ID
acknowledgestatus::Union{T1, Nothing}= nothing, # "acknowledged",
mqttBrokerAddress::T1= "test.mosquitto.org",
mqttBrokerPort::Integer= 1883,
msgFormatVersion::Union{T1, Nothing}= nothing,
)::Dict{Symbol, Any} where {T1<:AbstractString}
msgMeta::Dict=Dict(
:sendTopic=> sendTopic, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
:msgPurpose=> msgPurpose, # purpose of this message e.g. "updateStatus"
:senderName=> senderName, # sender name (String) e.g. "agent-wine-web-frontend"
:senderId=> senderId, # sender id e.g. string(uuid4())
:receiverName=> receiverName, # msg receiver name (String) e.g. "agent-backend"
:receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. string(uuid4())
:requestResponse=> requestresponse, # this message is a "request" or "response"
:getPost=> getpost, # this message is a "get" or "post"
:msgId=> msgId,
:timeStamp=> timestamp, # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
:senderSelfnote=>senderselfnote,
# sender ask receiver to reply to this topic
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
:replyTopic=> replyTopic,
:replyToMsgId=> replyToMsgId,
:acknowledgestatus=> acknowledgestatus,
:mqttBrokerAddress=> mqttBrokerAddress,
:mqttBrokerPort=> mqttBrokerPort,
:msgFormatVersion=> msgFormatVersion,
)
return msgMeta
end
""" Check mqtt server connection.
Arguments\n
-----
mqttInstanceDict::Dict{Symbol, Any}
a dictionary contain mqtt instance. 1 per mqtt client.
interval::Integer
time interval to check mqtt server in seconds
Return\n
-----
isconnectionalive::Bool
true if mqtt connection is alive
Example\n
-----
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, MQTTClient
julia> mqttMsgReceiveChannel = Channel(8)
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
end
julia> mqttInstanceDict = Dict{Symbol, Any}(
:mqttServerInfo=> Dict(
:broker=> "test.mosquitto.org",
:port=> 1883,
),
:onMsgCallback=> onMsgCallback,
:client=> nothing,
:connection=> nothing,
:subtopic=> ["/servicetopic", "/keepalivetopic"],
:keepalivetopic=> "/keepalive",
:keepaliveChannel=> keepaliveChannel,
:lastTimeMqttConnCheck=> Dates.now(),
)
julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
julia> mqttConnStatus = GeneralUtils.isMqttConnectionAlive(mqttInstanceDict)
julia> println(mqttConnStatus)
```
Signature\n
-----
"""
function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool
starttime = Dates.now()
isconnectionalive = false
# ditch old keepalive msg is any
while isready(mqttInstanceDict[:keepaliveChannel])
_ = take!(mqttInstanceDict[:keepaliveChannel])
end
msgMeta = generate_msgMeta(
mqttInstanceDict[:keepalivetopic],
msgPurpose= "keepalive",
)
keepaliveMsg = Dict(
:msgMeta => msgMeta,
:text=>"keepalive",
)
publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic],
JSON3.write(keepaliveMsg))
timediff = 0
while timediff < 5
timediff = timedifference(starttime, Dates.now(), "seconds")
if isready(mqttInstanceDict[:keepaliveChannel])
incomingMsg = take!(mqttInstanceDict[:keepaliveChannel])
if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# connection is alive
isconnectionalive = true
break
end
end
sleep(1)
end
return isconnectionalive
end
function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool
starttime = Dates.now()
isconnectionalive = false
# ditch old keepalive msg is any
while isready(mqttInstance.keepaliveChannel)
_ = take!(mqttInstance.keepaliveChannel)
end
msgMeta = generate_msgMeta(
mqttInstance.keepalivetopic,
msgPurpose= "keepalive",
)
keepaliveMsg = Dict(
:msgMeta => msgMeta,
:text=>"keepalive",
)
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
JSON3.write(keepaliveMsg))
timediff = 0
while timediff < 5
timediff = timedifference(starttime, Dates.now(), "seconds")
if isready(mqttInstance.keepaliveChannel)
incomingMsg = take!(mqttInstance.keepaliveChannel)
if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# connection is alive
isconnectionalive = true
break
end
end
sleep(1)
end
return isconnectionalive
end
""" Check mqtt server connection, reconnect if disconnected.
Arguments\n
-----
mqttInstanceDict::Dict{Symbol, Any}
a dictionary contain mqtt instance. 1 per mqtt client.
interval::Integer
time interval to check mqtt server in seconds
Return\n
-----
isreconnect::Bool
true if mqtt connection is reconnected
Example\n
-----
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, MQTTClient
julia> mqttMsgReceiveChannel = Channel(8)
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
end
julia> mqttInstanceDict = Dict{Symbol, Any}(
:mqttServerInfo=> Dict(
:broker=> "test.mosquitto.org",
:port=> 1883,
),
:onMsgCallback=> onMsgCallback, # function
:client=> nothing,
:connection=> nothing,
:subtopic=> ["/servicetopic", "/sometopic"], # Vector{String}
:keepalivetopic=> "/keepalive",
:keepaliveChannel=> keepaliveChannel, # Channel{Dict}
:lastTimeMqttConnCheck=> Dates.now(),
)
julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
julia> GeneralUtils.checkMqttConnection!(mqttInstanceDict, 5)
```
Signature\n
-----
"""
function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool
isreconnect = false
# check if mqtt connection is still up
intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds")
if intervaldiff > interval
while true
mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict)
if mqttConnStatus == false
println("Attemping to reconnect")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
for topic in mqttInstanceDict[:subtopic]
subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1)
end
isreconnect = true
println("reconnected")
else
mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now()
break
end
end
end
return isreconnect
end
function checkMqttConnection!(mqttInstance::mqttClientInstance;
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
isreconnect = false
# check if mqtt connection is still up
intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
if intervaldiff > interval
while true
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
if mqttConnStatus == false
println("Attemping to reconnect")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstance.client, mqttInstance.connection =
MakeConnection(mqttInstance.mqttBrokerAddress,
mqttInstance.mqttBrokerPort)
connect(mqttInstance.client, mqttInstance.connection)
for topic in mqttInstance.subtopic
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
isreconnect = true
println("reconnected")
else
mqttInstance.lastTimeMqttConnCheck = Dates.now()
break
end
end
end
return isreconnect
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( isDone= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> isDone, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# Signature
"""
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
try
# Instantiate a client and connection.
client, connection = MakeConnection(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
outgoingMsg[:msgMeta][:mqttBrokerPort])
connect(client, connection)
publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# disconnect mqtt
disconnect(client)
return (isDone=true, error=nothing)
catch e
return (isDone=false, error=e)
end
end
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
)::NamedTuple where {T<:Any}
try
publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# disconnect mqtt
disconnect(client)
return (isDone=true, error=nothing)
catch e
return (isDone=false, error=e)
end
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict{Symbol, T}`
an outgoing message
# Keyword Arguments
- `timeout::Integer`
time to wait for a response before error
# Return
- `result::NamedTuple`
(
isDone= true, # idicates whether sending MQTT message successful
error= nothing # error message
response= somemessage # response message
)
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
mqttBrokerAddress= "mqtt.yiem.cc",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(:hello=> "World"),
)
julia> isDone, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
```
# Signature
"""
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
mqttMsgReceiveChannel = Channel(4)
# ask message receiver to send a message back to specified topic
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
end
# Instantiate a client and connection.
client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
outgoingMsg[:msgMeta][:mqttBrokerPort])
connect(client, connection)
subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
timepass = nothing
attempts = 1
while attempts <= maxattempt
publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
starttime = Dates.now()
while true
timepass = timedifference(starttime, Dates.now(), "seconds")
if timepass <= timeout
if isready(mqttMsgReceiveChannel)
incomingMsg = take!(mqttMsgReceiveChannel)
if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
# disconnect mqtt
unsubscribe(client, mqttMsgReceiveTopic)
disconnect(client)
return (isDone=true, error=nothing, response=incomingMsg[:payload])
end
end
else
break
end
sleep(1)
end
attempts += 1
println("attempts $attempts ", @__FILE__, " ", @__LINE__)
end
# disconnect mqtt
unsubscribe(client, mqttMsgReceiveTopic)
disconnect(client)
return (isDone=false, error="no response, timeout $timepass/$timeout, $attempts publish attempted",
response=nothing)
end
end # module communication