diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ae8c363fcd92..40cad2e44ec8 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -37,6 +37,18 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { + location: Location, + source: common_procedure::error::Error, + }, + #[snafu(display("Failed to start datanode"))] StartDatanode { location: Location, @@ -241,7 +253,8 @@ impl ErrorExt for Error { | Error::CreateDir { .. } | Error::EmptyResult { .. } | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, - + Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 86bafc546348..6b313ffa6ec1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -43,7 +43,8 @@ use snafu::ResultExt; use crate::error::{ CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, - ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + StopProcedureManagerSnafu, }; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -163,6 +164,7 @@ impl StandaloneOptions { pub struct Instance { datanode: Datanode, frontend: FeInstance, + procedure_manager: ProcedureManagerRef, } impl Instance { @@ -171,6 +173,11 @@ impl Instance { self.datanode.start().await.context(StartDatanodeSnafu)?; info!("Datanode instance started"); + self.procedure_manager + .start() + .await + .context(StartProcedureManagerSnafu)?; + self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -181,6 +188,11 @@ impl Instance { .await .context(ShutdownFrontendSnafu)?; + self.procedure_manager + .stop() + .await + .context(StopProcedureManagerSnafu)?; + self.datanode .shutdown() .await @@ -354,7 +366,7 @@ impl StartCommand { let mut frontend = build_frontend( fe_plugins, kv_store, - procedure_manager, + procedure_manager.clone(), catalog_manager, region_server, ) @@ -365,7 +377,11 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - Ok(Instance { datanode, frontend }) + Ok(Instance { + datanode, + frontend, + procedure_manager, + }) } } diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 57d680e40411..817d437d4d8d 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -15,6 +15,8 @@ use std::env; use std::io::ErrorKind; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use common_runtime::error::{Error, Result}; @@ -36,13 +38,26 @@ const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_sec const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); pub enum GreptimeDBTelemetryTask { - Enable(RepeatedTask), + Enable((RepeatedTask, Arc)), Disable, } impl GreptimeDBTelemetryTask { - pub fn enable(interval: Duration, task_fn: BoxedTaskFunction) -> Self { - GreptimeDBTelemetryTask::Enable(RepeatedTask::new(interval, task_fn)) + pub fn should_report(&self, value: bool) { + match self { + GreptimeDBTelemetryTask::Enable((_, should_report)) => { + should_report.store(value, Ordering::Relaxed); + } + GreptimeDBTelemetryTask::Disable => {} + } + } + + pub fn enable( + interval: Duration, + task_fn: BoxedTaskFunction, + should_report: Arc, + ) -> Self { + GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report)) } pub fn disable() -> Self { @@ -51,7 +66,7 @@ impl GreptimeDBTelemetryTask { pub fn start(&self) -> Result<()> { match self { - GreptimeDBTelemetryTask::Enable(task) => { + GreptimeDBTelemetryTask::Enable((task, _)) => { print_anonymous_usage_data_disclaimer(); task.start(common_runtime::bg_runtime()) } @@ -61,7 +76,7 @@ impl GreptimeDBTelemetryTask { pub async fn stop(&self) -> Result<()> { match self { - GreptimeDBTelemetryTask::Enable(task) => task.stop().await, + GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await, GreptimeDBTelemetryTask::Disable => Ok(()), } } @@ -191,6 +206,7 @@ pub struct GreptimeDBTelemetry { client: Option, working_home: Option, telemetry_url: &'static str, + should_report: Arc, } #[async_trait::async_trait] @@ -200,13 +216,19 @@ impl TaskFunction for GreptimeDBTelemetry { } async fn call(&mut self) -> Result<()> { - self.report_telemetry_info().await; + if self.should_report.load(Ordering::Relaxed) { + self.report_telemetry_info().await; + } Ok(()) } } impl GreptimeDBTelemetry { - pub fn new(working_home: Option, statistics: Box) -> Self { + pub fn new( + working_home: Option, + statistics: Box, + should_report: Arc, + ) -> Self { let client = Client::builder() .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT) .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT) @@ -216,6 +238,7 @@ impl GreptimeDBTelemetry { statistics, client: client.ok(), telemetry_url: TELEMETRY_URL, + should_report, } } @@ -250,7 +273,8 @@ impl GreptimeDBTelemetry { mod tests { use std::convert::Infallible; use std::env; - use std::sync::atomic::AtomicUsize; + use std::sync::atomic::{AtomicBool, AtomicUsize}; + use std::sync::Arc; use std::time::Duration; use common_test_util::ports; @@ -370,7 +394,11 @@ mod tests { let working_home = working_home_temp.path().to_str().unwrap().to_string(); let test_statistic = Box::new(TestStatistic); - let mut test_report = GreptimeDBTelemetry::new(Some(working_home.clone()), test_statistic); + let mut test_report = GreptimeDBTelemetry::new( + Some(working_home.clone()), + test_statistic, + Arc::new(AtomicBool::new(true)), + ); let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str()); test_report.telemetry_url = url; let response = test_report.report_telemetry_info().await.unwrap(); @@ -384,7 +412,11 @@ mod tests { assert_eq!(1, body.nodes.unwrap()); let failed_statistic = Box::new(FailedStatistic); - let mut failed_report = GreptimeDBTelemetry::new(Some(working_home), failed_statistic); + let mut failed_report = GreptimeDBTelemetry::new( + Some(working_home), + failed_statistic, + Arc::new(AtomicBool::new(true)), + ); failed_report.telemetry_url = url; let response = failed_report.report_telemetry_info().await; assert!(response.is_none()); diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 46252c729232..d1a980d7023e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -258,7 +258,6 @@ impl DeserializedValueWithBytes { self.bytes.to_vec() } - #[cfg(feature = "testing")] /// Notes: used for test purpose. pub fn from_inner(inner: T) -> Self { let bytes = serde_json::to_vec(&inner).unwrap(); diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 1cd63decd282..25a974df86c6 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -34,6 +34,9 @@ pub enum Error { #[snafu(display("Loader {} is already registered", name))] LoaderConflict { name: String, location: Location }, + #[snafu(display("Procedure Manager is stopped"))] + ManagerNotStart { location: Location }, + #[snafu(display("Failed to serialize to json"))] ToJson { #[snafu(source)] @@ -148,7 +151,8 @@ impl ErrorExt for Error { | Error::FromJson { .. } | Error::RetryTimesExceeded { .. } | Error::RetryLater { .. } - | Error::WaitWatcher { .. } => StatusCode::Internal, + | Error::WaitWatcher { .. } + | Error::ManagerNotStart { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 7a542c415786..544d5d80367a 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -14,6 +14,8 @@ //! Common traits and structures for the procedure framework. +#![feature(assert_matches)] + pub mod error; pub mod local; pub mod options; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index e32d5f90e8af..fda20cefde80 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -16,20 +16,21 @@ mod lock; mod runner; use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; -use common_telemetry::logging; +use common_telemetry::{info, logging}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; -use tokio::sync::Notify; +use tokio::sync::{Mutex as TokioMutex, Notify}; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, - StopRemoveOutdatedMetaTaskSnafu, + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, + StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::lock::LockMap; use crate::local::runner::Runner; @@ -135,6 +136,8 @@ pub(crate) struct ManagerContext { messages: Mutex>, /// Ids and finished time of finished procedures. finished_procedures: Mutex>, + /// Running flag. + running: Arc, } #[async_trait] @@ -153,9 +156,29 @@ impl ManagerContext { procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), finished_procedures: Mutex::new(VecDeque::new()), + running: Arc::new(AtomicBool::new(false)), } } + #[cfg(test)] + pub(crate) fn set_running(&self) { + self.running.store(true, Ordering::Relaxed); + } + + /// Set the running flag. + pub(crate) fn start(&self) { + self.running.store(true, Ordering::Relaxed); + } + + pub(crate) fn stop(&self) { + self.running.store(false, Ordering::Relaxed); + } + + /// Return `ProcedureManager` is running. + pub(crate) fn running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + /// Returns true if the procedure with specific `procedure_id` exists. fn contains_procedure(&self, procedure_id: ProcedureId) -> bool { let procedures = self.procedures.read().unwrap(); @@ -368,29 +391,37 @@ pub struct LocalManager { procedure_store: Arc, max_retry_times: usize, retry_delay: Duration, - remove_outdated_meta_task: RepeatedTask, + /// GC task. + remove_outdated_meta_task: TokioMutex>>, + config: ManagerConfig, } impl LocalManager { /// Create a new [LocalManager] with specific `config`. pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new()); - let remove_outdated_meta_task = RepeatedTask::new( - config.remove_outdated_meta_task_interval, - Box::new(RemoveOutdatedMetaFunction { - manager_ctx: manager_ctx.clone(), - ttl: config.remove_outdated_meta_ttl, - }), - ); + LocalManager { manager_ctx, procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)), max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, - remove_outdated_meta_task, + remove_outdated_meta_task: TokioMutex::new(None), + config, } } + /// Build remove outedated meta task + pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask { + RepeatedTask::new( + self.config.remove_outdated_meta_task_interval, + Box::new(RemoveOutdatedMetaFunction { + manager_ctx: self.manager_ctx.clone(), + ttl: self.config.remove_outdated_meta_ttl, + }), + ) + } + /// Submit a root procedure with given `procedure_id`. fn submit_root( &self, @@ -398,6 +429,8 @@ impl LocalManager { step: u32, procedure: BoxedProcedure, ) -> Result { + ensure!(self.manager_ctx.running(), ManagerNotStartSnafu); + let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); let runner = Runner { meta: meta.clone(), @@ -426,44 +459,8 @@ impl LocalManager { Ok(watcher) } -} - -#[async_trait] -impl ProcedureManager for LocalManager { - fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { - let mut loaders = self.manager_ctx.loaders.lock().unwrap(); - ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); - - let _ = loaders.insert(name.to_string(), loader); - - Ok(()) - } - - fn start(&self) -> Result<()> { - self.remove_outdated_meta_task - .start(common_runtime::bg_runtime()) - .context(StartRemoveOutdatedMetaTaskSnafu)?; - Ok(()) - } - - async fn stop(&self) -> Result<()> { - self.remove_outdated_meta_task - .stop() - .await - .context(StopRemoveOutdatedMetaTaskSnafu)?; - Ok(()) - } - - async fn submit(&self, procedure: ProcedureWithId) -> Result { - let procedure_id = procedure.id; - ensure!( - !self.manager_ctx.contains_procedure(procedure_id), - DuplicateProcedureSnafu { procedure_id } - ); - - self.submit_root(procedure.id, 0, procedure.procedure) - } + /// Recovers unfinished procedures and reruns them. async fn recover(&self) -> Result<()> { logging::info!("LocalManager start to recover"); let recover_start = Instant::now(); @@ -519,6 +516,64 @@ impl ProcedureManager for LocalManager { Ok(()) } +} + +#[async_trait] +impl ProcedureManager for LocalManager { + fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { + let mut loaders = self.manager_ctx.loaders.lock().unwrap(); + ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); + + let _ = loaders.insert(name.to_string(), loader); + + Ok(()) + } + + async fn start(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + if task.is_some() { + return Ok(()); + } + + let task_inner = self.build_remove_outdated_meta_task(); + + task_inner + .start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + + *task = Some(task_inner); + + self.manager_ctx.start(); + + info!("LocalManager is start."); + + self.recover().await + } + + async fn stop(&self) -> Result<()> { + let mut task = self.remove_outdated_meta_task.lock().await; + + if let Some(task) = task.take() { + task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; + } + + self.manager_ctx.stop(); + + info!("LocalManager is stopped."); + + Ok(()) + } + + async fn submit(&self, procedure: ProcedureWithId) -> Result { + let procedure_id = procedure.id; + ensure!( + !self.manager_ctx.contains_procedure(procedure_id), + DuplicateProcedureSnafu { procedure_id } + ); + + self.submit_root(procedure.id, 0, procedure.procedure) + } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.manager_ctx.state(procedure_id)) @@ -569,12 +624,14 @@ pub(crate) mod test_util { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use common_error::mock::MockError; use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; use super::*; - use crate::error::Error; + use crate::error::{self, Error}; use crate::store::state_store::ObjectStateStore; use crate::{Context, Procedure, Status}; @@ -691,6 +748,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -714,6 +772,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); + manager.manager_ctx.start(); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -762,6 +821,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); assert!(manager @@ -812,6 +872,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.manager_ctx.start(); #[derive(Debug)] struct MockProcedure { @@ -863,6 +924,66 @@ mod tests { check_procedure(MockProcedure { panic: true }).await; } + #[tokio::test] + async fn test_procedure_manager_stopped() { + let dir = create_temp_dir("procedure_manager_stopped"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert_matches!( + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap_err(), + error::Error::ManagerNotStart { .. } + ); + } + + #[tokio::test] + async fn test_procedure_manager_restart() { + let dir = create_temp_dir("procedure_manager_restart"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + manager.start().await.unwrap(); + manager.stop().await.unwrap(); + manager.start().await.unwrap(); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_some()); + } + #[tokio::test] async fn test_remove_outdated_meta_task() { let dir = create_temp_dir("remove_outdated_meta_task"); @@ -876,6 +997,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); + manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); @@ -889,7 +1011,8 @@ mod tests { .is_ok()); let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); watcher.changed().await.unwrap(); - manager.start().unwrap(); + + manager.start().await.unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; assert!(manager .procedure_state(procedure_id) @@ -902,6 +1025,8 @@ mod tests { let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single("test.submit"); let procedure_id = ProcedureId::random(); + + manager.manager_ctx.set_running(); assert!(manager .submit(ProcedureWithId { id: procedure_id, @@ -917,5 +1042,27 @@ mod tests { .await .unwrap() .is_some()); + + // After restart + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + assert!(manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .is_ok()); + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + + manager.start().await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_none()); } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 3e997562663a..0b50f4497f03 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -19,7 +19,7 @@ use backon::{BackoffBuilder, ExponentialBuilder}; use common_telemetry::logging; use tokio::time; -use crate::error::{ProcedurePanicSnafu, Result}; +use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; use crate::ProcedureState::Retrying; @@ -102,7 +102,6 @@ impl Drop for ProcedureGuard { } } -// TODO(yingwen): Support cancellation. pub(crate) struct Runner { pub(crate) meta: ProcedureMetaRef, pub(crate) procedure: BoxedProcedure, @@ -114,6 +113,11 @@ pub(crate) struct Runner { } impl Runner { + /// Return `ProcedureManager` is running. + pub(crate) fn running(&self) -> bool { + self.manager_ctx.running() + } + /// Run the procedure. pub(crate) async fn run(mut self) { // Ensure we can update the procedure state. @@ -152,6 +156,12 @@ impl Runner { let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); // Clean resources. self.manager_ctx.on_procedures_finish(&procedure_ids); + + // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure. + if !self.running() { + return; + } + for id in procedure_ids { if let Err(e) = self.store.delete_procedure(id).await { logging::error!( @@ -186,6 +196,13 @@ impl Runner { let mut retry = self.exponential_builder.build(); let mut retry_times = 0; loop { + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ManagerNotStartSnafu {}.build()), + }); + return; + } match self.execute_once(ctx).await { ExecResult::Done | ExecResult::Failed => return, ExecResult::Continue => (), @@ -238,6 +255,14 @@ impl Runner { status.need_persist(), ); + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ManagerNotStartSnafu {}.build()), + }); + return ExecResult::Failed; + } + if status.need_persist() { if let Err(err) = self.persist_procedure().await { self.meta.set_state(ProcedureState::retrying(Arc::new(err))); @@ -272,6 +297,14 @@ impl Runner { e.is_retry_later(), ); + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::Failed { + error: Arc::new(error::ManagerNotStartSnafu {}.build()), + }); + return ExecResult::Failed; + } + if e.is_retry_later() { self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return ExecResult::RetryLater; @@ -581,6 +614,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -641,6 +675,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); @@ -742,6 +777,7 @@ mod tests { let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone()); let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. @@ -769,6 +805,70 @@ mod tests { } } + #[tokio::test] + async fn test_running_is_stopped() { + let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed(); + let normal = ProcedureAdapter { + data: "normal".to_string(), + lock_key: LockKey::single("catalog.schema.table"), + exec_fn, + }; + + let dir = create_temp_dir("test_running_is_stopped"); + let meta = normal.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.start(); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_continue(), "{res:?}"); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step"], + ) + .await; + + runner.manager_ctx.stop(); + let res = runner.execute_once(&ctx).await; + assert!(res.is_failed()); + // Shouldn't write any files + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step"], + ) + .await; + } + + #[tokio::test] + async fn test_running_is_stopped_on_error() { + let exec_fn = + |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); + let normal = ProcedureAdapter { + data: "fail".to_string(), + lock_key: LockKey::single("catalog.schema.table"), + exec_fn, + }; + + let dir = create_temp_dir("test_running_is_stopped_on_error"); + let meta = normal.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); + runner.manager_ctx.stop(); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_failed(), "{res:?}"); + // Shouldn't write any files + check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await; + } + #[tokio::test] async fn test_execute_on_error() { let exec_fn = @@ -785,6 +885,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); @@ -826,6 +927,7 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); + runner.manager_ctx.start(); let res = runner.execute_once(&ctx).await; assert!(res.is_retry_later(), "{res:?}"); @@ -863,6 +965,8 @@ mod tests { Box::new(exceed_max_retry_later), procedure_store, ); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() .with_min_delay(Duration::from_millis(1)) .with_max_times(3); @@ -933,8 +1037,8 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); - let manager_ctx = Arc::new(ManagerContext::new()); + manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index c20ddef2b203..54f34b7d7ccf 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -279,8 +279,14 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Registers loader for specific procedure type `name`. fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; - fn start(&self) -> Result<()>; + /// Starts the background GC task. + /// + /// Recovers unfinished procedures and reruns them. + /// + /// Callers should ensure all loaders are registered. + async fn start(&self) -> Result<()>; + /// Stops the background GC task. async fn stop(&self) -> Result<()>; /// Submits a procedure to execute. @@ -288,11 +294,6 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Returns a [Watcher] to watch the created procedure. async fn submit(&self, procedure: ProcedureWithId) -> Result; - /// Recovers unfinished procedures and reruns them. - /// - /// Callers should ensure all loaders are registered. - async fn recover(&self) -> Result<()>; - /// Query the procedure state. /// /// Returns `Ok(None)` if the procedure doesn't exist. diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 576c3d420b3c..75cf777beece 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -71,6 +71,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); + manager.start().await.unwrap(); #[derive(Debug)] struct MockProcedure { diff --git a/src/datanode/src/greptimedb_telemetry.rs b/src/datanode/src/greptimedb_telemetry.rs index a8043ea9a2d6..c6991f1edeba 100644 --- a/src/datanode/src/greptimedb_telemetry.rs +++ b/src/datanode/src/greptimedb_telemetry.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; @@ -60,6 +61,8 @@ pub async fn get_greptimedb_telemetry_task( if !enable || cfg!(test) || cfg!(debug_assertions) { return Arc::new(GreptimeDBTelemetryTask::disable()); } + // Always enable. + let should_report = Arc::new(AtomicBool::new(true)); match mode { Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable( @@ -70,7 +73,9 @@ pub async fn get_greptimedb_telemetry_task( uuid: default_get_uuid(&working_home), retry: 0, }), + should_report.clone(), )), + should_report, )), Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()), } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b24aa46e4d28..6c71f92375a0 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -111,8 +111,7 @@ impl MetaSrvInstance { .await .context(error::SendShutdownSignalSnafu)?; } - - self.meta_srv.shutdown(); + self.meta_srv.shutdown().await?; self.http_srv .shutdown() .await diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 0835039d12bf..0a5334501a09 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -41,6 +41,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to start telemetry task"))] + StartTelemetryTask { + location: Location, + source: common_runtime::error::Error, + }, + #[snafu(display("Failed to submit ddl task"))] SubmitDdlTask { location: Location, @@ -393,8 +399,14 @@ pub enum Error { #[snafu(display("Missing required parameter, param: {:?}", param))] MissingRequiredParameter { param: String }, - #[snafu(display("Failed to recover procedure"))] - RecoverProcedure { + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { location: Location, source: common_procedure::Error, }, @@ -616,16 +628,19 @@ impl ErrorExt for Error { Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } | Error::InvalidFullTableName { source, .. } => source.status_code(), - Error::RecoverProcedure { source, .. } - | Error::SubmitProcedure { source, .. } - | Error::WaitProcedure { source, .. } => source.status_code(), + Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } => { + source.status_code() + } Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } + Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => { source.status_code() } + Error::StartTelemetryTask { source, .. } => source.status_code(), Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, Error::NextSequence { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/greptimedb_telemetry.rs b/src/meta-srv/src/greptimedb_telemetry.rs index 043a8de4f9d5..9cd9c2f672f0 100644 --- a/src/meta-srv/src/greptimedb_telemetry.rs +++ b/src/meta-srv/src/greptimedb_telemetry.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use async_trait::async_trait; @@ -63,7 +64,8 @@ pub async fn get_greptimedb_telemetry_task( if !enable || cfg!(test) || cfg!(debug_assertions) { return Arc::new(GreptimeDBTelemetryTask::disable()); } - + // Controlled by meta server state, only leader reports the info. + let should_report = Arc::new(AtomicBool::new(false)); Arc::new(GreptimeDBTelemetryTask::enable( TELEMETRY_INTERVAL, Box::new(GreptimeDBTelemetry::new( @@ -73,6 +75,8 @@ pub async fn get_greptimedb_telemetry_task( uuid: default_get_uuid(&working_home), retry: 0, }), + should_report.clone(), )), + should_report, )) } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 90ae491bc9cb..e67a6c4e9232 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -28,7 +28,7 @@ use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use snafu::ResultExt; @@ -37,7 +37,10 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; -use crate::error::{InitMetadataSnafu, RecoverProcedureSnafu, Result}; +use crate::error::{ + InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu, + StopProcedureManagerSnafu, +}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; @@ -169,6 +172,37 @@ pub struct SelectorContext { pub type SelectorRef = Arc>>; pub type ElectionRef = Arc>; +pub struct MetaStateHandler { + procedure_manager: ProcedureManagerRef, + subscribe_manager: Option, + greptimedb_telemetry_task: Arc, +} + +impl MetaStateHandler { + pub async fn on_become_leader(&self) { + if let Err(e) = self.procedure_manager.start().await { + error!(e; "Failed to start procedure manager"); + } + self.greptimedb_telemetry_task.should_report(true); + } + + pub async fn on_become_follower(&self) { + // Stops the procedures. + if let Err(e) = self.procedure_manager.stop().await { + error!(e; "Failed to stop procedure manager"); + } + // Suspends reporting. + self.greptimedb_telemetry_task.should_report(false); + + if let Some(sub_manager) = self.subscribe_manager.clone() { + info!("Leader changed, un_subscribe all"); + if let Err(e) = sub_manager.un_subscribe_all() { + error!("Failed to un_subscribe all, error: {}", e); + } + } + } +} + #[derive(Clone)] pub struct MetaSrv { started: Arc, @@ -212,7 +246,15 @@ impl MetaSrv { let leader_cached_kv_store = self.leader_cached_kv_store.clone(); let subscribe_manager = self.subscribe_manager(); let mut rx = election.subscribe_leader_change(); - let task_handler = self.greptimedb_telemetry_task.clone(); + let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone(); + greptimedb_telemetry_task + .start() + .context(StartTelemetryTaskSnafu)?; + let state_handler = MetaStateHandler { + greptimedb_telemetry_task, + subscribe_manager, + procedure_manager, + }; let _handle = common_runtime::spawn_bg(async move { loop { match rx.recv().await { @@ -225,28 +267,12 @@ impl MetaSrv { ); match msg { LeaderChangeMessage::Elected(_) => { - if let Err(e) = procedure_manager.recover().await { - error!("Failed to recover procedures, error: {e}"); - } - let _ = task_handler.start().map_err(|e| { - debug!( - "Failed to start greptimedb telemetry task, error: {e}" - ); - }); + state_handler.on_become_leader().await; } LeaderChangeMessage::StepDown(leader) => { - if let Some(sub_manager) = subscribe_manager.clone() { - info!("Leader changed, un_subscribe all"); - if let Err(e) = sub_manager.un_subscribe_all() { - error!("Failed to un_subscribe all, error: {}", e); - } - } error!("Leader :{:?} step down", leader); - let _ = task_handler.stop().await.map_err(|e| { - debug!( - "Failed to stop greptimedb telemetry task, error: {e}" - ); - }); + + state_handler.on_become_follower().await; } } } @@ -259,6 +285,8 @@ impl MetaSrv { } } } + + state_handler.on_become_follower().await; }); let election = election.clone(); @@ -275,9 +303,9 @@ impl MetaSrv { }); } else { self.procedure_manager - .recover() + .start() .await - .context(RecoverProcedureSnafu)?; + .context(StartProcedureManagerSnafu)?; } info!("MetaSrv started"); @@ -291,8 +319,12 @@ impl MetaSrv { .context(InitMetadataSnafu) } - pub fn shutdown(&self) { + pub async fn shutdown(&self) -> Result<()> { self.started.store(false, Ordering::Relaxed); + self.procedure_manager + .stop() + .await + .context(StopProcedureManagerSnafu) } #[inline] diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f17dd303fb38..1cb34d5a12f0 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -92,7 +92,7 @@ impl GreptimeDbStandaloneBuilder { .init() .await .unwrap(); - + procedure_manager.start().await.unwrap(); let instance = Instance::try_new_standalone( kv_store, procedure_manager,