From bd1a5453c38d101146fcd83646952f3f2a31b5a6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 Nov 2023 04:28:29 +0000 Subject: [PATCH 1/3] feat: add migration start state --- src/meta-srv/src/error.rs | 7 + .../src/procedure/region_migration.rs | 82 ++--- .../downgrade_leader_region.rs | 40 +++ .../region_migration/migration_end.rs | 6 + .../region_migration/migration_start.rs | 281 +++++++++++++++++- .../region_migration/open_candidate_region.rs | 40 +++ .../procedure/region_migration/test_util.rs | 58 ++++ 7 files changed, 474 insertions(+), 40 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs create mode 100644 src/meta-srv/src/procedure/region_migration/open_candidate_region.rs create mode 100644 src/meta-srv/src/procedure/region_migration/test_util.rs diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index e05168d9d68c..621a68243036 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -551,6 +551,13 @@ pub enum Error { }, } +impl Error { + /// Returns `true` if the error is retryable. + pub fn is_retryable(&self) -> bool { + matches!(self, Error::RetryLater { .. }) + } +} + pub type Result = std::result::Result; define_into_tonic_status!(Error); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 6bd13c65abde..7c91ca8a30ae 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -12,12 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod downgrade_leader_region; pub(crate) mod migration_end; pub(crate) mod migration_start; +pub(crate) mod open_candidate_region; +#[cfg(test)] +pub(crate) mod test_util; +use std::any::Any; use std::fmt::Debug; +use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; +use common_meta::ClusterId; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; @@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key; /// **Notes: Stores with too large data in the context might incur replication overhead.** #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentContext { + /// The Id of the cluster. + cluster_id: ClusterId, /// The [Peer] of migration source. from_peer: Peer, /// The [Peer] of migration destination. - to_peer: Option, + to_peer: Peer, /// The [RegionId] of migration region. region_id: RegionId, } @@ -60,8 +69,16 @@ impl PersistentContext { pub struct VolatileContext {} /// The context of procedure execution. -#[derive(Debug, Clone)] -pub struct Context {} +pub struct Context { + table_metadata_manager: TableMetadataManagerRef, +} + +impl Context { + /// Returns address of meta server. + pub fn server_addr(&self) -> &str { + todo!() + } +} #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] @@ -78,6 +95,9 @@ trait State: Sync + Send + Debug { fn status(&self) -> Status { Status::Executing { persist: true } } + + /// Returns as [Any](std::any::Any). + fn as_any(&self) -> &dyn Any; } /// Persistent data of [RegionMigrationProcedure]. @@ -87,7 +107,6 @@ pub struct RegionMigrationData { state: Box, } -#[derive(Debug)] pub struct RegionMigrationProcedure { data: RegionMigrationData, context: Context, @@ -167,39 +186,27 @@ impl Procedure for RegionMigrationProcedure { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::Arc; - - use common_procedure::ProcedureId; - use common_procedure_test::MockContextProvider; use super::migration_end::RegionMigrationEnd; use super::*; + use crate::procedure::region_migration::test_util::TestingEnv; - fn persistent_context_factory() -> PersistentContext { + fn new_persistent_context() -> PersistentContext { PersistentContext { from_peer: Peer::empty(1), - to_peer: None, + to_peer: Peer::empty(2), region_id: RegionId::new(1024, 1), - } - } - - fn context_factory() -> Context { - Context {} - } - - fn procedure_context_factory() -> ProcedureContext { - ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), + cluster_id: 0, } } #[test] fn test_lock_key() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); let expected_key = persistent_context.lock_key(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context(); let procedure = RegionMigrationProcedure::new(persistent_context, context); @@ -211,15 +218,16 @@ mod tests { #[test] fn test_data_serialization() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context(); let procedure = RegionMigrationProcedure::new(persistent_context, context); let serialized = procedure.dump().unwrap(); - let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"context":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } @@ -245,33 +253,39 @@ mod tests { })) } } + + fn as_any(&self) -> &dyn Any { + self + } } #[tokio::test] async fn test_execution_after_deserialized() { - fn new_mock_procedure() -> RegionMigrationProcedure { - let persistent_context = persistent_context_factory(); - let context = context_factory(); + let env = TestingEnv::new(); + + fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure { + let persistent_context = new_persistent_context(); + let context = env.context(); let state = Box::::default(); RegionMigrationProcedure::new_inner(state, persistent_context, context) } - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); let mut status = None; for _ in 0..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } assert_matches!(status.unwrap(), Status::Done); - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); status = Some(procedure.execute(&ctx).await.unwrap()); let serialized = procedure.dump().unwrap(); - let context = context_factory(); + let context = env.context(); let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap(); for _ in 1..3 { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs new file mode 100644 index 000000000000..68034c1803be --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DowngradeLeaderRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DowngradeLeaderRegion { + async fn next( + &mut self, + _ctx: &Context, + _pc: &mut PersistentContext, + _vc: &mut VolatileContext, + ) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 87e64b79dc22..67acca3aec63 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; @@ -36,4 +38,8 @@ impl State for RegionMigrationEnd { fn status(&self) -> Status { Status::Done } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 7f445079143a..706c9c3fa777 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -12,24 +12,293 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; use serde::{Deserialize, Serialize}; +use snafu::{location, Location, OptionExt, ResultExt}; +use store_api::storage::RegionId; -use crate::error::Result; +use super::downgrade_leader_region::DowngradeLeaderRegion; +use super::migration_end::RegionMigrationEnd; +use super::open_candidate_region::OpenCandidateRegion; +use crate::error::{self, Result}; use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationStart {} +pub struct RegionMigrationStart; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationStart { + /// Yields next [State]. + /// + /// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state. + /// + /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. + /// + /// Otherwise go to the OpenCandidateRegion state. async fn next( &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, + ctx: &Context, + pc: &mut PersistentContext, _vc: &mut VolatileContext, ) -> Result> { - // TODO(weny): It will be added in the following PRs. - todo!() + let region_id = pc.region_id; + let to_peer = &pc.to_peer; + let region_route = self.retrieve_regions_route(ctx, region_id).await?; + + if self.check_leader_region_on_peer(®ion_route, to_peer)? { + Ok(Box::new(RegionMigrationEnd)) + } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { + Ok(Box::new(DowngradeLeaderRegion)) + } else { + Ok(Box::new(OpenCandidateRegion)) + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl RegionMigrationStart { + /// Retrieves region route. + /// + /// Abort(non-retry): + /// - TableRoute is not found. + /// - RegionRoute is not found. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + async fn retrieve_regions_route( + &self, + ctx: &Context, + region_id: RegionId, + ) -> Result { + let table_id = region_id.table_id(); + let table_route = ctx + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + let region_route = table_route + .region_routes + .iter() + .find(|route| route.region.id == region_id) + .cloned() + .context(error::UnexpectedSnafu { + violated: format!( + "RegionRoute({}) is not found in TableRoute({})", + region_id, table_id + ), + })?; + + Ok(region_route) + } + + /// Checks whether the candidate region on region has been opened. + /// Returns true if it's been opened. + fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool { + let region_opened = region_route + .follower_peers + .iter() + .any(|peer| peer.id == to_peer.id); + + region_opened + } + + /// Checks whether the leader region on region has been opened. + /// Returns true if it's been opened. + /// + /// Abort(non-retry): + /// - Leader peer of RegionRoute is not found. + fn check_leader_region_on_peer( + &self, + region_route: &RegionRoute, + to_peer: &Peer, + ) -> Result { + let region_id = region_route.region.id; + + let region_opened = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("Leader peer is not found in TableRoute({})", region_id), + })? + .id + == to_peer.id; + + Ok(region_opened) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::TestingEnv; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = RegionMigrationStart; + let env = TestingEnv::new(); + let ctx = env.context(); + + let err = state + .retrieve_regions_route(&ctx, RegionId::new(1024, 1)) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_region_route_is_not_found_error() { + let state = RegionMigrationStart; + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_route = RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(persistent_context.from_peer.clone()), + ..Default::default() + }; + + env.table_metadata_manager() + .create_table_metadata(table_info, vec![region_route]) + .await + .unwrap(); + + let err = state + .retrieve_regions_route(&ctx, RegionId::new(1024, 3)) + .await + .unwrap_err(); + + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![persistent_context.to_peer.clone()], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } + + #[tokio::test] + async fn test_next_migration_end_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(persistent_context.to_peer.clone()), + follower_peers: vec![persistent_context.from_peer.clone()], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); + } + + #[tokio::test] + async fn test_next_open_candidate_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs new file mode 100644 index 000000000000..16707ba01937 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenCandidateRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for OpenCandidateRegion { + async fn next( + &mut self, + _ctx: &Context, + _pc: &mut PersistentContext, + _vc: &mut VolatileContext, + ) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs new file mode 100644 index 000000000000..436645f4dc62 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_procedure::{Context as ProcedureContext, ProcedureId}; +use common_procedure_test::MockContextProvider; + +use crate::procedure::region_migration::Context; + +/// `TestingEnv` provides components during the tests. +pub struct TestingEnv { + table_metadata_manager: TableMetadataManagerRef, +} + +impl TestingEnv { + /// Returns an empty [TestingEnv]. + pub fn new() -> Self { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + Self { + table_metadata_manager, + } + } + + /// Returns a context of region migration procedure. + pub fn context(&self) -> Context { + Context { + table_metadata_manager: self.table_metadata_manager.clone(), + } + } + + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider]. + pub fn procedure_context() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } + } +} From 8670a217d1372c41a5ba53103e91653eb8a7b0aa Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 Nov 2023 10:25:59 +0000 Subject: [PATCH 2/3] refactor: move PersistentContext and VolatileContext into Context --- .../src/procedure/region_migration.rs | 129 ++++++++++-------- .../downgrade_leader_region.rs | 9 +- .../region_migration/migration_end.rs | 9 +- .../region_migration/migration_start.rs | 91 ++++++------ .../region_migration/open_candidate_region.rs | 9 +- .../procedure/region_migration/test_util.rs | 7 +- 6 files changed, 135 insertions(+), 119 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 7c91ca8a30ae..824320b4ae3b 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -21,6 +21,7 @@ pub(crate) mod test_util; use std::any::Any; use std::fmt::Debug; +use std::sync::{Arc, Mutex, MutexGuard}; use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; @@ -68,8 +69,31 @@ impl PersistentContext { #[derive(Debug, Clone, Default)] pub struct VolatileContext {} +/// Used to generate new [Context]. +pub trait ContextFactory { + fn new_context(self, persistent_ctx: Arc>) -> Context; +} + +/// Default implementation. +pub struct ContextFactoryImpl { + volatile_ctx: Mutex, + table_metadata_manager: TableMetadataManagerRef, +} + +impl ContextFactory for ContextFactoryImpl { + fn new_context(self, persistent_ctx: Arc>) -> Context { + Context { + persistent_ctx, + volatile_ctx: self.volatile_ctx, + table_metadata_manager: self.table_metadata_manager, + } + } +} + /// The context of procedure execution. pub struct Context { + persistent_ctx: Arc>, + volatile_ctx: Mutex, table_metadata_manager: TableMetadataManagerRef, } @@ -78,18 +102,23 @@ impl Context { pub fn server_addr(&self) -> &str { todo!() } + + /// Returns the [MutexGuard] of [PersistentContext]. + pub fn persistent_ctx_guard(&self) -> MutexGuard<'_, PersistentContext> { + self.persistent_ctx.lock().unwrap() + } + + /// Returns the [MutexGuard] of [VolatileContext]. + pub fn volatile_ctx_guard(&self) -> MutexGuard<'_, VolatileContext> { + self.volatile_ctx.lock().unwrap() + } } #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] trait State: Sync + Send + Debug { /// Yields the next state. - async fn next( - &mut self, - ctx: &Context, - pc: &mut PersistentContext, - vc: &mut VolatileContext, - ) -> Result>; + async fn next(&mut self, ctx: &Context) -> Result>; /// Indicates the procedure execution status of the `State`. fn status(&self) -> Status { @@ -103,14 +132,13 @@ trait State: Sync + Send + Debug { /// Persistent data of [RegionMigrationProcedure]. #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationData { - context: PersistentContext, + persistent_ctx: Arc>, state: Box, } pub struct RegionMigrationProcedure { data: RegionMigrationData, context: Context, - volatile_context: VolatileContext, } // TODO(weny): remove it. @@ -118,34 +146,35 @@ pub struct RegionMigrationProcedure { impl RegionMigrationProcedure { const TYPE_NAME: &str = "metasrv-procedure::RegionMigration"; - pub fn new(persistent_context: PersistentContext, context: Context) -> Self { + pub fn new( + persistent_context: PersistentContext, + context_factory: impl ContextFactory, + ) -> Self { let state = Box::new(RegionMigrationStart {}); - Self::new_inner(state, persistent_context, context) + Self::new_inner(state, persistent_context, context_factory) } fn new_inner( state: Box, persistent_context: PersistentContext, - context: Context, + context_factory: impl ContextFactory, ) -> Self { + let shared_persistent_context = Arc::new(Mutex::new(persistent_context)); Self { data: RegionMigrationData { - context: persistent_context, + persistent_ctx: shared_persistent_context.clone(), state, }, - context, - volatile_context: VolatileContext::default(), + context: context_factory.new_context(shared_persistent_context), } } - fn from_json(json: &str, context: Context) -> ProcedureResult { + fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult { let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(Self { - data, - context, - volatile_context: VolatileContext::default(), - }) + let context = context_factory.new_context(data.persistent_ctx.clone()); + + Ok(Self { data, context }) } } @@ -158,19 +187,14 @@ impl Procedure for RegionMigrationProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let data = &mut self.data; let state = &mut data.state; - let persistent_context = &mut data.context; - let volatile_context = &mut self.volatile_context; - - *state = state - .next(&self.context, persistent_context, volatile_context) - .await - .map_err(|e| { - if matches!(e, Error::RetryLater { .. }) { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - })?; + + *state = state.next(&self.context).await.map_err(|e| { + if matches!(e, Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + })?; Ok(state.status()) } @@ -179,7 +203,8 @@ impl Procedure for RegionMigrationProcedure { } fn lock_key(&self) -> LockKey { - LockKey::single(self.data.context.lock_key()) + let key = self.data.persistent_ctx.lock().unwrap().lock_key(); + LockKey::single(key) } } @@ -206,7 +231,7 @@ mod tests { let expected_key = persistent_context.lock_key(); let env = TestingEnv::new(); - let context = env.context(); + let context = env.context_factory(); let procedure = RegionMigrationProcedure::new(persistent_context, context); @@ -221,36 +246,30 @@ mod tests { let persistent_context = new_persistent_context(); let env = TestingEnv::new(); - let context = env.context(); + let context = env.context_factory(); let procedure = RegionMigrationProcedure::new(persistent_context, context); let serialized = procedure.dump().unwrap(); - let expected = r#"{"context":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } #[derive(Debug, Serialize, Deserialize, Default)] - pub struct MockState { - count: usize, - } + pub struct MockState; #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next( - &mut self, - _: &Context, - _: &mut PersistentContext, - _: &mut VolatileContext, - ) -> Result> { - if self.count == 2 { + async fn next(&mut self, ctx: &Context) -> Result> { + let mut pc = ctx.persistent_ctx_guard(); + + if pc.cluster_id == 2 { Ok(Box::new(RegionMigrationEnd)) } else { - Ok(Box::new(MockState { - count: self.count + 1, - })) + pc.cluster_id += 1; + Ok(Box::new(MockState)) } } @@ -265,9 +284,9 @@ mod tests { fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure { let persistent_context = new_persistent_context(); - let context = env.context(); + let context_factory = env.context_factory(); let state = Box::::default(); - RegionMigrationProcedure::new_inner(state, persistent_context, context) + RegionMigrationProcedure::new_inner(state, persistent_context, context_factory) } let ctx = TestingEnv::procedure_context(); @@ -285,12 +304,14 @@ mod tests { let serialized = procedure.dump().unwrap(); - let context = env.context(); - let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap(); + let context_factory = env.context_factory(); + let mut procedure = + RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap(); for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } + assert_eq!(procedure.context.persistent_ctx_guard().cluster_id, 2); assert_matches!(status.unwrap(), Status::Done); } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 68034c1803be..9699b29edc0e 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -17,7 +17,7 @@ use std::any::Any; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] pub struct DowngradeLeaderRegion; @@ -25,12 +25,7 @@ pub struct DowngradeLeaderRegion; #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next( - &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, - _vc: &mut VolatileContext, - ) -> Result> { + async fn next(&mut self, _ctx: &Context) -> Result> { todo!() } diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 67acca3aec63..579333aff6bb 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -18,7 +18,7 @@ use common_procedure::Status; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationEnd; @@ -26,12 +26,7 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next( - &mut self, - _: &Context, - _: &mut PersistentContext, - _: &mut VolatileContext, - ) -> Result> { + async fn next(&mut self, _: &Context) -> Result> { Ok(Box::new(RegionMigrationEnd)) } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 706c9c3fa777..8fb8a47b328d 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -24,7 +24,7 @@ use super::downgrade_leader_region::DowngradeLeaderRegion; use super::migration_end::RegionMigrationEnd; use super::open_candidate_region::OpenCandidateRegion; use crate::error::{self, Result}; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationStart; @@ -39,19 +39,17 @@ impl State for RegionMigrationStart { /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. /// /// Otherwise go to the OpenCandidateRegion state. - async fn next( - &mut self, - ctx: &Context, - pc: &mut PersistentContext, - _vc: &mut VolatileContext, - ) -> Result> { - let region_id = pc.region_id; - let to_peer = &pc.to_peer; + async fn next(&mut self, ctx: &Context) -> Result> { + let (region_id, to_peer) = { + let pc = ctx.persistent_ctx_guard(); + (pc.region_id, pc.to_peer.clone()) + }; + let region_route = self.retrieve_regions_route(ctx, region_id).await?; - if self.check_leader_region_on_peer(®ion_route, to_peer)? { + if self.check_leader_region_on_peer(®ion_route, &to_peer)? { Ok(Box::new(RegionMigrationEnd)) - } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { + } else if self.check_candidate_region_on_peer(®ion_route, &to_peer) { Ok(Box::new(DowngradeLeaderRegion)) } else { Ok(Box::new(OpenCandidateRegion)) @@ -144,6 +142,7 @@ impl RegionMigrationStart { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::{Arc, Mutex}; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -153,6 +152,7 @@ mod tests { use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { PersistentContext { @@ -167,7 +167,10 @@ mod tests { async fn test_table_route_is_not_found_error() { let state = RegionMigrationStart; let env = TestingEnv::new(); - let ctx = env.context(); + let persistent_context = new_persistent_context(); + let ctx = env + .context_factory() + .new_context(Arc::new(Mutex::new(persistent_context))); let err = state .retrieve_regions_route(&ctx, RegionId::new(1024, 1)) @@ -183,13 +186,17 @@ mod tests { async fn test_region_route_is_not_found_error() { let state = RegionMigrationStart; let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + let env = TestingEnv::new(); - let ctx = env.context(); + let ctx = env + .context_factory() + .new_context(Arc::new(Mutex::new(persistent_context))); let table_info = new_test_table_info(1024, vec![1]).into(); let region_route = RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), - leader_peer: Some(persistent_context.from_peer.clone()), + leader_peer: Some(from_peer.clone()), ..Default::default() }; @@ -212,16 +219,20 @@ mod tests { let mut state = Box::new(RegionMigrationStart); // from_peer: 1 // to_peer: 2 - let mut persistent_context = new_persistent_context(); - let mut volatile_context = VolatileContext::default(); + let persistent_context = new_persistent_context(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); - let ctx = env.context(); + let ctx = env + .context_factory() + .new_context(Arc::new(Mutex::new(persistent_context))); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(persistent_context.region_id), + region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), - follower_peers: vec![persistent_context.to_peer.clone()], + follower_peers: vec![to_peer], ..Default::default() }]; @@ -230,10 +241,7 @@ mod tests { .await .unwrap(); - let next = state - .next(&ctx, &mut persistent_context, &mut volatile_context) - .await - .unwrap(); + let next = state.next(&ctx).await.unwrap(); let _ = next .as_any() @@ -246,16 +254,21 @@ mod tests { let mut state = Box::new(RegionMigrationStart); // from_peer: 1 // to_peer: 2 - let mut persistent_context = new_persistent_context(); - let mut volatile_context = VolatileContext::default(); + let persistent_context = new_persistent_context(); + let to_peer = persistent_context.to_peer.clone(); + let from_peer = persistent_context.from_peer.clone(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); - let ctx = env.context(); + let ctx = env + .context_factory() + .new_context(Arc::new(Mutex::new(persistent_context))); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(persistent_context.region_id), - leader_peer: Some(persistent_context.to_peer.clone()), - follower_peers: vec![persistent_context.from_peer.clone()], + region: Region::new_test(region_id), + leader_peer: Some(to_peer), + follower_peers: vec![from_peer], ..Default::default() }]; @@ -264,10 +277,7 @@ mod tests { .await .unwrap(); - let next = state - .next(&ctx, &mut persistent_context, &mut volatile_context) - .await - .unwrap(); + let next = state.next(&ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -277,14 +287,16 @@ mod tests { let mut state = Box::new(RegionMigrationStart); // from_peer: 1 // to_peer: 2 - let mut persistent_context = new_persistent_context(); - let mut volatile_context = VolatileContext::default(); + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env.context(); + let ctx = env + .context_factory() + .new_context(Arc::new(Mutex::new(persistent_context))); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(persistent_context.region_id), + region: Region::new_test(region_id), leader_peer: Some(Peer::empty(3)), ..Default::default() }]; @@ -294,10 +306,7 @@ mod tests { .await .unwrap(); - let next = state - .next(&ctx, &mut persistent_context, &mut volatile_context) - .await - .unwrap(); + let next = state.next(&ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 16707ba01937..9a6f40f33dfd 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -17,7 +17,7 @@ use std::any::Any; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -25,12 +25,7 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next( - &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, - _vc: &mut VolatileContext, - ) -> Result> { + async fn next(&mut self, _ctx: &Context) -> Result> { todo!() } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 436645f4dc62..cfac3b4cb789 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -19,7 +19,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; -use crate::procedure::region_migration::Context; +use super::ContextFactoryImpl; /// `TestingEnv` provides components during the tests. pub struct TestingEnv { @@ -38,9 +38,10 @@ impl TestingEnv { } /// Returns a context of region migration procedure. - pub fn context(&self) -> Context { - Context { + pub fn context_factory(&self) -> ContextFactoryImpl { + ContextFactoryImpl { table_metadata_manager: self.table_metadata_manager.clone(), + volatile_ctx: Default::default(), } } From a2cba0fb08a1a6ec71d63b5e1f37c9614f2acf0c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 Nov 2023 19:28:45 +0000 Subject: [PATCH 3/3] chore: apply suggestions from CR --- .../src/procedure/region_migration.rs | 74 +++++++++---------- .../downgrade_leader_region.rs | 2 +- .../region_migration/migration_end.rs | 2 +- .../region_migration/migration_start.rs | 47 +++++------- .../region_migration/open_candidate_region.rs | 2 +- 5 files changed, 57 insertions(+), 70 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 824320b4ae3b..8ec21826952e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -21,7 +21,6 @@ pub(crate) mod test_util; use std::any::Any; use std::fmt::Debug; -use std::sync::{Arc, Mutex, MutexGuard}; use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; @@ -71,17 +70,17 @@ pub struct VolatileContext {} /// Used to generate new [Context]. pub trait ContextFactory { - fn new_context(self, persistent_ctx: Arc>) -> Context; + fn new_context(self, persistent_ctx: PersistentContext) -> Context; } /// Default implementation. pub struct ContextFactoryImpl { - volatile_ctx: Mutex, + volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, } impl ContextFactory for ContextFactoryImpl { - fn new_context(self, persistent_ctx: Arc>) -> Context { + fn new_context(self, persistent_ctx: PersistentContext) -> Context { Context { persistent_ctx, volatile_ctx: self.volatile_ctx, @@ -90,10 +89,12 @@ impl ContextFactory for ContextFactoryImpl { } } +// TODO(weny): remove it. +#[allow(dead_code)] /// The context of procedure execution. pub struct Context { - persistent_ctx: Arc>, - volatile_ctx: Mutex, + persistent_ctx: PersistentContext, + volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, } @@ -102,23 +103,13 @@ impl Context { pub fn server_addr(&self) -> &str { todo!() } - - /// Returns the [MutexGuard] of [PersistentContext]. - pub fn persistent_ctx_guard(&self) -> MutexGuard<'_, PersistentContext> { - self.persistent_ctx.lock().unwrap() - } - - /// Returns the [MutexGuard] of [VolatileContext]. - pub fn volatile_ctx_guard(&self) -> MutexGuard<'_, VolatileContext> { - self.volatile_ctx.lock().unwrap() - } } #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] trait State: Sync + Send + Debug { /// Yields the next state. - async fn next(&mut self, ctx: &Context) -> Result>; + async fn next(&mut self, ctx: &mut Context) -> Result>; /// Indicates the procedure execution status of the `State`. fn status(&self) -> Status { @@ -131,13 +122,20 @@ trait State: Sync + Send + Debug { /// Persistent data of [RegionMigrationProcedure]. #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationData { - persistent_ctx: Arc>, +pub struct RegionMigrationDataOwned { + persistent_ctx: PersistentContext, state: Box, } +/// Persistent data of [RegionMigrationProcedure]. +#[derive(Debug, Serialize)] +pub struct RegionMigrationData<'a> { + persistent_ctx: &'a PersistentContext, + state: &'a dyn State, +} + pub struct RegionMigrationProcedure { - data: RegionMigrationData, + state: Box, context: Context, } @@ -159,22 +157,21 @@ impl RegionMigrationProcedure { persistent_context: PersistentContext, context_factory: impl ContextFactory, ) -> Self { - let shared_persistent_context = Arc::new(Mutex::new(persistent_context)); Self { - data: RegionMigrationData { - persistent_ctx: shared_persistent_context.clone(), - state, - }, - context: context_factory.new_context(shared_persistent_context), + state, + context: context_factory.new_context(persistent_context), } } fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult { - let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?; + let RegionMigrationDataOwned { + persistent_ctx, + state, + } = serde_json::from_str(json).context(FromJsonSnafu)?; - let context = context_factory.new_context(data.persistent_ctx.clone()); + let context = context_factory.new_context(persistent_ctx); - Ok(Self { data, context }) + Ok(Self { state, context }) } } @@ -185,10 +182,9 @@ impl Procedure for RegionMigrationProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let data = &mut self.data; - let state = &mut data.state; + let state = &mut self.state; - *state = state.next(&self.context).await.map_err(|e| { + *state = state.next(&mut self.context).await.map_err(|e| { if matches!(e, Error::RetryLater { .. }) { ProcedureError::retry_later(e) } else { @@ -199,11 +195,15 @@ impl Procedure for RegionMigrationProcedure { } fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.data).context(ToJsonSnafu) + let data = RegionMigrationData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { - let key = self.data.persistent_ctx.lock().unwrap().lock_key(); + let key = self.context.persistent_ctx.lock_key(); LockKey::single(key) } } @@ -262,8 +262,8 @@ mod tests { #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next(&mut self, ctx: &Context) -> Result> { - let mut pc = ctx.persistent_ctx_guard(); + async fn next(&mut self, ctx: &mut Context) -> Result> { + let pc = &mut ctx.persistent_ctx; if pc.cluster_id == 2 { Ok(Box::new(RegionMigrationEnd)) @@ -311,7 +311,7 @@ mod tests { for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } - assert_eq!(procedure.context.persistent_ctx_guard().cluster_id, 2); + assert_eq!(procedure.context.persistent_ctx.cluster_id, 2); assert_matches!(status.unwrap(), Status::Done); } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 9699b29edc0e..c0ff94330723 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -25,7 +25,7 @@ pub struct DowngradeLeaderRegion; #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next(&mut self, _ctx: &Context) -> Result> { + async fn next(&mut self, _ctx: &mut Context) -> Result> { todo!() } diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 579333aff6bb..c50e0a67b749 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -26,7 +26,7 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next(&mut self, _: &Context) -> Result> { + async fn next(&mut self, _: &mut Context) -> Result> { Ok(Box::new(RegionMigrationEnd)) } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 8fb8a47b328d..6f2e43c8ace4 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -39,17 +39,15 @@ impl State for RegionMigrationStart { /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. /// /// Otherwise go to the OpenCandidateRegion state. - async fn next(&mut self, ctx: &Context) -> Result> { - let (region_id, to_peer) = { - let pc = ctx.persistent_ctx_guard(); - (pc.region_id, pc.to_peer.clone()) - }; + async fn next(&mut self, ctx: &mut Context) -> Result> { + let region_id = ctx.persistent_ctx.region_id; + let to_peer = &ctx.persistent_ctx.to_peer; - let region_route = self.retrieve_regions_route(ctx, region_id).await?; + let region_route = self.retrieve_region_route(ctx, region_id).await?; - if self.check_leader_region_on_peer(®ion_route, &to_peer)? { + if self.check_leader_region_on_peer(®ion_route, to_peer)? { Ok(Box::new(RegionMigrationEnd)) - } else if self.check_candidate_region_on_peer(®ion_route, &to_peer) { + } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { Ok(Box::new(DowngradeLeaderRegion)) } else { Ok(Box::new(OpenCandidateRegion)) @@ -70,7 +68,7 @@ impl RegionMigrationStart { /// /// Retry: /// - Failed to retrieve the metadata of table. - async fn retrieve_regions_route( + async fn retrieve_region_route( &self, ctx: &Context, region_id: RegionId, @@ -142,7 +140,6 @@ impl RegionMigrationStart { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::{Arc, Mutex}; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -168,12 +165,10 @@ mod tests { let state = RegionMigrationStart; let env = TestingEnv::new(); let persistent_context = new_persistent_context(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let ctx = env.context_factory().new_context(persistent_context); let err = state - .retrieve_regions_route(&ctx, RegionId::new(1024, 1)) + .retrieve_region_route(&ctx, RegionId::new(1024, 1)) .await .unwrap_err(); @@ -189,9 +184,7 @@ mod tests { let from_peer = persistent_context.from_peer.clone(); let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_route = RegionRoute { @@ -206,7 +199,7 @@ mod tests { .unwrap(); let err = state - .retrieve_regions_route(&ctx, RegionId::new(1024, 3)) + .retrieve_region_route(&ctx, RegionId::new(1024, 3)) .await .unwrap_err(); @@ -224,9 +217,7 @@ mod tests { let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -241,7 +232,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next .as_any() @@ -260,9 +251,7 @@ mod tests { let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -277,7 +266,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -290,9 +279,7 @@ mod tests { let persistent_context = new_persistent_context(); let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -306,7 +293,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 9a6f40f33dfd..c056ec741601 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -25,7 +25,7 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next(&mut self, _ctx: &Context) -> Result> { + async fn next(&mut self, _ctx: &mut Context) -> Result> { todo!() }