Skip to content

Commit

Permalink
crates_io_worker: Replace r2d2 with deadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Apr 9, 2024
1 parent 470b0ff commit 9e29777
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/crates_io_worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ workspace = true

[dependencies]
anyhow = "=1.0.81"
diesel = { version = "=2.1.5", features = ["postgres", "r2d2", "serde_json"] }
deadpool-diesel = { version = "=0.6.0", features = ["postgres", "tracing"] }
diesel = { version = "=2.1.5", features = ["postgres", "serde_json"] }
futures-util = "=0.3.30"
sentry-core = { version = "=0.32.2", features = ["client"] }
serde = { version = "=1.0.197", features = ["derive"] }
Expand Down
18 changes: 7 additions & 11 deletions crates/crates_io_worker/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use crate::background_job::DEFAULT_QUEUE;
use crate::job_registry::JobRegistry;
use crate::util::spawn_blocking;
use crate::worker::Worker;
use crate::{storage, BackgroundJob};
use anyhow::anyhow;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};
use deadpool_diesel::postgres::Pool;
use futures_util::future::join_all;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -16,19 +14,17 @@ use tracing::{info, info_span, warn, Instrument};

const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);

pub type ConnectionPool = Pool<ConnectionManager<PgConnection>>;

/// The core runner responsible for locking and running jobs
pub struct Runner<Context> {
rt_handle: Handle,
connection_pool: ConnectionPool,
connection_pool: Pool,
queues: HashMap<String, Queue<Context>>,
context: Context,
shutdown_when_queue_empty: bool,
}

impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
pub fn new(rt_handle: &Handle, connection_pool: ConnectionPool, context: Context) -> Self {
pub fn new(rt_handle: &Handle, connection_pool: Pool, context: Context) -> Self {
Self {
rt_handle: rt_handle.clone(),
connection_pool,
Expand Down Expand Up @@ -104,17 +100,17 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
/// This function is intended for use in tests and will return an error if
/// any jobs have failed.
pub async fn check_for_failed_jobs(&self) -> anyhow::Result<()> {
let pool = self.connection_pool.clone();
spawn_blocking(move || {
let mut conn = pool.get()?;
let failed_jobs = storage::failed_job_count(&mut conn)?;
let conn = self.connection_pool.get().await?;
conn.interact(move |conn| {
let failed_jobs = storage::failed_job_count(conn)?;
if failed_jobs == 0 {
Ok(())
} else {
Err(anyhow!("{failed_jobs} jobs failed"))
}
})
.await
.map_err(|err| anyhow!(err.to_string()))?
}
}

Expand Down
17 changes: 0 additions & 17 deletions crates/crates_io_worker/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,6 @@ use sentry_core::Hub;
use std::any::Any;
use std::future::Future;
use std::panic::PanicInfo;
use tokio::task::JoinError;

pub async fn spawn_blocking<F, R, E>(f: F) -> Result<R, E>
where
F: FnOnce() -> Result<R, E> + Send + 'static,
R: Send + 'static,
E: Send + From<JoinError> + 'static,
{
let current_span = tracing::Span::current();
let hub = Hub::current();
tokio::task::spawn_blocking(move || current_span.in_scope(|| Hub::run(hub, f)))
.await
// Convert `JoinError` to `E`
.map_err(Into::into)
// Flatten `Result<Result<_, E>, E>` to `Result<_, E>`
.and_then(std::convert::identity)
}

pub async fn with_sentry_transaction<F, R, E, Fut>(
transaction_name: &str,
Expand Down
12 changes: 6 additions & 6 deletions crates/crates_io_worker/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::job_registry::JobRegistry;
use crate::runner::ConnectionPool;
use crate::storage;
use crate::util::{spawn_blocking, try_to_extract_panic_info, with_sentry_transaction};
use crate::util::{try_to_extract_panic_info, with_sentry_transaction};
use anyhow::anyhow;
use deadpool_diesel::postgres::Pool;
use diesel::prelude::*;
use futures_util::FutureExt;
use sentry_core::{Hub, SentryFutureExt};
Expand All @@ -14,7 +14,7 @@ use tokio::time::sleep;
use tracing::{debug, error, info_span, warn};

pub struct Worker<Context> {
pub(crate) connection_pool: ConnectionPool,
pub(crate) connection_pool: Pool,
pub(crate) context: Context,
pub(crate) job_registry: Arc<JobRegistry<Context>>,
pub(crate) shutdown_when_queue_empty: bool,
Expand Down Expand Up @@ -56,11 +56,10 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
async fn run_next_job(&self) -> anyhow::Result<Option<i64>> {
let context = self.context.clone();
let job_registry = self.job_registry.clone();
let pool = self.connection_pool.clone();
let conn = self.connection_pool.get().await?;

spawn_blocking(move || {
conn.interact(move |conn| {
let job_types = job_registry.job_types();
let conn = &mut *pool.get()?;
conn.transaction(|conn| {
debug!("Looking for next background worker job…");
let Some(job) = storage::find_next_unlocked_job(conn, &job_types).optional()?
Expand Down Expand Up @@ -105,5 +104,6 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
})
})
.await
.map_err(|err| anyhow!(err.to_string()))?
}
}
11 changes: 5 additions & 6 deletions crates/crates_io_worker/tests/runner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crates_io_test_db::TestDatabase;
use crates_io_worker::schema::background_jobs;
use crates_io_worker::{BackgroundJob, Runner};
use deadpool_diesel::postgres::{Manager, Pool};
use deadpool_diesel::Runtime;
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -210,12 +211,10 @@ fn runner<Context: Clone + Send + Sync + 'static>(
database_url: &str,
context: Context,
) -> Runner<Context> {
let connection_pool = Pool::builder()
.max_size(4)
.min_idle(Some(0))
.build_unchecked(ConnectionManager::new(database_url));
let manager = Manager::new(database_url, Runtime::Tokio1);
let deadpool = Pool::builder(manager).max_size(4).build().unwrap();

Runner::new(&Handle::current(), connection_pool, context)
Runner::new(&Handle::current(), deadpool, context)
.configure_default_queue(|queue| queue.num_workers(2))
.shutdown_when_queue_empty()
}
11 changes: 2 additions & 9 deletions src/bin/background-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use crates_io_index::RepositoryConfig;
use crates_io_worker::Runner;
use deadpool_diesel::postgres::{Manager as DeadpoolManager, Pool as DeadpoolPool};
use deadpool_diesel::Runtime;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use reqwest::Client;
use secrecy::ExposeSecret;
use std::sync::Arc;
Expand Down Expand Up @@ -81,11 +79,6 @@ fn main() -> anyhow::Result<()> {
let fastly = Fastly::from_environment(client.clone());
let team_repo = TeamRepoImpl::default();

let connection_pool = r2d2::Pool::builder()
.max_size(10)
.min_idle(Some(0))
.build_unchecked(ConnectionManager::new(&db_url));

let manager = DeadpoolManager::new(db_url, Runtime::Tokio1);
let deadpool = DeadpoolPool::builder(manager).max_size(10).build().unwrap();

Expand All @@ -95,7 +88,7 @@ fn main() -> anyhow::Result<()> {
.cloudfront(cloudfront)
.fastly(fastly)
.storage(storage)
.deadpool(deadpool)
.deadpool(deadpool.clone())
.emails(emails)
.team_repo(Box::new(team_repo))
.build()?;
Expand All @@ -111,7 +104,7 @@ fn main() -> anyhow::Result<()> {
}
});

let runner = Runner::new(runtime.handle(), connection_pool, environment.clone())
let runner = Runner::new(runtime.handle(), deadpool, environment.clone())
.configure_default_queue(|queue| queue.num_workers(5))
.configure_queue("downloads", |queue| queue.num_workers(1))
.configure_queue("repository", |queue| queue.num_workers(1))
Expand Down
2 changes: 1 addition & 1 deletion src/tests/util/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl TestAppBuilder {

let runner = Runner::new(
runtime.handle(),
(*app.primary_database).clone(),
app.deadpool_primary.clone(),
Arc::new(environment),
)
.shutdown_when_queue_empty()
Expand Down

0 comments on commit 9e29777

Please sign in to comment.