diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 5970868f1a4e..fa06558a9ba3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -660,9 +660,7 @@ impl TableMetadataManager { .table_route_manager() .build_update_txn(table_id, ¤t_table_route_value, &new_table_route_value)?; - let txn = Txn::merge_all(vec![update_table_route_txn]); - - let r = self.kv_backend.txn(txn).await?; + let r = self.kv_backend.txn(update_table_route_txn).await?; // Checks whether metadata was already updated. if !r.succeeded { diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 21282e9ff54b..d8b0b7c50d67 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -373,12 +373,16 @@ impl CountdownTask { countdown.set(tokio::time::sleep_until(first_deadline)); }, Some(CountdownCommand::Reset((role, deadline))) => { + // The first-time granted regions might be ignored because the `first_deadline` is larger than the `region_lease_timeout`. + // Therefore, we set writable at the outside. + // TODO(weny): Considers setting `first_deadline` to `region_lease_timeout`. + let _ = self.region_server.set_writable(self.region_id, role.writable()); + if countdown.deadline() < deadline { trace!( "Reset deadline of region {region_id} to approximately {} seconds later", (deadline - Instant::now()).as_secs_f32(), ); - let _ = self.region_server.set_writable(self.region_id, role.writable()); countdown.set(tokio::time::sleep_until(deadline)); } // Else the countdown could be either: diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 2295a42c9c6f..609e806296d6 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -18,6 +18,7 @@ use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; @@ -72,8 +73,12 @@ impl Stat { } } - pub fn region_ids(&self) -> Vec { - self.region_stats.iter().map(|s| s.id).collect() + /// Returns a tuple array containing [RegionId] and [RegionRole]. + pub fn regions(&self) -> Vec<(RegionId, RegionRole)> { + self.region_stats + .iter() + .map(|s| (RegionId::from(s.id), s.role)) + .collect() } pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 96808dcd82a5..24827ef40f17 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,22 +12,63 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{GrantedRegion, HeartbeatRequest, RegionLease, RegionRole, Role}; +use std::collections::HashSet; +use std::sync::Arc; + +use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; +use common_meta::key::TableMetadataManagerRef; +use store_api::region_engine::{GrantedRegion, RegionRole}; +use store_api::storage::RegionId; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; -use crate::inactive_region_manager::InactiveRegionManager; use crate::metasrv::Context; +use crate::region::lease_keeper::RegionLeaseKeeperRef; +use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, + region_lease_keeper: RegionLeaseKeeperRef, } impl RegionLeaseHandler { - pub fn new(region_lease_seconds: u64) -> Self { + pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self { + let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager); + Self { region_lease_seconds, + region_lease_keeper: Arc::new(region_lease_keeper), + } + } +} + +fn flip_role(role: RegionRole) -> RegionRole { + match role { + RegionRole::Follower => RegionRole::Leader, + RegionRole::Leader => RegionRole::Follower, + } +} + +/// Grants lease of regions. +/// +/// - If a region is in an `operable` set, it will be granted an `flip_role(current)`([RegionRole]); +/// otherwise, it will be granted a `current`([RegionRole]). +/// - If a region is in a `closable` set, it won't be granted. +fn grant( + granted_regions: &mut Vec, + operable: &HashSet, + closable: &HashSet, + regions: &[RegionId], + current: RegionRole, +) { + for region in regions { + if operable.contains(region) { + granted_regions.push(GrantedRegion::new(*region, flip_role(current))); + } else if closable.contains(region) { + // Filters out the closable regions. + } else { + granted_regions.push(GrantedRegion::new(*region, current)) } } } @@ -41,31 +82,61 @@ impl HeartbeatHandler for RegionLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut Context, + _ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { let Some(stat) = acc.stat.as_ref() else { return Ok(()); }; - let mut region_ids = stat.region_ids(); + let regions = stat.regions(); + let cluster_id = stat.cluster_id; + let datanode_id = stat.id; + let mut granted_regions = Vec::with_capacity(regions.len()); - let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); - let inactive_region_ids = inactive_region_manager - .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) - .await?; - - let regions = region_ids + let (leaders, followers): (Vec<_>, Vec<_>) = regions .into_iter() - .map(|region_id| GrantedRegion { - region_id, - role: RegionRole::Leader.into(), + .map(|(id, role)| match role { + RegionRole::Follower => (None, Some(id)), + RegionRole::Leader => (Some(id), None), }) - .collect(); + .unzip(); + + let leaders = leaders.into_iter().flatten().collect::>(); + + let (downgradable, closable) = self + .region_lease_keeper + .find_staled_leader_regions(cluster_id, datanode_id, &leaders) + .await?; + + grant( + &mut granted_regions, + &downgradable, + &closable, + &leaders, + RegionRole::Leader, + ); + + let followers = followers.into_iter().flatten().collect::>(); + + let (upgradeable, closable) = self + .region_lease_keeper + .find_staled_follower_regions(cluster_id, datanode_id, &followers) + .await?; + + grant( + &mut granted_regions, + &upgradeable, + &closable, + &followers, + RegionRole::Follower, + ); - acc.inactive_region_ids = inactive_region_ids; acc.region_lease = Some(RegionLease { - regions, + regions: granted_regions + .into_iter() + .map(Into::into) + .collect::>(), duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, }); @@ -76,101 +147,215 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Arc; - use api::v1::meta::RegionRole; + use common_meta::distributed_time_constants; + use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; - use common_meta::{distributed_time_constants, RegionIdent}; - use store_api::storage::{RegionId, RegionNumber}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::storage::RegionId; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; - use crate::test_util; + + fn new_test_keeper() -> RegionLeaseKeeper { + let store = Arc::new(MemoryKvBackend::new()); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); + + RegionLeaseKeeper::new(table_metadata_manager) + } + + fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat { + RegionStat { + id: region_id.as_u64(), + role, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + } + } #[tokio::test] - async fn test_handle_region_lease() { - let region_failover_manager = test_util::create_region_failover_manager(); - let kv_backend = region_failover_manager - .create_context() - .selector_ctx - .kv_backend - .clone(); - - let table_id = 1; - let table_name = "my_table"; - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await; + async fn test_handle_upgradable_follower() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let peer = Peer::empty(datanode_id); + let follower_peer = Peer::empty(datanode_id + 1); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let cluster_id = 1; - let req = HeartbeatRequest { - duration_since_epoch: 1234, + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + follower_peers: vec![follower_peer.clone()], ..Default::default() - }; + }]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); let builder = MetaSrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); let acc = &mut HeartbeatAccumulator::default(); - let new_region_stat = |region_number: RegionNumber| -> RegionStat { - let region_id = RegionId::new(table_id, region_number); - RegionStat { - id: region_id.as_u64(), - rcus: 0, - wcus: 0, - approximate_bytes: 0, - approximate_rows: 0, - engine: String::new(), - role: RegionRole::Leader.into(), - } + + acc.stat = Some(Stat { + cluster_id, + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Follower), + new_empty_region_stat(another_region_id, RegionRole::Follower), + ], + ..Default::default() + }); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() }; + + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + ); + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]); + + let acc = &mut HeartbeatAccumulator::default(); + acc.stat = Some(Stat { - cluster_id: 1, - id: 1, - region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)], + cluster_id, + id: follower_peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Follower), + new_empty_region_stat(another_region_id, RegionRole::Follower), + ], ..Default::default() }); - let inactive_region_manager = InactiveRegionManager::new(&ctx.in_memory); - inactive_region_manager - .register_inactive_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_id: 1, - region_number: 1, - engine: "mito2".to_string(), - }) - .await - .unwrap(); - inactive_region_manager - .register_inactive_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_id: 1, - region_number: 3, - engine: "mito2".to_string(), - }) - .await - .unwrap(); + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_eq!( + acc.region_lease.as_ref().unwrap().lease_seconds, + distributed_time_constants::REGION_LEASE_SECS + ); - RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS) - .handle(&req, ctx, acc) + assert_region_lease( + acc, + vec![GrantedRegion::new(region_id, RegionRole::Follower)], + ); + } + + #[tokio::test] + + async fn test_handle_downgradable_leader() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let no_exist_region_id = RegionId::new(table_id, region_number + 2); + let peer = Peer::empty(datanode_id); + let follower_peer = Peer::empty(datanode_id + 1); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + let cluster_id = 1; + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + follower_peers: vec![follower_peer.clone()], + leader_status: Some(RegionStatus::Downgraded), + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + ]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) .await .unwrap(); - assert!(acc.region_lease.is_some()); - let lease = acc.region_lease.as_ref().unwrap(); - assert_eq!( - lease.regions, - vec![GrantedRegion { - region_id: RegionId::new(table_id, 2).as_u64(), - role: RegionRole::Leader.into() - }] + let builder = MetaSrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let acc = &mut HeartbeatAccumulator::default(); + + acc.stat = Some(Stat { + cluster_id, + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Leader), + new_empty_region_stat(another_region_id, RegionRole::Leader), + new_empty_region_stat(no_exist_region_id, RegionRole::Leader), + ], + ..Default::default() + }); + + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), ); - assert_eq!(lease.duration_since_epoch, 1234); - assert_eq!( - lease.lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_region_lease( + acc, + vec![ + GrantedRegion::new(region_id, RegionRole::Follower), + GrantedRegion::new(another_region_id, RegionRole::Leader), + ], ); } + + fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec) { + let region_lease = acc.region_lease.as_ref().unwrap().clone(); + let granted: Vec = region_lease + .regions + .into_iter() + .map(Into::into) + .collect::>(); + + let granted = granted + .into_iter() + .map(|region| (region.region_id, region)) + .collect::>(); + + let expected = expected + .into_iter() + .map(|region| (region.region_id, region)) + .collect::>(); + + assert_eq!(granted, expected); + } } diff --git a/src/meta-srv/src/inactive_region_manager.rs b/src/meta-srv/src/inactive_region_manager.rs deleted file mode 100644 index 273aad844b4d..000000000000 --- a/src/meta-srv/src/inactive_region_manager.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; - -use common_meta::kv_backend::ResettableKvBackendRef; -use common_meta::rpc::store::{BatchGetRequest, DeleteRangeRequest, PutRequest, RangeRequest}; -use common_meta::RegionIdent; -use snafu::ResultExt; - -use crate::error::{self, Result}; -use crate::keys::InactiveRegionKey; -use crate::metrics::METRIC_META_INACTIVE_REGIONS; - -pub struct InactiveRegionManager<'a> { - store: &'a ResettableKvBackendRef, -} - -impl<'a> InactiveRegionManager<'a> { - pub fn new(store: &'a ResettableKvBackendRef) -> Self { - Self { store } - } - - pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { - let region_id = region_ident.get_region_id().as_u64(); - let key = InactiveRegionKey { - cluster_id: region_ident.cluster_id, - node_id: region_ident.datanode_id, - region_id, - }; - let req = PutRequest { - key: key.into(), - value: vec![], - prev_kv: false, - }; - self.store.put(req).await.context(error::KvBackendSnafu)?; - - METRIC_META_INACTIVE_REGIONS.inc(); - - Ok(()) - } - - pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { - let region_id = region_ident.get_region_id().as_u64(); - let key: Vec = InactiveRegionKey { - cluster_id: region_ident.cluster_id, - node_id: region_ident.datanode_id, - region_id, - } - .into(); - self.store - .delete(&key, false) - .await - .context(error::KvBackendSnafu)?; - - METRIC_META_INACTIVE_REGIONS.dec(); - - Ok(()) - } - - /// The input is a list of regions on a specific node. If one or more regions have been - /// set to inactive state by metasrv, the corresponding regions will be removed(update the - /// `region_ids`), then returns the removed regions. - pub async fn retain_active_regions( - &self, - cluster_id: u64, - node_id: u64, - region_ids: &mut Vec, - ) -> Result> { - let key_region_ids = region_ids - .iter() - .map(|region_id| { - ( - InactiveRegionKey { - cluster_id, - node_id, - region_id: *region_id, - } - .into(), - *region_id, - ) - }) - .collect::, _)>>(); - let keys = key_region_ids.iter().map(|(key, _)| key.clone()).collect(); - let resp = self - .store - .batch_get(BatchGetRequest { keys }) - .await - .context(error::KvBackendSnafu)?; - let kvs = resp.kvs; - if kvs.is_empty() { - return Ok(HashSet::new()); - } - - let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::>(); - let (active_region_ids, inactive_region_ids): (Vec>, Vec>) = - key_region_ids - .into_iter() - .map(|(key, region_id)| { - let is_active = !inactive_keys.contains(&key); - if is_active { - (Some(region_id), None) - } else { - (None, Some(region_id)) - } - }) - .unzip(); - *region_ids = active_region_ids.into_iter().flatten().collect(); - - Ok(inactive_region_ids.into_iter().flatten().collect()) - } - - /// Scan all inactive regions in the cluster. - /// - /// When will these data appear? - /// Generally, it is because the corresponding Datanode is disconnected and - /// did not respond to the `Failover` scheduling instructions of metasrv. - pub async fn scan_all_inactive_regions( - &self, - cluster_id: u64, - ) -> Result> { - let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); - let request = RangeRequest::new().with_prefix(prefix); - let resp = self - .store - .range(request) - .await - .context(error::KvBackendSnafu)?; - let kvs = resp.kvs; - kvs.into_iter() - .map(|kv| InactiveRegionKey::try_from(kv.key)) - .collect::>>() - } - - pub async fn clear_all_inactive_regions(&self, cluster_id: u64) -> Result<()> { - let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); - let request = DeleteRangeRequest::new().with_prefix(prefix); - let _ = self - .store - .delete_range(request) - .await - .context(error::KvBackendSnafu)?; - Ok(()) - } -} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 14af05f8c0c5..b30c6779b36a 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -40,8 +40,6 @@ pub mod table_meta_alloc; pub use crate::error::Result; -mod inactive_region_manager; - mod greptimedb_telemetry; #[cfg(test)] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8ad55b799918..12550f34f60e 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -168,7 +168,6 @@ impl MetaSrvBuilder { state.clone(), kv_backend.clone(), )); - let kv_backend = leader_cached_kv_backend.clone() as _; let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); @@ -177,7 +176,9 @@ impl MetaSrvBuilder { let mailbox = build_mailbox(&kv_backend, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_backend); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let table_metadata_manager = Arc::new(TableMetadataManager::new( + leader_cached_kv_backend.clone() as _, + )); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), @@ -227,8 +228,10 @@ impl MetaSrvBuilder { .and_then(|plugins| plugins.get::()) .map(|publish| PublishHeartbeatHandler::new(publish.clone())); - let region_lease_handler = - RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS); + let region_lease_handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + ); let group = HeartbeatHandlerGroup::new(pushers); group.add_handler(ResponseHeaderHandler).await; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index a09bb1c2c0c4..fde254ea7084 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -270,8 +270,6 @@ trait State: Sync + Send + Debug { fn status(&self) -> Status { Status::executing(true) } - - fn remark_inactive_region_if_needed(&mut self) {} } /// The states transition of region failover procedure: @@ -341,11 +339,7 @@ impl RegionFailoverProcedure { } fn from_json(json: &str, context: RegionFailoverContext) -> ProcedureResult { - let mut node: Node = serde_json::from_str(json).context(FromJsonSnafu)?; - // If the meta leader node dies during the execution of the procedure, - // the new leader node needs to remark the failed region as "inactive" - // to prevent it from renewing the lease. - node.state.remark_inactive_region_if_needed(); + let node: Node = serde_json::from_str(json).context(FromJsonSnafu)?; Ok(Self { node, context }) } } diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 69dc51334358..b758524018b9 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -31,7 +31,6 @@ use crate::error::{ self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; -use crate::inactive_region_manager::InactiveRegionManager; use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; @@ -104,17 +103,6 @@ impl ActivateRegion { input: instruction.to_string(), })?; - // Ensure that metasrv will renew the lease for this candidate node. - // - // This operation may not be redundant, imagine the following scenario: - // This candidate once had the current region, and because it did not respond to the `close` - // command in time, it was considered an inactive node by metasrv, then it replied, and the - // current region failed over again, and the node was selected as a candidate, so it needs - // to clear its previous state first. - InactiveRegionManager::new(&ctx.in_memory) - .deregister_inactive_region(&candidate_ident) - .await?; - let ch = Channel::Datanode(self.candidate.id); ctx.mailbox.send(&ch, msg, timeout).await } @@ -182,23 +170,12 @@ impl State for ActivateRegion { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - if self.remark_inactive_region { - // Remark the fail region as inactive to prevent it from renewing the lease. - InactiveRegionManager::new(&ctx.in_memory) - .register_inactive_region(failed_region) - .await?; - } - let mailbox_receiver = self .send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT) .await?; self.handle_response(mailbox_receiver, failed_region).await } - - fn remark_inactive_region_if_needed(&mut self) { - self.remark_inactive_region = true; - } } #[cfg(test)] diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index d24ae9f68b8c..04b3ccde97e4 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -30,7 +30,6 @@ use crate::error::{ self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; -use crate::inactive_region_manager::InactiveRegionManager; use crate::service::mailbox::{Channel, MailboxReceiver}; #[derive(Serialize, Deserialize, Debug)] @@ -91,22 +90,13 @@ impl DeactivateRegion { })?; let ch = Channel::Datanode(failed_region.datanode_id); - // Mark the region as inactive - InactiveRegionManager::new(&ctx.in_memory) - .register_inactive_region(failed_region) - .await?; - // We first marked the region as inactive, which means that the failed region cannot - // be successfully renewed from now on, so after the lease time is exceeded, the region - // will be automatically closed. - // If the deadline is exceeded, we can proceed to the next step with confidence, - // as the expiration means that the region has been closed. let timeout = Duration::from_secs(ctx.region_lease_secs); ctx.mailbox.send(&ch, msg, timeout).await } async fn handle_response( &self, - ctx: &RegionFailoverContext, + _ctx: &RegionFailoverContext, mailbox_receiver: MailboxReceiver, failed_region: &RegionIdent, ) -> Result> { @@ -123,10 +113,6 @@ impl DeactivateRegion { .fail(); }; if result { - InactiveRegionManager::new(&ctx.in_memory) - .deregister_inactive_region(failed_region) - .await?; - Ok(Box::new(ActivateRegion::new(self.candidate.clone()))) } else { // Under rare circumstances would a Datanode fail to close a Region. diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index a8d7c8eeca12..76f5f57dd866 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -16,6 +16,7 @@ pub mod mito; pub mod utils; use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; @@ -26,6 +27,8 @@ use self::mito::find_staled_leader_regions; use crate::error::{self, Result}; use crate::region::lease_keeper::utils::find_staled_follower_regions; +pub type RegionLeaseKeeperRef = Arc; + pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, } diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index c7eac3af4231..a5867e376924 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -14,7 +14,6 @@ mod health; mod heartbeat; -mod inactive_regions; mod leader; mod meta; mod node_lease; @@ -91,20 +90,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { .route("/route", handler.clone()) .route("/route/help", handler); - let router = router.route( - "/inactive-regions/view", - inactive_regions::ViewInactiveRegionsHandler { - store: meta_srv.in_memory().clone(), - }, - ); - - let router = router.route( - "/inactive-regions/clear", - inactive_regions::ClearInactiveRegionsHandler { - store: meta_srv.in_memory().clone(), - }, - ); - let router = Router::nest("/admin", router); Admin::new(router) diff --git a/src/meta-srv/src/service/admin/inactive_regions.rs b/src/meta-srv/src/service/admin/inactive_regions.rs deleted file mode 100644 index 6c3c4184903e..000000000000 --- a/src/meta-srv/src/service/admin/inactive_regions.rs +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_meta::kv_backend::ResettableKvBackendRef; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use tonic::codegen::http; - -use crate::error::{self, Result}; -use crate::inactive_region_manager::InactiveRegionManager; -use crate::keys::InactiveRegionKey; -use crate::service::admin::{util, HttpHandler}; - -pub struct ViewInactiveRegionsHandler { - pub store: ResettableKvBackendRef, -} - -#[async_trait::async_trait] -impl HttpHandler for ViewInactiveRegionsHandler { - async fn handle( - &self, - _: &str, - params: &HashMap, - ) -> Result> { - let cluster_id = util::extract_cluster_id(params)?; - - let inactive_region_manager = InactiveRegionManager::new(&self.store); - let inactive_regions = inactive_region_manager - .scan_all_inactive_regions(cluster_id) - .await?; - let result = InactiveRegions { inactive_regions }.try_into()?; - - http::Response::builder() - .status(http::StatusCode::OK) - .body(result) - .context(error::InvalidHttpBodySnafu) - } -} - -pub struct ClearInactiveRegionsHandler { - pub store: ResettableKvBackendRef, -} - -#[async_trait::async_trait] -impl HttpHandler for ClearInactiveRegionsHandler { - async fn handle( - &self, - _: &str, - params: &HashMap, - ) -> Result> { - let cluster_id = util::extract_cluster_id(params)?; - - let inactive_region_manager = InactiveRegionManager::new(&self.store); - inactive_region_manager - .clear_all_inactive_regions(cluster_id) - .await?; - - Ok(http::Response::builder() - .status(http::StatusCode::OK) - .body("Success\n".to_owned()) - .unwrap()) - } -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(transparent)] -struct InactiveRegions { - inactive_regions: Vec, -} - -impl TryFrom for String { - type Error = error::Error; - - fn try_from(value: InactiveRegions) -> Result { - serde_json::to_string(&value).context(error::SerializeToJsonSnafu { - input: format!("{value:?}"), - }) - } -} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index bac3df5bf458..f300931bb109 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use api::greptime_proto::v1::meta::RegionRole as PbRegionRole; +use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; @@ -27,6 +27,38 @@ use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GrantedRegion { + pub region_id: RegionId, + pub region_role: RegionRole, +} +impl GrantedRegion { + pub fn new(region_id: RegionId, region_role: RegionRole) -> Self { + Self { + region_id, + region_role, + } + } +} + +impl From for PbGrantedRegion { + fn from(value: GrantedRegion) -> Self { + PbGrantedRegion { + region_id: value.region_id.as_u64(), + role: PbRegionRole::from(value.region_role).into(), + } + } +} + +impl From for GrantedRegion { + fn from(value: PbGrantedRegion) -> Self { + GrantedRegion { + region_id: RegionId::from_u64(value.region_id), + region_role: value.role().into(), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2), Readonly region(file).