Skip to content

Commit

Permalink
config: expose pool size and shard-aware switch options
Browse files Browse the repository at this point in the history
Exposes two new options in the configuration:

- connection_pool_size - the target connection pool size,
- disallow_shard_aware_port - setting it to true prevents the driver
  from using the shard-aware port at all.

Both options can also be set in the SessionBuilder.
  • Loading branch information
piodul committed Nov 3, 2021
1 parent 622e22a commit c46a894
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 32 deletions.
17 changes: 9 additions & 8 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::frame::response::event::{Event, StatusChangeEvent};
/// Cluster manages up-to-date information and connections to database nodes
use crate::routing::Token;
use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName};
use crate::transport::connection::{Connection, VerifiedKeyspaceName};
use crate::transport::connection_pool::PoolConfig;
use crate::transport::errors::QueryError;
use crate::transport::node::Node;
use crate::transport::topology::{Keyspace, TopologyInfo, TopologyReader};
Expand Down Expand Up @@ -50,7 +51,7 @@ struct ClusterWorker {

// Cluster connections
topology_reader: TopologyReader,
connection_config: ConnectionConfig,
pool_config: PoolConfig,

// To listen for refresh requests
refresh_channel: tokio::sync::mpsc::Receiver<RefreshRequest>,
Expand Down Expand Up @@ -79,7 +80,7 @@ struct UseKeyspaceRequest {
impl Cluster {
pub async fn new(
initial_peers: &[SocketAddr],
connection_config: ConnectionConfig,
pool_config: PoolConfig,
) -> Result<Cluster, QueryError> {
let cluster_data = Arc::new(ArcSwap::from(Arc::new(ClusterData {
known_peers: HashMap::new(),
Expand All @@ -98,10 +99,10 @@ impl Cluster {

topology_reader: TopologyReader::new(
initial_peers,
connection_config.clone(),
pool_config.connection_config.clone(),
server_events_sender,
),
connection_config,
pool_config,

refresh_channel: refresh_receiver,
server_events_channel: server_events_receiver,
Expand Down Expand Up @@ -220,7 +221,7 @@ impl ClusterData {
/// Uses provided `known_peers` hashmap to recycle nodes if possible.
pub fn new(
info: TopologyInfo,
connection_config: &ConnectionConfig,
pool_config: &PoolConfig,
known_peers: &HashMap<SocketAddr, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
) -> Self {
Expand All @@ -241,7 +242,7 @@ impl ClusterData {
}
_ => Arc::new(Node::new(
peer.address,
connection_config.clone(),
pool_config.clone(),
peer.datacenter,
peer.rack,
used_keyspace.clone(),
Expand Down Expand Up @@ -432,7 +433,7 @@ impl ClusterWorker {

let new_cluster_data = Arc::new(ClusterData::new(
topo_info,
&self.connection_config,
&self.pool_config,
&cluster_data.known_peers,
&self.used_keyspace,
));
Expand Down
11 changes: 0 additions & 11 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,6 @@ impl Default for PoolConfig {
}
}

// Temporary code used to reduce commit sizes,
// will be removed in further commits
//
// Converts a bare `ConnectionConfig` into a `PoolConfig`: the given value
// becomes the `connection_config` field and every other field is taken
// from `PoolConfig::default()`.
impl From<ConnectionConfig> for PoolConfig {
fn from(connection_config: ConnectionConfig) -> PoolConfig {
PoolConfig {
connection_config,
// All remaining fields come from the `Default` impl.
..Default::default()
}
}
}

enum MaybePoolConnections {
// The pool is being filled for the first time
Initializing,
Expand Down
14 changes: 5 additions & 9 deletions scylla/src/transport/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/// Node represents a cluster node along with its data and connections
use crate::routing::Token;
use crate::transport::connection::Connection;
use crate::transport::connection::VerifiedKeyspaceName;
use crate::transport::connection::{Connection, ConnectionConfig};
use crate::transport::connection_pool::NodeConnectionPool;
use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig};
use crate::transport::errors::QueryError;

use std::{
Expand Down Expand Up @@ -41,17 +41,13 @@ impl Node {
/// `rack` - optional rack name
pub fn new(
address: SocketAddr,
connection_config: ConnectionConfig,
pool_config: PoolConfig,
datacenter: Option<String>,
rack: Option<String>,
keyspace_name: Option<VerifiedKeyspaceName>,
) -> Self {
let pool = NodeConnectionPool::new(
address.ip(),
address.port(),
connection_config.into(),
keyspace_name,
);
let pool =
NodeConnectionPool::new(address.ip(), address.port(), pool_config, keyspace_name);

Node {
address,
Expand Down
28 changes: 25 additions & 3 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::query::Query;
use crate::routing::{murmur3_token, Token};
use crate::statement::{Consistency, SerialConsistency};
use crate::tracing::{GetTracingConfig, TracingEvent, TracingInfo};
use crate::transport::connection_pool::PoolConfig;
use crate::transport::{
cluster::Cluster,
connection::{BatchResult, Connection, ConnectionConfig, QueryResult, VerifiedKeyspaceName},
Expand All @@ -36,6 +37,8 @@ use crate::transport::{
use crate::{batch::Batch, statement::StatementConfig};
use crate::{cql_to_rust::FromRow, transport::speculative_execution};

pub use crate::transport::connection_pool::PoolSize;

#[cfg(feature = "ssl")]
use openssl::ssl::SslContext;

Expand Down Expand Up @@ -83,6 +86,14 @@ pub struct SessionConfig {

pub schema_agreement_interval: Duration,
pub connect_timeout: std::time::Duration,

/// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
/// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters.
pub connection_pool_size: PoolSize,

/// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
/// Generally, this option is best left as default (false).
pub disallow_shard_aware_port: bool,
/*
These configuration options will be added in the future:
Expand Down Expand Up @@ -127,6 +138,8 @@ impl SessionConfig {
auth_username: None,
auth_password: None,
connect_timeout: std::time::Duration::from_secs(5),
connection_pool_size: Default::default(),
disallow_shard_aware_port: false,
}
}

Expand Down Expand Up @@ -188,6 +201,15 @@ impl SessionConfig {
}
}

/// Builds the `PoolConfig` used when creating `NodeConnectionPool`s.
///
/// The result combines the connection config produced by
/// `get_connection_config`, a copy of `connection_pool_size`, and the
/// negation of `disallow_shard_aware_port` (the pool stores the flag in its
/// positive form, `can_use_shard_aware_port`).
fn get_pool_config(&self) -> PoolConfig {
    let connection_config = self.get_connection_config();
    let pool_size = self.connection_pool_size.clone();
    // The session option is phrased as "disallow", the pool option as "can use".
    let can_use_shard_aware_port = !self.disallow_shard_aware_port;

    PoolConfig {
        connection_config,
        pool_size,
        can_use_shard_aware_port,
    }
}

/// Makes a config that should be used in Connection
fn get_connection_config(&self) -> ConnectionConfig {
ConnectionConfig {
Expand Down Expand Up @@ -312,16 +334,16 @@ impl Session {

// Start the session
let cluster = if !shard_aware_addresses.is_empty() {
match Cluster::new(&shard_aware_addresses, config.get_connection_config()).await {
match Cluster::new(&shard_aware_addresses, config.get_pool_config()).await {
Ok(clust) => clust,
Err(e) => {
warn!("Unable to establish connections at detected shard-aware port, falling back to default ports: {}", e);
Cluster::new(&node_addresses, config.get_connection_config()).await?
Cluster::new(&node_addresses, config.get_pool_config()).await?
}
}
} else {
info!("Shard-aware ports not available, falling back to default ports");
Cluster::new(&node_addresses, config.get_connection_config()).await?
Cluster::new(&node_addresses, config.get_pool_config()).await?
};

let session = Session {
Expand Down
66 changes: 65 additions & 1 deletion scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::load_balancing::LoadBalancingPolicy;
use super::session::{Session, SessionConfig};
use super::speculative_execution::SpeculativeExecutionPolicy;
use super::Compression;
use crate::transport::retry_policy::RetryPolicy;
use crate::transport::{connection_pool::PoolSize, retry_policy::RetryPolicy};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -380,6 +380,70 @@ impl SessionBuilder {
self.config.connect_timeout = duration;
self
}

/// Sets the per-node connection pool size.
///
/// The given `size` is stored in the session configuration as
/// `connection_pool_size`. The default is one connection per shard,
/// which is the recommended setting for Scylla.
///
/// Returns the builder so that further options can be chained.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use std::num::NonZeroUsize;
/// use scylla::transport::session::PoolSize;
///
/// // This session will establish 4 connections to each node.
/// // For Scylla clusters, this number will be divided across shards.
/// let session: Session = SessionBuilder::new()
///     .known_node("127.0.0.1:9042")
///     .pool_size(PoolSize::PerHost(NonZeroUsize::new(4).unwrap()))
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
pub fn pool_size(mut self, size: PoolSize) -> Self {
self.config.connection_pool_size = size;
self
}

/// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
///
/// The given `disallow` flag is stored in the session configuration as
/// `disallow_shard_aware_port`; the builder is returned so that further options can be chained.
///
/// _This is a Scylla-specific option_. It has no effect on Cassandra clusters.
///
/// By default, connecting to the shard-aware port is __allowed__ and, in general, this setting
/// _should not be changed_. The shard-aware port (19042 or 19142) makes the process of
/// establishing a connection per shard more robust compared to the regular transport port
/// (9042 or 9142). With the shard-aware port, the driver is able to choose which shard
/// will be assigned to the connection.
///
/// In order to be able to use the shard-aware port effectively, the port needs to be
/// reachable and not behind a NAT which changes source ports (the driver uses the source port
/// to tell Scylla which shard to assign). However, the driver is designed to behave in a robust
/// way if those conditions are not met - if the driver fails to connect to the port or gets
/// a connection to the wrong shard, it will re-attempt the connection to the regular transport port.
///
/// The only cost of a misconfigured shard-aware port should be a slightly longer reconnection time.
/// If that is unacceptable to you, or if you suspect that it causes some other problems,
/// you can use this option to disable the shard-aware port feature completely.
/// However, __you should use it as a last resort__. Before you do that, we strongly recommend
/// that you consider fixing the network issues.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
///     .known_node("127.0.0.1:9042")
///     .disallow_shard_aware_port(true)
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
pub fn disallow_shard_aware_port(mut self, disallow: bool) -> Self {
self.config.disallow_shard_aware_port = disallow;
self
}
}

/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]
Expand Down

0 comments on commit c46a894

Please sign in to comment.