diff --git a/Project.toml b/Project.toml index 6be28fb..09b7cf2 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "GeneralUtils" uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe" authors = ["tonaerospace "] -version = "0.2.3" +version = "0.2.4" [deps] CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" diff --git a/src/communication.jl b/src/communication.jl index 18acde8..6c56468 100644 --- a/src/communication.jl +++ b/src/communication.jl @@ -74,6 +74,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance onMsgCallback::Function qos::MQTTClient.QOS client::MQTTClient.Client + clientId::String connection::MQTTClient.Connection keepalivetopic::String keepaliveChannel::Channel @@ -153,8 +154,9 @@ function mqttClientInstance_v2( keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback() onMsgCallback::Function ; + clientId::Union{String,Nothing}=nothing, mqttBrokerPort::Integer=1883, - keepalivetopic::String= "/keepalive/$(uuid4snakecase())", + keepaliveTopic::String= "/keepalive/$(uuid4snakecase())", keepaliveCheckInterval::Integer=30, qos::MQTTClient.QOS=QOS_1, multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta @@ -166,7 +168,9 @@ function mqttClientInstance_v2( for i in subtopic MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) end - MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos) + MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos) + + keepaliveTopic = clientId === nothing ? keepaliveTopic : "/keepalive/$clientId" instance = mqttClientInstance_v2( mqttBrokerAddress, @@ -176,8 +180,9 @@ function mqttClientInstance_v2( onMsgCallback, qos, client, + clientId, connection, - keepalivetopic, + keepaliveTopic, keepaliveChannel, keepaliveCheckInterval, nothing, @@ -346,8 +351,6 @@ end ----- """ function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance} - - starttime = Dates.now() isconnectionalive = false # ditch old keepalive msg is any @@ -366,12 +369,12 @@ function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstan :text=>"keepalive", ) ) - + publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic], JSON3.write(keepaliveMsg)) - timediff = 0 - while timediff < 5 + starttime = Dates.now() + while timediff <= 5 timediff = timedifference(starttime, Dates.now(), "seconds") if isready(mqttInstance.keepaliveChannel) incomingMsg = take!(mqttInstance.keepaliveChannel) @@ -393,7 +396,7 @@ end ----- mqttInstanceDict::Dict{Symbol, Any} a dictionary contain mqtt instance. 1 per mqtt client. - interval::Integer + keepaliveCheckInterval::Integer time interval to check mqtt server in seconds Return\n @@ -413,7 +416,7 @@ end ----- """ function checkMqttConnection!(mqttInstance::T; - keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance} + keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool where {T<:mqttClientInstance} interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval @@ -425,13 +428,12 @@ function checkMqttConnection!(mqttInstance::T; Inf end - isreconnect = false # this value is true if connection is disconnected if intervaldiff > interval while true mqttConnStatus = isMqttConnectionAlive(mqttInstance) if mqttConnStatus == false - isreconnect = true - println("mqtt connection disconnected, attemping to reconnect $(Dates.now())") + sleep(5) # wait + println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)") # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet mqttInstance.client, mqttInstance.connection = MakeConnection(mqttInstance.mqttBrokerAddress, @@ -441,13 +443,12 @@ function checkMqttConnection!(mqttInstance::T; subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) end MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) - sleep(1) # wait before checking connection again + + else mqttInstance.lastTimeMqttConnCheck = Dates.now() - if isreconnect - println("connected to mqtt broker") - end - return isreconnect + println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)") + return mqttConnStatus end end else @@ -567,8 +568,9 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) # Signature """ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; - timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any} - mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" + connectiontimeout::Integer=60, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any} + senderId = outgoingMsg[:msgMeta][:senderId] + mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId" mqttMsgReceiveChannel = (ch1=Channel(8),) keepaliveChannel = Channel(8) outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic @@ -576,38 +578,73 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; # Define the callback for receiving messages. function onMsgCallback(topic, payload) jobj = JSON3.read(String(payload)) - onMsg = copy(jobj) - put!(mqttMsgReceiveChannel[:ch1], onMsg) + incomingMqttMsg = copy(jobj) + + if occursin("GeneralUtils_sendReceiveMqttMsg", topic) + put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) + elseif occursin("keepalive", topic) + put!(keepaliveChannel, incomingMqttMsg) + println("keepalive received ", incomingMqttMsg) + else + println("undefined condition ", @__FILE__, " ", @__LINE__) + end + end + + mqttInstance = nothing + attempt = 0 + starttime = Dates.now() + endtime = starttime + Second(connectiontimeout) + errormsg = nothing + while true + timenow = Dates.now() + timepass = timedifference(starttime, timenow, "seconds") + timeleft = timedifference(timenow, endtime, "seconds") + + if timepass <= connectiontimeout + attempt += 1 + attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing + + try + mqttInstance = mqttClientInstance_v2( + outgoingMsg[:msgMeta][:mqttBrokerAddress], + [mqttMsgReceiveTopic], + mqttMsgReceiveChannel, + keepaliveChannel, + onMsgCallback; + mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort], + clientId=senderId + ) + break + catch e + errormsg = e + sleep(5) + end + else + println("Failed to instantiate MQTT client after $timepass seconds. $errormsg") + return nothing + end end - mqttInstance = mqttClientInstance_v2( - outgoingMsg[:msgMeta][:mqttBrokerAddress], - [mqttMsgReceiveTopic], - mqttMsgReceiveChannel, - keepaliveChannel, - onMsgCallback; - mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort] - ) - - response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt) + response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; + responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt) try disconnect(mqttInstance.client) catch end return response end function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol, - outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1 + outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1 )::NamedTuple where {T<:Any} timepass = nothing attempts = 1 - while attempts <= maxattempt + while attempts <= responsemaxattempt sendMqttMsg(mqttInstance, outgoingMsg) starttime = Dates.now() while true timepass = timedifference(starttime, Dates.now(), "seconds") - if timepass <= timeout + if timepass <= responsetimeout if isready(mqttInstance.msgReceiveChannel[receivechannel]) incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) @@ -626,9 +663,10 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel: sleep(1) end if attempts > 1 - println("\n attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())") + println("\n attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())") pprintln(outgoingMsg) - println(" attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n") + println(" attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n") + checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5) end attempts += 1 end