Add ReplicationConfigs to SessionConfig for excluding keyspaces from replica precompute #831

Draft. Wants to merge 1 commit into base: main
22 changes: 20 additions & 2 deletions scylla/src/transport/cluster.rs
@@ -22,13 +22,14 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::vec;
use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

use super::node::{KnownNode, NodeAddr};

use super::locator::ReplicaLocator;
use super::locator::{ReplicaLocator, ReplicationConfigs};
use super::partitioner::calculate_token_for_partition_key;
use super::topology::Strategy;

@@ -68,6 +69,7 @@ pub struct ClusterData {
pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()
pub(crate) keyspaces: HashMap<String, Keyspace>,
pub(crate) locator: ReplicaLocator,
pub(crate) replication_configs: ReplicationConfigs,
}

/// Enables printing [ClusterData] struct in a neat way, skipping the clutter involved by
@@ -145,6 +147,7 @@ impl Cluster {
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
replication_opts: Option<&ReplicationConfigs>,
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
@@ -171,6 +174,7 @@ impl Cluster {
&HashMap::new(),
&None,
host_filter.as_deref(),
replication_opts,
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
@@ -274,6 +278,7 @@ impl ClusterData {
known_peers: &HashMap<Uuid, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
replication_opts: Option<&ReplicationConfigs>,
) -> Self {
Comment on lines 280 to 282
Collaborator

Let's not make it an Option; ReplicationConfig should always be passed to where ReplicaLocator is constructed.
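
A minimal sketch of what dropping the `Option` could look like. `ClusterDataSketch` and its methods are hypothetical stand-ins for `ClusterData::new` and the refresh path in `ClusterWorker`, and `ReplicationConfig` uses the rename proposed elsewhere in this review; none of this is the driver's actual API.

```rust
/// Stand-in for the PR's config type (assumed to keep `Default` and `Clone`).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ReplicationConfig {
    pub keyspaces_to_exclude: Vec<String>,
}

/// Hypothetical stand-in for `ClusterData`, reduced to the one relevant field.
#[derive(Debug)]
pub struct ClusterDataSketch {
    pub replication_config: ReplicationConfig,
}

impl ClusterDataSketch {
    /// Takes the config by reference instead of `Option<&ReplicationConfig>`,
    /// so there is no `match` on `Some`/`None` at the use site.
    pub fn new(replication_config: &ReplicationConfig) -> Self {
        ClusterDataSketch {
            replication_config: replication_config.clone(),
        }
    }

    /// Mirrors the metadata refresh: the stored config is passed on unchanged.
    pub fn refreshed(&self) -> Self {
        ClusterDataSketch::new(&self.replication_config)
    }
}
```

Callers with nothing to configure, such as the tests in this PR, would then pass `&ReplicationConfig::default()` rather than `None`.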

// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
@@ -339,8 +344,16 @@ impl ClusterData {
Self::update_rack_count(&mut datacenters);

let keyspaces = metadata.keyspaces;
// Exclude these keyspaces from replica set precomputation
let keyspaces_to_exclude = match replication_opts {
Some(opts) => opts.keyspaces_to_exclude().to_vec(),
None => vec![],
};
Comment on lines +347 to +351
Collaborator

Better not to allocate here. Let's just use the borrowed Vec there.

Suggested change
- // Exclude these keyspaces from replica set precomputation
- let keyspaces_to_exclude = match replication_opts {
-     Some(opts) => opts.keyspaces_to_exclude().to_vec(),
-     None => vec![],
- };
+ // Exclude these keyspaces from replica set precomputation
+ let keyspaces_to_exclude = replication_config.keyspaces_to_exclude();

Collaborator

Even better, let's allow for both Allowlist and Denylist variants:

Suggested change
- // Exclude these keyspaces from replica set precomputation
- let keyspaces_to_exclude = match replication_opts {
-     Some(opts) => opts.keyspaces_to_exclude().to_vec(),
-     None => vec![],
- };
+ let replicas_precomputation_config = replication_config.replicas_precomputation_config();

let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let keyspace_strategies = keyspaces
.iter()
.filter(|(k, _)| !keyspaces_to_exclude.contains(k))
.map(|(_, ks)| &ks.strategy);
Comment on lines +353 to +356
Collaborator

Suggested change
- let keyspace_strategies = keyspaces
-     .iter()
-     .filter(|(k, _)| !keyspaces_to_exclude.contains(k))
-     .map(|(_, ks)| &ks.strategy);
+ let keyspace_strategies = keyspaces
+     .iter()
+     .filter(|(k, _)| match replicas_precomputation_config {
+         Denylist { disallowed_keyspaces } => !disallowed_keyspaces.contains(k),
+         Allowlist { allowed_keyspaces } => allowed_keyspaces.contains(k),
+     })
+     .map(|(_, ks)| &ks.strategy);

let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies);
(locator, keyspaces)
})
@@ -351,6 +364,10 @@ impl ClusterData {
known_peers: new_known_peers,
keyspaces,
locator,
replication_configs: match replication_opts {
Some(opts) => opts.clone(),
None => ReplicationConfigs::default(),
},
}
}

@@ -662,6 +679,7 @@ impl ClusterWorker {
&cluster_data.known_peers,
&self.used_keyspace,
self.host_filter.as_deref(),
Some(&cluster_data.replication_configs),
)
.await,
);
20 changes: 18 additions & 2 deletions scylla/src/transport/load_balancing/default.rs
@@ -1090,7 +1090,15 @@ mod tests {
// based on locator mock cluster
pub(crate) async fn mock_cluster_data_for_token_aware_tests() -> ClusterData {
let metadata = mock_metadata_for_token_aware_tests();
ClusterData::new(metadata, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
metadata,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

// creates ClusterData with info about 5 nodes living in 2 different datacenters
@@ -1114,7 +1122,15 @@ mod tests {
keyspaces: HashMap::new(),
};

ClusterData::new(info, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
info,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

pub(crate) fn get_plan_and_collect_node_identifiers(
6 changes: 5 additions & 1 deletion scylla/src/transport/load_balancing/plan.rs
@@ -106,7 +106,10 @@ mod tests {
use std::{net::SocketAddr, str::FromStr, sync::Arc};

use crate::transport::{
locator::test::{create_locator, mock_metadata_for_token_aware_tests},
locator::{
test::{create_locator, mock_metadata_for_token_aware_tests},
ReplicationConfigs,
},
Node, NodeAddr,
};

@@ -156,6 +159,7 @@ mod tests {
known_peers: Default::default(),
keyspaces: Default::default(),
locator,
replication_configs: ReplicationConfigs::default(),
};
let routing_info = RoutingInfo::default();
let plan = Plan::new(&policy, &routing_info, &cluster_data);
17 changes: 17 additions & 0 deletions scylla/src/transport/locator/mod.rs
@@ -21,6 +21,23 @@ use std::{
};
use tracing::debug;

/// TODO(michael): Docs
#[derive(Debug, Default, Clone)]
pub struct ReplicationConfigs {
Collaborator

Suggested change
- pub struct ReplicationConfigs {
+ pub struct ReplicationConfig {

// Keyspaces to exclude from precomputed replica sets
keyspaces_to_exclude: Vec<String>,
}

impl ReplicationConfigs {
pub fn exclude_keyspace(&mut self, keyspace: String) {
Collaborator

Suggested change
- pub fn exclude_keyspace(&mut self, keyspace: String) {
+ pub fn exclude_keyspace(&mut self, keyspace: impl Into<String>) {
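
With `impl Into<String>`, the method body also has to convert the argument before pushing it. A small sketch of the setter after that change, using the `ReplicationConfig` name suggested in this review (an assumption, since the PR still calls the type `ReplicationConfigs`); returning `&[String]` from the getter is likewise an illustrative choice, not something the review asks for.

```rust
#[derive(Debug, Default, Clone)]
pub struct ReplicationConfig {
    // Keyspaces to exclude from precomputed replica sets
    keyspaces_to_exclude: Vec<String>,
}

impl ReplicationConfig {
    /// Accepts `&str`, `String`, or anything else convertible into `String`.
    pub fn exclude_keyspace(&mut self, keyspace: impl Into<String>) {
        // `.into()` is required now that the parameter is no longer a `String`.
        self.keyspaces_to_exclude.push(keyspace.into());
    }

    /// Borrowed view of the excluded keyspaces; no allocation on read.
    pub fn keyspaces_to_exclude(&self) -> &[String] {
        &self.keyspaces_to_exclude
    }
}
```

Callers can then write `config.exclude_keyspace("system")` without an explicit `.to_string()`.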

self.keyspaces_to_exclude.push(keyspace)
}

pub fn keyspaces_to_exclude(&self) -> &Vec<String> {
&self.keyspaces_to_exclude
}
}

Comment on lines +36 to +40
Collaborator

Let's create a ReplicasPrecomputationList enum:

enum ReplicasPrecomputationList {
    Allowlist { allowed_keyspaces: Vec<String> },
    Denylist { disallowed_keyspaces: Vec<String> },
}

and store it in ReplicationConfig as Option<ReplicasPrecomputationList>.
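
One way the enum and the `Option` could fit together, as a sketch under stated assumptions: `None` is taken to mean "precompute for every keyspace", and `should_precompute` and `strategies_to_precompute` are hypothetical helpers. The function is generic over the strategy type `S` so that the example does not depend on the driver's `Strategy` or `Keyspace` types.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub enum ReplicasPrecomputationList {
    Allowlist { allowed_keyspaces: Vec<String> },
    Denylist { disallowed_keyspaces: Vec<String> },
}

#[derive(Debug, Default, Clone)]
pub struct ReplicationConfig {
    /// Assumption: `None` means no filtering at all.
    pub replicas_precomputation_list: Option<ReplicasPrecomputationList>,
}

impl ReplicationConfig {
    /// Returns true if replica sets should be precomputed for `keyspace`.
    pub fn should_precompute(&self, keyspace: &str) -> bool {
        match &self.replicas_precomputation_list {
            None => true,
            Some(ReplicasPrecomputationList::Allowlist { allowed_keyspaces }) => {
                allowed_keyspaces.iter().any(|k| k == keyspace)
            }
            Some(ReplicasPrecomputationList::Denylist { disallowed_keyspaces }) => {
                !disallowed_keyspaces.iter().any(|k| k == keyspace)
            }
        }
    }
}

/// Yields only the strategies of keyspaces selected for precomputation.
pub fn strategies_to_precompute<'a, S: 'a>(
    keyspaces: &'a HashMap<String, S>,
    config: &'a ReplicationConfig,
) -> impl Iterator<Item = &'a S> + 'a {
    keyspaces
        .iter()
        .filter(move |(name, _)| config.should_precompute(name))
        .map(|(_, strategy)| strategy)
}
```

Because `tokio::task::spawn_blocking` requires a `'static` closure, a config that is only borrowed could not be moved into it as-is; it would likely need to be cloned or shared through an `Arc` first.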

/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
/// strategy) pair. It does so by either using the precomputed token ranges, or doing the
/// computation on the fly.
6 changes: 6 additions & 0 deletions scylla/src/transport/session.rs
@@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig;
use crate::transport::host_filter::HostFilter;
use crate::transport::iterator::{PreparedIteratorConfig, RowIterator};
use crate::transport::load_balancing::{self, RoutingInfo};
use crate::transport::locator::ReplicationConfigs;
use crate::transport::metrics::Metrics;
use crate::transport::node::Node;
use crate::transport::query_result::QueryResult;
@@ -285,6 +286,9 @@ pub struct SessionConfig {
/// for e.g: if they do not want unexpected traffic
/// or they expect the topology to change frequently.
pub cluster_metadata_refresh_interval: Duration,

/// TODO(michael): Docs ...
pub replication_configs: ReplicationConfigs,
Comment on lines +289 to +291
Collaborator

SessionBuilder needs the corresponding setters as well, as it's the intended way to construct a Session.
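
A rough sketch of such a setter, assuming the builder keeps wrapping a `SessionConfig` and uses by-value, chainable setters. All three types below are simplified stand-ins, and the setter name `replication_config` is hypothetical.

```rust
#[derive(Debug, Default, Clone)]
pub struct ReplicationConfig {
    keyspaces_to_exclude: Vec<String>,
}

impl ReplicationConfig {
    pub fn exclude_keyspace(&mut self, keyspace: impl Into<String>) {
        self.keyspaces_to_exclude.push(keyspace.into());
    }

    pub fn keyspaces_to_exclude(&self) -> &[String] {
        &self.keyspaces_to_exclude
    }
}

/// Stand-in for `SessionConfig`, reduced to the new field.
#[derive(Debug, Default, Clone)]
pub struct SessionConfig {
    pub replication_config: ReplicationConfig,
}

/// Stand-in for `SessionBuilder`.
#[derive(Debug, Default, Clone)]
pub struct SessionBuilder {
    pub config: SessionConfig,
}

impl SessionBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Hypothetical setter: stores the config and returns the builder for chaining.
    pub fn replication_config(mut self, replication_config: ReplicationConfig) -> Self {
        self.config.replication_config = replication_config;
        self
    }
}
```

Usage would then read `SessionBuilder::new().replication_config(config)`, followed by the builder's other setters.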

}

impl SessionConfig {
@@ -331,6 +335,7 @@ impl SessionConfig {
tracing_info_fetch_interval: Duration::from_millis(3),
tracing_info_fetch_consistency: Consistency::One,
cluster_metadata_refresh_interval: Duration::from_secs(60),
replication_configs: ReplicationConfigs::default(),
}
}

@@ -524,6 +529,7 @@ impl Session {
config.fetch_schema_metadata,
config.host_filter,
config.cluster_metadata_refresh_interval,
Some(&config.replication_configs),
)
.await?;
