From 8b0934130f7163fccae11bb49136f99b5606e12a Mon Sep 17 00:00:00 2001 From: paomian <xpaomian@gmail.com> Date: Fri, 20 Oct 2023 16:54:39 +0800 Subject: [PATCH] chore: instead of complicated way, we add an initial_delay to control task interval --- src/common/greptimedb-telemetry/src/lib.rs | 4 +- src/common/runtime/src/lib.rs | 2 +- src/common/runtime/src/repeated_task.rs | 117 ++++----------------- 3 files changed, 25 insertions(+), 98 deletions(-) diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 1f2f652b91bc..21bc4c5cd079 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time::Duration; use common_runtime::error::{Error, Result}; -use common_runtime::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction}; +use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction}; use common_telemetry::{debug, info}; use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; @@ -58,7 +58,7 @@ impl GreptimeDBTelemetryTask { should_report: Arc<AtomicBool>, ) -> Self { GreptimeDBTelemetryTask::Enable(( - RepeatedTask::new(FirstZeroInterval::new(interval), task_fn), + RepeatedTask::new(interval, task_fn).with_initial_delay(Duration::from_secs(0)), should_report, )) } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 7ab4cedc5456..08baed46cbd3 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -24,5 +24,5 @@ pub use global::{ spawn_read, spawn_write, write_runtime, }; -pub use crate::repeated_task::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction}; +pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction}; pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime}; diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index d2e3fa45d6bd..665e6d602817 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -20,7 +20,6 @@ use common_error::ext::ErrorExt; use common_telemetry::logging; use snafu::{ensure, ResultExt}; use tokio::task::JoinHandle; -use tokio::time::Instant; use tokio_util::sync::CancellationToken; use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu}; @@ -44,79 +43,6 @@ struct TaskInner<E> { /// The task_fn to run. This is Some if the task is not started. task_fn: Option<BoxedTaskFunction<E>>, - - /// Generates the next interval. - interval_generator: Option<Box<dyn IntervalGenerator>>, -} - -pub trait IntervalGenerator: Send + Sync { - /// Returns the next interval. - fn next(&mut self) -> Duration; - - /// Returns whether the interval is regular and the interval if it is regular. - fn is_regular(&self) -> (bool, Option<Duration>); -} - -impl std::fmt::Debug for dyn IntervalGenerator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut binding = f.debug_struct("IntervalGenerator"); - let mut builder = binding.field("is_regular", &self.is_regular().0); - if self.is_regular().0 { - builder = builder.field("interval", &self.is_regular().1); - } - builder.finish() - } -} - -impl IntervalGenerator for Duration { - fn next(&mut self) -> Duration { - *self - } - - fn is_regular(&self) -> (bool, Option<Duration>) { - (true, Some(*self)) - } -} - -impl From<Duration> for Box<dyn IntervalGenerator> { - fn from(value: Duration) -> Self { - Box::new(value) - } -} - -pub struct FirstZeroInterval { - first: bool, - interval: Duration, -} - -impl FirstZeroInterval { - pub fn new(interval: Duration) -> Self { - Self { - first: false, - interval, - } - } -} - -impl IntervalGenerator for FirstZeroInterval { - fn next(&mut self) -> Duration { - if !self.first { - self.first = true; - Duration::ZERO - } else { - self.interval - } - } - - fn is_regular(&self) -> (bool, Option<Duration>) { - (false, None) - } -} - -impl From<FirstZeroInterval> for Box<dyn IntervalGenerator> { - fn from(value: FirstZeroInterval) -> Self { - Box::new(value) - } } pub struct RepeatedTask<E> { @@ -124,6 +50,8 @@ pub struct RepeatedTask<E> { cancel_token: CancellationToken, inner: Mutex<TaskInner<E>>, started: AtomicBool, + interval: Duration, + initial_delay: Option<Duration>, } impl<E> std::fmt::Display for RepeatedTask<E> { @@ -149,22 +77,25 @@ impl<E> Drop for RepeatedTask<E> { } impl<E: ErrorExt + 'static> RepeatedTask<E> { - pub fn new<I: Into<Box<dyn IntervalGenerator>>>( - interval: I, - task_fn: BoxedTaskFunction<E>, - ) -> Self { + pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self { Self { name: task_fn.name().to_string(), cancel_token: CancellationToken::new(), inner: Mutex::new(TaskInner { task_handle: None, task_fn: Some(task_fn), - interval_generator: Some(interval.into()), }), started: AtomicBool::new(false), + interval, + initial_delay: Some(interval), } } + pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self { + self.initial_delay = Some(initial_delay); + self + } + pub fn started(&self) -> bool { self.started.load(Ordering::Relaxed) } @@ -176,26 +107,21 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> { IllegalStateSnafu { name: &self.name } ); - let mut interval_generator = inner.interval_generator.take().unwrap(); let child = self.cancel_token.child_token(); // Safety: The task is not started. let mut task_fn = inner.task_fn.take().unwrap(); - let interval = interval_generator.next(); - let interval_str = format!("{:?}", interval_generator); + let interval = self.interval; + let mut initial_delay = self.initial_delay; // TODO(hl): Maybe spawn to a blocking runtime. let handle = runtime.spawn(async move { - let sleep = tokio::time::sleep(interval); - - tokio::pin!(sleep); loop { + let sleep_time = initial_delay.take().unwrap_or(interval); tokio::select! { - _ = &mut sleep => { - let interval = interval_generator.next(); - sleep.as_mut().reset(Instant::now() + interval); + _ = tokio::time::sleep(sleep_time) => {} + _ = child.cancelled() => { + return; + } } - _ = child.cancelled() => { - return; - }} if let Err(e) = task_fn.call().await { logging::error!(e; "Failed to run repeated task: {}", task_fn.name()); } @@ -205,9 +131,9 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> { self.started.store(true, Ordering::Relaxed); logging::debug!( - "Repeated task {} started with interval: {}", + "Repeated task {} started with interval: {:?}", self.name, - interval_str + self.interval ); Ok(()) @@ -283,8 +209,9 @@ mod tests { let n = Arc::new(AtomicI32::new(0)); let task_fn = TickTask { n: n.clone() }; - let interval = FirstZeroInterval::new(Duration::from_millis(100)); - let task = RepeatedTask::new(interval, Box::new(task_fn)); + + let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn)) + .with_initial_delay(Duration::from_secs(0)); task.start(crate::bg_runtime()).unwrap(); tokio::time::sleep(Duration::from_millis(550)).await;