update
This commit is contained in:
@@ -535,7 +535,7 @@ end
|
||||
# Signature
|
||||
"""
|
||||
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
|
||||
mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
|
||||
mqttMsgReceiveTopic = ["/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
|
||||
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
||||
keepaliveChannel = Channel(8)
|
||||
|
||||
@@ -555,35 +555,20 @@ function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
|
||||
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
||||
)
|
||||
|
||||
return sendMqttMsg(mqttInstance, outgoingMsg)
|
||||
response = sendMqttMsg(mqttInstance, outgoingMsg)
|
||||
disconnect(mqttInstance.client)
|
||||
|
||||
return response
|
||||
end
|
||||
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
|
||||
)::NamedTuple where {T<:Any}
|
||||
try
|
||||
publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
|
||||
# disconnect mqtt
|
||||
disconnect(client)
|
||||
return (success=true, error=nothing)
|
||||
catch e
|
||||
return (success=false, error=e)
|
||||
end
|
||||
end
|
||||
# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
|
||||
# try
|
||||
# # Instantiate a client and connection.
|
||||
# client, connection = MakeConnection(
|
||||
# outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
||||
# outgoingMsg[:msgMeta][:mqttBrokerPort])
|
||||
# connect(client, connection)
|
||||
|
||||
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
|
||||
# # disconnect mqtt
|
||||
# disconnect(client)
|
||||
# return (success=true, error=nothing)
|
||||
# catch e
|
||||
# return (success=false, error=e)
|
||||
# end
|
||||
# end
|
||||
|
||||
|
||||
""" Send a message to specified MQTT topic then wait for reply.
|
||||
@@ -626,9 +611,10 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
|
||||
"""
|
||||
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
||||
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
|
||||
mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
|
||||
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
||||
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
||||
keepaliveChannel = Channel(8)
|
||||
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
||||
|
||||
# Define the callback for receiving messages.
|
||||
function onMsgCallback(topic, payload)
|
||||
@@ -639,14 +625,17 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
||||
|
||||
mqttInstance = mqttClientInstance_v2(
|
||||
outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
||||
mqttMsgReceiveTopic,
|
||||
[mqttMsgReceiveTopic],
|
||||
mqttMsgReceiveChannel,
|
||||
keepaliveChannel,
|
||||
onMsgCallback;
|
||||
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
||||
)
|
||||
|
||||
return sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
|
||||
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
|
||||
disconnect(mqttInstance.client)
|
||||
|
||||
return response
|
||||
end
|
||||
|
||||
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
|
||||
|
||||
Reference in New Issue
Block a user