Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RepeatedTask adds execute-first-wait-later behavior. #2625

Merged
merged 6 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
use common_runtime::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
use common_telemetry::{debug, info};
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask {
task_fn: BoxedTaskFunction<Error>,
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new(FirstZeroInterval::new(interval), task_fn),
should_report,
))
}

pub fn disable() -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};
pub use crate::repeated_task::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};
120 changes: 110 additions & 10 deletions src/common/runtime/src/repeated_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::ext::ErrorExt;
use common_telemetry::logging;
use snafu::{ensure, ResultExt};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu};
Expand All @@ -40,16 +41,89 @@ pub type BoxedTaskFunction<E> = Box<dyn TaskFunction<E> + Send + Sync + 'static>
struct TaskInner<E> {
/// The repeated task handle. This handle is Some if the task is started.
task_handle: Option<JoinHandle<()>>,

/// The task_fn to run. This is Some if the task is not started.
task_fn: Option<BoxedTaskFunction<E>>,

/// Generates the next interval.
interval_generator: Option<Box<dyn IntervalGenerator>>,
}

pub trait IntervalGenerator: Send + Sync {
/// return the next interval.
paomian marked this conversation as resolved.
Show resolved Hide resolved
fn next(&mut self) -> Duration;

/// return whether the interval is regular and the interval if it is regular.
paomian marked this conversation as resolved.
Show resolved Hide resolved
fn is_regular(&self) -> (bool, Option<Duration>);
}

impl std::fmt::Debug for dyn IntervalGenerator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut binding = f.debug_struct("IntervalGenerator");
let mut builder = binding.field("is_regular", &self.is_regular().0);
if self.is_regular().0 {
builder = builder.field("interval", &self.is_regular().1);
}
builder.finish()
}
}

impl IntervalGenerator for Duration {
fn next(&mut self) -> Duration {
*self
}

fn is_regular(&self) -> (bool, Option<Duration>) {
(true, Some(*self))
}
}

impl From<Duration> for Box<dyn IntervalGenerator> {
fn from(value: Duration) -> Self {
Box::new(value)
}
}

pub struct FirstZeroInterval {
first: bool,
interval: Duration,
}

impl FirstZeroInterval {
pub fn new(interval: Duration) -> Self {
Self {
first: false,
interval,
}
}
}

impl IntervalGenerator for FirstZeroInterval {
fn next(&mut self) -> Duration {
if !self.first {
self.first = true;
Duration::ZERO
} else {
self.interval
}
}

fn is_regular(&self) -> (bool, Option<Duration>) {
(false, None)
}
}

impl From<FirstZeroInterval> for Box<dyn IntervalGenerator> {
fn from(value: FirstZeroInterval) -> Self {
Box::new(value)
}
}

pub struct RepeatedTask<E> {
name: String,
cancel_token: CancellationToken,
inner: Mutex<TaskInner<E>>,
started: AtomicBool,
interval: Duration,
}

impl<E> std::fmt::Display for RepeatedTask<E> {
Expand All @@ -75,16 +149,19 @@ impl<E> Drop for RepeatedTask<E> {
}

impl<E: ErrorExt + 'static> RepeatedTask<E> {
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
pub fn new<I: Into<Box<dyn IntervalGenerator>>>(
paomian marked this conversation as resolved.
Show resolved Hide resolved
interval: I,
task_fn: BoxedTaskFunction<E>,
) -> Self {
Self {
name: task_fn.name().to_string(),
cancel_token: CancellationToken::new(),
inner: Mutex::new(TaskInner {
task_handle: None,
task_fn: Some(task_fn),
interval_generator: Some(interval.into()),
}),
started: AtomicBool::new(false),
interval,
}
}

Expand All @@ -99,19 +176,26 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
IllegalStateSnafu { name: &self.name }
);

let interval = self.interval;
let mut interval_generator = inner.interval_generator.take().unwrap();
let child = self.cancel_token.child_token();
// Safety: The task is not started.
let mut task_fn = inner.task_fn.take().unwrap();
let interval = interval_generator.next();
let interval_str = format!("{:?}", interval_generator);
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = runtime.spawn(async move {
let sleep = tokio::time::sleep(interval);

tokio::pin!(sleep);
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
return;
}
_ = &mut sleep => {
let interval = interval_generator.next();
sleep.as_mut().reset(Instant::now() + interval);
}
_ = child.cancelled() => {
return;
}}
if let Err(e) = task_fn.call().await {
logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
}
Expand All @@ -121,9 +205,9 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
self.started.store(true, Ordering::Relaxed);

logging::debug!(
"Repeated task {} started with interval: {:?}",
"Repeated task {} started with interval: {}",
self.name,
self.interval
interval_str
);

Ok(())
Expand Down Expand Up @@ -192,4 +276,20 @@ mod tests {

assert_eq!(n.load(Ordering::Relaxed), 5);
}

#[tokio::test]
async fn test_repeated_task_prior_exec() {
common_telemetry::init_default_ut_logging();

let n = Arc::new(AtomicI32::new(0));
let task_fn = TickTask { n: n.clone() };
let interval = FirstZeroInterval::new(Duration::from_millis(100));
let task = RepeatedTask::new(interval, Box::new(task_fn));

task.start(crate::bg_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
task.stop().await.unwrap();

assert_eq!(n.load(Ordering::Relaxed), 6);
}
}
Loading