From ee2311e2a05d4df5f77c581fe0f9bec977bea590 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 14 Nov 2023 06:57:12 +0000 Subject: [PATCH 1/4] feat: add region migration procedure skeleton --- src/meta-srv/src/procedure.rs | 1 + .../src/procedure/region_migration.rs | 294 ++++++++++++++++++ .../region_migration/migration_end.rs | 39 +++ .../region_migration/migration_start.rs | 35 +++ 4 files changed, 369 insertions(+) create mode 100644 src/meta-srv/src/procedure/region_migration.rs create mode 100644 src/meta-srv/src/procedure/region_migration/migration_end.rs create mode 100644 src/meta-srv/src/procedure/region_migration/migration_start.rs diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 8e82b7cf16ea..45aaf061db94 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod region_failover; +pub mod region_migration; #[cfg(test)] mod tests; mod utils; diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs new file mode 100644 index 000000000000..fe1456c725f9 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -0,0 +1,294 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod migration_end; +pub(crate) mod migration_start; + +use std::fmt::Debug; + +use common_meta::peer::Peer; +use common_meta::table_name::TableName; +use common_procedure::error::{ + Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, +}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use self::migration_start::RegionMigrationStart; +use crate::error::{Error, Result}; + +/// It's shared in each step and available even after recovering. +/// +/// It will only be updated/stored after the Red node has succeeded. +/// +/// **Notes: Stores with too large data in the context might incur replication overhead.** +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersistentContext { + /// The [Peer] of migration source. + src_peer: Peer, + /// The [Peer] of migration destination. + dst_peer: Option, + /// Closes the migrated region on the `src_peer`. + close_migrated_region: bool, + /// The [TableName] of migration region. + table_name: TableName, + /// The [RegionId] of migration region. + region_id: RegionId, +} + +impl PersistentContext { + pub fn lock_key(&self) -> String { + let table_name = &self.table_name; + + common_catalog::format_full_table_name( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + } +} + +/// It's shared in each step and available in executing (including retrying). +/// +/// It will be dropped if the procedure runner crashes. +/// +/// The additional remote fetches are only required in the worst cases. +#[derive(Debug, Clone, Default)] +pub struct VolatileContext {} + +/// The context of procedure execution. +#[derive(Debug, Clone)] +pub struct Context {} + +#[async_trait::async_trait] +#[typetag::serde(tag = "region_migration_state")] +trait State: Sync + Send + Debug { + /// Yields the next state. + async fn next( + &mut self, + ctx: &Context, + pc: &mut PersistentContext, + vc: &mut VolatileContext, + ) -> Result>; + + /// Indicates the procedure execution status of the `State`. + fn status(&self) -> Status { + Status::Executing { persist: true } + } +} + +/// Persistent data of [RegionMigrationProcedure]. +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionMigrationData { + context: PersistentContext, + state: Box, +} + +#[derive(Debug)] +pub struct RegionMigrationProcedure { + data: RegionMigrationData, + context: Context, + volatile_context: VolatileContext, +} + +// TODO(weny): remove it. +#[allow(dead_code)] +impl RegionMigrationProcedure { + const TYPE_NAME: &str = "metasrv-procedure::RegionMigration"; + + pub fn new(persistent_context: PersistentContext, context: Context) -> Self { + let state = Box::new(RegionMigrationStart {}); + Self::new_inner(state, persistent_context, context) + } + + fn new_inner( + state: Box, + persistent_context: PersistentContext, + context: Context, + ) -> Self { + Self { + data: RegionMigrationData { + context: persistent_context, + state, + }, + context, + volatile_context: VolatileContext::default(), + } + } + + fn from_json(json: &str, context: Context) -> ProcedureResult { + let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?; + + Ok(Self { + data, + context, + volatile_context: VolatileContext::default(), + }) + } +} + +#[async_trait::async_trait] +impl Procedure for RegionMigrationProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let data = &mut self.data; + let state = &mut data.state; + let persistent_context = &mut data.context; + let volatile_context = &mut self.volatile_context; + + *state = state + .next(&self.context, persistent_context, volatile_context) + .await + .map_err(|e| { + if matches!(e, Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + })?; + Ok(state.status()) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::single(self.data.context.lock_key()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_procedure::ProcedureId; + use common_procedure_test::MockContextProvider; + + use super::migration_end::RegionMigrationEnd; + use super::*; + + fn persistent_context_factory() -> PersistentContext { + PersistentContext { + src_peer: Peer::empty(1), + dst_peer: None, + close_migrated_region: false, + table_name: TableName::new("foo", "bar", "name"), + region_id: RegionId::new(1024, 1), + } + } + + fn context_factory() -> Context { + Context {} + } + + fn procedure_context_factory() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } + } + + #[test] + fn test_lock_key() { + let persistent_context = persistent_context_factory(); + let expected_key = persistent_context.lock_key(); + + let context = context_factory(); + + let procedure = RegionMigrationProcedure::new(persistent_context, context); + + let key = procedure.lock_key(); + let keys = key.keys_to_lock().cloned().collect::>(); + + assert!(keys.contains(&expected_key)); + } + + #[test] + fn test_data_serialization() { + let persistent_context = persistent_context_factory(); + + let context = context_factory(); + + let procedure = RegionMigrationProcedure::new(persistent_context, context); + + let serialized = procedure.dump().unwrap(); + + let expected = r#"{"context":{"src_peer":{"id":1,"addr":""},"dst_peer":null,"close_migrated_region":false,"table_name":{"catalog_name":"foo","schema_name":"bar","table_name":"name"},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + assert_eq!(expected, serialized); + } + + #[derive(Debug, Serialize, Deserialize, Default)] + pub struct MockState { + count: usize, + } + + #[async_trait::async_trait] + #[typetag::serde] + impl State for MockState { + async fn next( + &mut self, + _: &Context, + _: &mut PersistentContext, + _: &mut VolatileContext, + ) -> Result> { + if self.count == 2 { + Ok(Box::new(RegionMigrationEnd)) + } else { + Ok(Box::new(MockState { + count: self.count + 1, + })) + } + } + } + + #[tokio::test] + async fn test_execution_after_deserialized() { + fn new_mock_procedure() -> RegionMigrationProcedure { + let persistent_context = persistent_context_factory(); + let context = context_factory(); + let state = Box::::default(); + RegionMigrationProcedure::new_inner(state, persistent_context, context) + } + + let ctx = procedure_context_factory(); + let mut procedure = new_mock_procedure(); + let mut status = None; + for _ in 0..3 { + status = Some(procedure.execute(&ctx).await.unwrap()); + } + assert_matches!(status.unwrap(), Status::Done); + + let ctx = procedure_context_factory(); + let mut procedure = new_mock_procedure(); + + status = Some(procedure.execute(&ctx).await.unwrap()); + + let serialized = procedure.dump().unwrap(); + + let context = context_factory(); + let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap(); + + for _ in 1..3 { + status = Some(procedure.execute(&ctx).await.unwrap()); + } + assert_matches!(status.unwrap(), Status::Done); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs new file mode 100644 index 000000000000..87e64b79dc22 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_procedure::Status; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionMigrationEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RegionMigrationEnd { + async fn next( + &mut self, + _: &Context, + _: &mut PersistentContext, + _: &mut VolatileContext, + ) -> Result> { + Ok(Box::new(RegionMigrationEnd)) + } + + fn status(&self) -> Status { + Status::Done + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs new file mode 100644 index 000000000000..7f445079143a --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RegionMigrationStart {} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RegionMigrationStart { + async fn next( + &mut self, + _ctx: &Context, + _pc: &mut PersistentContext, + _vc: &mut VolatileContext, + ) -> Result> { + // TODO(weny): It will be added in the following PRs. + todo!() + } +} From 3896bd62bf44119c3fb9cffd151a2040dec481e6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 14 Nov 2023 11:58:41 +0000 Subject: [PATCH 2/4] chore: apply suggestions from CR --- .../src/procedure/region_migration.rs | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index fe1456c725f9..085ab2368a51 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -18,7 +18,6 @@ pub(crate) mod migration_start; use std::fmt::Debug; use common_meta::peer::Peer; -use common_meta::table_name::TableName; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; @@ -38,25 +37,21 @@ use crate::error::{Error, Result}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersistentContext { /// The [Peer] of migration source. - src_peer: Peer, + from_peer: Peer, /// The [Peer] of migration destination. - dst_peer: Option, - /// Closes the migrated region on the `src_peer`. + to_peer: Option, + /// Closes the migrated region on the `from_peer`. close_migrated_region: bool, - /// The [TableName] of migration region. - table_name: TableName, /// The [RegionId] of migration region. region_id: RegionId, } impl PersistentContext { pub fn lock_key(&self) -> String { - let table_name = &self.table_name; - - common_catalog::format_full_table_name( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, + format!( + "{}/{}", + self.region_id.table_id(), + self.region_id.region_number() ) } } @@ -187,10 +182,9 @@ mod tests { fn persistent_context_factory() -> PersistentContext { PersistentContext { - src_peer: Peer::empty(1), - dst_peer: None, + from_peer: Peer::empty(1), + to_peer: None, close_migrated_region: false, - table_name: TableName::new("foo", "bar", "name"), region_id: RegionId::new(1024, 1), } } @@ -231,7 +225,7 @@ mod tests { let serialized = procedure.dump().unwrap(); - let expected = r#"{"context":{"src_peer":{"id":1,"addr":""},"dst_peer":null,"close_migrated_region":false,"table_name":{"catalog_name":"foo","schema_name":"bar","table_name":"name"},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"close_migrated_region":false,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } From f4112f29e2715a7f51fd9ad10a395280fec963ce Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 15 Nov 2023 03:52:04 +0000 Subject: [PATCH 3/4] chore: apply suggestions from CR --- src/meta-srv/src/procedure/region_migration.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 085ab2368a51..5ea08840647e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -40,8 +40,6 @@ pub struct PersistentContext { from_peer: Peer, /// The [Peer] of migration destination. to_peer: Option, - /// Closes the migrated region on the `from_peer`. - close_migrated_region: bool, /// The [RegionId] of migration region. region_id: RegionId, } @@ -184,7 +182,6 @@ mod tests { PersistentContext { from_peer: Peer::empty(1), to_peer: None, - close_migrated_region: false, region_id: RegionId::new(1024, 1), } } @@ -225,7 +222,7 @@ mod tests { let serialized = procedure.dump().unwrap(); - let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"close_migrated_region":false,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } From 5b42ea5dfbbb32058ec025d58748d62eac4c42b4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 15 Nov 2023 08:49:16 +0000 Subject: [PATCH 4/4] refactor: unify the lock key --- src/meta-srv/src/procedure/region_failover.rs | 6 ++---- src/meta-srv/src/procedure/region_migration.rs | 7 ++----- src/meta-srv/src/procedure/utils.rs | 6 ++++++ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index a09bb1c2c0c4..b56b17e7058f 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -46,6 +46,7 @@ use table::metadata::TableId; use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu}; use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; +use crate::procedure::utils::region_lock_key; use crate::service::mailbox::MailboxRef; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); @@ -377,10 +378,7 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; - let region_key = format!( - "{}/region-{}", - region_ident.table_id, region_ident.region_number - ); + let region_key = region_lock_key(region_ident.table_id, region_ident.region_number); LockKey::single(region_key) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 5ea08840647e..6bd13c65abde 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -28,6 +28,7 @@ use store_api::storage::RegionId; use self::migration_start::RegionMigrationStart; use crate::error::{Error, Result}; +use crate::procedure::utils::region_lock_key; /// It's shared in each step and available even after recovering. /// @@ -46,11 +47,7 @@ pub struct PersistentContext { impl PersistentContext { pub fn lock_key(&self) -> String { - format!( - "{}/{}", - self.region_id.table_id(), - self.region_id.region_number() - ) + region_lock_key(self.region_id.table_id(), self.region_id.region_number()) } } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index c57925697dd7..6f36dc7f9d5b 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -12,6 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use store_api::storage::{RegionNumber, TableId}; + +pub fn region_lock_key(table_id: TableId, region_number: RegionNumber) -> String { + format!("{}/region-{}", table_id, region_number) +} + #[cfg(test)] pub mod mock { use std::io::Error;