928 lines
30 KiB
Julia
928 lines
30 KiB
Julia
module communication
|
|
|
|
export generate_msgMeta
|
|
# isMqttConnectionAlive, checkMqttConnection!,
|
|
# sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
|
|
# dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
|
|
|
|
using JSON, DataStructures, Distributions, Random, Dates, UUIDs, DataFrames,
|
|
SHA, PrettyPrinting
|
|
using ..util
|
|
#[PENDING] update code to use JSON
|
|
# ---------------------------------------------- 100 --------------------------------------------- #
|
|
|
|
# abstract type mqttClientInstance end
|
|
|
|
# mutable struct mqttClientInstance_v1 <: mqttClientInstance
|
|
# mqttBrokerAddress::String
|
|
# mqttBrokerPort::Integer
|
|
# subtopic::Vector{String}
|
|
# receiveDataChannel::Channel
|
|
# onMsgCallback::Function
|
|
# qos::MQTTClient.QOS
|
|
# client::MQTTClient.Client
|
|
# connection::MQTTClient.Connection
|
|
# keepalivetopic::String
|
|
# keepaliveChannel::Channel
|
|
# keepaliveCheckInterval::Integer # second
|
|
# lastTimeMqttConnCheck::Union{DateTime, Nothing}
|
|
# end
|
|
|
|
# function mqttClientInstance_v1(
|
|
# mqttBrokerAddress::String,
|
|
# mqttBrokerPort::Integer,
|
|
# subtopic::Vector{String},
|
|
# receiveDataChannel::Channel,
|
|
# keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
|
# onMsgCallback::Function
|
|
# ;
|
|
# keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
|
|
# keepaliveCheckInterval::Integer=30,
|
|
# qos::MQTTClient.QOS=QOS_1,
|
|
# )
|
|
|
|
# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
|
|
# MQTTClient.connect(client, connection)
|
|
# for i in subtopic
|
|
# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
|
# end
|
|
# MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
|
|
|
|
# instance = mqttClientInstance(
|
|
# mqttBrokerAddress,
|
|
# mqttBrokerPort,
|
|
# subtopic,
|
|
# receiveDataChannel,
|
|
# onMsgCallback,
|
|
# qos,
|
|
# client,
|
|
# connection,
|
|
# keepalivetopic,
|
|
# keepaliveChannel,
|
|
# keepaliveCheckInterval,
|
|
# Dates.now(),
|
|
# )
|
|
|
|
# return instance
|
|
# end
|
|
|
|
|
|
# mutable struct mqttClientInstance_v2 <: mqttClientInstance
|
|
# mqttBrokerAddress::String
|
|
# mqttBrokerPort::Integer
|
|
# subtopic::Vector{String}
|
|
# msgReceiveChannel
|
|
# onMsgCallback::Function
|
|
# qos::MQTTClient.QOS
|
|
# client::MQTTClient.Client
|
|
# clientId::String
|
|
# connection::MQTTClient.Connection
|
|
# keepalivetopic::String
|
|
# keepaliveChannel::Channel
|
|
# keepaliveCheckInterval::Integer # second
|
|
# lastTimeMqttConnCheck::Union{DateTime, Nothing}
|
|
# multiMsg::String # "single", "metadata", "datapart"
|
|
# latestMsg
|
|
# end
|
|
# """ MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
|
|
# msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
|
|
# This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
|
|
# # Arguments
|
|
# - `mqttBrokerAddress`
|
|
# MQTT broker address. Can be URL or IP address
|
|
# - `subtopic`
|
|
# A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
|
|
# - `msgReceiveChannel`
|
|
# A storage the mqtt client used to store incoming message.
|
|
# Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
|
|
# - `keepaliveChannel`
|
|
# A channel to store keepalive message
|
|
# - `onMsgCallback`
|
|
# A user-defined function to handle incoming message
|
|
|
|
# # Keyword Arguments
|
|
# - `mqttBrokerPort`
|
|
# mqtt broker port
|
|
# - `keepalivetopic`
|
|
# A topic where keepalive message going
|
|
# - `keepaliveCheckInterval`
|
|
# A time interval to check whether the mqtt client still connect to mqtt broker
|
|
# - `qos`
|
|
# Quality of Service. Can be QOS_0, QOS_1, QOS_2
|
|
# - `clearOldMsg`
|
|
# Boolean to determine if old messages should be cleared from channels
|
|
|
|
# # Return
|
|
# - `mqttInstance`
|
|
# A new mqtt client
|
|
|
|
# # Example
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
# julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
|
|
# julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),)
|
|
# julia> keepaliveChannel = Channel(8)
|
|
# julia> function onMsgCallback(topic, payload)
|
|
# jobj = JSON3.read(String(payload))
|
|
# incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
|
|
|
|
# if occursin("topic_1", topic)
|
|
# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
|
# elseif occursin("topic_2", topic)
|
|
# put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
|
|
# elseif occursin("keepalive", topic)
|
|
# put!(keepaliveChannel, incomingMqttMsg)
|
|
# else
|
|
# println("undefined condition ", @__FILE__, " ", @__LINE__)
|
|
# end
|
|
# end
|
|
# julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
|
|
# "mqtt.yiem.cc",
|
|
# mqttMsgReceiveTopic,
|
|
# mqttMsgReceiveChannel,
|
|
# keepaliveChannel,
|
|
# onMsgCallback
|
|
# )
|
|
# ```
|
|
|
|
# # Signature
|
|
# """
|
|
# function mqttClientInstance_v2(
|
|
# mqttBrokerAddress::String,
|
|
# subtopic::Vector{String},
|
|
# msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
|
|
# keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
|
|
# onMsgCallback::Function
|
|
# ;
|
|
# clientId::String="NA",
|
|
# mqttBrokerPort::Integer=1883,
|
|
# keepaliveTopic::String= "/keepalive/$(uuid4snakecase())",
|
|
# keepaliveCheckInterval::Integer=30,
|
|
# qos::MQTTClient.QOS=QOS_1,
|
|
# multiMsg::String="single", #[PENDING] bad design. this info should be stored in each msgMeta
|
|
# clearOldMsg::Bool=true,
|
|
# )
|
|
|
|
# client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
|
|
# MQTTClient.connect(client, connection)
|
|
# for i in subtopic
|
|
# MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
|
|
# end
|
|
# MQTTClient.subscribe(client, keepaliveTopic, onMsgCallback, qos=qos)
|
|
|
|
# keepaliveTopic = clientId == "NA" ? keepaliveTopic : "/keepalive/$clientId"
|
|
|
|
# instance = mqttClientInstance_v2(
|
|
# mqttBrokerAddress,
|
|
# mqttBrokerPort,
|
|
# subtopic,
|
|
# msgReceiveChannel,
|
|
# onMsgCallback,
|
|
# qos,
|
|
# client,
|
|
# clientId,
|
|
# connection,
|
|
# keepaliveTopic,
|
|
# keepaliveChannel,
|
|
# keepaliveCheckInterval,
|
|
# nothing,
|
|
# multiMsg,
|
|
# nothing, # store latest message to prevent processing the same message twice
|
|
# )
|
|
|
|
# if clearOldMsg
|
|
# chnames = keys(msgReceiveChannel)
|
|
# for i in chnames
|
|
# totalmsg = msgReceiveChannel[i].n_avail_items #[TESTING]
|
|
# if totalmsg > 0
|
|
# while isready(msgReceiveChannel[i])
|
|
# _ = take!(msgReceiveChannel[i])
|
|
# end
|
|
# println("Total $totalmsg old MQTT messages cleared ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
|
|
# end
|
|
# end
|
|
# end
|
|
|
|
# return instance
|
|
# end
|
|
|
|
|
|
"""
    generate_msgMeta(sendTo; kwargs...) -> Dict{Symbol, Any}

Build the `msgMeta` dictionary that is attached to an outgoing message so that the
message receiver knows what to do with the message.

Unless noted otherwise, every keyword defaults to the literal string `"nothing"`
(a `String`, not the value `nothing`). Values are stored exactly as passed; no
conversion is performed. Each string argument only has to be some `AbstractString`,
so different string types (e.g. `String` and `SubString{String}`) may be mixed freely.

# Arguments
- `sendTo`
    topic the sender sends to e.g. "/agent/wine/api/v1/prompt"

# Keyword Arguments
- `msgPurpose`
    purpose of this message e.g. "updateStatus"
- `senderName`
    sender name e.g. "agent-wine-web-frontend"
- `senderId`
    sender id e.g. uuid4snakecase()
- `receiverName`
    message receiver name e.g. "agent-backend"
- `receiverId`
    message receiver id e.g. uuid4snakecase(); the default "nothing" is meant as
    "everyone in the topic"
- `requestresponse`
    this message is a "request" or "response" (stored under `:requestResponse`)
- `getpost`
    this message is a "get" or "post" (stored under `:getPost`)
- `msgId`
    message ID, defaults to a fresh `uuid4snakecase()`
- `timestamp`
    message published timestamp, defaults to `string(Dates.now())`
    (stored under `:timeStamp`)
- `senderselfnote`
    any value that helps the sender manage the incoming reply, could be
    "for text inference", "img generation" (stored under `:senderSelfnote`)
- `replyTopic`
    sender asks the receiver to reply to this topic e.g. "/agent/frontend"
- `replyToMsgId`
    sender is responding to this message ID e.g. uuid4snakecase()
- `acknowledgestatus`
    e.g. "ACK", "NACK"
- `mqttBrokerAddress`
    MQTT broker URI, defaults to "test.mosquitto.org"
- `mqttBrokerPort`
    MQTT broker port, defaults to 1883
- `msgFormatVersion`
    version tag of the message format

# Return
- `msgMeta::Dict{Symbol, Any}`
    dictionary with the keys `:sendTo`, `:msgPurpose`, `:senderName`, `:senderId`,
    `:receiverName`, `:receiverId`, `:requestResponse`, `:getPost`, `:msgId`,
    `:timeStamp`, `:senderSelfnote`, `:replyTopic`, `:replyToMsgId`,
    `:acknowledgestatus`, `:mqttBrokerAddress`, `:mqttBrokerPort`, `:msgFormatVersion`

# Example
```julia
julia> msgMeta = generate_msgMeta(
            "/agent/frontend/wine/chat/api/v1/txt/receive",
            msgPurpose="keepalive",
            senderName="keepaliveservice")
```
"""
function generate_msgMeta(
    sendTo::AbstractString,     # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
    ;
    # NOTE: every keyword is annotated independently. Tying them all to one shared
    # type parameter makes a call fail with a TypeError as soon as two different
    # string types are mixed (e.g. a String topic and a SubString id from split()).
    msgPurpose::AbstractString="nothing",
    senderName::AbstractString="nothing",
    senderId::AbstractString="nothing",
    receiverName::AbstractString="nothing",
    receiverId::AbstractString="nothing",
    requestresponse::AbstractString="nothing",
    getpost::AbstractString="nothing",
    msgId::AbstractString=uuid4snakecase(),
    timestamp=string(Dates.now()),
    senderselfnote::Any="nothing",
    replyTopic::AbstractString="nothing",
    replyToMsgId::AbstractString="nothing",
    acknowledgestatus::AbstractString="nothing",
    mqttBrokerAddress::AbstractString="test.mosquitto.org",
    mqttBrokerPort::Integer=1883,
    msgFormatVersion::AbstractString="nothing",
)::Dict{Symbol, Any}

    # Some keys intentionally differ in case from the keyword names
    # (:requestResponse, :getPost, :timeStamp, :senderSelfnote); they are part of the
    # message format and must stay as they are.
    return Dict{Symbol, Any}(
        :sendTo => sendTo,
        :msgPurpose => msgPurpose,
        :senderName => senderName,
        :senderId => senderId,
        :receiverName => receiverName,
        :receiverId => receiverId,
        :requestResponse => requestresponse,
        :getPost => getpost,
        :msgId => msgId,
        :timeStamp => timestamp,
        :senderSelfnote => senderselfnote,
        :replyTopic => replyTopic,
        :replyToMsgId => replyToMsgId,
        :acknowledgestatus => acknowledgestatus,
        :mqttBrokerAddress => mqttBrokerAddress,
        :mqttBrokerPort => mqttBrokerPort,
        :msgFormatVersion => msgFormatVersion,
    )
end
|
|
|
|
|
|
|
|
# """ Check mqtt server connection.
|
|
|
|
# Arguments\n
|
|
# -----
|
|
# mqttInstanceDict::Dict{Symbol, Any}
|
|
# a dictionary contain mqtt instance. 1 per mqtt client.
|
|
# interval::Integer
|
|
# time interval to check mqtt server in seconds
|
|
|
|
# Return\n
|
|
# -----
|
|
# isconnectionalive::Bool
|
|
# true if mqtt connection is alive
|
|
|
|
# Example\n
|
|
# -----
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3, MQTTClient
|
|
# julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
|
|
# true
|
|
# ```
|
|
|
|
# Signature\n
|
|
# -----
|
|
# """
|
|
# function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
|
|
# isconnectionalive = false
|
|
|
|
# # ditch old keepalive msg if any
|
|
# while isready(mqttInstance.keepaliveChannel)
|
|
# _ = take!(mqttInstance.keepaliveChannel)
|
|
# end
|
|
|
|
# msgMeta = generate_msgMeta(
|
|
# mqttInstance.keepalivetopic,
|
|
# msgPurpose= "keepalive",
|
|
# )
|
|
|
|
# keepaliveMsg = Dict(
|
|
# :msgMeta=> msgMeta,
|
|
# :payload=> Dict(
|
|
# :text=>"keepalive",
|
|
# )
|
|
# )
|
|
|
|
# publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTo],
|
|
# JSON3.write(keepaliveMsg))
|
|
# timediff = 0
|
|
# starttime = Dates.now()
|
|
# while timediff <= 5
|
|
# timediff = timedifference(starttime, Dates.now(), "seconds")
|
|
# if isready(mqttInstance.keepaliveChannel)
|
|
# incomingMsg = take!(mqttInstance.keepaliveChannel)
|
|
# if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
|
|
# # connection is alive
|
|
# isconnectionalive = true
|
|
# break
|
|
# end
|
|
# end
|
|
# sleep(1)
|
|
# end
|
|
# return isconnectionalive
|
|
# end
|
|
|
|
|
|
# """ Check mqtt server connection, reconnect if disconnected.
|
|
|
|
# Arguments\n
|
|
# -----
|
|
# mqttInstanceDict::Dict{Symbol, Any}
|
|
# a dictionary contain mqtt instance. 1 per mqtt client.
|
|
# keepaliveCheckInterval::Integer
|
|
# time interval to check mqtt server in seconds
|
|
|
|
# Return\n
|
|
# -----
|
|
# isreconnect::Bool
|
|
# true if mqtt connection is reconnected
|
|
|
|
# Example\n
|
|
# -----
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3
|
|
# julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
|
|
# ```
|
|
|
|
# Signature\n
|
|
# -----
|
|
# """
|
|
# function checkMqttConnection!(mqttInstance::T;
|
|
# keepaliveCheckInterval::Union{Integer, Nothing}=nothing)::Union{Bool, Nothing} where {T<:mqttClientInstance}
|
|
|
|
# interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
|
|
|
|
# # check if mqtt connection is still up
|
|
# intervaldiff =
|
|
# if mqttInstance.lastTimeMqttConnCheck !== nothing
|
|
# timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
|
|
# else
|
|
# Inf
|
|
# end
|
|
|
|
# if intervaldiff > interval
|
|
# connectionStatusStart = isMqttConnectionAlive(mqttInstance) # a flag to note whether the connection status has changed from false to true
|
|
# while true
|
|
# mqttConnStatus = isMqttConnectionAlive(mqttInstance)
|
|
# if mqttConnStatus == false
|
|
# sleep(5) # wait
|
|
# println("MQTT connection disconnected, attemping to reconnect $(Dates.now()) at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort)")
|
|
# # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
|
|
# mqttInstance.client, mqttInstance.connection =
|
|
# MakeConnection(mqttInstance.mqttBrokerAddress,
|
|
# mqttInstance.mqttBrokerPort)
|
|
# try
|
|
# connect(mqttInstance.client, mqttInstance.connection)
|
|
# for topic in mqttInstance.subtopic
|
|
# subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
|
# end
|
|
# MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
|
|
# catch
|
|
# println("Failed to reconnect MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
|
# end
|
|
# else
|
|
# mqttInstance.lastTimeMqttConnCheck = Dates.now()
|
|
# if connectionStatusStart != mqttConnStatus
|
|
# println("Reconnected to MQTT broker at $(mqttInstance.mqttBrokerAddress):$(mqttInstance.mqttBrokerPort) $(Dates.now())")
|
|
# end
|
|
# return mqttConnStatus
|
|
# end
|
|
# end
|
|
# else
|
|
# return nothing # still within check interval, no update on connection status yet
|
|
# end
|
|
# end
|
|
|
|
|
|
# """ Send a message to the specified MQTT topic (publish only, does not wait for a reply).
|
|
|
|
# # Arguments
|
|
# - `outgoingMsg::Dict`
|
|
# an outgoing message
|
|
|
|
# # Return
|
|
# - `result::NamedTuple`
|
|
# ( success= true, error= nothing )
|
|
|
|
# # Example
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
# julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
# "/testtopic",
|
|
# senderName= "somename",
|
|
# senderId= uuid4snakecase(),
|
|
# mqttBrokerAddress= "test.mosquitto.org",
|
|
# mqttBrokerPort= 1883,
|
|
# )
|
|
# julia> outgoingMsg = Dict(
|
|
# :msgMeta=> msgMeta,
|
|
# :payload=> Dict("hello"=> "World"),
|
|
# )
|
|
# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
# ```
|
|
|
|
# # Signature
|
|
# """
|
|
# function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
|
|
|
|
# # sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic
|
|
# mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
|
|
|
|
# mqttMsgReceiveChannel = (ch1=Channel(8),)
|
|
# keepaliveChannel = Channel(8)
|
|
|
|
# # Define the callback for receiving messages.
|
|
# function onMsgCallback(topic, payload)
|
|
# jobj = JSON3.read(String(payload))
|
|
# onMsg = copy(jobj)
|
|
# put!(mqttMsgReceiveChannel[:ch1], onMsg)
|
|
# end
|
|
|
|
# mqttInstance = mqttClientInstance_v2(
|
|
# outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
|
# [mqttMsgReceiveTopic],
|
|
# mqttMsgReceiveChannel,
|
|
# keepaliveChannel,
|
|
# onMsgCallback;
|
|
# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
|
|
# )
|
|
|
|
# response = sendMqttMsg(mqttInstance, outgoingMsg)
|
|
# try disconnect(mqttInstance.client) catch end
|
|
|
|
# return response
|
|
# end
|
|
# function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
|
|
# )::NamedTuple where {T<:Any}
|
|
# try
|
|
# publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTo], JSON3.write(outgoingMsg))
|
|
# return (success=true, error=nothing)
|
|
# catch e
|
|
# return (success=false, error=e)
|
|
# end
|
|
# end
|
|
|
|
|
|
# """ Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# # Arguments
|
|
# - `outgoingMsg::Dict{Symbol, T}`
|
|
# an outgoing message
|
|
|
|
# # Keyword Arguments
|
|
# - `timeout::Integer=60`
|
|
# time in seconds to wait for a response before error
|
|
# - `maxattempt::Integer=1`
|
|
# maximum number of attempts to send and receive message
|
|
|
|
# # Return
|
|
# - `result::NamedTuple`
|
|
# (
|
|
# success= true, # indicates whether sending the MQTT message was successful
|
|
# error= nothing # error message
|
|
# response= somemessage # response message
|
|
# )
|
|
|
|
# # Example
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, UUIDs, JSON3
|
|
# julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
# "/testtopic",
|
|
# senderName= "somename",
|
|
# senderId= uuid4snakecase(),
|
|
# mqttBrokerAddress= "mqtt.yiem.cc",
|
|
# mqttBrokerPort= 1883,
|
|
# )
|
|
# julia> outgoingMsg = Dict(
|
|
# :msgMeta=> msgMeta,
|
|
# :payload=> Dict(:hello=> "World"),
|
|
# )
|
|
# julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
|
|
# ```
|
|
|
|
# # Signature
|
|
# """
|
|
# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
|
|
# connectiontimeout::Integer=600, responsetimeout::Integer=60, responsemaxattempt::Integer=1)::NamedTuple where {T<:Any}
|
|
# senderId = outgoingMsg[:msgMeta][:senderId]
|
|
# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$senderId"
|
|
# mqttMsgReceiveChannel = (ch1=Channel(8),)
|
|
# keepaliveChannel = Channel(8)
|
|
# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
|
|
|
|
# # Define the callback for receiving messages.
|
|
# function onMsgCallback(topic, payload)
|
|
# jobj = JSON3.read(String(payload))
|
|
# incomingMqttMsg = copy(jobj)
|
|
|
|
# if occursin("GeneralUtils_sendReceiveMqttMsg", topic)
|
|
# put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
|
|
# elseif occursin("keepalive", topic)
|
|
# put!(keepaliveChannel, incomingMqttMsg)
|
|
# else
|
|
# println("undefined condition ", @__FILE__, " ", @__LINE__)
|
|
# end
|
|
# end
|
|
|
|
# mqttInstance = nothing
|
|
# attempt = 0
|
|
# starttime = Dates.now()
|
|
# endtime = starttime + Second(connectiontimeout)
|
|
# errormsg = nothing
|
|
# while true
|
|
# timenow = Dates.now()
|
|
# timepass = timedifference(starttime, timenow, "seconds")
|
|
# timeleft = timedifference(timenow, endtime, "seconds")
|
|
|
|
# if timepass <= connectiontimeout
|
|
# attempt += 1
|
|
# attempt > 1 ? println("Attempt $attempt to connect to MQTT broker. Timed out in $timeleft seconds. $errormsg") : nothing
|
|
|
|
# try
|
|
# mqttInstance = mqttClientInstance_v2(
|
|
# outgoingMsg[:msgMeta][:mqttBrokerAddress],
|
|
# [mqttMsgReceiveTopic],
|
|
# mqttMsgReceiveChannel,
|
|
# keepaliveChannel,
|
|
# onMsgCallback;
|
|
# mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort],
|
|
# clientId=senderId
|
|
# )
|
|
# break
|
|
# catch e
|
|
# errormsg = e
|
|
# sleep(5)
|
|
# end
|
|
# else
|
|
# println("Failed to instantiate MQTT client after $timepass seconds. $errormsg")
|
|
# return nothing
|
|
# end
|
|
# end
|
|
|
|
# response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg;
|
|
# responsetimeout=responsetimeout, responsemaxattempt=responsemaxattempt)
|
|
# try disconnect(mqttInstance.client) catch end
|
|
|
|
# return response
|
|
# end
|
|
# function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
|
|
# outgoingMsg::Dict{Symbol, T}; responsetimeout::Integer=60, responsemaxattempt::Integer=1
|
|
# )::NamedTuple where {T<:Any}
|
|
|
|
# timepass = nothing
|
|
# attempts = 1
|
|
# while attempts <= responsemaxattempt
|
|
# sendMqttMsg(mqttInstance, outgoingMsg)
|
|
|
|
# starttime = Dates.now()
|
|
# while true
|
|
# timepass = timedifference(starttime, Dates.now(), "seconds")
|
|
|
|
# if timepass <= responsetimeout
|
|
# if isready(mqttInstance.msgReceiveChannel[receivechannel])
|
|
# incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
|
|
|
|
# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
|
|
# if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
|
|
# println("NACK")
|
|
# break # resend the msg
|
|
# else
|
|
# return (success=true, error=nothing, response=incomingMsg[:payload])
|
|
# end
|
|
# end
|
|
# end
|
|
# else
|
|
# break
|
|
# end
|
|
# sleep(1)
|
|
# end
|
|
# if attempts > 1
|
|
# println("\n<sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
|
|
# pprintln(outgoingMsg)
|
|
# println("</sendReceiveMqttMsg()> attempts $attempts/$responsemaxattempt ", @__FILE__, ":", @__LINE__, " $(Dates.now())\n")
|
|
# checkMqttConnection!(mqttInstance, keepaliveCheckInterval=5)
|
|
# end
|
|
# attempts += 1
|
|
# end
|
|
|
|
# return (success=false,
|
|
# error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
|
|
# response=nothing)
|
|
# end
|
|
|
|
|
|
# """ Serve one step of a chunked data transfer: start a new session and return its metadata, or return the requested data part of an existing session.
|
|
|
|
# # Arguments
|
|
# - `workDict::Dict`
|
|
# A dictionary containing workspace data
|
|
# - `incomingMsg::Dict`
|
|
# The incoming message to process
|
|
|
|
# # Keyword Arguments
|
|
# - `data`
|
|
# Optional data to be transferred. Default is nothing.
|
|
|
|
# # Return
|
|
# - `result::Dict`
|
|
# A dictionary containing either:
|
|
# - Metadata about the data transfer session if initiating a new transfer
|
|
# - The requested data part if responding to a data part request
|
|
|
|
# # Example
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
# julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
# "/testtopic",
|
|
# senderName= "somename",
|
|
# senderId= uuid4snakecase(),
|
|
# mqttBrokerAddress= "test.mosquitto.org",
|
|
# mqttBrokerPort= 1883,
|
|
# )
|
|
# julia> outgoingMsg = Dict(
|
|
# :msgMeta=> msgMeta,
|
|
# :payload=> Dict("hello"=> "World"),
|
|
# )
|
|
# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
# ```
|
|
# # TODO
|
|
# - [x] transfer multiple msg with the same mqtt connection. The transfer loop should be
|
|
# implemented by passing mqttInstance into sendReceiveMqttMsg() so that
|
|
# 1-connection is used to send all data
|
|
# - [] partsize should be calculated from the fact that MQTT max size is 4MB (256MB theoretically)
|
|
|
|
# # Signature
|
|
# """
|
|
# function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
|
|
# dataTransferSessionID = nothing
|
|
|
|
# if !haskey(incomingMsg[:payload], :dataTransferSessionID)
|
|
# """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
|
|
|
|
# Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
|
|
# :msgMeta => Dict(:msgPurpose => nothing,
|
|
# :receiverId => nothing,
|
|
# :getpost => nothing,
|
|
# :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
|
|
# :requestresponse => nothing,
|
|
# :msgFormatVersion => nothing,
|
|
# :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
|
|
# :replyToMsgId => nothing,
|
|
# :acknowledgestatus => nothing,
|
|
# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
|
|
# :mqttBrokerPort => nothing,
|
|
# :receiverName => "agent_wine_backend",
|
|
# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
|
|
# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
|
|
# :senderName => "agent_wine_frontend",
|
|
# :senderselfnote => nothing,
|
|
# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
|
|
# """
|
|
|
|
# if data !== nothing # if the user provide data, store it in a new session workspace
|
|
# dataTransferSessionID = uuid4snakecase()
|
|
# haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
|
|
# workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
|
|
# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
# s[:dataTransferSessionID] = dataTransferSessionID
|
|
# dataDict = Dict(pairs(data))
|
|
# merge!(s, dataDict)
|
|
# else
|
|
# error("No data provided")
|
|
# end
|
|
|
|
# # compute hash value for each piece of data
|
|
# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
# s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
|
|
|
|
# # send metadata without data to receiver()
|
|
# datainfo = Dict()
|
|
# for (k, v) in s
|
|
# if k ∉ [:dataparts]
|
|
# datainfo[k] = v
|
|
# end
|
|
# end
|
|
|
|
# d = Dict(
|
|
# :result=> "dataTransferOverMQTT",
|
|
# :dataTransferSessionID=> dataTransferSessionID,
|
|
# Symbol(dataTransferSessionID)=> datainfo
|
|
# )
|
|
|
|
# """ Sending metadata to the requester
|
|
# Dict(:result => "dataTransferOverMQTT",
|
|
# :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
|
|
# :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
|
|
# "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
|
|
# "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
|
|
# :datatype => "vector{Dict}",
|
|
# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
|
|
# :totalparts => 46),
|
|
# :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
|
|
# """
|
|
|
|
# return d
|
|
|
|
# else
|
|
|
|
# """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
|
|
# Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
|
|
# :msgMeta => Dict(:msgPurpose => nothing,
|
|
# :receiverId => nothing,
|
|
# :getpost => nothing,
|
|
# :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
|
|
# :requestresponse => nothing,
|
|
# :msgFormatVersion => nothing,
|
|
# :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
|
|
# :replyToMsgId => nothing,
|
|
# :acknowledgestatus => nothing,
|
|
# :sendTo => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
|
|
# :mqttBrokerPort => nothing,
|
|
# :receiverName => "agent_wine_backend",
|
|
# :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
|
|
# :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
|
|
# :senderName => "agent_wine_frontend",
|
|
# :senderselfnote => nothing,
|
|
# :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
|
|
# """
|
|
|
|
# # check transfer session id
|
|
# dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
|
|
# clientRequestingDataPartNumber =
|
|
# incomingMsg[:payload][:requestingDataPartNumber]
|
|
# s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
|
|
|
|
# d = Dict(
|
|
# :result=> "dataTransferOverMQTT",
|
|
# :dataTransferSessionID=> dataTransferSessionID,
|
|
# Symbol(dataTransferSessionID)=> Dict(
|
|
# :dataPartNumber=> clientRequestingDataPartNumber,
|
|
# :datapart=> s[:dataparts][clientRequestingDataPartNumber]
|
|
# ),
|
|
# )
|
|
|
|
# return d
|
|
# end
|
|
# end
|
|
|
|
# """ Send a message to specified MQTT topic then wait for reply.
|
|
|
|
# # Arguments
|
|
# - `outgoingMsg::Dict`
|
|
# an outgoing message
|
|
|
|
# # Return
|
|
# - `result::NamedTuple`
|
|
# ( success= true, error= nothing )
|
|
|
|
# # Example
|
|
# ```jldoctest
|
|
# julia> using Revise
|
|
# julia> using GeneralUtils, Dates, JSON3, UUIDs
|
|
# julia> msgMeta = GeneralUtils.generate_msgMeta(
|
|
# "/testtopic",
|
|
# senderName= "somename",
|
|
# senderId= uuid4snakecase(),
|
|
# mqttBrokerAddress= "test.mosquitto.org",
|
|
# mqttBrokerPort= 1883,
|
|
# )
|
|
# julia> outgoingMsg = Dict(
|
|
# :msgMeta=> msgMeta,
|
|
# :payload=> Dict("hello"=> "World"),
|
|
# )
|
|
# julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
|
|
# ```
|
|
# # TODO
|
|
# - [PENDING]
|
|
|
|
# # Signature
|
|
# """
|
|
# function dataTransferOverMQTT_receiver()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end # module communication |