Skip to content

Commit

Permalink
refactor: move PersistentContext and VolatileContext into Context
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 16, 2023
1 parent bd1a545 commit 8670a21
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 119 deletions.
129 changes: 75 additions & 54 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) mod test_util;

use std::any::Any;
use std::fmt::Debug;
use std::sync::{Arc, Mutex, MutexGuard};

use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
Expand Down Expand Up @@ -68,8 +69,31 @@ impl PersistentContext {
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}

/// Used to generate new [Context].
pub trait ContextFactory {
fn new_context(self, persistent_ctx: Arc<Mutex<PersistentContext>>) -> Context;
}

/// Default implementation.
pub struct ContextFactoryImpl {
volatile_ctx: Mutex<VolatileContext>,
table_metadata_manager: TableMetadataManagerRef,
}

impl ContextFactory for ContextFactoryImpl {
fn new_context(self, persistent_ctx: Arc<Mutex<PersistentContext>>) -> Context {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
}
}
}

/// The context of procedure execution.
pub struct Context {
persistent_ctx: Arc<Mutex<PersistentContext>>,
volatile_ctx: Mutex<VolatileContext>,
table_metadata_manager: TableMetadataManagerRef,
}

Expand All @@ -78,18 +102,23 @@ impl Context {
pub fn server_addr(&self) -> &str {
todo!()
}

/// Returns the [MutexGuard] of [PersistentContext].
pub fn persistent_ctx_guard(&self) -> MutexGuard<'_, PersistentContext> {
self.persistent_ctx.lock().unwrap()
}

/// Returns the [MutexGuard] of [VolatileContext].
pub fn volatile_ctx_guard(&self) -> MutexGuard<'_, VolatileContext> {
self.volatile_ctx.lock().unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(
&mut self,
ctx: &Context,
pc: &mut PersistentContext,
vc: &mut VolatileContext,
) -> Result<Box<dyn State>>;
async fn next(&mut self, ctx: &Context) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Expand All @@ -103,49 +132,49 @@ trait State: Sync + Send + Debug {
/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
context: PersistentContext,
persistent_ctx: Arc<Mutex<PersistentContext>>,
state: Box<dyn State>,
}

pub struct RegionMigrationProcedure {
data: RegionMigrationData,
context: Context,
volatile_context: VolatileContext,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";

pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
pub fn new(
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context)
Self::new_inner(state, persistent_context, context_factory)
}

fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context: Context,
context_factory: impl ContextFactory,
) -> Self {
let shared_persistent_context = Arc::new(Mutex::new(persistent_context));
Self {
data: RegionMigrationData {
context: persistent_context,
persistent_ctx: shared_persistent_context.clone(),
state,
},
context,
volatile_context: VolatileContext::default(),
context: context_factory.new_context(shared_persistent_context),
}
}

fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
data,
context,
volatile_context: VolatileContext::default(),
})
let context = context_factory.new_context(data.persistent_ctx.clone());

Ok(Self { data, context })
}
}

Expand All @@ -158,19 +187,14 @@ impl Procedure for RegionMigrationProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let persistent_context = &mut data.context;
let volatile_context = &mut self.volatile_context;

*state = state
.next(&self.context, persistent_context, volatile_context)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;

*state = state.next(&self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

Expand All @@ -179,7 +203,8 @@ impl Procedure for RegionMigrationProcedure {
}

fn lock_key(&self) -> LockKey {
LockKey::single(self.data.context.lock_key())
let key = self.data.persistent_ctx.lock().unwrap().lock_key();
LockKey::single(key)
}
}

Expand All @@ -206,7 +231,7 @@ mod tests {
let expected_key = persistent_context.lock_key();

let env = TestingEnv::new();
let context = env.context();
let context = env.context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

Expand All @@ -221,36 +246,30 @@ mod tests {
let persistent_context = new_persistent_context();

let env = TestingEnv::new();
let context = env.context();
let context = env.context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"context":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState {
count: usize,
}
pub struct MockState;

#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
if self.count == 2 {
async fn next(&mut self, ctx: &Context) -> Result<Box<dyn State>> {
let mut pc = ctx.persistent_ctx_guard();

if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
} else {
Ok(Box::new(MockState {
count: self.count + 1,
}))
pc.cluster_id += 1;
Ok(Box::new(MockState))
}
}

Expand All @@ -265,9 +284,9 @@ mod tests {

fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
let persistent_context = new_persistent_context();
let context = env.context();
let context_factory = env.context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory)
}

let ctx = TestingEnv::procedure_context();
Expand All @@ -285,12 +304,14 @@ mod tests {

let serialized = procedure.dump().unwrap();

let context = env.context();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();
let context_factory = env.context_factory();
let mut procedure =
RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap();

for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_eq!(procedure.context.persistent_ctx_guard().cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ use std::any::Any;
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(
&mut self,
_ctx: &Context,
_pc: &mut PersistentContext,
_vc: &mut VolatileContext,
) -> Result<Box<dyn State>> {
async fn next(&mut self, _ctx: &Context) -> Result<Box<dyn State>> {
todo!()
}

Expand Down
9 changes: 2 additions & 7 deletions src/meta-srv/src/procedure/region_migration/migration_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@ use common_procedure::Status;
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationEnd;

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
async fn next(&mut self, _: &Context) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}

Expand Down
Loading

0 comments on commit 8670a21

Please sign in to comment.