Files
GeneralUtils/src/communication.jl
2025-03-12 14:34:14 +07:00

950 lines
29 KiB
Julia

module communication
export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames,
SHA, PrettyPrinting
using ..util
# ---------------------------------------------- 100 --------------------------------------------- #
abstract type mqttClientInstance end
mutable struct mqttClientInstance_v1 <: mqttClientInstance
mqttBrokerAddress::String
mqttBrokerPort::Integer
subtopic::Vector{String}
receiveDataChannel::Channel
onMsgCallback::Function
qos::MQTTClient.QOS
client::MQTTClient.Client
connection::MQTTClient.Connection
keepalivetopic::String
keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::Union{DateTime, Nothing}
end
function mqttClientInstance_v1(
mqttBrokerAddress::String,
mqttBrokerPort::Integer,
subtopic::Vector{String},
receiveDataChannel::Channel,
keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function
;
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
)
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
MQTTClient.connect(client, connection)
for i in subtopic
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
end
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
instance = mqttClientInstance(
mqttBrokerAddress,
mqttBrokerPort,
subtopic,
receiveDataChannel,
onMsgCallback,
qos,
client,
connection,
keepalivetopic,
keepaliveChannel,
keepaliveCheckInterval,
Dates.now(),
)
return instance
end
mutable struct mqttClientInstance_v2 <: mqttClientInstance
mqttBrokerAddress::String
mqttBrokerPort::Integer
subtopic::Vector{String}
msgReceiveChannel
onMsgCallback::Function
qos::MQTTClient.QOS
client::MQTTClient.Client
connection::MQTTClient.Connection
keepalivetopic::String
keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::Union{DateTime, Nothing}
multiMsg::String # "single", "metadata", "datapart"
latestMsg
end
""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
This is achieved by using msgReceiveChannel::NamedTuple to store multiple channel.
# Arguments
- `mqttBrokerAddress`
MQTT broker address. Can be URL or IP address
- `subtopic`
A list of topic to be subscribed Ex. ["/testopic_1", "/testopic_2"]
- `msgReceiveChannel`
A storage the mqtt client used to store incoming message.
Ex. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(32))
- `keepaliveChannel`
A channel to store keepalive message
- `onMsgCallback`
A user-defined function to handle incoming message
# Keyword Arguments
- `mqttBrokerPort`
mqtt broker port
- `keepalivetopic`
A topic where keepalive message going
- `keepaliveCheckInterval`
A time interval to check whether the mqtt client still connect to mqtt broker
- `qos`
Quality of Service. Can be QOS_0, QOS_1, QOS_2
# Return
- `mqttInstance`
A new mqtt client
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> mqttMsgReceiveTopic = ["/receivetopic_1", "/receivetopic_2"]
julia> mqttMsgReceiveChannel = (ch1=Channel(8), ch2=Channel(32)) # single channel Ex. (ch1=Channel(8),)
julia> keepaliveChannel = Channel(8)
julia> function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
if occursin("topic_1", topic)
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
elseif occursin("topic_2", topic)
put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
elseif occursin("keepalive", topic)
put!(keepaliveChannel, incomingMqttMsg)
else
println("undefined condition ", @__FILE__, " ", @__LINE__)
end
end
julia> mqttInstance = GeneralUtils.mqttClientInstance_v2(
"mqtt.yiem.cc",
mqttMsgReceiveTopic,
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
)
```
# Signature
"""
function mqttClientInstance_v2(
mqttBrokerAddress::String,
subtopic::Vector{String},
msgReceiveChannel, # NamedTuple of channels i.e. msgReceiveChannel = (ch1=Channel(8), ch2=Channel(8))
keepaliveChannel::Channel, # used for checkMqttConnection(). user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function
;
mqttBrokerPort::Integer=1883,
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
multiMsg::String="single",
clearOldMsg::Bool=true,
)
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
MQTTClient.connect(client, connection)
for i in subtopic
MQTTClient.subscribe(client, i, onMsgCallback, qos=qos)
end
MQTTClient.subscribe(client, keepalivetopic, onMsgCallback, qos=qos)
instance = mqttClientInstance_v2(
mqttBrokerAddress,
mqttBrokerPort,
subtopic,
msgReceiveChannel,
onMsgCallback,
qos,
client,
connection,
keepalivetopic,
keepaliveChannel,
keepaliveCheckInterval,
nothing,
multiMsg,
nothing, # store latest message to prevent doing the same massage twice
)
if clearOldMsg
chnames = keys(msgReceiveChannel)
for i in chnames
while isready(msgReceiveChannel[i])
_ = take!(msgReceiveChannel[i])
end
end
println("Old MQTT messages cleared")
end
return instance
end
""" Generate msgMeta to be including in a message. So the message receiver know
what to do with the message.
# Arguments
- `sendTopic`
topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
# Keyword Arguments
- `msgPurpose`
purpose of this message e.g. "updateStatus"
- `senderName`
sender name (String) e.g. "agent-wine-web-frontend"
- `senderId`
sender id e.g. uuid4snakecase()
- `receiverName`
msg receiver name (String) e.g. "agent-backend"
- `receiverId`
msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
- `requestresponse`
this message is a "request" or "response"
- `getpost`
this message is a "get" or "post"
- `msgId`
message ID e.g. uuid4snakecase()
- `timestamp`
message published timestamp e.g. string(Dates.now())
- `senderselfnote`
help sender manage incoming reply msg, could be "for text inference", "img generation"
- `replyTopic`
sender ask receiver to reply to this topic e.g. "/agent/frontend"
- `replyToMsgId`
sender is responding to this msg ID e.g. uuid4snakecase()
- `acknowledgestatus`
e.g. "ACK", "NACK"
- `mqttBrokerAddress`
MQTT broker URI. e.g. "test.mosquitto.org"
- `mqttBrokerPort`
MQTT broker port
- `msgFormatVersion`
# Return
- `msgMeta`
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/agent/frontend/wine/chat/api/v1/txt/receive",
msgPurpose="keepalive",
senderName="keepaliveservice")
```
# Signature
"""
function generate_msgMeta(
sendTopic::T1, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
;
msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus"
senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend"
senderId::T1= "nothing", # sender id e.g. uuid4snakecase()
receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend"
receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. uuid4snakecase()
requestresponse::T1= "nothing", # this message is a "request" or "response"
getpost::T1= "nothing", # this message is a "get" or "post"
msgId::T1= uuid4snakecase(),
timestamp= string(Dates.now()), # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
senderselfnote::Any= "nothing",
# sender ask receiver to reply to this topic
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
replyTopic::T1= "nothing",
replyToMsgId::T1= "nothing", # sender is responding to this msg ID
acknowledgestatus::T1= "nothing", # "ACK", "NACK"
mqttBrokerAddress::T1= "test.mosquitto.org",
mqttBrokerPort::Integer= 1883,
msgFormatVersion::T1= "nothing",
)::Dict{Symbol, Any} where {T1<:AbstractString}
msgMeta::Dict=Dict(
:sendTopic=> sendTopic, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
:msgPurpose=> msgPurpose, # purpose of this message e.g. "updateStatus"
:senderName=> senderName, # sender name (String) e.g. "agent-wine-web-frontend"
:senderId=> senderId, # sender id e.g. uuid4snakecase()
:receiverName=> receiverName, # msg receiver name (String) e.g. "agent-backend"
:receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
:requestResponse=> requestresponse, # this message is a "request" or "response"
:getPost=> getpost, # this message is a "get" or "post"
:msgId=> msgId,
:timeStamp=> timestamp, # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
:senderSelfnote=>senderselfnote,
# sender ask receiver to reply to this topic
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
:replyTopic=> replyTopic,
:replyToMsgId=> replyToMsgId,
:acknowledgestatus=> acknowledgestatus,
:mqttBrokerAddress=> mqttBrokerAddress,
:mqttBrokerPort=> mqttBrokerPort,
:msgFormatVersion=> msgFormatVersion,
)
return msgMeta
end
""" Check mqtt server connection.
Arguments\n
-----
mqttInstanceDict::Dict{Symbol, Any}
a dictionary contain mqtt instance. 1 per mqtt client.
interval::Integer
time interval to check mqtt server in seconds
Return\n
-----
isconnectionalive::Bool
true if mqtt connection is alive
Example\n
-----
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, MQTTClient
julia> GeneralUtils.isMqttConnectionAlive(mqttInstance)
true
```
Signature\n
-----
"""
function isMqttConnectionAlive(mqttInstance::T)::Bool where {T<:mqttClientInstance}
starttime = Dates.now()
isconnectionalive = false
# ditch old keepalive msg is any
while isready(mqttInstance.keepaliveChannel)
_ = take!(mqttInstance.keepaliveChannel)
end
msgMeta = generate_msgMeta(
mqttInstance.keepalivetopic,
msgPurpose= "keepalive",
)
keepaliveMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(
:text=>"keepalive",
)
)
publish(mqttInstance.client, keepaliveMsg[:msgMeta][:sendTopic],
JSON3.write(keepaliveMsg))
timediff = 0
while timediff < 5
timediff = timedifference(starttime, Dates.now(), "seconds")
if isready(mqttInstance.keepaliveChannel)
incomingMsg = take!(mqttInstance.keepaliveChannel)
if incomingMsg[:msgMeta][:msgId] == keepaliveMsg[:msgMeta][:msgId]
# connection is alive
isconnectionalive = true
break
end
end
sleep(1)
end
return isconnectionalive
end
""" Check mqtt server connection, reconnect if disconnected.
Arguments\n
-----
mqttInstanceDict::Dict{Symbol, Any}
a dictionary contain mqtt instance. 1 per mqtt client.
interval::Integer
time interval to check mqtt server in seconds
Return\n
-----
isreconnect::Bool
true if mqtt connection is reconnected
Example\n
-----
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3
julia> GeneralUtils.checkMqttConnection!(mqttInstance, 5)
```
Signature\n
-----
"""
function checkMqttConnection!(mqttInstance::T;
keepaliveCheckInterval::Union{Integer, Nothing}=nothing) where {T<:mqttClientInstance}
interval = keepaliveCheckInterval !== nothing ? keepaliveCheckInterval : mqttInstance.keepaliveCheckInterval
# check if mqtt connection is still up
intervaldiff =
if mqttInstance.lastTimeMqttConnCheck !== nothing
timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
else
Inf
end
isreconnect = false # this value is true if connection is disconnected
if intervaldiff > interval
while true
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
if mqttConnStatus == false
isreconnect = true
println("mqtt connection disconnected, attemping to reconnect $(Dates.now())")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstance.client, mqttInstance.connection =
MakeConnection(mqttInstance.mqttBrokerAddress,
mqttInstance.mqttBrokerPort)
connect(mqttInstance.client, mqttInstance.connection)
for topic in mqttInstance.subtopic
subscribe(mqttInstance.client, topic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
end
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
sleep(1) # wait before checking connection again
else
mqttInstance.lastTimeMqttConnCheck = Dates.now()
if isreconnect
println("connected to mqtt broker")
end
return isreconnect
end
end
else
return nothing # still within check interval, no update on connection status yet
end
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( success= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# Signature
"""
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
# sendMqttMsg() doesn't need to receive msg but mqttClientInstance_v2 requires to have receive topic
mqttMsgReceiveTopic = "/GeneralUtils_sendMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
mqttInstance = mqttClientInstance_v2(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
)
response = sendMqttMsg(mqttInstance, outgoingMsg)
try disconnect(mqttInstance.client) catch end
return response
end
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
)::NamedTuple where {T<:Any}
try
publish(mqttInstance.client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
return (success=true, error=nothing)
catch e
return (success=false, error=e)
end
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict{Symbol, T}`
an outgoing message
# Keyword Arguments
- `timeout::Integer`
time to wait for a response before error
# Return
- `result::NamedTuple`
(
success= true, # idicates whether sending MQTT message successful
error= nothing # error message
response= somemessage # response message
)
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, UUIDs, JSON3
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= uuid4snakecase(),
mqttBrokerAddress= "mqtt.yiem.cc",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(:hello=> "World"),
)
julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
```
# TODO
- [] update docs
# Signature
"""
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
mqttInstance = mqttClientInstance_v2(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
)
response = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
try disconnect(mqttInstance.client) catch end
return response
end
function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel::Symbol,
outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1
)::NamedTuple where {T<:Any}
timepass = nothing
attempts = 0
while attempts <= maxattempt
attempts += 1
if attempts > 1
println("\nsendReceiveMqttMsg() attempts $attempts ", @__FILE__, ":", @__LINE__, " $(Dates.now())")
pprintln(outgoingMsg)
println("--------------\n")
end
sendMqttMsg(mqttInstance, outgoingMsg)
starttime = Dates.now()
while true
timepass = timedifference(starttime, Dates.now(), "seconds")
if timepass <= timeout
if isready(mqttInstance.msgReceiveChannel[receivechannel])
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
println("NACK")
break # resend the msg
else
return (success=true, error=nothing, response=incomingMsg[:payload])
end
end
end
else
break
end
sleep(1)
end
end
return (success=false,
error="no response, timeout $timepass/$timeout, $(attempts-1) publish attempted",
response=nothing)
end
# function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
# timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
# mqttMsgReceiveTopic = "/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"
# mqttMsgReceiveChannel = Channel(4)
# # ask message receiver to send a message back to specified topic
# outgoingMsg[:msgMeta][:replyTopic] = mqttMsgReceiveTopic
# # Define the callback for receiving messages.
# function onMsgCallback(topic, payload)
# jobj = JSON3.read(String(payload))
# onMsg = copy(jobj)
# put!(mqttMsgReceiveChannel, onMsg)
# end
# # Instantiate a client and connection.
# client, connection = MakeConnection(outgoingMsg[:msgMeta][:mqttBrokerAddress],
# outgoingMsg[:msgMeta][:mqttBrokerPort])
# connect(client, connection)
# subscribe(client, mqttMsgReceiveTopic, onMsgCallback, qos=QOS_1)
# timepass = nothing
# attempts = 1
# while attempts <= maxattempt
# publish(client, outgoingMsg[:msgMeta][:sendTopic], JSON3.write(outgoingMsg))
# starttime = Dates.now()
# while true
# timepass = timedifference(starttime, Dates.now(), "seconds")
# if timepass <= timeout
# if isready(mqttMsgReceiveChannel)
# incomingMsg = take!(mqttMsgReceiveChannel)
# if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
# if msgPurpose == "NACK"
# break # resend the msg
# else
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=true, error=nothing, response=incomingMsg[:payload])
# end
# end
# end
# else
# break
# end
# sleep(1)
# end
# println("attempts $attempts ", @__FILE__, " ", @__LINE__)
# attempts += 1
# end
# # disconnect mqtt
# unsubscribe(client, mqttMsgReceiveTopic)
# disconnect(client)
# return (success=false,
# error="no response, timeout $timepass/$timeout, $attempts publish attempted",
# response=nothing)
# end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( success= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [x] implement crude version of this function
- [] update docs
- [] transfer multiple msg with the same mqtt connection. The transfer loop should be
implemented by passing mqttInstance into sendReceiveMqttMsg() so that
1-connection is used to send all data
- [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically)
# Signature
"""
function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
dataTransferSessionID = nothing
if !haskey(incomingMsg[:payload], :dataTransferSessionID)
""" Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
:msgMeta => Dict(:msgPurpose => nothing,
:receiverId => nothing,
:getpost => nothing,
:msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
:requestresponse => nothing,
:msgFormatVersion => nothing,
:timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
:replyToMsgId => nothing,
:acknowledgestatus => nothing,
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
:mqttBrokerPort => nothing,
:receiverName => "agent_wine_backend",
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
:senderName => "agent_wine_frontend",
:senderselfnote => nothing,
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
"""
if data !== nothing # if the user provide data, store it in a new session workspace
dataTransferSessionID = uuid4snakecase()
haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
s[:dataTransferSessionID] = dataTransferSessionID
dataDict = Dict(pairs(data))
merge!(s, dataDict)
else
error("No data provided")
end
# compute hash value for each piece of data
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
# send metadata without data to receiver()
datainfo = Dict()
for (k, v) in s
if k [:dataparts]
datainfo[k] = v
end
end
d = Dict(
:result=> "dataTransferOverMQTT",
:dataTransferSessionID=> dataTransferSessionID,
Symbol(dataTransferSessionID)=> datainfo
)
""" Sending metadata to the requester
Dict(:result => "dataTransferOverMQTT",
:b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
:hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
"b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
"5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
:datatype => "vector{Dict}",
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
:totalparts => 46),
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
"""
return d
else
""" Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
:msgMeta => Dict(:msgPurpose => nothing,
:receiverId => nothing,
:getpost => nothing,
:msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
:requestresponse => nothing,
:msgFormatVersion => nothing,
:timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
:replyToMsgId => nothing,
:acknowledgestatus => nothing,
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
:mqttBrokerPort => nothing,
:receiverName => "agent_wine_backend",
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
:senderName => "agent_wine_frontend",
:senderselfnote => nothing,
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
"""
# check transfer session id
dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
clientRequestingDataPartNumber =
incomingMsg[:payload][:requestingDataPartNumber]
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
d = Dict(
:result=> "dataTransferOverMQTT",
:dataTransferSessionID=> dataTransferSessionID,
Symbol(dataTransferSessionID)=> Dict(
:dataPartNumber=> clientRequestingDataPartNumber,
:datapart=> s[:dataparts][clientRequestingDataPartNumber]
),
)
return d
end
end
""" Send a message to specified MQTT topic then wait for reply.
# Arguments
- `outgoingMsg::Dict`
an outgoing message
# Return
- `result::NamedTuple`
( success= true, error= nothing )
# Example
```jldoctest
julia> using Revise
julia> using GeneralUtils, Dates, JSON3, UUIDs
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
julia> outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict("hello"=> "World"),
)
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [PENDING]
# Signature
"""
function dataTransferOverMQTT_receiver()
end
end # module communication