From 89ebe47cd9aa3ca9a68c33decba1405629019e2b Mon Sep 17 00:00:00 2001 From: localhost Date: Fri, 20 Oct 2023 17:43:45 +0800 Subject: [PATCH] feat: RepeatedTask adds execute-first-wait-later behavior. (#2625) * feat: RepeatedTask adds execute-first-wait-later behavior. * feat: add inverval generator for repeate task component * feat: impl debug for dyn IntervalGenerator trait * chore: change some words * chore: instead of complicated way, we add an initial_delay to control task interval * chore: some improve by pr comment --- src/common/greptimedb-telemetry/src/lib.rs | 5 ++- src/common/runtime/src/repeated_task.rs | 42 +++++++++++++++++++--- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 37b86c642544..42e367652153 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask { task_fn: BoxedTaskFunction, should_report: Arc, ) -> Self { - GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report)) + GreptimeDBTelemetryTask::Enable(( + RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)), + should_report, + )) } pub fn disable() -> Self { diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index b3dcc781f1bd..a4f2bde8b00a 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -40,6 +40,7 @@ pub type BoxedTaskFunction = Box + Send + Sync + 'static> struct TaskInner { /// The repeated task handle. This handle is Some if the task is started. task_handle: Option>, + /// The task_fn to run. This is Some if the task is not started. task_fn: Option>, } @@ -50,6 +51,7 @@ pub struct RepeatedTask { inner: Mutex>, started: AtomicBool, interval: Duration, + initial_delay: Option, } impl std::fmt::Display for RepeatedTask { @@ -75,6 +77,9 @@ impl Drop for RepeatedTask { } impl RepeatedTask { + /// Creates a new repeated task. The `initial_delay` is the delay before the first execution. + /// `initial_delay` default is None, the initial interval uses the `interval`. + /// You can use `with_initial_delay` to set the `initial_delay`. pub fn new(interval: Duration, task_fn: BoxedTaskFunction) -> Self { Self { name: task_fn.name().to_string(), @@ -85,9 +90,15 @@ impl RepeatedTask { }), started: AtomicBool::new(false), interval, + initial_delay: None, } } + pub fn with_initial_delay(mut self, initial_delay: Option) -> Self { + self.initial_delay = initial_delay; + self + } + pub fn started(&self) -> bool { self.started.load(Ordering::Relaxed) } @@ -99,17 +110,21 @@ impl RepeatedTask { IllegalStateSnafu { name: &self.name } ); - let interval = self.interval; let child = self.cancel_token.child_token(); // Safety: The task is not started. let mut task_fn = inner.task_fn.take().unwrap(); + let interval = self.interval; + let mut initial_delay = self.initial_delay; // TODO(hl): Maybe spawn to a blocking runtime. let handle = runtime.spawn(async move { loop { - tokio::select! { - _ = tokio::time::sleep(interval) => {} - _ = child.cancelled() => { - return; + let sleep_time = initial_delay.take().unwrap_or(interval); + if sleep_time > Duration::ZERO { + tokio::select! { + _ = tokio::time::sleep(sleep_time) => {} + _ = child.cancelled() => { + return; + } } } if let Err(e) = task_fn.call().await { @@ -192,4 +207,21 @@ mod tests { assert_eq!(n.load(Ordering::Relaxed), 5); } + + #[tokio::test] + async fn test_repeated_task_prior_exec() { + common_telemetry::init_default_ut_logging(); + + let n = Arc::new(AtomicI32::new(0)); + let task_fn = TickTask { n: n.clone() }; + + let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn)) + .with_initial_delay(Some(Duration::ZERO)); + + task.start(crate::bg_runtime()).unwrap(); + tokio::time::sleep(Duration::from_millis(550)).await; + task.stop().await.unwrap(); + + assert_eq!(n.load(Ordering::Relaxed), 6); + } }