10 Commits

Author SHA1 Message Date
narawat lamaiin
5b4c1c1471 update 2025-06-10 10:38:51 +07:00
narawat lamaiin
fc3edd7b8f update 2025-06-10 10:29:57 +07:00
narawat lamaiin
93aa0ee1ac update 2025-06-10 10:16:31 +07:00
narawat lamaiin
42378714a0 mark new version 2025-06-10 09:31:00 +07:00
ton
759f022c98 Merge pull request 'v0.2.4' (#5) from v0.2.4 into main
Reviewed-on: #5
2025-06-10 02:27:09 +00:00
narawat lamaiin
5af4d481f2 update 2025-06-10 09:25:41 +07:00
narawat lamaiin
221bb5beb7 update 2025-06-09 06:34:29 +07:00
narawat lamaiin
5a89e86120 update 2025-06-03 10:08:40 +07:00
narawat lamaiin
e351a92680 mark new version 2025-05-24 08:52:50 +07:00
ton
83cd0cfea3 Merge pull request 'v0.2.3' (#4) from v0.2.3 into main
Reviewed-on: #4
2025-05-24 01:47:53 +00:00
2 changed files with 83 additions and 41 deletions

View File

@@ -1,7 +1,7 @@
name = "GeneralUtils" name = "GeneralUtils"
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe" uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
authors = ["tonaerospace <tonaerospace.etc@gmail.com>"] authors = ["tonaerospace <tonaerospace.etc@gmail.com>"]
version = "0.2.3" version = "0.3.0"
[deps] [deps]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"

View File

@@ -74,6 +74,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance
onMsgCallback::Function onMsgCallback::Function
qos::MQTTClient.QOS qos::MQTTClient.QOS
client::MQTTClient.Client client::MQTTClient.Client
clientId::String
connection::MQTTClient.Connection connection::MQTTClient.Connection
keepalivetopic::String keepalivetopic::String
keepaliveChannel::Channel keepaliveChannel::Channel
@@ -153,8 +154,9 @@ function mqttClientInstance_v2(
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback() keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function onMsgCallback::Function
; ;
clientId::String="NA",
mqttBrokerPort::Integer=1883, mqttBrokerPort::Integer=1883,
keepalivetopic::String= "/keepalive/$(uuid4snakecase())", keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30, keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1, qos::MQTTClient.QOS=QOS_1,
multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
@@ -166,7 +168,9 @@ function mqttClientInstance_v2(
for i in subtopic for i in subtopic
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
end end
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos) MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
instance = mqttClientInstance_v2( instance = mqttClientInstance_v2(
mqttBrokerAddress, mqttBrokerAddress,
@@ -176,8 +180,9 @@ function mqttClientInstance_v2(
onMsgCallback, onMsgCallback,
qos, qos,
client, client,
clientId,
connection, connection,
keepalivetopic, keepaliveTopic,
keepaliveChannel, keepaliveChannel,
keepaliveCheckInterval, keepaliveCheckInterval,
nothing, nothing,
@@ -346,8 +351,6 @@ end
----- -----
""" """
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance} function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
starttime = Dates.now()
isconnectionalive = false isconnectionalive = false
# ditch old keepalive msg is any # ditch old keepalive msg is any
@@ -366,12 +369,12 @@ function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstan
:text=>"keepalive", :text=>"keepalive",
) )
) )
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic], publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
JSON3.write(keepaliveMsg)) JSON3.write(keepaliveMsg))
timediff = 0 timediff = 0
while timediff < 5 starttime = Dates.now()
while timediff <= 5
timediff = timedifference(starttime, Dates.now(), "seconds") timediff = timedifference(starttime, Dates.now(), "seconds")
if isready(mqttInstance.keepaliveChannel) if isready(mqttInstance.keepaliveChannel)
incomingMsg = take!(mqttInstance.keepaliveChannel) incomingMsg = take!(mqttInstance.keepaliveChannel)
@@ -393,7 +396,7 @@ end
----- -----
mqttInstanceDict::Dict{Symbol, Any} mqttInstanceDict::Dict{Symbol, Any}
a dictionary contain mqtt instance. 1 per mqtt client. a dictionary contain mqtt instance. 1 per mqtt client.
interval::Integer keepaliveCheckInterval::Integer
time interval to check mqtt server in seconds time interval to check mqtt server in seconds
Return\n Return\n
@@ -413,7 +416,7 @@ end
----- -----
""" """
function checkMqttConnection!(mqttInstance::T; function checkMqttConnection!(mqttInstance::T;
keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance} keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
@@ -425,29 +428,32 @@ function checkMqttConnection!(mqttInstance::T;
Inf Inf
end end
isreconnect = false # this value is true if connection is disconnected
if intervaldiff > interval if intervaldiff > interval
connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
while true while true
mqttConnStatus = isMqttConnectionAlive(mqttInstance) mqttConnStatus = isMqttConnectionAlive(mqttInstance)
if mqttConnStatus == false if mqttConnStatus == false
isreconnect = true sleep(5) # wait
println("mqtt connection disconnected, attemping to reconnect $(Dates.now())") println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstance.client, mqttInstance.connection = mqttInstance.client, mqttInstance.connection =
MakeConnection(mqttInstance.mqttBrokerAddress, MakeConnection(mqttInstance.mqttBrokerAddress,
mqttInstance.mqttBrokerPort) mqttInstance.mqttBrokerPort)
connect(mqttInstance.client, mqttInstance.connection) try
for topic in mqttInstance.subtopic connect(mqttInstance.client, mqttInstance.connection)
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) for topic in mqttInstance.subtopic
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
catch
println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
end end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
sleep(1) # wait before checking connection again
else else
mqttInstance.lastTimeMqttConnCheck = Dates.now() mqttInstance.lastTimeMqttConnCheck = Dates.now()
if isreconnect if connectionStatusStart != mqttConnStatus
println("connected to mqtt broker") println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
end end
return isreconnect return mqttConnStatus
end end
end end
else else
@@ -567,8 +573,9 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
# Signature # Signature
""" """
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any} connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" senderId = outgoingMsg[:msgMeta][:senderId]
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
mqttMsgReceiveChannel = (ch1=Channel(8),) mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8) keepaliveChannel = Channel(8)
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
@@ -576,38 +583,72 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
# Define the callback for receiving messages. # Define the callback for receiving messages.
function onMsgCallback(topic, payload) function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload)) jobj = JSON3.read(String(payload))
onMsg = copy(jobj) incomingMqttMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
if occursin("GeneralUtils_sendReceiveMqttMsg", topic)
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
elseif occursin("keepalive", topic)
put!(keepaliveChannel, incomingMqttMsg)
else
println("undefined condition ", @__FILE__, " ", @__LINE__)
end
end
mqttInstance = nothing
attempt = 0
starttime = Dates.now()
endtime = starttime + Second(connectiontimeout)
errormsg = nothing
while true
timenow = Dates.now()
timepass = timedifference(starttime, timenow, "seconds")
timeleft = timedifference(timenow, endtime, "seconds")
if timepass <= connectiontimeout
attempt += 1
attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing
try
mqttInstance = mqttClientInstance_v2(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort],
clientId=senderId
)
break
catch e
errormsg = e
sleep(5)
end
else
println("Failed to instantiate MQTT client after $timepass seconds. $errormsg")
return nothing
end
end end
mqttInstance = mqttClientInstance_v2( response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg;
outgoingMsg[:msgMeta][:mqttBrokerAddress], responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt)
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
)
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
try disconnect(mqttInstance.client) catch end try disconnect(mqttInstance.client) catch end
return response return response
end end
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol, function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1 outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1
)::NamedTuple where {T<:Any} )::NamedTuple where {T<:Any}
timepass = nothing timepass = nothing
attempts = 1 attempts = 1
while attempts <= maxattempt while attempts <= responsemaxattempt
sendMqttMsg(mqttInstance, outgoingMsg) sendMqttMsg(mqttInstance, outgoingMsg)
starttime = Dates.now() starttime = Dates.now()
while true while true
timepass = timedifference(starttime, Dates.now(), "seconds") timepass = timedifference(starttime, Dates.now(), "seconds")
if timepass <= timeout if timepass <= responsetimeout
if isready(mqttInstance.msgReceiveChannel[receivechannel]) if isready(mqttInstance.msgReceiveChannel[receivechannel])
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
@@ -626,9 +667,10 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel:
sleep(1) sleep(1)
end end
if attempts > 1 if attempts > 1
println("\n<sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())") println("\n<sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg) pprintln(outgoingMsg)
println("</sendReceiveMqttMsg()> attempts $attempts/$maxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n") println("</sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5)
end end
attempts += 1 attempts += 1
end end