diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index f799f321e544..d767d098a79f 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -87,6 +87,11 @@ impl TableRouteValue { .cloned() } + /// Returns true if it's [TableRouteValue::Physical]. + pub fn is_physical(&self) -> bool { + matches!(self, TableRouteValue::Physical(_)) + } + /// Gets the [RegionRoute]s of this [TableRouteValue::Physical]. /// /// # Panics diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7fa9f7d217c3..92d7249e33ca 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -32,6 +32,12 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Another migration procedure is running for region: {}", region_id))] + MigrationRunning { + location: Location, + region_id: RegionId, + }, + #[snafu(display("The region migration procedure aborted, reason: {}", reason))] MigrationAbort { location: Location, reason: String }, @@ -675,7 +681,8 @@ impl ErrorExt for Error { | Error::TableIdChanged { .. } | Error::RegionOpeningRace { .. } | Error::RegionRouteNotFound { .. } - | Error::MigrationAbort { .. } => StatusCode::Unexpected, + | Error::MigrationAbort { .. } + | Error::MigrationRunning { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 5e935d9328bf..5299972236e0 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -13,6 +13,9 @@ // limitations under the License. pub(crate) mod downgrade_leader_region; +// TODO(weny): remove it. +#[allow(dead_code)] +pub(crate) mod manager; pub(crate) mod migration_abort; pub(crate) mod migration_end; pub(crate) mod migration_start; @@ -123,6 +126,7 @@ pub trait ContextFactory { } /// Default implementation. +#[derive(Clone)] pub struct ContextFactoryImpl { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs new file mode 100644 index 000000000000..cfb125c49ba2 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -0,0 +1,452 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Display; +use std::sync::{Arc, RwLock}; + +use common_meta::key::table_route::TableRouteValue; +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use common_meta::ClusterId; +use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId}; +use common_telemetry::{error, info}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::{ + ContextFactoryImpl, PersistentContext, RegionMigrationProcedure, +}; + +/// Manager of region migration procedure. +pub(crate) struct RegionMigrationManager { + procedure_manager: ProcedureManagerRef, + running_procedures: Arc>>, + context_factory: ContextFactoryImpl, +} + +/// The guard of running [RegionMigrationProcedureTask]. +pub(crate) struct RegionMigrationProcedureGuard { + region_id: RegionId, + running_procedures: Arc>>, +} + +impl Drop for RegionMigrationProcedureGuard { + fn drop(&mut self) { + self.running_procedures + .write() + .unwrap() + .remove(&self.region_id); + } +} + +#[derive(Debug, Clone)] +pub(crate) struct RegionMigrationProcedureTask { + cluster_id: ClusterId, + region_id: RegionId, + from_peer: Peer, + to_peer: Peer, +} + +impl Display for RegionMigrationProcedureTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "cluster: {}, region: {}, from_peer: {}, to_peer: {}", + self.cluster_id, self.region_id, self.from_peer, self.to_peer + ) + } +} + +impl From for PersistentContext { + fn from( + RegionMigrationProcedureTask { + cluster_id, + region_id, + from_peer, + to_peer, + }: RegionMigrationProcedureTask, + ) -> Self { + PersistentContext { + cluster_id, + from_peer, + to_peer, + region_id, + } + } +} + +impl RegionMigrationManager { + /// Returns new [RegionMigrationManager] + pub(crate) fn new( + procedure_manager: ProcedureManagerRef, + context_factory: ContextFactoryImpl, + ) -> Self { + Self { + procedure_manager, + running_procedures: Arc::new(RwLock::new(HashMap::new())), + context_factory, + } + } + + /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`. + pub(crate) fn try_start(&self) -> Result<()> { + let context_factory = self.context_factory.clone(); + self.procedure_manager + .register_loader( + RegionMigrationProcedure::TYPE_NAME, + Box::new(move |json| { + let context_factory = context_factory.clone(); + RegionMigrationProcedure::from_json(json, context_factory) + .map(|p| Box::new(p) as _) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: RegionMigrationProcedure::TYPE_NAME, + }) + } + + fn insert_running_procedure( + &self, + task: &RegionMigrationProcedureTask, + ) -> Option { + let mut procedures = self.running_procedures.write().unwrap(); + + match procedures.entry(task.region_id) { + Entry::Occupied(_) => None, + Entry::Vacant(v) => { + v.insert(task.clone()); + Some(RegionMigrationProcedureGuard { + region_id: task.region_id, + running_procedures: self.running_procedures.clone(), + }) + } + } + } + + fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> { + if task.to_peer.id == task.from_peer.id { + return error::InvalidArgumentsSnafu { + err_msg: "The `from_peer_id` can't equal `to_peer_id`", + } + .fail(); + } + + Ok(()) + } + + async fn retrieve_table_route(&self, region_id: RegionId) -> Result { + let table_route = self + .context_factory + .table_metadata_manager + .table_route_manager() + .get(region_id.table_id()) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::TableRouteNotFoundSnafu { + table_id: region_id.table_id(), + })?; + + Ok(table_route.into_inner()) + } + + /// Verifies the type of region migration table route. + fn verify_table_route( + &self, + table_route: &TableRouteValue, + task: &RegionMigrationProcedureTask, + ) -> Result<()> { + if !table_route.is_physical() { + return error::UnexpectedSnafu { + violated: format!( + "Trying to execute region migration on the logical table, task {task}" + ), + } + .fail(); + } + + Ok(()) + } + + /// Returns true if the region has been migrated. + fn has_migrated( + &self, + region_route: &RegionRoute, + task: &RegionMigrationProcedureTask, + ) -> Result { + let leader_peer = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: "Region route leader peer is not found", + })?; + + Ok(leader_peer.id == task.to_peer.id) + } + + /// Throws an error if `leader_peer` is not the `from_peer`. + fn verify_region_leader_peer( + &self, + region_route: &RegionRoute, + task: &RegionMigrationProcedureTask, + ) -> Result<()> { + let leader_peer = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: "Region route leader peer is not found", + })?; + + ensure!( + leader_peer.id == task.from_peer.id, + error::InvalidArgumentsSnafu { + err_msg: "Invalid region migration `from_peer` argument" + } + ); + + Ok(()) + } + + /// Submits a new region migration procedure. + pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> { + let Some(guard) = self.insert_running_procedure(&task) else { + return error::MigrationRunningSnafu { + region_id: task.region_id, + } + .fail(); + }; + + self.verify_task(&task)?; + + let region_id = task.region_id; + + let table_route = self.retrieve_table_route(region_id).await?; + self.verify_table_route(&table_route, &task)?; + + // Safety: checked before. + let region_route = table_route + .region_route(region_id) + .context(error::RegionRouteNotFoundSnafu { region_id })?; + + if self.has_migrated(®ion_route, &task)? { + info!("Skipping region migration task: {task}"); + return Ok(()); + } + + self.verify_region_leader_peer(®ion_route, &task)?; + + let procedure = + RegionMigrationProcedure::new(task.clone().into(), self.context_factory.clone()); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + info!("Starting region migration procedure {procedure_id} for {task}"); + + let procedure_manager = self.procedure_manager.clone(); + + common_runtime::spawn_bg(async move { + let _ = guard; + let watcher = &mut match procedure_manager.submit(procedure_with_id).await { + Ok(watcher) => watcher, + Err(e) => { + error!(e; "Failed to submit region migration procedure {procedure_id} for {task}"); + return; + } + }; + + if let Err(e) = watcher::wait(watcher).await { + error!(e; "Failed to wait region migration procedure {procedure_id} for {task}"); + return; + } + + info!("Region migration procedure {procedure_id} for {task} is finished successfully!"); + }); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::assert_matches::assert_matches; + + use common_meta::key::table_route::LogicalTableRouteValue; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::rpc::router::Region; + + use super::*; + use crate::procedure::region_migration::test_util::TestingEnv; + + #[tokio::test] + async fn test_insert_running_procedure() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(2), + to_peer: Peer::empty(1), + }; + // Inserts one + manager + .running_procedures + .write() + .unwrap() + .insert(region_id, task.clone()); + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::MigrationRunning { .. }); + } + + #[tokio::test] + async fn test_submit_procedure_invalid_task() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(1), + }; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::InvalidArguments { .. }); + } + + #[tokio::test] + async fn test_submit_procedure_table_not_found() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + }; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::TableRouteNotFound { .. }); + } + + #[tokio::test] + async fn test_submit_procedure_region_route_not_found() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::RegionRouteNotFound { .. }); + } + + #[tokio::test] + async fn test_submit_procedure_incorrect_from_peer() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::InvalidArguments { .. }); + assert!(err + .to_string() + .contains("Invalid region migration `from_peer` argument")); + } + + #[tokio::test] + async fn test_submit_procedure_has_migrated() { + common_telemetry::init_default_ut_logging(); + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(2)), + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + manager.submit_procedure(task).await.unwrap(); + } + + #[tokio::test] + async fn test_verify_table_route_error() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + cluster_id: 1, + region_id, + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + }; + + let err = manager + .verify_table_route(&TableRouteValue::Logical(LogicalTableRouteValue {}), &task) + .unwrap_err(); + + assert_matches!(err, error::Error::Unexpected { .. }); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index b34b1e655f4c..f3a0183f233f 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -29,8 +29,10 @@ use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::{Sequence, SequenceBuilder}; +use common_meta::state_store::KvStateStore; use common_meta::DatanodeId; -use common_procedure::{Context as ProcedureContext, ProcedureId, Status}; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status}; use common_procedure_test::MockContextProvider; use common_telemetry::debug; use common_time::util::current_time_millis; @@ -90,6 +92,7 @@ pub struct TestingEnv { mailbox_ctx: MailboxContext, opening_region_keeper: MemoryRegionKeeperRef, server_addr: String, + procedure_manager: ProcedureManagerRef, } impl TestingEnv { @@ -104,11 +107,15 @@ impl TestingEnv { let mailbox_ctx = MailboxContext::new(mailbox_sequence); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); + let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); + let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + Self { table_metadata_manager, opening_region_keeper, mailbox_ctx, server_addr: "localhost".to_string(), + procedure_manager, } } @@ -146,6 +153,11 @@ impl TestingEnv { } } + /// Returns the [ProcedureManagerRef]. + pub fn procedure_manager(&self) -> &ProcedureManagerRef { + &self.procedure_manager + } + // Creates a table metadata with the physical table route. pub async fn create_physical_table_metadata( &self, diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 5a76d34819e7..cc67aa7ca8e9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -43,6 +43,7 @@ impl UpdateMetadata { let table_id = region_id.table_id(); let current_table_route_value = ctx.get_table_route_value().await?; + // TODO(weny): ensures the leader region peer is the `from_peer`. if let Err(err) = table_metadata_manager .update_leader_region_status(table_id, current_table_route_value, |route| { if route.region.id == region_id