Skip to content

Commit

Permalink
Add tests for remove_stale_periodic_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
spencewenski committed May 17, 2024
1 parent 8bfb27b commit b7481f6
Showing 1 changed file with 156 additions and 49 deletions.
205 changes: 156 additions & 49 deletions src/service/worker/sidekiq/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use crate::service::worker::sidekiq::Processor;
use crate::service::{AppService, AppServiceBuilder};
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use bb8::PooledConnection;
use itertools::Itertools;
use num_traits::ToPrimitive;
use serde::Serialize;
use sidekiq::{periodic, ProcessorConfig};
use sidekiq::redis_rs::ToRedisArgs;
use sidekiq::{periodic, ProcessorConfig, RedisConnection, RedisConnectionManager, RedisError};
use std::collections::HashSet;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -57,7 +59,9 @@ where
registered_periodic_workers,
..
} => {
Self::remove_stale_periodic_jobs(context, &registered_periodic_workers).await?;
let mut conn = context.redis_enqueue().get().await?;
remove_stale_periodic_jobs(&mut conn, context, &registered_periodic_workers)
.await?;
SidekiqWorkerService {
processor: processor.into_sidekiq_processor(),
}
Expand Down Expand Up @@ -273,56 +277,92 @@ where

Ok(self)
}
}

/// Compares the list of periodic jobs that were registered by the app during app startup with
/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't
/// registered during start up.
///
/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic]
/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale].
///
/// This is run after all the app's periodic jobs have been registered.
pub(crate) async fn remove_stale_periodic_jobs(
context: &AppContext<A::State>,
registered_periodic_workers: &HashSet<String>,
) -> anyhow::Result<()> {
let mut conn = context.redis_enqueue().get().await?;
let stale_jobs = conn
.zrange(PERIODIC_KEY.to_string(), 0, -1)
.await?
.into_iter()
.filter(|job| !registered_periodic_workers.contains(job))
.collect_vec();

if stale_jobs.is_empty() {
info!("No stale periodic jobs found");
return Ok(());
}
/// Compares the list of periodic jobs that were registered by the app during app startup with
/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't
/// registered during start up.
///
/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic]
/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale].
///
/// This is run after all the app's periodic jobs have been registered.
async fn remove_stale_periodic_jobs<S, C: RedisCommands>(
conn: &mut C,
context: &AppContext<S>,
registered_periodic_workers: &HashSet<String>,
) -> anyhow::Result<()> {
let stale_jobs = conn
.zrange(PERIODIC_KEY.to_string(), 0, -1)
.await?
.into_iter()
.filter(|job| !registered_periodic_workers.contains(job))
.collect_vec();

if stale_jobs.is_empty() {
info!("No stale periodic jobs found");
return Ok(());
}

if context
.config()
.service
.sidekiq
.custom
.periodic
.stale_cleanup
== StaleCleanUpBehavior::AutoCleanStale
{
info!(
"Removing {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
conn.zrem(PERIODIC_KEY.to_string(), &stale_jobs).await?;
} else {
warn!(
"Found {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
}
if context
.config()
.service
.sidekiq
.custom
.periodic
.stale_cleanup
== StaleCleanUpBehavior::AutoCleanStale
{
info!(
"Removing {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone())
.await?;
} else {
warn!(
"Found {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
}

Ok(())
Ok(())
}

/// Trait to help with mocking responses from Redis.
// Todo: Make available to other parts of the project?
#[async_trait]
trait RedisCommands {
async fn zrange(
&mut self,
key: String,
lower: isize,
upper: isize,
) -> Result<Vec<String>, RedisError>;

async fn zrem<V>(&mut self, key: String, value: V) -> Result<bool, RedisError>
where
V: ToRedisArgs + Send + Sync + 'static;
}

#[async_trait]
impl<'a> RedisCommands for PooledConnection<'a, RedisConnectionManager> {
async fn zrange(
&mut self,
key: String,
lower: isize,
upper: isize,
) -> Result<Vec<String>, RedisError> {
RedisConnection::zrange(self, key, lower, upper).await
}

async fn zrem<V>(&mut self, key: String, value: V) -> Result<bool, RedisError>
where
V: ToRedisArgs + Send + Sync + 'static,
{
RedisConnection::zrem(self, key, value).await
}
}

Expand Down Expand Up @@ -499,4 +539,71 @@ mod tests {
}
}
}

#[rstest]
#[case(false, Default::default(), Default::default(), Default::default())]
#[case(true, Default::default(), Default::default(), Default::default())]
#[case(true, Default::default(), vec!["foo".to_string()], vec!["foo".to_string()])]
#[case(true, vec!["foo".to_string()], vec!["foo".to_string()], Default::default())]
#[case(true, vec!["foo".to_string()], vec!["bar".to_string()], vec!["bar".to_string()])]
#[case(false, Default::default(), vec!["foo".to_string()], Default::default())]
#[tokio::test]
#[cfg_attr(coverage_nightly, coverage(off))]
async fn remove_stale_periodic_jobs(
#[case] clean_stale: bool,
#[case] registered_jobs: Vec<String>,
#[case] jobs_in_redis: Vec<String>,
#[case] expected_jobs_removed: Vec<String>,
) {
let mut config = AppConfig::empty(None).unwrap();
if clean_stale {
config.service.sidekiq.custom.periodic.stale_cleanup =
StaleCleanUpBehavior::AutoCleanStale;
} else {
config.service.sidekiq.custom.periodic.stale_cleanup = StaleCleanUpBehavior::Manual;
}

let mut context = MockAppContext::<MockTestApp>::default();
context.expect_config().return_const(config);

let mut redis = MockTestRedisCommands::default();
redis
.expect_zrange()
.times(1)
.return_once(move |_, _, _| Ok(jobs_in_redis));

let zrem = redis.expect_zrem();
if clean_stale && !expected_jobs_removed.is_empty() {
zrem.times(1);
} else {
zrem.never();
}
zrem.withf(move |key, jobs| PERIODIC_KEY == key && expected_jobs_removed.iter().eq(jobs))
.return_once(|_, _: Vec<String>| Ok(true));

let registered_jobs: HashSet<String> =
registered_jobs.iter().map(|s| s.to_string()).collect();

super::remove_stale_periodic_jobs(&mut redis, &context, &registered_jobs)
.await
.unwrap();
}

mockall::mock! {
TestRedisCommands {}

#[async_trait]
impl RedisCommands for TestRedisCommands {
async fn zrange(
& mut self,
key: String,
lower: isize,
upper: isize,
) -> Result<Vec<String>, RedisError>;

async fn zrem<V>(&mut self, key: String, value: V) -> Result<bool, RedisError>
where
V: ToRedisArgs + Send + Sync + 'static;
}
}
}

0 comments on commit b7481f6

Please sign in to comment.