module interface

export vecToDict, mqttClient, mqttRun, publish, request, respond

using JSON3, Redis, UUIDs, Dates

# Mosquitto.jl requires C++ Mosquitto lib (linux only).
# Ref: https://github.com/denglerchr/Mosquitto.jl
# push!(Base.DL_LOAD_PATH, "/usr", "/usr/local/lib", "/usr/include", "/usr/lib")
using Mosquitto

# NOTE: both Redis and Mosquitto are loaded with `using`. Every Mosquitto call in this
# module is therefore written fully qualified (`Mosquitto.xxx`) so that a name exported
# by both packages can never be ambiguous, and so that this module's own `publish` is
# never confused with `Mosquitto.publish`.

#------------------------------------------------------------------------------------------------100

"""
    vecToDict(vec::Vector{UInt8})

Convert a received MQTT payload (raw bytes holding JSON text) to a Julia dictionary.

`JSON3.read` returns a JSON3 object; `copy` turns it into a plain Julia `Dict`.
`String(vec)` is called on the argument itself, exactly as in the original code.
"""
vecToDict(vec::Vector{UInt8}) = copy(JSON3.read(String(vec)))

# ---------------------------------------------------------------------------- #
#       publish image with julia PythonCall (python 3.9) and opencv            #
# ---------------------------------------------------------------------------- #
#=
Usage example (not executed; kept as reference).

    using Revise
    using Images, UUIDs

    # https://pyshine.com/Video-and-bidirectional-text-streaming/
    using CondaPkg
    CondaPkg.add_pip("pybase64")
    CondaPkg.add_pip("opencv-python")
    CondaPkg.add("jsons")
    CondaPkg.add("numpy")

    using PythonCall
    cv = pyimport("cv2")
    base64 = pyimport("pybase64")
    np = pyimport("numpy")
    using imageUtils

    # --- client: publish image ---
    webcam = cv.VideoCapture(0)
    _, frame_1 = webcam.read()   # julia's PythonCall python-obj numpy array
    # frame_1 = cv.imread("20.jpg")

    # compress image data formats in order to make network transfer easier.
    _, buffer = cv.imencode(".jpg", frame_1)
    jpg_as_pyStr = base64.b64encode(buffer).decode("utf-8")
    # for base64, python string or julia string is the same
    jpg_as_juliaStr = pyconvert(String, jpg_as_pyStr)
    juliaDict = Dict("img" => jpg_as_juliaStr)
    publish(mqttClient, "sometopic", JSON3.write(juliaDict))   # mqtt publish

    # --- server: receive image ---
    img_str = payload["img"]   # payload received thru mqtt
    buf = base64.b64decode(img_str)
    buffer = np.frombuffer(buf, dtype=np.uint8)
    frame_1 = cv.imdecode(buffer, 1)
    frame_1_julia = pyconvert(Array, frame_1)
    img_julia_RGB = imageUtils.np2juliaImage(frame_1_julia)

    # --- modify image here ---
    img_bgr = imageUtils.juliaImg2npImg(img_julia_RGB)
    frame_2 = np.array(img_bgr)   # julia's PythonCall python-obj numpy array
=#

# ---------------------------------------------------------------------------- #
#                                     mqtt                                     #
# ---------------------------------------------------------------------------- #

"""
    mqttClient(; kwargs...)
    mqttClient(spec::NamedTuple)

Store MQTT client information together with the connected client instance.

The keyword form only fills the struct. The `NamedTuple` form additionally creates a
`Mosquitto.Client` and tries to connect to the broker.

# Example
```julia
mqttClientSpec = (
    clientName = "someclient",     # name of this client
    clientID = mqttclientid,
    broker = "192.168.88.10",
    pubtopic = (faceAI = "face_problem_detection/request",),
)
client = interface.mqttClient(mqttClientSpec)

# msg format
msg = Dict{Symbol, Any}(
    :msgMeta => Dict(
        :msgPurpose => "updateStatus",
        :from => "tempCtrl",
        :to => "masterCtrl",
        :requestrespond => "request",  # this is a request or respond msg
        :sendto => "",                 # responder's topic
        :replyTo => nothing,           # requester asks responder to reply to this topic
        :repondToMsgId => "",          # responder is responding to this msg id
        :taskstatus => "",             # "complete", "fail", "waiting" or other status
        :timestamp => Dates.now(),
        :msgId => uuid4(),
    ),
    :userdata1 => somedata,
    :userdata2 => somedata,
)
```
"""
Base.@kwdef mutable struct mqttClient
    # ---- client info ----
    clientName::String = ""      # name of this client
    clientID::String = ""        # "" lets Mosquitto pick an id (see the NamedTuple constructor)
    userName::String = ""        # "" means: connect without credentials
    password::String = ""        # use raw"..." when the secret contains a `$`

    # ---- server info ----
    serverName::String = "localmqttserver"
    # 172.17.0.1 is docker internal gateway that bind to "0.0.0.0"(linux) or
    # "127.0.0.1"(windows)
    broker::String = "172.17.0.1"
    port::Integer = 1883

    # ---- topics ----
    pubtopic::NamedTuple = (respondtopic="",)
    telemetrytopic::NamedTuple = NamedTuple()  # e.g. (temp="temp/telemetry", humid="humid/telemetry")
    debugtopic::NamedTuple = NamedTuple()
    subtopic::NamedTuple = (
        requesttopic="",
        selfmailboxtopic="",  # use it to receive result from other microservices
    )

    # ---- client operation ----
    clientInstance::Any = nothing  # client instance after connected to mqtt server resides here
    connectStatus::Bool = false
    subStatus::Bool = false
    boottime::DateTime = Dates.now()
    retainedMsgCleared::Bool = false
end

function mqttClient(kwargs::NamedTuple)
    # instantiate client data struct and copy every given field into it
    s = mqttClient()
    for (k, v) in pairs(kwargs)
        setproperty!(s, k, v)
    end

    # instantiate mqtt client
    s.clientInstance = s.clientID != "" ? Mosquitto.Client(id=s.clientID) : Mosquitto.Client()

    # connect
    mqttConnect!(s)
    return s
end

# Connect `instance` to `broker:port` and return `true` when Mosquitto reports 0.
# Credentials are sent only when `username` is neither `nothing` nor "".
function _mqttconnect(instance, broker, port, username, password)
    hascredential = username !== nothing && username != ""
    result = if hascredential
        Mosquitto.connect(instance, broker, port; username=username, password=password)
    else
        Mosquitto.connect(instance, broker, port)
    end
    return result == 0
end

"""
    mqttConnect!(client::mqttClient)

Connect `client.clientInstance` to `client.broker` and store the outcome in
`client.connectStatus`. Returns the new `connectStatus`.
"""
function mqttConnect!(client::mqttClient)
    # The original username branch read `client.client`, a field that does not exist.
    client.connectStatus = _mqttconnect(
        client.clientInstance, client.broker, client.port, client.userName, client.password
    )
    return client.connectStatus
end

"""
    mqttLoop!(client::mqttClient; timeout=100)

Run the Mosquitto network loop of this client once (call it manually).
"""
mqttLoop!(client::mqttClient; timeout=100) =
    Mosquitto.loop(client.clientInstance, timeout=timeout)

"""
    mqttCheckConnection!(client::mqttClient)

Drain the Mosquitto connect channel and update `connectStatus` / `subStatus` for the
events that belong to `client`. Returns `0` when the channel is empty.

NOTE(review): every event is taken from the channel, including events whose pointer does
not match this client; those are dropped. Confirm this is intended when several clients
share the channel.
"""
function mqttCheckConnection!(client::mqttClient)
    # check channel, if there is something todo
    nmessages = Base.n_avail(Mosquitto.get_connect_channel())
    nmessages == 0 && return 0

    for _ in 1:nmessages
        # each conncb in a channel is a connection struct resulted from Mosquitto.connect()
        conncb = take!(Mosquitto.get_connect_channel())
        # use conncb.clientptr (pointer) as identifier
        conncb.clientptr == client.clientInstance.cptr.mosc || continue

        if conncb.val == 1
            println("$(conncb.clientptr): connection successfull")
            client.connectStatus = true
        elseif conncb.val == 0
            println("$(conncb.clientptr): $(client.clientName) disconnected")
            client.connectStatus = false
            client.subStatus = false
        else
            error("undefined condition line $(@__LINE__)")
        end
    end
    return nothing
end

"""
    mqttReconnect!(client::mqttClient)

Reconnect `client` when it is marked as disconnected. Returns `nothing` when the client
is already connected, otherwise the new `connectStatus`.
"""
function mqttReconnect!(client::mqttClient)
    client.connectStatus && return nothing

    # disconnect to release port before attemping new connection
    Mosquitto.disconnect(client.clientInstance)

    # `userName` is a String ("" = no credentials); the original `=== nothing` test
    # could never be true. The shared helper applies the same rule as mqttConnect!.
    client.connectStatus = _mqttconnect(
        client.clientInstance, client.broker, client.port, client.userName, client.password
    )
    return client.connectStatus
end

"""
    mqttSubOnConnect!(client::mqttClient)

Subscribe to every non-empty topic in `client.subtopic` once the client is connected
and not yet subscribed.
"""
function mqttSubOnConnect!(client::mqttClient)
    (client.connectStatus && !client.subStatus) || return nothing

    for (k, v) in pairs(client.subtopic)
        v != "" || continue
        println("sub to $k")
        Mosquitto.subscribe(client.clientInstance, v)
        client.subStatus = true
    end
    return nothing
end

"""
    mqttRun(client::mqttClient, userMsgHandler::Function)

One iteration of the mqtt communication loop; call this function repeatedly.
Runs the network loop, refreshes connection state, reconnects and subscribes when
needed, then hands received messages to `userMsgHandler`.
"""
function mqttRun(client::mqttClient, userMsgHandler::Function)
    mqttLoop!(client)
    mqttCheckConnection!(client)
    mqttReconnect!(client)
    mqttSubOnConnect!(client)
    return mqttOnmessage!(client, userMsgHandler)
end

"""
    mqttOnmessage!(client::mqttClient, userMsgHandler::Function)

Take the waiting messages from the Mosquitto message channel and pass each one to
`userMsgHandler(topic, payload)`, where `payload` is the result of [`vecToDict`](@ref).
Returns `0` when no message is waiting.

During the first second after `client.boottime` all messages are discarded (retained
messages). The first call after that second only flips `retainedMsgCleared`; messages
are delivered from the following call on.

# Example
```julia
function mqttMsgHandler(topic::String, payload)
    # this function access workSpace by global variable and do something with msg
end
```
"""
function mqttOnmessage!(client::mqttClient, userMsgHandler::Function)
    nmessages = Base.n_avail(Mosquitto.get_messages_channel())
    nmessages == 0 && return 0

    if !client.retainedMsgCleared
        timepass = (Dates.now() - client.boottime).value / 1000.0
        if timepass <= 1
            # discard all retained message
            for _ in 1:nmessages
                take!(Mosquitto.get_messages_channel())
            end
        else
            client.retainedMsgCleared = true
            println("retained mqtt messages cleared")
        end
        return nothing
    end

    for _ in 1:nmessages
        pkg = take!(Mosquitto.get_messages_channel())  # take new arrival package
        payload = vecToDict(pkg.payload)               # payload in Dict format
        userMsgHandler(pkg.topic, payload)             # user defined function in user space
    end
    return nothing
end

"""
    publish(client::mqttClient, msg::Dict, pubtopic::String="")

Serialize `msg` with `JSON3.write` and publish it to `pubtopic`.
"""
function publish(client::mqttClient, msg::Dict, pubtopic::String="")
    return Mosquitto.publish(client.clientInstance, pubtopic, JSON3.write(msg))
end

# Decide where a message goes. Priority: explicit `pubtopic`, then the topic carried by
# the message (`nothing` and "" both mean "not given"), then every non-empty topic in
# `client.pubtopic`. Throws when no topic is left.
function _resolvepubtopics(client::mqttClient, pubtopic::String, msgtopic)
    pubtopic != "" && return String[pubtopic]
    if msgtopic !== nothing && msgtopic != ""
        return String[convert(String, msgtopic)]
    end
    topics = String[t for t in values(client.pubtopic) if t != ""]
    isempty(topics) && error("publish topic not defined")
    return topics
end

"""
    request(client::mqttClient, msg::Dict; pubtopic::String="")

Publish a request. The destination is `pubtopic` when given, otherwise
`msg[:msgMeta][:sendto]`, otherwise every non-empty topic in `client.pubtopic`.

# Throws
- `ErrorException("publish topic not defined")` when no destination can be found.
"""
function request(client::mqttClient, msg::Dict; pubtopic::String="")
    meta = get(msg, :msgMeta, nothing)
    sendto = meta === nothing ? nothing : get(meta, :sendto, nothing)

    payload = JSON3.write(msg)  # serialize once, publish to every destination
    for topic in _resolvepubtopics(client, pubtopic, sendto)
        Mosquitto.publish(client.clientInstance, topic, payload)
    end
    return nothing
end

"""
    respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubtopic::String="")

Publish a response to `request_msg`. `respond_msg[:msgMeta]` is updated in place:
`:repondToMsgId` is set to the request's `:msgId` and `:requestrespond` to `"respond"`.
The destination is `pubtopic` when given, otherwise the request's `:replyTo`, otherwise
every non-empty topic in `client.pubtopic`.

# Throws
- `ErrorException("publish topic not defined")` when no destination can be found.
"""
function respond(
    client::mqttClient, respond_msg::Dict, request_msg::Dict; pubtopic::String=""
)
    requestmeta = request_msg[:msgMeta]
    # `:replyTo => nothing` is the documented default; it must not be pushed as a topic.
    replyTo = get(requestmeta, :replyTo, nothing)

    respond_msg[:msgMeta][:repondToMsgId] = requestmeta[:msgId]
    respond_msg[:msgMeta][:requestrespond] = "respond"

    payload = JSON3.write(respond_msg)
    for topic in _resolvepubtopics(client, pubtopic, replyTo)
        Mosquitto.publish(client.clientInstance, topic, payload)
    end
    return nothing
end

# ---------------------------------------------------------------------------- #
#                 dictionary based api (operates on mqtt_member_dict)          #
# ---------------------------------------------------------------------------- #

"""
Store details about mqtt and connected clients.

Each entry describes one client: identity, credentials, broker, its own topics and
the runtime state (`:mqttClient`, `:connectStatus`, `:subStatus`).

NOTE(review): `:thirdClient` carries a client id, token and secret in plain text.
Consider loading them from the environment or a config file kept out of version control.
"""
const mqtt_member_dict = Dict(
    # masterController itself for internal communication
    :firstClient => Dict(
        :clientName => "masterController",  # name of this client
        :clientID => "09f165cf-df9e-4607-bb34-a3c9f0a6bb98",
        :username => nothing,
        :password => nothing,

        # mqtt container is located outside of greenhouse-network1,
        # 172.17.0.1 is local docker IP
        :serverName => "local_mqtt_server",  # name of the server this client is connecting
        :broker => "172.17.0.1",
        :port => 1883,

        # channels of THIS client
        :requesttopic => "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/request",
        :respondtopic => "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/respond",
        :telemetrytopic => "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/telemetry",
        :debugtopic => "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/debug",

        :pubtoptic => Vector{String}(),  # other channel this client publish to
        :subtoptic => Vector{String}(),  # other channel this client subscribe to

        :mqttClient => nothing,  # client instance after connected to mqtt server resides here
        :connectStatus => false,
        :subStatus => false,
    ),
    :secondClient => Dict(
        :clientName => "masterController",  # name of this client
        :clientID => "42f53073-b2ed-4b4a-8b11-155c58135dfc",
        :username => nothing,
        :password => nothing,

        :serverName => "yiemMaintenance",  # name of the server this client is connecting
        :broker => "mqtt.yiem.ai",
        :port => 1883,

        # channels of THIS client
        :requesttopic => "42f53073-b2ed-4b4a-8b11-155c58135dfc/request",
        :respondtopic => "42f53073-b2ed-4b4a-8b11-155c58135dfc/respond",
        :telemetrytopic => "42f53073-b2ed-4b4a-8b11-155c58135dfc/telemetry",
        :debugtopic => "42f53073-b2ed-4b4a-8b11-155c58135dfc/debug",

        :pubtoptic => Vector{String}(),  # other channel this client publish to
        :subtoptic => Vector{String}(),  # other channel this client subscribe to

        :mqttClient => nothing,  # client instance after connected to mqtt server resides here
        :connectStatus => false,
        :subStatus => false,
    ),
    :thirdClient => Dict(
        :clientName => "masterController",  # name of this client
        :clientID => "f64e864b-e93f-4d1e-8662-0d6da68c9c03",  #TODO change here netpie clientID
        :username => "t5gBa",                                  #TODO change here netpie token
        :password => raw"GP$gxe_2W8vJ-9-5)YU8#-x",             #TODO change here netpie secret

        :serverName => "netpie2020",  # name of the server this client is connecting
        :broker => "broker.netpie.io",
        :port => 1883,

        # channels of THIS client
        :requesttopic => "@msg/masterController",
        :respondtopic => "@shadow/data/update",

        :pubtoptic => Vector{String}(),  # other channel this client publish to
        :subtoptic => Vector{String}(),  # other channel this client subscribe to

        :mqttClient => nothing,  # client instance after connected to mqtt server resides here
        :connectStatus => false,
        :subStatus => false,
    ),
)

"""
    mqttConnect!(mqttMemberDict::Dict)

Operate on a dictionary shaped like `mqtt_member_dict`: create a `Mosquitto.Client` for
every member, connect it to its broker and record the result in `:connectStatus`.
The port defaults to 1883 when a member has no `:port` key.
"""
function mqttConnect!(mqttMemberDict::Dict)
    for (k, member) in mqttMemberDict
        clientID = member[:clientID]
        member[:mqttClient] =
            clientID === nothing ? Mosquitto.Client() : Mosquitto.Client(id=clientID)

        println("connecting member $k")
        member[:connectStatus] = _mqttconnect(
            member[:mqttClient],
            member[:broker],
            get(member, :port, 1883),
            member[:username],
            member[:password],
        )
    end
    return nothing
end

"""
    mqttCheckConnection!(mqttMemberDict::Dict)

Operate on a dictionary shaped like `mqtt_member_dict`: drain the Mosquitto connect
channel and mark every member whose client pointer matches an event as connected or
disconnected. Returns `0` when the channel is empty.
"""
function mqttCheckConnection!(mqttMemberDict::Dict)
    # check channel, if there is something todo
    nmessages = Base.n_avail(Mosquitto.get_connect_channel())
    nmessages == 0 && return 0

    for _ in 1:nmessages
        # each conncb in a channel is a connection struct resulted from Mosquitto.connect()
        conncb = take!(Mosquitto.get_connect_channel())
        for (k, member) in mqttMemberDict
            # use conncb.clientptr (pointer) as identifier
            conncb.clientptr == member[:mqttClient].cptr.mosc || continue

            if conncb.val == 1
                println("$(conncb.clientptr): connection successfull")
                member[:connectStatus] = true
            elseif conncb.val == 0
                # report the member key (the original printed the event counter)
                println("$(conncb.clientptr): member $k disconnected")
                member[:connectStatus] = false
                member[:subStatus] = false
            else
                error("something wrong with connection message")
            end
        end
    end
    return nothing
end

"""
    mqttReconnect!(mqttMemberDict::Dict)

Operate on a dictionary shaped like `mqtt_member_dict`: reconnect every member that is
marked as disconnected.
"""
function mqttReconnect!(mqttMemberDict::Dict)
    for (_, member) in mqttMemberDict
        member[:connectStatus] == false || continue  # find the disconnected member

        # disconnect to release port before attemping new connection
        Mosquitto.disconnect(member[:mqttClient])
        member[:connectStatus] = _mqttconnect(
            member[:mqttClient],
            member[:broker],
            get(member, :port, 1883),
            member[:username],
            member[:password],
        )
    end
    return nothing
end

"""
    mqttSubOnConnect!(mqttMemberDict::Dict, subChannel::Vector{String})

Operate on a dictionary shaped like `mqtt_member_dict`: subscribe every connected,
not yet subscribed member to its own `:requesttopic`. `:firstClient` additionally
subscribes to the controller telemetry/respond wildcards of the farm.

`subChannel` is accepted but not used.

NOTE(review): `config` is read as a global but is not defined in this file; calling this
method needs `config` to be provided elsewhere — confirm.
"""
function mqttSubOnConnect!(mqttMemberDict::Dict, subChannel::Vector{String})
    for (k, member) in mqttMemberDict
        #TODO once reconnect, publish current machine state to netpie
        (member[:connectStatus] == true && member[:subStatus] == false) || continue

        # sub to its own channel
        Mosquitto.subscribe(member[:mqttClient], member[:requesttopic])

        # mastercontroller sub to other controller
        if k == :firstClient
            Mosquitto.subscribe(
                member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/telemetry"
            )
            Mosquitto.subscribe(
                member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/respond"
            )
        end
        member[:subStatus] = true
    end
    return nothing
end

"""
    mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict)

Operate on a dictionary shaped like `mqtt_member_dict`: decide what to do with a
received message. Returns `0` when nothing is waiting or a message was handled, `1`
when the topic is not recognised.

Messages arriving within 10 seconds of the first call are discarded ("residue"
messages); `workDict[:residueMsgClearedStartTime]` and `workDict[:residueMsgCleared]`
hold that state.

NOTE(review): the handler functions (`netpieMsgHandler`, `*_msgHandler`) and the
`:netpieGreenhouseActive` member key are not defined in this file — confirm they are
supplied elsewhere. The processing loop returns after the first message, so one call
handles at most one message.
"""
function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict)
    nmessages = Base.n_avail(Mosquitto.get_messages_channel())
    nmessages == 0 && return 0

    if workDict[:residueMsgClearedStartTime] === nothing
        workDict[:residueMsgClearedStartTime] = Dates.now()
    end
    timepass = (Dates.now() - workDict[:residueMsgClearedStartTime]).value / 1000.0

    if timepass <= 10 && workDict[:residueMsgCleared] == 0
        # discard all residue message
        for _ in 1:nmessages
            take!(Mosquitto.get_messages_channel())
        end
    elseif timepass > 10 && workDict[:residueMsgCleared] == 0
        workDict[:residueMsgCleared] = 1
        println("residue mqtt messages cleared")
    else
        for _ in 1:nmessages
            pkg = take!(Mosquitto.get_messages_channel())  # take new arrival package
            topic = pkg.topic
            sptoptic = split(topic, "/")
            payload = vecToDict(pkg.payload)

            # Do something with the message.
            # The handlers stay in an if/elseif chain on purpose: their names are looked
            # up only when the matching branch runs.
            if topic == mqttMemberDict[:firstClient][:requesttopic]
                # publish(c2, "julia", "From member 2"; qos = 2)
                # result = yiemHandler()
                return 0
            #FIXME add yiem.ai in config.json file
            # elseif topic == mqttMemberDict[:yiemMaintenance][:requesttopic]
            #     localHandler(payload, workDict)
            elseif topic == mqttMemberDict[:netpieGreenhouseActive][:requesttopic]
                # command from NETPIE
                println("NETPIE msg")
                netpieMsgHandler(
                    payload, netpieCommDict, workDict, mqttMemberDict[:netpieGreenhouseActive]
                )
            elseif length(sptoptic) >= 3 && sptoptic[2] == "controller"
                # length guard: short topics fall through to the "not yet define" branch
                controllerName = sptoptic[3]
                msgKind = sptoptic[end]
                if controllerName == "fertilizerController_1"
                    fertilizerController_msgHandler(msgKind, payload, workDict)
                elseif controllerName == "temperatureController_1"
                    temperatureController_msgHandler(msgKind, payload, workDict)
                elseif controllerName == "humidityController_1"
                    humidityController_msgHandler(msgKind, payload, workDict)
                elseif controllerName == "illuminanceController_1"
                    illuminanceController_msgHandler(msgKind, payload, workDict)
                elseif controllerName == "co2Controller_1"
                    co2Controller_msgHandler(msgKind, payload, workDict)
                elseif controllerName == "ventilationController_1"
                    ventilationController_msgHandler(msgKind, payload, workDict)
                else
                    error("undefined condition")
                end
            else
                println("Not yet define what to do with message from this topic: $topic")
                return 1
            end
            return 0
        end
    end
    return nothing
end

"""
    mqttLoop!(mqttMemberDict; timeout=100)

Operate on a dictionary shaped like `mqtt_member_dict`: run the Mosquitto network loop
once for every member.
"""
function mqttLoop!(mqttMemberDict; timeout=100)
    for (_, member) in mqttMemberDict
        Mosquitto.loop(member[:mqttClient], timeout=timeout)
    end
    return nothing
end

"""
    mqttDisconnect!(memberList)

Disconnect the `:mqttClient` of every member in `memberList`.

NOTE(review): each element is indexed with `[:mqttClient]`, so `memberList` is presumably
a collection of member dictionaries (e.g. `values(mqtt_member_dict)`) — confirm.
"""
function mqttDisconnect!(memberList)
    for member in memberList
        # qualified: `disconnect` is exported by both Redis and Mosquitto
        Mosquitto.disconnect(member[:mqttClient])
    end
    return nothing
end

# Build the message envelope shared by both publishMsg methods: a `:msgMeta` header,
# an empty `:payload`, plus every entry of `variableNameAndValue` copied on top.
function _buildmsg(destinationName, msgPurpose, variableNameAndValue, sourceName)
    msg = Dict{Symbol, Any}(
        :msgMeta => Dict(
            :msgPurpose => msgPurpose,  # "updateStatus", "fertilizerMixOrder"
            :from => sourceName,
            :to => destinationName,
            :repondToMsgId => nothing,  # the msg this msg is responding to
            :repondtopic => nothing,    # ask server to reply to this topic
            :timestamp => Dates.now(),
            :msgId => uuid4(),
        ),
        :payload => nothing,
    )
    for (k, v) in variableNameAndValue
        msg[k] = v
    end
    return msg
end

"""
    publishMsg(destinationName, msgPurpose, topic::Symbol, variableNameAndValue,
               workDict, config; sourceName=workDict[:mainCtrlName])
    publishMsg(destinationName, msgPurpose, topic::String, variableNameAndValue,
               workDict, config; sourceName=workDict[:mainCtrlName])

Publish msg with msgMeta information through the `:firstClient` member found at
`config[workDict[:mainCtrlName]][:mqttMembers]`.

# Arguments
- `destinationName`: e.g. `"masterController"`, or could be any name.
- `msgPurpose`: e.g. `"updateStatus"`, `"fertilizerMixOrder"`.
- `topic`: a `Symbol` (`:requesttopic`, `:respondtopic`, `:telemetrytopic`) uses the
  member's own topic stored under that key; a `String` such as
  `"masterController/request"` is used as the topic itself.
- `variableNameAndValue`: entries copied into the message next to `:msgMeta`.

# Keywords
- `sourceName`: e.g. `"fertilizerController_1"`, or could be any name.
"""
function publishMsg(
    destinationName::String,
    msgPurpose::String,
    topic::Symbol,
    variableNameAndValue::Dict,
    workDict::Dict,
    config::Dict;
    sourceName=workDict[:mainCtrlName],
)
    msg = _buildmsg(destinationName, msgPurpose, variableNameAndValue, sourceName)
    member = config[workDict[:mainCtrlName]][:mqttMembers][:firstClient]
    pubtopic = member[topic]
    # `member[:mqttClient]` is a raw Mosquitto client, so Mosquitto.publish is called
    # directly; this module's `publish` only accepts the `mqttClient` struct.
    return Mosquitto.publish(member[:mqttClient], pubtopic, JSON3.write(msg))
end

function publishMsg(
    destinationName::String,
    msgPurpose::String,
    topic::String,
    variableNameAndValue::Dict,
    workDict::Dict,
    config::Dict;
    sourceName=workDict[:mainCtrlName],
)
    msg = _buildmsg(destinationName, msgPurpose, variableNameAndValue, sourceName)
    member = config[workDict[:mainCtrlName]][:mqttMembers][:firstClient]
    return Mosquitto.publish(member[:mqttClient], topic, JSON3.write(msg))
end

end # module interface