Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add maintenance mode (#1586) #2211

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use crate::rpc::router::{region_distribution, RegionRoute};
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";
pub const MAINTENANCE_KEY: &[u8] = "maintenance".as_bytes();

const NAME_PATTERN: &str = "[a-zA-Z_:-][a-zA-Z0-9_:-]*";

Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse bool: {}, source: {}", err_msg, source))]
ParseBool {
err_msg: String,
source: std::str::ParseBoolError,
location: Location,
},

#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments { err_msg: String, location: Location },

Expand Down Expand Up @@ -570,6 +577,7 @@ impl ErrorExt for Error {
| Error::InvalidStatKey { .. }
| Error::InvalidInactiveRegionKey { .. }
| Error::ParseNum { .. }
| Error::ParseBool { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
Expand Down
62 changes: 62 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {

use common_meta::key::MAINTENANCE_KEY;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -155,4 +158,63 @@ mod tests {
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let in_memory = region_failover_manager.create_context().in_memory.clone();
let handler = RegionFailureHandler::try_new(None, region_failover_manager.clone())
.await
.unwrap();

let req = &HeartbeatRequest::default();

let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;

let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
}
}
acc.stat = Some(Stat {
cluster_id: 1,
id: 42,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
timestamp_millis: 1000,
..Default::default()
});

handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);
Comment on lines +170 to +197
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Are they related to the test?

Only the following assertion is related to the maintenance mode.

        let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
        assert!(maintenance_node);

        let _ = in_memory
            .delete(kv_req.key.as_slice(), false)
            .await
            .unwrap();
        let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
        assert!(!maintenance_node);


let kv_req = common_meta::rpc::store::PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};

let _ = in_memory.put(kv_req.clone()).await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we test this without sleep? @fengjiachun @MichaelScofield

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think it's needed.

let dump = handler.failure_detect_runner.dump().await;
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
assert!(maintenance_node);

let _ = in_memory
.delete(kv_req.key.as_slice(), false)
.await
.unwrap();
let maintenance_node = region_failover_manager.is_maintenance_node().await.unwrap();
assert!(!maintenance_node);
}
}
11 changes: 10 additions & 1 deletion src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use api::v1::meta::Peer;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::rpc::store::PutRequest;
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
Expand Down Expand Up @@ -288,6 +289,14 @@ impl MetaSrv {
.context(RecoverProcedureSnafu)?;
}

if self.kv_store.exists(MAINTENANCE_KEY).await? {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.in_memory.put(req).await?;
}
info!("MetaSrv started");
Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::ident::TableIdent;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
Expand Down Expand Up @@ -158,6 +158,10 @@ impl RegionFailoverManager {
}

pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
if self.is_maintenance_node().await? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be mode?

Suggested change
if self.is_maintenance_node().await? {
if self.is_maintenance_mode().await? {

return Ok(());
}

let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
return Ok(());
Expand Down Expand Up @@ -236,6 +240,10 @@ impl RegionFailoverManager {
})
.unwrap_or_default())
}
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why allow dead code?

Suggested change
#[allow(dead_code)]

pub(crate) async fn is_maintenance_node(&self) -> Result<bool> {
self.in_memory.exists(MAINTENANCE_KEY).await
}
}

/// A "Node" in the state machine of region failover procedure.
Expand Down
9 changes: 9 additions & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod health;
mod heartbeat;
mod inactive_regions;
mod leader;
mod maintenance;
mod meta;
mod node_lease;
mod route;
Expand Down Expand Up @@ -105,6 +106,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we provide an API to query whether we are in maintenance mode?

"/set-maintenance",
maintenance::MaintenanceHandler {
kv_store: meta_srv.kv_store().clone(),
in_memory: meta_srv.in_memory().clone(),
},
);

let router = Router::nest("/admin", router);

Admin::new(router)
Expand Down
64 changes: 64 additions & 0 deletions src/meta-srv/src/service/admin/maintenance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_meta::key::MAINTENANCE_KEY;
use common_meta::rpc::store::PutRequest;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;

use crate::error::{self, Result};
use crate::service::admin::HttpHandler;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub struct MaintenanceHandler {
Comment on lines +24 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub struct MaintenanceHandler {
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub struct MaintenanceHandler {

pub kv_store: KvStoreRef,
pub in_memory: ResettableKvStoreRef,
}

#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
async fn handle(
&self,
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let switch_on = params
.get("switch_on")
.map(|on| on.parse::<bool>())
.context(error::MissingRequiredParameterSnafu { param: "switch-on" })?
.context(error::ParseBoolSnafu {
err_msg: "`switch_on` is not a valid bool",
})?;

let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};

if switch_on {
self.kv_store.put(req.clone()).await?;
self.in_memory.put(req).await?;
} else {
self.kv_store.delete(MAINTENANCE_KEY, false).await?;
self.in_memory.delete(MAINTENANCE_KEY, false).await?;
}

http::Response::builder()
.status(http::StatusCode::OK)
.body("metasvc is succeed to be set maintenance mode".to_string())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If switch_on = false, this body content is untrue.

.context(error::InvalidHttpBodySnafu)
}
}
Loading