Skip to content

Commit

Permalink
config: expose pool size and shard-aware switch options
Browse files Browse the repository at this point in the history
Exposes two new options in the configuration:

- connection_pool_size - the target connection pool size,
- disallow_shard_aware_port - setting it to true prevents the driver
  from using the shard-aware port at all.

Both options can also be set in the SessionBuilder.
  • Loading branch information
piodul committed Jul 4, 2021
1 parent b0d3e10 commit eb11747
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 40 deletions.
17 changes: 9 additions & 8 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::frame::response::event::{Event, StatusChangeEvent};
/// Cluster manages up to date information and connections to database nodes
use crate::routing::Token;
use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName};
use crate::transport::connection::{Connection, VerifiedKeyspaceName};
use crate::transport::connection_pool::PoolConfig;
use crate::transport::errors::QueryError;
use crate::transport::node::Node;
use crate::transport::topology::{Keyspace, TopologyInfo, TopologyReader};
Expand Down Expand Up @@ -50,7 +51,7 @@ struct ClusterWorker {

// Cluster connections
topology_reader: TopologyReader,
connection_config: ConnectionConfig,
pool_config: PoolConfig,

// To listen for refresh requests
refresh_channel: tokio::sync::mpsc::Receiver<RefreshRequest>,
Expand Down Expand Up @@ -79,7 +80,7 @@ struct UseKeyspaceRequest {
impl Cluster {
pub async fn new(
initial_peers: &[SocketAddr],
connection_config: ConnectionConfig,
pool_config: PoolConfig,
) -> Result<Cluster, QueryError> {
let cluster_data = Arc::new(ArcSwap::from(Arc::new(ClusterData {
known_peers: HashMap::new(),
Expand All @@ -98,10 +99,10 @@ impl Cluster {

topology_reader: TopologyReader::new(
initial_peers,
connection_config.clone(),
pool_config.connection_config.clone(),
server_events_sender,
),
connection_config,
pool_config,

refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
Expand Down Expand Up @@ -214,7 +215,7 @@ impl ClusterData {
/// Uses provided `known_peers` hashmap to recycle nodes if possible.
pub async fn new(
info: TopologyInfo,
connection_config: &ConnectionConfig,
pool_config: &PoolConfig,
known_peers: &HashMap<SocketAddr, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
) -> Self {
Expand All @@ -236,7 +237,7 @@ impl ClusterData {
_ => Arc::new(
Node::new(
peer.address,
connection_config.clone(),
pool_config.clone(),
peer.datacenter,
peer.rack,
used_keyspace.clone(),
Expand Down Expand Up @@ -435,7 +436,7 @@ impl ClusterWorker {
let new_cluster_data = Arc::new(
ClusterData::new(
topo_info,
&self.connection_config,
&self.pool_config,
&cluster_data.known_peers,
&self.used_keyspace,
)
Expand Down
47 changes: 29 additions & 18 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, trace, warn};

/// The target size of a per-node connection pool.
#[derive(Debug, Clone)]
pub enum PoolSize {
/// Indicates that the pool should establish given number of connections to the node.
///
Expand All @@ -47,6 +48,23 @@ impl Default for PoolSize {
}
}

/// Settings that describe how a per-node connection pool should be built.
#[derive(Clone)]
pub struct PoolConfig {
/// Connection settings; cloned into each `open_connection` call made by the pool refiller.
pub connection_config: ConnectionConfig,
/// Target size of the pool, expressed either per host or per shard.
pub pool_size: PoolSize,
/// When `false`, the refiller skips the shard-aware-port path even if the node
/// advertises a shard-aware port.
pub can_use_shard_aware_port: bool,
}

impl Default for PoolConfig {
fn default() -> Self {
Self {
connection_config: Default::default(),
pool_size: Default::default(),
can_use_shard_aware_port: true,
}
}
}

enum MaybePoolConnections {
// The pool is empty and is waiting to be refilled
Pending,
Expand Down Expand Up @@ -74,17 +92,15 @@ impl NodeConnectionPool {
pub async fn new(
address: IpAddr,
port: u16,
connection_config: ConnectionConfig,
refill_goal: PoolSize,
pool_config: PoolConfig,
current_keyspace: Option<VerifiedKeyspaceName>,
) -> Self {
let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);

let mut refiller = PoolRefiller::new(
address,
port,
connection_config,
refill_goal,
pool_config,
current_keyspace,
use_keyspace_request_receiver,
);
Expand Down Expand Up @@ -225,9 +241,7 @@ struct PoolRefiller {
// Following information identify the pool and do not change
address: IpAddr,
regular_port: u16,
connection_config: ConnectionConfig,
goal: PoolSize,
can_open_shard_aware_port_connections: bool,
pool_config: PoolConfig,

// Following fields are updated with information from OPTIONS
shard_aware_port: Option<u16>,
Expand Down Expand Up @@ -274,8 +288,7 @@ impl PoolRefiller {
pub fn new(
address: IpAddr,
port: u16,
connection_config: ConnectionConfig,
refill_goal: PoolSize,
pool_config: PoolConfig,
current_keyspace: Option<VerifiedKeyspaceName>,
use_keyspace_request_receiver: mpsc::Receiver<UseKeyspaceRequest>,
) -> Self {
Expand All @@ -290,9 +303,7 @@ impl PoolRefiller {
Self {
address,
regular_port: port,
connection_config,
goal: refill_goal,
can_open_shard_aware_port_connections: true,
pool_config,

shard_aware_port: None,
sharder: None,
Expand Down Expand Up @@ -397,7 +408,7 @@ impl PoolRefiller {
let shard_count = self.conns.len();

// Calculate the desired connection count
let target_count = match self.goal {
let target_count = match self.pool_config.pool_size {
PoolSize::PerHost(target) => target.get(),
PoolSize::PerShard(target) => target.get() * shard_count,
};
Expand All @@ -414,7 +425,7 @@ impl PoolRefiller {

match (self.sharder.as_ref(), self.shard_aware_port) {
(Some(sharder), Some(shard_aware_port))
if self.can_open_shard_aware_port_connections =>
if self.pool_config.can_use_shard_aware_port =>
{
// Try to fill up each shard up to `per_shard_target` connections
for (shard_id, shard_conns) in self.conns.iter().enumerate() {
Expand All @@ -433,7 +444,7 @@ impl PoolRefiller {
opened_connection_futs.push(open_connection(
self.address,
self.regular_port,
self.connection_config.clone(),
self.pool_config.connection_config.clone(),
self.current_keyspace.clone(),
Some(sinfo.clone()),
));
Expand Down Expand Up @@ -461,7 +472,7 @@ impl PoolRefiller {
opened_connection_futs.push(open_connection(
self.address,
self.regular_port,
self.connection_config.clone(),
self.pool_config.connection_config.clone(),
self.current_keyspace.clone(),
None,
));
Expand Down Expand Up @@ -493,7 +504,7 @@ impl PoolRefiller {
opened_connection_futs.push(open_connection(
self.address,
self.regular_port,
self.connection_config.clone(),
self.pool_config.connection_config.clone(),
self.current_keyspace.clone(),
None,
));
Expand Down Expand Up @@ -565,7 +576,7 @@ impl PoolRefiller {
opened_connection_futs.push(open_connection(
self.address,
self.regular_port,
self.connection_config.clone(),
self.pool_config.connection_config.clone(),
self.current_keyspace.clone(),
None,
));
Expand Down
17 changes: 5 additions & 12 deletions scylla/src/transport/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/// Node represents a cluster node along with its data and connections
use crate::routing::Token;
use crate::transport::connection::Connection;
use crate::transport::connection::VerifiedKeyspaceName;
use crate::transport::connection::{Connection, ConnectionConfig};
use crate::transport::connection_pool::NodeConnectionPool;
use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig};
use crate::transport::errors::QueryError;

use std::{
Expand Down Expand Up @@ -41,20 +41,13 @@ impl Node {
/// `rack` - optional rack name
pub async fn new(
address: SocketAddr,
connection_config: ConnectionConfig,
pool_config: PoolConfig,
datacenter: Option<String>,
rack: Option<String>,
keyspace_name: Option<VerifiedKeyspaceName>,
) -> Self {
// TODO: Provide a RefillGoal here
let pool = NodeConnectionPool::new(
address.ip(),
address.port(),
connection_config,
Default::default(),
keyspace_name,
)
.await;
let pool =
NodeConnectionPool::new(address.ip(), address.port(), pool_config, keyspace_name).await;

Node {
address,
Expand Down
25 changes: 24 additions & 1 deletion scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bytes::Bytes;
use futures::future::join_all;
use std::future::Future;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::lookup_host;
Expand All @@ -21,6 +22,7 @@ use crate::query::Query;
use crate::routing::{murmur3_token, Token};
use crate::statement::Consistency;
use crate::tracing::{GetTracingConfig, TracingEvent, TracingInfo};
use crate::transport::connection_pool::PoolConfig;
use crate::transport::{
cluster::Cluster,
connection::{BatchResult, Connection, ConnectionConfig, QueryResult, VerifiedKeyspaceName},
Expand All @@ -35,6 +37,8 @@ use crate::transport::{
use crate::{batch::Batch, statement::StatementConfig};
use crate::{cql_to_rust::FromRow, transport::speculative_execution};

pub use crate::transport::connection_pool::PoolSize;

#[cfg(feature = "ssl")]
use openssl::ssl::SslContext;

Expand Down Expand Up @@ -82,6 +86,14 @@ pub struct SessionConfig {

pub schema_agreement_interval: Duration,
pub connect_timeout: std::time::Duration,

/// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
/// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters.
pub connection_pool_size: PoolSize,

/// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
/// Generally, this option is best left as default (false).
pub disallow_shard_aware_port: bool,
/*
These configuration options will be added in the future:
Expand Down Expand Up @@ -126,6 +138,8 @@ impl SessionConfig {
auth_username: None,
auth_password: None,
connect_timeout: std::time::Duration::from_secs(5),
connection_pool_size: PoolSize::PerShard(NonZeroUsize::new(1).unwrap()),
disallow_shard_aware_port: false,
}
}

Expand Down Expand Up @@ -187,6 +201,15 @@ impl SessionConfig {
}
}

/// Builds the `PoolConfig` used when creating `NodeConnectionPool`s.
///
/// The connection settings come from `get_connection_config`, the pool size is a copy of
/// `connection_pool_size`, and the shard-aware port is allowed exactly when
/// `disallow_shard_aware_port` is `false`.
fn get_pool_config(&self) -> PoolConfig {
    let connection_config = self.get_connection_config();
    let pool_size = self.connection_pool_size.clone();
    // The public option is phrased negatively ("disallow"); the pool expects the positive form.
    let can_use_shard_aware_port = !self.disallow_shard_aware_port;

    PoolConfig {
        connection_config,
        pool_size,
        can_use_shard_aware_port,
    }
}

/// Makes a config that should be used in Connection
fn get_connection_config(&self) -> ConnectionConfig {
ConnectionConfig {
Expand Down Expand Up @@ -290,7 +313,7 @@ impl Session {

node_addresses.extend(resolved);

let cluster = Cluster::new(&node_addresses, config.get_connection_config()).await?;
let cluster = Cluster::new(&node_addresses, config.get_pool_config()).await?;

let session = Session {
cluster,
Expand Down
66 changes: 65 additions & 1 deletion scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::load_balancing::LoadBalancingPolicy;
use super::session::{Session, SessionConfig};
use super::speculative_execution::SpeculativeExecutionPolicy;
use super::Compression;
use crate::transport::retry_policy::RetryPolicy;
use crate::transport::{connection_pool::PoolSize, retry_policy::RetryPolicy};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -380,6 +380,70 @@ impl SessionBuilder {
self.config.connect_timeout = duration;
self
}

/// Sets the target size of the per-node connection pool.
///
/// The size can be given per host (`PoolSize::PerHost`) or per shard (`PoolSize::PerShard`).
/// The default is one connection per shard, which is the recommended setting for Scylla.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use std::num::NonZeroUsize;
/// use scylla::transport::session::PoolSize;
///
/// // This session will establish 4 connections to each node.
/// // For Scylla clusters, this number will be divided across shards
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .pool_size(PoolSize::PerHost(NonZeroUsize::new(4).unwrap()))
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn pool_size(mut self, size: PoolSize) -> Self {
// Stored in the session config; returns the builder so calls can be chained.
self.config.connection_pool_size = size;
self
}

/// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
///
/// _This is a Scylla-specific option_. It has no effect on Cassandra clusters.
///
/// By default, connecting to the shard-aware port is __allowed__ and, in general, this setting
/// _should not be changed_. The shard-aware port (19042 or 19142) makes the process of
/// establishing a connection per shard more robust compared to the regular transport port
/// (9042 or 9142). With the shard-aware port, the driver is able to choose which shard
/// will be assigned to the connection.
///
/// In order to be able to use the shard-aware port effectively, the port needs to be
/// reachable and not behind a NAT which changes source ports (the driver uses the source port
/// to tell Scylla which shard to assign). However, the driver is designed to behave in a robust
/// way if those conditions are not met - if the driver fails to connect to the port or gets
/// a connection to the wrong shard, it will re-attempt the connection to the regular transport port.
///
/// The only cost of a misconfigured shard-aware port should be a slightly longer reconnection time.
/// If that is unacceptable to you, or you suspect that it causes you some other problems,
/// you can use this option to disable the shard-aware port feature completely.
/// However, __you should use it as a last resort__. Before you do that, we strongly recommend
/// that you consider fixing the network issues.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .disallow_shard_aware_port(true)
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn disallow_shard_aware_port(mut self, disallow: bool) -> Self {
// Stored in the session config; returns the builder so calls can be chained.
self.config.disallow_shard_aware_port = disallow;
self
}
}

/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]
Expand Down

0 comments on commit eb11747

Please sign in to comment.