Skip to content

Commit

Permalink
test: add simple region migration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 4, 2023
1 parent 50bbecb commit 18ad8d8
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 68 deletions.
8 changes: 8 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ impl TableRouteValue {
version: self.version + 1,
}
}

/// Returns the version of this table route value.
///
/// Intended for tests only: it is compiled for this crate's own test builds
/// or when the `testing` feature is enabled.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
    self.version
}
}

impl TableMetaKey for TableRouteKey {
Expand Down
151 changes: 148 additions & 3 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
// TODO(weny): remove it.
#[allow(unused)]
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod update_metadata;
Expand Down Expand Up @@ -366,11 +364,17 @@ impl Procedure for RegionMigrationProcedure {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};

use super::migration_end::RegionMigrationEnd;
use super::update_metadata::UpdateMetadata;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::*;
use crate::service::mailbox::Channel;

fn new_persistent_context() -> PersistentContext {
Expand Down Expand Up @@ -501,4 +505,145 @@ mod tests {
let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
}

/// Builds the ordered list of steps shared by the procedure-flow tests.
///
/// Every step is created with `Step::next` from a description of the state expected
/// next, an optional mocked datanode reply, and an `Assertion::simple` pair.
/// `from_peer_id` is the datanode that answers with the downgrade-region reply and
/// `to_peer_id` the one that answers with the upgrade-region reply.
fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
    // MigrationStart
    let from_migration_start = Step::next(
        "Should be the update metadata for downgrading",
        None,
        Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
    );

    // UpdateMetadata::Downgrade
    let from_metadata_downgrade = Step::next(
        "Should be the downgrade leader region",
        None,
        Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
    );

    // Downgrade Leader: the `from_peer_id` datanode replies to the downgrade request.
    let downgrade_reply = mock_datanode_reply(
        from_peer_id,
        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
    );
    let from_downgrade_leader = Step::next(
        "Should be the upgrade candidate region",
        Some(downgrade_reply),
        Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
    );

    // Upgrade Candidate: the `to_peer_id` datanode replies to the upgrade request.
    let upgrade_reply = mock_datanode_reply(
        to_peer_id,
        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
    );
    let from_upgrade_candidate = Step::next(
        "Should be the update metadata for upgrading",
        Some(upgrade_reply),
        Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
    );

    // UpdateMetadata::Upgrade
    let from_metadata_upgrade = Step::next(
        "Should be the region migration end",
        None,
        Assertion::simple(assert_region_migration_end, assert_done),
    );

    // RegionMigrationEnd
    let from_migration_end = Step::next(
        "Should be the region migration end again",
        None,
        Assertion::simple(assert_region_migration_end, assert_done),
    );

    vec![
        from_migration_start,
        from_metadata_downgrade,
        from_downgrade_leader,
        from_upgrade_candidate,
        from_metadata_upgrade,
        from_migration_end,
    ]
}

/// Drives the migration from `RegionMigrationStart` through every step returned by
/// `procedure_flow_steps` once, then verifies the resulting table metadata.
#[tokio::test]
async fn test_procedure_flow() {
    common_telemetry::init_default_ut_logging();

    let pc = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));

    // Capture everything needed from the persistent context before the suite takes
    // ownership of it.
    let (src_id, dst_id) = (pc.from_peer.id, pc.to_peer.id);
    let routes = vec![RegionRoute {
        region: Region::new_test(pc.region_id),
        leader_peer: Some(pc.from_peer.clone()),
        follower_peers: vec![pc.to_peer.clone()],
        ..Default::default()
    }];

    // Seed the table metadata: table 1024 with a single region led by the source peer.
    let suite = ProcedureMigrationTestSuite::new(pc, Box::new(RegionMigrationStart));
    suite
        .init_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
        .await;

    let flow = procedure_flow_steps(src_id, dst_id);
    let started_at = Instant::now();

    // Run the table tests.
    let runner = ProcedureMigrationSuiteRunner::new(suite)
        .steps(flow)
        .run_once()
        .await;

    // Ensure it didn't run into the slow path.
    let elapsed_secs = started_at.elapsed().as_secs();
    assert!(elapsed_secs < REGION_LEASE_SECS / 2);

    runner.suite.verify_table_metadata().await;
}

#[tokio::test]
async fn test_procedure_flow_idempotent() {
common_telemetry::init_default_ut_logging();

let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);

// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];

let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;

let steps = procedure_flow_steps(from_peer_id, to_peer_id);
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
Arc::new(reset_volatile_ctx),
]),
);

let steps = [
steps.clone(),
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
let timer = Instant::now();

// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;

// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

runner.suite.verify_table_metadata().await;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::Status;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::time::sleep;
Expand Down Expand Up @@ -62,6 +62,10 @@ impl State for DowngradeLeaderRegion {

// Safety: must exist.
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade leader slow path, sleep until {:?}",
deadline
);
tokio::time::sleep_until(*deadline).await;
}

Expand Down Expand Up @@ -206,16 +210,14 @@ impl DowngradeLeaderRegion {
mod tests {
use std::assert_matches::assert_matches;

use api::v1::meta::mailbox_message::Payload;
use common_meta::peer::Peer;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::time::Instant;

use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

Expand All @@ -228,29 +230,6 @@ mod tests {
}
}

/// Builds a mock `MailboxMessage` addressed from "datanode" to "meta" whose JSON
/// payload is an `InstructionReply::DowngradeRegion` carrying the given
/// `last_entry_id`, `exist` flag and optional `error`.
///
/// Panics if the reply cannot be serialized to JSON.
fn new_downgrade_region_reply(
    id: u64,
    last_entry_id: Option<u64>,
    exist: bool,
    error: Option<String>,
) -> MailboxMessage {
    let timestamp_millis = current_time_millis();

    let reply = InstructionReply::DowngradeRegion(DowngradeRegionReply {
        last_entry_id,
        exists: exist,
        error,
    });
    let json = serde_json::to_string(&reply).unwrap();

    MailboxMessage {
        id,
        subject: "mock".to_string(),
        from: "datanode".to_string(),
        to: "meta".to_string(),
        timestamp_millis,
        payload: Some(Payload::Json(json)),
    }
}

#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = DowngradeLeaderRegion::default();
Expand Down
22 changes: 13 additions & 9 deletions src/meta-srv/src/procedure/region_migration/migration_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionId;

use super::downgrade_leader_region::DowngradeLeaderRegion;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::{Context, State};

/// The state that decides where a region migration continues from.
///
/// Its `next` transition picks the following state:
///
/// - If the expected leader region has been opened on `to_peer`, go to the
///   [RegionMigrationEnd] state.
/// - If the candidate region has been opened on `to_peer`, go to the
///   [UpdateMetadata::Downgrade] state.
/// - Otherwise go to the [OpenCandidateRegion] state.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;

Expand All @@ -35,11 +42,11 @@ pub struct RegionMigrationStart;
impl State for RegionMigrationStart {
/// Yields next [State].
///
/// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state.
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state.
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the OpenCandidateRegion state.
/// Otherwise go to the [OpenCandidateRegion] state.
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
Expand All @@ -48,12 +55,9 @@ impl State for RegionMigrationStart {
if self.check_leader_region_on_peer(&region_route, to_peer)? {
Ok((Box::new(RegionMigrationEnd), Status::Done))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok((
Box::<DowngradeLeaderRegion>::default(),
Status::executing(false),
))
Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
} else {
Ok((Box::new(OpenCandidateRegion), Status::executing(false)))
Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ impl UpdateMetadata {
/// - There is no other DDL procedure executed concurrently for the current table.
pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let region_id = ctx.region_id();
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;

if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
if route.region.id == region_id {
if route.region.id == region_id
&& route
.leader_peer
.as_ref()
.is_some_and(|leader_peer| leader_peer.id == from_peer_id)
{
Some(Some(RegionStatus::Downgraded))
} else {
None
Expand Down Expand Up @@ -167,6 +173,48 @@ mod tests {
assert!(err.to_string().contains("Failed to update the table route"));
}

/// Runs `UpdateMetadata::Downgrade` against a table route whose region leader is
/// peer 1024 and expects the stored route to stay untouched while the procedure
/// still moves on to `DowngradeLeaderRegion`.
///
/// NOTE(review): this presumably relies on `new_persistent_context()` using a
/// `from_peer` other than peer 1024 — confirm against that helper.
#[tokio::test]
async fn test_only_downgrade_from_peer() {
    let persistent_context = new_persistent_context();

    let env = TestingEnv::new();
    let mut ctx = env.context_factory().new_context(persistent_context);
    let table_id = ctx.region_id().table_id();

    // Table 1024 with one route: region (1024, 1) led by peer 1024.
    let routes = vec![RegionRoute {
        region: Region::new_test(RegionId::new(1024, 1)),
        leader_peer: Some(Peer::empty(1024)),
        ..Default::default()
    }];
    let metadata_manager = env.table_metadata_manager();
    metadata_manager
        .create_table_metadata(new_test_table_info(1024, vec![1, 2]).into(), routes)
        .await
        .unwrap();

    let mut state = Box::new(UpdateMetadata::Downgrade);
    let (next_state, _status) = state.next(&mut ctx).await.unwrap();

    // The next state must be `DowngradeLeaderRegion`; the unwrap panics otherwise.
    let _ = next_state
        .as_any()
        .downcast_ref::<DowngradeLeaderRegion>()
        .unwrap();

    let stored_route = metadata_manager
        .table_route_manager()
        .get(table_id)
        .await
        .unwrap()
        .unwrap();

    // It should remain unchanged.
    assert_eq!(stored_route.version(), 0);
    assert!(!stored_route.region_routes[0].is_leader_downgraded());
    assert!(ctx.volatile_ctx.table_route.is_none());
}

#[tokio::test]
async fn test_next_downgrade_leader_region_state() {
let mut state = Box::new(UpdateMetadata::Downgrade);
Expand Down
Loading

0 comments on commit 18ad8d8

Please sign in to comment.