first commit

This commit is contained in:
2023-03-14 11:29:44 +07:00
commit b215f1a3ac
19 changed files with 1061 additions and 0 deletions

39
src/Mosquitto.jl Normal file
View File

@@ -0,0 +1,39 @@
# Documentation
# https://mosquitto.org/api/files/mosquitto-h.html#mosquitto_message_callback_set
# https://github.com/eclipse/mosquitto/blob/master/include/mosquitto.h
module Mosquitto
import Base.finalizer
using Random, Libdl
# find library
const libmosquitto = @static if Sys.isunix()
Libdl.find_library(["libmosquitto", "libmosquitto.so.1"])
elseif Sys.iswindows()
Libdl.find_library("mosquitto.dll", [raw"C:\Program Files\Mosquitto"])
end
function __init__()
libmosquitto == "" && throw("Could not find the mosquitto library. If you're sure that it's installed, try adding it to DL_LOAD_PATH and rebuild the package.")
mosq_error_code = ccall((:mosquitto_lib_init, libmosquitto), Cint, ())
mosq_error_code != 0 && println("Mosquitto init returned error code $mosq_error_code")
v = lib_version()
v[1] != 2 || v[2] != 0 && println("Found lib version $(v[1]).$(v[2]), which is different from 2.0. Some functionality might not work")
atexit(lib_cleanup)
end
include("helpers.jl")
include("cwrapper.jl")
export lib_version
include("callbacks.jl")
export get_messages_channel, get_connect_channel
include("client.jl")
export Client, connect, reconnect, disconnect, publish, subscribe, unsubscribe, loop, tls_set, tls_psk_set
include("looprunner.jl")
export loop_start, loop_stop
end # module

84
src/callbacks.jl Normal file
View File

@@ -0,0 +1,84 @@
"""
struct MessageCB with fields
* topic:: String
* payload::Vector{UInt8}
A struct containing incoming message information and payload.
"""
struct MessageCB
topic::String
payload::Vector{UInt8}
end
"""
struct ConnectionCB with fields
* clientptr::Ptr
* val::UInt8
* returncode::Cint
The clientptr contains the ptr of the client that connected or disconnected.
This allows to distinguish between clients.
val is 0 on disconnect and 1 on connect.
returncode is the MQTT return code which can be used to identify, e.g., the reason for a disconnect.
"""
struct ConnectionCB
clientptr::Ptr{Cmosquitto}
val::UInt8
returncode::Cint
end
const messages_channel = Channel{MessageCB}(20)
const connect_channel = Channel{ConnectionCB}(5)
"""
get_messages_channel()
Returns the channel to which received messages are sent. The channel is a Channel{MessageCB}(20).
See ?Mosquitto.MessageCB for information on the struct
"""
get_messages_channel() = messages_channel
"""
get_connect_channel()
Returns the channel to which event notifications for connections or disconnections are sent. The channel is a Channel{ConnectionCB}(5).
See ?Mosquitto.ConnectionCB for information on the struct
"""
get_connect_channel() = connect_channel
# This callback function puts any message on arrival in the channel
# messages_channel which is a Channel{Mosquitto.Message}(20)
function callback_message(mos::Ptr{Cmosquitto}, obj::Ptr{Cvoid}, message::Ptr{CMosquittoMessage}) #, clientid::String)
# get topic and payload from the message
jlmessage = unsafe_load(message)
jlpayload = [unsafe_load(jlmessage.payload, i) for i = 1:jlmessage.payloadlen]
topic = unsafe_string(jlmessage.topic)
# put it in the channel for further use
if Base.n_avail(messages_channel)>=messages_channel.sz_max
take!(messages_channel)
end
put!(messages_channel, MessageCB(topic, jlpayload))
return nothing
end
function callback_connect(mos::Ptr{Cmosquitto}, obj::Ptr{Cvoid}, rc::Cint)
if Base.n_avail(connect_channel)>=connect_channel.sz_max
take!(connect_channel)
end
put!( connect_channel, ConnectionCB(mos, one(UInt8), rc ) )
return nothing
end
function callback_disconnect(mos::Ptr{Cmosquitto}, obj::Ptr{Cvoid}, rc::Cint)
if Base.n_avail(connect_channel)>=connect_channel.sz_max
take!(connect_channel)
end
put!( connect_channel, ConnectionCB(mos, zero(UInt8), rc ) )
return nothing
end

173
src/client.jl Normal file
View File

@@ -0,0 +1,173 @@
import Base.n_avail, Base.show
struct Cobjs
mosc::Ref{Cmosquitto}
obj::Ref{Cvoid}
conncb::Ref{Cvoid}
dconncb::Ref{Cvoid}
end
mutable struct MoscStatus
conn_status::Bool
loop_status::Bool
end
struct Client
id::String
cptr::Cobjs
loop_channel::AbstractChannel{Int}
status::MoscStatus
end
function show(io::IO, client::Client)
println("MQTTClient_$(client.id)")
end
function finalizer(client::Client)
disconnect(client)
destroy(client.cptr.mosc)
end
"""
Client(ip::String, port::Int=1883; kwargs...)
Create a client connection to an MQTT broker. Possible key word arguments are:
* id::String = randstring(15) The id should be unique per connection.
* connectme::Bool = true Connect immediately if true. If false, you need to manually use *connect(client, ip, port)* and input arguments are not used.
* startloop::Bool = true If true, and Threads.nthreads()>1, the network loop will be executed regularly after connection.
Client( ; id::String = randstring(15))
Create a client structure without connecting to a broker or starting a network loop.
"""
function Client(ip::String, port::Int=1883; id::String = randstring(15), connectme::Bool = true, startloop::Bool = true)
# Create a Client object
client = Client( ; id = id )
# Possibly Connect to broker
if connectme
flag = connect(client, ip, port)
flag != 0 && println("Connection to the broker failed")
# Start loop if it can be started without blocking
if flag == 0 && startloop && Threads.nthreads()>1
loop_start(client)
elseif startloop
println("Single thread, loop will be blocking, start it manually using loop_start(::Client) or call loop(client) regularly.")
end
end
return client
end
function Client(; id::String = randstring(15))
# Create mosquitto object
cobj = Ref{Cvoid}()
cmosc = mosquitto_new(id, true, cobj)
# Set callbacks
#f_message_cb(mos, obj, message) = callback_message(mos, obj, message, id)
cfunc_message = @cfunction(callback_message, Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}, Ptr{CMosquittoMessage}))
message_callback_set(cmosc, cfunc_message)
cfunc_connect = @cfunction(callback_connect, Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}, Cint))
connect_callback_set(cmosc, cfunc_connect)
cfunc_disconnect = @cfunction(callback_disconnect, Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}, Cint))
disconnect_callback_set(cmosc, cfunc_disconnect)
# Create object
loop_channel = Channel{Int}(1)
return Client(id, Cobjs(cmosc, cobj, cfunc_connect, cfunc_disconnect), loop_channel, MoscStatus(false, false) )
end
"""
connect(client::Client, ip::String, port::Int; kwargs...)
Connect the client to a broker. kwargs are:
* username::String = "" A username, should one be required
* password::String = "" A password belonging to the username
* keepalive::Int = 60 Maximal of time the client has to send PINGREQ or a message before disconnection
"""
function connect(client::Client, ip::String, port::Int; username::String = "", password::String = "", keepalive::Int = 60)
if username != ""
flag = username_pw_set(client.cptr.mosc, username, password)
flag != 0 && println("Couldnt set password and username, error $flag")
end
flag = connect(client.cptr.mosc, ip; port = port, keepalive = keepalive)
flag == 0 ? (client.status.conn_status = true) : println("Connection to broker failed")
return flag
end
"""
disconnect(client::Client)
"""
function disconnect(client::Client)
client.status.loop_status && loop_stop(client)
flag = disconnect(client.cptr.mosc)
flag == 0 && (client.status.conn_status = false)
return flag
end
"""
reconnect(client::Client)
"""
function reconnect(client::Client)
flag = reconnect(client.cptr.mosc)
flag == 0 && (client.status.conn_status = true)
return flag
end
"""
publish(client::Client, topic::String, payload; qos::Int = 1, retain::Bool = true)
Publish a message to the broker.
"""
publish(client::Client, topic::String, payload; qos::Int = 1, retain::Bool = true) = publish(client.cptr.mosc, topic, payload; qos = qos, retain = retain)
"""
subscribe(client::Client, topic::String; qos::Int = 1)
Subscribe to a topic. Received messages will be accessible Mosquitto.messages_channel as a Tuple{String, Vector{Uint8}}.
"""
subscribe(client::Client, topic::String; qos::Int = 1) = subscribe(client.cptr.mosc, topic; qos = qos)
"""
unsubscribe(client::Client, topic::String)
Unsubscribe from a topic.
"""
unsubscribe(client::Client, topic::String) = unsubscribe(client.cptr.mosc, topic)
"""
tls_set(client::Client, cafile::String; certfile::String = "", keyfile::String = "")
"""
function tls_set(client::Client, cafile::String; certfile::String = "", keyfile::String = "")
xor( certfile == "", keyfile == "" ) && throw("You need to either provide both cert and key files, or none of both")
if certfile == ""
return tls_set(client.cptr.mosc, cafile, C_NULL, C_NULL, C_NULL, C_NULL)
else
return tls_set(client.cptr.mosc, cafile, C_NULL, certfile, keyfile, C_NULL)
end
end
"""
tls_plk_set(client::Client, psk::String, identity::String, ciphers::Union{Nothing, String})
"""
function tls_psk_set(client::Client, psk::String, identity::String, ciphers::Union{Nothing, String} = nothing)
return tls_psk_set(client.cptr.mosc, psk, identity, ciphers)
end

137
src/cwrapper.jl Normal file
View File

@@ -0,0 +1,137 @@
struct Cmosquitto end
struct CMosquittoMessage
mid::Cint
topic::Cstring
payload::Ptr{UInt8} # we treat payload as raw bytes
payloadlen::Cint
qos::Cint
retain::Bool
end
function mosquitto_new(id::String, clean_start::Bool, obj)
return ccall((:mosquitto_new, libmosquitto), Ptr{Cmosquitto}, (Cstring, Bool, Ptr{Cvoid}), id, clean_start, obj)
end
function destroy(client::Ref{Cmosquitto})
return ccall((:mosquitto_destroy, libmosquitto), Cvoid, (Ptr{Cmosquitto},), client)
end
finalizer(client::Ref{Cmosquitto}) = destroy(client)
function connect(client::Ref{Cmosquitto}, host::String; port::Int = 1883, keepalive::Int = 60)
return ccall((:mosquitto_connect, libmosquitto), Cint, (Ptr{Cmosquitto}, Cstring, Cint, Cint), client, host, port, keepalive)
end
function reconnect(client::Ref{Cmosquitto})
ccall((:mosquitto_reconnect, libmosquitto), Cint, (Ptr{Cmosquitto},), client)
end
function disconnect(client::Ref{Cmosquitto})
return ccall((:mosquitto_disconnect, libmosquitto), Cint, (Ptr{Cmosquitto},), client)
end
function publish(client::Ref{Cmosquitto}, topic::String, payload; qos::Int = 1, retain::Bool = true)
payloadnew = getbytes(payload)
payloadlen = length(payloadnew) # dont use sizeof, as payloadnew might be of type "reinterpreted"
mid = Int[0]
msg_nr = ccall((:mosquitto_publish, libmosquitto), Cint,
(Ptr{Cmosquitto}, Ptr{Cint}, Cstring, Cint, Ptr{UInt8}, Cint, Bool),
client, mid, topic, payloadlen, payloadnew, qos, retain)
return msg_nr
end
function subscribe(client::Ref{Cmosquitto}, sub::String; qos::Int = 1)
mid = zeros(Cint, 1)
msg_nr = ccall((:mosquitto_subscribe, libmosquitto), Cint,
(Ptr{Cmosquitto}, Ptr{Cint}, Cstring, Cint),
client, mid, sub, qos)
return msg_nr
end
function unsubscribe(client::Ref{Cmosquitto}, sub::String)
mid = zeros(Cint, 1)
msg_nr = ccall((:mosquitto_unsubscribe, libmosquitto), Cint,
(Ptr{Cmosquitto}, Ptr{Cint}, Cstring),
client, mid, sub)
return msg_nr
end
#= Broken?
function loop_start(client::Ref{Cmosquitto})
msg_nr = ccall((:mosquitto_loop_start, libmosquitto), Cint, (Ptr{Cmosquitto},), client)
return msg_nr
end
function loop_stop(client::Ref{Cmosquitto}; force::Bool = false)
msg_nr = ccall((:mosquitto_loop_stop, libmosquitto), Cint, (Ptr{Cmosquitto}, Bool), client, force)
return msg_nr
end
=#
function loop_forever(client; timeout::Int = 1000, max_packets::Int = 1)
return ccall((:mosquitto_loop_forever, libmosquitto), Cint, (Ptr{Cmosquitto}, Cint, Cint), client, timeout, max_packets)
end
function loop(client; timeout::Int = 1000, max_packets::Int = 1)
return ccall((:mosquitto_loop, libmosquitto), Cint, (Ptr{Cmosquitto}, Cint, Cint), client, timeout, max_packets)
end
function connect_callback_set(client::Ref{Cmosquitto}, cfunc)
return ccall((:mosquitto_connect_callback_set, libmosquitto), Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}), client, cfunc)
end
function disconnect_callback_set(client::Ref{Cmosquitto}, cfunc)
return ccall((:mosquitto_disconnect_callback_set, libmosquitto), Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}), client, cfunc)
end
function message_callback_set(client::Ref{Cmosquitto}, cfunc)
ccall((:mosquitto_message_callback_set, libmosquitto), Cvoid, (Ptr{Cmosquitto}, Ptr{Cvoid}), client, cfunc)
return nothing
end
function username_pw_set(client::Ref{Cmosquitto}, username::String, password::String)
#password != "" && (password = Cstring(C_NULL))
return ccall((:mosquitto_username_pw_set, libmosquitto), Cint, (Ptr{Cmosquitto}, Cstring, Cstring), client, username, password)
end
function tls_set(client::Ref{Cmosquitto}, cafile, capath, certfile, keyfile, callback::Ptr{Cvoid})
return ccall((:mosquitto_tls_set, libmosquitto), Cint, (Ptr{Cmosquitto}, Cstring, Cstring, Cstring, Cstring, Ptr{Cvoid}), client, cafile, capath, certfile, keyfile, callback)
end
function tls_psk_set(client::Ref{Cmosquitto}, psk::String, identity::String, ciphers::Nothing)
return ccall((:mosquitto_tls_psk_set, libmosquitto), Cint, (Ptr{Cmosquitto}, Cstring, Cstring, Cstring), client, psk, identity, C_NULL)
end
function tls_psk_set(client::Ref{Cmosquitto}, psk::String, identity::String, ciphers::String)
return ccall((:mosquitto_tls_psk_set, libmosquitto), Cint, (Ptr{Cmosquitto}, Cstring, Cstring, Cstring), client, psk, identity, ciphers)
end
function lib_version()
maj = zeros(Int, 1)
min = zeros(Int, 1)
rev = zeros(Int, 1)
ccall((:mosquitto_lib_version, libmosquitto), Cint, (Ptr{Cint}, Ptr{Cint}, Ptr{Cint}), maj, min, rev)
return maj[1], min[1], rev[1]
end
function lib_cleanup()
ccall((:mosquitto_lib_cleanup, libmosquitto), Cvoid, ())
end

5
src/helpers.jl Normal file
View File

@@ -0,0 +1,5 @@
# The function getbytes transforms the payload into a Vector representation of UInt8
@inline getbytes(in::String) = transcode(UInt8, in)
@inline getbytes(in::AbstractVector{UInt8}) = in
@inline getbytes(in::Number) = reinterpret(UInt8, [in])
@inline getbytes(in) = reinterpret(UInt8, in)

75
src/looprunner.jl Normal file
View File

@@ -0,0 +1,75 @@
"""
loop(client::Client; timeout::Int = 1000, ntimes::Int = 1)
Perform a network loop. This will get messages of subscriptions and send published messages.
"""
function loop(client::Client; timeout::Int = 1000, ntimes::Int = 1, autoreconnect::Bool = true)
out = zero(Cint)
for _ = 1:ntimes
out = loop(client.cptr.mosc; timeout = timeout)
if autoreconnect && out == 4
flag = reconnect(client)
client.status.conn_status = ifelse( flag == 0, true, false )
end
end
return out
end
"""
loop_start(client::Client; autoreconnect::Bool = true)
This function keeps calling the network loop until loop_stop is called.
If only one thread is used, this function will be blocking, else the calls
will be executed on a worker thread.
"""
function loop_start(client::Client; autoreconnect::Bool = true)
if client.status.loop_status == true
println("Loop is already running")
return 1
end
if Threads.nthreads()>1
client.status.loop_status = true
Threads.@spawn loop_runner(client, autoreconnect)
else
client.status.loop_status = true
loop_forever(client.cptr.mosc)
end
return 0
end
"""
loop_stop(client::Client)
Stop the network loop.
"""
function loop_stop(client::Client)
if client.status.loop_status
put!(client.loop_channel, 0)
return fetch(client.loop_channel)
else
println("Loop not running")
return 0
end
end
function loop_runner(client::Client, autoreconnect::Bool)
while isempty(client.loop_channel)
msg = loop(client.cptr.mosc)
if autoreconnect && msg == Cint(4)
# case of a disconnect, try reconnecting every 2 seconds
println("Client disconnected, trying to reconnect...")
reconnect(client) != 0 && sleep(2)
elseif msg != Cint(0)
client.status.loop_status = false
println("Loop failed with error $msg")
return msg
end
end
client.status.loop_status = false
return take!(client.loop_channel)
end