Skip to content

Commit

Permalink
feat: enhance the tests for alive keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 4, 2023
1 parent 3223b72 commit f052d99
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@ client.workspace = true
common-query.workspace = true
common-test-util.workspace = true
datafusion-common.workspace = true
mito2 = { workspace = true, features = ["test"] }
session.workspace = true
45 changes: 33 additions & 12 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,45 +418,66 @@ impl CountdownTask {

#[cfg(test)]
mod test {
use api::v1::meta::RegionRole;

use mito2::config::MitoConfig;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::region_engine::RegionEngine;

use super::*;
use crate::tests::mock_region_server;

#[tokio::test(flavor = "multi_thread")]
async fn region_alive_keeper() {
let region_server = mock_region_server();
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
let region_id = RegionId::new(1, 2);
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await);
region_server.register_engine(engine.clone());

let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));

let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();
region_server
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
region_server.set_writable(region_id, true).unwrap();

// Register a region before starting.
alive_keeper.register_region(region_id).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());

info!("Start the keeper");
alive_keeper.start(None).await.unwrap();

// The started alive keeper should assign deadline to this region.
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

info!("Wait for lease expired");
// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

info!("Renew the region lease");
// Renew lease then sleep.
alive_keeper
.new_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: RegionRole::Leader.into(),
role: api::v1::meta::RegionRole::Leader.into(),
}],
Instant::now() + Duration::from_millis(500),
)
.await;
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());

// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(1000)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -484,7 +505,7 @@ mod test {
// A nearer deadline will be ignored.
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis),
)
.await;
Expand All @@ -496,7 +517,7 @@ mod test {
// Only a farther deadline will be accepted.
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
)
.await;
Expand Down
7 changes: 4 additions & 3 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use store_api::storage::{ColumnId, RegionId};

use crate::config::MitoConfig;
use crate::engine::listener::EventListenerRef;
use crate::engine::MitoEngine;
use crate::engine::{MitoEngine, MITO_ENGINE_NAME};
use crate::error::Result;
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
Expand Down Expand Up @@ -278,6 +278,7 @@ pub struct CreateRequestBuilder {
options: HashMap<String, String>,
primary_key: Option<Vec<ColumnId>>,
all_not_null: bool,
engine: String,
}

impl Default for CreateRequestBuilder {
Expand All @@ -289,6 +290,7 @@ impl Default for CreateRequestBuilder {
options: HashMap::new(),
primary_key: None,
all_not_null: false,
engine: MITO_ENGINE_NAME.to_string(),
}
}
}
Expand Down Expand Up @@ -377,8 +379,7 @@ impl CreateRequestBuilder {
});

RegionCreateRequest {
// We use empty engine name as we already locates the engine.
engine: String::new(),
engine: self.engine.to_string(),
column_metadatas,
primary_key: self.primary_key.clone().unwrap_or(primary_key),
options: self.options.clone(),
Expand Down

0 comments on commit f052d99

Please sign in to comment.