diff --git a/src/api/core/health.rs b/src/api/core/health.rs index b05f15d4..66bcc381 100644 --- a/src/api/core/health.rs +++ b/src/api/core/health.rs @@ -11,7 +11,7 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, skip_serializing_none}; #[cfg(feature = "sidekiq")] use sidekiq::redis_rs::cmd; -#[cfg(feature = "sidekiq")] +#[cfg(any(feature = "db-sql", feature = "sidekiq"))] use std::time::Duration; use std::time::Instant; #[cfg(feature = "sidekiq")] @@ -79,35 +79,21 @@ where S: Clone + Send + Sync + 'static, { let timer = Instant::now(); - #[cfg(any(feature = "sidekiq", feature = "db-sql"))] - #[cfg(feature = "db-sql")] - let db = { - let db_timer = Instant::now(); - let db_status = if ping_db(state.db()).await.is_ok() { - Status::Ok - } else { - Status::Err(ErrorData { msg: None }) - }; - let db_timer = db_timer.elapsed(); - ResourceHealth { - status: db_status, - acquire_conn_latency: None, - ping_latency: None, - latency: db_timer.as_millis(), - } - }; - #[cfg(feature = "sidekiq")] - let (redis_enqueue, redis_fetch) = { - let redis_enqueue = redis_health(state.redis_enqueue()); - if let Some(redis_fetch) = state.redis_fetch() { - let (redis_enqueue, redis_fetch) = - tokio::join!(redis_enqueue, redis_health(redis_fetch)); - (redis_enqueue, Some(redis_fetch)) - } else { - (redis_enqueue.await, None) - } - }; + #[cfg(any(feature = "db-sql", feature = "sidekiq"))] + let timeout_duration = Duration::from_secs(1); + + #[cfg(all(feature = "db-sql", feature = "sidekiq"))] + let (db, (redis_enqueue, redis_fetch)) = tokio::join!( + db_health(state, timeout_duration), + all_redis_health(state, timeout_duration) + ); + + #[cfg(all(feature = "db-sql", not(feature = "sidekiq")))] + let db = db_health(state).await; + + #[cfg(all(not(feature = "db-sql"), feature = "sidekiq"))] + let (redis_enqueue, redis_fetch) = all_redis_health(state).await; Ok(HeathCheckResponse { latency: timer.elapsed().as_millis(), @@ -120,18 +106,60 @@ where }) } +#[cfg(feature = "db-sql")] +pub(crate) async fn db_health(state: &AppContext, duration: Duration) -> ResourceHealth +where + S: Clone + Send + Sync + 'static, +{ + let db_timer = Instant::now(); + let db_status = match ping_db(state.db(), duration).await { + Ok(_) => Status::Ok, + Err(err) => Status::Err(ErrorData { + msg: Some(err.to_string()), + }), + }; + let db_timer = db_timer.elapsed(); + ResourceHealth { + status: db_status, + acquire_conn_latency: None, + ping_latency: None, + latency: db_timer.as_millis(), + } +} + #[cfg(feature = "db-sql")] #[instrument(skip_all)] -async fn ping_db(db: &DatabaseConnection) -> RoadsterResult<()> { - db.ping().await?; +async fn ping_db(db: &DatabaseConnection, duration: Duration) -> RoadsterResult<()> { + timeout(duration, db.ping()).await??; Ok(()) } +#[cfg(feature = "sidekiq")] +pub(crate) async fn all_redis_health( + state: &AppContext, + duration: Duration, +) -> (ResourceHealth, Option) +where + S: Clone + Send + Sync + 'static, +{ + { + let redis_enqueue = redis_health(state.redis_enqueue(), duration); + if let Some(redis_fetch) = state.redis_fetch() { + let (redis_enqueue, redis_fetch) = + tokio::join!(redis_enqueue, redis_health(redis_fetch, duration)); + (redis_enqueue, Some(redis_fetch)) + } else { + (redis_enqueue.await, None) + } + } +} + #[cfg(feature = "sidekiq")] #[instrument(skip_all)] -async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth { +async fn redis_health(redis: &sidekiq::RedisPool, duration: Duration) -> ResourceHealth { let redis_timer = Instant::now(); - let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis).await { + let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis, duration).await + { Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())), Err(err) => ( Status::Err(ErrorData { @@ -152,9 +180,12 @@ async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth { #[cfg(feature = "sidekiq")] #[instrument(skip_all)] -async fn ping_redis(redis: &sidekiq::RedisPool) -> RoadsterResult<(Duration, Duration)> { +async fn ping_redis( + redis: &sidekiq::RedisPool, + duration: Duration, +) -> RoadsterResult<(Duration, Duration)> { let timer = Instant::now(); - let mut conn = timeout(Duration::from_secs(1), redis.get()).await??; + let mut conn = timeout(duration, redis.get()).await??; let acquire_conn_latency = timer.elapsed(); let timer = Instant::now(); diff --git a/src/app/mod.rs b/src/app/mod.rs index ff64a6c2..914ed1fa 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -89,6 +89,8 @@ where A::M::up(context.db(), None).await?; } + crate::service::runner::before_run(&service_registry, &context).await?; + crate::service::runner::run(service_registry, &context).await?; Ok(()) diff --git a/src/service/mod.rs b/src/service/mod.rs index a53f9b1c..f11606ad 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -46,6 +46,14 @@ pub trait AppService: Send + Sync { Ok(false) } + /// Perform any initialization work or other checks that should be done before the service runs. + /// + /// For example, checking that the service is healthy, removing stale items from the + /// service's queue, etc. + async fn before_run(&self, _app_context: &AppContext) -> RoadsterResult<()> { + Ok(()) + } + /// Run the service in a new tokio task. /// /// * cancel_token - A tokio [CancellationToken] to use as a signal to gracefully shut down diff --git a/src/service/runner.rs b/src/service/runner.rs index c4083e95..dc132d03 100644 --- a/src/service/runner.rs +++ b/src/service/runner.rs @@ -27,6 +27,21 @@ where Ok(false) } +pub(crate) async fn before_run( + service_registry: &ServiceRegistry, + context: &AppContext, +) -> RoadsterResult<()> +where + A: App, +{ + for (name, service) in service_registry.services.iter() { + info!(service=%name, "Running `before run`"); + service.before_run(context).await?; + } + + Ok(()) +} + pub(crate) async fn run( service_registry: ServiceRegistry, context: &AppContext, diff --git a/src/service/worker/sidekiq/builder.rs b/src/service/worker/sidekiq/builder.rs index f0acff34..c3f9754d 100644 --- a/src/service/worker/sidekiq/builder.rs +++ b/src/service/worker/sidekiq/builder.rs @@ -10,19 +10,14 @@ use crate::service::worker::sidekiq::Processor; use crate::service::AppServiceBuilder; use anyhow::anyhow; use async_trait::async_trait; -use bb8::PooledConnection; use itertools::Itertools; use num_traits::ToPrimitive; use serde::Serialize; -use sidekiq::redis_rs::ToRedisArgs; -use sidekiq::{ - periodic, ProcessorConfig, RedisConnection, RedisConnectionManager, RedisError, - ServerMiddleware, -}; +use sidekiq::{periodic, ProcessorConfig, ServerMiddleware}; use std::collections::HashSet; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; -const PERIODIC_KEY: &str = "periodic"; +pub(crate) const PERIODIC_KEY: &str = "periodic"; pub struct SidekiqWorkerServiceBuilder where @@ -57,20 +52,16 @@ where } } - async fn build(self, context: &AppContext) -> RoadsterResult { + async fn build(self, _context: &AppContext) -> RoadsterResult { let service = match self.state { BuilderState::Enabled { processor, registered_periodic_workers, .. - } => { - let mut conn = context.redis_enqueue().get().await?; - remove_stale_periodic_jobs(&mut conn, context, ®istered_periodic_workers) - .await?; - SidekiqWorkerService { - processor: processor.into_sidekiq_processor(), - } - } + } => SidekiqWorkerService { + registered_periodic_workers, + processor: processor.into_sidekiq_processor(), + }, BuilderState::Disabled => { return Err(anyhow!( "This builder is not enabled; it's build method should not have been called." @@ -294,94 +285,6 @@ where } } -/// Compares the list of periodic jobs that were registered by the app during app startup with -/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't -/// registered during start up. -/// -/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic] -/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale]. -/// -/// This is run after all the app's periodic jobs have been registered. -async fn remove_stale_periodic_jobs( - conn: &mut C, - context: &AppContext, - registered_periodic_workers: &HashSet, -) -> RoadsterResult<()> { - let stale_jobs = conn - .zrange(PERIODIC_KEY.to_string(), 0, -1) - .await? - .into_iter() - .filter(|job| !registered_periodic_workers.contains(job)) - .collect_vec(); - - if stale_jobs.is_empty() { - info!("No stale periodic jobs found"); - return Ok(()); - } - - if context - .config() - .service - .sidekiq - .custom - .periodic - .stale_cleanup - == StaleCleanUpBehavior::AutoCleanStale - { - info!( - "Removing {} stale periodic jobs:\n{}", - stale_jobs.len(), - stale_jobs.join("\n") - ); - conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone()) - .await?; - } else { - warn!( - "Found {} stale periodic jobs:\n{}", - stale_jobs.len(), - stale_jobs.join("\n") - ); - } - - Ok(()) -} - -/// Trait to help with mocking responses from Redis. -// Todo: Make available to other parts of the project? -#[cfg_attr(test, mockall::automock)] -#[async_trait] -trait RedisCommands { - async fn zrange( - &mut self, - key: String, - lower: isize, - upper: isize, - ) -> Result, RedisError>; - - async fn zrem(&mut self, key: String, value: V) -> Result - where - V: ToRedisArgs + Send + Sync + 'static; -} - -#[async_trait] -impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> { - async fn zrange( - &mut self, - key: String, - lower: isize, - upper: isize, - ) -> Result, RedisError> { - RedisConnection::zrange(self, key, lower, upper).await - } - - async fn zrem(&mut self, key: String, value: V) -> Result - where - V: ToRedisArgs + Send + Sync + 'static, - { - RedisConnection::zrem(self, key, value).await - } -} - #[cfg(test)] mod tests { use super::*; @@ -584,52 +487,4 @@ mod tests { // Assert assert_eq!(result.is_err(), expect_err); } - - #[rstest] - #[case(false, Default::default(), Default::default(), Default::default())] - #[case(true, Default::default(), Default::default(), Default::default())] - #[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])] - #[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())] - #[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])] - #[case(false, Default::default(), vec!["foo".to_string()], Default::default())] - #[tokio::test] - #[cfg_attr(coverage_nightly, coverage(off))] - async fn remove_stale_periodic_jobs( - #[case] clean_stale: bool, - #[case] registered_jobs: Vec, - #[case] jobs_in_redis: Vec, - #[case] expected_jobs_removed: Vec, - ) { - let mut config = AppConfig::test(None).unwrap(); - if clean_stale { - config.service.sidekiq.custom.periodic.stale_cleanup = - StaleCleanUpBehavior::AutoCleanStale; - } else { - config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual; - } - - let context = AppContext::<()>::test(Some(config), None, None).unwrap(); - - let mut redis = MockRedisCommands::default(); - redis - .expect_zrange() - .times(1) - .return_once(move |_, _, _| Ok(jobs_in_redis)); - - let zrem = redis.expect_zrem(); - if clean_stale && !expected_jobs_removed.is_empty() { - zrem.times(1); - } else { - zrem.never(); - } - zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs)) - .return_once(|_, _: Vec| Ok(true)); - - let registered_jobs: HashSet = - registered_jobs.iter().map(|s| s.to_string()).collect(); - - super::remove_stale_periodic_jobs(&mut redis, &context, ®istered_jobs) - .await - .unwrap(); - } } diff --git a/src/service/worker/sidekiq/service.rs b/src/service/worker/sidekiq/service.rs index 6643d8d3..df867541 100644 --- a/src/service/worker/sidekiq/service.rs +++ b/src/service/worker/sidekiq/service.rs @@ -1,13 +1,18 @@ use crate::app::context::AppContext; use crate::app::App; +use crate::config::service::worker::sidekiq::StaleCleanUpBehavior; use crate::error::RoadsterResult; -use crate::service::worker::sidekiq::builder::SidekiqWorkerServiceBuilder; +use crate::service::worker::sidekiq::builder::{SidekiqWorkerServiceBuilder, PERIODIC_KEY}; use crate::service::AppService; use async_trait::async_trait; -use sidekiq::Processor; +use bb8::PooledConnection; +use itertools::Itertools; +use sidekiq::redis_rs::ToRedisArgs; +use sidekiq::{Processor, RedisConnection, RedisConnectionManager, RedisError}; +use std::collections::HashSet; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{debug, error}; +use tracing::{debug, error, info, instrument, warn}; pub(crate) const NAME: &str = "sidekiq"; @@ -33,6 +38,7 @@ pub(crate) fn enabled(context: &AppContext) -> bool { } pub struct SidekiqWorkerService { + pub(crate) registered_periodic_workers: HashSet, pub(crate) processor: Processor, } @@ -46,6 +52,12 @@ impl AppService for SidekiqWorkerService { enabled(context) } + #[instrument(skip_all)] + async fn before_run(&self, app_context: &AppContext) -> RoadsterResult<()> { + let mut conn = app_context.redis_enqueue().get().await?; + remove_stale_periodic_jobs(&mut conn, app_context, &self.registered_periodic_workers).await + } + async fn run( self: Box, _app_context: &AppContext, @@ -90,8 +102,97 @@ impl SidekiqWorkerService { } } +/// Compares the list of periodic jobs that were registered by the app during app startup with +/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't +/// registered during start up. +/// +/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic] +/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale]. +/// +/// This is run after all the app's periodic jobs have been registered. +async fn remove_stale_periodic_jobs( + conn: &mut C, + context: &AppContext, + registered_periodic_workers: &HashSet, +) -> RoadsterResult<()> { + let stale_jobs = conn + .zrange(PERIODIC_KEY.to_string(), 0, -1) + .await? + .into_iter() + .filter(|job| !registered_periodic_workers.contains(job)) + .collect_vec(); + + if stale_jobs.is_empty() { + info!("No stale periodic jobs found"); + return Ok(()); + } + + if context + .config() + .service + .sidekiq + .custom + .periodic + .stale_cleanup + == StaleCleanUpBehavior::AutoCleanStale + { + info!( + "Removing {} stale periodic jobs:\n{}", + stale_jobs.len(), + stale_jobs.join("\n") + ); + conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone()) + .await?; + } else { + warn!( + "Found {} stale periodic jobs:\n{}", + stale_jobs.len(), + stale_jobs.join("\n") + ); + } + + Ok(()) +} + +/// Trait to help with mocking responses from Redis. +// Todo: Make available to other parts of the project? +#[cfg_attr(test, mockall::automock)] +#[async_trait] +trait RedisCommands { + async fn zrange( + &mut self, + key: String, + lower: isize, + upper: isize, + ) -> Result, RedisError>; + + async fn zrem(&mut self, key: String, value: V) -> Result + where + V: ToRedisArgs + Send + Sync + 'static; +} + +#[async_trait] +impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> { + async fn zrange( + &mut self, + key: String, + lower: isize, + upper: isize, + ) -> Result, RedisError> { + RedisConnection::zrange(self, key, lower, upper).await + } + + async fn zrem(&mut self, key: String, value: V) -> Result + where + V: ToRedisArgs + Send + Sync + 'static, + { + RedisConnection::zrem(self, key, value).await + } +} + #[cfg(test)] mod tests { + use super::*; use crate::app::context::AppContext; use crate::config::app_config::AppConfig; use bb8::Pool; @@ -134,4 +235,52 @@ mod tests { assert_eq!(super::enabled(&context), expected_enabled); } + + #[rstest] + #[case(false, Default::default(), Default::default(), Default::default())] + #[case(true, Default::default(), Default::default(), Default::default())] + #[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])] + #[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())] + #[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])] + #[case(false, Default::default(), vec!["foo".to_string()], Default::default())] + #[tokio::test] + #[cfg_attr(coverage_nightly, coverage(off))] + async fn remove_stale_periodic_jobs( + #[case] clean_stale: bool, + #[case] registered_jobs: Vec, + #[case] jobs_in_redis: Vec, + #[case] expected_jobs_removed: Vec, + ) { + let mut config = AppConfig::test(None).unwrap(); + if clean_stale { + config.service.sidekiq.custom.periodic.stale_cleanup = + StaleCleanUpBehavior::AutoCleanStale; + } else { + config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual; + } + + let context = AppContext::<()>::test(Some(config), None, None).unwrap(); + + let mut redis = MockRedisCommands::default(); + redis + .expect_zrange() + .times(1) + .return_once(move |_, _, _| Ok(jobs_in_redis)); + + let zrem = redis.expect_zrem(); + if clean_stale && !expected_jobs_removed.is_empty() { + zrem.times(1); + } else { + zrem.never(); + } + zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs)) + .return_once(|_, _: Vec| Ok(true)); + + let registered_jobs: HashSet = + registered_jobs.iter().map(|s| s.to_string()).collect(); + + super::remove_stale_periodic_jobs(&mut redis, &context, ®istered_jobs) + .await + .unwrap(); + } }