Skip to content

Commit

Permalink
Merge pull request #7403 from Turbo87/runner-constructor
Browse files Browse the repository at this point in the history
swirl/runner: Extract `new()` fn
  • Loading branch information
Turbo87 authored Oct 31, 2023
2 parents 1d387f2 + 1f350eb commit 2a45482
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 40 deletions.
19 changes: 16 additions & 3 deletions src/bin/background-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
extern crate tracing;

use crates_io::config;
use crates_io::db::DieselPool;
use crates_io::storage::Storage;
use crates_io::worker::cloudfront::CloudFront;
use crates_io::{background_jobs::*, db, env_optional, ssh};
use crates_io_index::{Repository, RepositoryConfig};
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use reqwest::blocking::Client;
use secrecy::ExposeSecret;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::{Duration, Instant};

use crates_io::swirl;
use crates_io::swirl::Runner;
use crates_io::worker::fastly::Fastly;

fn main() {
Expand Down Expand Up @@ -83,8 +86,18 @@ fn main() {

let environment = Arc::new(Some(environment));

let build_runner =
|| swirl::Runner::production_runner(environment.clone(), db_url.clone(), job_start_timeout);
let build_runner = || {
let connection_pool = r2d2::Pool::builder()
.max_size(10)
.min_idle(Some(0))
.build_unchecked(ConnectionManager::new(&db_url));

let connection_pool = DieselPool::new_background_worker(connection_pool);

Runner::new(connection_pool, environment.clone())
.num_workers(5)
.job_start_timeout(Duration::from_secs(job_start_timeout))
};

let mut runner = build_runner();

Expand Down
56 changes: 23 additions & 33 deletions src/swirl/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use diesel::connection::{AnsiTransactionManager, TransactionManager};
use diesel::prelude::*;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use std::any::Any;
use std::error::Error;
use std::panic::{catch_unwind, AssertUnwindSafe, PanicInfo, UnwindSafe};
Expand All @@ -18,6 +16,8 @@ use event::Event;

mod event;

const DEFAULT_JOB_START_TIMEOUT: Duration = Duration::from_secs(30);

/// The core runner responsible for locking and running jobs
pub struct Runner {
connection_pool: DieselPool,
Expand All @@ -27,43 +27,23 @@ pub struct Runner {
}

impl Runner {
pub fn production_runner(
environment: Arc<Option<Environment>>,
url: String,
job_start_timeout: u64,
) -> Self {
let connection_pool = r2d2::Pool::builder()
.max_size(10)
.min_idle(Some(0))
.build_unchecked(ConnectionManager::new(url));
pub fn new(connection_pool: DieselPool, environment: Arc<Option<Environment>>) -> Self {
Self {
connection_pool: DieselPool::new_background_worker(connection_pool),
thread_pool: ThreadPool::new(5),
connection_pool,
thread_pool: ThreadPool::new(1),
environment,
job_start_timeout: Duration::from_secs(job_start_timeout),
job_start_timeout: DEFAULT_JOB_START_TIMEOUT,
}
}

#[cfg(test)]
fn internal_test_runner(environment: Option<Environment>, url: String) -> Self {
let connection_pool = r2d2::Pool::builder()
.max_size(4)
.build_unchecked(ConnectionManager::new(url));
Self {
connection_pool: DieselPool::new_background_worker(connection_pool),
thread_pool: ThreadPool::new(2),
environment: Arc::new(environment),
job_start_timeout: Duration::from_secs(10),
}
pub fn num_workers(mut self, num_workers: usize) -> Self {
self.thread_pool.set_num_threads(num_workers);
self
}

pub fn test_runner(environment: Environment, connection_pool: DieselPool) -> Self {
Self {
connection_pool,
thread_pool: ThreadPool::new(1),
environment: Arc::new(Some(environment)),
job_start_timeout: Duration::from_secs(5),
}
pub fn job_start_timeout(mut self, job_start_timeout: Duration) -> Self {
self.job_start_timeout = job_start_timeout;
self
}

/// Runs all pending jobs in the queue
Expand Down Expand Up @@ -285,6 +265,8 @@ mod tests {

use super::*;
use crate::schema::background_jobs;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use std::panic::AssertUnwindSafe;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Barrier, Mutex, MutexGuard};
Expand Down Expand Up @@ -428,7 +410,15 @@ mod tests {
let database_url =
dotenvy::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests");

super::Runner::internal_test_runner(None, database_url)
let connection_pool = r2d2::Pool::builder()
.max_size(4)
.build_unchecked(ConnectionManager::new(database_url));

let connection_pool = DieselPool::new_background_worker(connection_pool);

Runner::new(connection_pool, Arc::new(None))
.num_workers(2)
.job_start_timeout(Duration::from_secs(10))
}

fn create_dummy_job(runner: &Runner) -> storage::BackgroundJob {
Expand Down
9 changes: 5 additions & 4 deletions src/tests/util/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,11 @@ impl TestAppBuilder {
app.storage.clone(),
);

Some(Runner::test_runner(
environment,
app.primary_database.clone(),
))
let runner = Runner::new(app.primary_database.clone(), Arc::new(Some(environment)))
.num_workers(1)
.job_start_timeout(Duration::from_secs(5));

Some(runner)
} else {
None
};
Expand Down

0 comments on commit 2a45482

Please sign in to comment.