Skip to content

Commit

Permalink
default_lb: fix LWT routing bug
Browse files Browse the repository at this point in the history
There is a special logic in default policy that handles the case when
computing the first acceptable replica (i.e. such replica that it
satisfies `pick_predicate`) is expensive (i.e. requires allocation).
That logic was to make `pick` return None, because then `fallback`
would allocate and compute all acceptable replicas. Unfortunately,
the logic contained a bug that made `pick` continue execution instead
of returning None, leading to a non-necessarily-replica being returned.
This would break the LWT optimisation, because in case that the primary
replica is filtered out by LB (e.g. it's down, or disabled by
HostFilter), the second replica should be targeted instead,
deterministically.

The fix involves creating an enum to distinguish between three
scenarios:
1. No replica exists that could satisfy the pick predicate -> None;
2. The primary replica satisfies the pick predicate ->
   Some(Computed(primary_replica));
3. The primary replica does not satisfy the pick predicate, but it's
   possible that another replica does -> Some(ToBeComputedInFallback).

Before the fix, the third scenario would merge with the first,
leading to incorrect behaviour of not returning None from `pick`.
  • Loading branch information
wprzytula committed Jul 11, 2024
1 parent d77990c commit 37971c5
Showing 1 changed file with 52 additions and 18 deletions.
70 changes: 52 additions & 18 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ enum StatementType {
NonLwt,
}

/// A result of `pick_replica`.
enum PickedReplica<'a> {
/// A replica that could be computed cheaply.
Computed((NodeRef<'a>, Shard)),

/// A replica that could not be computed cheaply. `pick` should therefore return None
/// and `fallback` will then return that replica as the first in the iterator.
ToBeComputedInFallback,
}

/// The default load balancing policy.
///
/// It can be configured to be datacenter-aware and token-aware.
Expand Down Expand Up @@ -137,8 +147,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
table_spec,
);

if let Some((alive_local_rack_replica, shard)) = local_rack_picked {
return Some((alive_local_rack_replica, Some(shard)));
if let Some(picked) = local_rack_picked {
return match picked {
PickedReplica::Computed((alive_local_rack_replica, shard)) => {
Some((alive_local_rack_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}

Expand All @@ -155,8 +171,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
table_spec,
);

if let Some((alive_local_replica, shard)) = picked {
return Some((alive_local_replica, Some(shard)));
if let Some(picked) = picked {
return match picked {
PickedReplica::Computed((alive_local_replica, shard)) => {
Some((alive_local_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}

Expand All @@ -173,8 +195,14 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
statement_type,
table_spec,
);
if let Some((alive_remote_replica, shard)) = picked {
return Some((alive_remote_replica, Some(shard)));
if let Some(picked) = picked {
return match picked {
PickedReplica::Computed((alive_remote_replica, shard)) => {
Some((alive_remote_replica, Some(shard)))
}
// Let call to fallback() compute the replica, because it requires allocation.
PickedReplica::ToBeComputedInFallback => None,
};
}
}
};
Expand Down Expand Up @@ -540,14 +568,14 @@ impl DefaultPolicy {
cluster: &'a ClusterData,
statement_type: StatementType,
table_spec: &TableSpec,
) -> Option<(NodeRef<'a>, Shard)> {
) -> Option<PickedReplica> {
match statement_type {
StatementType::Lwt => {
self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec)
}
StatementType::NonLwt => {
self.pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
}
StatementType::NonLwt => self
.pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
.map(PickedReplica::Computed),
}
}

Expand All @@ -562,38 +590,43 @@ impl DefaultPolicy {
//
// If no DC/rack preferences are set, then the only possible replica to be returned
// (due to expensive computation of the others, and we avoid expensive computation in `pick()`)
// is the primary replica. It is returned **iff** it satisfies the predicate, else None.
// is the primary replica. If it exists, Some is returned, with either Computed(primary_replica)
// **iff** it satisfies the predicate or ToBeComputedInFallback otherwise.
fn pick_first_replica<'a>(
&'a self,
ts: &TokenWithStrategy<'a>,
replica_location: NodeLocationCriteria<'a>,
predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
cluster: &'a ClusterData,
table_spec: &TableSpec,
) -> Option<(NodeRef<'a>, Shard)> {
) -> Option<PickedReplica> {
match replica_location {
NodeLocationCriteria::Any => {
// ReplicaSet returned by ReplicaLocator for this case:
// 1) can be precomputed and lated used cheaply,
// 1) can be precomputed and later used cheaply,
// 2) returns replicas in the **non-ring order** (this because ReplicaSet chains
// ring-ordered replicas sequences from different DCs, thus not preserving
// the global ring order).
// Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered.
// As ReplicasOrdered can compute cheaply only the primary global replica
// (computation of the remaining ones is expensive), in case that the primary replica
// does not satisfy the `predicate`, None is returned. All expensive computation
// is to be done only when `fallback()` is called.
// does not satisfy the `predicate`, ToBeComputedInFallback is returned.
// All expensive computation is to be done only when `fallback()` is called.
self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
.into_replicas_ordered()
.into_iter()
.next()
.and_then(|(primary_replica, shard)| {
predicate(primary_replica, shard).then_some((primary_replica, shard))
.map(|(primary_replica, shard)| {
if predicate(primary_replica, shard) {
PickedReplica::Computed((primary_replica, shard))
} else {
PickedReplica::ToBeComputedInFallback
}
})
}
NodeLocationCriteria::Datacenter(_) | NodeLocationCriteria::DatacenterAndRack(_, _) => {
// ReplicaSet returned by ReplicaLocator for this case:
// 1) can be precomputed and lated used cheaply,
// 1) can be precomputed and later used cheaply,
// 2) returns replicas in the ring order (this is not true for the case
// when multiple DCs are allowed, because ReplicaSet chains replicas sequences
// from different DCs, thus not preserving the global ring order)
Expand All @@ -606,6 +639,7 @@ impl DefaultPolicy {
table_spec,
)
.next()
.map(PickedReplica::Computed)
}
}
}
Expand Down

0 comments on commit 37971c5

Please sign in to comment.