Skip to content

Commit

Permalink
feat: stop the procedure manager if a new leader is elected
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 11, 2023
1 parent d7aeb36 commit e201f83
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 33 deletions.
22 changes: 21 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
location: Location,
source: common_procedure::error::Error,
},

#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
location: Location,
source: common_procedure::error::Error,
},

#[snafu(display("Failed to start procedures"))]
RecoverProcedures {
location: Location,
source: common_procedure::error::Error,
},

#[snafu(display("Failed to start datanode"))]
StartDatanode {
location: Location,
Expand Down Expand Up @@ -203,7 +221,9 @@ impl ErrorExt for Error {
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::ConnectEtcd { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. }
| Error::RecoverProcedures { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
Expand Down
28 changes: 24 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use servers::Mode;
use snafu::ResultExt;

use crate::error::{
IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu,
IllegalConfigSnafu, InitMetadataSnafu, RecoverProceduresSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};

Expand Down Expand Up @@ -159,6 +160,7 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
}

impl Instance {
Expand All @@ -167,6 +169,15 @@ impl Instance {
self.datanode.start().await.context(StartDatanodeSnafu)?;
info!("Datanode instance started");

self.procedure_manager
.start()
.context(StartProcedureManagerSnafu)?;

self.procedure_manager
.recover()
.await
.context(RecoverProceduresSnafu)?;

self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
Expand All @@ -177,6 +188,11 @@ impl Instance {
.await
.context(ShutdownFrontendSnafu)?;

self.procedure_manager
.stop()
.await
.context(StopProcedureManagerSnafu)?;

self.datanode
.shutdown()
.await
Expand Down Expand Up @@ -342,7 +358,7 @@ impl StartCommand {
let mut frontend = build_frontend(
fe_plugins,
kv_store,
procedure_manager,
procedure_manager.clone(),
catalog_manager,
region_server,
)
Expand All @@ -353,7 +369,11 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

Ok(Instance { datanode, frontend })
Ok(Instance {
datanode,
frontend,
procedure_manager,
})
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum Error {
#[snafu(display("Loader {} is already registered", name))]
LoaderConflict { name: String, location: Location },

#[snafu(display("Procedure Manager is stopped"))]
ProcedureManagerStop { location: Location },

#[snafu(display("Failed to serialize to json"))]
ToJson {
#[snafu(source)]
Expand Down Expand Up @@ -148,7 +151,8 @@ impl ErrorExt for Error {
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
| Error::WaitWatcher { .. }
| Error::ProcedureManagerStop { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Common traits and structures for the procedure framework.
#![feature(assert_matches)]

pub mod error;
pub mod local;
pub mod options;
Expand Down
128 changes: 107 additions & 21 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod lock;
mod runner;

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

Expand All @@ -28,8 +29,8 @@ use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::Notify;

use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu,
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ProcedureManagerStopSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
Expand Down Expand Up @@ -368,38 +369,70 @@ pub struct LocalManager {
procedure_store: Arc<ProcedureStore>,
max_retry_times: usize,
retry_delay: Duration,
remove_outdated_meta_task: RepeatedTask<Error>,
state: Arc<Mutex<LocalManagerState>>,
running: Arc<AtomicBool>,
config: ManagerConfig,
}

#[derive(Debug, Default)]
pub struct LocalManagerState {
remove_outdated_meta_task: Option<RepeatedTask<Error>>,
}

impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new());
let remove_outdated_meta_task = RepeatedTask::new(
config.remove_outdated_meta_task_interval,
Box::new(RemoveOutdatedMetaFunction {
manager_ctx: manager_ctx.clone(),
ttl: config.remove_outdated_meta_ttl,
}),
);

LocalManager {
running: Arc::new(AtomicBool::new(false)),
state: Arc::new(Mutex::new(LocalManagerState::default())),
manager_ctx,
procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
remove_outdated_meta_task,
config,
}
}

#[cfg(test)]
fn start_task(&self) -> Result<RepeatedTask<Error>> {
let task = self.build_remove_outdated_meta_task();
task.start(common_runtime::bg_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;
Ok(task)
}

pub fn set_running(&self) {
self.running.store(true, Ordering::Relaxed);
}

/// Build remove outedated meta task
pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
RepeatedTask::new(
self.config.remove_outdated_meta_task_interval,
Box::new(RemoveOutdatedMetaFunction {
manager_ctx: self.manager_ctx.clone(),
ttl: self.config.remove_outdated_meta_ttl,
}),
)
}

/// Submit a root procedure with given `procedure_id`.
fn submit_root(
&self,
procedure_id: ProcedureId,
step: u32,
procedure: BoxedProcedure,
) -> Result<Watcher> {
ensure!(
self.running.load(Ordering::Relaxed),
ProcedureManagerStopSnafu
);

let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key()));
let runner = Runner {
running: self.running.clone(),
meta: meta.clone(),
procedure,
manager_ctx: self.manager_ctx.clone(),
Expand Down Expand Up @@ -440,17 +473,36 @@ impl ProcedureManager for LocalManager {
}

fn start(&self) -> Result<()> {
self.remove_outdated_meta_task
.start(common_runtime::bg_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;
if !self.running.load(Ordering::Relaxed) {
let mut state = self.state.lock().unwrap();

let task = state
.remove_outdated_meta_task
.get_or_insert(self.build_remove_outdated_meta_task());

task.start(common_runtime::bg_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;

self.running.store(true, Ordering::Relaxed);
logging::info!("LocalManager is started.");
}

Ok(())
}

async fn stop(&self) -> Result<()> {
self.remove_outdated_meta_task
.stop()
.await
.context(StopRemoveOutdatedMetaTaskSnafu)?;
if self.running.load(Ordering::Relaxed) {
let remove_outdated_meta_task =
self.state.lock().unwrap().remove_outdated_meta_task.take();

if let Some(task) = remove_outdated_meta_task {
task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
}

self.running.store(false, Ordering::Relaxed);
logging::info!("LocalManager is stopped.");
}

Ok(())
}

Expand Down Expand Up @@ -569,12 +621,14 @@ pub(crate) mod test_util {

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;

use super::*;
use crate::error::Error;
use crate::error::{self, Error};
use crate::store::state_store::ObjectStateStore;
use crate::{Context, Procedure, Status};

Expand Down Expand Up @@ -691,6 +745,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager.set_running();

manager
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
Expand All @@ -714,6 +769,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
manager.set_running();

manager
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
Expand Down Expand Up @@ -762,6 +818,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager.set_running();

let procedure_id = ProcedureId::random();
assert!(manager
Expand Down Expand Up @@ -812,6 +869,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager.set_running();

#[derive(Debug)]
struct MockProcedure {
Expand Down Expand Up @@ -863,6 +921,33 @@ mod tests {
check_procedure(MockProcedure { panic: true }).await;
}

#[tokio::test]
async fn test_procedure_manager_stopped() {
let dir = create_temp_dir("procedure_manager_stopped");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
let procedure_id = ProcedureId::random();
assert_matches!(
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap_err(),
error::Error::ProcedureManagerStop { .. }
);
}

#[tokio::test]
async fn test_remove_outdated_meta_task() {
let dir = create_temp_dir("remove_outdated_meta_task");
Expand All @@ -876,6 +961,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
manager.set_running();

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
Expand All @@ -889,7 +975,7 @@ mod tests {
.is_ok());
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
manager.start().unwrap();
let task = manager.start_task().unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(manager
.procedure_state(procedure_id)
Expand All @@ -898,7 +984,7 @@ mod tests {
.is_none());

// The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
manager.stop().await.unwrap();
task.stop().await.unwrap();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
let procedure_id = ProcedureId::random();
Expand Down
Loading

0 comments on commit e201f83

Please sign in to comment.