Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add region migration procedure skeleton #2743

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod region_failover;
pub mod region_migration;
#[cfg(test)]
mod tests;
mod utils;
6 changes: 2 additions & 4 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use table::metadata::TableId;
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::utils::region_lock_key;
use crate::service::mailbox::MailboxRef;

const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -377,10 +378,7 @@ impl Procedure for RegionFailoverProcedure {

fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
let region_key = format!(
"{}/region-{}",
region_ident.table_id, region_ident.region_number
);
let region_key = region_lock_key(region_ident.table_id, region_ident.region_number);
LockKey::single(region_key)
}
}
Expand Down
282 changes: 282 additions & 0 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod migration_end;
pub(crate) mod migration_start;

use std::fmt::Debug;

use common_meta::peer::Peer;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;

use self::migration_start::RegionMigrationStart;
use crate::error::{Error, Result};
use crate::procedure::utils::region_lock_key;

/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistentContext {
/// The [Peer] of migration source.
from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Option<Peer>,
/// The [RegionId] of migration region.
region_id: RegionId,
}

impl PersistentContext {
pub fn lock_key(&self) -> String {
region_lock_key(self.region_id.table_id(), self.region_id.region_number())
}
}

/// It's shared in each step and available in executing (including retrying).
///
/// It will be dropped if the procedure runner crashes.
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}

/// The context of procedure execution.
#[derive(Debug, Clone)]
pub struct Context {}

#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(
&mut self,
ctx: &Context,
pc: &mut PersistentContext,
vc: &mut VolatileContext,
) -> Result<Box<dyn State>>;

/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}
}

/// Persistent data of [RegionMigrationProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationData {
context: PersistentContext,
state: Box<dyn State>,
}

#[derive(Debug)]
pub struct RegionMigrationProcedure {
data: RegionMigrationData,
context: Context,
volatile_context: VolatileContext,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";

pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context)
}

fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context: Context,
) -> Self {
Self {
data: RegionMigrationData {
context: persistent_context,
state,
},
context,
volatile_context: VolatileContext::default(),
}
}

fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
data,
context,
volatile_context: VolatileContext::default(),
})
}
}

#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let data = &mut self.data;
let state = &mut data.state;
let persistent_context = &mut data.context;
let volatile_context = &mut self.volatile_context;

*state = state
.next(&self.context, persistent_context, volatile_context)
.await
.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
LockKey::single(self.data.context.lock_key())
}
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_procedure::ProcedureId;
use common_procedure_test::MockContextProvider;

use super::migration_end::RegionMigrationEnd;
use super::*;

fn persistent_context_factory() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: None,
region_id: RegionId::new(1024, 1),
}
}

fn context_factory() -> Context {
Context {}
}

fn procedure_context_factory() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
}
}

#[test]
fn test_lock_key() {
let persistent_context = persistent_context_factory();
let expected_key = persistent_context.lock_key();

let context = context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let key = procedure.lock_key();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

assert!(keys.contains(&expected_key));
}

#[test]
fn test_data_serialization() {
let persistent_context = persistent_context_factory();

let context = context_factory();

let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState {
count: usize,
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
if self.count == 2 {
Ok(Box::new(RegionMigrationEnd))
} else {
Ok(Box::new(MockState {
count: self.count + 1,
}))
}
}
}

#[tokio::test]
async fn test_execution_after_deserialized() {
fn new_mock_procedure() -> RegionMigrationProcedure {
let persistent_context = persistent_context_factory();
let context = context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context)
}

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();
let mut status = None;
for _ in 0..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);

let ctx = procedure_context_factory();
let mut procedure = new_mock_procedure();

status = Some(procedure.execute(&ctx).await.unwrap());

let serialized = procedure.dump().unwrap();

let context = context_factory();
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();

for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
}
assert_matches!(status.unwrap(), Status::Done);
}
}
39 changes: 39 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_end.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_procedure::Status;
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationEnd;

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(
&mut self,
_: &Context,
_: &mut PersistentContext,
_: &mut VolatileContext,
) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}

fn status(&self) -> Status {
Status::Done
}
}
35 changes: 35 additions & 0 deletions src/meta-srv/src/procedure/region_migration/migration_start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};

#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart {}

#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationStart {
async fn next(
&mut self,
_ctx: &Context,
_pc: &mut PersistentContext,
_vc: &mut VolatileContext,
) -> Result<Box<dyn State>> {
// TODO(weny): It will be added in the following PRs.
todo!()
}
}
6 changes: 6 additions & 0 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use store_api::storage::{RegionNumber, TableId};

pub fn region_lock_key(table_id: TableId, region_number: RegionNumber) -> String {
format!("{}/region-{}", table_id, region_number)
}

#[cfg(test)]
pub mod mock {
use std::io::Error;
Expand Down
Loading