Skip to content

Commit

Permalink
Make LoadBalancingPolicy shard-aware
Browse files Browse the repository at this point in the history
Co-authored-by: Wojciech Przytuła <[email protected]>
  • Loading branch information
Lorak-mmk and wprzytula committed Mar 2, 2024
1 parent 7c9059d commit dbc52fa
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 115 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ uuid = "1.0"
tower = "0.4"
stats_alloc = "0.1"
clap = { version = "3.2.4", features = ["derive"] }
rand = "0.8.5"

[[example]]
name = "auth"
Expand Down
23 changes: 20 additions & 3 deletions examples/custom_load_balancing_policy.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
use anyhow::Result;
use rand::thread_rng;
use rand::Rng;
use scylla::transport::NodeRef;
use scylla::{
load_balancing::{LoadBalancingPolicy, RoutingInfo},
routing::Shard,
transport::{ClusterData, ExecutionProfile},
Session, SessionBuilder,
};
use std::{env, sync::Arc};

/// Example load balancing policy that prefers nodes from favorite datacenter
/// This is, of course, very naive, as it is completely non token-aware.
/// For more realistic implementation, see [`DefaultPolicy`](scylla::load_balancing::DefaultPolicy).
#[derive(Debug)]
struct CustomLoadBalancingPolicy {
fav_datacenter_name: String,
}

fn with_random_shard(node: NodeRef) -> (NodeRef, Shard) {
let nr_shards = node
.sharder()
.map(|sharder| sharder.nr_shards.get())
.unwrap_or(1);
(node, thread_rng().gen_range(0..nr_shards) as Shard)
}

impl LoadBalancingPolicy for CustomLoadBalancingPolicy {
fn pick<'a>(&'a self, _info: &'a RoutingInfo, cluster: &'a ClusterData) -> Option<NodeRef<'a>> {
fn pick<'a>(
&'a self,
_info: &'a RoutingInfo,
cluster: &'a ClusterData,
) -> Option<(NodeRef<'a>, Shard)> {
self.fallback(_info, cluster).next()
}

Expand All @@ -28,9 +45,9 @@ impl LoadBalancingPolicy for CustomLoadBalancingPolicy {
.unique_nodes_in_datacenter_ring(&self.fav_datacenter_name);

match fav_dc_nodes {
Some(nodes) => Box::new(nodes.iter()),
Some(nodes) => Box::new(nodes.iter().map(with_random_shard)),
// If there is no dc with provided name, fallback to other datacenters
None => Box::new(cluster.get_nodes_info().iter()),
None => Box::new(cluster.get_nodes_info().iter().map(with_random_shard)),
}
}

Expand Down
Loading

0 comments on commit dbc52fa

Please sign in to comment.