From e7f536b06a24be7a4dd109125a4a522fcbf29a64 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 Nov 2023 19:28:45 +0000 Subject: [PATCH] chore: apply suggestions from CR --- .../src/procedure/region_migration.rs | 74 +++++++++---------- .../downgrade_leader_region.rs | 2 +- .../region_migration/migration_end.rs | 2 +- .../region_migration/migration_start.rs | 40 ++++------ .../region_migration/open_candidate_region.rs | 2 +- 5 files changed, 54 insertions(+), 66 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 824320b4ae3b..8ec21826952e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -21,7 +21,6 @@ pub(crate) mod test_util; use std::any::Any; use std::fmt::Debug; -use std::sync::{Arc, Mutex, MutexGuard}; use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; @@ -71,17 +70,17 @@ pub struct VolatileContext {} /// Used to generate new [Context]. pub trait ContextFactory { - fn new_context(self, persistent_ctx: Arc>) -> Context; + fn new_context(self, persistent_ctx: PersistentContext) -> Context; } /// Default implementation. pub struct ContextFactoryImpl { - volatile_ctx: Mutex, + volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, } impl ContextFactory for ContextFactoryImpl { - fn new_context(self, persistent_ctx: Arc>) -> Context { + fn new_context(self, persistent_ctx: PersistentContext) -> Context { Context { persistent_ctx, volatile_ctx: self.volatile_ctx, @@ -90,10 +89,12 @@ impl ContextFactory for ContextFactoryImpl { } } +// TODO(weny): remove it. +#[allow(dead_code)] /// The context of procedure execution. pub struct Context { - persistent_ctx: Arc>, - volatile_ctx: Mutex, + persistent_ctx: PersistentContext, + volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, } @@ -102,23 +103,13 @@ impl Context { pub fn server_addr(&self) -> &str { todo!() } - - /// Returns the [MutexGuard] of [PersistentContext]. - pub fn persistent_ctx_guard(&self) -> MutexGuard<'_, PersistentContext> { - self.persistent_ctx.lock().unwrap() - } - - /// Returns the [MutexGuard] of [VolatileContext]. - pub fn volatile_ctx_guard(&self) -> MutexGuard<'_, VolatileContext> { - self.volatile_ctx.lock().unwrap() - } } #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] trait State: Sync + Send + Debug { /// Yields the next state. - async fn next(&mut self, ctx: &Context) -> Result>; + async fn next(&mut self, ctx: &mut Context) -> Result>; /// Indicates the procedure execution status of the `State`. fn status(&self) -> Status { @@ -131,13 +122,20 @@ trait State: Sync + Send + Debug { /// Persistent data of [RegionMigrationProcedure]. #[derive(Debug, Serialize, Deserialize)] -pub struct RegionMigrationData { - persistent_ctx: Arc>, +pub struct RegionMigrationDataOwned { + persistent_ctx: PersistentContext, state: Box, } +/// Persistent data of [RegionMigrationProcedure]. +#[derive(Debug, Serialize)] +pub struct RegionMigrationData<'a> { + persistent_ctx: &'a PersistentContext, + state: &'a dyn State, +} + pub struct RegionMigrationProcedure { - data: RegionMigrationData, + state: Box, context: Context, } @@ -159,22 +157,21 @@ impl RegionMigrationProcedure { persistent_context: PersistentContext, context_factory: impl ContextFactory, ) -> Self { - let shared_persistent_context = Arc::new(Mutex::new(persistent_context)); Self { - data: RegionMigrationData { - persistent_ctx: shared_persistent_context.clone(), - state, - }, - context: context_factory.new_context(shared_persistent_context), + state, + context: context_factory.new_context(persistent_context), } } fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult { - let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?; + let RegionMigrationDataOwned { + persistent_ctx, + state, + } = serde_json::from_str(json).context(FromJsonSnafu)?; - let context = context_factory.new_context(data.persistent_ctx.clone()); + let context = context_factory.new_context(persistent_ctx); - Ok(Self { data, context }) + Ok(Self { state, context }) } } @@ -185,10 +182,9 @@ impl Procedure for RegionMigrationProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let data = &mut self.data; - let state = &mut data.state; + let state = &mut self.state; - *state = state.next(&self.context).await.map_err(|e| { + *state = state.next(&mut self.context).await.map_err(|e| { if matches!(e, Error::RetryLater { .. }) { ProcedureError::retry_later(e) } else { @@ -199,11 +195,15 @@ impl Procedure for RegionMigrationProcedure { } fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.data).context(ToJsonSnafu) + let data = RegionMigrationData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { - let key = self.data.persistent_ctx.lock().unwrap().lock_key(); + let key = self.context.persistent_ctx.lock_key(); LockKey::single(key) } } @@ -262,8 +262,8 @@ mod tests { #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next(&mut self, ctx: &Context) -> Result> { - let mut pc = ctx.persistent_ctx_guard(); + async fn next(&mut self, ctx: &mut Context) -> Result> { + let pc = &mut ctx.persistent_ctx; if pc.cluster_id == 2 { Ok(Box::new(RegionMigrationEnd)) @@ -311,7 +311,7 @@ mod tests { for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); } - assert_eq!(procedure.context.persistent_ctx_guard().cluster_id, 2); + assert_eq!(procedure.context.persistent_ctx.cluster_id, 2); assert_matches!(status.unwrap(), Status::Done); } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 9699b29edc0e..c0ff94330723 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -25,7 +25,7 @@ pub struct DowngradeLeaderRegion; #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next(&mut self, _ctx: &Context) -> Result> { + async fn next(&mut self, _ctx: &mut Context) -> Result> { todo!() } diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 579333aff6bb..c50e0a67b749 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -26,7 +26,7 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next(&mut self, _: &Context) -> Result> { + async fn next(&mut self, _: &mut Context) -> Result> { Ok(Box::new(RegionMigrationEnd)) } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 8fb8a47b328d..bed20d2667f0 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -39,17 +39,15 @@ impl State for RegionMigrationStart { /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. /// /// Otherwise go to the OpenCandidateRegion state. - async fn next(&mut self, ctx: &Context) -> Result> { - let (region_id, to_peer) = { - let pc = ctx.persistent_ctx_guard(); - (pc.region_id, pc.to_peer.clone()) - }; + async fn next(&mut self, ctx: &mut Context) -> Result> { + let region_id = ctx.persistent_ctx.region_id; + let to_peer = &ctx.persistent_ctx.to_peer; let region_route = self.retrieve_regions_route(ctx, region_id).await?; - if self.check_leader_region_on_peer(®ion_route, &to_peer)? { + if self.check_leader_region_on_peer(®ion_route, to_peer)? { Ok(Box::new(RegionMigrationEnd)) - } else if self.check_candidate_region_on_peer(®ion_route, &to_peer) { + } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { Ok(Box::new(DowngradeLeaderRegion)) } else { Ok(Box::new(OpenCandidateRegion)) @@ -142,7 +140,7 @@ impl RegionMigrationStart { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::{Arc, Mutex}; + use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -168,9 +166,7 @@ mod tests { let state = RegionMigrationStart; let env = TestingEnv::new(); let persistent_context = new_persistent_context(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let ctx = env.context_factory().new_context(persistent_context); let err = state .retrieve_regions_route(&ctx, RegionId::new(1024, 1)) @@ -189,9 +185,7 @@ mod tests { let from_peer = persistent_context.from_peer.clone(); let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_route = RegionRoute { @@ -224,9 +218,7 @@ mod tests { let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -241,7 +233,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next .as_any() @@ -260,9 +252,7 @@ mod tests { let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -277,7 +267,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -290,9 +280,7 @@ mod tests { let persistent_context = new_persistent_context(); let region_id = persistent_context.region_id; let env = TestingEnv::new(); - let ctx = env - .context_factory() - .new_context(Arc::new(Mutex::new(persistent_context))); + let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { @@ -306,7 +294,7 @@ mod tests { .await .unwrap(); - let next = state.next(&ctx).await.unwrap(); + let next = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 9a6f40f33dfd..c056ec741601 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -25,7 +25,7 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next(&mut self, _ctx: &Context) -> Result> { + async fn next(&mut self, _ctx: &mut Context) -> Result> { todo!() }