From 473d805a8700af7f323ea064a233b239314b2838 Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Sat, 22 Jun 2024 18:02:18 -0700 Subject: [PATCH] feat: Move sidekiq "stale cleanup" to new `before_run` service method In order to allow handling CLI commands with services without requiring the sidkiq redis connection to be available, we need to move the "stale cleanup" behavior out of the "build" method so that the service can be built without needing to access redis. This is accomplished by adding a new `AppService::before_run` trait method, which is run after all the services are built and before any of the services are run. The sidekiq service's "stale cleanup" behavior is now implemented in this trait method. This PR also updates the health check API to run the db and redis checks in parallel. Closes https://github.com/roadster-rs/roadster/issues/238 --- src/api/core/health.rs | 101 ++++++++++------ src/app/mod.rs | 2 + src/service/mod.rs | 8 ++ src/service/runner.rs | 15 +++ src/service/worker/sidekiq/builder.rs | 161 ++------------------------ src/service/worker/sidekiq/service.rs | 155 ++++++++++++++++++++++++- 6 files changed, 251 insertions(+), 191 deletions(-) diff --git a/src/api/core/health.rs b/src/api/core/health.rs index b05f15d4..66bcc381 100644 --- a/src/api/core/health.rs +++ b/src/api/core/health.rs @@ -11,7 +11,7 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, skip_serializing_none}; #[cfg(feature = "sidekiq")] use sidekiq::redis_rs::cmd; -#[cfg(feature = "sidekiq")] +#[cfg(any(feature = "db-sql", feature = "sidekiq"))] use std::time::Duration; use std::time::Instant; #[cfg(feature = "sidekiq")] @@ -79,35 +79,21 @@ where S: Clone + Send + Sync + 'static, { let timer = Instant::now(); - #[cfg(any(feature = "sidekiq", feature = "db-sql"))] - #[cfg(feature = "db-sql")] - let db = { - let db_timer = Instant::now(); - let db_status = if ping_db(state.db()).await.is_ok() { - Status::Ok - } else { - Status::Err(ErrorData { msg: None }) - }; - let db_timer = db_timer.elapsed(); - ResourceHealth { - status: db_status, - acquire_conn_latency: None, - ping_latency: None, - latency: db_timer.as_millis(), - } - }; - #[cfg(feature = "sidekiq")] - let (redis_enqueue, redis_fetch) = { - let redis_enqueue = redis_health(state.redis_enqueue()); - if let Some(redis_fetch) = state.redis_fetch() { - let (redis_enqueue, redis_fetch) = - tokio::join!(redis_enqueue, redis_health(redis_fetch)); - (redis_enqueue, Some(redis_fetch)) - } else { - (redis_enqueue.await, None) - } - }; + #[cfg(any(feature = "db-sql", feature = "sidekiq"))] + let timeout_duration = Duration::from_secs(1); + + #[cfg(all(feature = "db-sql", feature = "sidekiq"))] + let (db, (redis_enqueue, redis_fetch)) = tokio::join!( + db_health(state, timeout_duration), + all_redis_health(state, timeout_duration) + ); + + #[cfg(all(feature = "db-sql", not(feature = "sidekiq")))] + let db = db_health(state).await; + + #[cfg(all(not(feature = "db-sql"), feature = "sidekiq"))] + let (redis_enqueue, redis_fetch) = all_redis_health(state).await; Ok(HeathCheckResponse { latency: timer.elapsed().as_millis(), @@ -120,18 +106,60 @@ where }) } +#[cfg(feature = "db-sql")] +pub(crate) async fn db_health(state: &AppContext, duration: Duration) -> ResourceHealth +where + S: Clone + Send + Sync + 'static, +{ + let db_timer = Instant::now(); + let db_status = match ping_db(state.db(), duration).await { + Ok(_) => Status::Ok, + Err(err) => Status::Err(ErrorData { + msg: Some(err.to_string()), + }), + }; + let db_timer = db_timer.elapsed(); + ResourceHealth { + status: db_status, + acquire_conn_latency: None, + ping_latency: None, + latency: db_timer.as_millis(), + } +} + #[cfg(feature = "db-sql")] #[instrument(skip_all)] -async fn ping_db(db: &DatabaseConnection) -> RoadsterResult<()> { - db.ping().await?; +async fn ping_db(db: &DatabaseConnection, duration: Duration) -> RoadsterResult<()> { + timeout(duration, db.ping()).await??; Ok(()) } +#[cfg(feature = "sidekiq")] +pub(crate) async fn all_redis_health( + state: &AppContext, + duration: Duration, +) -> (ResourceHealth, Option) +where + S: Clone + Send + Sync + 'static, +{ + { + let redis_enqueue = redis_health(state.redis_enqueue(), duration); + if let Some(redis_fetch) = state.redis_fetch() { + let (redis_enqueue, redis_fetch) = + tokio::join!(redis_enqueue, redis_health(redis_fetch, duration)); + (redis_enqueue, Some(redis_fetch)) + } else { + (redis_enqueue.await, None) + } + } +} + #[cfg(feature = "sidekiq")] #[instrument(skip_all)] -async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth { +async fn redis_health(redis: &sidekiq::RedisPool, duration: Duration) -> ResourceHealth { let redis_timer = Instant::now(); - let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis).await { + let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis, duration).await + { Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())), Err(err) => ( Status::Err(ErrorData { @@ -152,9 +180,12 @@ async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth { #[cfg(feature = "sidekiq")] #[instrument(skip_all)] -async fn ping_redis(redis: &sidekiq::RedisPool) -> RoadsterResult<(Duration, Duration)> { +async fn ping_redis( + redis: &sidekiq::RedisPool, + duration: Duration, +) -> RoadsterResult<(Duration, Duration)> { let timer = Instant::now(); - let mut conn = timeout(Duration::from_secs(1), redis.get()).await??; + let mut conn = timeout(duration, redis.get()).await??; let acquire_conn_latency = timer.elapsed(); let timer = Instant::now(); diff --git a/src/app/mod.rs b/src/app/mod.rs index ff64a6c2..914ed1fa 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -89,6 +89,8 @@ where A::M::up(context.db(), None).await?; } + crate::service::runner::before_run(&service_registry, &context).await?; + crate::service::runner::run(service_registry, &context).await?; Ok(()) diff --git a/src/service/mod.rs b/src/service/mod.rs index a53f9b1c..f11606ad 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -46,6 +46,14 @@ pub trait AppService: Send + Sync { Ok(false) } + /// Perform any initialization work or other checks that should be done before the service runs. + /// + /// For example, checking that the service is healthy, removing stale items from the + /// service's queue, etc. + async fn before_run(&self, _app_context: &AppContext) -> RoadsterResult<()> { + Ok(()) + } + /// Run the service in a new tokio task. /// /// * cancel_token - A tokio [CancellationToken] to use as a signal to gracefully shut down diff --git a/src/service/runner.rs b/src/service/runner.rs index c4083e95..dc132d03 100644 --- a/src/service/runner.rs +++ b/src/service/runner.rs @@ -27,6 +27,21 @@ where Ok(false) } +pub(crate) async fn before_run( + service_registry: &ServiceRegistry, + context: &AppContext, +) -> RoadsterResult<()> +where + A: App, +{ + for (name, service) in service_registry.services.iter() { + info!(service=%name, "Running `before run`"); + service.before_run(context).await?; + } + + Ok(()) +} + pub(crate) async fn run( service_registry: ServiceRegistry, context: &AppContext, diff --git a/src/service/worker/sidekiq/builder.rs b/src/service/worker/sidekiq/builder.rs index f0acff34..c3f9754d 100644 --- a/src/service/worker/sidekiq/builder.rs +++ b/src/service/worker/sidekiq/builder.rs @@ -10,19 +10,14 @@ use crate::service::worker::sidekiq::Processor; use crate::service::AppServiceBuilder; use anyhow::anyhow; use async_trait::async_trait; -use bb8::PooledConnection; use itertools::Itertools; use num_traits::ToPrimitive; use serde::Serialize; -use sidekiq::redis_rs::ToRedisArgs; -use sidekiq::{ - periodic, ProcessorConfig, RedisConnection, RedisConnectionManager, RedisError, - ServerMiddleware, -}; +use sidekiq::{periodic, ProcessorConfig, ServerMiddleware}; use std::collections::HashSet; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; -const PERIODIC_KEY: &str = "periodic"; +pub(crate) const PERIODIC_KEY: &str = "periodic"; pub struct SidekiqWorkerServiceBuilder where @@ -57,20 +52,16 @@ where } } - async fn build(self, context: &AppContext) -> RoadsterResult { + async fn build(self, _context: &AppContext) -> RoadsterResult { let service = match self.state { BuilderState::Enabled { processor, registered_periodic_workers, .. - } => { - let mut conn = context.redis_enqueue().get().await?; - remove_stale_periodic_jobs(&mut conn, context, ®istered_periodic_workers) - .await?; - SidekiqWorkerService { - processor: processor.into_sidekiq_processor(), - } - } + } => SidekiqWorkerService { + registered_periodic_workers, + processor: processor.into_sidekiq_processor(), + }, BuilderState::Disabled => { return Err(anyhow!( "This builder is not enabled; it's build method should not have been called." @@ -294,94 +285,6 @@ where } } -/// Compares the list of periodic jobs that were registered by the app during app startup with -/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't -/// registered during start up. -/// -/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic] -/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale]. -/// -/// This is run after all the app's periodic jobs have been registered. -async fn remove_stale_periodic_jobs( - conn: &mut C, - context: &AppContext, - registered_periodic_workers: &HashSet, -) -> RoadsterResult<()> { - let stale_jobs = conn - .zrange(PERIODIC_KEY.to_string(), 0, -1) - .await? - .into_iter() - .filter(|job| !registered_periodic_workers.contains(job)) - .collect_vec(); - - if stale_jobs.is_empty() { - info!("No stale periodic jobs found"); - return Ok(()); - } - - if context - .config() - .service - .sidekiq - .custom - .periodic - .stale_cleanup - == StaleCleanUpBehavior::AutoCleanStale - { - info!( - "Removing {} stale periodic jobs:\n{}", - stale_jobs.len(), - stale_jobs.join("\n") - ); - conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone()) - .await?; - } else { - warn!( - "Found {} stale periodic jobs:\n{}", - stale_jobs.len(), - stale_jobs.join("\n") - ); - } - - Ok(()) -} - -/// Trait to help with mocking responses from Redis. -// Todo: Make available to other parts of the project? -#[cfg_attr(test, mockall::automock)] -#[async_trait] -trait RedisCommands { - async fn zrange( - &mut self, - key: String, - lower: isize, - upper: isize, - ) -> Result, RedisError>; - - async fn zrem(&mut self, key: String, value: V) -> Result - where - V: ToRedisArgs + Send + Sync + 'static; -} - -#[async_trait] -impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> { - async fn zrange( - &mut self, - key: String, - lower: isize, - upper: isize, - ) -> Result, RedisError> { - RedisConnection::zrange(self, key, lower, upper).await - } - - async fn zrem(&mut self, key: String, value: V) -> Result - where - V: ToRedisArgs + Send + Sync + 'static, - { - RedisConnection::zrem(self, key, value).await - } -} - #[cfg(test)] mod tests { use super::*; @@ -584,52 +487,4 @@ mod tests { // Assert assert_eq!(result.is_err(), expect_err); } - - #[rstest] - #[case(false, Default::default(), Default::default(), Default::default())] - #[case(true, Default::default(), Default::default(), Default::default())] - #[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])] - #[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())] - #[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])] - #[case(false, Default::default(), vec!["foo".to_string()], Default::default())] - #[tokio::test] - #[cfg_attr(coverage_nightly, coverage(off))] - async fn remove_stale_periodic_jobs( - #[case] clean_stale: bool, - #[case] registered_jobs: Vec, - #[case] jobs_in_redis: Vec, - #[case] expected_jobs_removed: Vec, - ) { - let mut config = AppConfig::test(None).unwrap(); - if clean_stale { - config.service.sidekiq.custom.periodic.stale_cleanup = - StaleCleanUpBehavior::AutoCleanStale; - } else { - config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual; - } - - let context = AppContext::<()>::test(Some(config), None, None).unwrap(); - - let mut redis = MockRedisCommands::default(); - redis - .expect_zrange() - .times(1) - .return_once(move |_, _, _| Ok(jobs_in_redis)); - - let zrem = redis.expect_zrem(); - if clean_stale && !expected_jobs_removed.is_empty() { - zrem.times(1); - } else { - zrem.never(); - } - zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs)) - .return_once(|_, _: Vec| Ok(true)); - - let registered_jobs: HashSet = - registered_jobs.iter().map(|s| s.to_string()).collect(); - - super::remove_stale_periodic_jobs(&mut redis, &context, ®istered_jobs) - .await - .unwrap(); - } } diff --git a/src/service/worker/sidekiq/service.rs b/src/service/worker/sidekiq/service.rs index 6643d8d3..df867541 100644 --- a/src/service/worker/sidekiq/service.rs +++ b/src/service/worker/sidekiq/service.rs @@ -1,13 +1,18 @@ use crate::app::context::AppContext; use crate::app::App; +use crate::config::service::worker::sidekiq::StaleCleanUpBehavior; use crate::error::RoadsterResult; -use crate::service::worker::sidekiq::builder::SidekiqWorkerServiceBuilder; +use crate::service::worker::sidekiq::builder::{SidekiqWorkerServiceBuilder, PERIODIC_KEY}; use crate::service::AppService; use async_trait::async_trait; -use sidekiq::Processor; +use bb8::PooledConnection; +use itertools::Itertools; +use sidekiq::redis_rs::ToRedisArgs; +use sidekiq::{Processor, RedisConnection, RedisConnectionManager, RedisError}; +use std::collections::HashSet; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{debug, error}; +use tracing::{debug, error, info, instrument, warn}; pub(crate) const NAME: &str = "sidekiq"; @@ -33,6 +38,7 @@ pub(crate) fn enabled(context: &AppContext) -> bool { } pub struct SidekiqWorkerService { + pub(crate) registered_periodic_workers: HashSet, pub(crate) processor: Processor, } @@ -46,6 +52,12 @@ impl AppService for SidekiqWorkerService { enabled(context) } + #[instrument(skip_all)] + async fn before_run(&self, app_context: &AppContext) -> RoadsterResult<()> { + let mut conn = app_context.redis_enqueue().get().await?; + remove_stale_periodic_jobs(&mut conn, app_context, &self.registered_periodic_workers).await + } + async fn run( self: Box, _app_context: &AppContext, @@ -90,8 +102,97 @@ impl SidekiqWorkerService { } } +/// Compares the list of periodic jobs that were registered by the app during app startup with +/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't +/// registered during start up. +/// +/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic] +/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale]. +/// +/// This is run after all the app's periodic jobs have been registered. +async fn remove_stale_periodic_jobs( + conn: &mut C, + context: &AppContext, + registered_periodic_workers: &HashSet, +) -> RoadsterResult<()> { + let stale_jobs = conn + .zrange(PERIODIC_KEY.to_string(), 0, -1) + .await? + .into_iter() + .filter(|job| !registered_periodic_workers.contains(job)) + .collect_vec(); + + if stale_jobs.is_empty() { + info!("No stale periodic jobs found"); + return Ok(()); + } + + if context + .config() + .service + .sidekiq + .custom + .periodic + .stale_cleanup + == StaleCleanUpBehavior::AutoCleanStale + { + info!( + "Removing {} stale periodic jobs:\n{}", + stale_jobs.len(), + stale_jobs.join("\n") + ); + conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone()) + .await?; + } else { + warn!( + "Found {} stale periodic jobs:\n{}", + stale_jobs.len(), + stale_jobs.join("\n") + ); + } + + Ok(()) +} + +/// Trait to help with mocking responses from Redis. +// Todo: Make available to other parts of the project? +#[cfg_attr(test, mockall::automock)] +#[async_trait] +trait RedisCommands { + async fn zrange( + &mut self, + key: String, + lower: isize, + upper: isize, + ) -> Result, RedisError>; + + async fn zrem(&mut self, key: String, value: V) -> Result + where + V: ToRedisArgs + Send + Sync + 'static; +} + +#[async_trait] +impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> { + async fn zrange( + &mut self, + key: String, + lower: isize, + upper: isize, + ) -> Result, RedisError> { + RedisConnection::zrange(self, key, lower, upper).await + } + + async fn zrem(&mut self, key: String, value: V) -> Result + where + V: ToRedisArgs + Send + Sync + 'static, + { + RedisConnection::zrem(self, key, value).await + } +} + #[cfg(test)] mod tests { + use super::*; use crate::app::context::AppContext; use crate::config::app_config::AppConfig; use bb8::Pool; @@ -134,4 +235,52 @@ mod tests { assert_eq!(super::enabled(&context), expected_enabled); } + + #[rstest] + #[case(false, Default::default(), Default::default(), Default::default())] + #[case(true, Default::default(), Default::default(), Default::default())] + #[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])] + #[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())] + #[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])] + #[case(false, Default::default(), vec!["foo".to_string()], Default::default())] + #[tokio::test] + #[cfg_attr(coverage_nightly, coverage(off))] + async fn remove_stale_periodic_jobs( + #[case] clean_stale: bool, + #[case] registered_jobs: Vec, + #[case] jobs_in_redis: Vec, + #[case] expected_jobs_removed: Vec, + ) { + let mut config = AppConfig::test(None).unwrap(); + if clean_stale { + config.service.sidekiq.custom.periodic.stale_cleanup = + StaleCleanUpBehavior::AutoCleanStale; + } else { + config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual; + } + + let context = AppContext::<()>::test(Some(config), None, None).unwrap(); + + let mut redis = MockRedisCommands::default(); + redis + .expect_zrange() + .times(1) + .return_once(move |_, _, _| Ok(jobs_in_redis)); + + let zrem = redis.expect_zrem(); + if clean_stale && !expected_jobs_removed.is_empty() { + zrem.times(1); + } else { + zrem.never(); + } + zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs)) + .return_once(|_, _: Vec| Ok(true)); + + let registered_jobs: HashSet = + registered_jobs.iter().map(|s| s.to_string()).collect(); + + super::remove_stale_periodic_jobs(&mut redis, &context, ®istered_jobs) + .await + .unwrap(); + } }