diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index e05168d9d68c..621a68243036 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -551,6 +551,13 @@ pub enum Error { }, } +impl Error { + /// Returns `true` if the error is retryable. + pub fn is_retryable(&self) -> bool { + matches!(self, Error::RetryLater { .. }) + } +} + pub type Result = std::result::Result; define_into_tonic_status!(Error); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 6bd13c65abde..8ec21826952e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -12,12 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod downgrade_leader_region; pub(crate) mod migration_end; pub(crate) mod migration_start; +pub(crate) mod open_candidate_region; +#[cfg(test)] +pub(crate) mod test_util; +use std::any::Any; use std::fmt::Debug; +use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; +use common_meta::ClusterId; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; @@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key; /// **Notes: Stores with too large data in the context might incur replication overhead.** #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentContext { + /// The Id of the cluster. + cluster_id: ClusterId, /// The [Peer] of migration source. from_peer: Peer, /// The [Peer] of migration destination. - to_peer: Option, + to_peer: Peer, /// The [RegionId] of migration region. region_id: RegionId, } @@ -59,39 +68,75 @@ impl PersistentContext { #[derive(Debug, Clone, Default)] pub struct VolatileContext {} +/// Used to generate new [Context]. +pub trait ContextFactory { + fn new_context(self, persistent_ctx: PersistentContext) -> Context; +} + +/// Default implementation. +pub struct ContextFactoryImpl { + volatile_ctx: VolatileContext, + table_metadata_manager: TableMetadataManagerRef, +} + +impl ContextFactory for ContextFactoryImpl { + fn new_context(self, persistent_ctx: PersistentContext) -> Context { + Context { + persistent_ctx, + volatile_ctx: self.volatile_ctx, + table_metadata_manager: self.table_metadata_manager, + } + } +} + +// TODO(weny): remove it. +#[allow(dead_code)] /// The context of procedure execution. -#[derive(Debug, Clone)] -pub struct Context {} +pub struct Context { + persistent_ctx: PersistentContext, + volatile_ctx: VolatileContext, + table_metadata_manager: TableMetadataManagerRef, +} + +impl Context { + /// Returns address of meta server. + pub fn server_addr(&self) -> &str { + todo!() + } +} #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] trait State: Sync + Send + Debug { /// Yields the next state. - async fn next( - &mut self, - ctx: &Context, - pc: &mut PersistentContext, - vc: &mut VolatileContext, - ) -> Result>; + async fn next(&mut self, ctx: &mut Context) -> Result>; /// Indicates the procedure execution status of the `State`. fn status(&self) -> Status { Status::Executing { persist: true } } + + /// Returns as [Any](std::any::Any). + fn as_any(&self) -> &dyn Any; } /// Persistent data of [RegionMigrationProcedure]. #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationData { - context: PersistentContext, +pub struct RegionMigrationDataOwned { + persistent_ctx: PersistentContext, state: Box, } -#[derive(Debug)] +/// Persistent data of [RegionMigrationProcedure]. +#[derive(Debug, Serialize)] +pub struct RegionMigrationData<'a> { + persistent_ctx: &'a PersistentContext, + state: &'a dyn State, +} + pub struct RegionMigrationProcedure { - data: RegionMigrationData, + state: Box, context: Context, - volatile_context: VolatileContext, } // TODO(weny): remove it. @@ -99,34 +144,34 @@ pub struct RegionMigrationProcedure { impl RegionMigrationProcedure { const TYPE_NAME: &str = "metasrv-procedure::RegionMigration"; - pub fn new(persistent_context: PersistentContext, context: Context) -> Self { + pub fn new( + persistent_context: PersistentContext, + context_factory: impl ContextFactory, + ) -> Self { let state = Box::new(RegionMigrationStart {}); - Self::new_inner(state, persistent_context, context) + Self::new_inner(state, persistent_context, context_factory) } fn new_inner( state: Box, persistent_context: PersistentContext, - context: Context, + context_factory: impl ContextFactory, ) -> Self { Self { - data: RegionMigrationData { - context: persistent_context, - state, - }, - context, - volatile_context: VolatileContext::default(), + state, + context: context_factory.new_context(persistent_context), } } - fn from_json(json: &str, context: Context) -> ProcedureResult { - let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?; + fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult { + let RegionMigrationDataOwned { + persistent_ctx, + state, + } = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(Self { - data, - context, - volatile_context: VolatileContext::default(), - }) + let context = context_factory.new_context(persistent_ctx); + + Ok(Self { state, context }) } } @@ -137,69 +182,56 @@ impl Procedure for RegionMigrationProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let data = &mut self.data; - let state = &mut data.state; - let persistent_context = &mut data.context; - let volatile_context = &mut self.volatile_context; - - *state = state - .next(&self.context, persistent_context, volatile_context) - .await - .map_err(|e| { - if matches!(e, Error::RetryLater { .. }) { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - })?; + let state = &mut self.state; + + *state = state.next(&mut self.context).await.map_err(|e| { + if matches!(e, Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + })?; Ok(state.status()) } fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.data).context(ToJsonSnafu) + let data = RegionMigrationData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { - LockKey::single(self.data.context.lock_key()) + let key = self.context.persistent_ctx.lock_key(); + LockKey::single(key) } } #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::Arc; - - use common_procedure::ProcedureId; - use common_procedure_test::MockContextProvider; use super::migration_end::RegionMigrationEnd; use super::*; + use crate::procedure::region_migration::test_util::TestingEnv; - fn persistent_context_factory() -> PersistentContext { + fn new_persistent_context() -> PersistentContext { PersistentContext { from_peer: Peer::empty(1), - to_peer: None, + to_peer: Peer::empty(2), region_id: RegionId::new(1024, 1), - } - } - - fn context_factory() -> Context { - Context {} - } - - fn procedure_context_factory() -> ProcedureContext { - ProcedureContext { - procedure_id: ProcedureId::random(), - provider: Arc::new(MockContextProvider::default()), + cluster_id: 0, } } #[test] fn test_lock_key() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); let expected_key = persistent_context.lock_key(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context_factory(); let procedure = RegionMigrationProcedure::new(persistent_context, context); @@ -211,72 +243,75 @@ mod tests { #[test] fn test_data_serialization() { - let persistent_context = persistent_context_factory(); + let persistent_context = new_persistent_context(); - let context = context_factory(); + let env = TestingEnv::new(); + let context = env.context_factory(); let procedure = RegionMigrationProcedure::new(persistent_context, context); let serialized = procedure.dump().unwrap(); - let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } #[derive(Debug, Serialize, Deserialize, Default)] - pub struct MockState { - count: usize, - } + pub struct MockState; #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next( - &mut self, - _: &Context, - _: &mut PersistentContext, - _: &mut VolatileContext, - ) -> Result> { - if self.count == 2 { + async fn next(&mut self, ctx: &mut Context) -> Result> { + let pc = &mut ctx.persistent_ctx; + + if pc.cluster_id == 2 { Ok(Box::new(RegionMigrationEnd)) } else { - Ok(Box::new(MockState { - count: self.count + 1, - })) + pc.cluster_id += 1; + Ok(Box::new(MockState)) } } + + fn as_any(&self) -> &dyn Any { + self + } } #[tokio::test] async fn test_execution_after_deserialized() { - fn new_mock_procedure() -> RegionMigrationProcedure { - let persistent_context = persistent_context_factory(); - let context = context_factory(); + let env = TestingEnv::new(); + + fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure { + let persistent_context = new_persistent_context(); + let context_factory = env.context_factory(); let state = Box::::default(); - RegionMigrationProcedure::new_inner(state, persistent_context, context) + RegionMigrationProcedure::new_inner(state, persistent_context, context_factory) } - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); let mut status = None; for _ in 0..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } assert_matches!(status.unwrap(), Status::Done); - let ctx = procedure_context_factory(); - let mut procedure = new_mock_procedure(); + let ctx = TestingEnv::procedure_context(); + let mut procedure = new_mock_procedure(&env); status = Some(procedure.execute(&ctx).await.unwrap()); let serialized = procedure.dump().unwrap(); - let context = context_factory(); - let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap(); + let context_factory = env.context_factory(); + let mut procedure = + RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap(); for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } + assert_eq!(procedure.context.persistent_ctx.cluster_id, 2); assert_matches!(status.unwrap(), Status::Done); } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs new file mode 100644 index 000000000000..c0ff94330723 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DowngradeLeaderRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DowngradeLeaderRegion { + async fn next(&mut self, _ctx: &mut Context) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 87e64b79dc22..c50e0a67b749 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationEnd; @@ -24,16 +26,15 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next( - &mut self, - _: &Context, - _: &mut PersistentContext, - _: &mut VolatileContext, - ) -> Result> { + async fn next(&mut self, _: &mut Context) -> Result> { Ok(Box::new(RegionMigrationEnd)) } fn status(&self) -> Status { Status::Done } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 7f445079143a..6f2e43c8ace4 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -12,24 +12,289 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; use serde::{Deserialize, Serialize}; +use snafu::{location, Location, OptionExt, ResultExt}; +use store_api::storage::RegionId; -use crate::error::Result; -use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use super::downgrade_leader_region::DowngradeLeaderRegion; +use super::migration_end::RegionMigrationEnd; +use super::open_candidate_region::OpenCandidateRegion; +use crate::error::{self, Result}; +use crate::procedure::region_migration::{Context, State}; #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationStart {} +pub struct RegionMigrationStart; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationStart { - async fn next( - &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, - _vc: &mut VolatileContext, - ) -> Result> { - // TODO(weny): It will be added in the following PRs. - todo!() + /// Yields next [State]. + /// + /// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state. + /// + /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. + /// + /// Otherwise go to the OpenCandidateRegion state. + async fn next(&mut self, ctx: &mut Context) -> Result> { + let region_id = ctx.persistent_ctx.region_id; + let to_peer = &ctx.persistent_ctx.to_peer; + + let region_route = self.retrieve_region_route(ctx, region_id).await?; + + if self.check_leader_region_on_peer(®ion_route, to_peer)? { + Ok(Box::new(RegionMigrationEnd)) + } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { + Ok(Box::new(DowngradeLeaderRegion)) + } else { + Ok(Box::new(OpenCandidateRegion)) + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl RegionMigrationStart { + /// Retrieves region route. + /// + /// Abort(non-retry): + /// - TableRoute is not found. + /// - RegionRoute is not found. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + async fn retrieve_region_route( + &self, + ctx: &Context, + region_id: RegionId, + ) -> Result { + let table_id = region_id.table_id(); + let table_route = ctx + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + let region_route = table_route + .region_routes + .iter() + .find(|route| route.region.id == region_id) + .cloned() + .context(error::UnexpectedSnafu { + violated: format!( + "RegionRoute({}) is not found in TableRoute({})", + region_id, table_id + ), + })?; + + Ok(region_route) + } + + /// Checks whether the candidate region on region has been opened. + /// Returns true if it's been opened. + fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool { + let region_opened = region_route + .follower_peers + .iter() + .any(|peer| peer.id == to_peer.id); + + region_opened + } + + /// Checks whether the leader region on region has been opened. + /// Returns true if it's been opened. + /// + /// Abort(non-retry): + /// - Leader peer of RegionRoute is not found. + fn check_leader_region_on_peer( + &self, + region_route: &RegionRoute, + to_peer: &Peer, + ) -> Result { + let region_id = region_route.region.id; + + let region_opened = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("Leader peer is not found in TableRoute({})", region_id), + })? + .id + == to_peer.id; + + Ok(region_opened) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + #[tokio::test] + async fn test_table_route_is_not_found_error() { + let state = RegionMigrationStart; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let ctx = env.context_factory().new_context(persistent_context); + + let err = state + .retrieve_region_route(&ctx, RegionId::new(1024, 1)) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableRouteNotFound { .. }); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_region_route_is_not_found_error() { + let state = RegionMigrationStart; + let persistent_context = new_persistent_context(); + let from_peer = persistent_context.from_peer.clone(); + + let env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_route = RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }; + + env.table_metadata_manager() + .create_table_metadata(table_info, vec![region_route]) + .await + .unwrap(); + + let err = state + .retrieve_region_route(&ctx, RegionId::new(1024, 3)) + .await + .unwrap_err(); + + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } + + #[tokio::test] + async fn test_next_migration_end_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let to_peer = persistent_context.to_peer.clone(); + let from_peer = persistent_context.from_peer.clone(); + let region_id = persistent_context.region_id; + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(to_peer), + follower_peers: vec![from_peer], + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); + } + + #[tokio::test] + async fn test_next_open_candidate_region_state() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let next = state.next(&mut ctx).await.unwrap(); + + let _ = next.as_any().downcast_ref::().unwrap(); } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs new file mode 100644 index 000000000000..c056ec741601 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenCandidateRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for OpenCandidateRegion { + async fn next(&mut self, _ctx: &mut Context) -> Result> { + todo!() + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs new file mode 100644 index 000000000000..cfac3b4cb789 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_procedure::{Context as ProcedureContext, ProcedureId}; +use common_procedure_test::MockContextProvider; + +use super::ContextFactoryImpl; + +/// `TestingEnv` provides components during the tests. +pub struct TestingEnv { + table_metadata_manager: TableMetadataManagerRef, +} + +impl TestingEnv { + /// Returns an empty [TestingEnv]. + pub fn new() -> Self { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + Self { + table_metadata_manager, + } + } + + /// Returns a context of region migration procedure. + pub fn context_factory(&self) -> ContextFactoryImpl { + ContextFactoryImpl { + table_metadata_manager: self.table_metadata_manager.clone(), + volatile_ctx: Default::default(), + } + } + + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider]. + pub fn procedure_context() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } + } +}