From 847e71d6e7d0709b17cfc75f4f9a647a8e5abc94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 08:28:45 +0200 Subject: [PATCH 01/16] locator, default_policy: Docstring typos fixes --- scylla/src/transport/load_balancing/default.rs | 2 +- scylla/src/transport/locator/precomputed_replicas.rs | 1 + scylla/src/transport/locator/replication_info.rs | 2 +- scylla/src/transport/locator/token_ring.rs | 7 ++++--- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 821e4a9fd5..67368e0e78 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -279,7 +279,7 @@ impl DefaultPolicy { "Datacenter specified as the preferred one ({}) does not exist!", preferred_datacenter ); - // We won't guess any DC, as it could lead to possible violation dc failover ban. + // We won't guess any DC, as it could lead to possible violation of dc failover ban. return &[]; } } diff --git a/scylla/src/transport/locator/precomputed_replicas.rs b/scylla/src/transport/locator/precomputed_replicas.rs index aa7f4749fd..de6d5e1a63 100644 --- a/scylla/src/transport/locator/precomputed_replicas.rs +++ b/scylla/src/transport/locator/precomputed_replicas.rs @@ -84,6 +84,7 @@ impl PrecomputedReplicas { // Each ring will precompute for at least this RF let min_precomputed_rep_factor: usize = 1; + // The maximum replication factor over those used with Simple strategy. let mut max_global_repfactor: usize = min_precomputed_rep_factor; let mut dc_repfactors: HashMap<&'a str, BTreeSet> = HashMap::new(); diff --git a/scylla/src/transport/locator/replication_info.rs b/scylla/src/transport/locator/replication_info.rs index a4be678c8f..d17ed361ba 100644 --- a/scylla/src/transport/locator/replication_info.rs +++ b/scylla/src/transport/locator/replication_info.rs @@ -213,7 +213,7 @@ mod tests { use super::ReplicationInfo; #[tokio::test] - async fn test_simple_stategy() { + async fn test_simple_strategy() { let ring = create_ring(&mock_metadata_for_token_aware_tests()); let replication_info = ReplicationInfo::new(ring); diff --git a/scylla/src/transport/locator/token_ring.rs b/scylla/src/transport/locator/token_ring.rs index 5fdf7c823e..686d8e0a90 100644 --- a/scylla/src/transport/locator/token_ring.rs +++ b/scylla/src/transport/locator/token_ring.rs @@ -1,6 +1,7 @@ use crate::routing::Token; -/// A token ring is a continous hash ring. It defines association by hasing a key onto the ring and the walking the ring in one direction. +/// A token ring is a continuous hash ring. It defines association by hashing a key +/// onto the ring and then walking the ring in one direction. /// Cassandra and Scylla use it for determining data ownership which allows for efficient load balancing. /// The token ring is used by the driver to find the replicas for a given token. /// Each ring member has a token (i64 number) which defines the member's position on the ring. @@ -30,7 +31,7 @@ impl TokenRing { /// Provides an iterator over the ring members starting at the given token. /// The iterator traverses the whole ring in the direction of increasing tokens. /// After reaching the maximum token it wraps around and continues from the lowest one. - /// The iterator visits each member once, it doesn't have an infinte length. + /// The iterator visits each member once, it doesn't have infinite length. 
pub fn ring_range_full(&self, token: Token) -> impl Iterator { let binary_search_index: usize = match self.ring.binary_search_by(|e| e.0.cmp(&token)) { Ok(exact_match_index) => exact_match_index, @@ -47,7 +48,7 @@ impl TokenRing { /// The iterator traverses the whole ring in the direction of increasing tokens. /// After reaching the maximum token it wraps around and continues from the lowest one. /// The iterator visits each member once, it doesn't have an infinte length. - /// To access the token along with the element you can use `ring_range_full` + /// To access the token along with the element you can use `ring_range_full`. pub fn ring_range(&self, token: Token) -> impl Iterator { self.ring_range_full(token).map(|(_t, e)| e) } From 87be6889e3ba08a9b83bfe5660ab11b06c98ff87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 11:54:16 +0200 Subject: [PATCH 02/16] default_policy: rename ReplicaLocation -> ...Criteria The new name defines better the semantics of this type: we only provide criteria that we want to taken into account, without specifying the actual data (the replica's location) to be compared to. --- .../src/transport/load_balancing/default.rs | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 67368e0e78..c1c7645f71 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -14,7 +14,7 @@ use std::{fmt, sync::Arc, time::Duration}; use tracing::warn; #[derive(Clone, Copy)] -enum ReplicaLocation { +enum ReplicaLocationCriteria { Any, Datacenter, DatacenterAndRack, @@ -74,7 +74,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if if self.preferred_rack.is_some() { let local_rack_picked = self.pick_replica( ts, - ReplicaLocation::DatacenterAndRack, + ReplicaLocationCriteria::DatacenterAndRack, &self.pick_predicate, cluster, ); @@ -88,7 +88,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if // If preferred datacenter is not specified, all replicas are treated as local. let picked = self.pick_replica( ts, - ReplicaLocation::Datacenter, + ReplicaLocationCriteria::Datacenter, &self.pick_predicate, cluster, ); @@ -99,8 +99,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if // If datacenter failover is possible, loosen restriction about locality. if self.is_datacenter_failover_possible(&routing_info) { - let picked = - self.pick_replica(ts, ReplicaLocation::Any, &self.pick_predicate, cluster); + let picked = self.pick_replica( + ts, + ReplicaLocationCriteria::Any, + &self.pick_predicate, + cluster, + ); if let Some(alive_remote_replica) = picked { return Some(alive_remote_replica); } @@ -155,17 +159,25 @@ or refrain from preferring datacenters (which may ban all other datacenters, if let maybe_replicas = if let Some(ts) = &routing_info.token_with_strategy { let local_rack_replicas = self.shuffled_replicas( ts, - ReplicaLocation::DatacenterAndRack, + ReplicaLocationCriteria::DatacenterAndRack, + Self::is_alive, + cluster, + ); + let local_replicas = self.shuffled_replicas( + ts, + ReplicaLocationCriteria::Datacenter, Self::is_alive, cluster, ); - let local_replicas = - self.shuffled_replicas(ts, ReplicaLocation::Datacenter, Self::is_alive, cluster); // If a datacenter failover is possible, loosen restriction about locality. 
let maybe_remote_replicas = if self.is_datacenter_failover_possible(&routing_info) { - let remote_replicas = - self.shuffled_replicas(ts, ReplicaLocation::Any, Self::is_alive, cluster); + let remote_replicas = self.shuffled_replicas( + ts, + ReplicaLocationCriteria::Any, + Self::is_alive, + cluster, + ); Either::Left(remote_replicas) } else { Either::Right(std::iter::empty()) @@ -305,19 +317,21 @@ impl DefaultPolicy { fn replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocation, + replica_location: ReplicaLocationCriteria, predicate: impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> impl Iterator> { let predicate = move |node| match replica_location { - ReplicaLocation::Any | ReplicaLocation::Datacenter => predicate(&node), - ReplicaLocation::DatacenterAndRack => { + ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), + ReplicaLocationCriteria::DatacenterAndRack => { predicate(&node) && node.rack == self.preferred_rack } }; let should_be_local = match replica_location { - ReplicaLocation::Any => false, - ReplicaLocation::Datacenter | ReplicaLocation::DatacenterAndRack => true, + ReplicaLocationCriteria::Any => false, + ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { + true + } }; self.nonfiltered_replica_set(ts, should_be_local, cluster) .into_iter() @@ -327,19 +341,21 @@ impl DefaultPolicy { fn pick_replica<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocation, + replica_location: ReplicaLocationCriteria, predicate: &impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> Option> { let predicate = |node| match replica_location { - ReplicaLocation::Any | ReplicaLocation::Datacenter => predicate(&node), - ReplicaLocation::DatacenterAndRack => { + ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), + ReplicaLocationCriteria::DatacenterAndRack => { predicate(&node) && node.rack == self.preferred_rack } }; let should_be_local = match replica_location { - ReplicaLocation::Any => false, - ReplicaLocation::Datacenter | ReplicaLocation::DatacenterAndRack => true, + ReplicaLocationCriteria::Any => false, + ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { + true + } }; let replica_set = self.nonfiltered_replica_set(ts, should_be_local, cluster); if let Some(fixed) = self.fixed_shuffle_seed { @@ -353,7 +369,7 @@ impl DefaultPolicy { fn shuffled_replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocation, + replica_location: ReplicaLocationCriteria, predicate: impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> impl Iterator> { From f7cbb17ef5e3a4321ae389e2f5dca0b19032394b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 12:26:24 +0200 Subject: [PATCH 03/16] default_policy: compute should_be_local on one path This is a simple refactor to avoid code duplication. 
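For illustration, a condensed sketch of the idea behind this refactor (not part of the patch itself): the mapping from `ReplicaLocationCriteria` to locality is expressed once instead of being duplicated at every call site. In the actual change below the computation lives inside `nonfiltered_replica_set()` rather than in a standalone helper like this hypothetical one:

    // Hypothetical standalone helper, shown only to illustrate the deduplicated logic.
    fn should_be_local(replica_location: ReplicaLocationCriteria) -> bool {
        match replica_location {
            ReplicaLocationCriteria::Any => false,
            ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => true,
        }
    }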
--- .../src/transport/load_balancing/default.rs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index c1c7645f71..9056d03bdd 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -302,9 +302,16 @@ impl DefaultPolicy { fn nonfiltered_replica_set<'a>( &'a self, ts: &TokenWithStrategy<'a>, - should_be_local: bool, + replica_location: ReplicaLocationCriteria, cluster: &'a ClusterData, ) -> ReplicaSet<'a> { + let should_be_local = match replica_location { + ReplicaLocationCriteria::Any => false, + ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { + true + } + }; + let datacenter = should_be_local .then_some(self.preferred_datacenter.as_deref()) .flatten(); @@ -327,13 +334,8 @@ impl DefaultPolicy { predicate(&node) && node.rack == self.preferred_rack } }; - let should_be_local = match replica_location { - ReplicaLocationCriteria::Any => false, - ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { - true - } - }; - self.nonfiltered_replica_set(ts, should_be_local, cluster) + + self.nonfiltered_replica_set(ts, replica_location, cluster) .into_iter() .filter(move |node: &NodeRef<'a>| predicate(node)) } @@ -351,13 +353,9 @@ impl DefaultPolicy { predicate(&node) && node.rack == self.preferred_rack } }; - let should_be_local = match replica_location { - ReplicaLocationCriteria::Any => false, - ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { - true - } - }; - let replica_set = self.nonfiltered_replica_set(ts, should_be_local, cluster); + + let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster); + if let Some(fixed) = self.fixed_shuffle_seed { let mut gen = Pcg32::new(fixed, 0); replica_set.choose_filtered(&mut gen, predicate) From bb1051bd8e5d2f51b3c518d298822cc38ab8a512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 12:29:56 +0200 Subject: [PATCH 04/16] default_policy: improve readability of preferred_node_set() It is clearer not to use returns but instead use the functional style. --- scylla/src/transport/load_balancing/default.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 9056d03bdd..541c1b23d2 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -285,18 +285,18 @@ impl DefaultPolicy { .replica_locator() .unique_nodes_in_datacenter_ring(preferred_datacenter.as_str()) { - return nodes; + nodes } else { tracing::warn!( "Datacenter specified as the preferred one ({}) does not exist!", preferred_datacenter ); // We won't guess any DC, as it could lead to possible violation of dc failover ban. - return &[]; + &[] } + } else { + cluster.replica_locator().unique_nodes_in_global_ring() } - - cluster.replica_locator().unique_nodes_in_global_ring() } fn nonfiltered_replica_set<'a>( From b41d34813f502286fe8317439d38dee85337aabf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 12:50:37 +0200 Subject: [PATCH 05/16] default_policy: extract make_rack_predicate() This is another refactor that aims to reduce code duplication. 
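For illustration, a minimal sketch of the shape of the extracted helper (simplified, with plain strings standing in for `NodeRef` and `ReplicaLocationCriteria`; the real signature is in the diff below): it wraps an arbitrary node predicate so that, for rack-aware criteria, the node's rack has to match as well.

    // Simplified stand-in: combine a base predicate with an optional rack requirement.
    fn make_rack_predicate<P: Fn(&str) -> bool>(
        predicate: P,
        required_rack: Option<String>,
    ) -> impl Fn(&str, Option<&str>) -> bool {
        move |node, node_rack| match &required_rack {
            None => predicate(node),
            Some(rack) => predicate(node) && node_rack == Some(rack.as_str()),
        }
    }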
--- .../src/transport/load_balancing/default.rs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 541c1b23d2..4c836e663e 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -321,19 +321,29 @@ impl DefaultPolicy { .replicas_for_token(ts.token, ts.strategy, datacenter) } + /// Wraps the provided predicate, adding the requirement for rack to match. + fn make_rack_predicate<'a>( + predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, + replica_location: ReplicaLocationCriteria, + preferred_rack: &'a Option, + ) -> impl Fn(NodeRef<'a>) -> bool { + move |node| match replica_location { + ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), + ReplicaLocationCriteria::DatacenterAndRack => { + predicate(&node) && &node.rack == preferred_rack + } + } + } + fn replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, replica_location: ReplicaLocationCriteria, - predicate: impl Fn(&NodeRef<'a>) -> bool, + predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { - let predicate = move |node| match replica_location { - ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), - ReplicaLocationCriteria::DatacenterAndRack => { - predicate(&node) && node.rack == self.preferred_rack - } - }; + let predicate = + Self::make_rack_predicate(predicate, replica_location, &self.preferred_rack); self.nonfiltered_replica_set(ts, replica_location, cluster) .into_iter() @@ -344,15 +354,11 @@ impl DefaultPolicy { &'a self, ts: &TokenWithStrategy<'a>, replica_location: ReplicaLocationCriteria, - predicate: &impl Fn(&NodeRef<'a>) -> bool, + predicate: &'a impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> Option> { - let predicate = |node| match replica_location { - ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), - ReplicaLocationCriteria::DatacenterAndRack => { - predicate(&node) && node.rack == self.preferred_rack - } - }; + let predicate = + Self::make_rack_predicate(predicate, replica_location, &self.preferred_rack); let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster); @@ -368,7 +374,7 @@ impl DefaultPolicy { &'a self, ts: &TokenWithStrategy<'a>, replica_location: ReplicaLocationCriteria, - predicate: impl Fn(&NodeRef<'a>) -> bool, + predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { let replicas = self.replicas(ts, replica_location, predicate, cluster); From bb09756a1d97da4a870f87f00c5b2c081e8079b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 13:22:46 +0200 Subject: [PATCH 06/16] default_policy: introduce ReplicaLocationPreference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As the case of providing preferred rack without specifying the preferred datacenter is invalid, in spirit of the principle “Make illegal states unrepresentable”, an enum is introduced to restrict the possible cases. So far, it is only used in DefaultPolicy, but in the next major release DefaultBuilder's API should be changed accordingly to disallow creating such configuration. 
It appears that there was a bug which involved incorrect behaviour when preferred rack was provided without setting preferred datacenter: instead of ignoring the preferred rack, replicas were filtered by it. Now, the following is true: it is no longer possible to turn on rack awareness without dc-awareness; attempts to do so will result in a fallback to non-rack awareness. --- .../src/transport/load_balancing/default.rs | 158 ++++++++++-------- 1 file changed, 88 insertions(+), 70 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 4c836e663e..3a1824f4bd 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -20,6 +20,29 @@ enum ReplicaLocationCriteria { DatacenterAndRack, } +#[derive(Debug, Clone)] +enum ReplicaLocationPreference { + Any, + Datacenter(String), + DatacenterAndRack(String, String), +} + +impl ReplicaLocationPreference { + fn datacenter(&self) -> Option<&str> { + match self { + Self::Any => None, + Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc), + } + } + + fn rack(&self) -> Option<&str> { + match self { + Self::Any | Self::Datacenter(_) => None, + Self::DatacenterAndRack(_, rack) => Some(rack), + } + } +} + // TODO: LWT optimisation /// The default load balancing policy. /// @@ -27,8 +50,7 @@ enum ReplicaLocationCriteria { /// Datacenter failover for queries with non local consistency mode is also supported. /// Latency awareness is available, although not recommended. pub struct DefaultPolicy { - preferred_datacenter: Option, - preferred_rack: Option, + preferences: ReplicaLocationPreference, is_token_aware: bool, permit_dc_failover: bool, pick_predicate: Box bool + Send + Sync>, @@ -39,8 +61,7 @@ pub struct DefaultPolicy { impl fmt::Debug for DefaultPolicy { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DefaultPolicy") - .field("preferred_datacenter", &self.preferred_datacenter) - .field("preferred_rack", &self.preferred_rack) + .field("preferences", &self.preferences) .field("is_token_aware", &self.is_token_aware) .field("permit_dc_failover", &self.permit_dc_failover) .field("latency_awareness", &self.latency_awareness) @@ -53,7 +74,7 @@ impl LoadBalancingPolicy for DefaultPolicy { fn pick<'a>(&'a self, query: &'a RoutingInfo, cluster: &'a ClusterData) -> Option> { let routing_info = self.routing_info(query, cluster); if let Some(ref token_with_strategy) = routing_info.token_with_strategy { - if self.preferred_datacenter.is_some() + if self.preferences.datacenter().is_some() && !self.permit_dc_failover && matches!( token_with_strategy.strategy, @@ -71,7 +92,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if if let Some(ts) = &routing_info.token_with_strategy { // Try to pick some alive local rack random replica. // If preferred rack is not specified, try to pick local DC replica. 
- if self.preferred_rack.is_some() { + if self.preferences.rack().is_some() { let local_rack_picked = self.pick_replica( ts, ReplicaLocationCriteria::DatacenterAndRack, @@ -280,10 +301,10 @@ impl DefaultPolicy { } fn preferred_node_set<'a>(&'a self, cluster: &'a ClusterData) -> &'a [Arc] { - if let Some(preferred_datacenter) = &self.preferred_datacenter { + if let Some(preferred_datacenter) = self.preferences.datacenter() { if let Some(nodes) = cluster .replica_locator() - .unique_nodes_in_datacenter_ring(preferred_datacenter.as_str()) + .unique_nodes_in_datacenter_ring(preferred_datacenter) { nodes } else { @@ -313,7 +334,7 @@ impl DefaultPolicy { }; let datacenter = should_be_local - .then_some(self.preferred_datacenter.as_deref()) + .then_some(self.preferences.datacenter()) .flatten(); cluster @@ -325,12 +346,12 @@ impl DefaultPolicy { fn make_rack_predicate<'a>( predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, replica_location: ReplicaLocationCriteria, - preferred_rack: &'a Option, + preferences: &'a ReplicaLocationPreference, ) -> impl Fn(NodeRef<'a>) -> bool { move |node| match replica_location { ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), ReplicaLocationCriteria::DatacenterAndRack => { - predicate(&node) && &node.rack == preferred_rack + predicate(&node) && node.rack.as_deref() == preferences.rack() } } } @@ -342,8 +363,7 @@ impl DefaultPolicy { predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { - let predicate = - Self::make_rack_predicate(predicate, replica_location, &self.preferred_rack); + let predicate = Self::make_rack_predicate(predicate, replica_location, &self.preferences); self.nonfiltered_replica_set(ts, replica_location, cluster) .into_iter() @@ -357,8 +377,7 @@ impl DefaultPolicy { predicate: &'a impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> Option> { - let predicate = - Self::make_rack_predicate(predicate, replica_location, &self.preferred_rack); + let predicate = Self::make_rack_predicate(predicate, replica_location, &self.preferences); let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster); @@ -436,7 +455,7 @@ impl DefaultPolicy { } fn is_datacenter_failover_possible(&self, routing_info: &ProcessedRoutingInfo) -> bool { - self.preferred_datacenter.is_some() + self.preferences.datacenter().is_some() && self.permit_dc_failover && !routing_info.local_consistency } @@ -445,8 +464,7 @@ impl DefaultPolicy { impl Default for DefaultPolicy { fn default() -> Self { Self { - preferred_datacenter: None, - preferred_rack: None, + preferences: ReplicaLocationPreference::Any, is_token_aware: true, permit_dc_failover: false, pick_predicate: Box::new(Self::is_alive), @@ -504,9 +522,24 @@ impl DefaultPolicyBuilder { Box::new(DefaultPolicy::is_alive) }; + // As the case of providing preferred rack without providing datacenter is invalid, the rack is then ignored. + // According to the principle “Make illegal states unrepresentable”, in the next major release we will + // alter the `DefaultPolicyBuilder`'s API so that it is impossible for the user to create such state. + let preferences = match (self.preferred_datacenter, self.preferred_rack) { + (None, None) => ReplicaLocationPreference::Any, + (None, Some(_)) => { + // This a is case that the user shouldn't be able to represent. + warn!("Preferred rack has effect only if a preferred datacenter is set as well. 
Ignoring the preferred rack."); + ReplicaLocationPreference::Any + } + (Some(datacenter), None) => ReplicaLocationPreference::Datacenter(datacenter), + (Some(datacenter), Some(rack)) => { + ReplicaLocationPreference::DatacenterAndRack(datacenter, rack) + } + }; + Arc::new(DefaultPolicy { - preferred_datacenter: self.preferred_datacenter, - preferred_rack: self.preferred_rack, + preferences, is_token_aware: self.is_token_aware, permit_dc_failover: self.permit_dc_failover, pick_predicate, @@ -687,7 +720,7 @@ mod tests { }, }; - use super::DefaultPolicy; + use super::{DefaultPolicy, ReplicaLocationPreference}; pub(crate) mod framework { use std::collections::{HashMap, HashSet}; @@ -958,7 +991,7 @@ mod tests { async fn test_default_policy_with_token_unaware_statements() { let local_dc = "eu".to_string(); let policy_with_disabled_dc_failover = DefaultPolicy { - preferred_datacenter: Some(local_dc.clone()), + preferences: ReplicaLocationPreference::Datacenter(local_dc.clone()), permit_dc_failover: false, ..Default::default() }; @@ -972,7 +1005,7 @@ mod tests { .await; let policy_with_enabled_dc_failover = DefaultPolicy { - preferred_datacenter: Some(local_dc), + preferences: ReplicaLocationPreference::Datacenter(local_dc.clone()), permit_dc_failover: true, ..Default::default() }; @@ -1003,7 +1036,7 @@ mod tests { // Keyspace NTS with RF=2 with enabled DC failover Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1027,7 +1060,7 @@ mod tests { // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, fixed_shuffle_seed: Some(123), @@ -1052,7 +1085,7 @@ mod tests { // Keyspace NTS with RF=2 with DC failover forbidden by local Consistency Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1074,7 +1107,7 @@ mod tests { // Keyspace NTS with RF=2 with explicitly disabled DC failover Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: false, ..Default::default() @@ -1096,7 +1129,7 @@ mod tests { // Keyspace NTS with RF=3 with enabled DC failover Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1120,7 +1153,7 @@ mod tests { // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, fixed_shuffle_seed: Some(123), @@ -1145,7 +1178,7 @@ mod tests { // Keyspace NTS with RF=3 with disabled DC failover Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: false, ..Default::default() @@ -1167,7 +1200,7 @@ 
mod tests { // Keyspace SS with RF=2 with enabled DC failover Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1191,7 +1224,7 @@ mod tests { // Keyspace SS with RF=2 with DC failover forbidden by local Consistency Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1213,7 +1246,7 @@ mod tests { // No token implies no token awareness Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1232,7 +1265,7 @@ mod tests { // No keyspace implies no token awareness Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1251,7 +1284,7 @@ mod tests { // Unknown preferred DC, failover permitted Test { policy: DefaultPolicy { - preferred_datacenter: Some("au".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("au".to_owned()), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1273,7 +1306,7 @@ mod tests { // Unknown preferred DC, failover forbidden Test { policy: DefaultPolicy { - preferred_datacenter: Some("au".to_owned()), + preferences: ReplicaLocationPreference::Datacenter("au".to_owned()), is_token_aware: true, permit_dc_failover: false, ..Default::default() @@ -1292,7 +1325,7 @@ mod tests { // No preferred DC, failover permitted Test { policy: DefaultPolicy { - preferred_datacenter: None, + preferences: ReplicaLocationPreference::Any, is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1314,7 +1347,7 @@ mod tests { // No preferred DC, failover forbidden Test { policy: DefaultPolicy { - preferred_datacenter: None, + preferences: ReplicaLocationPreference::Any, is_token_aware: true, permit_dc_failover: false, ..Default::default() @@ -1336,8 +1369,10 @@ mod tests { // Keyspace SS with RF=2 with enabled DC failover and rack-awareness Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), - preferred_rack: Some("r1".to_owned()), + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r1".to_owned(), + ), is_token_aware: true, permit_dc_failover: true, ..Default::default() @@ -1361,8 +1396,10 @@ mod tests { // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), - preferred_rack: Some("r1".to_owned()), + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r1".to_owned(), + ), is_token_aware: true, permit_dc_failover: false, fixed_shuffle_seed: Some(123), @@ -1386,8 +1423,10 @@ mod tests { // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica Test { policy: DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), - preferred_rack: Some("r2".to_owned()), + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r2".to_owned(), + ), is_token_aware: true, permit_dc_failover: false, ..Default::default() @@ -1406,29 +1445,6 @@ mod tests { .group([C, 
G, B]) // local nodes .build(), }, - // No preferred DC, preferred rack should be ignored, failover permitted - Test { - policy: DefaultPolicy { - preferred_datacenter: None, - preferred_rack: Some("r2".to_owned()), - is_token_aware: true, - permit_dc_failover: true, - ..Default::default() - }, - routing_info: RoutingInfo { - token: Some(Token { value: 160 }), - keyspace: Some(KEYSPACE_NTS_RF_2), - consistency: Consistency::Quorum, - ..Default::default() - }, - // going though the ring, we get order: F , A , C , D , G , B , E - // us eu eu us eu eu us - // r2 r1 r1 r1 r2 r1 r1 - expected_groups: ExpectedGroupsBuilder::new() - .group([A, D, F, G]) // remote replicas - .group([B, C, E]) // remote nodes - .build(), - }, ]; for Test { @@ -1988,7 +2004,10 @@ mod latency_awareness { *, }; - use crate::test_utils::create_new_session_builder; + use crate::{ + load_balancing::default::ReplicaLocationPreference, + test_utils::create_new_session_builder, + }; use crate::{ load_balancing::{ default::tests::test_default_policy_with_given_cluster_and_routing_info, @@ -2054,8 +2073,7 @@ mod latency_awareness { }; DefaultPolicy { - preferred_datacenter: Some("eu".to_owned()), - preferred_rack: None, + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), permit_dc_failover: true, is_token_aware: true, pick_predicate, From c2091df00805bec7d03fdf31379653a66397c109 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 12 May 2023 18:27:52 +0200 Subject: [PATCH 07/16] default_policy: harden ReplicaLocationCriteria It is cleaner when the Criteria itself carries the required information for the filtering, as well as less prone to errors (because one has to provide the required DC/rack name). As a bonus, the semantics of helper fns calls in `pick()` were made more sane: `pick_replica()` is called with `ReplicaLocationCriteria::Datacenter` only if the preferred datacenter is actually specified. `fallback()` was modified analogously (in regard to calling `shuffled_replicas()`). --- .../src/transport/load_balancing/default.rs | 140 ++++++++++-------- 1 file changed, 82 insertions(+), 58 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 3a1824f4bd..75b5d26854 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -14,10 +14,19 @@ use std::{fmt, sync::Arc, time::Duration}; use tracing::warn; #[derive(Clone, Copy)] -enum ReplicaLocationCriteria { +enum ReplicaLocationCriteria<'a> { Any, - Datacenter, - DatacenterAndRack, + Datacenter(&'a str), + DatacenterAndRack(&'a str, &'a str), +} + +impl<'a> ReplicaLocationCriteria<'a> { + fn datacenter(&self) -> Option<&'a str> { + match self { + Self::Any => None, + Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc), + } + } } #[derive(Debug, Clone)] @@ -35,6 +44,7 @@ impl ReplicaLocationPreference { } } + #[allow(unused)] fn rack(&self) -> Option<&str> { match self { Self::Any | Self::Datacenter(_) => None, @@ -90,12 +100,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if } } if let Some(ts) = &routing_info.token_with_strategy { - // Try to pick some alive local rack random replica. - // If preferred rack is not specified, try to pick local DC replica. - if self.preferences.rack().is_some() { + if let ReplicaLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { + // Try to pick some alive local rack random replica. 
let local_rack_picked = self.pick_replica( ts, - ReplicaLocationCriteria::DatacenterAndRack, + ReplicaLocationCriteria::DatacenterAndRack(dc, rack), &self.pick_predicate, cluster, ); @@ -105,21 +114,27 @@ or refrain from preferring datacenters (which may ban all other datacenters, if } } - // Try to pick some alive local random replica. - // If preferred datacenter is not specified, all replicas are treated as local. - let picked = self.pick_replica( - ts, - ReplicaLocationCriteria::Datacenter, - &self.pick_predicate, - cluster, - ); + if let ReplicaLocationPreference::DatacenterAndRack(dc, _) + | ReplicaLocationPreference::Datacenter(dc) = &self.preferences + { + // Try to pick some alive local random replica. + let picked = self.pick_replica( + ts, + ReplicaLocationCriteria::Datacenter(dc), + &self.pick_predicate, + cluster, + ); - if let Some(alive_local_replica) = picked { - return Some(alive_local_replica); + if let Some(alive_local_replica) = picked { + return Some(alive_local_replica); + } } - // If datacenter failover is possible, loosen restriction about locality. - if self.is_datacenter_failover_possible(&routing_info) { + // If preferred datacenter is not specified, or if datacenter failover is possible, loosen restriction about locality. + if self.preferences.datacenter().is_none() + || self.is_datacenter_failover_possible(&routing_info) + { + // Try to pick some alive random replica. let picked = self.pick_replica( ts, ReplicaLocationCriteria::Any, @@ -178,21 +193,38 @@ or refrain from preferring datacenters (which may ban all other datacenters, if // If token is available, get a shuffled list of alive replicas. let maybe_replicas = if let Some(ts) = &routing_info.token_with_strategy { - let local_rack_replicas = self.shuffled_replicas( - ts, - ReplicaLocationCriteria::DatacenterAndRack, - Self::is_alive, - cluster, - ); - let local_replicas = self.shuffled_replicas( - ts, - ReplicaLocationCriteria::Datacenter, - Self::is_alive, - cluster, - ); + let maybe_local_rack_replicas = + if let ReplicaLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { + let local_rack_replicas = self.shuffled_replicas( + ts, + ReplicaLocationCriteria::DatacenterAndRack(dc, rack), + Self::is_alive, + cluster, + ); + Either::Left(local_rack_replicas) + } else { + Either::Right(std::iter::empty()) + }; + + let maybe_local_replicas = if let ReplicaLocationPreference::DatacenterAndRack(dc, _) + | ReplicaLocationPreference::Datacenter(dc) = + &self.preferences + { + let local_replicas = self.shuffled_replicas( + ts, + ReplicaLocationCriteria::Datacenter(dc), + Self::is_alive, + cluster, + ); + Either::Left(local_replicas) + } else { + Either::Right(std::iter::empty()) + }; - // If a datacenter failover is possible, loosen restriction about locality. - let maybe_remote_replicas = if self.is_datacenter_failover_possible(&routing_info) { + // If no datacenter is preferred, or datacenter failover is possible, loosen restriction about locality. + let maybe_remote_replicas = if self.preferences.datacenter().is_none() + || self.is_datacenter_failover_possible(&routing_info) + { let remote_replicas = self.shuffled_replicas( ts, ReplicaLocationCriteria::Any, @@ -204,11 +236,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if Either::Right(std::iter::empty()) }; - // Produce an iterator, prioritizes local replicas. - // If preferred datacenter is not specified, every replica is treated as a local one. + // Produce an iterator, prioritizing local replicas. 
+ // If preferred datacenter is not specified, every replica is treated as a remote one. Either::Left( - local_rack_replicas - .chain(local_replicas) + maybe_local_rack_replicas + .chain(maybe_local_replicas) .chain(maybe_remote_replicas), ) } else { @@ -323,19 +355,10 @@ impl DefaultPolicy { fn nonfiltered_replica_set<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocationCriteria, + replica_location: ReplicaLocationCriteria<'a>, cluster: &'a ClusterData, ) -> ReplicaSet<'a> { - let should_be_local = match replica_location { - ReplicaLocationCriteria::Any => false, - ReplicaLocationCriteria::Datacenter | ReplicaLocationCriteria::DatacenterAndRack => { - true - } - }; - - let datacenter = should_be_local - .then_some(self.preferences.datacenter()) - .flatten(); + let datacenter = replica_location.datacenter(); cluster .replica_locator() @@ -345,13 +368,14 @@ impl DefaultPolicy { /// Wraps the provided predicate, adding the requirement for rack to match. fn make_rack_predicate<'a>( predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, - replica_location: ReplicaLocationCriteria, - preferences: &'a ReplicaLocationPreference, + replica_location: ReplicaLocationCriteria<'a>, ) -> impl Fn(NodeRef<'a>) -> bool { move |node| match replica_location { - ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter => predicate(&node), - ReplicaLocationCriteria::DatacenterAndRack => { - predicate(&node) && node.rack.as_deref() == preferences.rack() + ReplicaLocationCriteria::Any | ReplicaLocationCriteria::Datacenter(_) => { + predicate(&node) + } + ReplicaLocationCriteria::DatacenterAndRack(_, rack) => { + predicate(&node) && node.rack.as_deref() == Some(rack) } } } @@ -359,11 +383,11 @@ impl DefaultPolicy { fn replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocationCriteria, + replica_location: ReplicaLocationCriteria<'a>, predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { - let predicate = Self::make_rack_predicate(predicate, replica_location, &self.preferences); + let predicate = Self::make_rack_predicate(predicate, replica_location); self.nonfiltered_replica_set(ts, replica_location, cluster) .into_iter() @@ -373,11 +397,11 @@ impl DefaultPolicy { fn pick_replica<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocationCriteria, + replica_location: ReplicaLocationCriteria<'a>, predicate: &'a impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, ) -> Option> { - let predicate = Self::make_rack_predicate(predicate, replica_location, &self.preferences); + let predicate = Self::make_rack_predicate(predicate, replica_location); let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster); @@ -392,7 +416,7 @@ impl DefaultPolicy { fn shuffled_replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, - replica_location: ReplicaLocationCriteria, + replica_location: ReplicaLocationCriteria<'a>, predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { From 88f46058ca9db98df0e54809127a925da926ec5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 11 May 2023 18:18:45 +0200 Subject: [PATCH 08/16] {lb,plan}: call fallback() even if pick() is None We want to keep regarding pick() as a cheap, happy path function, which does not allocate. 
In LWT optimisation, however, only the primary replica can be computed without allocations (due to specifics of Network Topology Strategy, replicas are not assigned greedily, but their distribution across racks is being balanced). If a policy (in our case, the default policy) recognizes that a picked replica is down, it would try to pick another, and this computation would be expensive for ReplicasOrdered. Instead, having recognized during LWT optimised case that a picked replica is down, the policy returns None from pick() to hint that further computation will be expensive. The plan logic is hence altered to call fallback() even if pick() returns None. In the non-LWT case, as subsequent calls to pick() are still cheap, pick() will still try to find next replicas if one is recognized to be down. A test is added. It asserts that `fallback()` is called if `pick()` returned None. For the test to be possible to be written, a convenience constructor is added for `Node` under `cfg(test)`. --- scylla/src/transport/load_balancing/mod.rs | 5 +- scylla/src/transport/load_balancing/plan.rs | 83 ++++++++++++++++++++- scylla/src/transport/node.rs | 22 ++++++ 3 files changed, 104 insertions(+), 6 deletions(-) diff --git a/scylla/src/transport/load_balancing/mod.rs b/scylla/src/transport/load_balancing/mod.rs index aaf828c443..d4095743c3 100644 --- a/scylla/src/transport/load_balancing/mod.rs +++ b/scylla/src/transport/load_balancing/mod.rs @@ -53,9 +53,8 @@ pub type FallbackPlan<'a> = Box> + Send + Sync + /// `pick` and `fallback`. `pick` returns a first node to contact for a given query, `fallback` /// returns the rest of the load balancing plan. /// -/// `fallback` is called only after a failed send to `pick`ed node (or when executing -/// speculatively). -/// If a `pick` returns `None`, `fallback` will not be called. +/// `fallback` is called not only if a send to `pick`ed node failed (or when executing +/// speculatively), but also if `pick` returns `None`. /// /// Usually the driver needs only the first node from load balancing plan (most queries are send /// successfully, and there is no need to retry). diff --git a/scylla/src/transport/load_balancing/plan.rs b/scylla/src/transport/load_balancing/plan.rs index cf6b37cfeb..e49d4cb012 100644 --- a/scylla/src/transport/load_balancing/plan.rs +++ b/scylla/src/transport/load_balancing/plan.rs @@ -54,9 +54,24 @@ impl<'a> Iterator for Plan<'a> { self.state = PlanState::Picked(picked); Some(picked) } else { - error!("Load balancing policy returned an empty plan! The query cannot be executed. Routing info: {:?}", self.routing_info); - self.state = PlanState::PickedNone; - None + // `pick()` returned None, which semantically means that a first node cannot be computed _cheaply_. + // This, however, does not imply that fallback would return an empty plan, too. + // For instance, as a side effect of LWT optimisation in Default Policy, pick() may return None + // when the primary replica is down. `fallback()` will nevertheless return the remaining replicas, + // if there are such. + let mut iter = self.policy.fallback(self.routing_info, self.cluster); + let first_fallback_node = iter.next(); + if let Some(node) = first_fallback_node { + self.state = PlanState::Fallback { + iter, + node_to_filter_out: node, + }; + Some(node) + } else { + error!("Load balancing policy returned an empty plan! The query cannot be executed. 
Routing info: {:?}", self.routing_info); self.state = PlanState::PickedNone; None } } } PlanState::Picked(node) => { @@ -85,3 +100,65 @@ impl<'a> Iterator for Plan<'a> { } } } + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, str::FromStr, sync::Arc}; + + use crate::transport::{ + locator::test::{create_locator, mock_metadata_for_token_aware_tests}, + Node, NodeAddr, + }; + + use super::*; + + fn expected_nodes() -> Vec> { + vec![Arc::new(Node::new_for_test( + NodeAddr::Translatable(SocketAddr::from_str("127.0.0.1:9042").unwrap()), + None, + None, + ))] + } + + #[derive(Debug)] + struct PickingNonePolicy { + expected_nodes: Vec>, + } + impl LoadBalancingPolicy for PickingNonePolicy { + fn pick<'a>( + &'a self, + _query: &'a RoutingInfo, + _cluster: &'a ClusterData, + ) -> Option> { + None + } + + fn fallback<'a>( + &'a self, + _query: &'a RoutingInfo, + _cluster: &'a ClusterData, + ) -> FallbackPlan<'a> { + Box::new(self.expected_nodes.iter()) + } + + fn name(&self) -> String { + "PickingNone".into() + } + } + + #[tokio::test] + async fn plan_calls_fallback_even_if_pick_returned_none() { + let policy = PickingNonePolicy { + expected_nodes: expected_nodes(), + }; + let locator = create_locator(&mock_metadata_for_token_aware_tests()); + let cluster_data = ClusterData { + known_peers: Default::default(), + keyspaces: Default::default(), + locator, + }; + let routing_info = RoutingInfo::default(); + let plan = Plan::new(&policy, &routing_info, &cluster_data); + assert_eq!(Vec::from_iter(plan.cloned()), policy.expected_nodes); + } +} diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index 1cc33b6a9c..e796e6bea2 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -214,3 +214,25 @@ impl Hash for Node { self.host_id.hash(state); } } + +#[cfg(test)] +mod tests { + use super::*; + + impl Node { + pub(crate) fn new_for_test( + address: NodeAddr, + datacenter: Option, + rack: Option, + ) -> Self { + Self { + host_id: Uuid::new_v4(), + address, + datacenter, + rack, + pool: None, + down_marker: false.into(), + } + } + } +} From 920e6dfdf67e7dfd8909e6a770d8cdbacf564db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 11 May 2023 18:33:18 +0200 Subject: [PATCH 09/16] locator: implement ReplicasOrdered (ring-wise) In order to be consistent with the LWT optimisation implementation in other drivers, it is mandatory to return a query plan consisting of replicas put in ring order. To accomplish that, the (abstractly unordered) ReplicaSet becomes convertible into a newly defined ReplicasOrdered struct. Iterating over ReplicasOrdered yields replicas in ring order, so policies can depend on it to perform LWT optimisation correctly. Note that the computation of ReplicasOrdered is lazy and performed in two steps. - the first step is relatively cheap and free of allocations and involves finding the primary replica by iterating over the global ring. It is triggered by the first call to ReplicasOrderedIterator::next(). - the second step is relatively expensive and involves allocations. It computes the whole ReplicasArray of all replicas, put in the ring order. Precomputation is not an option, as it would be very expensive to compute and store all precomputed arrays that would respect the global ring order (as opposed to only local, per-DC ring order). 
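For illustration, a rough sketch (not part of the patch) of how a policy is expected to consume this API so that the cheap/expensive split pays off; it assumes the `ReplicaSet`, `ReplicasOrdered` and `NodeRef` types from the locator module are in scope:

    // Cheap path, suitable for pick(): only the primary replica is computed,
    // which walks the global ring but performs no allocation.
    fn primary_replica<'a>(replica_set: ReplicaSet<'a>) -> Option<NodeRef<'a>> {
        replica_set.into_replicas_ordered().into_iter().next()
    }

    // Expensive path, suitable for fallback(): iterating past the first element
    // triggers computation of the whole ring-ordered replica array.
    fn all_replicas_ring_ordered<'a>(replica_set: ReplicaSet<'a>) -> Vec<NodeRef<'a>> {
        replica_set.into_replicas_ordered().into_iter().collect()
    }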
--- scylla/src/transport/locator/mod.rs | 260 +++++++++++++++++++++++++++- 1 file changed, 258 insertions(+), 2 deletions(-) diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index 95a4f4fc1e..fb4993430e 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -14,7 +14,11 @@ use itertools::Itertools; use precomputed_replicas::PrecomputedReplicas; use replicas::{ReplicasArray, EMPTY_REPLICAS}; use replication_info::ReplicationInfo; -use std::{cmp, collections::HashMap, sync::Arc}; +use std::{ + cmp, + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::debug; /// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication @@ -376,7 +380,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> { locator, token, } => { - if let Some(datacenter) = &locator.datacenters.first() { + if let Some(datacenter) = locator.datacenters.first() { let repfactor = *datacenter_repfactors.get(datacenter.as_str()).unwrap_or(&0); ReplicaSetIteratorInner::ChainedNTS { replicas: locator @@ -549,3 +553,255 @@ impl<'a> Iterator for ReplicaSetIterator<'a> { } } } + +impl<'a> ReplicaSet<'a> { + pub fn into_replicas_ordered(self) -> ReplicasOrdered<'a> { + ReplicasOrdered { replica_set: self } + } +} + +/// Represents a sequence of replicas for a given token and strategy, +/// ordered according to the ring order. +/// +/// This container can only be created by calling `ReplicaSet::into_replicas_ordered()`, +/// and either it can borrow precomputed replica lists living in the locator (in case of SimpleStrategy) +/// or it must compute them on-demand (in case of NetworkTopologyStrategy). +/// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`). +/// For obtaining the primary replica, no allocations are needed. Therefore, the first call +/// to `next()` is optimised and does not allocate. +/// For the remaining ones, unfortunately, allocation is inevitable. +pub struct ReplicasOrdered<'a> { + replica_set: ReplicaSet<'a>, +} + +/// Iterator that returns replicas from some replica sequence, ordered according to the ring order. +pub struct ReplicasOrderedIterator<'a> { + inner: ReplicasOrderedIteratorInner<'a>, +} + +enum ReplicasOrderedIteratorInner<'a> { + AlreadyRingOrdered { + // In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order. + replica_set_iter: ReplicaSetIterator<'a>, + }, + PolyDatacenterNTS { + // In case of ChainedNTS variant, ReplicaSetIterator does not respect ring order, + // so specific code is needed to yield replicas according to that order. + replicas_ordered_iter: ReplicasOrderedNTSIterator<'a>, + }, +} + +enum ReplicasOrderedNTSIterator<'a> { + FreshForPick { + datacenter_repfactors: &'a HashMap, + locator: &'a ReplicaLocator, + token: Token, + }, + Picked { + datacenter_repfactors: &'a HashMap, + locator: &'a ReplicaLocator, + token: Token, + picked: NodeRef<'a>, + }, + ComputedFallback { + replicas: ReplicasArray<'a>, + idx: usize, + }, +} + +impl<'a> Iterator for ReplicasOrderedNTSIterator<'a> { + type Item = NodeRef<'a>; + + fn next(&mut self) -> Option { + match *self { + Self::FreshForPick { + datacenter_repfactors, + locator, + token, + } => { + // We're going to find the primary replica for the given token. + let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token); + for node in nodes_on_ring { + // If this node's DC has some replicas in this NTS... 
+ if let Some(dc) = &node.datacenter { + if datacenter_repfactors.get(dc).is_some() { + // ...then this node must be the primary replica. + *self = Self::Picked { + datacenter_repfactors, + locator, + token, + picked: node, + }; + return Some(node); + } + } + } + None + } + Self::Picked { + datacenter_repfactors, + locator, + token, + picked, + } => { + // Clippy can't check that in Eq and Hash impls we don't actually use any field with interior mutability + // (in Node only `down_marker` is such, being an AtomicBool). + // https://rust-lang.github.io/rust-clippy/master/index.html#mutable_key_type + #[allow(clippy::mutable_key_type)] + let mut all_replicas: HashSet<&'a Arc> = HashSet::new(); + for (datacenter, repfactor) in datacenter_repfactors.iter() { + all_replicas.extend( + locator + .get_network_strategy_replicas(token, datacenter, *repfactor) + .iter(), + ); + } + // It's no use returning a node that was already picked. + all_replicas.remove(picked); + + let mut replicas_ordered = vec![]; + let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token); + for node in nodes_on_ring { + if all_replicas.is_empty() { + // All replicas were put in order. + break; + } + if all_replicas.remove(node) { + replicas_ordered.push(node); + } + } + assert!( + all_replicas.is_empty(), + "all_replicas somehow contained a node that wasn't present in the global ring!" + ); + + *self = Self::ComputedFallback { + replicas: ReplicasArray::Owned(replicas_ordered), + idx: 0, + }; + self.next() + } + Self::ComputedFallback { + ref replicas, + ref mut idx, + } => { + if let Some(replica) = replicas.get(*idx) { + *idx += 1; + Some(replica) + } else { + None + } + } + } + } +} + +impl<'a> Iterator for ReplicasOrderedIterator<'a> { + type Item = NodeRef<'a>; + + fn next(&mut self) -> Option { + match &mut self.inner { + ReplicasOrderedIteratorInner::AlreadyRingOrdered { replica_set_iter } => { + replica_set_iter.next() + } + ReplicasOrderedIteratorInner::PolyDatacenterNTS { + replicas_ordered_iter, + } => replicas_ordered_iter.next(), + } + } +} + +impl<'a> IntoIterator for ReplicasOrdered<'a> { + type Item = NodeRef<'a>; + type IntoIter = ReplicasOrderedIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + let Self { replica_set } = self; + Self::IntoIter { + inner: match replica_set.inner { + ReplicaSetInner::Plain(_) | ReplicaSetInner::FilteredSimple { .. } => { + ReplicasOrderedIteratorInner::AlreadyRingOrdered { + replica_set_iter: replica_set.into_iter(), + } + } + ReplicaSetInner::ChainedNTS { + datacenter_repfactors, + locator, + token, + } => ReplicasOrderedIteratorInner::PolyDatacenterNTS { + replicas_ordered_iter: ReplicasOrderedNTSIterator::FreshForPick { + datacenter_repfactors, + locator, + token, + }, + }, + }, + } + } +} + +#[cfg(test)] +mod tests { + use crate::{routing::Token, transport::locator::test::*}; + + #[tokio::test] + async fn test_replicas_ordered() { + let metadata = mock_metadata_for_token_aware_tests(); + let locator = create_locator(&metadata); + + // For each case (token, limit_to_dc, strategy), we are checking + // that ReplicasOrdered yields replicas in the expected order. 
+ let check = |token, limit_to_dc, strategy, expected| { + let replica_set = + locator.replicas_for_token(Token { value: token }, strategy, limit_to_dc); + let replicas_ordered = replica_set.into_replicas_ordered(); + let ids: Vec<_> = replicas_ordered + .into_iter() + .map(|node| node.address.port()) + .collect(); + assert_eq!(expected, ids); + }; + + // In all these tests: + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + check( + 160, + None, + &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + vec![F, A, C, D, G, E], + ); + check( + 160, + None, + &metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy, + vec![F, A, D, G], + ); + check( + 160, + None, + &metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy, + vec![F, A], + ); + + check( + 160, + Some("eu"), + &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + vec![A, C, G], + ); + check( + 160, + Some("us"), + &metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy, + vec![F, D, E], + ); + check( + 160, + Some("eu"), + &metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy, + vec![A], + ); + } +} From 304d2f1fcfe2c260cc515edacfe512ad8852b1aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 08:37:17 +0200 Subject: [PATCH 10/16] default_policy: introduce and respect ReplicaOrder For LWT optimisation, we want to request replicas to be given in the ring order. For other cases (non-LWT requests) the order: - either does not matter (can be arbitrary), - or it is required to be random, but then the replicas are to be shuffled anyway, so before shuffling they can be given, again, in arbitrary order. This leads to introducing the enum ReplicaOrder with Arbitrary and RingOrder variants. --- .../src/transport/load_balancing/default.rs | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 75b5d26854..f2e8eab2d9 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -53,6 +53,12 @@ impl ReplicaLocationPreference { } } +#[derive(Clone, Copy)] +enum ReplicaOrder { + Arbitrary, + RingOrder, +} + // TODO: LWT optimisation /// The default load balancing policy. 
/// @@ -386,12 +392,22 @@ impl DefaultPolicy { replica_location: ReplicaLocationCriteria<'a>, predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, + order: ReplicaOrder, ) -> impl Iterator> { let predicate = Self::make_rack_predicate(predicate, replica_location); - self.nonfiltered_replica_set(ts, replica_location, cluster) - .into_iter() - .filter(move |node: &NodeRef<'a>| predicate(node)) + let replica_iter = match order { + ReplicaOrder::Arbitrary => Either::Left( + self.nonfiltered_replica_set(ts, replica_location, cluster) + .into_iter(), + ), + ReplicaOrder::RingOrder => Either::Right( + self.nonfiltered_replica_set(ts, replica_location, cluster) + .into_replicas_ordered() + .into_iter(), + ), + }; + replica_iter.filter(move |node: &NodeRef<'a>| predicate(node)) } fn pick_replica<'a>( @@ -420,7 +436,13 @@ impl DefaultPolicy { predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, ) -> impl Iterator> { - let replicas = self.replicas(ts, replica_location, predicate, cluster); + let replicas = self.replicas( + ts, + replica_location, + predicate, + cluster, + ReplicaOrder::Arbitrary, + ); self.shuffle(replicas) } From cbaae8f6e1ea31a6e5f7447ece5d7e810c29292c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 08:40:03 +0200 Subject: [PATCH 11/16] default_policy: introduce and respect StatementType For strong typing, an enum StatementType is introduced with Lwt and NonLwt variants. If the executed query is confirmed LWT, then it will be routed to replicas in ring order. --- .../src/transport/load_balancing/default.rs | 126 ++++++++++++++++-- 1 file changed, 113 insertions(+), 13 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index f2e8eab2d9..070474603e 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -59,7 +59,12 @@ enum ReplicaOrder { RingOrder, } -// TODO: LWT optimisation +#[derive(Clone, Copy)] +enum StatementType { + Lwt, + NonLwt, +} + /// The default load balancing policy. /// /// It can be configured to be datacenter-aware and token-aware. /// Datacenter failover for queries with non local consistency mode is also supported. /// Latency awareness is available, although not recommended. pub struct DefaultPolicy { @@ -105,6 +110,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if ); } } + let statement_type = if query.is_confirmed_lwt { + StatementType::Lwt + } else { + StatementType::NonLwt + }; if let Some(ts) = &routing_info.token_with_strategy { if let ReplicaLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { // Try to pick some alive local rack random replica. 
@@ -113,6 +123,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if ReplicaLocationCriteria::DatacenterAndRack(dc, rack), &self.pick_predicate, cluster, + statement_type, ); if let Some(alive_local_rack_replica) = local_rack_picked { @@ -129,6 +140,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if ReplicaLocationCriteria::Datacenter(dc), &self.pick_predicate, cluster, + statement_type, ); if let Some(alive_local_replica) = picked { @@ -146,6 +158,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if ReplicaLocationCriteria::Any, &self.pick_predicate, cluster, + statement_type, ); if let Some(alive_remote_replica) = picked { return Some(alive_remote_replica); @@ -196,16 +209,22 @@ or refrain from preferring datacenters (which may ban all other datacenters, if cluster: &'a ClusterData, ) -> FallbackPlan<'a> { let routing_info = self.routing_info(query, cluster); + let statement_type = if query.is_confirmed_lwt { + StatementType::Lwt + } else { + StatementType::NonLwt + }; // If token is available, get a shuffled list of alive replicas. let maybe_replicas = if let Some(ts) = &routing_info.token_with_strategy { let maybe_local_rack_replicas = if let ReplicaLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { - let local_rack_replicas = self.shuffled_replicas( + let local_rack_replicas = self.fallback_replicas( ts, ReplicaLocationCriteria::DatacenterAndRack(dc, rack), Self::is_alive, cluster, + statement_type, ); Either::Left(local_rack_replicas) } else { @@ -216,11 +235,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if | ReplicaLocationPreference::Datacenter(dc) = &self.preferences { - let local_replicas = self.shuffled_replicas( + let local_replicas = self.fallback_replicas( ts, ReplicaLocationCriteria::Datacenter(dc), Self::is_alive, cluster, + statement_type, ); Either::Left(local_replicas) } else { @@ -231,11 +251,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if let maybe_remote_replicas = if self.preferences.datacenter().is_none() || self.is_datacenter_failover_possible(&routing_info) { - let remote_replicas = self.shuffled_replicas( + let remote_replicas = self.fallback_replicas( ts, ReplicaLocationCriteria::Any, Self::is_alive, cluster, + statement_type, ); Either::Left(remote_replicas) } else { @@ -416,6 +437,80 @@ impl DefaultPolicy { replica_location: ReplicaLocationCriteria<'a>, predicate: &'a impl Fn(&NodeRef<'a>) -> bool, cluster: &'a ClusterData, + statement_type: StatementType, + ) -> Option> { + match statement_type { + StatementType::Lwt => self.pick_first_replica(ts, replica_location, predicate, cluster), + StatementType::NonLwt => { + self.pick_random_replica(ts, replica_location, predicate, cluster) + } + } + } + + // This is to be used for LWT optimisation: in order to reduce contention + // caused by Paxos conflicts, we always try to query replicas in the same, ring order. + // + // If preferred rack and DC are set, then the first (encountered on the ring) replica + // that resides in that rack in that DC **and** satisfies the `predicate` is returned. + // + // If preferred DC is set, then the first (encountered on the ring) replica + // that resides in that DC **and** satisfies the `predicate` is returned. 
+ // + // If no DC/rack preferences are set, then the only possible replica to be returned + // (due to expensive computation of the others, and we avoid expensive computation in `pick()`) + // is the primary replica. It is returned **iff** it satisfies the predicate, else None. + fn pick_first_replica<'a>( + &'a self, + ts: &TokenWithStrategy<'a>, + replica_location: ReplicaLocationCriteria<'a>, + predicate: &'a impl Fn(&NodeRef<'a>) -> bool, + cluster: &'a ClusterData, + ) -> Option> { + match replica_location { + ReplicaLocationCriteria::Any => { + // ReplicaSet returned by ReplicaLocator for this case: + // 1) can be precomputed and later used cheaply, + // 2) returns replicas in the **non-ring order** (this is because ReplicaSet chains + // ring-ordered replica sequences from different DCs, thus not preserving + // the global ring order). + // Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered. + // As ReplicasOrdered can compute cheaply only the primary global replica + // (computation of the remaining ones is expensive), if the primary replica + // does not satisfy the `predicate`, None is returned. All expensive computation + // is to be done only when `fallback()` is called. + self.nonfiltered_replica_set(ts, replica_location, cluster) + .into_replicas_ordered() + .into_iter() + .next() + .and_then(|primary_replica| { + predicate(&primary_replica).then_some(primary_replica) + }) + } + ReplicaLocationCriteria::Datacenter(_) + | ReplicaLocationCriteria::DatacenterAndRack(_, _) => { + // ReplicaSet returned by ReplicaLocator for this case: + // 1) can be precomputed and later used cheaply, + // 2) returns replicas in the ring order (this is not true for the case + // when multiple DCs are allowed, because ReplicaSet chains replica sequences + // from different DCs, thus not preserving the global ring order) + self.replicas( + ts, + replica_location, + move |node| predicate(node), + cluster, + ReplicaOrder::RingOrder, + ) + .next() + } + } + } + + fn pick_random_replica<'a>( + &'a self, + ts: &TokenWithStrategy<'a>, + replica_location: ReplicaLocationCriteria<'a>, + predicate: &'a impl Fn(&NodeRef<'a>) -> bool, + cluster: &'a ClusterData, ) -> Option> { let predicate = Self::make_rack_predicate(predicate, replica_location);
@@ -429,22 +524,27 @@ impl DefaultPolicy { } } - fn shuffled_replicas<'a>( + fn fallback_replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, replica_location: ReplicaLocationCriteria<'a>, predicate: impl Fn(&NodeRef<'a>) -> bool + 'a, cluster: &'a ClusterData, + statement_type: StatementType, ) -> impl Iterator> { - let replicas = self.replicas( - ts, - replica_location, - predicate, - cluster, - ReplicaOrder::Arbitrary, - ); + let order = match statement_type { + StatementType::Lwt => ReplicaOrder::RingOrder, + StatementType::NonLwt => ReplicaOrder::Arbitrary, + }; - self.shuffle(replicas) + let replicas = self.replicas(ts, replica_location, predicate, cluster, order); + + match statement_type { + // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts, + // we always try to query replicas in the same order.
+ StatementType::Lwt => Either::Left(replicas), + StatementType::NonLwt => Either::Right(self.shuffle(replicas)), + } } fn randomly_rotated_nodes(nodes: &[Arc]) -> impl Iterator> { From 105005cf0c3f16105b41b6acd0637474294fdc8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 08:19:45 +0200 Subject: [PATCH 12/16] default_policy: tests: add ExpectedGroup::Ordered In case of executing LWT statements, we expect the policy to return plans in order exactly matching the ring order. To verify this, a new ExpectedGroup variant is added that will be fed with a sequence of nodes put in the expected ring order. --- .../src/transport/load_balancing/default.rs | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 070474603e..10e9f61f85 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -886,6 +886,7 @@ mod tests { enum ExpectedGroup { NonDeterministic(HashSet), Deterministic(HashSet), + Ordered(Vec), } impl ExpectedGroup { @@ -893,13 +894,7 @@ mod tests { match self { Self::NonDeterministic(s) => s.len(), Self::Deterministic(s) => s.len(), - } - } - - fn nodes(&self) -> &HashSet { - match self { - Self::NonDeterministic(s) => s, - Self::Deterministic(s) => s, + Self::Ordered(v) => v.len(), } } } @@ -929,6 +924,13 @@ mod tests { .push(ExpectedGroup::Deterministic(group.into_iter().collect())); self } + /// Expects that the next group in the plan will have a sequence of nodes + /// that is equal to the provided one, including order. + pub(crate) fn ordered(mut self, group: impl IntoIterator) -> Self { + self.groups + .push(ExpectedGroup::Ordered(group.into_iter().collect())); + self + } pub(crate) fn build(self) -> ExpectedGroups { ExpectedGroups { groups: self.groups, @@ -967,11 +969,18 @@ mod tests { // in the actual plan let got_group: Vec<_> = (&mut got).take(expected.len()).copied().collect(); - // Verify that the group has the same nodes as the - // expected one - let got_set: HashSet<_> = got_group.iter().copied().collect(); - let expected_set = expected.nodes(); - assert_eq!(&got_set, expected_set); + match expected { + ExpectedGroup::NonDeterministic(expected_set) + | ExpectedGroup::Deterministic(expected_set) => { + // Verify that the group has the same nodes as the + // expected one + let got_set: HashSet<_> = got_group.iter().copied().collect(); + assert_eq!(&got_set, expected_set); + } + ExpectedGroup::Ordered(sequence) => { + assert_eq!(&got_group, sequence); + } + } // Put the group into sets_of_groups sets_of_groups[group_id].insert(got_group); @@ -991,7 +1000,7 @@ mod tests { assert!(sets.len() > 1); } } - ExpectedGroup::Deterministic(_) => { + ExpectedGroup::Deterministic(_) | ExpectedGroup::Ordered(_) => { // The group is supposed to be deterministic, // i.e. 
a given instance of the default policy // must always return the nodes within it using From c5f3c9f3149bf32a6d2710a6f7bc9a29d2db26e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 24 May 2023 08:24:59 +0200 Subject: [PATCH 13/16] default_policy: tests: fix typo (going) though->through (the ring) --- .../src/transport/load_balancing/default.rs | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 10e9f61f85..77e991730c 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -1202,7 +1202,7 @@ mod tests { consistency: Consistency::Two, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1227,7 +1227,7 @@ mod tests { consistency: Consistency::Two, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1251,7 +1251,7 @@ mod tests { consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1273,7 +1273,7 @@ mod tests { consistency: Consistency::One, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1295,7 +1295,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1320,7 +1320,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1344,7 +1344,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1366,7 +1366,7 @@ mod tests { consistency: Consistency::Two, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1390,7 +1390,7 @@ mod tests { consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going 
through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1450,7 +1450,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1472,7 +1472,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden @@ -1491,7 +1491,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1513,7 +1513,7 @@ mod tests { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1538,7 +1538,7 @@ mod tests { consistency: Consistency::One, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1566,7 +1566,7 @@ mod tests { consistency: Consistency::Two, ..Default::default() }, - // going though the ring, we get order: B , C , E , G , A , F , D + // going through the ring, we get order: B , C , E , G , A , F , D // eu eu us eu eu us us // r1 r1 r1 r2 r1 r2 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -1592,7 +1592,7 @@ mod tests { consistency: Consistency::One, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -2765,7 +2765,7 @@ mod latency_awareness { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -2791,7 +2791,7 @@ mod latency_awareness { (C, slow_penalised()), (D, slow_penalised()), ], - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() @@ -2814,7 +2814,7 @@ mod latency_awareness { consistency: Consistency::Quorum, ..Default::default() }, - // going though the ring, we get order: F , A , C , D , G , B , E + // going through the ring, we get order: F , A , C , D , G , B , E // us eu eu us eu eu us // r2 r1 r1 r1 r2 r1 r1 expected_groups: ExpectedGroupsBuilder::new() From e777236f466700d35f95849347556eb8ab8f4918 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?=
Date: Wed, 26 Apr 2023 09:42:50 +0200
Subject: [PATCH 14/16] default_policy: LWT optimisation unit tests

These test that when an LWT statement is executed, the policy returns replicas in the fixed ring order.

The logger is no longer initialised in the `test_default_policy_with_token_aware_statements` test, so as not to spam the console with (expected) errors about returning an empty plan.

---
 .../src/transport/load_balancing/default.rs | 471 +++++++++++++++++-
 1 file changed, 467 insertions(+), 4 deletions(-)

diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs
index 77e991730c..8cac92d6e5 100644
--- a/scylla/src/transport/load_balancing/default.rs
+++ b/scylla/src/transport/load_balancing/default.rs
@@ -69,7 +69,7 @@ enum StatementType { /// /// It can be configured to be datacenter-aware and token-aware. /// Datacenter failover for queries with non local consistency mode is also supported. -/// Latency awareness is available, althrough not recommended. +/// Latency awareness is available, although not recommended. pub struct DefaultPolicy { preferences: ReplicaLocationPreference, is_token_aware: bool,
@@ -959,7 +959,11 @@ mod tests { // First, make sure that `got` has the right number of items, // equal to the sum of sizes of all expected groups let combined_groups_len: usize = self.groups.iter().map(|s| s.len()).sum(); - assert_eq!(got.len(), combined_groups_len); + assert_eq!( + got.len(), + combined_groups_len, + "Plan length different than expected" + ); // Now, split `got` into groups of expected sizes // and just `assert_eq` them
@@ -1177,7 +1181,6 @@ mod tests { #[tokio::test] async fn test_default_policy_with_token_aware_statements() { - let _ = tracing_subscriber::fmt::try_init(); use crate::transport::locator::test::{A, B, C, D, E, F, G}; let cluster = mock_cluster_data_for_token_aware_tests().await;
@@ -1521,7 +1524,7 @@ mod tests { .group([B, C, E]) // remote nodes .build(), }, - // Keyspace SS with RF=2 with enabled DC failover and rack-awareness + // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness Test { policy: DefaultPolicy { preferences: ReplicaLocationPreference::DatacenterAndRack(
@@ -1617,6 +1620,466 @@ mod tests { .await; } } + + #[tokio::test] + async fn test_default_policy_with_lwt_statements() { + use crate::transport::locator::test::{A, B, C, D, E, F, G}; + + let cluster = mock_cluster_data_for_token_aware_tests().await; + struct Test<'a> { + policy: DefaultPolicy, + routing_info: RoutingInfo<'a>, + expected_groups: ExpectedGroups, + } + + let tests = [ + // Keyspace NTS with RF=2 with enabled DC failover + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Two, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, G]) // pick + fallback local replicas + .ordered([F, D]) // remote replicas + .group([C, B]) // local nodes + .group([E]) // remote nodes + .build(), + }, + // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled + Test { + policy: DefaultPolicy { + preferences:
ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + fixed_shuffle_seed: Some(123), + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Two, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, G]) // pick + fallback local replicas + .ordered([F, D]) // remote replicas + .group([C, B]) // local nodes + .group([E]) // remote nodes + .build(), + }, + // Keyspace NTS with RF=2 with DC failover forbidden by local Consistency + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, G]) // pick + fallback local replicas + .group([C, B]) // local nodes + .build(), // failover is forbidden by local Consistency + }, + // Keyspace NTS with RF=2 with explicitly disabled DC failover + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: false, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::One, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, G]) // pick + fallback local replicas + .group([C, B]) // local nodes + .build(), // failover is explicitly forbidden + }, + // Keyspace NTS with RF=3 with enabled DC failover + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_3), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, C, G]) // pick + fallback local replicas + .ordered([F, D, E]) // remote replicas + .group([B]) // local nodes + .group([]) // remote nodes + .build(), + }, + // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + fixed_shuffle_seed: Some(123), + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_3), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going 
through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, C, G]) // pick + fallback local replicas + .ordered([F, D, E]) // remote replicas + .group([B]) // local nodes + .group([]) // remote nodes + .build(), + }, + // Keyspace NTS with RF=3 with disabled DC failover + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: false, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_3), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A, C, G]) // pick + fallback local replicas + .group([B]) // local nodes + .build(), // failover explicitly forbidden + }, + // Keyspace SS with RF=2 with enabled DC failover + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_SS_RF_2), + consistency: Consistency::Two, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A]) // pick + fallback local replicas + .ordered([F]) // remote replicas + .group([C, G, B]) // local nodes + .group([D, E]) // remote nodes + .build(), + }, + // Keyspace SS with RF=2 with DC failover forbidden by local Consistency + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_SS_RF_2), + consistency: Consistency::LocalOne, // local Consistency forbids datacenter failover + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([A]) // pick + fallback local replicas + .group([C, G, B]) // local nodes + .build(), // failover is forbidden by local Consistency + }, + // No token implies no token awareness + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: None, // no token + keyspace: Some(KEYSPACE_NTS_RF_3), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + expected_groups: ExpectedGroupsBuilder::new() + .group([A, B, C, G]) // local nodes + .group([D, E, F]) // remote nodes + .build(), + }, + // No keyspace implies no token awareness + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("eu".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: None, // no keyspace + consistency: 
Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + expected_groups: ExpectedGroupsBuilder::new() + .group([A, B, C, G]) // local nodes + .group([D, E, F]) // remote nodes + .build(), + }, + // Unknown preferred DC, failover permitted + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("au".to_owned()), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([F, A, D, G]) // remote replicas + .group([B, C, E]) // remote nodes + .build(), + }, + // Unknown preferred DC, failover forbidden + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Datacenter("au".to_owned()), + is_token_aware: true, + permit_dc_failover: false, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden + }, + // No preferred DC, failover permitted + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Any, + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([F, A, D, G]) // remote replicas + .group([B, C, E]) // remote nodes + .build(), + }, + // No preferred DC, failover forbidden + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::Any, + is_token_aware: true, + permit_dc_failover: false, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_2), + consistency: Consistency::Quorum, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([F, A, D, G]) // remote replicas + .group([B, C, E]) // remote nodes + .build(), + }, + // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r1".to_owned(), + ), + is_token_aware: true, + permit_dc_failover: true, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_NTS_RF_3), + consistency: Consistency::One, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: 
ExpectedGroupsBuilder::new() + .ordered([A, C]) // pick local rack replicas + .ordered([G]) // local DC replicas + .ordered([F, D, E]) // remote replicas + .group([B]) // local nodes + .build(), + }, + // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r1".to_owned(), + ), + is_token_aware: true, + permit_dc_failover: false, + fixed_shuffle_seed: Some(123), + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 760 }), + keyspace: Some(KEYSPACE_SS_RF_2), + consistency: Consistency::Two, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: G , B , A , E , F , C , D + // eu eu eu us us eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .ordered([B]) // pick local rack replicas + .ordered([G]) // local DC replicas + .group([A, C]) // local nodes + .build(), + }, + // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica + Test { + policy: DefaultPolicy { + preferences: ReplicaLocationPreference::DatacenterAndRack( + "eu".to_owned(), + "r2".to_owned(), + ), + is_token_aware: true, + permit_dc_failover: false, + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token { value: 160 }), + keyspace: Some(KEYSPACE_SS_RF_2), + consistency: Consistency::One, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + .group([A]) // pick local DC + .group([C, G, B]) // local nodes + .build(), + }, + ]; + + for Test { + policy, + routing_info, + expected_groups, + } in tests + { + test_default_policy_with_given_cluster_and_routing_info( + &policy, + &cluster, + &routing_info, + &expected_groups, + ) + .await; + } + } } mod latency_awareness { From 475be40ad20afcc35cd50f48519802dda826ecbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Wed, 26 Apr 2023 08:29:03 +0200 Subject: [PATCH 15/16] enable LWT integration test once again --- scylla/tests/integration/lwt_optimisation.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/scylla/tests/integration/lwt_optimisation.rs b/scylla/tests/integration/lwt_optimisation.rs index cd1871f11d..c25581c09b 100644 --- a/scylla/tests/integration/lwt_optimisation.rs +++ b/scylla/tests/integration/lwt_optimisation.rs @@ -12,13 +12,11 @@ use scylla_proxy::{ ResponseOpcode, ResponseReaction, ResponseRule, ShardAwareness, WorkerError, }; -// FIXME: This will only work again after refactor of load balancing is finished. -#[ignore] #[tokio::test] #[ntest::timeout(20000)] #[cfg(not(scylla_cloud_tests))] async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optimally() { - // This is just to increase the likelyhood that only intended prepared statements (which contain this mark) are captures by the proxy. + // This is just to increase the likelihood that only intended prepared statements (which contain this mark) are captured by the proxy. 
const MAGIC_MARK: i32 = 123; let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
From ecb4a9feb66d888ee66b6229719a8c08ada63fab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?=
Date: Thu, 27 Apr 2023 17:15:01 +0200
Subject: [PATCH 16/16] docs: mention LWT optimisation in default policy

---
 docs/source/load-balancing/default-policy.md | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/docs/source/load-balancing/default-policy.md b/docs/source/load-balancing/default-policy.md
index 78dbfa4ec4..340c2d151d 100644
--- a/docs/source/load-balancing/default-policy.md
+++ b/docs/source/load-balancing/default-policy.md
@@ -166,5 +166,11 @@ And only if latency awareness is enabled:
 If no preferred datacenter is specified, all nodes are treated as local ones.
-Replicas in the same priority groups are shuffled. Non-replicas are randomly
+Replicas in the same priority groups are shuffled[^*]. Non-replicas are randomly
 rotated (similarly to a round robin with a random index).
+
+[^*]: There is an optimisation implemented for LWT requests[^**] that routes them
+to the replicas in the ring order (this prevents contention caused by Paxos conflicts),
+so replicas in that case are not shuffled within their groups at all.
+
+[^**]: In order for the optimisation to be applied, LWT statements must be prepared beforehand.
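
Illustration (not part of the patches above): the behaviour described by the new footnotes, and implemented across the earlier patches, reduces to one rule: plans for confirmed-LWT statements keep replicas in ring order, while plans for other statements shuffle them. The following minimal, self-contained Rust sketch expresses just that rule. `StatementType` mirrors the enum introduced in PATCH 11, but `replica_plan`, the node names, and the use of the `rand` crate are purely illustrative assumptions and are not the driver's actual API.

use rand::seq::SliceRandom;

#[derive(Clone, Copy)]
enum StatementType {
    Lwt,
    NonLwt,
}

/// Builds a (simplified) query plan out of replicas that are assumed to be
/// already given in ring order, e.g. by a ReplicasOrdered-like iterator.
fn replica_plan<'a>(ring_ordered_replicas: &[&'a str], statement_type: StatementType) -> Vec<&'a str> {
    let mut plan = ring_ordered_replicas.to_vec();
    match statement_type {
        // Confirmed LWT: keep the ring order, so that every coordinator contacts
        // replicas in the same order and Paxos contention is reduced.
        StatementType::Lwt => plan,
        // Non-LWT: the order does not matter, so shuffle the replicas to spread load.
        StatementType::NonLwt => {
            plan.shuffle(&mut rand::thread_rng());
            plan
        }
    }
}

fn main() {
    // Hypothetical local replicas for some token, already in ring order.
    let replicas = ["A", "C", "G"];
    // An LWT plan is deterministic and preserves the ring order...
    assert_eq!(replica_plan(&replicas, StatementType::Lwt), ["A", "C", "G"]);
    // ...while a non-LWT plan is shuffled.
    println!("{:?}", replica_plan(&replicas, StatementType::NonLwt));
}

As the second footnote states, the optimisation applies only to statements that were prepared beforehand, since preparation is what lets the driver confirm that a statement is an LWT and compute the token needed to locate the ring-ordered replicas.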