diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs
index 46aa282992..dd5a989d52 100644
--- a/scylla/src/transport/load_balancing/default.rs
+++ b/scylla/src/transport/load_balancing/default.rs
@@ -58,15 +58,28 @@ impl NodeLocationPreference {
     }
 }
 
+/// An ordering requirement for replicas.
 #[derive(Clone, Copy)]
 enum ReplicaOrder {
+    /// No requirement. Replicas can be returned in arbitrary order.
     Arbitrary,
-    RingOrder,
+
+    /// A requirement for the order to be deterministic, not only across statement executions
+    /// but also across drivers. This is used for LWT optimisation, to avoid Paxos conflicts.
+    Deterministic,
 }
 
+/// Statement kind, used to enable specific load balancing patterns for certain cases.
+///
+/// Currently, there is a distinguished case of LWT statements, which should always be routed
+/// to replicas in a deterministic order to avoid Paxos conflicts. Other statements
+/// are routed to random replicas to balance the load.
 #[derive(Clone, Copy)]
 enum StatementType {
+    /// The statement is a confirmed LWT. It's to be routed to replicas in a deterministic order.
     Lwt,
+
+    /// The statement is not a confirmed LWT. It's to be routed in a default way.
     NonLwt,
 }
 
@@ -82,16 +95,52 @@ enum PickedReplica<'a> {
 }
 
 /// The default load balancing policy.
 ///
-/// It can be configured to be datacenter-aware and token-aware.
-/// Datacenter failover for queries with non local consistency mode is also supported.
-/// Latency awareness is available, althrough not recommended.
+/// It can be configured to be datacenter-aware, rack-aware and token-aware.
+/// Datacenter failover (sending a query to a node from a remote datacenter)
+/// for queries with non-local consistency mode is also supported.
+///
+/// Latency awareness is available, although **not recommended**:
+/// the penalisation mechanism it involves may interact badly with other
+/// mechanisms, such as LWT optimisation. Moreover, penalising nodes for
+/// recently measured latencies is believed to be neither particularly stable
+/// nor beneficial. The number of in-flight requests, for instance, seems
+/// to be a better metric of how (over)loaded a target node/shard is.
+/// For now, however, we don't have an implementation of an
+/// in-flight-requests-aware policy.
 #[allow(clippy::type_complexity)]
 pub struct DefaultPolicy {
+    /// Preferences regarding node location. One of: rack and DC, DC, or no preference.
     preferences: NodeLocationPreference,
+
+    /// Configures whether the policy takes the token into consideration when creating plans.
+    /// If this is set to `true` AND token, keyspace and table are available,
+    /// then the policy prefers replicas and puts them earlier in the query plan.
     is_token_aware: bool,
+
+    /// Whether to permit remote nodes (those not located in the preferred DC) in plans.
+    /// If no preferred DC is set, this has no effect.
     permit_dc_failover: bool,
+
+    /// A predicate that a target (node + shard) must satisfy in order to be picked.
+    /// This was introduced to make latency awareness cleaner.
+    /// - if latency awareness is disabled, then `pick_predicate` is just `Self::is_alive()`;
+    /// - if latency awareness is enabled, then it is `Self::is_alive() && latency_predicate()`,
+    ///   which checks that the target is not penalised due to high latencies.
     pick_predicate: Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync>,
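// A quick illustration of how `pick_predicate` is meant to be composed, per the doc comment
// above. This is a minimal sketch, not the verbatim builder code: `latency_predicate` stands in
// for whatever closure the latency-awareness module actually produces.
let pick_predicate: Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync> =
    match latency_predicate {
        // Latency awareness enabled: a target must be alive AND not penalised.
        Some(latency_predicate) => Box::new(move |node, shard| {
            DefaultPolicy::is_alive(node, shard) && latency_predicate(node)
        }),
        // Latency awareness disabled: liveness is the only requirement.
        None => Box::new(DefaultPolicy::is_alive),
    };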
+
+    /// An additional layer that penalises targets that are too slow compared to others
+    /// in terms of latency. It works in the following way:
+    /// - for `pick`, it uses `latency_predicate` to filter out penalised nodes,
+    ///   so that a penalised node will never be `pick`ed;
+    /// - for `fallback`, it wraps the returned iterator, moving all penalised nodes
+    ///   to the end, in a stable way.
+    ///
+    /// Penalisation is done based on collected and updated latencies.
     latency_awareness: Option<LatencyAwareness>,
+
+    /// The policy chooses (in `pick`) and shuffles (in `fallback`) replicas and nodes
+    /// based on a random number generator. For the sake of deterministic testing,
+    /// a fixed seed can be used.
     fixed_seed: Option<u64>,
 }
 
@@ -102,7 +151,7 @@ impl fmt::Debug for DefaultPolicy {
             .field("is_token_aware", &self.is_token_aware)
             .field("permit_dc_failover", &self.permit_dc_failover)
             .field("latency_awareness", &self.latency_awareness)
-            .field("fixed_shuffle_seed", &self.fixed_seed)
+            .field("fixed_seed", &self.fixed_seed)
             .finish_non_exhaustive()
     }
 }
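// For orientation, this is roughly how users configure the policy described above. A sketch
// based on the `DefaultPolicyBuilder` API; the method names reflect my understanding of the
// driver's builder and should be double-checked against the crate docs for the targeted version.
use scylla::load_balancing::{DefaultPolicy, LoadBalancingPolicy};
use std::sync::Arc;

fn build_policy() -> Arc<dyn LoadBalancingPolicy> {
    DefaultPolicy::builder()
        .prefer_datacenter("dc1".to_string()) // node location preference
        .token_aware(true)                    // prefer replicas of the statement's token
        .permit_dc_failover(true)             // allow remote nodes as a last resort
        .build()
}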
@@ -113,7 +162,10 @@ impl LoadBalancingPolicy for DefaultPolicy {
         query: &'a RoutingInfo,
         cluster: &'a ClusterData,
     ) -> Option<(NodeRef<'a>, Option<Shard>)> {
+        /* For prepared statements, token-aware logic is available: we know which nodes
+         * are replicas for the statement, so we can pick one of them. */
         let routing_info = self.routing_info(query, cluster);
+
         if let Some(ref token_with_strategy) = routing_info.token_with_strategy {
             if self.preferences.datacenter().is_some()
                 && !self.permit_dc_failover
@@ -130,11 +182,16 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
                 );
             }
         }
+
+        /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
         let statement_type = if query.is_confirmed_lwt {
             StatementType::Lwt
         } else {
             StatementType::NonLwt
         };
+
+        /* Token-aware logic - if routing info is available, we know which nodes are replicas
+         * for the statement. Try to pick one of them. */
         if let (Some(ts), Some(table_spec)) = (&routing_info.token_with_strategy, query.table) {
             if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
                 // Try to pick some alive local rack random replica.
@@ -207,55 +264,70 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             }
         };
 
-        // If no token was available (or all the replicas for that token are down), try to pick
-        // some alive local node.
+        /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
+         * or no replica was suitable to be targeted (e.g. because all replicas are disabled or down),
+         * try to choose a random node, not necessarily a replica. */
+
+        /* We start with nodes that are not alive already filtered out. This is done by
+         * `pick_predicate`, which always includes `Self::is_alive()`. */
+
+        // Let's start with local nodes, i.e. those in the preferred datacenter.
         // If there was no preferred datacenter specified, all nodes are treated as local.
-        let nodes = self.preferred_node_set(cluster);
+        let local_nodes = self.preferred_node_set(cluster);
 
         if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
-            // Try to pick some alive local rack random node.
+            // Try to pick some alive random local rack node.
             let rack_predicate = Self::make_rack_predicate(
                 |node| (self.pick_predicate)(node, None),
                 NodeLocationCriteria::DatacenterAndRack(dc, rack),
            );
-            let local_rack_picked = self.pick_node(nodes, rack_predicate);
-            if let Some(alive_local_rack) = local_rack_picked {
-                return Some((alive_local_rack, None));
+            let local_rack_node_picked = self.pick_node(local_nodes, rack_predicate);
+
+            if let Some(alive_local_rack_node) = local_rack_node_picked {
+                return Some((alive_local_rack_node, None));
             }
         }
 
-        // Try to pick some alive local random node.
-        if let Some(alive_local) = self.pick_node(nodes, |node| (self.pick_predicate)(node, None)) {
-            return Some((alive_local, None));
+        // Try to pick some alive random local node.
+        let local_node_picked =
+            self.pick_node(local_nodes, |node| (self.pick_predicate)(node, None));
+        if let Some(alive_local_node) = local_node_picked {
+            return Some((alive_local_node, None));
         }
 
         let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
 
         // If a datacenter failover is possible, loosen restriction about locality.
         if self.is_datacenter_failover_possible(&routing_info) {
-            let picked = self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None));
-            if let Some(alive_maybe_remote) = picked {
-                return Some((alive_maybe_remote, None));
+            let maybe_remote_node_picked =
+                self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None));
+            if let Some(alive_maybe_remote_node) = maybe_remote_node_picked {
+                return Some((alive_maybe_remote_node, None));
             }
         }
 
+        /* If we got here, we failed to pick any alive node. Now let's consider even down nodes. */
+
         // Previous checks imply that every node we could have selected is down.
         // Let's try to return a down node that wasn't disabled.
-        let picked = self.pick_node(nodes, |node| node.is_enabled());
-        if let Some(down_but_enabled_local_node) = picked {
+        let maybe_down_local_node_picked = self.pick_node(local_nodes, |node| node.is_enabled());
+        if let Some(down_but_enabled_local_node) = maybe_down_local_node_picked {
             return Some((down_but_enabled_local_node, None));
         }
 
         // If a datacenter failover is possible, loosen restriction about locality.
         if self.is_datacenter_failover_possible(&routing_info) {
-            let picked = self.pick_node(all_nodes, |node| node.is_enabled());
-            if let Some(down_but_enabled_maybe_remote_node) = picked {
+            let maybe_down_maybe_remote_node_picked =
+                self.pick_node(all_nodes, |node| node.is_enabled());
+            if let Some(down_but_enabled_maybe_remote_node) = maybe_down_maybe_remote_node_picked {
                 return Some((down_but_enabled_maybe_remote_node, None));
             }
         }
 
         // Every node is disabled. This could be due to a bad host filter - configuration error.
-        nodes.first().map(|node| (node, None))
+        // It makes no sense to return disabled nodes (there are no open connections to them anyway),
+        // so let's return None. `fallback()` will return an empty iterator, so the whole plan
+        // will be empty.
+        None
     }
 
     fn fallback<'a>(
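// How the two entry points above are consumed, in simplified form. The real logic lives in the
// driver's plan-building code and additionally skips the already-picked target when switching
// to the fallback iterator; that part is deliberately omitted from this sketch.
let plan = policy
    .pick(&routing_info, &cluster)
    .into_iter()
    .chain(policy.fallback(&routing_info, &cluster));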
@@ -263,20 +335,29 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
         query: &'a RoutingInfo,
         cluster: &'a ClusterData,
     ) -> FallbackPlan<'a> {
+        /* For prepared statements, token-aware logic is available: we know which nodes
+         * are replicas for the statement, so we can pick one of them. */
         let routing_info = self.routing_info(query, cluster);
+
+        /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
         let statement_type = if query.is_confirmed_lwt {
             StatementType::Lwt
         } else {
             StatementType::NonLwt
         };
 
-        // If token is available, get a shuffled list of alive replicas.
+        /* Token-aware logic - if routing info is available, we know which nodes are replicas for the statement.
+         * Get a list of alive replicas:
+         * - a shuffled list in case of non-LWTs,
+         * - a deterministically ordered list in case of LWTs. */
         let maybe_replicas = if let (Some(ts), Some(table_spec)) =
             (&routing_info.token_with_strategy, query.table)
         {
+            // Iterator over alive local rack replicas (shuffled or deterministically ordered,
+            // depending on the statement being LWT or not).
             let maybe_local_rack_replicas =
                 if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
-                    let local_rack_replicas = self.fallback_replicas(
+                    let local_rack_replicas = self.maybe_shuffled_replicas(
                         ts,
                         NodeLocationCriteria::DatacenterAndRack(dc, rack),
                         |node, shard| Self::is_alive(node, Some(shard)),
@@ -289,11 +370,13 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
                     Either::Right(std::iter::empty())
                 };
 
+            // Iterator over alive local datacenter replicas (shuffled or deterministically ordered,
+            // depending on the statement being LWT or not).
             let maybe_local_replicas = if let NodeLocationPreference::DatacenterAndRack(dc, _)
             | NodeLocationPreference::Datacenter(dc) = &self.preferences
             {
-                let local_replicas = self.fallback_replicas(
+                let local_replicas = self.maybe_shuffled_replicas(
                     ts,
                     NodeLocationCriteria::Datacenter(dc),
                     |node, shard| Self::is_alive(node, Some(shard)),
@@ -310,7 +393,9 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             let maybe_remote_replicas = if self.preferences.datacenter().is_none()
                 || self.is_datacenter_failover_possible(&routing_info)
             {
-                let remote_replicas = self.fallback_replicas(
+                // Iterator over alive replicas (shuffled or deterministically ordered,
+                // depending on the statement being LWT or not).
+                let remote_replicas = self.maybe_shuffled_replicas(
                     ts,
                     NodeLocationCriteria::Any,
                     |node, shard| Self::is_alive(node, Some(shard)),
@@ -335,10 +420,16 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
         };
 
-        // Get a list of all local alive nodes, and apply a round robin to it
+        /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
+         * or no replica is suitable to be targeted (e.g. because all replicas are disabled or down),
+         * try targeting nodes that are not necessarily replicas. */
+
+        /* We start with nodes that are not alive already filtered out. */
+
+        // All nodes in the local datacenter (if one is given).
         let local_nodes = self.preferred_node_set(cluster);
 
-        let maybe_local_rack_nodes =
+        let robinned_local_rack_nodes =
             if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
                 let rack_predicate = Self::make_rack_predicate(
                     |node| Self::is_alive(node, None),
@@ -351,10 +442,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             } else {
                 Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
             };
+
         let robinned_local_nodes = self
             .round_robin_nodes(local_nodes, |node| Self::is_alive(node, None))
             .map(|node| (node, None));
 
+        // All nodes in the cluster.
         let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
 
         // If a datacenter failover is possible, loosen restriction about locality.
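// The pattern recurring throughout the hunks above: optional branches of the plan are built as
// `Either` so that both arms share one concrete iterator type and can be chained lazily.
// A minimal, self-contained illustration of that pattern (using the `either` crate, as the
// surrounding code does):
use either::Either;

// One branch of a plan: an iterator that is either populated or empty,
// but has a single concrete type in both cases.
fn maybe_numbers(enabled: bool) -> impl Iterator<Item = u32> {
    if enabled {
        Either::Left([1u32, 2, 3].into_iter())
    } else {
        Either::Right(std::iter::empty())
    }
}

fn main() {
    // Branches with the same item type can be lazily chained into one plan-like iterator.
    let plan: Vec<u32> = maybe_numbers(true).chain(maybe_numbers(false)).collect();
    assert_eq!(plan, vec![1, 2, 3]);
}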
@@ -385,6 +478,27 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             Either::Right(std::iter::empty())
         };
 
+        /// *Plan* should return unique elements. It is not obvious, however, what that means,
+        /// because some nodes in the plan may have shards and some may not.
+        ///
+        /// This helper structure defines equality of plan elements.
+        /// How the comparison works:
+        /// - If at least one of the elements is shard-less, then compare just the nodes.
+        /// - If both elements have shards, then compare both nodes and shards.
+        ///
+        /// Why is it implemented this way?
+        /// The driver should not attempt to send a request to the same destination twice.
+        /// If a plan element doesn't have a shard specified, then a random shard will be
+        /// chosen by the driver. If the plan also contains the same node but with
+        /// a shard present, and we randomly choose the same shard for the shard-less element,
+        /// then we have duplication.
+        ///
+        /// Example: the plan is `[(Node1, Some(1)), (Node1, None)]` - if the driver uses
+        /// the second element and randomly chooses shard 1, then we have duplication.
+        ///
+        /// On the other hand, if a plan has a duplicate node, but with different shards,
+        /// then we want to use both elements - so we can't just make the list unique by node,
+        /// and that is why this struct was created.
         struct DefaultPolicyTargetComparator {
             host_id: Uuid,
             shard: Option<Shard>,
@@ -409,9 +523,17 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
             }
         }
 
-        // Construct a fallback plan as a composition of replicas, local nodes and remote nodes.
+        // Construct a fallback plan as a composition of:
+        // - local rack alive replicas,
+        // - local datacenter alive replicas (or all alive replicas if no DC is preferred),
+        // - remote alive replicas (if DC failover is enabled),
+        // - local rack alive nodes,
+        // - local datacenter alive nodes (or all alive nodes if no DC is preferred),
+        // - remote alive nodes (if DC failover is enabled),
+        // - local datacenter nodes,
+        // - remote nodes (if DC failover is enabled).
         let plan = maybe_replicas
-            .chain(maybe_local_rack_nodes)
+            .chain(robinned_local_rack_nodes)
             .chain(robinned_local_nodes)
             .chain(maybe_remote_nodes)
             .chain(maybe_down_local_nodes)
@@ -421,6 +543,8 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
                 shard: *shard,
             });
 
+        // If latency awareness is enabled, wrap the plan by applying latency penalisation:
+        // all penalised nodes are moved behind non-penalised nodes, in a stable fashion.
         if let Some(latency_awareness) = self.latency_awareness.as_ref() {
             Box::new(latency_awareness.wrap(plan))
         } else {
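// A sketch of how the equality described above can be implemented (the actual impls are outside
// the hunks shown in this patch). Note that `Hash` may only use `host_id`: two comparators that
// differ only in `shard` can still compare equal, so the shard must not influence the hash.
impl PartialEq for DefaultPolicyTargetComparator {
    fn eq(&self, other: &Self) -> bool {
        match (self.shard, other.shard) {
            // At least one element is shard-less: compare just the nodes.
            (_, None) | (None, _) => self.host_id == other.host_id,
            // Both elements have shards: compare both nodes and shards.
            (Some(shard_left), Some(shard_right)) => {
                self.host_id == other.host_id && shard_left == shard_right
            }
        }
    }
}

impl Eq for DefaultPolicyTargetComparator {}

impl std::hash::Hash for DefaultPolicyTargetComparator {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.host_id.hash(state);
    }
}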
@@ -459,6 +583,7 @@ impl DefaultPolicy {
         DefaultPolicyBuilder::new()
     }
 
+    /// Returns the given routing info processed based on the given cluster data.
     fn routing_info<'a>(
         &'a self,
         query: &'a RoutingInfo,
@@ -473,6 +598,8 @@ impl DefaultPolicy {
         routing_info
     }
 
+    /// Returns all nodes in the local datacenter if one is given,
+    /// or else all nodes in the cluster.
     fn preferred_node_set<'a>(&'a self, cluster: &'a ClusterData) -> &'a [Arc<Node>] {
         if let Some(preferred_datacenter) = self.preferences.datacenter() {
             if let Some(nodes) = cluster
@@ -493,6 +620,8 @@ impl DefaultPolicy {
         }
     }
 
+    /// Returns the full replica set for the given datacenter (if one is given, else for all DCs),
+    /// based on the given cluster data and table spec.
     fn nonfiltered_replica_set<'a>(
         &'a self,
         ts: &TokenWithStrategy<'a>,
@@ -535,7 +664,12 @@ impl DefaultPolicy {
         }
     }
 
-    fn replicas<'a>(
+    /// Returns an iterator over replicas for the given token and table spec, filtered
+    /// by the provided location criteria and predicate.
+    /// Respects the requested replica order: if a deterministic order is requested,
+    /// replicas are returned in the token ring order (or the tablet definition order),
+    /// otherwise in arbitrary order.
+    fn filtered_replicas<'a>(
         &'a self,
         ts: &TokenWithStrategy<'a>,
         replica_location: NodeLocationCriteria<'a>,
@@ -551,7 +685,7 @@ impl DefaultPolicy {
                 self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
                     .into_iter(),
             ),
-            ReplicaOrder::RingOrder => Either::Right(
+            ReplicaOrder::Deterministic => Either::Right(
                 self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
                     .into_replicas_ordered()
                     .into_iter(),
@@ -560,6 +694,11 @@ impl DefaultPolicy {
         replica_iter.filter(move |(node, shard): &(NodeRef<'a>, Shard)| predicate(node, *shard))
     }
 
+    /// Picks a replica for the given token and table spec which meets the provided location
+    /// criteria and the predicate.
+    /// The replica is chosen randomly among all candidates that meet the criteria,
+    /// unless the statement is an LWT; in that case, the first replica meeting the criteria
+    /// is chosen, to avoid Paxos contention.
     fn pick_replica<'a>(
         &'a self,
         ts: &TokenWithStrategy<'a>,
@@ -579,11 +718,17 @@ impl DefaultPolicy {
         }
     }
 
+    /// Picks the first (wrt the deterministic order imposed on the keyspace, see the comment below)
+    /// replica for the given token and table spec which meets the provided location criteria
+    /// and the predicate.
     // This is to be used for LWT optimisation: in order to reduce contention
-    // caused by Paxos conflicts, we always try to query replicas in the same, ring order.
+    // caused by Paxos conflicts, we always try to query replicas in the same,
+    // deterministic order:
+    // - ring order for token ring keyspaces,
+    // - tablet definition order for tablet keyspaces.
     //
     // If preferred rack and DC are set, then the first (encountered on the ring) replica
-    // that resides in that rack in that DC **and** satisfies the `predicate` is returned.
+    // that resides in that rack in that DC **and** satisfies the `predicate` is returned.
     //
     // If preferred DC is set, then the first (encountered on the ring) replica
     // that resides in that DC **and** satisfies the `predicate` is returned.
@@ -630,12 +775,12 @@ impl DefaultPolicy {
                 // 2) returns replicas in the ring order (this is not true for the case
                 //    when multiple DCs are allowed, because ReplicaSet chains replicas sequences
                 //    from different DCs, thus not preserving the global ring order)
-                self.replicas(
+                self.filtered_replicas(
                     ts,
                     replica_location,
                     predicate,
                     cluster,
-                    ReplicaOrder::RingOrder,
+                    ReplicaOrder::Deterministic,
                     table_spec,
                 )
                 .next()
@@ -644,6 +789,8 @@ impl DefaultPolicy {
         }
     }
 
+    /// Picks a random replica for the given token and table spec which meets the provided
+    /// location criteria and the predicate.
     fn pick_random_replica<'a>(
         &'a self,
         ts: &TokenWithStrategy<'a>,
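// A condensed sketch of the dispatch that `pick_replica` performs, per its doc comment above.
// The `pick_first_replica` helper name is an assumption made for this sketch;
// `pick_random_replica` is the function documented right above.
match statement_type {
    // LWT: take the first replica in the deterministic order, to reduce Paxos conflicts.
    StatementType::Lwt => {
        self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec)
    }
    // Non-LWT: pick a random alive replica to spread the load.
    StatementType::NonLwt => {
        self.pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
    }
}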
@@ -664,7 +811,11 @@ impl DefaultPolicy {
         }
     }
 
-    fn fallback_replicas<'a>(
+    /// Returns an iterator over replicas for the given token and table spec, filtered
+    /// by the provided location criteria and predicate.
+    /// By default, the replicas are shuffled.
+    /// For LWTs, though, the replicas are instead returned in a deterministic order.
+    fn maybe_shuffled_replicas<'a>(
         &'a self,
         ts: &TokenWithStrategy<'a>,
         replica_location: NodeLocationCriteria<'a>,
@@ -674,20 +825,22 @@ impl DefaultPolicy {
         table_spec: &TableSpec,
     ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
         let order = match statement_type {
-            StatementType::Lwt => ReplicaOrder::RingOrder,
+            StatementType::Lwt => ReplicaOrder::Deterministic,
             StatementType::NonLwt => ReplicaOrder::Arbitrary,
         };
 
-        let replicas = self.replicas(ts, replica_location, predicate, cluster, order, table_spec);
+        let replicas =
+            self.filtered_replicas(ts, replica_location, predicate, cluster, order, table_spec);
 
         match statement_type {
             // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts,
-            // we always try to query replicas in the same order.
+            // we always try to query replicas in the same order.
             StatementType::Lwt => Either::Left(replicas),
             StatementType::NonLwt => Either::Right(self.shuffle(replicas)),
         }
     }
 
+    /// Returns an iterator over the given slice of nodes, rotated by a random shift.
     fn randomly_rotated_nodes(nodes: &[Arc<Node>]) -> impl Iterator<Item = NodeRef<'_>> {
         // Create a randomly rotated slice view
         let nodes_len = nodes.len();
@@ -704,6 +857,7 @@ impl DefaultPolicy {
         }
     }
 
+    /// Picks a random node from the slice of nodes. The node must satisfy the given predicate.
     fn pick_node<'a>(
         &'a self,
         nodes: &'a [Arc<Node>],
@@ -713,6 +867,8 @@ impl DefaultPolicy {
         Self::randomly_rotated_nodes(nodes).find(|&node| predicate(node))
     }
 
+    /// Returns an iterator over the given slice of nodes, rotated by a random shift
+    /// and filtered by the given predicate.
     fn round_robin_nodes<'a>(
         &'a self,
         nodes: &'a [Arc<Node>],
@@ -721,6 +877,7 @@ impl DefaultPolicy {
         Self::randomly_rotated_nodes(nodes).filter(move |node| predicate(node))
     }
 
+    /// Wraps a given iterator by shuffling its contents.
     fn shuffle<'a>(
         &self,
         iter: impl Iterator<Item = (NodeRef<'a>, Shard)>,
@@ -737,12 +894,14 @@ impl DefaultPolicy {
         vec.into_iter()
     }
 
+    /// Returns true iff the node should be considered to be alive.
     fn is_alive(node: NodeRef<'_>, _shard: Option<Shard>) -> bool {
         // For now, we leave this as stub, until we have time to improve node events.
         // node.is_enabled() && !node.is_down()
         node.is_enabled()
     }
 
+    /// Returns true iff datacenter failover is permitted for the statement being executed.
     fn is_datacenter_failover_possible(&self, routing_info: &ProcessedRoutingInfo) -> bool {
         self.preferences.datacenter().is_some()
             && self.permit_dc_failover
@@ -2464,13 +2623,13 @@ mod latency_awareness {
             Err(e) => {
                 warn!(
                     "Error while calculating average: {e}. \
-                    prev_avg_secs: {prev_avg_secs}, \
-                    last_latency_secs: {last_latency_secs}, \
-                    prev_weight: {prev_weight}, \
-                    scaled_delay: {scaled_delay}, \
-                    delay: {delay}, \
-                    prev_avg.timestamp: {:?}, \
-                    now: {now:?}",
+                     prev_avg_secs: {prev_avg_secs}, \
+                     last_latency_secs: {last_latency_secs}, \
+                     prev_weight: {prev_weight}, \
+                     scaled_delay: {scaled_delay}, \
+                     delay: {delay}, \
+                     prev_avg.timestamp: {:?}, \
+                     now: {now:?}",
                     prev_avg.timestamp
                 );
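// How the `fixed_seed`-controlled shuffling described earlier can be realised. This is a sketch
// assuming the `rand` and `rand_pcg` crates, not the verbatim body of `DefaultPolicy::shuffle`.
use rand::{seq::SliceRandom, thread_rng};
use rand_pcg::Pcg32;

fn shuffle_targets<T>(targets: &mut Vec<T>, fixed_seed: Option<u64>) {
    if let Some(seed) = fixed_seed {
        // Deterministic shuffle for tests: the same seed yields the same order on every run.
        let mut rng = Pcg32::new(seed, 0);
        targets.shuffle(&mut rng);
    } else {
        // Production path: a fresh, thread-local RNG.
        targets.shuffle(&mut thread_rng());
    }
}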
diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs
index e0f06e8ba2..2ae46856d1 100644
--- a/scylla/src/transport/locator/mod.rs
+++ b/scylla/src/transport/locator/mod.rs
@@ -25,12 +25,12 @@ use std::{
 };
 use tracing::debug;
 
-/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
-/// strategy) pair. It does so by either using the precomputed token ranges, or doing the
-/// computation on the fly.
+/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token,
+/// replication strategy, table) tuple. It does so by either using the precomputed
+/// token ranges, or doing the computation on the fly (precomputation is configurable).
 #[derive(Debug, Clone)]
 pub struct ReplicaLocator {
-    /// the data based on which `ReplicaLocator` computes replica sets.
+    /// The data based on which `ReplicaLocator` computes replica sets.
     replication_data: ReplicationInfo,
 
     precomputed_replicas: PrecomputedReplicas,
@@ -69,7 +69,7 @@ impl ReplicaLocator {
         }
     }
 
-    /// Returns a set of nodes that are considered to be replicas for a given token and strategy.
+    /// Returns a set of nodes that are considered to be replicas for a given token, strategy and table.
     /// If the `datacenter` parameter is set, the returned `ReplicaSet` is limited only to replicas
     /// from that datacenter. If a specified datacenter name does not correspond to a valid
     /// datacenter, an empty set will be returned.
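// A hedged usage sketch of the API documented above. The exact parameter list of
// `replicas_for_token` is an assumption inferred from its call sites in `default.rs`
// and should be verified against the actual signature.
let replica_set = cluster
    .replica_locator()
    .replicas_for_token(token, strategy, Some("dc1"), table_spec);

// With `Some("dc1")` passed as the datacenter, only replicas from "dc1" are returned;
// an unknown datacenter name yields an empty set.
for (node, shard) in replica_set {
    println!("replica: {} / shard {}", node.host_id, shard);
}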