Skip to content

Commit

Permalink
Merge pull request #1037 from wprzytula/lwt-routing-bugfix
Browse files Browse the repository at this point in the history
LWT and rack-aware routing bugfixes

(cherry picked from commit c01ad2b)
  • Loading branch information
wprzytula committed Jul 11, 2024
1 parent 8dd3bb7 commit f12620e
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 28 deletions.
169 changes: 144 additions & 25 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ enum StatementType {
NonLwt,
}

/// A result of `pick_replica`.
enum PickedReplica<'a> {
/// A replica that could be computed cheaply.
Computed((NodeRef<'a>, Shard)),

/// A replica that could not be computed cheaply. `pick` should therefore return None
/// and `fallback` will then return that replica as the first in the iterator.
ToBeComputedInFallback,
}

/// The default load balancing policy.
///
/// It can be configured to be datacenter-aware and token-aware.
Expand Down Expand Up @@ -137,8 +147,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
table_spec,
);

if let Some((alive_local_rack_replica, shard)) = local_rack_picked {
return Some((alive_local_rack_replica, Some(shard)));
if let Some(picked) = local_rack_picked {
return match picked {
PickedReplica::Computed((alive_local_rack_replica, shard)) => {
Some((alive_local_rack_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}

Expand All @@ -155,8 +171,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
table_spec,
);

if let Some((alive_local_replica, shard)) = picked {
return Some((alive_local_replica, Some(shard)));
if let Some(picked) = picked {
return match picked {
PickedReplica::Computed((alive_local_replica, shard)) => {
Some((alive_local_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}

Expand All @@ -173,8 +195,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
statement_type,
table_spec,
);
if let Some((alive_remote_replica, shard)) = picked {
return Some((alive_remote_replica, Some(shard)));
if let Some(picked) = picked {
return match picked {
PickedReplica::Computed((alive_remote_replica, shard)) => {
Some((alive_remote_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}
};
Expand Down Expand Up @@ -313,7 +341,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
let maybe_local_rack_nodes =
if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
let rack_predicate = Self::make_rack_predicate(
|node| (self.pick_predicate)(node, None),
|node| Self::is_alive(node, None),
NodeLocationCriteria::DatacenterAndRack(dc, rack),
);
Either::Left(
Expand Down Expand Up @@ -540,14 +568,14 @@ impl DefaultPolicy {
cluster: &'a ClusterData,
statement_type: StatementType,
table_spec: &TableSpec,
) -> Option<(NodeRef<'a>, Shard)> {
) -> Option<PickedReplica> {
match statement_type {
StatementType::Lwt => {
self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec)
}
StatementType::NonLwt => {
self.pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
}
StatementType::NonLwt => self
.pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
.map(PickedReplica::Computed),
}
}

Expand All @@ -562,38 +590,43 @@ impl DefaultPolicy {
//
// If no DC/rack preferences are set, then the only possible replica to be returned
// (due to expensive computation of the others, and we avoid expensive computation in `pick()`)
// is the primary replica. It is returned **iff** it satisfies the predicate, else None.
// is the primary replica. If it exists, Some is returned, with either Computed(primary_replica)
// **iff** it satisfies the predicate or ToBeComputedInFallback otherwise.
fn pick_first_replica<'a>(
&'a self,
ts: &TokenWithStrategy<'a>,
replica_location: NodeLocationCriteria<'a>,
predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
cluster: &'a ClusterData,
table_spec: &TableSpec,
) -> Option<(NodeRef<'a>, Shard)> {
) -> Option<PickedReplica> {
match replica_location {
NodeLocationCriteria::Any => {
// ReplicaSet returned by ReplicaLocator for this case:
// 1) can be precomputed and lated used cheaply,
// 1) can be precomputed and later used cheaply,
// 2) returns replicas in the **non-ring order** (this because ReplicaSet chains
// ring-ordered replicas sequences from different DCs, thus not preserving
// the global ring order).
// Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered.
// As ReplicasOrdered can compute cheaply only the primary global replica
// (computation of the remaining ones is expensive), in case that the primary replica
// does not satisfy the `predicate`, None is returned. All expensive computation
// is to be done only when `fallback()` is called.
// does not satisfy the `predicate`, ToBeComputedInFallback is returned.
// All expensive computation is to be done only when `fallback()` is called.
self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
.into_replicas_ordered()
.into_iter()
.next()
.and_then(|(primary_replica, shard)| {
predicate(primary_replica, shard).then_some((primary_replica, shard))
.map(|(primary_replica, shard)| {
if predicate(primary_replica, shard) {
PickedReplica::Computed((primary_replica, shard))
} else {
PickedReplica::ToBeComputedInFallback
}
})
}
NodeLocationCriteria::Datacenter(_) | NodeLocationCriteria::DatacenterAndRack(_, _) => {
// ReplicaSet returned by ReplicaLocator for this case:
// 1) can be precomputed and lated used cheaply,
// 1) can be precomputed and later used cheaply,
// 2) returns replicas in the ring order (this is not true for the case
// when multiple DCs are allowed, because ReplicaSet chains replicas sequences
// from different DCs, thus not preserving the global ring order)
Expand All @@ -606,6 +639,7 @@ impl DefaultPolicy {
table_spec,
)
.next()
.map(PickedReplica::Computed)
}
}
}
Expand Down Expand Up @@ -959,14 +993,21 @@ impl<'a> TokenWithStrategy<'a> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use scylla_cql::{frame::types::SerialConsistency, Consistency};
use tracing::info;

use self::framework::{
get_plan_and_collect_node_identifiers, mock_cluster_data_for_token_unaware_tests,
ExpectedGroups, ExpectedGroupsBuilder,
};
use crate::transport::locator::test::{TABLE_NTS_RF_2, TABLE_NTS_RF_3, TABLE_SS_RF_2};
use crate::host_filter::HostFilter;
use crate::transport::locator::tablets::TabletsInfo;
use crate::transport::locator::test::{
id_to_invalid_addr, mock_metadata_for_token_aware_tests, TABLE_NTS_RF_2, TABLE_NTS_RF_3,
TABLE_SS_RF_2,
};
use crate::{
load_balancing::{
default::tests::framework::mock_cluster_data_for_token_aware_tests, Plan, RoutingInfo,
Expand Down Expand Up @@ -1078,7 +1119,9 @@ mod tests {
assert_eq!(
got.len(),
combined_groups_len,
"Plan length different than expected"
"Plan length different than expected. Got plan {:?}, expected groups {:?}",
got,
self.groups,
);

// Now, split `got` into groups of expected sizes
Expand All @@ -1095,10 +1138,10 @@ mod tests {
// Verify that the group has the same nodes as the
// expected one
let got_set: HashSet<_> = got_group.iter().copied().collect();
assert_eq!(&got_set, expected_set);
assert_eq!(&got_set, expected_set, "Unordered group mismatch");
}
ExpectedGroup::Ordered(sequence) => {
assert_eq!(&got_group, sequence);
assert_eq!(&got_group, sequence, "Ordered group mismatch");
}
}

Expand All @@ -1117,7 +1160,11 @@ mod tests {
// then expect there to be more than one group
// in the set.
if gots.len() > 1 && s.len() > 1 {
assert!(sets.len() > 1);
assert!(
sets.len() > 1,
"Group {:?} is expected to be nondeterministic, but it appears to be deterministic",
expected
);
}
}
ExpectedGroup::Deterministic(_) | ExpectedGroup::Ordered(_) => {
Expand All @@ -1127,7 +1174,12 @@ mod tests {
// the same order.
// There will only be one, unique ordering shared
// by all plans - check this
assert_eq!(sets.len(), 1);
assert_eq!(
sets.len(),
1,
"Group {:?} is expected to be deterministic, but it appears to be nondeterministic",
expected
);
}
}
}
Expand Down Expand Up @@ -2261,6 +2313,73 @@ mod tests {
)
.await;
}

let cluster_with_disabled_node_f = ClusterData::new(
mock_metadata_for_token_aware_tests(),
&Default::default(),
&HashMap::new(),
&None,
{
struct FHostFilter;
impl HostFilter for FHostFilter {
fn accept(&self, peer: &crate::transport::topology::Peer) -> bool {
peer.address != id_to_invalid_addr(F)
}
}

Some(&FHostFilter)
},
TabletsInfo::new(),
)
.await;

let tests_with_disabled_node_f = [
// Keyspace NTS with RF=3 without preferred DC.
// The primary replica does not satisfy the predicate (being disabled by HostFilter),
// so pick() should return None and fallback should return A first.
//
// This is a regression test after a bug was fixed.
Test {
policy: DefaultPolicy {
preferences: NodeLocationPreference::Any,
is_token_aware: true,
permit_dc_failover: true,
pick_predicate: Box::new(|node, _shard| node.address != id_to_invalid_addr(F)),
..Default::default()
},
routing_info: RoutingInfo {
token: Some(Token::new(160)),
table: Some(TABLE_NTS_RF_3),
consistency: Consistency::One,
is_confirmed_lwt: true,
..Default::default()
},
// going through the ring, we get order: F , A , C , D , G , B , E
// us eu eu us eu eu us
// r2 r1 r1 r1 r2 r1 r1
expected_groups: ExpectedGroupsBuilder::new()
// pick is empty, because the primary replica does not satisfy pick predicate,
// and with LWT we cannot compute other replicas for NTS without allocations.
.ordered([A, C, D, G, E]) // replicas
.group([B]) // nodes
.build(),
},
];

for Test {
policy,
routing_info,
expected_groups,
} in tests_with_disabled_node_f
{
test_default_policy_with_given_cluster_and_routing_info(
&policy,
&cluster_with_disabled_node_f,
&routing_info,
&expected_groups,
)
.await;
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,19 +472,23 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
}

enum ReplicaSetIteratorInner<'a> {
/// Token ring with SimpleStrategy, any datacenter
Plain {
replicas: ReplicasArray<'a>,
idx: usize,
},
/// Tablets
PlainSharded {
replicas: &'a [(Arc<Node>, Shard)],
idx: usize,
},
/// Token ring with SimpleStrategy, specific datacenter
FilteredSimple {
replicas: ReplicasArray<'a>,
datacenter: &'a str,
idx: usize,
},
/// Token ring with NetworkTopologyStrategy
ChainedNTS {
replicas: ReplicasArray<'a>,
replicas_idx: usize,
Expand Down Expand Up @@ -637,8 +641,8 @@ impl<'a> ReplicaSet<'a> {
/// or it must compute them on-demand (in case of NetworkTopologyStrategy).
/// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`).
/// For obtaining the primary replica, no allocations are needed. Therefore, the first call
/// to `next()` is optimised and doesn not allocate.
/// For the remaining others, unfortunately, allocation is unevitable.
/// to `next()` is optimised and does not allocate.
/// For the remaining others, unfortunately, allocation is inevitable.
pub struct ReplicasOrdered<'a> {
replica_set: ReplicaSet<'a>,
}
Expand All @@ -650,7 +654,8 @@ pub struct ReplicasOrderedIterator<'a> {

enum ReplicasOrderedIteratorInner<'a> {
AlreadyRingOrdered {
// In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order.
// In case of Plain, PlainSharded and FilteredSimple variants,
// ReplicaSetIterator respects ring order.
replica_set_iter: ReplicaSetIterator<'a>,
},
PolyDatacenterNTS {
Expand Down

0 comments on commit f12620e

Please sign in to comment.