diff --git a/previousVersion/0.0.1/Manifest.toml b/previousVersion/0.0.1/Manifest.toml new file mode 100644 index 0000000..53893a0 --- /dev/null +++ b/previousVersion/0.0.1/Manifest.toml @@ -0,0 +1,144 @@ +# This file is machine-generated - editing it directly is not advised + +julia_version = "1.8.5" +manifest_format = "2.0" +project_hash = "fb16ae23b45a4c23db667056d8a1047b9686893f" + +[[deps.Artifacts]] +uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33" + +[[deps.Base64]] +uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" + +[[deps.Compat]] +deps = ["Dates", "LinearAlgebra", "UUIDs"] +git-tree-sha1 = "61fdd77467a5c3ad071ef8277ac6bd6af7dd4c04" +uuid = "34da2185-b29b-5c13-b0c7-acf172513d20" +version = "4.6.0" + +[[deps.CompilerSupportLibraries_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae" +version = "1.0.1+0" + +[[deps.DataStructures]] +deps = ["Compat", "InteractiveUtils", "OrderedCollections"] +git-tree-sha1 = "d1fff3a548102f48987a52a2e0d114fa97d730f0" +uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" +version = "0.18.13" + +[[deps.Dates]] +deps = ["Printf"] +uuid = "ade2ca70-3891-5945-98fb-dc099432e06a" + +[[deps.InteractiveUtils]] +deps = ["Markdown"] +uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240" + +[[deps.JSON3]] +deps = ["Dates", "Mmap", "Parsers", "SnoopPrecompile", "StructTypes", "UUIDs"] +git-tree-sha1 = "84b10656a41ef564c39d2d477d7236966d2b5683" +uuid = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +version = "1.12.0" + +[[deps.Libdl]] +uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb" + +[[deps.LinearAlgebra]] +deps = ["Libdl", "libblastrampoline_jll"] +uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" + +[[deps.Logging]] +uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" + +[[deps.Markdown]] +deps = ["Base64"] +uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" + +[[deps.Mmap]] +uuid = "a63ad114-7e13-5084-954f-fe012c677804" + +[[deps.Mosquitto]] +deps = ["Libdl", "Random", "Test"] +path = "/home/ton/.julia/dev/Mosquitto" +uuid = "db317de6-444b-4dfa-9d0e-fbf3d8dd78ea" +version = "0.4.1" + +[[deps.OpenBLAS_jll]] +deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"] +uuid = "4536629a-c528-5b80-bd46-f80d51c5b363" +version = "0.3.20+0" + +[[deps.OrderedCollections]] +git-tree-sha1 = "85f8e6578bf1f9ee0d11e7bb1b1456435479d47c" +uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d" +version = "1.4.1" + +[[deps.Parsers]] +deps = ["Dates", "SnoopPrecompile"] +git-tree-sha1 = "6f4fbcd1ad45905a5dee3f4256fabb49aa2110c6" +uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0" +version = "2.5.7" + +[[deps.Preferences]] +deps = ["TOML"] +git-tree-sha1 = "47e5f437cc0e7ef2ce8406ce1e7e24d44915f88d" +uuid = "21216c6a-2e73-6563-6e65-726566657250" +version = "1.3.0" + +[[deps.Printf]] +deps = ["Unicode"] +uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" + +[[deps.Random]] +deps = ["SHA", "Serialization"] +uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" + +[[deps.Redis]] +deps = ["DataStructures", "Dates", "Sockets"] +git-tree-sha1 = "6b3c136222b08ae0c71657f2501c6741782a1ad1" +uuid = "0cf705f9-a9e2-50d1-a699-2b372a39b750" +version = "1.0.0" + +[[deps.SHA]] +uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" +version = "0.7.0" + +[[deps.Serialization]] +uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b" + +[[deps.SnoopPrecompile]] +deps = ["Preferences"] +git-tree-sha1 = "e760a70afdcd461cf01a575947738d359234665c" +uuid = "66db9d55-30c0-4569-8b51-7e840670fc0c" +version = "1.0.3" + +[[deps.Sockets]] +uuid = "6462fe0b-24de-5631-8697-dd941f90decc" + +[[deps.StructTypes]] +deps = ["Dates", "UUIDs"] +git-tree-sha1 = "ca4bccb03acf9faaf4137a9abc1881ed1841aa70" +uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" +version = "1.10.0" + +[[deps.TOML]] +deps = ["Dates"] +uuid = "fa267f1f-6049-4f14-aa54-33bafae1ed76" +version = "1.0.0" + +[[deps.Test]] +deps = ["InteractiveUtils", "Logging", "Random", "Serialization"] +uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[[deps.UUIDs]] +deps = ["Random", "SHA"] +uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" + +[[deps.Unicode]] +uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5" + +[[deps.libblastrampoline_jll]] +deps = ["Artifacts", "Libdl", "OpenBLAS_jll"] +uuid = "8e850b90-86db-534c-a0d3-1478176c7d93" +version = "5.1.1+0" diff --git a/previousVersion/0.0.1/Project.toml b/previousVersion/0.0.1/Project.toml new file mode 100644 index 0000000..f0e8f00 --- /dev/null +++ b/previousVersion/0.0.1/Project.toml @@ -0,0 +1,11 @@ +name = "CommUtils" +uuid = "646cbe82-3d4a-47b2-9440-2e80a472ca20" +authors = ["naraw "] +version = "0.1.0" + +[deps] +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Mosquitto = "db317de6-444b-4dfa-9d0e-fbf3d8dd78ea" +Redis = "0cf705f9-a9e2-50d1-a699-2b372a39b750" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" diff --git a/previousVersion/0.0.1/src/CommUtils.jl b/previousVersion/0.0.1/src/CommUtils.jl new file mode 100644 index 0000000..f4b5944 --- /dev/null +++ b/previousVersion/0.0.1/src/CommUtils.jl @@ -0,0 +1,72 @@ +module CommUtils + +# request and respond cannot be exported because Genie confuse +export mqttClient, publish, request, respond, mqttRun + + +include("interface.jl") +using .interface + + +#------------------------------------------------------------------------------------------------100 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +end # commUtils \ No newline at end of file diff --git a/previousVersion/0.0.1/src/interface.jl b/previousVersion/0.0.1/src/interface.jl new file mode 100644 index 0000000..20c9300 --- /dev/null +++ b/previousVersion/0.0.1/src/interface.jl @@ -0,0 +1,750 @@ +module interface + +export vecToDict, mqttClient, mqttRun, publish, request, respond + +using JSON3, Redis, UUIDs, Dates + +# Mosquitto.jl requires C++ Mosquitto lib (linux only). Ref: https://github.com/denglerchr/Mosquitto.jl +# push!(Base.DL_LOAD_PATH, "/usr", "/usr/local/lib", "/usr/include", "/usr/lib") +using Mosquitto + + +#------------------------------------------------------------------------------------------------100 + +""" Convert received MQTT payload of JSON data to julia dict. + copy() convert JSON3 obj to julia dict +""" +vecToDict(vec::Vector{UInt8}) = copy(JSON3.read(String(vec))) + + +# ---------------------------------------------------------------------------- # +# publish image with julia PythonCall (python 3.9) and opencv # +# ---------------------------------------------------------------------------- # +""" + using Revise + using Images, UUIDs + + # https://pyshine.com/Video-and-bidirectional-text-streaming/ + using CondaPkg; CondaPkg.add_pip("pybase64"); CondaPkg.add_pip("opencv-python"); CondaPkg.add("jsons"); CondaPkg.add("numpy"); + using PythonCall + cv2 = pyimport("cv2") + base64 = pyimport("pybase64") + np = pyimport("numpy") + + using imageUtils + + # ---------------------------------------------------------------------------- # + # client - publish image # + # ---------------------------------------------------------------------------- # + webcam = cv2.VideoCapture(0) + + _, frame_1 = webcam.read() # julia's PythonCall python-obj numpy array + # cv2_bgr_img = cv2.imread("20.jpg") + + # compress image data formats in order to make network transfer easier. + _, buffer = cv2.imencode(".jpg", cv2_bgr_img) + + jpg_as_pyStr = base64.b64encode(buffer).decode("utf-8") + jpg_as_juliaStr = pyconvert(String, jpg_as_pyStr) # for base64, python string or julia string is the same + juliaDict = Dict("img"=> jpg_as_juliaStr) + publish(mqttClient, "sometopic", JSON3.write(juliaDict)) # mqtt publish + + + # ---------------------------------------------------------------------------- # + # server - receive image # + # ---------------------------------------------------------------------------- # + img_str = payload["img"] # payload received thru mqtt + buf = base64.b64decode(img_str) + buffer = np.frombuffer(buf, dtype=np.uint8) + cv2_img = cv2.imdecode(buffer, 1) + julia_array_img = pyconvert(Array, cv2_img); # resulted in julia array but in cv2-numpy's row-major BGR format + img_julia_RGB = np2juliaImage(julia_array_img) + + + # ----------------------------- modify image here ---------------------------- # + + + img_bgr = juliaImg2npImg(img_julia_RGB); + frame_2 = np.array(img_bgr) # julia's PythonCall python-obj numpy array +""" + +# ---------------------------------------------------------------------------- # +# mqtt # +# ---------------------------------------------------------------------------- # +""" + ``` + Store mqtt client info with client instance + ``` + # Example + mqttClientSpec = ( + clientName= "someclient", # name of this client + clientID= mqttclienid, + broker= "192.168.88.10", + pub= (faceAI="face_problem_detection/request",), + ) + client = CommUtils.mqttClient(mqttClientSpec) + + # msg format + msg = Dict{Symbol, Any}( + :msgMeta=> Dict( + :msgPurpose=> "updateStatus", + :from=> "tempCtrl", + :to=> "masterCtrl", + :requestrespond=> "request", # this is a request or respond msg + :sendto=> "", # responder's topic + :replyTo=> nothing, # requester ask responder to send reply to this topic + :repondToMsgId=> ", # responder is responding to this msg id + :taskstatus=> "", # "complete", "fail", "waiting" or other status + :timestamp=> Dates.now(), + :msgId=> uuid4() + ), + :userdata1=> somedata, + :userdata2=> somedata, + ) +""" +Base.@kwdef mutable struct mqttClient + # client info + clientName::String= "" # name of this client + clientID::String= "" + userName::String= "" + password::String= "" # e.g. raw"GP$gxe_2W8vJ-9-5)YU8#-x". use raw"" if $ is present + + # server info + serverName::String= "localmqttserver" + # 172.17.0.1 is docker internal gateway that bind to "0.0.0.0"(linux) or "127.0.0.1"(windows) + broker::String="172.17.0.1" + port::Integer=1883 + + pubtopic::NamedTuple= ( + respondtopic= "", + ) + telemetrytopic::NamedTuple= NamedTuple() # e.g. (temp="temp/telemetry", humid="humid/telemetry) + debugtopic::NamedTuple= NamedTuple() + subtopic::NamedTuple= ( + requesttopic= "", + selfmailboxtopic= "", # use it to receive result from other microservices + ) + + # client operation + clientInstance::Any= nothing # client instance after connected to mqtt server resides here + connectStatus::Bool= false + subStatus::Bool= false + boottime::DateTime= Dates.now() + retainedMsgCleared= false + +end +function mqttClient(kwargs::NamedTuple) + + # instantiate client data struct + s = mqttClient() + + for (k, v) in pairs(kwargs) + s.:($k) = v + end + + # instantiate mqtt client + if s.clientID != "" + s.clientInstance = Mosquitto.Client(id=s.clientID) + else + s.clientInstance = Mosquitto.Client() + end + + # connect + mqttConnect!(s) + + return s +end + +function mqttConnect!(client::mqttClient) + if client.userName != "" + result = Mosquitto.connect(client.client, client.broker, client.port, + username=client.userName, + password=client.password) + client.connectStatus = result == 0 ? true : false + else + result = Mosquitto.connect(client.clientInstance, client.broker, client.port) + client.connectStatus = result == 0 ? true : false + end +end + +# loop client instance manually +mqttLoop!(client::mqttClient; timeout=100) = loop(client.clientInstance, timeout=timeout) + + +""" + Checking all client for potential connection lost and mark the lost connection +""" +function mqttCheckConnection!(client::mqttClient) + # check channel, if there is something todo + nmessages = Base.n_avail(get_connect_channel()) + + if nmessages == 0 + return 0 + else + for i = 1:nmessages + # each conncb in a channel is a connection struct resulted from Mosquitto.connect() + conncb = take!(get_connect_channel()) + if conncb.clientptr == client.clientInstance.cptr.mosc # use conncb.clientptr (pointer) as identifier + if conncb.val == 1 + println("$(conncb.clientptr): connection successfull") + client.connectStatus = true + elseif conncb.val == 0 + println("$(conncb.clientptr): $(client.clientName) disconnected") + client.connectStatus = false + client.subStatus = false + else + error("undefined condition line $(@__LINE__)") + end + end + end + end +end + + +""" + Reconnect all client that is marked "lost connection" +""" +function mqttReconnect!(client::mqttClient) + if client.connectStatus == false + Mosquitto.disconnect(client.clientInstance) # disconnect to release port before attemping new connection + + if client.userName === nothing # check if the member needs username/password + result = Mosquitto.connect(client.clientInstance, client.broker, client.port) + client.connectStatus = result == 0 ? true : false + else + result = Mosquitto.connect(client.clientInstance, client.broker, client.port, + username=client.userName, + password=client.password) + client.connectStatus = result == 0 ? true : false + end + + end +end + + +""" + Subscribe all topic for all client +""" +function mqttSubOnConnect!(client::mqttClient) + if client.connectStatus == true && client.subStatus == false + for (k, v) in pairs(client.subtopic) + if v != "" + println("sub to $k") + Mosquitto.subscribe(client.clientInstance, v) + client.subStatus = true + end + end + end +end + + +""" + mqtt communication loop, call this function manually +""" +function mqttRun(client::mqttClient, userMsgHandler::Function) + mqttLoop!(client) + mqttCheckConnection!(client) + mqttReconnect!(client) + mqttSubOnConnect!(client) + mqttOnmessage!(client, userMsgHandler) +end + + +""" + userMsgHandler. A user defined function in user space. + + # Example + + function mqttMsgHander(topic::String, payload) + # this function access workSpace by global variable and do something with msg + end + +""" +function mqttOnmessage!(client::mqttClient, userMsgHandler::Function) + nmessages = Base.n_avail(get_messages_channel()) + nmessages == 0 && return 0 + + timepass = (Dates.now() - client.boottime).value / 1000.0 + + if timepass <= 1 && client.retainedMsgCleared == false + # discard all retained message + for i = 1:nmessages + _ = take!(get_messages_channel()) # discard new arrival package + end + elseif timepass > 1 && client.retainedMsgCleared == false + client.retainedMsgCleared = true + println("retained mqtt messages cleared") + else + for i = 1:nmessages + pkg = take!(get_messages_channel()) # take new arrival package + topic = pkg.topic + # sptoptic = split(topic, "/") + payload = vecToDict(pkg.payload) # payload in Dict format + + # Do something with the message + userMsgHandler(topic, payload) # user defined function in user space. How to handle msg + end + end +end + + + +function publish(client::mqttClient, msg::Dict, pubtopic::String="") + Mosquitto.publish(client.clientInstance, pubtopic, JSON3.write(msg)) +end + + +""" + request function check msg for additional communication info in addition to publish() +""" +function request(client::mqttClient, msg::Dict; pubtopic::String="") + pubto = Vector{String}() + if pubtopic != "" + push!(pubto, pubtopic) + elseif msg[:msgMeta][:sendto] != "" + push!(pubto, msg[:msgMeta][:sendto]) + else + if length(client.pubtopic) != 0 + for (k, topic) in pairs(client.pubtopic) + push!(pubto, topic) + end + else + error("publish topic not defined") + end + end + + + for i in pubto + Mosquitto.publish(client.clientInstance, i, JSON3.write(msg)) + end +end + +""" + respond function check msg for additional communication info in addition to publish() +""" +function respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubtopic::String="") + replyTo = request_msg[:msgMeta][:replyTo] + respond_msg[:msgMeta][:repondToMsgId] = request_msg[:msgMeta][:msgId] + respond_msg[:msgMeta][:requestrespond] = "respond" + + pubto = Vector{String}() + if pubtopic != "" + push!(pubto, pubtopic) + elseif replyTo != "" + push!(pubto, replyTo) + else + if length(client.pubtopic) != 0 + for (k, topic) in pairs(client.pubtopic) + push!(pubto, topic) + end + else + error("publish topic not defined") + end + end + + for i in pubto + Mosquitto.publish(client.clientInstance, i, JSON3.write(respond_msg)) + end +end + + + + + + + + + + + + + + + +""" + Store details about mqtt and connected-Clients1 +""" +mqtt_member_dict = Dict( + :firstClient=> Dict( # masterController itself for internal communication + :clientName=> "masterController", # name of this client + :clientID=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98", + :username=> nothing, + :password=> nothing, + + # mqtt container is located outside of greenhouse-network1, + # 172.17.0.1 is local docker IP + :serverName=> "local_mqtt_server", # name of the server this client is connecting + :broker=> "172.17.0.1", + :port=> 1883, + :requesttopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/request", # channel of THIS client + :respondtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/respond", # channel of THIS client + :telemetrytopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/telemetry", # channel of THIS client + :debugtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/debug", # channel of THIS client + :pubtoptic=> Vector{String}(), # other channel this client publish to + :subtoptic=> Vector{String}(), # other channel this client subscribe to + :mqttClient=> nothing, # client instance after connected to mqtt server resides here + :connectStatus=> false, + :subStatus=> false + ), + :secondClient=> Dict( # masterController itself for internal communication + :clientName=> "masterController", # name of this client + :clientID=> "42f53073-b2ed-4b4a-8b11-155c58135dfc", + :username=> nothing, + :password=> nothing, + + # mqtt container is located outside of greenhouse-network1, + # 172.17.0.1 is local docker IP + :serverName=> "yiemMaintenance", # name of the server this client is connecting + :broker=> "mqtt.yiem.ai", + :port=> 1883, + :requesttopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/request", # channel of THIS client + :respondtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/respond", # channel of THIS client + :telemetrytopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/telemetry", # channel of THIS client + :debugtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/debug", # channel of THIS client + :pubtoptic=> Vector{String}(), # other channel this client publish to + :subtoptic=> Vector{String}(), # other channel this client subscribe to + :mqttClient=> nothing, # client instance after connected to mqtt server resides here + :connectStatus=> false, + :subStatus=> false + ), + :thirdClient=> Dict( # masterController itself for internal communication + :clientName=> "masterController", # name of this client + :clientID=> "f64e864b-e93f-4d1e-8662-0d6da68c9c03", #TODO change here netpie clientID + :username=> "t5gBa", #TODO change here netpie token + :password=> raw"GP$gxe_2W8vJ-9-5)YU8#-x", #TODO change here netpie secret + + # mqtt container is located outside of greenhouse-network1, + # 172.17.0.1 is local docker IP + :serverName=> "netpie2020", # name of the server this client is connecting + :broker=> "broker.netpie.io", + :port=> 1883, + :requesttopic=> "@msg/masterController", # channel of THIS client + :respondtopic=> "@shadow/data/update", # channel of THIS client # channel of THIS client + :pubtoptic=> Vector{String}(), # other channel this client publish to + :subtoptic=> Vector{String}(), # other channel this client subscribe to + :mqttClient=> nothing, # client instance after connected to mqtt server resides here + :connectStatus=> false, + :subStatus=> false + ), +) + + + + +""" operate on mqtt_member_dict + Connect all clients to relevent server +""" +function mqttConnect!(mqttMemberDict::Dict) + for (k, v) in mqttMemberDict + member = mqttMemberDict[k] + member[:mqttClient] = member[:clientID] === nothing ? Mosquitto.Client() : + Mosquitto.Client(id=member[:clientID]) + println("connecting member $k") + port = haskey(member, :port) ? member[:port] : 1883 + if member[:username] === nothing + result = Mosquitto.connect(member[:mqttClient], member[:broker], port) + member[:connectStatus] = result == 0 ? true : false + else + result = Mosquitto.connect(member[:mqttClient], member[:broker], port, + username=member[:username], + password=member[:password]) + member[:connectStatus] = result == 0 ? true : false + end + end +end + + +""" operate on mqtt_member_dict + Checking all client for potential connection lost and mark the lost connection +""" +function mqttCheckConnection!(mqttMemberDict::Dict) + # check channel, if there is something todo + nmessages = Base.n_avail(get_connect_channel()) + + if nmessages == 0 + return 0 + else + for i = 1:nmessages + # each conncb in a channel is a connection struct resulted from Mosquitto.connect() + conncb = take!(get_connect_channel()) + for (k, v) in mqttMemberDict + member = mqttMemberDict[k] + if conncb.clientptr == member[:mqttClient].cptr.mosc # use conncb.clientptr (pointer) as identifier + if conncb.val == 1 + println("$(conncb.clientptr): connection successfull") + member[:connectStatus] = true + elseif conncb.val == 0 + println("$(conncb.clientptr): member $i disconnected") + member[:connectStatus] = false + member[:subStatus] = false + else + error("something wrong with connection message") + end + end + end + end + end +end + + +""" operate on mqtt_member_dict + Reconnect all client that is marked "lost connection" +""" +function mqttReconnect!(mqttMemberDict::Dict) + for (k, v) in mqttMemberDict # find the disconnected member + member = mqttMemberDict[k] + if member[:connectStatus] == false + Mosquitto.disconnect(member[:mqttClient]) # disconnect to release port before attemping new connection + + port = haskey(member, :port) ? member[:port] : 1883 + if member[:username] === nothing # check if the member needs username/password + result = Mosquitto.connect(member[:mqttClient], member[:broker], port) + member[:connectStatus] = result == 0 ? true : false + else + result = Mosquitto.connect(member[:mqttClient], member[:broker], port, + username=member[:username], + password=member[:password]) + member[:connectStatus] = result == 0 ? true : false + end + + end + end +end + + +""" operate on mqtt_member_dict + Subscribe all topic for all client +""" +function mqttSubOnConnect!(mqttMemberDict::Dict, subChannel::Vector{String}) + for (k, v) in mqttMemberDict + member = mqttMemberDict[k] + #TODO once reconnect, publish current machine state to netpie + if member[:connectStatus] == true && member[:subStatus] == false + Mosquitto.subscribe(member[:mqttClient], member[:requesttopic]) # sub to its own channel + + # mastercontroller sub to other controller + if k == :firstClient + Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/telemetry") + Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/respond") + end + member[:subStatus] = true + end + end +end + + +""" operate on mqtt_member_dict +""" +# What to do if there is a message +function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict) + nmessages = Base.n_avail(get_messages_channel()) + nmessages == 0 && return 0 + + if workDict[:residueMsgClearedStartTime] === nothing + workDict[:residueMsgClearedStartTime] = Dates.now() + end + timepass = (Dates.now() - workDict[:residueMsgClearedStartTime]).value / 1000.0 + + if timepass <= 10 && workDict[:residueMsgCleared] == 0 + # discard all residue message + for i = 1:nmessages + _ = take!(get_messages_channel()) # take new arrival package + end + elseif timepass > 10 && workDict[:residueMsgCleared] == 0 + workDict[:residueMsgCleared] = 1 + println("residue mqtt messages cleared") + else + for i = 1:nmessages + pkg = take!(get_messages_channel()) # take new arrival package + topic = pkg.topic + sptoptic = split(topic, "/") + payload = vecToDict(pkg.payload) + # println("topic ", topic) + # println("payload ", payload) + # Do something with the message + if topic == mqttMemberDict[:firstClient][:requesttopic] + # publish(c2, "julia", "From member 2"; qos = 2) + # result = yiemHandler() + return 0 + #FIXME add yiem.ai in config.json file + # elseif topic == mqttMemberDict[:yiemMaintenance][:requesttopic] + # localHandler(payload, workDict) + elseif topic == mqttMemberDict[:netpieGreenhouseActive][:requesttopic] # command from NETPIE + println("NETPIE msg") + netpieMsgHandler(payload, netpieCommDict, workDict, mqttMemberDict[:netpieGreenhouseActive]) + + elseif sptoptic[2] == "controller" + controllerName = sptoptic[3] + if controllerName == "fertilizerController_1" + fertilizerController_msgHandler(sptoptic[end], payload, workDict) + elseif controllerName == "temperatureController_1" + temperatureController_msgHandler(sptoptic[end], payload, workDict) + elseif controllerName == "humidityController_1" + humidityController_msgHandler(sptoptic[end], payload, workDict) + elseif controllerName == "illuminanceController_1" + illuminanceController_msgHandler(sptoptic[end], payload, workDict) + elseif controllerName == "co2Controller_1" + co2Controller_msgHandler(sptoptic[end], payload, workDict) + elseif controllerName == "ventilationController_1" + ventilationController_msgHandler(sptoptic[end], payload, workDict) + else + error("undefined condition") + end + else + println("Not yet define what to do with message from this topic: $topic") + return 1 + end + + return 0 + end + end +end + + +""" operate on mqtt_member_dict + network run +""" +function mqttLoop!(mqttMemberDict; timeout=100) + for (k, v) in mqttMemberDict + member = mqttMemberDict[k] + loop(member[:mqttClient], timeout=timeout) + end +end + +""" operate on mqtt_member_dict +""" +function mqttDisconnect!(memberList) + for member in memberList + disconnect(member[:mqttClient]) + end +end + + +""" + publish msg with msgMeta information. + + sourceName = "fertilizerController_1" or could be any name + destinationName = "masterController" or could be any name + topic = :requesttopic, :respondtopic, :telemetrytopic will use its own topic + or + "masterController/request" will use specified topic +""" +function publishMsg(destinationName::String, msgPurpose::String, topic::Symbol, + variableNameAndValue::Dict, workDict::Dict, config::Dict; + sourceName=workDict[:mainCtrlName]) + + msg = Dict{Symbol, Any}( + :msgMeta=> Dict( + :msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder" + :from=> sourceName, + :to=> destinationName, + :repondToMsgId=> nothing, # the msg this msg is responding to + :repondtopic=> nothing, # ask server to reply to this topic + :timestamp=> Dates.now(), + :msgId=> uuid4()), + :payload=> nothing, + ) + for (k, v) in variableNameAndValue + msg[k] = v + end + + pubtopic = config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][topic] + publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient], + pubtopic, JSON3.write(msg)) +end + +function publishMsg(destinationName::String, msgPurpose::String, topic::String, + variableNameAndValue::Dict, workDict::Dict, config::Dict; + sourceName=workDict[:mainCtrlName]) + + msg = Dict{Symbol, Any}( + :msgMeta=> Dict( + :msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder" + :from=> sourceName, + :to=> destinationName, + :repondToMsgId=> nothing, # the msg this msg is responding to + :repondtopic=> nothing, # ask server to reply to this topic + :timestamp=> Dates.now(), + :msgId=> uuid4()), + :payload=> nothing, + ) + for (k, v) in variableNameAndValue + msg[k] = v + end + + pubtopic = topic + publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient], + pubtopic, JSON3.write(msg)) +end + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +end # module end diff --git a/src/interface.jl b/src/interface.jl index 75060e6..cf57757 100644 --- a/src/interface.jl +++ b/src/interface.jl @@ -241,12 +241,12 @@ end """ mqtt communication loop, call this function manually """ -function mqttRun(client::mqttClient, userMsgHandler::Function) +function mqttRun(client::mqttClient) mqttLoop!(client) mqttCheckConnection!(client) mqttReconnect!(client) mqttSubOnConnect!(client) - mqttOnmessage!(client, userMsgHandler) + return mqttOnmessage!(client) end @@ -260,9 +260,9 @@ end end """ -function mqttOnmessage!(client::mqttClient, userMsgHandler::Function) +function mqttOnmessage!(client::mqttClient) nmessages = Base.n_avail(get_messages_channel()) - nmessages == 0 && return 0 + nmessages == 0 && return nothing timepass = (Dates.now() - client.boottime).value / 1000.0 @@ -280,9 +280,8 @@ function mqttOnmessage!(client::mqttClient, userMsgHandler::Function) topic = pkg.topic # sptoptic = split(topic, "/") payload = vecToDict(pkg.payload) # payload in Dict format - - # Do something with the message - userMsgHandler(topic, payload) # user defined function in user space. How to handle msg + + return payload end end end @@ -348,21 +347,8 @@ function respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubto end - - - - - - - - - - - - - """ - Store details about mqtt and connected-Clients1 + Example of mqtt_member_dict which store details about mqtt and connected-Client """ mqtt_member_dict = Dict( :firstClient=> Dict( # masterController itself for internal communication @@ -429,8 +415,6 @@ mqtt_member_dict = Dict( ) - - """ operate on mqtt_member_dict Connect all clients to relevent server """ @@ -536,7 +520,7 @@ end """ operate on mqtt_member_dict """ # What to do if there is a message -function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict) +function mqttOnmessage!(mqttMemberDict::Dict, netpieCommDict::Dict, workDict::Dict) nmessages = Base.n_avail(get_messages_channel()) nmessages == 0 && return 0 @@ -604,7 +588,7 @@ end """ operate on mqtt_member_dict network run """ -function mqttLoop!(mqttMemberDict; timeout=100) +function mqttLoop!(mqttMemberDict::Dict; timeout=100) for (k, v) in mqttMemberDict member = mqttMemberDict[k] loop(member[:mqttClient], timeout=timeout)