From 30f5c4aa1ab2d10ba89525a98646837b38f35482 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 23 Nov 2023 10:17:22 +0000 Subject: [PATCH] feat: add update metadata step for upgrading candidate region --- src/common/meta/src/key.rs | 12 +- src/meta-srv/src/error.rs | 9 +- src/meta-srv/src/lib.rs | 2 + .../region_failover/update_metadata.rs | 2 +- .../src/procedure/region_migration.rs | 54 ++- .../region_migration/migration_start.rs | 9 +- .../region_migration/open_candidate_region.rs | 42 +- .../procedure/region_migration/test_util.rs | 15 + .../region_migration/update_metadata.rs | 214 +--------- .../downgrade_leader_region.rs | 210 ++++++++++ .../upgrade_candidate_region.rs | 376 ++++++++++++++++++ 11 files changed, 697 insertions(+), 248 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs create mode 100644 src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 39a942d5f3bd..850051fa0b93 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -584,7 +584,7 @@ impl TableMetadataManager { &self, table_id: TableId, region_info: RegionInfo, - current_table_route_value: DeserializedValueWithBytes, + current_table_route_value: &DeserializedValueWithBytes, new_region_routes: Vec, new_region_options: &HashMap, ) -> Result<()> { @@ -606,7 +606,7 @@ let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() - .build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?; + .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]); @@ -1173,7 +1173,7 @@ mod tests { region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), }, - current_table_route_value.clone(), + 
&current_table_route_value, new_region_routes.clone(), &HashMap::new(), ) .await .unwrap(); @@ -1190,7 +1190,7 @@ mod tests { region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), }, - current_table_route_value.clone(), + &current_table_route_value, new_region_routes.clone(), &HashMap::new(), ) .await .unwrap(); @@ -1212,7 +1212,7 @@ mod tests { region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), }, - current_table_route_value.clone(), + &current_table_route_value, new_region_routes.clone(), &HashMap::new(), ) .await .unwrap(); @@ -1237,7 +1237,7 @@ mod tests { region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), }, - wrong_table_route_value, + &wrong_table_route_value, new_region_routes, &HashMap::new(), ) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 3a007b3163c4..abd60a4947cf 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -298,6 +298,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to find table route for {region_id}"))] + RegionRouteNotFound { + region_id: RegionId, + location: Location, + }, + #[snafu(display("Table info not found: {}", table_id))] TableInfoNotFound { table_id: TableId, @@ -658,7 +664,8 @@ impl ErrorExt for Error { | Error::Unexpected { .. } | Error::Txn { .. } | Error::TableIdChanged { .. } - | Error::RegionOpeningRace { .. } => StatusCode::Unexpected, + | Error::RegionOpeningRace { .. } + | Error::RegionRouteNotFound { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, ..
} => source.status_code(), diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index b30c6779b36a..6515ade81ebe 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -15,6 +15,8 @@ #![feature(async_closure)] #![feature(result_flattening)] #![feature(assert_matches)] +#![feature(option_take_if)] +#![feature(extract_if)] pub mod bootstrap; mod cache_invalidator; diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 22c1a89b542e..225588ac0942 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -105,7 +105,7 @@ impl UpdateRegionMetadata { region_storage_path: self.region_storage_path.to_string(), region_options: self.region_options.clone(), }, - table_route_value, + &table_route_value, new_region_routes, &self.region_options, ) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 295aab78acbd..d5ae7235c79d 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -25,6 +25,7 @@ use std::any::Any; use std::fmt::Debug; use std::time::Duration; +use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::peer::Peer; @@ -81,8 +82,13 @@ pub struct VolatileContext { /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue). opening_region_guard: Option, - /// `table_route_info` is stored via previous steps for future use. - table_route_info: Option>, + /// `table_route` is stored via previous steps for future use. 
+ table_route: Option>, + /// `table_info` is stored via previous steps for future use. + /// + /// `table_info` should remain unchanged during the procedure; + /// no other DDL procedure executed concurrently for the current table. + table_info: Option>, /// The deadline of leader region lease. leader_region_lease_deadline: Option, /// The last_entry_id of leader region. @@ -153,7 +159,7 @@ impl Context { &self.server_addr } - /// Returns the `table_route_value` of [VolatileContext] if any. + /// Returns the `table_route` of [VolatileContext] if any. /// Otherwise, returns the value retrieved from remote. /// /// Retry: @@ -161,7 +167,7 @@ impl Context { pub async fn get_table_route_value( &mut self, ) -> Result<&DeserializedValueWithBytes> { - let table_route_value = &mut self.volatile_ctx.table_route_info; + let table_route_value = &mut self.volatile_ctx.table_route; if table_route_value.is_none() { let table_id = self.persistent_ctx.region_id.table_id(); @@ -183,9 +189,45 @@ impl Context { Ok(table_route_value.as_ref().unwrap()) } - /// Removes the `table_route_value` of [VolatileContext], returns true if any. + /// Removes the `table_route` of [VolatileContext], returns true if any. pub fn remove_table_route_value(&mut self) -> bool { - let value = self.volatile_ctx.table_route_info.take(); + let value = self.volatile_ctx.table_route.take(); + value.is_some() + } + + /// Returns the `table_info` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. 
+ pub async fn get_table_info_value( + &mut self, + ) -> Result<&DeserializedValueWithBytes> { + let table_info_value = &mut self.volatile_ctx.table_info; + + if table_info_value.is_none() { + let table_id = self.persistent_ctx.region_id.table_id(); + let table_info = self + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableInfoNotFoundSnafu { table_id })?; + + *table_info_value = Some(table_info); + } + + Ok(table_info_value.as_ref().unwrap()) + } + + /// Removes the `table_info` of [VolatileContext], returns true if any. + pub fn remove_table_info_value(&mut self) -> bool { + let value = self.volatile_ctx.table_info.take(); value.is_some() } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 47eb0abb980f..b10a26886aec 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -137,16 +137,11 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { - PersistentContext { - from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), - cluster_id: 0, - } + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) } #[tokio::test] diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index fc4d9c7d9a45..2e126625347f 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ 
b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -21,7 +21,7 @@ use common_meta::ddl::utils::region_storage_path; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::RegionIdent; use serde::{Deserialize, Serialize}; -use snafu::{location, Location, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; @@ -54,38 +54,28 @@ impl OpenCandidateRegion { /// /// Abort(non-retry): /// - Table Info is not found. - async fn build_open_region_instruction(&self, ctx: &Context) -> Result { + async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result { let pc = &ctx.persistent_ctx; let cluster_id = pc.cluster_id; let table_id = pc.region_id.table_id(); let region_number = pc.region_id.region_number(); - let candidate = &pc.to_peer; - let table_info = ctx - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(|e| error::Error::RetryLater { - reason: e.to_string(), - location: location!(), - })? - .context(error::TableInfoNotFoundSnafu { table_id })? - .into_inner() - .table_info; + let candidate_id = pc.to_peer.id; + + let table_info_value = ctx.get_table_info_value().await?; + let table_info = &table_info_value.table_info; // The region storage path is immutable after the region is created. // Therefore, it's safe to store it in `VolatileContext` for future use. 
let region_storage_path = region_storage_path(&table_info.catalog_name, &table_info.schema_name); - let engine = table_info.meta.engine; + let engine = table_info.meta.engine.clone(); let region_options: HashMap = (&table_info.meta.options).into(); let open_instruction = Instruction::OpenRegion(OpenRegion::new( RegionIdent { cluster_id, - datanode_id: candidate.id, + datanode_id: candidate_id, table_id, region_number, engine, @@ -198,17 +188,12 @@ mod tests { use crate::error::Error; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::test_util::{ - new_close_region_reply, send_mock_reply, TestingEnv, + self, new_close_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { - PersistentContext { - from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), - cluster_id: 0, - } + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) } fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { @@ -244,9 +229,12 @@ mod tests { let state = OpenCandidateRegion; let persistent_context = new_persistent_context(); let env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); - let err = state.build_open_region_instruction(&ctx).await.unwrap_err(); + let err = state + .build_open_region_instruction(&mut ctx) + .await + .unwrap_err(); assert_matches!(err, Error::TableInfoNotFound { .. 
}); assert!(!err.is_retryable()); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 277c9f8d90fa..cc3779b8f54d 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -19,16 +19,19 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; use common_meta::instruction::{InstructionReply, SimpleReply}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::peer::Peer; use common_meta::sequence::Sequence; use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; use common_time::util::current_time_millis; +use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use super::ContextFactoryImpl; use crate::error::Result; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::procedure::region_migration::PersistentContext; use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; use crate::service::mailbox::{Channel, MailboxRef}; @@ -127,6 +130,7 @@ impl TestingEnv { } } +/// Generates a [InstructionReply::CloseRegion] reply. pub fn new_close_region_reply(id: u64) -> MailboxMessage { MailboxMessage { id, @@ -144,6 +148,7 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage { } } +/// Sends a mock reply. pub fn send_mock_reply( mailbox: MailboxRef, mut rx: MockHeartbeatReceiver, @@ -155,3 +160,13 @@ pub fn send_mock_reply( mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); }); } + +/// Generates a [PersistentContext]. 
+pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(from), + to_peer: Peer::empty(to), + region_id, + cluster_id: 0, + } +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 9c8d4b85b7e5..ba3092548efb 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -12,22 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod downgrade_leader_region; +pub(crate) mod upgrade_candidate_region; + use std::any::Any; -use std::time::Duration; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; -use common_meta::rpc::router::RegionStatus; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use crate::error::{self, Result}; +use super::migration_end::RegionMigrationEnd; +use crate::error::Result; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "UpdateMetadata")] pub enum UpdateMetadata { + /// Downgrades the leader region. Downgrade, + /// Upgrade the candidate region. + Upgrade, } #[async_trait::async_trait] @@ -40,6 +43,12 @@ impl State for UpdateMetadata { Ok(Box::::default()) } + UpdateMetadata::Upgrade => { + self.upgrade_candidate_region(ctx).await?; + + // TODO(weny): invalidate fe cache. + Ok(Box::new(RegionMigrationEnd)) + } } } @@ -47,198 +56,3 @@ impl State for UpdateMetadata { self } } - -impl UpdateMetadata { - /// Downgrades the leader region. - /// - /// Abort(non-retry): - /// - TableRoute is not found. - /// - /// Retry: - /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). 
- /// - Failed to retrieve the metadata of table. - /// - /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue): - /// - /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure) - /// that is executed concurrently for **other region**. - /// It will only update **other region** info. Therefore, It's safe to retry after failure. - /// - /// - There is no other DDL procedure executed concurrently for the current table. - async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { - let table_metadata_manager = ctx.table_metadata_manager.clone(); - let region_id = ctx.region_id(); - let table_id = region_id.table_id(); - let current_table_route_value = ctx.get_table_route_value().await?; - - if let Err(err) = table_metadata_manager - .update_leader_region_status(table_id, current_table_route_value, |route| { - if route.region.id == region_id { - Some(Some(RegionStatus::Downgraded)) - } else { - None - } - }) - .await - .context(error::TableMetadataManagerSnafu) - { - debug_assert!(ctx.remove_table_route_value()); - return error::RetryLaterSnafu { - reason: format!("Failed to update the table route during the downgrading leader region, error: {err}") - }.fail(); - } - - debug_assert!(ctx.remove_table_route_value()); - - ctx.volatile_ctx - .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use common_meta::key::test_utils::new_test_table_info; - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::storage::RegionId; - - use super::*; - use crate::error::Error; - use crate::procedure::region_migration::test_util::TestingEnv; - use crate::procedure::region_migration::{ContextFactory, PersistentContext}; - - fn new_persistent_context() -> PersistentContext { - PersistentContext { - 
from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), - cluster_id: 0, - } - } - - #[test] - fn test_state_serialization() { - let state = UpdateMetadata::Downgrade; - let expected = r#"{"UpdateMetadata":"Downgrade"}"#; - assert_eq!(expected, serde_json::to_string(&state).unwrap()); - } - - #[tokio::test] - async fn test_table_route_is_not_found_error() { - let state = UpdateMetadata::Downgrade; - let env = TestingEnv::new(); - let persistent_context = new_persistent_context(); - let mut ctx = env.context_factory().new_context(persistent_context); - - let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); - - assert_matches!(err, Error::TableRouteNotFound { .. }); - - assert!(!err.is_retryable()); - } - - #[tokio::test] - async fn test_failed_to_update_table_route_error() { - let state = UpdateMetadata::Downgrade; - let persistent_context = new_persistent_context(); - let from_peer = persistent_context.from_peer.clone(); - - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); - - let table_info = new_test_table_info(1024, vec![1, 2]).into(); - let region_routes = vec![ - RegionRoute { - region: Region::new_test(RegionId::new(1024, 1)), - leader_peer: Some(from_peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(RegionId::new(1024, 2)), - leader_peer: Some(Peer::empty(4)), - ..Default::default() - }, - ]; - - let table_metadata_manager = env.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - let original_table_route = table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .unwrap() - .unwrap(); - - // modifies the table route. 
- table_metadata_manager - .update_leader_region_status(table_id, &original_table_route, |route| { - if route.region.id == RegionId::new(1024, 2) { - Some(Some(RegionStatus::Downgraded)) - } else { - None - } - }) - .await - .unwrap(); - - // sets the old table route. - ctx.volatile_ctx.table_route_info = Some(original_table_route); - - let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); - - assert_matches!(err, Error::RetryLater { .. }); - - assert!(err.is_retryable()); - assert!(err.to_string().contains("Failed to update the table route")); - } - - #[tokio::test] - async fn test_next_downgrade_leader_region_state() { - let mut state = Box::new(UpdateMetadata::Downgrade); - let persistent_context = new_persistent_context(); - let from_peer = persistent_context.from_peer.clone(); - - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); - - let table_info = new_test_table_info(1024, vec![1, 2]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(RegionId::new(1024, 1)), - leader_peer: Some(from_peer.clone()), - ..Default::default() - }]; - - let table_metadata_manager = env.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - let next = state.next(&mut ctx).await.unwrap(); - - let _ = next - .as_any() - .downcast_ref::() - .unwrap(); - - let latest_table_route = table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .unwrap() - .unwrap(); - - assert!(latest_table_route.region_routes[0].is_leader_downgraded()); - assert!(ctx.volatile_ctx.table_route_info.is_none()); - } -} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs new file mode 100644 index 000000000000..7ff5e59942b7 --- /dev/null +++ 
b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -0,0 +1,210 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::rpc::router::RegionStatus; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; +use crate::procedure::region_migration::Context; + +impl UpdateMetadata { + /// Downgrades the leader region. + /// + /// Abort(non-retry): + /// - TableRoute is not found. + /// + /// Retry: + /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). + /// - Failed to retrieve the metadata of table. + /// + /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue): + /// + /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure) + /// that is executed concurrently for **other region**. + /// It will only update **other region** info. Therefore, It's safe to retry after failure. + /// + /// - There is no other DDL procedure executed concurrently for the current table. 
+ pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { + let table_metadata_manager = ctx.table_metadata_manager.clone(); + let region_id = ctx.region_id(); + let table_id = region_id.table_id(); + let current_table_route_value = ctx.get_table_route_value().await?; + + if let Err(err) = table_metadata_manager + .update_leader_region_status(table_id, current_table_route_value, |route| { + if route.region.id == region_id { + Some(Some(RegionStatus::Downgraded)) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + { + debug_assert!(ctx.remove_table_route_value()); + return error::RetryLaterSnafu { + reason: format!("Failed to update the table route during the downgrading leader region, error: {err}") + }.fail(); + } + + debug_assert!(ctx.remove_table_route_value()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::storage::RegionId; + + use crate::error::Error; + use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::update_metadata::UpdateMetadata; + use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; + + fn new_persistent_context() -> PersistentContext { + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) + } + + #[test] + fn test_state_serialization() { + let state = UpdateMetadata::Downgrade; + let expected = r#"{"UpdateMetadata":"Downgrade"}"#; + assert_eq!(expected, serde_json::to_string(&state).unwrap()); + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = UpdateMetadata::Downgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + 
let mut ctx = env.context_factory().new_context(persistent_context); + + let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_failed_to_update_table_route_error() { + let state = UpdateMetadata::Downgrade; + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(4)), + ..Default::default() + }, + ]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let original_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // modifies the table route. + table_metadata_manager + .update_leader_region_status(table_id, &original_table_route, |route| { + if route.region.id == RegionId::new(1024, 2) { + Some(Some(RegionStatus::Downgraded)) + } else { + None + } + }) + .await + .unwrap(); + + // sets the old table route. + ctx.volatile_ctx.table_route = Some(original_table_route); + + let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + + assert!(ctx.volatile_ctx.table_route.is_none()); + + assert_matches!(err, Error::RetryLater { .. 
}); + + assert!(err.is_retryable()); + assert!(err.to_string().contains("Failed to update the table route")); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(UpdateMetadata::Downgrade); + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + + let latest_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + assert!(latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(ctx.volatile_ctx.table_route.is_none()); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs new file mode 100644 index 000000000000..22c732815ea1 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -0,0 +1,376 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::ddl::utils::region_storage_path; +use common_meta::key::datanode_table::RegionInfo; +use common_meta::rpc::router::RegionRoute; +use common_telemetry::{info, warn}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; +use crate::procedure::region_migration::Context; + +impl UpdateMetadata { + /// Returns new [Vec]. + async fn build_upgrade_candidate_region_metadata( + &self, + ctx: &mut Context, + ) -> Result> { + let region_id = ctx.region_id(); + let table_route_value = ctx.get_table_route_value().await?.clone(); + + let mut region_routes = table_route_value.region_routes.clone(); + let region_route = region_routes + .iter_mut() + .find(|route| route.region.id == region_id) + .context(error::RegionRouteNotFoundSnafu { region_id })?; + + // Removes downgraded status. + region_route.set_leader_status(None); + + let candidate = &ctx.persistent_ctx.to_peer; + let expected_old_leader = &ctx.persistent_ctx.from_peer; + + // Upgrades candidate to leader. 
+ ensure!(region_route + .leader_peer + .take_if(|old_leader| old_leader.id == expected_old_leader.id) + .is_some(), + error::UnexpectedSnafu{ + violated: format!("Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}", region_route.leader_peer, expected_old_leader), + } + ); + + region_route.leader_peer = Some(candidate.clone()); + info!( + "Upgrading candidate region to leader region: {:?} for region: {}", + candidate, region_id + ); + + // Removes the candidate region in followers. + let removed = region_route + .follower_peers + .extract_if(|peer| peer.id == candidate.id) + .collect::>(); + + if removed.len() > 1 { + warn!( + "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}" + ); + } + + Ok(region_routes) + } + + /// Upgrades the candidate region. + /// + /// Abort(non-retry): + /// - TableRoute or RegionRoute is not found. + /// Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table. + /// + /// Retry: + /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). + /// - Failed to retrieve the metadata of table. + pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> { + let region_id = ctx.region_id(); + let table_metadata_manager = ctx.table_metadata_manager.clone(); + + let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?; + let table_info_value = ctx.get_table_info_value().await?; + + let table_info = &table_info_value.table_info; + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + let engine = table_info.meta.engine.clone(); + let region_options: HashMap = (&table_info.meta.options).into(); + + // No remote fetch. 
+ let table_route_value = ctx.get_table_route_value().await?; + + if let Err(err) = table_metadata_manager + .update_table_route( + region_id.table_id(), + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.to_string(), + region_options: region_options.clone(), + }, + table_route_value, + region_routes, + ®ion_options, + ) + .await + .context(error::TableMetadataManagerSnafu) + { + debug_assert!(ctx.remove_table_route_value()); + return error::RetryLaterSnafu { + reason: format!("Failed to update the table route during the upgrading candidate region, error: {err}") + }.fail(); + }; + + debug_assert!(ctx.remove_table_route_value()); + // Consumes the guard. + ctx.volatile_ctx.opening_region_guard.take(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::storage::RegionId; + + use crate::error::Error; + use crate::procedure::region_migration::migration_end::RegionMigrationEnd; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::update_metadata::UpdateMetadata; + use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; + use crate::region::lease_keeper::OpeningRegionKeeper; + + fn new_persistent_context() -> PersistentContext { + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = UpdateMetadata::Upgrade; + + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let err = state + .build_upgrade_candidate_region_metadata(&mut ctx) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. 
}); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_region_route_is_not_found() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![2]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(4)), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let err = state + .build_upgrade_candidate_region_metadata(&mut ctx) + .await + .unwrap_err(); + + assert_matches!(err, Error::RegionRouteNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_region_route_expected_leader() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let err = state + .build_upgrade_candidate_region_metadata(&mut ctx) + .await + .unwrap_err(); + + assert_matches!(err, Error::Unexpected { .. 
}); + assert!(!err.is_retryable()); + assert!(err.to_string().contains("Unexpected region leader")); + } + + #[tokio::test] + async fn test_build_upgrade_candidate_region_metadata() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: Some(RegionStatus::Downgraded), + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let new_region_routes = state + .build_upgrade_candidate_region_metadata(&mut ctx) + .await + .unwrap(); + + assert!(!new_region_routes[0].is_leader_downgraded()); + assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]); + assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2); + } + + #[tokio::test] + async fn test_failed_to_update_table_route_error() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + let opening_keeper = OpeningRegionKeeper::default(); + + let table_id = 1024; + let table_info = new_test_table_info(table_id, vec![1]).into(); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5), Peer::empty(3)], + leader_status: Some(RegionStatus::Downgraded), + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(4)), + leader_status: Some(RegionStatus::Downgraded), + 
..Default::default() + }, + ]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let original_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // modifies the table route. + table_metadata_manager + .update_leader_region_status(table_id, &original_table_route, |route| { + if route.region.id == RegionId::new(1024, 2) { + // Removes the status. + Some(None) + } else { + None + } + }) + .await + .unwrap(); + + // sets the old table route. + ctx.volatile_ctx.table_route = Some(original_table_route); + let guard = opening_keeper + .register(2, RegionId::new(table_id, 1)) + .unwrap(); + ctx.volatile_ctx.opening_region_guard = Some(guard); + + let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err(); + + assert!(ctx.volatile_ctx.table_route.is_none()); + assert!(ctx.volatile_ctx.opening_region_guard.is_some()); + assert_matches!(err, Error::RetryLater { .. 
}); + + assert!(err.is_retryable()); + assert!(err.to_string().contains("Failed to update the table route")); + } + + #[tokio::test] + async fn test_next_migration_end_state() { + let mut state = Box::new(UpdateMetadata::Upgrade); + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let mut ctx = env.context_factory().new_context(persistent_context); + let opening_keeper = OpeningRegionKeeper::default(); + + let table_id = 1024; + let table_info = new_test_table_info(table_id, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + leader_status: Some(RegionStatus::Downgraded), + ..Default::default() + }]; + + let guard = opening_keeper + .register(2, RegionId::new(table_id, 1)) + .unwrap(); + ctx.volatile_ctx.opening_region_guard = Some(guard); + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); + + let region_routes = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap() + .into_inner() + .region_routes; + + assert!(ctx.volatile_ctx.table_route.is_none()); + assert!(ctx.volatile_ctx.opening_region_guard.is_none()); + assert_eq!(region_routes.len(), 1); + assert!(!region_routes[0].is_leader_downgraded()); + assert!(region_routes[0].follower_peers.is_empty()); + assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2); + } +}