From ae4d1e786c43308422ad64bdbaf697dc5169f560 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:30:23 +0100 Subject: [PATCH 1/8] worker/jobs/git: Replace `r2d2` with `deadpool` --- src/worker/jobs/git.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/worker/jobs/git.rs b/src/worker/jobs/git.rs index f6ddef00b12..eceba60396a 100644 --- a/src/worker/jobs/git.rs +++ b/src/worker/jobs/git.rs @@ -1,7 +1,7 @@ use crate::models; use crate::tasks::spawn_blocking; use crate::worker::Environment; -use anyhow::Context; +use anyhow::{anyhow, Context}; use chrono::Utc; use crates_io_env_vars::var_parsed; use crates_io_index::{Crate, Repository}; @@ -39,9 +39,9 @@ impl BackgroundJob for SyncToGitIndex { info!("Syncing to git index"); let crate_name = self.krate.clone(); - spawn_blocking(move || { - let mut conn = env.connection_pool.get()?; - let new = get_index_data(&crate_name, &mut conn).context("Failed to get index data")?; + let conn = env.deadpool.get().await?; + conn.interact(move |conn| { + let new = get_index_data(&crate_name, conn).context("Failed to get index data")?; let repo = env.lock_index()?; let dst = repo.index_file(&crate_name); @@ -75,6 +75,7 @@ impl BackgroundJob for SyncToGitIndex { Ok(()) }) .await + .map_err(|err| anyhow!(err.to_string()))? } } @@ -102,12 +103,12 @@ impl BackgroundJob for SyncToSparseIndex { info!("Syncing to sparse index"); let crate_name = self.krate.clone(); - let pool = env.connection_pool.clone(); - let content = spawn_blocking(move || { - let mut conn = pool.get()?; - get_index_data(&crate_name, &mut conn).context("Failed to get index data") - }) - .await?; + let conn = env.deadpool.get().await?; + let content = conn + .interact(move |conn| get_index_data(&crate_name, conn)) + .await + .map_err(|err| anyhow!(err.to_string()))? + .context("Failed to get index data")?; let future = env.storage.sync_index(&self.krate, content); future.await.context("Failed to sync index data")?; From e90e91604470d433082e058e3b8dcf3defd1a074 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 10:14:53 +0100 Subject: [PATCH 2/8] worker/jobs/downloads/queue: Replace `r2d2` with `deadpool` --- src/worker/jobs/downloads/queue/job.rs | 38 +++++++++++++------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/worker/jobs/downloads/queue/job.rs b/src/worker/jobs/downloads/queue/job.rs index 83de229b4ba..2c8c2478e4e 100644 --- a/src/worker/jobs/downloads/queue/job.rs +++ b/src/worker/jobs/downloads/queue/job.rs @@ -1,14 +1,14 @@ use crate::config::CdnLogQueueConfig; -use crate::db::DieselPool; use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl}; -use crate::tasks::spawn_blocking; use crate::worker::jobs::ProcessCdnLog; use crate::worker::Environment; -use anyhow::Context; +use anyhow::{anyhow, Context}; use aws_credential_types::Credentials; use aws_sdk_sqs::config::Region; use aws_sdk_sqs::types::Message; use crates_io_worker::BackgroundJob; +use deadpool_diesel::postgres::Pool; +use diesel::PgConnection; use std::sync::Arc; /// A background job that processes messages from the CDN log queue. @@ -33,7 +33,7 @@ impl BackgroundJob for ProcessCdnLogQueue { info!("Processing messages from the CDN log queue…"); let queue = build_queue(&ctx.config.cdn_log_queue); - run(&queue, self.max_messages, &ctx.connection_pool).await + run(&queue, self.max_messages, &ctx.deadpool).await } } @@ -66,7 +66,7 @@ fn build_queue(config: &CdnLogQueueConfig) -> Box { async fn run( queue: &impl SqsQueue, max_messages: usize, - connection_pool: &DieselPool, + connection_pool: &Pool, ) -> anyhow::Result<()> { const MAX_BATCH_SIZE: usize = 10; @@ -101,7 +101,7 @@ async fn run( async fn process_message( message: &Message, queue: &impl SqsQueue, - connection_pool: &DieselPool, + connection_pool: &Pool, ) -> anyhow::Result<()> { debug!("Processing message…"); @@ -133,7 +133,7 @@ async fn process_message( /// warning and returns `Ok(())` instead. This is because we don't want to /// requeue the message in the case of a parsing error, as it would just be /// retried indefinitely. -async fn process_body(body: &str, connection_pool: &DieselPool) -> anyhow::Result<()> { +async fn process_body(body: &str, connection_pool: &Pool) -> anyhow::Result<()> { let message = match serde_json::from_str::(body) { Ok(message) => message, Err(err) => { @@ -152,8 +152,11 @@ async fn process_body(body: &str, connection_pool: &DieselPool) -> anyhow::Resul return Ok(()); } - let pool = connection_pool.clone(); - spawn_blocking(move || enqueue_jobs(jobs, &pool)).await + let conn = connection_pool.get().await; + let conn = conn.context("Failed to acquire database connection")?; + conn.interact(move |conn| enqueue_jobs(jobs, conn)) + .await + .map_err(|err| anyhow!(err.to_string()))? } /// Extracts a list of [`ProcessCdnLog`] jobs from a message. @@ -199,16 +202,12 @@ fn is_ignored_path(path: &str) -> bool { path.contains("/index.staging.crates.io/") || path.contains("/index.crates.io/") } -fn enqueue_jobs(jobs: Vec, pool: &DieselPool) -> anyhow::Result<()> { - let mut conn = pool - .get() - .context("Failed to acquire database connection")?; - +fn enqueue_jobs(jobs: Vec, conn: &mut PgConnection) -> anyhow::Result<()> { for job in jobs { let path = &job.path; info!("Enqueuing processing job… ({path})"); - job.enqueue(&mut conn) + job.enqueue(conn) .context("Failed to enqueue processing job")?; debug!("Enqueued processing job"); @@ -225,8 +224,9 @@ mod tests { use aws_sdk_sqs::types::Message; use crates_io_test_db::TestDatabase; use crates_io_worker::schema::background_jobs; + use deadpool_diesel::postgres::Manager; + use deadpool_diesel::Runtime; use diesel::prelude::*; - use diesel::r2d2::{ConnectionManager, Pool}; use diesel::QueryDsl; use insta::assert_snapshot; use parking_lot::Mutex; @@ -392,9 +392,9 @@ mod tests { deleted_handles } - fn build_connection_pool(url: &str) -> DieselPool { - let pool = Pool::builder().build(ConnectionManager::new(url)).unwrap(); - DieselPool::new_background_worker(pool) + fn build_connection_pool(url: &str) -> Pool { + let manager = Manager::new(url, Runtime::Tokio1); + Pool::builder(manager).build().unwrap() } fn message(id: &str, region: &str, bucket: &str, path: &str) -> Message { From 040f71f3e24b4f217fb9dee385a48d46ad411fad Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:18:59 +0100 Subject: [PATCH 3/8] worker/jobs/downloads/process_log: Replace `r2d2` with `deadpool` --- src/worker/jobs/downloads/process_log.rs | 67 +++++++++++------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/src/worker/jobs/downloads/process_log.rs b/src/worker/jobs/downloads/process_log.rs index 0dd6bac0728..fbd88206d39 100644 --- a/src/worker/jobs/downloads/process_log.rs +++ b/src/worker/jobs/downloads/process_log.rs @@ -1,11 +1,10 @@ use crate::config::CdnLogStorageConfig; -use crate::db::DieselPool; -use crate::tasks::spawn_blocking; use crate::worker::Environment; -use anyhow::Context; +use anyhow::{anyhow, Context}; use chrono::NaiveDate; use crates_io_cdn_logs::{count_downloads, Decompressor, DownloadsMap}; use crates_io_worker::BackgroundJob; +use deadpool_diesel::postgres::Pool; use diesel::dsl::exists; use diesel::prelude::*; use diesel::{select, PgConnection, QueryResult}; @@ -52,7 +51,7 @@ impl BackgroundJob for ProcessCdnLog { let store = build_store(&ctx.config.cdn_log_storage, &self.region, &self.bucket) .context("Failed to build object store")?; - let db_pool = ctx.connection_pool.clone(); + let db_pool = ctx.deadpool.clone(); run(store, &self.path, db_pool).await } } @@ -97,7 +96,7 @@ fn build_store( /// it can be tested without having to construct a full [`Environment`] /// struct. #[instrument(skip_all, fields(cdn_log_store.path = %path))] -async fn run(store: Arc, path: &str, db_pool: DieselPool) -> anyhow::Result<()> { +async fn run(store: Arc, path: &str, db_pool: Pool) -> anyhow::Result<()> { if already_processed(path, db_pool.clone()).await? { warn!("Skipping already processed log file"); return Ok(()); @@ -115,8 +114,8 @@ async fn run(store: Arc, path: &str, db_pool: DieselPool) -> an log_stats(&downloads); let path = path.to_string(); - spawn_blocking(move || { - let mut conn = db_pool.get()?; + let conn = db_pool.get().await?; + conn.interact(|conn| { conn.transaction(|conn| { // Mark the log file as processed before saving the downloads to // the database. @@ -128,14 +127,15 @@ async fn run(store: Arc, path: &str, db_pool: DieselPool) -> an // When the job is retried the `already_processed()` call above // will return `true` and the job will skip processing the log // file again. - save_as_processed(&path, conn)?; + save_as_processed(path, conn)?; save_downloads(downloads, conn) })?; Ok::<_, anyhow::Error>(()) }) - .await?; + .await + .map_err(|err| anyhow!(err.to_string()))??; Ok(()) } @@ -343,16 +343,16 @@ impl Debug for NameAndVersion { /// Checks if the given log file has already been processed. /// -/// Calls [`spawn_blocking()`] and acquires a connection from the pool before -/// passing it to the [`already_processed_inner()`] function. -async fn already_processed(path: impl Into, db_pool: DieselPool) -> anyhow::Result { +/// Acquires a connection from the pool before passing it to the +/// [`already_processed_inner()`] function. +async fn already_processed(path: impl Into, db_pool: Pool) -> anyhow::Result { let path = path.into(); - let already_processed = spawn_blocking(move || { - let mut conn = db_pool.get()?; - Ok::<_, anyhow::Error>(already_processed_inner(path, &mut conn)?) - }) - .await?; + let conn = db_pool.get().await?; + let already_processed = conn + .interact(move |conn| already_processed_inner(path, conn)) + .await + .map_err(|err| anyhow!(err.to_string()))??; Ok(already_processed) } @@ -387,7 +387,8 @@ mod tests { use super::*; use crate::schema::{crates, version_downloads, versions}; use crates_io_test_db::TestDatabase; - use diesel::r2d2::{ConnectionManager, Pool}; + use deadpool_diesel::postgres::Manager; + use deadpool_diesel::Runtime; use insta::assert_debug_snapshot; const CLOUDFRONT_PATH: &str = @@ -466,23 +467,23 @@ mod tests { } /// Builds a connection pool to the test database. - fn build_connection_pool(url: &str) -> DieselPool { - let pool = Pool::builder().build(ConnectionManager::new(url)).unwrap(); - DieselPool::new_background_worker(pool) + fn build_connection_pool(url: &str) -> Pool { + let manager = Manager::new(url, Runtime::Tokio1); + Pool::builder(manager).build().unwrap() } /// Inserts some dummy crates and versions into the database. - async fn create_dummy_crates_and_versions(db_pool: DieselPool) { - spawn_blocking(move || { - let mut conn = db_pool.get().unwrap(); - - create_crate_and_version("bindgen", "0.65.1", &mut conn); - create_crate_and_version("tracing-core", "0.1.32", &mut conn); - create_crate_and_version("quick-error", "1.2.3", &mut conn); + async fn create_dummy_crates_and_versions(db_pool: Pool) { + let conn = db_pool.get().await.unwrap(); + conn.interact(move |conn| { + create_crate_and_version("bindgen", "0.65.1", conn); + create_crate_and_version("tracing-core", "0.1.32", conn); + create_crate_and_version("quick-error", "1.2.3", conn); Ok::<_, anyhow::Error>(()) }) .await + .unwrap() .unwrap(); } @@ -506,13 +507,9 @@ mod tests { /// Queries all version downloads from the database and returns them as a /// [`Vec`] of strings for use with [`assert_debug_snapshot!()`]. - async fn all_version_downloads(db_pool: DieselPool) -> Vec { - let downloads = spawn_blocking(move || { - let mut conn = db_pool.get().unwrap(); - Ok::<_, anyhow::Error>(query_all_version_downloads(&mut conn)) - }) - .await - .unwrap(); + async fn all_version_downloads(db_pool: Pool) -> Vec { + let conn = db_pool.get().await.unwrap(); + let downloads = conn.interact(query_all_version_downloads).await.unwrap(); downloads .into_iter() From 3f6ccf6185dfb62f659b8f650baa97d30c9fbe66 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:22:10 +0100 Subject: [PATCH 4/8] worker/jobs/daily_db_maintenance: Replace `r2d2` with `deadpool` --- src/worker/jobs/daily_db_maintenance.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/worker/jobs/daily_db_maintenance.rs b/src/worker/jobs/daily_db_maintenance.rs index 25faab680dc..2d4341b43e3 100644 --- a/src/worker/jobs/daily_db_maintenance.rs +++ b/src/worker/jobs/daily_db_maintenance.rs @@ -1,5 +1,5 @@ -use crate::tasks::spawn_blocking; use crate::worker::Environment; +use anyhow::anyhow; use crates_io_worker::BackgroundJob; use diesel::{sql_query, RunQueryDsl}; use std::sync::Arc; @@ -22,14 +22,14 @@ impl BackgroundJob for DailyDbMaintenance { /// archive daily download counts and drop historical data, we can drop this task and rely on /// auto-vacuum again. async fn run(&self, env: Self::Context) -> anyhow::Result<()> { - spawn_blocking(move || { - let mut conn = env.connection_pool.get()?; - + let conn = env.deadpool.get().await?; + conn.interact(move |conn| { info!("Running VACUUM on version_downloads table"); - sql_query("VACUUM version_downloads;").execute(&mut *conn)?; + sql_query("VACUUM version_downloads;").execute(conn)?; info!("Finished running VACUUM on version_downloads table"); Ok(()) }) .await + .map_err(|err| anyhow!(err.to_string()))? } } From a63f8ed500152c92cabfeb36b003df1c7defd1e3 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:25:14 +0100 Subject: [PATCH 5/8] worker/jobs/readmes: Replace `r2d2` with `deadpool` --- src/worker/jobs/readmes.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/worker/jobs/readmes.rs b/src/worker/jobs/readmes.rs index eaf84ebedd8..6a9d636513a 100644 --- a/src/worker/jobs/readmes.rs +++ b/src/worker/jobs/readmes.rs @@ -3,6 +3,7 @@ use crate::models::Version; use crate::tasks::spawn_blocking; use crate::worker::Environment; +use anyhow::anyhow; use crates_io_markdown::text_to_html; use crates_io_worker::BackgroundJob; use std::sync::Arc; @@ -49,18 +50,22 @@ impl BackgroundJob for RenderAndUploadReadme { info!(version_id = ?self.version_id, "Rendering README"); let job = self.clone(); - spawn_blocking(move || { - let rendered = text_to_html( + let rendered = spawn_blocking(move || { + Ok::<_, anyhow::Error>(text_to_html( &job.text, &job.readme_path, job.base_url.as_deref(), job.pkg_path_in_vcs.as_ref(), - ); - if rendered.is_empty() { - return Ok(()); - } + )) + }) + .await?; + + if rendered.is_empty() { + return Ok(()); + } - let mut conn = env.connection_pool.get()?; + let conn = env.deadpool.get().await?; + conn.interact(move |conn| { conn.transaction(|conn| { Version::record_readme_rendering(job.version_id, conn)?; let (crate_name, vers): (String, String) = versions::table @@ -79,5 +84,6 @@ impl BackgroundJob for RenderAndUploadReadme { }) }) .await + .map_err(|err| anyhow!(err.to_string()))? } } From ee6dc1a274100737fa66d35e6ceb06e2ade56b94 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:26:30 +0100 Subject: [PATCH 6/8] worker/jobs/sync_admins: Replace `r2d2` with `deadpool` --- src/worker/jobs/sync_admins.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/worker/jobs/sync_admins.rs b/src/worker/jobs/sync_admins.rs index 174dc23a235..42476609ebe 100644 --- a/src/worker/jobs/sync_admins.rs +++ b/src/worker/jobs/sync_admins.rs @@ -1,7 +1,7 @@ use crate::email::Email; use crate::schema::{emails, users}; -use crate::tasks::spawn_blocking; use crate::worker::Environment; +use anyhow::anyhow; use crates_io_worker::BackgroundJob; use diesel::prelude::*; use diesel::RunQueryDsl; @@ -29,7 +29,8 @@ impl BackgroundJob for SyncAdmins { .map(|m| m.github_id) .collect::>(); - spawn_blocking::<_, _, anyhow::Error>(move || { + let conn = ctx.deadpool.get().await?; + conn.interact::<_, anyhow::Result<_>>(move |conn| { let format_repo_admins = |github_ids: &HashSet| { repo_admins .iter() @@ -40,13 +41,11 @@ impl BackgroundJob for SyncAdmins { // Existing admins from the database. - let mut conn = ctx.connection_pool.get()?; - let database_admins = users::table .left_join(emails::table) .select((users::gh_id, users::gh_login, emails::email.nullable())) .filter(users::is_admin.eq(true)) - .get_results::<(i32, String, Option)>(&mut conn)?; + .get_results::<(i32, String, Option)>(conn)?; let database_admin_ids = database_admins .iter() @@ -78,7 +77,7 @@ impl BackgroundJob for SyncAdmins { .filter(users::gh_id.eq_any(&new_admin_ids)) .set(users::is_admin.eq(true)) .returning(users::gh_id) - .get_results::(&mut conn)? + .get_results::(conn)? }; // New admins from the team repo that have been granted admin @@ -121,7 +120,7 @@ impl BackgroundJob for SyncAdmins { .filter(users::gh_id.eq_any(&obsolete_admin_ids)) .set(users::is_admin.eq(false)) .returning(users::gh_id) - .get_results::(&mut conn)? + .get_results::(conn)? }; let removed_admin_ids = HashSet::from_iter(removed_admin_ids); @@ -158,7 +157,8 @@ impl BackgroundJob for SyncAdmins { Ok(()) }) - .await?; + .await + .map_err(|err| anyhow!(err.to_string()))??; Ok(()) } From d12db38b0d493d7fad5ad549cdde2f5356dcd2ac Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:28:30 +0100 Subject: [PATCH 7/8] worker/jobs/typosquat: Replace `r2d2` with `deadpool` --- src/worker/jobs/typosquat.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/worker/jobs/typosquat.rs b/src/worker/jobs/typosquat.rs index f2e5e5b5bca..500b62b08a7 100644 --- a/src/worker/jobs/typosquat.rs +++ b/src/worker/jobs/typosquat.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use std::sync::Arc; use crates_io_worker::BackgroundJob; @@ -5,7 +6,6 @@ use diesel::PgConnection; use typomania::Package; use crate::email::Email; -use crate::tasks::spawn_blocking; use crate::{ typosquat::{Cache, Crate}, worker::Environment, @@ -34,12 +34,13 @@ impl BackgroundJob for CheckTyposquat { async fn run(&self, env: Self::Context) -> anyhow::Result<()> { let crate_name = self.name.clone(); - spawn_blocking(move || { - let mut conn = env.connection_pool.get()?; - let cache = env.typosquat_cache(&mut conn)?; - check(&env.emails, cache, &mut conn, &crate_name) + let conn = env.deadpool.get().await?; + conn.interact(move |conn| { + let cache = env.typosquat_cache(conn)?; + check(&env.emails, cache, conn, &crate_name) }) .await + .map_err(|err| anyhow!(err.to_string()))? } } From e6bbda7740364719bfcba0c95fe4f5756167d469 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 12:32:06 +0100 Subject: [PATCH 8/8] worker/environment: Remove obsolete `connection_pool` field --- src/bin/background-worker.rs | 2 -- src/tests/util/test_app.rs | 1 - src/worker/environment.rs | 2 -- 3 files changed, 5 deletions(-) diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 2c83a7726c9..a886f6c374b 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -15,7 +15,6 @@ extern crate tracing; use anyhow::Context; use crates_io::cloudfront::CloudFront; -use crates_io::db::DieselPool; use crates_io::fastly::Fastly; use crates_io::storage::Storage; use crates_io::team_repo::TeamRepoImpl; @@ -96,7 +95,6 @@ fn main() -> anyhow::Result<()> { .cloudfront(cloudfront) .fastly(fastly) .storage(storage) - .connection_pool(DieselPool::new_background_worker(connection_pool.clone())) .deadpool(deadpool) .emails(emails) .team_repo(Box::new(team_repo)) diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs index 6c3054abb37..f30072b4a94 100644 --- a/src/tests/util/test_app.rs +++ b/src/tests/util/test_app.rs @@ -280,7 +280,6 @@ impl TestAppBuilder { .config(app.config.clone()) .repository_config(repository_config) .storage(app.storage.clone()) - .connection_pool(app.primary_database.clone()) .deadpool(app.deadpool_primary.clone()) .emails(app.emails.clone()) .team_repo(Box::new(self.team_repo)) diff --git a/src/worker/environment.rs b/src/worker/environment.rs index d36d7bf834f..4da8fe9c280 100644 --- a/src/worker/environment.rs +++ b/src/worker/environment.rs @@ -1,5 +1,4 @@ use crate::cloudfront::CloudFront; -use crate::db::DieselPool; use crate::fastly::Fastly; use crate::storage::Storage; use crate::team_repo::TeamRepo; @@ -27,7 +26,6 @@ pub struct Environment { #[builder(default)] fastly: Option, pub storage: Arc, - pub connection_pool: DieselPool, pub deadpool: DeadpoolPool, pub emails: Emails, pub team_repo: Box,