Skip to content

Commit

Permalink
Move tablets behind unstable feature gate
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorak-mmk committed Mar 13, 2024
1 parent a32fc31 commit 1021299
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 20 deletions.
3 changes: 2 additions & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ num-bigint-03 = ["scylla-cql/num-bigint-03"]
num-bigint-04 = ["scylla-cql/num-bigint-04"]
bigdecimal-04 = ["scylla-cql/bigdecimal-04"]
full-serialization = ["chrono", "time", "secret", "num-bigint-03", "num-bigint-04", "bigdecimal-04"]
unstable-tablets = ["dep:lazy_static"]

[dependencies]
scylla-macros = { version = "0.4.0", path = "../scylla-macros" }
Expand Down Expand Up @@ -53,7 +54,7 @@ url = { version = "2.3.1", optional = true }
base64 = { version = "0.21.1", optional = true }
rand_pcg = "0.3.1"
socket2 = { version = "0.5.3", features = ["all"] }
lazy_static = "1"
lazy_static = { version = "1", optional = true }

[dev-dependencies]
num-bigint-03 = { package = "num-bigint", version = "0.3" }
Expand Down
58 changes: 46 additions & 12 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

#[cfg(feature = "unstable-tablets")]
use super::locator::tablets::{RawTablet, Tablet, TabletsInfo};
use super::node::{KnownNode, NodeAddr};
use super::NodeRef;
Expand Down Expand Up @@ -119,6 +120,7 @@ struct ClusterWorker {

// Channel used to receive info about new tablets from custom payload in responses
// sent by server.
#[cfg(feature = "unstable-tablets")]
tablets_channel: tokio::sync::mpsc::Receiver<(TableSpec, RawTablet)>,

// Keyspace send in "USE <keyspace name>" when opening each connection
Expand Down Expand Up @@ -152,7 +154,10 @@ impl Cluster {
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
tablet_receiver: tokio::sync::mpsc::Receiver<(TableSpec, RawTablet)>,
#[cfg(feature = "unstable-tablets")] tablet_receiver: tokio::sync::mpsc::Receiver<(
TableSpec,
RawTablet,
)>,
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
Expand All @@ -179,6 +184,7 @@ impl Cluster {
&HashMap::new(),
&None,
host_filter.as_deref(),
#[cfg(feature = "unstable-tablets")]
TabletsInfo::new(),
)
.await;
Expand All @@ -195,6 +201,7 @@ impl Cluster {
refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
control_connection_repair_channel: control_connection_repair_receiver,
#[cfg(feature = "unstable-tablets")]
tablets_channel: tablet_receiver,

use_keyspace_channel: use_keyspace_receiver,
Expand Down Expand Up @@ -284,7 +291,7 @@ impl ClusterData {
known_peers: &HashMap<Uuid, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
tablets: TabletsInfo,
#[cfg(feature = "unstable-tablets")] tablets: TabletsInfo,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
Expand Down Expand Up @@ -352,7 +359,12 @@ impl ClusterData {
let keyspaces = metadata.keyspaces;
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets);
let locator = ReplicaLocator::new(
ring.into_iter(),
keyspace_strategies,
#[cfg(feature = "unstable-tablets")]
tablets,
);
(locator, keyspaces)
})
.await
Expand Down Expand Up @@ -494,6 +506,7 @@ impl ClusterData {
// is nonempty, too.
}

#[cfg(feature = "unstable-tablets")]
fn update_tablets(&mut self, raw_tablets: Vec<(TableSpec, RawTablet)>) {
let replica_translator = |uuid: Uuid| self.known_peers.get(&uuid).map(Arc::clone);

Expand Down Expand Up @@ -524,9 +537,19 @@ impl ClusterWorker {
})
.unwrap_or_else(Instant::now);

let sleep_future = tokio::time::sleep_until(sleep_until);

#[cfg(feature = "unstable-tablets")]
let mut tablets = Vec::new();
#[cfg(feature = "unstable-tablets")]
let channel_ref = &mut self.tablets_channel;
#[cfg(feature = "unstable-tablets")]
let tablets_ref = &mut tablets;
#[cfg(feature = "unstable-tablets")]
let tablets_poll = { || async move { channel_ref.recv_many(tablets_ref, 8192).await } };
#[cfg(not(feature = "unstable-tablets"))]
let tablets_poll = std::future::pending;

let sleep_future = tokio::time::sleep_until(sleep_until);
tokio::pin!(sleep_future);

tokio::select! {
Expand All @@ -537,16 +560,26 @@ impl ClusterWorker {
None => return, // If refresh_channel was closed then cluster was dropped, we can stop working
}
}
tablets_count = self.tablets_channel.recv_many(&mut tablets, 8192) => {
if tablets_count == 0 {
// If the channel was closed then cluster was dropped, we can stop working
return;
tablets_count = tablets_poll() => {
#[cfg(feature = "unstable-tablets")]
{
if tablets_count == 0 {
// If the channel was closed then cluste1r was dropped, we can stop working
return;
}
let mut new_cluster_data = self.cluster_data.load_full().as_ref().clone();
new_cluster_data.update_tablets(tablets);
self.update_cluster_data(Arc::new(new_cluster_data));

continue;
}
let mut new_cluster_data = self.cluster_data.load_full().as_ref().clone();
new_cluster_data.update_tablets(tablets);
self.update_cluster_data(Arc::new(new_cluster_data));

continue;
#[cfg(not(feature = "unstable-tablets"))]
{
#[allow(clippy::let_unit_value)]
let _tablets_count = tablets_count;
continue;
}
}
recv_res = self.server_events_channel.recv() => {
if let Some(event) = recv_res {
Expand Down Expand Up @@ -703,6 +736,7 @@ impl ClusterWorker {
&cluster_data.known_peers,
&self.used_keyspace,
self.host_filter.as_deref(),
#[cfg(feature = "unstable-tablets")]
cluster_data.locator.tablets.clone(),
)
.await,
Expand Down
10 changes: 9 additions & 1 deletion scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use bytes::Bytes;
use futures::{future::RemoteHandle, FutureExt};
use scylla_cql::errors::TranslationError;
use scylla_cql::frame::request::options::Options;
use scylla_cql::frame::response::result::{ResultMetadata, TableSpec};
use scylla_cql::frame::response::result::ResultMetadata;
#[cfg(feature = "unstable-tablets")]
use scylla_cql::frame::response::result::TableSpec;
use scylla_cql::frame::response::Error;
use scylla_cql::frame::types::SerialConsistency;
use scylla_cql::types::serialize::batch::{BatchValues, BatchValuesIterator};
Expand Down Expand Up @@ -43,6 +45,7 @@ use std::{

use super::errors::{BadKeyspaceName, DbError, QueryError};
use super::iterator::RowIterator;
#[cfg(feature = "unstable-tablets")]
use super::locator::tablets::{RawTablet, TabletParsingError};
use super::session::AddressTranslator;
use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer};
Expand Down Expand Up @@ -371,6 +374,7 @@ pub struct ConnectionConfig {
pub keepalive_interval: Option<Duration>,
pub keepalive_timeout: Option<Duration>,

#[cfg(feature = "unstable-tablets")]
pub tablet_sender: Option<mpsc::Sender<(TableSpec, RawTablet)>>,
}

Expand All @@ -395,6 +399,7 @@ impl Default for ConnectionConfig {
keepalive_interval: None,
keepalive_timeout: None,

#[cfg(feature = "unstable-tablets")]
tablet_sender: None,
}
}
Expand Down Expand Up @@ -728,6 +733,7 @@ impl Connection {
)
.await?;

#[cfg(feature = "unstable-tablets")]
if let Some(spec) = prepared_statement.get_table_spec() {
if let Err(e) = self
.update_tablets_from_response(spec, &query_response)
Expand Down Expand Up @@ -755,6 +761,7 @@ impl Connection {
)
.await?;

#[cfg(feature = "unstable-tablets")]
if let Some(spec) = prepared_statement.get_table_spec() {
if let Err(e) = self.update_tablets_from_response(spec, &new_response).await {
tracing::warn!(
Expand Down Expand Up @@ -1438,6 +1445,7 @@ impl Connection {
self.connect_address
}

#[cfg(feature = "unstable-tablets")]
async fn update_tablets_from_response(
&self,
table: &TableSpec,
Expand Down
10 changes: 6 additions & 4 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,15 +962,15 @@ mod tests {
load_balancing::{LoadBalancingPolicy, Plan, RoutingInfo},
routing::Token,
transport::{
locator::{
tablets::TabletsInfo,
test::{id_to_invalid_addr, mock_metadata_for_token_aware_tests},
},
locator::test::{id_to_invalid_addr, mock_metadata_for_token_aware_tests},
topology::{Metadata, Peer},
ClusterData,
},
};

#[cfg(feature = "unstable-tablets")]
use crate::transport::locator::tablets::TabletsInfo;

enum ExpectedGroup {
NonDeterministic(HashSet<u16>),
Deterministic(HashSet<u16>),
Expand Down Expand Up @@ -1166,6 +1166,7 @@ mod tests {
&HashMap::new(),
&None,
None,
#[cfg(feature = "unstable-tablets")]
TabletsInfo::new(),
)
.await
Expand Down Expand Up @@ -1198,6 +1199,7 @@ mod tests {
&HashMap::new(),
&None,
None,
#[cfg(feature = "unstable-tablets")]
TabletsInfo::new(),
)
.await
Expand Down
13 changes: 12 additions & 1 deletion scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod precomputed_replicas;
mod replicas;
mod replication_info;
#[cfg(feature = "unstable-tablets")]
pub(crate) mod tablets;
#[cfg(test)]
pub(crate) mod test;
Expand All @@ -10,6 +11,7 @@ use rand::{seq::IteratorRandom, Rng};
use scylla_cql::frame::response::result::TableSpec;
pub use token_ring::TokenRing;

#[cfg(feature = "unstable-tablets")]
use self::tablets::TabletsInfo;

use super::{topology::Strategy, Node, NodeRef};
Expand Down Expand Up @@ -37,6 +39,7 @@ pub struct ReplicaLocator {

datacenters: Vec<String>,

#[cfg(feature = "unstable-tablets")]
pub(crate) tablets: TabletsInfo,
}

Expand All @@ -47,7 +50,7 @@ impl ReplicaLocator {
pub(crate) fn new<'a>(
ring_iter: impl Iterator<Item = (Token, Arc<Node>)>,
precompute_replica_sets_for: impl Iterator<Item = &'a Strategy>,
tablets: TabletsInfo,
#[cfg(feature = "unstable-tablets")] tablets: TabletsInfo,
) -> Self {
let replication_data = ReplicationInfo::new(ring_iter);
let precomputed_replicas =
Expand All @@ -65,6 +68,7 @@ impl ReplicaLocator {
replication_data,
precomputed_replicas,
datacenters,
#[cfg(feature = "unstable-tablets")]
tablets,
}
}
Expand All @@ -89,6 +93,7 @@ impl ReplicaLocator {
datacenter: Option<&'a str>,
table: &TableSpec,
) -> ReplicaSet<'a> {
#[cfg(feature = "unstable-tablets")]
if let Some(tablets) = self.tablets.tablets_for_table(table) {
let replicas: Option<&[(Arc<Node>, u32)]> = if let Some(datacenter) = datacenter {
tablets.dc_replicas_for_token(token, datacenter)
Expand All @@ -102,6 +107,9 @@ impl ReplicaLocator {
};
}
}
#[cfg(not(feature = "unstable-tablets"))]
let _table = table;

match strategy {
Strategy::SimpleStrategy { replication_factor } => {
if let Some(datacenter) = datacenter {
Expand Down Expand Up @@ -257,6 +265,9 @@ fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) {
enum ReplicaSetInner<'a> {
Plain(ReplicasArray<'a>),

// Unused when tablets disabled
// TODO: Remove when tablets no longer guarded by feature gate
#[allow(dead_code)]
PlainSharded(&'a [(Arc<Node>, Shard)]),

// Represents a set of SimpleStrategy replicas that is limited to a specified datacenter.
Expand Down
8 changes: 7 additions & 1 deletion scylla/src/transport/locator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rand_chacha::ChaCha8Rng;
use scylla_cql::frame::response::result::TableSpec;
use uuid::Uuid;

#[cfg(feature = "unstable-tablets")]
use super::tablets::TabletsInfo;
use super::{ReplicaLocator, ReplicaSet};
use crate::routing::Token;
Expand Down Expand Up @@ -204,7 +205,12 @@ pub(crate) fn create_locator(metadata: &Metadata) -> ReplicaLocator {
let ring = create_ring(metadata);
let strategies = metadata.keyspaces.values().map(|ks| &ks.strategy);

ReplicaLocator::new(ring, strategies, TabletsInfo::new())
ReplicaLocator::new(
ring,
strategies,
#[cfg(feature = "unstable-tablets")]
TabletsInfo::new(),
)
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ impl Session {
return Err(NewSessionError::EmptyKnownNodesList);
}

#[cfg(feature = "unstable-tablets")]
let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(8192);

let connection_config = ConnectionConfig {
Expand All @@ -511,6 +512,7 @@ impl Session {
enable_write_coalescing: config.enable_write_coalescing,
keepalive_interval: config.keepalive_interval,
keepalive_timeout: config.keepalive_timeout,
#[cfg(feature = "unstable-tablets")]
tablet_sender: Some(tablet_sender),
};

Expand All @@ -528,6 +530,7 @@ impl Session {
config.fetch_schema_metadata,
config.host_filter,
config.cluster_metadata_refresh_interval,
#[cfg(feature = "unstable-tablets")]
tablet_receiver,
)
.await?;
Expand Down

0 comments on commit 1021299

Please sign in to comment.