From 3619c470452a5e3485fa014ad31ce63ab6b6ddad Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 29 Aug 2023 08:23:29 +0000 Subject: [PATCH 1/2] fix: skip procedure if target route is not found --- src/meta-srv/src/error.rs | 9 +++- .../region_failover/failover_start.rs | 46 ++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index f608bd90b194..b92dfff1cc13 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,12 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display( + "Failed to found failed region: {} in route, it may be already failover by another procedure", + region + ))] + FailedRegionNotFound { location: Location, region: String }, + #[snafu(display("Failed to list catalogs: {}", source))] ListCatalogs { location: Location, @@ -561,7 +567,8 @@ impl ErrorExt for Error { | Error::ConvertGrpcExpr { .. } | Error::PublishMessage { .. } | Error::Join { .. } - | Error::Unsupported { .. } => StatusCode::Internal, + | Error::Unsupported { .. } + | Error::FailedRegionNotFound { .. } => StatusCode::Internal, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index 52d55bbe9272..e0fd24c15132 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use async_trait::async_trait; +use common_catalog::format_full_table_name; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_meta::ident::TableIdent; @@ -20,11 +21,11 @@ use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use snafu::{ensure, OptionExt, ResultExt}; use super::deactivate_region::DeactivateRegion; use super::{RegionFailoverContext, State}; -use crate::error::{RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu}; +use crate::error::{self, RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu}; #[derive(Serialize, Deserialize, Debug)] pub(super) struct RegionFailoverStart { @@ -38,6 +39,38 @@ impl RegionFailoverStart { } } + async fn failed_region_exist( + &self, + ctx: &RegionFailoverContext, + failed_region: &RegionIdent, + ) -> Result { + let table_id = failed_region.table_ident.table_id; + let route = ctx + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu)? 
+ .with_context(|| error::TableRouteNotFoundSnafu { + table_name: format_full_table_name( + &failed_region.table_ident.catalog, + &failed_region.table_ident.schema, + &failed_region.table_ident.table, + ), + })?; + + let failed_region = route.region_routes.iter().any(|route| { + route.region.id.region_number() == failed_region.region_number + && route + .leader_peer + .as_ref() + .map(|route| route.id == failed_region.datanode_id) + .unwrap_or_default() + }); + + Ok(failed_region) + } + async fn choose_candidate( &mut self, ctx: &RegionFailoverContext, @@ -95,6 +128,15 @@ impl State for RegionFailoverStart { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { + let exist = self.failed_region_exist(ctx, failed_region).await?; + + ensure!( + exist, + error::FailedRegionNotFoundSnafu { + region: failed_region.to_string() + } + ); + let candidate = self .choose_candidate(ctx, failed_region) .await From 1c76ebca4ff21e8fae876c574dea0d1ef0b764fa Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 30 Aug 2023 03:29:43 +0000 Subject: [PATCH 2/2] chore: apply suggestions from CR --- src/meta-srv/src/error.rs | 9 +--- src/meta-srv/src/procedure/region_failover.rs | 27 +++++++++++ .../region_failover/failover_start.rs | 46 +------------------ 3 files changed, 30 insertions(+), 52 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b92dfff1cc13..f608bd90b194 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,12 +26,6 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display( - "Failed to found failed region: {} in route, it may be already failover by another procedure", - region - ))] - FailedRegionNotFound { location: Location, region: String }, - #[snafu(display("Failed to list catalogs: {}", source))] ListCatalogs { location: Location, @@ -567,8 +561,7 @@ impl ErrorExt for Error { | Error::ConvertGrpcExpr { .. } | Error::PublishMessage { .. 
} | Error::Join { .. } - | Error::Unsupported { .. } - | Error::FailedRegionNotFound { .. } => StatusCode::Internal, + | Error::Unsupported { .. } => StatusCode::Internal, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 08e4f9ad675d..af19ce9c240f 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -26,6 +26,7 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::ident::TableIdent; +use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::key::TableMetadataManagerRef; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ @@ -168,6 +169,11 @@ impl RegionFailoverManager { return Ok(()); } + if !self.failed_region_exists(failed_region).await? { + // The failed region may have already been failed over by another procedure. + return Ok(()); + } + let context = self.create_context(); let procedure = RegionFailoverProcedure::new(failed_region.clone(), context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -207,6 +213,27 @@ impl RegionFailoverManager { .context(TableMetadataManagerSnafu)? .is_some()) } + + async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> { + let table_id = failed_region.table_ident.table_id; + let datanode_id = failed_region.datanode_id; + + let value = self + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey::new(datanode_id, table_id)) + .await + .context(TableMetadataManagerSnafu)?; + + Ok(value + .map(|value| { + value + .regions + .iter() + .any(|region| *region == failed_region.region_number) + }) + .unwrap_or_default()) + } } /// A "Node" in the state machine of region failover procedure. 
diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index e0fd24c15132..52d55bbe9272 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -13,7 +13,6 @@ // limitations under the License. use async_trait::async_trait; -use common_catalog::format_full_table_name; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_meta::ident::TableIdent; @@ -21,11 +20,11 @@ use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::ensure; use super::deactivate_region::DeactivateRegion; use super::{RegionFailoverContext, State}; -use crate::error::{self, RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu}; +use crate::error::{RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu}; #[derive(Serialize, Deserialize, Debug)] pub(super) struct RegionFailoverStart { @@ -39,38 +38,6 @@ impl RegionFailoverStart { } } - async fn failed_region_exist( - &self, - ctx: &RegionFailoverContext, - failed_region: &RegionIdent, - ) -> Result { - let table_id = failed_region.table_ident.table_id; - let route = ctx - .table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu)? 
- .with_context(|| error::TableRouteNotFoundSnafu { - table_name: format_full_table_name( - &failed_region.table_ident.catalog, - &failed_region.table_ident.schema, - &failed_region.table_ident.table, - ), - })?; - - let failed_region = route.region_routes.iter().any(|route| { - route.region.id.region_number() == failed_region.region_number - && route - .leader_peer - .as_ref() - .map(|route| route.id == failed_region.datanode_id) - .unwrap_or_default() - }); - - Ok(failed_region) - } - async fn choose_candidate( &mut self, ctx: &RegionFailoverContext, @@ -128,15 +95,6 @@ impl State for RegionFailoverStart { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - let exist = self.failed_region_exist(ctx, failed_region).await?; - - ensure!( - exist, - error::FailedRegionNotFoundSnafu { - region: failed_region.to_string() - } - ); - let candidate = self .choose_candidate(ctx, failed_region) .await