Skip to content

Commit

Permalink
fix(pool): spawn task for before_acquire
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Nov 8, 2024
1 parent cb4c975 commit 974a2c9
Showing 1 changed file with 63 additions and 49 deletions.
112 changes: 63 additions & 49 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use std::task::ready;
use crate::logger::private_level_filter_to_trace_level;
use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector};
use crate::pool::idle::IdleQueue;
use crate::private_tracing_dynamic_event;
use crate::rt::JoinHandle;
use crate::{private_tracing_dynamic_event, rt};
use either::Either;
use futures_util::future::{self, OptionFuture};
use futures_util::FutureExt;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -116,7 +118,7 @@ impl<DB: Database> PoolInner<DB> {
let mut close_event = pin!(self.close_event());
let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout));
let mut acquire_idle = pin!(self.idle.acquire(self).fuse());
let mut check_idle = pin!(OptionFuture::from(None));
let mut before_acquire = OptionFuture::from(None);
let mut acquire_connect_permit = pin!(OptionFuture::from(Some(
self.counter.acquire_permit(self).fuse()
)));
Expand Down Expand Up @@ -144,21 +146,26 @@ impl<DB: Database> PoolInner<DB> {

// Attempt to acquire a connection from the idle queue.
if let Ready(idle) = acquire_idle.poll_unpin(cx) {
check_idle.set(Some(check_idle_conn(idle, &self.options)).into());
// If we acquired an idle connection, run any checks that need to be done.
//
// Includes `test_on_acquire` and the `before_acquire` callback, if set.
match finish_acquire(idle) {
// There are checks needed to be done, so they're spawned as a task
// to be cancellation-safe.
Either::Left(check_task) => {
before_acquire = Some(check_task).into();
}
// The connection is ready to go.
Either::Right(conn) => {
return Ready(Ok(conn));
}
}
}

// If we acquired an idle connection, run any checks that need to be done.
//
// Includes `test_on_acquire` and the `before_acquire` callback, if set.
//
// We don't want to race this step if it's already running because canceling it
// will result in the potentially unnecessary closure of a connection.
//
// Instead, we just wait and see what happens. If we already started connecting,
// that'll happen concurrently.
match ready!(check_idle.poll_unpin(cx)) {
// Poll the task returned by `finish_acquire`
match ready!(before_acquire.poll_unpin(cx)) {
// The `.reattach()` call errors with "type annotations needed" if not qualified.
Some(Ok(live)) => return Ready(Ok(Floating::reattach(live))),
Some(Ok(conn)) => return Ready(Ok(conn)),
Some(Err(permit)) => {
// We don't strictly need to poll `connect` here; all we really want to do
// is to check if it is `None`. But since currently there's no getter for that,
Expand All @@ -178,7 +185,7 @@ impl<DB: Database> PoolInner<DB> {
// Attempt to acquire another idle connection concurrently to opening a new one.
acquire_idle.set(self.idle.acquire(self).fuse());
// Annoyingly, `OptionFuture` doesn't fuse to `None` on its own
check_idle.set(None.into());
before_acquire = None.into();
}
None => (),
}
Expand Down Expand Up @@ -289,42 +296,49 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
.map_or(false, |timeout| idle.idle_since.elapsed() > timeout)
}

async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, ConnectPermit<DB>> {
if options.test_before_acquire {
// Check that the connection is still live
if let Err(error) = conn.ping().await {
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
tracing::info!(%error, "ping on idle connection returned error");
// connection is broken so don't try to close nicely
return Err(conn.close_hard().await);
}
}

if let Some(test) = &options.before_acquire {
let meta = conn.metadata();
match test(&mut conn.live.raw, meta).await {
Ok(false) => {
// connection was rejected by user-defined hook, close nicely
return Err(conn.close().await);
}

Err(error) => {
tracing::warn!(%error, "error from `before_acquire`");
/// Execute `test_before_acquire` and/or
fn finish_acquire<DB: Database>(
mut conn: Floating<DB, Idle<DB>>
) -> Either<JoinHandle<Result<PoolConnection<DB>, ConnectPermit<DB>>>, PoolConnection<DB>> {
let pool = conn.permit.pool();

if pool.options.test_before_acquire || pool.options.before_acquire.is_some() {
// Spawn a task so the call may complete even if `acquire()` is cancelled.
return Either::Left(rt::spawn(async move {
// Check that the connection is still live
if let Err(error) = conn.ping().await {
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
tracing::info!(%error, "ping on idle connection returned error");
// connection is broken so don't try to close nicely
return Err(conn.close_hard().await);
}

Ok(true) => {}
}
}
if let Some(test) = &conn.permit.pool().options.before_acquire {
let meta = conn.metadata();
match test(&mut conn.inner.live.raw, meta).await {
Ok(false) => {
// connection was rejected by user-defined hook, close nicely
return Err(conn.close().await);
}

// No need to re-connect; connection is alive or we don't care
Ok(conn.into_live())
Err(error) => {
tracing::warn!(%error, "error from `before_acquire`");
// connection is broken so don't try to close nicely
return Err(conn.close_hard().await);
}

Ok(true) => {}
}
}

Ok(conn.into_live().reattach())
}));
}

// No checks are configured, return immediately.
Either::Right(conn.into_live().reattach())
}

fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
Expand Down Expand Up @@ -353,7 +367,7 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
// Immediately cancel this task if the pool is closed.
let mut close_event = pool.close_event();

crate::rt::spawn(async move {
rt::spawn(async move {
let _ = close_event
.do_until(async {
// If the last handle to the pool was dropped while we were sleeping
Expand Down Expand Up @@ -386,10 +400,10 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {

if let Some(duration) = next_run.checked_duration_since(Instant::now()) {
// `async-std` doesn't have a `sleep_until()`
crate::rt::sleep(duration).await;
rt::sleep(duration).await;
} else {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
rt::yield_now().await;
}
}
})
Expand Down

0 comments on commit 974a2c9

Please sign in to comment.