diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 2295a42c9c6f..f17d7a2d8ff7 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -18,6 +18,7 @@ use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; @@ -72,8 +73,12 @@ impl Stat { } } - pub fn region_ids(&self) -> Vec { - self.region_stats.iter().map(|s| s.id).collect() + /// Returns a tuple array containing [RegionId] and [RegionRole]. + pub fn regions(&self) -> Vec<(RegionId, RegionRole)> { + self.region_stats + .iter() + .map(|s| (RegionId::from(s.id), s.role.into())) + .collect() } pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 96808dcd82a5..cae254cbc262 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,22 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{GrantedRegion, HeartbeatRequest, RegionLease, RegionRole, Role}; +use std::sync::Arc; + +use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; +use common_meta::key::TableMetadataManagerRef; +use store_api::region_engine::{GrantedRegion, RegionRole}; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; -use crate::inactive_region_manager::InactiveRegionManager; use crate::metasrv::Context; +use crate::region::lease_keeper::RegionLeaseKeeperRef; +use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, + region_lease_keeper: RegionLeaseKeeperRef, } impl RegionLeaseHandler { - pub fn new(region_lease_seconds: u64) -> Self { + pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self { + let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager); + Self { region_lease_seconds, + region_lease_keeper: Arc::new(region_lease_keeper), } } } @@ -41,31 +50,65 @@ impl HeartbeatHandler for RegionLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut Context, + _ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { let Some(stat) = acc.stat.as_ref() else { return Ok(()); }; - let mut region_ids = stat.region_ids(); - - let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); - let inactive_region_ids = inactive_region_manager - .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) - .await?; + let regions = stat.regions(); + let cluster_id = stat.cluster_id; + let datanode_id = stat.id; + let mut granted_regions = Vec::with_capacity(regions.len()); - let regions = region_ids + let (leaders, followers): (Vec<_>, Vec<_>) = regions .into_iter() - .map(|region_id| GrantedRegion { - region_id, - role: RegionRole::Leader.into(), + .map(|(id, role)| match role { + RegionRole::Follower => (None, Some(id)), + RegionRole::Leader => (Some(id), None), }) - .collect(); + .unzip(); + + let leaders = leaders.into_iter().flatten().collect::>(); + + let (downgradable, closable) = self + .region_lease_keeper + .find_staled_leader_regions(cluster_id, datanode_id, &leaders) + .await?; + + for leader in leaders { + if downgradable.contains(&leader) { + granted_regions.push(GrantedRegion::new(leader, RegionRole::Follower)) + } else if closable.contains(&leader) { + // Filters out the closable regions. + } else { + granted_regions.push(GrantedRegion::new(leader, RegionRole::Leader)) + } + } + + let followers = followers.into_iter().flatten().collect::>(); + + let (upgradeable, closable) = self + .region_lease_keeper + .find_staled_follower_regions(cluster_id, datanode_id, &followers) + .await?; + + for follower in followers { + if upgradeable.contains(&follower) { + granted_regions.push(GrantedRegion::new(follower, RegionRole::Leader)) + } else if closable.contains(&follower) { + // Filters out the closable regions. + } else { + granted_regions.push(GrantedRegion::new(follower, RegionRole::Follower)) + } + } - acc.inactive_region_ids = inactive_region_ids; acc.region_lease = Some(RegionLease { - regions, + regions: granted_regions + .into_iter() + .map(Into::into) + .collect::>(), duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, }); @@ -76,101 +119,218 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Arc; - use api::v1::meta::RegionRole; + use common_meta::distributed_time_constants; + use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; - use common_meta::{distributed_time_constants, RegionIdent}; - use store_api::storage::{RegionId, RegionNumber}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::storage::RegionId; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; - use crate::test_util; + + fn new_test_keeper() -> RegionLeaseKeeper { + let store = Arc::new(MemoryKvBackend::new()); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); + + RegionLeaseKeeper::new(table_metadata_manager) + } + + fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat { + RegionStat { + id: region_id.as_u64(), + role: role.into(), + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + } + } #[tokio::test] - async fn test_handle_region_lease() { - let region_failover_manager = test_util::create_region_failover_manager(); - let kv_backend = region_failover_manager - .create_context() - .selector_ctx - .kv_backend - .clone(); - - let table_id = 1; - let table_name = "my_table"; - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await; + async fn test_handle_upgradable_follower() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let peer = Peer::empty(datanode_id); + let follower_peer = Peer::empty(datanode_id + 1); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let cluster_id = 1; - let req = HeartbeatRequest { - duration_since_epoch: 1234, + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + follower_peers: vec![follower_peer.clone()], ..Default::default() - }; + }]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); let builder = MetaSrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); let acc = &mut HeartbeatAccumulator::default(); - let new_region_stat = |region_number: RegionNumber| -> RegionStat { - let region_id = RegionId::new(table_id, region_number); - RegionStat { - id: region_id.as_u64(), - rcus: 0, - wcus: 0, - approximate_bytes: 0, - approximate_rows: 0, - engine: String::new(), - role: RegionRole::Leader.into(), - } + + acc.stat = Some(Stat { + cluster_id, + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Follower), + new_empty_region_stat(another_region_id, RegionRole::Follower), + ], + ..Default::default() + }); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() }; + + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + ); + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_region_lease( + &acc, + vec![GrantedRegion::new(region_id, RegionRole::Leader)], + ); + + let acc = &mut HeartbeatAccumulator::default(); + acc.stat = Some(Stat { - cluster_id: 1, - id: 1, - region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)], + cluster_id, + id: follower_peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Follower), + new_empty_region_stat(another_region_id, RegionRole::Follower), + ], ..Default::default() }); - let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); - inactive_region_manager - .register_inactive_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_id: 1, - region_number: 1, - engine: "mito2".to_string(), - }) - .await - .unwrap(); - inactive_region_manager - .register_inactive_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_id: 1, - region_number: 3, - engine: "mito2".to_string(), - }) - .await - .unwrap(); + handler.handle(&req, ctx, acc).await.unwrap(); - RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS) - .handle(&req, ctx, acc) + assert_eq!( + acc.region_lease.as_ref().unwrap().lease_seconds, + distributed_time_constants::REGION_LEASE_SECS + ); + + assert_region_lease( + &acc, + vec![GrantedRegion::new(region_id, RegionRole::Follower)], + ); + } + + #[tokio::test] + + async fn test_handle_downgradable_leader() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let no_exist_region_id = RegionId::new(table_id, region_number + 2); + let peer = Peer::empty(datanode_id); + let follower_peer = Peer::empty(datanode_id + 1); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let cluster_id = 1; + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + follower_peers: vec![follower_peer.clone()], + leader_status: Some(RegionStatus::Downgraded), + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + ]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) .await .unwrap(); - assert!(acc.region_lease.is_some()); - let lease = acc.region_lease.as_ref().unwrap(); - assert_eq!( - lease.regions, - vec![GrantedRegion { - region_id: RegionId::new(table_id, 2).as_u64(), - role: RegionRole::Leader.into() - }] + let builder = MetaSrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let acc = &mut HeartbeatAccumulator::default(); + + acc.stat = Some(Stat { + cluster_id, + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Leader), + new_empty_region_stat(another_region_id, RegionRole::Leader), + new_empty_region_stat(no_exist_region_id, RegionRole::Leader), + ], + ..Default::default() + }); + + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), ); - assert_eq!(lease.duration_since_epoch, 1234); - assert_eq!( - lease.lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_region_lease( + &acc, + vec![ + GrantedRegion::new(region_id, RegionRole::Follower), + GrantedRegion::new(another_region_id, RegionRole::Leader), + ], ); } + + fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec) { + let region_lease = acc.region_lease.as_ref().unwrap().clone(); + let granted: Vec = region_lease + .regions + .into_iter() + .map(Into::into) + .collect::>(); + + let granted = granted + .into_iter() + .map(|region| (region.region_id, region)) + .collect::>(); + + let expected = expected + .into_iter() + .map(|region| (region.region_id, region)) + .collect::>(); + + assert_eq!(granted, expected); + } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8ad55b799918..1f5b5aa73f42 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -227,8 +227,10 @@ impl MetaSrvBuilder { .and_then(|plugins| plugins.get::()) .map(|publish| PublishHeartbeatHandler::new(publish.clone())); - let region_lease_handler = - RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS); + let region_lease_handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + ); let group = HeartbeatHandlerGroup::new(pushers); group.add_handler(ResponseHeaderHandler).await; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index a8d7c8eeca12..76f5f57dd866 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -16,6 +16,7 @@ pub mod mito; pub mod utils; use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; @@ -26,6 +27,8 @@ use self::mito::find_staled_leader_regions; use crate::error::{self, Result}; use crate::region::lease_keeper::utils::find_staled_follower_regions; +pub type RegionLeaseKeeperRef = Arc; + pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index bac3df5bf458..f300931bb109 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use api::greptime_proto::v1::meta::RegionRole as PbRegionRole; +use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; @@ -27,6 +27,38 @@ use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GrantedRegion { + pub region_id: RegionId, + pub region_role: RegionRole, +} +impl GrantedRegion { + pub fn new(region_id: RegionId, region_role: RegionRole) -> Self { + Self { + region_id, + region_role, + } + } +} + +impl From for PbGrantedRegion { + fn from(value: GrantedRegion) -> Self { + PbGrantedRegion { + region_id: value.region_id.as_u64(), + role: PbRegionRole::from(value.region_role).into(), + } + } +} + +impl From for GrantedRegion { + fn from(value: PbGrantedRegion) -> Self { + GrantedRegion { + region_id: RegionId::from_u64(value.region_id), + region_role: value.role().into(), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2), Readonly region(file).