@@ -1,7 +1,7 @@
|
|||||||
name = "GeneralUtils"
|
name = "GeneralUtils"
|
||||||
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
|
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
|
||||||
authors = ["tonaerospace <tonaerospace.etc@gmail.com>"]
|
authors = ["tonaerospace <tonaerospace.etc@gmail.com>"]
|
||||||
version = "0.2.3"
|
version = "0.2.4"
|
||||||
|
|
||||||
[deps]
|
[deps]
|
||||||
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
|
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance
|
|||||||
onMsgCallback::Function
|
onMsgCallback::Function
|
||||||
qos::MQTTClient.QOS
|
qos::MQTTClient.QOS
|
||||||
client::MQTTClient.Client
|
client::MQTTClient.Client
|
||||||
|
clientId::String
|
||||||
connection::MQTTClient.Connection
|
connection::MQTTClient.Connection
|
||||||
keepalivetopic::String
|
keepalivetopic::String
|
||||||
keepaliveChannel::Channel
|
keepaliveChannel::Channel
|
||||||
@@ -153,8 +154,9 @@ function mqttClientInstance_v2(
|
|||||||
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
||||||
onMsgCallback::Function
|
onMsgCallback::Function
|
||||||
;
|
;
|
||||||
|
clientId::Union{String,Nothing}=nothing,
|
||||||
mqttBrokerPort::Integer=1883,
|
mqttBrokerPort::Integer=1883,
|
||||||
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
|
keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
|
||||||
keepaliveCheckInterval::Integer=30,
|
keepaliveCheckInterval::Integer=30,
|
||||||
qos::MQTTClient.QOS=QOS_1,
|
qos::MQTTClient.QOS=QOS_1,
|
||||||
multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
|
multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
|
||||||
@@ -166,7 +168,9 @@ function mqttClientInstance_v2(
|
|||||||
for i in subtopic
|
for i in subtopic
|
||||||
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
||||||
end
|
end
|
||||||
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
|
MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
|
||||||
|
|
||||||
|
keepaliveTopic = clientId === nothing ? keepaliveTopic : "/keepalive/$clientId"
|
||||||
|
|
||||||
instance = mqttClientInstance_v2(
|
instance = mqttClientInstance_v2(
|
||||||
mqttBrokerAddress,
|
mqttBrokerAddress,
|
||||||
@@ -176,8 +180,9 @@ function mqttClientInstance_v2(
|
|||||||
onMsgCallback,
|
onMsgCallback,
|
||||||
qos,
|
qos,
|
||||||
client,
|
client,
|
||||||
|
clientId,
|
||||||
connection,
|
connection,
|
||||||
keepalivetopic,
|
keepaliveTopic,
|
||||||
keepaliveChannel,
|
keepaliveChannel,
|
||||||
keepaliveCheckInterval,
|
keepaliveCheckInterval,
|
||||||
nothing,
|
nothing,
|
||||||
@@ -346,8 +351,6 @@ end
|
|||||||
-----
|
-----
|
||||||
"""
|
"""
|
||||||
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
|
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
|
||||||
|
|
||||||
starttime = Dates.now()
|
|
||||||
isconnectionalive = false
|
isconnectionalive = false
|
||||||
|
|
||||||
# ditch old keepalive msg is any
|
# ditch old keepalive msg is any
|
||||||
@@ -366,12 +369,12 @@ function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstan
|
|||||||
:text=>"keepalive",
|
:text=>"keepalive",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
|
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
|
||||||
JSON3.write(keepaliveMsg))
|
JSON3.write(keepaliveMsg))
|
||||||
|
|
||||||
timediff = 0
|
timediff = 0
|
||||||
while timediff < 5
|
starttime = Dates.now()
|
||||||
|
while timediff <= 5
|
||||||
timediff = timedifference(starttime, Dates.now(), "seconds")
|
timediff = timedifference(starttime, Dates.now(), "seconds")
|
||||||
if isready(mqttInstance.keepaliveChannel)
|
if isready(mqttInstance.keepaliveChannel)
|
||||||
incomingMsg = take!(mqttInstance.keepaliveChannel)
|
incomingMsg = take!(mqttInstance.keepaliveChannel)
|
||||||
@@ -393,7 +396,7 @@ end
|
|||||||
-----
|
-----
|
||||||
mqttInstanceDict::Dict{Symbol, Any}
|
mqttInstanceDict::Dict{Symbol, Any}
|
||||||
a dictionary contain mqtt instance. 1 per mqtt client.
|
a dictionary contain mqtt instance. 1 per mqtt client.
|
||||||
interval::Integer
|
keepaliveCheckInterval::Integer
|
||||||
time interval to check mqtt server in seconds
|
time interval to check mqtt server in seconds
|
||||||
|
|
||||||
Return\n
|
Return\n
|
||||||
@@ -413,7 +416,7 @@ end
|
|||||||
-----
|
-----
|
||||||
"""
|
"""
|
||||||
function checkMqttConnection!(mqttInstance::T;
|
function checkMqttConnection!(mqttInstance::T;
|
||||||
keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance}
|
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool where {T<:mqttClientInstance}
|
||||||
|
|
||||||
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
||||||
|
|
||||||
@@ -425,13 +428,12 @@ function checkMqttConnection!(mqttInstance::T;
|
|||||||
Inf
|
Inf
|
||||||
end
|
end
|
||||||
|
|
||||||
isreconnect = false # this value is true if connection is disconnected
|
|
||||||
if intervaldiff > interval
|
if intervaldiff > interval
|
||||||
while true
|
while true
|
||||||
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
||||||
if mqttConnStatus == false
|
if mqttConnStatus == false
|
||||||
isreconnect = true
|
sleep(5) # wait
|
||||||
println("mqtt connection disconnected, attemping to reconnect $(Dates.now())")
|
println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
|
||||||
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
|
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
|
||||||
mqttInstance.client, mqttInstance.connection =
|
mqttInstance.client, mqttInstance.connection =
|
||||||
MakeConnection(mqttInstance.mqttBrokerAddress,
|
MakeConnection(mqttInstance.mqttBrokerAddress,
|
||||||
@@ -441,13 +443,12 @@ function checkMqttConnection!(mqttInstance::T;
|
|||||||
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||||
end
|
end
|
||||||
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
||||||
sleep(1) # wait before checking connection again
|
|
||||||
|
|
||||||
else
|
else
|
||||||
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
||||||
if isreconnect
|
println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
|
||||||
println("connected to mqtt broker")
|
return mqttConnStatus
|
||||||
end
|
|
||||||
return isreconnect
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
@@ -567,8 +568,9 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
|
|||||||
# Signature
|
# Signature
|
||||||
"""
|
"""
|
||||||
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
||||||
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
|
connectiontimeout::Integer=60, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
|
||||||
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
senderId = outgoingMsg[:msgMeta][:senderId]
|
||||||
|
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
|
||||||
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
||||||
keepaliveChannel = Channel(8)
|
keepaliveChannel = Channel(8)
|
||||||
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
||||||
@@ -576,38 +578,73 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
|||||||
# Define the callback for receiving messages.
|
# Define the callback for receiving messages.
|
||||||
function onMsgCallback(topic, payload)
|
function onMsgCallback(topic, payload)
|
||||||
jobj = JSON3.read(String(payload))
|
jobj = JSON3.read(String(payload))
|
||||||
onMsg = copy(jobj)
|
incomingMqttMsg = copy(jobj)
|
||||||
put!(mqttMsgReceiveChannel[:ch1], onMsg)
|
|
||||||
|
if occursin("GeneralUtils_sendReceiveMqttMsg", topic)
|
||||||
|
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
||||||
|
elseif occursin("keepalive", topic)
|
||||||
|
put!(keepaliveChannel, incomingMqttMsg)
|
||||||
|
println("keepalive received ", incomingMqttMsg)
|
||||||
|
else
|
||||||
|
println("undefined condition ", @__FILE__, " ", @__LINE__)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
mqttInstance = nothing
|
||||||
|
attempt = 0
|
||||||
|
starttime = Dates.now()
|
||||||
|
endtime = starttime + Second(connectiontimeout)
|
||||||
|
errormsg = nothing
|
||||||
|
while true
|
||||||
|
timenow = Dates.now()
|
||||||
|
timepass = timedifference(starttime, timenow, "seconds")
|
||||||
|
timeleft = timedifference(timenow, endtime, "seconds")
|
||||||
|
|
||||||
|
if timepass <= connectiontimeout
|
||||||
|
attempt += 1
|
||||||
|
attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing
|
||||||
|
|
||||||
|
try
|
||||||
|
mqttInstance = mqttClientInstance_v2(
|
||||||
|
outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
||||||
|
[mqttMsgReceiveTopic],
|
||||||
|
mqttMsgReceiveChannel,
|
||||||
|
keepaliveChannel,
|
||||||
|
onMsgCallback;
|
||||||
|
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort],
|
||||||
|
clientId=senderId
|
||||||
|
)
|
||||||
|
break
|
||||||
|
catch e
|
||||||
|
errormsg = e
|
||||||
|
sleep(5)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
println("Failed to instantiate MQTT client after $timepass seconds. $errormsg")
|
||||||
|
return nothing
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
mqttInstance = mqttClientInstance_v2(
|
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg;
|
||||||
outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt)
|
||||||
[mqttMsgReceiveTopic],
|
|
||||||
mqttMsgReceiveChannel,
|
|
||||||
keepaliveChannel,
|
|
||||||
onMsgCallback;
|
|
||||||
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
|
||||||
)
|
|
||||||
|
|
||||||
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
|
|
||||||
try disconnect(mqttInstance.client) catch end
|
try disconnect(mqttInstance.client) catch end
|
||||||
|
|
||||||
return response
|
return response
|
||||||
end
|
end
|
||||||
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
|
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
|
||||||
outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1
|
outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1
|
||||||
)::NamedTuple where {T<:Any}
|
)::NamedTuple where {T<:Any}
|
||||||
|
|
||||||
timepass = nothing
|
timepass = nothing
|
||||||
attempts = 1
|
attempts = 1
|
||||||
while attempts <= maxattempt
|
while attempts <= responsemaxattempt
|
||||||
sendMqttMsg(mqttInstance, outgoingMsg)
|
sendMqttMsg(mqttInstance, outgoingMsg)
|
||||||
|
|
||||||
starttime = Dates.now()
|
starttime = Dates.now()
|
||||||
while true
|
while true
|
||||||
timepass = timedifference(starttime, Dates.now(), "seconds")
|
timepass = timedifference(starttime, Dates.now(), "seconds")
|
||||||
|
|
||||||
if timepass <= timeout
|
if timepass <= responsetimeout
|
||||||
if isready(mqttInstance.msgReceiveChannel[receivechannel])
|
if isready(mqttInstance.msgReceiveChannel[receivechannel])
|
||||||
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
|
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
|
||||||
|
|
||||||
@@ -626,9 +663,10 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel:
|
|||||||
sleep(1)
|
sleep(1)
|
||||||
end
|
end
|
||||||
if attempts > 1
|
if attempts > 1
|
||||||
println("\n<sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
|
println("\n<sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
|
||||||
pprintln(outgoingMsg)
|
pprintln(outgoingMsg)
|
||||||
println("</sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
|
println("</sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
|
||||||
|
checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5)
|
||||||
end
|
end
|
||||||
attempts += 1
|
attempts += 1
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user