Skip to content

Commit

Permalink
feat: stop the procedure manager if a new leader is elected (#2576)
Browse files Browse the repository at this point in the history
* feat: stop the procedure manager if a new leader is elected

* chore: apply suggestions from CR

* chore: apply suggestions

* chore: apply suggestions from CR

* feat: add should_report to GreptimeDBTelemetry

Signed-off-by: WenyXu <[email protected]>

* refactor: refactor subscribing leader change loop

---------

Signed-off-by: WenyXu <[email protected]>
  • Loading branch information
WenyXu authored Oct 18, 2023
1 parent 3217b56 commit dcd481e
Show file tree
Hide file tree
Showing 16 changed files with 485 additions and 111 deletions.
15 changes: 14 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
location: Location,
source: common_procedure::error::Error,
},

#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
location: Location,
source: common_procedure::error::Error,
},

#[snafu(display("Failed to start datanode"))]
StartDatanode {
location: Location,
Expand Down Expand Up @@ -241,7 +253,8 @@ impl ErrorExt for Error {
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
Expand Down
22 changes: 19 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use snafu::ResultExt;

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};

Expand Down Expand Up @@ -163,6 +164,7 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
}

impl Instance {
Expand All @@ -171,6 +173,11 @@ impl Instance {
self.datanode.start().await.context(StartDatanodeSnafu)?;
info!("Datanode instance started");

self.procedure_manager
.start()
.await
.context(StartProcedureManagerSnafu)?;

self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
Expand All @@ -181,6 +188,11 @@ impl Instance {
.await
.context(ShutdownFrontendSnafu)?;

self.procedure_manager
.stop()
.await
.context(StopProcedureManagerSnafu)?;

self.datanode
.shutdown()
.await
Expand Down Expand Up @@ -354,7 +366,7 @@ impl StartCommand {
let mut frontend = build_frontend(
fe_plugins,
kv_store,
procedure_manager,
procedure_manager.clone(),
catalog_manager,
region_server,
)
Expand All @@ -365,7 +377,11 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

Ok(Instance { datanode, frontend })
Ok(Instance {
datanode,
frontend,
procedure_manager,
})
}
}

Expand Down
52 changes: 42 additions & 10 deletions src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::env;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_runtime::error::{Error, Result};
Expand All @@ -36,13 +38,26 @@ const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_sec
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

pub enum GreptimeDBTelemetryTask {
Enable(RepeatedTask<Error>),
Enable((RepeatedTask<Error>, Arc<AtomicBool>)),
Disable,
}

impl GreptimeDBTelemetryTask {
pub fn enable(interval: Duration, task_fn: BoxedTaskFunction<Error>) -> Self {
GreptimeDBTelemetryTask::Enable(RepeatedTask::new(interval, task_fn))
pub fn should_report(&self, value: bool) {
match self {
GreptimeDBTelemetryTask::Enable((_, should_report)) => {
should_report.store(value, Ordering::Relaxed);
}
GreptimeDBTelemetryTask::Disable => {}
}
}

pub fn enable(
interval: Duration,
task_fn: BoxedTaskFunction<Error>,
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
}

pub fn disable() -> Self {
Expand All @@ -51,7 +66,7 @@ impl GreptimeDBTelemetryTask {

pub fn start(&self) -> Result<()> {
match self {
GreptimeDBTelemetryTask::Enable(task) => {
GreptimeDBTelemetryTask::Enable((task, _)) => {
print_anonymous_usage_data_disclaimer();
task.start(common_runtime::bg_runtime())
}
Expand All @@ -61,7 +76,7 @@ impl GreptimeDBTelemetryTask {

pub async fn stop(&self) -> Result<()> {
match self {
GreptimeDBTelemetryTask::Enable(task) => task.stop().await,
GreptimeDBTelemetryTask::Enable((task, _)) => task.stop().await,
GreptimeDBTelemetryTask::Disable => Ok(()),
}
}
Expand Down Expand Up @@ -191,6 +206,7 @@ pub struct GreptimeDBTelemetry {
client: Option<Client>,
working_home: Option<String>,
telemetry_url: &'static str,
should_report: Arc<AtomicBool>,
}

#[async_trait::async_trait]
Expand All @@ -200,13 +216,19 @@ impl TaskFunction<Error> for GreptimeDBTelemetry {
}

async fn call(&mut self) -> Result<()> {
self.report_telemetry_info().await;
if self.should_report.load(Ordering::Relaxed) {
self.report_telemetry_info().await;
}
Ok(())
}
}

impl GreptimeDBTelemetry {
pub fn new(working_home: Option<String>, statistics: Box<dyn Collector + Send + Sync>) -> Self {
pub fn new(
working_home: Option<String>,
statistics: Box<dyn Collector + Send + Sync>,
should_report: Arc<AtomicBool>,
) -> Self {
let client = Client::builder()
.connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
.timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
Expand All @@ -216,6 +238,7 @@ impl GreptimeDBTelemetry {
statistics,
client: client.ok(),
telemetry_url: TELEMETRY_URL,
should_report,
}
}

Expand Down Expand Up @@ -250,7 +273,8 @@ impl GreptimeDBTelemetry {
mod tests {
use std::convert::Infallible;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use common_test_util::ports;
Expand Down Expand Up @@ -370,7 +394,11 @@ mod tests {
let working_home = working_home_temp.path().to_str().unwrap().to_string();

let test_statistic = Box::new(TestStatistic);
let mut test_report = GreptimeDBTelemetry::new(Some(working_home.clone()), test_statistic);
let mut test_report = GreptimeDBTelemetry::new(
Some(working_home.clone()),
test_statistic,
Arc::new(AtomicBool::new(true)),
);
let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str());
test_report.telemetry_url = url;
let response = test_report.report_telemetry_info().await.unwrap();
Expand All @@ -384,7 +412,11 @@ mod tests {
assert_eq!(1, body.nodes.unwrap());

let failed_statistic = Box::new(FailedStatistic);
let mut failed_report = GreptimeDBTelemetry::new(Some(working_home), failed_statistic);
let mut failed_report = GreptimeDBTelemetry::new(
Some(working_home),
failed_statistic,
Arc::new(AtomicBool::new(true)),
);
failed_report.telemetry_url = url;
let response = failed_report.report_telemetry_info().await;
assert!(response.is_none());
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
self.bytes.to_vec()
}

#[cfg(feature = "testing")]
/// Notes: used for test purpose.
pub fn from_inner(inner: T) -> Self {
let bytes = serde_json::to_vec(&inner).unwrap();
Expand Down
6 changes: 5 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum Error {
#[snafu(display("Loader {} is already registered", name))]
LoaderConflict { name: String, location: Location },

#[snafu(display("Procedure Manager is stopped"))]
ManagerNotStart { location: Location },

#[snafu(display("Failed to serialize to json"))]
ToJson {
#[snafu(source)]
Expand Down Expand Up @@ -148,7 +151,8 @@ impl ErrorExt for Error {
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
| Error::WaitWatcher { .. }
| Error::ManagerNotStart { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Common traits and structures for the procedure framework.
#![feature(assert_matches)]

pub mod error;
pub mod local;
pub mod options;
Expand Down
Loading

0 comments on commit dcd481e

Please sign in to comment.