From 3b59782fca5fdcf860e8a37f6aad782af2266d2a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 13 Oct 2023 02:43:46 +0000 Subject: [PATCH] chore: apply suggestions from CR --- src/common/meta/src/key.rs | 1 - src/common/procedure/src/error.rs | 4 ++-- src/common/procedure/src/local.rs | 18 ++++++++++-------- src/common/procedure/src/local/runner.rs | 6 +++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 46252c729232..d1a980d7023e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -258,7 +258,6 @@ impl DeserializedValueWithBytes { self.bytes.to_vec() } - #[cfg(feature = "testing")] /// Notes: used for test purpose. pub fn from_inner(inner: T) -> Self { let bytes = serde_json::to_vec(&inner).unwrap(); diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index cf8e0a6f78b2..0e2478f20c3a 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -35,7 +35,7 @@ pub enum Error { LoaderConflict { name: String, location: Location }, #[snafu(display("Procedure Manager is stopped"))] - ProcedureManagerStop { location: Location }, + ProcedureManagerNotStart { location: Location }, #[snafu(display("Failed to serialize to json"))] ToJson { @@ -152,7 +152,7 @@ impl ErrorExt for Error { | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } | Error::WaitWatcher { .. } - | Error::ProcedureManagerStop { .. } => StatusCode::Internal, + | Error::ProcedureManagerNotStart { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 65132834bd6e..9486ae969c94 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -23,13 +23,13 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; -use common_telemetry::logging; +use common_telemetry::{info, logging}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerStopSnafu, Result, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; @@ -427,7 +427,7 @@ impl LocalManager { ) -> Result { ensure!( self.running.load(Ordering::Relaxed), - ProcedureManagerStopSnafu + ProcedureManagerNotStartSnafu ); let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); @@ -473,7 +473,8 @@ impl ProcedureManager for LocalManager { } fn start(&self) -> Result<()> { - if !self.running.load(Ordering::Relaxed) { + // The previous value should be false + if !self.running.swap(true, Ordering::Relaxed) { let mut state = self.state.lock().unwrap(); let task = state @@ -484,14 +485,15 @@ impl ProcedureManager for LocalManager { .context(StartRemoveOutdatedMetaTaskSnafu)?; self.running.store(true, Ordering::Relaxed); - logging::info!("LocalManager is started."); + info!("LocalManager is started."); } Ok(()) } async fn stop(&self) -> Result<()> { - if self.running.load(Ordering::Relaxed) { + // The previous value should be true + if self.running.swap(false, Ordering::Relaxed) { let remove_outdated_meta_task = self.state.lock().unwrap().remove_outdated_meta_task.take(); @@ -500,7 +502,7 @@ impl ProcedureManager for LocalManager { } self.running.store(false, Ordering::Relaxed); - logging::info!("LocalManager is stopped."); + info!("LocalManager is stopped."); } Ok(()) @@ -944,7 +946,7 @@ mod tests { }) .await .unwrap_err(), - error::Error::ProcedureManagerStop { .. } + error::Error::ProcedureManagerNotStart { .. } ); } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index a8c95fbb550f..70f08e5ac1b0 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -201,7 +201,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return; } @@ -260,7 +260,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } @@ -307,7 +307,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; }