From 071ea122c49457a21a34833123d9b17bf01e8b78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= <59482568+wprzytula@users.noreply.github.com> Date: Thu, 11 Jul 2024 14:02:37 +0200 Subject: [PATCH] Merge pull request #1037 from wprzytula/lwt-routing-bugfix LWT and rack-aware routing bugfixes (cherry picked from commit c01ad2bbd3ea6b386b0f97ae4d51b14835dda199) --- .../src/transport/load_balancing/default.rs | 169 +++++++++++++++--- scylla/src/transport/locator/mod.rs | 11 +- 2 files changed, 152 insertions(+), 28 deletions(-) diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 4280c855fa..46aa282992 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -70,6 +70,16 @@ enum StatementType { NonLwt, } +/// A result of `pick_replica`. +enum PickedReplica<'a> { + /// A replica that could be computed cheaply. + Computed((NodeRef<'a>, Shard)), + + /// A replica that could not be computed cheaply. `pick` should therefore return None + /// and `fallback` will then return that replica as the first in the iterator. + ToBeComputedInFallback, +} + /// The default load balancing policy. /// /// It can be configured to be datacenter-aware and token-aware. @@ -137,8 +147,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if table_spec, ); - if let Some((alive_local_rack_replica, shard)) = local_rack_picked { - return Some((alive_local_rack_replica, Some(shard))); + if let Some(picked) = local_rack_picked { + return match picked { + PickedReplica::Computed((alive_local_rack_replica, shard)) => { + Some((alive_local_rack_replica, Some(shard))) + } + // Let call to fallback() compute the replica, because it requires allocation. + PickedReplica::ToBeComputedInFallback => None, + }; } } @@ -155,8 +171,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if table_spec, ); - if let Some((alive_local_replica, shard)) = picked { - return Some((alive_local_replica, Some(shard))); + if let Some(picked) = picked { + return match picked { + PickedReplica::Computed((alive_local_replica, shard)) => { + Some((alive_local_replica, Some(shard))) + } + // Let call to fallback() compute the replica, because it requires allocation. + PickedReplica::ToBeComputedInFallback => None, + }; } } @@ -173,8 +195,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if statement_type, table_spec, ); - if let Some((alive_remote_replica, shard)) = picked { - return Some((alive_remote_replica, Some(shard))); + if let Some(picked) = picked { + return match picked { + PickedReplica::Computed((alive_remote_replica, shard)) => { + Some((alive_remote_replica, Some(shard))) + } + // Let call to fallback() compute the replica, because it requires allocation. + PickedReplica::ToBeComputedInFallback => None, + }; } } }; @@ -313,7 +341,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if let maybe_local_rack_nodes = if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { let rack_predicate = Self::make_rack_predicate( - |node| (self.pick_predicate)(node, None), + |node| Self::is_alive(node, None), NodeLocationCriteria::DatacenterAndRack(dc, rack), ); Either::Left( @@ -540,14 +568,14 @@ impl DefaultPolicy { cluster: &'a ClusterData, statement_type: StatementType, table_spec: &TableSpec, - ) -> Option<(NodeRef<'a>, Shard)> { + ) -> Option { match statement_type { StatementType::Lwt => { self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec) } - StatementType::NonLwt => { - self.pick_random_replica(ts, replica_location, predicate, cluster, table_spec) - } + StatementType::NonLwt => self + .pick_random_replica(ts, replica_location, predicate, cluster, table_spec) + .map(PickedReplica::Computed), } } @@ -562,7 +590,8 @@ impl DefaultPolicy { // // If no DC/rack preferences are set, then the only possible replica to be returned // (due to expensive computation of the others, and we avoid expensive computation in `pick()`) - // is the primary replica. It is returned **iff** it satisfies the predicate, else None. + // is the primary replica. If it exists, Some is returned, with either Computed(primary_replica) + // **iff** it satisfies the predicate or ToBeComputedInFallback otherwise. fn pick_first_replica<'a>( &'a self, ts: &TokenWithStrategy<'a>, @@ -570,30 +599,34 @@ impl DefaultPolicy { predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a, cluster: &'a ClusterData, table_spec: &TableSpec, - ) -> Option<(NodeRef<'a>, Shard)> { + ) -> Option { match replica_location { NodeLocationCriteria::Any => { // ReplicaSet returned by ReplicaLocator for this case: - // 1) can be precomputed and lated used cheaply, + // 1) can be precomputed and later used cheaply, // 2) returns replicas in the **non-ring order** (this because ReplicaSet chains // ring-ordered replicas sequences from different DCs, thus not preserving // the global ring order). // Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered. // As ReplicasOrdered can compute cheaply only the primary global replica // (computation of the remaining ones is expensive), in case that the primary replica - // does not satisfy the `predicate`, None is returned. All expensive computation - // is to be done only when `fallback()` is called. + // does not satisfy the `predicate`, ToBeComputedInFallback is returned. + // All expensive computation is to be done only when `fallback()` is called. self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec) .into_replicas_ordered() .into_iter() .next() - .and_then(|(primary_replica, shard)| { - predicate(primary_replica, shard).then_some((primary_replica, shard)) + .map(|(primary_replica, shard)| { + if predicate(primary_replica, shard) { + PickedReplica::Computed((primary_replica, shard)) + } else { + PickedReplica::ToBeComputedInFallback + } }) } NodeLocationCriteria::Datacenter(_) | NodeLocationCriteria::DatacenterAndRack(_, _) => { // ReplicaSet returned by ReplicaLocator for this case: - // 1) can be precomputed and lated used cheaply, + // 1) can be precomputed and later used cheaply, // 2) returns replicas in the ring order (this is not true for the case // when multiple DCs are allowed, because ReplicaSet chains replicas sequences // from different DCs, thus not preserving the global ring order) @@ -606,6 +639,7 @@ impl DefaultPolicy { table_spec, ) .next() + .map(PickedReplica::Computed) } } } @@ -959,6 +993,8 @@ impl<'a> TokenWithStrategy<'a> { #[cfg(test)] mod tests { + use std::collections::HashMap; + use scylla_cql::{frame::types::SerialConsistency, Consistency}; use tracing::info; @@ -966,7 +1002,12 @@ mod tests { get_plan_and_collect_node_identifiers, mock_cluster_data_for_token_unaware_tests, ExpectedGroups, ExpectedGroupsBuilder, }; - use crate::transport::locator::test::{TABLE_NTS_RF_2, TABLE_NTS_RF_3, TABLE_SS_RF_2}; + use crate::host_filter::HostFilter; + use crate::transport::locator::tablets::TabletsInfo; + use crate::transport::locator::test::{ + id_to_invalid_addr, mock_metadata_for_token_aware_tests, TABLE_NTS_RF_2, TABLE_NTS_RF_3, + TABLE_SS_RF_2, + }; use crate::{ load_balancing::{ default::tests::framework::mock_cluster_data_for_token_aware_tests, Plan, RoutingInfo, @@ -1078,7 +1119,9 @@ mod tests { assert_eq!( got.len(), combined_groups_len, - "Plan length different than expected" + "Plan length different than expected. Got plan {:?}, expected groups {:?}", + got, + self.groups, ); // Now, split `got` into groups of expected sizes @@ -1095,10 +1138,10 @@ mod tests { // Verify that the group has the same nodes as the // expected one let got_set: HashSet<_> = got_group.iter().copied().collect(); - assert_eq!(&got_set, expected_set); + assert_eq!(&got_set, expected_set, "Unordered group mismatch"); } ExpectedGroup::Ordered(sequence) => { - assert_eq!(&got_group, sequence); + assert_eq!(&got_group, sequence, "Ordered group mismatch"); } } @@ -1117,7 +1160,11 @@ mod tests { // then expect there to be more than one group // in the set. if gots.len() > 1 && s.len() > 1 { - assert!(sets.len() > 1); + assert!( + sets.len() > 1, + "Group {:?} is expected to be nondeterministic, but it appears to be deterministic", + expected + ); } } ExpectedGroup::Deterministic(_) | ExpectedGroup::Ordered(_) => { @@ -1127,7 +1174,12 @@ mod tests { // the same order. // There will only be one, unique ordering shared // by all plans - check this - assert_eq!(sets.len(), 1); + assert_eq!( + sets.len(), + 1, + "Group {:?} is expected to be deterministic, but it appears to be nondeterministic", + expected + ); } } } @@ -2261,6 +2313,73 @@ mod tests { ) .await; } + + let cluster_with_disabled_node_f = ClusterData::new( + mock_metadata_for_token_aware_tests(), + &Default::default(), + &HashMap::new(), + &None, + { + struct FHostFilter; + impl HostFilter for FHostFilter { + fn accept(&self, peer: &crate::transport::topology::Peer) -> bool { + peer.address != id_to_invalid_addr(F) + } + } + + Some(&FHostFilter) + }, + TabletsInfo::new(), + ) + .await; + + let tests_with_disabled_node_f = [ + // Keyspace NTS with RF=3 without preferred DC. + // The primary replica does not satisfy the predicate (being disabled by HostFilter), + // so pick() should return None and fallback should return A first. + // + // This is a regression test after a bug was fixed. + Test { + policy: DefaultPolicy { + preferences: NodeLocationPreference::Any, + is_token_aware: true, + permit_dc_failover: true, + pick_predicate: Box::new(|node, _shard| node.address != id_to_invalid_addr(F)), + ..Default::default() + }, + routing_info: RoutingInfo { + token: Some(Token::new(160)), + table: Some(TABLE_NTS_RF_3), + consistency: Consistency::One, + is_confirmed_lwt: true, + ..Default::default() + }, + // going through the ring, we get order: F , A , C , D , G , B , E + // us eu eu us eu eu us + // r2 r1 r1 r1 r2 r1 r1 + expected_groups: ExpectedGroupsBuilder::new() + // pick is empty, because the primary replica does not satisfy pick predicate, + // and with LWT we cannot compute other replicas for NTS without allocations. + .ordered([A, C, D, G, E]) // replicas + .group([B]) // nodes + .build(), + }, + ]; + + for Test { + policy, + routing_info, + expected_groups, + } in tests_with_disabled_node_f + { + test_default_policy_with_given_cluster_and_routing_info( + &policy, + &cluster_with_disabled_node_f, + &routing_info, + &expected_groups, + ) + .await; + } } } diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index 64b4dc9fa0..e0f06e8ba2 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -472,19 +472,23 @@ impl<'a> IntoIterator for ReplicaSet<'a> { } enum ReplicaSetIteratorInner<'a> { + /// Token ring with SimpleStrategy, any datacenter Plain { replicas: ReplicasArray<'a>, idx: usize, }, + /// Tablets PlainSharded { replicas: &'a [(Arc, Shard)], idx: usize, }, + /// Token ring with SimpleStrategy, specific datacenter FilteredSimple { replicas: ReplicasArray<'a>, datacenter: &'a str, idx: usize, }, + /// Token ring with NetworkTopologyStrategy ChainedNTS { replicas: ReplicasArray<'a>, replicas_idx: usize, @@ -637,8 +641,8 @@ impl<'a> ReplicaSet<'a> { /// or it must compute them on-demand (in case of NetworkTopologyStrategy). /// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`). /// For obtaining the primary replica, no allocations are needed. Therefore, the first call -/// to `next()` is optimised and doesn not allocate. -/// For the remaining others, unfortunately, allocation is unevitable. +/// to `next()` is optimised and does not allocate. +/// For the remaining others, unfortunately, allocation is inevitable. pub struct ReplicasOrdered<'a> { replica_set: ReplicaSet<'a>, } @@ -650,7 +654,8 @@ pub struct ReplicasOrderedIterator<'a> { enum ReplicasOrderedIteratorInner<'a> { AlreadyRingOrdered { - // In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order. + // In case of Plain, PlainSharded and FilteredSimple variants, + // ReplicaSetIterator respects ring order. replica_set_iter: ReplicaSetIterator<'a>, }, PolyDatacenterNTS {