From 0e36b8db90857113608f6ff6421cd1a9fed4ba2b Mon Sep 17 00:00:00 2001 From: narawat Date: Fri, 1 Aug 2025 06:04:17 +0700 Subject: [PATCH] remove MQTT dependency --- Manifest.toml | 17 +- Project.toml | 1 - src/GeneralUtils.jl | 2 +- src/communication.jl | 1311 +++++++++++++++++++++--------------------- src/dbUtil.jl | 2 +- src/interface.jl | 2 +- src/util.jl | 2 +- 7 files changed, 661 insertions(+), 676 deletions(-) diff --git a/Manifest.toml b/Manifest.toml index 15af71d..e55c21a 100644 --- a/Manifest.toml +++ b/Manifest.toml @@ -2,7 +2,7 @@ julia_version = "1.11.5" manifest_format = "2.0" -project_hash = "310a130f0609e5376c1f8f5c3b09f0f87b328887" +project_hash = "a942446c2f26ef72d0c4b0ca522e0adcf709ce4e" [[deps.AliasTables]] deps = ["PtrArrays", "Random"] @@ -92,11 +92,6 @@ deps = ["Printf"] uuid = "ade2ca70-3891-5945-98fb-dc099432e06a" version = "1.11.0" -[[deps.Distributed]] -deps = ["Random", "Serialization", "Sockets"] -uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b" -version = "1.11.0" - [[deps.Distributions]] deps = ["AliasTables", "FillArrays", "LinearAlgebra", "PDMats", "Printf", "QuadGK", "Random", "SpecialFunctions", "Statistics", "StatsAPI", "StatsBase", "StatsFuns"] git-tree-sha1 = "3101c32aab536e7a27b1763c0797dba151b899ad" @@ -280,16 +275,6 @@ version = "0.3.28" uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" version = "1.11.0" -[[deps.MQTTClient]] -deps = ["Distributed", "Random", "Sockets"] -git-tree-sha1 = "f2597b290d4bf17b577346153cd2ddf9accb5c26" -uuid = "985f35cc-2c3d-4943-b8c1-f0931d5f0959" -version = "0.3.1" -weakdeps = ["PrecompileTools"] - - [deps.MQTTClient.extensions] - PrecompileMQTT = "PrecompileTools" - [[deps.Markdown]] deps = ["Base64"] uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" diff --git a/Project.toml b/Project.toml index 1e93b82..5a68284 100644 --- a/Project.toml +++ b/Project.toml @@ -10,7 +10,6 @@ DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" -MQTTClient = "985f35cc-2c3d-4943-b8c1-f0931d5f0959" NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a" PrettyPrinting = "54e16d92-306c-5ea0-a30b-337be88ac337" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/src/GeneralUtils.jl b/src/GeneralUtils.jl index 392e4c4..41463b1 100644 --- a/src/GeneralUtils.jl +++ b/src/GeneralUtils.jl @@ -2,7 +2,7 @@ module GeneralUtils export # struct - mqttClientInstance, + # mqttClientInstance, # function noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue diff --git a/src/communication.jl b/src/communication.jl index b56609d..f44f77f 100644 --- a/src/communication.jl +++ b/src/communication.jl @@ -1,210 +1,211 @@ module communication -export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!, - sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2, - dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver +export generate_msgMeta + # isMqttConnectionAlive, checkMqttConnection!, + # sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2, + # dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames, SHA, PrettyPrinting using ..util # ---------------------------------------------- 100 --------------------------------------------- # -abstract type mqttClientInstance end +# abstract type mqttClientInstance end -mutable struct mqttClientInstance_v1 <: mqttClientInstance - mqttBrokerAddress::String - mqttBrokerPort::Integer - subtopic::Vector{String} - receiveDataChannel::Channel - onMsgCallback::Function - qos::MQTTClient.QOS - client::MQTTClient.Client - connection::MQTTClient.Connection - keepalivetopic::String - keepaliveChannel::Channel - keepaliveCheckInterval::Integer # second - lastTimeMqttConnCheck::Union{DateTime, Nothing} -end +# mutable struct mqttClientInstance_v1 <: mqttClientInstance +# mqttBrokerAddress::String +# mqttBrokerPort::Integer +# subtopic::Vector{String} +# receiveDataChannel::Channel +# onMsgCallback::Function +# qos::MQTTClient.QOS +# client::MQTTClient.Client +# connection::MQTTClient.Connection +# keepalivetopic::String +# keepaliveChannel::Channel +# keepaliveCheckInterval::Integer # second +# lastTimeMqttConnCheck::Union{DateTime, Nothing} +# end -function mqttClientInstance_v1( - mqttBrokerAddress::String, - mqttBrokerPort::Integer, - subtopic::Vector{String}, - receiveDataChannel::Channel, - keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback() - onMsgCallback::Function - ; - keepalivetopic::String= "/keepalive/$(uuid4snakecase())", - keepaliveCheckInterval::Integer=30, - qos::MQTTClient.QOS=QOS_1, - ) +# function mqttClientInstance_v1( +# mqttBrokerAddress::String, +# mqttBrokerPort::Integer, +# subtopic::Vector{String}, +# receiveDataChannel::Channel, +# keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback() +# onMsgCallback::Function +# ; +# keepalivetopic::String= "/keepalive/$(uuid4snakecase())", +# keepaliveCheckInterval::Integer=30, +# qos::MQTTClient.QOS=QOS_1, +# ) - client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) - MQTTClient.connect(client, connection) - for i in subtopic - MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) - end - MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos) +# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) +# MQTTClient.connect(client, connection) +# for i in subtopic +# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) +# end +# MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos) - instance = mqttClientInstance( - mqttBrokerAddress, - mqttBrokerPort, - subtopic, - receiveDataChannel, - onMsgCallback, - qos, - client, - connection, - keepalivetopic, - keepaliveChannel, - keepaliveCheckInterval, - Dates.now(), - ) +# instance = mqttClientInstance( +# mqttBrokerAddress, +# mqttBrokerPort, +# subtopic, +# receiveDataChannel, +# onMsgCallback, +# qos, +# client, +# connection, +# keepalivetopic, +# keepaliveChannel, +# keepaliveCheckInterval, +# Dates.now(), +# ) - return instance -end +# return instance +# end -mutable struct mqttClientInstance_v2 <: mqttClientInstance - mqttBrokerAddress::String - mqttBrokerPort::Integer - subtopic::Vector{String} - msgReceiveChannel - onMsgCallback::Function - qos::MQTTClient.QOS - client::MQTTClient.Client - clientId::String - connection::MQTTClient.Connection - keepalivetopic::String - keepaliveChannel::Channel - keepaliveCheckInterval::Integer # second - lastTimeMqttConnCheck::Union{DateTime, Nothing} - multiMsg::String # "single", "metadata", "datapart" - latestMsg -end -""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple - msgReceiveChannel without having to redefine mqttClientInstance again like in v1. - This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel. -# Arguments - - `mqttBrokerAddress` - MQTT broker address. Can be URL or IP address - - `subtopic` - A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"] - - `msgReceiveChannel` - A storage the mqtt client used to store incoming message. - Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) - - `keepaliveChannel` - A channel to store keepalive message - - `onMsgCallback` - A user-defined function to handle incoming message +# mutable struct mqttClientInstance_v2 <: mqttClientInstance +# mqttBrokerAddress::String +# mqttBrokerPort::Integer +# subtopic::Vector{String} +# msgReceiveChannel +# onMsgCallback::Function +# qos::MQTTClient.QOS +# client::MQTTClient.Client +# clientId::String +# connection::MQTTClient.Connection +# keepalivetopic::String +# keepaliveChannel::Channel +# keepaliveCheckInterval::Integer # second +# lastTimeMqttConnCheck::Union{DateTime, Nothing} +# multiMsg::String # "single", "metadata", "datapart" +# latestMsg +# end +# """ MQTT client v2. The difference between client v1 and v2 is v2 allows multiple +# msgReceiveChannel without having to redefine mqttClientInstance again like in v1. +# This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel. +# # Arguments +# - `mqttBrokerAddress` +# MQTT broker address. Can be URL or IP address +# - `subtopic` +# A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"] +# - `msgReceiveChannel` +# A storage the mqtt client used to store incoming message. +# Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) +# - `keepaliveChannel` +# A channel to store keepalive message +# - `onMsgCallback` +# A user-defined function to handle incoming message -# Keyword Arguments - - `mqttBrokerPort` - mqtt broker port - - `keepalivetopic` - A topic where keepalive message going - - `keepaliveCheckInterval` - A time interval to check whether the mqtt client still connect to mqtt broker - - `qos` - Quality of Service. Can be QOS_0, QOS_1, QOS_2 - - `clearOldMsg` - Boolean to determine if old messages should be cleared from channels +# # Keyword Arguments +# - `mqttBrokerPort` +# mqtt broker port +# - `keepalivetopic` +# A topic where keepalive message going +# - `keepaliveCheckInterval` +# A time interval to check whether the mqtt client still connect to mqtt broker +# - `qos` +# Quality of Service. Can be QOS_0, QOS_1, QOS_2 +# - `clearOldMsg` +# Boolean to determine if old messages should be cleared from channels -# Return - - `mqttInstance` - A new mqtt client +# # Return +# - `mqttInstance` +# A new mqtt client -# Example - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3, UUIDs - julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"] - julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),) - julia> keepaliveChannel = Channel(8) - julia> function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively +# # Example +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3, UUIDs +# julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"] +# julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),) +# julia> keepaliveChannel = Channel(8) +# julia> function onMsgCallback(topic, payload) +# jobj = JSON3.read(String(payload)) +# incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively - if occursin("topic_1", topic) - put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) - elseif occursin("topic_2", topic) - put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg) - elseif occursin("keepalive", topic) - put!(keepaliveChannel, incomingMqttMsg) - else - println("undefined condition ", @__FILE__, " ", @__LINE__) - end - end - julia> mqttInstance = GeneralUtils.mqttClientInstance_v2( - "mqtt.yiem.cc", - mqttMsgReceiveTopic, - mqttMsgReceiveChannel, - keepaliveChannel, - onMsgCallback - ) - ``` +# if occursin("topic_1", topic) +# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) +# elseif occursin("topic_2", topic) +# put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg) +# elseif occursin("keepalive", topic) +# put!(keepaliveChannel, incomingMqttMsg) +# else +# println("undefined condition ", @__FILE__, " ", @__LINE__) +# end +# end +# julia> mqttInstance = GeneralUtils.mqttClientInstance_v2( +# "mqtt.yiem.cc", +# mqttMsgReceiveTopic, +# mqttMsgReceiveChannel, +# keepaliveChannel, +# onMsgCallback +# ) +# ``` -# Signature -""" -function mqttClientInstance_v2( - mqttBrokerAddress::String, - subtopic::Vector{String}, - msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8)) - keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback() - onMsgCallback::Function - ; - clientId::String="NA", - mqttBrokerPort::Integer=1883, - keepaliveTopic::String= "/keepalive/$(uuid4snakecase())", - keepaliveCheckInterval::Integer=30, - qos::MQTTClient.QOS=QOS_1, - multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta - clearOldMsg::Bool=true, - ) +# # Signature +# """ +# function mqttClientInstance_v2( +# mqttBrokerAddress::String, +# subtopic::Vector{String}, +# msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8)) +# keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback() +# onMsgCallback::Function +# ; +# clientId::String="NA", +# mqttBrokerPort::Integer=1883, +# keepaliveTopic::String= "/keepalive/$(uuid4snakecase())", +# keepaliveCheckInterval::Integer=30, +# qos::MQTTClient.QOS=QOS_1, +# multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta +# clearOldMsg::Bool=true, +# ) - client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) - MQTTClient.connect(client, connection) - for i in subtopic - MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) - end - MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos) +# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) +# MQTTClient.connect(client, connection) +# for i in subtopic +# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) +# end +# MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos) - keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId" +# keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId" - instance = mqttClientInstance_v2( - mqttBrokerAddress, - mqttBrokerPort, - subtopic, - msgReceiveChannel, - onMsgCallback, - qos, - client, - clientId, - connection, - keepaliveTopic, - keepaliveChannel, - keepaliveCheckInterval, - nothing, - multiMsg, - nothing, # store latest message to prevent doing the same massage twice - ) +# instance = mqttClientInstance_v2( +# mqttBrokerAddress, +# mqttBrokerPort, +# subtopic, +# msgReceiveChannel, +# onMsgCallback, +# qos, +# client, +# clientId, +# connection, +# keepaliveTopic, +# keepaliveChannel, +# keepaliveCheckInterval, +# nothing, +# multiMsg, +# nothing, # store latest message to prevent doing the same massage twice +# ) - if clearOldMsg - chnames = keys(msgReceiveChannel) - for i in chnames - totalmsg = msgReceiveChannel[i].n_avail_items #[TESTING] - if totalmsg > 0 - while isready(msgReceiveChannel[i]) - _ = take!(msgReceiveChannel[i]) - end - println("Total $totalmsg old MQTT messages cleared ", @__FILE__, ":", @__LINE__, " $(Dates.now())") - end - end - end +# if clearOldMsg +# chnames = keys(msgReceiveChannel) +# for i in chnames +# totalmsg = msgReceiveChannel[i].n_avail_items #[TESTING] +# if totalmsg > 0 +# while isready(msgReceiveChannel[i]) +# _ = take!(msgReceiveChannel[i]) +# end +# println("Total $totalmsg old MQTT messages cleared ", @__FILE__, ":", @__LINE__, " $(Dates.now())") +# end +# end +# end - return instance -end +# return instance +# end """ Generate msgMeta to be including in a message. So the message receiver know @@ -324,551 +325,551 @@ end -""" Check mqtt server connection. +# """ Check mqtt server connection. - Arguments\n - ----- - mqttInstanceDict::Dict{Symbol, Any} - a dictionary contain mqtt instance. 1 per mqtt client. - interval::Integer - time interval to check mqtt server in seconds +# Arguments\n +# ----- +# mqttInstanceDict::Dict{Symbol, Any} +# a dictionary contain mqtt instance. 1 per mqtt client. +# interval::Integer +# time interval to check mqtt server in seconds - Return\n - ----- - isconnectionalive::Bool - true if mqtt connection is alive +# Return\n +# ----- +# isconnectionalive::Bool +# true if mqtt connection is alive - Example\n - ----- - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3, MQTTClient - julia> GeneralUtils.isMqttConnectionAlive(mqttInstance) - true - ``` +# Example\n +# ----- +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3, MQTTClient +# julia> GeneralUtils.isMqttConnectionAlive(mqttInstance) +# true +# ``` - Signature\n - ----- -""" -function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance} - isconnectionalive = false +# Signature\n +# ----- +# """ +# function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance} +# isconnectionalive = false - # ditch old keepalive msg is any - while isready(mqttInstance.keepaliveChannel) - _ = take!(mqttInstance.keepaliveChannel) - end +# # ditch old keepalive msg is any +# while isready(mqttInstance.keepaliveChannel) +# _ = take!(mqttInstance.keepaliveChannel) +# end - msgMeta = generate_msgMeta( - mqttInstance.keepalivetopic, - msgPurpose= "keepalive", - ) +# msgMeta = generate_msgMeta( +# mqttInstance.keepalivetopic, +# msgPurpose= "keepalive", +# ) - keepaliveMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict( - :text=>"keepalive", - ) - ) +# keepaliveMsg = Dict( +# :msgMeta=> msgMeta, +# :payload=> Dict( +# :text=>"keepalive", +# ) +# ) - publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTo], - JSON3.write(keepaliveMsg)) - timediff = 0 - starttime = Dates.now() - while timediff <= 5 - timediff = timedifference(starttime, Dates.now(), "seconds") - if isready(mqttInstance.keepaliveChannel) - incomingMsg = take!(mqttInstance.keepaliveChannel) - if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId] - # connection is alive - isconnectionalive = true - break - end - end - sleep(1) - end - return isconnectionalive -end +# publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTo], +# JSON3.write(keepaliveMsg)) +# timediff = 0 +# starttime = Dates.now() +# while timediff <= 5 +# timediff = timedifference(starttime, Dates.now(), "seconds") +# if isready(mqttInstance.keepaliveChannel) +# incomingMsg = take!(mqttInstance.keepaliveChannel) +# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId] +# # connection is alive +# isconnectionalive = true +# break +# end +# end +# sleep(1) +# end +# return isconnectionalive +# end -""" Check mqtt server connection, reconnect if disconnected. +# """ Check mqtt server connection, reconnect if disconnected. - Arguments\n - ----- - mqttInstanceDict::Dict{Symbol, Any} - a dictionary contain mqtt instance. 1 per mqtt client. - keepaliveCheckInterval::Integer - time interval to check mqtt server in seconds +# Arguments\n +# ----- +# mqttInstanceDict::Dict{Symbol, Any} +# a dictionary contain mqtt instance. 1 per mqtt client. +# keepaliveCheckInterval::Integer +# time interval to check mqtt server in seconds - Return\n - ----- - isreconnect::Bool - true if mqtt connection is reconnected +# Return\n +# ----- +# isreconnect::Bool +# true if mqtt connection is reconnected - Example\n - ----- - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3 - julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5) - ``` +# Example\n +# ----- +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3 +# julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5) +# ``` - Signature\n - ----- -""" -function checkMqttConnection!(mqttInstance::T; - keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance} +# Signature\n +# ----- +# """ +# function checkMqttConnection!(mqttInstance::T; +# keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance} - interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval +# interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval - # check if mqtt connection is still up - intervaldiff = - if mqttInstance.lastTimeMqttConnCheck !== nothing - timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds") - else - Inf - end +# # check if mqtt connection is still up +# intervaldiff = +# if mqttInstance.lastTimeMqttConnCheck !== nothing +# timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds") +# else +# Inf +# end - if intervaldiff > interval - connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true - while true - mqttConnStatus = isMqttConnectionAlive(mqttInstance) - if mqttConnStatus == false - sleep(5) # wait - println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)") - # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet - mqttInstance.client, mqttInstance.connection = - MakeConnection(mqttInstance.mqttBrokerAddress, - mqttInstance.mqttBrokerPort) - try - connect(mqttInstance.client, mqttInstance.connection) - for topic in mqttInstance.subtopic - subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) - end - MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) - catch - println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") - end - else - mqttInstance.lastTimeMqttConnCheck = Dates.now() - if connectionStatusStart != mqttConnStatus - println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") - end - return mqttConnStatus - end - end - else - return nothing # still within check interval, no update on connection status yet - end -end +# if intervaldiff > interval +# connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true +# while true +# mqttConnStatus = isMqttConnectionAlive(mqttInstance) +# if mqttConnStatus == false +# sleep(5) # wait +# println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)") +# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet +# mqttInstance.client, mqttInstance.connection = +# MakeConnection(mqttInstance.mqttBrokerAddress, +# mqttInstance.mqttBrokerPort) +# try +# connect(mqttInstance.client, mqttInstance.connection) +# for topic in mqttInstance.subtopic +# subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) +# end +# MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) +# catch +# println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") +# end +# else +# mqttInstance.lastTimeMqttConnCheck = Dates.now() +# if connectionStatusStart != mqttConnStatus +# println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())") +# end +# return mqttConnStatus +# end +# end +# else +# return nothing # still within check interval, no update on connection status yet +# end +# end -""" Send a message to specified MQTT topic then wait for reply. +# """ Send a message to specified MQTT topic then wait for reply. -# Arguments - - `outgoingMsg::Dict` - an outgoing message +# # Arguments +# - `outgoingMsg::Dict` +# an outgoing message -# Return - - `result::NamedTuple` - ( success= true, error= nothing ) +# # Return +# - `result::NamedTuple` +# ( success= true, error= nothing ) -# Example - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3, UUIDs - julia> msgMeta = GeneralUtils.generate_msgMeta( - "/testtopic", - senderName= "somename", - senderId= uuid4snakecase(), - mqttBrokerAddress= "test.mosquitto.org", - mqttBrokerPort= 1883, - ) - julia> outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict("hello"=> "World"), - ) - julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) - ``` +# # Example +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3, UUIDs +# julia> msgMeta = GeneralUtils.generate_msgMeta( +# "/testtopic", +# senderName= "somename", +# senderId= uuid4snakecase(), +# mqttBrokerAddress= "test.mosquitto.org", +# mqttBrokerPort= 1883, +# ) +# julia> outgoingMsg = Dict( +# :msgMeta=> msgMeta, +# :payload=> Dict("hello"=> "World"), +# ) +# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) +# ``` -# Signature -""" -function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any} +# # Signature +# """ +# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any} - # sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic - mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" +# # sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic +# mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" - mqttMsgReceiveChannel = (ch1=Channel(8),) - keepaliveChannel = Channel(8) +# mqttMsgReceiveChannel = (ch1=Channel(8),) +# keepaliveChannel = Channel(8) - # Define the callback for receiving messages. - function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - onMsg = copy(jobj) - put!(mqttMsgReceiveChannel[:ch1], onMsg) - end +# # Define the callback for receiving messages. +# function onMsgCallback(topic, payload) +# jobj = JSON3.read(String(payload)) +# onMsg = copy(jobj) +# put!(mqttMsgReceiveChannel[:ch1], onMsg) +# end - mqttInstance = mqttClientInstance_v2( - outgoingMsg[:msgMeta][:mqttBrokerAddress], - [mqttMsgReceiveTopic], - mqttMsgReceiveChannel, - keepaliveChannel, - onMsgCallback; - mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort] - ) +# mqttInstance = mqttClientInstance_v2( +# outgoingMsg[:msgMeta][:mqttBrokerAddress], +# [mqttMsgReceiveTopic], +# mqttMsgReceiveChannel, +# keepaliveChannel, +# onMsgCallback; +# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort] +# ) - response = sendMqttMsg(mqttInstance, outgoingMsg) - try disconnect(mqttInstance.client) catch end +# response = sendMqttMsg(mqttInstance, outgoingMsg) +# try disconnect(mqttInstance.client) catch end - return response -end -function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T} - )::NamedTuple where {T<:Any} - try - publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTo], JSON3.write(outgoingMsg)) - return (success=true, error=nothing) - catch e - return (success=false, error=e) - end -end +# return response +# end +# function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T} +# )::NamedTuple where {T<:Any} +# try +# publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTo], JSON3.write(outgoingMsg)) +# return (success=true, error=nothing) +# catch e +# return (success=false, error=e) +# end +# end -""" Send a message to specified MQTT topic then wait for reply. +# """ Send a message to specified MQTT topic then wait for reply. -# Arguments - - `outgoingMsg::Dict{Symbol, T}` - an outgoing message +# # Arguments +# - `outgoingMsg::Dict{Symbol, T}` +# an outgoing message -# Keyword Arguments - - `timeout::Integer=60` - time in seconds to wait for a response before error - - `maxattempt::Integer=1` - maximum number of attempts to send and receive message +# # Keyword Arguments +# - `timeout::Integer=60` +# time in seconds to wait for a response before error +# - `maxattempt::Integer=1` +# maximum number of attempts to send and receive message -# Return - - `result::NamedTuple` - ( - success= true, # idicates whether sending MQTT message successful - error= nothing # error message - response= somemessage # response message - ) +# # Return +# - `result::NamedTuple` +# ( +# success= true, # idicates whether sending MQTT message successful +# error= nothing # error message +# response= somemessage # response message +# ) -# Example -```jldoctest -julia> using Revise -julia> using GeneralUtils, Dates, UUIDs, JSON3 -julia> msgMeta = GeneralUtils.generate_msgMeta( - "/testtopic", - senderName= "somename", - senderId= uuid4snakecase(), - mqttBrokerAddress= "mqtt.yiem.cc", - mqttBrokerPort= 1883, - ) -julia> outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict(:hello=> "World"), - ) -julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) -``` +# # Example +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, UUIDs, JSON3 +# julia> msgMeta = GeneralUtils.generate_msgMeta( +# "/testtopic", +# senderName= "somename", +# senderId= uuid4snakecase(), +# mqttBrokerAddress= "mqtt.yiem.cc", +# mqttBrokerPort= 1883, +# ) +# julia> outgoingMsg = Dict( +# :msgMeta=> msgMeta, +# :payload=> Dict(:hello=> "World"), +# ) +# julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) +# ``` -# Signature -""" -function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; - connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any} - senderId = outgoingMsg[:msgMeta][:senderId] - mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId" - mqttMsgReceiveChannel = (ch1=Channel(8),) - keepaliveChannel = Channel(8) - outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic +# # Signature +# """ +# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; +# connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any} +# senderId = outgoingMsg[:msgMeta][:senderId] +# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId" +# mqttMsgReceiveChannel = (ch1=Channel(8),) +# keepaliveChannel = Channel(8) +# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic - # Define the callback for receiving messages. - function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - incomingMqttMsg = copy(jobj) +# # Define the callback for receiving messages. +# function onMsgCallback(topic, payload) +# jobj = JSON3.read(String(payload)) +# incomingMqttMsg = copy(jobj) - if occursin("GeneralUtils_sendReceiveMqttMsg", topic) - put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) - elseif occursin("keepalive", topic) - put!(keepaliveChannel, incomingMqttMsg) - else - println("undefined condition ", @__FILE__, " ", @__LINE__) - end - end +# if occursin("GeneralUtils_sendReceiveMqttMsg", topic) +# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) +# elseif occursin("keepalive", topic) +# put!(keepaliveChannel, incomingMqttMsg) +# else +# println("undefined condition ", @__FILE__, " ", @__LINE__) +# end +# end - mqttInstance = nothing - attempt = 0 - starttime = Dates.now() - endtime = starttime + Second(connectiontimeout) - errormsg = nothing - while true - timenow = Dates.now() - timepass = timedifference(starttime, timenow, "seconds") - timeleft = timedifference(timenow, endtime, "seconds") +# mqttInstance = nothing +# attempt = 0 +# starttime = Dates.now() +# endtime = starttime + Second(connectiontimeout) +# errormsg = nothing +# while true +# timenow = Dates.now() +# timepass = timedifference(starttime, timenow, "seconds") +# timeleft = timedifference(timenow, endtime, "seconds") - if timepass <= connectiontimeout - attempt += 1 - attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing +# if timepass <= connectiontimeout +# attempt += 1 +# attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing - try - mqttInstance = mqttClientInstance_v2( - outgoingMsg[:msgMeta][:mqttBrokerAddress], - [mqttMsgReceiveTopic], - mqttMsgReceiveChannel, - keepaliveChannel, - onMsgCallback; - mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort], - clientId=senderId - ) - break - catch e - errormsg = e - sleep(5) - end - else - println("Failed to instantiate MQTT client after $timepass seconds. $errormsg") - return nothing - end - end +# try +# mqttInstance = mqttClientInstance_v2( +# outgoingMsg[:msgMeta][:mqttBrokerAddress], +# [mqttMsgReceiveTopic], +# mqttMsgReceiveChannel, +# keepaliveChannel, +# onMsgCallback; +# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort], +# clientId=senderId +# ) +# break +# catch e +# errormsg = e +# sleep(5) +# end +# else +# println("Failed to instantiate MQTT client after $timepass seconds. $errormsg") +# return nothing +# end +# end - response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; - responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt) - try disconnect(mqttInstance.client) catch end +# response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; +# responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt) +# try disconnect(mqttInstance.client) catch end - return response -end -function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol, - outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1 - )::NamedTuple where {T<:Any} +# return response +# end +# function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol, +# outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1 +# )::NamedTuple where {T<:Any} - timepass = nothing - attempts = 1 - while attempts <= responsemaxattempt - sendMqttMsg(mqttInstance, outgoingMsg) +# timepass = nothing +# attempts = 1 +# while attempts <= responsemaxattempt +# sendMqttMsg(mqttInstance, outgoingMsg) - starttime = Dates.now() - while true - timepass = timedifference(starttime, Dates.now(), "seconds") +# starttime = Dates.now() +# while true +# timepass = timedifference(starttime, Dates.now(), "seconds") - if timepass <= responsetimeout - if isready(mqttInstance.msgReceiveChannel[receivechannel]) - incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) +# if timepass <= responsetimeout +# if isready(mqttInstance.msgReceiveChannel[receivechannel]) +# incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) - if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId] - if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK" - println("NACK") - break # resend the msg - else - return (success=true, error=nothing, response=incomingMsg[:payload]) - end - end - end - else - break - end - sleep(1) - end - if attempts > 1 - println("\n attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())") - pprintln(outgoingMsg) - println(" attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n") - checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5) - end - attempts += 1 - end +# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId] +# if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK" +# println("NACK") +# break # resend the msg +# else +# return (success=true, error=nothing, response=incomingMsg[:payload]) +# end +# end +# end +# else +# break +# end +# sleep(1) +# end +# if attempts > 1 +# println("\n attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())") +# pprintln(outgoingMsg) +# println(" attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n") +# checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5) +# end +# attempts += 1 +# end - return (success=false, - error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted", - response=nothing) -end +# return (success=false, +# error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted", +# response=nothing) +# end -""" Send a message to specified MQTT topic then wait for reply. +# """ Send a message to specified MQTT topic then wait for reply. -# Arguments - - `workDict::Dict` - A dictionary containing workspace data - - `incomingMsg::Dict` - The incoming message to process +# # Arguments +# - `workDict::Dict` +# A dictionary containing workspace data +# - `incomingMsg::Dict` +# The incoming message to process -# Keyword Arguments - - `data` - Optional data to be transferred. Default is nothing. +# # Keyword Arguments +# - `data` +# Optional data to be transferred. Default is nothing. -# Return - - `result::Dict` - A dictionary containing either: - - Metadata about the data transfer session if initiating a new transfer - - The requested data part if responding to a data part request +# # Return +# - `result::Dict` +# A dictionary containing either: +# - Metadata about the data transfer session if initiating a new transfer +# - The requested data part if responding to a data part request -# Example - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3, UUIDs - julia> msgMeta = GeneralUtils.generate_msgMeta( - "/testtopic", - senderName= "somename", - senderId= uuid4snakecase(), - mqttBrokerAddress= "test.mosquitto.org", - mqttBrokerPort= 1883, - ) - julia> outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict("hello"=> "World"), - ) - julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) - ``` -# TODO - - [x] transfer multiple msg with the same mqtt connection. The transfer loop should be - implemented by passing mqttInstance into sendReceiveMqttMsg() so that - 1-connection is used to send all data - - [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically) +# # Example +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3, UUIDs +# julia> msgMeta = GeneralUtils.generate_msgMeta( +# "/testtopic", +# senderName= "somename", +# senderId= uuid4snakecase(), +# mqttBrokerAddress= "test.mosquitto.org", +# mqttBrokerPort= 1883, +# ) +# julia> outgoingMsg = Dict( +# :msgMeta=> msgMeta, +# :payload=> Dict("hello"=> "World"), +# ) +# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) +# ``` +# # TODO +# - [x] transfer multiple msg with the same mqtt connection. The transfer loop should be +# implemented by passing mqttInstance into sendReceiveMqttMsg() so that +# 1-connection is used to send all data +# - [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically) -# Signature -""" -function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing) - dataTransferSessionID = nothing +# # Signature +# """ +# function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing) +# dataTransferSessionID = nothing - if !haskey(incomingMsg[:payload], :dataTransferSessionID) - """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata +# if !haskey(incomingMsg[:payload], :dataTransferSessionID) +# """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata - Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"), - :msgMeta => Dict(:msgPurpose => nothing, - :receiverId => nothing, - :getpost => nothing, - :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32", - :requestresponse => nothing, - :msgFormatVersion => nothing, - :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)", - :replyToMsgId => nothing, - :acknowledgestatus => nothing, - :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", - :mqttBrokerPort => nothing, - :receiverName => "agent_wine_backend", - :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", - :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", - :senderName => "agent_wine_frontend", - :senderselfnote => nothing, - :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) - """ +# Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"), +# :msgMeta => Dict(:msgPurpose => nothing, +# :receiverId => nothing, +# :getpost => nothing, +# :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32", +# :requestresponse => nothing, +# :msgFormatVersion => nothing, +# :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)", +# :replyToMsgId => nothing, +# :acknowledgestatus => nothing, +# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", +# :mqttBrokerPort => nothing, +# :receiverName => "agent_wine_backend", +# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", +# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", +# :senderName => "agent_wine_frontend", +# :senderselfnote => nothing, +# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) +# """ - if data !== nothing # if the user provide data, store it in a new session workspace - dataTransferSessionID = uuid4snakecase() - haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict() - workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict() - s = workDict[:dataTransferOverMQTT][dataTransferSessionID] - s[:dataTransferSessionID] = dataTransferSessionID - dataDict = Dict(pairs(data)) - merge!(s, dataDict) - else - error("No data provided") - end +# if data !== nothing # if the user provide data, store it in a new session workspace +# dataTransferSessionID = uuid4snakecase() +# haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict() +# workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict() +# s = workDict[:dataTransferOverMQTT][dataTransferSessionID] +# s[:dataTransferSessionID] = dataTransferSessionID +# dataDict = Dict(pairs(data)) +# merge!(s, dataDict) +# else +# error("No data provided") +# end - # compute hash value for each piece of data - s = workDict[:dataTransferOverMQTT][dataTransferSessionID] - s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]] +# # compute hash value for each piece of data +# s = workDict[:dataTransferOverMQTT][dataTransferSessionID] +# s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]] - # send metadata without data to receiver() - datainfo = Dict() - for (k, v) in s - if k ∉ [:dataparts] - datainfo[k] = v - end - end +# # send metadata without data to receiver() +# datainfo = Dict() +# for (k, v) in s +# if k ∉ [:dataparts] +# datainfo[k] = v +# end +# end - d = Dict( - :result=> "dataTransferOverMQTT", - :dataTransferSessionID=> dataTransferSessionID, - Symbol(dataTransferSessionID)=> datainfo - ) +# d = Dict( +# :result=> "dataTransferOverMQTT", +# :dataTransferSessionID=> dataTransferSessionID, +# Symbol(dataTransferSessionID)=> datainfo +# ) - """ Sending metadata to the requester - Dict(:result => "dataTransferOverMQTT", - :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3, - :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f", - "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025", - "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"], - :datatype => "vector{Dict}", - :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6", - :totalparts => 46), - :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6") - """ +# """ Sending metadata to the requester +# Dict(:result => "dataTransferOverMQTT", +# :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3, +# :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f", +# "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025", +# "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"], +# :datatype => "vector{Dict}", +# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6", +# :totalparts => 46), +# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6") +# """ - return d +# return d - else +# else - """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part - Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"), - :msgMeta => Dict(:msgPurpose => nothing, - :receiverId => nothing, - :getpost => nothing, - :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305", - :requestresponse => nothing, - :msgFormatVersion => nothing, - :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)", - :replyToMsgId => nothing, - :acknowledgestatus => nothing, - :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", - :mqttBrokerPort => nothing, - :receiverName => "agent_wine_backend", - :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", - :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", - :senderName => "agent_wine_frontend", - :senderselfnote => nothing, - :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) - """ +# """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part +# Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"), +# :msgMeta => Dict(:msgPurpose => nothing, +# :receiverId => nothing, +# :getpost => nothing, +# :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305", +# :requestresponse => nothing, +# :msgFormatVersion => nothing, +# :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)", +# :replyToMsgId => nothing, +# :acknowledgestatus => nothing, +# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", +# :mqttBrokerPort => nothing, +# :receiverName => "agent_wine_backend", +# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", +# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", +# :senderName => "agent_wine_frontend", +# :senderselfnote => nothing, +# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) +# """ - # check transfer session id - dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID] - clientRequestingDataPartNumber = - incomingMsg[:payload][:requestingDataPartNumber] - s = workDict[:dataTransferOverMQTT][dataTransferSessionID] +# # check transfer session id +# dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID] +# clientRequestingDataPartNumber = +# incomingMsg[:payload][:requestingDataPartNumber] +# s = workDict[:dataTransferOverMQTT][dataTransferSessionID] - d = Dict( - :result=> "dataTransferOverMQTT", - :dataTransferSessionID=> dataTransferSessionID, - Symbol(dataTransferSessionID)=> Dict( - :dataPartNumber=> clientRequestingDataPartNumber, - :datapart=> s[:dataparts][clientRequestingDataPartNumber] - ), - ) +# d = Dict( +# :result=> "dataTransferOverMQTT", +# :dataTransferSessionID=> dataTransferSessionID, +# Symbol(dataTransferSessionID)=> Dict( +# :dataPartNumber=> clientRequestingDataPartNumber, +# :datapart=> s[:dataparts][clientRequestingDataPartNumber] +# ), +# ) - return d - end -end +# return d +# end +# end -""" Send a message to specified MQTT topic then wait for reply. +# """ Send a message to specified MQTT topic then wait for reply. -# Arguments - - `outgoingMsg::Dict` - an outgoing message +# # Arguments +# - `outgoingMsg::Dict` +# an outgoing message -# Return - - `result::NamedTuple` - ( success= true, error= nothing ) +# # Return +# - `result::NamedTuple` +# ( success= true, error= nothing ) -# Example - ```jldoctest - julia> using Revise - julia> using GeneralUtils, Dates, JSON3, UUIDs - julia> msgMeta = GeneralUtils.generate_msgMeta( - "/testtopic", - senderName= "somename", - senderId= uuid4snakecase(), - mqttBrokerAddress= "test.mosquitto.org", - mqttBrokerPort= 1883, - ) - julia> outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict("hello"=> "World"), - ) - julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) - ``` -# TODO - - [PENDING] +# # Example +# ```jldoctest +# julia> using Revise +# julia> using GeneralUtils, Dates, JSON3, UUIDs +# julia> msgMeta = GeneralUtils.generate_msgMeta( +# "/testtopic", +# senderName= "somename", +# senderId= uuid4snakecase(), +# mqttBrokerAddress= "test.mosquitto.org", +# mqttBrokerPort= 1883, +# ) +# julia> outgoingMsg = Dict( +# :msgMeta=> msgMeta, +# :payload=> Dict("hello"=> "World"), +# ) +# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) +# ``` +# # TODO +# - [PENDING] -# Signature -""" -function dataTransferOverMQTT_receiver() +# # Signature +# """ +# function dataTransferOverMQTT_receiver() @@ -878,7 +879,7 @@ function dataTransferOverMQTT_receiver() -end +# end diff --git a/src/dbUtil.jl b/src/dbUtil.jl index 40296bd..2e4fea8 100644 --- a/src/dbUtil.jl +++ b/src/dbUtil.jl @@ -2,7 +2,7 @@ module dbUtil export dictToPostgresKeyValueString, generateInsertSQL, generateUpdateSQL -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames, SHA using ..util diff --git a/src/interface.jl b/src/interface.jl index f7c5672..dea5dcb 100644 --- a/src/interface.jl +++ b/src/interface.jl @@ -8,7 +8,7 @@ export noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue, isLess, allTrue, getStringBetweenCharacters, JSON3read_stringKey, mkDictPath!, getDictPath, detectKeywordVariation, textToDict -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, CSV +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames, CSV using ..util, ..communication # ---------------------------------------------- 100 --------------------------------------------- # diff --git a/src/util.jl b/src/util.jl index 16ca962..671a201 100644 --- a/src/util.jl +++ b/src/util.jl @@ -9,7 +9,7 @@ export timedifference, showstracktrace, findHighestIndexKey, uuid4snakecase, rep extractTextBetweenCharacter, extractTextBetweenString, convertCamelSnakeKebabCase, fitrange, recentElementsIndex, nonRecentElementsIndex -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames # ---------------------------------------------- 100 --------------------------------------------- #