diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index 57b0b8fda0..539b69820d 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -1,6 +1,7 @@ mod precomputed_replicas; mod replicas; mod replication_info; +pub(crate) mod tablets; #[cfg(test)] pub(crate) mod test; mod token_ring; @@ -9,6 +10,8 @@ use rand::{seq::IteratorRandom, Rng}; use scylla_cql::frame::response::result::TableSpec; pub use token_ring::TokenRing; +use self::tablets::TabletsInfo; + use super::{topology::Strategy, Node, NodeRef}; use crate::routing::{Shard, Token}; use itertools::Itertools; @@ -33,6 +36,8 @@ pub struct ReplicaLocator { precomputed_replicas: PrecomputedReplicas, datacenters: Vec, + + pub(crate) tablets: TabletsInfo, } impl ReplicaLocator { @@ -42,6 +47,7 @@ impl ReplicaLocator { pub(crate) fn new<'a>( ring_iter: impl Iterator)>, precompute_replica_sets_for: impl Iterator, + tablets: TabletsInfo, ) -> Self { let replication_data = ReplicationInfo::new(ring_iter); let precomputed_replicas = @@ -59,6 +65,7 @@ impl ReplicaLocator { replication_data, precomputed_replicas, datacenters, + tablets, } } @@ -80,8 +87,21 @@ impl ReplicaLocator { token: Token, strategy: &'a Strategy, datacenter: Option<&'a str>, - _table: &TableSpec, + table: &TableSpec, ) -> ReplicaSet<'a> { + if let Some(tablets) = self.tablets.tablets_for_table(table) { + let replicas: Option<&[(Arc, u32)]> = if let Some(datacenter) = datacenter { + tablets.dc_replicas_for_token(token, datacenter) + } else { + tablets.replicas_for_token(token) + }; + if let Some(replicas) = replicas { + return ReplicaSet { + inner: ReplicaSetInner::PlainSharded(replicas), + token, + }; + } + } match strategy { Strategy::SimpleStrategy { replication_factor } => { if let Some(datacenter) = datacenter { @@ -145,7 +165,7 @@ impl ReplicaLocator { replication_factor: 1, }, datacenter, - _table, + table, ) } @@ -237,6 +257,8 @@ fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) { enum ReplicaSetInner<'a> { Plain(ReplicasArray<'a>), + PlainSharded(&'a [(Arc, Shard)]), + // Represents a set of SimpleStrategy replicas that is limited to a specified datacenter. FilteredSimple { replicas: ReplicasArray<'a>, @@ -294,6 +316,7 @@ impl<'a> ReplicaSet<'a> { pub fn len(&self) -> usize { match &self.inner { ReplicaSetInner::Plain(replicas) => replicas.len(), + ReplicaSetInner::PlainSharded(replicas) => replicas.len(), ReplicaSetInner::FilteredSimple { replicas, datacenter, @@ -338,6 +361,9 @@ impl<'a> ReplicaSet<'a> { ReplicaSetInner::Plain(replicas) => replicas .get(index) .map(|node| with_computed_shard(node, self.token)), + ReplicaSetInner::PlainSharded(replicas) => { + replicas.get(index).map(|(node, shard)| (node, *shard)) + } ReplicaSetInner::FilteredSimple { replicas, datacenter, @@ -393,6 +419,9 @@ impl<'a> IntoIterator for ReplicaSet<'a> { fn into_iter(self) -> Self::IntoIter { let inner = match self.inner { ReplicaSetInner::Plain(replicas) => ReplicaSetIteratorInner::Plain { replicas, idx: 0 }, + ReplicaSetInner::PlainSharded(replicas) => { + ReplicaSetIteratorInner::PlainSharded { replicas, idx: 0 } + } ReplicaSetInner::FilteredSimple { replicas, datacenter, @@ -439,6 +468,10 @@ enum ReplicaSetIteratorInner<'a> { replicas: ReplicasArray<'a>, idx: usize, }, + PlainSharded { + replicas: &'a [(Arc, Shard)], + idx: usize, + }, FilteredSimple { replicas: ReplicasArray<'a>, datacenter: &'a str, @@ -474,6 +507,14 @@ impl<'a> Iterator for ReplicaSetIterator<'a> { None } + ReplicaSetIteratorInner::PlainSharded { replicas, idx } => { + if let Some((replica, shard)) = replicas.get(*idx) { + *idx += 1; + return Some((replica, *shard)); + } + + None + } ReplicaSetIteratorInner::FilteredSimple { replicas, datacenter, @@ -523,6 +564,11 @@ impl<'a> Iterator for ReplicaSetIterator<'a> { (size, Some(size)) } + ReplicaSetIteratorInner::PlainSharded { replicas, idx } => { + let size = replicas.len() - *idx; + + (size, Some(size)) + } ReplicaSetIteratorInner::FilteredSimple { replicas, datacenter: _, @@ -551,7 +597,8 @@ impl<'a> Iterator for ReplicaSetIterator<'a> { fn nth(&mut self, n: usize) -> Option { match &mut self.inner { - ReplicaSetIteratorInner::Plain { replicas: _, idx } => { + ReplicaSetIteratorInner::Plain { replicas: _, idx } + | ReplicaSetIteratorInner::PlainSharded { replicas: _, idx } => { *idx += n; self.next() @@ -742,6 +789,9 @@ impl<'a> IntoIterator for ReplicasOrdered<'a> { replica_set_iter: replica_set.into_iter(), } } + ReplicaSetInner::PlainSharded(_) => { + todo!() + } ReplicaSetInner::ChainedNTS { datacenter_repfactors, locator,