Skip to content

Commit

Permalink
Merge pull request #6 from akasamq/improve-route-performance
Browse files Browse the repository at this point in the history
perf: improve route/retain table performance
  • Loading branch information
TheWaWaR authored Nov 12, 2023
2 parents 93e9621 + 2eab6dd commit 9c3dee4
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions akasa-core/src/protocols/mqtt/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::ops::Deref;
use std::sync::Arc;

use ahash::RandomState;
use dashmap::DashMap;
use hashbrown::HashMap;
use mqtt_proto::{QoS, TopicFilter, TopicName, LEVEL_SEP, MATCH_ALL_STR, MATCH_ONE_STR};
use parking_lot::RwLock;
Expand All @@ -12,12 +11,12 @@ use crate::state::ClientId;

#[derive(Default)]
pub struct RouteTable {
nodes: DashMap<String, RouteNode>,
nodes: RwLock<HashMap<String, RouteNode>>,
}

struct RouteNode {
content: Arc<RwLock<RouteContent>>,
nodes: Arc<DashMap<String, RouteNode>>,
nodes: Arc<RwLock<HashMap<String, RouteNode>>>,
}

#[derive(Debug, Clone)]
Expand All @@ -40,16 +39,16 @@ impl RouteTable {
let (topic_item, rest_items) = split_topic(topic_name.deref());
let mut filters = Vec::new();

if let Some(pair) = self.nodes.get(topic_item) {
pair.value()
.get_matches(topic_item, rest_items, &mut filters);
let nodes = self.nodes.read();
if let Some(node) = nodes.get(topic_item) {
node.get_matches(topic_item, rest_items, &mut filters);
}
// [MQTT-4.7.2-1] The Server MUST NOT match Topic Filters starting with a
// wildcard character (# or +) with Topic Names beginning with a $ character
if !topic_name.starts_with('$') {
for item in [MATCH_ALL_STR, MATCH_ONE_STR] {
if let Some(pair) = self.nodes.get(item) {
pair.value().get_matches(item, rest_items, &mut filters);
if let Some(node) = nodes.get(item) {
node.get_matches(item, rest_items, &mut filters);
}
}
}
Expand Down Expand Up @@ -78,6 +77,7 @@ impl RouteTable {
let (filter_item, rest_items) = split_topic(topic_filter.deref());
// Since subscribe is not an frequent action, string clone here is acceptable.
self.nodes
.write()
.entry(filter_item.to_string())
.or_insert_with(RouteNode::new)
.insert(topic_filter, rest_items, id, qos, group);
Expand All @@ -98,11 +98,12 @@ impl RouteTable {
let (filter_item, rest_items) = split_topic(topic_filter.deref());
// bool variable is for resolve dead lock of access `self.nodes`
let mut remove_node = false;
if let Some(mut pair) = self.nodes.get_mut(filter_item) {
remove_node = pair.value_mut().remove(rest_items, id, group);
let mut nodes = self.nodes.write();
if let Some(node) = nodes.get_mut(filter_item) {
remove_node = node.remove(rest_items, id, group);
}
if remove_node {
self.nodes.remove(filter_item);
nodes.remove(filter_item);
}
}
}
Expand All @@ -115,7 +116,7 @@ impl RouteNode {
clients: HashMap::new(),
groups: HashMap::new(),
})),
nodes: Arc::new(DashMap::new()),
nodes: Arc::new(RwLock::new(HashMap::new())),
}
}

Expand All @@ -130,10 +131,11 @@ impl RouteNode {
filters.push(Arc::clone(&self.content));
}
} else if let Some(topic_items) = topic_items {
let nodes = self.nodes.read();
let (topic_item, rest_items) = split_topic(topic_items);
for item in [topic_item, MATCH_ALL_STR, MATCH_ONE_STR] {
if let Some(pair) = self.nodes.get(item) {
pair.value().get_matches(item, rest_items, filters);
if let Some(node) = nodes.get(item) {
node.get_matches(item, rest_items, filters);
}
}
} else {
Expand All @@ -142,9 +144,12 @@ impl RouteNode {
}

// Topic name "abc" will match topic filter "abc/#", since "#" also represent parent level.
if let Some(pair) = self.nodes.get(MATCH_ALL_STR) {
if !pair.value().content.read().is_empty() {
filters.push(Arc::clone(&pair.value().content));
// NOTE: [locks]
// * nodes read
// * content read
if let Some(node) = self.nodes.read().get(MATCH_ALL_STR) {
if !node.content.read().is_empty() {
filters.push(Arc::clone(&node.content));
}
}
}
Expand All @@ -161,6 +166,7 @@ impl RouteNode {
if let Some(filter_items) = filter_items {
let (filter_item, rest_items) = split_topic(filter_items);
self.nodes
.write()
.entry(filter_item.to_string())
.or_insert_with(RouteNode::new)
.insert(topic_filter, rest_items, id, qos, group);
Expand All @@ -185,15 +191,21 @@ impl RouteNode {
if let Some(filter_items) = filter_items {
let (filter_item, rest_items) = split_topic(filter_items);
// bool variables are for resolve dead lock of access `self.nodes`

// NOTE: [locks]
// * nodes write
// * content read
let mut nodes = self.nodes.write();
let mut remove_node = false;
if let Some(mut pair) = self.nodes.get_mut(filter_item) {
if pair.value_mut().remove(rest_items, id, group) {
if let Some(node) = nodes.get_mut(filter_item) {
if node.remove(rest_items, id, group) {
remove_node = true;
}
}
let remove_parent = if remove_node {
self.nodes.remove(filter_item);
self.content.read().is_empty() && self.nodes.is_empty()
nodes.remove(filter_item);
// NOTE: careful lock order
self.content.read().is_empty() && nodes.is_empty()
} else {
false
};
Expand All @@ -212,7 +224,7 @@ impl RouteNode {
}
if content.is_empty() {
content.topic_filter = None;
if self.nodes.is_empty() {
if self.nodes.read().is_empty() {
return true;
}
}
Expand Down

0 comments on commit 9c3dee4

Please sign in to comment.