diff --git a/Cargo.toml b/Cargo.toml index b78763302a6..09f339f39c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ deadpool-diesel = { version = "=0.6.0", features = ["postgres", "tracing"] } derive_builder = "=0.20.0" derive_deref = "=1.1.1" dialoguer = "=0.11.0" -diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "r2d2", "numeric"] } +diesel = { version = "=2.1.5", features = ["postgres", "serde_json", "chrono", "numeric"] } diesel_full_text_search = "=2.1.1" diesel_migrations = { version = "=2.1.0", features = ["postgres"] } dotenvy = "=0.15.7" @@ -126,6 +126,7 @@ crates_io_index = { path = "crates/crates_io_index", features = ["testing"] } crates_io_tarball = { path = "crates/crates_io_tarball", features = ["builder"] } crates_io_test_db = { path = "crates/crates_io_test_db" } claims = "=0.7.1" +diesel = { version = "=2.1.5", features = ["r2d2"] } googletest = "=0.11.0" insta = { version = "=1.38.0", features = ["json", "redactions"] } regex = "=1.10.4" diff --git a/src/app.rs b/src/app.rs index bf1d25965ee..dcd043d78fc 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,7 +1,7 @@ //! Application-wide components in a struct accessible from each request use crate::config; -use crate::db::{connection_url, ConnectionConfig, DieselPool, DieselPooledConn, PoolError}; +use crate::db::{connection_url, ConnectionConfig}; use std::ops::Deref; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -14,25 +14,17 @@ use axum::extract::{FromRef, FromRequestParts, State}; use crates_io_github::GitHubClient; use deadpool_diesel::postgres::{Manager as DeadpoolManager, Pool as DeadpoolPool}; use deadpool_diesel::Runtime; -use diesel::r2d2; use oauth2::basic::BasicClient; -use scheduled_thread_pool::ScheduledThreadPool; type DeadpoolResult = Result; /// The `App` struct holds the main components of the application like /// the database connection pool and configurations pub struct App { - /// The primary database connection pool - pub primary_database: DieselPool, - /// Async database connection pool based on `deadpool` connected /// to the primary database pub deadpool_primary: DeadpoolPool, - /// The read-only replica database connection pool - pub read_only_replica_database: Option, - /// Async database connection pool based on `deadpool` connected /// to the read-only replica database pub deadpool_replica: Option, @@ -88,32 +80,6 @@ impl App { ), ); - let thread_pool = Arc::new(ScheduledThreadPool::new(config.db.helper_threads)); - - let primary_database = { - let primary_db_connection_config = ConnectionConfig { - statement_timeout: config.db.statement_timeout, - read_only: config.db.primary.read_only_mode, - }; - - let primary_db_config = r2d2::Pool::builder() - .max_size(config.db.primary.pool_size) - .min_idle(config.db.primary.min_idle) - .connection_timeout(config.db.connection_timeout) - .connection_customizer(Box::new(primary_db_connection_config)) - .thread_pool(thread_pool.clone()); - - DieselPool::new( - &config.db.primary.url, - &config.db, - primary_db_config, - instance_metrics - .database_time_to_obtain_connection - .with_label_values(&["primary"]), - ) - .unwrap() - }; - let primary_database_async = { use secrecy::ExposeSecret; @@ -134,34 +100,6 @@ impl App { .unwrap() }; - let replica_database = if let Some(pool_config) = config.db.replica.as_ref() { - let replica_db_connection_config = ConnectionConfig { - statement_timeout: config.db.statement_timeout, - read_only: pool_config.read_only_mode, - }; - - let replica_db_config = r2d2::Pool::builder() - .max_size(pool_config.pool_size) - .min_idle(pool_config.min_idle) - .connection_timeout(config.db.connection_timeout) - .connection_customizer(Box::new(replica_db_connection_config)) - .thread_pool(thread_pool); - - Some( - DieselPool::new( - &pool_config.url, - &config.db, - replica_db_config, - instance_metrics - .database_time_to_obtain_connection - .with_label_values(&["follower"]), - ) - .unwrap(), - ) - } else { - None - }; - let replica_database_async = if let Some(pool_config) = config.db.replica.as_ref() { use secrecy::ExposeSecret; @@ -187,9 +125,7 @@ impl App { }; App { - primary_database, deadpool_primary: primary_database_async, - read_only_replica_database: replica_database, deadpool_replica: replica_database_async, github, github_oauth, @@ -208,48 +144,12 @@ impl App { &self.config.session_key } - /// Obtain a read/write database connection from the primary pool - #[instrument(skip_all)] - pub fn db_write(&self) -> Result { - self.primary_database.get() - } - /// Obtain a read/write database connection from the async primary pool #[instrument(skip_all)] pub async fn db_write_async(&self) -> DeadpoolResult { self.deadpool_primary.get().await } - /// Obtain a readonly database connection from the replica pool - /// - /// If the replica pool is disabled or unavailable, the primary pool is used instead. - #[instrument(skip_all)] - pub fn db_read(&self) -> Result { - let Some(read_only_pool) = self.read_only_replica_database.as_ref() else { - // Replica is disabled, but primary might be available - return self.primary_database.get(); - }; - - match read_only_pool.get() { - // Replica is available - Ok(connection) => Ok(connection), - - // Replica is not available, but primary might be available - Err(PoolError::UnhealthyPool) => { - let _ = self - .instance_metrics - .database_fallback_used - .get_metric_with_label_values(&["follower"]) - .map(|metric| metric.inc()); - - self.primary_database.get() - } - - // Replica failed - Err(error) => Err(error), - } - } - /// Obtain a readonly database connection from the replica pool /// /// If the replica pool is disabled or unavailable, the primary pool is used instead. @@ -281,35 +181,6 @@ impl App { } } - /// Obtain a readonly database connection from the primary pool - /// - /// If the primary pool is unavailable, the replica pool is used instead, if not disabled. - #[instrument(skip_all)] - pub fn db_read_prefer_primary(&self) -> Result { - let Some(read_only_pool) = self.read_only_replica_database.as_ref() else { - return self.primary_database.get(); - }; - - match self.primary_database.get() { - // Primary is available - Ok(connection) => Ok(connection), - - // Primary is not available, but replica might be available - Err(PoolError::UnhealthyPool) => { - let _ = self - .instance_metrics - .database_fallback_used - .get_metric_with_label_values(&["primary"]) - .map(|metric| metric.inc()); - - read_only_pool.get() - } - - // Primary failed - Err(error) => Err(error), - } - } - /// Obtain a readonly database connection from the primary pool /// /// If the primary pool is unavailable, the replica pool is used instead, if not disabled. diff --git a/src/db.rs b/src/db.rs index bcbb4acd5ca..52ad049a5d7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,110 +1,13 @@ use deadpool_diesel::postgres::{Hook, HookError}; use diesel::prelude::*; -use diesel::r2d2::{self, ConnectionManager, CustomizeConnection, State}; -use prometheus::Histogram; -use secrecy::{ExposeSecret, SecretString}; -use std::ops::Deref; +use secrecy::ExposeSecret; use std::time::Duration; -use thiserror::Error; use url::Url; use crate::config; pub mod sql_types; -pub type ConnectionPool = r2d2::Pool>; - -#[derive(Clone)] -pub struct DieselPool { - pool: ConnectionPool, - time_to_obtain_connection_metric: Option, -} - -impl DieselPool { - pub(crate) fn new( - url: &SecretString, - config: &config::DatabasePools, - r2d2_config: r2d2::Builder>, - time_to_obtain_connection_metric: Histogram, - ) -> Result { - let manager = ConnectionManager::new(connection_url(config, url.expose_secret())); - - // For crates.io we want the behavior of creating a database pool to be slightly different - // than the defaults of R2D2: the library's build() method assumes its consumers always - // need a database connection to operate, so it blocks creating a pool until a minimum - // number of connections is available. - // - // crates.io can actually operate in a limited capacity without a database connections, - // especially by serving download requests to our users. Because of that we don't want to - // block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid - // serving errors for the first connections until the pool is initialized) and if we can't - // establish any connection continue booting up the application. The database pool will - // automatically be marked as unhealthy and the rest of the application will adapt. - let pool = DieselPool { - pool: r2d2_config.build_unchecked(manager), - time_to_obtain_connection_metric: Some(time_to_obtain_connection_metric), - }; - match pool.wait_until_healthy(Duration::from_secs(5)) { - Ok(()) => {} - Err(PoolError::UnhealthyPool) => {} - Err(err) => return Err(err), - } - - Ok(pool) - } - - pub fn new_background_worker(pool: r2d2::Pool>) -> Self { - Self { - pool, - time_to_obtain_connection_metric: None, - } - } - - #[instrument(name = "db.connect", skip_all)] - pub fn get(&self) -> Result { - match self.time_to_obtain_connection_metric.as_ref() { - Some(time_to_obtain_connection_metric) => time_to_obtain_connection_metric - .observe_closure_duration(|| { - if let Some(conn) = self.pool.try_get() { - Ok(conn) - } else if !self.is_healthy() { - Err(PoolError::UnhealthyPool) - } else { - Ok(self.pool.get()?) - } - }), - None => Ok(self.pool.get()?), - } - } - - pub fn state(&self) -> State { - self.pool.state() - } - - #[instrument(skip_all)] - pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> { - match self.pool.get_timeout(timeout) { - Ok(_) => Ok(()), - Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool), - Err(err) => Err(PoolError::R2D2(err)), - } - } - - fn is_healthy(&self) -> bool { - self.state().connections > 0 - } -} - -impl Deref for DieselPool { - type Target = ConnectionPool; - - fn deref(&self) -> &Self::Target { - &self.pool - } -} - -pub type DieselPooledConn = r2d2::PooledConnection>; - pub fn oneoff_connection_with_config( config: &config::DatabasePools, ) -> ConnectionResult { @@ -160,12 +63,6 @@ impl ConnectionConfig { } } -impl CustomizeConnection for ConnectionConfig { - fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> { - self.apply(conn).map_err(r2d2::Error::QueryError) - } -} - impl From for Hook { fn from(config: ConnectionConfig) -> Self { Hook::async_fn(move |conn, _| { @@ -178,11 +75,3 @@ impl From for Hook { }) } } - -#[derive(Debug, Error)] -pub enum PoolError { - #[error(transparent)] - R2D2(#[from] r2d2::PoolError), - #[error("unhealthy database pool")] - UnhealthyPool, -} diff --git a/src/metrics/instance.rs b/src/metrics/instance.rs index 8582b1a6ecd..fe7a9205c9e 100644 --- a/src/metrics/instance.rs +++ b/src/metrics/instance.rs @@ -17,8 +17,8 @@ //! As a rule of thumb, if the metric requires a database query to be updated it's probably a //! service-level metric, and you should add it to `src/metrics/service.rs` instead. +use crate::app::App; use crate::metrics::macros::metrics; -use crate::{app::App, db::DieselPool}; use deadpool_diesel::postgres::Pool; use prometheus::{ proto::MetricFamily, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, @@ -53,11 +53,6 @@ metrics! { impl InstanceMetrics { pub fn gather(&self, app: &App) -> prometheus::Result> { // Database pool stats - self.refresh_pool_stats("primary", &app.primary_database)?; - if let Some(follower) = &app.read_only_replica_database { - self.refresh_pool_stats("follower", follower)?; - } - self.refresh_async_pool_stats("async_primary", &app.deadpool_primary)?; if let Some(follower) = &app.deadpool_replica { self.refresh_async_pool_stats("async_follower", follower)?; @@ -66,19 +61,6 @@ impl InstanceMetrics { Ok(self.registry.gather()) } - fn refresh_pool_stats(&self, name: &str, pool: &DieselPool) -> prometheus::Result<()> { - let state = pool.state(); - - self.database_idle_conns - .get_metric_with_label_values(&[name])? - .set(state.idle_connections as i64); - self.database_used_conns - .get_metric_with_label_values(&[name])? - .set((state.connections - state.idle_connections) as i64); - - Ok(()) - } - fn refresh_async_pool_stats(&self, name: &str, pool: &Pool) -> prometheus::Result<()> { let status = pool.status(); diff --git a/src/util/errors.rs b/src/util/errors.rs index 5d8d73cb316..ab20c0f5403 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -24,7 +24,6 @@ use diesel::result::{DatabaseErrorKind, Error as DieselError}; use http::StatusCode; use tokio::task::JoinError; -use crate::db::PoolError; use crate::middleware::log_request::ErrorField; mod json; @@ -172,15 +171,6 @@ impl From for BoxedAppError { } } -impl From for BoxedAppError { - fn from(err: PoolError) -> BoxedAppError { - match err { - PoolError::UnhealthyPool => service_unavailable(), - _ => Box::new(err), - } - } -} - impl From for BoxedAppError { fn from(_err: deadpool_diesel::PoolError) -> BoxedAppError { service_unavailable()