Commit bf5ee48

Merge pull request #8385 from Turbo87/deadpool-worker
worker: Replace `r2d2` with `deadpool`
Turbo87 authored Apr 8, 2024
2 parents 2ec0929 + e6bbda7 commit bf5ee48
Showing 10 changed files with 94 additions and 94 deletions.
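The change follows one mechanical pattern across all ten files: instead of checking an `r2d2` connection out inside `spawn_blocking()`, each job now checks a `deadpool_diesel` connection out asynchronously and runs its synchronous Diesel code inside `interact()`. A minimal, self-contained sketch of that pattern, assuming a `DATABASE_URL` environment variable and a throwaway `SELECT 1` query (neither appears in this PR):

```rust
use anyhow::anyhow;
use deadpool_diesel::postgres::{Manager, Pool};
use deadpool_diesel::Runtime;
use diesel::{sql_query, RunQueryDsl};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Pool construction, as in the new `build_connection_pool()` helpers.
    let manager = Manager::new(std::env::var("DATABASE_URL")?, Runtime::Tokio1);
    let pool = Pool::builder(manager).build()?;

    // Async checkout: no worker thread is blocked while waiting for a slot.
    let conn = pool.get().await?;

    // Diesel itself is synchronous, so the query runs inside `interact()`,
    // which executes the closure on a dedicated blocking thread.
    conn.interact(|conn| sql_query("SELECT 1").execute(conn))
        .await
        .map_err(|err| anyhow!(err.to_string()))??;

    Ok(())
}
```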
2 changes: 0 additions & 2 deletions src/bin/background-worker.rs
@@ -15,7 +15,6 @@ extern crate tracing;

use anyhow::Context;
use crates_io::cloudfront::CloudFront;
use crates_io::db::DieselPool;
use crates_io::fastly::Fastly;
use crates_io::storage::Storage;
use crates_io::team_repo::TeamRepoImpl;
@@ -96,7 +95,6 @@ fn main() -> anyhow::Result<()> {
.cloudfront(cloudfront)
.fastly(fastly)
.storage(storage)
.connection_pool(DieselPool::new_background_worker(connection_pool.clone()))
.deadpool(deadpool)
.emails(emails)
.team_repo(Box::new(team_repo))
1 change: 0 additions & 1 deletion src/tests/util/test_app.rs
@@ -280,7 +280,6 @@ impl TestAppBuilder {
.config(app.config.clone())
.repository_config(repository_config)
.storage(app.storage.clone())
.connection_pool(app.primary_database.clone())
.deadpool(app.deadpool_primary.clone())
.emails(app.emails.clone())
.team_repo(Box::new(self.team_repo))
2 changes: 0 additions & 2 deletions src/worker/environment.rs
@@ -1,5 +1,4 @@
use crate::cloudfront::CloudFront;
use crate::db::DieselPool;
use crate::fastly::Fastly;
use crate::storage::Storage;
use crate::team_repo::TeamRepo;
@@ -27,7 +26,6 @@ pub struct Environment {
#[builder(default)]
fastly: Option<Fastly>,
pub storage: Arc<Storage>,
pub connection_pool: DieselPool,
pub deadpool: DeadpoolPool,
pub emails: Emails,
pub team_repo: Box<dyn TeamRepo + Send + Sync>,
10 changes: 5 additions & 5 deletions src/worker/jobs/daily_db_maintenance.rs
@@ -1,5 +1,5 @@
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use anyhow::anyhow;
use crates_io_worker::BackgroundJob;
use diesel::{sql_query, RunQueryDsl};
use std::sync::Arc;
@@ -22,14 +22,14 @@ impl BackgroundJob for DailyDbMaintenance {
/// archive daily download counts and drop historical data, we can drop this task and rely on
/// auto-vacuum again.
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
spawn_blocking(move || {
let mut conn = env.connection_pool.get()?;

let conn = env.deadpool.get().await?;
conn.interact(move |conn| {
info!("Running VACUUM on version_downloads table");
sql_query("VACUUM version_downloads;").execute(&mut *conn)?;
sql_query("VACUUM version_downloads;").execute(conn)?;
info!("Finished running VACUUM on version_downloads table");
Ok(())
})
.await
.map_err(|err| anyhow!(err.to_string()))?
}
}
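The `map_err(|err| anyhow!(err.to_string()))` step above recurs in every converted job: `interact()` fails with its own `InteractError`, which (presumably because its panic variant is not `Sync`) cannot cross `?` into `anyhow::Error` directly. Note also the single trailing `?` here versus the double `??` elsewhere in this PR: one level unwraps the `interact()` call itself, and a second is needed only when the closure's own `Result` is not returned as-is. If this boilerplate spread further, a hypothetical helper (not part of this PR) could centralize it:

```rust
use anyhow::anyhow;
use deadpool_diesel::postgres::Pool;
use diesel::PgConnection;

/// Hypothetical helper: checkout, `interact()`, and error mapping in one place.
async fn with_conn<F, R>(pool: &Pool, f: F) -> anyhow::Result<R>
where
    F: FnOnce(&mut PgConnection) -> anyhow::Result<R> + Send + 'static,
    R: Send + 'static,
{
    let conn = pool.get().await?;
    // Stringify the `InteractError`; the closure's `anyhow::Result` is
    // then returned to the caller as-is.
    conn.interact(f).await.map_err(|err| anyhow!(err.to_string()))?
}
```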
67 changes: 32 additions & 35 deletions src/worker/jobs/downloads/process_log.rs
@@ -1,11 +1,10 @@
use crate::config::CdnLogStorageConfig;
use crate::db::DieselPool;
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use anyhow::Context;
use anyhow::{anyhow, Context};
use chrono::NaiveDate;
use crates_io_cdn_logs::{count_downloads, Decompressor, DownloadsMap};
use crates_io_worker::BackgroundJob;
use deadpool_diesel::postgres::Pool;
use diesel::dsl::exists;
use diesel::prelude::*;
use diesel::{select, PgConnection, QueryResult};
@@ -52,7 +51,7 @@ impl BackgroundJob for ProcessCdnLog {
let store = build_store(&ctx.config.cdn_log_storage, &self.region, &self.bucket)
.context("Failed to build object store")?;

let db_pool = ctx.connection_pool.clone();
let db_pool = ctx.deadpool.clone();
run(store, &self.path, db_pool).await
}
}
@@ -97,7 +96,7 @@ fn build_store(
/// it can be tested without having to construct a full [`Environment`]
/// struct.
#[instrument(skip_all, fields(cdn_log_store.path = %path))]
async fn run(store: Arc<dyn ObjectStore>, path: &str, db_pool: DieselPool) -> anyhow::Result<()> {
async fn run(store: Arc<dyn ObjectStore>, path: &str, db_pool: Pool) -> anyhow::Result<()> {
if already_processed(path, db_pool.clone()).await? {
warn!("Skipping already processed log file");
return Ok(());
@@ -115,8 +114,8 @@ async fn run(store: Arc<dyn ObjectStore>, path: &str, db_pool: DieselPool) -> an
log_stats(&downloads);

let path = path.to_string();
spawn_blocking(move || {
let mut conn = db_pool.get()?;
let conn = db_pool.get().await?;
conn.interact(|conn| {
conn.transaction(|conn| {
// Mark the log file as processed before saving the downloads to
// the database.
@@ -128,14 +127,15 @@ async fn run(store: Arc<dyn ObjectStore>, path: &str, db_pool: DieselPool) -> an
// When the job is retried the `already_processed()` call above
// will return `true` and the job will skip processing the log
// file again.
save_as_processed(&path, conn)?;
save_as_processed(path, conn)?;

save_downloads(downloads, conn)
})?;

Ok::<_, anyhow::Error>(())
})
.await?;
.await
.map_err(|err| anyhow!(err.to_string()))??;

Ok(())
}
@@ -343,16 +343,16 @@ impl Debug for NameAndVersion {

/// Checks if the given log file has already been processed.
///
/// Calls [`spawn_blocking()`] and acquires a connection from the pool before
/// passing it to the [`already_processed_inner()`] function.
async fn already_processed(path: impl Into<String>, db_pool: DieselPool) -> anyhow::Result<bool> {
/// Acquires a connection from the pool before passing it to the
/// [`already_processed_inner()`] function.
async fn already_processed(path: impl Into<String>, db_pool: Pool) -> anyhow::Result<bool> {
let path = path.into();

let already_processed = spawn_blocking(move || {
let mut conn = db_pool.get()?;
Ok::<_, anyhow::Error>(already_processed_inner(path, &mut conn)?)
})
.await?;
let conn = db_pool.get().await?;
let already_processed = conn
.interact(move |conn| already_processed_inner(path, conn))
.await
.map_err(|err| anyhow!(err.to_string()))??;

Ok(already_processed)
}
@@ -387,7 +387,8 @@ mod tests {
use super::*;
use crate::schema::{crates, version_downloads, versions};
use crates_io_test_db::TestDatabase;
use diesel::r2d2::{ConnectionManager, Pool};
use deadpool_diesel::postgres::Manager;
use deadpool_diesel::Runtime;
use insta::assert_debug_snapshot;

const CLOUDFRONT_PATH: &str =
@@ -466,23 +467,23 @@
}

/// Builds a connection pool to the test database.
fn build_connection_pool(url: &str) -> DieselPool {
let pool = Pool::builder().build(ConnectionManager::new(url)).unwrap();
DieselPool::new_background_worker(pool)
fn build_connection_pool(url: &str) -> Pool {
let manager = Manager::new(url, Runtime::Tokio1);
Pool::builder(manager).build().unwrap()
}

/// Inserts some dummy crates and versions into the database.
async fn create_dummy_crates_and_versions(db_pool: DieselPool) {
spawn_blocking(move || {
let mut conn = db_pool.get().unwrap();

create_crate_and_version("bindgen", "0.65.1", &mut conn);
create_crate_and_version("tracing-core", "0.1.32", &mut conn);
create_crate_and_version("quick-error", "1.2.3", &mut conn);
async fn create_dummy_crates_and_versions(db_pool: Pool) {
let conn = db_pool.get().await.unwrap();
conn.interact(move |conn| {
create_crate_and_version("bindgen", "0.65.1", conn);
create_crate_and_version("tracing-core", "0.1.32", conn);
create_crate_and_version("quick-error", "1.2.3", conn);

Ok::<_, anyhow::Error>(())
})
.await
.unwrap()
.unwrap();
}

@@ -506,13 +507,9 @@

/// Queries all version downloads from the database and returns them as a
/// [`Vec`] of strings for use with [`assert_debug_snapshot!()`].
async fn all_version_downloads(db_pool: DieselPool) -> Vec<String> {
let downloads = spawn_blocking(move || {
let mut conn = db_pool.get().unwrap();
Ok::<_, anyhow::Error>(query_all_version_downloads(&mut conn))
})
.await
.unwrap();
async fn all_version_downloads(db_pool: Pool) -> Vec<String> {
let conn = db_pool.get().await.unwrap();
let downloads = conn.interact(query_all_version_downloads).await.unwrap();

downloads
.into_iter()
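Both test modules swap the `r2d2` builder for the same two-line `deadpool` construction. A sketch of how the new test pool might be exercised end to end; the `#[tokio::test]` harness, the `TEST_DATABASE_URL` variable, and the `SELECT 1` query are illustrative assumptions:

```rust
use deadpool_diesel::postgres::{Manager, Pool};
use deadpool_diesel::Runtime;
use diesel::{sql_query, RunQueryDsl};

/// Mirrors the `build_connection_pool()` helper added in both test modules.
fn build_connection_pool(url: &str) -> Pool {
    let manager = Manager::new(url, Runtime::Tokio1);
    Pool::builder(manager).build().unwrap()
}

#[tokio::test]
async fn pool_smoke_test() {
    let url = std::env::var("TEST_DATABASE_URL").unwrap();
    let pool = build_connection_pool(&url);

    let conn = pool.get().await.unwrap();
    // The first `unwrap()` covers the `interact()` call, the second the query.
    conn.interact(|conn| sql_query("SELECT 1").execute(conn))
        .await
        .unwrap()
        .unwrap();
}
```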
38 changes: 19 additions & 19 deletions src/worker/jobs/downloads/queue/job.rs
@@ -1,14 +1,14 @@
use crate::config::CdnLogQueueConfig;
use crate::db::DieselPool;
use crate::sqs::{MockSqsQueue, SqsQueue, SqsQueueImpl};
use crate::tasks::spawn_blocking;
use crate::worker::jobs::ProcessCdnLog;
use crate::worker::Environment;
use anyhow::Context;
use anyhow::{anyhow, Context};
use aws_credential_types::Credentials;
use aws_sdk_sqs::config::Region;
use aws_sdk_sqs::types::Message;
use crates_io_worker::BackgroundJob;
use deadpool_diesel::postgres::Pool;
use diesel::PgConnection;
use std::sync::Arc;

/// A background job that processes messages from the CDN log queue.
@@ -33,7 +33,7 @@ impl BackgroundJob for ProcessCdnLogQueue {
info!("Processing messages from the CDN log queue…");

let queue = build_queue(&ctx.config.cdn_log_queue);
run(&queue, self.max_messages, &ctx.connection_pool).await
run(&queue, self.max_messages, &ctx.deadpool).await
}
}

@@ -66,7 +66,7 @@ fn build_queue(config: &CdnLogQueueConfig) -> Box<dyn SqsQueue + Send + Sync> {
async fn run(
queue: &impl SqsQueue,
max_messages: usize,
connection_pool: &DieselPool,
connection_pool: &Pool,
) -> anyhow::Result<()> {
const MAX_BATCH_SIZE: usize = 10;

@@ -101,7 +101,7 @@ async fn run(
async fn process_message(
message: &Message,
queue: &impl SqsQueue,
connection_pool: &DieselPool,
connection_pool: &Pool,
) -> anyhow::Result<()> {
debug!("Processing message…");

@@ -133,7 +133,7 @@ async fn process_message(
/// warning and returns `Ok(())` instead. This is because we don't want to
/// requeue the message in the case of a parsing error, as it would just be
/// retried indefinitely.
async fn process_body(body: &str, connection_pool: &DieselPool) -> anyhow::Result<()> {
async fn process_body(body: &str, connection_pool: &Pool) -> anyhow::Result<()> {
let message = match serde_json::from_str::<super::message::Message>(body) {
Ok(message) => message,
Err(err) => {
@@ -152,8 +152,11 @@
return Ok(());
}

let pool = connection_pool.clone();
spawn_blocking(move || enqueue_jobs(jobs, &pool)).await
let conn = connection_pool.get().await;
let conn = conn.context("Failed to acquire database connection")?;
conn.interact(move |conn| enqueue_jobs(jobs, conn))
.await
.map_err(|err| anyhow!(err.to_string()))?
}

/// Extracts a list of [`ProcessCdnLog`] jobs from a message.
@@ -199,16 +202,12 @@ fn is_ignored_path(path: &str) -> bool {
path.contains("/index.staging.crates.io/") || path.contains("/index.crates.io/")
}

fn enqueue_jobs(jobs: Vec<ProcessCdnLog>, pool: &DieselPool) -> anyhow::Result<()> {
let mut conn = pool
.get()
.context("Failed to acquire database connection")?;

fn enqueue_jobs(jobs: Vec<ProcessCdnLog>, conn: &mut PgConnection) -> anyhow::Result<()> {
for job in jobs {
let path = &job.path;

info!("Enqueuing processing job… ({path})");
job.enqueue(&mut conn)
job.enqueue(conn)
.context("Failed to enqueue processing job")?;

debug!("Enqueued processing job");
@@ -225,8 +224,9 @@ mod tests {
use aws_sdk_sqs::types::Message;
use crates_io_test_db::TestDatabase;
use crates_io_worker::schema::background_jobs;
use deadpool_diesel::postgres::Manager;
use deadpool_diesel::Runtime;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::QueryDsl;
use insta::assert_snapshot;
use parking_lot::Mutex;
@@ -392,9 +392,9 @@
deleted_handles
}

fn build_connection_pool(url: &str) -> DieselPool {
let pool = Pool::builder().build(ConnectionManager::new(url)).unwrap();
DieselPool::new_background_worker(pool)
fn build_connection_pool(url: &str) -> Pool {
let manager = Manager::new(url, Runtime::Tokio1);
Pool::builder(manager).build().unwrap()
}

fn message(id: &str, region: &str, bucket: &str, path: &str) -> Message {
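A small design shift falls out of the migration in `enqueue_jobs()`: since `interact()` hands the closure a `&mut PgConnection`, the helper now borrows a connection instead of owning a pool reference, leaving checkout and error context to the caller. Connection-taking helpers also compose with transactions inside a single `interact()` call; a sketch under that assumption, with `enqueue_one` as a hypothetical stand-in for the real enqueue logic:

```rust
use anyhow::anyhow;
use deadpool_diesel::postgres::Pool;
use diesel::prelude::*;
use diesel::PgConnection;

/// Hypothetical stand-in for a helper that writes via a borrowed connection.
fn enqueue_one(conn: &mut PgConnection) -> anyhow::Result<()> {
    diesel::sql_query("SELECT 1").execute(conn)?;
    Ok(())
}

async fn enqueue_all(pool: &Pool) -> anyhow::Result<()> {
    let conn = pool.get().await?;
    // A single `interact()` call can wrap a transaction around the helper.
    conn.interact(|conn| conn.transaction(|conn| enqueue_one(conn)))
        .await
        .map_err(|err| anyhow!(err.to_string()))??;
    Ok(())
}
```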
21 changes: 11 additions & 10 deletions src/worker/jobs/git.rs
@@ -1,7 +1,7 @@
use crate::models;
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use anyhow::Context;
use anyhow::{anyhow, Context};
use chrono::Utc;
use crates_io_env_vars::var_parsed;
use crates_io_index::{Crate, Repository};
@@ -39,9 +39,9 @@ impl BackgroundJob for SyncToGitIndex {
info!("Syncing to git index");

let crate_name = self.krate.clone();
spawn_blocking(move || {
let mut conn = env.connection_pool.get()?;
let new = get_index_data(&crate_name, &mut conn).context("Failed to get index data")?;
let conn = env.deadpool.get().await?;
conn.interact(move |conn| {
let new = get_index_data(&crate_name, conn).context("Failed to get index data")?;

let repo = env.lock_index()?;
let dst = repo.index_file(&crate_name);
@@ -75,6 +75,7 @@ impl BackgroundJob for SyncToGitIndex {
Ok(())
})
.await
.map_err(|err| anyhow!(err.to_string()))?
}
}

@@ -102,12 +103,12 @@ impl BackgroundJob for SyncToSparseIndex {
info!("Syncing to sparse index");

let crate_name = self.krate.clone();
let pool = env.connection_pool.clone();
let content = spawn_blocking(move || {
let mut conn = pool.get()?;
get_index_data(&crate_name, &mut conn).context("Failed to get index data")
})
.await?;
let conn = env.deadpool.get().await?;
let content = conn
.interact(move |conn| get_index_data(&crate_name, conn))
.await
.map_err(|err| anyhow!(err.to_string()))?
.context("Failed to get index data")?;

let future = env.storage.sync_index(&self.krate, content);
future.await.context("Failed to sync index data")?;