diff --git a/Manifest.toml b/Manifest.toml
index 15af71d..e55c21a 100644
--- a/Manifest.toml
+++ b/Manifest.toml
@@ -2,7 +2,7 @@
julia_version = "1.11.5"
manifest_format = "2.0"
-project_hash = "310a130f0609e5376c1f8f5c3b09f0f87b328887"
+project_hash = "a942446c2f26ef72d0c4b0ca522e0adcf709ce4e"
[[deps.AliasTables]]
deps = ["PtrArrays", "Random"]
@@ -92,11 +92,6 @@ deps = ["Printf"]
uuid = "ade2ca70-3891-5945-98fb-dc099432e06a"
version = "1.11.0"
-[[deps.Distributed]]
-deps = ["Random", "Serialization", "Sockets"]
-uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
-version = "1.11.0"
-
[[deps.Distributions]]
deps = ["AliasTables", "FillArrays", "LinearAlgebra", "PDMats", "Printf", "QuadGK", "Random", "SpecialFunctions", "Statistics", "StatsAPI", "StatsBase", "StatsFuns"]
git-tree-sha1 = "3101c32aab536e7a27b1763c0797dba151b899ad"
@@ -280,16 +275,6 @@ version = "0.3.28"
uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
version = "1.11.0"
-[[deps.MQTTClient]]
-deps = ["Distributed", "Random", "Sockets"]
-git-tree-sha1 = "f2597b290d4bf17b577346153cd2ddf9accb5c26"
-uuid = "985f35cc-2c3d-4943-b8c1-f0931d5f0959"
-version = "0.3.1"
-weakdeps = ["PrecompileTools"]
-
- [deps.MQTTClient.extensions]
- PrecompileMQTT = "PrecompileTools"
-
[[deps.Markdown]]
deps = ["Base64"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
diff --git a/Project.toml b/Project.toml
index 1e93b82..5a68284 100644
--- a/Project.toml
+++ b/Project.toml
@@ -10,7 +10,6 @@ DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
-MQTTClient = "985f35cc-2c3d-4943-b8c1-f0931d5f0959"
NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a"
PrettyPrinting = "54e16d92-306c-5ea0-a30b-337be88ac337"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
diff --git a/src/GeneralUtils.jl b/src/GeneralUtils.jl
index 392e4c4..41463b1 100644
--- a/src/GeneralUtils.jl
+++ b/src/GeneralUtils.jl
@@ -2,7 +2,7 @@ module GeneralUtils
export # struct
- mqttClientInstance,
+ # mqttClientInstance,
# function
noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue
diff --git a/src/communication.jl b/src/communication.jl
index b56609d..f44f77f 100644
--- a/src/communication.jl
+++ b/src/communication.jl
@@ -1,210 +1,211 @@
module communication
-export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
- sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
- dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
+export generate_msgMeta
+ # isMqttConnectionAlive, checkMqttConnection!,
+ # sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
+ # dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
-using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames,
+using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames,
SHA, PrettyPrinting
using ..util
# ---------------------------------------------- 100 --------------------------------------------- #
-abstract type mqttClientInstance end
+# abstract type mqttClientInstance end
-mutable struct mqttClientInstance_v1 <: mqttClientInstance
- mqttBrokerAddress::String
- mqttBrokerPort::Integer
- subtopic::Vector{String}
- receiveDataChannel::Channel
- onMsgCallback::Function
- qos::MQTTClient.QOS
- client::MQTTClient.Client
- connection::MQTTClient.Connection
- keepalivetopic::String
- keepaliveChannel::Channel
- keepaliveCheckInterval::Integer # second
- lastTimeMqttConnCheck::Union{DateTime, Nothing}
-end
+# mutable struct mqttClientInstance_v1 <: mqttClientInstance
+# mqttBrokerAddress::String
+# mqttBrokerPort::Integer
+# subtopic::Vector{String}
+# receiveDataChannel::Channel
+# onMsgCallback::Function
+# qos::MQTTClient.QOS
+# client::MQTTClient.Client
+# connection::MQTTClient.Connection
+# keepalivetopic::String
+# keepaliveChannel::Channel
+# keepaliveCheckInterval::Integer # second
+# lastTimeMqttConnCheck::Union{DateTime, Nothing}
+# end
-function mqttClientInstance_v1(
- mqttBrokerAddress::String,
- mqttBrokerPort::Integer,
- subtopic::Vector{String},
- receiveDataChannel::Channel,
- keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
- onMsgCallback::Function
- ;
- keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
- keepaliveCheckInterval::Integer=30,
- qos::MQTTClient.QOS=QOS_1,
- )
+# function mqttClientInstance_v1(
+# mqttBrokerAddress::String,
+# mqttBrokerPort::Integer,
+# subtopic::Vector{String},
+# receiveDataChannel::Channel,
+# keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
+# onMsgCallback::Function
+# ;
+# keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
+# keepaliveCheckInterval::Integer=30,
+# qos::MQTTClient.QOS=QOS_1,
+# )
- client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
- MQTTClient.connect(client, connection)
- for i in subtopic
- MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
- end
- MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
+# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
+# MQTTClient.connect(client, connection)
+# for i in subtopic
+# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
+# end
+# MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
- instance = mqttClientInstance(
- mqttBrokerAddress,
- mqttBrokerPort,
- subtopic,
- receiveDataChannel,
- onMsgCallback,
- qos,
- client,
- connection,
- keepalivetopic,
- keepaliveChannel,
- keepaliveCheckInterval,
- Dates.now(),
- )
+# instance = mqttClientInstance(
+# mqttBrokerAddress,
+# mqttBrokerPort,
+# subtopic,
+# receiveDataChannel,
+# onMsgCallback,
+# qos,
+# client,
+# connection,
+# keepalivetopic,
+# keepaliveChannel,
+# keepaliveCheckInterval,
+# Dates.now(),
+# )
- return instance
-end
+# return instance
+# end
-mutable struct mqttClientInstance_v2 <: mqttClientInstance
- mqttBrokerAddress::String
- mqttBrokerPort::Integer
- subtopic::Vector{String}
- msgReceiveChannel
- onMsgCallback::Function
- qos::MQTTClient.QOS
- client::MQTTClient.Client
- clientId::String
- connection::MQTTClient.Connection
- keepalivetopic::String
- keepaliveChannel::Channel
- keepaliveCheckInterval::Integer # second
- lastTimeMqttConnCheck::Union{DateTime, Nothing}
- multiMsg::String # "single", "metadata", "datapart"
- latestMsg
-end
-""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
- msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
- This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
-# Arguments
- - `mqttBrokerAddress`
- MQTT broker address. Can be URL or IP address
- - `subtopic`
- A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
- - `msgReceiveChannel`
- A storage the mqtt client used to store incoming message.
- Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
- - `keepaliveChannel`
- A channel to store keepalive message
- - `onMsgCallback`
- A user-defined function to handle incoming message
+# mutable struct mqttClientInstance_v2 <: mqttClientInstance
+# mqttBrokerAddress::String
+# mqttBrokerPort::Integer
+# subtopic::Vector{String}
+# msgReceiveChannel
+# onMsgCallback::Function
+# qos::MQTTClient.QOS
+# client::MQTTClient.Client
+# clientId::String
+# connection::MQTTClient.Connection
+# keepalivetopic::String
+# keepaliveChannel::Channel
+# keepaliveCheckInterval::Integer # second
+# lastTimeMqttConnCheck::Union{DateTime, Nothing}
+# multiMsg::String # "single", "metadata", "datapart"
+# latestMsg
+# end
+# """ MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
+# msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
+# This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
+# # Arguments
+# - `mqttBrokerAddress`
+# MQTT broker address. Can be URL or IP address
+# - `subtopic`
+# A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
+# - `msgReceiveChannel`
+# A storage the mqtt client used to store incoming message.
+# Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
+# - `keepaliveChannel`
+# A channel to store keepalive message
+# - `onMsgCallback`
+# A user-defined function to handle incoming message
-# Keyword Arguments
- - `mqttBrokerPort`
- mqtt broker port
- - `keepalivetopic`
- A topic where keepalive message going
- - `keepaliveCheckInterval`
- A time interval to check whether the mqtt client still connect to mqtt broker
- - `qos`
- Quality of Service. Can be QOS_0, QOS_1, QOS_2
- - `clearOldMsg`
- Boolean to determine if old messages should be cleared from channels
+# # Keyword Arguments
+# - `mqttBrokerPort`
+# mqtt broker port
+# - `keepalivetopic`
+# A topic where keepalive message going
+# - `keepaliveCheckInterval`
+# A time interval to check whether the mqtt client still connect to mqtt broker
+# - `qos`
+# Quality of Service. Can be QOS_0, QOS_1, QOS_2
+# - `clearOldMsg`
+# Boolean to determine if old messages should be cleared from channels
-# Return
- - `mqttInstance`
- A new mqtt client
+# # Return
+# - `mqttInstance`
+# A new mqtt client
-# Example
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3, UUIDs
- julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
- julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),)
- julia> keepaliveChannel = Channel(8)
- julia> function onMsgCallback(topic, payload)
- jobj = JSON3.read(String(payload))
- incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
+# # Example
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3, UUIDs
+# julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
+# julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),)
+# julia> keepaliveChannel = Channel(8)
+# julia> function onMsgCallback(topic, payload)
+# jobj = JSON3.read(String(payload))
+# incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
- if occursin("topic_1", topic)
- put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
- elseif occursin("topic_2", topic)
- put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
- elseif occursin("keepalive", topic)
- put!(keepaliveChannel, incomingMqttMsg)
- else
- println("undefined condition ", @__FILE__, " ", @__LINE__)
- end
- end
- julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
- "mqtt.yiem.cc",
- mqttMsgReceiveTopic,
- mqttMsgReceiveChannel,
- keepaliveChannel,
- onMsgCallback
- )
- ```
+# if occursin("topic_1", topic)
+# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
+# elseif occursin("topic_2", topic)
+# put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
+# elseif occursin("keepalive", topic)
+# put!(keepaliveChannel, incomingMqttMsg)
+# else
+# println("undefined condition ", @__FILE__, " ", @__LINE__)
+# end
+# end
+# julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
+# "mqtt.yiem.cc",
+# mqttMsgReceiveTopic,
+# mqttMsgReceiveChannel,
+# keepaliveChannel,
+# onMsgCallback
+# )
+# ```
-# Signature
-"""
-function mqttClientInstance_v2(
- mqttBrokerAddress::String,
- subtopic::Vector{String},
- msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
- keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
- onMsgCallback::Function
- ;
- clientId::String="NA",
- mqttBrokerPort::Integer=1883,
- keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
- keepaliveCheckInterval::Integer=30,
- qos::MQTTClient.QOS=QOS_1,
- multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
- clearOldMsg::Bool=true,
- )
+# # Signature
+# """
+# function mqttClientInstance_v2(
+# mqttBrokerAddress::String,
+# subtopic::Vector{String},
+# msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
+# keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
+# onMsgCallback::Function
+# ;
+# clientId::String="NA",
+# mqttBrokerPort::Integer=1883,
+# keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
+# keepaliveCheckInterval::Integer=30,
+# qos::MQTTClient.QOS=QOS_1,
+# multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
+# clearOldMsg::Bool=true,
+# )
- client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
- MQTTClient.connect(client, connection)
- for i in subtopic
- MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
- end
- MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
+# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
+# MQTTClient.connect(client, connection)
+# for i in subtopic
+# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
+# end
+# MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
- keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
+# keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
- instance = mqttClientInstance_v2(
- mqttBrokerAddress,
- mqttBrokerPort,
- subtopic,
- msgReceiveChannel,
- onMsgCallback,
- qos,
- client,
- clientId,
- connection,
- keepaliveTopic,
- keepaliveChannel,
- keepaliveCheckInterval,
- nothing,
- multiMsg,
- nothing, # store latest message to prevent doing the same massage twice
- )
+# instance = mqttClientInstance_v2(
+# mqttBrokerAddress,
+# mqttBrokerPort,
+# subtopic,
+# msgReceiveChannel,
+# onMsgCallback,
+# qos,
+# client,
+# clientId,
+# connection,
+# keepaliveTopic,
+# keepaliveChannel,
+# keepaliveCheckInterval,
+# nothing,
+# multiMsg,
+# nothing, # store latest message to prevent doing the same massage twice
+# )
- if clearOldMsg
- chnames = keys(msgReceiveChannel)
- for i in chnames
- totalmsg = msgReceiveChannel[i].n_avail_items #[TESTING]
- if totalmsg > 0
- while isready(msgReceiveChannel[i])
- _ = take!(msgReceiveChannel[i])
- end
- println("Total $totalmsg old MQTT messages cleared ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
- end
- end
- end
+# if clearOldMsg
+# chnames = keys(msgReceiveChannel)
+# for i in chnames
+# totalmsg = msgReceiveChannel[i].n_avail_items #[TESTING]
+# if totalmsg > 0
+# while isready(msgReceiveChannel[i])
+# _ = take!(msgReceiveChannel[i])
+# end
+# println("Total $totalmsg old MQTT messages cleared ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
+# end
+# end
+# end
- return instance
-end
+# return instance
+# end
""" Generate msgMeta to be including in a message. So the message receiver know
@@ -324,551 +325,551 @@ end
-""" Check mqtt server connection.
+# """ Check mqtt server connection.
- Arguments\n
- -----
- mqttInstanceDict::Dict{Symbol, Any}
- a dictionary contain mqtt instance. 1 per mqtt client.
- interval::Integer
- time interval to check mqtt server in seconds
+# Arguments\n
+# -----
+# mqttInstanceDict::Dict{Symbol, Any}
+# a dictionary contain mqtt instance. 1 per mqtt client.
+# interval::Integer
+# time interval to check mqtt server in seconds
- Return\n
- -----
- isconnectionalive::Bool
- true if mqtt connection is alive
+# Return\n
+# -----
+# isconnectionalive::Bool
+# true if mqtt connection is alive
- Example\n
- -----
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3, MQTTClient
- julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
- true
- ```
+# Example\n
+# -----
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3, MQTTClient
+# julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
+# true
+# ```
- Signature\n
- -----
-"""
-function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
- isconnectionalive = false
+# Signature\n
+# -----
+# """
+# function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
+# isconnectionalive = false
- # ditch old keepalive msg is any
- while isready(mqttInstance.keepaliveChannel)
- _ = take!(mqttInstance.keepaliveChannel)
- end
+# # ditch old keepalive msg is any
+# while isready(mqttInstance.keepaliveChannel)
+# _ = take!(mqttInstance.keepaliveChannel)
+# end
- msgMeta = generate_msgMeta(
- mqttInstance.keepalivetopic,
- msgPurpose= "keepalive",
- )
+# msgMeta = generate_msgMeta(
+# mqttInstance.keepalivetopic,
+# msgPurpose= "keepalive",
+# )
- keepaliveMsg = Dict(
- :msgMeta=> msgMeta,
- :payload=> Dict(
- :text=>"keepalive",
- )
- )
+# keepaliveMsg = Dict(
+# :msgMeta=> msgMeta,
+# :payload=> Dict(
+# :text=>"keepalive",
+# )
+# )
- publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTo],
- JSON3.write(keepaliveMsg))
- timediff = 0
- starttime = Dates.now()
- while timediff <= 5
- timediff = timedifference(starttime, Dates.now(), "seconds")
- if isready(mqttInstance.keepaliveChannel)
- incomingMsg = take!(mqttInstance.keepaliveChannel)
- if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
- # connection is alive
- isconnectionalive = true
- break
- end
- end
- sleep(1)
- end
- return isconnectionalive
-end
+# publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTo],
+# JSON3.write(keepaliveMsg))
+# timediff = 0
+# starttime = Dates.now()
+# while timediff <= 5
+# timediff = timedifference(starttime, Dates.now(), "seconds")
+# if isready(mqttInstance.keepaliveChannel)
+# incomingMsg = take!(mqttInstance.keepaliveChannel)
+# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
+# # connection is alive
+# isconnectionalive = true
+# break
+# end
+# end
+# sleep(1)
+# end
+# return isconnectionalive
+# end
-""" Check mqtt server connection, reconnect if disconnected.
+# """ Check mqtt server connection, reconnect if disconnected.
- Arguments\n
- -----
- mqttInstanceDict::Dict{Symbol, Any}
- a dictionary contain mqtt instance. 1 per mqtt client.
- keepaliveCheckInterval::Integer
- time interval to check mqtt server in seconds
+# Arguments\n
+# -----
+# mqttInstanceDict::Dict{Symbol, Any}
+# a dictionary contain mqtt instance. 1 per mqtt client.
+# keepaliveCheckInterval::Integer
+# time interval to check mqtt server in seconds
- Return\n
- -----
- isreconnect::Bool
- true if mqtt connection is reconnected
+# Return\n
+# -----
+# isreconnect::Bool
+# true if mqtt connection is reconnected
- Example\n
- -----
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3
- julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
- ```
+# Example\n
+# -----
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3
+# julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
+# ```
- Signature\n
- -----
-"""
-function checkMqttConnection!(mqttInstance::T;
- keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
+# Signature\n
+# -----
+# """
+# function checkMqttConnection!(mqttInstance::T;
+# keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
- interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
+# interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
- # check if mqtt connection is still up
- intervaldiff =
- if mqttInstance.lastTimeMqttConnCheck !== nothing
- timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
- else
- Inf
- end
+# # check if mqtt connection is still up
+# intervaldiff =
+# if mqttInstance.lastTimeMqttConnCheck !== nothing
+# timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
+# else
+# Inf
+# end
- if intervaldiff > interval
- connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
- while true
- mqttConnStatus = isMqttConnectionAlive(mqttInstance)
- if mqttConnStatus == false
- sleep(5) # wait
- println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
- # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
- mqttInstance.client, mqttInstance.connection =
- MakeConnection(mqttInstance.mqttBrokerAddress,
- mqttInstance.mqttBrokerPort)
- try
- connect(mqttInstance.client, mqttInstance.connection)
- for topic in mqttInstance.subtopic
- subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
- end
- MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
- catch
- println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
- end
- else
- mqttInstance.lastTimeMqttConnCheck = Dates.now()
- if connectionStatusStart != mqttConnStatus
- println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
- end
- return mqttConnStatus
- end
- end
- else
- return nothing # still within check interval, no update on connection status yet
- end
-end
+# if intervaldiff > interval
+# connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
+# while true
+# mqttConnStatus = isMqttConnectionAlive(mqttInstance)
+# if mqttConnStatus == false
+# sleep(5) # wait
+# println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
+# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
+# mqttInstance.client, mqttInstance.connection =
+# MakeConnection(mqttInstance.mqttBrokerAddress,
+# mqttInstance.mqttBrokerPort)
+# try
+# connect(mqttInstance.client, mqttInstance.connection)
+# for topic in mqttInstance.subtopic
+# subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
+# end
+# MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
+# catch
+# println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
+# end
+# else
+# mqttInstance.lastTimeMqttConnCheck = Dates.now()
+# if connectionStatusStart != mqttConnStatus
+# println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
+# end
+# return mqttConnStatus
+# end
+# end
+# else
+# return nothing # still within check interval, no update on connection status yet
+# end
+# end
-""" Send a message to specified MQTT topic then wait for reply.
+# """ Send a message to specified MQTT topic then wait for reply.
-# Arguments
- - `outgoingMsg::Dict`
- an outgoing message
+# # Arguments
+# - `outgoingMsg::Dict`
+# an outgoing message
-# Return
- - `result::NamedTuple`
- ( success= true, error= nothing )
+# # Return
+# - `result::NamedTuple`
+# ( success= true, error= nothing )
-# Example
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3, UUIDs
- julia> msgMeta = GeneralUtils.generate_msgMeta(
- "/testtopic",
- senderName= "somename",
- senderId= uuid4snakecase(),
- mqttBrokerAddress= "test.mosquitto.org",
- mqttBrokerPort= 1883,
- )
- julia> outgoingMsg = Dict(
- :msgMeta=> msgMeta,
- :payload=> Dict("hello"=> "World"),
- )
- julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
- ```
+# # Example
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3, UUIDs
+# julia> msgMeta = GeneralUtils.generate_msgMeta(
+# "/testtopic",
+# senderName= "somename",
+# senderId= uuid4snakecase(),
+# mqttBrokerAddress= "test.mosquitto.org",
+# mqttBrokerPort= 1883,
+# )
+# julia> outgoingMsg = Dict(
+# :msgMeta=> msgMeta,
+# :payload=> Dict("hello"=> "World"),
+# )
+# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
+# ```
-# Signature
-"""
-function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
+# # Signature
+# """
+# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
- # sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic
- mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
+# # sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic
+# mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
- mqttMsgReceiveChannel = (ch1=Channel(8),)
- keepaliveChannel = Channel(8)
+# mqttMsgReceiveChannel = (ch1=Channel(8),)
+# keepaliveChannel = Channel(8)
- # Define the callback for receiving messages.
- function onMsgCallback(topic, payload)
- jobj = JSON3.read(String(payload))
- onMsg = copy(jobj)
- put!(mqttMsgReceiveChannel[:ch1], onMsg)
- end
+# # Define the callback for receiving messages.
+# function onMsgCallback(topic, payload)
+# jobj = JSON3.read(String(payload))
+# onMsg = copy(jobj)
+# put!(mqttMsgReceiveChannel[:ch1], onMsg)
+# end
- mqttInstance = mqttClientInstance_v2(
- outgoingMsg[:msgMeta][:mqttBrokerAddress],
- [mqttMsgReceiveTopic],
- mqttMsgReceiveChannel,
- keepaliveChannel,
- onMsgCallback;
- mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
- )
+# mqttInstance = mqttClientInstance_v2(
+# outgoingMsg[:msgMeta][:mqttBrokerAddress],
+# [mqttMsgReceiveTopic],
+# mqttMsgReceiveChannel,
+# keepaliveChannel,
+# onMsgCallback;
+# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
+# )
- response = sendMqttMsg(mqttInstance, outgoingMsg)
- try disconnect(mqttInstance.client) catch end
+# response = sendMqttMsg(mqttInstance, outgoingMsg)
+# try disconnect(mqttInstance.client) catch end
- return response
-end
-function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
- )::NamedTuple where {T<:Any}
- try
- publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTo], JSON3.write(outgoingMsg))
- return (success=true, error=nothing)
- catch e
- return (success=false, error=e)
- end
-end
+# return response
+# end
+# function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
+# )::NamedTuple where {T<:Any}
+# try
+# publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTo], JSON3.write(outgoingMsg))
+# return (success=true, error=nothing)
+# catch e
+# return (success=false, error=e)
+# end
+# end
-""" Send a message to specified MQTT topic then wait for reply.
+# """ Send a message to specified MQTT topic then wait for reply.
-# Arguments
- - `outgoingMsg::Dict{Symbol, T}`
- an outgoing message
+# # Arguments
+# - `outgoingMsg::Dict{Symbol, T}`
+# an outgoing message
-# Keyword Arguments
- - `timeout::Integer=60`
- time in seconds to wait for a response before error
- - `maxattempt::Integer=1`
- maximum number of attempts to send and receive message
+# # Keyword Arguments
+# - `timeout::Integer=60`
+# time in seconds to wait for a response before error
+# - `maxattempt::Integer=1`
+# maximum number of attempts to send and receive message
-# Return
- - `result::NamedTuple`
- (
- success= true, # idicates whether sending MQTT message successful
- error= nothing # error message
- response= somemessage # response message
- )
+# # Return
+# - `result::NamedTuple`
+# (
+# success= true, # idicates whether sending MQTT message successful
+# error= nothing # error message
+# response= somemessage # response message
+# )
-# Example
-```jldoctest
-julia> using Revise
-julia> using GeneralUtils, Dates, UUIDs, JSON3
-julia> msgMeta = GeneralUtils.generate_msgMeta(
- "/testtopic",
- senderName= "somename",
- senderId= uuid4snakecase(),
- mqttBrokerAddress= "mqtt.yiem.cc",
- mqttBrokerPort= 1883,
- )
-julia> outgoingMsg = Dict(
- :msgMeta=> msgMeta,
- :payload=> Dict(:hello=> "World"),
- )
-julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
-```
+# # Example
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, UUIDs, JSON3
+# julia> msgMeta = GeneralUtils.generate_msgMeta(
+# "/testtopic",
+# senderName= "somename",
+# senderId= uuid4snakecase(),
+# mqttBrokerAddress= "mqtt.yiem.cc",
+# mqttBrokerPort= 1883,
+# )
+# julia> outgoingMsg = Dict(
+# :msgMeta=> msgMeta,
+# :payload=> Dict(:hello=> "World"),
+# )
+# julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
+# ```
-# Signature
-"""
-function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
- connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
- senderId = outgoingMsg[:msgMeta][:senderId]
- mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
- mqttMsgReceiveChannel = (ch1=Channel(8),)
- keepaliveChannel = Channel(8)
- outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
+# # Signature
+# """
+# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
+# connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
+# senderId = outgoingMsg[:msgMeta][:senderId]
+# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
+# mqttMsgReceiveChannel = (ch1=Channel(8),)
+# keepaliveChannel = Channel(8)
+# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
- # Define the callback for receiving messages.
- function onMsgCallback(topic, payload)
- jobj = JSON3.read(String(payload))
- incomingMqttMsg = copy(jobj)
+# # Define the callback for receiving messages.
+# function onMsgCallback(topic, payload)
+# jobj = JSON3.read(String(payload))
+# incomingMqttMsg = copy(jobj)
- if occursin("GeneralUtils_sendReceiveMqttMsg", topic)
- put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
- elseif occursin("keepalive", topic)
- put!(keepaliveChannel, incomingMqttMsg)
- else
- println("undefined condition ", @__FILE__, " ", @__LINE__)
- end
- end
+# if occursin("GeneralUtils_sendReceiveMqttMsg", topic)
+# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
+# elseif occursin("keepalive", topic)
+# put!(keepaliveChannel, incomingMqttMsg)
+# else
+# println("undefined condition ", @__FILE__, " ", @__LINE__)
+# end
+# end
- mqttInstance = nothing
- attempt = 0
- starttime = Dates.now()
- endtime = starttime + Second(connectiontimeout)
- errormsg = nothing
- while true
- timenow = Dates.now()
- timepass = timedifference(starttime, timenow, "seconds")
- timeleft = timedifference(timenow, endtime, "seconds")
+# mqttInstance = nothing
+# attempt = 0
+# starttime = Dates.now()
+# endtime = starttime + Second(connectiontimeout)
+# errormsg = nothing
+# while true
+# timenow = Dates.now()
+# timepass = timedifference(starttime, timenow, "seconds")
+# timeleft = timedifference(timenow, endtime, "seconds")
- if timepass <= connectiontimeout
- attempt += 1
- attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing
+# if timepass <= connectiontimeout
+# attempt += 1
+# attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing
- try
- mqttInstance = mqttClientInstance_v2(
- outgoingMsg[:msgMeta][:mqttBrokerAddress],
- [mqttMsgReceiveTopic],
- mqttMsgReceiveChannel,
- keepaliveChannel,
- onMsgCallback;
- mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort],
- clientId=senderId
- )
- break
- catch e
- errormsg = e
- sleep(5)
- end
- else
- println("Failed to instantiate MQTT client after $timepass seconds. $errormsg")
- return nothing
- end
- end
+# try
+# mqttInstance = mqttClientInstance_v2(
+# outgoingMsg[:msgMeta][:mqttBrokerAddress],
+# [mqttMsgReceiveTopic],
+# mqttMsgReceiveChannel,
+# keepaliveChannel,
+# onMsgCallback;
+# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort],
+# clientId=senderId
+# )
+# break
+# catch e
+# errormsg = e
+# sleep(5)
+# end
+# else
+# println("Failed to instantiate MQTT client after $timepass seconds. $errormsg")
+# return nothing
+# end
+# end
- response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg;
- responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt)
- try disconnect(mqttInstance.client) catch end
+# response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg;
+# responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt)
+# try disconnect(mqttInstance.client) catch end
- return response
-end
-function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
- outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1
- )::NamedTuple where {T<:Any}
+# return response
+# end
+# function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
+# outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1
+# )::NamedTuple where {T<:Any}
- timepass = nothing
- attempts = 1
- while attempts <= responsemaxattempt
- sendMqttMsg(mqttInstance, outgoingMsg)
+# timepass = nothing
+# attempts = 1
+# while attempts <= responsemaxattempt
+# sendMqttMsg(mqttInstance, outgoingMsg)
- starttime = Dates.now()
- while true
- timepass = timedifference(starttime, Dates.now(), "seconds")
+# starttime = Dates.now()
+# while true
+# timepass = timedifference(starttime, Dates.now(), "seconds")
- if timepass <= responsetimeout
- if isready(mqttInstance.msgReceiveChannel[receivechannel])
- incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
+# if timepass <= responsetimeout
+# if isready(mqttInstance.msgReceiveChannel[receivechannel])
+# incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
- if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
- if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
- println("NACK")
- break # resend the msg
- else
- return (success=true, error=nothing, response=incomingMsg[:payload])
- end
- end
- end
- else
- break
- end
- sleep(1)
- end
- if attempts > 1
- println("\n attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
- pprintln(outgoingMsg)
- println(" attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
- checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5)
- end
- attempts += 1
- end
+# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
+# if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
+# println("NACK")
+# break # resend the msg
+# else
+# return (success=true, error=nothing, response=incomingMsg[:payload])
+# end
+# end
+# end
+# else
+# break
+# end
+# sleep(1)
+# end
+# if attempts > 1
+# println("\n attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
+# pprintln(outgoingMsg)
+# println(" attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
+# checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5)
+# end
+# attempts += 1
+# end
- return (success=false,
- error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
- response=nothing)
-end
+# return (success=false,
+# error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
+# response=nothing)
+# end
-""" Send a message to specified MQTT topic then wait for reply.
+# """ Send a message to specified MQTT topic then wait for reply.
-# Arguments
- - `workDict::Dict`
- A dictionary containing workspace data
- - `incomingMsg::Dict`
- The incoming message to process
+# # Arguments
+# - `workDict::Dict`
+# A dictionary containing workspace data
+# - `incomingMsg::Dict`
+# The incoming message to process
-# Keyword Arguments
- - `data`
- Optional data to be transferred. Default is nothing.
+# # Keyword Arguments
+# - `data`
+# Optional data to be transferred. Default is nothing.
-# Return
- - `result::Dict`
- A dictionary containing either:
- - Metadata about the data transfer session if initiating a new transfer
- - The requested data part if responding to a data part request
+# # Return
+# - `result::Dict`
+# A dictionary containing either:
+# - Metadata about the data transfer session if initiating a new transfer
+# - The requested data part if responding to a data part request
-# Example
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3, UUIDs
- julia> msgMeta = GeneralUtils.generate_msgMeta(
- "/testtopic",
- senderName= "somename",
- senderId= uuid4snakecase(),
- mqttBrokerAddress= "test.mosquitto.org",
- mqttBrokerPort= 1883,
- )
- julia> outgoingMsg = Dict(
- :msgMeta=> msgMeta,
- :payload=> Dict("hello"=> "World"),
- )
- julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
- ```
-# TODO
- - [x] transfer multiple msg with the same mqtt connection. The transfer loop should be
- implemented by passing mqttInstance into sendReceiveMqttMsg() so that
- 1-connection is used to send all data
- - [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically)
+# # Example
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3, UUIDs
+# julia> msgMeta = GeneralUtils.generate_msgMeta(
+# "/testtopic",
+# senderName= "somename",
+# senderId= uuid4snakecase(),
+# mqttBrokerAddress= "test.mosquitto.org",
+# mqttBrokerPort= 1883,
+# )
+# julia> outgoingMsg = Dict(
+# :msgMeta=> msgMeta,
+# :payload=> Dict("hello"=> "World"),
+# )
+# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
+# ```
+# # TODO
+# - [x] transfer multiple msg with the same mqtt connection. The transfer loop should be
+# implemented by passing mqttInstance into sendReceiveMqttMsg() so that
+# 1-connection is used to send all data
+# - [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically)
-# Signature
-"""
-function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
- dataTransferSessionID = nothing
+# # Signature
+# """
+# function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
+# dataTransferSessionID = nothing
- if !haskey(incomingMsg[:payload], :dataTransferSessionID)
- """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
+# if !haskey(incomingMsg[:payload], :dataTransferSessionID)
+# """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
- Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
- :msgMeta => Dict(:msgPurpose => nothing,
- :receiverId => nothing,
- :getpost => nothing,
- :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
- :requestresponse => nothing,
- :msgFormatVersion => nothing,
- :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
- :replyToMsgId => nothing,
- :acknowledgestatus => nothing,
- :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
- :mqttBrokerPort => nothing,
- :receiverName => "agent_wine_backend",
- :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
- :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
- :senderName => "agent_wine_frontend",
- :senderselfnote => nothing,
- :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
- """
+# Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
+# :msgMeta => Dict(:msgPurpose => nothing,
+# :receiverId => nothing,
+# :getpost => nothing,
+# :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
+# :requestresponse => nothing,
+# :msgFormatVersion => nothing,
+# :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
+# :replyToMsgId => nothing,
+# :acknowledgestatus => nothing,
+# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
+# :mqttBrokerPort => nothing,
+# :receiverName => "agent_wine_backend",
+# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
+# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
+# :senderName => "agent_wine_frontend",
+# :senderselfnote => nothing,
+# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
+# """
- if data !== nothing # if the user provide data, store it in a new session workspace
- dataTransferSessionID = uuid4snakecase()
- haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
- workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
- s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
- s[:dataTransferSessionID] = dataTransferSessionID
- dataDict = Dict(pairs(data))
- merge!(s, dataDict)
- else
- error("No data provided")
- end
+# if data !== nothing # if the user provide data, store it in a new session workspace
+# dataTransferSessionID = uuid4snakecase()
+# haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
+# workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
+# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
+# s[:dataTransferSessionID] = dataTransferSessionID
+# dataDict = Dict(pairs(data))
+# merge!(s, dataDict)
+# else
+# error("No data provided")
+# end
- # compute hash value for each piece of data
- s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
- s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
+# # compute hash value for each piece of data
+# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
+# s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
- # send metadata without data to receiver()
- datainfo = Dict()
- for (k, v) in s
- if k ∉ [:dataparts]
- datainfo[k] = v
- end
- end
+# # send metadata without data to receiver()
+# datainfo = Dict()
+# for (k, v) in s
+# if k ∉ [:dataparts]
+# datainfo[k] = v
+# end
+# end
- d = Dict(
- :result=> "dataTransferOverMQTT",
- :dataTransferSessionID=> dataTransferSessionID,
- Symbol(dataTransferSessionID)=> datainfo
- )
+# d = Dict(
+# :result=> "dataTransferOverMQTT",
+# :dataTransferSessionID=> dataTransferSessionID,
+# Symbol(dataTransferSessionID)=> datainfo
+# )
- """ Sending metadata to the requester
- Dict(:result => "dataTransferOverMQTT",
- :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
- :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
- "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
- "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
- :datatype => "vector{Dict}",
- :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
- :totalparts => 46),
- :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
- """
+# """ Sending metadata to the requester
+# Dict(:result => "dataTransferOverMQTT",
+# :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
+# :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
+# "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
+# "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
+# :datatype => "vector{Dict}",
+# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
+# :totalparts => 46),
+# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
+# """
- return d
+# return d
- else
+# else
- """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
- Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
- :msgMeta => Dict(:msgPurpose => nothing,
- :receiverId => nothing,
- :getpost => nothing,
- :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
- :requestresponse => nothing,
- :msgFormatVersion => nothing,
- :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
- :replyToMsgId => nothing,
- :acknowledgestatus => nothing,
- :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
- :mqttBrokerPort => nothing,
- :receiverName => "agent_wine_backend",
- :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
- :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
- :senderName => "agent_wine_frontend",
- :senderselfnote => nothing,
- :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
- """
+# """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
+# Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
+# :msgMeta => Dict(:msgPurpose => nothing,
+# :receiverId => nothing,
+# :getpost => nothing,
+# :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
+# :requestresponse => nothing,
+# :msgFormatVersion => nothing,
+# :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
+# :replyToMsgId => nothing,
+# :acknowledgestatus => nothing,
+# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
+# :mqttBrokerPort => nothing,
+# :receiverName => "agent_wine_backend",
+# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
+# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
+# :senderName => "agent_wine_frontend",
+# :senderselfnote => nothing,
+# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
+# """
- # check transfer session id
- dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
- clientRequestingDataPartNumber =
- incomingMsg[:payload][:requestingDataPartNumber]
- s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
+# # check transfer session id
+# dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
+# clientRequestingDataPartNumber =
+# incomingMsg[:payload][:requestingDataPartNumber]
+# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
- d = Dict(
- :result=> "dataTransferOverMQTT",
- :dataTransferSessionID=> dataTransferSessionID,
- Symbol(dataTransferSessionID)=> Dict(
- :dataPartNumber=> clientRequestingDataPartNumber,
- :datapart=> s[:dataparts][clientRequestingDataPartNumber]
- ),
- )
+# d = Dict(
+# :result=> "dataTransferOverMQTT",
+# :dataTransferSessionID=> dataTransferSessionID,
+# Symbol(dataTransferSessionID)=> Dict(
+# :dataPartNumber=> clientRequestingDataPartNumber,
+# :datapart=> s[:dataparts][clientRequestingDataPartNumber]
+# ),
+# )
- return d
- end
-end
+# return d
+# end
+# end
-""" Send a message to specified MQTT topic then wait for reply.
+# """ Send a message to specified MQTT topic then wait for reply.
-# Arguments
- - `outgoingMsg::Dict`
- an outgoing message
+# # Arguments
+# - `outgoingMsg::Dict`
+# an outgoing message
-# Return
- - `result::NamedTuple`
- ( success= true, error= nothing )
+# # Return
+# - `result::NamedTuple`
+# ( success= true, error= nothing )
-# Example
- ```jldoctest
- julia> using Revise
- julia> using GeneralUtils, Dates, JSON3, UUIDs
- julia> msgMeta = GeneralUtils.generate_msgMeta(
- "/testtopic",
- senderName= "somename",
- senderId= uuid4snakecase(),
- mqttBrokerAddress= "test.mosquitto.org",
- mqttBrokerPort= 1883,
- )
- julia> outgoingMsg = Dict(
- :msgMeta=> msgMeta,
- :payload=> Dict("hello"=> "World"),
- )
- julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
- ```
-# TODO
- - [PENDING]
+# # Example
+# ```jldoctest
+# julia> using Revise
+# julia> using GeneralUtils, Dates, JSON3, UUIDs
+# julia> msgMeta = GeneralUtils.generate_msgMeta(
+# "/testtopic",
+# senderName= "somename",
+# senderId= uuid4snakecase(),
+# mqttBrokerAddress= "test.mosquitto.org",
+# mqttBrokerPort= 1883,
+# )
+# julia> outgoingMsg = Dict(
+# :msgMeta=> msgMeta,
+# :payload=> Dict("hello"=> "World"),
+# )
+# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
+# ```
+# # TODO
+# - [PENDING]
-# Signature
-"""
-function dataTransferOverMQTT_receiver()
+# # Signature
+# """
+# function dataTransferOverMQTT_receiver()
@@ -878,7 +879,7 @@ function dataTransferOverMQTT_receiver()
-end
+# end
diff --git a/src/dbUtil.jl b/src/dbUtil.jl
index 40296bd..2e4fea8 100644
--- a/src/dbUtil.jl
+++ b/src/dbUtil.jl
@@ -2,7 +2,7 @@ module dbUtil
export dictToPostgresKeyValueString, generateInsertSQL, generateUpdateSQL
-using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames,
+using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames,
SHA
using ..util
diff --git a/src/interface.jl b/src/interface.jl
index f7c5672..dea5dcb 100644
--- a/src/interface.jl
+++ b/src/interface.jl
@@ -8,7 +8,7 @@ export noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue,
isLess, allTrue, getStringBetweenCharacters, JSON3read_stringKey, mkDictPath!,
getDictPath, detectKeywordVariation, textToDict
-using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, CSV
+using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames, CSV
using ..util, ..communication
# ---------------------------------------------- 100 --------------------------------------------- #
diff --git a/src/util.jl b/src/util.jl
index 16ca962..671a201 100644
--- a/src/util.jl
+++ b/src/util.jl
@@ -9,7 +9,7 @@ export timedifference, showstracktrace, findHighestIndexKey, uuid4snakecase, rep
extractTextBetweenCharacter, extractTextBetweenString,
convertCamelSnakeKebabCase, fitrange, recentElementsIndex, nonRecentElementsIndex
-using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames
+using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames
# ---------------------------------------------- 100 --------------------------------------------- #