Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure table route metadata is eventually rolled back on failure #5174

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_telemetry::info;
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
Expand Down Expand Up @@ -91,7 +92,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
RegionLock::Write(region_id).into(),
];

Expand Down Expand Up @@ -474,6 +477,58 @@ impl RegionMigrationProcedure {
_guard: guard,
})
}

async fn rollback_inner(&mut self) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();

let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
let table_route = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.context(error::TableRouteNotFoundSnafu { table_id })?;

let downgraded = table_route
.region_routes()
.unwrap()
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());

if downgraded {
info!("Rollbacking downgraded region leader table route, region: {region_id}");
self.context
.table_metadata_manager
.update_leader_region_status(table_id, &table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
})?;
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
}

self.context.register_failure_detectors().await;

Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -482,6 +537,16 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}

async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner()
.await
.map_err(ProcedureError::external)
}

fn rollback_supported(&self) -> bool {
true
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};

/// The behaviors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::time::{sleep, Instant};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down
Loading