Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add migration start step #2756

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,13 @@ pub enum Error {
},
}

impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}
}

pub type Result<T> = std::result::Result<T, Error>;

define_into_tonic_status!(Error);
Expand Down
219 changes: 127 additions & 92 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod downgrade_leader_region;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;

use std::any::Any;
use std::fmt::Debug;

use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
Expand All @@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key;
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistentContext {
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Option<Peer>,
to_peer: Peer,
/// The [RegionId] of migration region.
region_id: RegionId,
}
Expand All @@ -59,74 +68,110 @@ impl PersistentContext {
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}

/// Used to generate new [Context].
pub trait ContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}

/// Default implementation.
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}

impl ContextFactory for ContextFactoryImpl {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
}
}
}

// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
#[derive(Debug, Clone)]
pub struct Context {}
pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
}

impl Context {
/// Returns address of meta server.
pub fn server_addr(&self) -> &str {
todo!()
}
}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(
&mut self,
ctx: &Context,
pc: &mut PersistentContext,
vc: &mut VolatileContext,
) -> Result<Box<dyn State>>;
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
context: PersistentContext,
pub struct RegionMigrationDataOwned {
persistent_ctx: PersistentContext,
state: Box<dyn State>,
}

#[derive(Debug)]
/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
persistent_ctx: &'a PersistentContext,
state: &'a dyn State,
}

pub struct RegionMigrationProcedure {
data: RegionMigrationData,
state: Box<dyn State>,
context: Context,
volatile_context: VolatileContext,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";

pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
pub fn new(
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context)
Self::new_inner(state, persistent_context, context_factory)
}

fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context: Context,
context_factory: impl ContextFactory,
) -> Self {
Self {
data: RegionMigrationData {
context: persistent_context,
state,
},
context,
volatile_context: VolatileContext::default(),
state,
context: context_factory.new_context(persistent_context),
}
}

fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;
fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
let RegionMigrationDataOwned {
persistent_ctx,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
data,
context,
volatile_context: VolatileContext::default(),
})
let context = context_factory.new_context(persistent_ctx);

Ok(Self { state, context })
}
}

Expand All @@ -137,69 +182,56 @@ impl Procedure for RegionMigrationProcedure {
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let persistent_context = &mut data.context;
let volatile_context = &mut self.volatile_context;

*state = state
.next(&self.context, persistent_context, volatile_context)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
let state = &mut self.state;

*state = state.next(&mut self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
let data = RegionMigrationData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
LockKey::single(self.data.context.lock_key())
let key = self.context.persistent_ctx.lock_key();
LockKey::single(key)
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_procedure::ProcedureId;
use common_procedure_test::MockContextProvider;

use super::migration_end::RegionMigrationEnd;
use super::*;
use crate::procedure::region_migration::test_util::TestingEnv;

fn persistent_context_factory() -> PersistentContext {
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: None,
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
}
}

fn context_factory() -> Context {
Context {}
}

fn procedure_context_factory() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
cluster_id: 0,
}
}

#[test]
fn test_lock_key() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();
let expected_key = persistent_context.lock_key();

let context = context_factory();
let env = TestingEnv::new();
let context = env.context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

Expand All @@ -211,72 +243,75 @@ mod tests {

#[test]
fn test_data_serialization() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();

let context = context_factory();
let env = TestingEnv::new();
let context = env.context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState {
count: usize,
}
pub struct MockState;

#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
if self.count == 2 {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let pc = &mut ctx.persistent_ctx;

if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
} else {
Ok(Box::new(MockState {
count: self.count + 1,
}))
pc.cluster_id += 1;
Ok(Box::new(MockState))
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

#[tokio::test]
async fn test_execution_after_deserialized() {
fn new_mock_procedure() -> RegionMigrationProcedure {
let persistent_context = persistent_context_factory();
let context = context_factory();
let env = TestingEnv::new();

fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
let persistent_context = new_persistent_context();
let context_factory = env.context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory)
}

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);
let mut status = None;
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);

status = Some(procedure.execute(&ctx).await.unwrap());

let serialized = procedure.dump().unwrap();

let context = context_factory();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();
let context_factory = env.context_factory();
let mut procedure =
RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap();

for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
assert_matches!(status.unwrap(), Status::Done);
}
}
Loading
Loading