diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index fa06558a9ba3..c66ead5b8002 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -631,7 +631,7 @@ impl TableMetadataManager { pub async fn update_leader_region_status( &self, table_id: TableId, - current_table_route_value: DeserializedValueWithBytes, + current_table_route_value: &DeserializedValueWithBytes, next_region_route_status: F, ) -> Result<()> where @@ -658,7 +658,7 @@ impl TableMetadataManager { let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() - .build_update_txn(table_id, ¤t_table_route_value, &new_table_route_value)?; + .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; let r = self.kv_backend.txn(update_table_route_txn).await?; @@ -1094,7 +1094,7 @@ mod tests { .unwrap(); table_metadata_manager - .update_leader_region_status(table_id, current_table_route_value, |region_route| { + .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| { if region_route.leader_status.is_some() { None } else { diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 04b3ccde97e4..c120b30a26da 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -58,7 +58,7 @@ impl DeactivateRegion { .context(error::TableRouteNotFoundSnafu { table_id })?; ctx.table_metadata_manager - .update_leader_region_status(table_id, table_route_value, |region| { + .update_leader_region_status(table_id, &table_route_value, |region| { if region.region.id.region_number() == failed_region.region_number { Some(Some(RegionStatus::Downgraded)) } else { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index aa5ff5452884..cabd1f7805ab 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -18,11 +18,13 @@ pub(crate) mod migration_start; pub(crate) mod open_candidate_region; #[cfg(test)] pub(crate) mod test_util; +pub(crate) mod update_metadata; use std::any::Any; use std::fmt::Debug; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::peer::Peer; use common_meta::ClusterId; use common_procedure::error::{ @@ -30,11 +32,11 @@ use common_procedure::error::{ }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{location, Location, OptionExt, ResultExt}; use store_api::storage::RegionId; use self::migration_start::RegionMigrationStart; -use crate::error::{Error, Result}; +use crate::error::{self, Error, Result}; use crate::procedure::utils::region_lock_key; use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef}; use crate::service::mailbox::MailboxRef; @@ -74,8 +76,10 @@ pub struct VolatileContext { /// /// `opening_region_guard` should be consumed after /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region - /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) . + /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue). opening_region_guard: Option, + /// `table_route_info` is stored via previous steps for future use. + table_route_info: Option>, } /// Used to generate new [Context]. @@ -122,6 +126,47 @@ impl Context { pub fn server_addr(&self) -> &str { &self.server_addr } + + /// Returns the `table_route_value` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + pub async fn get_table_route_value( + &mut self, + ) -> Result<&DeserializedValueWithBytes> { + let table_route_value = &mut self.volatile_ctx.table_route_info; + + if table_route_value.is_none() { + let table_id = self.persistent_ctx.region_id.table_id(); + let table_route = self + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + *table_route_value = Some(table_route); + } + + Ok(table_route_value.as_ref().unwrap()) + } + + /// Removes the `table_route_value` of [VolatileContext], returns true if any. + pub fn remove_table_route_value(&mut self) -> bool { + let value = self.volatile_ctx.table_route_info.take(); + value.is_some() + } + + /// Returns the [RegionId]. + pub fn region_id(&self) -> RegionId { + self.persistent_ctx.region_id + } } #[async_trait::async_trait] diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 6f2e43c8ace4..ab61da316c8d 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -17,7 +17,7 @@ use std::any::Any; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use serde::{Deserialize, Serialize}; -use snafu::{location, Location, OptionExt, ResultExt}; +use snafu::OptionExt; use store_api::storage::RegionId; use super::downgrade_leader_region::DowngradeLeaderRegion; @@ -41,9 +41,8 @@ impl State for RegionMigrationStart { /// Otherwise go to the OpenCandidateRegion state. async fn next(&mut self, ctx: &mut Context) -> Result> { let region_id = ctx.persistent_ctx.region_id; - let to_peer = &ctx.persistent_ctx.to_peer; - let region_route = self.retrieve_region_route(ctx, region_id).await?; + let to_peer = &ctx.persistent_ctx.to_peer; if self.check_leader_region_on_peer(®ion_route, to_peer)? { Ok(Box::new(RegionMigrationEnd)) @@ -70,21 +69,11 @@ impl RegionMigrationStart { /// - Failed to retrieve the metadata of table. async fn retrieve_region_route( &self, - ctx: &Context, + ctx: &mut Context, region_id: RegionId, ) -> Result { let table_id = region_id.table_id(); - let table_route = ctx - .table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(|e| error::Error::RetryLater { - reason: e.to_string(), - location: location!(), - })? - .context(error::TableRouteNotFoundSnafu { table_id })?; + let table_route = ctx.get_table_route_value().await?; let region_route = table_route .region_routes @@ -165,10 +154,10 @@ mod tests { let state = RegionMigrationStart; let env = TestingEnv::new(); let persistent_context = new_persistent_context(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let err = state - .retrieve_region_route(&ctx, RegionId::new(1024, 1)) + .retrieve_region_route(&mut ctx, RegionId::new(1024, 1)) .await .unwrap_err(); @@ -184,7 +173,7 @@ mod tests { let from_peer = persistent_context.from_peer.clone(); let env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_route = RegionRoute { @@ -199,7 +188,7 @@ mod tests { .unwrap(); let err = state - .retrieve_region_route(&ctx, RegionId::new(1024, 3)) + .retrieve_region_route(&mut ctx, RegionId::new(1024, 3)) .await .unwrap_err(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs new file mode 100644 index 000000000000..f41b66f4c09e --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -0,0 +1,239 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_meta::rpc::router::RegionStatus; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::{Context, State}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "UpdateMetadata")] +pub enum UpdateMetadata { + Downgrade, +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for UpdateMetadata { + async fn next(&mut self, ctx: &mut Context) -> Result> { + match self { + UpdateMetadata::Downgrade => { + self.downgrade_leader_region(ctx).await?; + + Ok(Box::new(DowngradeLeaderRegion)) + } + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl UpdateMetadata { + /// Downgrades the leader region. + /// + /// Abort(non-retry): + /// - TableRoute is not found. + /// + /// Retry: + /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). + /// - Failed to retrieve the metadata of table. + /// + /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue): + /// + /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure) + /// that is executed concurrently for **other region**. + /// It will only update **other region** info. Therefore, It's safe to retry after failure. + /// + /// - There is no other DDL procedure executed concurrently for the current table. + async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { + let table_metadata_manager = ctx.table_metadata_manager.clone(); + let region_id = ctx.region_id(); + let table_id = region_id.table_id(); + let current_table_route_value = ctx.get_table_route_value().await?; + + if let Err(err) = table_metadata_manager + .update_leader_region_status(table_id, current_table_route_value, |route| { + if route.region.id == region_id { + Some(Some(RegionStatus::Downgraded)) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + { + debug_assert!(ctx.remove_table_route_value()); + return error::RetryLaterSnafu { + reason: format!("Failed to update the table route during the downgrading leader region, error: {err}") + }.fail(); + } + + debug_assert!(ctx.remove_table_route_value()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + #[test] + fn test_state_serialization() { + let state = UpdateMetadata::Downgrade; + let expected = r#"{"UpdateMetadata":"Downgrade"}"#; + assert_eq!(expected, serde_json::to_string(&state).unwrap()); + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = UpdateMetadata::Downgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_failed_to_update_table_route_error() { + let state = UpdateMetadata::Downgrade; + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(4)), + ..Default::default() + }, + ]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let original_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // modifies the table route. + table_metadata_manager + .update_leader_region_status(table_id, &original_table_route, |route| { + if route.region.id == RegionId::new(1024, 2) { + Some(Some(RegionStatus::Downgraded)) + } else { + None + } + }) + .await + .unwrap(); + + // sets the old table route. + ctx.volatile_ctx.table_route_info = Some(original_table_route); + + let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + + assert!(err.is_retryable()); + assert!(err.to_string().contains("Failed to update the table route")); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(UpdateMetadata::Downgrade); + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + + let latest_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + assert!(latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(ctx.volatile_ctx.table_route_info.is_none()); + } +}