This commit is contained in:
2024-10-01 17:12:54 +07:00
parent 564ed3cb70
commit 565d8bb199
5 changed files with 213 additions and 141 deletions

View File

@@ -1,9 +1,11 @@
module communication
export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!,
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2
sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2,
dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient
using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames,
SHA
using ..util
# ---------------------------------------------- 100 --------------------------------------------- #
@@ -33,7 +35,7 @@ function mqttClientInstance_v1(
keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback()
onMsgCallback::Function
;
keepalivetopic::String= "/keepalive/$(uuid4())",
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
)
@@ -77,6 +79,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance
keepaliveChannel::Channel
keepaliveCheckInterval::Integer # second
lastTimeMqttConnCheck::DateTime
multiMsg::String # "single", "metadata", "datapart"
end
""" MQTT client v2. The difference between client v1 and v2 is v2 allows multiple
msgReceiveChannel without having to redefine mqttClientInstance again like in v1.
@@ -120,9 +123,9 @@ end
incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively
if occursin("topic_1", topic)
push!(mqttMsgReceiveTopic[:ch1], incomingMqttMsg)
put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg)
elseif occursin("topic_2", topic)
push!(mqttMsgReceiveTopic[:ch2], incomingMqttMsg)
put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg)
elseif occursin("keepalive", topic)
put!(keepaliveChannel, incomingMqttMsg)
else
@@ -148,9 +151,10 @@ function mqttClientInstance_v2(
onMsgCallback::Function
;
mqttBrokerPort::Integer=1883,
keepalivetopic::String= "/keepalive/$(uuid4())",
keepalivetopic::String= "/keepalive/$(uuid4snakecase())",
keepaliveCheckInterval::Integer=30,
qos::MQTTClient.QOS=QOS_1,
multiMsg::String="single",
)
client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort)
@@ -173,6 +177,7 @@ function mqttClientInstance_v2(
keepaliveChannel,
keepaliveCheckInterval,
Dates.now(),
multiMsg,
)
return instance
@@ -193,17 +198,17 @@ end
- `senderName`
sender name (String) e.g. "agent-wine-web-frontend"
- `senderId`
sender id e.g. string(uuid4())
sender id e.g. uuid4snakecase()
- `receiverName`
msg receiver name (String) e.g. "agent-backend"
- `receiverId`
msg receiver id, nothing means everyone in the topic e.g. string(uuid4())
msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
- `requestresponse`
this message is a "request" or "response"
- `getpost`
this message is a "get" or "post"
- `msgId`
message ID e.g. string(uuid4())
message ID e.g. uuid4snakecase()
- `timestamp`
message published timestamp e.g. string(Dates.now())
- `senderselfnote`
@@ -211,9 +216,9 @@ end
- `replyTopic`
sender ask receiver to reply to this topic e.g. "/agent/frontend"
- `replyToMsgId`
sender is responding to this msg ID e.g. string(uuid4())
sender is responding to this msg ID e.g. uuid4snakecase()
- `acknowledgestatus`
e.g. "acknowledged"
e.g. "ACK", "NACK"
- `mqttBrokerAddress`
MQTT broker URI. e.g. "test.mosquitto.org"
- `mqttBrokerPort`
@@ -221,9 +226,6 @@ end
- `msgFormatVersion`
- `multiMsgData`
# Return
- `msgMeta`
@@ -244,12 +246,12 @@ function generate_msgMeta(
;
msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus"
senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend"
senderId::T1= "nothing", # sender id e.g. string(uuid4())
senderId::T1= "nothing", # sender id e.g. uuid4snakecase()
receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend"
receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4())
receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. uuid4snakecase()
requestresponse::T1= "nothing", # this message is a "request" or "response"
getpost::T1= "nothing", # this message is a "get" or "post"
msgId::T1= string(uuid4()),
msgId::T1= uuid4snakecase(),
timestamp= string(Dates.now()), # message published timestamp
# help sender manage incoming reply msg, could be "for text inference", "img generation"
@@ -259,12 +261,11 @@ function generate_msgMeta(
# e.g. "/agent/frontend/wine/chat/api/v1/txt/receive"
replyTopic::T1= "nothing",
replyToMsgId::T1= "nothing", # sender is responding to this msg ID
acknowledgestatus::T1= "nothing", # "acknowledged",
acknowledgestatus::T1= "nothing", # "ACK", "NACK"
mqttBrokerAddress::T1= "test.mosquitto.org",
mqttBrokerPort::Integer= 1883,
msgFormatVersion::T1= "nothing",
multiMsgData::T1= "nothing",
)::Dict{Symbol, Any} where {T1<:AbstractString}
@@ -272,9 +273,9 @@ function generate_msgMeta(
:sendTopic=> sendTopic, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt"
:msgPurpose=> msgPurpose, # purpose of this message e.g. "updateStatus"
:senderName=> senderName, # sender name (String) e.g. "agent-wine-web-frontend"
:senderId=> senderId, # sender id e.g. string(uuid4())
:senderId=> senderId, # sender id e.g. uuid4snakecase()
:receiverName=> receiverName, # msg receiver name (String) e.g. "agent-backend"
:receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. string(uuid4())
:receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase()
:requestResponse=> requestresponse, # this message is a "request" or "response"
:getPost=> getpost, # this message is a "get" or "post"
:msgId=> msgId,
@@ -294,7 +295,6 @@ function generate_msgMeta(
:mqttBrokerPort=> mqttBrokerPort,
:msgFormatVersion=> msgFormatVersion,
:multiMsgData=> multiMsgData,
)
return msgMeta
@@ -441,11 +441,14 @@ function checkMqttConnection!(mqttInstance::T;
# check if mqtt connection is still up
intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds")
isdisconnected = false # flag
if intervaldiff > interval
while true
mqttConnStatus = isMqttConnectionAlive(mqttInstance)
if mqttConnStatus == false
println("Attemping to reconnect")
isdisconnected = true
println("mqtt connection $mqttConnStatus, Attemping to reconnect")
# use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet
mqttInstance.client, mqttInstance.connection =
MakeConnection(mqttInstance.mqttBrokerAddress,
@@ -457,7 +460,9 @@ function checkMqttConnection!(mqttInstance::T;
MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos)
else
mqttInstance.lastTimeMqttConnCheck = Dates.now()
println("Connected")
if isdisconnected
println("connected to mqtt broker")
end
return mqttConnStatus
break
end
@@ -516,7 +521,7 @@ end
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
@@ -529,6 +534,29 @@ end
# Signature
"""
function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any}
mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
mqttInstance = mqttClientInstance_v2(
outgoingMsg[:msgMeta][:mqttBrokerAddress],
mqttMsgReceiveTopic,
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
)
return sendMqttMsg(mqttInstance, outgoingMsg)
end
function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T}
)::NamedTuple where {T<:Any}
try
@@ -583,7 +611,7 @@ julia> using GeneralUtils, Dates, UUIDs, JSON3
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
senderId= uuid4snakecase(),
mqttBrokerAddress= "mqtt.yiem.cc",
mqttBrokerPort= 1883,
)
@@ -598,8 +626,7 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg)
"""
function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any}
# mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
mqttMsgReceiveTopic = ["/receivetopic"]
mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"]
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
@@ -615,7 +642,8 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T};
mqttMsgReceiveTopic,
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
onMsgCallback;
mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort]
)
return sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt)
@@ -639,7 +667,7 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel:
incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel])
if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId]
if incomingMsg[:msgMeta][:msgPurpose] == "NACK"
if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK"
println("NACK")
break # resend the msg
else
@@ -744,7 +772,7 @@ end
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
@@ -755,91 +783,126 @@ end
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [TESTING] implement crude version of this function
- [x] implement crude version of this function
- [] update docs
- [] transfer multiple msg with the same mqtt connection. The transfer loop should be
implemented by passing mqttInstance into sendReceiveMqttMsg() so that
1-connection is used to send all data
- [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically)
# Signature
"""
function transferDataOverMQTT_sender(disintegrateF::Function, data, partsize, msgMeta)
function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing)
dataTransferSessionID = nothing
# use disintegrateF(workDict, data) to disintegrate data into small parts
d = disintegrateF(data, partsize)
if !haskey(incomingMsg[:payload], :dataTransferSessionID)
""" Expected incomingMsg (without "dataTransferSessionID") for requesting metadata
Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"),
:msgMeta => Dict(:msgPurpose => nothing,
:receiverId => nothing,
:getpost => nothing,
:msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32",
:requestresponse => nothing,
:msgFormatVersion => nothing,
:timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)",
:replyToMsgId => nothing,
:acknowledgestatus => nothing,
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
:mqttBrokerPort => nothing,
:receiverName => "agent_wine_backend",
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
:senderName => "agent_wine_frontend",
:senderselfnote => nothing,
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
"""
# compute hash value for each piece of data
hashes = [hash(d[i]) for i in 1:d[:totalparts]]
for i in 1:totalpart
push!(hashes, hash(d[i]))
end
if data !== nothing # if the user provide data, store it in a new session workspace
dataTransferSessionID = uuid4snakecase()
haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict()
workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict()
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
s[:dataTransferSessionID] = dataTransferSessionID
dataDict = Dict(pairs(data))
merge!(s, dataDict)
else
error("No data provided")
end
mqttMsgReceiveTopic = "/GeneralUtils_transferDataOverMQTT/$(msgMeta[:senderId])"
mqttMsgReceiveChannel = (ch1=Channel(8),)
keepaliveChannel = Channel(8)
# compute hash value for each piece of data
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]]
# Define the callback for receiving messages.
function onMsgCallback(topic, payload)
jobj = JSON3.read(String(payload))
onMsg = copy(jobj)
put!(mqttMsgReceiveChannel[:ch1], onMsg)
end
# send metadata without data to receiver()
datainfo = Dict()
for (k, v) in s
if k [:dataparts]
datainfo[k] = v
end
end
mqttInstance = GeneralUtils.mqttClientInstance_v2(
msgMeta[:mqttBrokerAddress],
msgMeta[:mqttBrokerPort],
[mqttMsgReceiveTopic],
mqttMsgReceiveChannel,
keepaliveChannel,
onMsgCallback
)
d = Dict(
:result=> "dataTransferOverMQTT",
:dataTransferSessionID=> dataTransferSessionID,
Symbol(dataTransferSessionID)=> datainfo
)
""" Sending metadata to the requester
Dict(:result => "dataTransferOverMQTT",
:b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3,
:hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f",
"b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025",
"5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"],
:datatype => "vector{Dict}",
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6",
:totalparts => 46),
:dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6")
"""
# 1st msg to send metadata
msgMeta[:replyTopic] = mqttMsgReceiveTopic # receiver() to send a message back to this topic
msgMeta[:multiMsgData] = "start"
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(
:datatype=> d[:datatype],
:totalparts=> d[:totalparts],
:partsize=> d[:partsize],
:hashes=> hashes
return d
else
""" Expected incomingMsg (with "dataTransferSessionID") for requesting a data part
Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"),
:msgMeta => Dict(:msgPurpose => nothing,
:receiverId => nothing,
:getpost => nothing,
:msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305",
:requestresponse => nothing,
:msgFormatVersion => nothing,
:timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)",
:replyToMsgId => nothing,
:acknowledgestatus => nothing,
:sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing",
:mqttBrokerPort => nothing,
:receiverName => "agent_wine_backend",
:replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25",
:mqttBrokerAddress => "wss://mqtt.yiem.cc:8083",
:senderName => "agent_wine_frontend",
:senderselfnote => nothing,
:senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25"))
"""
# check transfer session id
dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID]
clientRequestingDataPartNumber =
incomingMsg[:payload][:requestingDataPartNumber]
s = workDict[:dataTransferOverMQTT][dataTransferSessionID]
d = Dict(
:result=> "dataTransferOverMQTT",
:dataTransferSessionID=> dataTransferSessionID,
Symbol(dataTransferSessionID)=> Dict(
:dataPartNumber=> clientRequestingDataPartNumber,
:datapart=> s[:dataparts][clientRequestingDataPartNumber]
),
)
)
pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
# while loop for sending the data
msgMeta[:multiMsgData] = "transfering"
partnumber = 1
while partnumber <= d[:totalparts]
# publish
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict(
:partnumber=> partnumber,
:data=> d[:data][partnumber],
)
)
pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
partnumber += 1
sleep(1)
return d
end
# final msg to send end msg
msgMeta[:multiMsgData] = "end"
outgoingMsg = Dict(
:msgMeta=> msgMeta,
:payload=> Dict()
)
pubresult = sendReceiveMqttMsg(outgoingMsg; timeout=10, maxattempt=3)
pubresult[:success] == true ? nothing : return pubresult
return (success=true, error=nothing, response=nothing)
end
""" Send a message to specified MQTT topic then wait for reply.
@@ -859,7 +922,7 @@ end
julia> msgMeta = GeneralUtils.generate_msgMeta(
"/testtopic",
senderName= "somename",
senderId= string(uuid4()),
senderId= uuid4snakecase(),
mqttBrokerAddress= "test.mosquitto.org",
mqttBrokerPort= 1883,
)
@@ -870,11 +933,11 @@ end
julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg)
```
# TODO
- [WORKING]
- [PENDING]
# Signature
"""
function transferDataOverMQTT_receiver()
function dataTransferOverMQTT_receiver()

View File

@@ -201,7 +201,7 @@ function jsoncorrection(config::T1, input::T2, correctJsonExample::T3;
msgMeta = GeneralUtils.generate_msgMeta(
externalService[:mqtttopic],
senderName= "jsoncorrection",
senderId= string(uuid4()),
senderId= uuid4snakecase(),
receiverName= "text2textinstruct",
mqttBroker= config[:mqttServerInfo][:broker],
mqttBrokerPort= config[:mqttServerInfo][:port],

View File

@@ -505,17 +505,19 @@ end
# Signature
"""
function disintegrate_vectorDict(data::Vector{Dict{T1, T2}}, partsize::Integer
) where {T1<:Any, T2<:Any}
function disintegrate_vectorDict(data::Vector, partsize::Integer
)
println("--> disintegrate_vectorDict()")
parts = Dict{Int, Vector{Dict}}()
for (i, dict) in enumerate(data)
# println("--> disintegrate_vectorDict ", i)
partkey = (i - 1) ÷ partsize + 1
if !haskey(parts, partkey)
parts[partkey] = Vector{Dict}()
end
push!(parts[partkey], dict)
end
return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, data=parts)
return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, dataparts=parts)
end