From 564ed3cb70f387f312f33ca05fc9adb1dbec6319 Mon Sep 17 00:00:00 2001 From: tonaerospace Date: Fri, 27 Sep 2024 15:23:56 +0700 Subject: [PATCH] update --- src/communication.jl | 745 ++++++++++++++++++++++++++++++------------- src/interface.jl | 59 +--- src/util.jl | 130 +++++++- 3 files changed, 651 insertions(+), 283 deletions(-) diff --git a/src/communication.jl b/src/communication.jl index fca5d25..5e6cc7d 100644 --- a/src/communication.jl +++ b/src/communication.jl @@ -1,14 +1,16 @@ module communication export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!, - sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance + sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2 using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient using ..util # ---------------------------------------------- 100 --------------------------------------------- # -mutable struct mqttClientInstance +abstract type mqttClientInstance end + +mutable struct mqttClientInstance_v1 <: mqttClientInstance mqttBrokerAddress::String mqttBrokerPort::Integer subtopic::Vector{String} @@ -23,7 +25,7 @@ mutable struct mqttClientInstance lastTimeMqttConnCheck::DateTime end -function mqttClientInstance( +function mqttClientInstance_v1( mqttBrokerAddress::String, mqttBrokerPort::Integer, subtopic::Vector{String}, @@ -62,53 +64,170 @@ function mqttClientInstance( end +mutable struct mqttClientInstance_v2 <: mqttClientInstance + mqttBrokerAddress::String + mqttBrokerPort::Integer + subtopic::Vector{String} + msgReceiveChannel + onMsgCallback::Function + qos::MQTTClient.QOS + client::MQTTClient.Client + connection::MQTTClient.Connection + keepalivetopic::String + keepaliveChannel::Channel + keepaliveCheckInterval::Integer # second + lastTimeMqttConnCheck::DateTime +end +""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple + msgReceiveChannel without having to redefine mqttClientInstance again like in v1. + This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel. +# Arguments + - `mqttBrokerAddress` + MQTT broker address. Can be URL or IP address + - `subtopic` + A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"] + - `msgReceiveChannel` + A storage the mqtt client used to store incoming message. + Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) + - `keepaliveChannel` + A channel to store keepalive message + - `onMsgCallback` + A user-defined function to handle incoming message + +# Keyword Arguments + - `mqttBrokerPort` + mqtt broker port + - `keepalivetopic` + A topic where keepalive message going + - `keepaliveCheckInterval` + A time interval to check whether the mqtt client still connect to mqtt broker + - `qos` + Quality of Service. Can be QOS_0, QOS_1, QOS_2 + +# Return + - `mqttInstance` + A new mqtt client + +# Example + ```jldoctest + julia> using Revise + julia> using GeneralUtils, Dates, JSON3, UUIDs + julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"] + julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) + julia> keepaliveChannel = Channel(8) + julia> function onMsgCallback(topic, payload) + jobj = JSON3.read(String(payload)) + incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively + + if occursin("topic_1", topic) + push!(mqttMsgReceiveTopic[:ch1], incomingMqttMsg) + elseif occursin("topic_2", topic) + push!(mqttMsgReceiveTopic[:ch2], incomingMqttMsg) + elseif occursin("keepalive", topic) + put!(keepaliveChannel, incomingMqttMsg) + else + println("undefined condition ", @__FILE__, " ", @__LINE__) + end + end + julia> mqttInstance = GeneralUtils.mqttClientInstance_v2( + "mqtt.yiem.cc", + mqttMsgReceiveTopic, + mqttMsgReceiveChannel, + keepaliveChannel, + onMsgCallback + ) + ``` + +# Signature +""" +function mqttClientInstance_v2( + mqttBrokerAddress::String, + subtopic::Vector{String}, + msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8)) + keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback() + onMsgCallback::Function + ; + mqttBrokerPort::Integer=1883, + keepalivetopic::String= "/keepalive/$(uuid4())", + keepaliveCheckInterval::Integer=30, + qos::MQTTClient.QOS=QOS_1, + ) + + client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) + MQTTClient.connect(client, connection) + for i in subtopic + MQTTClient.subscribe(client, i, onMsgCallback, qos=qos) + end + MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos) + + instance = mqttClientInstance_v2( + mqttBrokerAddress, + mqttBrokerPort, + subtopic, + msgReceiveChannel, + onMsgCallback, + qos, + client, + connection, + keepalivetopic, + keepaliveChannel, + keepaliveCheckInterval, + Dates.now(), + ) + + return instance +end + + + """ Generate msgMeta to be including in a message. So the message receiver know what to do with the message. - Arguments\n - ----- - sendTopic::String +# Arguments + - `sendTopic` topic the sender sends to e.g. "/agent/wine/api/v1/prompt" - Keyword Arguments\n - ----- - msgPurpose::String +# Keyword Arguments + - `msgPurpose` purpose of this message e.g. "updateStatus" - senderName::String + - `senderName` sender name (String) e.g. "agent-wine-web-frontend" - senderId::String + - `senderId` sender id e.g. string(uuid4()) - receiverName::String + - `receiverName` msg receiver name (String) e.g. "agent-backend" - receiverId::String + - `receiverId` msg receiver id, nothing means everyone in the topic e.g. string(uuid4()) - requestresponse::String + - `requestresponse` this message is a "request" or "response" - getpost::String + - `getpost` this message is a "get" or "post" - msgId::String + - `msgId` message ID e.g. string(uuid4()) - timestamp::String + - `timestamp` message published timestamp e.g. string(Dates.now()) - senderselfnote::Any + - `senderselfnote` help sender manage incoming reply msg, could be "for text inference", "img generation" - replyTopic::String + - `replyTopic` sender ask receiver to reply to this topic e.g. "/agent/frontend" - replyToMsgId::String + - `replyToMsgId` sender is responding to this msg ID e.g. string(uuid4()) - acknowledgestatus::String + - `acknowledgestatus` e.g. "acknowledged" - mqttBrokerAddress::String + - `mqttBrokerAddress` MQTT broker URI. e.g. "test.mosquitto.org" - mqttBrokerPort::Integer + - `mqttBrokerPort` MQTT broker port - Return\n - ----- - msgMeta::Dict{Symbol, Any} + - `msgFormatVersion` - Example\n - ----- + - `multiMsgData` + + +# Return + - `msgMeta` + +# Example ```jldoctest julia> using Revise julia> using GeneralUtils @@ -118,36 +237,34 @@ end senderName="keepaliveservice") ``` - Signature\n - ----- +# Signature """ function generate_msgMeta( sendTopic::T1, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt" ; - msgPurpose::Union{T1, Nothing}= nothing, # purpose of this message e.g. "updateStatus" - senderName::Union{T1, Nothing}= nothing, # sender name (String) e.g. "agent-wine-web-frontend" - senderId::Union{T1, Nothing}= nothing, # sender id e.g. string(uuid4()) - receiverName::Union{T1, Nothing}= nothing, # msg receiver name (String) e.g. "agent-backend" - receiverId::Union{T1, Nothing}= nothing, # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4()) - requestresponse::Union{T1, Nothing}= nothing, # this message is a "request" or "response" - getpost::Union{T1, Nothing}= nothing, # this message is a "get" or "post" - msgId::Union{T1, Nothing}= string(uuid4()), + msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus" + senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend" + senderId::T1= "nothing", # sender id e.g. string(uuid4()) + receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend" + receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4()) + requestresponse::T1= "nothing", # this message is a "request" or "response" + getpost::T1= "nothing", # this message is a "get" or "post" + msgId::T1= string(uuid4()), timestamp= string(Dates.now()), # message published timestamp # help sender manage incoming reply msg, could be "for text inference", "img generation" - senderselfnote::Any= nothing, + senderselfnote::Any= "nothing", # sender ask receiver to reply to this topic # e.g. "/agent/frontend/wine/chat/api/v1/txt/receive" - replyTopic::Union{T1, Nothing}= nothing, - - replyToMsgId::Union{T1, Nothing}= nothing, # sender is responding to this msg ID - acknowledgestatus::Union{T1, Nothing}= nothing, # "acknowledged", + replyTopic::T1= "nothing", + replyToMsgId::T1= "nothing", # sender is responding to this msg ID + acknowledgestatus::T1= "nothing", # "acknowledged", mqttBrokerAddress::T1= "test.mosquitto.org", mqttBrokerPort::Integer= 1883, - - msgFormatVersion::Union{T1, Nothing}= nothing, + msgFormatVersion::T1= "nothing", + multiMsgData::T1= "nothing", )::Dict{Symbol, Any} where {T1<:AbstractString} @@ -177,6 +294,7 @@ function generate_msgMeta( :mqttBrokerPort=> mqttBrokerPort, :msgFormatVersion=> msgFormatVersion, + :multiMsgData=> multiMsgData, ) return msgMeta @@ -203,76 +321,14 @@ end ```jldoctest julia> using Revise julia> using GeneralUtils, Dates, JSON3, MQTTClient - julia> mqttMsgReceiveChannel = Channel(8) - julia> keepaliveChannel = Channel(8) - julia> function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - onMsg = copy(jobj) - put!(mqttMsgReceiveChannel, onMsg) - end - julia> mqttInstanceDict = Dict{Symbol, Any}( - :mqttServerInfo=> Dict( - :broker=> "test.mosquitto.org", - :port=> 1883, - ), - :onMsgCallback=> onMsgCallback, - :client=> nothing, - :connection=> nothing, - :subtopic=> ["/servicetopic", "/keepalivetopic"], - :keepalivetopic=> "/keepalive", - :keepaliveChannel=> keepaliveChannel, - :lastTimeMqttConnCheck=> Dates.now(), - ) - julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] = - MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker], - mqttInstanceDict[:mqttServerInfo][:port]) - julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection]) - julia> mqttConnStatus = GeneralUtils.isMqttConnectionAlive(mqttInstanceDict) - julia> println(mqttConnStatus) + julia> GeneralUtils.isMqttConnectionAlive(mqttInstance) + true ``` Signature\n ----- """ -function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool - - starttime = Dates.now() - isconnectionalive = false - - # ditch old keepalive msg is any - while isready(mqttInstanceDict[:keepaliveChannel]) - _ = take!(mqttInstanceDict[:keepaliveChannel]) - end - - msgMeta = generate_msgMeta( - mqttInstanceDict[:keepalivetopic], - msgPurpose= "keepalive", - ) - - keepaliveMsg = Dict( - :msgMeta => msgMeta, - :text=>"keepalive", - ) - - publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic], - JSON3.write(keepaliveMsg)) - - timediff = 0 - while timediff < 5 - timediff = timedifference(starttime, Dates.now(), "seconds") - if isready(mqttInstanceDict[:keepaliveChannel]) - incomingMsg = take!(mqttInstanceDict[:keepaliveChannel]) - if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId] - # connection is alive - isconnectionalive = true - break - end - end - sleep(1) - end - return isconnectionalive -end -function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool +function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance} starttime = Dates.now() isconnectionalive = false @@ -288,8 +344,10 @@ function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool ) keepaliveMsg = Dict( - :msgMeta => msgMeta, - :text=>"keepalive", + :msgMeta=> msgMeta, + :payload=> Dict( + :text=>"keepalive", + ) ) publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic], @@ -310,6 +368,44 @@ function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool end return isconnectionalive end +# function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool + +# starttime = Dates.now() +# isconnectionalive = false + +# # ditch old keepalive msg is any +# while isready(mqttInstanceDict[:keepaliveChannel]) +# _ = take!(mqttInstanceDict[:keepaliveChannel]) +# end + +# msgMeta = generate_msgMeta( +# mqttInstanceDict[:keepalivetopic], +# msgPurpose= "keepalive", +# ) + +# keepaliveMsg = Dict( +# :msgMeta => msgMeta, +# :text=>"keepalive", +# ) + +# publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic], +# JSON3.write(keepaliveMsg)) + +# timediff = 0 +# while timediff < 5 +# timediff = timedifference(starttime, Dates.now(), "seconds") +# if isready(mqttInstanceDict[:keepaliveChannel]) +# incomingMsg = take!(mqttInstanceDict[:keepaliveChannel]) +# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId] +# # connection is alive +# isconnectionalive = true +# break +# end +# end +# sleep(1) +# end +# return isconnectionalive +# end """ Check mqtt server connection, reconnect if disconnected. @@ -331,71 +427,16 @@ end ```jldoctest julia> using Revise julia> using GeneralUtils, Dates, JSON3, MQTTClient - julia> mqttMsgReceiveChannel = Channel(8) - julia> keepaliveChannel = Channel(8) - julia> function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - onMsg = copy(jobj) - put!(mqttMsgReceiveChannel, onMsg) - end - julia> mqttInstanceDict = Dict{Symbol, Any}( - :mqttServerInfo=> Dict( - :broker=> "test.mosquitto.org", - :port=> 1883, - ), - :onMsgCallback=> onMsgCallback, # function - :client=> nothing, - :connection=> nothing, - :subtopic=> ["/servicetopic", "/sometopic"], # Vector{String} - :keepalivetopic=> "/keepalive", - :keepaliveChannel=> keepaliveChannel, # Channel{Dict} - :lastTimeMqttConnCheck=> Dates.now(), - ) - julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] = - MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker], - mqttInstanceDict[:mqttServerInfo][:port]) - julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection]) - julia> GeneralUtils.checkMqttConnection!(mqttInstanceDict, 5) + julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5) ``` Signature\n ----- """ -function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool - isreconnect = false - - # check if mqtt connection is still up - intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds") - - if intervaldiff > interval - while true - mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict) - if mqttConnStatus == false - println("Attemping to reconnect") - # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet - mqttInstanceDict[:client], mqttInstanceDict[:connection] = - MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker], - mqttInstanceDict[:mqttServerInfo][:port]) - connect(mqttInstanceDict[:client], mqttInstanceDict[:connection]) - for topic in mqttInstanceDict[:subtopic] - subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1) - end - isreconnect = true - println("reconnected") - else - mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now() - break - end - end - end - - return isreconnect -end -function checkMqttConnection!(mqttInstance::mqttClientInstance; - keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool +function checkMqttConnection!(mqttInstance::T; + keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance} interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval - isreconnect = false # check if mqtt connection is still up intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds") @@ -414,17 +455,48 @@ function checkMqttConnection!(mqttInstance::mqttClientInstance; subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) end MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) - isreconnect = true - println("reconnected") - else + else mqttInstance.lastTimeMqttConnCheck = Dates.now() + println("Connected") + return mqttConnStatus break end end + else + return nothing # still within check interval end - - return isreconnect end +# function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool +# isreconnect = false + +# # check if mqtt connection is still up +# intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds") + +# if intervaldiff > interval +# while true +# mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict) +# if mqttConnStatus == false +# println("Attemping to reconnect") +# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet +# mqttInstanceDict[:client], mqttInstanceDict[:connection] = +# MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker], +# mqttInstanceDict[:mqttServerInfo][:port]) +# connect(mqttInstanceDict[:client], mqttInstanceDict[:connection]) +# for topic in mqttInstanceDict[:subtopic] +# subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1) +# end +# isreconnect = true +# println("reconnected") +# else +# mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now() +# break +# end +# end +# end + +# return isreconnect +# end + """ Send a message to specified MQTT topic then wait for reply. @@ -435,7 +507,7 @@ end # Return - `result::NamedTuple` - ( isDone= true, error= nothing ) + ( success= true, error= nothing ) # Example ```jldoctest @@ -452,39 +524,38 @@ end :msgMeta=> msgMeta, :payload=> Dict("hello"=> "World"), ) - julia> isDone, error = GeneralUtils.sendMqttMsg(outgoingMsg) + julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) ``` # Signature """ -function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any} - try - # Instantiate a client and connection. - client, connection = MakeConnection( - outgoingMsg[:msgMeta][:mqttBrokerAddress], - outgoingMsg[:msgMeta][:mqttBrokerPort]) - connect(client, connection) - - publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg)) - # disconnect mqtt - disconnect(client) - return (isDone=true, error=nothing) - catch e - return (isDone=false, error=e) - end -end - function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T} )::NamedTuple where {T<:Any} try publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg)) # disconnect mqtt disconnect(client) - return (isDone=true, error=nothing) + return (success=true, error=nothing) catch e - return (isDone=false, error=e) + return (success=false, error=e) end end +# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any} +# try +# # Instantiate a client and connection. +# client, connection = MakeConnection( +# outgoingMsg[:msgMeta][:mqttBrokerAddress], +# outgoingMsg[:msgMeta][:mqttBrokerPort]) +# connect(client, connection) + +# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg)) +# # disconnect mqtt +# disconnect(client) +# return (success=true, error=nothing) +# catch e +# return (success=false, error=e) +# end +# end """ Send a message to specified MQTT topic then wait for reply. @@ -500,7 +571,7 @@ end # Return - `result::NamedTuple` ( - isDone= true, # idicates whether sending MQTT message successful + success= true, # idicates whether sending MQTT message successful error= nothing # error message response= somemessage # response message ) @@ -508,7 +579,7 @@ end # Example ```jldoctest julia> using Revise -julia> using GeneralUtils, Dates, UUIDs +julia> using GeneralUtils, Dates, UUIDs, JSON3 julia> msgMeta = GeneralUtils.generate_msgMeta( "/testtopic", senderName= "somename", @@ -520,51 +591,60 @@ julia> outgoingMsg = Dict( :msgMeta=> msgMeta, :payload=> Dict(:hello=> "World"), ) -julia> isDone, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) +julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) ``` # Signature """ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any} - - mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" - mqttMsgReceiveChannel = Channel(4) - - # ask message receiver to send a message back to specified topic - outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic + # mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"] + mqttMsgReceiveTopic = ["/receivetopic"] + mqttMsgReceiveChannel = (ch1=Channel(8),) + keepaliveChannel = Channel(8) # Define the callback for receiving messages. function onMsgCallback(topic, payload) jobj = JSON3.read(String(payload)) onMsg = copy(jobj) - put!(mqttMsgReceiveChannel, onMsg) + put!(mqttMsgReceiveChannel[:ch1], onMsg) end - # Instantiate a client and connection. - client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress], - outgoingMsg[:msgMeta][:mqttBrokerPort]) - connect(client, connection) - subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1) - + mqttInstance = mqttClientInstance_v2( + outgoingMsg[:msgMeta][:mqttBrokerAddress], + mqttMsgReceiveTopic, + mqttMsgReceiveChannel, + keepaliveChannel, + onMsgCallback + ) + + return sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt) +end + +function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol, + outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1 + )::NamedTuple where {T<:Any} + timepass = nothing attempts = 1 while attempts <= maxattempt - publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg)) + sendMqttMsg(mqttInstance, outgoingMsg) starttime = Dates.now() while true timepass = timedifference(starttime, Dates.now(), "seconds") if timepass <= timeout - if isready(mqttMsgReceiveChannel) - incomingMsg = take!(mqttMsgReceiveChannel) + if isready(mqttInstance.msgReceiveChannel[receivechannel]) + incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId] - # disconnect mqtt - unsubscribe(client, mqttMsgReceiveTopic) - disconnect(client) - return (isDone=true, error=nothing, response=incomingMsg[:payload]) + if incomingMsg[:msgMeta][:msgPurpose] == "NACK" + println("NACK") + break # resend the msg + else + return (success=true, error=nothing, response=incomingMsg[:payload]) + end end end else @@ -576,17 +656,236 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; attempts += 1 end - # disconnect mqtt - unsubscribe(client, mqttMsgReceiveTopic) - disconnect(client) - return (isDone=false, error="no response, timeout $timepass/$timeout, $attempts publish attempted", + return (success=false, + error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted", response=nothing) end +# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; +# timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any} + +# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])" +# mqttMsgReceiveChannel = Channel(4) + +# # ask message receiver to send a message back to specified topic +# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic + +# # Define the callback for receiving messages. +# function onMsgCallback(topic, payload) +# jobj = JSON3.read(String(payload)) +# onMsg = copy(jobj) +# put!(mqttMsgReceiveChannel, onMsg) +# end + +# # Instantiate a client and connection. +# client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress], +# outgoingMsg[:msgMeta][:mqttBrokerPort]) +# connect(client, connection) +# subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1) + +# timepass = nothing +# attempts = 1 +# while attempts <= maxattempt +# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg)) + +# starttime = Dates.now() +# while true +# timepass = timedifference(starttime, Dates.now(), "seconds") + +# if timepass <= timeout +# if isready(mqttMsgReceiveChannel) +# incomingMsg = take!(mqttMsgReceiveChannel) + +# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId] +# if msgPurpose == "NACK" +# break # resend the msg +# else +# # disconnect mqtt +# unsubscribe(client, mqttMsgReceiveTopic) +# disconnect(client) +# return (success=true, error=nothing, response=incomingMsg[:payload]) +# end +# end +# end +# else +# break +# end +# sleep(1) +# end +# println("attempts $attempts ", @__FILE__, " ", @__LINE__) +# attempts += 1 +# end + +# # disconnect mqtt +# unsubscribe(client, mqttMsgReceiveTopic) +# disconnect(client) +# return (success=false, +# error="no response, timeout $timepass/$timeout, $attempts publish attempted", +# response=nothing) +# end +""" Send a message to specified MQTT topic then wait for reply. + +# Arguments + - `outgoingMsg::Dict` + an outgoing message + +# Return + - `result::NamedTuple` + ( success= true, error= nothing ) + +# Example + ```jldoctest + julia> using Revise + julia> using GeneralUtils, Dates, JSON3, UUIDs + julia> msgMeta = GeneralUtils.generate_msgMeta( + "/testtopic", + senderName= "somename", + senderId= string(uuid4()), + mqttBrokerAddress= "test.mosquitto.org", + mqttBrokerPort= 1883, + ) + julia> outgoingMsg = Dict( + :msgMeta=> msgMeta, + :payload=> Dict("hello"=> "World"), + ) + julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) + ``` +# TODO + - [TESTING] implement crude version of this function + - [] transfer multiple msg with the same mqtt connection. The transfer loop should be + implemented by passing mqttInstance into sendReceiveMqttMsg() so that + 1-connection is used to send all data + +# Signature +""" +function transferDataOverMQTT_sender(disintegrateF::Function, data, partsize, msgMeta) + + # use disintegrateF(workDict, data) to disintegrate data into small parts + d = disintegrateF(data, partsize) + + # compute hash value for each piece of data + hashes = [hash(d[i]) for i in 1:d[:totalparts]] + for i in 1:totalpart + push!(hashes, hash(d[i])) + end + + mqttMsgReceiveTopic = "/GeneralUtils_transferDataOverMQTT/$(msgMeta[:senderId])" + mqttMsgReceiveChannel = (ch1=Channel(8),) + keepaliveChannel = Channel(8) + + # Define the callback for receiving messages. + function onMsgCallback(topic, payload) + jobj = JSON3.read(String(payload)) + onMsg = copy(jobj) + put!(mqttMsgReceiveChannel[:ch1], onMsg) + end + + mqttInstance = GeneralUtils.mqttClientInstance_v2( + msgMeta[:mqttBrokerAddress], + msgMeta[:mqttBrokerPort], + [mqttMsgReceiveTopic], + mqttMsgReceiveChannel, + keepaliveChannel, + onMsgCallback + ) + + # 1st msg to send metadata + msgMeta[:replyTopic] = mqttMsgReceiveTopic # receiver() to send a message back to this topic + msgMeta[:multiMsgData] = "start" + outgoingMsg = Dict( + :msgMeta=> msgMeta, + :payload=> Dict( + :datatype=> d[:datatype], + :totalparts=> d[:totalparts], + :partsize=> d[:partsize], + :hashes=> hashes + ) + ) + pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3) + pubresult[:success] == true ? nothing : return pubresult + + # while loop for sending the data + msgMeta[:multiMsgData] = "transfering" + partnumber = 1 + while partnumber <= d[:totalparts] + + # publish + outgoingMsg = Dict( + :msgMeta=> msgMeta, + :payload=> Dict( + :partnumber=> partnumber, + :data=> d[:data][partnumber], + ) + ) + + pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3) + pubresult[:success] == true ? nothing : return pubresult + + partnumber += 1 + + sleep(1) + end + + # final msg to send end msg + msgMeta[:multiMsgData] = "end" + outgoingMsg = Dict( + :msgMeta=> msgMeta, + :payload=> Dict() + ) + pubresult = sendReceiveMqttMsg(outgoingMsg; timeout=10, maxattempt=3) + pubresult[:success] == true ? nothing : return pubresult + + return (success=true, error=nothing, response=nothing) +end + +""" Send a message to specified MQTT topic then wait for reply. + +# Arguments + - `outgoingMsg::Dict` + an outgoing message + +# Return + - `result::NamedTuple` + ( success= true, error= nothing ) + +# Example + ```jldoctest + julia> using Revise + julia> using GeneralUtils, Dates, JSON3, UUIDs + julia> msgMeta = GeneralUtils.generate_msgMeta( + "/testtopic", + senderName= "somename", + senderId= string(uuid4()), + mqttBrokerAddress= "test.mosquitto.org", + mqttBrokerPort= 1883, + ) + julia> outgoingMsg = Dict( + :msgMeta=> msgMeta, + :payload=> Dict("hello"=> "World"), + ) + julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) + ``` +# TODO + - [WORKING] + +# Signature +""" +function transferDataOverMQTT_receiver() + + + + + + + + + +end + diff --git a/src/interface.jl b/src/interface.jl index 8d647bd..c4f0d40 100644 --- a/src/interface.jl +++ b/src/interface.jl @@ -6,7 +6,7 @@ export noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue, matMul_3Dto4D_batchwise, isNotEqual, linearToCartesian, vectorMax, multiply_last, multiplyRandomElements, replaceElements, replaceElements!, isBetween, isLess, allTrue, getStringBetweenCharacters, JSON3read_stringKey, mkDictPath!, - getDictPath, dataframeToCSV, dfToJSONRows + getDictPath using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, CSV using ..util, ..communication @@ -1149,63 +1149,6 @@ function getDictPath(dict::Dict, keys::Vector) end -""" Convert a dataframe into CSV. - -# Arguments - - `df::DataFrame` - A connection object to Postgres database - -# Return - - `result::String` - -# Example -```jldoctest -julia> using DataFrames, GeneralUtils -julia> df = DataFrame(A=1:3, B=5:7, fixed=1) -julia> result = GeneralUtils.dataframeToCSV(df) -``` - -# Signature -""" -function dataframeToCSV(df::DataFrame) - # Create an IOBuffer to capture the output - io = IOBuffer() - CSV.write(io, df) - dfStr = String(take!(io)) - return dfStr -end - -""" Convert a DataFrame into a list of JSON rows. - -# Arguments - - `df::DataFrame` - The input DataFrame to be converted. - -# Return - - `rows::Vector{Dict{String, Any}}` - A vector of dictionaries, where each dictionary represents a row in JSON format. - -# Example - ```jldoctest - julia> using DataFrame, JSON3 - julia> df = DataFrame(A = [1, 2, 3], B = ["apple", "banana", "cherry"]) - julia> json_rows = dfToJSONRows(df) - ``` - -# Signature -""" -function dfToJSONRows(df::DataFrame) - rows = [] - for row in eachrow(df) - json_row = Dict{String, Any}() - for col in names(df) - json_row[col] = row[col] - end - push!(rows, json_row) - end - return rows -end - diff --git a/src/util.jl b/src/util.jl index dd91e54..6f92278 100644 --- a/src/util.jl +++ b/src/util.jl @@ -1,9 +1,10 @@ module util export timedifference, showstracktrace, findHighestIndexKey, uuid4snakecase, replaceDictKeys, - findMatchingDictKey, textToDict, randstring, randstrings, timeout + findMatchingDictKey, textToDict, randstring, randstrings, timeout, + dataframeToCSV, dfToVectorDict, disintegrate_vectorDict -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames # ---------------------------------------------- 100 --------------------------------------------- # @@ -412,6 +413,131 @@ function timeout(f::Function, timeoutwindow::Integer; fargs=nothing, timeoutmsg= timeoutmsg end end + + + +""" Convert a dataframe into CSV. + +# Arguments + - `df::DataFrame` + A connection object to Postgres database + +# Return + - `result::String` + +# Example +```jldoctest +julia> using DataFrames, GeneralUtils +julia> df = DataFrame(A=1:3, B=5:7, fixed=1) +julia> result = GeneralUtils.dataframeToCSV(df) +``` + +# Signature +""" +function dataframeToCSV(df::DataFrame) + # Create an IOBuffer to capture the output + io = IOBuffer() + CSV.write(io, df) + dfStr = String(take!(io)) + return dfStr +end + +""" Convert a DataFrame into a list of Dict rows. + +# Arguments + - `df::DataFrame` + The input DataFrame to be converted. + +# Return + - `rows::Vector{Dict{String, Any}}` + A vector of dictionaries, where each dictionary represents a row in a dataframe. + +# Example + ```jldoctest + julia> using DataFrames, JSON3, GeneralUtils + julia> df = DataFrame(A = [1, 2, 3], B = ["apple", "banana", "cherry"]) + julia> vectorDict = GeneralUtils.dfToVectorDict(df) + [Dict{String, Any}("B" => "apple", "A" => 1), + Dict{String, Any}("B" => "banana", "A" => 2) + Dict{String, Any}("B" => "cherry", "A" => 3)] + ``` + +# Signature +""" +function dfToVectorDict(df::DataFrame) + vec = [] + for row in eachrow(df) + d = Dict{String, Any}() + for col in names(df) + d[col] = row[col] + end + push!(vec, d) + end + return vec +end + + + +""" Turn a large vector of dictionaries into smaller one + +# Arguments + - `data` + data to be partioning + - `partsize` + how many dicts per part + +# Return + - `parts` + a dictionay of parts + +# Example + ```jldoctest + julia> using GeneralUtils, Dates, JSON3, UUIDs + julia> vecDict = [Dict("a" => i) for i in 1:10] + julia> d = GeneralUtils.disintegrate_vectorDict(vecDict, 3) + julia> println(d[:data]) + Dict{Int64, Vector{Dict}} with 4 entries: + 1 => [Dict("a"=>1), Dict("a"=>2), Dict("a"=>3)] + 2 => [Dict("a"=>4), Dict("a"=>5), Dict("a"=>6)] + 3 => [Dict("a"=>7), Dict("a"=>8), Dict("a"=>9)] + 4 => [Dict("a"=>10)] + ``` + +# Signature +""" +function disintegrate_vectorDict(data::Vector{Dict{T1, T2}}, partsize::Integer + ) where {T1<:Any, T2<:Any} + parts = Dict{Int, Vector{Dict}}() + for (i, dict) in enumerate(data) + partkey = (i - 1) รท partsize + 1 + if !haskey(parts, partkey) + parts[partkey] = Vector{Dict}() + end + push!(parts[partkey], dict) + end + return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, data=parts) +end + + + + + + + + + + + + + + + + + + + + +