diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6bb2c1bdabc5..c9522a9d64b5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -54,6 +54,8 @@ pub mod table_region; // TODO(weny): removes it. #[allow(deprecated)] pub mod table_route; +#[cfg(any(test, feature = "testing"))] +pub mod test_utils; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -684,12 +686,11 @@ mod tests { use std::sync::Arc; use bytes::Bytes; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder}; use futures::TryStreamExt; - use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; + use table::metadata::{RawTableInfo, TableInfo}; use super::datanode_table::DatanodeTableKey; + use super::test_utils; use crate::ddl::utils::region_storage_path; use crate::key::datanode_table::RegionInfo; use crate::key::table_info::TableInfoValue; @@ -735,40 +736,6 @@ mod tests { assert_eq!(removed, to_removed_key(key)); } - fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), - ]; - let schema = SchemaBuilder::try_from(column_schemas) - .unwrap() - .version(123) - .build() - .unwrap(); - - let meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![0]) - .engine("engine") - .next_column_id(3) - .region_numbers(region_numbers.collect::>()) - .build() - .unwrap(); - TableInfoBuilder::default() - .table_id(10) - .table_version(5) - .name("mytable") - .meta(meta) - .build() - .unwrap() - } - fn new_test_region_route() -> RegionRoute { new_region_route(1, 2) } @@ -787,6 +754,10 @@ mod tests { } } + fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { + test_utils::new_test_table_info(10, region_numbers) + } + #[tokio::test] async fn test_create_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 57abcf103362..885300c2edff 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; @@ -23,6 +24,7 @@ use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_P use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute}; +use crate::rpc::store::BatchGetRequest; pub struct TableRouteKey { pub table_id: TableId, @@ -197,6 +199,38 @@ impl TableRouteManager { .transpose() } + /// It may return a subset of the `table_ids`. + pub async fn batch_get( + &self, + table_ids: &[TableId], + ) -> Result> { + let lookup_table = table_ids + .iter() + .map(|id| (TableRouteKey::new(*id).as_raw_key(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. + **lookup_table.get(kv.key()).unwrap(), + TableRouteValue::try_from_raw_value(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } + #[cfg(test)] pub async fn get_removed( &self, diff --git a/src/common/meta/src/key/test_utils.rs b/src/common/meta/src/key/test_utils.rs new file mode 100644 index 000000000000..aa4fa21ea992 --- /dev/null +++ b/src/common/meta/src/key/test_utils.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use store_api::storage::TableId; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; + +pub fn new_test_table_info>( + table_id: TableId, + region_numbers: I, +) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.into_iter().collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap() +} diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 483250cbf1df..dd012a12a1c5 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -49,6 +49,14 @@ impl Peer { addr: addr.into(), } } + + #[cfg(any(test, feature = "testing"))] + pub fn empty(id: u64) -> Self { + Self { + id, + addr: String::new(), + } + } } impl Display for Peer { diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index ffc311fbe511..8118e049e19c 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -58,7 +58,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } -pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap { +/// Returns the HashMap<[RegionNumber], &[Peer]>; +/// +/// If the region doesn't have a leader peer, the [Region] will be omitted. +pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap { region_routes .iter() .filter_map(|x| { @@ -69,7 +72,10 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap>() } -pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { +pub fn find_region_leader( + region_routes: &[RegionRoute], + region_number: RegionNumber, +) -> Option<&Peer> { region_routes .iter() .find(|x| x.region.id.region_number() == region_number) @@ -241,12 +247,12 @@ impl RegionRoute { pub struct RegionRoutes(pub Vec); impl RegionRoutes { - pub fn region_map(&self) -> HashMap { - convert_to_region_map(&self.0) + pub fn region_leader_map(&self) -> HashMap { + convert_to_region_leader_map(&self.0) } - pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> { - self.region_map().get(®ion_number).copied() + pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> { + self.region_leader_map().get(®ion_number).copied() } } @@ -258,6 +264,16 @@ pub struct Region { pub attrs: BTreeMap, } +impl Region { + #[cfg(any(test, feature = "testing"))] + pub fn new_test(id: RegionId) -> Self { + Self { + id, + ..Default::default() + } + } +} + impl From for Region { fn from(r: PbRegion) -> Self { Self { diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index a829d969df86..4e2242288c05 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -293,7 +293,9 @@ impl HeartbeatTask { role: RegionRole::from(stat.role).into(), approximate_bytes, // TODO(ruihang): scratch more info - ..Default::default() + rcus: 0, + wcus: 0, + approximate_rows: 0, }; region_stats.push(region_stat); } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 4998969e5006..d6bb07a74b22 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -59,6 +59,7 @@ uuid.workspace = true [dev-dependencies] chrono.workspace = true client = { workspace = true, features = ["testing"] } +common-meta = { workspace = true, features = ["testing"] } common-procedure-test = { workspace = true } session = { workspace = true } tracing = "0.1" diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 85bb0c49aac3..d1c39d99e75f 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { + use store_api::region_engine::RegionRole; + use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; @@ -129,6 +131,8 @@ mod tests { wcus: 0, approximate_bytes: 0, approximate_rows: 0, + engine: default_engine().to_string(), + role: RegionRole::Follower, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 8601ec5dbf67..e48168be94a0 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; +use store_api::region_engine::RegionRole; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; @@ -25,7 +26,9 @@ use crate::keys::StatKey; pub struct Stat { pub timestamp_millis: i64, pub cluster_id: u64, + // The datanode Id. pub id: u64, + // The datanode address. pub addr: String, /// The read capacity units during this period pub rcus: i64, @@ -38,8 +41,9 @@ pub struct Stat { pub node_epoch: u64, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct RegionStat { + /// The region_id. pub id: u64, /// The read capacity units during this period pub rcus: i64, @@ -49,6 +53,10 @@ pub struct RegionStat { pub approximate_bytes: i64, /// Approximate number of rows in this region pub approximate_rows: i64, + /// The engine name. + pub engine: String, + /// The region role. + pub role: RegionRole, } impl Stat { @@ -132,6 +140,8 @@ impl TryFrom for RegionStat { wcus: value.wcus, approximate_bytes: value.approximate_bytes, approximate_rows: value.approximate_rows, + engine: value.engine.to_string(), + role: RegionRole::from(value.role()), }) } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 5090d1e1b64b..fc156ff99149 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -72,6 +72,7 @@ mod test { use common_meta::key::TableMetadataManager; use common_meta::{distributed_time_constants, RegionIdent}; + use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, RegionNumber}; use super::*; @@ -110,7 +111,12 @@ mod test { let region_id = RegionId::new(table_id, region_number); RegionStat { id: region_id.as_u64(), - ..Default::default() + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + role: RegionRole::Leader, } }; acc.stat = Some(Stat { diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index e27196b478a7..77f8f3f82145 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -31,6 +31,7 @@ mod metrics; pub mod mocks; pub mod procedure; pub mod pubsub; +pub mod region; pub mod selector; pub mod service; pub mod table_meta_alloc; diff --git a/src/meta-srv/src/region.rs b/src/meta-srv/src/region.rs new file mode 100644 index 000000000000..a3106bd57935 --- /dev/null +++ b/src/meta-srv/src/region.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod lease_keeper; + +pub use lease_keeper::RegionLeaseKeeper; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs new file mode 100644 index 000000000000..bad7719ac8e2 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -0,0 +1,243 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod mito; +pub mod utils; + +use std::collections::{HashMap, HashSet}; + +use common_meta::key::TableMetadataManagerRef; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; + +use self::mito::find_staled_leader_regions; +use crate::error::{self, Result}; + +pub struct RegionLeaseKeeper { + table_metadata_manager: TableMetadataManagerRef, +} + +impl RegionLeaseKeeper { + pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + Self { + table_metadata_manager, + } + } +} + +impl RegionLeaseKeeper { + /// Returns staled [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. + /// + /// - It returns a region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. + /// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. + /// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. + /// - It returns a region if the region's table metadata is not found. + pub async fn find_staled_leader_regions( + &self, + _cluster_id: u64, + datanode_id: u64, + datanode_regions: &[RegionId], + ) -> Result> { + let table_route_manager = self.table_metadata_manager.table_route_manager(); + + let mut tables = HashMap::>::new(); + + // Group by `table_id`. + for region_id in datanode_regions.iter() { + let table = tables.entry(region_id.table_id()).or_default(); + table.push(*region_id); + } + + let table_ids = tables.keys().cloned().collect::>(); + + // The subset of all table metadata. + // TODO: considers storing all active regions in meta's memory. + let metadata_subset = table_route_manager + .batch_get(&table_ids) + .await + .context(error::TableMetadataManagerSnafu)?; + + let mut staled_regions = HashSet::new(); + + for (table_id, regions) in &mut tables { + if let Some(metadata) = metadata_subset.get(table_id) { + let region_routes = &metadata.region_routes; + + staled_regions.extend(find_staled_leader_regions( + datanode_id, + regions, + region_routes, + )); + } else { + // If table metadata is not found. + staled_regions.extend(regions.drain(..)); + } + } + + Ok(staled_regions) + } + + #[cfg(test)] + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::key::test_utils::new_test_table_info; + use common_meta::key::TableMetadataManager; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use super::RegionLeaseKeeper; + use crate::service::store::kv::KvBackendAdapter; + use crate::service::store::memory::MemStore; + + fn new_test_keeper() -> RegionLeaseKeeper { + let store = KvBackendAdapter::wrap(Arc::new(MemStore::new())); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); + + RegionLeaseKeeper::new(table_metadata_manager) + } + + #[tokio::test] + async fn test_empty_table_routes() { + let datanode_id = 1; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + + let keeper = new_test_keeper(); + + let datanode_regions = vec![region_id]; + + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) + .await + .unwrap(); + + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(®ion_id)); + } + + #[tokio::test] + async fn test_find_staled_regions_simple() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let peer = Peer::empty(datanode_id); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // `staled_regions` should be empty. + let datanode_regions = vec![region_id]; + + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) + .await + .unwrap(); + + assert!(staled_regions.is_empty()); + + // `staled_regions` should be empty. + let datanode_regions = vec![]; + + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) + .await + .unwrap(); + + assert!(staled_regions.is_empty()); + } + + #[tokio::test] + async fn test_find_staled_regions_2() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let unknown_region_id = RegionId::new(table_id + 1, region_number); + + let peer = Peer::empty(datanode_id); + let another_peer = Peer::empty(datanode_id + 1); + + let table_info = + new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer.clone()], + leader_status: None, + }, + ]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // Unexpected Leader region. + // `staled_regions` should be vec![unknown_region_id]. + let datanode_regions = vec![region_id, unknown_region_id]; + + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) + .await + .unwrap(); + + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&unknown_region_id)); + + // Expected as Follower region. + // `staled_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. + let datanode_regions = vec![another_region_id]; + + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) + .await + .unwrap(); + + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&another_region_id)); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs new file mode 100644 index 000000000000..0a6e53013af9 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoute}; +use store_api::storage::RegionId; + +use crate::region::lease_keeper::utils::staled_leader_regions; + +/// Returns staled regions. +/// +/// It returns a region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. +/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. +/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. +pub fn find_staled_leader_regions( + datanode_id: u64, + datanode_regions: &[RegionId], + region_routes: &[RegionRoute], +) -> HashSet { + let region_leader_map = convert_to_region_leader_map(region_routes); + + datanode_regions + .iter() + .filter_map(|region_id| staled_leader_regions(datanode_id, *region_id, ®ion_leader_map)) + .collect::>() +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use crate::region::lease_keeper::mito::find_staled_leader_regions; + + #[test] + fn test_find_staled_regions() { + let datanode_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(datanode_id); + let another_peer = Peer::empty(datanode_id + 1); + + let datanode_regions = vec![region_id]; + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + // Grants lease. + // `staled_regions` should be empty, `region_id` is a active leader region of the `peer` + let staled_regions = + find_staled_leader_regions(datanode_id, &datanode_regions, ®ion_routes); + + assert!(staled_regions.is_empty()); + + // Unexpected Leader region. + // `staled_regions` should be vec![`region_id`]; + let staled_regions = find_staled_leader_regions(datanode_id, &datanode_regions, &[]); + + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(®ion_id)); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(another_peer.clone()), + follower_peers: vec![peer.clone()], + leader_status: None, + }]; + + let retained_active_regions = datanode_regions.clone(); + + // Expected as Follower region. + // `staled_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. + let staled_regions = + find_staled_leader_regions(datanode_id, &retained_active_regions, ®ion_routes); + + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(®ion_id)); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs new file mode 100644 index 000000000000..dfc82066cdc5 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/utils.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::peer::Peer; +use store_api::storage::RegionId; + +/// Returns Some(region_id) if it's a inactive leader region. +/// +/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. +pub fn staled_leader_regions( + node_id: u64, + region_id: RegionId, + region_leader_map: &HashMap, +) -> Option { + if let Some(peer) = region_leader_map.get(®ion_id.region_number()) { + // TODO(weny): treats the leader peer as inactive if it's readonly or downgraded. + if peer.id == node_id { + None + } else { + Some(region_id) + } + } else { + Some(region_id) + } +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use store_api::storage::RegionId; + + use super::*; + + #[test] + fn test_inactive_leader_regions() { + let datanode_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(datanode_id); + + let region_leader_map = [(region_number, &peer)].into(); + + // Should be None, `region_id` is a active region of `peer`. + assert_eq!( + None, + staled_leader_regions(datanode_id, region_id, ®ion_leader_map) + ); + + // Should be Some(`region_id`), the inactive_leader_regions is empty. + assert_eq!( + Some(region_id), + staled_leader_regions(datanode_id, region_id, &Default::default()) + ); + + let another_peer = Peer::empty(datanode_id + 1); + let region_leader_map = [(region_number, &another_peer)].into(); + + // Should be Some(`region_id`), `region_id` is active region of `another_peer`. + assert_eq!( + Some(region_id), + staled_leader_regions(datanode_id, region_id, ®ion_leader_map) + ); + } +} diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 6a7f74d30de2..41b3bef065f8 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -19,7 +19,7 @@ use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::{convert_to_region_map, RegionRoutes}; +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -93,7 +93,7 @@ impl PartitionRuleManager { .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); let mut datanodes = HashMap::with_capacity(regions.len()); - let region_map = convert_to_region_map(&route.region_routes); + let region_map = convert_to_region_leader_map(&route.region_routes); for region in regions.iter() { let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { table_id, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 7f6762562a10..3dbeaa6e74a5 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -21,12 +21,13 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; +use serde::{Deserialize, Serialize}; use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2), Readonly region(file). Follower, @@ -43,6 +44,15 @@ impl From for PbRegionRole { } } +impl From for RegionRole { + fn from(value: PbRegionRole) -> Self { + match value { + PbRegionRole::Leader => RegionRole::Leader, + PbRegionRole::Follower => RegionRole::Follower, + } + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine