comment here

This commit is contained in:
2023-03-16 11:42:56 +07:00
commit 2c788143de
5 changed files with 990 additions and 0 deletions

760
src/interface.jl Normal file
View File

@@ -0,0 +1,760 @@
module interface
export vecToDict, mqttClient, mqttRun, publish, request, respond
using JSON3, Redis, UUIDs, Dates
# Mosquitto.jl requires C++ Mosquitto lib (linux only). Ref: https://github.com/denglerchr/Mosquitto.jl
# push!(Base.DL_LOAD_PATH, "/usr", "/usr/local/lib", "/usr/include", "/usr/lib")
using Mosquitto
#------------------------------------------------------------------------------------------------100
""" Convert received MQTT payload of JSON data to julia dict.
copy() convert JSON3 obj to julia dict
"""
vecToDict(vec::Vector{UInt8}) = copy(JSON3.read(String(vec)))
# ---------------------------------------------------------------------------- #
# publish image with julia PythonCall (python 3.9) and opencv #
# ---------------------------------------------------------------------------- #
"""
using Revise
using Images, UUIDs
# https://pyshine.com/Video-and-bidirectional-text-streaming/
using CondaPkg; CondaPkg.add_pip("pybase64"); CondaPkg.add_pip("opencv-python"); CondaPkg.add("jsons"); CondaPkg.add("numpy");
using PythonCall
cv = pyimport("cv2")
base64 = pyimport("pybase64")
np = pyimport("numpy")
jsons = pyimport("jsons")
using imageUtils
# ---------------------------------------------------------------------------- #
# client - publish image #
# ---------------------------------------------------------------------------- #
webcam = cv2.VideoCapture(0)
_, frame_1 = webcam.read() # julia's PythonCall python-obj numpy array
# frame_1 = cv.imread("20.jpg")
_, buffer = cv.imencode(".jpg", frame_1)
jpg_as_text = base64.b64encode(buffer).decode("utf-8")
juliaDict = Dict("img"=> jpg_as_text)
pythonDict = pydict(juliaDict)
jsonStr = jsons.dumps(pythonDict)
# mqtt publish jsonStr
# ---------------------------------------------------------------------------- #
# server - receive image #
# ---------------------------------------------------------------------------- #
# mqtt receive jsonStr
pythonDict = jsons.loads(jsonStr)
juliaDict = pyconvert(Dict, pythonDict)
buf = base64.b64decode(juliaDict["img"])
buffer = np.frombuffer(buf, dtype=np.uint8)
frame_1 = cv.imdecode(buffer, 1)
frame_1_julia = pyconvert(Array, frame_1);
img_julia_RGB = imageUtils.np2juliaImage(frame_1_julia)
# ----------------------------- modify image here ---------------------------- #
img_bgr = imageUtils.juliaImg2npImg(img_julia_RGB);
frame_2 = np.array(img_bgr) # julia's PythonCall python-obj numpy array
"""
# ---------------------------------------------------------------------------- #
# mqtt #
# ---------------------------------------------------------------------------- #
"""
```
Store mqtt client info with client instance
```
# Example
mqttClientSpec = (
clientName= "someclient", # name of this client
clientID= mqttclienid,
broker= "192.168.88.10",
pub= (faceAI="face_problem_detection/request",),
)
client = CommUtils.mqttClient(mqttClientSpec)
# msg format
msg = Dict{Symbol, Any}(
:msgMeta=> Dict(
:msgPurpose=> "updateStatus",
:from=> "tempCtrl",
:to=> "masterCtrl",
:requestrespond=> "request", # this is a request or respond msg
:sendto=> "", # responder's topic
:replyTo=> nothing, # requester ask responder to send reply to this topic
:repondToMsgId=> ", # responder is responding to this msg id
:taskstatus=> "", # "complete", "fail", "waiting" or other status
:timestamp=> Dates.now(),
:msgId=> uuid4()
),
:userdata1=> somedata,
:userdata2=> somedata,
)
"""
Base.@kwdef mutable struct mqttClient
# client info
clientName::String= "" # name of this client
clientID::String= ""
userName::String= ""
password::String= "" # e.g. raw"GP$gxe_2W8vJ-9-5)YU8#-x". use raw"" if $ is present
# server info
serverName::String= "localmqttserver"
# 172.17.0.1 is docker internal gateway that bind to "0.0.0.0"(linux) or "127.0.0.1"(windows)
broker::String="172.17.0.1"
port::Integer=1883
pubtopic::NamedTuple= (
respondtopic= "",
)
telemetrytopic::NamedTuple= NamedTuple() # e.g. (temp="temp/telemetry", humid="humid/telemetry)
debugtopic::NamedTuple= NamedTuple()
subtopic::NamedTuple= (
requesttopic= "",
selfmailboxtopic= "", # use it to receive result from other microservices
)
# client operation
clientInstance::Any= nothing # client instance after connected to mqtt server resides here
connectStatus::Bool= false
subStatus::Bool= false
boottime::DateTime= Dates.now()
retainedMsgCleared= false
end
function mqttClient(kwargs::NamedTuple)
# instantiate client data struct
s = mqttClient()
for (k, v) in pairs(kwargs)
s.:($k) = v
end
# instantiate mqtt client
if s.clientID != ""
s.clientInstance = Mosquitto.Client(id=s.clientID)
else
s.clientInstance = Mosquitto.Client()
end
# connect
mqttConnect!(s)
return s
end
function mqttConnect!(client::mqttClient)
if client.userName != ""
result = Mosquitto.connect(client.client, client.broker, client.port,
username=client.userName,
password=client.password)
client.connectStatus = result == 0 ? true : false
else
result = Mosquitto.connect(client.clientInstance, client.broker, client.port)
client.connectStatus = result == 0 ? true : false
end
end
# loop client instance manually
mqttLoop!(client::mqttClient; timeout=100) = loop(client.clientInstance, timeout=timeout)
"""
Checking all client for potential connection lost and mark the lost connection
"""
function mqttCheckConnection!(client::mqttClient)
# check channel, if there is something todo
nmessages = Base.n_avail(get_connect_channel())
if nmessages == 0
return 0
else
for i = 1:nmessages
# each conncb in a channel is a connection struct resulted from Mosquitto.connect()
conncb = take!(get_connect_channel())
if conncb.clientptr == client.clientInstance.cptr.mosc # use conncb.clientptr (pointer) as identifier
if conncb.val == 1
println("$(conncb.clientptr): connection successfull")
client.connectStatus = true
elseif conncb.val == 0
println("$(conncb.clientptr): $(client.clientName) disconnected")
client.connectStatus = false
client.subStatus = false
else
error("undefined condition line $(@__LINE__)")
end
end
end
end
end
"""
Reconnect all client that is marked "lost connection"
"""
function mqttReconnect!(client::mqttClient)
if client.connectStatus == false
Mosquitto.disconnect(client) # disconnect to release port before attemping new connection
if client.userName === nothing # check if the member needs username/password
result = Mosquitto.connect(client.clientInstance, client.broker, client.port)
client.connectStatus = result == 0 ? true : false
else
result = Mosquitto.connect(client.clientInstance, client.broker, client.port,
username=client.userName,
password=client.password)
client.connectStatus = result == 0 ? true : false
end
end
end
"""
Subscribe all topic for all client
"""
function mqttSubOnConnect!(client::mqttClient)
if client.connectStatus == true && client.subStatus == false
for (k, v) in pairs(client.sub)
if v != ""
println("sub to $k")
subscribe(client.clientInstance, v)
client.subStatus = true
end
end
end
end
"""
mqtt communication loop, call this function manually
"""
function mqttRun(client::mqttClient)
mqttLoop!(client)
mqttCheckConnection!(client)
mqttReconnect!(client)
mqttSubOnConnect!(client)
end
# What to do if there is a message
function mqttOnmessage!(client::mqttClient, msgHandler::Function)
nmessages = Base.n_avail(get_messages_channel())
nmessages == 0 && return 0
timepass = (Dates.now() - client.boottime).value / 1000.0
if timepass <= 1 && client.retainedMsgCleared == false
# discard all retained message
for i = 1:nmessages
_ = take!(get_messages_channel()) # discard new arrival package
end
elseif timepass > 1 && client.retainedMsgCleared == false
workDict[:retainedMsgCleared] = true
println("retained mqtt messages cleared")
else
for i = 1:nmessages
pkg = take!(get_messages_channel()) # take new arrival package
topic = pkg.topic
# sptoptic = split(topic, "/")
payload = vecToDict(pkg.payload) # payload in Dict format
# Do something with the message
msgHandler(topic, payload)
end
end
end
function publish(client::mqttClient, msg::Dict, pubtopic::String="")
Mosquitto.publish(client.clientInstance, pubtopic, JSON3.write(msg))
end
"""
request function check msg for additional communication info in addition to publish()
"""
# function request(client::mqttClient, msg::Dict; pubtopic::String="")
# if pubtopic != ""
# Mosquitto.publish(client.clientInstance, pubtopic, JSON3.write(msg))
# elseif msg[:msgMeta][:sendto] != ""
# Mosquitto.publish(client.clientInstance, msg[:msgMeta][:sendto], JSON3.write(msg))
# else
# if length(client.pubtopic) != 0
# for (k, topic) in pairs(client.pubtopic)
# println("pub to $k")
# Mosquitto.publish(client.clientInstance, topic, JSON3.write(msg))
# end
# else
# error("publish topic not defined")
# end
# end
# end
function request(client::mqttClient, msg::Dict; pubtopic::String="")
pubto = Vector{String}()
if pubtopic != ""
push!(pubto, pubtopic)
elseif msg[:msgMeta][:sendto] != ""
push!(pubto, msg[:msgMeta][:sendto])
else
if length(client.pubtopic) != 0
for (k, topic) in pairs(client.pubtopic)
push!(pubto, topic)
end
else
error("publish topic not defined")
end
end
for i in pubto
Mosquitto.publish(client.clientInstance, i, JSON3.write(msg))
end
end
"""
respond function check msg for additional communication info in addition to publish()
"""
function respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubtopic::String="")
replyTo = request_msg[:msgMeta][:replyTo]
respond_msg[:msgMeta][:repondToMsgId] = request_msg[:msgMeta][:msgId]
respond_msg[:msgMeta][:requestrespond] = "respond"
pubto = Vector{String}()
if pubtopic != ""
push!(pubto, pubtopic)
elseif replyTo != ""
push!(pubto, replyTo)
else
if length(client.pubtopic) != 0
for (k, topic) in pairs(client.pubtopic)
push!(pubto, topic)
end
else
error("publish topic not defined")
end
end
for i in pubto
Mosquitto.publish(client.clientInstance, i, JSON3.write(respond_msg))
end
end
"""
Store details about mqtt and connected-Clients1
"""
mqtt_member_dict = Dict(
:firstClient=> Dict( # masterController itself for internal communication
:clientName=> "masterController", # name of this client
:clientID=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98",
:username=> nothing,
:password=> nothing,
# mqtt container is located outside of greenhouse-network1,
# 172.17.0.1 is local docker IP
:serverName=> "local_mqtt_server", # name of the server this client is connecting
:broker=> "172.17.0.1",
:port=> 1883,
:requesttopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/request", # channel of THIS client
:respondtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/respond", # channel of THIS client
:telemetrytopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/telemetry", # channel of THIS client
:debugtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/debug", # channel of THIS client
:pubtoptic=> Vector{String}(), # other channel this client publish to
:subtoptic=> Vector{String}(), # other channel this client subscribe to
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
:connectStatus=> false,
:subStatus=> false
),
:secondClient=> Dict( # masterController itself for internal communication
:clientName=> "masterController", # name of this client
:clientID=> "42f53073-b2ed-4b4a-8b11-155c58135dfc",
:username=> nothing,
:password=> nothing,
# mqtt container is located outside of greenhouse-network1,
# 172.17.0.1 is local docker IP
:serverName=> "yiemMaintenance", # name of the server this client is connecting
:broker=> "mqtt.yiem.ai",
:port=> 1883,
:requesttopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/request", # channel of THIS client
:respondtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/respond", # channel of THIS client
:telemetrytopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/telemetry", # channel of THIS client
:debugtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/debug", # channel of THIS client
:pubtoptic=> Vector{String}(), # other channel this client publish to
:subtoptic=> Vector{String}(), # other channel this client subscribe to
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
:connectStatus=> false,
:subStatus=> false
),
:thirdClient=> Dict( # masterController itself for internal communication
:clientName=> "masterController", # name of this client
:clientID=> "f64e864b-e93f-4d1e-8662-0d6da68c9c03", #TODO change here netpie clientID
:username=> "t5gBa", #TODO change here netpie token
:password=> raw"GP$gxe_2W8vJ-9-5)YU8#-x", #TODO change here netpie secret
# mqtt container is located outside of greenhouse-network1,
# 172.17.0.1 is local docker IP
:serverName=> "netpie2020", # name of the server this client is connecting
:broker=> "broker.netpie.io",
:port=> 1883,
:requesttopic=> "@msg/masterController", # channel of THIS client
:respondtopic=> "@shadow/data/update", # channel of THIS client # channel of THIS client
:pubtoptic=> Vector{String}(), # other channel this client publish to
:subtoptic=> Vector{String}(), # other channel this client subscribe to
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
:connectStatus=> false,
:subStatus=> false
),
)
""" operate on mqtt_member_dict
Connect all clients to relevent server
"""
function mqttConnect!(mqttMemberDict::Dict)
for (k, v) in mqttMemberDict
member = mqttMemberDict[k]
member[:mqttClient] = member[:clientID] === nothing ? Mosquitto.Client() :
Mosquitto.Client(id=member[:clientID])
println("connecting member $k")
port = haskey(member, :port) ? member[:port] : 1883
if member[:username] === nothing
result = Mosquitto.connect(member[:mqttClient], member[:broker], port)
member[:connectStatus] = result == 0 ? true : false
else
result = Mosquitto.connect(member[:mqttClient], member[:broker], port,
username=member[:username],
password=member[:password])
member[:connectStatus] = result == 0 ? true : false
end
end
end
""" operate on mqtt_member_dict
Checking all client for potential connection lost and mark the lost connection
"""
function mqttCheckConnection!(mqttMemberDict::Dict)
# check channel, if there is something todo
nmessages = Base.n_avail(get_connect_channel())
if nmessages == 0
return 0
else
for i = 1:nmessages
# each conncb in a channel is a connection struct resulted from Mosquitto.connect()
conncb = take!(get_connect_channel())
for (k, v) in mqttMemberDict
member = mqttMemberDict[k]
if conncb.clientptr == member[:mqttClient].cptr.mosc # use conncb.clientptr (pointer) as identifier
if conncb.val == 1
println("$(conncb.clientptr): connection successfull")
member[:connectStatus] = true
elseif conncb.val == 0
println("$(conncb.clientptr): member $i disconnected")
member[:connectStatus] = false
member[:subStatus] = false
else
error("something wrong with connection message")
end
end
end
end
end
end
""" operate on mqtt_member_dict
Reconnect all client that is marked "lost connection"
"""
function mqttReconnect!(mqttMemberDict::Dict)
for (k, v) in mqttMemberDict # find the disconnected member
member = mqttMemberDict[k]
if member[:connectStatus] == false
Mosquitto.disconnect(member[:mqttClient]) # disconnect to release port before attemping new connection
port = haskey(member, :port) ? member[:port] : 1883
if member[:username] === nothing # check if the member needs username/password
result = Mosquitto.connect(member[:mqttClient], member[:broker], port)
member[:connectStatus] = result == 0 ? true : false
else
result = Mosquitto.connect(member[:mqttClient], member[:broker], port,
username=member[:username],
password=member[:password])
member[:connectStatus] = result == 0 ? true : false
end
end
end
end
""" operate on mqtt_member_dict
Subscribe all topic for all client
"""
function mqttSubOnConnect!(mqttMemberDict::Dict, subChannel::Vector{String})
for (k, v) in mqttMemberDict
member = mqttMemberDict[k]
#TODO once reconnect, publish current machine state to netpie
if member[:connectStatus] == true && member[:subStatus] == false
Mosquitto.subscribe(member[:mqttClient], member[:requesttopic]) # sub to its own channel
# mastercontroller sub to other controller
if k == :firstClient
Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/telemetry")
Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/respond")
end
member[:subStatus] = true
end
end
end
""" operate on mqtt_member_dict
"""
# What to do if there is a message
function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict)
nmessages = Base.n_avail(get_messages_channel())
nmessages == 0 && return 0
if workDict[:residueMsgClearedStartTime] === nothing
workDict[:residueMsgClearedStartTime] = Dates.now()
end
timepass = (Dates.now() - workDict[:residueMsgClearedStartTime]).value / 1000.0
if timepass <= 10 && workDict[:residueMsgCleared] == 0
# discard all residue message
for i = 1:nmessages
_ = take!(get_messages_channel()) # take new arrival package
end
elseif timepass > 10 && workDict[:residueMsgCleared] == 0
workDict[:residueMsgCleared] = 1
println("residue mqtt messages cleared")
else
for i = 1:nmessages
pkg = take!(get_messages_channel()) # take new arrival package
topic = pkg.topic
sptoptic = split(topic, "/")
payload = vecToDict(pkg.payload)
# println("topic ", topic)
# println("payload ", payload)
# Do something with the message
if topic == mqttMemberDict[:firstClient][:requesttopic]
# publish(c2, "julia", "From member 2"; qos = 2)
# result = yiemHandler()
return 0
#FIXME add yiem.ai in config.json file
# elseif topic == mqttMemberDict[:yiemMaintenance][:requesttopic]
# localHandler(payload, workDict)
elseif topic == mqttMemberDict[:netpieGreenhouseActive][:requesttopic] # command from NETPIE
println("NETPIE msg")
netpieMsgHandler(payload, netpieCommDict, workDict, mqttMemberDict[:netpieGreenhouseActive])
elseif sptoptic[2] == "controller"
controllerName = sptoptic[3]
if controllerName == "fertilizerController_1"
fertilizerController_msgHandler(sptoptic[end], payload, workDict)
elseif controllerName == "temperatureController_1"
temperatureController_msgHandler(sptoptic[end], payload, workDict)
elseif controllerName == "humidityController_1"
humidityController_msgHandler(sptoptic[end], payload, workDict)
elseif controllerName == "illuminanceController_1"
illuminanceController_msgHandler(sptoptic[end], payload, workDict)
elseif controllerName == "co2Controller_1"
co2Controller_msgHandler(sptoptic[end], payload, workDict)
elseif controllerName == "ventilationController_1"
ventilationController_msgHandler(sptoptic[end], payload, workDict)
else
error("undefined condition")
end
else
println("Not yet define what to do with message from this topic: $topic")
return 1
end
return 0
end
end
end
""" operate on mqtt_member_dict
network run
"""
function mqttLoop!(mqttMemberDict; timeout=100)
for (k, v) in mqttMemberDict
member = mqttMemberDict[k]
loop(member[:mqttClient], timeout=timeout)
end
end
""" operate on mqtt_member_dict
"""
function mqttDisconnect!(memberList)
for member in memberList
disconnect(member[:mqttClient])
end
end
"""
publish msg with msgMeta information.
sourceName = "fertilizerController_1" or could be any name
destinationName = "masterController" or could be any name
topic = :requesttopic, :respondtopic, :telemetrytopic will use its own topic
or
"masterController/request" will use specified topic
"""
function publishMsg(destinationName::String, msgPurpose::String, topic::Symbol,
variableNameAndValue::Dict, workDict::Dict, config::Dict;
sourceName=workDict[:mainCtrlName])
msg = Dict{Symbol, Any}(
:msgMeta=> Dict(
:msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder"
:from=> sourceName,
:to=> destinationName,
:repondToMsgId=> nothing, # the msg this msg is responding to
:repondtopic=> nothing, # ask server to reply to this topic
:timestamp=> Dates.now(),
:msgId=> uuid4()),
:payload=> nothing,
)
for (k, v) in variableNameAndValue
msg[k] = v
end
pubtopic = config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][topic]
publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient],
pubtopic, JSON3.write(msg))
end
function publishMsg(destinationName::String, msgPurpose::String, topic::String,
variableNameAndValue::Dict, workDict::Dict, config::Dict;
sourceName=workDict[:mainCtrlName])
msg = Dict{Symbol, Any}(
:msgMeta=> Dict(
:msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder"
:from=> sourceName,
:to=> destinationName,
:repondToMsgId=> nothing, # the msg this msg is responding to
:repondtopic=> nothing, # ask server to reply to this topic
:timestamp=> Dates.now(),
:msgId=> uuid4()),
:payload=> nothing,
)
for (k, v) in variableNameAndValue
msg[k] = v
end
pubtopic = topic
publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient],
pubtopic, JSON3.write(msg))
end
end # module end