Skip to content

Commit

Permalink
feat: add maintenance mode (#1586)
Browse files Browse the repository at this point in the history
  • Loading branch information
xifyang committed Sep 14, 2023
1 parent a7df5a7 commit e3f6ec8
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use crate::rpc::router::{region_distribution, RegionRoute};
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";
/// Key whose presence in a key-value store flags maintenance mode.
///
/// Only the existence of the key is checked by its users; no meaning is
/// attached to the stored value.
pub const MAINTENANCE_KEY: &[u8] = b"maintenance_key";

const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*";

Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse bool: {}, source: {}", err_msg, source))]
ParseBool {
err_msg: String,
source: std::str::ParseBoolError,
location: Location,
},

#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments { err_msg: String, location: Location },

Expand Down Expand Up @@ -570,6 +577,7 @@ impl ErrorExt for Error {
| Error::InvalidStatKey { .. }
| Error::InvalidInactiveRegionKey { .. }
| Error::ParseNum { .. }
| Error::ParseBool { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
Expand Down
61 changes: 61 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {

use common_meta::key::MAINTENANCE_KEY;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -155,4 +158,62 @@ mod tests {
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}
// Checks the maintenance switch end to end: a heartbeat carrying three region
// stats yields three entries in the failure-detect runner's dump; once
// `MAINTENANCE_KEY` is written to the in-memory store the dump is expected to be
// empty and `is_maintenance_node` to report `true`; after deleting the key it
// must report `false` again.
#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
// The in-memory store is taken from the manager's own context so that writes
// made below are visible to `is_maintenance_node`.
let in_memory = region_failover_manager.create_context().in_memory.clone();
let handler = RegionFailureHandler::try_new(None, region_failover_manager.clone())
.await
.unwrap();

let req = &HeartbeatRequest::default();

let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
// NOTE(review): presumably the handler ignores heartbeats while the context is
// still in its "infancy" phase — confirm against `RegionFailureHandler::handle`.
ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
// Builds a region stat with the given id and all counters zeroed.
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
}
}
acc.stat = Some(Stat {
cluster_id: 1,
id: 42,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
timestamp_millis: 1000,
..Default::default()
});

// One heartbeat with three region stats: the dump must contain three entries.
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);

// Switch maintenance mode on by writing the marker key (the value is empty).
let kv_req = common_meta::rpc::store::PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};

let _ = in_memory.put(kv_req.clone()).await.unwrap();

// NOTE(review): the sleep presumably gives the failure-detect runner time to
// notice the marker and drop its entries; the mechanism is not visible in this
// test — confirm against the runner implementation.
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let dump = handler.failure_detect_runner.dump().await;
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
assert!(maintenance_node);

// Removing the marker key must switch maintenance mode off again.
let _ = in_memory
.delete(kv_req.key.as_slice(), false)
.await
.unwrap();
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert!(!maintenance_node);
}
}
11 changes: 10 additions & 1 deletion src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use api::v1::meta::Peer;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::rpc::store::PutRequest;
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
Expand Down Expand Up @@ -288,6 +289,14 @@ impl MetaSrv {
.context(RecoverProcedureSnafu)?;
}

if self.kv_store.exists(MAINTENANCE_KEY).await? {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.in_memory.put(req).await?;
}
info!("MetaSrv started");
Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::ident::TableIdent;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
Expand Down Expand Up @@ -158,6 +158,10 @@ impl RegionFailoverManager {
}

pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
if self.is_maintenance_node().await? {
return Ok(());
}

let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
return Ok(());
Expand Down Expand Up @@ -236,6 +240,10 @@ impl RegionFailoverManager {
})
.unwrap_or_default())
}
/// Reports whether maintenance mode is currently switched on.
///
/// Maintenance mode is signalled by the presence of [`MAINTENANCE_KEY`] in the
/// in-memory store; the value stored under the key is not inspected.
///
/// # Errors
///
/// Propagates any error returned by the in-memory store's `exists` lookup.
pub(crate) async fn is_maintenance_node(&self) -> Result<bool> {
    self.in_memory.exists(MAINTENANCE_KEY).await
}
}

/// A "Node" in the state machine of region failover procedure.
Expand Down
9 changes: 9 additions & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod health;
mod heartbeat;
mod inactive_regions;
mod leader;
mod maintenance;
mod meta;
mod node_lease;
mod route;
Expand Down Expand Up @@ -105,6 +106,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/set-maintenance",
maintenance::MaintenanceHandler {
kv_store: meta_srv.kv_store().clone(),
in_memory: meta_srv.in_memory().clone(),
},
);

let router = Router::nest("/admin", router);

Admin::new(router)
Expand Down
64 changes: 64 additions & 0 deletions src/meta-srv/src/service/admin/maintenance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_meta::key::MAINTENANCE_KEY;
use common_meta::rpc::store::PutRequest;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;

use crate::error::{self, Result};
use crate::service::admin::HttpHandler;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
/// Admin HTTP handler that switches maintenance mode on or off by writing or
/// deleting `MAINTENANCE_KEY` in both stores it holds.
pub struct MaintenanceHandler {
/// Store updated first when toggling.
/// NOTE(review): presumably the persistent store, since the marker is copied
/// from it into the in-memory store at metasrv startup — confirm.
pub kv_store: KvStoreRef,
/// In-memory store updated second when toggling.
pub in_memory: ResettableKvStoreRef,
}

#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let switch_on = params
.get("switch_on")
.map(|on| on.parse::<bool>())
.context(error::MissingRequiredParameterSnafu { param: "switch-on" })?
.context(error::ParseBoolSnafu {
err_msg: "`switch_on` is not a valid bool",
})?;

let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};

if switch_on {
self.kv_store.put(req.clone()).await?;
self.in_memory.put(req).await?;
} else {
self.kv_store.delete(MAINTENANCE_KEY, false).await?;
self.in_memory.delete(MAINTENANCE_KEY, false).await?;
}

http::Response::builder()
.status(http::StatusCode::OK)
.body(format!("metasvc is succeed to be set maintenance mode",))
.context(error::InvalidHttpBodySnafu)
}
}

0 comments on commit e3f6ec8

Please sign in to comment.