diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 65178d2f0157..bbdd1c8e7eab 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -56,6 +56,14 @@ impl TableRouteValue { version: self.version + 1, } } + + /// Returns the version. + /// + /// For test purpose. + #[cfg(any(test, feature = "testing"))] + pub fn version(&self) -> u64 { + self.version + } } impl TableMetaKey for TableRouteKey { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 4edc69bc3a59..4f53deb41dea 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -53,7 +53,7 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef}; /// It will only be updated/stored after the Red node has succeeded. /// /// **Notes: Stores with too large data in the context might incur replication overhead.** -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistentContext { /// The Id of the cluster. cluster_id: ClusterId, @@ -263,14 +263,9 @@ impl Context { #[async_trait::async_trait] #[typetag::serde(tag = "region_migration_state")] -trait State: Sync + Send + Debug { - /// Yields the next state. - async fn next(&mut self, ctx: &mut Context) -> Result>; - - /// Indicates the procedure execution status of the `State`. - fn status(&self) -> Status { - Status::Executing { persist: true } - } +pub(crate) trait State: Sync + Send + Debug { + /// Yields the next [State] and [Status]. + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)>; /// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any; @@ -340,14 +335,16 @@ impl Procedure for RegionMigrationProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; - *state = state.next(&mut self.context).await.map_err(|e| { + let (next, status) = state.next(&mut self.context).await.map_err(|e| { if matches!(e, Error::RetryLater { .. }) { ProcedureError::retry_later(e) } else { ProcedureError::external(e) } })?; - Ok(state.status()) + + *state = next; + Ok(status) } fn dump(&self) -> ProcedureResult { @@ -367,20 +364,21 @@ impl Procedure for RegionMigrationProcedure { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::rpc::router::{Region, RegionRoute}; use super::migration_end::RegionMigrationEnd; + use super::update_metadata::UpdateMetadata; use super::*; use crate::handler::HeartbeatMailbox; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::*; use crate::service::mailbox::Channel; fn new_persistent_context() -> PersistentContext { - PersistentContext { - from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), - cluster_id: 0, - } + test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) } #[test] @@ -414,20 +412,30 @@ mod tests { assert_eq!(expected, serialized); } + #[test] + fn test_backward_compatibility() { + let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + // NOTES: Changes it will break backward compatibility. 
+ let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#; + let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap(); + + assert_eq!(persistent_ctx, deserialized); + } + #[derive(Debug, Serialize, Deserialize, Default)] pub struct MockState; #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next(&mut self, ctx: &mut Context) -> Result> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { let pc = &mut ctx.persistent_ctx; if pc.cluster_id == 2 { - Ok(Box::new(RegionMigrationEnd)) + Ok((Box::new(RegionMigrationEnd), Status::Done)) } else { pc.cluster_id += 1; - Ok(Box::new(MockState)) + Ok((Box::new(MockState), Status::executing(false))) } } @@ -497,4 +505,145 @@ mod tests { let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024)); } + + fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec { + vec![ + // MigrationStart + Step::next( + "Should be the update metadata for downgrading", + None, + Assertion::simple(assert_update_metadata_downgrade, assert_need_persist), + ), + // UpdateMetadata::Downgrade + Step::next( + "Should be the downgrade leader region", + None, + Assertion::simple(assert_downgrade_leader_region, assert_no_persist), + ), + // Downgrade Candidate + Step::next( + "Should be the upgrade candidate region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))), + )), + Assertion::simple(assert_upgrade_candidate_region, assert_no_persist), + ), + // Upgrade Candidate + Step::next( + "Should be the update metadata for upgrading", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))), + )), + Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), + ), + // UpdateMetadata::Upgrade + 
Step::next( + "Should be the region migration end", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + // RegionMigrationEnd + Step::next( + "Should be the region migration end again", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + ] + } + + #[tokio::test] + async fn test_procedure_flow() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. + let from_peer_id = persistent_context.from_peer.id; + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = procedure_flow_steps(from_peer_id, to_peer_id); + let timer = Instant::now(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps) + .run_once() + .await; + + // Ensure it didn't run into the slow path. + assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + + runner.suite.verify_table_metadata().await; + } + + #[tokio::test] + async fn test_procedure_flow_idempotent() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. 
+ let from_peer_id = persistent_context.from_peer.id; + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = procedure_flow_steps(from_peer_id, to_peer_id); + let setup_to_latest_persisted_state = Step::setup( + "Sets state to UpdateMetadata::Downgrade", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))), + Arc::new(reset_volatile_ctx), + ]), + ); + + let steps = [ + steps.clone(), + vec![setup_to_latest_persisted_state.clone()], + steps.clone()[1..].to_vec(), + vec![setup_to_latest_persisted_state], + steps.clone()[1..].to_vec(), + ] + .concat(); + let timer = Instant::now(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps.clone()) + .run_once() + .await; + + // Ensure it didn't run into the slow path. 
+ assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + + runner.suite.verify_table_metadata().await; + } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index f3834cc5e9b5..6cb6939c9316 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -20,7 +20,8 @@ use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SEC use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; -use common_telemetry::warn; +use common_procedure::Status; +use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tokio::time::sleep; @@ -53,18 +54,24 @@ impl Default for DowngradeLeaderRegion { #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next(&mut self, ctx: &mut Context) -> Result> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { // Ensures the `leader_region_lease_deadline` must exist after recovering. ctx.volatile_ctx .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); self.downgrade_region_with_retry(ctx).await; - // Safety: must exist. 
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() { + info!( + "Running into the downgrade leader slow path, sleep until {:?}", + deadline + ); tokio::time::sleep_until(*deadline).await; } - Ok(Box::::default()) + Ok(( + Box::::default(), + Status::executing(false), + )) } fn as_any(&self) -> &dyn Any { @@ -202,16 +209,14 @@ impl DowngradeLeaderRegion { mod tests { use std::assert_matches::assert_matches; - use api::v1::meta::mailbox_message::Payload; use common_meta::peer::Peer; - use common_time::util::current_time_millis; use store_api::storage::RegionId; use tokio::time::Instant; use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{ - new_close_region_reply, send_mock_reply, TestingEnv, + new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -224,29 +229,6 @@ mod tests { } } - fn new_downgrade_region_reply( - id: u64, - last_entry_id: Option, - exist: bool, - error: Option, - ) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id, - exists: exist, - error, - })) - .unwrap(), - )), - } - } - #[tokio::test] async fn test_datanode_is_unreachable() { let state = DowngradeLeaderRegion::default(); @@ -504,7 +486,7 @@ mod tests { }); let timer = Instant::now(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let elapsed = timer.elapsed().as_secs(); assert!(elapsed < REGION_LEASE_SECS / 2); assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs 
b/src/meta-srv/src/procedure/region_migration/migration_abort.rs index c47864ae5bab..af5684304534 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_abort.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -37,17 +37,13 @@ impl RegionMigrationAbort { #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationAbort { - async fn next(&mut self, _: &mut Context) -> Result> { + async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> { error::MigrationAbortSnafu { reason: &self.reason, } .fail() } - fn status(&self) -> Status { - Status::Done - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index c50e0a67b749..dd7efdc92bb7 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -26,12 +26,8 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next(&mut self, _: &mut Context) -> Result> { - Ok(Box::new(RegionMigrationEnd)) - } - - fn status(&self) -> Status { - Status::Done + async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> { + Ok((Box::new(RegionMigrationEnd), Status::Done)) } fn as_any(&self) -> &dyn Any { diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index b10a26886aec..80475904e6f3 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -16,16 +16,24 @@ use std::any::Any; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; +use common_procedure::Status; use serde::{Deserialize, Serialize}; use snafu::OptionExt; use store_api::storage::RegionId; -use 
super::downgrade_leader_region::DowngradeLeaderRegion; use super::migration_end::RegionMigrationEnd; use super::open_candidate_region::OpenCandidateRegion; +use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; use crate::procedure::region_migration::{Context, State}; +/// The behaviors: +/// +/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. +/// +/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. +/// +/// Otherwise go to the [OpenCandidateRegion] state. #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationStart; @@ -34,22 +42,22 @@ pub struct RegionMigrationStart; impl State for RegionMigrationStart { /// Yields next [State]. /// - /// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state. + /// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. /// - /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. + /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. /// - /// Otherwise go to the OpenCandidateRegion state. - async fn next(&mut self, ctx: &mut Context) -> Result> { + /// Otherwise go to the [OpenCandidateRegion] state. + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { let region_id = ctx.persistent_ctx.region_id; let region_route = self.retrieve_region_route(ctx, region_id).await?; let to_peer = &ctx.persistent_ctx.to_peer; if self.check_leader_region_on_peer(&region_route, to_peer)?
{ - Ok(Box::new(RegionMigrationEnd)) + Ok((Box::new(RegionMigrationEnd), Status::Done)) } else if self.check_candidate_region_on_peer(&region_route, to_peer) { - Ok(Box::::default()) + Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) } else { - Ok(Box::new(OpenCandidateRegion)) + Ok((Box::new(OpenCandidateRegion), Status::executing(true))) } } @@ -138,6 +146,7 @@ mod tests { use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { @@ -216,12 +225,11 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); - let _ = next - .as_any() - .downcast_ref::() - .unwrap(); + let update_metadata = next.as_any().downcast_ref::().unwrap(); + + assert_matches!(update_metadata, UpdateMetadata::Downgrade); } #[tokio::test] @@ -250,7 +258,7 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -277,7 +285,7 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index bd92217d8722..450e9b7e4464 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -21,6 +21,7 @@ use common_meta::ddl::utils::region_storage_path; use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; use common_meta::instruction::{Instruction,
InstructionReply, OpenRegion, SimpleReply}; use common_meta::RegionIdent; +use common_procedure::Status; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -38,11 +39,14 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next(&mut self, ctx: &mut Context) -> Result> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { let instruction = self.build_open_region_instruction(ctx).await?; self.open_candidate_region(ctx, instruction).await?; - Ok(Box::::default()) + Ok(( + Box::::default(), + Status::executing(false), + )) } fn as_any(&self) -> &dyn Any { @@ -430,7 +434,7 @@ mod tests { send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None))); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let vc = ctx.volatile_ctx; assert_eq!( vc.opening_region_guard.unwrap().info(), diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 753234985b1b..d1d169b8c298 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -12,24 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::assert_matches::assert_matches; use std::sync::Arc; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; -use common_meta::instruction::{InstructionReply, SimpleReply}; +use common_meta::instruction::{ + DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, +}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; use common_meta::sequence::Sequence; -use common_procedure::{Context as ProcedureContext, ProcedureId}; +use common_meta::DatanodeId; +use common_procedure::{Context as ProcedureContext, ProcedureId, Status}; use common_procedure_test::MockContextProvider; +use common_telemetry::debug; use common_time::util::current_time_millis; +use futures::future::BoxFuture; use store_api::storage::RegionId; +use table::metadata::RawTableInfo; use tokio::sync::mpsc::{Receiver, Sender}; -use super::ContextFactoryImpl; +use super::upgrade_candidate_region::UpgradeCandidateRegion; +use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext}; use crate::error::Result; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; +use crate::procedure::region_migration::migration_end::RegionMigrationEnd; +use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::PersistentContext; use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; use crate::service::mailbox::{Channel, MailboxRef}; @@ -147,11 +159,59 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage { } } +/// Generates a [InstructionReply::DowngradeRegion] reply. 
+pub fn new_downgrade_region_reply( + id: u64, + last_entry_id: Option, + exist: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id, + exists: exist, + error, + })) + .unwrap(), + )), + } +} + +/// Generates a [InstructionReply::UpgradeRegion] reply. +pub fn new_upgrade_region_reply( + id: u64, + ready: bool, + exists: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready, + exists, + error, + })) + .unwrap(), + )), + } +} + /// Sends a mock reply. pub fn send_mock_reply( mailbox: MailboxRef, mut rx: MockHeartbeatReceiver, - msg: impl FnOnce(u64) -> Result + Send + 'static, + msg: impl Fn(u64) -> Result + Send + 'static, ) { common_runtime::spawn_bg(async move { let resp = rx.recv().await.unwrap().unwrap(); @@ -169,3 +229,300 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> Persis cluster_id: 0, } } + +/// The test suite for region migration procedure. +pub(crate) struct ProcedureMigrationTestSuite { + pub(crate) env: TestingEnv, + context: Context, + state: Box, +} + +/// The hook is called before the test starts. +pub(crate) type BeforeTest = + Arc BoxFuture<'_, ()> + Send + Sync>; + +/// Custom assertion. +pub(crate) type CustomAssertion = Arc< + dyn Fn( + &mut ProcedureMigrationTestSuite, + Result<(Box, Status)>, + ) -> BoxFuture<'_, Result<()>> + + Send + + Sync, +>; + +/// State assertion function. +pub(crate) type StateAssertion = Arc; + +/// Status assertion function. 
+pub(crate) type StatusAssertion = Arc; + +// TODO(weny): Remove it. +#[allow(dead_code)] +/// The type of assertion. +#[derive(Clone)] +pub(crate) enum Assertion { + Simple(StateAssertion, StatusAssertion), + Custom(CustomAssertion), +} + +impl Assertion { + /// Returns an [Assertion::Simple]. + pub(crate) fn simple< + T: Fn(&dyn State) + Send + Sync + 'static, + U: Fn(Status) + Send + Sync + 'static, + >( + state: T, + status: U, + ) -> Self { + Self::Simple(Arc::new(state), Arc::new(status)) + } +} + +impl ProcedureMigrationTestSuite { + /// Returns a [ProcedureMigrationTestSuite]. + pub(crate) fn new(persistent_ctx: PersistentContext, start: Box) -> Self { + let env = TestingEnv::new(); + let context = env.context_factory().new_context(persistent_ctx); + + Self { + env, + context, + state: start, + } + } + + /// Mocks the `next` of [State] is called. + pub(crate) async fn next( + &mut self, + name: &str, + before: Option, + assertion: Assertion, + ) -> Result<()> { + debug!("suite test: {name}"); + + if let Some(before) = before { + before(self).await; + } + + debug!("suite test: {name} invoking next"); + let result = self.state.next(&mut self.context).await; + + match assertion { + Assertion::Simple(state_assert, status_assert) => { + let (next, status) = result?; + state_assert(&*next); + status_assert(status); + self.state = next; + } + Assertion::Custom(assert_fn) => { + assert_fn(self, result).await?; + } + } + + Ok(()) + } + + /// Initializes table metadata. + pub(crate) async fn init_table_metadata( + &self, + table_info: RawTableInfo, + region_routes: Vec, + ) { + self.env + .table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + } + + /// Verifies table metadata after region migration.
+ pub(crate) async fn verify_table_metadata(&self) { + let region_id = self.context.persistent_ctx.region_id; + let region_routes = self + .env + .table_metadata_manager + .table_route_manager() + .get(region_id.table_id()) + .await + .unwrap() + .unwrap() + .into_inner() + .region_routes; + + let expected_leader_id = self.context.persistent_ctx.to_peer.id; + let removed_follower_id = self.context.persistent_ctx.from_peer.id; + + let region_route = region_routes + .into_iter() + .find(|route| route.region.id == region_id) + .unwrap(); + + assert!(!region_route.is_leader_downgraded()); + assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id); + assert!(!region_route + .follower_peers + .into_iter() + .any(|route| route.id == removed_follower_id)) + } +} + +/// The step of test. +#[derive(Clone)] +pub enum Step { + Setup((String, BeforeTest)), + Next((String, Option, Assertion)), +} + +impl Step { + /// Returns the [Step::Setup]. + pub(crate) fn setup(name: &str, before: BeforeTest) -> Self { + Self::Setup((name.to_string(), before)) + } + + /// Returns the [Step::Next]. + pub(crate) fn next(name: &str, before: Option, assertion: Assertion) -> Self { + Self::Next((name.to_string(), before, assertion)) + } +} + +/// The test runner of [ProcedureMigrationTestSuite]. +pub(crate) struct ProcedureMigrationSuiteRunner { + pub(crate) suite: ProcedureMigrationTestSuite, + steps: Vec, +} + +impl ProcedureMigrationSuiteRunner { + /// Returns the [ProcedureMigrationSuiteRunner] + pub(crate) fn new(suite: ProcedureMigrationTestSuite) -> Self { + Self { + suite, + steps: vec![], + } + } + + /// Sets [Step]s . + pub(crate) fn steps(self, steps: Vec) -> Self { + Self { + suite: self.suite, + steps, + } + } + + /// Consumes all steps and runs once. + pub(crate) async fn run_once(mut self) -> Self { + for step in self.steps.drain(..) 
{ + match step { + Step::Setup((name, before)) => { + debug!("Running the before hook: {name}"); + before(&mut self.suite).await; + } + Step::Next((name, before, assertion)) => { + self.suite.next(&name, before, assertion).await.unwrap(); + } + } + } + + self + } +} + +/// Asserts the [Status] needs to be persistent. +pub(crate) fn assert_need_persist(status: Status) { + assert!(status.need_persist()); +} + +/// Asserts the [Status] doesn't need to be persistent. +pub(crate) fn assert_no_persist(status: Status) { + assert!(!status.need_persist()); +} + +/// Asserts the [Status] should be [Status::Done]. +pub(crate) fn assert_done(status: Status) { + assert_matches!(status, Status::Done) +} + +/// Asserts the [State] should be [UpdateMetadata::Downgrade]. +pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) { + let state = next.as_any().downcast_ref::().unwrap(); + assert_matches!(state, UpdateMetadata::Downgrade); +} + +/// Asserts the [State] should be [UpdateMetadata::Upgrade]. +pub(crate) fn assert_update_metadata_upgrade(next: &dyn State) { + let state = next.as_any().downcast_ref::().unwrap(); + assert_matches!(state, UpdateMetadata::Upgrade); +} + +/// Asserts the [State] should be [RegionMigrationEnd]. +pub(crate) fn assert_region_migration_end(next: &dyn State) { + let _ = next.as_any().downcast_ref::().unwrap(); +} + +/// Asserts the [State] should be [DowngradeLeaderRegion]. +pub(crate) fn assert_downgrade_leader_region(next: &dyn State) { + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); +} + +/// Asserts the [State] should be [UpgradeCandidateRegion]. +pub(crate) fn assert_upgrade_candidate_region(next: &dyn State) { + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); +} + +/// Mocks the reply from the datanode. 
+pub(crate) fn mock_datanode_reply( + peer_id: DatanodeId, + msg: Arc Result + Send + Sync>, +) -> BeforeTest { + Arc::new(move |suite| { + let msg_moved = msg.clone(); + Box::pin(async move { + let mailbox_ctx = suite.env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(peer_id), tx) + .await; + + send_mock_reply(mailbox, rx, move |id| msg_moved(id)); + }) + }) +} + +/// Setups the [State] of the [ProcedureMigrationTestSuite]. +pub(crate) fn setup_state( + state_factory: Arc Box + Send + Sync>, +) -> BeforeTest { + Arc::new(move |suite| { + let factory_moved = state_factory.clone(); + Box::pin(async move { + suite.state = factory_moved(); + }) + }) +} + +/// Setups the [VolatileContext] of the [Context]. +pub(crate) fn reset_volatile_ctx(suite: &mut ProcedureMigrationTestSuite) -> BoxFuture<'_, ()> { + Box::pin(async { + suite.context.volatile_ctx = VolatileContext::default(); + }) +} + +/// Merges the batch of [BeforeTest]. 
+pub(crate) fn merge_before_test_fn(hooks: Vec) -> BeforeTest { + Arc::new(move |suite| { + let hooks_moved = hooks.clone(); + Box::pin(async move { + for hook in hooks_moved { + hook(suite).await; + } + }) + }) +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index a0ea0fa1f3b9..90b60621a407 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -18,6 +18,7 @@ pub(crate) mod upgrade_candidate_region; use std::any::Any; +use common_procedure::Status; use common_telemetry::warn; use serde::{Deserialize, Serialize}; @@ -41,12 +42,15 @@ pub enum UpdateMetadata { #[async_trait::async_trait] #[typetag::serde] impl State for UpdateMetadata { - async fn next(&mut self, ctx: &mut Context) -> Result> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { match self { UpdateMetadata::Downgrade => { self.downgrade_leader_region(ctx).await?; - Ok(Box::::default()) + Ok(( + Box::::default(), + Status::executing(false), + )) } UpdateMetadata::Upgrade => { self.upgrade_candidate_region(ctx).await?; @@ -54,7 +58,7 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"); }; - Ok(Box::new(RegionMigrationEnd)) + Ok((Box::new(RegionMigrationEnd), Status::Done)) } UpdateMetadata::Rollback => { self.rollback_downgraded_region(ctx).await?; @@ -62,9 +66,12 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}"); }; - Ok(Box::new(RegionMigrationAbort::new( - "Failed to upgrade the candidate region.", - ))) + Ok(( + Box::new(RegionMigrationAbort::new( + "Failed to upgrade the candidate region.", + )), + 
Status::executing(false), + )) } } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 7ff5e59942b7..3bc665652ab9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -38,13 +38,19 @@ impl UpdateMetadata { /// - There is no other DDL procedure executed concurrently for the current table. pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { let table_metadata_manager = ctx.table_metadata_manager.clone(); + let from_peer_id = ctx.persistent_ctx.from_peer.id; let region_id = ctx.region_id(); let table_id = region_id.table_id(); let current_table_route_value = ctx.get_table_route_value().await?; if let Err(err) = table_metadata_manager .update_leader_region_status(table_id, current_table_route_value, |route| { - if route.region.id == region_id { + if route.region.id == region_id + && route + .leader_peer + .as_ref() + .is_some_and(|leader_peer| leader_peer.id == from_peer_id) + { Some(Some(RegionStatus::Downgraded)) } else { None @@ -167,6 +173,48 @@ mod tests { assert!(err.to_string().contains("Failed to update the table route")); } + #[tokio::test] + async fn test_only_downgrade_from_peer() { + let mut state = Box::new(UpdateMetadata::Downgrade); + let persistent_context = new_persistent_context(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(1024)), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + 
.create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let (next, _) = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + + let latest_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // It should remain unchanged. + assert_eq!(latest_table_route.version(), 0); + assert!(!latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(ctx.volatile_ctx.table_route.is_none()); + } + #[tokio::test] async fn test_next_downgrade_leader_region_state() { let mut state = Box::new(UpdateMetadata::Downgrade); @@ -190,7 +238,7 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index b9aed08ad678..603850b60d58 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -219,7 +219,7 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 22c732815ea1..eed43ee4a999 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -76,6 +76,40 @@ impl UpdateMetadata { Ok(region_routes) } + /// Returns true if region metadata has been updated. 
+ async fn check_metadata_updated(&self, ctx: &mut Context) -> Result { + let region_id = ctx.region_id(); + let table_route_value = ctx.get_table_route_value().await?.clone(); + + let region_routes = table_route_value.region_routes.clone(); + let region_route = region_routes + .into_iter() + .find(|route| route.region.id == region_id) + .context(error::RegionRouteNotFoundSnafu { region_id })?; + + let leader_peer = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"), + })?; + + let candidate_peer_id = ctx.persistent_ctx.to_peer.id; + + if leader_peer.id == candidate_peer_id { + ensure!( + !region_route.is_leader_downgraded(), + error::UnexpectedSnafu { + violated: format!("Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"), + } + ); + + Ok(true) + } else { + Ok(false) + } + } + /// Upgrades the candidate region. /// /// Abort(non-retry): @@ -89,6 +123,10 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_metadata_manager = ctx.table_metadata_manager.clone(); + if self.check_metadata_updated(ctx).await? 
{ + return Ok(()); + } + let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?; let table_info_value = ctx.get_table_info_value().await?; @@ -325,6 +363,85 @@ mod tests { assert!(err.to_string().contains("Failed to update the table route")); } + #[tokio::test] + async fn test_check_metadata() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let leader_peer = persistent_context.from_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(leader_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: None, + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + assert!(!updated); + } + + #[tokio::test] + async fn test_check_metadata_updated() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let candidate_peer = persistent_context.to_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(candidate_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: None, + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + assert!(updated); + } + + #[tokio::test] + async fn 
test_check_metadata_intermediate_state() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let candidate_peer = persistent_context.to_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(candidate_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: Some(RegionStatus::Downgraded), + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let err = state.check_metadata_updated(&mut ctx).await.unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert!(err.to_string().contains("intermediate state")); + } + #[tokio::test] async fn test_next_migration_end_state() { let mut state = Box::new(UpdateMetadata::Upgrade); @@ -353,7 +470,7 @@ mod tests { .await .unwrap(); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index e3eb6e2f1b9d..31266fe3324d 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -18,6 +18,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; +use common_procedure::Status; use common_telemetry::warn; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -56,11 
+57,11 @@ impl Default for UpgradeCandidateRegion { #[async_trait::async_trait] #[typetag::serde] impl State for UpgradeCandidateRegion { - async fn next(&mut self, ctx: &mut Context) -> Result> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { if self.upgrade_region_with_retry(ctx).await { - Ok(Box::new(UpdateMetadata::Upgrade)) + Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false))) } else { - Ok(Box::new(UpdateMetadata::Rollback)) + Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false))) } } @@ -219,15 +220,13 @@ impl UpgradeCandidateRegion { mod tests { use std::assert_matches::assert_matches; - use api::v1::meta::mailbox_message::Payload; use common_meta::peer::Peer; - use common_time::util::current_time_millis; use store_api::storage::RegionId; use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{ - new_close_region_reply, send_mock_reply, TestingEnv, + new_close_region_reply, new_upgrade_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -240,31 +239,6 @@ mod tests { } } - fn new_upgrade_region_reply( - id: u64, - ready: bool, - - exists: bool, - - error: Option, - ) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready, - exists, - error, - })) - .unwrap(), - )), - } - } - #[tokio::test] async fn test_datanode_is_unreachable() { let state = UpgradeCandidateRegion::default(); @@ -495,7 +469,7 @@ mod tests { .unwrap(); }); - let next = state.next(&mut ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); @@ -554,7 +528,7 @@ mod tests { .unwrap(); }); - let next = state.next(&mut 
ctx).await.unwrap(); + let (next, _) = state.next(&mut ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); assert_matches!(update_metadata, UpdateMetadata::Rollback);