From e5f6ed079c6ff7cf13acac395a5138e1f129b115 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Tue, 22 Oct 2024 23:57:12 +0200 Subject: [PATCH 1/5] feat: initial background cleanup task --- Cargo.lock | 475 +++++++++++++++++++++++++++++ crates/server/Cargo.toml | 4 + crates/server/src/lib.rs | 1 + crates/server/src/main.rs | 74 ++++- crates/server/src/tasks.rs | 1 + crates/server/src/tasks/cleanup.rs | 23 ++ 6 files changed, 572 insertions(+), 6 deletions(-) create mode 100644 crates/server/src/tasks.rs create mode 100644 crates/server/src/tasks/cleanup.rs diff --git a/Cargo.lock b/Cargo.lock index 69442a3..ed234f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,6 +114,102 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "apalis" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3da210662fee6932c2d8b291c4b33a573b5eafe09e23c6bf01295285a7d86e03" +dependencies = [ + "apalis-core", + "apalis-cron", + "apalis-redis", + "apalis-sql", + "futures", + "metrics", + "metrics-exporter-prometheus", + "pin-project-lite", + "sentry-core", + "serde", + "thiserror", + "tokio", + "tower", + "tracing", + "tracing-futures", + "ulid", + "uuid", +] + +[[package]] +name = "apalis-core" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91befa475b114da630f0781fc48a577932a54790adf0a47edac00d4f058282d9" +dependencies = [ + "async-oneshot", + "futures", + "futures-timer", + "pin-project-lite", + "serde", + "serde_json", + "thiserror", + "tower", + "ulid", +] + +[[package]] +name = "apalis-cron" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "344799ff16579e1ae362143682d7bc29527adfbb9f421913ec8d8629276f7386" +dependencies = [ + "apalis-core", + "async-stream", + "chrono", + "cron", + "futures", + "tower", +] + +[[package]] +name = "apalis-redis" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c9038106f6fa16e66fac08bc0a04edaf50b48d5a4328a98806d3f2f52c06d" +dependencies = [ + "apalis-core", + "async-stream", + "async-trait", + "chrono", + "futures", + "log", + "redis", + "serde", + "tokio", +] + +[[package]] +name = "apalis-sql" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f5a576d1d6df3de40ab8bf66bd33912718fc9c51d98f63cfb71f60996410bc" +dependencies = [ + "apalis-core", + "async-stream", + "futures", + "futures-lite", + "log", + "serde", + "serde_json", + "sqlx", + "tokio", +] + +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ariadne" version = "0.4.1" @@ -124,6 +220,37 @@ dependencies = [ "yansi", ] +[[package]] +name = "async-oneshot" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae47de2a02d543205f3f5457a90b6ecbc9494db70557bd29590ec8f1ddff5463" +dependencies = [ + "futures-micro", +] + +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "async-trait" version = "0.1.83" @@ -422,6 +549,20 @@ dependencies = [ "yansi", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -482,6 +623,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -491,6 +643,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -551,6 +712,16 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "serde", + "uuid", +] + [[package]] name = "der" version = "0.7.9" @@ -562,6 +733,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive-getters" version = "0.3.0" @@ -823,6 +1003,19 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -834,6 +1027,15 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "futures-micro" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b460264b3593d68b16a7bc35f7bc226ddfebdf9a1c8db1ed95d5cc6b7168c826" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -957,13 +1159,17 @@ dependencies = [ name = "gq-server" version = "0.1.0" dependencies = [ + "apalis", "axum", "chrono", + "cron", + "dotenvy", "http-serde", "serde", "sqlx", "thiserror", "tokio", + "tower", "tracing", "tracing-subscriber", "uuid", @@ -1370,6 +1576,45 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" +dependencies = [ + "base64", + "indexmap", + "metrics", + "metrics-util", + "quanta", + "thiserror", +] + +[[package]] +name = "metrics-util" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1440,6 +1685,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1470,6 +1721,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "object" version = "0.36.5" @@ -1544,6 +1805,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -1598,6 +1879,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1625,6 +1912,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.37" @@ -1664,6 +1966,38 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags", +] + +[[package]] +name = "redis" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "tokio", + "tokio-retry", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -1831,6 +2165,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.9.0" @@ -1881,6 +2224,35 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "sentry-core" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "161283cfe8e99c8f6f236a402b9ccf726b201f365988b5bb637ebca0abbd4a30" +dependencies = [ + "once_cell", + "sentry-types", + "serde", + "serde_json", +] + +[[package]] +name = "sentry-types" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d68cdf6bc41b8ff3ae2a9c4671e97426dcdd154cc1d4b6b72813f285d6b163f" +dependencies = [ + "debugid", + "hex", + "rand", + "serde", + "serde_json", + "thiserror", + "time", + "url", + "uuid", +] + [[package]] name = "serde" version = "1.0.210" @@ -1960,6 +2332,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -2005,6 +2383,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -2103,6 +2487,8 @@ dependencies = [ "once_cell", "paste", "percent-encoding", + "rustls", + "rustls-pemfile", "serde", "serde_json", "sha2", @@ -2114,6 +2500,7 @@ dependencies = [ "tracing", "url", "uuid", + "webpki-roots", ] [[package]] @@ -2364,6 +2751,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -2408,6 +2826,17 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -2419,6 +2848,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.8" @@ -2447,6 +2889,7 @@ dependencies = [ "pin-project-lite", "sync_wrapper 0.1.2", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -2497,6 +2940,15 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -2532,6 +2984,18 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ulid" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" +dependencies = [ + "getrandom", + "rand", + "uuid", + "web-time", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2608,6 +3072,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -2731,6 +3196,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.6" diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 8975973..9d7ca6b 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -28,3 +28,7 @@ chrono = { version = "0.4.38", features = ["serde"] } http-serde = "2.1.1" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +apalis = { version = "0.5.5", features = ["cron", "layers"] } +tower = { version = "0.5.1" } +cron = "0.12.1" +dotenvy = "0.15.7" diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 673efac..6d8162c 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -6,6 +6,7 @@ pub mod api; pub mod app_state; pub mod model; pub mod services; +pub mod tasks; pub fn app(db_connection: PgPool, max_share_expiration_time_secs: i64) -> Router { let app_state = AppState::new(db_connection, max_share_expiration_time_secs); diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 925551e..d447ce9 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -1,12 +1,18 @@ +use apalis::{ + cron::{CronStream, Schedule}, + layers::retry::{RetryLayer, RetryPolicy}, + prelude::{Monitor, WorkerBuilder, WorkerFactoryFn}, + utils::TokioExecutor, +}; +use gq_server::tasks::cleanup; +use sqlx::{postgres::PgPoolOptions, PgPool}; use std::{env, net::SocketAddr}; - -use sqlx::postgres::PgPoolOptions; use tracing::level_filters::LevelFilter; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[tokio::main] async fn main() { - // TODO: background vacuum job + let _ = dotenvy::dotenv(); let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -47,18 +53,74 @@ async fn main() { "MAX_SHARE_EXPIRATION_TIME_SECS must be > 0" ); + let cleanup_task_cron_expression = + env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */8 * * * *".to_string()); + sqlx::migrate!("./migrations") .run(&db_connection) .await .expect("Failed to run migrations"); - let app = gq_server::app(db_connection, max_share_expiration_time_secs); + let app = gq_server::app(db_connection.clone(), max_share_expiration_time_secs); let listener = tokio::net::TcpListener::bind(addr) .await .unwrap_or_else(|_| panic!("Failed to bind address {addr}")); tracing::info!("Server started. Listening on {addr}"); - axum::serve(listener, app) + + if let Err(error) = tokio::try_join!( + start_axum_server(listener, app), + start_background_tasks(cleanup_task_cron_expression, db_connection) + ) { + panic!("Error during start: {error}"); + } +} + +#[derive(Debug, thiserror::Error)] +enum StartError { + #[error("Failed to start axum server: {0}")] + Axum(std::io::Error), + #[error("Failed to start scheduler: {0}")] + Scheduler(#[from] SchedulerError), +} + +#[derive(Debug, thiserror::Error)] +enum SchedulerError { + #[error("Failed to parse cron expression {0}: {1}")] + CronExpressionError(String, cron::error::Error), + #[error("Failed to start scheduler: {0}")] + MonitorStartError(std::io::Error), +} + +async fn start_axum_server( + listener: tokio::net::TcpListener, + app: axum::Router, +) -> Result<(), StartError> { + axum::serve(listener, app).await.map_err(StartError::Axum) +} + +async fn start_background_tasks( + cron_expression: String, + db_connection: PgPool, +) -> Result<(), StartError> { + let schedule: Schedule = cron_expression + .parse() + .map_err(|e| SchedulerError::CronExpressionError(cron_expression.to_string(), e))?; + + tracing::info!(schedule = %schedule, "Starting cleanup task worker"); + + let worker = WorkerBuilder::new("cleanup-task-worker") + .layer(RetryLayer::new(RetryPolicy::retries(5))) + // .layer(TraceLayer::new().make_span_with(ReminderSpan::new())) + .stream(CronStream::new(schedule).into_stream()) + .data(db_connection) + .build_fn(cleanup::execute_cleanup); + + Monitor::::new() + .register(worker) + .run() .await - .expect("Failed to start server"); + .map_err(SchedulerError::MonitorStartError)?; + + Ok(()) } diff --git a/crates/server/src/tasks.rs b/crates/server/src/tasks.rs new file mode 100644 index 0000000..9d69e70 --- /dev/null +++ b/crates/server/src/tasks.rs @@ -0,0 +1 @@ +pub mod cleanup; diff --git a/crates/server/src/tasks/cleanup.rs b/crates/server/src/tasks/cleanup.rs new file mode 100644 index 0000000..e384a56 --- /dev/null +++ b/crates/server/src/tasks/cleanup.rs @@ -0,0 +1,23 @@ +use apalis::prelude::{Data, Job}; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +#[derive(Default, Debug, Clone)] +pub struct CleanupTask { + start_time: DateTime, +} + +impl From> for CleanupTask { + fn from(start_time: DateTime) -> Self { + CleanupTask { start_time } + } +} + +impl Job for CleanupTask { + const NAME: &'static str = "CleanupTask"; +} + +pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) { + let rfc3339_start_time = task.start_time.to_rfc3339(); + tracing::info!(start_time = rfc3339_start_time, "Executing cleanup task"); +} From 008f66f84cb6abad74767637b57d8713c930a4ae Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Wed, 23 Oct 2024 00:14:35 +0200 Subject: [PATCH 2/5] feat: update --- Cargo.lock | 5 ++--- crates/server/.env.example | 1 + crates/server/Cargo.toml | 1 - crates/server/src/main.rs | 4 +--- crates/server/src/tasks/cleanup.rs | 11 ++++++++++- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed234f9..d515e74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" [[package]] name = "apalis" @@ -1163,7 +1163,6 @@ dependencies = [ "axum", "chrono", "cron", - "dotenvy", "http-serde", "serde", "sqlx", diff --git a/crates/server/.env.example b/crates/server/.env.example index f176e1d..8147e3f 100644 --- a/crates/server/.env.example +++ b/crates/server/.env.example @@ -2,3 +2,4 @@ PORT=3000 DATABASE_URL=postgres://postgres:password@localhost:5432/db DATABASE_CONNECTIONS=5 MAX_SHARE_EXPIRATION_TIME_SECS=604800 +CLEANUP_TASK_CRON_EXPRESSION="0 0 0 * * * *" diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 9d7ca6b..e0bae22 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -31,4 +31,3 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } apalis = { version = "0.5.5", features = ["cron", "layers"] } tower = { version = "0.5.1" } cron = "0.12.1" -dotenvy = "0.15.7" diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index d447ce9..c36e368 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -12,8 +12,6 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, Env #[tokio::main] async fn main() { - let _ = dotenvy::dotenv(); - let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); @@ -54,7 +52,7 @@ async fn main() { ); let cleanup_task_cron_expression = - env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */8 * * * *".to_string()); + env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 0 * * * *".to_string()); sqlx::migrate!("./migrations") .run(&db_connection) diff --git a/crates/server/src/tasks/cleanup.rs b/crates/server/src/tasks/cleanup.rs index e384a56..c0d3461 100644 --- a/crates/server/src/tasks/cleanup.rs +++ b/crates/server/src/tasks/cleanup.rs @@ -17,7 +17,16 @@ impl Job for CleanupTask { const NAME: &'static str = "CleanupTask"; } -pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) { +#[derive(Debug, thiserror::Error)] +pub enum Error { + // TODO + #[error("Cleanup task error")] + CleanupTaskError, +} + +pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) -> Result<(), Error> { let rfc3339_start_time = task.start_time.to_rfc3339(); tracing::info!(start_time = rfc3339_start_time, "Executing cleanup task"); + + Err(Error::CleanupTaskError) } From 3af00a9ebdc87b3377376122db52dd1d56c1c54b Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Wed, 23 Oct 2024 00:16:44 +0200 Subject: [PATCH 3/5] fix: remove unnecessary type annotation --- crates/server/src/main.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index c36e368..81b4f37 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -1,5 +1,5 @@ use apalis::{ - cron::{CronStream, Schedule}, + cron::CronStream, layers::retry::{RetryLayer, RetryPolicy}, prelude::{Monitor, WorkerBuilder, WorkerFactoryFn}, utils::TokioExecutor, @@ -101,7 +101,7 @@ async fn start_background_tasks( cron_expression: String, db_connection: PgPool, ) -> Result<(), StartError> { - let schedule: Schedule = cron_expression + let schedule = cron_expression .parse() .map_err(|e| SchedulerError::CronExpressionError(cron_expression.to_string(), e))?; @@ -109,7 +109,6 @@ async fn start_background_tasks( let worker = WorkerBuilder::new("cleanup-task-worker") .layer(RetryLayer::new(RetryPolicy::retries(5))) - // .layer(TraceLayer::new().make_span_with(ReminderSpan::new())) .stream(CronStream::new(schedule).into_stream()) .data(db_connection) .build_fn(cleanup::execute_cleanup); From a630c301abad96199a8af25f7237cb99d9fb3e89 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Sun, 27 Oct 2024 12:35:40 +0100 Subject: [PATCH 4/5] feat: cleanup task --- ...af2f356bdcd0b56e86ca7f1455567a3365a75.json | 14 +++++ Cargo.lock | 1 + crates/server/.env.example | 2 +- crates/server/Cargo.toml | 1 + crates/server/src/main.rs | 8 +-- crates/server/src/tasks/cleanup.rs | 59 ++++++++++++++++--- 6 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 .sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json diff --git a/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json b/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json new file mode 100644 index 0000000..d21ffe1 --- /dev/null +++ b/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM share\n WHERE expires_at <= $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75" +} diff --git a/Cargo.lock b/Cargo.lock index d515e74..2782620 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1164,6 +1164,7 @@ dependencies = [ "chrono", "cron", "http-serde", + "lazy_static", "serde", "sqlx", "thiserror", diff --git a/crates/server/.env.example b/crates/server/.env.example index 8147e3f..d7a5fb3 100644 --- a/crates/server/.env.example +++ b/crates/server/.env.example @@ -2,4 +2,4 @@ PORT=3000 DATABASE_URL=postgres://postgres:password@localhost:5432/db DATABASE_CONNECTIONS=5 MAX_SHARE_EXPIRATION_TIME_SECS=604800 -CLEANUP_TASK_CRON_EXPRESSION="0 0 0 * * * *" +CLEANUP_TASK_CRON_EXPRESSION="0 0 */4 * * * *" diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e0bae22..79e762c 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -31,3 +31,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } apalis = { version = "0.5.5", features = ["cron", "layers"] } tower = { version = "0.5.1" } cron = "0.12.1" +lazy_static = "1.5.0" diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 81b4f37..6402073 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -52,7 +52,7 @@ async fn main() { ); let cleanup_task_cron_expression = - env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 0 * * * *".to_string()); + env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */4 * * * *".to_string()); sqlx::migrate!("./migrations") .run(&db_connection) @@ -105,10 +105,10 @@ async fn start_background_tasks( .parse() .map_err(|e| SchedulerError::CronExpressionError(cron_expression.to_string(), e))?; - tracing::info!(schedule = %schedule, "Starting cleanup task worker"); + tracing::info!(schedule = %schedule, "Started cleanup worker"); - let worker = WorkerBuilder::new("cleanup-task-worker") - .layer(RetryLayer::new(RetryPolicy::retries(5))) + let worker = WorkerBuilder::new("cleanup-worker") + .layer(RetryLayer::new(RetryPolicy::retries(3))) .stream(CronStream::new(schedule).into_stream()) .data(db_connection) .build_fn(cleanup::execute_cleanup); diff --git a/crates/server/src/tasks/cleanup.rs b/crates/server/src/tasks/cleanup.rs index c0d3461..a02809c 100644 --- a/crates/server/src/tasks/cleanup.rs +++ b/crates/server/src/tasks/cleanup.rs @@ -1,6 +1,12 @@ use apalis::prelude::{Data, Job}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, SecondsFormat, Utc}; +use lazy_static::lazy_static; use sqlx::PgPool; +use tokio::sync::Mutex; + +lazy_static! { + static ref CLEANUP_TASK_LOCK: Mutex<()> = Mutex::new(()); +} #[derive(Default, Debug, Clone)] pub struct CleanupTask { @@ -19,14 +25,51 @@ impl Job for CleanupTask { #[derive(Debug, thiserror::Error)] pub enum Error { - // TODO - #[error("Cleanup task error")] - CleanupTaskError, + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + #[error("Cleanup task was already running. Skipped execution with timestamp {timestamp}")] + AlreadyRunning { timestamp: String }, } -pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) -> Result<(), Error> { - let rfc3339_start_time = task.start_time.to_rfc3339(); - tracing::info!(start_time = rfc3339_start_time, "Executing cleanup task"); +impl CleanupTask { + async fn cleanup(&self, db_connection: Data) -> Result { + let rfc3339_start_time = self.start_time.to_rfc3339_opts(SecondsFormat::Millis, true); + + let Ok(_lock) = CLEANUP_TASK_LOCK.try_lock() else { + return Err(Error::AlreadyRunning { + timestamp: rfc3339_start_time, + }); + }; + + tracing::info!(timestamp = rfc3339_start_time, "Executing cleanup task"); + + let result = sqlx::query!( + r#" + DELETE FROM share + WHERE expires_at <= $1 + "#, + self.start_time + ) + .execute(&*db_connection) + .await?; - Err(Error::CleanupTaskError) + Ok(result.rows_affected()) + } +} + +pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) -> Result<(), Error> { + match task.cleanup(db_connection).await { + Ok(rows_affected) => { + tracing::info!(deleted_shares = rows_affected, "Cleanup task completed",); + Ok(()) + } + Err(e) => { + tracing::error!(reason = %e,"Error executing cleanup task"); + match e { + // Do not retry if the task is already running + Error::AlreadyRunning { .. } => Ok(()), + _ => Err(e), + } + } + } } From 08aaba89f70d86a2c50766b104245d9e1e72f2ed Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Sun, 27 Oct 2024 12:37:45 +0100 Subject: [PATCH 5/5] style: remove trailing comma --- crates/server/src/tasks/cleanup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/server/src/tasks/cleanup.rs b/crates/server/src/tasks/cleanup.rs index a02809c..c03ba34 100644 --- a/crates/server/src/tasks/cleanup.rs +++ b/crates/server/src/tasks/cleanup.rs @@ -60,7 +60,7 @@ impl CleanupTask { pub async fn execute_cleanup(task: CleanupTask, db_connection: Data) -> Result<(), Error> { match task.cleanup(db_connection).await { Ok(rows_affected) => { - tracing::info!(deleted_shares = rows_affected, "Cleanup task completed",); + tracing::info!(deleted_shares = rows_affected, "Cleanup task completed"); Ok(()) } Err(e) => {