Skip to content

Commit

Permalink
fix: ensure table route metadata is eventually rolled back on procedu…
Browse files Browse the repository at this point in the history
…re failure
  • Loading branch information
WenyXu committed Dec 16, 2024
1 parent 5ffda7e commit 5a52ab8
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 9 deletions.
67 changes: 66 additions & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_telemetry::info;
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
Expand Down Expand Up @@ -91,7 +92,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];

Expand Down Expand Up @@ -474,6 +477,58 @@ impl RegionMigrationProcedure {
_guard: guard,
})
}

async fn rollback_inner(&mut self) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();

let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
let table_route = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;

let downgraded = table_route
.region_routes()
.unwrap()
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());

if downgraded {
info!("Rollbacking downgraded region leader table route, region: {region_id}");
self.context
.table_metadata_manager
.update_leader_region_status(table_id, &table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
})?;
}

self.context.register_failure_detectors().await;

Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -482,6 +537,16 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}

async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner()
.await
.map_err(ProcedureError::external)
}

fn rollback_supported(&self) -> bool {
true
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};

/// The behaviors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::time::{sleep, Instant};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down

0 comments on commit 5a52ab8

Please sign in to comment.