Skip to content

Commit

Permalink
chore: instead of complicated way, we add an initial_delay to control…
Browse files Browse the repository at this point in the history
… task interval
  • Loading branch information
paomian committed Oct 20, 2023
1 parent 848f120 commit 8b09341
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 98 deletions.
4 changes: 2 additions & 2 deletions src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
use common_telemetry::{debug, info};
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -58,7 +58,7 @@ impl GreptimeDBTelemetryTask {
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new(FirstZeroInterval::new(interval), task_fn),
RepeatedTask::new(interval, task_fn).with_initial_delay(Duration::from_secs(0)),
should_report,
))
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};
117 changes: 22 additions & 95 deletions src/common/runtime/src/repeated_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use common_error::ext::ErrorExt;
use common_telemetry::logging;
use snafu::{ensure, ResultExt};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu};
Expand All @@ -44,86 +43,15 @@ struct TaskInner<E> {

/// The task_fn to run. This is Some if the task is not started.
task_fn: Option<BoxedTaskFunction<E>>,

/// Generates the next interval.
interval_generator: Option<Box<dyn IntervalGenerator>>,
}

pub trait IntervalGenerator: Send + Sync {
/// Returns the next interval.
fn next(&mut self) -> Duration;

/// Returns whether the interval is regular and the interval if it is regular.
fn is_regular(&self) -> (bool, Option<Duration>);
}

impl std::fmt::Debug for dyn IntervalGenerator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut binding = f.debug_struct("IntervalGenerator");
let mut builder = binding.field("is_regular", &self.is_regular().0);
if self.is_regular().0 {
builder = builder.field("interval", &self.is_regular().1);
}
builder.finish()
}
}

impl IntervalGenerator for Duration {
fn next(&mut self) -> Duration {
*self
}

fn is_regular(&self) -> (bool, Option<Duration>) {
(true, Some(*self))
}
}

impl From<Duration> for Box<dyn IntervalGenerator> {
fn from(value: Duration) -> Self {
Box::new(value)
}
}

pub struct FirstZeroInterval {
first: bool,
interval: Duration,
}

impl FirstZeroInterval {
pub fn new(interval: Duration) -> Self {
Self {
first: false,
interval,
}
}
}

impl IntervalGenerator for FirstZeroInterval {
fn next(&mut self) -> Duration {
if !self.first {
self.first = true;
Duration::ZERO
} else {
self.interval
}
}

fn is_regular(&self) -> (bool, Option<Duration>) {
(false, None)
}
}

impl From<FirstZeroInterval> for Box<dyn IntervalGenerator> {
fn from(value: FirstZeroInterval) -> Self {
Box::new(value)
}
}

pub struct RepeatedTask<E> {
name: String,
cancel_token: CancellationToken,
inner: Mutex<TaskInner<E>>,
started: AtomicBool,
interval: Duration,
initial_delay: Option<Duration>,
}

impl<E> std::fmt::Display for RepeatedTask<E> {
Expand All @@ -149,22 +77,25 @@ impl<E> Drop for RepeatedTask<E> {
}

impl<E: ErrorExt + 'static> RepeatedTask<E> {
pub fn new<I: Into<Box<dyn IntervalGenerator>>>(
interval: I,
task_fn: BoxedTaskFunction<E>,
) -> Self {
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
Self {
name: task_fn.name().to_string(),
cancel_token: CancellationToken::new(),
inner: Mutex::new(TaskInner {
task_handle: None,
task_fn: Some(task_fn),
interval_generator: Some(interval.into()),
}),
started: AtomicBool::new(false),
interval,
initial_delay: Some(interval),
}
}

pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
self.initial_delay = Some(initial_delay);
self
}

pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
}
Expand All @@ -176,26 +107,21 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
IllegalStateSnafu { name: &self.name }
);

let mut interval_generator = inner.interval_generator.take().unwrap();
let child = self.cancel_token.child_token();
// Safety: The task is not started.
let mut task_fn = inner.task_fn.take().unwrap();
let interval = interval_generator.next();
let interval_str = format!("{:?}", interval_generator);
let interval = self.interval;
let mut initial_delay = self.initial_delay;
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = runtime.spawn(async move {
let sleep = tokio::time::sleep(interval);

tokio::pin!(sleep);
loop {
let sleep_time = initial_delay.take().unwrap_or(interval);
tokio::select! {
_ = &mut sleep => {
let interval = interval_generator.next();
sleep.as_mut().reset(Instant::now() + interval);
_ = tokio::time::sleep(sleep_time) => {}
_ = child.cancelled() => {
return;
}
}
_ = child.cancelled() => {
return;
}}
if let Err(e) = task_fn.call().await {
logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
}
Expand All @@ -205,9 +131,9 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
self.started.store(true, Ordering::Relaxed);

logging::debug!(
"Repeated task {} started with interval: {}",
"Repeated task {} started with interval: {:?}",
self.name,
interval_str
self.interval
);

Ok(())
Expand Down Expand Up @@ -283,8 +209,9 @@ mod tests {

let n = Arc::new(AtomicI32::new(0));
let task_fn = TickTask { n: n.clone() };
let interval = FirstZeroInterval::new(Duration::from_millis(100));
let task = RepeatedTask::new(interval, Box::new(task_fn));

let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
.with_initial_delay(Duration::from_secs(0));

task.start(crate::bg_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
Expand Down

0 comments on commit 8b09341

Please sign in to comment.