feat: add maintenance mode (#1586)
xifyang committed Aug 28, 2023
1 parent c56f5e3 commit 1ca01fd
Showing 6 changed files with 216 additions and 3 deletions.
14 changes: 13 additions & 1 deletion src/common/meta/src/key.rs
@@ -79,6 +79,7 @@ use crate::rpc::router::{region_distribution, RegionRoute};
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";
pub const MAINTENANCE_PREFIX: &str = "__maintenance";

const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*";

@@ -123,6 +124,10 @@ pub fn to_removed_key(key: &str) -> String {
format!("{REMOVED_PREFIX}-{key}")
}

pub fn to_maintenance_key(key: &str) -> String {
format!("{MAINTENANCE_PREFIX}-{key}")
}

pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
@@ -517,7 +522,7 @@ mod tests {
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{to_removed_key, TableMetadataManager};
use crate::key::{to_maintenance_key, to_removed_key, TableMetadataManager};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute};
@@ -529,6 +534,13 @@
assert_eq!(removed, to_removed_key(key));
}

#[test]
fn test_to_maintenance_key() {
let key = "test_key";
let maintenance = "__maintenance-test_key";
assert_eq!(maintenance, to_maintenance_key(key));
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
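
Note: elsewhere in this commit the argument passed to to_maintenance_key is built as "{cluster_id}-{datanode_id}" (see do_region_failover and the admin handler further down), so the stored key takes the form "__maintenance-{cluster_id}-{datanode_id}". A minimal standalone sketch of that layout (an editorial illustration, not part of the diff):

const MAINTENANCE_PREFIX: &str = "__maintenance";

// Mirrors to_maintenance_key above, repeated here so the sketch compiles on its own.
fn to_maintenance_key(key: &str) -> String {
    format!("{MAINTENANCE_PREFIX}-{key}")
}

fn main() {
    let (cluster_id, datanode_id) = (1u64, 42u64);
    // Callers join the two ids with '-' before applying the prefix.
    let key = to_maintenance_key(format!("{cluster_id}-{datanode_id}").as_str());
    assert_eq!(key, "__maintenance-1-42");
}
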
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
@@ -211,6 +211,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse bool: {}, source: {}", err_msg, source))]
ParseBool {
err_msg: String,
source: std::str::ParseBoolError,
location: Location,
},

#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments { err_msg: String, location: Location },

@@ -567,6 +574,7 @@ impl ErrorExt for Error {
| Error::InvalidLeaseKey { .. }
| Error::InvalidStatKey { .. }
| Error::ParseNum { .. }
| Error::ParseBool { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
73 changes: 73 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
@@ -105,6 +105,8 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {
use common_meta::key::to_maintenance_key;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
@@ -161,4 +163,75 @@ mod tests {
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let in_memory = region_failover_manager.create_context().in_memory.clone();
let handler = RegionFailureHandler::try_new(None, region_failover_manager.clone())
.await
.unwrap();

let req = &HeartbeatRequest::default();

let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
table_ident: TableIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
table_id: 0,
engine: "d".to_string(),
},
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
}
}
acc.stat = Some(Stat {
cluster_id: 1,
id: 42,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
timestamp_millis: 1000,
..Default::default()
});

handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);

let kv_req = common_meta::rpc::store::PutRequest {
key: to_maintenance_key(format!("{}-{}", 1, 42).as_str()).into_bytes(),
value: vec![],
prev_kv: false,
};

let _ = in_memory.put(kv_req.clone()).await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let dump = handler.failure_detect_runner.dump().await;
let maintenance_node = region_failover_manager
.get_all_maintenance_node()
.await
.unwrap();
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
assert_eq!(maintenance_node, vec![(1, 42)]);

let _ = in_memory
.delete(kv_req.key.as_slice(), false)
.await
.unwrap();
let maintenance_node = region_failover_manager
.get_all_maintenance_node()
.await
.unwrap();
assert_eq!(maintenance_node, vec![]);
}
}
33 changes: 31 additions & 2 deletions src/meta-srv/src/procedure/region_failover.rs
@@ -26,8 +26,8 @@ use std::time::Duration;

use async_trait::async_trait;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::{ClusterId, RegionIdent};
use common_meta::key::{to_maintenance_key, TableMetadataManagerRef};
use common_meta::{ClusterId, DatanodeId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
@@ -155,6 +155,14 @@ impl RegionFailoverManager {
}

pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
let key = to_maintenance_key(
format!("{}-{}", failed_region.cluster_id, failed_region.datanode_id).as_str(),
)
.into_bytes();
if self.in_memory.exists(key.as_slice()).await? {
return Ok(());
}

let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
return Ok(());
@@ -207,6 +215,27 @@ impl RegionFailoverManager {
.context(TableMetadataManagerSnafu)?
.is_some())
}

#[allow(dead_code)]
pub(crate) async fn get_all_maintenance_node(&self) -> Result<Vec<(ClusterId, DatanodeId)>> {
let key = to_maintenance_key("").into_bytes();
let range_end = common_meta::util::get_prefix_end_key(&key);
let range_req = common_meta::rpc::store::RangeRequest::new().with_range(key, range_end);
let mut res = self.in_memory.range(range_req).await?;
let all_node = res
.take_kvs()
.into_iter()
.map(|kv| {
let maintenance_key = String::from_utf8(kv.key).unwrap();
let keys = maintenance_key.split('-').collect::<Vec<_>>();
(
keys[1].parse::<ClusterId>().unwrap(),
keys[2].parse::<DatanodeId>().unwrap(),
)
})
.collect::<Vec<(ClusterId, DatanodeId)>>();
Ok(all_node)
}
}

/// A "Node" in the state machine of region failover procedure.
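
For reference, a standalone sketch of the key parsing that get_all_maintenance_node performs: splitting a key such as "__maintenance-1-42" on '-' leaves the cluster id at index 1 and the datanode id at index 2. The sketch uses plain u64 values in place of the ClusterId and DatanodeId aliases (an editorial illustration, not part of the diff):

fn main() {
    let maintenance_key = "__maintenance-1-42".to_string();
    let keys = maintenance_key.split('-').collect::<Vec<_>>();
    // keys == ["__maintenance", "1", "42"]; index 0 is the prefix itself.
    let cluster_id: u64 = keys[1].parse().unwrap();
    let datanode_id: u64 = keys[2].parse().unwrap();
    assert_eq!((cluster_id, datanode_id), (1, 42));
}
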
8 changes: 8 additions & 0 deletions src/meta-srv/src/service/admin.rs
@@ -15,6 +15,7 @@
mod health;
mod heartbeat;
mod leader;
mod maintenance;
mod meta;
mod node_lease;
mod route;
@@ -89,6 +90,13 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/set-maintenance",
maintenance::MaintenanceHandler {
store: meta_srv.in_memory().clone(),
},
);

let router = Router::nest("/admin", router);

Admin::new(router)
83 changes: 83 additions & 0 deletions src/meta-srv/src/service/admin/maintenance.rs
@@ -0,0 +1,83 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_meta::key::to_maintenance_key;
use common_meta::rpc::store::PutRequest;
use common_meta::{ClusterId, DatanodeId};
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;

use crate::error::{self, Result};
use crate::service::admin::HttpHandler;
use crate::service::store::kv::ResettableKvStoreRef;

pub struct MaintenanceHandler {
pub store: ResettableKvStoreRef,
}

#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let cluster_id = params
.get("cluster_id")
.map(|id| id.parse::<ClusterId>())
.context(error::MissingRequiredParameterSnafu {
param: "cluster_id",
})?
.context(error::ParseNumSnafu {
err_msg: "`cluster_id` is not a valid number",
})?;

let node_id = params
.get("node_id")
.map(|id| id.parse::<DatanodeId>())
.context(error::MissingRequiredParameterSnafu { param: "node_id" })?
.context(error::ParseNumSnafu {
err_msg: "`node_id` is not a valid number",
})?;

let switch_on = params
.get("switch_on")
.map(|on| on.parse::<bool>())
.context(error::MissingRequiredParameterSnafu { param: "switch_on" })?
.context(error::ParseBoolSnafu {
err_msg: "`switch_on` is not a valid bool",
})?;

let req = PutRequest {
key: to_maintenance_key(format!("{}-{}", cluster_id, node_id).as_str()).into_bytes(),
value: vec![],
prev_kv: false,
};

if switch_on {
self.store.put(req).await?;
} else {
self.store.delete(req.key.as_slice(), false).await?;
}

http::Response::builder()
.status(http::StatusCode::OK)
.body(format!(
"Datanode {}-{} is succeed to be set maintenance mode",
cluster_id, node_id
))
.context(error::InvalidHttpBodySnafu)
}
}
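
To see how the new route might be exercised end to end, here is a hypothetical client sketch (not part of this commit). It assumes the metasrv admin HTTP service is reachable at 127.0.0.1:3002, that the route accepts GET, and that the reqwest crate with its blocking feature is available; only the path /admin/set-maintenance and the cluster_id, node_id and switch_on query parameters come from the handler and router above.

// Hypothetical client sketch; host, port and HTTP method are assumptions, not
// something this commit defines.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Put datanode 42 of cluster 1 into maintenance mode, so region failover
    // is skipped for it while it is being operated on.
    let url =
        "http://127.0.0.1:3002/admin/set-maintenance?cluster_id=1&node_id=42&switch_on=true";
    let body = reqwest::blocking::get(url)?.text()?;
    println!("{body}");

    // Sending switch_on=false deletes the key again and re-enables failover.
    Ok(())
}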
