use channel
This commit is contained in:
@@ -241,12 +241,12 @@ end
|
||||
"""
|
||||
mqtt communication loop, call this function manually
|
||||
"""
|
||||
function mqttRun(client::mqttClient)
|
||||
function mqttRun(client::mqttClient, storeResult::Channel)
|
||||
mqttLoop!(client)
|
||||
mqttCheckConnection!(client)
|
||||
mqttReconnect!(client)
|
||||
mqttSubOnConnect!(client)
|
||||
return mqttOnmessage!(client)
|
||||
mqttOnmessage!(client, storeResult)
|
||||
end
|
||||
|
||||
|
||||
@@ -260,28 +260,26 @@ end
|
||||
end
|
||||
|
||||
"""
|
||||
function mqttOnmessage!(client::mqttClient)
|
||||
function mqttOnmessage!(client::mqttClient, storeResult::Channel)
|
||||
nmessages = Base.n_avail(get_messages_channel())
|
||||
nmessages == 0 && return nothing
|
||||
|
||||
timepass = (Dates.now() - client.boottime).value / 1000.0
|
||||
|
||||
if timepass <= 1 && client.retainedMsgCleared == false
|
||||
if client.retainedMsgCleared == false
|
||||
# discard all retained message
|
||||
for i = 1:nmessages
|
||||
_ = take!(get_messages_channel()) # discard new arrival package
|
||||
end
|
||||
elseif timepass > 1 && client.retainedMsgCleared == false
|
||||
client.retainedMsgCleared = true
|
||||
println("retained mqtt messages cleared")
|
||||
else
|
||||
elseif client.retainedMsgCleared
|
||||
for i = 1:nmessages
|
||||
pkg = take!(get_messages_channel()) # take new arrival package
|
||||
# sptoptic = split(topic, "/")
|
||||
payload = vecToDict(pkg.payload) # payload in Dict format
|
||||
|
||||
return pkg.topic, payload
|
||||
put!(storeResult, (topic=pkg.topic, payload=payload))
|
||||
end
|
||||
else
|
||||
error("undefined condition")
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
Reference in New Issue
Block a user