Skip to content

Commit

Permalink
feat: add migration start state
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 16, 2023
1 parent 9bd1013 commit cac8a98
Show file tree
Hide file tree
Showing 7 changed files with 467 additions and 39 deletions.
7 changes: 7 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,13 @@ pub enum Error {
},
}

impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}
}

pub type Result<T> = std::result::Result<T, Error>;

define_into_tonic_status!(Error);
Expand Down
80 changes: 47 additions & 33 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod downgrade_leader_region;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;

use std::any::Any;
use std::fmt::Debug;

use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
Expand All @@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key;
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistentContext {
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Option<Peer>,
to_peer: Peer,
/// The [RegionId] of migration region.
region_id: RegionId,
}
Expand All @@ -60,8 +69,16 @@ impl PersistentContext {
pub struct VolatileContext {}

/// The context of procedure execution.
#[derive(Debug, Clone)]
pub struct Context {}
pub struct Context {
table_metadata_manager: TableMetadataManagerRef,
}

impl Context {
/// Returns address of meta server.
pub fn server_addr(&self) -> &str {
todo!()
}
}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
Expand All @@ -78,6 +95,9 @@ trait State: Sync + Send + Debug {
fn status(&self) -> Status {
Status::Executing { persist: true }
}

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}

/// Persistent data of [RegionMigrationProcedure].
Expand All @@ -87,7 +107,6 @@ pub struct RegionMigrationData {
state: Box<dyn State>,
}

#[derive(Debug)]
pub struct RegionMigrationProcedure {
data: RegionMigrationData,
context: Context,
Expand Down Expand Up @@ -167,39 +186,27 @@ impl Procedure for RegionMigrationProcedure {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_procedure::ProcedureId;
use common_procedure_test::MockContextProvider;

use super::migration_end::RegionMigrationEnd;
use super::*;
use crate::procedure::region_migration::test_util::TestingEnv;

fn persistent_context_factory() -> PersistentContext {
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: None,
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
}
}

fn context_factory() -> Context {
Context {}
}

fn procedure_context_factory() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
cluster_id: 0,
}
}

#[test]
fn test_lock_key() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();
let expected_key = persistent_context.lock_key();

let context = context_factory();
let env = TestingEnv::new();
let context = env.context();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

Expand All @@ -211,9 +218,10 @@ mod tests {

#[test]
fn test_data_serialization() {
let persistent_context = persistent_context_factory();
let persistent_context = new_persistent_context();

let context = context_factory();
let env = TestingEnv::new();
let context = env.context();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

Expand Down Expand Up @@ -245,33 +253,39 @@ mod tests {
}))
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

#[tokio::test]
async fn test_execution_after_deserialized() {
fn new_mock_procedure() -> RegionMigrationProcedure {
let persistent_context = persistent_context_factory();
let context = context_factory();
let env = TestingEnv::new();

fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
let persistent_context = new_persistent_context();
let context = env.context();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
}

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);
let mut status = None;
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let ctx = TestingEnv::procedure_context();
let mut procedure = new_mock_procedure(&env);

status = Some(procedure.execute(&ctx).await.unwrap());

let serialized = procedure.dump().unwrap();

let context = context_factory();
let context = env.context();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();

for _ in 1..3 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};

#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(
&mut self,
_ctx: &Context,
_pc: &mut PersistentContext,
_vc: &mut VolatileContext,
) -> Result<Box<dyn State>> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}
6 changes: 6 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_procedure::Status;
use serde::{Deserialize, Serialize};

Expand All @@ -36,4 +38,8 @@ impl State for RegionMigrationEnd {
fn status(&self) -> Status {
Status::Done
}

fn as_any(&self) -> &dyn Any {
self
}
}
Loading

0 comments on commit cac8a98

Please sign in to comment.