From e586d6ed009e13057e30d83451a8f4f6b6df0d8f Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 6 Nov 2023 23:04:09 +0100 Subject: [PATCH 1/2] worker/environment: Clone index `Repository` lazily --- src/bin/background-worker.rs | 13 ++------- src/tests/util/test_app.rs | 5 ++-- src/worker/environment.rs | 53 ++++++++++++++++++++++++++++++------ 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 99b31489ca7..a97014966b1 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -24,14 +24,14 @@ use crates_io::worker::swirl::Runner; use crates_io::worker::{Environment, RunnerExt}; use crates_io::{db, ssh}; use crates_io_env_vars::{var, var_parsed}; -use crates_io_index::{Repository, RepositoryConfig}; +use crates_io_index::RepositoryConfig; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use reqwest::blocking::Client; use secrecy::ExposeSecret; use std::sync::Arc; use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Duration; fn main() -> anyhow::Result<()> { let _sentry = crates_io::sentry::init(); @@ -59,18 +59,11 @@ fn main() -> anyhow::Result<()> { let job_start_timeout = var_parsed("BACKGROUND_JOB_TIMEOUT")?.unwrap_or(30); - info!("Cloning index"); - if var("HEROKU")?.is_some() { ssh::write_known_hosts_file().unwrap(); } - let clone_start = Instant::now(); let repository_config = RepositoryConfig::from_environment()?; - let repository = Repository::open(&repository_config).expect("Failed to clone index"); - - let clone_duration = clone_start.elapsed(); - info!(duration = ?clone_duration, "Index cloned"); let cloudfront = CloudFront::from_environment(); let fastly = Fastly::from_environment(); @@ -81,7 +74,7 @@ fn main() -> anyhow::Result<()> { .build() .expect("Couldn't build client"); - let environment = Environment::new(repository, client, cloudfront, fastly, storage); + let environment = Environment::new(repository_config, 
client, cloudfront, fastly, storage); let environment = Arc::new(environment); diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs index 861dcc4b43b..6ab4c7436b0 100644 --- a/src/tests/util/test_app.rs +++ b/src/tests/util/test_app.rs @@ -11,7 +11,7 @@ use crates_io::worker::{Environment, RunnerExt}; use crates_io::{App, Emails, Env}; use crates_io_env_vars::required_var; use crates_io_index::testing::UpstreamIndex; -use crates_io_index::{Credentials, Repository as WorkerRepository, RepositoryConfig}; +use crates_io_index::{Credentials, RepositoryConfig}; use crates_io_test_db::TestDatabase; use diesel::PgConnection; use futures_util::TryStreamExt; @@ -252,9 +252,8 @@ impl TestAppBuilder { index_location: UpstreamIndex::url(), credentials: Credentials::Missing, }; - let index = WorkerRepository::open(&repository_config).expect("Could not clone index"); let environment = Environment::new( - index, + repository_config, app.http_client().clone(), None, None, diff --git a/src/worker/environment.rs b/src/worker/environment.rs index 41865603923..fb9e79efba4 100644 --- a/src/worker/environment.rs +++ b/src/worker/environment.rs @@ -2,12 +2,15 @@ use crate::cloudfront::CloudFront; use crate::fastly::Fastly; use crate::storage::Storage; use crate::worker::swirl::PerformError; -use crates_io_index::Repository; +use crates_io_index::{Repository, RepositoryConfig}; use reqwest::blocking::Client; +use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; +use std::time::Instant; pub struct Environment { - index: Mutex<Repository>, + repository_config: RepositoryConfig, + repository: Mutex<Option<Repository>>, http_client: Client, cloudfront: Option<CloudFront>, fastly: Option<Fastly>, @@ -16,14 +19,15 @@ pub struct Environment { impl Environment { pub fn new( - index: Repository, + repository_config: RepositoryConfig, http_client: Client, cloudfront: Option<CloudFront>, fastly: Option<Fastly>, storage: Arc<Storage>, ) -> Self { Self { - index: Mutex::new(index), + repository_config, + repository: 
Mutex::new(None), http_client, cloudfront, fastly, @@ -32,10 +36,25 @@ impl Environment { } #[instrument(skip_all)] - pub fn lock_index(&self) -> Result<MutexGuard<'_, Repository>, PerformError> { - let repo = self.index.lock().unwrap_or_else(PoisonError::into_inner); - repo.reset_head()?; - Ok(repo) + pub fn lock_index(&self) -> Result<RepositoryLock<'_>, PerformError> { + let mut repo = self + .repository + .lock() + .unwrap_or_else(PoisonError::into_inner); + + if repo.is_none() { + info!("Cloning index"); + let clone_start = Instant::now(); + + *repo = Some(Repository::open(&self.repository_config)?); + + let clone_duration = clone_start.elapsed(); + info!(duration = ?clone_duration, "Index cloned"); + } + + let repo_lock = RepositoryLock { repo }; + repo_lock.reset_head()?; + Ok(repo_lock) } /// Returns a client for making HTTP requests to upload crate files. @@ -51,3 +70,21 @@ impl Environment { self.fastly.as_ref() } } + +pub struct RepositoryLock<'a> { + repo: MutexGuard<'a, Option<Repository>>, +} + +impl Deref for RepositoryLock<'_> { + type Target = Repository; + + fn deref(&self) -> &Self::Target { + self.repo.as_ref().unwrap() + } +} + +impl DerefMut for RepositoryLock<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.repo.as_mut().unwrap() + } +} From 7ed429777bb5305fa3cbb67c7e8ef224b44c3b97 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 6 Nov 2023 23:11:25 +0100 Subject: [PATCH 2/2] bin/background-worker: Clone index `Repository` eagerly... but in the background This commit moves us back to the previous behavior, where the index repository is cloned on startup. But compared to the previous behavior, the cloning is now happening in a background thread so that any background jobs that don't need the repository can run uninterrupted. 
--- src/bin/background-worker.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index a97014966b1..fd185b1fc7b 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -78,6 +78,15 @@ fn main() -> anyhow::Result<()> { let environment = Arc::new(environment); + std::thread::spawn({ + let environment = environment.clone(); + move || { + if let Err(err) = environment.lock_index() { + warn!(%err, "Failed to clone index"); + }; + } + }); + let connection_pool = r2d2::Pool::builder() .max_size(10) .min_idle(Some(0))