From 93aa0ee1ac9433a6a8883b3829476e7a6bf3c93f Mon Sep 17 00:00:00 2001 From: narawat lamaiin Date: Tue, 10 Jun 2025 10:16:31 +0700 Subject: [PATCH] update --- src/communication.jl | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/communication.jl b/src/communication.jl index 6c56468..cf69b98 100644 --- a/src/communication.jl +++ b/src/communication.jl @@ -154,7 +154,7 @@ function mqttClientInstance_v2( keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback() onMsgCallback::Function ; - clientId::Union{String,Nothing}=nothing, + clientId::String="NA", mqttBrokerPort::Integer=1883, keepaliveTopic::String= "/keepalive/$(uuid4snakecase())", keepaliveCheckInterval::Integer=30, @@ -170,7 +170,7 @@ function mqttClientInstance_v2( end MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos) - keepaliveTopic = clientId === nothing ? keepaliveTopic : "/keepalive/$clientId" + keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId" instance = mqttClientInstance_v2( mqttBrokerAddress, @@ -416,7 +416,7 @@ end ----- """ function checkMqttConnection!(mqttInstance::T; - keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool where {T<:mqttClientInstance} + keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance} interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval @@ -429,6 +429,7 @@ function checkMqttConnection!(mqttInstance::T; end if intervaldiff > interval + connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true while true mqttConnStatus = isMqttConnectionAlive(mqttInstance) if mqttConnStatus == false @@ -438,16 +439,20 @@ function checkMqttConnection!(mqttInstance::T; mqttInstance.client, mqttInstance.connection = MakeConnection(mqttInstance.mqttBrokerAddress, mqttInstance.mqttBrokerPort) - connect(mqttInstance.client, mqttInstance.connection) - for topic in mqttInstance.subtopic - subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) + try + connect(mqttInstance.client, mqttInstance.connection) + for topic in mqttInstance.subtopic + subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) + end + MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) + catch + println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") end - MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) - - else mqttInstance.lastTimeMqttConnCheck = Dates.now() - println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)") + if connectionStatusStart != mqttConnStatus + println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") + end return mqttConnStatus end end