Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 16, 2023
1 parent 8670a21 commit a2cba0f
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 70 deletions.
74 changes: 37 additions & 37 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub(crate) mod test_util;

use std::any::Any;
use std::fmt::Debug;
use std::sync::{Arc, Mutex, MutexGuard};

use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
Expand Down Expand Up @@ -71,17 +70,17 @@ pub struct VolatileContext {}

/// Used to generate new [Context].
pub trait ContextFactory {
fn new_context(self, persistent_ctx: Arc<Mutex<PersistentContext>>) -> Context;
fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}

/// Default implementation.
pub struct ContextFactoryImpl {
volatile_ctx: Mutex<VolatileContext>,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}

impl ContextFactory for ContextFactoryImpl {
fn new_context(self, persistent_ctx: Arc<Mutex<PersistentContext>>) -> Context {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
Expand All @@ -90,10 +89,12 @@ impl ContextFactory for ContextFactoryImpl {
}
}

// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
pub struct Context {
persistent_ctx: Arc<Mutex<PersistentContext>>,
volatile_ctx: Mutex<VolatileContext>,
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}

Expand All @@ -102,23 +103,13 @@ impl Context {
pub fn server_addr(&self) -> &str {
todo!()
}

/// Returns the [MutexGuard] of [PersistentContext].
pub fn persistent_ctx_guard(&self) -> MutexGuard<'_, PersistentContext> {
self.persistent_ctx.lock().unwrap()
}

/// Returns the [MutexGuard] of [VolatileContext].
pub fn volatile_ctx_guard(&self) -> MutexGuard<'_, VolatileContext> {
self.volatile_ctx.lock().unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(&mut self, ctx: &Context) -> Result<Box<dyn State>>;
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Expand All @@ -131,13 +122,20 @@ trait State: Sync + Send + Debug {

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
persistent_ctx: Arc<Mutex<PersistentContext>>,
pub struct RegionMigrationDataOwned {
persistent_ctx: PersistentContext,
state: Box<dyn State>,
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
persistent_ctx: &'a PersistentContext,
state: &'a dyn State,
}

pub struct RegionMigrationProcedure {
data: RegionMigrationData,
state: Box<dyn State>,
context: Context,
}

Expand All @@ -159,22 +157,21 @@ impl RegionMigrationProcedure {
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
) -> Self {
let shared_persistent_context = Arc::new(Mutex::new(persistent_context));
Self {
data: RegionMigrationData {
persistent_ctx: shared_persistent_context.clone(),
state,
},
context: context_factory.new_context(shared_persistent_context),
state,
context: context_factory.new_context(persistent_context),
}
}

fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;
let RegionMigrationDataOwned {
persistent_ctx,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;

let context = context_factory.new_context(data.persistent_ctx.clone());
let context = context_factory.new_context(persistent_ctx);

Ok(Self { data, context })
Ok(Self { state, context })
}
}

Expand All @@ -185,10 +182,9 @@ impl Procedure for RegionMigrationProcedure {
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let state = &mut self.state;

*state = state.next(&self.context).await.map_err(|e| {
*state = state.next(&mut self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
Expand All @@ -199,11 +195,15 @@ impl Procedure for RegionMigrationProcedure {
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
let data = RegionMigrationData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let key = self.data.persistent_ctx.lock().unwrap().lock_key();
let key = self.context.persistent_ctx.lock_key();
LockKey::single(key)
}
}
Expand Down Expand Up @@ -262,8 +262,8 @@ mod tests {
#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(&mut self, ctx: &Context) -> Result<Box<dyn State>> {
let mut pc = ctx.persistent_ctx_guard();
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let pc = &mut ctx.persistent_ctx;

if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
Expand Down Expand Up @@ -311,7 +311,7 @@ mod tests {
for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_eq!(procedure.context.persistent_ctx_guard().cluster_id, 2);
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct DowngradeLeaderRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, _ctx: &Context) -> Result<Box<dyn State>> {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(&mut self, _: &Context) -> Result<Box<dyn State>> {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}

Expand Down
47 changes: 17 additions & 30 deletions src/meta-srv/src/procedure/region_migration/migration_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,15 @@ impl State for RegionMigrationStart {
/// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state.
///
/// Otherwise go to the OpenCandidateRegion state.
async fn next(&mut self, ctx: &Context) -> Result<Box<dyn State>> {
let (region_id, to_peer) = {
let pc = ctx.persistent_ctx_guard();
(pc.region_id, pc.to_peer.clone())
};
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let region_id = ctx.persistent_ctx.region_id;
let to_peer = &ctx.persistent_ctx.to_peer;

let region_route = self.retrieve_regions_route(ctx, region_id).await?;
let region_route = self.retrieve_region_route(ctx, region_id).await?;

if self.check_leader_region_on_peer(&region_route, &to_peer)? {
if self.check_leader_region_on_peer(&region_route, to_peer)? {
Ok(Box::new(RegionMigrationEnd))
} else if self.check_candidate_region_on_peer(&region_route, &to_peer) {
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok(Box::new(DowngradeLeaderRegion))
} else {
Ok(Box::new(OpenCandidateRegion))
Expand All @@ -70,7 +68,7 @@ impl RegionMigrationStart {
///
/// Retry:
/// - Failed to retrieve the metadata of table.
async fn retrieve_regions_route(
async fn retrieve_region_route(
&self,
ctx: &Context,
region_id: RegionId,
Expand Down Expand Up @@ -142,7 +140,6 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::{Arc, Mutex};

use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
Expand All @@ -168,12 +165,10 @@ mod tests {
let state = RegionMigrationStart;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let ctx = env
.context_factory()
.new_context(Arc::new(Mutex::new(persistent_context)));
let ctx = env.context_factory().new_context(persistent_context);

let err = state
.retrieve_regions_route(&ctx, RegionId::new(1024, 1))
.retrieve_region_route(&ctx, RegionId::new(1024, 1))
.await
.unwrap_err();

Expand All @@ -189,9 +184,7 @@ mod tests {
let from_peer = persistent_context.from_peer.clone();

let env = TestingEnv::new();
let ctx = env
.context_factory()
.new_context(Arc::new(Mutex::new(persistent_context)));
let ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_route = RegionRoute {
Expand All @@ -206,7 +199,7 @@ mod tests {
.unwrap();

let err = state
.retrieve_regions_route(&ctx, RegionId::new(1024, 3))
.retrieve_region_route(&ctx, RegionId::new(1024, 3))
.await
.unwrap_err();

Expand All @@ -224,9 +217,7 @@ mod tests {
let region_id = persistent_context.region_id;

let env = TestingEnv::new();
let ctx = env
.context_factory()
.new_context(Arc::new(Mutex::new(persistent_context)));
let mut ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
Expand All @@ -241,7 +232,7 @@ mod tests {
.await
.unwrap();

let next = state.next(&ctx).await.unwrap();
let next = state.next(&mut ctx).await.unwrap();

let _ = next
.as_any()
Expand All @@ -260,9 +251,7 @@ mod tests {
let region_id = persistent_context.region_id;

let env = TestingEnv::new();
let ctx = env
.context_factory()
.new_context(Arc::new(Mutex::new(persistent_context)));
let mut ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
Expand All @@ -277,7 +266,7 @@ mod tests {
.await
.unwrap();

let next = state.next(&ctx).await.unwrap();
let next = state.next(&mut ctx).await.unwrap();

let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
Expand All @@ -290,9 +279,7 @@ mod tests {
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let ctx = env
.context_factory()
.new_context(Arc::new(Mutex::new(persistent_context)));
let mut ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
Expand All @@ -306,7 +293,7 @@ mod tests {
.await
.unwrap();

let next = state.next(&ctx).await.unwrap();
let next = state.next(&mut ctx).await.unwrap();

let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct OpenCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
async fn next(&mut self, _ctx: &Context) -> Result<Box<dyn State>> {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!()
}

Expand Down

0 comments on commit a2cba0f

Please sign in to comment.