This commit is contained in:
2024-09-27 15:23:56 +07:00
parent f8a72427fe
commit 564ed3cb70
3 changed files with 651 additions and 283 deletions

View File

@@ -1,14 +1,16 @@
module communication
export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient
using ..util
# ---------------------------------------------- 100 --------------------------------------------- #
mutable struct mqttClientInstance
abstract type mqttClientInstance end
mutable struct mqttClientInstance_v1 <: mqttClientInstance
mqttBrokerAddress::String
mqttBrokerPort::Integer
subtopic::Vector{String}
@@ -23,7 +25,7 @@ mutable struct mqttClientInstance
lastTimeMqttConnCheck::DateTime
end
function mqttClientInstance(
function mqttClientInstance_v1(
mqttBrokerAddress::String,
mqttBrokerPort::Integer,
subtopic::Vector{String},
@@ -62,53 +64,170 @@ function mqttClientInstance(
end
mutable struct mqttClientInstance_v2 <: mqttClientInstance
mqttBrokerAddress::String
mqttBrokerPort::Integer
subtopic::Vector{String}
msgReceiveChannel
onMsgCallback::Function
qos::MQTTClient.QOS
client::MQTTClient.Client
connection::MQTTClient.Connection
keepalivetopic::String
keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::DateTime
end
""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
# Arguments
- `mqttBrokerAddress`
MQTT broker address. Can be URL or IP address
- `subtopic`
A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
- `msgReceiveChannel`
A storage the mqtt client used to store incoming message.
Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
- `keepaliveChannel`
A channel to store keepalive message
- `onMsgCallback`
A user-defined function to handle incoming message
# Keyword Arguments
- `mqttBrokerPort`
mqtt broker port
- `keepalivetopic`
A topic where keepalive message going
- `keepaliveCheckInterval`
A time interval to check whether the mqtt client still connect to mqtt broker
- `qos`
Quality of Service. Can be QOS_0, QOS_1, QOS_2
# Return
- `mqttInstance`
A new mqtt client
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
if occursin("topic_1", topic)
push!(mqttMsgReceiveTopic[:ch1], incomingMqttMsg)
elseif occursin("topic_2", topic)
push!(mqttMsgReceiveTopic[:ch2], incomingMqttMsg)
elseif occursin("keepalive", topic)
put!(keepaliveChannel, incomingMqttMsg)
else
println("undefined condition ", @__FILE__, " ", @__LINE__)
end
end
julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
"mqtt.yiem.cc",
mqttMsgReceiveTopic,
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
)
```
# Signature
"""
function mqttClientInstance_v2(
mqttBrokerAddress::String,
subtopic::Vector{String},
msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function
;
mqttBrokerPort::Integer=1883,
keepalivetopic::String= "/keepalive/$(uuid4())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
)
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
MQTTClient.connect(client, connection)
for i in subtopic
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
end
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
instance = mqttClientInstance_v2(
mqttBrokerAddress,
mqttBrokerPort,
subtopic,
msgReceiveChannel,
onMsgCallback,
qos,
client,
connection,
keepalivetopic,
keepaliveChannel,
keepaliveCheckInterval,
Dates.now(),
)
return instance
end
""" Generate msgMeta to be including in a message. So the message receiver know
what to do with the message.
Arguments\n
-----
sendTopic::String
# Arguments
- `sendTopic`
topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
Keyword Arguments\n
-----
msgPurpose::String
# Keyword Arguments
- `msgPurpose`
purpose of this message e.g. "updateStatus"
senderName::String
- `senderName`
sender name (String) e.g. "agent-wine-web-frontend"
senderId::String
- `senderId`
sender id e.g. string(uuid4())
receiverName::String
- `receiverName`
msg receiver name (String) e.g. "agent-backend"
receiverId::String
- `receiverId`
msg receiver id, nothing means everyone in the topic e.g. string(uuid4())
requestresponse::String
- `requestresponse`
this message is a "request" or "response"
getpost::String
- `getpost`
this message is a "get" or "post"
msgId::String
- `msgId`
message ID e.g. string(uuid4())
timestamp::String
- `timestamp`
message published timestamp e.g. string(Dates.now())
senderselfnote::Any
- `senderselfnote`
help sender manage incoming reply msg, could be "for text inference", "img generation"
replyTopic::String
- `replyTopic`
sender ask receiver to reply to this topic e.g. "/agent/frontend"
replyToMsgId::String
- `replyToMsgId`
sender is responding to this msg ID e.g. string(uuid4())
acknowledgestatus::String
- `acknowledgestatus`
e.g. "acknowledged"
mqttBrokerAddress::String
- `mqttBrokerAddress`
MQTT broker URI. e.g. "test.mosquitto.org"
mqttBrokerPort::Integer
- `mqttBrokerPort`
MQTT broker port
Return\n
-----
msgMeta::Dict{Symbol, Any}
- `msgFormatVersion`
Example\n
-----
- `multiMsgData`
# Return
- `msgMeta`
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils
@@ -118,36 +237,34 @@ end
senderName="keepaliveservice")
```
Signature\n
-----
# Signature
"""
function generate_msgMeta(
sendTopic::T1, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
;
msgPurpose::Union{T1, Nothing}= nothing, # purpose of this message e.g. "updateStatus"
senderName::Union{T1, Nothing}= nothing, # sender name (String) e.g. "agent-wine-web-frontend"
senderId::Union{T1, Nothing}= nothing, # sender id e.g. string(uuid4())
receiverName::Union{T1, Nothing}= nothing, # msg receiver name (String) e.g. "agent-backend"
receiverId::Union{T1, Nothing}= nothing, # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4())
requestresponse::Union{T1, Nothing}= nothing, # this message is a "request" or "response"
getpost::Union{T1, Nothing}= nothing, # this message is a "get" or "post"
msgId::Union{T1, Nothing}= string(uuid4()),
msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus"
senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend"
senderId::T1= "nothing", # sender id e.g. string(uuid4())
receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend"
receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4())
requestresponse::T1= "nothing", # this message is a "request" or "response"
getpost::T1= "nothing", # this message is a "get" or "post"
msgId::T1= string(uuid4()),
timestamp= string(Dates.now()), # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
senderselfnote::Any= nothing,
senderselfnote::Any= "nothing",
# sender ask receiver to reply to this topic
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
replyTopic::Union{T1, Nothing}= nothing,
replyToMsgId::Union{T1, Nothing}= nothing, # sender is responding to this msg ID
acknowledgestatus::Union{T1, Nothing}= nothing, # "acknowledged",
replyTopic::T1= "nothing",
replyToMsgId::T1= "nothing", # sender is responding to this msg ID
acknowledgestatus::T1= "nothing", # "acknowledged",
mqttBrokerAddress::T1= "test.mosquitto.org",
mqttBrokerPort::Integer= 1883,
msgFormatVersion::Union{T1, Nothing}= nothing,
msgFormatVersion::T1= "nothing",
multiMsgData::T1= "nothing",
)::Dict{Symbol, Any} where {T1<:AbstractString}
@@ -177,6 +294,7 @@ function generate_msgMeta(
:mqttBrokerPort=> mqttBrokerPort,
:msgFormatVersion=> msgFormatVersion,
:multiMsgData=> multiMsgData,
)
return msgMeta
@@ -203,76 +321,14 @@ end
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, MQTTClient
julia> mqttMsgReceiveChannel = Channel(8)
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
end
julia> mqttInstanceDict = Dict{Symbol, Any}(
:mqttServerInfo=> Dict(
:broker=> "test.mosquitto.org",
:port=> 1883,
),
:onMsgCallback=> onMsgCallback,
:client=> nothing,
:connection=> nothing,
:subtopic=> ["/servicetopic", "/keepalivetopic"],
:keepalivetopic=> "/keepalive",
:keepaliveChannel=> keepaliveChannel,
:lastTimeMqttConnCheck=> Dates.now(),
)
julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
julia> mqttConnStatus = GeneralUtils.isMqttConnectionAlive(mqttInstanceDict)
julia> println(mqttConnStatus)
julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
true
```
Signature\n
-----
"""
function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool
starttime = Dates.now()
isconnectionalive = false
# ditch old keepalive msg is any
while isready(mqttInstanceDict[:keepaliveChannel])
_ = take!(mqttInstanceDict[:keepaliveChannel])
end
msgMeta = generate_msgMeta(
mqttInstanceDict[:keepalivetopic],
msgPurpose= "keepalive",
)
keepaliveMsg = Dict(
:msgMeta => msgMeta,
:text=>"keepalive",
)
publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic],
JSON3.write(keepaliveMsg))
timediff = 0
while timediff < 5
timediff = timedifference(starttime, Dates.now(), "seconds")
if isready(mqttInstanceDict[:keepaliveChannel])
incomingMsg = take!(mqttInstanceDict[:keepaliveChannel])
if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# connection is alive
isconnectionalive = true
break
end
end
sleep(1)
end
return isconnectionalive
end
function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
starttime = Dates.now()
isconnectionalive = false
@@ -288,8 +344,10 @@ function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool
)
keepaliveMsg = Dict(
:msgMeta => msgMeta,
:text=>"keepalive",
:msgMeta=> msgMeta,
:payload=> Dict(
:text=>"keepalive",
)
)
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
@@ -310,6 +368,44 @@ function isMqttConnectionAlive(mqttInstance::mqttClientInstance)::Bool
end
return isconnectionalive
end
# function isMqttConnectionAlive(mqttInstanceDict::Dict{Symbol, Any})::Bool
# starttime = Dates.now()
# isconnectionalive = false
# # ditch old keepalive msg is any
# while isready(mqttInstanceDict[:keepaliveChannel])
# _ = take!(mqttInstanceDict[:keepaliveChannel])
# end
# msgMeta = generate_msgMeta(
# mqttInstanceDict[:keepalivetopic],
# msgPurpose= "keepalive",
# )
# keepaliveMsg = Dict(
# :msgMeta => msgMeta,
# :text=>"keepalive",
# )
# publish(mqttInstanceDict[:client], keepaliveMsg[:msgMeta][:sendTopic],
# JSON3.write(keepaliveMsg))
# timediff = 0
# while timediff < 5
# timediff = timedifference(starttime, Dates.now(), "seconds")
# if isready(mqttInstanceDict[:keepaliveChannel])
# incomingMsg = take!(mqttInstanceDict[:keepaliveChannel])
# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# # connection is alive
# isconnectionalive = true
# break
# end
# end
# sleep(1)
# end
# return isconnectionalive
# end
""" Check mqtt server connection, reconnect if disconnected.
@@ -331,71 +427,16 @@ end
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, MQTTClient
julia> mqttMsgReceiveChannel = Channel(8)
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
end
julia> mqttInstanceDict = Dict{Symbol, Any}(
:mqttServerInfo=> Dict(
:broker=> "test.mosquitto.org",
:port=> 1883,
),
:onMsgCallback=> onMsgCallback, # function
:client=> nothing,
:connection=> nothing,
:subtopic=> ["/servicetopic", "/sometopic"], # Vector{String}
:keepalivetopic=> "/keepalive",
:keepaliveChannel=> keepaliveChannel, # Channel{Dict}
:lastTimeMqttConnCheck=> Dates.now(),
)
julia> mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
julia> connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
julia> GeneralUtils.checkMqttConnection!(mqttInstanceDict, 5)
julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
```
Signature\n
-----
"""
function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool
isreconnect = false
# check if mqtt connection is still up
intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds")
if intervaldiff > interval
while true
mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict)
if mqttConnStatus == false
println("Attemping to reconnect")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstanceDict[:client], mqttInstanceDict[:connection] =
MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
mqttInstanceDict[:mqttServerInfo][:port])
connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
for topic in mqttInstanceDict[:subtopic]
subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1)
end
isreconnect = true
println("reconnected")
else
mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now()
break
end
end
end
return isreconnect
end
function checkMqttConnection!(mqttInstance::mqttClientInstance;
keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Bool
function checkMqttConnection!(mqttInstance::T;
keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance}
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
isreconnect = false
# check if mqtt connection is still up
intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
@@ -414,17 +455,48 @@ function checkMqttConnection!(mqttInstance::mqttClientInstance;
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
isreconnect = true
println("reconnected")
else
mqttInstance.lastTimeMqttConnCheck = Dates.now()
println("Connected")
return mqttConnStatus
break
end
end
else
return nothing # still within check interval
end
return isreconnect
end
# function checkMqttConnection!(mqttInstanceDict::Dict{Symbol, Any}, interval::Integer)::Bool
# isreconnect = false
# # check if mqtt connection is still up
# intervaldiff = timedifference(mqttInstanceDict[:lastTimeMqttConnCheck], Dates.now(), "seconds")
# if intervaldiff > interval
# while true
# mqttConnStatus = isMqttConnectionAlive(mqttInstanceDict)
# if mqttConnStatus == false
# println("Attemping to reconnect")
# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
# mqttInstanceDict[:client], mqttInstanceDict[:connection] =
# MakeConnection(mqttInstanceDict[:mqttServerInfo][:broker],
# mqttInstanceDict[:mqttServerInfo][:port])
# connect(mqttInstanceDict[:client], mqttInstanceDict[:connection])
# for topic in mqttInstanceDict[:subtopic]
# subscribe(mqttInstanceDict[:client], topic, mqttInstanceDict[:onMsgCallback], qos=QOS_1)
# end
# isreconnect = true
# println("reconnected")
# else
# mqttInstanceDict[:lastTimeMqttConnCheck] = Dates.now()
# break
# end
# end
# end
# return isreconnect
# end
""" Send a message to specified MQTT topic then wait for reply.
@@ -435,7 +507,7 @@ end
# Return
- `result::NamedTuple`
( isDone= true, error= nothing )
( success= true, error= nothing )
# Example
```jldoctest
@@ -452,39 +524,38 @@ end
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> isDone, error = GeneralUtils.sendMqttMsg(outgoingMsg)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# Signature
"""
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
try
# Instantiate a client and connection.
client, connection = MakeConnection(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
outgoingMsg[:msgMeta][:mqttBrokerPort])
connect(client, connection)
publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# disconnect mqtt
disconnect(client)
return (isDone=true, error=nothing)
catch e
return (isDone=false, error=e)
end
end
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
)::NamedTuple where {T<:Any}
try
publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# disconnect mqtt
disconnect(client)
return (isDone=true, error=nothing)
return (success=true, error=nothing)
catch e
return (isDone=false, error=e)
return (success=false, error=e)
end
end
# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
# try
# # Instantiate a client and connection.
# client, connection = MakeConnection(
# outgoingMsg[:msgMeta][:mqttBrokerAddress],
# outgoingMsg[:msgMeta][:mqttBrokerPort])
# connect(client, connection)
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# # disconnect mqtt
# disconnect(client)
# return (success=true, error=nothing)
# catch e
# return (success=false, error=e)
# end
# end
""" Send a message to specified MQTT topic then wait for reply.
@@ -500,7 +571,7 @@ end
# Return
- `result::NamedTuple`
(
isDone= true, # idicates whether sending MQTT message successful
success= true, # idicates whether sending MQTT message successful
error= nothing # error message
response= somemessage # response message
)
@@ -508,7 +579,7 @@ end
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, UUIDs
julia> using GeneralUtils, Dates, UUIDs, JSON3
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
@@ -520,51 +591,60 @@ julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(:hello=> "World"),
)
julia> isDone, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
```
# Signature
"""
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
mqttMsgReceiveChannel = Channel(4)
# ask message receiver to send a message back to specified topic
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
mqttMsgReceiveTopic = ["/receivetopic"]
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel, onMsg)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
# Instantiate a client and connection.
client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
outgoingMsg[:msgMeta][:mqttBrokerPort])
connect(client, connection)
subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
mqttInstance = mqttClientInstance_v2(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
mqttMsgReceiveTopic,
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
)
return sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
end
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1
)::NamedTuple where {T<:Any}
timepass = nothing
attempts = 1
while attempts <= maxattempt
publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
sendMqttMsg(mqttInstance, outgoingMsg)
starttime = Dates.now()
while true
timepass = timedifference(starttime, Dates.now(), "seconds")
if timepass <= timeout
if isready(mqttMsgReceiveChannel)
incomingMsg = take!(mqttMsgReceiveChannel)
if isready(mqttInstance.msgReceiveChannel[receivechannel])
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
# disconnect mqtt
unsubscribe(client, mqttMsgReceiveTopic)
disconnect(client)
return (isDone=true, error=nothing, response=incomingMsg[:payload])
if incomingMsg[:msgMeta][:msgPurpose] == "NACK"
println("NACK")
break # resend the msg
else
return (success=true, error=nothing, response=incomingMsg[:payload])
end
end
end
else
@@ -576,17 +656,236 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
attempts += 1
end
# disconnect mqtt
unsubscribe(client, mqttMsgReceiveTopic)
disconnect(client)
return (isDone=false, error="no response, timeout $timepass/$timeout, $attempts publish attempted",
return (success=false,
error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
response=nothing)
end
# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
# timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
# mqttMsgReceiveChannel = Channel(4)
# # ask message receiver to send a message back to specified topic
# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# # Define the callback for receiving messages.
# function onMsgCallback(topic, payload)
# jobj = JSON3.read(String(payload))
# onMsg = copy(jobj)
# put!(mqttMsgReceiveChannel, onMsg)
# end
# # Instantiate a client and connection.
# client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
# outgoingMsg[:msgMeta][:mqttBrokerPort])
# connect(client, connection)
# subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
# timepass = nothing
# attempts = 1
# while attempts <= maxattempt
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# starttime = Dates.now()
# while true
# timepass = timedifference(starttime, Dates.now(), "seconds")
# if timepass <= timeout
# if isready(mqttMsgReceiveChannel)
# incomingMsg = take!(mqttMsgReceiveChannel)
# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
# if msgPurpose == "NACK"
# break # resend the msg
# else
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=true, error=nothing, response=incomingMsg[:payload])
# end
# end
# end
# else
# break
# end
# sleep(1)
# end
# println("attempts $attempts ", @__FILE__, " ", @__LINE__)
# attempts += 1
# end
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=false,
# error="no response, timeout $timepass/$timeout, $attempts publish attempted",
# response=nothing)
# end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( success= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [TESTING] implement crude version of this function
- [] transfer multiple msg with the same mqtt connection. The transfer loop should be
implemented by passing mqttInstance into sendReceiveMqttMsg() so that
1-connection is used to send all data
# Signature
"""
function transferDataOverMQTT_sender(disintegrateF::Function, data, partsize, msgMeta)
# use disintegrateF(workDict, data) to disintegrate data into small parts
d = disintegrateF(data, partsize)
# compute hash value for each piece of data
hashes = [hash(d[i]) for i in 1:d[:totalparts]]
for i in 1:totalpart
push!(hashes, hash(d[i]))
end
mqttMsgReceiveTopic = "/GeneralUtils_transferDataOverMQTT/$(msgMeta[:senderId])"
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
mqttInstance = GeneralUtils.mqttClientInstance_v2(
msgMeta[:mqttBrokerAddress],
msgMeta[:mqttBrokerPort],
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
)
# 1st msg to send metadata
msgMeta[:replyTopic] = mqttMsgReceiveTopic # receiver() to send a message back to this topic
msgMeta[:multiMsgData] = "start"
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(
:datatype=> d[:datatype],
:totalparts=> d[:totalparts],
:partsize=> d[:partsize],
:hashes=> hashes
)
)
pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
# while loop for sending the data
msgMeta[:multiMsgData] = "transfering"
partnumber = 1
while partnumber <= d[:totalparts]
# publish
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(
:partnumber=> partnumber,
:data=> d[:data][partnumber],
)
)
pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
partnumber += 1
sleep(1)
end
# final msg to send end msg
msgMeta[:multiMsgData] = "end"
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict()
)
pubresult = sendReceiveMqttMsg(outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
return (success=true, error=nothing, response=nothing)
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( success= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [WORKING]
# Signature
"""
function transferDataOverMQTT_receiver()
end

View File

@@ -6,7 +6,7 @@ export noNegative!, randomWithProb, randomChoiceWithProb, findIndex, limitvalue,
matMul_3Dto4D_batchwise, isNotEqual, linearToCartesian, vectorMax,
multiply_last, multiplyRandomElements, replaceElements, replaceElements!, isBetween,
isLess, allTrue, getStringBetweenCharacters, JSON3read_stringKey, mkDictPath!,
getDictPath, dataframeToCSV, dfToJSONRows
getDictPath
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, CSV
using ..util, ..communication
@@ -1149,63 +1149,6 @@ function getDictPath(dict::Dict, keys::Vector)
end
""" Convert a dataframe into CSV.
# Arguments
- `df::DataFrame`
A connection object to Postgres database
# Return
- `result::String`
# Example
```jldoctest
julia> using DataFrames, GeneralUtils
julia> df = DataFrame(A=1:3, B=5:7, fixed=1)
julia> result = GeneralUtils.dataframeToCSV(df)
```
# Signature
"""
function dataframeToCSV(df::DataFrame)
# Create an IOBuffer to capture the output
io = IOBuffer()
CSV.write(io, df)
dfStr = String(take!(io))
return dfStr
end
""" Convert a DataFrame into a list of JSON rows.
# Arguments
- `df::DataFrame`
The input DataFrame to be converted.
# Return
- `rows::Vector{Dict{String, Any}}`
A vector of dictionaries, where each dictionary represents a row in JSON format.
# Example
```jldoctest
julia> using DataFrame, JSON3
julia> df = DataFrame(A = [1, 2, 3], B = ["apple", "banana", "cherry"])
julia> json_rows = dfToJSONRows(df)
```
# Signature
"""
function dfToJSONRows(df::DataFrame)
rows = []
for row in eachrow(df)
json_row = Dict{String, Any}()
for col in names(df)
json_row[col] = row[col]
end
push!(rows, json_row)
end
return rows
end

View File

@@ -1,9 +1,10 @@
module util
export timedifference, showstracktrace, findHighestIndexKey, uuid4snakecase, replaceDictKeys,
findMatchingDictKey, textToDict, randstring, randstrings, timeout
findMatchingDictKey, textToDict, randstring, randstrings, timeout,
dataframeToCSV, dfToVectorDict, disintegrate_vectorDict
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames
# ---------------------------------------------- 100 --------------------------------------------- #
@@ -412,6 +413,131 @@ function timeout(f::Function, timeoutwindow::Integer; fargs=nothing, timeoutmsg=
timeoutmsg
end
end
""" Convert a dataframe into CSV.
# Arguments
- `df::DataFrame`
A connection object to Postgres database
# Return
- `result::String`
# Example
```jldoctest
julia> using DataFrames, GeneralUtils
julia> df = DataFrame(A=1:3, B=5:7, fixed=1)
julia> result = GeneralUtils.dataframeToCSV(df)
```
# Signature
"""
function dataframeToCSV(df::DataFrame)
# Create an IOBuffer to capture the output
io = IOBuffer()
CSV.write(io, df)
dfStr = String(take!(io))
return dfStr
end
""" Convert a DataFrame into a list of Dict rows.
# Arguments
- `df::DataFrame`
The input DataFrame to be converted.
# Return
- `rows::Vector{Dict{String, Any}}`
A vector of dictionaries, where each dictionary represents a row in a dataframe.
# Example
```jldoctest
julia> using DataFrames, JSON3, GeneralUtils
julia> df = DataFrame(A = [1, 2, 3], B = ["apple", "banana", "cherry"])
julia> vectorDict = GeneralUtils.dfToVectorDict(df)
[Dict{String, Any}("B" => "apple", "A" => 1),
Dict{String, Any}("B" => "banana", "A" => 2)
Dict{String, Any}("B" => "cherry", "A" => 3)]
```
# Signature
"""
function dfToVectorDict(df::DataFrame)
vec = []
for row in eachrow(df)
d = Dict{String, Any}()
for col in names(df)
d[col] = row[col]
end
push!(vec, d)
end
return vec
end
""" Turn a large vector of dictionaries into smaller one
# Arguments
- `data`
data to be partioning
- `partsize`
how many dicts per part
# Return
- `parts`
a dictionay of parts
# Example
```jldoctest
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> vecDict = [Dict("a" => i) for i in 1:10]
julia> d = GeneralUtils.disintegrate_vectorDict(vecDict, 3)
julia> println(d[:data])
Dict{Int64, Vector{Dict}} with 4 entries:
1 => [Dict("a"=>1), Dict("a"=>2), Dict("a"=>3)]
2 => [Dict("a"=>4), Dict("a"=>5), Dict("a"=>6)]
3 => [Dict("a"=>7), Dict("a"=>8), Dict("a"=>9)]
4 => [Dict("a"=>10)]
```
# Signature
"""
function disintegrate_vectorDict(data::Vector{Dict{T1, T2}}, partsize::Integer
) where {T1<:Any, T2<:Any}
parts = Dict{Int, Vector{Dict}}()
for (i, dict) in enumerate(data)
partkey = (i - 1) ÷ partsize + 1
if !haskey(parts, partkey)
parts[partkey] = Vector{Dict}()
end
push!(parts[partkey], dict)
end
return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, data=parts)
end