diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 65178d2f0157..bbdd1c8e7eab 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -56,6 +56,14 @@ impl TableRouteValue { version: self.version + 1, } } + + /// Returns the version. + /// + /// For test purpose. + #[cfg(any(test, feature = "testing"))] + pub fn version(&self) -> u64 { + self.version + } } impl TableMetaKey for TableRouteKey { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 4bb4551776f3..4f53deb41dea 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -17,8 +17,6 @@ pub(crate) mod migration_abort; pub(crate) mod migration_end; pub(crate) mod migration_start; pub(crate) mod open_candidate_region; -// TODO(weny): remove it. -#[allow(unused)] #[cfg(test)] pub(crate) mod test_util; pub(crate) mod update_metadata; @@ -366,11 +364,17 @@ impl Procedure for RegionMigrationProcedure { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::rpc::router::{Region, RegionRoute}; use super::migration_end::RegionMigrationEnd; + use super::update_metadata::UpdateMetadata; use super::*; use crate::handler::HeartbeatMailbox; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::*; use crate::service::mailbox::Channel; fn new_persistent_context() -> PersistentContext { @@ -501,4 +505,145 @@ mod tests { let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024)); } + + fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> { + vec![ + // MigrationStart + Step::next(
"Should be the update metadata for downgrading", + None, + Assertion::simple(assert_update_metadata_downgrade, assert_need_persist), + ), + // UpdateMetadata::Downgrade + Step::next( + "Should be the downgrade leader region", + None, + Assertion::simple(assert_downgrade_leader_region, assert_no_persist), + ), + // Downgrade Candidate + Step::next( + "Should be the upgrade candidate region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))), + )), + Assertion::simple(assert_upgrade_candidate_region, assert_no_persist), + ), + // Upgrade Candidate + Step::next( + "Should be the update metadata for upgrading", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))), + )), + Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), + ), + // UpdateMetadata::Upgrade + Step::next( + "Should be the region migration end", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + // RegionMigrationEnd + Step::next( + "Should be the region migration end again", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + ] + } + + #[tokio::test] + async fn test_procedure_flow() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. 
+ let from_peer_id = persistent_context.from_peer.id; + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = procedure_flow_steps(from_peer_id, to_peer_id); + let timer = Instant::now(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps) + .run_once() + .await; + + // Ensure it didn't run into the slow path. + assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + + runner.suite.verify_table_metadata().await; + } + + #[tokio::test] + async fn test_procedure_flow_idempotent() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. 
+ let from_peer_id = persistent_context.from_peer.id; + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = procedure_flow_steps(from_peer_id, to_peer_id); + let setup_to_latest_persisted_state = Step::setup( + "Sets state to UpdateMetadata::Downgrade", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))), + Arc::new(reset_volatile_ctx), + ]), + ); + + let steps = [ + steps.clone(), + vec![setup_to_latest_persisted_state.clone()], + steps.clone()[1..].to_vec(), + vec![setup_to_latest_persisted_state], + steps.clone()[1..].to_vec(), + ] + .concat(); + let timer = Instant::now(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps.clone()) + .run_once() + .await; + + // Ensure it didn't run into the slow path. 
+ assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + + runner.suite.verify_table_metadata().await; + } } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index e37bdb443658..7982b4fd8910 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -21,7 +21,7 @@ use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; use common_procedure::Status; -use common_telemetry::warn; +use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tokio::time::sleep; @@ -62,6 +62,10 @@ impl State for DowngradeLeaderRegion { // Safety: must exist. if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() { + info!( + "Running into the downgrade leader slow path, sleep until {:?}", + deadline + ); tokio::time::sleep_until(*deadline).await; } @@ -206,16 +210,14 @@ impl DowngradeLeaderRegion { mod tests { use std::assert_matches::assert_matches; - use api::v1::meta::mailbox_message::Payload; use common_meta::peer::Peer; - use common_time::util::current_time_millis; use store_api::storage::RegionId; use tokio::time::Instant; use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{ - new_close_region_reply, send_mock_reply, TestingEnv, + new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -228,29 +230,6 @@ mod tests { } } - fn new_downgrade_region_reply( - id: u64, - last_entry_id: Option<u64>, - exist: bool, - error: Option<String>, - ) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload:
Some(Payload::Json( - serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id, - exists: exist, - error, - })) - .unwrap(), - )), - } - } - #[tokio::test] async fn test_datanode_is_unreachable() { let state = DowngradeLeaderRegion::default(); diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 23417ba4f60a..80475904e6f3 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -21,12 +21,19 @@ use serde::{Deserialize, Serialize}; use snafu::OptionExt; use store_api::storage::RegionId; -use super::downgrade_leader_region::DowngradeLeaderRegion; use super::migration_end::RegionMigrationEnd; use super::open_candidate_region::OpenCandidateRegion; +use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; use crate::procedure::region_migration::{Context, State}; +/// The behaviors: +/// +/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. +/// +/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. +/// +/// Otherwise go to the [OpenCandidateRegion] state. #[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationStart; @@ -35,11 +42,11 @@ pub struct RegionMigrationStart; impl State for RegionMigrationStart { /// Yields next [State]. /// - /// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state. + /// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. /// - /// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state. + /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. /// - /// Otherwise go to the OpenCandidateRegion state. 
+ /// Otherwise go to the [OpenCandidateRegion] state. async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> { let region_id = ctx.persistent_ctx.region_id; let region_route = self.retrieve_region_route(ctx, region_id).await?; @@ -48,12 +55,9 @@ impl State for RegionMigrationStart { if self.check_leader_region_on_peer(&region_route, to_peer)? { Ok((Box::new(RegionMigrationEnd), Status::Done)) } else if self.check_candidate_region_on_peer(&region_route, to_peer) { - Ok(( - Box::<DowngradeLeaderRegion>::default(), - Status::executing(false), - )) + Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) } else { - Ok((Box::new(OpenCandidateRegion), Status::executing(false))) + Ok((Box::new(OpenCandidateRegion), Status::executing(true))) } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 339bc136c8c0..3bc665652ab9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -38,13 +38,19 @@ impl UpdateMetadata { /// - There is no other DDL procedure executed concurrently for the current table.
pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { let table_metadata_manager = ctx.table_metadata_manager.clone(); + let from_peer_id = ctx.persistent_ctx.from_peer.id; let region_id = ctx.region_id(); let table_id = region_id.table_id(); let current_table_route_value = ctx.get_table_route_value().await?; if let Err(err) = table_metadata_manager .update_leader_region_status(table_id, current_table_route_value, |route| { - if route.region.id == region_id { + if route.region.id == region_id + && route + .leader_peer + .as_ref() + .is_some_and(|leader_peer| leader_peer.id == from_peer_id) + { Some(Some(RegionStatus::Downgraded)) } else { None @@ -167,6 +173,48 @@ mod tests { assert!(err.to_string().contains("Failed to update the table route")); } + #[tokio::test] + async fn test_only_downgrade_from_peer() { + let mut state = Box::new(UpdateMetadata::Downgrade); + let persistent_context = new_persistent_context(); + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let table_id = ctx.region_id().table_id(); + + let table_info = new_test_table_info(1024, vec![1, 2]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(1024)), + ..Default::default() + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let (next, _) = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::<DowngradeLeaderRegion>() + .unwrap(); + + let latest_table_route = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + // It should remain unchanged.
+ assert_eq!(latest_table_route.version(), 0); + assert!(!latest_table_route.region_routes[0].is_leader_downgraded()); + assert!(ctx.volatile_ctx.table_route.is_none()); + } + #[tokio::test] async fn test_next_downgrade_leader_region_state() { let mut state = Box::new(UpdateMetadata::Downgrade); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 8114802ea070..dc5feebc7cc5 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -76,6 +76,40 @@ impl UpdateMetadata { Ok(region_routes) } + /// Returns true if region metadata has been updated. + async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> { + let region_id = ctx.region_id(); + let table_route_value = ctx.get_table_route_value().await?.clone(); + + let region_routes = table_route_value.region_routes.clone(); + let region_route = region_routes + .into_iter() + .find(|route| route.region.id == region_id) + .context(error::RegionRouteNotFoundSnafu { region_id })?; + + let leader_peer = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"), + })?; + + let candidate_peer_id = ctx.persistent_ctx.to_peer.id; + + if leader_peer.id == candidate_peer_id { + ensure!( + !region_route.is_leader_downgraded(), + error::UnexpectedSnafu { + violated: "Unexpected intermediate state is found during the update metadata for upgrading", + } + ); + + Ok(true) + } else { + Ok(false) + } + } + /// Upgrades the candidate region.
/// /// Abort(non-retry): @@ -89,6 +123,10 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_metadata_manager = ctx.table_metadata_manager.clone(); + if self.check_metadata_updated(ctx).await? { + return Ok(()); + } + let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?; let table_info_value = ctx.get_table_info_value().await?; @@ -325,6 +363,85 @@ mod tests { assert!(err.to_string().contains("Failed to update the table route")); } + #[tokio::test] + async fn test_check_metadata() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let leader_peer = persistent_context.from_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(leader_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: None, + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + assert!(!updated); + } + + #[tokio::test] + async fn test_check_metadata_updated() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let candidate_peer = persistent_context.to_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(candidate_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: None, + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager 
+ .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + assert!(updated); + } + + #[tokio::test] + async fn test_check_metadata_intermediate_state() { + let state = UpdateMetadata::Upgrade; + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(); + let candidate_peer = persistent_context.to_peer.clone(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(candidate_peer), + follower_peers: vec![Peer::empty(2), Peer::empty(3)], + leader_status: Some(RegionStatus::Downgraded), + }]; + + let table_metadata_manager = env.table_metadata_manager(); + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let err = state.check_metadata_updated(&mut ctx).await.unwrap_err(); + assert_matches!(err, Error::Unexpected { .. 
+ }); + assert!(err.to_string().contains("intermediate state")); + } + #[tokio::test] async fn test_next_migration_end_state() { let mut state = Box::new(UpdateMetadata::Upgrade); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index a3dee7ee8dcb..31266fe3324d 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -220,15 +220,13 @@ impl UpgradeCandidateRegion { mod tests { use std::assert_matches::assert_matches; - use api::v1::meta::mailbox_message::Payload; use common_meta::peer::Peer; - use common_time::util::current_time_millis; use store_api::storage::RegionId; use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{ - new_close_region_reply, send_mock_reply, TestingEnv, + new_close_region_reply, new_upgrade_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -241,31 +239,6 @@ mod tests { } } - fn new_upgrade_region_reply( - id: u64, - ready: bool, - - exists: bool, - - error: Option<String>, - ) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready, - exists, - error, - })) - .unwrap(), - )), - } - } - #[tokio::test] async fn test_datanode_is_unreachable() { let state = UpgradeCandidateRegion::default();