Skip to content

Commit

Permalink
feat: add maintenance mode (#1586)
Browse files Browse the repository at this point in the history
  • Loading branch information
xifyang committed Sep 4, 2023
1 parent a12ee5c commit f064ef0
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use crate::rpc::router::{region_distribution, RegionRoute};
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";
pub const MAINTENANCE_KEY: &str = "maintenance_key";

const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*";

Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse bool: {}, source: {}", err_msg, source))]
ParseBool {
err_msg: String,
source: std::str::ParseBoolError,
location: Location,
},

#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments { err_msg: String, location: Location },

Expand Down Expand Up @@ -570,6 +577,7 @@ impl ErrorExt for Error {
| Error::InvalidLeaseKey { .. }
| Error::InvalidStatKey { .. }
| Error::ParseNum { .. }
| Error::ParseBool { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
Expand Down
69 changes: 69 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {

use common_meta::key::MAINTENANCE_KEY;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -161,4 +164,70 @@ mod tests {
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let kv_store = region_failover_manager
.create_context()
.selector_ctx
.kv_store
.clone();
let handler = RegionFailureHandler::try_new(None, region_failover_manager.clone())
.await
.unwrap();

let req = &HeartbeatRequest::default();

let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
table_ident: TableIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
table_id: 0,
engine: "d".to_string(),
},
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
}
}
acc.stat = Some(Stat {
cluster_id: 1,
id: 42,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
timestamp_millis: 1000,
..Default::default()
});

handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);

let kv_req = common_meta::rpc::store::PutRequest {
key: MAINTENANCE_KEY.to_string().into_bytes(),
value: vec![],
prev_kv: false,
};

let _ = kv_store.put(kv_req.clone()).await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let dump = handler.failure_detect_runner.dump().await;
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
assert!(maintenance_node);

let _ = kv_store.delete(kv_req.key.as_slice(), false).await.unwrap();
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert!(!maintenance_node);
}
}
6 changes: 6 additions & 0 deletions src/meta-srv/src/handler/failure_handler/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ impl FailureDetectRunner {
})
.collect::<Vec<RegionIdent>>();

if region_failover_manager.is_maintenance_node().await.unwrap() {
failed_regions
.iter()
.for_each(|r| failure_detectors.remove(r));
}

for r in failed_regions {
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
error!(e; "Failed to do region failover for {r}");
Expand Down
12 changes: 11 additions & 1 deletion src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::ident::TableIdent;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
Expand Down Expand Up @@ -234,6 +234,16 @@ impl RegionFailoverManager {
})
.unwrap_or_default())
}
#[allow(dead_code)]
pub(crate) async fn is_maintenance_node(&self) -> Result<bool> {
let key = MAINTENANCE_KEY.to_string().into_bytes();

self.selector_ctx
.kv_store
.clone()
.exists(key.as_slice())
.await
}
}

/// A "Node" in the state machine of region failover procedure.
Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod health;
mod heartbeat;
mod leader;
mod maintenance;
mod meta;
mod node_lease;
mod route;
Expand Down Expand Up @@ -89,6 +90,13 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/set-maintenance",
maintenance::MaintenanceHandler {
kv_store: meta_srv.kv_store().clone(),
},
);

let router = Router::nest("/admin", router);

Admin::new(router)
Expand Down
61 changes: 61 additions & 0 deletions src/meta-srv/src/service/admin/maintenance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_meta::key::MAINTENANCE_KEY;
use common_meta::rpc::store::PutRequest;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;

use crate::error::{self, Result};
use crate::service::admin::HttpHandler;
use crate::service::store::kv::KvStoreRef;
pub struct MaintenanceHandler {
pub kv_store: KvStoreRef,
}

#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let switch_on = params
.get("switch_on")
.map(|on| on.parse::<bool>())
.context(error::MissingRequiredParameterSnafu { param: "switch-on" })?
.context(error::ParseBoolSnafu {
err_msg: "`switch_on` is not a valid bool",
})?;

let req = PutRequest {
key: MAINTENANCE_KEY.to_string().into_bytes(),
value: vec![],
prev_kv: false,
};

if switch_on {
self.kv_store.put(req).await?;
} else {
self.kv_store.delete(req.key.as_slice(), false).await?;
}

http::Response::builder()
.status(http::StatusCode::OK)
.body(format!("metasvc is succeed to be set maintenance mode",))
.context(error::InvalidHttpBodySnafu)
}
}

0 comments on commit f064ef0

Please sign in to comment.