Skip to content

Commit

Permalink
feat: add migration abort step
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 1, 2023
1 parent e716d67 commit 0dd8df7
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 4 deletions.
6 changes: 5 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("The region migration procedure aborted, reason: {}", reason))]
MigrationAbort { location: Location, reason: String },

#[snafu(display(
"Another procedure is opening the region: {} on peer: {}",
region_id,
Expand Down Expand Up @@ -665,7 +668,8 @@ impl ErrorExt for Error {
| Error::Txn { .. }
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. } => StatusCode::Unexpected,
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
Expand Down
54 changes: 54 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_abort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_procedure::Status;
use serde::{Deserialize, Serialize};

use crate::error::{self, Result};
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationAbort {
reason: String,
}

impl RegionMigrationAbort {
/// Returns the [RegionMigrationAbort] with `reason`.
pub fn new(reason: &str) -> Self {
Self {
reason: reason.to_string(),
}
}
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
error::MigrationAbortSnafu {
reason: &self.reason,
}
.fail()
}

fn status(&self) -> Status {
Status::Done
}

fn as_any(&self) -> &dyn Any {
self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::any::Any;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
Expand Down Expand Up @@ -61,7 +62,9 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
};
Ok(Box::new(RegionMigrationEnd))
Ok(Box::new(RegionMigrationAbort::new(
"Failed to upgrade the candidate region.",
)))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod tests {
use store_api::storage::RegionId;

use crate::error::Error;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
Expand Down Expand Up @@ -221,7 +221,10 @@ mod tests {

let next = state.next(&mut ctx).await.unwrap();

let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
let _ = next
.as_any()
.downcast_ref::<RegionMigrationAbort>()
.unwrap();

assert!(ctx.volatile_ctx.table_route.is_none());

Expand Down

0 comments on commit 0dd8df7

Please sign in to comment.