Skip to content

Commit

Permalink
Remove stale periodic jobs
Browse files Browse the repository at this point in the history
Sidekiq.rs only provides the ability to clear all periodic jobs. If this
method is used to remove stale periodic jobs when deploying, then the
periodic jobs are repeatedly removed and re-added, even when they aren't
stale.

Add a way to track which periodic workers are added, and once all
periodic jobs are added, remove any that were added by a previous
deployment but not the current deployment. This way, only the actually
stale periodic jobs are removed.
  • Loading branch information
spencewenski committed Apr 23, 2024
1 parent bca26d0 commit 721ad12
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ sea-orm-migration = { version = "1.0.0-rc.2", features = ["runtime-tokio-rustls"

# Workers
# Todo: the default `rss-stats` feature has a dependency that currently can't be satisfied (memchr: ~2.3)
rusty-sidekiq = { version = "0.10.0", default-features = false, optional = true }
rusty-sidekiq = { version = "0.10.2", default-features = false, optional = true }
bb8 = { version = "0.8.3", optional = true }
num_cpus = { version = "1.16.0", optional = true }

Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[![Feature Powerset](https://github.com/roadster-rs/roadster/actions/workflows/feature_powerset.yml/badge.svg)](https://github.com/roadster-rs/roadster/actions/workflows/feature_powerset.yml)

A "Batteries Included" web framework for rust designed to get you moving fast 🏎️. Inspired by other fully-featured
frameworks such as [Rails](https://rubyonrails.org/), [Django](https://www.djangoproject.com/), [Laravel](https://laravel.com/), [Loco](https://github.com/loco-rs/loco),
frameworks such
as [Rails](https://rubyonrails.org/), [Django](https://www.djangoproject.com/), [Laravel](https://laravel.com/), [Loco](https://github.com/loco-rs/loco),
and [Poem](https://github.com/poem-web/poem).

## Features
Expand All @@ -24,7 +25,8 @@ and [Poem](https://github.com/poem-web/poem).
the `db-sql` feature)
- Built-in support for [Sidekiq.rs](https://crates.io/crates/rusty-sidekiq) for running async/background jobs (requires
the `sidekiq` feature)
- Structured logs/traces using tokio's [tracing](https://docs.rs/tracing/latest/tracing/) crate. Export traces/metrics using OpenTelemetry (requires the `otel` feature).
- Structured logs/traces using tokio's [tracing](https://docs.rs/tracing/latest/tracing/) crate. Export traces/metrics
using OpenTelemetry (requires the `otel` feature).

# Start local DB

Expand Down Expand Up @@ -136,6 +138,7 @@ You can also inspect the Redis DB directly using [RedisInsight](https://redis.io
# Linux docker commands
docker run -d --name redisinsight --network=host -p 5540:5540 redis/redisinsight:latest
# Mac docker commands -- todo: see if there's a command that will work on both mac and linux
# Use `host.docker.internal` as the host domain in redis insight (instead of `127.0.0.1`)
docker run -d --name redisinsight -p 5540:5540 redis/redisinsight:latest
```

Expand Down
19 changes: 12 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::cli::{RoadsterCli, RunCommand, RunRoadsterCommand};
use crate::config::app_config::AppConfig;
#[cfg(not(feature = "cli"))]
use crate::config::environment::Environment;
#[cfg(feature = "sidekiq")]
use crate::config::worker::StaleCleanUpBehavior;
use crate::controller::middleware::default::default_middleware;
use crate::controller::middleware::Middleware;
use crate::initializer::default::default_initializers;
Expand Down Expand Up @@ -205,9 +207,13 @@ where

#[cfg(feature = "sidekiq")]
let (processor, sidekiq_cancellation_token, _sidekiq_cancellation_token_drop_guard) = {
// Periodic jobs are not removed automatically. Remove any periodic jobs that were
// previously added. They should be re-added by `App::worker`.
periodic::destroy_all(redis.clone()).await?;
if context.config.worker.sidekiq.periodic.stale_cleanup
== StaleCleanUpBehavior::AutoCleanAll
{
// Periodic jobs are not removed automatically. Remove any periodic jobs that were
// previously added. They should be re-added by `App::worker`.
periodic::destroy_all(redis.clone()).await?;
}
let custom_queue_names = context
.config
.worker
Expand All @@ -224,11 +230,10 @@ where
);
debug!("Sidekiq.rs queues: {queues:?}");
let processor = {
let mut registry = WorkerRegistry {
processor: Processor::new(redis, queues.clone()),
state: state.clone(),
};
let mut registry =
WorkerRegistry::new(Processor::new(redis, queues.clone()), state.clone());
A::workers(&mut registry, &context, &state).await?;
registry.remove_stale_periodic_jobs(&context).await?;
registry.processor
};
let token = processor.get_cancellation_token();
Expand Down
28 changes: 28 additions & 0 deletions src/config/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config::app_config::Redis;
use crate::worker::app_worker::AppWorkerConfig;
use serde_derive::{Deserialize, Serialize};
use strum_macros::{EnumString, IntoStaticStr};

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -23,8 +24,35 @@ pub struct Sidekiq {
#[serde(default)]
pub queues: Vec<String>,

#[serde(default)]
pub periodic: Periodic,

/// The default app worker config. Values can be overridden on a per-worker basis by
/// implementing the corresponding [crate::worker::app_worker::AppWorker] methods.
#[serde(default, flatten)]
pub worker_config: AppWorkerConfig,
}

#[cfg(feature = "sidekiq")]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Periodic {
pub stale_cleanup: StaleCleanUpBehavior,
}

impl Default for Periodic {
fn default() -> Self {
Self {
stale_cleanup: StaleCleanUpBehavior::AutoCleanStale,
}
}
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, EnumString, IntoStaticStr)]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
pub enum StaleCleanUpBehavior {
Manual,
AutoCleanAll,
AutoCleanStale,
}
2 changes: 1 addition & 1 deletion src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn queues(custom_queue_names: &Vec<String>) -> Vec<String> {
/// Worker used by Roadster to wrap the consuming app's workers to add additional behavior. For
/// example, [RoadsterWorker] is by default configured to automatically abort the app's worker
/// when it exceeds a certain timeout.
pub(crate) struct RoadsterWorker<A, Args, W>
pub struct RoadsterWorker<A, Args, W>
where
A: App,
Args: Send + Sync + Serialize + 'static,
Expand Down
75 changes: 71 additions & 4 deletions src/worker/registry.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use crate::app::App;
use crate::app_context::AppContext;
use crate::config::worker::StaleCleanUpBehavior;
use crate::worker::app_worker::AppWorker;
use crate::worker::RoadsterWorker;
use itertools::Itertools;
use serde::Serialize;
use sidekiq::{periodic, Processor};
use std::collections::HashSet;
use std::sync::Arc;
use tracing::debug;
use tracing::{debug, info, warn};

const PERIODIC_KEY: &str = "periodic";

/// Custom wrapper around [Processor] to help with registering [workers][AppWorker] that are
/// wrapped by [RoadsterWorker].
Expand All @@ -14,12 +20,23 @@ where
{
pub(crate) processor: Processor,
pub(crate) state: Arc<A::State>,
pub(crate) registered_workers: HashSet<String>,
pub(crate) registered_periodic_workers: HashSet<String>,
}

impl<A> WorkerRegistry<A>
where
A: App + 'static,
{
pub(crate) fn new(processor: Processor, state: Arc<A::State>) -> Self {
Self {
processor,
state,
registered_workers: Default::default(),
registered_periodic_workers: Default::default(),
}
}

/// Register a [worker][AppWorker] to handle Sidekiq.rs jobs.
///
/// The worker will be wrapped by a [RoadsterWorker], which provides some common behavior, such
Expand All @@ -29,7 +46,9 @@ where
Args: Sync + Send + Serialize + for<'de> serde::Deserialize<'de> + 'static,
W: AppWorker<A, Args> + 'static,
{
debug!("Registering worker: `{}`", W::class_name());
let class_name = W::class_name();
debug!(worker = class_name, "Registering worker");
self.registered_workers.insert(class_name.clone());
let roadster_worker = RoadsterWorker::new(worker, self.state.clone());
self.processor.register(roadster_worker);
}
Expand All @@ -51,12 +70,60 @@ where
Args: Sync + Send + Serialize + for<'de> serde::Deserialize<'de> + 'static,
W: AppWorker<A, Args> + 'static,
{
debug!("Registering periodic worker: `{}`", W::class_name());
let class_name = W::class_name();
debug!(worker = class_name, "Registering periodic worker");
let roadster_worker = RoadsterWorker::new(worker, self.state.clone());
let builder = builder.args(args)?;
let job_json = serde_json::to_string(&builder.into_periodic_job(class_name)?)?;
self.registered_periodic_workers.insert(job_json);
builder
.args(args)?
.register(&mut self.processor, roadster_worker)
.await?;
Ok(())
}

/// Compares the list of periodic jobs that were registered by the app during app startup with
/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't
/// registered during start up.
///
/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic]
/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale].
///
/// This is run after all the app's periodic jobs have been registered.
pub(crate) async fn remove_stale_periodic_jobs(
&self,
context: &AppContext,
) -> anyhow::Result<()> {
let mut conn = context.redis.get().await?;
let stale_jobs = conn
.zrange(PERIODIC_KEY.to_string(), 0, -1)
.await?
.into_iter()
.filter(|job| !self.registered_periodic_workers.contains(job))
.collect_vec();

if stale_jobs.is_empty() {
info!("No stale periodic jobs found");
return Ok(());
}

if context.config.worker.sidekiq.periodic.stale_cleanup
== StaleCleanUpBehavior::AutoCleanStale
{
info!(
"Removing {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
conn.zrem(PERIODIC_KEY.to_string(), &stale_jobs).await?;
} else {
warn!(
"Found {} stale periodic jobs:\n{}",
stale_jobs.len(),
stale_jobs.join("\n")
);
}

Ok(())
}
}

0 comments on commit 721ad12

Please sign in to comment.