diff --git a/Cargo.toml b/Cargo.toml index fd10c03..be46be1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,13 @@ serde_json = "1.0.117" serde_qs = "0.13.0" thiserror = "1.0.61" chrono = { version = "0.4.38", features = ["serde"] } -sqlx = { version = "0.7.4", features = ["chrono", "postgres", "json", "macros", "runtime-tokio"], default-features = false } +sqlx = { version = "0.7.4", features = [ + "chrono", + "postgres", + "json", + "macros", + "runtime-tokio", +], default-features = false } getset = "0.1.2" tracing = "0.1.40" once_cell = "1.19.0" @@ -61,13 +67,20 @@ graphile_worker_ctx = { path = "./crates/ctx", version = "0.1.2" } graphile_worker_migrations = { path = "./crates/migrations", version = "0.4.3", default-features = false } graphile_worker_task_handler = { path = "./crates/task_handler", version = "0.4.1" } graphile_worker_shutdown_signal = { path = "./crates/shutdown_signal", version = "0.3.2" } +graphile_worker_extensions = { path = "./crates/extensions", version = "0.1.2" } chrono = { version = "0.4.38", features = ["serde"] } futures = "0.3.30" getset = "0.1.2" num_cpus = "1.16.0" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" -sqlx = { version = "0.7.4", features = ["postgres", "json", "chrono", "macros", "runtime-tokio"], default-features = false } +sqlx = { version = "0.7.4", features = [ + "postgres", + "json", + "chrono", + "macros", + "runtime-tokio", +], default-features = false } thiserror = "1.0.61" tracing = "0.1.40" tokio = { version = "1.37.0", features = ["macros", "signal"] } diff --git a/crates/ctx/Cargo.toml b/crates/ctx/Cargo.toml index 7f4c7c7..d99ba27 100644 --- a/crates/ctx/Cargo.toml +++ b/crates/ctx/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" [dependencies] graphile_worker_job = { path = "../job", version = "0.1.2" } +graphile_worker_extensions = { path = "../extensions", version = "0.1.2" } getset.workspace = true serde_json.workspace = true sqlx.workspace = true diff --git a/crates/ctx/src/lib.rs b/crates/ctx/src/lib.rs index f493bf4..692b2cc 100644 --- a/crates/ctx/src/lib.rs +++ b/crates/ctx/src/lib.rs @@ -1,4 +1,5 @@ use getset::Getters; +use graphile_worker_extensions::ReadOnlyExtensions; use graphile_worker_job::Job; use serde_json::Value; use sqlx::PgPool; @@ -10,15 +11,23 @@ pub struct WorkerContext { pg_pool: PgPool, job: Job, worker_id: String, + extensions: ReadOnlyExtensions, } impl WorkerContext { - pub fn new(payload: Value, pg_pool: PgPool, job: Job, worker_id: String) -> Self { + pub fn new( + payload: Value, + pg_pool: PgPool, + job: Job, + worker_id: String, + extensions: ReadOnlyExtensions, + ) -> Self { WorkerContext { payload, pg_pool, job, worker_id, + extensions, } } } diff --git a/crates/extensions/Cargo.toml b/crates/extensions/Cargo.toml new file mode 100644 index 0000000..5960bfb --- /dev/null +++ b/crates/extensions/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "graphile_worker_extensions" +version = "0.1.2" +edition = "2021" +license-file = "LICENSE.md" +description = "Extensions package for graphile_worker, a high performance Rust/PostgreSQL job queue" +homepage = "https://docs.rs/graphile_worker_crontab_types" +documentation = "https://docs.rs/graphile_worker_crontab_types" +repository = "https://github.com/leo91000/graphile_worker/crates/crontab_types" +keywords = [] +categories = [] +readme = "README.md" diff --git a/crates/extensions/LICENSE.md b/crates/extensions/LICENSE.md new file mode 100644 index 0000000..621ec70 --- /dev/null +++ b/crates/extensions/LICENSE.md @@ -0,0 +1,18 @@ +# The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the “Software”), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/crates/extensions/README.md b/crates/extensions/README.md new file mode 100644 index 0000000..8a944e5 --- /dev/null +++ b/crates/extensions/README.md @@ -0,0 +1,3 @@ +# Graphile Worker Extensions + +This crate provides Extensions struct and utils for the [Graphile Worker](docs.rs/archimedes) job scheduler diff --git a/crates/extensions/src/lib.rs b/crates/extensions/src/lib.rs new file mode 100644 index 0000000..acaaf67 --- /dev/null +++ b/crates/extensions/src/lib.rs @@ -0,0 +1,419 @@ +use std::{ + any::{Any, TypeId}, + collections::HashMap, + fmt::Debug, + hash::{BuildHasherDefault, Hasher}, + sync::Arc, +}; + +pub(crate) type AnyMap = + HashMap, BuildHasherDefault>; + +// With TypeIds as keys, there's no need to hash them. They are already hashes +// themselves, coming from the compiler. The IdHasher just holds the u64 of +// the TypeId, and then returns it, instead of doing any bit fiddling. +#[derive(Default)] +pub(crate) struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} + +/// A type map of worker extensions. +/// +/// `Extensions` can be used by worker job to store extra data. +#[derive(Clone, Default, Debug)] +pub struct Extensions { + // If extensions are never used, no need to carry around an empty HashMap. + // That's 3 words. Instead, this is only 1 word. + map: Option>, +} + +impl Extensions { + /// Create an empty `Extensions`. + #[inline] + pub fn new() -> Extensions { + Extensions { map: None } + } + + /// Insert a type into this `Extensions`. + /// + /// If a extension of this type already existed, it will + /// be returned. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// assert!(ext.insert(5i32).is_none()); + /// assert!(ext.insert(4u8).is_none()); + /// assert_eq!(ext.insert(9i32), Some(5i32)); + /// ``` + pub fn insert(&mut self, val: T) -> Option { + self.map + .get_or_insert_with(Box::default) + .insert(TypeId::of::(), Box::new(val)) + .and_then(|boxed| boxed.into_any().downcast().ok().map(|boxed| *boxed)) + } + + /// Get a reference to a type previously inserted on this `Extensions`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// assert!(ext.get::().is_none()); + /// ext.insert(5i32); + /// + /// assert_eq!(ext.get::(), Some(&5i32)); + /// ``` + pub fn get(&self) -> Option<&T> { + self.map + .as_ref() + .and_then(|map| map.get(&TypeId::of::())) + .and_then(|boxed| (**boxed).as_any().downcast_ref()) + } + + /// Get a mutable reference to a type previously inserted on this `Extensions`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// ext.insert(String::from("Hello")); + /// ext.get_mut::().unwrap().push_str(" World"); + /// + /// assert_eq!(ext.get::().unwrap(), "Hello World"); + /// ``` + pub fn get_mut(&mut self) -> Option<&mut T> { + self.map + .as_mut() + .and_then(|map| map.get_mut(&TypeId::of::())) + .and_then(|boxed| (**boxed).as_any_mut().downcast_mut()) + } + + /// Get a mutable reference to a type, inserting `value` if not already present on this + /// `Extensions`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// *ext.get_or_insert(1i32) += 2; + /// + /// assert_eq!(*ext.get::().unwrap(), 3); + /// ``` + pub fn get_or_insert(&mut self, value: T) -> &mut T { + self.get_or_insert_with(|| value) + } + + /// Get a mutable reference to a type, inserting the value created by `f` if not already present + /// on this `Extensions`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// *ext.get_or_insert_with(|| 1i32) += 2; + /// + /// assert_eq!(*ext.get::().unwrap(), 3); + /// ``` + pub fn get_or_insert_with T>( + &mut self, + f: F, + ) -> &mut T { + let out = self + .map + .get_or_insert_with(Box::default) + .entry(TypeId::of::()) + .or_insert_with(|| Box::new(f())); + (**out).as_any_mut().downcast_mut().unwrap() + } + + /// Get a mutable reference to a type, inserting the type's default value if not already present + /// on this `Extensions`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// *ext.get_or_insert_default::() += 2; + /// + /// assert_eq!(*ext.get::().unwrap(), 2); + /// ``` + pub fn get_or_insert_default( + &mut self, + ) -> &mut T { + self.get_or_insert_with(T::default) + } + + /// Remove a type from this `Extensions`. + /// + /// If a extension of this type existed, it will be returned. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// ext.insert(5i32); + /// assert_eq!(ext.remove::(), Some(5i32)); + /// assert!(ext.get::().is_none()); + /// ``` + pub fn remove(&mut self) -> Option { + self.map + .as_mut() + .and_then(|map| map.remove(&TypeId::of::())) + .and_then(|boxed| boxed.into_any().downcast().ok().map(|boxed| *boxed)) + } + + /// Clear the `Extensions` of all inserted extensions. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// ext.insert(5i32); + /// ext.clear(); + /// + /// assert!(ext.get::().is_none()); + /// ``` + #[inline] + pub fn clear(&mut self) { + if let Some(ref mut map) = self.map { + map.clear(); + } + } + + /// Check whether the extension set is empty or not. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// assert!(ext.is_empty()); + /// ext.insert(5i32); + /// assert!(!ext.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.map.as_ref().map_or(true, |map| map.is_empty()) + } + + /// Get the numer of extensions available. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext = Extensions::new(); + /// assert_eq!(ext.len(), 0); + /// ext.insert(5i32); + /// assert_eq!(ext.len(), 1); + /// ``` + #[inline] + pub fn len(&self) -> usize { + self.map.as_ref().map_or(0, |map| map.len()) + } + + /// Extends `self` with another `Extensions`. + /// + /// If an instance of a specific type exists in both, the one in `self` is overwritten with the + /// one from `other`. + /// + /// # Example + /// + /// ``` + /// use graphile_worker_extensions::Extensions; + /// let mut ext_a = Extensions::new(); + /// ext_a.insert(8u8); + /// ext_a.insert(16u16); + /// + /// let mut ext_b = Extensions::new(); + /// ext_b.insert(4u8); + /// ext_b.insert("hello"); + /// + /// ext_a.extend(ext_b); + /// assert_eq!(ext_a.len(), 3); + /// assert_eq!(ext_a.get::(), Some(&4u8)); + /// assert_eq!(ext_a.get::(), Some(&16u16)); + /// assert_eq!(ext_a.get::<&'static str>().copied(), Some("hello")); + /// ``` + pub fn extend(&mut self, other: Self) { + if let Some(other) = other.map { + if let Some(map) = &mut self.map { + map.extend(*other); + } else { + self.map = Some(other); + } + } + } +} + +#[derive(Clone, Debug)] +pub struct ReadOnlyExtensions(Arc); + +impl ReadOnlyExtensions { + pub fn new(ext: Extensions) -> Self { + ReadOnlyExtensions(Arc::new(ext)) + } + + pub fn get(&self) -> Option<&T> { + self.0.get() + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl From for ReadOnlyExtensions { + fn from(ext: Extensions) -> Self { + ReadOnlyExtensions::new(ext) + } +} + +pub trait AnyClone: Any + Debug { + fn clone_box(&self) -> Box; + fn as_any(&self) -> &dyn Any; + fn as_any_mut(&mut self) -> &mut dyn Any; + fn into_any(self: Box) -> Box; +} + +impl AnyClone for T { + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_any(self: Box) -> Box { + self + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + (**self).clone_box() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_insert_and_get() { + let mut ext = Extensions::new(); + assert!(ext.insert(5i32).is_none()); + assert_eq!(ext.get::(), Some(&5i32)); + } + + #[test] + fn test_insert_and_get_mut() { + let mut ext = Extensions::new(); + ext.insert(String::from("Hello")); + ext.get_mut::().unwrap().push_str(" World"); + assert_eq!(ext.get::().unwrap(), "Hello World"); + } + + #[test] + fn test_get_or_insert() { + let mut ext = Extensions::new(); + *ext.get_or_insert(1i32) += 2; + assert_eq!(ext.get::(), Some(&3i32)); + } + + #[test] + fn test_get_or_insert_with() { + let mut ext = Extensions::new(); + *ext.get_or_insert_with(|| 1i32) += 2; + assert_eq!(ext.get::(), Some(&3i32)); + } + + #[test] + fn test_get_or_insert_default() { + let mut ext = Extensions::new(); + *ext.get_or_insert_default::() += 2; + assert_eq!(ext.get::(), Some(&2i32)); + } + + #[test] + fn test_remove() { + let mut ext = Extensions::new(); + ext.insert(5i32); + assert_eq!(ext.remove::(), Some(5i32)); + assert!(ext.get::().is_none()); + } + + #[test] + fn test_clear() { + let mut ext = Extensions::new(); + ext.insert(5i32); + ext.clear(); + assert!(ext.get::().is_none()); + } + + #[test] + fn test_is_empty() { + let mut ext = Extensions::new(); + assert!(ext.is_empty()); + ext.insert(5i32); + assert!(!ext.is_empty()); + } + + #[test] + fn test_len() { + let mut ext = Extensions::new(); + assert_eq!(ext.len(), 0); + ext.insert(5i32); + assert_eq!(ext.len(), 1); + } + + #[test] + fn test_extend() { + let mut ext_a = Extensions::new(); + ext_a.insert(8u8); + ext_a.insert(16u16); + + let mut ext_b = Extensions::new(); + ext_b.insert(4u8); + ext_b.insert("hello"); + + ext_a.extend(ext_b); + assert_eq!(ext_a.len(), 3); + assert_eq!(ext_a.get::(), Some(&4u8)); + assert_eq!(ext_a.get::(), Some(&16u16)); + assert_eq!(ext_a.get::<&'static str>().copied(), Some("hello")); + } +} diff --git a/examples/app_state.rs b/examples/app_state.rs new file mode 100644 index 0000000..9027693 --- /dev/null +++ b/examples/app_state.rs @@ -0,0 +1,85 @@ +use graphile_worker::WorkerContext; +use graphile_worker::WorkerOptions; +use std::str::FromStr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; + +use graphile_worker_task_handler::TaskHandler; +use serde::{Deserialize, Serialize}; +use sqlx::postgres::PgConnectOptions; +use tracing_subscriber::{ + filter::EnvFilter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, +}; + +fn enable_logs() { + let fmt_layer = tracing_subscriber::fmt::layer(); + // Log level set to debug except for sqlx set at warn (to not show all sql requests) + let filter_layer = EnvFilter::try_new("debug,sqlx=warn").unwrap(); + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); +} + +#[derive(Clone, Debug)] +struct AppState { + run_count: Arc, +} + +#[derive(Deserialize, Serialize)] +struct ShowRunCount; + +impl TaskHandler for ShowRunCount { + const IDENTIFIER: &'static str = "show_run_count"; + + async fn run(self, ctx: WorkerContext) -> Result<(), String> { + let app_state = ctx.extensions().get::().unwrap(); + let run_count = app_state.run_count.fetch_add(1, SeqCst); + println!("Run count: {run_count}"); + Ok(()) + } +} + +#[tokio::main] +async fn main() { + enable_logs(); + + let pg_options = PgConnectOptions::from_str("postgres://postgres:root@localhost:5432").unwrap(); + + let pg_pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect_with(pg_options) + .await + .unwrap(); + + let worker = WorkerOptions::default() + .concurrency(10) + .schema("example_simple_worker") + .define_job::() + .pg_pool(pg_pool) + .add_extension(AppState { + run_count: Arc::new(AtomicUsize::new(0)), + }) + .with_crontab( + r#" + * * * * * show_run_count ?fill=1h + "#, + ) + .expect("Failed to parse crontab") + .init() + .await + .unwrap(); + + let utils = worker.create_utils(); + + for _ in 0..10 { + utils + .add_job(ShowRunCount, Default::default()) + .await + .unwrap(); + } + + worker.run().await.unwrap(); +} diff --git a/src/builder.rs b/src/builder.rs index aa886b8..0925897 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -6,6 +6,7 @@ use futures::FutureExt; use graphile_worker_crontab_parser::{parse_crontab, CrontabParseError}; use graphile_worker_crontab_types::Crontab; use graphile_worker_ctx::WorkerContext; +use graphile_worker_extensions::Extensions; use graphile_worker_migrations::migrate; use graphile_worker_shutdown_signal::shutdown_signal; use graphile_worker_task_handler::TaskHandler; @@ -29,6 +30,7 @@ pub struct WorkerOptions { forbidden_flags: Vec, crontabs: Option>, use_local_time: bool, + extensions: Extensions, } #[derive(Error, Debug)] @@ -88,6 +90,7 @@ impl WorkerOptions { crontabs: self.crontabs.unwrap_or_default(), use_local_time: self.use_local_time, shutdown_signal: shutdown_signal(), + extensions: self.extensions.into(), }; Ok(worker) @@ -156,4 +159,9 @@ impl WorkerOptions { self.use_local_time = value; self } + + pub fn add_extension(mut self, value: T) -> Self { + self.extensions.insert(value); + self + } } diff --git a/src/runner.rs b/src/runner.rs index a893f57..5a357e6 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -13,6 +13,7 @@ use getset::Getters; use graphile_worker_crontab_runner::{cron_main, ScheduleCronJobError}; use graphile_worker_crontab_types::Crontab; use graphile_worker_ctx::WorkerContext; +use graphile_worker_extensions::ReadOnlyExtensions; use graphile_worker_job::Job; use graphile_worker_shutdown_signal::ShutdownSignal; use thiserror::Error; @@ -39,6 +40,7 @@ pub struct Worker { pub(crate) crontabs: Vec, pub(crate) use_local_time: bool, pub(crate) shutdown_signal: ShutdownSignal, + pub(crate) extensions: ReadOnlyExtensions, } #[derive(Error, Debug)] @@ -251,6 +253,7 @@ async fn run_job(job: &Job, worker: &Worker, source: &StreamSource) -> Result<() worker.pg_pool().clone(), job.clone(), worker.worker_id().clone(), + worker.extensions().clone(), ); let task_fut = task_fn(worker_ctx);