Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 13, 2023
1 parent e201f83 commit 3b59782
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
1 change: 0 additions & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
self.bytes.to_vec()
}

#[cfg(feature = "testing")]
/// Notes: used for test purpose.
pub fn from_inner(inner: T) -> Self {
let bytes = serde_json::to_vec(&inner).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum Error {
LoaderConflict { name: String, location: Location },

#[snafu(display("Procedure Manager is stopped"))]
ProcedureManagerStop { location: Location },
ProcedureManagerNotStart { location: Location },

#[snafu(display("Failed to serialize to json"))]
ToJson {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl ErrorExt for Error {
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. }
| Error::ProcedureManagerStop { .. } => StatusCode::Internal,
| Error::ProcedureManagerNotStart { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Expand Down
18 changes: 10 additions & 8 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::logging;
use common_telemetry::{info, logging};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::Notify;

use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerStopSnafu, Result,
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::lock::LockMap;
Expand Down Expand Up @@ -427,7 +427,7 @@ impl LocalManager {
) -> Result<Watcher> {
ensure!(
self.running.load(Ordering::Relaxed),
ProcedureManagerStopSnafu
ProcedureManagerNotStartSnafu
);

let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key()));
Expand Down Expand Up @@ -473,7 +473,8 @@ impl ProcedureManager for LocalManager {
}

fn start(&self) -> Result<()> {
if !self.running.load(Ordering::Relaxed) {
// The previous value should be false
if !self.running.swap(true, Ordering::Relaxed) {
let mut state = self.state.lock().unwrap();

let task = state
Expand All @@ -484,14 +485,15 @@ impl ProcedureManager for LocalManager {
.context(StartRemoveOutdatedMetaTaskSnafu)?;

self.running.store(true, Ordering::Relaxed);
logging::info!("LocalManager is started.");
info!("LocalManager is started.");
}

Ok(())
}

async fn stop(&self) -> Result<()> {
if self.running.load(Ordering::Relaxed) {
// The previous value should be true
if self.running.swap(false, Ordering::Relaxed) {
let remove_outdated_meta_task =
self.state.lock().unwrap().remove_outdated_meta_task.take();

Expand All @@ -500,7 +502,7 @@ impl ProcedureManager for LocalManager {
}

self.running.store(false, Ordering::Relaxed);
logging::info!("LocalManager is stopped.");
info!("LocalManager is stopped.");
}

Ok(())
Expand Down Expand Up @@ -944,7 +946,7 @@ mod tests {
})
.await
.unwrap_err(),
error::Error::ProcedureManagerStop { .. }
error::Error::ProcedureManagerNotStart { .. }
);
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl Runner {
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ProcedureManagerStopSnafu {}.build()),
error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()),
});
return;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Runner {
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ProcedureManagerStopSnafu {}.build()),
error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()),
});
return ExecResult::Failed;
}
Expand Down Expand Up @@ -307,7 +307,7 @@ impl Runner {
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ProcedureManagerStopSnafu {}.build()),
error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()),
});
return ExecResult::Failed;
}
Expand Down

0 comments on commit 3b59782

Please sign in to comment.