Skip to content

Commit

Permalink
Cluster: get_endpoints and related methods return shards
Browse files Browse the repository at this point in the history
Co-authored-by: Wojciech Przytuła <[email protected]>
  • Loading branch information
Lorak-mmk and wprzytula committed Mar 9, 2024
1 parent 9f5cbae commit efab01a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<()> {
.get_cluster_data()
.get_token_endpoints("examples_ks", Token { value: t })
.iter()
.map(|n| n.address)
.map(|(node, _shard)| node.address)
.collect::<Vec<NodeAddr>>()
);

Expand Down
13 changes: 7 additions & 6 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Cluster manages up to date information and connections to database nodes
use crate::frame::response::event::{Event, StatusChangeEvent};
use crate::prepared_statement::TokenCalculationError;
use crate::routing::Token;
use crate::routing::{Shard, Token};
use crate::transport::host_filter::HostFilter;
use crate::transport::{
connection::{Connection, VerifiedKeyspaceName},
Expand All @@ -27,6 +27,7 @@ use tracing::{debug, warn};
use uuid::Uuid;

use super::node::{KnownNode, NodeAddr};
use super::NodeRef;

use super::locator::ReplicaLocator;
use super::partitioner::calculate_token_for_partition_key;
Expand Down Expand Up @@ -408,17 +409,17 @@ impl ClusterData {
}

/// Access to replicas owning a given token
pub fn get_token_endpoints(&self, keyspace: &str, token: Token) -> Vec<Arc<Node>> {
pub fn get_token_endpoints(&self, keyspace: &str, token: Token) -> Vec<(Arc<Node>, Shard)> {
self.get_token_endpoints_iter(keyspace, token)
.cloned()
.map(|(node, shard)| (node.clone(), shard))
.collect()
}

pub(crate) fn get_token_endpoints_iter(
&self,
keyspace: &str,
token: Token,
) -> impl Iterator<Item = &Arc<Node>> {
) -> impl Iterator<Item = (NodeRef<'_>, Shard)> {
let keyspace = self.keyspaces.get(keyspace);
let strategy = keyspace
.map(|k| &k.strategy)
Expand All @@ -427,7 +428,7 @@ impl ClusterData {
.replica_locator()
.replicas_for_token(token, strategy, None);

replica_set.into_iter().map(|(node, _shard)| node)
replica_set.into_iter()
}

/// Access to replicas owning a given partition key (similar to `nodetool getendpoints`)
Expand All @@ -436,7 +437,7 @@ impl ClusterData {
keyspace: &str,
table: &str,
partition_key: &SerializedValues,
) -> Result<Vec<Arc<Node>>, BadQuery> {
) -> Result<Vec<(Arc<Node>, Shard)>, BadQuery> {
Ok(self.get_token_endpoints(
keyspace,
self.compute_token(keyspace, table, partition_key)?,
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl RowIterator {
config
.cluster_data
.get_token_endpoints_iter(keyspace, token)
.cloned()
.map(|(node, shard)| (node.clone(), shard))
.collect(),
)
} else {
Expand Down
14 changes: 7 additions & 7 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2027,19 +2027,19 @@ impl RequestSpan {
self.span.record("result_rows", rows.rows.len());
}

pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [impl Borrow<Arc<Node>>]) {
struct ReplicaIps<'a, N>(&'a [N]);
pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [(impl Borrow<Arc<Node>>, Shard)]) {
struct ReplicaIps<'a, N>(&'a [(N, Shard)]);
impl<'a, N> Display for ReplicaIps<'a, N>
where
N: Borrow<Arc<Node>>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut nodes = self.0.iter();
if let Some(node) = nodes.next() {
write!(f, "{}", node.borrow().address.ip())?;
let mut nodes_with_shards = self.0.iter();
if let Some((node, shard)) = nodes_with_shards.next() {
write!(f, "{}-shard{}", node.borrow().address.ip(), shard)?;

for node in nodes {
write!(f, ",{}", node.borrow().address.ip())?;
for (node, shard) in nodes_with_shards {
write!(f, ",{}-shard{}", node.borrow().address.ip(), shard)?;
}
}
Ok(())
Expand Down

0 comments on commit efab01a

Please sign in to comment.