From 5c1ad228081de630a6c2a636972c2052f4b42328 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Oct 2023 09:35:57 +0000 Subject: [PATCH 1/6] feat: stop the procedure manager if a new leader is elected --- src/cmd/src/error.rs | 21 ++++ src/cmd/src/standalone.rs | 28 ++++- src/common/procedure/src/error.rs | 6 +- src/common/procedure/src/lib.rs | 2 + src/common/procedure/src/local.rs | 128 +++++++++++++++++++---- src/common/procedure/src/local/runner.rs | 102 +++++++++++++++++- src/common/procedure/src/watcher.rs | 1 + src/meta-srv/src/bootstrap.rs | 3 +- src/meta-srv/src/error.rs | 14 +++ src/meta-srv/src/metasrv.rs | 24 ++++- src/meta-srv/src/test_util.rs | 1 + tests-integration/src/standalone.rs | 2 +- 12 files changed, 300 insertions(+), 32 deletions(-) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ae8c363fcd92..e1b1599d5d9f 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -37,6 +37,24 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to start procedures"))] + RecoverProcedures { + location: Location, + source: common_procedure::error::Error, + }, + #[snafu(display("Failed to start datanode"))] StartDatanode { location: Location, @@ -241,6 +259,9 @@ impl ErrorExt for Error { | Error::CreateDir { .. } | Error::EmptyResult { .. } | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, + Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } + | Error::RecoverProcedures { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 86bafc546348..7675c570b832 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -42,8 +42,9 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, - ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, RecoverProceduresSnafu, Result, + ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -163,6 +164,7 @@ impl StandaloneOptions { pub struct Instance { datanode: Datanode, frontend: FeInstance, + procedure_manager: ProcedureManagerRef, } impl Instance { @@ -171,6 +173,15 @@ impl Instance { self.datanode.start().await.context(StartDatanodeSnafu)?; info!("Datanode instance started"); + self.procedure_manager + .start() + .context(StartProcedureManagerSnafu)?; + + self.procedure_manager + .recover() + .await + .context(RecoverProceduresSnafu)?; + self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -181,6 +192,11 @@ impl Instance { .await .context(ShutdownFrontendSnafu)?; + self.procedure_manager + .stop() + .await + .context(StopProcedureManagerSnafu)?; + self.datanode .shutdown() .await @@ -354,7 +370,7 @@ impl StartCommand { let mut frontend = build_frontend( fe_plugins, kv_store, - procedure_manager, + procedure_manager.clone(), catalog_manager, region_server, ) @@ -365,7 +381,11 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - Ok(Instance { datanode, frontend }) + Ok(Instance { + datanode, + frontend, + procedure_manager, + }) } } diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 1cd63decd282..cf8e0a6f78b2 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -34,6 +34,9 @@ pub enum Error { #[snafu(display("Loader {} is already registered", name))] LoaderConflict { name: String, location: Location }, + #[snafu(display("Procedure Manager is stopped"))] + ProcedureManagerStop { location: Location }, + #[snafu(display("Failed to serialize to json"))] ToJson { #[snafu(source)] @@ -148,7 +151,8 @@ impl ErrorExt for Error { | Error::FromJson { .. } | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } - | Error::WaitWatcher { .. } => StatusCode::Internal, + | Error::WaitWatcher { .. } + | Error::ProcedureManagerStop { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 7a542c415786..544d5d80367a 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -14,6 +14,8 @@ //! Common traits and structures for the procedure framework. +#![feature(assert_matches)] + pub mod error; pub mod local; pub mod options; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index e32d5f90e8af..65132834bd6e 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -16,6 +16,7 @@ mod lock; mod runner; use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -28,8 +29,8 @@ use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, - StopRemoveOutdatedMetaTaskSnafu, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerStopSnafu, Result, + StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; use crate::local::runner::Runner; @@ -368,29 +369,55 @@ pub struct LocalManager { procedure_store: Arc, max_retry_times: usize, retry_delay: Duration, - remove_outdated_meta_task: RepeatedTask, + state: Arc>, + running: Arc, + config: ManagerConfig, +} + +#[derive(Debug, Default)] +pub struct LocalManagerState { + remove_outdated_meta_task: Option>, } impl LocalManager { /// Create a new [LocalManager] with specific `config`. pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new()); - let remove_outdated_meta_task = RepeatedTask::new( - config.remove_outdated_meta_task_interval, - Box::new(RemoveOutdatedMetaFunction { - manager_ctx: manager_ctx.clone(), - ttl: config.remove_outdated_meta_ttl, - }), - ); + LocalManager { + running: Arc::new(AtomicBool::new(false)), + state: Arc::new(Mutex::new(LocalManagerState::default())), manager_ctx, procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)), max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, - remove_outdated_meta_task, + config, } } + #[cfg(test)] + fn start_task(&self) -> Result> { + let task = self.build_remove_outdated_meta_task(); + task.start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + Ok(task) + } + + pub fn set_running(&self) { + self.running.store(true, Ordering::Relaxed); + } + + /// Build remove outedated meta task + pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask { + RepeatedTask::new( + self.config.remove_outdated_meta_task_interval, + Box::new(RemoveOutdatedMetaFunction { + manager_ctx: self.manager_ctx.clone(), + ttl: self.config.remove_outdated_meta_ttl, + }), + ) + } + /// Submit a root procedure with given `procedure_id`. fn submit_root( &self, @@ -398,8 +425,14 @@ impl LocalManager { step: u32, procedure: BoxedProcedure, ) -> Result { + ensure!( + self.running.load(Ordering::Relaxed), + ProcedureManagerStopSnafu + ); + let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); let runner = Runner { + running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -440,17 +473,36 @@ impl ProcedureManager for LocalManager { } fn start(&self) -> Result<()> { - self.remove_outdated_meta_task - .start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; + if !self.running.load(Ordering::Relaxed) { + let mut state = self.state.lock().unwrap(); + + let task = state + .remove_outdated_meta_task + .get_or_insert(self.build_remove_outdated_meta_task()); + + task.start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + + self.running.store(true, Ordering::Relaxed); + logging::info!("LocalManager is started."); + } + Ok(()) } async fn stop(&self) -> Result<()> { - self.remove_outdated_meta_task - .stop() - .await - .context(StopRemoveOutdatedMetaTaskSnafu)?; + if self.running.load(Ordering::Relaxed) { + let remove_outdated_meta_task = + self.state.lock().unwrap().remove_outdated_meta_task.take(); + + if let Some(task) = remove_outdated_meta_task { + task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; + } + + self.running.store(false, Ordering::Relaxed); + logging::info!("LocalManager is stopped."); + } + Ok(()) } @@ -569,12 +621,14 @@ pub(crate) mod test_util { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use common_error::mock::MockError; use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; use super::*; - use crate::error::Error; + use crate::error::{self, Error}; use crate::store::state_store::ObjectStateStore; use crate::{Context, Procedure, Status}; @@ -691,6 +745,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.set_running(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -714,6 +769,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); + manager.set_running(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -762,6 +818,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.set_running(); let procedure_id = ProcedureId::random(); assert!(manager @@ -812,6 +869,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.set_running(); #[derive(Debug)] struct MockProcedure { @@ -863,6 +921,33 @@ mod tests { check_procedure(MockProcedure { panic: true }).await; } + #[tokio::test] + async fn test_procedure_manager_stopped() { + let dir = create_temp_dir("procedure_manager_stopped"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert_matches!( + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap_err(), + error::Error::ProcedureManagerStop { .. } + ); + } + #[tokio::test] async fn test_remove_outdated_meta_task() { let dir = create_temp_dir("remove_outdated_meta_task"); @@ -876,6 +961,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); + manager.set_running(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); @@ -889,7 +975,7 @@ mod tests { .is_ok()); let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); watcher.changed().await.unwrap(); - manager.start().unwrap(); + let task = manager.start_task().unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; assert!(manager .procedure_state(procedure_id) @@ -898,7 +984,7 @@ mod tests { .is_none()); // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. - manager.stop().await.unwrap(); + task.stop().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); let procedure_id = ProcedureId::random(); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 3e997562663a..a8c95fbb550f 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -19,7 +20,7 @@ use backon::{BackoffBuilder, ExponentialBuilder}; use common_telemetry::logging; use tokio::time; -use crate::error::{ProcedurePanicSnafu, Result}; +use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; use crate::ProcedureState::Retrying; @@ -104,6 +105,7 @@ impl Drop for ProcedureGuard { // TODO(yingwen): Support cancellation. pub(crate) struct Runner { + pub(crate) running: Arc, pub(crate) meta: ProcedureMetaRef, pub(crate) procedure: BoxedProcedure, pub(crate) manager_ctx: Arc, @@ -114,6 +116,11 @@ pub(crate) struct Runner { } impl Runner { + /// Return `ProcedureManager` is running. + pub(crate) fn running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + /// Run the procedure. pub(crate) async fn run(mut self) { // Ensure we can update the procedure state. @@ -147,6 +154,11 @@ impl Runner { // Release locks and notify parent procedure. guard.finish(); + // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. + if !self.running() { + return; + } + // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); @@ -186,6 +198,13 @@ impl Runner { let mut retry = self.exponential_builder.build(); let mut retry_times = 0; loop { + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + }); + return; + } match self.execute_once(ctx).await { ExecResult::Done | ExecResult::Failed => return, ExecResult::Continue => (), @@ -238,6 +257,14 @@ impl Runner { status.need_persist(), ); + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + }); + return ExecResult::Failed; + } + if status.need_persist() { if let Err(err) = self.persist_procedure().await { self.meta.set_state(ProcedureState::retrying(Arc::new(err))); @@ -277,6 +304,14 @@ impl Runner { return ExecResult::RetryLater; } + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + }); + return ExecResult::Failed; + } + // Write rollback key so we can skip this procedure while recovering procedures. self.rollback(Arc::new(e)).await } @@ -308,6 +343,7 @@ impl Runner { procedure.lock_key(), )); let runner = Runner { + running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -474,6 +510,7 @@ mod tests { store: Arc, ) -> Runner { Runner { + running: Arc::new(AtomicBool::new(true)), meta, procedure, manager_ctx: Arc::new(ManagerContext::new()), @@ -769,6 +806,69 @@ mod tests { } } + #[tokio::test] + async fn test_running_is_stopped() { + let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed(); + let normal = ProcedureAdapter { + data: "normal".to_string(), + lock_key: LockKey::single("catalog.schema.table"), + exec_fn, + }; + + let dir = create_temp_dir("test_running_is_stopped"); + let meta = normal.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_continue(), "{res:?}"); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step"], + ) + .await; + + runner.running.store(false, Ordering::Relaxed); + let res = runner.execute_once(&ctx).await; + assert!(res.is_failed()); + // Shouldn't write any files + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step"], + ) + .await; + } + + #[tokio::test] + async fn test_running_is_stopped_on_error() { + let exec_fn = + |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); + let normal = ProcedureAdapter { + data: "fail".to_string(), + lock_key: LockKey::single("catalog.schema.table"), + exec_fn, + }; + + let dir = create_temp_dir("test_running_is_stopped_on_error"); + let meta = normal.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.running.store(false, Ordering::Relaxed); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_failed(), "{res:?}"); + // Shouldn't write any files + check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await; + } + #[tokio::test] async fn test_execute_on_error() { let exec_fn = diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 576c3d420b3c..93e385bc7769 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -71,6 +71,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.set_running(); #[derive(Debug)] struct MockProcedure { diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b24aa46e4d28..6c71f92375a0 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -111,8 +111,7 @@ impl MetaSrvInstance { .await .context(error::SendShutdownSignalSnafu)?; } - - self.meta_srv.shutdown(); + self.meta_srv.shutdown().await?; self.http_srv .shutdown() .await diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 0835039d12bf..f932ea84e08a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -393,6 +393,18 @@ pub enum Error { #[snafu(display("Missing required parameter, param: {:?}", param))] MissingRequiredParameter { param: String }, + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { + location: Location, + source: common_procedure::Error, + }, + #[snafu(display("Failed to recover procedure"))] RecoverProcedure { location: Location, @@ -622,6 +634,8 @@ impl ErrorExt for Error { Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } + Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { source.status_code() diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 90ae491bc9cb..843ea2d1efe6 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -37,7 +37,10 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; -use crate::error::{InitMetadataSnafu, RecoverProcedureSnafu, Result}; +use crate::error::{ + InitMetadataSnafu, RecoverProcedureSnafu, Result, StartProcedureManagerSnafu, + StopProcedureManagerSnafu, +}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; @@ -225,6 +228,9 @@ impl MetaSrv { ); match msg { LeaderChangeMessage::Elected(_) => { + if let Err(e) = procedure_manager.start() { + error!("Failed to start procedure manager, error: {e}"); + } if let Err(e) = procedure_manager.recover().await { error!("Failed to recover procedures, error: {e}"); } @@ -235,6 +241,9 @@ impl MetaSrv { }); } LeaderChangeMessage::StepDown(leader) => { + if let Err(e) = procedure_manager.stop().await { + error!("Failed to stop procedure manager, error: {e}"); + } if let Some(sub_manager) = subscribe_manager.clone() { info!("Leader changed, un_subscribe all"); if let Err(e) = sub_manager.un_subscribe_all() { @@ -259,6 +268,10 @@ impl MetaSrv { } } } + + if let Err(e) = procedure_manager.stop().await { + error!("Failed to stop procedure manager, error: {e}"); + } }); let election = election.clone(); @@ -274,6 +287,9 @@ impl MetaSrv { info!("MetaSrv stopped"); }); } else { + self.procedure_manager + .start() + .context(StartProcedureManagerSnafu)?; self.procedure_manager .recover() .await @@ -291,8 +307,12 @@ impl MetaSrv { .context(InitMetadataSnafu) } - pub fn shutdown(&self) { + pub async fn shutdown(&self) -> Result<()> { self.started.store(false, Ordering::Relaxed); + self.procedure_manager + .stop() + .await + .context(StopProcedureManagerSnafu) } #[inline] diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index b5317e04b76d..fc6c5d9c4de5 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -59,6 +59,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + procedure_manager.set_running(); let in_memory = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f17dd303fb38..cf41b9c6765f 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -92,7 +92,7 @@ impl GreptimeDbStandaloneBuilder { .init() .await .unwrap(); - + procedure_manager.start().unwrap(); let instance = Instance::try_new_standalone( kv_store, procedure_manager, From 9ba05acaff320e1ada0feea0850829fb1f59569a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 13 Oct 2023 02:43:46 +0000 Subject: [PATCH 2/6] chore: apply suggestions from CR --- src/common/meta/src/key.rs | 1 - src/common/procedure/src/error.rs | 4 ++-- src/common/procedure/src/local.rs | 18 ++++++++++-------- src/common/procedure/src/local/runner.rs | 6 +++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 46252c729232..d1a980d7023e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -258,7 +258,6 @@ impl DeserializedValueWithBytes { self.bytes.to_vec() } - #[cfg(feature = "testing")] /// Notes: used for test purpose. pub fn from_inner(inner: T) -> Self { let bytes = serde_json::to_vec(&inner).unwrap(); diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index cf8e0a6f78b2..0e2478f20c3a 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -35,7 +35,7 @@ pub enum Error { LoaderConflict { name: String, location: Location }, #[snafu(display("Procedure Manager is stopped"))] - ProcedureManagerStop { location: Location }, + ProcedureManagerNotStart { location: Location }, #[snafu(display("Failed to serialize to json"))] ToJson { @@ -152,7 +152,7 @@ impl ErrorExt for Error { | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } | Error::WaitWatcher { .. } - | Error::ProcedureManagerStop { .. } => StatusCode::Internal, + | Error::ProcedureManagerNotStart { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 65132834bd6e..9486ae969c94 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -23,13 +23,13 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; -use common_telemetry::logging; +use common_telemetry::{info, logging}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerStopSnafu, Result, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; @@ -427,7 +427,7 @@ impl LocalManager { ) -> Result { ensure!( self.running.load(Ordering::Relaxed), - ProcedureManagerStopSnafu + ProcedureManagerNotStartSnafu ); let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); @@ -473,7 +473,8 @@ impl ProcedureManager for LocalManager { } fn start(&self) -> Result<()> { - if !self.running.load(Ordering::Relaxed) { + // The previous value should be false + if !self.running.swap(true, Ordering::Relaxed) { let mut state = self.state.lock().unwrap(); let task = state @@ -484,14 +485,15 @@ impl ProcedureManager for LocalManager { .context(StartRemoveOutdatedMetaTaskSnafu)?; self.running.store(true, Ordering::Relaxed); - logging::info!("LocalManager is started."); + info!("LocalManager is started."); } Ok(()) } async fn stop(&self) -> Result<()> { - if self.running.load(Ordering::Relaxed) { + // The previous value should be true + if self.running.swap(false, Ordering::Relaxed) { let remove_outdated_meta_task = self.state.lock().unwrap().remove_outdated_meta_task.take(); @@ -500,7 +502,7 @@ impl ProcedureManager for LocalManager { } self.running.store(false, Ordering::Relaxed); - logging::info!("LocalManager is stopped."); + info!("LocalManager is stopped."); } Ok(()) @@ -944,7 +946,7 @@ mod tests { }) .await .unwrap_err(), - error::Error::ProcedureManagerStop { .. } + error::Error::ProcedureManagerNotStart { .. } ); } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index a8c95fbb550f..70f08e5ac1b0 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -201,7 +201,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return; } @@ -260,7 +260,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } @@ -307,7 +307,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerStopSnafu {}.build()), + error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } From 6fc4078aec5b3d44c36d6a8fd8fe53098ab90b9c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 17 Oct 2023 04:12:16 +0000 Subject: [PATCH 3/6] chore: apply suggestions --- src/cmd/src/error.rs | 10 +- src/cmd/src/standalone.rs | 12 +- src/common/procedure/src/error.rs | 4 +- src/common/procedure/src/local.rs | 243 ++++++++++++++--------- src/common/procedure/src/local/runner.rs | 48 +++-- src/common/procedure/src/procedure.rs | 13 +- src/common/procedure/src/watcher.rs | 2 +- src/meta-srv/src/error.rs | 12 +- src/meta-srv/src/metasrv.rs | 19 +- src/meta-srv/src/test_util.rs | 1 - tests-integration/src/standalone.rs | 2 +- 11 files changed, 200 insertions(+), 166 deletions(-) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index e1b1599d5d9f..40cad2e44ec8 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -49,12 +49,6 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to start procedures"))] - RecoverProcedures { - location: Location, - source: common_procedure::error::Error, - }, - #[snafu(display("Failed to start datanode"))] StartDatanode { location: Location, @@ -260,9 +254,7 @@ impl ErrorExt for Error { | Error::EmptyResult { .. } | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } - | Error::StopProcedureManager { source, .. } - | Error::RecoverProcedures { source, .. } => source.status_code(), - + | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 7675c570b832..6b313ffa6ec1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -42,9 +42,9 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, RecoverProceduresSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StopProcedureManagerSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, + ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + StopProcedureManagerSnafu, }; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -175,12 +175,8 @@ impl Instance { self.procedure_manager .start() - .context(StartProcedureManagerSnafu)?; - - self.procedure_manager - .recover() .await - .context(RecoverProceduresSnafu)?; + .context(StartProcedureManagerSnafu)?; self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 0e2478f20c3a..25a974df86c6 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -35,7 +35,7 @@ pub enum Error { LoaderConflict { name: String, location: Location }, #[snafu(display("Procedure Manager is stopped"))] - ProcedureManagerNotStart { location: Location }, + ManagerNotStart { location: Location }, #[snafu(display("Failed to serialize to json"))] ToJson { @@ -152,7 +152,7 @@ impl ErrorExt for Error { | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } | Error::WaitWatcher { .. } - | Error::ProcedureManagerNotStart { .. } => StatusCode::Internal, + | Error::ManagerNotStart { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9486ae969c94..1986d64a94dc 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -26,10 +26,10 @@ use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{info, logging}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; -use tokio::sync::Notify; +use tokio::sync::{Mutex as TokioMutex, Notify}; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerNotStartSnafu, Result, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; @@ -136,6 +136,8 @@ pub(crate) struct ManagerContext { messages: Mutex>, /// Ids and finished time of finished procedures. finished_procedures: Mutex>, + /// Running flag. + running: Arc, } #[async_trait] @@ -154,9 +156,29 @@ impl ManagerContext { procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), finished_procedures: Mutex::new(VecDeque::new()), + running: Arc::new(AtomicBool::new(false)), } } + #[cfg(test)] + pub(crate) fn set_running(&self) { + self.running.store(true, Ordering::Relaxed); + } + + /// Set the running flag. + pub(crate) fn start(&self) { + self.running.store(true, Ordering::Relaxed); + } + + pub(crate) fn stop(&self) { + self.running.store(false, Ordering::Relaxed); + } + + /// Return `ProcedureManager` is running. + pub(crate) fn running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + /// Returns true if the procedure with specific `procedure_id` exists. fn contains_procedure(&self, procedure_id: ProcedureId) -> bool { let procedures = self.procedures.read().unwrap(); @@ -369,44 +391,26 @@ pub struct LocalManager { procedure_store: Arc, max_retry_times: usize, retry_delay: Duration, - state: Arc>, - running: Arc, + /// GC task. + remove_outdated_meta_task: TokioMutex>>, config: ManagerConfig, } -#[derive(Debug, Default)] -pub struct LocalManagerState { - remove_outdated_meta_task: Option>, -} - impl LocalManager { /// Create a new [LocalManager] with specific `config`. pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new()); LocalManager { - running: Arc::new(AtomicBool::new(false)), - state: Arc::new(Mutex::new(LocalManagerState::default())), manager_ctx, procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)), max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, + remove_outdated_meta_task: TokioMutex::new(None), config, } } - #[cfg(test)] - fn start_task(&self) -> Result> { - let task = self.build_remove_outdated_meta_task(); - task.start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; - Ok(task) - } - - pub fn set_running(&self) { - self.running.store(true, Ordering::Relaxed); - } - /// Build remove outedated meta task pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask { RepeatedTask::new( @@ -425,14 +429,10 @@ impl LocalManager { step: u32, procedure: BoxedProcedure, ) -> Result { - ensure!( - self.running.load(Ordering::Relaxed), - ProcedureManagerNotStartSnafu - ); + ensure!(self.manager_ctx.running(), ManagerNotStartSnafu); let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); let runner = Runner { - running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -459,65 +459,8 @@ impl LocalManager { Ok(watcher) } -} - -#[async_trait] -impl ProcedureManager for LocalManager { - fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { - let mut loaders = self.manager_ctx.loaders.lock().unwrap(); - ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); - - let _ = loaders.insert(name.to_string(), loader); - - Ok(()) - } - - fn start(&self) -> Result<()> { - // The previous value should be false - if !self.running.swap(true, Ordering::Relaxed) { - let mut state = self.state.lock().unwrap(); - - let task = state - .remove_outdated_meta_task - .get_or_insert(self.build_remove_outdated_meta_task()); - - task.start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; - - self.running.store(true, Ordering::Relaxed); - info!("LocalManager is started."); - } - - Ok(()) - } - - async fn stop(&self) -> Result<()> { - // The previous value should be true - if self.running.swap(false, Ordering::Relaxed) { - let remove_outdated_meta_task = - self.state.lock().unwrap().remove_outdated_meta_task.take(); - - if let Some(task) = remove_outdated_meta_task { - task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; - } - - self.running.store(false, Ordering::Relaxed); - info!("LocalManager is stopped."); - } - - Ok(()) - } - - async fn submit(&self, procedure: ProcedureWithId) -> Result { - let procedure_id = procedure.id; - ensure!( - !self.manager_ctx.contains_procedure(procedure_id), - DuplicateProcedureSnafu { procedure_id } - ); - - self.submit_root(procedure.id, 0, procedure.procedure) - } + /// Recovers unfinished procedures and reruns them. async fn recover(&self) -> Result<()> { logging::info!("LocalManager start to recover"); let recover_start = Instant::now(); @@ -573,6 +516,60 @@ impl ProcedureManager for LocalManager { Ok(()) } +} + +#[async_trait] +impl ProcedureManager for LocalManager { + fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { + let mut loaders = self.manager_ctx.loaders.lock().unwrap(); + ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); + + let _ = loaders.insert(name.to_string(), loader); + + Ok(()) + } + + async fn start(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + let task_inner = self.build_remove_outdated_meta_task(); + + task_inner + .start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + + *task = Some(task_inner); + + self.manager_ctx.start(); + + info!("LocalManager is start."); + + self.recover().await + } + + async fn stop(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + if let Some(task) = task.take() { + task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; + } + + self.manager_ctx.stop(); + + info!("LocalManager is stopped."); + + Ok(()) + } + + async fn submit(&self, procedure: ProcedureWithId) -> Result { + let procedure_id = procedure.id; + ensure!( + !self.manager_ctx.contains_procedure(procedure_id), + DuplicateProcedureSnafu { procedure_id } + ); + + self.submit_root(procedure.id, 0, procedure.procedure) + } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.manager_ctx.state(procedure_id)) @@ -747,7 +744,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -771,7 +768,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -820,7 +817,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); assert!(manager @@ -871,7 +868,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.start(); #[derive(Debug)] struct MockProcedure { @@ -946,10 +943,43 @@ mod tests { }) .await .unwrap_err(), - error::Error::ProcedureManagerNotStart { .. } + error::Error::ManagerNotStart { .. } ); } + #[tokio::test] + async fn test_procedure_manager_restart() { + let dir = create_temp_dir("procedure_manager_restart"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + manager.start().await.unwrap(); + manager.stop().await.unwrap(); + manager.start().await.unwrap(); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_some()); + } + #[tokio::test] async fn test_remove_outdated_meta_task() { let dir = create_temp_dir("remove_outdated_meta_task"); @@ -963,7 +993,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); @@ -977,7 +1007,8 @@ mod tests { .is_ok()); let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); watcher.changed().await.unwrap(); - let task = manager.start_task().unwrap(); + + manager.start().await.unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; assert!(manager .procedure_state(procedure_id) @@ -986,10 +1017,12 @@ mod tests { .is_none()); // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. - task.stop().await.unwrap(); + manager.stop().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); let procedure_id = ProcedureId::random(); + + manager.manager_ctx.set_running(); assert!(manager .submit(ProcedureWithId { id: procedure_id, @@ -1005,5 +1038,27 @@ mod tests { .await .unwrap() .is_some()); + + // After restart + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + + manager.start().await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_none()); } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 70f08e5ac1b0..0b50f4497f03 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -103,9 +102,7 @@ impl Drop for ProcedureGuard { } } -// TODO(yingwen): Support cancellation. pub(crate) struct Runner { - pub(crate) running: Arc, pub(crate) meta: ProcedureMetaRef, pub(crate) procedure: BoxedProcedure, pub(crate) manager_ctx: Arc, @@ -118,7 +115,7 @@ pub(crate) struct Runner { impl Runner { /// Return `ProcedureManager` is running. pub(crate) fn running(&self) -> bool { - self.running.load(Ordering::Relaxed) + self.manager_ctx.running() } /// Run the procedure. @@ -154,16 +151,17 @@ impl Runner { // Release locks and notify parent procedure. guard.finish(); - // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. - if !self.running() { - return; - } - // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); // Clean resources. self.manager_ctx.on_procedures_finish(&procedure_ids); + + // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. + if !self.running() { + return; + } + for id in procedure_ids { if let Err(e) = self.store.delete_procedure(id).await { logging::error!( @@ -201,7 +199,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return; } @@ -260,7 +258,7 @@ impl Runner { // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } @@ -299,19 +297,19 @@ impl Runner { e.is_retry_later(), ); - if e.is_retry_later() { - self.meta.set_state(ProcedureState::retrying(Arc::new(e))); - return ExecResult::RetryLater; - } - // Don't store state if `ProcedureManager` is stopped. if !self.running() { self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ProcedureManagerNotStartSnafu {}.build()), + error: Arc::new(error::ManagerNotStartSnafu {}.build()), }); return ExecResult::Failed; } + if e.is_retry_later() { + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return ExecResult::RetryLater; + } + // Write rollback key so we can skip this procedure while recovering procedures. self.rollback(Arc::new(e)).await } @@ -343,7 +341,6 @@ impl Runner { procedure.lock_key(), )); let runner = Runner { - running: self.running.clone(), meta: meta.clone(), procedure, manager_ctx: self.manager_ctx.clone(), @@ -510,7 +507,6 @@ mod tests { store: Arc, ) -> Runner { Runner { - running: Arc::new(AtomicBool::new(true)), meta, procedure, manager_ctx: Arc::new(ManagerContext::new()), @@ -618,6 +614,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -678,6 +675,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -779,6 +777,7 @@ mod tests { let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone()); let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. @@ -821,6 +820,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -832,7 +832,7 @@ mod tests { ) .await; - runner.running.store(false, Ordering::Relaxed); + runner.manager_ctx.stop(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed()); // Shouldn't write any files @@ -861,7 +861,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); - runner.running.store(false, Ordering::Relaxed); + runner.manager_ctx.stop(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); @@ -885,6 +885,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); @@ -926,6 +927,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_retry_later(), "{res:?}"); @@ -963,6 +965,8 @@ mod tests { Box::new(exceed_max_retry_later), procedure_store, ); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() .with_min_delay(Duration::from_millis(1)) .with_max_times(3); @@ -1033,8 +1037,8 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); - let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index c20ddef2b203..54f34b7d7ccf 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -279,8 +279,14 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Registers loader for specific procedure type `name`. fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; - fn start(&self) -> Result<()>; + /// Starts the background GC task. + /// + /// Recovers unfinished procedures and reruns them. + /// + /// Callers should ensure all loaders are registered. + async fn start(&self) -> Result<()>; + /// Stops the background GC task. async fn stop(&self) -> Result<()>; /// Submits a procedure to execute. @@ -288,11 +294,6 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Returns a [Watcher] to watch the created procedure. async fn submit(&self, procedure: ProcedureWithId) -> Result; - /// Recovers unfinished procedures and reruns them. - /// - /// Callers should ensure all loaders are registered. - async fn recover(&self) -> Result<()>; - /// Query the procedure state. /// /// Returns `Ok(None)` if the procedure doesn't exist. diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 93e385bc7769..75cf777beece 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -71,7 +71,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); - manager.set_running(); + manager.start().await.unwrap(); #[derive(Debug)] struct MockProcedure { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index f932ea84e08a..25605af6db19 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -405,12 +405,6 @@ pub enum Error { source: common_procedure::Error, }, - #[snafu(display("Failed to recover procedure"))] - RecoverProcedure { - location: Location, - source: common_procedure::Error, - }, - #[snafu(display("Failed to wait procedure done"))] WaitProcedure { location: Location, @@ -628,9 +622,9 @@ impl ErrorExt for Error { Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } | Error::InvalidFullTableName { source, .. } => source.status_code(), - Error::RecoverProcedure { source, .. } - | Error::SubmitProcedure { source, .. } - | Error::WaitProcedure { source, .. } => source.status_code(), + Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } => { + source.status_code() + } Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 843ea2d1efe6..037925d2c6f9 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -38,8 +38,7 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{ - InitMetadataSnafu, RecoverProcedureSnafu, Result, StartProcedureManagerSnafu, - StopProcedureManagerSnafu, + InitMetadataSnafu, Result, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; @@ -228,11 +227,8 @@ impl MetaSrv { ); match msg { LeaderChangeMessage::Elected(_) => { - if let Err(e) = procedure_manager.start() { - error!("Failed to start procedure manager, error: {e}"); - } - if let Err(e) = procedure_manager.recover().await { - error!("Failed to recover procedures, error: {e}"); + if let Err(e) = procedure_manager.start().await { + error!(e; "Failed to start procedure manager"); } let _ = task_handler.start().map_err(|e| { debug!( @@ -242,7 +238,7 @@ impl MetaSrv { } LeaderChangeMessage::StepDown(leader) => { if let Err(e) = procedure_manager.stop().await { - error!("Failed to stop procedure manager, error: {e}"); + error!(e; "Failed to stop procedure manager"); } if let Some(sub_manager) = subscribe_manager.clone() { info!("Leader changed, un_subscribe all"); @@ -270,7 +266,7 @@ impl MetaSrv { } if let Err(e) = procedure_manager.stop().await { - error!("Failed to stop procedure manager, error: {e}"); + error!(e; "Failed to stop procedure manager"); } }); @@ -289,11 +285,8 @@ impl MetaSrv { } else { self.procedure_manager .start() - .context(StartProcedureManagerSnafu)?; - self.procedure_manager - .recover() .await - .context(RecoverProcedureSnafu)?; + .context(StartProcedureManagerSnafu)?; } info!("MetaSrv started"); diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index fc6c5d9c4de5..b5317e04b76d 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -59,7 +59,6 @@ pub(crate) fn create_region_failover_manager() -> Arc { let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - procedure_manager.set_running(); let in_memory = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index cf41b9c6765f..1cb34d5a12f0 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -92,7 +92,7 @@ impl GreptimeDbStandaloneBuilder { .init() .await .unwrap(); - procedure_manager.start().unwrap(); + procedure_manager.start().await.unwrap(); let instance = Instance::try_new_standalone( kv_store, procedure_manager, From 1e52d5029f3e31a5b89141bb93171d30f681bc1e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 17 Oct 2023 07:24:16 +0000 Subject: [PATCH 4/6] chore: apply suggestions from CR --- src/common/procedure/src/local.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 1986d64a94dc..fda20cefde80 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -532,6 +532,10 @@ impl ProcedureManager for LocalManager { async fn start(&self) -> Result<()> { let mut task = self.remove_outdated_meta_task.lock().await; + if task.is_some() { + return Ok(()); + } + let task_inner = self.build_remove_outdated_meta_task(); task_inner From c840b8e7e554d82ddabbe648c32fe73800facbe4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 17 Oct 2023 09:28:53 +0000 Subject: [PATCH 5/6] feat: add should_report to GreptimeDBTelemetry Signed-off-by: WenyXu --- src/common/greptimedb-telemetry/src/lib.rs | 52 +++++++++++++++++----- src/datanode/src/greptimedb_telemetry.rs | 5 +++ src/meta-srv/src/greptimedb_telemetry.rs | 6 ++- 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 57d680e40411..817d437d4d8d 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -15,6 +15,8 @@ use std::env; use std::io::ErrorKind; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use common_runtime::error::{Error, Result}; @@ -36,13 +38,26 @@ const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_sec const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); pub enum GreptimeDBTelemetryTask { - Enable(RepeatedTask), + Enable((RepeatedTask, Arc)), Disable, } impl GreptimeDBTelemetryTask { - pub fn enable(interval: Duration, task_fn: BoxedTaskFunction) -> Self { - GreptimeDBTelemetryTask::Enable(RepeatedTask::new(interval, task_fn)) + pub fn should_report(&self, value: bool) { + match self { + GreptimeDBTelemetryTask::Enable((_, should_report)) => { + should_report.store(value, Ordering::Relaxed); + } + GreptimeDBTelemetryTask::Disable => {} + } + } + + pub fn enable( + interval: Duration, + task_fn: BoxedTaskFunction, + should_report: Arc, + ) -> Self { + GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report)) } pub fn disable() -> Self { @@ -51,7 +66,7 @@ impl GreptimeDBTelemetryTask { pub fn start(&self) -> Result<()> { match self { - GreptimeDBTelemetryTask::Enable(task) => { + GreptimeDBTelemetryTask::Enable((task, _)) => { print_anonymous_usage_data_disclaimer(); task.start(common_runtime::bg_runtime()) } @@ -61,7 +76,7 @@ impl GreptimeDBTelemetryTask { pub async fn stop(&self) -> Result<()> { match self { - GreptimeDBTelemetryTask::Enable(task) => task.stop().await, + GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await, GreptimeDBTelemetryTask::Disable => Ok(()), } } @@ -191,6 +206,7 @@ pub struct GreptimeDBTelemetry { client: Option, working_home: Option, telemetry_url: &'static str, + should_report: Arc, } #[async_trait::async_trait] @@ -200,13 +216,19 @@ impl TaskFunction for GreptimeDBTelemetry { } async fn call(&mut self) -> Result<()> { - self.report_telemetry_info().await; + if self.should_report.load(Ordering::Relaxed) { + self.report_telemetry_info().await; + } Ok(()) } } impl GreptimeDBTelemetry { - pub fn new(working_home: Option, statistics: Box) -> Self { + pub fn new( + working_home: Option, + statistics: Box, + should_report: Arc, + ) -> Self { let client = Client::builder() .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT) .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT) @@ -216,6 +238,7 @@ impl GreptimeDBTelemetry { statistics, client: client.ok(), telemetry_url: TELEMETRY_URL, + should_report, } } @@ -250,7 +273,8 @@ impl GreptimeDBTelemetry { mod tests { use std::convert::Infallible; use std::env; - use std::sync::atomic::AtomicUsize; + use std::sync::atomic::{AtomicBool, AtomicUsize}; + use std::sync::Arc; use std::time::Duration; use common_test_util::ports; @@ -370,7 +394,11 @@ mod tests { let working_home = working_home_temp.path().to_str().unwrap().to_string(); let test_statistic = Box::new(TestStatistic); - let mut test_report = GreptimeDBTelemetry::new(Some(working_home.clone()), test_statistic); + let mut test_report = GreptimeDBTelemetry::new( + Some(working_home.clone()), + test_statistic, + Arc::new(AtomicBool::new(true)), + ); let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str()); test_report.telemetry_url = url; let response = test_report.report_telemetry_info().await.unwrap(); @@ -384,7 +412,11 @@ mod tests { assert_eq!(1, body.nodes.unwrap()); let failed_statistic = Box::new(FailedStatistic); - let mut failed_report = GreptimeDBTelemetry::new(Some(working_home), failed_statistic); + let mut failed_report = GreptimeDBTelemetry::new( + Some(working_home), + failed_statistic, + Arc::new(AtomicBool::new(true)), + ); failed_report.telemetry_url = url; let response = failed_report.report_telemetry_info().await; assert!(response.is_none()); diff --git a/src/datanode/src/greptimedb_telemetry.rs b/src/datanode/src/greptimedb_telemetry.rs index a8043ea9a2d6..c6991f1edeba 100644 --- a/src/datanode/src/greptimedb_telemetry.rs +++ b/src/datanode/src/greptimedb_telemetry.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; @@ -60,6 +61,8 @@ pub async fn get_greptimedb_telemetry_task( if !enable || cfg!(test) || cfg!(debug_assertions) { return Arc::new(GreptimeDBTelemetryTask::disable()); } + // Always enable. + let should_report = Arc::new(AtomicBool::new(true)); match mode { Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable( @@ -70,7 +73,9 @@ pub async fn get_greptimedb_telemetry_task( uuid: default_get_uuid(&working_home), retry: 0, }), + should_report.clone(), )), + should_report, )), Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()), } diff --git a/src/meta-srv/src/greptimedb_telemetry.rs b/src/meta-srv/src/greptimedb_telemetry.rs index 043a8de4f9d5..9cd9c2f672f0 100644 --- a/src/meta-srv/src/greptimedb_telemetry.rs +++ b/src/meta-srv/src/greptimedb_telemetry.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; @@ -63,7 +64,8 @@ pub async fn get_greptimedb_telemetry_task( if !enable || cfg!(test) || cfg!(debug_assertions) { return Arc::new(GreptimeDBTelemetryTask::disable()); } - + // Controlled by meta server state, only leader reports the info. + let should_report = Arc::new(AtomicBool::new(false)); Arc::new(GreptimeDBTelemetryTask::enable( TELEMETRY_INTERVAL, Box::new(GreptimeDBTelemetry::new( @@ -73,6 +75,8 @@ pub async fn get_greptimedb_telemetry_task( uuid: default_get_uuid(&working_home), retry: 0, }), + should_report.clone(), )), + should_report, )) } From b6f3434f8208f623e871c0a665b1d2659ef0d5ea Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 17 Oct 2023 10:05:42 +0000 Subject: [PATCH 6/6] refactor: refactor subscribing leader change loop --- src/meta-srv/src/error.rs | 7 ++++ src/meta-srv/src/metasrv.rs | 75 +++++++++++++++++++++++-------------- 2 files changed, 54 insertions(+), 28 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 25605af6db19..0a5334501a09 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -41,6 +41,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to start telemetry task"))] + StartTelemetryTask { + location: Location, + source: common_runtime::error::Error, + }, + #[snafu(display("Failed to submit ddl task"))] SubmitDdlTask { location: Location, @@ -634,6 +640,7 @@ impl ErrorExt for Error { Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { source.status_code() } + Error::StartTelemetryTask { source, .. } => source.status_code(), Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, Error::NextSequence { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 037925d2c6f9..e67a6c4e9232 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -28,7 +28,7 @@ use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use snafu::ResultExt; @@ -38,7 +38,8 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{ - InitMetadataSnafu, Result, StartProcedureManagerSnafu, StopProcedureManagerSnafu, + InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu, + StopProcedureManagerSnafu, }; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; @@ -171,6 +172,37 @@ pub struct SelectorContext { pub type SelectorRef = Arc>>; pub type ElectionRef = Arc>; +pub struct MetaStateHandler { + procedure_manager: ProcedureManagerRef, + subscribe_manager: Option, + greptimedb_telemetry_task: Arc, +} + +impl MetaStateHandler { + pub async fn on_become_leader(&self) { + if let Err(e) = self.procedure_manager.start().await { + error!(e; "Failed to start procedure manager"); + } + self.greptimedb_telemetry_task.should_report(true); + } + + pub async fn on_become_follower(&self) { + // Stops the procedures. + if let Err(e) = self.procedure_manager.stop().await { + error!(e; "Failed to stop procedure manager"); + } + // Suspends reporting. + self.greptimedb_telemetry_task.should_report(false); + + if let Some(sub_manager) = self.subscribe_manager.clone() { + info!("Leader changed, un_subscribe all"); + if let Err(e) = sub_manager.un_subscribe_all() { + error!("Failed to un_subscribe all, error: {}", e); + } + } + } +} + #[derive(Clone)] pub struct MetaSrv { started: Arc, @@ -214,7 +246,15 @@ impl MetaSrv { let leader_cached_kv_store = self.leader_cached_kv_store.clone(); let subscribe_manager = self.subscribe_manager(); let mut rx = election.subscribe_leader_change(); - let task_handler = self.greptimedb_telemetry_task.clone(); + let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone(); + greptimedb_telemetry_task + .start() + .context(StartTelemetryTaskSnafu)?; + let state_handler = MetaStateHandler { + greptimedb_telemetry_task, + subscribe_manager, + procedure_manager, + }; let _handle = common_runtime::spawn_bg(async move { loop { match rx.recv().await { @@ -227,31 +267,12 @@ impl MetaSrv { ); match msg { LeaderChangeMessage::Elected(_) => { - if let Err(e) = procedure_manager.start().await { - error!(e; "Failed to start procedure manager"); - } - let _ = task_handler.start().map_err(|e| { - debug!( - "Failed to start greptimedb telemetry task, error: {e}" - ); - }); + state_handler.on_become_leader().await; } LeaderChangeMessage::StepDown(leader) => { - if let Err(e) = procedure_manager.stop().await { - error!(e; "Failed to stop procedure manager"); - } - if let Some(sub_manager) = subscribe_manager.clone() { - info!("Leader changed, un_subscribe all"); - if let Err(e) = sub_manager.un_subscribe_all() { - error!("Failed to un_subscribe all, error: {}", e); - } - } error!("Leader :{:?} step down", leader); - let _ = task_handler.stop().await.map_err(|e| { - debug!( - "Failed to stop greptimedb telemetry task, error: {e}" - ); - }); + + state_handler.on_become_follower().await; } } } @@ -265,9 +286,7 @@ impl MetaSrv { } } - if let Err(e) = procedure_manager.stop().await { - error!(e; "Failed to stop procedure manager"); - } + state_handler.on_become_follower().await; }); let election = election.clone();