Skip to content

Commit

Permalink
create a mock broker and update tests and precomp with it
Browse files Browse the repository at this point in the history
  • Loading branch information
NickMcSweeney committed May 28, 2024
1 parent 7407565 commit f75dda8
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 114 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "MQTTClient"
uuid = "985f35cc-2c3d-4943-b8c1-f0931d5f0959"
authors = ["Nick Shindler <[email protected]>"]
version = "0.3.0"
version = "0.3.1"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
2 changes: 1 addition & 1 deletion docs/src/api/client.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
```@docs
Client
MQTTConnection
Connection
IOConnection
MQTTClient.Message
User
Expand Down
4 changes: 2 additions & 2 deletions docs/src/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ using MQTTClient

## Getting started
To use this library you need to follow at least these steps:
1. Create an `MQTTConnection` struct for a given broker and protocol.
1. Create an `Connection` struct for a given broker and protocol.
2. Create an instance of the `Client` struct.
3. Call the connect method with your `Client` and `MQTTConnection` instance.
3. Call the connect method with your `Client` and `Connection` instance.
4. Exchange data with the broker through publish, subscribe and unsubscribe. When subscribing, pass your `on_msg` function for that topic.
5. Disconnect from the broker. (Not strictly necessary, if you don't want to resume the session but considered good form and less likely to crash).

Expand Down
2 changes: 1 addition & 1 deletion docs/src/interfaces.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Connects the `Client` instance to the specified broker. There is a synchronous a
#### Arguments
**Required arguments:**
* **client**::Client: The client to connect to the broker.
* **connection**::MQTTConnection: The information for how the client connects to the broker.
* **connection**::Connection: The information for how the client connects to the broker.

use `MakeConnection` to get the client and the connection objects.

Expand Down
90 changes: 58 additions & 32 deletions ext/PrecompileMQTT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,38 @@ using Sockets

using MQTTClient

precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.subscribe), MQTTClient.Client, String, Function})
precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.publish), MQTTClient.Client, String, String})

precompile(Tuple{typeof(Base.convert), Type{MQTTClient.Packet}, MQTTClient.Packet})
precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, Array{UInt8, 1}})
precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Type{UInt8}})
precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64})
precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Int64})
precompile(Tuple{typeof(Base.haskey), Base.Dict{UInt8, Function}, UInt8})
precompile(Tuple{typeof(Base.getindex), Base.Dict{UInt8, Function}, UInt8})
precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64})
precompile(Tuple{typeof(Base.read), Sockets.TCPSocket, Int64})
precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, UInt8})
precompile(Tuple{typeof(Base.fetch), Base.Channel{Any}})
precompile(Tuple{typeof(Base.iterate), UInt16})
precompile(Tuple{typeof(Base.something), MQTTClient.TrieNode{MQTTClient.FunctionCallback}, Nothing, Vararg{Any}})

precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.MQTTConnection{MQTTClient.TCP}})
precompile(Tuple{typeof(Sockets.connect), Sockets.IPv6, Int64})
precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.MQTTConnection{MQTTClient.UDS}})
precompile(Tuple{typeof(Sockets.connect), String})

precompile(Tuple{typeof(MQTTClient.write_len), Sockets.TCPSocket, Int64})
precompile(Tuple{typeof(MQTTClient.read_len), Sockets.TCPSocket})
precompile(Tuple{typeof(MQTTClient.write_packet), MQTTClient.Client, UInt8, String, Vararg{Any}})
precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, UInt8})
precompile(Tuple{typeof(MQTTClient.write_len), Base.PipeEndpoint, Int64})
precompile(Tuple{typeof(MQTTClient.read_len), Base.PipeEndpoint})
precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, MQTTClient.QOS})

precompile(Tuple{Type{MQTTClient.Packet}, UInt8, Tuple{}})
precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.QOS}})
# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.subscribe), MQTTClient.Client, String, Function})
# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.publish), MQTTClient.Client, String, String})

# precompile(Tuple{typeof(Base.convert), Type{MQTTClient.Packet}, MQTTClient.Packet})
# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, Array{UInt8, 1}})
# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Type{UInt8}})
# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64})
# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Int64})
# precompile(Tuple{typeof(Base.haskey), Base.Dict{UInt8, Function}, UInt8})
# precompile(Tuple{typeof(Base.getindex), Base.Dict{UInt8, Function}, UInt8})
# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64})
# precompile(Tuple{typeof(Base.read), Sockets.TCPSocket, Int64})
# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, UInt8})
# precompile(Tuple{typeof(Base.fetch), Base.Channel{Any}})
# precompile(Tuple{typeof(Base.iterate), UInt16})
# precompile(Tuple{typeof(Base.something), MQTTClient.TrieNode{MQTTClient.FunctionCallback}, Nothing, Vararg{Any}})

# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.TCP}})
# precompile(Tuple{typeof(Sockets.connect), Sockets.IPv6, Int64})
# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.UDS}})
# precompile(Tuple{typeof(Sockets.connect), String})

# precompile(Tuple{typeof(MQTTClient.write_len), Sockets.TCPSocket, Int64})
# precompile(Tuple{typeof(MQTTClient.read_len), Sockets.TCPSocket})
# precompile(Tuple{typeof(MQTTClient.write_packet), MQTTClient.Client, UInt8, String, Vararg{Any}})
# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, UInt8})
# precompile(Tuple{typeof(MQTTClient.write_len), Base.PipeEndpoint, Int64})
# precompile(Tuple{typeof(MQTTClient.read_len), Base.PipeEndpoint})
# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, MQTTClient.QOS})

# precompile(Tuple{Type{MQTTClient.Packet}, UInt8, Tuple{}})
# precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.QOS}})


# Precompiling the package like this provides a slower initial load of the package but faster code execution.
Expand Down Expand Up @@ -138,6 +138,32 @@ precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.Q
@atomicswap c.last_id = 0x0
future = unsubscribe_async(c, topic)


## TCP Basic Run
server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1889)
client, conn = MakeConnection(ip"127.0.0.1", 1889)

connect(client, conn)

subscribe(client, "foo/bar", cb)
publish(client, "bar/foo", qos=QOS_2)
unsubscribe(client, "foo/bar")

disconnect(client)
close(server)

## UDS Basic Run
server = MQTTClient.MockMQTTBroker("/tmp/testmqtt.sock")
client, conn = MakeConnection("/tmp/testmqtt.sock")

connect(client, conn)

subscribe(client, "foo/bar", cb)
publish(client, "bar/foo")
unsubscribe(client, "foo/bar")

disconnect(client)
close(server)
end
end

Expand Down
7 changes: 3 additions & 4 deletions src/MQTTClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module MQTTClient

using Distributed: Future, myid, remotecall, RemoteChannel
using Sockets: TCPSocket, IPAddr, PipeServer, getaddrinfo
import Sockets: connect
import Sockets: connect, listen, accept
using Random: randstring
import Base: ReentrantLock, lock, unlock, convert, PipeEndpoint, fetch, show
import Base: @atomic, @atomicreplace, @atomicswap, Ref, RefValue, isready
Expand All @@ -20,7 +20,7 @@ include("interface.jl")
export
MakeConnection,
Client,
MQTTConnection,
Connection,
IOConnection,
MQTTException,
User,
Expand All @@ -35,6 +35,5 @@ export
unsubscribe,
publish_async,
publish,
disconnect,
MQTT_ERR_INVAL
disconnect
end
10 changes: 4 additions & 6 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This client uses atomic operations to ensure thread safety for shared variables
# Constructor
`Client(ping_timeout::UInt64=UInt64(60))` constructs a new `Client` object with the specified ping timeout (default: 60 seconds).
"""
mutable struct Client
mutable struct Client <: AbstractConfigElement
@atomic state::UInt8

on_msg::TrieNode
Expand Down Expand Up @@ -161,11 +161,9 @@ end
function keep_alive_loop(client::Client)::UInt8
ping_sent = time()

if client.keep_alive > 10
check_interval = 5
else
check_interval = client.keep_alive / 2
end
# TODO: improve, this causes reconnect to take ~1 second. is there a way to interupt?
check_interval = 1

timer = Timer(0, interval = check_interval)

while !isclosed(client)
Expand Down
78 changes: 62 additions & 16 deletions src/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,34 @@ IOConnection(path::AbstractString) = UDS(path)
IOConnection() = MockIOConnection()

"""
connect(protocol::UDS) -> PipeEndpoint
connect(protocol::UDS)::PipeEndpoint
Establishes a connection to a Unix domain socket at the given path specified in the `UDS` struct.
"""
connect(protocol::UDS) = connect(protocol.path)
"""
connect(protocol::TCP) -> TCPSocket
connect(protocol::TCP)::TCPSocket
Establishes a TCP connection to the given IP address and port specified in the `TCP` struct.
"""
connect(protocol::TCP) = connect(protocol.ip, protocol.port)

"""
connect(protocol::MockIOConnection) -> IOBuffer
connect(protocol::MockIOConnection)::IOBuffer
Mocks a connection to an MQTT Broker with a local IOBuffer. Should only be used for testing.
"""
connect(protocol::MockIOConnection) = IOBuffer()


"""
MQTTConnection{T <: AbstractIOConnection}
Connection{T <: AbstractIOConnection}
The `MQTTConnection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker.
The `Connection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker.
This struct supports two types of connection protocols: TCP and Unix Domain Sockets (UDS), both of which are subtypes of `AbstractIOConnection`.
The struct includes fields for protocol type, keep-alive interval, client ID, user credentials, a will message (a message that is sent by the broker if the client disconnects unexpectedly),
and a flag indicating whether the session is clean (i.e., no persistent session state).
The `MQTTConnection` constructor allows for flexible instantiation with default or specified values for each field,
The `Connection` constructor allows for flexible instantiation with default or specified values for each field,
enabling easy setup of connection parameters tailored to the specific requirements of the MQTT client and broker interaction.
## Fields
Expand All @@ -93,61 +93,107 @@ enabling easy setup of connection parameters tailored to the specific requiremen
- `clean_session::Bool`: Whether to start a clean session.
## Constructors
`MQTTConnection(protocol::T;
`Connection(protocol::T;
keep_alive::Int64=32,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", UInt8[]),
clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `MQTTConnection` object with the specified protocol and optional keyword arguments.
clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified protocol and optional keyword arguments.
`MQTTConnection(protocol::T,
`Connection(protocol::T,
keep_alive::Int64,
client_id::String,
user::User,
will::Message,
clean_session::Bool) where T <: AbstractIOConnection` constructs a new `MQTTConnection` object with the specified arguments.
clean_session::Bool) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified arguments.
### Example using TCP protocol with default and custom values
tcp_connection = MQTTConnection(
```julia
tcp_connection = Connection(
TCP(Sockets.localhost, 1883); # Using TCP with localhost and port 1883
keep_alive=60, # Custom keep-alive interval of 60 seconds
client_id="my_mqtt_client", # Custom client ID
user=User("username", "password"), # Custom user credentials
will=Message(false, 0x01, false, "last/will/topic", UInt8[]), # Custom will message
clean_session=true # Default clean session flag
)
```
### Example using UDS protocol with all custom values
uds_connection_full = MQTTConnection(
```julia
uds_connection_full = Connection(
UDS("/var/run/mqtt.sock"), # Using UDS with specified socket path
45, # Custom keep-alive interval of 45 seconds
"another_client", # Custom client ID
User("user", "pass"), # Custom user credentials
Message(true, 0x00, true, "will/topic", UInt8[1, 2, 3]), # Custom will message
false # Custom clean session flag
)
```
"""
struct MQTTConnection{T <: AbstractIOConnection}
struct Connection{T <: AbstractIOConnection} <: AbstractConfigElement
protocol::T
keep_alive::Int64
client_id::String
user::User
will::Message
clean_session::Bool

MQTTConnection(protocol::T;
Connection(protocol::T;
keep_alive::Int64=32,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", UInt8[]),
clean_session::Bool=true) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session)

MQTTConnection(protocol::T,
Connection(protocol::T,
keep_alive::Int64,
client_id::String,
user::User,
will::Message,
clean_session::Bool) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session)
end

Base.show(io::IO, connection::MQTTConnection) = print(io, "MQTTConnection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")")
Base.show(io::IO, connection::Connection) = print(io, "Connection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")")

"""
Configuration
Container for the mqtt client and mqtt connection data. This is partially iterable, and can be spread to 2 variables with the `...` splat operator or `client, conn = conf` variable assignment.
## Example
```julia
# using the MakeConnection interface function
config = MakeConnection("/temp/mqtt.sock")
# using a defined IO
io = IOConnection("localhost",1883)
config = Configuration(io)
# spreading the variables
client, connection = Configuration(...)
```
"""
struct Configuration
client::Client
connection::Connection

function Configuration(io::T,
ping_timeout=UInt64(60),
keep_alive::Int64=32,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", UInt8[]),
clean_session::Bool=true) where {T <: AbstractIOConnection}
new(Client(ping_timeout), Connection(io, keep_alive, client_id, user, will, clean_session))
end
function Configuration(client::Client, connection::Connection)
new(client, connection)
end
end

Base.iterate(conf::Configuration, state=1) = state == 1 ? (conf.client, 2) : (conf.connection, nothing)
Base.length(::Configuration) = 2
Base.IteratorSize(::Type{Configuration}) = Base.HasLength()
Base.IteratorEltype(::Type{Configuration}) = AbstractConfigElement
3 changes: 1 addition & 2 deletions src/handlers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ function handle_pingresp(client::Client, s::IO, cmd::UInt8, flags::UInt8)
if @atomic(client.ping_outstanding) == 0x1
@atomicswap client.ping_outstanding = 0x0
else
# We received a subresp packet we didn't ask for
# disconnect(client)
# We received a ping resp packet we didn't ask for
@atomicswap client.state = 0x03
throw(ArgumentError("No outstanding ping. client.ping_outstanding = $(client.ping_outstanding) and should be 0x1"))
end
Expand Down
Loading

0 comments on commit f75dda8

Please sign in to comment.