From e3f6ec8b00140b418d31f2da544b736cf4f9b0d6 Mon Sep 17 00:00:00 2001 From: xifyang Date: Sun, 20 Aug 2023 13:58:04 +0800 Subject: [PATCH] feat: add maintenance mode (#1586) --- src/common/meta/src/key.rs | 1 + src/meta-srv/src/error.rs | 8 +++ src/meta-srv/src/handler/failure_handler.rs | 61 ++++++++++++++++++ src/meta-srv/src/metasrv.rs | 11 +++- src/meta-srv/src/procedure/region_failover.rs | 10 ++- src/meta-srv/src/service/admin.rs | 9 +++ src/meta-srv/src/service/admin/maintenance.rs | 64 +++++++++++++++++++ 7 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 src/meta-srv/src/service/admin/maintenance.rs diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 66b80f56b128..8cab551a3bdb 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -78,6 +78,7 @@ use crate::rpc::router::{region_distribution, RegionRoute}; use crate::DatanodeId; pub const REMOVED_PREFIX: &str = "__removed"; +pub const MAINTENANCE_KEY: &[u8] = "maintenance_key".as_bytes(); const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*"; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9d96b333ad47..5f41a8b65169 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -222,6 +222,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse bool: {}, source: {}", err_msg, source))] + ParseBool { + err_msg: String, + source: std::str::ParseBoolError, + location: Location, + }, + #[snafu(display("Invalid arguments: {}", err_msg))] InvalidArguments { err_msg: String, location: Location }, @@ -570,6 +577,7 @@ impl ErrorExt for Error { | Error::InvalidStatKey { .. } | Error::InvalidInactiveRegionKey { .. } | Error::ParseNum { .. } + | Error::ParseBool { .. } | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } | Error::InvalidHeartbeatRequest { .. } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 8676113fbcf1..df0906028262 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -106,6 +106,9 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { + + use common_meta::key::MAINTENANCE_KEY; + use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; @@ -155,4 +158,62 @@ mod tests { let dump = handler.failure_detect_runner.dump().await; assert_eq!(dump.iter().collect::>().len(), 0); } + #[tokio::test(flavor = "multi_thread")] + async fn test_maintenance_mode() { + let region_failover_manager = create_region_failover_manager(); + let in_memory = region_failover_manager.create_context().in_memory.clone(); + let handler = RegionFailureHandler::try_new(None, region_failover_manager.clone()) + .await + .unwrap(); + + let req = &HeartbeatRequest::default(); + + let builder = MetaSrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let mut ctx = metasrv.new_ctx(); + ctx.is_infancy = false; + + let acc = &mut HeartbeatAccumulator::default(); + fn new_region_stat(region_id: u64) -> RegionStat { + RegionStat { + id: region_id, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + } + } + acc.stat = Some(Stat { + cluster_id: 1, + id: 42, + region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)], + timestamp_millis: 1000, + ..Default::default() + }); + + handler.handle(req, &mut ctx, acc).await.unwrap(); + let dump = handler.failure_detect_runner.dump().await; + assert_eq!(dump.iter().collect::>().len(), 3); + + let kv_req = common_meta::rpc::store::PutRequest { + key: Vec::from(MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }; + + let _ = in_memory.put(kv_req.clone()).await.unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + let dump = handler.failure_detect_runner.dump().await; + let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap(); + assert_eq!(dump.iter().collect::>().len(), 0); + assert!(maintenance_node); + + let _ = in_memory + .delete(kv_req.key.as_slice(), false) + .await + .unwrap(); + let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap(); + assert!(!maintenance_node); + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 727f6a0c4187..fe0be805994b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,7 +22,8 @@ use api::v1::meta::Peer; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY}; +use common_meta::rpc::store::PutRequest; use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -288,6 +289,14 @@ impl MetaSrv { .context(RecoverProcedureSnafu)?; } + if self.kv_store.exists(MAINTENANCE_KEY).await? { + let req = PutRequest { + key: Vec::from(MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }; + self.in_memory.put(req).await?; + } info!("MetaSrv started"); Ok(()) } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index d328c9f112d0..0d8644c52713 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -27,7 +27,7 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::ident::TableIdent; use common_meta::key::datanode_table::DatanodeTableKey; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY}; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -158,6 +158,10 @@ impl RegionFailoverManager { } pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> { + if self.is_maintenance_node().await? { + return Ok(()); + } + let Some(guard) = self.insert_running_procedures(failed_region) else { warn!("Region failover procedure for region {failed_region} is already running!"); return Ok(()); @@ -236,6 +240,10 @@ impl RegionFailoverManager { }) .unwrap_or_default()) } + #[allow(dead_code)] + pub(crate) async fn is_maintenance_node(&self) -> Result { + self.in_memory.exists(MAINTENANCE_KEY).await + } } /// A "Node" in the state machine of region failover procedure. diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 53184355303d..5919ea5b6336 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -16,6 +16,7 @@ mod health; mod heartbeat; mod inactive_regions; mod leader; +mod maintenance; mod meta; mod node_lease; mod route; @@ -105,6 +106,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { }, ); + let router = router.route( + "/set-maintenance", + maintenance::MaintenanceHandler { + kv_store: meta_srv.kv_store().clone(), + in_memory: meta_srv.in_memory().clone(), + }, + ); + let router = Router::nest("/admin", router); Admin::new(router) diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs new file mode 100644 index 000000000000..a29edc435618 --- /dev/null +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::key::MAINTENANCE_KEY; +use common_meta::rpc::store::PutRequest; +use snafu::{OptionExt, ResultExt}; +use tonic::codegen::http; + +use crate::error::{self, Result}; +use crate::service::admin::HttpHandler; +use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; +pub struct MaintenanceHandler { + pub kv_store: KvStoreRef, + pub in_memory: ResettableKvStoreRef, +} + +#[async_trait::async_trait] +impl HttpHandler for MaintenanceHandler { + async fn handle( + &self, + _: &str, + params: &HashMap, + ) -> Result> { + let switch_on = params + .get("switch_on") + .map(|on| on.parse::()) + .context(error::MissingRequiredParameterSnafu { param: "switch-on" })? + .context(error::ParseBoolSnafu { + err_msg: "`switch_on` is not a valid bool", + })?; + + let req = PutRequest { + key: Vec::from(MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }; + + if switch_on { + self.kv_store.put(req.clone()).await?; + self.in_memory.put(req).await?; + } else { + self.kv_store.delete(MAINTENANCE_KEY, false).await?; + self.in_memory.delete(MAINTENANCE_KEY, false).await?; + } + + http::Response::builder() + .status(http::StatusCode::OK) + .body(format!("metasvc is succeed to be set maintenance mode",)) + .context(error::InvalidHttpBodySnafu) + } +}