Skip to content

Commit

Permalink
node: ConnectionPoolError
Browse files Browse the repository at this point in the history
Introduced a ConnectionPoolError which appears when
we were unable to select a connection from the connection pool.
  • Loading branch information
muzarski committed Aug 29, 2024
1 parent 51f1fc0 commit c4cc700
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
24 changes: 24 additions & 0 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub enum QueryError {
#[error("IO Error: {0}")]
IoError(Arc<std::io::Error>),

/// Selected node's connection pool is in invalid state.
#[error("No connections in the pool: {0}")]
ConnectionPoolError(#[from] ConnectionPoolError),

/// Unexpected message received
#[error("Protocol Error: {0}")]
ProtocolError(&'static str),
Expand Down Expand Up @@ -432,6 +436,10 @@ pub enum NewSessionError {
#[error("IO Error: {0}")]
IoError(Arc<std::io::Error>),

/// Selected node's connection pool is in invalid state.
#[error("No connections in the pool: {0}")]
ConnectionPoolError(#[from] ConnectionPoolError),

/// Unexpected message received
#[error("Protocol Error: {0}")]
ProtocolError(&'static str),
Expand Down Expand Up @@ -475,6 +483,21 @@ pub enum BadKeyspaceName {
IllegalCharacter(String, char),
}

/// An error that occurred when selecting a node connection
/// to perform a request on.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ConnectionPoolError {
#[error("The pool is broken; Last connection failed with: {last_connection_error}")]
Broken {
last_connection_error: Arc<dyn Error + Sync + Send>,
},
#[error("Pool is still being initialized")]
Initializing,
#[error("The node has been disabled by a host filter")]
NodeDisabledByHostFilter,
}

#[derive(Error, Debug, Clone)]
#[error("Connection broken, reason: {0}")]
pub struct BrokenConnectionError(Arc<dyn Error + Sync + Send>);
Expand Down Expand Up @@ -680,6 +703,7 @@ impl From<QueryError> for NewSessionError {
QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e),
QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e),
QueryError::IoError(e) => NewSessionError::IoError(e),
QueryError::ConnectionPoolError(e) => NewSessionError::ConnectionPoolError(e),
QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m),
QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m),
QueryError::TimeoutError => NewSessionError::TimeoutError,
Expand Down
27 changes: 11 additions & 16 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use arc_swap::ArcSwap;
use futures::{future::RemoteHandle, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use rand::Rng;
use scylla_cql::errors::BrokenConnectionErrorKind;
use scylla_cql::errors::ConnectionPoolError;
use std::convert::TryInto;
use std::io::ErrorKind;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
Expand Down Expand Up @@ -239,7 +239,7 @@ impl NodeConnectionPool {
pub(crate) fn connection_for_shard(
&self,
shard: Shard,
) -> Result<Arc<Connection>, std::io::Error> {
) -> Result<Arc<Connection>, ConnectionPoolError> {
trace!(shard = shard, "Selecting connection for shard");
self.with_connections(|pool_conns| match pool_conns {
PoolConnections::NotSharded(conns) => {
Expand All @@ -262,7 +262,7 @@ impl NodeConnectionPool {
})
}

pub(crate) fn random_connection(&self) -> Result<Arc<Connection>, std::io::Error> {
pub(crate) fn random_connection(&self) -> Result<Arc<Connection>, ConnectionPoolError> {
trace!("Selecting random connection");
self.with_connections(|pool_conns| match pool_conns {
PoolConnections::NotSharded(conns) => {
Expand Down Expand Up @@ -346,7 +346,9 @@ impl NodeConnectionPool {
}
}

pub(crate) fn get_working_connections(&self) -> Result<Vec<Arc<Connection>>, std::io::Error> {
pub(crate) fn get_working_connections(
&self,
) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
self.with_connections(|pool_conns| match pool_conns {
PoolConnections::NotSharded(conns) => conns.clone(),
PoolConnections::Sharded { connections, .. } => {
Expand Down Expand Up @@ -378,21 +380,14 @@ impl NodeConnectionPool {
fn with_connections<T>(
&self,
f: impl FnOnce(&PoolConnections) -> T,
) -> Result<T, std::io::Error> {
) -> Result<T, ConnectionPoolError> {
let conns = self.conns.load_full();
match &*conns {
MaybePoolConnections::Ready(pool_connections) => Ok(f(pool_connections)),
MaybePoolConnections::Broken(err) => Err(std::io::Error::new(
ErrorKind::Other,
format!(
"No connections in the pool; last connection failed with: {}",
err
),
)),
MaybePoolConnections::Initializing => Err(std::io::Error::new(
ErrorKind::Other,
"No connections in the pool, pool is still being initialized",
)),
MaybePoolConnections::Broken(err) => Err(ConnectionPoolError::Broken {
last_connection_error: Arc::new(err.clone()),
}),
MaybePoolConnections::Initializing => Err(ConnectionPoolError::Initializing),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,7 @@ mod latency_awareness {
// "fast" errors, i.e. ones that are returned quickly after the query begins
QueryError::BadQuery(_)
| QueryError::BrokenConnection(_)
| QueryError::ConnectionPoolError(_)
| QueryError::TooManyOrphanedStreamIds(_)
| QueryError::UnableToAllocStreamId
| QueryError::DbError(DbError::IsBootstrapping, _)
Expand Down
19 changes: 9 additions & 10 deletions scylla/src/transport/node.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use scylla_cql::errors::ConnectionPoolError;
use tokio::net::lookup_host;
use tracing::warn;
use uuid::Uuid;
Expand Down Expand Up @@ -157,7 +158,7 @@ impl Node {
pub(crate) async fn connection_for_shard(
&self,
shard: Shard,
) -> Result<Arc<Connection>, std::io::Error> {
) -> Result<Arc<Connection>, ConnectionPoolError> {
self.get_pool()?.connection_for_shard(shard)
}

Expand Down Expand Up @@ -186,7 +187,9 @@ impl Node {
Ok(())
}

pub(crate) fn get_working_connections(&self) -> Result<Vec<Arc<Connection>>, std::io::Error> {
pub(crate) fn get_working_connections(
&self,
) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
self.get_pool()?.get_working_connections()
}

Expand All @@ -196,14 +199,10 @@ impl Node {
}
}

fn get_pool(&self) -> Result<&NodeConnectionPool, std::io::Error> {
self.pool.as_ref().ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
"No connections in the pool: the node has been disabled \
by the host filter",
)
})
fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> {
self.pool
.as_ref()
.ok_or(ConnectionPoolError::NodeDisabledByHostFilter)
}
}

Expand Down

0 comments on commit c4cc700

Please sign in to comment.