939 lines
29 KiB
Julia
939 lines
29 KiB
Julia
module communication
|
|
|
|
export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
|
|
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
|
|
dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
|
|
|
|
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames,
|
|
SHA, PrettyPrinting
|
|
using ..util
|
|
|
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
|
|
|
abstract type mqttClientInstance end
|
|
|
|
mutable struct mqttClientInstance_v1 <: mqttClientInstance
|
|
mqttBrokerAddress::String
|
|
mqttBrokerPort::Integer
|
|
subtopic::Vector{String}
|
|
receiveDataChannel::Channel
|
|
onMsgCallback::Function
|
|
qos::MQTTClient.QOS
|
|
client::MQTTClient.Client
|
|
connection::MQTTClient.Connection
|
|
keepalivetopic::String
|
|
keepaliveChannel::Channel
|
|
keepaliveCheckInterval::Integer # second
|
|
lastTimeMqttConnCheck::Union{DateTime, Nothing}
|
|
end
|
|
|
|
function mqttClientInstance_v1(
|
|
mqttBrokerAddress::String,
|
|
mqttBrokerPort::Integer,
|
|
subtopic::Vector{String},
|
|
receiveDataChannel::Channel,
|
|
keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
|
onMsgCallback::Function
|
|
;
|
|
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
|
|
keepaliveCheckInterval::Integer=30,
|
|
qos::MQTTClient.QOS=QOS_1,
|
|
)
|
|
|
|
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
|
|
MQTTClient.connect(client, connection)
|
|
for i in subtopic
|
|
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
|
end
|
|
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
|
|
|
|
instance = mqttClientInstance(
|
|
mqttBrokerAddress,
|
|
mqttBrokerPort,
|
|
subtopic,
|
|
receiveDataChannel,
|
|
onMsgCallback,
|
|
qos,
|
|
client,
|
|
connection,
|
|
keepalivetopic,
|
|
keepaliveChannel,
|
|
keepaliveCheckInterval,
|
|
Dates.now(),
|
|
)
|
|
|
|
return instance
|
|
end
|
|
|
|
|
|
mutable struct mqttClientInstance_v2 <: mqttClientInstance
|
|
mqttBrokerAddress::String
|
|
mqttBrokerPort::Integer
|
|
subtopic::Vector{String}
|
|
msgReceiveChannel
|
|
onMsgCallback::Function
|
|
qos::MQTTClient.QOS
|
|
client::MQTTClient.Client
|
|
connection::MQTTClient.Connection
|
|
keepalivetopic::String
|
|
keepaliveChannel::Channel
|
|
keepaliveCheckInterval::Integer # second
|
|
lastTimeMqttConnCheck::Union{DateTime, Nothing}
|
|
multiMsg::String # "single", "metadata", "datapart"
|
|
latestMsg
|
|
end
|
|
""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
|
|
msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
|
|
This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
|
|
# Arguments
|
|
- `mqttBrokerAddress`
|
|
MQTT broker address. Can be URL or IP address
|
|
- `subtopic`
|
|
A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
|
|
- `msgReceiveChannel`
|
|
A storage the mqtt client used to store incoming message.
|
|
Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
|
|
- `keepaliveChannel`
|
|
A channel to store keepalive message
|
|
- `onMsgCallback`
|
|
A user-defined function to handle incoming message
|
|
|
|
# Keyword Arguments
|
|
- `mqttBrokerPort`
|
|
mqtt broker port
|
|
- `keepalivetopic`
|
|
A topic where keepalive message going
|
|
- `keepaliveCheckInterval`
|
|
A time interval to check whether the mqtt client still connect to mqtt broker
|
|
- `qos`
|
|
Quality of Service. Can be QOS_0, QOS_1, QOS_2
|
|
|
|
# Return
|
|
- `mqttInstance`
|
|
A new mqtt client
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
|
|
julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),)
|
|
julia> keepaliveChannel = Channel(8)
|
|
julia> function onMsgCallback(topic, payload)
|
|
jobj = JSON3.read(String(payload))
|
|
incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
|
|
|
|
if occursin("topic_1", topic)
|
|
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
|
elseif occursin("topic_2", topic)
|
|
put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
|
|
elseif occursin("keepalive", topic)
|
|
put!(keepaliveChannel, incomingMqttMsg)
|
|
else
|
|
println("undefined condition ", @__FILE__, " ", @__LINE__)
|
|
end
|
|
end
|
|
julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
|
|
"mqtt.yiem.cc",
|
|
mqttMsgReceiveTopic,
|
|
mqttMsgReceiveChannel,
|
|
keepaliveChannel,
|
|
onMsgCallback
|
|
)
|
|
```
|
|
|
|
# Signature
|
|
"""
|
|
function mqttClientInstance_v2(
|
|
mqttBrokerAddress::String,
|
|
subtopic::Vector{String},
|
|
msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
|
|
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
|
onMsgCallback::Function
|
|
;
|
|
mqttBrokerPort::Integer=1883,
|
|
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
|
|
keepaliveCheckInterval::Integer=30,
|
|
qos::MQTTClient.QOS=QOS_1,
|
|
multiMsg::String="single",
|
|
)
|
|
|
|
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
|
|
MQTTClient.connect(client, connection)
|
|
for i in subtopic
|
|
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
|
end
|
|
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
|
|
|
|
instance = mqttClientInstance_v2(
|
|
mqttBrokerAddress,
|
|
mqttBrokerPort,
|
|
subtopic,
|
|
msgReceiveChannel,
|
|
onMsgCallback,
|
|
qos,
|
|
client,
|
|
connection,
|
|
keepalivetopic,
|
|
keepaliveChannel,
|
|
keepaliveCheckInterval,
|
|
nothing,
|
|
multiMsg,
|
|
nothing, # store latest message to prevent doing the same massage twice
|
|
)
|
|
|
|
return instance
|
|
end
|
|
|
|
|
|
|
|
""" Generate msgMeta to be including in a message. So the message receiver know
|
|
what to do with the message.
|
|
|
|
# Arguments
|
|
- `sendTopic`
|
|
topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
|
|
|
|
# Keyword Arguments
|
|
- `msgPurpose`
|
|
purpose of this message e.g. "updateStatus"
|
|
- `senderName`
|
|
sender name (String) e.g. "agent-wine-web-frontend"
|
|
- `senderId`
|
|
sender id e.g. uuid4snakecase()
|
|
- `receiverName`
|
|
msg receiver name (String) e.g. "agent-backend"
|
|
- `receiverId`
|
|
msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
|
|
- `requestresponse`
|
|
this message is a "request" or "response"
|
|
- `getpost`
|
|
this message is a "get" or "post"
|
|
- `msgId`
|
|
message ID e.g. uuid4snakecase()
|
|
- `timestamp`
|
|
message published timestamp e.g. string(Dates.now())
|
|
- `senderselfnote`
|
|
help sender manage incoming reply msg, could be "for text inference", "img generation"
|
|
- `replyTopic`
|
|
sender ask receiver to reply to this topic e.g. "/agent/frontend"
|
|
- `replyToMsgId`
|
|
sender is responding to this msg ID e.g. uuid4snakecase()
|
|
- `acknowledgestatus`
|
|
e.g. "ACK", "NACK"
|
|
- `mqttBrokerAddress`
|
|
MQTT broker URI. e.g. "test.mosquitto.org"
|
|
- `mqttBrokerPort`
|
|
MQTT broker port
|
|
|
|
- `msgFormatVersion`
|
|
|
|
# Return
|
|
- `msgMeta`
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils
|
|
julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
"/agent/frontend/wine/chat/api/v1/txt/receive",
|
|
msgPurpose="keepalive",
|
|
senderName="keepaliveservice")
|
|
```
|
|
|
|
# Signature
|
|
"""
|
|
function generate_msgMeta(
|
|
sendTopic::T1, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
|
|
;
|
|
msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus"
|
|
senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend"
|
|
senderId::T1= "nothing", # sender id e.g. uuid4snakecase()
|
|
receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend"
|
|
receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. uuid4snakecase()
|
|
requestresponse::T1= "nothing", # this message is a "request" or "response"
|
|
getpost::T1= "nothing", # this message is a "get" or "post"
|
|
msgId::T1= uuid4snakecase(),
|
|
timestamp= string(Dates.now()), # message published timestamp
|
|
|
|
# help sender manage incoming reply msg, could be "for text inference", "img generation"
|
|
senderselfnote::Any= "nothing",
|
|
|
|
# sender ask receiver to reply to this topic
|
|
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
|
|
replyTopic::T1= "nothing",
|
|
replyToMsgId::T1= "nothing", # sender is responding to this msg ID
|
|
acknowledgestatus::T1= "nothing", # "ACK", "NACK"
|
|
|
|
mqttBrokerAddress::T1= "test.mosquitto.org",
|
|
mqttBrokerPort::Integer= 1883,
|
|
msgFormatVersion::T1= "nothing",
|
|
|
|
)::Dict{Symbol, Any} where {T1<:AbstractString}
|
|
|
|
msgMeta::Dict=Dict(
|
|
:sendTopic=> sendTopic, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
|
|
:msgPurpose=> msgPurpose, # purpose of this message e.g. "updateStatus"
|
|
:senderName=> senderName, # sender name (String) e.g. "agent-wine-web-frontend"
|
|
:senderId=> senderId, # sender id e.g. uuid4snakecase()
|
|
:receiverName=> receiverName, # msg receiver name (String) e.g. "agent-backend"
|
|
:receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
|
|
:requestResponse=> requestresponse, # this message is a "request" or "response"
|
|
:getPost=> getpost, # this message is a "get" or "post"
|
|
:msgId=> msgId,
|
|
:timeStamp=> timestamp, # message published timestamp
|
|
|
|
# help sender manage incoming reply msg, could be "for text inference", "img generation"
|
|
:senderSelfnote=>senderselfnote,
|
|
|
|
# sender ask receiver to reply to this topic
|
|
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
|
|
:replyTopic=> replyTopic,
|
|
|
|
:replyToMsgId=> replyToMsgId,
|
|
:acknowledgestatus=> acknowledgestatus,
|
|
|
|
:mqttBrokerAddress=> mqttBrokerAddress,
|
|
:mqttBrokerPort=> mqttBrokerPort,
|
|
|
|
:msgFormatVersion=> msgFormatVersion,
|
|
)
|
|
|
|
return msgMeta
|
|
end
|
|
|
|
|
|
|
|
""" Check mqtt server connection.
|
|
|
|
Arguments\n
|
|
-----
|
|
mqttInstanceDict::Dict{Symbol, Any}
|
|
a dictionary contain mqtt instance. 1 per mqtt client.
|
|
interval::Integer
|
|
time interval to check mqtt server in seconds
|
|
|
|
Return\n
|
|
-----
|
|
isconnectionalive::Bool
|
|
true if mqtt connection is alive
|
|
|
|
Example\n
|
|
-----
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3, MQTTClient
|
|
julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
|
|
true
|
|
```
|
|
|
|
Signature\n
|
|
-----
|
|
"""
|
|
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
|
|
|
|
starttime = Dates.now()
|
|
isconnectionalive = false
|
|
|
|
# ditch old keepalive msg is any
|
|
while isready(mqttInstance.keepaliveChannel)
|
|
_ = take!(mqttInstance.keepaliveChannel)
|
|
end
|
|
|
|
msgMeta = generate_msgMeta(
|
|
mqttInstance.keepalivetopic,
|
|
msgPurpose= "keepalive",
|
|
)
|
|
|
|
keepaliveMsg = Dict(
|
|
:msgMeta=> msgMeta,
|
|
:payload=> Dict(
|
|
:text=>"keepalive",
|
|
)
|
|
)
|
|
|
|
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
|
|
JSON3.write(keepaliveMsg))
|
|
|
|
timediff = 0
|
|
while timediff < 5
|
|
timediff = timedifference(starttime, Dates.now(), "seconds")
|
|
if isready(mqttInstance.keepaliveChannel)
|
|
incomingMsg = take!(mqttInstance.keepaliveChannel)
|
|
if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
|
|
# connection is alive
|
|
isconnectionalive = true
|
|
break
|
|
end
|
|
end
|
|
sleep(1)
|
|
end
|
|
return isconnectionalive
|
|
end
|
|
|
|
|
|
""" Check mqtt server connection, reconnect if disconnected.
|
|
|
|
Arguments\n
|
|
-----
|
|
mqttInstanceDict::Dict{Symbol, Any}
|
|
a dictionary contain mqtt instance. 1 per mqtt client.
|
|
interval::Integer
|
|
time interval to check mqtt server in seconds
|
|
|
|
Return\n
|
|
-----
|
|
isreconnect::Bool
|
|
true if mqtt connection is reconnected
|
|
|
|
Example\n
|
|
-----
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3
|
|
julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
|
|
```
|
|
|
|
Signature\n
|
|
-----
|
|
"""
|
|
function checkMqttConnection!(mqttInstance::T;
|
|
keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance}
|
|
|
|
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
|
|
|
# check if mqtt connection is still up
|
|
intervaldiff =
|
|
if mqttInstance.lastTimeMqttConnCheck !== nothing
|
|
timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
|
|
else
|
|
Inf
|
|
end
|
|
|
|
isreconnect = false # this value is true if connection is disconnected
|
|
if intervaldiff > interval
|
|
while true
|
|
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
|
if mqttConnStatus == false
|
|
isreconnect = true
|
|
println("mqtt connection disconnected, attemping to reconnect $(Dates.now())")
|
|
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
|
|
mqttInstance.client, mqttInstance.connection =
|
|
MakeConnection(mqttInstance.mqttBrokerAddress,
|
|
mqttInstance.mqttBrokerPort)
|
|
connect(mqttInstance.client, mqttInstance.connection)
|
|
for topic in mqttInstance.subtopic
|
|
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
|
end
|
|
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
|
sleep(1) # wait before checking connection again
|
|
else
|
|
mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
|
if isreconnect
|
|
println("connected to mqtt broker")
|
|
end
|
|
return isreconnect
|
|
end
|
|
end
|
|
else
|
|
return nothing # still within check interval, no update on connection status yet
|
|
end
|
|
end
|
|
|
|
|
|
""" Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# Arguments
|
|
- `outgoingMsg::Dict`
|
|
an outgoing message
|
|
|
|
# Return
|
|
- `result::NamedTuple`
|
|
( success= true, error= nothing )
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
"/testtopic",
|
|
senderName= "somename",
|
|
senderId= uuid4snakecase(),
|
|
mqttBrokerAddress= "test.mosquitto.org",
|
|
mqttBrokerPort= 1883,
|
|
)
|
|
julia> outgoingMsg = Dict(
|
|
:msgMeta=> msgMeta,
|
|
:payload=> Dict("hello"=> "World"),
|
|
)
|
|
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
```
|
|
|
|
# Signature
|
|
"""
|
|
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
|
|
|
|
# sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic
|
|
mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
|
|
|
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
|
keepaliveChannel = Channel(8)
|
|
|
|
# Define the callback for receiving messages.
|
|
function onMsgCallback(topic, payload)
|
|
jobj = JSON3.read(String(payload))
|
|
onMsg = copy(jobj)
|
|
put!(mqttMsgReceiveChannel[:ch1], onMsg)
|
|
end
|
|
|
|
mqttInstance = mqttClientInstance_v2(
|
|
outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
|
[mqttMsgReceiveTopic],
|
|
mqttMsgReceiveChannel,
|
|
keepaliveChannel,
|
|
onMsgCallback;
|
|
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
|
)
|
|
|
|
response = sendMqttMsg(mqttInstance, outgoingMsg)
|
|
try disconnect(mqttInstance.client) catch end
|
|
|
|
return response
|
|
end
|
|
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
|
|
)::NamedTuple where {T<:Any}
|
|
try
|
|
publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
|
|
return (success=true, error=nothing)
|
|
catch e
|
|
return (success=false, error=e)
|
|
end
|
|
end
|
|
|
|
|
|
""" Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# Arguments
|
|
- `outgoingMsg::Dict{Symbol, T}`
|
|
an outgoing message
|
|
|
|
# Keyword Arguments
|
|
- `timeout::Integer`
|
|
time to wait for a response before error
|
|
|
|
# Return
|
|
- `result::NamedTuple`
|
|
(
|
|
success= true, # idicates whether sending MQTT message successful
|
|
error= nothing # error message
|
|
response= somemessage # response message
|
|
)
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, UUIDs, JSON3
|
|
julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
"/testtopic",
|
|
senderName= "somename",
|
|
senderId= uuid4snakecase(),
|
|
mqttBrokerAddress= "mqtt.yiem.cc",
|
|
mqttBrokerPort= 1883,
|
|
)
|
|
julia> outgoingMsg = Dict(
|
|
:msgMeta=> msgMeta,
|
|
:payload=> Dict(:hello=> "World"),
|
|
)
|
|
julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
|
|
```
|
|
|
|
# TODO
|
|
- [] update docs
|
|
|
|
# Signature
|
|
"""
|
|
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
|
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
|
|
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
|
mqttMsgReceiveChannel = (ch1=Channel(8),)
|
|
keepaliveChannel = Channel(8)
|
|
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
|
|
|
# Define the callback for receiving messages.
|
|
function onMsgCallback(topic, payload)
|
|
jobj = JSON3.read(String(payload))
|
|
onMsg = copy(jobj)
|
|
put!(mqttMsgReceiveChannel[:ch1], onMsg)
|
|
end
|
|
|
|
mqttInstance = mqttClientInstance_v2(
|
|
outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
|
[mqttMsgReceiveTopic],
|
|
mqttMsgReceiveChannel,
|
|
keepaliveChannel,
|
|
onMsgCallback;
|
|
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
|
)
|
|
|
|
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
|
|
try disconnect(mqttInstance.client) catch end
|
|
|
|
return response
|
|
end
|
|
|
|
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
|
|
outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1
|
|
)::NamedTuple where {T<:Any}
|
|
|
|
timepass = nothing
|
|
attempts = 0
|
|
while attempts <= maxattempt
|
|
attempts += 1
|
|
if attempts > 1
|
|
println("\nsendReceiveMqttMsg() attempts $attempts ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
|
|
pprintln(outgoingMsg)
|
|
println("--------------\n")
|
|
end
|
|
|
|
sendMqttMsg(mqttInstance, outgoingMsg)
|
|
|
|
starttime = Dates.now()
|
|
while true
|
|
timepass = timedifference(starttime, Dates.now(), "seconds")
|
|
|
|
if timepass <= timeout
|
|
if isready(mqttInstance.msgReceiveChannel[receivechannel])
|
|
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
|
|
|
|
if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
|
|
if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
|
|
println("NACK")
|
|
break # resend the msg
|
|
else
|
|
return (success=true, error=nothing, response=incomingMsg[:payload])
|
|
end
|
|
end
|
|
end
|
|
else
|
|
break
|
|
end
|
|
sleep(1)
|
|
end
|
|
end
|
|
|
|
return (success=false,
|
|
error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
|
|
response=nothing)
|
|
end
|
|
|
|
|
|
# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
|
# timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
|
|
|
|
# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
|
# mqttMsgReceiveChannel = Channel(4)
|
|
|
|
# # ask message receiver to send a message back to specified topic
|
|
# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
|
|
|
# # Define the callback for receiving messages.
|
|
# function onMsgCallback(topic, payload)
|
|
# jobj = JSON3.read(String(payload))
|
|
# onMsg = copy(jobj)
|
|
# put!(mqttMsgReceiveChannel, onMsg)
|
|
# end
|
|
|
|
# # Instantiate a client and connection.
|
|
# client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
|
# outgoingMsg[:msgMeta][:mqttBrokerPort])
|
|
# connect(client, connection)
|
|
# subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
|
|
|
|
# timepass = nothing
|
|
# attempts = 1
|
|
# while attempts <= maxattempt
|
|
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
|
|
|
|
# starttime = Dates.now()
|
|
# while true
|
|
# timepass = timedifference(starttime, Dates.now(), "seconds")
|
|
|
|
# if timepass <= timeout
|
|
# if isready(mqttMsgReceiveChannel)
|
|
# incomingMsg = take!(mqttMsgReceiveChannel)
|
|
|
|
# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
|
|
# if msgPurpose == "NACK"
|
|
# break # resend the msg
|
|
# else
|
|
# # disconnect mqtt
|
|
# unsubscribe(client, mqttMsgReceiveTopic)
|
|
# disconnect(client)
|
|
# return (success=true, error=nothing, response=incomingMsg[:payload])
|
|
# end
|
|
# end
|
|
# end
|
|
# else
|
|
# break
|
|
# end
|
|
# sleep(1)
|
|
# end
|
|
# println("attempts $attempts ", @__FILE__, " ", @__LINE__)
|
|
# attempts += 1
|
|
# end
|
|
|
|
# # disconnect mqtt
|
|
# unsubscribe(client, mqttMsgReceiveTopic)
|
|
# disconnect(client)
|
|
# return (success=false,
|
|
# error="no response, timeout $timepass/$timeout, $attempts publish attempted",
|
|
# response=nothing)
|
|
# end
|
|
|
|
|
|
|
|
""" Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# Arguments
|
|
- `outgoingMsg::Dict`
|
|
an outgoing message
|
|
|
|
# Return
|
|
- `result::NamedTuple`
|
|
( success= true, error= nothing )
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
"/testtopic",
|
|
senderName= "somename",
|
|
senderId= uuid4snakecase(),
|
|
mqttBrokerAddress= "test.mosquitto.org",
|
|
mqttBrokerPort= 1883,
|
|
)
|
|
julia> outgoingMsg = Dict(
|
|
:msgMeta=> msgMeta,
|
|
:payload=> Dict("hello"=> "World"),
|
|
)
|
|
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
```
|
|
# TODO
|
|
- [x] implement crude version of this function
|
|
- [] update docs
|
|
- [] transfer multiple msg with the same mqtt connection. The transfer loop should be
|
|
implemented by passing mqttInstance into sendReceiveMqttMsg() so that
|
|
1-connection is used to send all data
|
|
- [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically)
|
|
|
|
# Signature
|
|
"""
|
|
function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
|
|
dataTransferSessionID = nothing
|
|
|
|
if !haskey(incomingMsg[:payload], :dataTransferSessionID)
|
|
""" Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
|
|
|
|
Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
|
|
:msgMeta => Dict(:msgPurpose => nothing,
|
|
:receiverId => nothing,
|
|
:getpost => nothing,
|
|
:msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
|
|
:requestresponse => nothing,
|
|
:msgFormatVersion => nothing,
|
|
:timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
|
|
:replyToMsgId => nothing,
|
|
:acknowledgestatus => nothing,
|
|
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
|
|
:mqttBrokerPort => nothing,
|
|
:receiverName => "agent_wine_backend",
|
|
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
|
|
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
|
|
:senderName => "agent_wine_frontend",
|
|
:senderselfnote => nothing,
|
|
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
|
|
"""
|
|
|
|
if data !== nothing # if the user provide data, store it in a new session workspace
|
|
dataTransferSessionID = uuid4snakecase()
|
|
haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
|
|
workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
|
|
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
s[:dataTransferSessionID] = dataTransferSessionID
|
|
dataDict = Dict(pairs(data))
|
|
merge!(s, dataDict)
|
|
else
|
|
error("No data provided")
|
|
end
|
|
|
|
# compute hash value for each piece of data
|
|
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
|
|
|
|
# send metadata without data to receiver()
|
|
datainfo = Dict()
|
|
for (k, v) in s
|
|
if k ∉ [:dataparts]
|
|
datainfo[k] = v
|
|
end
|
|
end
|
|
|
|
d = Dict(
|
|
:result=> "dataTransferOverMQTT",
|
|
:dataTransferSessionID=> dataTransferSessionID,
|
|
Symbol(dataTransferSessionID)=> datainfo
|
|
)
|
|
|
|
""" Sending metadata to the requester
|
|
Dict(:result => "dataTransferOverMQTT",
|
|
:b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
|
|
:hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
|
|
"b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
|
|
"5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
|
|
:datatype => "vector{Dict}",
|
|
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
|
|
:totalparts => 46),
|
|
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
|
|
"""
|
|
|
|
return d
|
|
|
|
else
|
|
|
|
""" Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
|
|
Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
|
|
:msgMeta => Dict(:msgPurpose => nothing,
|
|
:receiverId => nothing,
|
|
:getpost => nothing,
|
|
:msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
|
|
:requestresponse => nothing,
|
|
:msgFormatVersion => nothing,
|
|
:timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
|
|
:replyToMsgId => nothing,
|
|
:acknowledgestatus => nothing,
|
|
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
|
|
:mqttBrokerPort => nothing,
|
|
:receiverName => "agent_wine_backend",
|
|
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
|
|
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
|
|
:senderName => "agent_wine_frontend",
|
|
:senderselfnote => nothing,
|
|
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
|
|
"""
|
|
|
|
# check transfer session id
|
|
dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
|
|
clientRequestingDataPartNumber =
|
|
incomingMsg[:payload][:requestingDataPartNumber]
|
|
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
|
|
d = Dict(
|
|
:result=> "dataTransferOverMQTT",
|
|
:dataTransferSessionID=> dataTransferSessionID,
|
|
Symbol(dataTransferSessionID)=> Dict(
|
|
:dataPartNumber=> clientRequestingDataPartNumber,
|
|
:datapart=> s[:dataparts][clientRequestingDataPartNumber]
|
|
),
|
|
)
|
|
|
|
return d
|
|
end
|
|
|
|
end
|
|
|
|
""" Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# Arguments
|
|
- `outgoingMsg::Dict`
|
|
an outgoing message
|
|
|
|
# Return
|
|
- `result::NamedTuple`
|
|
( success= true, error= nothing )
|
|
|
|
# Example
|
|
```jldoctest
|
|
julia> using Revise
|
|
julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
"/testtopic",
|
|
senderName= "somename",
|
|
senderId= uuid4snakecase(),
|
|
mqttBrokerAddress= "test.mosquitto.org",
|
|
mqttBrokerPort= 1883,
|
|
)
|
|
julia> outgoingMsg = Dict(
|
|
:msgMeta=> msgMeta,
|
|
:payload=> Dict("hello"=> "World"),
|
|
)
|
|
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
```
|
|
# TODO
|
|
- [PENDING]
|
|
|
|
# Signature
|
|
"""
|
|
function dataTransferOverMQTT_receiver()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end # module communication |