Merge pull request #717 from wprzytula/lwt-optimisation
LWT routing optimisation: plan contains replicas in ring order
piodul authored May 31, 2023
2 parents 264139e + ecb4a9f commit 76acf4f
Showing 10 changed files with 1,190 additions and 174 deletions.
8 changes: 7 additions & 1 deletion docs/source/load-balancing/default-policy.md
@@ -166,5 +166,11 @@ And only if latency awareness is enabled:

If no preferred datacenter is specified, all nodes are treated as local ones.

-Replicas in the same priority groups are shuffled. Non-replicas are randomly
+Replicas in the same priority groups are shuffled[^*]. Non-replicas are randomly
rotated (similarly to a round robin with a random index).

+[^*]: There is an optimisation implemented for LWT requests[^**] that routes them
+to the replicas in ring order (this prevents contention due to Paxos conflicts),
+so in that case replicas are not shuffled within groups at all.
+
+[^**]: For the optimisation to be applied, LWT statements must be prepared beforehand.
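As an aside, "prepared beforehand" means the driver's ordinary prepared-statement flow; nothing else is needed to opt in. A minimal sketch (the keyspace/table `ks.t` and its columns are hypothetical; the session calls follow the crate's public API):

use scylla::{Session, SessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let session: Session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;

    // Preparing the statement is what lets the driver detect that it is
    // a conditional (LWT) statement and enable ring-ordered routing.
    let insert = session
        .prepare("INSERT INTO ks.t (pk, v) VALUES (?, ?) IF NOT EXISTS")
        .await?;

    // Executions of the prepared statement now target replicas in ring
    // order, so concurrent LWTs on the same partition contend on the same
    // replica first instead of triggering Paxos conflicts across replicas.
    session.execute(&insert, (1i32, 2i32)).await?;

    Ok(())
}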
972 changes: 814 additions & 158 deletions scylla/src/transport/load_balancing/default.rs

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions scylla/src/transport/load_balancing/mod.rs
@@ -53,9 +53,8 @@ pub type FallbackPlan<'a> = Box<dyn Iterator<Item = NodeRef<'a>> + Send + Sync +
/// `pick` and `fallback`. `pick` returns a first node to contact for a given query, `fallback`
/// returns the rest of the load balancing plan.
///
-/// `fallback` is called only after a failed send to `pick`ed node (or when executing
-/// speculatively).
-/// If a `pick` returns `None`, `fallback` will not be called.
+/// `fallback` is called not only if a send to `pick`ed node failed (or when executing
+/// speculatively), but also if `pick` returns `None`.
///
/// Usually the driver needs only the first node from the load balancing plan (most queries are sent
/// successfully, and there is no need to retry).
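To make the `pick`/`fallback` contract concrete, here is a minimal policy sketch. It is not part of this PR, and the import paths plus the `get_nodes_info()` accessor are assumptions about the crate's public API:

use scylla::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use scylla::transport::{ClusterData, NodeRef};

#[derive(Debug)]
struct FirstNodePolicy;

impl LoadBalancingPolicy for FirstNodePolicy {
    fn pick<'a>(&'a self, _query: &'a RoutingInfo, cluster: &'a ClusterData) -> Option<NodeRef<'a>> {
        // The cheap path: a plain reference into cluster metadata, no allocation.
        cluster.get_nodes_info().first()
    }

    fn fallback<'a>(&'a self, _query: &'a RoutingInfo, cluster: &'a ClusterData) -> FallbackPlan<'a> {
        // The lazily-consumed remainder of the plan.
        Box::new(cluster.get_nodes_info().iter().skip(1))
    }

    fn name(&self) -> String {
        "FirstNodePolicy".into()
    }
}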
83 changes: 80 additions & 3 deletions scylla/src/transport/load_balancing/plan.rs
@@ -54,9 +54,24 @@ impl<'a> Iterator for Plan<'a> {
self.state = PlanState::Picked(picked);
Some(picked)
} else {
error!("Load balancing policy returned an empty plan! The query cannot be executed. Routing info: {:?}", self.routing_info);
self.state = PlanState::PickedNone;
None
// `pick()` returned None, which semantically means that a first node cannot be computed _cheaply_.
// This, however, does not imply that fallback would return an empty plan, too.
// For instance, as a side effect of LWT optimisation in Default Policy, pick() may return None
// when the primary replica is down. `fallback()` will nevertheless return the remaining replicas,
// if there are such.
let mut iter = self.policy.fallback(self.routing_info, self.cluster);
let first_fallback_node = iter.next();
if let Some(node) = first_fallback_node {
self.state = PlanState::Fallback {
iter,
node_to_filter_out: node,
};
Some(node)
} else {
error!("Load balancing policy returned an empty plan! The query cannot be executed. Routing info: {:?}", self.routing_info);
self.state = PlanState::PickedNone;
None
}
}
}
PlanState::Picked(node) => {
@@ -85,3 +100,65 @@
}
}
}

#[cfg(test)]
mod tests {
use std::{net::SocketAddr, str::FromStr, sync::Arc};

use crate::transport::{
locator::test::{create_locator, mock_metadata_for_token_aware_tests},
Node, NodeAddr,
};

use super::*;

fn expected_nodes() -> Vec<Arc<Node>> {
vec![Arc::new(Node::new_for_test(
NodeAddr::Translatable(SocketAddr::from_str("127.0.0.1:9042").unwrap()),
None,
None,
))]
}

#[derive(Debug)]
struct PickingNonePolicy {
expected_nodes: Vec<Arc<Node>>,
}
impl LoadBalancingPolicy for PickingNonePolicy {
fn pick<'a>(
&'a self,
_query: &'a RoutingInfo,
_cluster: &'a ClusterData,
) -> Option<NodeRef<'a>> {
None
}

fn fallback<'a>(
&'a self,
_query: &'a RoutingInfo,
_cluster: &'a ClusterData,
) -> FallbackPlan<'a> {
Box::new(self.expected_nodes.iter())
}

fn name(&self) -> String {
"PickingNone".into()
}
}

#[tokio::test]
async fn plan_calls_fallback_even_if_pick_returned_none() {
let policy = PickingNonePolicy {
expected_nodes: expected_nodes(),
};
let locator = create_locator(&mock_metadata_for_token_aware_tests());
let cluster_data = ClusterData {
known_peers: Default::default(),
keyspaces: Default::default(),
locator,
};
let routing_info = RoutingInfo::default();
let plan = Plan::new(&policy, &routing_info, &cluster_data);
assert_eq!(Vec::from_iter(plan.cloned()), policy.expected_nodes);
}
}
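The test above pins down the new behaviour. For intuition, this is roughly how execution consumes a `Plan` (a conceptual sketch only: `Plan` is crate-internal, and `try_send`, `Response`, and `SendError` are hypothetical stand-ins for the driver's real send path):

// Hypothetical stand-ins for the sketch; not real driver APIs.
struct Response;
struct SendError;
fn try_send(_node: NodeRef<'_>) -> Result<Response, SendError> {
    Ok(Response)
}

fn execute_on_plan<'a>(
    policy: &'a dyn LoadBalancingPolicy,
    routing_info: &'a RoutingInfo,
    cluster: &'a ClusterData,
) -> Option<Response> {
    // `Plan` implements `Iterator<Item = NodeRef<'a>>`.
    for node in Plan::new(policy, routing_info, cluster) {
        // After this change the iterator falls through to `fallback()` when
        // `pick()` returns None, so an LWT query whose primary replica is
        // down still reaches the remaining replicas.
        if let Ok(response) = try_send(node) {
            return Some(response);
        }
    }
    None // the policy produced a genuinely empty plan (the error! case above)
}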
260 changes: 258 additions & 2 deletions scylla/src/transport/locator/mod.rs
@@ -14,7 +14,11 @@ use itertools::Itertools;
use precomputed_replicas::PrecomputedReplicas;
use replicas::{ReplicasArray, EMPTY_REPLICAS};
use replication_info::ReplicationInfo;
-use std::{cmp, collections::HashMap, sync::Arc};
+use std::{
+cmp,
+collections::{HashMap, HashSet},
+sync::Arc,
+};
use tracing::debug;

/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
@@ -376,7 +380,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
locator,
token,
} => {
if let Some(datacenter) = &locator.datacenters.first() {
if let Some(datacenter) = locator.datacenters.first() {
let repfactor = *datacenter_repfactors.get(datacenter.as_str()).unwrap_or(&0);
ReplicaSetIteratorInner::ChainedNTS {
replicas: locator
@@ -549,3 +553,255 @@
}
}
}

impl<'a> ReplicaSet<'a> {
pub fn into_replicas_ordered(self) -> ReplicasOrdered<'a> {
ReplicasOrdered { replica_set: self }
}
}

/// Represents a sequence of replicas for a given token and strategy,
/// ordered according to the ring order.
///
/// This container can only be created by calling `ReplicaSet::into_replicas_ordered()`,
/// and it either borrows precomputed replica lists living in the locator (in the case of SimpleStrategy)
/// or computes them on demand (in the case of NetworkTopologyStrategy).
/// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`).
/// For obtaining the primary replica, no allocations are needed; therefore, the first call
/// to `next()` is optimised and does not allocate.
/// For the remaining replicas, unfortunately, allocation is inevitable.
pub struct ReplicasOrdered<'a> {
replica_set: ReplicaSet<'a>,
}

/// Iterator that returns replicas from some replica sequence, ordered according to the ring order.
pub struct ReplicasOrderedIterator<'a> {
inner: ReplicasOrderedIteratorInner<'a>,
}

enum ReplicasOrderedIteratorInner<'a> {
AlreadyRingOrdered {
// In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order.
replica_set_iter: ReplicaSetIterator<'a>,
},
PolyDatacenterNTS {
// In case of ChainedNTS variant, ReplicaSetIterator does not respect ring order,
// so specific code is needed to yield replicas according to that order.
replicas_ordered_iter: ReplicasOrderedNTSIterator<'a>,
},
}

enum ReplicasOrderedNTSIterator<'a> {
FreshForPick {
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
},
Picked {
datacenter_repfactors: &'a HashMap<String, usize>,
locator: &'a ReplicaLocator,
token: Token,
picked: NodeRef<'a>,
},
ComputedFallback {
replicas: ReplicasArray<'a>,
idx: usize,
},
}

impl<'a> Iterator for ReplicasOrderedNTSIterator<'a> {
type Item = NodeRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
match *self {
Self::FreshForPick {
datacenter_repfactors,
locator,
token,
} => {
// We're going to find the primary replica for the given token.
let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
for node in nodes_on_ring {
// If this node's DC has some replicas in this NTS...
if let Some(dc) = &node.datacenter {
if datacenter_repfactors.get(dc).is_some() {
// ...then this node must be the primary replica.
*self = Self::Picked {
datacenter_repfactors,
locator,
token,
picked: node,
};
return Some(node);
}
}
}
None
}
Self::Picked {
datacenter_repfactors,
locator,
token,
picked,
} => {
// Clippy can't check that in Eq and Hash impls we don't actually use any field with interior mutability
// (in Node only `down_marker` is such, being an AtomicBool).
// https://rust-lang.github.io/rust-clippy/master/index.html#mutable_key_type
#[allow(clippy::mutable_key_type)]
let mut all_replicas: HashSet<&'a Arc<Node>> = HashSet::new();
for (datacenter, repfactor) in datacenter_repfactors.iter() {
all_replicas.extend(
locator
.get_network_strategy_replicas(token, datacenter, *repfactor)
.iter(),
);
}
// It's no use returning a node that was already picked.
all_replicas.remove(picked);

let mut replicas_ordered = vec![];
let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
for node in nodes_on_ring {
if all_replicas.is_empty() {
// All replicas were put in order.
break;
}
if all_replicas.remove(node) {
replicas_ordered.push(node);
}
}
assert!(
all_replicas.is_empty(),
"all_replicas somehow contained a node that wasn't present in the global ring!"
);

*self = Self::ComputedFallback {
replicas: ReplicasArray::Owned(replicas_ordered),
idx: 0,
};
self.next()
}
Self::ComputedFallback {
ref replicas,
ref mut idx,
} => {
if let Some(replica) = replicas.get(*idx) {
*idx += 1;
Some(replica)
} else {
None
}
}
}
}
}

impl<'a> Iterator for ReplicasOrderedIterator<'a> {
type Item = NodeRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
match &mut self.inner {
ReplicasOrderedIteratorInner::AlreadyRingOrdered { replica_set_iter } => {
replica_set_iter.next()
}
ReplicasOrderedIteratorInner::PolyDatacenterNTS {
replicas_ordered_iter,
} => replicas_ordered_iter.next(),
}
}
}

impl<'a> IntoIterator for ReplicasOrdered<'a> {
type Item = NodeRef<'a>;
type IntoIter = ReplicasOrderedIterator<'a>;

fn into_iter(self) -> Self::IntoIter {
let Self { replica_set } = self;
Self::IntoIter {
inner: match replica_set.inner {
ReplicaSetInner::Plain(_) | ReplicaSetInner::FilteredSimple { .. } => {
ReplicasOrderedIteratorInner::AlreadyRingOrdered {
replica_set_iter: replica_set.into_iter(),
}
}
ReplicaSetInner::ChainedNTS {
datacenter_repfactors,
locator,
token,
} => ReplicasOrderedIteratorInner::PolyDatacenterNTS {
replicas_ordered_iter: ReplicasOrderedNTSIterator::FreshForPick {
datacenter_repfactors,
locator,
token,
},
},
},
}
}
}

#[cfg(test)]
mod tests {
use crate::{routing::Token, transport::locator::test::*};

#[tokio::test]
async fn test_replicas_ordered() {
let metadata = mock_metadata_for_token_aware_tests();
let locator = create_locator(&metadata);

// For each case (token, limit_to_dc, strategy), we are checking
// that ReplicasOrdered yields replicas in the expected order.
let check = |token, limit_to_dc, strategy, expected| {
let replica_set =
locator.replicas_for_token(Token { value: token }, strategy, limit_to_dc);
let replicas_ordered = replica_set.into_replicas_ordered();
let ids: Vec<_> = replicas_ordered
.into_iter()
.map(|node| node.address.port())
.collect();
assert_eq!(expected, ids);
};

// In all these tests:
// going through the ring, we get order: F , A , C , D , G , B , E
// us eu eu us eu eu us
// r2 r1 r1 r1 r2 r1 r1
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
vec![F, A, C, D, G, E],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy,
vec![F, A, D, G],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
vec![F, A],
);

check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
vec![A, C, G],
);
check(
160,
Some("us"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
vec![F, D, E],
);
check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
vec![A],
);
}
}
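Beyond the unit test, the intended call pattern for `ReplicasOrdered` is small; a sketch reusing identifiers from this diff (it assumes a populated `locator` and a keyspace `strategy` in scope, as in the test module above):

let replica_set = locator.replicas_for_token(Token { value: 160 }, strategy, None);
// The first next() yields the primary replica without allocating; computing
// the ring-ordered remainder allocates once, on demand.
for replica in replica_set.into_replicas_ordered() {
    println!("{:?}", replica.address);
}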