Skip to content

Commit

Permalink
refactor: remove redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 26, 2023
1 parent 310da43 commit eb285f8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 19 deletions.
2 changes: 0 additions & 2 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ use crate::metrics;
use crate::region_server::RegionServer;

pub(crate) mod handler;
//TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod task_tracker;

pub struct HeartbeatTask {
Expand Down
22 changes: 5 additions & 17 deletions src/datanode/src/heartbeat/task_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_runtime::JoinHandle;
use futures_util::future::BoxFuture;
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -55,7 +54,6 @@ pub async fn wait<T: Send + Sync + Clone>(watcher: &mut TaskWatcher<T>) -> Resul

/// The running async task.
pub(crate) struct Task<T: Send + Sync + Clone> {
handle: JoinHandle<()>,
watcher: TaskWatcher<T>,
}

Expand Down Expand Up @@ -93,11 +91,13 @@ impl<T: Send + Sync + Clone> RegisterResult<T> {
}
}

#[cfg(test)]
/// Returns true if it's [RegisterResult::Busy].
pub(crate) fn is_busy(&self) -> bool {
matches!(self, RegisterResult::Busy(_))
}

#[cfg(test)]
/// Returns true if it's [RegisterResult::Running].
pub(crate) fn is_running(&self) -> bool {
matches!(self, RegisterResult::Running(_))
Expand All @@ -111,11 +111,13 @@ pub(crate) enum WaitResult<T> {
}

impl<T> WaitResult<T> {
#[cfg(test)]
/// Returns true if it's [WaitResult::Timeout].
pub(crate) fn is_timeout(&self) -> bool {
matches!(self, WaitResult::Timeout)
}

#[cfg(test)]
/// Into the [WaitResult::Timeout] if it's.
pub(crate) fn into_finish(self) -> Option<Result<T>> {
match self {
Expand Down Expand Up @@ -151,19 +153,6 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}

/// Waits for an specified region async task.
pub(crate) async fn wait_with_region(
&self,
region_id: RegionId,
timeout: Duration,
) -> Option<WaitResult<T>> {
let inner = self.inner.read().await;
match inner.state.get(&region_id) {
Some(task) => Some(self.wait(&mut task.watcher.clone(), timeout).await),
None => None,
}
}

/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,
Expand All @@ -177,7 +166,7 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
let moved_inner = self.inner.clone();
let (tx, rx) = watch::channel(TaskState::<T>::Running);

let handle = common_runtime::spawn_bg(async move {
common_runtime::spawn_bg(async move {
match fut.await {
Ok(result) => {
let _ = tx.send(TaskState::Done(result));
Expand All @@ -193,7 +182,6 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
inner.state.insert(
region_id,
Task {
handle,
watcher: rx.clone(),
},
);
Expand Down

0 comments on commit eb285f8

Please sign in to comment.