Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RepeatedTask adds execute-first-wait-later behavior. #2625

Merged
merged 6 commits into from
Oct 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: impl debug for dyn IntervalGenerator trait
  • Loading branch information
paomian committed Oct 19, 2023
commit 5215edc608afb6b7de20b7860de6a668509b700c
4 changes: 2 additions & 2 deletions src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, ImmediatelyInterval, RepeatedTask, TaskFunction};
use common_runtime::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
use common_telemetry::{debug, info};
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
@@ -58,7 +58,7 @@ impl GreptimeDBTelemetryTask {
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new(ImmediatelyInterval::new(interval), task_fn),
RepeatedTask::new(FirstZeroInterval::new(interval), task_fn),
should_report,
))
}
4 changes: 1 addition & 3 deletions src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{
BoxedTaskFunction, ImmediatelyInterval, RepeatedTask, TaskFunction,
};
pub use crate::repeated_task::{BoxedTaskFunction, FirstZeroInterval, RepeatedTask, TaskFunction};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};
44 changes: 29 additions & 15 deletions src/common/runtime/src/repeated_task.rs
Original file line number Diff line number Diff line change
@@ -41,23 +41,41 @@ pub type BoxedTaskFunction<E> = Box<dyn TaskFunction<E> + Send + Sync + 'static>
struct TaskInner<E> {
/// The repeated task handle. This handle is Some if the task is started.
task_handle: Option<JoinHandle<()>>,

/// The task_fn to run. This is Some if the task is not started.
task_fn: Option<BoxedTaskFunction<E>>,

/// Generates the next interval.
interval_generator: Option<Box<dyn IntervalGenerator>>,
}

pub trait IntervalGenerator: Send + Sync {
/// return the next interval.
paomian marked this conversation as resolved.
Show resolved Hide resolved
fn next(&mut self) -> Duration;
fn is_regular(&self) -> bool {
true

/// return whether the interval is regular and the interval if it is regular.
paomian marked this conversation as resolved.
Show resolved Hide resolved
fn is_regular(&self) -> (bool, Option<Duration>);
}

impl std::fmt::Debug for dyn IntervalGenerator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut binding = f.debug_struct("IntervalGenerator");
let mut builder = binding.field("is_regular", &self.is_regular().0);
if self.is_regular().0 {
builder = builder.field("interval", &self.is_regular().1);
}
builder.finish()
}
}

impl IntervalGenerator for Duration {
fn next(&mut self) -> Duration {
*self
}

fn is_regular(&self) -> (bool, Option<Duration>) {
(true, Some(*self))
}
}

impl From<Duration> for Box<dyn IntervalGenerator> {
@@ -66,12 +84,12 @@ impl From<Duration> for Box<dyn IntervalGenerator> {
}
}

pub struct ImmediatelyInterval {
pub struct FirstZeroInterval {
first: bool,
interval: Duration,
}

impl ImmediatelyInterval {
impl FirstZeroInterval {
pub fn new(interval: Duration) -> Self {
Self {
first: false,
@@ -80,7 +98,7 @@ impl ImmediatelyInterval {
}
}

impl IntervalGenerator for ImmediatelyInterval {
impl IntervalGenerator for FirstZeroInterval {
fn next(&mut self) -> Duration {
if !self.first {
self.first = true;
@@ -90,13 +108,13 @@ impl IntervalGenerator for ImmediatelyInterval {
}
}

fn is_regular(&self) -> bool {
false
fn is_regular(&self) -> (bool, Option<Duration>) {
(false, None)
}
}

impl From<ImmediatelyInterval> for Box<dyn IntervalGenerator> {
fn from(value: ImmediatelyInterval) -> Self {
impl From<FirstZeroInterval> for Box<dyn IntervalGenerator> {
fn from(value: FirstZeroInterval) -> Self {
Box::new(value)
}
}
@@ -163,11 +181,7 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
// Safety: The task is not started.
let mut task_fn = inner.task_fn.take().unwrap();
let interval = interval_generator.next();
let interval_str = if interval_generator.is_regular() {
format!("{:?}", interval)
} else {
"irregular".to_string()
};
let interval_str = format!("{:?}", interval_generator);
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = runtime.spawn(async move {
let sleep = tokio::time::sleep(interval);
@@ -269,7 +283,7 @@ mod tests {

let n = Arc::new(AtomicI32::new(0));
let task_fn = TickTask { n: n.clone() };
let interval = ImmediatelyInterval::new(Duration::from_millis(100));
let interval = FirstZeroInterval::new(Duration::from_millis(100));
let task = RepeatedTask::new(interval, Box::new(task_fn));

task.start(crate::bg_runtime()).unwrap();