@@ -1,7 +1,7 @@
|
|||||||
name = "GeneralUtils"
|
name = "GeneralUtils"
|
||||||
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
|
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
|
||||||
authors = ["tonaerospace <tonaerospace.etc@gmail.com>"]
|
authors = ["tonaerospace <tonaerospace.etc@gmail.com>"]
|
||||||
version = "0.2.4"
|
version = "0.3.0"
|
||||||
|
|
||||||
[deps]
|
[deps]
|
||||||
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
|
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ function mqttClientInstance_v2(
|
|||||||
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
||||||
onMsgCallback::Function
|
onMsgCallback::Function
|
||||||
;
|
;
|
||||||
clientId::Union{String,Nothing}=nothing,
|
clientId::String="NA",
|
||||||
mqttBrokerPort::Integer=1883,
|
mqttBrokerPort::Integer=1883,
|
||||||
keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
|
keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
|
||||||
keepaliveCheckInterval::Integer=30,
|
keepaliveCheckInterval::Integer=30,
|
||||||
@@ -170,7 +170,7 @@ function mqttClientInstance_v2(
|
|||||||
end
|
end
|
||||||
MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
|
MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
|
||||||
|
|
||||||
keepaliveTopic = clientId === nothing ? keepaliveTopic : "/keepalive/$clientId"
|
keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
|
||||||
|
|
||||||
instance = mqttClientInstance_v2(
|
instance = mqttClientInstance_v2(
|
||||||
mqttBrokerAddress,
|
mqttBrokerAddress,
|
||||||
@@ -416,7 +416,7 @@ end
|
|||||||
-----
|
-----
|
||||||
"""
|
"""
|
||||||
function checkMqttConnection!(mqttInstance::T;
|
function checkMqttConnection!(mqttInstance::T;
|
||||||
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool where {T<:mqttClientInstance}
|
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
|
||||||
|
|
||||||
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
||||||
|
|
||||||
@@ -429,6 +429,7 @@ function checkMqttConnection!(mqttInstance::T;
|
|||||||
end
|
end
|
||||||
|
|
||||||
if intervaldiff > interval
|
if intervaldiff > interval
|
||||||
|
connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
|
||||||
while true
|
while true
|
||||||
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
||||||
if mqttConnStatus == false
|
if mqttConnStatus == false
|
||||||
@@ -438,16 +439,20 @@ function checkMqttConnection!(mqttInstance::T;
|
|||||||
mqttInstance.client, mqttInstance.connection =
|
mqttInstance.client, mqttInstance.connection =
|
||||||
MakeConnection(mqttInstance.mqttBrokerAddress,
|
MakeConnection(mqttInstance.mqttBrokerAddress,
|
||||||
mqttInstance.mqttBrokerPort)
|
mqttInstance.mqttBrokerPort)
|
||||||
|
try
|
||||||
connect(mqttInstance.client, mqttInstance.connection)
|
connect(mqttInstance.client, mqttInstance.connection)
|
||||||
for topic in mqttInstance.subtopic
|
for topic in mqttInstance.subtopic
|
||||||
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||||
end
|
end
|
||||||
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||||
|
catch
|
||||||
|
println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
||||||
|
end
|
||||||
else
|
else
|
||||||
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
||||||
println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
|
if connectionStatusStart != mqttConnStatus
|
||||||
|
println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
||||||
|
end
|
||||||
return mqttConnStatus
|
return mqttConnStatus
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -568,7 +573,7 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
|
|||||||
# Signature
|
# Signature
|
||||||
"""
|
"""
|
||||||
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
||||||
connectiontimeout::Integer=60, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
|
connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
|
||||||
senderId = outgoingMsg[:msgMeta][:senderId]
|
senderId = outgoingMsg[:msgMeta][:senderId]
|
||||||
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
|
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
|
||||||
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
||||||
@@ -584,7 +589,6 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
|||||||
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
||||||
elseif occursin("keepalive", topic)
|
elseif occursin("keepalive", topic)
|
||||||
put!(keepaliveChannel, incomingMqttMsg)
|
put!(keepaliveChannel, incomingMqttMsg)
|
||||||
println("keepalive received ", incomingMqttMsg)
|
|
||||||
else
|
else
|
||||||
println("undefined condition ", @__FILE__, " ", @__LINE__)
|
println("undefined condition ", @__FILE__, " ", @__LINE__)
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user