Skip to content

Commit

Permalink
feat: Move sidekiq "stale cleanup" to new before_run service method
Browse files Browse the repository at this point in the history
In order to allow handling CLI commands with services without requiring
the sidkiq redis connection to be available, we need to move the "stale
cleanup" behavior out of the "build" method so that the service can be
built without needing to access redis.

This is accomplished by adding a new `AppService::before_run` trait
method, which is run after all the services are built and before any of
the services are run. The sidekiq service's "stale cleanup" behavior is
now implemented in this trait method.

This PR also updates the health check API to run the db and redis checks
in parallel.

Closes #238
  • Loading branch information
spencewenski committed Jun 23, 2024
1 parent 4a336c6 commit 473d805
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 191 deletions.
101 changes: 66 additions & 35 deletions src/api/core/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, skip_serializing_none};
#[cfg(feature = "sidekiq")]
use sidekiq::redis_rs::cmd;
#[cfg(feature = "sidekiq")]
#[cfg(any(feature = "db-sql", feature = "sidekiq"))]
use std::time::Duration;
use std::time::Instant;
#[cfg(feature = "sidekiq")]
Expand Down Expand Up @@ -79,35 +79,21 @@ where
S: Clone + Send + Sync + 'static,
{
let timer = Instant::now();
#[cfg(any(feature = "sidekiq", feature = "db-sql"))]
#[cfg(feature = "db-sql")]
let db = {
let db_timer = Instant::now();
let db_status = if ping_db(state.db()).await.is_ok() {
Status::Ok
} else {
Status::Err(ErrorData { msg: None })
};
let db_timer = db_timer.elapsed();
ResourceHealth {
status: db_status,
acquire_conn_latency: None,
ping_latency: None,
latency: db_timer.as_millis(),
}
};

#[cfg(feature = "sidekiq")]
let (redis_enqueue, redis_fetch) = {
let redis_enqueue = redis_health(state.redis_enqueue());
if let Some(redis_fetch) = state.redis_fetch() {
let (redis_enqueue, redis_fetch) =
tokio::join!(redis_enqueue, redis_health(redis_fetch));
(redis_enqueue, Some(redis_fetch))
} else {
(redis_enqueue.await, None)
}
};
#[cfg(any(feature = "db-sql", feature = "sidekiq"))]
let timeout_duration = Duration::from_secs(1);

#[cfg(all(feature = "db-sql", feature = "sidekiq"))]
let (db, (redis_enqueue, redis_fetch)) = tokio::join!(
db_health(state, timeout_duration),
all_redis_health(state, timeout_duration)
);

#[cfg(all(feature = "db-sql", not(feature = "sidekiq")))]
let db = db_health(state).await;

#[cfg(all(not(feature = "db-sql"), feature = "sidekiq"))]
let (redis_enqueue, redis_fetch) = all_redis_health(state).await;

Ok(HeathCheckResponse {
latency: timer.elapsed().as_millis(),
Expand All @@ -120,18 +106,60 @@ where
})
}

#[cfg(feature = "db-sql")]
pub(crate) async fn db_health<S>(state: &AppContext<S>, duration: Duration) -> ResourceHealth
where
S: Clone + Send + Sync + 'static,
{
let db_timer = Instant::now();
let db_status = match ping_db(state.db(), duration).await {
Ok(_) => Status::Ok,
Err(err) => Status::Err(ErrorData {
msg: Some(err.to_string()),
}),
};
let db_timer = db_timer.elapsed();
ResourceHealth {
status: db_status,
acquire_conn_latency: None,
ping_latency: None,
latency: db_timer.as_millis(),
}
}

#[cfg(feature = "db-sql")]
#[instrument(skip_all)]
async fn ping_db(db: &DatabaseConnection) -> RoadsterResult<()> {
db.ping().await?;
async fn ping_db(db: &DatabaseConnection, duration: Duration) -> RoadsterResult<()> {
timeout(duration, db.ping()).await??;
Ok(())
}

#[cfg(feature = "sidekiq")]
pub(crate) async fn all_redis_health<S>(
state: &AppContext<S>,
duration: Duration,
) -> (ResourceHealth, Option<ResourceHealth>)
where
S: Clone + Send + Sync + 'static,
{
{
let redis_enqueue = redis_health(state.redis_enqueue(), duration);
if let Some(redis_fetch) = state.redis_fetch() {
let (redis_enqueue, redis_fetch) =
tokio::join!(redis_enqueue, redis_health(redis_fetch, duration));
(redis_enqueue, Some(redis_fetch))
} else {
(redis_enqueue.await, None)
}
}
}

#[cfg(feature = "sidekiq")]
#[instrument(skip_all)]
async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth {
async fn redis_health(redis: &sidekiq::RedisPool, duration: Duration) -> ResourceHealth {
let redis_timer = Instant::now();
let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis).await {
let (redis_status, acquire_conn_latency, ping_latency) = match ping_redis(redis, duration).await
{
Ok((a, b)) => (Status::Ok, Some(a.as_millis()), Some(b.as_millis())),
Err(err) => (
Status::Err(ErrorData {
Expand All @@ -152,9 +180,12 @@ async fn redis_health(redis: &sidekiq::RedisPool) -> ResourceHealth {

#[cfg(feature = "sidekiq")]
#[instrument(skip_all)]
async fn ping_redis(redis: &sidekiq::RedisPool) -> RoadsterResult<(Duration, Duration)> {
async fn ping_redis(
redis: &sidekiq::RedisPool,
duration: Duration,
) -> RoadsterResult<(Duration, Duration)> {
let timer = Instant::now();
let mut conn = timeout(Duration::from_secs(1), redis.get()).await??;
let mut conn = timeout(duration, redis.get()).await??;
let acquire_conn_latency = timer.elapsed();

let timer = Instant::now();
Expand Down
2 changes: 2 additions & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ where
A::M::up(context.db(), None).await?;
}

crate::service::runner::before_run(&service_registry, &context).await?;

crate::service::runner::run(service_registry, &context).await?;

Ok(())
Expand Down
8 changes: 8 additions & 0 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ pub trait AppService<A: App + 'static>: Send + Sync {
Ok(false)
}

/// Perform any initialization work or other checks that should be done before the service runs.
///
/// For example, checking that the service is healthy, removing stale items from the
/// service's queue, etc.
async fn before_run(&self, _app_context: &AppContext<A::State>) -> RoadsterResult<()> {
Ok(())
}

/// Run the service in a new tokio task.
///
/// * cancel_token - A tokio [CancellationToken] to use as a signal to gracefully shut down
Expand Down
15 changes: 15 additions & 0 deletions src/service/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ where
Ok(false)
}

pub(crate) async fn before_run<A>(
service_registry: &ServiceRegistry<A>,
context: &AppContext<A::State>,
) -> RoadsterResult<()>
where
A: App,
{
for (name, service) in service_registry.services.iter() {
info!(service=%name, "Running `before run`");
service.before_run(context).await?;
}

Ok(())
}

pub(crate) async fn run<A>(
service_registry: ServiceRegistry<A>,
context: &AppContext<A::State>,
Expand Down
161 changes: 8 additions & 153 deletions src/service/worker/sidekiq/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ use crate::service::worker::sidekiq::Processor;
use crate::service::AppServiceBuilder;
use anyhow::anyhow;
use async_trait::async_trait;
use bb8::PooledConnection;
use itertools::Itertools;
use num_traits::ToPrimitive;
use serde::Serialize;
use sidekiq::redis_rs::ToRedisArgs;
use sidekiq::{
periodic, ProcessorConfig, RedisConnection, RedisConnectionManager, RedisError,
ServerMiddleware,
};
use sidekiq::{periodic, ProcessorConfig, ServerMiddleware};
use std::collections::HashSet;
use tracing::{debug, info, warn};
use tracing::{debug, info};

const PERIODIC_KEY: &str = "periodic";
pub(crate) const PERIODIC_KEY: &str = "periodic";

pub struct SidekiqWorkerServiceBuilder<A>
where
Expand Down Expand Up @@ -57,20 +52,16 @@ where
}
}

async fn build(self, context: &AppContext<A::State>) -> RoadsterResult<SidekiqWorkerService> {
async fn build(self, _context: &AppContext<A::State>) -> RoadsterResult<SidekiqWorkerService> {
let service = match self.state {
BuilderState::Enabled {
processor,
registered_periodic_workers,
..
} => {
let mut conn = context.redis_enqueue().get().await?;
remove_stale_periodic_jobs(&mut conn, context, &registered_periodic_workers)
.await?;
SidekiqWorkerService {
processor: processor.into_sidekiq_processor(),
}
}
} => SidekiqWorkerService {
registered_periodic_workers,
processor: processor.into_sidekiq_processor(),
},
BuilderState::Disabled => {
return Err(anyhow!(
"This builder is not enabled; it's build method should not have been called."
Expand Down Expand Up @@ -294,94 +285,6 @@ where
}
}

/// Compares the list of periodic jobs that were registered by the app during app startup with
/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't
/// registered during start up.
///
/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic]
/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale].
///
/// This is run after all the app's periodic jobs have been registered.
async fn remove_stale_periodic_jobs<S, C: RedisCommands>(
conn: &mut C,
context: &AppContext<S>,
registered_periodic_workers: &HashSet<String>,
) -> RoadsterResult<()> {
let stale_jobs = conn
.zrange(PERIODIC_KEY.to_string(), 0, -1)
.await?
.into_iter()
.filter(|job| !registered_periodic_workers.contains(job))
.collect_vec();

if stale_jobs.is_empty() {
info!("No stale periodic jobs found");
return Ok(());
}

if context
.config()
.service
.sidekiq
.custom
.periodic
.stale_cleanup
== StaleCleanUpBehavior::AutoCleanStale
{
info!(
"Removing {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone())
.await?;
} else {
warn!(
"Found {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
}

Ok(())
}

/// Trait to help with mocking responses from Redis.
// Todo: Make available to other parts of the project?
#[cfg_attr(test, mockall::automock)]
#[async_trait]
trait RedisCommands {
async fn zrange(
&mut self,
key: String,
lower: isize,
upper: isize,
) -> Result<Vec<String>, RedisError>;

async fn zrem<V>(&mut self, key: String, value: V) -> Result<bool, RedisError>
where
V: ToRedisArgs + Send + Sync + 'static;
}

#[async_trait]
impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> {
async fn zrange(
&mut self,
key: String,
lower: isize,
upper: isize,
) -> Result<Vec<String>, RedisError> {
RedisConnection::zrange(self, key, lower, upper).await
}

async fn zrem<V>(&mut self, key: String, value: V) -> Result<bool, RedisError>
where
V: ToRedisArgs + Send + Sync + 'static,
{
RedisConnection::zrem(self, key, value).await
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -584,52 +487,4 @@ mod tests {
// Assert
assert_eq!(result.is_err(), expect_err);
}

#[rstest]
#[case(false, Default::default(), Default::default(), Default::default())]
#[case(true, Default::default(), Default::default(), Default::default())]
#[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])]
#[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())]
#[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])]
#[case(false, Default::default(), vec!["foo".to_string()], Default::default())]
#[tokio::test]
#[cfg_attr(coverage_nightly, coverage(off))]
async fn remove_stale_periodic_jobs(
#[case] clean_stale: bool,
#[case] registered_jobs: Vec<String>,
#[case] jobs_in_redis: Vec<String>,
#[case] expected_jobs_removed: Vec<String>,
) {
let mut config = AppConfig::test(None).unwrap();
if clean_stale {
config.service.sidekiq.custom.periodic.stale_cleanup =
StaleCleanUpBehavior::AutoCleanStale;
} else {
config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual;
}

let context = AppContext::<()>::test(Some(config), None, None).unwrap();

let mut redis = MockRedisCommands::default();
redis
.expect_zrange()
.times(1)
.return_once(move |_, _, _| Ok(jobs_in_redis));

let zrem = redis.expect_zrem();
if clean_stale && !expected_jobs_removed.is_empty() {
zrem.times(1);
} else {
zrem.never();
}
zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs))
.return_once(|_, _: Vec<String>| Ok(true));

let registered_jobs: HashSet<String> =
registered_jobs.iter().map(|s| s.to_string()).collect();

super::remove_stale_periodic_jobs(&mut redis, &context, &registered_jobs)
.await
.unwrap();
}
}
Loading

0 comments on commit 473d805

Please sign in to comment.