remove userHander()
This commit is contained in:
144
previousVersion/0.0.1/Manifest.toml
Normal file
144
previousVersion/0.0.1/Manifest.toml
Normal file
@@ -0,0 +1,144 @@
|
||||
# This file is machine-generated - editing it directly is not advised
|
||||
|
||||
julia_version = "1.8.5"
|
||||
manifest_format = "2.0"
|
||||
project_hash = "fb16ae23b45a4c23db667056d8a1047b9686893f"
|
||||
|
||||
[[deps.Artifacts]]
|
||||
uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33"
|
||||
|
||||
[[deps.Base64]]
|
||||
uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
|
||||
|
||||
[[deps.Compat]]
|
||||
deps = ["Dates", "LinearAlgebra", "UUIDs"]
|
||||
git-tree-sha1 = "61fdd77467a5c3ad071ef8277ac6bd6af7dd4c04"
|
||||
uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
|
||||
version = "4.6.0"
|
||||
|
||||
[[deps.CompilerSupportLibraries_jll]]
|
||||
deps = ["Artifacts", "Libdl"]
|
||||
uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae"
|
||||
version = "1.0.1+0"
|
||||
|
||||
[[deps.DataStructures]]
|
||||
deps = ["Compat", "InteractiveUtils", "OrderedCollections"]
|
||||
git-tree-sha1 = "d1fff3a548102f48987a52a2e0d114fa97d730f0"
|
||||
uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
|
||||
version = "0.18.13"
|
||||
|
||||
[[deps.Dates]]
|
||||
deps = ["Printf"]
|
||||
uuid = "ade2ca70-3891-5945-98fb-dc099432e06a"
|
||||
|
||||
[[deps.InteractiveUtils]]
|
||||
deps = ["Markdown"]
|
||||
uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
|
||||
|
||||
[[deps.JSON3]]
|
||||
deps = ["Dates", "Mmap", "Parsers", "SnoopPrecompile", "StructTypes", "UUIDs"]
|
||||
git-tree-sha1 = "84b10656a41ef564c39d2d477d7236966d2b5683"
|
||||
uuid = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
|
||||
version = "1.12.0"
|
||||
|
||||
[[deps.Libdl]]
|
||||
uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
|
||||
|
||||
[[deps.LinearAlgebra]]
|
||||
deps = ["Libdl", "libblastrampoline_jll"]
|
||||
uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
|
||||
|
||||
[[deps.Logging]]
|
||||
uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
|
||||
|
||||
[[deps.Markdown]]
|
||||
deps = ["Base64"]
|
||||
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
|
||||
|
||||
[[deps.Mmap]]
|
||||
uuid = "a63ad114-7e13-5084-954f-fe012c677804"
|
||||
|
||||
[[deps.Mosquitto]]
|
||||
deps = ["Libdl", "Random", "Test"]
|
||||
path = "/home/ton/.julia/dev/Mosquitto"
|
||||
uuid = "db317de6-444b-4dfa-9d0e-fbf3d8dd78ea"
|
||||
version = "0.4.1"
|
||||
|
||||
[[deps.OpenBLAS_jll]]
|
||||
deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"]
|
||||
uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
|
||||
version = "0.3.20+0"
|
||||
|
||||
[[deps.OrderedCollections]]
|
||||
git-tree-sha1 = "85f8e6578bf1f9ee0d11e7bb1b1456435479d47c"
|
||||
uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
|
||||
version = "1.4.1"
|
||||
|
||||
[[deps.Parsers]]
|
||||
deps = ["Dates", "SnoopPrecompile"]
|
||||
git-tree-sha1 = "6f4fbcd1ad45905a5dee3f4256fabb49aa2110c6"
|
||||
uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0"
|
||||
version = "2.5.7"
|
||||
|
||||
[[deps.Preferences]]
|
||||
deps = ["TOML"]
|
||||
git-tree-sha1 = "47e5f437cc0e7ef2ce8406ce1e7e24d44915f88d"
|
||||
uuid = "21216c6a-2e73-6563-6e65-726566657250"
|
||||
version = "1.3.0"
|
||||
|
||||
[[deps.Printf]]
|
||||
deps = ["Unicode"]
|
||||
uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7"
|
||||
|
||||
[[deps.Random]]
|
||||
deps = ["SHA", "Serialization"]
|
||||
uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
|
||||
|
||||
[[deps.Redis]]
|
||||
deps = ["DataStructures", "Dates", "Sockets"]
|
||||
git-tree-sha1 = "6b3c136222b08ae0c71657f2501c6741782a1ad1"
|
||||
uuid = "0cf705f9-a9e2-50d1-a699-2b372a39b750"
|
||||
version = "1.0.0"
|
||||
|
||||
[[deps.SHA]]
|
||||
uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce"
|
||||
version = "0.7.0"
|
||||
|
||||
[[deps.Serialization]]
|
||||
uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
|
||||
|
||||
[[deps.SnoopPrecompile]]
|
||||
deps = ["Preferences"]
|
||||
git-tree-sha1 = "e760a70afdcd461cf01a575947738d359234665c"
|
||||
uuid = "66db9d55-30c0-4569-8b51-7e840670fc0c"
|
||||
version = "1.0.3"
|
||||
|
||||
[[deps.Sockets]]
|
||||
uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
|
||||
|
||||
[[deps.StructTypes]]
|
||||
deps = ["Dates", "UUIDs"]
|
||||
git-tree-sha1 = "ca4bccb03acf9faaf4137a9abc1881ed1841aa70"
|
||||
uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"
|
||||
version = "1.10.0"
|
||||
|
||||
[[deps.TOML]]
|
||||
deps = ["Dates"]
|
||||
uuid = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
|
||||
version = "1.0.0"
|
||||
|
||||
[[deps.Test]]
|
||||
deps = ["InteractiveUtils", "Logging", "Random", "Serialization"]
|
||||
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
|
||||
|
||||
[[deps.UUIDs]]
|
||||
deps = ["Random", "SHA"]
|
||||
uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
|
||||
|
||||
[[deps.Unicode]]
|
||||
uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5"
|
||||
|
||||
[[deps.libblastrampoline_jll]]
|
||||
deps = ["Artifacts", "Libdl", "OpenBLAS_jll"]
|
||||
uuid = "8e850b90-86db-534c-a0d3-1478176c7d93"
|
||||
version = "5.1.1+0"
|
||||
11
previousVersion/0.0.1/Project.toml
Normal file
11
previousVersion/0.0.1/Project.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
name = "CommUtils"
|
||||
uuid = "646cbe82-3d4a-47b2-9440-2e80a472ca20"
|
||||
authors = ["naraw "]
|
||||
version = "0.1.0"
|
||||
|
||||
[deps]
|
||||
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
|
||||
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
|
||||
Mosquitto = "db317de6-444b-4dfa-9d0e-fbf3d8dd78ea"
|
||||
Redis = "0cf705f9-a9e2-50d1-a699-2b372a39b750"
|
||||
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
|
||||
72
previousVersion/0.0.1/src/CommUtils.jl
Normal file
72
previousVersion/0.0.1/src/CommUtils.jl
Normal file
@@ -0,0 +1,72 @@
|
||||
module CommUtils
|
||||
|
||||
# request and respond cannot be exported because Genie confuse
|
||||
export mqttClient, publish, request, respond, mqttRun
|
||||
|
||||
|
||||
include("interface.jl")
|
||||
using .interface
|
||||
|
||||
|
||||
#------------------------------------------------------------------------------------------------100
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
end # commUtils
|
||||
750
previousVersion/0.0.1/src/interface.jl
Normal file
750
previousVersion/0.0.1/src/interface.jl
Normal file
@@ -0,0 +1,750 @@
|
||||
module interface
|
||||
|
||||
export vecToDict, mqttClient, mqttRun, publish, request, respond
|
||||
|
||||
using JSON3, Redis, UUIDs, Dates
|
||||
|
||||
# Mosquitto.jl requires C++ Mosquitto lib (linux only). Ref: https://github.com/denglerchr/Mosquitto.jl
|
||||
# push!(Base.DL_LOAD_PATH, "/usr", "/usr/local/lib", "/usr/include", "/usr/lib")
|
||||
using Mosquitto
|
||||
|
||||
|
||||
#------------------------------------------------------------------------------------------------100
|
||||
|
||||
""" Convert received MQTT payload of JSON data to julia dict.
|
||||
copy() convert JSON3 obj to julia dict
|
||||
"""
|
||||
vecToDict(vec::Vector{UInt8}) = copy(JSON3.read(String(vec)))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------- #
|
||||
# publish image with julia PythonCall (python 3.9) and opencv #
|
||||
# ---------------------------------------------------------------------------- #
|
||||
"""
|
||||
using Revise
|
||||
using Images, UUIDs
|
||||
|
||||
# https://pyshine.com/Video-and-bidirectional-text-streaming/
|
||||
using CondaPkg; CondaPkg.add_pip("pybase64"); CondaPkg.add_pip("opencv-python"); CondaPkg.add("jsons"); CondaPkg.add("numpy");
|
||||
using PythonCall
|
||||
cv2 = pyimport("cv2")
|
||||
base64 = pyimport("pybase64")
|
||||
np = pyimport("numpy")
|
||||
|
||||
using imageUtils
|
||||
|
||||
# ---------------------------------------------------------------------------- #
|
||||
# client - publish image #
|
||||
# ---------------------------------------------------------------------------- #
|
||||
webcam = cv2.VideoCapture(0)
|
||||
|
||||
_, frame_1 = webcam.read() # julia's PythonCall python-obj numpy array
|
||||
# cv2_bgr_img = cv2.imread("20.jpg")
|
||||
|
||||
# compress image data formats in order to make network transfer easier.
|
||||
_, buffer = cv2.imencode(".jpg", cv2_bgr_img)
|
||||
|
||||
jpg_as_pyStr = base64.b64encode(buffer).decode("utf-8")
|
||||
jpg_as_juliaStr = pyconvert(String, jpg_as_pyStr) # for base64, python string or julia string is the same
|
||||
juliaDict = Dict("img"=> jpg_as_juliaStr)
|
||||
publish(mqttClient, "sometopic", JSON3.write(juliaDict)) # mqtt publish
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------- #
|
||||
# server - receive image #
|
||||
# ---------------------------------------------------------------------------- #
|
||||
img_str = payload["img"] # payload received thru mqtt
|
||||
buf = base64.b64decode(img_str)
|
||||
buffer = np.frombuffer(buf, dtype=np.uint8)
|
||||
cv2_img = cv2.imdecode(buffer, 1)
|
||||
julia_array_img = pyconvert(Array, cv2_img); # resulted in julia array but in cv2-numpy's row-major BGR format
|
||||
img_julia_RGB = np2juliaImage(julia_array_img)
|
||||
|
||||
|
||||
# ----------------------------- modify image here ---------------------------- #
|
||||
|
||||
|
||||
img_bgr = juliaImg2npImg(img_julia_RGB);
|
||||
frame_2 = np.array(img_bgr) # julia's PythonCall python-obj numpy array
|
||||
"""
|
||||
|
||||
# ---------------------------------------------------------------------------- #
|
||||
# mqtt #
|
||||
# ---------------------------------------------------------------------------- #
|
||||
"""
|
||||
```
|
||||
Store mqtt client info with client instance
|
||||
```
|
||||
# Example
|
||||
mqttClientSpec = (
|
||||
clientName= "someclient", # name of this client
|
||||
clientID= mqttclienid,
|
||||
broker= "192.168.88.10",
|
||||
pub= (faceAI="face_problem_detection/request",),
|
||||
)
|
||||
client = CommUtils.mqttClient(mqttClientSpec)
|
||||
|
||||
# msg format
|
||||
msg = Dict{Symbol, Any}(
|
||||
:msgMeta=> Dict(
|
||||
:msgPurpose=> "updateStatus",
|
||||
:from=> "tempCtrl",
|
||||
:to=> "masterCtrl",
|
||||
:requestrespond=> "request", # this is a request or respond msg
|
||||
:sendto=> "", # responder's topic
|
||||
:replyTo=> nothing, # requester ask responder to send reply to this topic
|
||||
:repondToMsgId=> ", # responder is responding to this msg id
|
||||
:taskstatus=> "", # "complete", "fail", "waiting" or other status
|
||||
:timestamp=> Dates.now(),
|
||||
:msgId=> uuid4()
|
||||
),
|
||||
:userdata1=> somedata,
|
||||
:userdata2=> somedata,
|
||||
)
|
||||
"""
|
||||
Base.@kwdef mutable struct mqttClient
|
||||
# client info
|
||||
clientName::String= "" # name of this client
|
||||
clientID::String= ""
|
||||
userName::String= ""
|
||||
password::String= "" # e.g. raw"GP$gxe_2W8vJ-9-5)YU8#-x". use raw"" if $ is present
|
||||
|
||||
# server info
|
||||
serverName::String= "localmqttserver"
|
||||
# 172.17.0.1 is docker internal gateway that bind to "0.0.0.0"(linux) or "127.0.0.1"(windows)
|
||||
broker::String="172.17.0.1"
|
||||
port::Integer=1883
|
||||
|
||||
pubtopic::NamedTuple= (
|
||||
respondtopic= "",
|
||||
)
|
||||
telemetrytopic::NamedTuple= NamedTuple() # e.g. (temp="temp/telemetry", humid="humid/telemetry)
|
||||
debugtopic::NamedTuple= NamedTuple()
|
||||
subtopic::NamedTuple= (
|
||||
requesttopic= "",
|
||||
selfmailboxtopic= "", # use it to receive result from other microservices
|
||||
)
|
||||
|
||||
# client operation
|
||||
clientInstance::Any= nothing # client instance after connected to mqtt server resides here
|
||||
connectStatus::Bool= false
|
||||
subStatus::Bool= false
|
||||
boottime::DateTime= Dates.now()
|
||||
retainedMsgCleared= false
|
||||
|
||||
end
|
||||
function mqttClient(kwargs::NamedTuple)
|
||||
|
||||
# instantiate client data struct
|
||||
s = mqttClient()
|
||||
|
||||
for (k, v) in pairs(kwargs)
|
||||
s.:($k) = v
|
||||
end
|
||||
|
||||
# instantiate mqtt client
|
||||
if s.clientID != ""
|
||||
s.clientInstance = Mosquitto.Client(id=s.clientID)
|
||||
else
|
||||
s.clientInstance = Mosquitto.Client()
|
||||
end
|
||||
|
||||
# connect
|
||||
mqttConnect!(s)
|
||||
|
||||
return s
|
||||
end
|
||||
|
||||
function mqttConnect!(client::mqttClient)
|
||||
if client.userName != ""
|
||||
result = Mosquitto.connect(client.client, client.broker, client.port,
|
||||
username=client.userName,
|
||||
password=client.password)
|
||||
client.connectStatus = result == 0 ? true : false
|
||||
else
|
||||
result = Mosquitto.connect(client.clientInstance, client.broker, client.port)
|
||||
client.connectStatus = result == 0 ? true : false
|
||||
end
|
||||
end
|
||||
|
||||
# loop client instance manually
|
||||
mqttLoop!(client::mqttClient; timeout=100) = loop(client.clientInstance, timeout=timeout)
|
||||
|
||||
|
||||
"""
|
||||
Checking all client for potential connection lost and mark the lost connection
|
||||
"""
|
||||
function mqttCheckConnection!(client::mqttClient)
|
||||
# check channel, if there is something todo
|
||||
nmessages = Base.n_avail(get_connect_channel())
|
||||
|
||||
if nmessages == 0
|
||||
return 0
|
||||
else
|
||||
for i = 1:nmessages
|
||||
# each conncb in a channel is a connection struct resulted from Mosquitto.connect()
|
||||
conncb = take!(get_connect_channel())
|
||||
if conncb.clientptr == client.clientInstance.cptr.mosc # use conncb.clientptr (pointer) as identifier
|
||||
if conncb.val == 1
|
||||
println("$(conncb.clientptr): connection successfull")
|
||||
client.connectStatus = true
|
||||
elseif conncb.val == 0
|
||||
println("$(conncb.clientptr): $(client.clientName) disconnected")
|
||||
client.connectStatus = false
|
||||
client.subStatus = false
|
||||
else
|
||||
error("undefined condition line $(@__LINE__)")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
Reconnect all client that is marked "lost connection"
|
||||
"""
|
||||
function mqttReconnect!(client::mqttClient)
|
||||
if client.connectStatus == false
|
||||
Mosquitto.disconnect(client.clientInstance) # disconnect to release port before attemping new connection
|
||||
|
||||
if client.userName === nothing # check if the member needs username/password
|
||||
result = Mosquitto.connect(client.clientInstance, client.broker, client.port)
|
||||
client.connectStatus = result == 0 ? true : false
|
||||
else
|
||||
result = Mosquitto.connect(client.clientInstance, client.broker, client.port,
|
||||
username=client.userName,
|
||||
password=client.password)
|
||||
client.connectStatus = result == 0 ? true : false
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
Subscribe all topic for all client
|
||||
"""
|
||||
function mqttSubOnConnect!(client::mqttClient)
|
||||
if client.connectStatus == true && client.subStatus == false
|
||||
for (k, v) in pairs(client.subtopic)
|
||||
if v != ""
|
||||
println("sub to $k")
|
||||
Mosquitto.subscribe(client.clientInstance, v)
|
||||
client.subStatus = true
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
mqtt communication loop, call this function manually
|
||||
"""
|
||||
function mqttRun(client::mqttClient, userMsgHandler::Function)
|
||||
mqttLoop!(client)
|
||||
mqttCheckConnection!(client)
|
||||
mqttReconnect!(client)
|
||||
mqttSubOnConnect!(client)
|
||||
mqttOnmessage!(client, userMsgHandler)
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
userMsgHandler. A user defined function in user space.
|
||||
|
||||
# Example
|
||||
|
||||
function mqttMsgHander(topic::String, payload)
|
||||
# this function access workSpace by global variable and do something with msg
|
||||
end
|
||||
|
||||
"""
|
||||
function mqttOnmessage!(client::mqttClient, userMsgHandler::Function)
|
||||
nmessages = Base.n_avail(get_messages_channel())
|
||||
nmessages == 0 && return 0
|
||||
|
||||
timepass = (Dates.now() - client.boottime).value / 1000.0
|
||||
|
||||
if timepass <= 1 && client.retainedMsgCleared == false
|
||||
# discard all retained message
|
||||
for i = 1:nmessages
|
||||
_ = take!(get_messages_channel()) # discard new arrival package
|
||||
end
|
||||
elseif timepass > 1 && client.retainedMsgCleared == false
|
||||
client.retainedMsgCleared = true
|
||||
println("retained mqtt messages cleared")
|
||||
else
|
||||
for i = 1:nmessages
|
||||
pkg = take!(get_messages_channel()) # take new arrival package
|
||||
topic = pkg.topic
|
||||
# sptoptic = split(topic, "/")
|
||||
payload = vecToDict(pkg.payload) # payload in Dict format
|
||||
|
||||
# Do something with the message
|
||||
userMsgHandler(topic, payload) # user defined function in user space. How to handle msg
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
function publish(client::mqttClient, msg::Dict, pubtopic::String="")
|
||||
Mosquitto.publish(client.clientInstance, pubtopic, JSON3.write(msg))
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
request function check msg for additional communication info in addition to publish()
|
||||
"""
|
||||
function request(client::mqttClient, msg::Dict; pubtopic::String="")
|
||||
pubto = Vector{String}()
|
||||
if pubtopic != ""
|
||||
push!(pubto, pubtopic)
|
||||
elseif msg[:msgMeta][:sendto] != ""
|
||||
push!(pubto, msg[:msgMeta][:sendto])
|
||||
else
|
||||
if length(client.pubtopic) != 0
|
||||
for (k, topic) in pairs(client.pubtopic)
|
||||
push!(pubto, topic)
|
||||
end
|
||||
else
|
||||
error("publish topic not defined")
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
for i in pubto
|
||||
Mosquitto.publish(client.clientInstance, i, JSON3.write(msg))
|
||||
end
|
||||
end
|
||||
|
||||
"""
|
||||
respond function check msg for additional communication info in addition to publish()
|
||||
"""
|
||||
function respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubtopic::String="")
|
||||
replyTo = request_msg[:msgMeta][:replyTo]
|
||||
respond_msg[:msgMeta][:repondToMsgId] = request_msg[:msgMeta][:msgId]
|
||||
respond_msg[:msgMeta][:requestrespond] = "respond"
|
||||
|
||||
pubto = Vector{String}()
|
||||
if pubtopic != ""
|
||||
push!(pubto, pubtopic)
|
||||
elseif replyTo != ""
|
||||
push!(pubto, replyTo)
|
||||
else
|
||||
if length(client.pubtopic) != 0
|
||||
for (k, topic) in pairs(client.pubtopic)
|
||||
push!(pubto, topic)
|
||||
end
|
||||
else
|
||||
error("publish topic not defined")
|
||||
end
|
||||
end
|
||||
|
||||
for i in pubto
|
||||
Mosquitto.publish(client.clientInstance, i, JSON3.write(respond_msg))
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
"""
|
||||
Store details about mqtt and connected-Clients1
|
||||
"""
|
||||
mqtt_member_dict = Dict(
|
||||
:firstClient=> Dict( # masterController itself for internal communication
|
||||
:clientName=> "masterController", # name of this client
|
||||
:clientID=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98",
|
||||
:username=> nothing,
|
||||
:password=> nothing,
|
||||
|
||||
# mqtt container is located outside of greenhouse-network1,
|
||||
# 172.17.0.1 is local docker IP
|
||||
:serverName=> "local_mqtt_server", # name of the server this client is connecting
|
||||
:broker=> "172.17.0.1",
|
||||
:port=> 1883,
|
||||
:requesttopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/request", # channel of THIS client
|
||||
:respondtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/respond", # channel of THIS client
|
||||
:telemetrytopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/telemetry", # channel of THIS client
|
||||
:debugtopic=> "09f165cf-df9e-4607-bb34-a3c9f0a6bb98/debug", # channel of THIS client
|
||||
:pubtoptic=> Vector{String}(), # other channel this client publish to
|
||||
:subtoptic=> Vector{String}(), # other channel this client subscribe to
|
||||
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
|
||||
:connectStatus=> false,
|
||||
:subStatus=> false
|
||||
),
|
||||
:secondClient=> Dict( # masterController itself for internal communication
|
||||
:clientName=> "masterController", # name of this client
|
||||
:clientID=> "42f53073-b2ed-4b4a-8b11-155c58135dfc",
|
||||
:username=> nothing,
|
||||
:password=> nothing,
|
||||
|
||||
# mqtt container is located outside of greenhouse-network1,
|
||||
# 172.17.0.1 is local docker IP
|
||||
:serverName=> "yiemMaintenance", # name of the server this client is connecting
|
||||
:broker=> "mqtt.yiem.ai",
|
||||
:port=> 1883,
|
||||
:requesttopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/request", # channel of THIS client
|
||||
:respondtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/respond", # channel of THIS client
|
||||
:telemetrytopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/telemetry", # channel of THIS client
|
||||
:debugtopic=> "42f53073-b2ed-4b4a-8b11-155c58135dfc/debug", # channel of THIS client
|
||||
:pubtoptic=> Vector{String}(), # other channel this client publish to
|
||||
:subtoptic=> Vector{String}(), # other channel this client subscribe to
|
||||
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
|
||||
:connectStatus=> false,
|
||||
:subStatus=> false
|
||||
),
|
||||
:thirdClient=> Dict( # masterController itself for internal communication
|
||||
:clientName=> "masterController", # name of this client
|
||||
:clientID=> "f64e864b-e93f-4d1e-8662-0d6da68c9c03", #TODO change here netpie clientID
|
||||
:username=> "t5gBa", #TODO change here netpie token
|
||||
:password=> raw"GP$gxe_2W8vJ-9-5)YU8#-x", #TODO change here netpie secret
|
||||
|
||||
# mqtt container is located outside of greenhouse-network1,
|
||||
# 172.17.0.1 is local docker IP
|
||||
:serverName=> "netpie2020", # name of the server this client is connecting
|
||||
:broker=> "broker.netpie.io",
|
||||
:port=> 1883,
|
||||
:requesttopic=> "@msg/masterController", # channel of THIS client
|
||||
:respondtopic=> "@shadow/data/update", # channel of THIS client # channel of THIS client
|
||||
:pubtoptic=> Vector{String}(), # other channel this client publish to
|
||||
:subtoptic=> Vector{String}(), # other channel this client subscribe to
|
||||
:mqttClient=> nothing, # client instance after connected to mqtt server resides here
|
||||
:connectStatus=> false,
|
||||
:subStatus=> false
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
Connect all clients to relevent server
|
||||
"""
|
||||
function mqttConnect!(mqttMemberDict::Dict)
|
||||
for (k, v) in mqttMemberDict
|
||||
member = mqttMemberDict[k]
|
||||
member[:mqttClient] = member[:clientID] === nothing ? Mosquitto.Client() :
|
||||
Mosquitto.Client(id=member[:clientID])
|
||||
println("connecting member $k")
|
||||
port = haskey(member, :port) ? member[:port] : 1883
|
||||
if member[:username] === nothing
|
||||
result = Mosquitto.connect(member[:mqttClient], member[:broker], port)
|
||||
member[:connectStatus] = result == 0 ? true : false
|
||||
else
|
||||
result = Mosquitto.connect(member[:mqttClient], member[:broker], port,
|
||||
username=member[:username],
|
||||
password=member[:password])
|
||||
member[:connectStatus] = result == 0 ? true : false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
Checking all client for potential connection lost and mark the lost connection
|
||||
"""
|
||||
function mqttCheckConnection!(mqttMemberDict::Dict)
|
||||
# check channel, if there is something todo
|
||||
nmessages = Base.n_avail(get_connect_channel())
|
||||
|
||||
if nmessages == 0
|
||||
return 0
|
||||
else
|
||||
for i = 1:nmessages
|
||||
# each conncb in a channel is a connection struct resulted from Mosquitto.connect()
|
||||
conncb = take!(get_connect_channel())
|
||||
for (k, v) in mqttMemberDict
|
||||
member = mqttMemberDict[k]
|
||||
if conncb.clientptr == member[:mqttClient].cptr.mosc # use conncb.clientptr (pointer) as identifier
|
||||
if conncb.val == 1
|
||||
println("$(conncb.clientptr): connection successfull")
|
||||
member[:connectStatus] = true
|
||||
elseif conncb.val == 0
|
||||
println("$(conncb.clientptr): member $i disconnected")
|
||||
member[:connectStatus] = false
|
||||
member[:subStatus] = false
|
||||
else
|
||||
error("something wrong with connection message")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
Reconnect all client that is marked "lost connection"
|
||||
"""
|
||||
function mqttReconnect!(mqttMemberDict::Dict)
|
||||
for (k, v) in mqttMemberDict # find the disconnected member
|
||||
member = mqttMemberDict[k]
|
||||
if member[:connectStatus] == false
|
||||
Mosquitto.disconnect(member[:mqttClient]) # disconnect to release port before attemping new connection
|
||||
|
||||
port = haskey(member, :port) ? member[:port] : 1883
|
||||
if member[:username] === nothing # check if the member needs username/password
|
||||
result = Mosquitto.connect(member[:mqttClient], member[:broker], port)
|
||||
member[:connectStatus] = result == 0 ? true : false
|
||||
else
|
||||
result = Mosquitto.connect(member[:mqttClient], member[:broker], port,
|
||||
username=member[:username],
|
||||
password=member[:password])
|
||||
member[:connectStatus] = result == 0 ? true : false
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
Subscribe all topic for all client
|
||||
"""
|
||||
function mqttSubOnConnect!(mqttMemberDict::Dict, subChannel::Vector{String})
|
||||
for (k, v) in mqttMemberDict
|
||||
member = mqttMemberDict[k]
|
||||
#TODO once reconnect, publish current machine state to netpie
|
||||
if member[:connectStatus] == true && member[:subStatus] == false
|
||||
Mosquitto.subscribe(member[:mqttClient], member[:requesttopic]) # sub to its own channel
|
||||
|
||||
# mastercontroller sub to other controller
|
||||
if k == :firstClient
|
||||
Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/telemetry")
|
||||
Mosquitto.subscribe(member[:mqttClient], "$(config[:farminfo][:farmId])/controller/+/respond")
|
||||
end
|
||||
member[:subStatus] = true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
"""
|
||||
# What to do if there is a message
|
||||
function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict)
|
||||
nmessages = Base.n_avail(get_messages_channel())
|
||||
nmessages == 0 && return 0
|
||||
|
||||
if workDict[:residueMsgClearedStartTime] === nothing
|
||||
workDict[:residueMsgClearedStartTime] = Dates.now()
|
||||
end
|
||||
timepass = (Dates.now() - workDict[:residueMsgClearedStartTime]).value / 1000.0
|
||||
|
||||
if timepass <= 10 && workDict[:residueMsgCleared] == 0
|
||||
# discard all residue message
|
||||
for i = 1:nmessages
|
||||
_ = take!(get_messages_channel()) # take new arrival package
|
||||
end
|
||||
elseif timepass > 10 && workDict[:residueMsgCleared] == 0
|
||||
workDict[:residueMsgCleared] = 1
|
||||
println("residue mqtt messages cleared")
|
||||
else
|
||||
for i = 1:nmessages
|
||||
pkg = take!(get_messages_channel()) # take new arrival package
|
||||
topic = pkg.topic
|
||||
sptoptic = split(topic, "/")
|
||||
payload = vecToDict(pkg.payload)
|
||||
# println("topic ", topic)
|
||||
# println("payload ", payload)
|
||||
# Do something with the message
|
||||
if topic == mqttMemberDict[:firstClient][:requesttopic]
|
||||
# publish(c2, "julia", "From member 2"; qos = 2)
|
||||
# result = yiemHandler()
|
||||
return 0
|
||||
#FIXME add yiem.ai in config.json file
|
||||
# elseif topic == mqttMemberDict[:yiemMaintenance][:requesttopic]
|
||||
# localHandler(payload, workDict)
|
||||
elseif topic == mqttMemberDict[:netpieGreenhouseActive][:requesttopic] # command from NETPIE
|
||||
println("NETPIE msg")
|
||||
netpieMsgHandler(payload, netpieCommDict, workDict, mqttMemberDict[:netpieGreenhouseActive])
|
||||
|
||||
elseif sptoptic[2] == "controller"
|
||||
controllerName = sptoptic[3]
|
||||
if controllerName == "fertilizerController_1"
|
||||
fertilizerController_msgHandler(sptoptic[end], payload, workDict)
|
||||
elseif controllerName == "temperatureController_1"
|
||||
temperatureController_msgHandler(sptoptic[end], payload, workDict)
|
||||
elseif controllerName == "humidityController_1"
|
||||
humidityController_msgHandler(sptoptic[end], payload, workDict)
|
||||
elseif controllerName == "illuminanceController_1"
|
||||
illuminanceController_msgHandler(sptoptic[end], payload, workDict)
|
||||
elseif controllerName == "co2Controller_1"
|
||||
co2Controller_msgHandler(sptoptic[end], payload, workDict)
|
||||
elseif controllerName == "ventilationController_1"
|
||||
ventilationController_msgHandler(sptoptic[end], payload, workDict)
|
||||
else
|
||||
error("undefined condition")
|
||||
end
|
||||
else
|
||||
println("Not yet define what to do with message from this topic: $topic")
|
||||
return 1
|
||||
end
|
||||
|
||||
return 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
network run
|
||||
"""
|
||||
function mqttLoop!(mqttMemberDict; timeout=100)
|
||||
for (k, v) in mqttMemberDict
|
||||
member = mqttMemberDict[k]
|
||||
loop(member[:mqttClient], timeout=timeout)
|
||||
end
|
||||
end
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
"""
|
||||
function mqttDisconnect!(memberList)
|
||||
for member in memberList
|
||||
disconnect(member[:mqttClient])
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
"""
|
||||
publish msg with msgMeta information.
|
||||
|
||||
sourceName = "fertilizerController_1" or could be any name
|
||||
destinationName = "masterController" or could be any name
|
||||
topic = :requesttopic, :respondtopic, :telemetrytopic will use its own topic
|
||||
or
|
||||
"masterController/request" will use specified topic
|
||||
"""
|
||||
function publishMsg(destinationName::String, msgPurpose::String, topic::Symbol,
|
||||
variableNameAndValue::Dict, workDict::Dict, config::Dict;
|
||||
sourceName=workDict[:mainCtrlName])
|
||||
|
||||
msg = Dict{Symbol, Any}(
|
||||
:msgMeta=> Dict(
|
||||
:msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder"
|
||||
:from=> sourceName,
|
||||
:to=> destinationName,
|
||||
:repondToMsgId=> nothing, # the msg this msg is responding to
|
||||
:repondtopic=> nothing, # ask server to reply to this topic
|
||||
:timestamp=> Dates.now(),
|
||||
:msgId=> uuid4()),
|
||||
:payload=> nothing,
|
||||
)
|
||||
for (k, v) in variableNameAndValue
|
||||
msg[k] = v
|
||||
end
|
||||
|
||||
pubtopic = config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][topic]
|
||||
publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient],
|
||||
pubtopic, JSON3.write(msg))
|
||||
end
|
||||
|
||||
function publishMsg(destinationName::String, msgPurpose::String, topic::String,
|
||||
variableNameAndValue::Dict, workDict::Dict, config::Dict;
|
||||
sourceName=workDict[:mainCtrlName])
|
||||
|
||||
msg = Dict{Symbol, Any}(
|
||||
:msgMeta=> Dict(
|
||||
:msgPurpose=> msgPurpose, # "updateStatus", "fertilizerMixOrder"
|
||||
:from=> sourceName,
|
||||
:to=> destinationName,
|
||||
:repondToMsgId=> nothing, # the msg this msg is responding to
|
||||
:repondtopic=> nothing, # ask server to reply to this topic
|
||||
:timestamp=> Dates.now(),
|
||||
:msgId=> uuid4()),
|
||||
:payload=> nothing,
|
||||
)
|
||||
for (k, v) in variableNameAndValue
|
||||
msg[k] = v
|
||||
end
|
||||
|
||||
pubtopic = topic
|
||||
publish(config[workDict[:mainCtrlName]][:mqttMembers][:firstClient][:mqttClient],
|
||||
pubtopic, JSON3.write(msg))
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
end # module end
|
||||
@@ -241,12 +241,12 @@ end
|
||||
"""
|
||||
mqtt communication loop, call this function manually
|
||||
"""
|
||||
function mqttRun(client::mqttClient, userMsgHandler::Function)
|
||||
function mqttRun(client::mqttClient)
|
||||
mqttLoop!(client)
|
||||
mqttCheckConnection!(client)
|
||||
mqttReconnect!(client)
|
||||
mqttSubOnConnect!(client)
|
||||
mqttOnmessage!(client, userMsgHandler)
|
||||
return mqttOnmessage!(client)
|
||||
end
|
||||
|
||||
|
||||
@@ -260,9 +260,9 @@ end
|
||||
end
|
||||
|
||||
"""
|
||||
function mqttOnmessage!(client::mqttClient, userMsgHandler::Function)
|
||||
function mqttOnmessage!(client::mqttClient)
|
||||
nmessages = Base.n_avail(get_messages_channel())
|
||||
nmessages == 0 && return 0
|
||||
nmessages == 0 && return nothing
|
||||
|
||||
timepass = (Dates.now() - client.boottime).value / 1000.0
|
||||
|
||||
@@ -281,8 +281,7 @@ function mqttOnmessage!(client::mqttClient, userMsgHandler::Function)
|
||||
# sptoptic = split(topic, "/")
|
||||
payload = vecToDict(pkg.payload) # payload in Dict format
|
||||
|
||||
# Do something with the message
|
||||
userMsgHandler(topic, payload) # user defined function in user space. How to handle msg
|
||||
return payload
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -348,21 +347,8 @@ function respond(client::mqttClient, respond_msg::Dict, request_msg::Dict; pubto
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
"""
|
||||
Store details about mqtt and connected-Clients1
|
||||
Example of mqtt_member_dict which store details about mqtt and connected-Client
|
||||
"""
|
||||
mqtt_member_dict = Dict(
|
||||
:firstClient=> Dict( # masterController itself for internal communication
|
||||
@@ -429,8 +415,6 @@ mqtt_member_dict = Dict(
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
""" operate on mqtt_member_dict
|
||||
Connect all clients to relevent server
|
||||
"""
|
||||
@@ -536,7 +520,7 @@ end
|
||||
""" operate on mqtt_member_dict
|
||||
"""
|
||||
# What to do if there is a message
|
||||
function mqttOnmessage!(mqttMemberDict, netpieCommDict::Dict, workDict::Dict)
|
||||
function mqttOnmessage!(mqttMemberDict::Dict, netpieCommDict::Dict, workDict::Dict)
|
||||
nmessages = Base.n_avail(get_messages_channel())
|
||||
nmessages == 0 && return 0
|
||||
|
||||
@@ -604,7 +588,7 @@ end
|
||||
""" operate on mqtt_member_dict
|
||||
network run
|
||||
"""
|
||||
function mqttLoop!(mqttMemberDict; timeout=100)
|
||||
function mqttLoop!(mqttMemberDict::Dict; timeout=100)
|
||||
for (k, v) in mqttMemberDict
|
||||
member = mqttMemberDict[k]
|
||||
loop(member[:mqttClient], timeout=timeout)
|
||||
|
||||
Reference in New Issue
Block a user