Skip to content

Commit

Permalink
feat: add region migration procedure skeleton (#2743)
Browse files Browse the repository at this point in the history
* feat: add region migration procedure skeleton

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: unify the lock key
  • Loading branch information
WenyXu authored Nov 15, 2023
1 parent 3329da5 commit 9bd1013
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod region_failover;
pub mod region_migration;
#[cfg(test)]
mod tests;
mod utils;
6 changes: 2 additions & 4 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use table::metadata::TableId;
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::utils::region_lock_key;
use crate::service::mailbox::MailboxRef;

const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -371,10 +372,7 @@ impl Procedure for RegionFailoverProcedure {

fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
let region_key = format!(
"{}/region-{}",
region_ident.table_id, region_ident.region_number
);
let region_key = region_lock_key(region_ident.table_id, region_ident.region_number);
LockKey::single(region_key)
}
}
Expand Down
282 changes: 282 additions & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod migration_end;
pub(crate) mod migration_start;

use std::fmt::Debug;

use common_meta::peer::Peer;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;

use self::migration_start::RegionMigrationStart;
use crate::error::{Error, Result};
use crate::procedure::utils::region_lock_key;

/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistentContext {
/// The [Peer] of migration source.
from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Option<Peer>,
/// The [RegionId] of migration region.
region_id: RegionId,
}

impl PersistentContext {
pub fn lock_key(&self) -> String {
region_lock_key(self.region_id.table_id(), self.region_id.region_number())
}
}

/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}

/// The context of procedure execution.
#[derive(Debug, Clone)]
pub struct Context {}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(
&mut self,
ctx: &Context,
pc: &mut PersistentContext,
vc: &mut VolatileContext,
) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
context: PersistentContext,
state: Box<dyn State>,
}

#[derive(Debug)]
pub struct RegionMigrationProcedure {
data: RegionMigrationData,
context: Context,
volatile_context: VolatileContext,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";

pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context)
}

fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context: Context,
) -> Self {
Self {
data: RegionMigrationData {
context: persistent_context,
state,
},
context,
volatile_context: VolatileContext::default(),
}
}

fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
data,
context,
volatile_context: VolatileContext::default(),
})
}
}

#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let persistent_context = &mut data.context;
let volatile_context = &mut self.volatile_context;

*state = state
.next(&self.context, persistent_context, volatile_context)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
LockKey::single(self.data.context.lock_key())
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_procedure::ProcedureId;
use common_procedure_test::MockContextProvider;

use super::migration_end::RegionMigrationEnd;
use super::*;

fn persistent_context_factory() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: None,
region_id: RegionId::new(1024, 1),
}
}

fn context_factory() -> Context {
Context {}
}

fn procedure_context_factory() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
}
}

#[test]
fn test_lock_key() {
let persistent_context = persistent_context_factory();
let expected_key = persistent_context.lock_key();

let context = context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let key = procedure.lock_key();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

assert!(keys.contains(&expected_key));
}

#[test]
fn test_data_serialization() {
let persistent_context = persistent_context_factory();

let context = context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState {
count: usize,
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
if self.count == 2 {
Ok(Box::new(RegionMigrationEnd))
} else {
Ok(Box::new(MockState {
count: self.count + 1,
}))
}
}
}

#[tokio::test]
async fn test_execution_after_deserialized() {
fn new_mock_procedure() -> RegionMigrationProcedure {
let persistent_context = persistent_context_factory();
let context = context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
}

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let mut status = None;
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();

status = Some(procedure.execute(&ctx).await.unwrap());

let serialized = procedure.dump().unwrap();

let context = context_factory();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();

for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);
}
}
39 changes: 39 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_end.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_procedure::Status;
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationEnd;

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}

fn status(&self) -> Status {
Status::Done
}
}
35 changes: 35 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart {}

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationStart {
async fn next(
&mut self,
_ctx: &Context,
_pc: &mut PersistentContext,
_vc: &mut VolatileContext,
) -> Result<Box<dyn State>> {
// TODO(weny): It will be added in the following PRs.
todo!()
}
}
6 changes: 6 additions & 0 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use store_api::storage::{RegionNumber, TableId};

pub fn region_lock_key(table_id: TableId, region_number: RegionNumber) -> String {
format!("{}/region-{}", table_id, region_number)
}

#[cfg(test)]
pub mod mock {
use std::io::Error;
Expand Down

0 comments on commit 9bd1013

Please sign in to comment.