Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove r2d2 database connection pools #8442

Merged
merged 5 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ deadpool-diesel = { version = "=0.6.0", features = ["postgres", "tracing"] }
derive_builder = "=0.20.0"
derive_deref = "=1.1.1"
dialoguer = "=0.11.0"
diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "r2d2", "numeric"] }
diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "numeric"] }
diesel_full_text_search = "=2.1.1"
diesel_migrations = { version = "=2.1.0", features = ["postgres"] }
dotenvy = "=0.15.7"
Expand Down Expand Up @@ -126,6 +126,7 @@ crates_io_index = { path = "crates/crates_io_index", features = ["testing"] }
crates_io_tarball = { path = "crates/crates_io_tarball", features = ["builder"] }
crates_io_test_db = { path = "crates/crates_io_test_db" }
claims = "=0.7.1"
diesel = { version = "=2.1.5", features = ["r2d2"] }
googletest = "=0.11.0"
insta = { version = "=1.38.0", features = ["json", "redactions"] }
regex = "=1.10.4"
Expand Down
131 changes: 1 addition & 130 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Application-wide components in a struct accessible from each request

use crate::config;
use crate::db::{connection_url, ConnectionConfig, DieselPool, DieselPooledConn, PoolError};
use crate::db::{connection_url, ConnectionConfig};
use std::ops::Deref;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
Expand All @@ -14,25 +14,17 @@ use axum::extract::{FromRef, FromRequestParts, State};
use crates_io_github::GitHubClient;
use deadpool_diesel::postgres::{Manager as DeadpoolManager, Pool as DeadpoolPool};
use deadpool_diesel::Runtime;
use diesel::r2d2;
use oauth2::basic::BasicClient;
use scheduled_thread_pool::ScheduledThreadPool;

type DeadpoolResult = Result<deadpool_diesel::postgres::Connection, deadpool_diesel::PoolError>;

/// The `App` struct holds the main components of the application like
/// the database connection pool and configurations
pub struct App {
/// The primary database connection pool
pub primary_database: DieselPool,

/// Async database connection pool based on `deadpool` connected
/// to the primary database
pub deadpool_primary: DeadpoolPool,

/// The read-only replica database connection pool
pub read_only_replica_database: Option<DieselPool>,

/// Async database connection pool based on `deadpool` connected
/// to the read-only replica database
pub deadpool_replica: Option<DeadpoolPool>,
Expand Down Expand Up @@ -88,32 +80,6 @@ impl App {
),
);

let thread_pool = Arc::new(ScheduledThreadPool::new(config.db.helper_threads));

let primary_database = {
let primary_db_connection_config = ConnectionConfig {
statement_timeout: config.db.statement_timeout,
read_only: config.db.primary.read_only_mode,
};

let primary_db_config = r2d2::Pool::builder()
.max_size(config.db.primary.pool_size)
.min_idle(config.db.primary.min_idle)
.connection_timeout(config.db.connection_timeout)
.connection_customizer(Box::new(primary_db_connection_config))
.thread_pool(thread_pool.clone());

DieselPool::new(
&config.db.primary.url,
&config.db,
primary_db_config,
instance_metrics
.database_time_to_obtain_connection
.with_label_values(&["primary"]),
)
.unwrap()
};

let primary_database_async = {
use secrecy::ExposeSecret;

Expand All @@ -134,34 +100,6 @@ impl App {
.unwrap()
};

let replica_database = if let Some(pool_config) = config.db.replica.as_ref() {
let replica_db_connection_config = ConnectionConfig {
statement_timeout: config.db.statement_timeout,
read_only: pool_config.read_only_mode,
};

let replica_db_config = r2d2::Pool::builder()
.max_size(pool_config.pool_size)
.min_idle(pool_config.min_idle)
.connection_timeout(config.db.connection_timeout)
.connection_customizer(Box::new(replica_db_connection_config))
.thread_pool(thread_pool);

Some(
DieselPool::new(
&pool_config.url,
&config.db,
replica_db_config,
instance_metrics
.database_time_to_obtain_connection
.with_label_values(&["follower"]),
)
.unwrap(),
)
} else {
None
};

let replica_database_async = if let Some(pool_config) = config.db.replica.as_ref() {
use secrecy::ExposeSecret;

Expand All @@ -187,9 +125,7 @@ impl App {
};

App {
primary_database,
deadpool_primary: primary_database_async,
read_only_replica_database: replica_database,
deadpool_replica: replica_database_async,
github,
github_oauth,
Expand All @@ -208,48 +144,12 @@ impl App {
&self.config.session_key
}

/// Check out a read/write connection from the synchronous primary pool.
///
/// Any error reported by the pool is handed back to the caller untouched.
#[instrument(skip_all)]
pub fn db_write(&self) -> Result<DieselPooledConn, PoolError> {
    let primary = &self.primary_database;
    primary.get()
}

/// Check out a read/write connection from the async (`deadpool`) primary pool.
///
/// Awaits the pool and returns its result to the caller unchanged.
#[instrument(skip_all)]
pub async fn db_write_async(&self) -> DeadpoolResult {
    let primary = &self.deadpool_primary;
    primary.get().await
}

/// Check out a read-only connection, preferring the replica pool.
///
/// The primary pool is used when no replica is configured, or when the
/// replica reports `PoolError::UnhealthyPool`. Any other replica error is
/// returned to the caller as-is.
#[instrument(skip_all)]
pub fn db_read(&self) -> Result<DieselPooledConn, PoolError> {
    let primary = &self.primary_database;

    let replica = match self.read_only_replica_database.as_ref() {
        Some(pool) => pool,
        // No replica configured: the primary is the only candidate.
        None => return primary.get(),
    };

    let attempt = replica.get();
    if !matches!(attempt, Err(PoolError::UnhealthyPool)) {
        // Either a usable replica connection or a hard replica failure.
        return attempt;
    }

    // The replica is unhealthy. Record the fallback on a best-effort basis
    // (a metric lookup failure is deliberately ignored) and try the primary.
    if let Ok(fallback_counter) = self
        .instance_metrics
        .database_fallback_used
        .get_metric_with_label_values(&["follower"])
    {
        fallback_counter.inc();
    }

    primary.get()
}

/// Obtain a readonly database connection from the replica pool
///
/// If the replica pool is disabled or unavailable, the primary pool is used instead.
Expand Down Expand Up @@ -281,35 +181,6 @@ impl App {
}
}

/// Check out a read-only connection, preferring the primary pool.
///
/// The replica pool is only consulted when it is configured and the primary
/// reports `PoolError::UnhealthyPool`. Any other primary error is returned
/// to the caller as-is.
#[instrument(skip_all)]
pub fn db_read_prefer_primary(&self) -> Result<DieselPooledConn, PoolError> {
    let primary = &self.primary_database;

    let replica = match self.read_only_replica_database.as_ref() {
        Some(pool) => pool,
        // Without a replica there is nothing to fall back to.
        None => return primary.get(),
    };

    let attempt = primary.get();
    if !matches!(attempt, Err(PoolError::UnhealthyPool)) {
        // Either a usable primary connection or a hard primary failure.
        return attempt;
    }

    // The primary is unhealthy. Record the fallback on a best-effort basis
    // (a metric lookup failure is deliberately ignored) and try the replica.
    if let Ok(fallback_counter) = self
        .instance_metrics
        .database_fallback_used
        .get_metric_with_label_values(&["primary"])
    {
        fallback_counter.inc();
    }

    replica.get()
}

/// Obtain a readonly database connection from the primary pool
///
/// If the primary pool is unavailable, the replica pool is used instead, if not disabled.
Expand Down
113 changes: 1 addition & 112 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,110 +1,13 @@
use deadpool_diesel::postgres::{Hook, HookError};
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection, State};
use prometheus::Histogram;
use secrecy::{ExposeSecret, SecretString};
use std::ops::Deref;
use secrecy::ExposeSecret;
use std::time::Duration;
use thiserror::Error;
use url::Url;

use crate::config;

pub mod sql_types;

/// An r2d2 connection pool whose connections are diesel `PgConnection`s.
pub type ConnectionPool = r2d2::Pool<ConnectionManager<PgConnection>>;

/// Wrapper around an r2d2 [`ConnectionPool`] that can optionally record how
/// long it takes to obtain a connection.
#[derive(Clone)]
pub struct DieselPool {
/// The wrapped r2d2 pool.
pool: ConnectionPool,
/// Histogram observed around connection checkout in `get`; `None` for
/// pools built with `new_background_worker`.
time_to_obtain_connection_metric: Option<Histogram>,
}

impl DieselPool {
    /// Build a pool for `url` from the given r2d2 builder, then wait up to
    /// five seconds for it to become healthy.
    ///
    /// An unhealthy pool after that wait is NOT an error; only other pool
    /// errors are returned.
    pub(crate) fn new(
        url: &SecretString,
        config: &config::DatabasePools,
        r2d2_config: r2d2::Builder<ConnectionManager<PgConnection>>,
        time_to_obtain_connection_metric: Histogram,
    ) -> Result<DieselPool, PoolError> {
        let database_url = connection_url(config, url.expose_secret());
        let manager = ConnectionManager::new(database_url);

        // r2d2's `build()` assumes every consumer needs a database connection
        // to operate, so it blocks until a minimum number of connections is
        // available. crates.io behaves differently: it can run in a limited
        // capacity without any database connection (notably by serving
        // download requests), so it must not block indefinitely here.
        //
        // Instead the pool is built unchecked and given a short grace period,
        // to avoid serving errors for the first requests while the pool is
        // still initializing. If no connection can be established in that
        // time the application keeps booting; the pool is then considered
        // unhealthy and the rest of the application adapts.
        let diesel_pool = DieselPool {
            pool: r2d2_config.build_unchecked(manager),
            time_to_obtain_connection_metric: Some(time_to_obtain_connection_metric),
        };

        match diesel_pool.wait_until_healthy(Duration::from_secs(5)) {
            Ok(()) | Err(PoolError::UnhealthyPool) => Ok(diesel_pool),
            Err(other) => Err(other),
        }
    }

    /// Wrap an already-built r2d2 pool without any checkout-time metric.
    pub fn new_background_worker(pool: r2d2::Pool<ConnectionManager<PgConnection>>) -> Self {
        let time_to_obtain_connection_metric = None;
        Self {
            pool,
            time_to_obtain_connection_metric,
        }
    }

    /// Obtain a connection from the pool.
    ///
    /// Without a histogram this is a plain r2d2 `get()`. With a histogram the
    /// checkout is timed, and an unhealthy pool is reported as
    /// `PoolError::UnhealthyPool` straight away instead of going through the
    /// pool's `get()`.
    #[instrument(name = "db.connect", skip_all)]
    pub fn get(&self) -> Result<DieselPooledConn, PoolError> {
        let Some(histogram) = self.time_to_obtain_connection_metric.as_ref() else {
            return Ok(self.pool.get()?);
        };

        histogram.observe_closure_duration(|| match self.pool.try_get() {
            // A connection was available immediately.
            Some(conn) => Ok(conn),
            // Nothing available right now, but the pool has connections.
            None if self.is_healthy() => Ok(self.pool.get()?),
            // No established connections at all.
            None => Err(PoolError::UnhealthyPool),
        })
    }

    /// Current r2d2 state of the wrapped pool.
    pub fn state(&self) -> State {
        self.pool.state()
    }

    /// Try to obtain a connection within `timeout`.
    ///
    /// Returns `Ok(())` on success, `PoolError::UnhealthyPool` when the
    /// attempt fails and the pool has no connections, and the wrapped r2d2
    /// error otherwise.
    #[instrument(skip_all)]
    pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
        let Err(err) = self.pool.get_timeout(timeout) else {
            return Ok(());
        };

        if self.is_healthy() {
            Err(PoolError::R2D2(err))
        } else {
            Err(PoolError::UnhealthyPool)
        }
    }

    /// A pool counts as healthy once it holds at least one connection.
    fn is_healthy(&self) -> bool {
        let live_connections = self.state().connections;
        live_connections > 0
    }
}

/// Lets a `DieselPool` be used wherever the wrapped r2d2 pool is expected.
impl Deref for DieselPool {
    type Target = ConnectionPool;

    fn deref(&self) -> &Self::Target {
        let Self { pool, .. } = self;
        pool
    }
}

/// An r2d2 pooled connection wrapping a diesel `PgConnection`.
pub type DieselPooledConn = r2d2::PooledConnection<ConnectionManager<PgConnection>>;

pub fn oneoff_connection_with_config(
config: &config::DatabasePools,
) -> ConnectionResult<PgConnection> {
Expand Down Expand Up @@ -160,12 +63,6 @@ impl ConnectionConfig {
}
}

impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
self.apply(conn).map_err(r2d2::Error::QueryError)
}
}

impl From<ConnectionConfig> for Hook {
fn from(config: ConnectionConfig) -> Self {
Hook::async_fn(move |conn, _| {
Expand All @@ -178,11 +75,3 @@ impl From<ConnectionConfig> for Hook {
})
}
}

/// Errors produced when obtaining a connection from a `DieselPool`.
#[derive(Debug, Error)]
pub enum PoolError {
/// An error reported by the underlying r2d2 pool, displayed transparently.
#[error(transparent)]
R2D2(#[from] r2d2::PoolError),
/// The pool currently holds no connections.
#[error("unhealthy database pool")]
UnhealthyPool,
}
20 changes: 1 addition & 19 deletions src/metrics/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
//! As a rule of thumb, if the metric requires a database query to be updated it's probably a
//! service-level metric, and you should add it to `src/metrics/service.rs` instead.

use crate::app::App;
use crate::metrics::macros::metrics;
use crate::{app::App, db::DieselPool};
use deadpool_diesel::postgres::Pool;
use prometheus::{
proto::MetricFamily, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Expand Down Expand Up @@ -53,11 +53,6 @@ metrics! {
impl InstanceMetrics {
pub fn gather(&self, app: &App) -> prometheus::Result<Vec<MetricFamily>> {
// Database pool stats
self.refresh_pool_stats("primary", &app.primary_database)?;
if let Some(follower) = &app.read_only_replica_database {
self.refresh_pool_stats("follower", follower)?;
}

self.refresh_async_pool_stats("async_primary", &app.deadpool_primary)?;
if let Some(follower) = &app.deadpool_replica {
self.refresh_async_pool_stats("async_follower", follower)?;
Expand All @@ -66,19 +61,6 @@ impl InstanceMetrics {
Ok(self.registry.gather())
}

/// Publish the idle and in-use connection counts of `pool` to the
/// `database_idle_conns` / `database_used_conns` gauges labelled `name`.
///
/// Returns the prometheus error if either gauge lookup fails.
fn refresh_pool_stats(&self, name: &str, pool: &DieselPool) -> prometheus::Result<()> {
    let snapshot = pool.state();
    let labels = [name];

    let idle_gauge = self
        .database_idle_conns
        .get_metric_with_label_values(&labels)?;
    idle_gauge.set(snapshot.idle_connections as i64);

    let used_gauge = self
        .database_used_conns
        .get_metric_with_label_values(&labels)?;
    // In use = every connection the pool holds minus the idle ones.
    let in_use = snapshot.connections - snapshot.idle_connections;
    used_gauge.set(in_use as i64);

    Ok(())
}

fn refresh_async_pool_stats(&self, name: &str, pool: &Pool) -> prometheus::Result<()> {
let status = pool.status();

Expand Down
Loading