- Router: Data storage + routing logic. Connections (links) forward data to the router.
When created, the `Broker` spawns a `Router`, which listens on its receiver for any events. When `Broker::start()` is called, a TCP listener is spawned that listens for MQTT connections.
`LinkRx` is responsible for sending `Notification`s from `Router` to `RemoteLink`, and `LinkTx` is responsible for sending `Event`s from `RemoteLink` to `Router`, though `RemoteLink` is not the only thing sending `Event`s to `Router`.
Whenever a new connection is received, a `Network` is spawned to handle the connection stream, and a `LinkTx`-`LinkRx` pair wrapped in a `RemoteLink` is spawned, which is responsible for:
- initiating the MQTT connection using the corresponding `Network` and registering the connection with the `Router`.
- asynchronously awaiting on:
  - `Network` for reading packets
  - `LinkRx` for notifications from the router which need to be forwarded to `Network`.
There are separate shared buffers (`Arc<Mutex<..>>`s) for `LinkTx` and `LinkRx`, which are used to actually carry the notifications and events; the mpsc channel is only used to signal that there is something present in the shared buffer.
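The split between data and signalling can be sketched with standard-library primitives alone. This is a minimal sketch under stated assumptions, not rumqttd's code: `Producer`, `Consumer`, `SharedBuffer` and `pair` are hypothetical names, the payload is a plain `String` rather than an `Event` or `Notification`, and `std::sync::mpsc` plus OS threads stand in for the async channels and tasks the real links use.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// The data itself lives in a mutex-guarded queue shared by both ends.
type SharedBuffer = Arc<Mutex<VecDeque<String>>>;

/// Hypothetical sending end: push into the buffer, then signal.
struct Producer {
    buffer: SharedBuffer,
    signal: Sender<()>,
}

impl Producer {
    fn send(&self, item: String) {
        self.buffer.lock().unwrap().push_back(item);
        // The channel carries no payload, only "there is something in the buffer".
        let _ = self.signal.send(());
    }
}

/// Hypothetical receiving end: wait for a signal, then drain the buffer.
struct Consumer {
    buffer: SharedBuffer,
    signal: Receiver<()>,
}

impl Consumer {
    /// Blocks until signalled and returns everything buffered so far.
    /// Returns `None` once the producer is gone and no signals remain.
    fn recv_batch(&self) -> Option<Vec<String>> {
        self.signal.recv().ok()?;
        // Several signals may have piled up; one drain covers all of them.
        while self.signal.try_recv().is_ok() {}
        let mut buffer = self.buffer.lock().unwrap();
        let items: Vec<String> = buffer.drain(..).collect();
        Some(items)
    }
}

fn pair() -> (Producer, Consumer) {
    let buffer: SharedBuffer = Arc::new(Mutex::new(VecDeque::new()));
    let (tx, rx) = channel();
    (
        Producer { buffer: Arc::clone(&buffer), signal: tx },
        Consumer { buffer, signal: rx },
    )
}

fn main() {
    let (producer, consumer) = pair();
    let writer = thread::spawn(move || {
        for i in 0..3 {
            producer.send(format!("notification {i}"));
        }
        // `producer` is dropped here, which closes the signal channel.
    });
    writer.join().unwrap();

    while let Some(batch) = consumer.recv_batch() {
        println!("drained {} item(s): {:?}", batch.len(), batch);
    }
}
```

Because the channel only carries `()`, several pushes can be drained under a single lock acquisition, which is the point of keeping the data itself out of the channel.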
`Connection` is a struct used by `Router` to store all the metadata about a connection, as well as to manage a QoS1 buffer for it. `Connection` also holds onto the mpsc channel end used to notify the `RemoteLink` that there are new notifications to send over to `Network`.
`Router::readyqueue` stores all the connections to which `Router` has notifications to forward, whether these are acks or publish messages.
A `DataRequest` represents a connection's request to fetch data from a topic it has subscribed to. The connection never actually sends such a request apart from the subscribe packet at the start; instead, these requests are used to continuously read data off the logs so that the connection stays up to date.
`Tracker` stores the current status of a given connection's `DataRequest`s. It has various unschedule flags, one of which is set whenever the connection is removed from `Router::readyqueue`. In the majority of cases, the reason is that the connection has caught up with the topic, in which case the `Tracker::caughtup_unschedule` flag is set. `Tracker` also keeps all the `DataRequest`s for the given connection, as well as whether there are any pending acks for the connection in `Router::ackslog`.
`Router::datalog` stores, for each valid topic, all the pending `DataRequest`s. These data requests are pending because the corresponding connection has caught up with the logs. While `Tracker::caughtup_unschedule` is set, these `DataRequest`s live here and are not stored in `Router::trackers`.
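Parking and waking a caught-up request can be sketched as follows. This is a simplified model, not rumqttd's implementation: each topic's log is an in-memory `Vec<String>` instead of the real segmented log, the tracker has a single unschedule flag, and `serve`, `append`, `waiters` and `cursor` are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

type ConnectionId = usize;

/// Simplified data request: which topic to read, and the next offset to read from.
#[derive(Debug, Clone, PartialEq)]
struct DataRequest {
    topic: String,
    cursor: usize,
}

/// Simplified tracker: the connection's active requests plus one unschedule flag.
#[derive(Debug, Default)]
struct Tracker {
    requests: VecDeque<DataRequest>,
    caughtup_unschedule: bool,
}

/// Simplified datalog: one in-memory log per topic, plus requests parked on it.
#[derive(Debug, Default)]
struct DataLog {
    logs: HashMap<String, Vec<String>>,
    waiters: HashMap<String, Vec<(ConnectionId, DataRequest)>>,
}

impl DataLog {
    /// Returns everything after `request.cursor`. A request with nothing left to
    /// read is parked on its topic instead of going back to the tracker.
    fn serve(&mut self, id: ConnectionId, mut request: DataRequest, tracker: &mut Tracker) -> Vec<String> {
        let log = self.logs.entry(request.topic.clone()).or_default();
        let data = log[request.cursor..].to_vec();
        request.cursor = log.len();

        if data.is_empty() {
            self.waiters.entry(request.topic.clone()).or_default().push((id, request));
            // Once no active requests remain, the connection has caught up.
            if tracker.requests.is_empty() {
                tracker.caughtup_unschedule = true;
            }
        } else {
            tracker.requests.push_back(request);
        }
        data
    }

    /// Appends a message and hands back every request parked on that topic.
    fn append(&mut self, topic: &str, payload: &str) -> Vec<(ConnectionId, DataRequest)> {
        self.logs.entry(topic.to_string()).or_default().push(payload.to_string());
        self.waiters.remove(topic).unwrap_or_default()
    }
}

fn main() {
    let mut datalog = DataLog::default();
    let mut tracker = Tracker::default();
    let request = DataRequest { topic: "hello/world".into(), cursor: 0 };

    // Nothing published yet: the request is parked and the tracker marked caught up.
    let data = datalog.serve(0, request, &mut tracker);
    println!("first read: {data:?}, caught up: {}", tracker.caughtup_unschedule);

    // A publish wakes the parked request; here it goes straight back to the tracker.
    for (_id, woken) in datalog.append("hello/world", "payload") {
        tracker.requests.push_back(woken);
        tracker.caughtup_unschedule = false;
    }
    let request = tracker.requests.pop_front().unwrap();
    println!("second read: {:?}", datalog.serve(0, request, &mut tracker));
}
```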
`Router::run()` loops over `Router::run_inner()`, which:
- if `Router::readyqueue` is empty, blocks on the events receiver to avoid spinning in the loop. There is nothing to send to `RemoteLink`, so blocking here is fine.
- if `Router::readyqueue` is not empty, parses between 0 and 500 events from the receiver without blocking. If there are no events to parse, the router moves straight on to sending notifications to `RemoteLink`.
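A sketch of that loop, assuming `std::sync::mpsc` in place of rumqttd's own channel type. `Event::Data`, `handle` and the `consumed` vector are hypothetical stand-ins, and `consume` here only records which connection it served; the real router may also put a connection back on the ready queue after consuming it.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical event: "connection `id` has something for the router".
enum Event {
    Data(usize),
}

/// Upper bound on events handled per iteration while work is pending.
const MAX_EVENTS_PER_ITERATION: usize = 500;

struct Router {
    events: Receiver<Event>,
    readyqueue: VecDeque<usize>,
    consumed: Vec<usize>, // records calls to `consume`, for illustration
}

impl Router {
    /// One iteration of the loop. Returns `false` once every sender is gone.
    fn run_inner(&mut self) -> bool {
        if self.readyqueue.is_empty() {
            // Nothing to forward, so blocking cannot delay any link.
            match self.events.recv() {
                Ok(event) => self.handle(event),
                Err(_) => return false,
            }
        } else {
            // Work is pending: take what is already queued, up to the limit, without blocking.
            for _ in 0..MAX_EVENTS_PER_ITERATION {
                match self.events.try_recv() {
                    Ok(event) => self.handle(event),
                    Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
                }
            }
        }
        // Serve every connection that is currently ready.
        while let Some(id) = self.readyqueue.pop_front() {
            self.consume(id);
        }
        true
    }

    fn handle(&mut self, event: Event) {
        match event {
            Event::Data(id) => {
                if !self.readyqueue.contains(&id) {
                    self.readyqueue.push_back(id);
                }
            }
        }
    }

    fn consume(&mut self, id: usize) {
        self.consumed.push(id);
    }
}

fn main() {
    let (tx, rx) = channel();
    for id in 1..=3 {
        tx.send(Event::Data(id)).unwrap();
    }
    drop(tx);

    let mut router = Router { events: rx, readyqueue: VecDeque::new(), consumed: Vec::new() };
    while router.run_inner() {}
    println!("consumed: {:?}", router.consumed);
}
```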
`Router::consume(id: ConnectionId)` is the function where, for the given `id`, we:
- send all the pending acks
- send all the pending data, that is, all the messages from topics the connection has subscribed to, if any.
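The ordering in `consume` can be sketched as follows. `ConnectionState`, its fields and the `Notification` variants are hypothetical simplifications: only pubacks are modelled as acks, the outgoing buffer is a plain `Vec`, and the buffer limits a real implementation has to respect are ignored here.

```rust
use std::collections::VecDeque;

/// Hypothetical outgoing notifications.
#[derive(Debug, PartialEq)]
enum Notification {
    PubAck(u16),
    Forward { topic: String, payload: String },
}

/// Hypothetical per-connection state consulted by `consume`.
struct ConnectionState {
    pending_acks: VecDeque<u16>,              // this connection's entries in the ackslog
    pending_data: VecDeque<(String, String)>, // (topic, payload) read off subscribed topics
    outgoing: Vec<Notification>,              // stand-in for the shared outgoing buffer
}

impl ConnectionState {
    /// Pending acks are written first, then any data from subscribed topics.
    fn consume(&mut self) {
        for pkid in self.pending_acks.drain(..) {
            self.outgoing.push(Notification::PubAck(pkid));
        }
        for (topic, payload) in self.pending_data.drain(..) {
            self.outgoing.push(Notification::Forward { topic, payload });
        }
    }
}

fn main() {
    let mut conn = ConnectionState {
        pending_acks: VecDeque::from([1, 2]),
        pending_data: VecDeque::from([("sensors/temp".to_string(), "21.5".to_string())]),
        outgoing: Vec::new(),
    };
    conn.consume();
    println!("{:?}", conn.outgoing);
}
```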
Whenever a new connection is formed (which `Router` is informed of via `Event::Connect`), the router:
- checks whether adding the new connection would exceed the total number of allowed connections
- if a clean session is not requested, retrieves any pending data and acks that were not sent
  - NOTE: The metrics of the previous session are still retrieved, regardless of whether a new session was requested or not.
- makes the necessary initializations
- finally, sends a connack back
When a new connection is added, we don't add the corresponding `ConnectionId` to the `readyqueue`, because we append the connack directly to the shared buffer and notify `RemoteLink` right there, instead of waiting for `Router::consume` to do so.
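The connect steps can be sketched like this. `SessionState`, `handle_connect`, the `messages_received` counter (standing in for the session's metrics) and the string connack are hypothetical; modelling the graveyard as a `HashMap` keyed by client id is also an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical state saved for a client between sessions.
#[derive(Debug, Clone, Default, PartialEq)]
struct SessionState {
    pending_acks: Vec<u16>,
    pending_data: Vec<String>,
    messages_received: u64, // stand-in for the session's metrics
}

#[derive(Debug, PartialEq)]
enum ConnectError {
    TooManyConnections,
}

struct Router {
    max_connections: usize,
    connections: HashMap<String, SessionState>, // active sessions by client id
    graveyard: HashMap<String, SessionState>,   // sessions of disconnected clients
    outgoing: Vec<String>,                      // stand-in for the shared buffer
}

impl Router {
    fn handle_connect(&mut self, client_id: &str, clean_session: bool) -> Result<(), ConnectError> {
        // 1. Enforce the connection limit.
        if self.connections.len() >= self.max_connections {
            return Err(ConnectError::TooManyConnections);
        }
        // 2. Retrieve the previous session, if any.
        let previous = self.graveyard.remove(client_id).unwrap_or_default();
        let state = if clean_session {
            // Pending data and acks are discarded, but metrics carry over.
            SessionState { messages_received: previous.messages_received, ..SessionState::default() }
        } else {
            previous
        };
        // 3. Initialize, then 4. write the connack straight into the shared
        //    buffer instead of going through the ready queue.
        self.connections.insert(client_id.to_string(), state);
        self.outgoing.push(format!("connack {client_id}"));
        Ok(())
    }
}

fn main() {
    let mut graveyard = HashMap::new();
    graveyard.insert(
        "sensor-1".to_string(),
        SessionState { pending_acks: vec![7], pending_data: vec!["m".into()], messages_received: 3 },
    );
    let mut router = Router { max_connections: 1, connections: HashMap::new(), graveyard, outgoing: Vec::new() };

    println!("{:?}", router.handle_connect("sensor-1", false));
    println!("{:?}", router.handle_connect("sensor-2", true));
    println!("{:?}", router.outgoing);
}
```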
Whenever the `RemoteLink` reads some bytes from `Network`, it appends them to the shared buffer and notifies the `Router` via `Event::DeviceData`. The router parses these bytes as MQTT packets and performs the required actions.
Publish packets are appended to the `CommitLog`, a puback is added to the ackslog for the connection, and the connection is added to `Router::readyqueue` so that the ack gets flushed.
All the waiters on the topic are added to `DataLog::notifications`; at the end of the parsing loop, these are added to their respective trackers, and the corresponding connections are added to the `readyqueue`.
Publish packets are only rejected if the topic is invalid, in which case the connection is disconnected.
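A sketch of the publish path under simplifying assumptions: the commit log is an in-memory `HashMap` of payload vectors rather than rumqttd's `CommitLog`, only QoS 1 publishes are handled, `is_valid_topic` applies the MQTT rule that topic names are non-empty and wildcard-free (rumqttd's own validation may differ), and waking the topic's waiters is left out. `handle_publish` and `PublishError` are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

type ConnectionId = usize;

#[derive(Debug, PartialEq)]
enum PublishError {
    InvalidTopic, // the router disconnects the client in this case
}

struct Router {
    commitlog: HashMap<String, Vec<Vec<u8>>>, // topic -> payloads
    ackslog: HashMap<ConnectionId, Vec<u16>>, // connection -> pending puback pkids
    readyqueue: VecDeque<ConnectionId>,
}

/// Assumed validity rule: non-empty and no `+` or `#` wildcards.
fn is_valid_topic(topic: &str) -> bool {
    !topic.is_empty() && !topic.contains(|c: char| c == '+' || c == '#')
}

impl Router {
    /// Handles a QoS 1 publish from connection `id` (QoS 0 would skip the puback).
    fn handle_publish(&mut self, id: ConnectionId, pkid: u16, topic: &str, payload: &[u8]) -> Result<(), PublishError> {
        if !is_valid_topic(topic) {
            return Err(PublishError::InvalidTopic);
        }
        // 1. Append to the log for this topic.
        self.commitlog.entry(topic.to_string()).or_default().push(payload.to_vec());
        // 2. Queue a puback for the publisher.
        self.ackslog.entry(id).or_default().push(pkid);
        // 3. Schedule the publisher so the ack gets flushed.
        if !self.readyqueue.contains(&id) {
            self.readyqueue.push_back(id);
        }
        Ok(())
    }
}

fn main() {
    let mut router = Router { commitlog: HashMap::new(), ackslog: HashMap::new(), readyqueue: VecDeque::new() };
    println!("{:?}", router.handle_publish(1, 10, "sensors/temp", b"21.5"));
    println!("{:?}", router.handle_publish(1, 11, "sensors/+", b"oops"));
    println!("pending acks: {:?}, ready: {:?}", router.ackslog[&1], router.readyqueue);
}
```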
For each filter subscribed to, we validate the subscription and call `Router::prepare_consumption()`, which adds the connection to the `readyqueue`. If a subscription is invalid, the connection is disconnected.
Ack packets are registered with the QoS1 buffer in `Connection`, and if an ack is out of order or unsolicited, the connection is disconnected.
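A hypothetical QoS1 buffer that implements the out-of-order and unsolicited checks. It assumes acks must arrive in the order the publishes were sent, which matches MQTT 3.1.1's requirement that PUBACKs be sent in the order the corresponding PUBLISH packets were received; `Qos1Buffer`, `AckError`, `register` and `ack` are illustrative names, not rumqttd's API.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum AckError {
    Unsolicited(u16),
    OutOfOrder { expected: u16, got: u16 },
}

/// Hypothetical QoS1 buffer: pkids of publishes forwarded to the client
/// that are still waiting for a puback, in the order they were sent.
#[derive(Default)]
struct Qos1Buffer {
    unacked: VecDeque<u16>,
}

impl Qos1Buffer {
    fn register(&mut self, pkid: u16) {
        self.unacked.push_back(pkid);
    }

    /// Accepts an ack only for the oldest outstanding pkid. Any error means
    /// the router should disconnect the client.
    fn ack(&mut self, pkid: u16) -> Result<(), AckError> {
        match self.unacked.front().copied() {
            Some(expected) if expected == pkid => {
                self.unacked.pop_front();
                Ok(())
            }
            // Known pkid, but an older one is still outstanding.
            Some(expected) if self.unacked.contains(&pkid) => Err(AckError::OutOfOrder { expected, got: pkid }),
            // Nothing outstanding with this pkid.
            _ => Err(AckError::Unsolicited(pkid)),
        }
    }
}

fn main() {
    let mut buffer = Qos1Buffer::default();
    buffer.register(1);
    buffer.register(2);
    println!("{:?}", buffer.ack(2)); // out of order
    println!("{:?}", buffer.ack(1));
    println!("{:?}", buffer.ack(2));
    println!("{:?}", buffer.ack(3)); // never sent
}
```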
`PingReq` and disconnect packets are also parsed; the rest are ignored.
`Router::handle_disconnection()` is called here, as well as within data events whenever a disconnection is required. The connection is removed, as are its acks in `Router::ackslog`. All of its data requests in the waiters are pushed back to its tracker, and the tracker itself is saved in the graveyard if a clean session was not requested.
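The cleanup steps can be sketched as below. Only the fields touched on disconnection are modelled, each as a `HashMap` (an assumption), and `client_ids`, `waiters` and a graveyard keyed by client id are hypothetical simplifications of rumqttd's structures.

```rust
use std::collections::{HashMap, VecDeque};

type ConnectionId = usize;

#[derive(Debug, Clone, PartialEq)]
struct DataRequest {
    topic: String,
    cursor: usize,
}

#[derive(Debug, Default, PartialEq)]
struct Tracker {
    requests: VecDeque<DataRequest>,
}

/// Only the fields touched on disconnection are modelled here.
struct Router {
    client_ids: HashMap<ConnectionId, String>,
    ackslog: HashMap<ConnectionId, Vec<u16>>,
    trackers: HashMap<ConnectionId, Tracker>,
    waiters: HashMap<String, Vec<(ConnectionId, DataRequest)>>, // topic -> parked requests
    graveyard: HashMap<String, Tracker>,                       // client id -> saved tracker
}

impl Router {
    fn handle_disconnection(&mut self, id: ConnectionId, clean_session: bool) {
        let client_id = match self.client_ids.remove(&id) {
            Some(client_id) => client_id,
            None => return, // already gone
        };
        // Drop the connection's pending acks.
        self.ackslog.remove(&id);

        // Move this connection's parked requests back into its tracker.
        let mut tracker = self.trackers.remove(&id).unwrap_or_default();
        for parked in self.waiters.values_mut() {
            let (mine, others): (Vec<_>, Vec<_>) =
                std::mem::take(parked).into_iter().partition(|(owner, _)| *owner == id);
            *parked = others;
            tracker.requests.extend(mine.into_iter().map(|(_, request)| request));
        }

        // Persistent sessions keep their tracker for a later reconnect.
        if !clean_session {
            self.graveyard.insert(client_id, tracker);
        }
    }
}

fn main() {
    let request = DataRequest { topic: "sensors/temp".into(), cursor: 4 };
    let mut router = Router {
        client_ids: HashMap::from([(1, "sensor-1".to_string())]),
        ackslog: HashMap::from([(1, vec![7])]),
        trackers: HashMap::from([(1, Tracker::default())]),
        waiters: HashMap::from([("sensors/temp".to_string(), vec![(1, request)])]),
        graveyard: HashMap::new(),
    };
    router.handle_disconnection(1, false);
    println!("{:?}", router.graveyard.get("sensor-1"));
}
```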
When a connection's buffer is full, the router pushes an unschedule notification at the end of the buffer. Whenever the `RemoteLink` encounters this unschedule notification, it sends a ready event to the router to let it know that the buffer now has free space for more notifications.
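Both sides of the unschedule handshake, sketched with a `VecDeque` as the connection's buffer and a `std::sync::mpsc` channel back to the router. `fill_buffer`, `drain_buffer`, `Notification::Unschedule` and `Event::Ready` are hypothetical names, and the fixed `capacity` check is a simplification.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical notifications placed in a connection's outgoing buffer.
#[derive(Debug, PartialEq)]
enum Notification {
    Forward(String),
    Unschedule,
}

/// Hypothetical event sent from the link back to the router.
#[derive(Debug, PartialEq)]
enum Event {
    Ready(usize),
}

/// Router side: forwards pending data until the buffer holds `capacity` items.
/// If data is left over, appends `Unschedule` and returns `true`, meaning the
/// connection should be taken off the ready queue.
fn fill_buffer(buffer: &mut VecDeque<Notification>, capacity: usize, pending: &mut VecDeque<String>) -> bool {
    while buffer.len() < capacity {
        match pending.pop_front() {
            Some(payload) => buffer.push_back(Notification::Forward(payload)),
            None => return false, // everything fit
        }
    }
    if pending.is_empty() {
        return false;
    }
    buffer.push_back(Notification::Unschedule);
    true
}

/// Link side: writes forwards to the network; on `Unschedule`, tells the
/// router that the buffer has room again.
fn drain_buffer(id: usize, buffer: &mut VecDeque<Notification>, network: &mut Vec<String>, router_tx: &Sender<Event>) {
    while let Some(notification) = buffer.pop_front() {
        match notification {
            Notification::Forward(payload) => network.push(payload),
            Notification::Unschedule => {
                let _ = router_tx.send(Event::Ready(id));
            }
        }
    }
}

fn main() {
    let (router_tx, router_rx) = channel();
    let mut buffer = VecDeque::new();
    let mut pending: VecDeque<String> = (0..5).map(|i| format!("message {i}")).collect();
    let mut network = Vec::new();

    let unscheduled = fill_buffer(&mut buffer, 3, &mut pending);
    drain_buffer(7, &mut buffer, &mut network, &router_tx);
    println!("unscheduled: {unscheduled}, sent: {network:?}, event: {:?}", router_rx.try_recv());
}
```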
NOTE: Transition to ConnectionReady will schedule the connection for consumption.
- Send acks (connack, suback, puback, pubrec ...) and data forwards
- Possible output states
  - ConnectionCaughtUp
  - ConnectionBusy
  - InflightFull
- New connection (event)
  - New tracker initialized with ConnectionBusy
  - ConnectionBusy -> ConnectionReady
- New subscription
  - Initialize tracker with requests
  - If ConnectionCaughtUp
    - ConnectionCaughtUp -> ConnectionReady
- New publish
  - Write to one or more filters
  - Reinitialize trackers with parked requests
  - If ConnectionCaughtUp
    - ConnectionCaughtUp -> ConnectionReady
- Acks
  - Handle outgoing state
  - If InflightFull
    - InflightFull -> ConnectionReady
- Connection ready
  - Connection should already be in ConnectionBusy
  - If ConnectionBusy
    - ConnectionBusy -> ConnectionReady
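The transitions above can be encoded as a small state machine. This sketch reads the Acks rule as `InflightFull -> ConnectionReady`, models the consume outcomes by assigning the state directly, and uses a hypothetical `Scheduler` whose only job is to push a connection onto a ready queue when it enters `ConnectionReady`, per the NOTE. `Trigger`, `next_state` and `apply` are illustrative names.

```rust
use std::collections::VecDeque;

/// Tracker states named in the notes above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    ConnectionReady,
    ConnectionBusy,
    ConnectionCaughtUp,
    InflightFull,
}

/// Inputs that can move a connection back to `ConnectionReady`.
#[derive(Debug, Clone, Copy)]
enum Trigger {
    NewConnection,
    NewSubscription,
    NewPublish,
    Ack,
    ConnectionReadyEvent,
}

fn next_state(state: State, trigger: Trigger) -> State {
    match (state, trigger) {
        (State::ConnectionBusy, Trigger::NewConnection | Trigger::ConnectionReadyEvent) => State::ConnectionReady,
        (State::ConnectionCaughtUp, Trigger::NewSubscription | Trigger::NewPublish) => State::ConnectionReady,
        (State::InflightFull, Trigger::Ack) => State::ConnectionReady,
        // Every other combination leaves the state unchanged.
        (unchanged, _) => unchanged,
    }
}

struct Scheduler {
    readyqueue: VecDeque<usize>,
}

impl Scheduler {
    /// Applies a trigger; entering `ConnectionReady` schedules the connection.
    fn apply(&mut self, id: usize, state: &mut State, trigger: Trigger) {
        let next = next_state(*state, trigger);
        if next == State::ConnectionReady && *state != State::ConnectionReady {
            self.readyqueue.push_back(id);
        }
        *state = next;
    }
}

fn main() {
    let mut scheduler = Scheduler { readyqueue: VecDeque::new() };

    // A new tracker starts out busy; the new-connection event makes it ready.
    let mut state = State::ConnectionBusy;
    scheduler.apply(1, &mut state, Trigger::NewConnection);

    // Suppose consumption then ends with the connection caught up.
    scheduler.readyqueue.pop_front();
    state = State::ConnectionCaughtUp;
    scheduler.apply(1, &mut state, Trigger::NewPublish);
    println!("{state:?}, ready queue: {:?}", scheduler.readyqueue);
}
```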
Here are a few terms to understand before we move on. These terms are used internally, within the codebase, to organize the files and modules.
- Input and Output Queues (`ibufs` and `obufs`)

  The datapath of `rumqttd` is built on queues of type `VecDeque` from `std::collections`, kept in `Slab`s, which pre-allocate storage. Everything else just manages these queues and their interfaces to the other parts of the system. The queues are as follows:
  - `ibufs`: All incoming data to `rumqttd` (represented by a `Packet`) is stored immediately in `ibufs` (which holds a `VecDeque`).
  - `obufs`: All outgoing data from `rumqttd` (represented by a `Notification`) is stored immediately in `obufs` (which holds a `VecDeque`).

  `obufs` also has an in-flight queue to track packets that have been sent over the network but have not yet been acknowledged.
- Broker

  Top-level entity that handles everything, including, but not limited to:
  - Configuring and creating the Router
  - Starting the Router
  - Maintaining a channel that communicates with the Router

  When you create a new Broker, a new Router is also automatically created.
  When you start the Broker, the following happens ->
  - The Router is started, and we receive a channel (`router_tx`), which all other Links and Servers pass `Event`s into
  - (on a new thread) Creates the metrics server
  - (on a new thread) Starts all `mqttv4` servers
  - (on a new thread) Starts all `mqttv5` servers
  - (on a new thread, if enabled) Starts all `websocket` servers
  - (on a new thread) Starts the prometheus listener
  - (on the same thread) Starts the ConsoleLink to allow HTTP queries to monitor and manage the broker itself.
- Router

  Entity created by the broker that controls the flow of data in `rumqttd`. The router is responsible for managing, authorizing, and scheduling the flow of data between components within and connected to `rumqttd`.

  The router is built on the reactor pattern: it works primarily by reacting to events as they happen. As and when an event is added to `ibufs`, the router reacts by checking its current state (for a certain connection or message) and dispatching the appropriate action. This can involve changing other in-memory data structures, or writing the message to `obufs` to make sure it's handled by the appropriate link.

  Any action on the router's end that involves communication usually means the router adds a structure or packet to one of these buffers.

  The router also shares channels, which do not contain the events themselves, but rather notifications that events may have been added to the shared event buffers. This also has the added benefit of preventing lock contention between different futures/tasks, which would otherwise happen if they had to keep polling the buffers.

  When you create the router, the following happens ->
  - Transmission channels get created to and from the router
  - A `Graveyard` is created to store the information needed to persist connections
  - A tracker is created for all the alerts and metrics that need to be handled
  - A map between subscriptions and devices gets created
  - The two communication buffers `ibufs` and `obufs` get initialized
  - Logs for message acknowledgements, subscriptions, and alerts get created
  - A packet cache is created
  - The `Scheduler` struct is created

  Once everything has been initialized, the `spawn` method initializes the router's event loop on a separate thread. This method returns a cloneable transmission channel that the Broker passes to all links so they can communicate with the router.
- Link

  An asynchronous and usually network-facing entity that handles transmission of information to and from the Router. Information here can mean things including but not limited to:
  - Messages incoming from devices (publish)
  - Messages outgoing to devices (subscribe)
  - Information about metrics
  - Other events such as pings

  There are various types of Links. Here they are, explored in minimal detail -
  - Bridge -> Can be used to link two `rumqttd` instances for high availability.
  - Alerts -> Since `rumqttd` is embeddable, this allows developers to create custom hooks to handle alerts that `rumqttd` gives out.
  - Console -> Creates a lightweight HTTP server to obtain information and metrics from the `rumqttd` instance.
  - Local -> Contains functions to perform Publish and Subscribe actions to the router. This is used inside RemoteLink.
  - Meters -> Creates a link specifically to consume metrics; useful for external observability or hooks.
  - Remote -> `RemoteLink` is an asynchronous entity that represents a network connection. It provides its own event loop, which does the following -
    - Reads from the network socket, writes to `ibufs` and notifies the router
    - Writes to the network socket when required
  - Timer -> Handles timing actions for metrics, and sends `Event`s related to alerts and metrics to their respective links.
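The Router entry above says `spawn` starts the event loop on a separate thread and returns a cloneable transmission channel. A minimal sketch of that shape, assuming a standard-library thread and `std::sync::mpsc` channel; `spawn_router`, the `Event` variants and the returned log of handled events are hypothetical.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical events a link might send to the router.
#[derive(Debug)]
enum Event {
    Connect(String),
    Disconnect(String),
}

/// Runs the event loop on its own thread and returns a `Sender` that can be
/// cloned once per link. The loop ends when every clone has been dropped.
fn spawn_router() -> (Sender<Event>, JoinHandle<Vec<String>>) {
    let (router_tx, router_rx) = channel::<Event>();
    let handle = thread::spawn(move || {
        let mut handled = Vec::new();
        for event in router_rx {
            match event {
                Event::Connect(id) => handled.push(format!("connect {id}")),
                Event::Disconnect(id) => handled.push(format!("disconnect {id}")),
            }
        }
        handled
    });
    (router_tx, handle)
}

fn main() {
    let (router_tx, router) = spawn_router();
    // Each link gets its own clone of the sender.
    let links: Vec<_> = (0..2)
        .map(|i| {
            let link_tx = router_tx.clone();
            thread::spawn(move || {
                link_tx.send(Event::Connect(format!("client-{i}"))).unwrap();
                link_tx.send(Event::Disconnect(format!("client-{i}"))).unwrap();
            })
        })
        .collect();
    for link in links {
        link.join().unwrap();
    }
    // Dropping the last sender lets the router loop finish.
    drop(router_tx);
    println!("{:?}", router.join().unwrap());
}
```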
- `link` -> Contains all the different `link`s and their individual logic
- `protocol` -> Handles serializing/deserializing MQTT v4 and v5 packets while providing a unified abstraction to process them. Pinned version of `mqttbytes`
- `replicator` (not in use) -> Contains primitives for a clustered setup
- `router` -> Contains the logic for the router, central I/O buffers, connection and client management, scheduler, timer
- `segments` -> Contains the logic for disk persistence and memory management
- `server` -> Contains the Broker, TLS connection logic, and some common types