This commit is contained in:
2024-11-07 20:41:29 +07:00
parent 2716e6a32d
commit 09c5221eb3

View File

@@ -24,7 +24,7 @@ mutable struct mqttClientInstance_v1 <: mqttClientInstance
keepalivetopic::String keepalivetopic::String
keepaliveChannel::Channel keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::DateTime lastTimeMqttConnCheck::Union{DateTime, Nothing}
end end
function mqttClientInstance_v1( function mqttClientInstance_v1(
@@ -78,7 +78,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance
keepalivetopic::String keepalivetopic::String
keepaliveChannel::Channel keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::DateTime lastTimeMqttConnCheck::Union{DateTime, Nothing}
multiMsg::String # "single", "metadata", "datapart" multiMsg::String # "single", "metadata", "datapart"
end end
""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple """ MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
@@ -176,7 +176,7 @@ function mqttClientInstance_v2(
keepalivetopic, keepalivetopic,
keepaliveChannel, keepaliveChannel,
keepaliveCheckInterval, keepaliveCheckInterval,
Dates.now(), nothing,
multiMsg, multiMsg,
) )
@@ -368,44 +368,6 @@ function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstan
end end
return isconnectionalive return isconnectionalive
end end
# function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool
# starttime = Dates.now()
# isconnectionalive = false
# # ditch old keepalive msg is any
# while isready(mqttInstanceDict[:keepaliveChannel])
# _ = take!(mqttInstanceDict[:keepaliveChannel])
# end
# msgMeta = generate_msgMeta(
# mqttInstanceDict[:keepalivetopic],
# msgPurpose= "keepalive",
# )
# keepaliveMsg = Dict(
# :msgMeta => msgMeta,
# :text=>"keepalive",
# )
# publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic],
# JSON3.write(keepaliveMsg))
# timediff = 0
# while timediff < 5
# timediff = timedifference(starttime, Dates.now(), "seconds")
# if isready(mqttInstanceDict[:keepaliveChannel])
# incomingMsg = take!(mqttInstanceDict[:keepaliveChannel])
# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# # connection is alive
# isconnectionalive = true
# break
# end
# end
# sleep(1)
# end
# return isconnectionalive
# end
""" Check mqtt server connection, reconnect if disconnected. """ Check mqtt server connection, reconnect if disconnected.
@@ -439,16 +401,20 @@ function checkMqttConnection!(mqttInstance::T;
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
# check if mqtt connection is still up # check if mqtt connection is still up
intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds") intervaldiff =
if mqttInstance.lastTimeMqttConnCheck !== nothing
isdisconnected = false # flag timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
else
Inf
end
isreconnect = false # this value is true if connection is disconnected
if intervaldiff > interval if intervaldiff > interval
while true while true
mqttConnStatus = isMqttConnectionAlive(mqttInstance) mqttConnStatus = isMqttConnectionAlive(mqttInstance)
if mqttConnStatus == false if mqttConnStatus == false
isdisconnected = true isreconnect = true
println("mqtt connection $mqttConnStatus, Attemping to reconnect $(Dates.now())") println("mqtt connection disconnected, attemping to reconnect $(Dates.now())")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstance.client, mqttInstance.connection = mqttInstance.client, mqttInstance.connection =
MakeConnection(mqttInstance.mqttBrokerAddress, MakeConnection(mqttInstance.mqttBrokerAddress,
@@ -458,51 +424,19 @@ function checkMqttConnection!(mqttInstance::T;
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
end end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
sleep(30) sleep(1) # wait before checking connection again
else else
mqttInstance.lastTimeMqttConnCheck = Dates.now() mqttInstance.lastTimeMqttConnCheck = Dates.now()
if isdisconnected if isreconnect
println("connected to mqtt broker") println("connected to mqtt broker")
end end
return mqttConnStatus return isreconnect
break
end end
end end
else else
return nothing # still within check interval return nothing # still within check interval, no update on connection status yet
end end
end end
# function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool
# isreconnect = false
# # check if mqtt connection is still up
# intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds")
# if intervaldiff > interval
# while true
# mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict)
# if mqttConnStatus == false
# println("Attemping to reconnect")
# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
# mqttInstanceDict[:client], mqttInstanceDict[:connection] =
# MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
# mqttInstanceDict[:mqttServerInfo][:port])
# connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
# for topic in mqttInstanceDict[:subtopic]
# subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1)
# end
# isreconnect = true
# println("reconnected")
# else
# mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now()
# break
# end
# end
# end
# return isreconnect
# end
""" Send a message to specified MQTT topic then wait for reply. """ Send a message to specified MQTT topic then wait for reply.