This commit is contained in:
2025-03-14 18:45:44 +07:00
parent 092215c3be
commit d163105d93

View File

@@ -107,6 +107,8 @@ end
A time interval to check whether the mqtt client still connect to mqtt broker
- `qos`
Quality of Service. Can be QOS_0, QOS_1, QOS_2
- `clearOldMsg`
Boolean to determine if old messages should be cleared from channels
# Return
- `mqttInstance`
@@ -155,7 +157,7 @@ function mqttClientInstance_v2(
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
multiMsg::String="single",
multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
clearOldMsg::Bool=true,
)
@@ -200,7 +202,6 @@ function mqttClientInstance_v2(
end
""" Generate msgMeta to be including in a message. So the message receiver know
what to do with the message.
@@ -604,9 +605,9 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel:
while attempts <= maxattempt
attempts += 1
if attempts > 1
println("\nsendReceiveMqttMsg() attempts $attempts ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
println("\n<sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg)
println("--------------\n")
println("</sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
end
sendMqttMsg(mqttInstance, outgoingMsg)
@@ -641,71 +642,6 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel:
end
# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
# timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
# mqttMsgReceiveChannel = Channel(4)
# # ask message receiver to send a message back to specified topic
# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# # Define the callback for receiving messages.
# function onMsgCallback(topic, payload)
# jobj = JSON3.read(String(payload))
# onMsg = copy(jobj)
# put!(mqttMsgReceiveChannel, onMsg)
# end
# # Instantiate a client and connection.
# client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
# outgoingMsg[:msgMeta][:mqttBrokerPort])
# connect(client, connection)
# subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
# timepass = nothing
# attempts = 1
# while attempts <= maxattempt
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# starttime = Dates.now()
# while true
# timepass = timedifference(starttime, Dates.now(), "seconds")
# if timepass <= timeout
# if isready(mqttMsgReceiveChannel)
# incomingMsg = take!(mqttMsgReceiveChannel)
# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
# if msgPurpose == "NACK"
# break # resend the msg
# else
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=true, error=nothing, response=incomingMsg[:payload])
# end
# end
# end
# else
# break
# end
# sleep(1)
# end
# println("attempts $attempts ", @__FILE__, " ", @__LINE__)
# attempts += 1
# end
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=false,
# error="no response, timeout $timepass/$timeout, $attempts publish attempted",
# response=nothing)
# end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments