diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index b95519e9c2..f02329c5c1 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -1,7 +1,8 @@ use crate::frame::response::event::{Event, StatusChangeEvent}; /// Cluster manages up to date information and connections to database nodes use crate::routing::Token; -use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName}; +use crate::transport::connection::{Connection, VerifiedKeyspaceName}; +use crate::transport::connection_pool::PoolConfig; use crate::transport::errors::QueryError; use crate::transport::node::Node; use crate::transport::topology::{Keyspace, TopologyInfo, TopologyReader}; @@ -50,7 +51,7 @@ struct ClusterWorker { // Cluster connections topology_reader: TopologyReader, - connection_config: ConnectionConfig, + pool_config: PoolConfig, // To listen for refresh requests refresh_channel: tokio::sync::mpsc::Receiver, @@ -79,7 +80,7 @@ struct UseKeyspaceRequest { impl Cluster { pub async fn new( initial_peers: &[SocketAddr], - connection_config: ConnectionConfig, + pool_config: PoolConfig, ) -> Result { let cluster_data = Arc::new(ArcSwap::from(Arc::new(ClusterData { known_peers: HashMap::new(), @@ -98,10 +99,10 @@ impl Cluster { topology_reader: TopologyReader::new( initial_peers, - connection_config.clone(), + pool_config.connection_config.clone(), server_events_sender, ), - connection_config, + pool_config, refresh_channel: refresh_receiver, server_events_channel: server_events_receiver, @@ -214,7 +215,7 @@ impl ClusterData { /// Uses provided `known_peers` hashmap to recycle nodes if possible. 
pub async fn new( info: TopologyInfo, - connection_config: &ConnectionConfig, + pool_config: &PoolConfig, known_peers: &HashMap>, used_keyspace: &Option, ) -> Self { @@ -236,7 +237,7 @@ impl ClusterData { _ => Arc::new( Node::new( peer.address, - connection_config.clone(), + pool_config.clone(), peer.datacenter, peer.rack, used_keyspace.clone(), @@ -435,7 +436,7 @@ impl ClusterWorker { let new_cluster_data = Arc::new( ClusterData::new( topo_info, - &self.connection_config, + &self.pool_config, &cluster_data.known_peers, &self.used_keyspace, ) diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index ae2011208d..1f3463ed03 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -24,6 +24,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, trace, warn}; /// The target size of a per-node connection pool. +#[derive(Debug, Clone)] pub enum PoolSize { /// Indicates that the pool should establish given number of connections to the node. 
/// @@ -47,6 +48,23 @@ impl Default for PoolSize { } } +#[derive(Clone)] +pub struct PoolConfig { + pub connection_config: ConnectionConfig, + pub pool_size: PoolSize, + pub can_use_shard_aware_port: bool, +} + +impl Default for PoolConfig { + fn default() -> Self { + Self { + connection_config: Default::default(), + pool_size: Default::default(), + can_use_shard_aware_port: true, + } + } +} + enum MaybePoolConnections { // The pool is empty and is waiting to be refilled Pending, @@ -74,8 +92,7 @@ impl NodeConnectionPool { pub async fn new( address: IpAddr, port: u16, - connection_config: ConnectionConfig, - refill_goal: PoolSize, + pool_config: PoolConfig, current_keyspace: Option, ) -> Self { let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1); @@ -83,8 +100,7 @@ impl NodeConnectionPool { let mut refiller = PoolRefiller::new( address, port, - connection_config, - refill_goal, + pool_config, current_keyspace, use_keyspace_request_receiver, ); @@ -225,9 +241,7 @@ struct PoolRefiller { // Following information identify the pool and do not change address: IpAddr, regular_port: u16, - connection_config: ConnectionConfig, - goal: PoolSize, - can_open_shard_aware_port_connections: bool, + pool_config: PoolConfig, // Following fields are updated with information from OPTIONS shard_aware_port: Option, @@ -274,8 +288,7 @@ impl PoolRefiller { pub fn new( address: IpAddr, port: u16, - connection_config: ConnectionConfig, - refill_goal: PoolSize, + pool_config: PoolConfig, current_keyspace: Option, use_keyspace_request_receiver: mpsc::Receiver, ) -> Self { @@ -290,9 +303,7 @@ impl PoolRefiller { Self { address, regular_port: port, - connection_config, - goal: refill_goal, - can_open_shard_aware_port_connections: true, + pool_config, shard_aware_port: None, sharder: None, @@ -397,7 +408,7 @@ impl PoolRefiller { let shard_count = self.conns.len(); // Calculate the desired connection count - let target_count = match self.goal { + let target_count 
= match self.pool_config.pool_size { PoolSize::PerHost(target) => target.get(), PoolSize::PerShard(target) => target.get() * shard_count, }; @@ -414,7 +425,7 @@ impl PoolRefiller { match (self.sharder.as_ref(), self.shard_aware_port) { (Some(sharder), Some(shard_aware_port)) - if self.can_open_shard_aware_port_connections => + if self.pool_config.can_use_shard_aware_port => { // Try to fill up each shard up to `per_shard_target` connections for (shard_id, shard_conns) in self.conns.iter().enumerate() { @@ -433,7 +444,7 @@ impl PoolRefiller { opened_connection_futs.push(open_connection( self.address, self.regular_port, - self.connection_config.clone(), + self.pool_config.connection_config.clone(), self.current_keyspace.clone(), Some(sinfo.clone()), )); @@ -461,7 +472,7 @@ impl PoolRefiller { opened_connection_futs.push(open_connection( self.address, self.regular_port, - self.connection_config.clone(), + self.pool_config.connection_config.clone(), self.current_keyspace.clone(), None, )); @@ -493,7 +504,7 @@ impl PoolRefiller { opened_connection_futs.push(open_connection( self.address, self.regular_port, - self.connection_config.clone(), + self.pool_config.connection_config.clone(), self.current_keyspace.clone(), None, )); @@ -565,7 +576,7 @@ impl PoolRefiller { opened_connection_futs.push(open_connection( self.address, self.regular_port, - self.connection_config.clone(), + self.pool_config.connection_config.clone(), self.current_keyspace.clone(), None, )); diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index 505f10648b..bed408c263 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -1,8 +1,8 @@ /// Node represents a cluster node along with it's data and connections use crate::routing::Token; +use crate::transport::connection::Connection; use crate::transport::connection::VerifiedKeyspaceName; -use crate::transport::connection::{Connection, ConnectionConfig}; -use 
crate::transport::connection_pool::NodeConnectionPool; +use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig}; use crate::transport::errors::QueryError; use std::{ @@ -41,20 +41,13 @@ impl Node { /// `rack` - optional rack name pub async fn new( address: SocketAddr, - connection_config: ConnectionConfig, + pool_config: PoolConfig, datacenter: Option, rack: Option, keyspace_name: Option, ) -> Self { - // TODO: Provide a RefillGoal here - let pool = NodeConnectionPool::new( - address.ip(), - address.port(), - connection_config, - Default::default(), - keyspace_name, - ) - .await; + let pool = + NodeConnectionPool::new(address.ip(), address.port(), pool_config, keyspace_name).await; Node { address, diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index b08c38a218..09cf4bf899 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -5,6 +5,7 @@ use bytes::Bytes; use futures::future::join_all; use std::future::Future; use std::net::SocketAddr; +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use tokio::net::lookup_host; @@ -21,6 +22,7 @@ use crate::query::Query; use crate::routing::{murmur3_token, Token}; use crate::statement::Consistency; use crate::tracing::{GetTracingConfig, TracingEvent, TracingInfo}; +use crate::transport::connection_pool::PoolConfig; use crate::transport::{ cluster::Cluster, connection::{BatchResult, Connection, ConnectionConfig, QueryResult, VerifiedKeyspaceName}, @@ -35,6 +37,8 @@ use crate::transport::{ use crate::{batch::Batch, statement::StatementConfig}; use crate::{cql_to_rust::FromRow, transport::speculative_execution}; +pub use crate::transport::connection_pool::PoolSize; + #[cfg(feature = "ssl")] use openssl::ssl::SslContext; @@ -82,6 +86,14 @@ pub struct SessionConfig { pub schema_agreement_interval: Duration, pub connect_timeout: std::time::Duration, + + /// Size of the per-node connection pool, i.e. 
how many connections the driver should keep to each node. + /// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters. + pub connection_pool_size: PoolSize, + + /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it. + /// Generally, this option is best left as default (false). + pub disallow_shard_aware_port: bool, /* These configuration options will be added in the future: @@ -126,6 +138,8 @@ impl SessionConfig { auth_username: None, auth_password: None, connect_timeout: std::time::Duration::from_secs(5), + connection_pool_size: PoolSize::PerShard(NonZeroUsize::new(1).unwrap()), + disallow_shard_aware_port: false, } } @@ -187,6 +201,15 @@ impl SessionConfig { } } + /// Creates a PoolConfig which can be used to create NodeConnectionPools + fn get_pool_config(&self) -> PoolConfig { + PoolConfig { + connection_config: self.get_connection_config(), + pool_size: self.connection_pool_size.clone(), + can_use_shard_aware_port: !self.disallow_shard_aware_port, + } + } + /// Makes a config that should be used in Connection fn get_connection_config(&self) -> ConnectionConfig { ConnectionConfig { @@ -290,7 +313,7 @@ impl Session { node_addresses.extend(resolved); - let cluster = Cluster::new(&node_addresses, config.get_connection_config()).await?; + let cluster = Cluster::new(&node_addresses, config.get_pool_config()).await?; let session = Session { cluster, diff --git a/scylla/src/transport/session_builder.rs b/scylla/src/transport/session_builder.rs index b3b176f344..67304b0f6f 100644 --- a/scylla/src/transport/session_builder.rs +++ b/scylla/src/transport/session_builder.rs @@ -5,7 +5,7 @@ use super::load_balancing::LoadBalancingPolicy; use super::session::{Session, SessionConfig}; use super::speculative_execution::SpeculativeExecutionPolicy; use super::Compression; -use crate::transport::retry_policy::RetryPolicy; +use crate::transport::{connection_pool::PoolSize, retry_policy::RetryPolicy};
use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -380,6 +380,70 @@ impl SessionBuilder { self.config.connect_timeout = duration; self } + + /// Sets the per-node connection pool size. + /// The default is one connection per shard, which is the recommended setting for Scylla. + /// + /// # Example + /// ``` + /// # use scylla::{Session, SessionBuilder}; + /// # async fn example() -> Result<(), Box<dyn std::error::Error>> { + /// use std::num::NonZeroUsize; + /// use scylla::transport::session::PoolSize; + /// + /// // This session will establish 4 connections to each node. + /// // For Scylla clusters, this number will be divided across shards + /// let session: Session = SessionBuilder::new() + /// .known_node("127.0.0.1:9042") + /// .pool_size(PoolSize::PerHost(NonZeroUsize::new(4).unwrap())) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn pool_size(mut self, size: PoolSize) -> Self { + self.config.connection_pool_size = size; + self + } + + /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it. + /// + /// _This is a Scylla-specific option_. It has no effect on Cassandra clusters. + /// + /// By default, connecting to the shard-aware port is __allowed__ and, in general, this setting + /// _should not be changed_. The shard-aware port (19042 or 19142) makes the process of + /// establishing a connection per shard more robust compared to the regular transport port + /// (9042 or 9142). With the shard-aware port, the driver is able to choose which shard + /// will be assigned to the connection. + /// + /// In order to be able to use the shard-aware port effectively, the port needs to be + /// reachable and not behind a NAT which changes source ports (the driver uses the source port + /// to tell Scylla which shard to assign).
However, the driver is designed to behave in a robust + /// way if those conditions are not met - if the driver fails to connect to the port or gets + /// a connection to the wrong shard, it will re-attempt the connection to the regular transport port. + /// + /// The only cost of a misconfigured shard-aware port should be a slightly longer reconnection time. + /// If that is unacceptable to you, or you suspect that it causes you some other problems, + /// you can use this option to disable the shard-aware port feature completely. + /// However, __you should use it as a last resort__. Before you do that, we strongly recommend + /// that you consider fixing the network issues. + /// + /// # Example + /// ``` + /// # use scylla::{Session, SessionBuilder}; + /// # async fn example() -> Result<(), Box<dyn std::error::Error>> { + /// let session: Session = SessionBuilder::new() + /// .known_node("127.0.0.1:9042") + /// .disallow_shard_aware_port(true) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn disallow_shard_aware_port(mut self, disallow: bool) -> Self { + self.config.disallow_shard_aware_port = disallow; + self + } } /// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]