Skip to content

Commit

Permalink
Use tablets in locator for relevant tables. This enables shard awaren…
Browse files Browse the repository at this point in the history
…ess for tablet tables
  • Loading branch information
Lorak-mmk committed Mar 5, 2024
1 parent 45c66b8 commit a18df86
Showing 1 changed file with 53 additions and 3 deletions.
56 changes: 53 additions & 3 deletions scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod precomputed_replicas;
mod replicas;
mod replication_info;
pub(crate) mod tablets;
#[cfg(test)]
pub(crate) mod test;
mod token_ring;
Expand All @@ -9,6 +10,8 @@ use rand::{seq::IteratorRandom, Rng};
use scylla_cql::frame::response::result::TableSpec;
pub use token_ring::TokenRing;

use self::tablets::TabletsInfo;

use super::{topology::Strategy, Node, NodeRef};
use crate::routing::{Shard, Token};
use itertools::Itertools;
Expand All @@ -33,6 +36,8 @@ pub struct ReplicaLocator {
precomputed_replicas: PrecomputedReplicas,

datacenters: Vec<String>,

pub(crate) tablets: TabletsInfo,
}

impl ReplicaLocator {
Expand All @@ -42,6 +47,7 @@ impl ReplicaLocator {
pub(crate) fn new<'a>(
ring_iter: impl Iterator<Item = (Token, Arc<Node>)>,
precompute_replica_sets_for: impl Iterator<Item = &'a Strategy>,
tablets: TabletsInfo,
) -> Self {
let replication_data = ReplicationInfo::new(ring_iter);
let precomputed_replicas =
Expand All @@ -59,6 +65,7 @@ impl ReplicaLocator {
replication_data,
precomputed_replicas,
datacenters,
tablets,
}
}

Expand All @@ -80,8 +87,21 @@ impl ReplicaLocator {
token: Token,
strategy: &'a Strategy,
datacenter: Option<&'a str>,
_table: &TableSpec,
table: &TableSpec,
) -> ReplicaSet<'a> {
if let Some(tablets) = self.tablets.tablets_for_table(table) {
let replicas: Option<&[(Arc<Node>, u32)]> = if let Some(datacenter) = datacenter {
tablets.dc_replicas_for_token(token, datacenter)
} else {
tablets.replicas_for_token(token)
};
if let Some(replicas) = replicas {
return ReplicaSet {
inner: ReplicaSetInner::PlainSharded(replicas),
token,
};
}
}
match strategy {
Strategy::SimpleStrategy { replication_factor } => {
if let Some(datacenter) = datacenter {
Expand Down Expand Up @@ -145,7 +165,7 @@ impl ReplicaLocator {
replication_factor: 1,
},
datacenter,
_table,
table,
)
}

Expand Down Expand Up @@ -237,6 +257,8 @@ fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) {
enum ReplicaSetInner<'a> {
Plain(ReplicasArray<'a>),

PlainSharded(&'a [(Arc<Node>, Shard)]),

// Represents a set of SimpleStrategy replicas that is limited to a specified datacenter.
FilteredSimple {
replicas: ReplicasArray<'a>,
Expand Down Expand Up @@ -294,6 +316,7 @@ impl<'a> ReplicaSet<'a> {
pub fn len(&self) -> usize {
match &self.inner {
ReplicaSetInner::Plain(replicas) => replicas.len(),
ReplicaSetInner::PlainSharded(replicas) => replicas.len(),
ReplicaSetInner::FilteredSimple {
replicas,
datacenter,
Expand Down Expand Up @@ -338,6 +361,9 @@ impl<'a> ReplicaSet<'a> {
ReplicaSetInner::Plain(replicas) => replicas
.get(index)
.map(|node| with_computed_shard(node, self.token)),
ReplicaSetInner::PlainSharded(replicas) => {
replicas.get(index).map(|(node, shard)| (node, *shard))
}
ReplicaSetInner::FilteredSimple {
replicas,
datacenter,
Expand Down Expand Up @@ -393,6 +419,9 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
fn into_iter(self) -> Self::IntoIter {
let inner = match self.inner {
ReplicaSetInner::Plain(replicas) => ReplicaSetIteratorInner::Plain { replicas, idx: 0 },
ReplicaSetInner::PlainSharded(replicas) => {
ReplicaSetIteratorInner::PlainSharded { replicas, idx: 0 }
}
ReplicaSetInner::FilteredSimple {
replicas,
datacenter,
Expand Down Expand Up @@ -439,6 +468,10 @@ enum ReplicaSetIteratorInner<'a> {
replicas: ReplicasArray<'a>,
idx: usize,
},
PlainSharded {
replicas: &'a [(Arc<Node>, Shard)],
idx: usize,
},
FilteredSimple {
replicas: ReplicasArray<'a>,
datacenter: &'a str,
Expand Down Expand Up @@ -474,6 +507,14 @@ impl<'a> Iterator for ReplicaSetIterator<'a> {

None
}
ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
if let Some((replica, shard)) = replicas.get(*idx) {
*idx += 1;
return Some((replica, *shard));
}

None
}
ReplicaSetIteratorInner::FilteredSimple {
replicas,
datacenter,
Expand Down Expand Up @@ -523,6 +564,11 @@ impl<'a> Iterator for ReplicaSetIterator<'a> {

(size, Some(size))
}
ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
let size = replicas.len() - *idx;

(size, Some(size))
}
ReplicaSetIteratorInner::FilteredSimple {
replicas,
datacenter: _,
Expand Down Expand Up @@ -551,7 +597,8 @@ impl<'a> Iterator for ReplicaSetIterator<'a> {

fn nth(&mut self, n: usize) -> Option<Self::Item> {
match &mut self.inner {
ReplicaSetIteratorInner::Plain { replicas: _, idx } => {
ReplicaSetIteratorInner::Plain { replicas: _, idx }
| ReplicaSetIteratorInner::PlainSharded { replicas: _, idx } => {
*idx += n;

self.next()
Expand Down Expand Up @@ -742,6 +789,9 @@ impl<'a> IntoIterator for ReplicasOrdered<'a> {
replica_set_iter: replica_set.into_iter(),
}
}
ReplicaSetInner::PlainSharded(_) => {
todo!()
}
ReplicaSetInner::ChainedNTS {
datacenter_repfactors,
locator,
Expand Down

0 comments on commit a18df86

Please sign in to comment.