diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 503d14519d..8128f141ac 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -22,13 +22,14 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::vec; use tracing::instrument::WithSubscriber; use tracing::{debug, warn}; use uuid::Uuid; use super::node::{KnownNode, NodeAddr}; -use super::locator::ReplicaLocator; +use super::locator::{ReplicaLocator, ReplicationConfigs}; use super::partitioner::calculate_token_for_partition_key; use super::topology::Strategy; @@ -68,6 +69,7 @@ pub struct ClusterData { pub(crate) known_peers: HashMap>, // Invariant: nonempty after Cluster::new() pub(crate) keyspaces: HashMap, pub(crate) locator: ReplicaLocator, + pub(crate) replication_configs: ReplicationConfigs, } /// Enables printing [ClusterData] struct in a neat way, skipping the clutter involved by @@ -145,6 +147,7 @@ impl Cluster { fetch_schema_metadata: bool, host_filter: Option>, cluster_metadata_refresh_interval: Duration, + replication_opts: Option<&ReplicationConfigs>, ) -> Result { let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32); let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32); @@ -171,6 +174,7 @@ impl Cluster { &HashMap::new(), &None, host_filter.as_deref(), + replication_opts, ) .await; cluster_data.wait_until_all_pools_are_initialized().await; @@ -274,6 +278,7 @@ impl ClusterData { known_peers: &HashMap>, used_keyspace: &Option, host_filter: Option<&dyn HostFilter>, + replication_opts: Option<&ReplicationConfigs>, ) -> Self { // Create new updated known_peers and ring let mut new_known_peers: HashMap> = @@ -339,8 +344,16 @@ impl ClusterData { Self::update_rack_count(&mut datacenters); let keyspaces = metadata.keyspaces; + // Exclude these keyspaces from replica set precomputation + let keyspaces_to_exclude = match replication_opts { + Some(opts) => opts.keyspaces_to_exclude().to_vec(), + None => vec![], + }; let (locator, keyspaces) = tokio::task::spawn_blocking(move || { - let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy); + let keyspace_strategies = keyspaces + .iter() + .filter(|(k, _)| !keyspaces_to_exclude.contains(k)) + .map(|(_, ks)| &ks.strategy); let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies); (locator, keyspaces) }) @@ -351,6 +364,10 @@ impl ClusterData { known_peers: new_known_peers, keyspaces, locator, + replication_configs: match replication_opts { + Some(opts) => opts.clone(), + None => ReplicationConfigs::default(), + }, } } @@ -662,6 +679,7 @@ impl ClusterWorker { &cluster_data.known_peers, &self.used_keyspace, self.host_filter.as_deref(), + Some(&cluster_data.replication_configs), ) .await, ); diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 3c2097c9dc..9365352fc4 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -1090,7 +1090,15 @@ mod tests { // based on locator mock cluster pub(crate) async fn mock_cluster_data_for_token_aware_tests() -> ClusterData { let metadata = mock_metadata_for_token_aware_tests(); - ClusterData::new(metadata, &Default::default(), &HashMap::new(), &None, None).await + ClusterData::new( + metadata, + &Default::default(), + &HashMap::new(), + &None, + None, + None, + ) + .await } // creates ClusterData with info about 5 nodes living in 2 different datacenters @@ -1114,7 +1122,15 @@ mod tests { keyspaces: HashMap::new(), }; - ClusterData::new(info, &Default::default(), &HashMap::new(), &None, None).await + ClusterData::new( + info, + &Default::default(), + &HashMap::new(), + &None, + None, + None, + ) + .await } pub(crate) fn get_plan_and_collect_node_identifiers( diff --git a/scylla/src/transport/load_balancing/plan.rs b/scylla/src/transport/load_balancing/plan.rs index e49d4cb012..582b441c8f 100644 --- a/scylla/src/transport/load_balancing/plan.rs +++ b/scylla/src/transport/load_balancing/plan.rs @@ -106,7 +106,10 @@ mod tests { use std::{net::SocketAddr, str::FromStr, sync::Arc}; use crate::transport::{ - locator::test::{create_locator, mock_metadata_for_token_aware_tests}, + locator::{ + test::{create_locator, mock_metadata_for_token_aware_tests}, + ReplicationConfigs, + }, Node, NodeAddr, }; @@ -156,6 +159,7 @@ mod tests { known_peers: Default::default(), keyspaces: Default::default(), locator, + replication_configs: ReplicationConfigs::default(), }; let routing_info = RoutingInfo::default(); let plan = Plan::new(&policy, &routing_info, &cluster_data); diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index db55b9fe69..538030ce7a 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -21,6 +21,23 @@ use std::{ }; use tracing::debug; +/// TODO(michael): Docs +#[derive(Debug, Default, Clone)] +pub struct ReplicationConfigs { + // Keyspaces to exclude from precomuted replica sets + keyspaces_to_exclude: Vec, +} + +impl ReplicationConfigs { + pub fn exclude_keyspace(&mut self, keyspace: String) { + self.keyspaces_to_exclude.push(keyspace) + } + + pub fn keyspaces_to_exclude(&self) -> &Vec { + &self.keyspaces_to_exclude + } +} + /// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication /// strategy) pair. It does so by either using the precomputed token ranges, or doing the /// computation on the fly. diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 35ff25475f..2a295ae6ad 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig; use crate::transport::host_filter::HostFilter; use crate::transport::iterator::{PreparedIteratorConfig, RowIterator}; use crate::transport::load_balancing::{self, RoutingInfo}; +use crate::transport::locator::ReplicationConfigs; use crate::transport::metrics::Metrics; use crate::transport::node::Node; use crate::transport::query_result::QueryResult; @@ -285,6 +286,9 @@ pub struct SessionConfig { /// for e.g: if they do not want unexpected traffic /// or they expect the topology to change frequently. pub cluster_metadata_refresh_interval: Duration, + + /// TODO(michael): Docs ... + pub replication_configs: ReplicationConfigs, } impl SessionConfig { @@ -331,6 +335,7 @@ impl SessionConfig { tracing_info_fetch_interval: Duration::from_millis(3), tracing_info_fetch_consistency: Consistency::One, cluster_metadata_refresh_interval: Duration::from_secs(60), + replication_configs: ReplicationConfigs::default(), } } @@ -524,6 +529,7 @@ impl Session { config.fetch_schema_metadata, config.host_filter, config.cluster_metadata_refresh_interval, + Some(&config.replication_configs), ) .await?;