diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml index f7f7edf434..c832a62303 100644 --- a/scylla/Cargo.toml +++ b/scylla/Cargo.toml @@ -24,6 +24,7 @@ num-bigint-03 = ["scylla-cql/num-bigint-03"] num-bigint-04 = ["scylla-cql/num-bigint-04"] bigdecimal-04 = ["scylla-cql/bigdecimal-04"] full-serialization = ["chrono", "time", "secret", "num-bigint-03", "num-bigint-04", "bigdecimal-04"] +unstable-tablets = ["dep:lazy_static"] [dependencies] scylla-macros = { version = "0.4.0", path = "../scylla-macros" } @@ -53,7 +54,7 @@ url = { version = "2.3.1", optional = true } base64 = { version = "0.21.1", optional = true } rand_pcg = "0.3.1" socket2 = { version = "0.5.3", features = ["all"] } -lazy_static = "1" +lazy_static = { version = "1", optional = true } [dev-dependencies] num-bigint-03 = { package = "num-bigint", version = "0.3" } diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 7192cd7879..e81206fbc4 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -27,6 +27,7 @@ use tracing::instrument::WithSubscriber; use tracing::{debug, warn}; use uuid::Uuid; +#[cfg(feature = "unstable-tablets")] use super::locator::tablets::{RawTablet, Tablet, TabletsInfo}; use super::node::{KnownNode, NodeAddr}; use super::NodeRef; @@ -119,6 +120,7 @@ struct ClusterWorker { // Channel used to receive info about new tablets from custom payload in responses // sent by server. + #[cfg(feature = "unstable-tablets")] tablets_channel: tokio::sync::mpsc::Receiver<(TableSpec, RawTablet)>, // Keyspace send in "USE " when opening each connection @@ -152,7 +154,10 @@ impl Cluster { fetch_schema_metadata: bool, host_filter: Option>, cluster_metadata_refresh_interval: Duration, - tablet_receiver: tokio::sync::mpsc::Receiver<(TableSpec, RawTablet)>, + #[cfg(feature = "unstable-tablets")] tablet_receiver: tokio::sync::mpsc::Receiver<( + TableSpec, + RawTablet, + )>, ) -> Result { let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32); let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32); @@ -179,6 +184,7 @@ impl Cluster { &HashMap::new(), &None, host_filter.as_deref(), + #[cfg(feature = "unstable-tablets")] TabletsInfo::new(), ) .await; @@ -195,6 +201,7 @@ impl Cluster { refresh_channel: refresh_receiver, server_events_channel: server_events_receiver, control_connection_repair_channel: control_connection_repair_receiver, + #[cfg(feature = "unstable-tablets")] tablets_channel: tablet_receiver, use_keyspace_channel: use_keyspace_receiver, @@ -284,7 +291,7 @@ impl ClusterData { known_peers: &HashMap>, used_keyspace: &Option, host_filter: Option<&dyn HostFilter>, - tablets: TabletsInfo, + #[cfg(feature = "unstable-tablets")] tablets: TabletsInfo, ) -> Self { // Create new updated known_peers and ring let mut new_known_peers: HashMap> = @@ -352,7 +359,12 @@ impl ClusterData { let keyspaces = metadata.keyspaces; let (locator, keyspaces) = tokio::task::spawn_blocking(move || { let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy); - let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets); + let locator = ReplicaLocator::new( + ring.into_iter(), + keyspace_strategies, + #[cfg(feature = "unstable-tablets")] + tablets, + ); (locator, keyspaces) }) .await @@ -494,6 +506,7 @@ impl ClusterData { // is nonempty, too. } + #[cfg(feature = "unstable-tablets")] fn update_tablets(&mut self, raw_tablets: Vec<(TableSpec, RawTablet)>) { let replica_translator = |uuid: Uuid| self.known_peers.get(&uuid).map(Arc::clone); @@ -524,9 +537,19 @@ impl ClusterWorker { }) .unwrap_or_else(Instant::now); + let sleep_future = tokio::time::sleep_until(sleep_until); + + #[cfg(feature = "unstable-tablets")] let mut tablets = Vec::new(); + #[cfg(feature = "unstable-tablets")] + let channel_ref = &mut self.tablets_channel; + #[cfg(feature = "unstable-tablets")] + let tablets_ref = &mut tablets; + #[cfg(feature = "unstable-tablets")] + let tablets_poll = { || async move { channel_ref.recv_many(tablets_ref, 8192).await } }; + #[cfg(not(feature = "unstable-tablets"))] + let tablets_poll = std::future::pending; - let sleep_future = tokio::time::sleep_until(sleep_until); tokio::pin!(sleep_future); tokio::select! { @@ -537,16 +560,26 @@ impl ClusterWorker { None => return, // If refresh_channel was closed then cluster was dropped, we can stop working } } - tablets_count = self.tablets_channel.recv_many(&mut tablets, 8192) => { - if tablets_count == 0 { - // If the channel was closed then cluster was dropped, we can stop working - return; + tablets_count = tablets_poll() => { + #[cfg(feature = "unstable-tablets")] + { + if tablets_count == 0 { + // If the channel was closed then cluste1r was dropped, we can stop working + return; + } + let mut new_cluster_data = self.cluster_data.load_full().as_ref().clone(); + new_cluster_data.update_tablets(tablets); + self.update_cluster_data(Arc::new(new_cluster_data)); + + continue; } - let mut new_cluster_data = self.cluster_data.load_full().as_ref().clone(); - new_cluster_data.update_tablets(tablets); - self.update_cluster_data(Arc::new(new_cluster_data)); - continue; + #[cfg(not(feature = "unstable-tablets"))] + { + #[allow(clippy::let_unit_value)] + let _tablets_count = tablets_count; + continue; + } } recv_res = self.server_events_channel.recv() => { if let Some(event) = recv_res { @@ -703,6 +736,7 @@ impl ClusterWorker { &cluster_data.known_peers, &self.used_keyspace, self.host_filter.as_deref(), + #[cfg(feature = "unstable-tablets")] cluster_data.locator.tablets.clone(), ) .await, diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 94b101b631..91c96735dc 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -2,7 +2,9 @@ use bytes::Bytes; use futures::{future::RemoteHandle, FutureExt}; use scylla_cql::errors::TranslationError; use scylla_cql::frame::request::options::Options; -use scylla_cql::frame::response::result::{ResultMetadata, TableSpec}; +use scylla_cql::frame::response::result::ResultMetadata; +#[cfg(feature = "unstable-tablets")] +use scylla_cql::frame::response::result::TableSpec; use scylla_cql::frame::response::Error; use scylla_cql::frame::types::SerialConsistency; use scylla_cql::types::serialize::batch::{BatchValues, BatchValuesIterator}; @@ -43,6 +45,7 @@ use std::{ use super::errors::{BadKeyspaceName, DbError, QueryError}; use super::iterator::RowIterator; +#[cfg(feature = "unstable-tablets")] use super::locator::tablets::{RawTablet, TabletParsingError}; use super::session::AddressTranslator; use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer}; @@ -371,6 +374,7 @@ pub struct ConnectionConfig { pub keepalive_interval: Option, pub keepalive_timeout: Option, + #[cfg(feature = "unstable-tablets")] pub tablet_sender: Option>, } @@ -395,6 +399,7 @@ impl Default for ConnectionConfig { keepalive_interval: None, keepalive_timeout: None, + #[cfg(feature = "unstable-tablets")] tablet_sender: None, } } @@ -728,6 +733,7 @@ impl Connection { ) .await?; + #[cfg(feature = "unstable-tablets")] if let Some(spec) = prepared_statement.get_table_spec() { if let Err(e) = self .update_tablets_from_response(spec, &query_response) @@ -755,6 +761,7 @@ impl Connection { ) .await?; + #[cfg(feature = "unstable-tablets")] if let Some(spec) = prepared_statement.get_table_spec() { if let Err(e) = self.update_tablets_from_response(spec, &new_response).await { tracing::warn!( @@ -1438,6 +1445,7 @@ impl Connection { self.connect_address } + #[cfg(feature = "unstable-tablets")] async fn update_tablets_from_response( &self, table: &TableSpec, diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index b8c5ec738b..03db7f5d66 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -962,15 +962,15 @@ mod tests { load_balancing::{LoadBalancingPolicy, Plan, RoutingInfo}, routing::Token, transport::{ - locator::{ - tablets::TabletsInfo, - test::{id_to_invalid_addr, mock_metadata_for_token_aware_tests}, - }, + locator::test::{id_to_invalid_addr, mock_metadata_for_token_aware_tests}, topology::{Metadata, Peer}, ClusterData, }, }; + #[cfg(feature = "unstable-tablets")] + use crate::transport::locator::tablets::TabletsInfo; + enum ExpectedGroup { NonDeterministic(HashSet), Deterministic(HashSet), @@ -1166,6 +1166,7 @@ mod tests { &HashMap::new(), &None, None, + #[cfg(feature = "unstable-tablets")] TabletsInfo::new(), ) .await @@ -1198,6 +1199,7 @@ mod tests { &HashMap::new(), &None, None, + #[cfg(feature = "unstable-tablets")] TabletsInfo::new(), ) .await diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index 63afcc6374..d48040e7fc 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -1,6 +1,7 @@ mod precomputed_replicas; mod replicas; mod replication_info; +#[cfg(feature = "unstable-tablets")] pub(crate) mod tablets; #[cfg(test)] pub(crate) mod test; @@ -10,6 +11,7 @@ use rand::{seq::IteratorRandom, Rng}; use scylla_cql::frame::response::result::TableSpec; pub use token_ring::TokenRing; +#[cfg(feature = "unstable-tablets")] use self::tablets::TabletsInfo; use super::{topology::Strategy, Node, NodeRef}; @@ -37,6 +39,7 @@ pub struct ReplicaLocator { datacenters: Vec, + #[cfg(feature = "unstable-tablets")] pub(crate) tablets: TabletsInfo, } @@ -47,7 +50,7 @@ impl ReplicaLocator { pub(crate) fn new<'a>( ring_iter: impl Iterator)>, precompute_replica_sets_for: impl Iterator, - tablets: TabletsInfo, + #[cfg(feature = "unstable-tablets")] tablets: TabletsInfo, ) -> Self { let replication_data = ReplicationInfo::new(ring_iter); let precomputed_replicas = @@ -65,6 +68,7 @@ impl ReplicaLocator { replication_data, precomputed_replicas, datacenters, + #[cfg(feature = "unstable-tablets")] tablets, } } @@ -89,6 +93,7 @@ impl ReplicaLocator { datacenter: Option<&'a str>, table: &TableSpec, ) -> ReplicaSet<'a> { + #[cfg(feature = "unstable-tablets")] if let Some(tablets) = self.tablets.tablets_for_table(table) { let replicas: Option<&[(Arc, u32)]> = if let Some(datacenter) = datacenter { tablets.dc_replicas_for_token(token, datacenter) @@ -102,6 +107,9 @@ impl ReplicaLocator { }; } } + #[cfg(not(feature = "unstable-tablets"))] + let _table = table; + match strategy { Strategy::SimpleStrategy { replication_factor } => { if let Some(datacenter) = datacenter { @@ -257,6 +265,9 @@ fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) { enum ReplicaSetInner<'a> { Plain(ReplicasArray<'a>), + // Unused when tablets disabled + // TODO: Remove when tablets no longer guarded by feature gate + #[allow(dead_code)] PlainSharded(&'a [(Arc, Shard)]), // Represents a set of SimpleStrategy replicas that is limited to a specified datacenter. diff --git a/scylla/src/transport/locator/test.rs b/scylla/src/transport/locator/test.rs index 0fd3af7ba0..dc7ffcf18f 100644 --- a/scylla/src/transport/locator/test.rs +++ b/scylla/src/transport/locator/test.rs @@ -3,6 +3,7 @@ use rand_chacha::ChaCha8Rng; use scylla_cql::frame::response::result::TableSpec; use uuid::Uuid; +#[cfg(feature = "unstable-tablets")] use super::tablets::TabletsInfo; use super::{ReplicaLocator, ReplicaSet}; use crate::routing::Token; @@ -204,7 +205,12 @@ pub(crate) fn create_locator(metadata: &Metadata) -> ReplicaLocator { let ring = create_ring(metadata); let strategies = metadata.keyspaces.values().map(|ks| &ks.strategy); - ReplicaLocator::new(ring, strategies, TabletsInfo::new()) + ReplicaLocator::new( + ring, + strategies, + #[cfg(feature = "unstable-tablets")] + TabletsInfo::new(), + ) } #[tokio::test] diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 288fb79ac0..7baaf3a559 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -493,6 +493,7 @@ impl Session { return Err(NewSessionError::EmptyKnownNodesList); } + #[cfg(feature = "unstable-tablets")] let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(8192); let connection_config = ConnectionConfig { @@ -511,6 +512,7 @@ impl Session { enable_write_coalescing: config.enable_write_coalescing, keepalive_interval: config.keepalive_interval, keepalive_timeout: config.keepalive_timeout, + #[cfg(feature = "unstable-tablets")] tablet_sender: Some(tablet_sender), }; @@ -528,6 +530,7 @@ impl Session { config.fetch_schema_metadata, config.host_filter, config.cluster_metadata_refresh_interval, + #[cfg(feature = "unstable-tablets")] tablet_receiver, ) .await?;