From 565d8bb1991daf3d55684ff54bd2d9629282a79c Mon Sep 17 00:00:00 2001 From: tonaerospace Date: Tue, 1 Oct 2024 17:12:54 +0700 Subject: [PATCH] update --- Manifest.toml | 74 ++++++------ Project.toml | 1 + src/communication.jl | 269 ++++++++++++++++++++++++++----------------- src/llmUtil.jl | 2 +- src/util.jl | 8 +- 5 files changed, 213 insertions(+), 141 deletions(-) diff --git a/Manifest.toml b/Manifest.toml index 615d5c2..2b2495d 100644 --- a/Manifest.toml +++ b/Manifest.toml @@ -1,8 +1,8 @@ # This file is machine-generated - editing it directly is not advised -julia_version = "1.10.4" +julia_version = "1.10.5" manifest_format = "2.0" -project_hash = "b6f51053f1fe5e6c447c7d94689166d2519b40e2" +project_hash = "db30cb95057dc4ab784a71c9e9348bd41786ee98" [[deps.AliasTables]] deps = ["PtrArrays", "Random"] @@ -64,10 +64,10 @@ uuid = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" version = "1.16.0" [[deps.DataFrames]] -deps = ["Compat", "DataAPI", "DataStructures", "Future", "InlineStrings", "InvertedIndices", "IteratorInterfaceExtensions", "LinearAlgebra", "Markdown", "Missings", "PooledArrays", "PrecompileTools", "PrettyTables", "Printf", "REPL", "Random", "Reexport", "SentinelArrays", "SortingAlgorithms", "Statistics", "TableTraits", "Tables", "Unicode"] -git-tree-sha1 = "04c738083f29f86e62c8afc341f0967d8717bdb8" +deps = ["Compat", "DataAPI", "DataStructures", "Future", "InlineStrings", "InvertedIndices", "IteratorInterfaceExtensions", "LinearAlgebra", "Markdown", "Missings", "PooledArrays", "PrecompileTools", "PrettyTables", "Printf", "Random", "Reexport", "SentinelArrays", "SortingAlgorithms", "Statistics", "TableTraits", "Tables", "Unicode"] +git-tree-sha1 = "fb61b4812c49343d7ef0b533ba982c46021938a6" uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" -version = "1.6.1" +version = "1.7.0" [[deps.DataStructures]] deps = ["Compat", "InteractiveUtils", "OrderedCollections"] @@ -90,9 +90,9 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b" [[deps.Distributions]] deps = ["AliasTables", "FillArrays", "LinearAlgebra", "PDMats", "Printf", "QuadGK", "Random", "SpecialFunctions", "Statistics", "StatsAPI", "StatsBase", "StatsFuns"] -git-tree-sha1 = "e6c693a0e4394f8fda0e51a5bdf5aef26f8235e9" +git-tree-sha1 = "d7477ecdafb813ddee2ae727afa94e9dcb5f3fb0" uuid = "31c24e10-a181-5473-b8eb-7969acd0382f" -version = "0.25.111" +version = "0.25.112" [deps.Distributions.extensions] DistributionsChainRulesCoreExt = "ChainRulesCore" @@ -134,9 +134,9 @@ uuid = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" [[deps.FillArrays]] deps = ["LinearAlgebra"] -git-tree-sha1 = "fd0002c0b5362d7eb952450ad5eb742443340d6e" +git-tree-sha1 = "6a70198746448456524cb442b8af316927ff3e1a" uuid = "1a297f60-69ca-5386-bcde-b61e274b549b" -version = "1.12.0" +version = "1.13.0" weakdeps = ["PDMats", "SparseArrays", "Statistics"] [deps.FillArrays.extensions] @@ -188,9 +188,9 @@ version = "1.0.0" [[deps.JLLWrappers]] deps = ["Artifacts", "Preferences"] -git-tree-sha1 = "7e5d6779a1e09a36db2a7b6cff50942a0a7d0fca" +git-tree-sha1 = "f389674c99bfcde17dc57454011aa44d5a260a40" uuid = "692b3bcd-3c85-4b1f-b108-f13ce0eb3210" -version = "1.5.0" +version = "1.6.0" [[deps.JSON3]] deps = ["Dates", "Mmap", "Parsers", "PrecompileTools", "StructTypes", "UUIDs"] @@ -206,9 +206,9 @@ version = "1.14.0" [[deps.JuliaInterpreter]] deps = ["CodeTracking", "InteractiveUtils", "Random", "UUIDs"] -git-tree-sha1 = "4b415b6cccb9ab61fec78a621572c82ac7fa5776" +git-tree-sha1 = "2984284a8abcfcc4784d95a9e2ea4e352dd8ede7" uuid = "aa1ae85d-cabe-5617-a682-6adf51b2e16a" -version = "0.9.35" +version = "0.9.36" [[deps.LaTeXStrings]] git-tree-sha1 = "50901ebc375ed41dbf8058da26f9de442febbbec" @@ -267,9 +267,9 @@ uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" [[deps.LoweredCodeUtils]] deps = ["JuliaInterpreter"] -git-tree-sha1 = "1ce1834f9644a8f7c011eb0592b7fd6c42c90653" +git-tree-sha1 = "c2b5e92eaf5101404a58ce9c6083d595472361d6" uuid = "6f1432cf-f94c-5a45-995e-cdbf5db27b0b" -version = "3.0.1" +version = "3.0.2" [[deps.MQTTClient]] deps = ["Distributed", "Random", "Sockets"] @@ -365,24 +365,30 @@ version = "1.4.3" [[deps.PrettyTables]] deps = ["Crayons", "LaTeXStrings", "Markdown", "PrecompileTools", "Printf", "Reexport", "StringManipulation", "Tables"] -git-tree-sha1 = "66b20dd35966a748321d3b2537c4584cf40387c7" +git-tree-sha1 = "1101cd475833706e4d0e7b122218257178f48f34" uuid = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d" -version = "2.3.2" +version = "2.4.0" [[deps.Printf]] deps = ["Unicode"] uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" [[deps.PtrArrays]] -git-tree-sha1 = "f011fbb92c4d401059b2212c05c0601b70f8b759" +git-tree-sha1 = "77a42d78b6a92df47ab37e177b2deac405e1c88f" uuid = "43287f4e-b6f4-7ad1-bb20-aadabca52c3d" -version = "1.2.0" +version = "1.2.1" [[deps.QuadGK]] deps = ["DataStructures", "LinearAlgebra"] -git-tree-sha1 = "e237232771fdafbae3db5c31275303e056afaa9f" +git-tree-sha1 = "cda3b045cf9ef07a08ad46731f5a3165e56cf3da" uuid = "1fd47b50-473d-5c70-9696-f719f8f3bcdc" -version = "2.10.1" +version = "2.11.1" + + [deps.QuadGK.extensions] + QuadGKEnzymeExt = "Enzyme" + + [deps.QuadGK.weakdeps] + Enzyme = "7da242da-08ed-463a-9acd-ee780be4f1d9" [[deps.REPL]] deps = ["InteractiveUtils", "Markdown", "Sockets", "Unicode"] @@ -405,21 +411,21 @@ version = "1.3.0" [[deps.Revise]] deps = ["CodeTracking", "Distributed", "FileWatching", "JuliaInterpreter", "LibGit2", "LoweredCodeUtils", "OrderedCollections", "REPL", "Requires", "UUIDs", "Unicode"] -git-tree-sha1 = "7b7850bb94f75762d567834d7e9802fc22d62f9c" +git-tree-sha1 = "0a20a01fbb3a9531f3325a94b6dcf95c404a1658" uuid = "295af30f-e4ad-537b-8983-00126c2a3abe" -version = "3.5.18" +version = "3.6.0" [[deps.Rmath]] deps = ["Random", "Rmath_jll"] -git-tree-sha1 = "f65dcb5fa46aee0cf9ed6274ccbd597adc49aa7b" +git-tree-sha1 = "852bd0f55565a9e973fcfee83a84413270224dc4" uuid = "79098fc4-a85e-5d69-aa6a-4863f24498fa" -version = "0.7.1" +version = "0.8.0" [[deps.Rmath_jll]] deps = ["Artifacts", "JLLWrappers", "Libdl"] -git-tree-sha1 = "e60724fd3beea548353984dc61c943ecddb0e29a" +git-tree-sha1 = "58cdd8fb2201a6267e1db87ff148dd6c1dbd8ad8" uuid = "f50d1b31-88e8-58de-be2c-1cc44531875f" -version = "0.4.3+0" +version = "0.5.1+0" [[deps.SHA]] uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" @@ -479,9 +485,9 @@ version = "0.34.3" [[deps.StatsFuns]] deps = ["HypergeometricFunctions", "IrrationalConstants", "LogExpFunctions", "Reexport", "Rmath", "SpecialFunctions"] -git-tree-sha1 = "cef0472124fab0695b58ca35a77c6fb942fdab8a" +git-tree-sha1 = "b423576adc27097764a90e163157bcfc9acf0f46" uuid = "4c63d2b9-4356-54db-8cca-17b64c39e42c" -version = "1.3.1" +version = "1.3.2" [deps.StatsFuns.extensions] StatsFunsChainRulesCoreExt = "ChainRulesCore" @@ -493,15 +499,15 @@ version = "1.3.1" [[deps.StringManipulation]] deps = ["PrecompileTools"] -git-tree-sha1 = "a04cabe79c5f01f4d723cc6704070ada0b9d46d5" +git-tree-sha1 = "a6b1675a536c5ad1a60e5a5153e1fee12eb146e3" uuid = "892a3eda-7b42-436c-8928-eab12a02cf0e" -version = "0.3.4" +version = "0.4.0" [[deps.StructTypes]] deps = ["Dates", "UUIDs"] -git-tree-sha1 = "ca4bccb03acf9faaf4137a9abc1881ed1841aa70" +git-tree-sha1 = "159331b30e94d7b11379037feeb9b690950cace8" uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" -version = "1.10.0" +version = "1.11.0" [[deps.SuiteSparse]] deps = ["Libdl", "LinearAlgebra", "Serialization", "SparseArrays"] @@ -565,7 +571,7 @@ version = "1.2.13+1" [[deps.libblastrampoline_jll]] deps = ["Artifacts", "Libdl"] uuid = "8e850b90-86db-534c-a0d3-1478176c7d93" -version = "5.8.0+1" +version = "5.11.0+0" [[deps.nghttp2_jll]] deps = ["Artifacts", "Libdl"] diff --git a/Project.toml b/Project.toml index 2f29082..73799dd 100644 --- a/Project.toml +++ b/Project.toml @@ -13,4 +13,5 @@ JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" MQTTClient = "985f35cc-2c3d-4943-b8c1-f0931d5f0959" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" diff --git a/src/communication.jl b/src/communication.jl index 5e6cc7d..d71ff7e 100644 --- a/src/communication.jl +++ b/src/communication.jl @@ -1,9 +1,11 @@ module communication export generate_msgMeta, isMqttConnectionAlive, checkMqttConnection!, - sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2 + sendMqttMsg, sendReceiveMqttMsg, mqttClientInstance, mqttClientInstance_v2, + dataTransferOverMQTT_sender, dataTransferOverMQTT_receiver -using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient +using JSON3, DataStructures, Distributions, Random, Dates, UUIDs, MQTTClient, DataFrames, + SHA using ..util # ---------------------------------------------- 100 --------------------------------------------- # @@ -33,7 +35,7 @@ function mqttClientInstance_v1( keepaliveChannel::Channel, # user needs to specify because it has to be accessible by user-defined onMsgCallback() onMsgCallback::Function ; - keepalivetopic::String= "/keepalive/$(uuid4())", + keepalivetopic::String= "/keepalive/$(uuid4snakecase())", keepaliveCheckInterval::Integer=30, qos::MQTTClient.QOS=QOS_1, ) @@ -77,6 +79,7 @@ mutable struct mqttClientInstance_v2 <: mqttClientInstance keepaliveChannel::Channel keepaliveCheckInterval::Integer # second lastTimeMqttConnCheck::DateTime + multiMsg::String # "single", "metadata", "datapart" end """ MQTT client v2. The difference between client v1 and v2 is v2 allows multiple msgReceiveChannel without having to redefine mqttClientInstance again like in v1. @@ -120,9 +123,9 @@ end incomingMqttMsg = copy(jobj) # convert json object into julia dictionary recursively if occursin("topic_1", topic) - push!(mqttMsgReceiveTopic[:ch1], incomingMqttMsg) + put!(mqttMsgReceiveChannel[:ch1], incomingMqttMsg) elseif occursin("topic_2", topic) - push!(mqttMsgReceiveTopic[:ch2], incomingMqttMsg) + put!(mqttMsgReceiveChannel[:ch2], incomingMqttMsg) elseif occursin("keepalive", topic) put!(keepaliveChannel, incomingMqttMsg) else @@ -148,9 +151,10 @@ function mqttClientInstance_v2( onMsgCallback::Function ; mqttBrokerPort::Integer=1883, - keepalivetopic::String= "/keepalive/$(uuid4())", + keepalivetopic::String= "/keepalive/$(uuid4snakecase())", keepaliveCheckInterval::Integer=30, qos::MQTTClient.QOS=QOS_1, + multiMsg::String="single", ) client, connection = MQTTClient.MakeConnection(mqttBrokerAddress, mqttBrokerPort) @@ -173,6 +177,7 @@ function mqttClientInstance_v2( keepaliveChannel, keepaliveCheckInterval, Dates.now(), + multiMsg, ) return instance @@ -193,17 +198,17 @@ end - `senderName` sender name (String) e.g. "agent-wine-web-frontend" - `senderId` - sender id e.g. string(uuid4()) + sender id e.g. uuid4snakecase() - `receiverName` msg receiver name (String) e.g. "agent-backend" - `receiverId` - msg receiver id, nothing means everyone in the topic e.g. string(uuid4()) + msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase() - `requestresponse` this message is a "request" or "response" - `getpost` this message is a "get" or "post" - `msgId` - message ID e.g. string(uuid4()) + message ID e.g. uuid4snakecase() - `timestamp` message published timestamp e.g. string(Dates.now()) - `senderselfnote` @@ -211,9 +216,9 @@ end - `replyTopic` sender ask receiver to reply to this topic e.g. "/agent/frontend" - `replyToMsgId` - sender is responding to this msg ID e.g. string(uuid4()) + sender is responding to this msg ID e.g. uuid4snakecase() - `acknowledgestatus` - e.g. "acknowledged" + e.g. "ACK", "NACK" - `mqttBrokerAddress` MQTT broker URI. e.g. "test.mosquitto.org" - `mqttBrokerPort` @@ -221,9 +226,6 @@ end - `msgFormatVersion` - - `multiMsgData` - - # Return - `msgMeta` @@ -244,12 +246,12 @@ function generate_msgMeta( ; msgPurpose::T1= "nothing", # purpose of this message e.g. "updateStatus" senderName::T1= "nothing", # sender name (String) e.g. "agent-wine-web-frontend" - senderId::T1= "nothing", # sender id e.g. string(uuid4()) + senderId::T1= "nothing", # sender id e.g. uuid4snakecase() receiverName::T1= "nothing", # msg receiver name (String) e.g. "agent-backend" - receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. string(uuid4()) + receiverId::T1= "nothing", # id of msg receiver, nothing means everyone in the topic e.g. uuid4snakecase() requestresponse::T1= "nothing", # this message is a "request" or "response" getpost::T1= "nothing", # this message is a "get" or "post" - msgId::T1= string(uuid4()), + msgId::T1= uuid4snakecase(), timestamp= string(Dates.now()), # message published timestamp # help sender manage incoming reply msg, could be "for text inference", "img generation" @@ -259,12 +261,11 @@ function generate_msgMeta( # e.g. "/agent/frontend/wine/chat/api/v1/txt/receive" replyTopic::T1= "nothing", replyToMsgId::T1= "nothing", # sender is responding to this msg ID - acknowledgestatus::T1= "nothing", # "acknowledged", + acknowledgestatus::T1= "nothing", # "ACK", "NACK" mqttBrokerAddress::T1= "test.mosquitto.org", mqttBrokerPort::Integer= 1883, msgFormatVersion::T1= "nothing", - multiMsgData::T1= "nothing", )::Dict{Symbol, Any} where {T1<:AbstractString} @@ -272,9 +273,9 @@ function generate_msgMeta( :sendTopic=> sendTopic, # topic the sender sends to e.g. "/agent/wine/api/v1/prompt" :msgPurpose=> msgPurpose, # purpose of this message e.g. "updateStatus" :senderName=> senderName, # sender name (String) e.g. "agent-wine-web-frontend" - :senderId=> senderId, # sender id e.g. string(uuid4()) + :senderId=> senderId, # sender id e.g. uuid4snakecase() :receiverName=> receiverName, # msg receiver name (String) e.g. "agent-backend" - :receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. string(uuid4()) + :receiverId=> receiverId, # msg receiver id, nothing means everyone in the topic e.g. uuid4snakecase() :requestResponse=> requestresponse, # this message is a "request" or "response" :getPost=> getpost, # this message is a "get" or "post" :msgId=> msgId, @@ -294,7 +295,6 @@ function generate_msgMeta( :mqttBrokerPort=> mqttBrokerPort, :msgFormatVersion=> msgFormatVersion, - :multiMsgData=> multiMsgData, ) return msgMeta @@ -441,11 +441,14 @@ function checkMqttConnection!(mqttInstance::T; # check if mqtt connection is still up intervaldiff = timedifference(mqttInstance.lastTimeMqttConnCheck, Dates.now(), "seconds") + isdisconnected = false # flag + if intervaldiff > interval while true mqttConnStatus = isMqttConnectionAlive(mqttInstance) if mqttConnStatus == false - println("Attemping to reconnect") + isdisconnected = true + println("mqtt connection $mqttConnStatus, Attemping to reconnect") # use new client to reconnect instead of the previous one because I don't want to modify MQTTClient.jl yet mqttInstance.client, mqttInstance.connection = MakeConnection(mqttInstance.mqttBrokerAddress, @@ -457,7 +460,9 @@ function checkMqttConnection!(mqttInstance::T; MQTTClient.subscribe(mqttInstance.client, mqttInstance.keepalivetopic, mqttInstance.onMsgCallback, qos=mqttInstance.qos) else mqttInstance.lastTimeMqttConnCheck = Dates.now() - println("Connected") + if isdisconnected + println("connected to mqtt broker") + end return mqttConnStatus break end @@ -516,7 +521,7 @@ end julia> msgMeta = GeneralUtils.generate_msgMeta( "/testtopic", senderName= "somename", - senderId= string(uuid4()), + senderId= uuid4snakecase(), mqttBrokerAddress= "test.mosquitto.org", mqttBrokerPort= 1883, ) @@ -529,6 +534,29 @@ end # Signature """ +function sendMqttMsg(outgoingMsg::Dict{Symbol, T})::NamedTuple where {T<:Any} + mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"] + mqttMsgReceiveChannel = (ch1=Channel(8),) + keepaliveChannel = Channel(8) + + # Define the callback for receiving messages. + function onMsgCallback(topic, payload) + jobj = JSON3.read(String(payload)) + onMsg = copy(jobj) + put!(mqttMsgReceiveChannel[:ch1], onMsg) + end + + mqttInstance = mqttClientInstance_v2( + outgoingMsg[:msgMeta][:mqttBrokerAddress], + mqttMsgReceiveTopic, + mqttMsgReceiveChannel, + keepaliveChannel, + onMsgCallback; + mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort] + ) + + return sendMqttMsg(mqttInstance, outgoingMsg) +end function sendMqttMsg(mqttInstance::mqttClientInstance, outgoingMsg::Dict{Symbol, T} )::NamedTuple where {T<:Any} try @@ -583,7 +611,7 @@ julia> using GeneralUtils, Dates, UUIDs, JSON3 julia> msgMeta = GeneralUtils.generate_msgMeta( "/testtopic", senderName= "somename", - senderId= string(uuid4()), + senderId= uuid4snakecase(), mqttBrokerAddress= "mqtt.yiem.cc", mqttBrokerPort= 1883, ) @@ -598,8 +626,7 @@ julia> success, error, response = GeneralUtils.sendReceiveMqttMsg(outgoingMsg) """ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; timeout::Integer=60, maxattempt::Integer=1)::NamedTuple where {T<:Any} - # mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"] - mqttMsgReceiveTopic = ["/receivetopic"] + mqttMsgReceiveTopic = ["/GeneralUtils_sendReceiveMqttMsg/$(outgoingMsg[:msgMeta][:senderId])"] mqttMsgReceiveChannel = (ch1=Channel(8),) keepaliveChannel = Channel(8) @@ -615,7 +642,8 @@ function sendReceiveMqttMsg(outgoingMsg::Dict{Symbol, T}; mqttMsgReceiveTopic, mqttMsgReceiveChannel, keepaliveChannel, - onMsgCallback + onMsgCallback; + mqttBrokerPort=outgoingMsg[:msgMeta][:mqttBrokerPort] ) return sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=timeout, maxattempt=maxattempt) @@ -639,7 +667,7 @@ function sendReceiveMqttMsg(mqttInstance::mqttClientInstance_v2, receivechannel: incomingMsg = take!(mqttInstance.msgReceiveChannel[receivechannel]) if incomingMsg[:msgMeta][:replyToMsgId] == outgoingMsg[:msgMeta][:msgId] - if incomingMsg[:msgMeta][:msgPurpose] == "NACK" + if incomingMsg[:msgMeta][:acknowledgestatus] == "NACK" println("NACK") break # resend the msg else @@ -744,7 +772,7 @@ end julia> msgMeta = GeneralUtils.generate_msgMeta( "/testtopic", senderName= "somename", - senderId= string(uuid4()), + senderId= uuid4snakecase(), mqttBrokerAddress= "test.mosquitto.org", mqttBrokerPort= 1883, ) @@ -755,91 +783,126 @@ end julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) ``` # TODO - - [TESTING] implement crude version of this function + - [x] implement crude version of this function + - [] update docs - [] transfer multiple msg with the same mqtt connection. The transfer loop should be implemented by passing mqttInstance into sendReceiveMqttMsg() so that 1-connection is used to send all data + - [] partsize should be calculate from the fact that MQTT max size is 4MB (256MB theorectically) # Signature """ -function transferDataOverMQTT_sender(disintegrateF::Function, data, partsize, msgMeta) +function dataTransferOverMQTT_sender(workDict::Dict, incomingMsg::Dict; data=nothing) + dataTransferSessionID = nothing - # use disintegrateF(workDict, data) to disintegrate data into small parts - d = disintegrateF(data, partsize) + if !haskey(incomingMsg[:payload], :dataTransferSessionID) + """ Expected incomingMsg (without "dataTransferSessionID") for requesting metadata + + Dict(:payload => Dict(:args => Dict(), :functioncall => "load_winetable"), + :msgMeta => Dict(:msgPurpose => nothing, + :receiverId => nothing, + :getpost => nothing, + :msgId => "7fe4b765_fa2e_4c2e_9df0_9cb97e84ec32", + :requestresponse => nothing, + :msgFormatVersion => nothing, + :timestamp => "Tue Oct 01 2024 16:22:12 GMT+0700 (เวลาอินโดจีน)", + :replyToMsgId => nothing, + :acknowledgestatus => nothing, + :sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", + :mqttBrokerPort => nothing, + :receiverName => "agent_wine_backend", + :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", + :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", + :senderName => "agent_wine_frontend", + :senderselfnote => nothing, + :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) + """ - # compute hash value for each piece of data - hashes = [hash(d[i]) for i in 1:d[:totalparts]] - for i in 1:totalpart - push!(hashes, hash(d[i])) - end + if data !== nothing # if the user provide data, store it in a new session workspace + dataTransferSessionID = uuid4snakecase() + haskey(workDict, :dataTransferOverMQTT) ? nothing : workDict[:dataTransferOverMQTT] = Dict() + workDict[:dataTransferOverMQTT][dataTransferSessionID] = Dict() + s = workDict[:dataTransferOverMQTT][dataTransferSessionID] + s[:dataTransferSessionID] = dataTransferSessionID + dataDict = Dict(pairs(data)) + merge!(s, dataDict) + else + error("No data provided") + end - mqttMsgReceiveTopic = "/GeneralUtils_transferDataOverMQTT/$(msgMeta[:senderId])" - mqttMsgReceiveChannel = (ch1=Channel(8),) - keepaliveChannel = Channel(8) + # compute hash value for each piece of data + s = workDict[:dataTransferOverMQTT][dataTransferSessionID] + s[:hashes] = [bytes2hex(sha256(JSON3.write(s[:dataparts][i]))) for i in 1:s[:totalparts]] - # Define the callback for receiving messages. - function onMsgCallback(topic, payload) - jobj = JSON3.read(String(payload)) - onMsg = copy(jobj) - put!(mqttMsgReceiveChannel[:ch1], onMsg) - end + # send metadata without data to receiver() + datainfo = Dict() + for (k, v) in s + if k ∉ [:dataparts] + datainfo[k] = v + end + end - mqttInstance = GeneralUtils.mqttClientInstance_v2( - msgMeta[:mqttBrokerAddress], - msgMeta[:mqttBrokerPort], - [mqttMsgReceiveTopic], - mqttMsgReceiveChannel, - keepaliveChannel, - onMsgCallback - ) + d = Dict( + :result=> "dataTransferOverMQTT", + :dataTransferSessionID=> dataTransferSessionID, + Symbol(dataTransferSessionID)=> datainfo + ) + + """ Sending metadata to the requester + Dict(:result => "dataTransferOverMQTT", + :b57123aa_a4c6_4d07_ace2_4885005d51c6 => Dict(:partsize => 3, + :hashes => ["755fb25ae04890fa2a40c2b5fb5ddf05adeff6b934475998576bdf307359cd0f", + "b95ba78344ad409755a386b6ab0cb2c8ea490756ce2df401aa6e5f77cf445025", + "5b51277dd06ca255492cfc432824adf77e9e0de94179806cb130bef38e75da48"], + :datatype => "vector{Dict}", + :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6", + :totalparts => 46), + :dataTransferSessionID => "b57123aa_a4c6_4d07_ace2_4885005d51c6") + """ - # 1st msg to send metadata - msgMeta[:replyTopic] = mqttMsgReceiveTopic # receiver() to send a message back to this topic - msgMeta[:multiMsgData] = "start" - outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict( - :datatype=> d[:datatype], - :totalparts=> d[:totalparts], - :partsize=> d[:partsize], - :hashes=> hashes + return d + + else + + """ Expected incomingMsg (with "dataTransferSessionID") for requesting a data part + Dict(:payload => Dict(:dataTransferSessionID => "5543f3b9_194e_4670_b961_e51c77955e40", :args => Dict(), :requestingDataPartNumber => 1, :functioncall => "load_winetable"), + :msgMeta => Dict(:msgPurpose => nothing, + :receiverId => nothing, + :getpost => nothing, + :msgId => "fbcc3bd9_c2bc_4bde_908a_dfe88ab7a305", + :requestresponse => nothing, + :msgFormatVersion => nothing, + :timestamp => "Tue Oct 01 2024 16:34:06 GMT+0700 (เวลาอินโดจีน)", + :replyToMsgId => nothing, + :acknowledgestatus => nothing, + :sendTopic => "/yiem_branch_1/agent/wine/backend/db/api/v1/testing", + :mqttBrokerPort => nothing, + :receiverName => "agent_wine_backend", + :replyTopic => "/agent/wine/frontend/514b634a_90b9_4e07_891d_fdf55b1aed25", + :mqttBrokerAddress => "wss://mqtt.yiem.cc:8083", + :senderName => "agent_wine_frontend", + :senderselfnote => nothing, + :senderId => "514b634a_90b9_4e07_891d_fdf55b1aed25")) + """ + + # check transfer session id + dataTransferSessionID = incomingMsg[:payload][:dataTransferSessionID] + clientRequestingDataPartNumber = + incomingMsg[:payload][:requestingDataPartNumber] + s = workDict[:dataTransferOverMQTT][dataTransferSessionID] + + d = Dict( + :result=> "dataTransferOverMQTT", + :dataTransferSessionID=> dataTransferSessionID, + Symbol(dataTransferSessionID)=> Dict( + :dataPartNumber=> clientRequestingDataPartNumber, + :datapart=> s[:dataparts][clientRequestingDataPartNumber] + ), ) - ) - pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3) - pubresult[:success] == true ? nothing : return pubresult - # while loop for sending the data - msgMeta[:multiMsgData] = "transfering" - partnumber = 1 - while partnumber <= d[:totalparts] - - # publish - outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict( - :partnumber=> partnumber, - :data=> d[:data][partnumber], - ) - ) - - pubresult = sendReceiveMqttMsg(mqttInstance, :ch1, outgoingMsg; timeout=10, maxattempt=3) - pubresult[:success] == true ? nothing : return pubresult - - partnumber += 1 - - sleep(1) + return d end - # final msg to send end msg - msgMeta[:multiMsgData] = "end" - outgoingMsg = Dict( - :msgMeta=> msgMeta, - :payload=> Dict() - ) - pubresult = sendReceiveMqttMsg(outgoingMsg; timeout=10, maxattempt=3) - pubresult[:success] == true ? nothing : return pubresult - - return (success=true, error=nothing, response=nothing) end """ Send a message to specified MQTT topic then wait for reply. @@ -859,7 +922,7 @@ end julia> msgMeta = GeneralUtils.generate_msgMeta( "/testtopic", senderName= "somename", - senderId= string(uuid4()), + senderId= uuid4snakecase(), mqttBrokerAddress= "test.mosquitto.org", mqttBrokerPort= 1883, ) @@ -870,11 +933,11 @@ end julia> success, error = GeneralUtils.sendMqttMsg(outgoingMsg) ``` # TODO - - [WORKING] + - [PENDING] # Signature """ -function transferDataOverMQTT_receiver() +function dataTransferOverMQTT_receiver() diff --git a/src/llmUtil.jl b/src/llmUtil.jl index ccc570b..54f4f75 100644 --- a/src/llmUtil.jl +++ b/src/llmUtil.jl @@ -201,7 +201,7 @@ function jsoncorrection(config::T1, input::T2, correctJsonExample::T3; msgMeta = GeneralUtils.generate_msgMeta( externalService[:mqtttopic], senderName= "jsoncorrection", - senderId= string(uuid4()), + senderId= uuid4snakecase(), receiverName= "text2textinstruct", mqttBroker= config[:mqttServerInfo][:broker], mqttBrokerPort= config[:mqttServerInfo][:port], diff --git a/src/util.jl b/src/util.jl index 6f92278..b7a12e8 100644 --- a/src/util.jl +++ b/src/util.jl @@ -505,17 +505,19 @@ end # Signature """ -function disintegrate_vectorDict(data::Vector{Dict{T1, T2}}, partsize::Integer - ) where {T1<:Any, T2<:Any} +function disintegrate_vectorDict(data::Vector, partsize::Integer + ) + println("--> disintegrate_vectorDict()") parts = Dict{Int, Vector{Dict}}() for (i, dict) in enumerate(data) + # println("--> disintegrate_vectorDict ", i) partkey = (i - 1) ÷ partsize + 1 if !haskey(parts, partkey) parts[partkey] = Vector{Dict}() end push!(parts[partkey], dict) end - return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, data=parts) + return (datatype="vector{Dict}", totalparts=length(parts), partsize=partsize, dataparts=parts) end