diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 4aca03838cec..29515dabcd25 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -49,12 +49,6 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to start procedures"))] - RecoverProcedures { - location: Location, - source: common_procedure::error::Error, - }, - #[snafu(display("Failed to start datanode"))] StartDatanode { location: Location, @@ -229,8 +223,7 @@ impl ErrorExt for Error { | Error::CreateDir { .. } | Error::ConnectEtcd { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } - | Error::StopProcedureManager { source, .. } - | Error::RecoverProcedures { source, .. } => source.status_code(), + | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 2c0138207c7e..7aad80491ba8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -42,9 +42,9 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, RecoverProceduresSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StopProcedureManagerSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, + ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + StopProcedureManagerSnafu, }; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -172,12 +172,8 @@ impl Instance { self.procedure_manager .start() - .context(StartProcedureManagerSnafu)?; - - self.procedure_manager - .recover() .await - .context(RecoverProceduresSnafu)?; + .context(StartProcedureManagerSnafu)?; self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 0e2478f20c3a..25a974df86c6 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -35,7 +35,7 @@ pub enum Error { LoaderConflict { name: String, location: Location }, #[snafu(display("Procedure Manager is stopped"))] - ProcedureManagerNotStart { location: Location }, + ManagerNotStart { location: Location }, #[snafu(display("Failed to serialize to json"))] ToJson { @@ -152,7 +152,7 @@ impl ErrorExt for Error { | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } | Error::WaitWatcher { .. } - | Error::ProcedureManagerNotStart { .. } => StatusCode::Internal, + | Error::ManagerNotStart { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9486ae969c94..1986d64a94dc 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -26,10 +26,10 @@ use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{info, logging}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; -use tokio::sync::Notify; +use tokio::sync::{Mutex as TokioMutex, Notify}; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerNotStartSnafu, Result, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; @@ -136,6 +136,8 @@ pub(crate) struct ManagerContext { messages: Mutex>, /// Ids and finished time of finished procedures. finished_procedures: Mutex>, + /// Running flag. + running: Arc, } #[async_trait] @@ -154,9 +156,29 @@ impl ManagerContext { procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), finished_procedures: Mutex::new(VecDeque::new()), + running: Arc::new(AtomicBool::new(false)), } } + #[cfg(test)] + pub(crate) fn set_running(&self) { + self.running.store(true, Ordering::Relaxed); + } + + /// Set the running flag. + pub(crate) fn start(&self) { + self.running.store(true, Ordering::Relaxed); + } + + pub(crate) fn stop(&self) { + self.running.store(false, Ordering::Relaxed); + } + + /// Return `ProcedureManager` is running. + pub(crate) fn running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + /// Returns true if the procedure with specific `procedure_id` exists. fn contains_procedure(&self, procedure_id: ProcedureId) -> bool { let procedures = self.procedures.read().unwrap(); @@ -369,44 +391,26 @@ pub struct LocalManager { procedure_store: Arc, max_retry_times: usize, retry_delay: Duration, - state: Arc>, - running: Arc, + /// GC task. + remove_outdated_meta_task: TokioMutex>>, config: ManagerConfig, } -#[derive(Debug, Default)] -pub struct LocalManagerState { - remove_outdated_meta_task: Option>, -} - impl LocalManager { /// Create a new [LocalManager] with specific `config`. pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new()); LocalManager { - running: Arc::new(AtomicBool::new(false)), - state: Arc::new(Mutex::new(LocalManagerState::default())), manager_ctx, procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)), max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, + remove_outdated_meta_task: TokioMutex::new(None), config, } } - #[cfg(test)] - fn start_task(&self) -> Result> { - let task = self.build_remove_outdated_meta_task(); - task.start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; - Ok(task) - } - - pub fn set_running(&self) { - self.running.store(true, Ordering::Relaxed); - } - /// Build remove outedated meta task pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask { RepeatedTask::new( @@ -425,14 +429,10 @@ impl LocalManager { step: u32, procedure: BoxedProcedure, ) -> Result { - ensure!( - self.running.load(Ordering::Relaxed), - ProcedureManagerNotStartSnafu - ); + ensure!(self.manager_ctx.running(), ManagerNotStartSnafu); let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); let runner = Runner { - running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -459,65 +459,8 @@ impl LocalManager { Ok(watcher) } -} - -#[async_trait] -impl ProcedureManager for LocalManager { - fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { - let mut loaders = self.manager_ctx.loaders.lock().unwrap(); - ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); - - let _ = loaders.insert(name.to_string(), loader); - - Ok(()) - } - - fn start(&self) -> Result<()> { - // The previous value should be false - if !self.running.swap(true, Ordering::Relaxed) { - let mut state = self.state.lock().unwrap(); - - let task = state - .remove_outdated_meta_task - .get_or_insert(self.build_remove_outdated_meta_task()); - - task.start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; - - self.running.store(true, Ordering::Relaxed); - info!("LocalManager is started."); - } - - Ok(()) - } - - async fn stop(&self) -> Result<()> { - // The previous value should be true - if self.running.swap(false, Ordering::Relaxed) { - let remove_outdated_meta_task = - self.state.lock().unwrap().remove_outdated_meta_task.take(); - - if let Some(task) = remove_outdated_meta_task { - task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; - } - - self.running.store(false, Ordering::Relaxed); - info!("LocalManager is stopped."); - } - - Ok(()) - } - - async fn submit(&self, procedure: ProcedureWithId) -> Result { - let procedure_id = procedure.id; - ensure!( - !self.manager_ctx.contains_procedure(procedure_id), - DuplicateProcedureSnafu { procedure_id } - ); - - self.submit_root(procedure.id, 0, procedure.procedure) - } + /// Recovers unfinished procedures and reruns them. async fn recover(&self) -> Result<()> { logging::info!("LocalManager start to recover"); let recover_start = Instant::now(); @@ -573,6 +516,60 @@ impl ProcedureManager for LocalManager { Ok(()) } +} + +#[async_trait] +impl ProcedureManager for LocalManager { + fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { + let mut loaders = self.manager_ctx.loaders.lock().unwrap(); + ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); + + let _ = loaders.insert(name.to_string(), loader); + + Ok(()) + } + + async fn start(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + let task_inner = self.build_remove_outdated_meta_task(); + + task_inner + .start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + + *task = Some(task_inner); + + self.manager_ctx.start(); + + info!("LocalManager is start."); + + self.recover().await + } + + async fn stop(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + if let Some(task) = task.take() { + task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; + } + + self.manager_ctx.stop(); + + info!("LocalManager is stopped."); + + Ok(()) + } + + async fn submit(&self, procedure: ProcedureWithId) -> Result { + let procedure_id = procedure.id; + ensure!( + !self.manager_ctx.contains_procedure(procedure_id), + DuplicateProcedureSnafu { procedure_id } + ); + + self.submit_root(procedure.id, 0, procedure.procedure) + } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.manager_ctx.state(procedure_id)) @@ -747,7 +744,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -771,7 +768,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -820,7 +817,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); assert!(manager @@ -871,7 +868,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); #[derive(Debug)] struct MockProcedure { @@ -946,10 +943,43 @@ mod tests { }) .await .unwrap_err(), - error::Error::ProcedureManagerNotStart { .. } + error::Error::ManagerNotStart { .. } ); } + #[tokio::test] + async fn test_procedure_manager_restart() { + let dir = create_temp_dir("procedure_manager_restart"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + manager.start().await.unwrap(); + manager.stop().await.unwrap(); + manager.start().await.unwrap(); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_some()); + } + #[tokio::test] async fn test_remove_outdated_meta_task() { let dir = create_temp_dir("remove_outdated_meta_task"); @@ -963,7 +993,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); @@ -977,7 +1007,8 @@ mod tests { .is_ok()); let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); watcher.changed().await.unwrap(); - let task = manager.start_task().unwrap(); + + manager.start().await.unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; assert!(manager .procedure_state(procedure_id) @@ -986,10 +1017,12 @@ mod tests { .is_none()); // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. - task.stop().await.unwrap(); + manager.stop().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); let procedure_id = ProcedureId::random(); + + manager.manager_ctx.set_running(); assert!(manager .submit(ProcedureWithId { id: procedure_id, @@ -1005,5 +1038,27 @@ mod tests { .await .unwrap() .is_some()); + + // After restart + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + + manager.start().await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_none()); } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 70f08e5ac1b0..0b50f4497f03 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -103,9 +102,7 @@ impl Drop for ProcedureGuard { } } -// TODO(yingwen): Support cancellation. pub(crate) struct Runner { - pub(crate) running: Arc, pub(crate) meta: ProcedureMetaRef, pub(crate) procedure: BoxedProcedure, pub(crate) manager_ctx: Arc, @@ -118,7 +115,7 @@ pub(crate) struct Runner { impl Runner { /// Return `ProcedureManager` is running. pub(crate) fn running(&self) -> bool { - self.running.load(Ordering::Relaxed) + self.manager_ctx.running() } /// Run the procedure. @@ -154,16 +151,17 @@ impl Runner { // Release locks and notify parent procedure. guard.finish(); - // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. - if !self.running() { - return; - } - // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); // Clean resources. self.manager_ctx.on_procedures_finish(&procedure_ids); + + // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. + if !self.running() { + return; + } + for id in procedure_ids { if let Err(e) = self.store.delete_procedure(id).await { logging::error!( @@ -201,7 +199,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return; } @@ -260,7 +258,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } @@ -299,19 +297,19 @@ impl Runner { e.is_retry_later(), ); - if e.is_retry_later() { - self.meta.set_state(ProcedureState::retrying(Arc::new(e))); - return ExecResult::RetryLater; - } - // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } + if e.is_retry_later() { + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return ExecResult::RetryLater; + } + // Write rollback key so we can skip this procedure while recovering procedures. self.rollback(Arc::new(e)).await } @@ -343,7 +341,6 @@ impl Runner { procedure.lock_key(), )); let runner = Runner { - running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -510,7 +507,6 @@ mod tests { store: Arc, ) -> Runner { Runner { - running: Arc::new(AtomicBool::new(true)), meta, procedure, manager_ctx: Arc::new(ManagerContext::new()), @@ -618,6 +614,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -678,6 +675,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -779,6 +777,7 @@ mod tests { let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone()); let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. @@ -821,6 +820,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -832,7 +832,7 @@ mod tests { ) .await; - runner.running.store(false, Ordering::Relaxed); + runner.manager_ctx.stop(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed()); // Shouldn't write any files @@ -861,7 +861,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); - runner.running.store(false, Ordering::Relaxed); + runner.manager_ctx.stop(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); @@ -885,6 +885,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); @@ -926,6 +927,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_retry_later(), "{res:?}"); @@ -963,6 +965,8 @@ mod tests { Box::new(exceed_max_retry_later), procedure_store, ); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() .with_min_delay(Duration::from_millis(1)) .with_max_times(3); @@ -1033,8 +1037,8 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); - let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index c20ddef2b203..54f34b7d7ccf 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -279,8 +279,14 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Registers loader for specific procedure type `name`. fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; - fn start(&self) -> Result<()>; + /// Starts the background GC task. + /// + /// Recovers unfinished procedures and reruns them. + /// + /// Callers should ensure all loaders are registered. + async fn start(&self) -> Result<()>; + /// Stops the background GC task. async fn stop(&self) -> Result<()>; /// Submits a procedure to execute. @@ -288,11 +294,6 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Returns a [Watcher] to watch the created procedure. async fn submit(&self, procedure: ProcedureWithId) -> Result; - /// Recovers unfinished procedures and reruns them. - /// - /// Callers should ensure all loaders are registered. - async fn recover(&self) -> Result<()>; - /// Query the procedure state. /// /// Returns `Ok(None)` if the procedure doesn't exist. diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 93e385bc7769..75cf777beece 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -71,7 +71,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.start().await.unwrap(); #[derive(Debug)] struct MockProcedure { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index f932ea84e08a..25605af6db19 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -405,12 +405,6 @@ pub enum Error { source: common_procedure::Error, }, - #[snafu(display("Failed to recover procedure"))] - RecoverProcedure { - location: Location, - source: common_procedure::Error, - }, - #[snafu(display("Failed to wait procedure done"))] WaitProcedure { location: Location, @@ -628,9 +622,9 @@ impl ErrorExt for Error { Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } | Error::InvalidFullTableName { source, .. } => source.status_code(), - Error::RecoverProcedure { source, .. } - | Error::SubmitProcedure { source, .. } - | Error::WaitProcedure { source, .. } => source.status_code(), + Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } => { + source.status_code() + } Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 843ea2d1efe6..5d49c8f86d1c 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -38,8 +38,7 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{ - InitMetadataSnafu, RecoverProcedureSnafu, Result, StartProcedureManagerSnafu, - StopProcedureManagerSnafu, + InitMetadataSnafu, Result, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; @@ -228,11 +227,8 @@ impl MetaSrv { ); match msg { LeaderChangeMessage::Elected(_) => { - if let Err(e) = procedure_manager.start() { - error!("Failed to start procedure manager, error: {e}"); - } - if let Err(e) = procedure_manager.recover().await { - error!("Failed to recover procedures, error: {e}"); + if let Err(e) = procedure_manager.start().await { + error!(e; "Failed to start procedure manager"); } let _ = task_handler.start().map_err(|e| { debug!( @@ -242,7 +238,7 @@ impl MetaSrv { } LeaderChangeMessage::StepDown(leader) => { if let Err(e) = procedure_manager.stop().await { - error!("Failed to stop procedure manager, error: {e}"); + error!(e; "Failed to stop procedure manager"); } if let Some(sub_manager) = subscribe_manager.clone() { info!("Leader changed, un_subscribe all"); @@ -289,11 +285,8 @@ impl MetaSrv { } else { self.procedure_manager .start() - .context(StartProcedureManagerSnafu)?; - self.procedure_manager - .recover() .await - .context(RecoverProcedureSnafu)?; + .context(StartProcedureManagerSnafu)?; } info!("MetaSrv started"); diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index fc6c5d9c4de5..b5317e04b76d 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -59,7 +59,6 @@ pub(crate) fn create_region_failover_manager() -> Arc { let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - procedure_manager.set_running(); let in_memory = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index cf41b9c6765f..1cb34d5a12f0 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -92,7 +92,7 @@ impl GreptimeDbStandaloneBuilder { .init() .await .unwrap(); - procedure_manager.start().unwrap(); + procedure_manager.start().await.unwrap(); let instance = Instance::try_new_standalone( kv_store, procedure_manager,