From 0dd8df7e8fd7165c02411e1e8b372fc50e141513 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 1 Dec 2023 06:26:01 +0000 Subject: [PATCH] feat: add migration abort step --- src/meta-srv/src/error.rs | 6 ++- .../src/procedure/region_migration.rs | 1 + .../region_migration/migration_abort.rs | 54 +++++++++++++++++++ .../region_migration/update_metadata.rs | 5 +- .../rollback_downgraded_region.rs | 7 ++- 5 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/migration_abort.rs diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index abd60a4947cf..c081f0a94e1b 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -32,6 +32,9 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("The region migration procedure aborted, reason: {}", reason))] + MigrationAbort { location: Location, reason: String }, + #[snafu(display( "Another procedure is opening the region: {} on peer: {}", region_id, @@ -665,7 +668,8 @@ impl ErrorExt for Error { | Error::Txn { .. } | Error::TableIdChanged { .. } | Error::RegionOpeningRace { .. } - | Error::RegionRouteNotFound { .. } => StatusCode::Unexpected, + | Error::RegionRouteNotFound { .. } + | Error::MigrationAbort { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index e04f23d52b59..4edc69bc3a59 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -13,6 +13,7 @@ // limitations under the License. pub(crate) mod downgrade_leader_region; +pub(crate) mod migration_abort; pub(crate) mod migration_end; pub(crate) mod migration_start; pub(crate) mod open_candidate_region; diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs new file mode 100644 index 000000000000..c47864ae5bab --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::Status; +use serde::{Deserialize, Serialize}; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::{Context, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionMigrationAbort { + reason: String, +} + +impl RegionMigrationAbort { + /// Returns the [RegionMigrationAbort] with `reason`. + pub fn new(reason: &str) -> Self { + Self { + reason: reason.to_string(), + } + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RegionMigrationAbort { + async fn next(&mut self, _: &mut Context) -> Result> { + error::MigrationAbortSnafu { + reason: &self.reason, + } + .fail() + } + + fn status(&self) -> Status { + Status::Done + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index b4e859f7166d..7d56c51390ae 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -21,6 +21,7 @@ use std::any::Any; use common_telemetry::warn; use serde::{Deserialize, Serialize}; +use super::migration_abort::RegionMigrationAbort; use super::migration_end::RegionMigrationEnd; use crate::error::Result; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; @@ -61,7 +62,9 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}"); }; - Ok(Box::new(RegionMigrationEnd)) + Ok(Box::new(RegionMigrationAbort::new( + "Failed to upgrade the candidate region.", + ))) } } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 6e1daedf6bd0..b9aed08ad678 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -66,7 +66,7 @@ mod tests { use store_api::storage::RegionId; use crate::error::Error; - use crate::procedure::region_migration::migration_end::RegionMigrationEnd; + use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; @@ -221,7 +221,10 @@ mod tests { let next = state.next(&mut ctx).await.unwrap(); - let _ = next.as_any().downcast_ref::().unwrap(); + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); assert!(ctx.volatile_ctx.table_route.is_none());