From cf6bba09fd2c69c32b702ede3870170180c5af81 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 12 Dec 2023 18:24:17 +0900 Subject: [PATCH] refactor: use downgrading the region instead of closing region (#2863) * refactor: use downgrading the region instead of closing region * feat: enhance the tests for alive keeper * feat: add a metric to track region lease expired * chore: apply suggestions from CR * chore: enable logging for test_distributed_handle_ddl_request * refactor: simplify lease keeper * feat: add metrics for lease keeper * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: apply suggestions from CR * refactor: move OpeningRegionKeeper to common_meta * feat: register operating regions to MemoryRegionKeeper --- Cargo.lock | 1 + src/cmd/src/standalone.rs | 2 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/ddl.rs | 2 + src/common/meta/src/ddl/create_table.rs | 64 +- src/common/meta/src/ddl/drop_table.rs | 52 +- src/common/meta/src/ddl_manager.rs | 7 + src/common/meta/src/error.rs | 34 +- src/common/meta/src/key/table_route.rs | 10 + src/common/meta/src/lib.rs | 1 + src/common/meta/src/region_keeper.rs | 146 ++++ src/common/meta/src/rpc/router.rs | 20 +- src/datanode/src/alive_keeper.rs | 262 ++++--- src/datanode/src/metrics.rs | 7 + .../src/handler/region_lease_handler.rs | 146 ++-- src/meta-srv/src/metasrv.rs | 8 +- src/meta-srv/src/metasrv/builder.rs | 10 +- src/meta-srv/src/metrics.rs | 12 - .../src/procedure/region_migration.rs | 8 +- .../procedure/region_migration/test_util.rs | 10 +- .../upgrade_candidate_region.rs | 6 +- src/meta-srv/src/procedure/tests.rs | 2 +- src/meta-srv/src/procedure/utils.rs | 2 + src/meta-srv/src/region/lease_keeper.rs | 659 +++++++----------- src/meta-srv/src/region/lease_keeper/mito.rs | 122 ---- src/meta-srv/src/region/lease_keeper/utils.rs | 340 --------- src/mito2/src/test_util.rs | 1 - src/store-api/src/region_engine.rs | 10 + tests-integration/src/grpc.rs | 1 + tests-integration/src/standalone.rs | 2 + 30 files changed, 768 insertions(+), 1180 deletions(-) create mode 100644 src/common/meta/src/region_keeper.rs delete mode 100644 src/meta-srv/src/region/lease_keeper/mito.rs delete mode 100644 src/meta-srv/src/region/lease_keeper/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 46dd2146115c..888f34f05245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1799,6 +1799,7 @@ dependencies = [ "common-telemetry", "common-time", "datatypes", + "derive_builder 0.12.0", "etcd-client", "futures", "humantime-serde", diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 2ebd29f7987d..63b1a8914806 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,6 +23,7 @@ use common_meta::ddl::DdlTaskExecutorRef; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_meta::region_keeper::MemoryRegionKeeper; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -396,6 +397,7 @@ impl StartCommand { Arc::new(DummyCacheInvalidator), table_metadata_manager, Arc::new(StandaloneTableMetadataCreator::new(kv_backend)), + Arc::new(MemoryRegionKeeper::default()), ) .context(InitDdlManagerSnafu)?, ); diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index b645fd75edb9..891076f9149c 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -24,6 +24,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true datatypes.workspace = true +derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true humantime-serde.workspace = true diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 0c3327167037..be3d153c8c9f 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -23,6 +23,7 @@ use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::error::Result; use crate::key::TableMetadataManagerRef; +use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::router::RegionRoute; @@ -70,4 +71,5 @@ pub struct DdlContext { pub datanode_manager: DatanodeManagerRef, pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, + pub memory_region_keeper: MemoryRegionKeeperRef, } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 85b4bb8d23da..062f4a7a5984 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -18,7 +18,10 @@ use api::v1::region::{ }; use api::v1::{ColumnDef, SemanticType}; use async_trait::async_trait; -use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_error::ext::BoxedError; +use common_procedure::error::{ + ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, +}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; @@ -35,8 +38,11 @@ use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; use crate::metrics; +use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::rpc::router::{ + find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, +}; pub struct CreateTableProcedure { pub context: DdlContext, @@ -60,10 +66,18 @@ impl CreateTableProcedure { pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(CreateTableProcedure { - context, - creator: TableCreator { data }, - }) + + let mut creator = TableCreator { + data, + opening_regions: vec![], + }; + + creator + .register_opening_regions(&context) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(CreateTableProcedure { context, creator }) } pub fn table_info(&self) -> &RawTableInfo { @@ -169,6 +183,9 @@ impl CreateTableProcedure { } pub async fn on_datanode_create_regions(&mut self) -> Result { + // Registers opening regions + self.creator.register_opening_regions(&self.context)?; + let create_table_data = &self.creator.data; let region_routes = &create_table_data.region_routes; @@ -226,7 +243,9 @@ impl CreateTableProcedure { self.creator.data.state = CreateTableState::CreateMetadata; - Ok(Status::executing(true)) + // Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage. + // TODO(weny): Add more tests. + Ok(Status::executing(false)) } async fn on_create_metadata(&self) -> Result { @@ -282,7 +301,10 @@ impl Procedure for CreateTableProcedure { } pub struct TableCreator { + /// The serializable data. pub data: CreateTableData, + /// The guards of opening. + pub opening_regions: Vec, } impl TableCreator { @@ -294,8 +316,36 @@ impl TableCreator { task, region_routes, }, + opening_regions: vec![], } } + + /// Register opening regions if doesn't exist. + pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> { + let region_routes = &self.data.region_routes; + + let opening_regions = operating_leader_regions(region_routes); + + if self.opening_regions.len() == opening_regions.len() { + return Ok(()); + } + + let mut opening_region_guards = Vec::with_capacity(opening_regions.len()); + + for (region_id, datanode_id) in opening_regions { + let guard = context + .memory_region_keeper + .register(datanode_id, region_id) + .context(error::RegionOperatingRaceSnafu { + region_id, + peer_id: datanode_id, + })?; + opening_region_guards.push(guard); + } + + self.opening_regions = opening_region_guards; + Ok(()) + } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 13173b01f314..6076e6125294 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -26,7 +26,7 @@ use common_telemetry::tracing_context::TracingContext; use common_telemetry::{debug, info}; use futures::future::join_all; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use strum::AsRefStr; use table::engine::TableReference; @@ -42,12 +42,19 @@ use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::DeserializedValueWithBytes; use crate::metrics; +use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::DropTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::rpc::router::{ + find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, +}; pub struct DropTableProcedure { + /// The context of procedure runtime. pub context: DdlContext, + /// The serializable data. pub data: DropTableData, + /// The guards of opening regions. + pub dropping_regions: Vec, } #[allow(dead_code)] @@ -64,12 +71,17 @@ impl DropTableProcedure { Self { context, data: DropTableData::new(cluster_id, task, table_route_value, table_info_value), + dropping_regions: vec![], } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(Self { context, data }) + Ok(Self { + context, + data, + dropping_regions: vec![], + }) } async fn on_prepare(&mut self) -> Result { @@ -102,8 +114,42 @@ impl DropTableProcedure { Ok(Status::executing(true)) } + /// Register dropping regions if doesn't exist. + fn register_dropping_regions(&mut self) -> Result<()> { + let region_routes = self.data.region_routes(); + + let dropping_regions = operating_leader_regions(region_routes); + + if self.dropping_regions.len() == dropping_regions.len() { + return Ok(()); + } + + let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); + + for (region_id, datanode_id) in dropping_regions { + let guard = self + .context + .memory_region_keeper + .register(datanode_id, region_id) + .context(error::RegionOperatingRaceSnafu { + region_id, + peer_id: datanode_id, + })?; + dropping_region_guards.push(guard); + } + + self.dropping_regions = dropping_region_guards; + Ok(()) + } + /// Removes the table metadata. async fn on_remove_metadata(&mut self) -> Result { + // NOTES: If the meta server is crashed after the `RemoveMetadata`, + // Corresponding regions of this table on the Datanode will be closed automatically. + // Then any future dropping operation will fail. + + // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping. + let table_metadata_manager = &self.context.table_metadata_manager; let table_info_value = &self.data.table_info_value; let table_route_value = &self.data.table_route_value; diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index defa6e84b71e..cb57c03e400e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -37,6 +37,7 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable}; use crate::rpc::ddl::{ AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, @@ -52,6 +53,7 @@ pub struct DdlManager { cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_meta_allocator: TableMetadataAllocatorRef, + memory_region_keeper: MemoryRegionKeeperRef, } impl DdlManager { @@ -62,6 +64,7 @@ impl DdlManager { cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_meta_allocator: TableMetadataAllocatorRef, + memory_region_keeper: MemoryRegionKeeperRef, ) -> Result { let manager = Self { procedure_manager, @@ -69,6 +72,7 @@ impl DdlManager { cache_invalidator, table_metadata_manager, table_meta_allocator, + memory_region_keeper, }; manager.register_loaders()?; Ok(manager) @@ -85,6 +89,7 @@ impl DdlManager { datanode_manager: self.datanode_manager.clone(), cache_invalidator: self.cache_invalidator.clone(), table_metadata_manager: self.table_metadata_manager.clone(), + memory_region_keeper: self.memory_region_keeper.clone(), } } @@ -446,6 +451,7 @@ mod tests { use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; + use crate::region_keeper::MemoryRegionKeeper; use crate::rpc::router::RegionRoute; use crate::state_store::KvStateStore; @@ -488,6 +494,7 @@ mod tests { Arc::new(DummyCacheInvalidator), table_metadata_manager, Arc::new(DummyTableMetadataAllocator), + Arc::new(MemoryRegionKeeper::default()), ); let expected_loaders = vec![ diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4e3c712082d5..55dd941b63ce 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -19,10 +19,11 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::peer::Peer; +use crate::DatanodeId; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -31,6 +32,17 @@ pub enum Error { #[snafu(display("Empty key is not allowed"))] EmptyKey { location: Location }, + #[snafu(display( + "Another procedure is operating the region: {} on peer: {}", + region_id, + peer_id + ))] + RegionOperatingRace { + location: Location, + peer_id: DatanodeId, + region_id: RegionId, + }, + #[snafu(display("Invalid result with a txn response: {}", err_msg))] InvalidTxnResult { err_msg: String, location: Location }, @@ -291,7 +303,16 @@ impl ErrorExt for Error { | SequenceOutOfRange { .. } | UnexpectedSequenceValue { .. } | InvalidHeartbeatResponse { .. } - | InvalidTxnResult { .. } => StatusCode::Unexpected, + | InvalidTxnResult { .. } + | EncodeJson { .. } + | DecodeJson { .. } + | PayloadNotExist { .. } + | ConvertRawKey { .. } + | DecodeProto { .. } + | BuildTableMeta { .. } + | TableRouteNotFound { .. } + | ConvertRawTableInfo { .. } + | RegionOperatingRace { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } @@ -306,15 +327,6 @@ impl ErrorExt for Error { TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, - EncodeJson { .. } - | DecodeJson { .. } - | PayloadNotExist { .. } - | ConvertRawKey { .. } - | DecodeProto { .. } - | BuildTableMeta { .. } - | TableRouteNotFound { .. } - | ConvertRawTableInfo { .. } => StatusCode::Unexpected, - SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(), RegisterProcedureLoader { source, .. } => source.status_code(), External { source, .. } => source.status_code(), diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index bbdd1c8e7eab..231c71ccba92 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; use table::metadata::TableId; use super::DeserializedValueWithBytes; @@ -50,6 +51,7 @@ impl TableRouteValue { } } + /// Returns a new version [TableRouteValue] with `region_routes`. pub fn update(&self, region_routes: Vec) -> Self { Self { region_routes, @@ -64,6 +66,14 @@ impl TableRouteValue { pub fn version(&self) -> u64 { self.version } + + /// Returns the corresponding [RegionRoute]. + pub fn region_route(&self, region_id: RegionId) -> Option { + self.region_routes + .iter() + .find(|route| route.region.id == region_id) + .cloned() + } } impl TableMetaKey for TableRouteKey { diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 996f793b10de..2345bf7d670a 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -29,6 +29,7 @@ pub mod kv_backend; pub mod metrics; pub mod peer; pub mod range_stream; +pub mod region_keeper; pub mod rpc; pub mod sequence; pub mod state_store; diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs new file mode 100644 index 000000000000..1002dcc6b8d0 --- /dev/null +++ b/src/common/meta/src/region_keeper.rs @@ -0,0 +1,146 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::{Arc, RwLock}; + +use store_api::storage::RegionId; + +use crate::DatanodeId; + +/// Tracks the operating(i.e., creating, opening, dropping) regions. +#[derive(Debug, Clone)] +pub struct OperatingRegionGuard { + datanode_id: DatanodeId, + region_id: RegionId, + inner: Arc>>, +} + +impl Drop for OperatingRegionGuard { + fn drop(&mut self) { + let mut inner = self.inner.write().unwrap(); + inner.remove(&(self.datanode_id, self.region_id)); + } +} + +impl OperatingRegionGuard { + /// Returns opening region info. + pub fn info(&self) -> (DatanodeId, RegionId) { + (self.datanode_id, self.region_id) + } +} + +pub type MemoryRegionKeeperRef = Arc; + +/// Tracks regions in memory. +/// +/// It used in following cases: +/// - Tracks the opening regions before the corresponding metadata is created. +/// - Tracks the deleting regions after the corresponding metadata is deleted. +#[derive(Debug, Clone, Default)] +pub struct MemoryRegionKeeper { + inner: Arc>>, +} + +impl MemoryRegionKeeper { + pub fn new() -> Self { + Default::default() + } + + /// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist. + pub fn register( + &self, + datanode_id: DatanodeId, + region_id: RegionId, + ) -> Option { + let mut inner = self.inner.write().unwrap(); + + if inner.insert((datanode_id, region_id)) { + Some(OperatingRegionGuard { + datanode_id, + region_id, + inner: self.inner.clone(), + }) + } else { + None + } + } + + /// Returns true if the keeper contains a (`datanoe_id`, `region_id`) tuple. + pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool { + let inner = self.inner.read().unwrap(); + inner.contains(&(datanode_id, region_id)) + } + + /// Returns a set of filtered out regions that are opening. + pub fn filter_opening_regions( + &self, + datanode_id: DatanodeId, + mut region_ids: HashSet, + ) -> HashSet { + let inner = self.inner.read().unwrap(); + region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id))); + + region_ids + } + + /// Returns number of element in tracking set. + pub fn len(&self) -> usize { + let inner = self.inner.read().unwrap(); + inner.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use store_api::storage::RegionId; + + use crate::region_keeper::MemoryRegionKeeper; + + #[test] + fn test_opening_region_keeper() { + let keeper = MemoryRegionKeeper::new(); + + let guard = keeper.register(1, RegionId::from_u64(1)).unwrap(); + assert!(keeper.register(1, RegionId::from_u64(1)).is_none()); + let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); + + let output = keeper.filter_opening_regions( + 1, + HashSet::from([ + RegionId::from_u64(1), + RegionId::from_u64(2), + RegionId::from_u64(3), + ]), + ); + assert_eq!(output.len(), 1); + assert!(output.contains(&RegionId::from_u64(3))); + + assert_eq!(keeper.len(), 2); + drop(guard); + + assert_eq!(keeper.len(), 1); + + assert!(keeper.contains(1, RegionId::from_u64(2))); + drop(guard2); + + assert!(keeper.is_empty()); + } +} diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 6eca316cd496..e37529635390 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -18,6 +18,7 @@ use api::v1::meta::{ Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable, TableRoute as PbTableRoute, }; +use derive_builder::Builder; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; @@ -27,6 +28,7 @@ use crate::error::{self, Result}; use crate::key::RegionDistribution; use crate::peer::Peer; use crate::table_name::TableName; +use crate::DatanodeId; pub fn region_distribution(region_routes: &[RegionRoute]) -> Result { let mut regions_id_map = RegionDistribution::new(); @@ -58,6 +60,19 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } +/// Returns the operating leader regions with corresponding [DatanodeId]. +pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> { + region_routes + .iter() + .filter_map(|route| { + route + .leader_peer + .as_ref() + .map(|leader| (route.region.id, leader.id)) + }) + .collect::>() +} + /// Returns the HashMap<[RegionNumber], &[Peer]>; /// /// If the region doesn't have a leader peer, the [Region] will be omitted. @@ -231,12 +246,15 @@ impl From for PbTable { } } -#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] +#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)] pub struct RegionRoute { pub region: Region, + #[builder(setter(into, strip_option))] pub leader_peer: Option, + #[builder(setter(into), default)] pub follower_peers: Vec, /// `None` by default. + #[builder(setter(into, strip_option), default)] #[serde(default, skip_serializing_if = "Option::is_none")] pub leader_status: Option, } diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index e8bb653d6968..711ab5c3b6a6 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::HashMap; -use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -40,20 +39,14 @@ use crate::error::{self, Result}; use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver}; use crate::region_server::RegionServer; -const MAX_CLOSE_RETRY_TIMES: usize = 10; - -/// [RegionAliveKeeper] manages all `CountdownTaskHandles`. -/// -/// [RegionAliveKeeper] starts a `CountdownTask` for each region. When deadline is reached, -/// the region will be closed. +/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s. /// -/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its -/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this -/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to -/// countdown. +/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached, +/// the status of region be set to "readonly", ensures there is no side-effect in the entity system. /// -/// On each lease extension, [RegionAliveKeeper] will reset the deadline to the corresponding time, and -/// set region's status to "writable". +/// The deadline is controlled by the meta server. Datanode will send its opened regions info to meta sever +/// via heartbeat. If the meta server decides some region could be resided in this Datanode, +/// it will renew the lease of region, a deadline of [CountdownTask] will be reset. pub struct RegionAliveKeeper { region_server: RegionServer, tasks: Arc>>>, @@ -61,13 +54,14 @@ pub struct RegionAliveKeeper { started: Arc, /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing - /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically - /// non-decreasing). The heartbeat request will carry the duration since this epoch, and the + /// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically + /// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the /// duration acts like an "invariant point" for region's keep alive lease. epoch: Instant, } impl RegionAliveKeeper { + /// Returns an empty [RegionAliveKeeper]. pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self { Self { region_server, @@ -82,26 +76,16 @@ impl RegionAliveKeeper { self.tasks.lock().await.get(®ion_id).cloned() } + /// Add the countdown task for a specific region. + /// It will be ignored if the task exists. pub async fn register_region(&self, region_id: RegionId) { if self.find_handle(region_id).await.is_some() { return; } - let tasks = Arc::downgrade(&self.tasks); - let on_task_finished = async move { - if let Some(x) = tasks.upgrade() { - let _ = x.lock().await.remove(®ion_id); - } // Else the countdown task handles map could be dropped because the keeper is dropped. - }; let handle = Arc::new(CountdownTaskHandle::new( self.region_server.clone(), region_id, - move |result: Option| { - info!( - "Deregister region: {region_id} after countdown task finished, result: {result:?}", - ); - on_task_finished - }, )); let mut handles = self.tasks.lock().await; @@ -118,13 +102,15 @@ impl RegionAliveKeeper { } } + /// Removes the countdown task for a specific region. pub async fn deregister_region(&self, region_id: RegionId) { if self.tasks.lock().await.remove(®ion_id).is_some() { info!("Deregister alive countdown for region {region_id}") } } - async fn keep_lived(&self, regions: &[GrantedRegion], deadline: Instant) { + /// Renews the lease of regions to `deadline`. + async fn renew_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) { for region in regions { let (role, region_id) = (region.role().into(), RegionId::from(region.region_id)); if let Some(handle) = self.find_handle(region_id).await { @@ -138,6 +124,25 @@ impl RegionAliveKeeper { } } + async fn close_staled_region(&self, region_id: RegionId) { + info!("Closing staled region: {region_id}"); + let request = RegionRequest::Close(RegionCloseRequest {}); + if let Err(e) = self.region_server.handle_request(region_id, request).await { + if e.status_code() != StatusCode::RegionNotFound { + let _ = self.region_server.set_writable(region_id, false); + error!(e; "Failed to close staled region {}, set region to readonly.",region_id); + } + } + } + + /// Closes staled regions. + async fn close_staled_regions(&self, regions: &[u64]) { + for region_id in regions { + self.close_staled_region(RegionId::from_u64(*region_id)) + .await; + } + } + #[cfg(test)] async fn deadline(&self, region_id: RegionId) -> Option { let mut deadline = None; @@ -243,7 +248,11 @@ impl HeartbeatResponseHandler for RegionAliveKeeper { let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch); let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds); - self.keep_lived(®ion_lease.regions, deadline).await; + self.renew_region_leases(®ion_lease.regions, deadline) + .await; + self.close_staled_regions(®ion_lease.closeable_region_ids) + .await; + Ok(HandleControl::Continue) } } @@ -269,19 +278,7 @@ struct CountdownTaskHandle { impl CountdownTaskHandle { /// Creates a new [CountdownTaskHandle] and starts the countdown task. - /// # Params - /// - `on_task_finished`: a callback to be invoked when the task is finished. Note that it will not - /// be invoked if the task is cancelled (by dropping the handle). This is because we want something - /// meaningful to be done when the task is finished, e.g. deregister the handle from the map. - /// While dropping the handle does not necessarily mean the task is finished. - fn new( - region_server: RegionServer, - region_id: RegionId, - on_task_finished: impl FnOnce(Option) -> Fut + Send + 'static, - ) -> Self - where - Fut: Future + Send, - { + fn new(region_server: RegionServer, region_id: RegionId) -> Self { let (tx, rx) = mpsc::channel(1024); let mut countdown_task = CountdownTask { @@ -290,8 +287,7 @@ impl CountdownTaskHandle { rx, }; let handler = common_runtime::spawn_bg(async move { - let result = countdown_task.run().await; - on_task_finished(result).await; + countdown_task.run().await; }); Self { @@ -301,6 +297,8 @@ impl CountdownTaskHandle { } } + /// Starts the [CountdownTask], + /// it will be ignored if the task started. async fn start(&self, heartbeat_interval_millis: u64) { if let Err(e) = self .tx @@ -354,8 +352,7 @@ struct CountdownTask { } impl CountdownTask { - // returns true if region closed successfully - async fn run(&mut self) -> Option { + async fn run(&mut self) { // 30 years. See `Instant::far_future`. let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); @@ -363,37 +360,30 @@ impl CountdownTask { // "start countdown" command will be sent from heartbeat task). let countdown = tokio::time::sleep_until(far_future); tokio::pin!(countdown); - let region_id = self.region_id; + + let mut started = false; loop { tokio::select! { command = self.rx.recv() => { match command { Some(CountdownCommand::Start(heartbeat_interval_millis)) => { - // Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat - // interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to - // network or other jitters during startup. - let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; - countdown.set(tokio::time::sleep_until(first_deadline)); + if !started { + // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat + // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to + // network or other jitters during startup. + let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; + countdown.set(tokio::time::sleep_until(first_deadline)); + started = true; + } }, Some(CountdownCommand::Reset((role, deadline))) => { - // The first-time granted regions might be ignored because the `first_deadline` is larger than the `region_lease_timeout`. - // Therefore, we set writable at the outside. - // TODO(weny): Considers setting `first_deadline` to `region_lease_timeout`. let _ = self.region_server.set_writable(self.region_id, role.writable()); - - if countdown.deadline() < deadline { - trace!( - "Reset deadline of region {region_id} to approximately {} seconds later", - (deadline - Instant::now()).as_secs_f32(), - ); - countdown.set(tokio::time::sleep_until(deadline)); - } - // Else the countdown could be either: - // - not started yet; - // - during startup protection; - // - received a lagging heartbeat message. - // All can be safely ignored. + trace!( + "Reset deadline of region {region_id} to approximately {} seconds later.", + (deadline - Instant::now()).as_secs_f32(), + ); + countdown.set(tokio::time::sleep_until(deadline)); }, None => { info!( @@ -409,130 +399,123 @@ impl CountdownTask { } } () = &mut countdown => { - let result = self.close_region().await; - info!( - "Region {region_id} is closed, result: {result:?}. \ - RegionAliveKeeper out.", - ); - return Some(result); + warn!("The region {region_id} lease is expired, set region to readonly."); + let _ = self.region_server.set_writable(self.region_id, false); + // resets the countdown. + let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); + countdown.as_mut().reset(far_future); } } } - None - } - - /// Returns if the region is closed successfully. - async fn close_region(&self) -> bool { - for retry in 0..MAX_CLOSE_RETRY_TIMES { - let request = RegionRequest::Close(RegionCloseRequest {}); - match self - .region_server - .handle_request(self.region_id, request) - .await - { - Ok(_) => return true, - Err(e) if e.status_code() == StatusCode::RegionNotFound => return true, - // If region is failed to close, immediately retry. Maybe we should panic instead? - Err(e) => error!(e; - "Retry {retry}, failed to close region {}. \ - For the integrity of data, retry closing and retry without wait.", - self.region_id, - ), - } - } - false } } #[cfg(test)] mod test { - use api::v1::meta::RegionRole; + + use mito2::config::MitoConfig; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::region_engine::RegionEngine; use super::*; use crate::tests::mock_region_server; #[tokio::test(flavor = "multi_thread")] async fn region_alive_keeper() { - let region_server = mock_region_server(); - let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300)); - let region_id = RegionId::new(1, 2); + common_telemetry::init_default_ut_logging(); + let mut region_server = mock_region_server(); + let mut engine_env = TestEnv::with_prefix("region-alive-keeper"); + let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await); + region_server.register_engine(engine.clone()); + + let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100)); + + let region_id = RegionId::new(1024, 1); + let builder = CreateRequestBuilder::new(); + region_server + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); + region_server.set_writable(region_id, true).unwrap(); - // register a region before starting + // Register a region before starting. alive_keeper.register_region(region_id).await; assert!(alive_keeper.find_handle(region_id).await.is_some()); + info!("Start the keeper"); alive_keeper.start(None).await.unwrap(); - // started alive keeper should assign deadline to this region + // The started alive keeper should assign deadline to this region. let deadline = alive_keeper.deadline(region_id).await.unwrap(); assert!(deadline >= Instant::now()); + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader); + + info!("Wait for lease expired"); + // Sleep to wait lease expired. + tokio::time::sleep(Duration::from_millis(500)).await; + assert!(alive_keeper.find_handle(region_id).await.is_some()); + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower); - // extend lease then sleep + info!("Renew the region lease"); + // Renew lease then sleep. alive_keeper - .keep_lived( + .renew_region_leases( &[GrantedRegion { region_id: region_id.as_u64(), - role: RegionRole::Leader.into(), + role: api::v1::meta::RegionRole::Leader.into(), }], - Instant::now() + Duration::from_millis(500), + Instant::now() + Duration::from_millis(200), ) .await; - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(100)).await; assert!(alive_keeper.find_handle(region_id).await.is_some()); let deadline = alive_keeper.deadline(region_id).await.unwrap(); assert!(deadline >= Instant::now()); + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader); + + info!("Wait for lease expired"); + // Sleep to wait lease expired. + tokio::time::sleep(Duration::from_millis(200)).await; + assert!(alive_keeper.find_handle(region_id).await.is_some()); + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower); - // sleep to wait lease expired - tokio::time::sleep(Duration::from_millis(1000)).await; - assert!(alive_keeper.find_handle(region_id).await.is_none()); + let deadline = alive_keeper.deadline(region_id).await.unwrap(); + assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29)); } #[tokio::test(flavor = "multi_thread")] async fn countdown_task() { let region_server = mock_region_server(); - let (tx, rx) = oneshot::channel(); - - let countdown_handle = CountdownTaskHandle::new( - region_server, - RegionId::new(9999, 2), - |result: Option| async move { - tx.send((Instant::now(), result)).unwrap(); - }, - ); + let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2)); - // if countdown task is not started, its deadline is set to far future + // If countdown task is not started, its deadline is set to far future. assert!( countdown_handle.deadline().await.unwrap() > Instant::now() + Duration::from_secs(86400 * 365 * 29) ); - // the first deadline should be set to 4 * heartbeat_interval_millis - // we assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test + // The first deadline should be set to 4 * heartbeat_interval_millis. + // We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test. let heartbeat_interval_millis = 100; countdown_handle.start(heartbeat_interval_millis).await; assert!( countdown_handle.deadline().await.unwrap() > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3) ); + tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 5)).await; - // reset deadline - // a nearer deadline will be ignored - countdown_handle - .reset_deadline( - RegionRole::Leader.into(), - Instant::now() + Duration::from_millis(heartbeat_interval_millis), - ) - .await; + // No effect. + countdown_handle.start(heartbeat_interval_millis).await; assert!( countdown_handle.deadline().await.unwrap() - > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3) + > Instant::now() + Duration::from_secs(86400 * 365 * 29) ); - // only a farther deadline will be accepted + // Reset deadline. countdown_handle .reset_deadline( - RegionRole::Leader.into(), + RegionRole::Leader, Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5), ) .await; @@ -540,16 +523,5 @@ mod test { countdown_handle.deadline().await.unwrap() > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4) ); - - // wait for countdown task to finish - let before_await = Instant::now(); - let (finish_instant, result) = rx.await.unwrap(); - // it returns `RegionNotFound` - assert_eq!(result, Some(true)); - // this task should be finished after 5 * heartbeat_interval_millis - // we assert 4 times here - assert!( - finish_instant > before_await + Duration::from_millis(heartbeat_interval_millis * 4) - ); } } diff --git a/src/datanode/src/metrics.rs b/src/datanode/src/metrics.rs index 54e2619d7855..0ee6764d19af 100644 --- a/src/datanode/src/metrics.rs +++ b/src/datanode/src/metrics.rs @@ -19,6 +19,7 @@ use prometheus::*; pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type"; pub const REGION_ROLE: &str = "region_role"; +pub const REGION_ID: &str = "region_id"; lazy_static! { /// The elapsed time of handling a request in the region_server. @@ -34,6 +35,12 @@ lazy_static! { "last received heartbeat lease elapsed", ) .unwrap(); + pub static ref LEASE_EXPIRED_REGION: IntGaugeVec = register_int_gauge_vec!( + "lease_expired_region", + "lease expired region", + &[REGION_ID] + ) + .unwrap(); /// The received region leases via heartbeat. pub static ref HEARTBEAT_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!( "heartbeat_region_leases", diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 1102bee0e6ab..2c4ec0a1a86c 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,70 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use common_meta::key::TableMetadataManagerRef; -use common_telemetry::info; -use store_api::region_engine::{GrantedRegion, RegionRole}; -use store_api::storage::RegionId; +use common_meta::region_keeper::MemoryRegionKeeperRef; +use store_api::region_engine::GrantedRegion; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef}; +use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse}; use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, - opening_region_keeper: OpeningRegionKeeperRef, } impl RegionLeaseHandler { pub fn new( region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef, - opening_region_keeper: OpeningRegionKeeperRef, + memory_region_keeper: MemoryRegionKeeperRef, ) -> Self { - let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager); + let region_lease_keeper = + RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone()); Self { region_lease_seconds, region_lease_keeper: Arc::new(region_lease_keeper), - opening_region_keeper, - } - } -} - -fn flip_role(role: RegionRole) -> RegionRole { - match role { - RegionRole::Follower => RegionRole::Leader, - RegionRole::Leader => RegionRole::Follower, - } -} - -/// Grants lease of regions. -/// -/// - If a region is in an `operable` set, it will be granted an `flip_role(current)`([RegionRole]); -/// otherwise, it will be granted a `current`([RegionRole]). -/// - If a region is in a `closeable` set, it won't be granted. -fn grant( - granted_regions: &mut Vec, - operable: &HashSet, - closeable: &HashSet, - regions: &[RegionId], - current: RegionRole, -) { - for region in regions { - if operable.contains(region) { - granted_regions.push(GrantedRegion::new(*region, flip_role(current))); - } else if closeable.contains(region) { - // Filters out the closeable regions. - } else { - granted_regions.push(GrantedRegion::new(*region, current)) } } } @@ -99,76 +66,33 @@ impl HeartbeatHandler for RegionLeaseHandler { let regions = stat.regions(); let cluster_id = stat.cluster_id; let datanode_id = stat.id; - let mut granted_regions = Vec::with_capacity(regions.len()); - let mut inactive_regions = HashSet::new(); - let (leaders, followers): (Vec<_>, Vec<_>) = regions - .into_iter() - .map(|(id, role)| match role { - RegionRole::Follower => (None, Some(id)), - RegionRole::Leader => (Some(id), None), - }) - .unzip(); - - let leaders = leaders.into_iter().flatten().collect::>(); - - let (downgradable, closeable) = self + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = self .region_lease_keeper - .find_staled_leader_regions(cluster_id, datanode_id, &leaders) + .renew_region_leases(cluster_id, datanode_id, ®ions) .await?; - grant( - &mut granted_regions, - &downgradable, - &closeable, - &leaders, - RegionRole::Leader, - ); - if !closeable.is_empty() { - info!( - "Granting region lease, found closeable leader regions: {:?} on datanode {}", - closeable, datanode_id - ); - } - inactive_regions.extend(closeable); - - let followers = followers.into_iter().flatten().collect::>(); - - let (upgradeable, closeable) = self - .region_lease_keeper - .find_staled_follower_regions(cluster_id, datanode_id, &followers) - .await?; - - // If a region is opening, it will be filtered out from the closeable regions set. - let closeable = self - .opening_region_keeper - .filter_opening_regions(datanode_id, closeable); - - grant( - &mut granted_regions, - &upgradeable, - &closeable, - &followers, - RegionRole::Follower, - ); - if !closeable.is_empty() { - info!( - "Granting region lease, found closeable follower regions {:?} on datanode {}", - closeable, datanode_id - ); - } - inactive_regions.extend(closeable); + let renewed = renewed + .into_iter() + .map(|(region_id, region_role)| { + GrantedRegion { + region_id, + region_role, + } + .into() + }) + .collect::>(); - acc.inactive_region_ids = inactive_regions; acc.region_lease = Some(RegionLease { - regions: granted_regions - .into_iter() - .map(Into::into) - .collect::>(), + regions: renewed, duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, - closeable_region_ids: vec![], + closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(), }); + acc.inactive_region_ids = non_exists; Ok(HandleControl::Continue) } @@ -176,7 +100,7 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::distributed_time_constants; @@ -184,20 +108,22 @@ mod test { use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; + use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; - use crate::region::lease_keeper::OpeningRegionKeeper; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); - RegionLeaseKeeper::new(table_metadata_manager) + let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); + RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper) } fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat { @@ -260,7 +186,7 @@ mod test { ..Default::default() }; - let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); + let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, @@ -272,6 +198,10 @@ mod test { assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); + assert_eq!( + acc.region_lease.as_ref().unwrap().closeable_region_ids, + vec![another_region_id] + ); let acc = &mut HeartbeatAccumulator::default(); @@ -297,6 +227,10 @@ mod test { vec![GrantedRegion::new(region_id, RegionRole::Follower)], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); + assert_eq!( + acc.region_lease.as_ref().unwrap().closeable_region_ids, + vec![another_region_id] + ); let opening_region_id = RegionId::new(table_id, region_number + 2); let _guard = opening_region_keeper @@ -331,6 +265,10 @@ mod test { ], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); + assert_eq!( + acc.region_lease.as_ref().unwrap().closeable_region_ids, + vec![another_region_id] + ); } #[tokio::test] diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9821d628b718..789cf55dc0e1 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -25,6 +25,7 @@ use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; +use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -46,7 +47,6 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; -use crate::region::lease_keeper::OpeningRegionKeeperRef; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; @@ -233,7 +233,7 @@ pub struct MetaSrv { mailbox: MailboxRef, ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, - opening_region_keeper: OpeningRegionKeeperRef, + memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, plugins: Plugins, @@ -396,8 +396,8 @@ impl MetaSrv { &self.table_metadata_manager } - pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef { - &self.opening_region_keeper + pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef { + &self.memory_region_keeper } pub fn publish(&self) -> Option { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 87f5936c32ae..01841b434f5c 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -26,6 +26,7 @@ use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::sequence::Sequence; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; @@ -55,7 +56,6 @@ use crate::metasrv::{ }; use crate::procedure::region_failover::RegionFailoverManager; use crate::pubsub::PublishRef; -use crate::region::lease_keeper::OpeningRegionKeeper; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend}; @@ -211,6 +211,8 @@ impl MetaSrvBuilder { )) }); + let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); + let ddl_manager = build_ddl_manager( &options, datanode_manager, @@ -218,8 +220,8 @@ impl MetaSrvBuilder { &mailbox, &table_metadata_manager, table_metadata_allocator, + &opening_region_keeper, )?; - let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); let handler_group = match handler_group { Some(handler_group) => handler_group, @@ -307,7 +309,7 @@ impl MetaSrvBuilder { ) .await, plugins: plugins.unwrap_or_else(Plugins::default), - opening_region_keeper, + memory_region_keeper: opening_region_keeper, }) } } @@ -350,6 +352,7 @@ fn build_ddl_manager( mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, + memory_region_keeper: &MemoryRegionKeeperRef, ) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() @@ -376,6 +379,7 @@ fn build_ddl_manager( cache_invalidator, table_metadata_manager.clone(), table_metadata_allocator, + memory_region_keeper.clone(), ) .context(error::InitDdlManagerSnafu)?, )) diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 649230ccd2e3..1dc63c624b8b 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -39,18 +39,6 @@ lazy_static! { pub static ref METRIC_META_LEADER_CACHED_KV_LOAD_ELAPSED: HistogramVec = register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"]) .unwrap(); - /// Elapsed time to load follower region metadata. - pub static ref METRIC_META_LOAD_FOLLOWER_METADATA_ELAPSED: Histogram = register_histogram!( - "meta_load_follower_metadata_elapsed", - "meta load follower regions metadata elapsed" - ) - .unwrap(); - /// Elapsed time to load leader region metadata. - pub static ref METRIC_META_LOAD_LEADER_METADATA_ELAPSED: Histogram = register_histogram!( - "meta_load_leader_metadata_elapsed", - "meta load leader regions metadata elapsed" - ) - .unwrap(); /// Meta kv cache hit counter. pub static ref METRIC_META_KV_CACHE_HIT: IntCounterVec = register_int_counter_vec!("meta_kv_cache_hit", "meta kv cache hit", &["op"]).unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 4f53deb41dea..787462cecf6b 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -32,6 +32,7 @@ use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::peer::Peer; +use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_meta::ClusterId; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -45,7 +46,6 @@ use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Error, Result}; use crate::procedure::utils::region_lock_key; -use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef}; use crate::service::mailbox::{BroadcastChannel, MailboxRef}; /// It's shared in each step and available even after recovering. @@ -84,7 +84,7 @@ pub struct VolatileContext { /// `opening_region_guard` should be consumed after /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue). - opening_region_guard: Option, + opening_region_guard: Option, /// `table_route` is stored via previous steps for future use. table_route: Option>, /// `table_info` is stored via previous steps for future use. @@ -126,7 +126,7 @@ pub trait ContextFactory { pub struct ContextFactoryImpl { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, - opening_region_keeper: OpeningRegionKeeperRef, + opening_region_keeper: MemoryRegionKeeperRef, mailbox: MailboxRef, server_addr: String, } @@ -151,7 +151,7 @@ pub struct Context { persistent_ctx: PersistentContext, volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, - opening_region_keeper: OpeningRegionKeeperRef, + opening_region_keeper: MemoryRegionKeeperRef, mailbox: MailboxRef, server_addr: String, } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index d1d169b8c298..43c3233723ff 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -23,6 +23,7 @@ use common_meta::instruction::{ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; +use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::Sequence; use common_meta::DatanodeId; @@ -43,7 +44,6 @@ use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeader use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::PersistentContext; -use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; use crate::service::mailbox::{Channel, MailboxRef}; pub type MockHeartbeatReceiver = Receiver>; @@ -83,7 +83,7 @@ impl MailboxContext { pub struct TestingEnv { table_metadata_manager: TableMetadataManagerRef, mailbox_ctx: MailboxContext, - opening_region_keeper: OpeningRegionKeeperRef, + opening_region_keeper: MemoryRegionKeeperRef, server_addr: String, } @@ -96,7 +96,7 @@ impl TestingEnv { let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone()); let mailbox_ctx = MailboxContext::new(mailbox_sequence); - let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); + let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); Self { table_metadata_manager, @@ -127,8 +127,8 @@ impl TestingEnv { &self.table_metadata_manager } - /// Returns the [OpeningRegionKeeperRef] - pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef { + /// Returns the [MemoryRegionKeeperRef] + pub fn opening_region_keeper(&self) -> &MemoryRegionKeeperRef { &self.opening_region_keeper } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index eed43ee4a999..097a282273cc 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -174,6 +174,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; + use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; use store_api::storage::RegionId; @@ -182,7 +183,6 @@ mod tests { use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; - use crate::region::lease_keeper::OpeningRegionKeeper; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) @@ -301,7 +301,7 @@ mod tests { let env = TestingEnv::new(); let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let opening_keeper = OpeningRegionKeeper::default(); + let opening_keeper = MemoryRegionKeeper::default(); let table_id = 1024; let table_info = new_test_table_info(table_id, vec![1]).into(); @@ -448,7 +448,7 @@ mod tests { let env = TestingEnv::new(); let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let opening_keeper = OpeningRegionKeeper::default(); + let opening_keeper = MemoryRegionKeeper::default(); let table_id = 1024; let table_info = new_test_table_info(table_id, vec![1]).into(); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 5321481f66a3..cb3ccc0b32a3 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions() { }); let status = procedure.on_datanode_create_regions().await.unwrap(); - assert!(matches!(status, Status::Executing { persist: true })); + assert!(matches!(status, Status::Executing { persist: false })); assert!(matches!( procedure.creator.data.state, CreateTableState::CreateMetadata diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 6f36dc7f9d5b..f90fb46dd5b9 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -115,6 +115,7 @@ pub mod test_data { use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; + use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::Sequence; use datatypes::prelude::ConcreteDataType; @@ -205,6 +206,7 @@ pub mod test_data { }, )), table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), + memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), } } } diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 449b6063cd3e..fcc4a8363d85 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -12,38 +12,78 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod mito; -pub mod utils; - use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; +use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_meta::rpc::router::RegionRoute; use common_meta::DatanodeId; -use common_telemetry::warn; use snafu::ResultExt; +use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, TableId}; -use self::mito::find_staled_leader_regions; use crate::error::{self, Result}; -use crate::metrics; -use crate::region::lease_keeper::utils::find_staled_follower_regions; pub type RegionLeaseKeeperRef = Arc; +/// Renews lease of regions. pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, + memory_region_keeper: MemoryRegionKeeperRef, +} + +/// The result of region lease renewal, +/// contains the renewed region leases and [RegionId] of non-existing regions. +pub struct RenewRegionLeasesResponse { + pub renewed: HashMap, + pub non_exists: HashSet, } impl RegionLeaseKeeper { - pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + pub fn new( + table_metadata_manager: TableMetadataManagerRef, + memory_region_keeper: MemoryRegionKeeperRef, + ) -> Self { Self { table_metadata_manager, + memory_region_keeper, } } } +fn renew_region_lease_via_region_route( + region_route: &RegionRoute, + datanode_id: DatanodeId, + region_id: RegionId, +) -> Option<(RegionId, RegionRole)> { + // If it's a leader region on this datanode. + if let Some(leader) = ®ion_route.leader_peer { + if leader.id == datanode_id { + let region_role = if region_route.is_leader_downgraded() { + RegionRole::Follower + } else { + RegionRole::Leader + }; + + return Some((region_id, region_role)); + } + } + + // If it's a follower region on this datanode. + if region_route + .follower_peers + .iter() + .any(|peer| peer.id == datanode_id) + { + return Some((region_id, RegionRole::Follower)); + } + + // The region doesn't belong to this datanode. + None +} + impl RegionLeaseKeeper { fn collect_tables(&self, datanode_regions: &[RegionId]) -> HashMap> { let mut tables = HashMap::>::new(); @@ -73,100 +113,68 @@ impl RegionLeaseKeeper { Ok(metadata_subset) } - /// Returns downgradable regions, and closeable regions. - /// - /// - Downgradable regions: - /// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`. - /// - /// - closeable regions: - /// - It returns a region if it's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`. - /// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. - /// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. - /// - It returns a region if the region's table metadata is not found. - pub async fn find_staled_leader_regions( + /// Returns [None] if specific region doesn't belong to the datanode. + fn renew_region_lease( &self, - _cluster_id: u64, - datanode_id: u64, - datanode_regions: &[RegionId], - ) -> Result<(HashSet, HashSet)> { - let tables = self.collect_tables(datanode_regions); - let table_ids = tables.keys().copied().collect::>(); - - let metadata_subset = { - let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA_ELAPSED.start_timer(); - self.collect_tables_metadata(&table_ids).await? - }; - - let mut closeable_set = HashSet::new(); - let mut downgradable_set = HashSet::new(); - - for (table_id, regions) in tables { - if let Some(metadata) = metadata_subset.get(&table_id) { - let region_routes = &metadata.region_routes; - - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, ®ions, region_routes); + table_metadata: &HashMap, + datanode_id: DatanodeId, + region_id: RegionId, + role: RegionRole, + ) -> Option<(RegionId, RegionRole)> { + // Renews the lease if it's a opening region or deleting region. + if self.memory_region_keeper.contains(datanode_id, region_id) { + return Some((region_id, role)); + } - downgradable_set.extend(downgradable); - closeable_set.extend(closeable); - } else { - warn!( - "The table {} metadata is not found, appends closeable leader regions: {:?}", - table_id, regions - ); - // If table metadata is not found. - closeable_set.extend(regions); + if let Some(table_route) = table_metadata.get(®ion_id.table_id()) { + if let Some(region_route) = table_route.region_route(region_id) { + return renew_region_lease_via_region_route(®ion_route, datanode_id, region_id); } } - Ok((downgradable_set, closeable_set)) + None } - /// Returns upgradable regions, and closeable regions. + /// Renews the lease of regions for specific datanode. /// - /// Upgradable regions: - /// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`. + /// The lease of regions will be renewed if: + /// - The region of the specific datanode exists in [TableRouteValue]. + /// - The region of the specific datanode is opening. /// - /// closeable regions: - /// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`. - /// - Region's table metadata is not found. - pub async fn find_staled_follower_regions( + /// Otherwise the lease of regions will not be renewed, + /// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse]. + pub async fn renew_region_leases( &self, _cluster_id: u64, - datanode_id: u64, - datanode_regions: &[RegionId], - ) -> Result<(HashSet, HashSet)> { - let tables = self.collect_tables(datanode_regions); + datanode_id: DatanodeId, + regions: &[(RegionId, RegionRole)], + ) -> Result { + let region_ids = regions + .iter() + .map(|(region_id, _)| *region_id) + .collect::>(); + let tables = self.collect_tables(®ion_ids); let table_ids = tables.keys().copied().collect::>(); - - let metadata_subset = { - let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA_ELAPSED.start_timer(); - self.collect_tables_metadata(&table_ids).await? - }; - - let mut upgradable_set = HashSet::new(); - let mut closeable_set = HashSet::new(); - - for (table_id, regions) in tables { - if let Some(metadata) = metadata_subset.get(&table_id) { - let region_routes = &metadata.region_routes; - - let (upgradable, closeable) = - find_staled_follower_regions(datanode_id, ®ions, region_routes); - - upgradable_set.extend(upgradable); - closeable_set.extend(closeable); - } else { - warn!( - "The table {} metadata is not found, appends closeable followers regions: {:?}", - table_id, regions - ); - // If table metadata is not found. - closeable_set.extend(regions); + let table_metadata = self.collect_tables_metadata(&table_ids).await?; + + let mut renewed = HashMap::new(); + let mut non_exists = HashSet::new(); + + for &(region, role) in regions { + match self.renew_region_lease(&table_metadata, datanode_id, region, role) { + Some((region, renewed_role)) => { + renewed.insert(region, renewed_role); + } + None => { + non_exists.insert(region); + } } } - Ok((upgradable_set, closeable_set)) + Ok(RenewRegionLeasesResponse { + renewed, + non_exists, + }) } #[cfg(test)] @@ -175,394 +183,217 @@ impl RegionLeaseKeeper { } } -#[derive(Debug, Clone)] -pub struct OpeningRegionGuard { - datanode_id: DatanodeId, - region_id: RegionId, - inner: Arc>>, -} - -impl Drop for OpeningRegionGuard { - fn drop(&mut self) { - let mut inner = self.inner.write().unwrap(); - inner.remove(&(self.datanode_id, self.region_id)); - } -} - -impl OpeningRegionGuard { - /// Returns opening region info. - pub fn info(&self) -> (DatanodeId, RegionId) { - (self.datanode_id, self.region_id) - } -} - -pub type OpeningRegionKeeperRef = Arc; - -#[derive(Debug, Clone, Default)] -/// Tracks opening regions. -pub struct OpeningRegionKeeper { - inner: Arc>>, -} - -impl OpeningRegionKeeper { - pub fn new() -> Self { - Default::default() - } - - /// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist. - pub fn register( - &self, - datanode_id: DatanodeId, - region_id: RegionId, - ) -> Option { - let mut inner = self.inner.write().unwrap(); - - if inner.insert((datanode_id, region_id)) { - Some(OpeningRegionGuard { - datanode_id, - region_id, - inner: self.inner.clone(), - }) - } else { - None - } - } - - /// Returns true if the keeper contains a (`datanoe_id`, `region_id`) tuple. - pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool { - let inner = self.inner.read().unwrap(); - inner.contains(&(datanode_id, region_id)) - } - - /// Returns a set of filtered out regions that are opening. - pub fn filter_opening_regions( - &self, - datanode_id: DatanodeId, - mut region_ids: HashSet, - ) -> HashSet { - let inner = self.inner.read().unwrap(); - region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id))); - - region_ids - } - - /// Returns number of element in tracking set. - pub fn len(&self) -> usize { - let inner = self.inner.read().unwrap(); - inner.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::region_keeper::MemoryRegionKeeper; + use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionStatus}; + use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::RawTableInfo; - use super::{OpeningRegionKeeper, RegionLeaseKeeper}; + use super::{renew_region_lease_via_region_route, RegionLeaseKeeper}; + use crate::region::lease_keeper::RenewRegionLeasesResponse; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); - RegionLeaseKeeper::new(table_metadata_manager) - } - - #[tokio::test] - async fn test_empty_table_routes() { - let datanode_id = 1; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - - let keeper = new_test_keeper(); - - let datanode_regions = vec![region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - assert!(downgradable.is_empty()); - - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - - assert!(upgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); + let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); + RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper) } - #[tokio::test] - async fn test_find_closeable_regions_simple() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); - let table_info = new_test_table_info(table_id, vec![region_number]).into(); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - - let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // `closeable` should be empty. - let datanode_regions = vec![region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await + #[test] + fn test_renew_region_lease_via_region_route() { + let region_id = RegionId::new(1024, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let mut region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .build() .unwrap(); - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); - - // `closeable` should be empty. - let datanode_regions = vec![]; + // The region doesn't belong to the datanode. + for region_id in [RegionId::new(1024, 2), region_id] { + assert!(renew_region_lease_via_region_route(®ion_route, 1, region_id).is_none()); + } - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + // The leader region on the datanode. + assert_eq!( + renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), + Some((region_id, RegionRole::Leader)) + ); + // The follower region on the datanode. + assert_eq!( + renew_region_lease_via_region_route(®ion_route, follower_peer_id, region_id), + Some((region_id, RegionRole::Follower)) + ); - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); + region_route.leader_status = Some(RegionStatus::Downgraded); + // The downgraded leader region on the datanode. + assert_eq!( + renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), + Some((region_id, RegionRole::Follower)) + ); } #[tokio::test] - async fn test_find_closeable_regions_2() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let another_region_id = RegionId::new(table_id, region_number + 1); - let unknown_region_id = RegionId::new(table_id + 1, region_number); - - let peer = Peer::empty(datanode_id); - let another_peer = Peer::empty(datanode_id + 1); - - let table_info = - new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); - - let region_routes = vec![ - RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![another_peer.clone()], - leader_status: None, - }, - ]; - + async fn test_renew_region_leases_non_exists_regions() { let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // Unexpected Leader region. - // `closeable` should be vec![unknown_region_id]. - let datanode_regions = vec![region_id, unknown_region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&unknown_region_id)); - assert!(downgradable.is_empty()); - // Expected as Follower region. - // `closeable` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. - let datanode_regions = vec![another_region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases( + 0, + 1, + &[ + (RegionId::new(1024, 1), RegionRole::Follower), + (RegionId::new(1024, 2), RegionRole::Leader), + ], + ) .await .unwrap(); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&another_region_id)); - assert!(downgradable.is_empty()); + assert!(renewed.is_empty()); + assert_eq!( + non_exists, + HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)]) + ); } #[tokio::test] - async fn test_find_staled_leader_region_downgraded() { - let datanode_id = 1; + async fn test_renew_region_leases_basic() { let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let another_region_id = RegionId::new(table_id, region_number + 1); - let peer = Peer::empty(datanode_id); + let table_id = 1024; let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - leader_status: Some(RegionStatus::Downgraded), - ..Default::default() - }]; - let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // `upgradable` should be empty, `closeable` should be empty. - let datanode_regions = vec![region_id, another_region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .build() .unwrap(); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&another_region_id)); - assert_eq!(downgradable.len(), 1); - assert!(downgradable.contains(®ion_id)); - } - - #[tokio::test] - async fn test_find_staled_follower_regions() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, vec![region_route.clone()]) .await .unwrap(); - // `upgradable` should be vec![region_id], `closeable` should be empty. - let datanode_regions = vec![region_id]; - - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + // The region doesn't belong to the datanode. + for region_id in [RegionId::new(1024, 2), region_id] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, 1, &[(region_id, RegionRole::Follower)]) + .await + .unwrap(); + assert!(renewed.is_empty()); + assert_eq!(non_exists, HashSet::from([region_id])); + } - assert!(closeable.is_empty()); - assert_eq!(upgradable.len(), 1); - assert!(upgradable.contains(®ion_id)); + // The leader region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, leader_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Leader)])); + } - // `upgradable` should be empty, `closeable` should be vec![region_id]. - let datanode_regions = vec![region_id]; + // The follower region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, follower_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)])); + } - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id + 1, &datanode_regions) - .await + let opening_region_id = RegionId::new(2048, 1); + let _guard = keeper + .memory_region_keeper + .register(leader_peer_id, opening_region_id) .unwrap(); - assert!(upgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); + // The opening region on the datanode. + // NOTES: The procedure lock will ensure only one opening leader. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, leader_peer_id, &[(opening_region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(opening_region_id, role)])); + } } #[tokio::test] - async fn test_find_staled_region_downgraded() { - let datanode_id = 1; + async fn test_renew_region_leases_with_downgrade_leader() { let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); + let table_id = 1024; let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - leader_status: Some(RegionStatus::Downgraded), - ..Default::default() - }]; + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .leader_status(RegionStatus::Downgraded) + .build() + .unwrap(); - let datanode_regions = vec![region_id]; let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, vec![region_route.clone()]) .await .unwrap(); - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - assert!(upgradable.is_empty()); - assert!(closeable.is_empty()); - } - - #[test] - fn test_opening_region_keeper() { - let keeper = OpeningRegionKeeper::new(); - - let guard = keeper.register(1, RegionId::from_u64(1)).unwrap(); - assert!(keeper.register(1, RegionId::from_u64(1)).is_none()); - let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); - - let output = keeper.filter_opening_regions( - 1, - HashSet::from([ - RegionId::from_u64(1), - RegionId::from_u64(2), - RegionId::from_u64(3), - ]), - ); - assert_eq!(output.len(), 1); - assert!(output.contains(&RegionId::from_u64(3))); - - assert_eq!(keeper.len(), 2); - drop(guard); - - assert_eq!(keeper.len(), 1); - - assert!(keeper.contains(1, RegionId::from_u64(2))); - drop(guard2); - - assert!(keeper.is_empty()); + // The leader region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, follower_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)])); + } } } diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs deleted file mode 100644 index ee3ebe44246b..000000000000 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; - -use common_meta::rpc::router::{ - convert_to_region_leader_map, convert_to_region_leader_status_map, RegionRoute, -}; -use store_api::storage::RegionId; - -use super::utils::downgradable_leader_regions; -use crate::region::lease_keeper::utils::closeable_leader_region; - -/// Returns Downgradable regions and closeable regions. -/// -/// - Downgradable regions: -/// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`. -/// -/// - closeable regions: -/// Region's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`. -/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. -/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. -pub fn find_staled_leader_regions( - datanode_id: u64, - datanode_regions: &[RegionId], - region_routes: &[RegionRoute], -) -> (HashSet, HashSet) { - let region_leader_map = convert_to_region_leader_map(region_routes); - let region_leader_status_map = convert_to_region_leader_status_map(region_routes); - - let (downgradable, closeable): (HashSet<_>, HashSet<_>) = datanode_regions - .iter() - .map(|region_id| { - ( - downgradable_leader_regions( - datanode_id, - *region_id, - ®ion_leader_map, - ®ion_leader_status_map, - ), - closeable_leader_region(datanode_id, *region_id, ®ion_leader_map), - ) - }) - .unzip(); - - let downgradable = downgradable.into_iter().flatten().collect(); - let closeable = closeable.into_iter().flatten().collect(); - - (downgradable, closeable) -} - -#[cfg(test)] -mod tests { - - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::storage::RegionId; - - use crate::region::lease_keeper::mito::find_staled_leader_regions; - - #[test] - fn test_find_staled_regions() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(datanode_id); - let another_peer = Peer::empty(datanode_id + 1); - - let datanode_regions = vec![region_id]; - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - - // Grants lease. - // `closeable` should be empty, `region_id` is a active leader region of the `peer` - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &datanode_regions, ®ion_routes); - - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); - - // Unexpected Leader region. - // `closeable` should be vec![`region_id`]; - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &datanode_regions, &[]); - - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - assert!(downgradable.is_empty()); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(another_peer.clone()), - follower_peers: vec![peer.clone()], - leader_status: None, - }]; - - let retained_active_regions = datanode_regions.clone(); - - // Expected as Follower region. - // `closeable` should be vec![`region_id`], `region_id` is RegionRole::Leader. - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &retained_active_regions, ®ion_routes); - - assert!(downgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - } -} diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs deleted file mode 100644 index 835f37f67dee..000000000000 --- a/src/meta-srv/src/region/lease_keeper/utils.rs +++ /dev/null @@ -1,340 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, HashSet}; - -use common_meta::peer::Peer; -use common_meta::rpc::router::{ - convert_to_region_leader_map, convert_to_region_leader_status_map, convert_to_region_peer_map, - RegionRoute, RegionStatus, -}; -use store_api::storage::{RegionId, RegionNumber}; - -/// Returns Some(region_id) if it's not a leader region in `region_route`. -/// -/// It removes a leader region if its peer(`node_id`) isn't the corresponding leader peer in `region_routes`. -pub fn closeable_leader_region( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id { - None - } else { - Some(region_id) - } - } else { - Some(region_id) - } -} - -/// Returns Some(region_id) if its peer(`node_id`) a downgrade leader region peer in `region_route`. -pub fn downgradable_leader_regions( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, - region_leader_status: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - let leader_status = region_leader_status.get(®ion_number); - let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded)); - - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id && downgraded { - Some(region_id) - } else { - None - } - } else { - None - } -} - -/// Returns upgradable regions, and closeable regions. -/// -/// Upgradable regions: -/// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`. -/// -/// closeable regions: -/// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`. -pub fn find_staled_follower_regions( - datanode_id: u64, - datanode_regions: &[RegionId], - region_routes: &[RegionRoute], -) -> (HashSet, HashSet) { - let region_leader_map = convert_to_region_leader_map(region_routes); - let region_leader_status_map = convert_to_region_leader_status_map(region_routes); - let region_peer_map = convert_to_region_peer_map(region_routes); - - let (upgradable, closeable): (HashSet>, HashSet>) = - datanode_regions - .iter() - .map(|region_id| { - ( - upgradable_follower_region( - datanode_id, - *region_id, - ®ion_leader_map, - ®ion_leader_status_map, - ), - closeable_region(datanode_id, *region_id, ®ion_peer_map), - ) - }) - .unzip(); - - let upgradable = upgradable.into_iter().flatten().collect(); - let closeable = closeable.into_iter().flatten().collect(); - - (upgradable, closeable) -} - -/// Returns Some(region) if its peer(`node_id`) a leader region peer in `region_routes`. -pub fn upgradable_follower_region( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, - region_leader_status: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - let leader_status = region_leader_status.get(®ion_number); - let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded)); - - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id && !downgraded { - Some(region_id) - } else { - None - } - } else { - None - } -} - -/// Returns Some(region) if its peer(`node_id) is't a leader or follower region peer in `region_routes`. -pub fn closeable_region( - node_id: u64, - region_id: RegionId, - region_peer_map: &HashMap>, -) -> Option { - if let Some(set) = region_peer_map.get(®ion_id.region_number()) { - if set.get(&node_id).is_some() { - None - } else { - Some(region_id) - } - } else { - Some(region_id) - } -} - -#[cfg(test)] -mod tests { - - use common_meta::peer::Peer; - use store_api::storage::RegionId; - - use super::*; - - #[test] - fn test_closeable_leader_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - - // Should be None, `region_id` is an active region of `peer`. - assert_eq!( - None, - closeable_leader_region(datanode_id, region_id, ®ion_leader_map,) - ); - - // Should be Some(`region_id`), incorrect datanode_id. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id + 1, region_id, ®ion_leader_map,) - ); - - // Should be Some(`region_id`), the inactive_leader_regions is empty. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id, region_id, &Default::default(),) - ); - - let another_peer = Peer::empty(datanode_id + 1); - let region_leader_map = [(region_number, &another_peer)].into(); - - // Should be Some(`region_id`), `region_id` is active region of `another_peer`. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id, region_id, ®ion_leader_map,) - ); - } - - #[test] - fn test_downgradable_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - let region_leader_status_map = [(region_number, RegionStatus::Downgraded)].into(); - - // Should be Some(region_id), `region_id` is a downgraded leader region. - assert_eq!( - Some(region_id), - downgradable_leader_regions( - datanode_id, - region_id, - ®ion_leader_map, - ®ion_leader_status_map - ) - ); - - // Should be None, `region_id` is a leader region. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id, - region_id, - ®ion_leader_map, - &Default::default(), - ) - ); - - // Should be None, incorrect datanode_id. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id + 1, - region_id, - ®ion_leader_map, - ®ion_leader_status_map - ) - ); - - // Should be None, incorrect datanode_id. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id + 1, - region_id, - ®ion_leader_map, - &Default::default(), - ) - ); - } - - #[test] - fn test_closeable_follower_region() { - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - let region_peer_map = [(region_number, HashSet::from([1, 2, 3]))].into(); - - // Should be None. - assert_eq!(None, closeable_region(1, region_id, ®ion_peer_map)); - - // Should be Some(`region_id`), incorrect `datanode_id`. - assert_eq!( - Some(region_id), - closeable_region(4, region_id, ®ion_peer_map) - ); - - // Should be Some(`another_region_id`), `another_region_id` doesn't exist. - assert_eq!( - Some(another_region_id), - closeable_region(1, another_region_id, ®ion_peer_map) - ); - - // Should be Some(`another_region_id`), `another_region_id` doesn't exist, incorrect `datanode_id`. - assert_eq!( - Some(another_region_id), - closeable_region(4, another_region_id, ®ion_peer_map) - ); - } - - #[test] - fn test_upgradable_follower_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - let region_leader_status = HashMap::new(); - - // Should be Some(region_id), `region_id` is a leader region. - assert_eq!( - Some(region_id), - upgradable_follower_region( - datanode_id, - region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - let downgraded_leader = [(region_number, RegionStatus::Downgraded)].into(); - - // Should be None, `region_id` is a downgraded leader region. - assert_eq!( - None, - upgradable_follower_region( - datanode_id, - region_id, - ®ion_leader_map, - &downgraded_leader - ) - ); - - // Should be None, incorrect `datanode_id`. - assert_eq!( - None, - upgradable_follower_region( - datanode_id + 1, - region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - // Should be None, incorrect `datanode_id`, `another_region_id` doesn't exist. - assert_eq!( - None, - upgradable_follower_region( - datanode_id, - another_region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - // Should be None, incorrect `datanode_id`, `another_region_id` doesn't exist, incorrect `datanode_id`. - assert_eq!( - None, - upgradable_follower_region( - datanode_id + 1, - another_region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - } -} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8ab5394773ca..ed20ab1a9dc6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -392,7 +392,6 @@ impl CreateRequestBuilder { }); RegionCreateRequest { - // We use empty engine name as we already locates the engine. engine: self.engine.to_string(), column_metadatas, primary_key: self.primary_key.clone().unwrap_or(primary_key), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index f80c7d94ec06..079772b834b3 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -14,6 +14,7 @@ //! Region Engine's definition +use std::fmt::Display; use std::sync::Arc; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; @@ -84,6 +85,15 @@ pub enum RegionRole { Leader, } +impl Display for RegionRole { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RegionRole::Follower => write!(f, "Follower"), + RegionRole::Leader => write!(f, "Leader"), + } + } +} + impl RegionRole { pub fn writable(&self) -> bool { matches!(self, RegionRole::Leader) diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index ded795861476..e997139b5357 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -44,6 +44,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_distributed_handle_ddl_request() { + common_telemetry::init_default_ut_logging(); let instance = tests::create_distributed_instance("test_distributed_handle_ddl_request").await; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f6b1c35c9e11..352353233ddd 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -20,6 +20,7 @@ use common_config::KvBackendConfig; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; +use common_meta::region_keeper::MemoryRegionKeeper; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; @@ -115,6 +116,7 @@ impl GreptimeDbStandaloneBuilder { Arc::new(DummyCacheInvalidator), table_metadata_manager, Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), + Arc::new(MemoryRegionKeeper::default()), ) .unwrap(), );