update
This commit is contained in:
@@ -154,7 +154,7 @@ function mqttClientInstance_v2(
|
||||
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
||||
onMsgCallback::Function
|
||||
;
|
||||
clientId::Union{String,Nothing}=nothing,
|
||||
clientId::String="NA",
|
||||
mqttBrokerPort::Integer=1883,
|
||||
keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
|
||||
keepaliveCheckInterval::Integer=30,
|
||||
@@ -170,7 +170,7 @@ function mqttClientInstance_v2(
|
||||
end
|
||||
MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
|
||||
|
||||
keepaliveTopic = clientId === nothing ? keepaliveTopic : "/keepalive/$clientId"
|
||||
keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
|
||||
|
||||
instance = mqttClientInstance_v2(
|
||||
mqttBrokerAddress,
|
||||
@@ -416,7 +416,7 @@ end
|
||||
-----
|
||||
"""
|
||||
function checkMqttConnection!(mqttInstance::T;
|
||||
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool where {T<:mqttClientInstance}
|
||||
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
|
||||
|
||||
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
||||
|
||||
@@ -429,6 +429,7 @@ function checkMqttConnection!(mqttInstance::T;
|
||||
end
|
||||
|
||||
if intervaldiff > interval
|
||||
connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
|
||||
while true
|
||||
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
||||
if mqttConnStatus == false
|
||||
@@ -438,16 +439,20 @@ function checkMqttConnection!(mqttInstance::T;
|
||||
mqttInstance.client, mqttInstance.connection =
|
||||
MakeConnection(mqttInstance.mqttBrokerAddress,
|
||||
mqttInstance.mqttBrokerPort)
|
||||
connect(mqttInstance.client, mqttInstance.connection)
|
||||
for topic in mqttInstance.subtopic
|
||||
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||
try
|
||||
connect(mqttInstance.client, mqttInstance.connection)
|
||||
for topic in mqttInstance.subtopic
|
||||
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||
end
|
||||
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||
catch
|
||||
println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
||||
end
|
||||
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||
|
||||
|
||||
else
|
||||
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
||||
println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
|
||||
if connectionStatusStart != mqttConnStatus
|
||||
println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
||||
end
|
||||
return mqttConnStatus
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user