diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index e05168d9d68c..621a68243036 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -551,6 +551,13 @@ pub enum Error { }, } +impl Error { + /// Returns `true` if the error is retryable. + pub fn is_retryable(&self) -> bool { + matches!(self, Error::RetryLater { .. }) + } +} + pub type Result = std::result::Result; define_into_tonic_status!(Error); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 6bd13c65abde..025bd96ca1aa 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -12,12 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod downgrade_leader_region; pub(crate) mod migration_end; pub(crate) mod migration_start; +pub(crate) mod open_candidate_region; +#[cfg(test)] +pub(crate) mod test_util; +use std::any::Any; use std::fmt::Debug; +use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; +use common_meta::ClusterId; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; @@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key; /// **Notes: Stores with too large data in the context might incur replication overhead.** #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentContext { + /// The Id of the cluster. + cluster_id: ClusterId, /// The [Peer] of migration source. from_peer: Peer, /// The [Peer] of migration destination. - to_peer: Option, + to_peer: Peer, /// The [RegionId] of migration region. region_id: RegionId, } @@ -60,8 +69,16 @@ impl PersistentContext { pub struct VolatileContext {} /// The context of procedure execution. -#[derive(Debug, Clone)] -pub struct Context {} +pub struct Context { + table_metadata_manager: TableMetadataManagerRef, +} + +impl Context { + /// Returns address of meta server. + pub fn server_addr(&self) -> &str { + todo!() + } +} #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] @@ -78,6 +95,9 @@ trait State: Sync + Send + Debug { fn status(&self) -> Status { Status::Executing { persist: true } } + + /// Returns as [Any](std::any::Any). + fn as_any(&self) -> &dyn Any; } /// Persistent data of [RegionMigrationProcedure]. @@ -87,7 +107,6 @@ pub struct RegionMigrationData { state: Box, } -#[derive(Debug)] pub struct RegionMigrationProcedure { data: RegionMigrationData, context: Context, @@ -167,39 +186,27 @@ impl Procedure for RegionMigrationProcedure { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::Arc; - - use common_procedure::ProcedureId; - use common_procedure_test::MockContextProvider; use super::migration_end::RegionMigrationEnd; use super::*; + use crate::procedure::region_migration::test_util::TestingEnv; - fn persistent_context_factory() -> PersistentContext { + fn new_persistent_context() -> PersistentContext { PersistentContext { from_peer: Peer::empty(1), - to_peer: None, + to_peer: Peer::empty(2), region_id: RegionId::new(1024, 1), - } - } - - fn context_factory() -> Context { - Context {} - } - - fn procedure_context_factory() -> ProcedureContext { - ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), + cluster_id: 0, } } #[test] fn test_lock_key() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); let expected_key = persistent_context.lock_key(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context(); let procedure = RegionMigrationProcedure::new(persistent_context, context); @@ -211,9 +218,10 @@ mod tests { #[test] fn test_data_serialization() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context(); let procedure = RegionMigrationProcedure::new(persistent_context, context); @@ -245,33 +253,39 @@ mod tests { })) } } + + fn as_any(&self) -> &dyn Any { + self + } } #[tokio::test] async fn test_execution_after_deserialized() { - fn new_mock_procedure() -> RegionMigrationProcedure { - let persistent_context = persistent_context_factory(); - let context = context_factory(); + let env = TestingEnv::new(); + + fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure { + let persistent_context = new_persistent_context(); + let context = env.context(); let state = Box::::default(); RegionMigrationProcedure::new_inner(state, persistent_context, context) } - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); let mut status = None; for _ in 0..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } assert_matches!(status.unwrap(), Status::Done); - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); status = Some(procedure.execute(&ctx).await.unwrap()); let serialized = procedure.dump().unwrap(); - let context = context_factory(); + let context = env.context(); let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap(); for _ in 1..3 { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs new file mode 100644 index 000000000000..68034c1803be --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DowngradeLeaderRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DowngradeLeaderRegion { + async fn next( + &mut self, + _ctx: &Context, + _pc: &mut PersistentContext, + _vc: &mut VolatileContext, + ) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 87e64b79dc22..67acca3aec63 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; @@ -36,4 +38,8 @@ impl State for RegionMigrationEnd { fn status(&self) -> Status { Status::Done } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 7f445079143a..97980ba0d52c 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -12,24 +12,287 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; use serde::{Deserialize, Serialize}; +use snafu::{location, Location, OptionExt, ResultExt}; +use store_api::storage::RegionId; -use crate::error::Result; +use super::downgrade_leader_region::DowngradeLeaderRegion; +use super::migration_end::RegionMigrationEnd; +use super::open_candidate_region::OpenCandidateRegion; +use crate::error::{self, Result}; use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationStart {} +pub struct RegionMigrationStart; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationStart { async fn next( &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, + ctx: &Context, + pc: &mut PersistentContext, _vc: &mut VolatileContext, ) -> Result> { - // TODO(weny): It will be added in the following PRs. - todo!() + let region_id = pc.region_id; + let candidate = &pc.from_peer; + let leader = &pc.to_peer; + let region_routes = self.retrieve_regions_route(ctx, region_id).await?; + + if self.check_region_on_leader_peer(®ion_routes, leader)? { + Ok(Box::new(RegionMigrationEnd)) + } else if self.check_region_on_candidate_peer(®ion_routes, candidate) { + Ok(Box::new(DowngradeLeaderRegion)) + } else { + Ok(Box::new(OpenCandidateRegion)) + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl RegionMigrationStart { + /// Retrieves region route. + /// + /// Abort(non-retry): + /// - TableRoute is not found. + /// - RegionRoute is not found. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + async fn retrieve_regions_route( + &self, + ctx: &Context, + region_id: RegionId, + ) -> Result { + let table_id = region_id.table_id(); + let table_route = ctx + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + let region_route = table_route + .region_routes + .iter() + .find(|route| route.region.id == region_id) + .cloned() + .context(error::UnexpectedSnafu { + violated: format!( + "RegionRoute({}) is not found in TableRoute({})", + region_id, table_id + ), + })?; + + Ok(region_route) + } + + /// Checks whether the region on candidate region has been opened. + /// Returns true if it's been opened. + fn check_region_on_candidate_peer(&self, region_route: &RegionRoute, candidate: &Peer) -> bool { + let region_opened = region_route + .follower_peers + .iter() + .any(|peer| peer.id == candidate.id); + + region_opened + } + + /// Checks whether the region on candidate region has been opened. + /// Returns true if it's been opened. + /// + /// Abort(non-retry): + /// - Leader peer of RegionRoute is not found. + fn check_region_on_leader_peer( + &self, + region_route: &RegionRoute, + leader: &Peer, + ) -> Result { + let region_id = region_route.region.id; + + let region_opened = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("Leader peer is not found in TableRoute({})", region_id), + })? + .id + == leader.id; + + Ok(region_opened) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::TestingEnv; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = RegionMigrationStart; + let env = TestingEnv::new(); + let ctx = env.context(); + + let err = state + .retrieve_regions_route(&ctx, RegionId::new(1024, 1)) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_region_route_is_not_found_error() { + let state = RegionMigrationStart; + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_route = RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(persistent_context.from_peer.clone()), + ..Default::default() + }; + + env.table_metadata_manager() + .create_table_metadata(table_info, vec![region_route]) + .await + .unwrap(); + + let err = state + .retrieve_regions_route(&ctx, RegionId::new(1024, 3)) + .await + .unwrap_err(); + + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![persistent_context.from_peer.clone()], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } + + #[tokio::test] + async fn test_next_migration_end_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(persistent_context.to_peer.clone()), + follower_peers: vec![persistent_context.from_peer.clone()], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); + } + + #[tokio::test] + async fn test_next_open_candidate_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs new file mode 100644 index 000000000000..16707ba01937 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenCandidateRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for OpenCandidateRegion { + async fn next( + &mut self, + _ctx: &Context, + _pc: &mut PersistentContext, + _vc: &mut VolatileContext, + ) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs new file mode 100644 index 000000000000..566a6721cd78 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_procedure::{Context as ProcedureContext, ProcedureId}; +use common_procedure_test::MockContextProvider; + +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use crate::procedure::region_migration::Context; + +/// `TestingEnv` provides components during the tests. +pub struct TestingEnv { + table_metadata_manager: TableMetadataManagerRef, +} + +impl TestingEnv { + /// Returns an empty [TestingEnv]. + pub fn new() -> Self { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + Self { + table_metadata_manager, + } + } + + /// Returns a context of region migration procedure. + pub fn context(&self) -> Context { + Context { + table_metadata_manager: self.table_metadata_manager.clone(), + } + } + + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider]. + pub fn procedure_context() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } + } +}