Skip to content

Commit

Permalink
chore: some improve by pr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Oct 20, 2023
1 parent 274a072 commit 32538d8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl GreptimeDBTelemetryTask {
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new(interval, task_fn).with_initial_delay(Duration::from_secs(0)),
RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
should_report,
))
}
Expand Down
19 changes: 12 additions & 7 deletions src/common/runtime/src/repeated_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ impl<E> Drop for RepeatedTask<E> {
}

impl<E: ErrorExt + 'static> RepeatedTask<E> {
/// Creates a new repeated task. The `initial_delay` is the delay before the first execution.
/// `initial_delay` default is None, the initial interval uses the `interval`.
/// You can use `with_initial_delay` to set the `initial_delay`.
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
Self {
name: task_fn.name().to_string(),
Expand All @@ -91,8 +94,8 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
}
}

pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
self.initial_delay = Some(initial_delay);
pub fn with_initial_delay(mut self, initial_delay: Option<Duration>) -> Self {
self.initial_delay = initial_delay;
self
}

Expand All @@ -116,10 +119,12 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
let handle = runtime.spawn(async move {
loop {
let sleep_time = initial_delay.take().unwrap_or(interval);
tokio::select! {
_ = tokio::time::sleep(sleep_time) => {}
_ = child.cancelled() => {
return;
if sleep_time > Duration::ZERO {
tokio::select! {
_ = tokio::time::sleep(sleep_time) => {}
_ = child.cancelled() => {
return;
}
}
}
if let Err(e) = task_fn.call().await {
Expand Down Expand Up @@ -211,7 +216,7 @@ mod tests {
let task_fn = TickTask { n: n.clone() };

let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
.with_initial_delay(Duration::from_secs(0));
.with_initial_delay(Some(Duration::ZERO));

task.start(crate::bg_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
Expand Down

0 comments on commit 32538d8

Please sign in to comment.