Skip to content

Commit

Permalink
feat: RepeatedTask adds execute-first-wait-later behavior.
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Oct 19, 2023
1 parent ba15c14 commit 65756de
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask {
task_fn: BoxedTaskFunction<Error>,
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new_prior_exec(interval, task_fn),
should_report,
))
}

pub fn disable() -> Self {
Expand Down
58 changes: 51 additions & 7 deletions src/common/runtime/src/repeated_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct RepeatedTask<E> {
inner: Mutex<TaskInner<E>>,
started: AtomicBool,
interval: Duration,
prior_exec: bool,
}

impl<E> std::fmt::Display for RepeatedTask<E> {
Expand Down Expand Up @@ -85,13 +86,37 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
}),
started: AtomicBool::new(false),
interval,
prior_exec: false,
}
}

pub fn new_prior_exec(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
let mut task = Self::new(interval, task_fn);
task.set_prior_exec(true);
task
}

fn set_prior_exec(&mut self, prior_exec: bool) {
self.prior_exec = prior_exec;
}

pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
}

async fn sleep_or_canceled(interval: &Duration, child: &CancellationToken) -> bool {
tokio::select! {
_ = tokio::time::sleep(*interval) => { false }
_ = child.cancelled() => { true }
}
}

async fn run(task_fn: &mut Box<dyn TaskFunction<E> + Send + Sync>) {
if let Err(e) = task_fn.call().await {
logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
}
}

pub fn start(&self, runtime: Runtime) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
ensure!(
Expand All @@ -103,17 +128,20 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
let child = self.cancel_token.child_token();
// Safety: The task is not started.
let mut task_fn = inner.task_fn.take().unwrap();
let prior_exec = self.prior_exec;
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = runtime.spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
return;
if prior_exec {
Self::run(&mut task_fn).await;
if Self::sleep_or_canceled(&interval, &child).await {
break;
}
}
if let Err(e) = task_fn.call().await {
logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
} else {
if Self::sleep_or_canceled(&interval, &child).await {
break;
}
Self::run(&mut task_fn).await;
}
}
});
Expand Down Expand Up @@ -192,4 +220,20 @@ mod tests {

assert_eq!(n.load(Ordering::Relaxed), 5);
}

#[tokio::test]
async fn test_repeated_task_prior_exec() {
common_telemetry::init_default_ut_logging();

let n = Arc::new(AtomicI32::new(0));
let task_fn = TickTask { n: n.clone() };

let task = RepeatedTask::new_prior_exec(Duration::from_millis(100), Box::new(task_fn));

task.start(crate::bg_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
task.stop().await.unwrap();

assert_eq!(n.load(Ordering::Relaxed), 6);
}
}

0 comments on commit 65756de

Please sign in to comment.