diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ddd6f88a9bd7..130f776dd539 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -135,7 +135,7 @@ use crate::rpc::store::BatchDeleteRequest; use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*"; -pub const MAINTENANCE_KEY: &str = "maintenance"; +pub const MAINTENANCE_KEY: &str = "__maintenance"; const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index e544cececec4..a68b93597737 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -33,10 +33,11 @@ use crate::kv_backend::KvBackendRef; use crate::node_manager::{ Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, }; -use crate::peer::{Peer, StandalonePeerLookupService}; +use crate::peer::{Peer, PeerLookupService, StandalonePeerLookupService}; use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; use crate::wal_options_allocator::WalOptionsAllocator; +use crate::{ClusterId, DatanodeId, FlownodeId}; #[async_trait::async_trait] pub trait MockDatanodeHandler: Sync + Send + Clone { @@ -183,3 +184,16 @@ pub fn new_ddl_context_with_kv_backend( peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), } } + +pub struct NoopPeerLookupService; + +#[async_trait::async_trait] +impl PeerLookupService for NoopPeerLookupService { + async fn datanode(&self, _cluster_id: ClusterId, id: DatanodeId) -> Result> { + Ok(Some(Peer::empty(id))) + } + + async fn flownode(&self, _cluster_id: ClusterId, id: FlownodeId) -> Result> { + Ok(Some(Peer::empty(id))) + } +} diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index ac55127894b6..09b03c5b7dca 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -87,6 +87,11 @@ impl WalOptionsAllocator { } } } + + /// Returns true if it's the remote WAL. + pub fn is_remote_wal(&self) -> bool { + matches!(&self, WalOptionsAllocator::Kafka(_)) + } } /// Allocates a wal options for each region. The allocated wal options is encoded immediately. diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 00946fefba8c..5c38156a71f0 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -39,6 +39,14 @@ pub enum Error { peer_id: u64, }, + #[snafu(display("Failed to lookup peer: {}", peer_id))] + LookupPeer { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + peer_id: u64, + }, + #[snafu(display("Another migration procedure is running for region: {}", region_id))] MigrationRunning { #[snafu(implicit)] @@ -972,6 +980,7 @@ impl ErrorExt for Error { } Error::Other { source, .. } => source.status_code(), + Error::LookupPeer { source, .. } => source.status_code(), } } diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index fd94e58459d1..bc7f4972fd00 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -63,7 +63,7 @@ pub(crate) struct PhiAccrualFailureDetector { last_heartbeat_millis: Option, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct PhiAccrualFailureDetectorOptions { pub threshold: f32, @@ -195,7 +195,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 { /// It is capped by the number of samples specified in `max_sample_size`. /// /// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory. -#[derive(Clone)] +#[derive(Debug, Clone)] struct HeartbeatHistory { /// Number of samples to use for calculation of mean and standard deviation of inter-arrival /// times. diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index dffeb2eb1816..efef360a204a 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -12,48 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod runner; - -use std::sync::Arc; - use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; -use common_meta::RegionIdent; +use common_telemetry::info; use crate::error::Result; -use crate::failure_detector::PhiAccrualFailureDetectorOptions; -use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner}; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; -use crate::metasrv::{Context, ElectionRef}; -use crate::procedure::region_failover::RegionFailoverManager; - -pub(crate) struct DatanodeHeartbeat { - region_idents: Vec, - heartbeat_time: i64, -} +use crate::metasrv::Context; +use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor}; pub struct RegionFailureHandler { - failure_detect_runner: FailureDetectRunner, + heartbeat_acceptor: HeartbeatAcceptor, } impl RegionFailureHandler { - pub(crate) async fn try_new( - election: Option, - region_failover_manager: Arc, - failure_detector_options: PhiAccrualFailureDetectorOptions, - ) -> Result { - region_failover_manager.try_start()?; - - let mut failure_detect_runner = FailureDetectRunner::new( - election, - region_failover_manager.clone(), - failure_detector_options, - ); - failure_detect_runner.start().await; - - Ok(Self { - failure_detect_runner, - }) + pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self { + let heartbeat_acceptor = region_supervisor.heartbeat_acceptor(); + info!("Starting region supervisor"); + common_runtime::spawn_bg(async move { region_supervisor.run().await }); + Self { heartbeat_acceptor } } } @@ -66,38 +43,16 @@ impl HeartbeatHandler for RegionFailureHandler { async fn handle( &self, _: &HeartbeatRequest, - ctx: &mut Context, + _ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result { - if ctx.is_infancy { - self.failure_detect_runner - .send_control(FailureDetectControl::Purge) - .await; - } - let Some(stat) = acc.stat.as_ref() else { return Ok(HandleControl::Continue); }; - let heartbeat = DatanodeHeartbeat { - region_idents: stat - .region_stats - .iter() - .map(|x| { - let region_id = x.id; - RegionIdent { - cluster_id: stat.cluster_id, - datanode_id: stat.id, - table_id: region_id.table_id(), - region_number: region_id.region_number(), - engine: x.engine.clone(), - } - }) - .collect(), - heartbeat_time: stat.timestamp_millis, - }; - - self.failure_detect_runner.send_heartbeat(heartbeat).await; + self.heartbeat_acceptor + .accept(DatanodeHeartbeat::from(stat)) + .await; Ok(HandleControl::Continue) } @@ -105,34 +60,28 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; - + use api::v1::meta::HeartbeatRequest; use common_catalog::consts::default_engine; - use common_meta::key::MAINTENANCE_KEY; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; + use tokio::sync::oneshot; - use super::*; + use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::node_stat::{RegionStat, Stat}; + use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::builder::MetasrvBuilder; - use crate::test_util::create_region_failover_manager; + use crate::region::supervisor::tests::new_test_supervisor; + use crate::region::supervisor::Event; - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn test_handle_heartbeat() { - let region_failover_manager = create_region_failover_manager(); - let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); - let handler = - RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options) - .await - .unwrap(); - + let supervisor = new_test_supervisor(); + let sender = supervisor.sender(); + let handler = RegionFailureHandler::new(supervisor); let req = &HeartbeatRequest::default(); - let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let mut ctx = metasrv.new_ctx(); - ctx.is_infancy = false; - let acc = &mut HeartbeatAccumulator::default(); fn new_region_stat(region_id: u64) -> RegionStat { RegionStat { @@ -153,48 +102,9 @@ mod tests { }); handler.handle(req, &mut ctx, acc).await.unwrap(); - - let dump = handler.failure_detect_runner.dump().await; - assert_eq!(dump.iter().collect::>().len(), 3); - - // infancy makes heartbeats re-accumulated - ctx.is_infancy = true; - acc.stat = None; - handler.handle(req, &mut ctx, acc).await.unwrap(); - let dump = handler.failure_detect_runner.dump().await; - assert_eq!(dump.iter().collect::>().len(), 0); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_maintenance_mode() { - let region_failover_manager = create_region_failover_manager(); - let kv_backend = region_failover_manager.create_context().kv_backend.clone(); - let _handler = RegionFailureHandler::try_new( - None, - region_failover_manager.clone(), - PhiAccrualFailureDetectorOptions::default(), - ) - .await - .unwrap(); - - let kv_req = common_meta::rpc::store::PutRequest { - key: Vec::from(MAINTENANCE_KEY), - value: vec![], - prev_kv: false, - }; - let _ = kv_backend.put(kv_req.clone()).await.unwrap(); - assert_matches!( - region_failover_manager.is_maintenance_mode().await, - Ok(true) - ); - - let _ = kv_backend - .delete(MAINTENANCE_KEY.as_bytes(), false) - .await - .unwrap(); - assert_matches!( - region_failover_manager.is_maintenance_mode().await, - Ok(false) - ); + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + assert_eq!(detector.iter().collect::>().len(), 3); } } diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs deleted file mode 100644 index 313a02f25362..000000000000 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ /dev/null @@ -1,411 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::DerefMut; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use common_meta::RegionIdent; -use common_telemetry::{error, info, warn}; -use common_time::util::current_time_millis; -use dashmap::mapref::multiple::RefMulti; -use dashmap::DashMap; -use tokio::sync::mpsc; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::task::JoinHandle; - -use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions}; -use crate::handler::failure_handler::DatanodeHeartbeat; -use crate::metasrv::ElectionRef; -use crate::procedure::region_failover::RegionFailoverManager; - -pub(crate) enum FailureDetectControl { - Purge, - - #[cfg(test)] - Dump(tokio::sync::oneshot::Sender), -} - -pub(crate) struct FailureDetectRunner { - election: Option, - region_failover_manager: Arc, - failure_detector_options: PhiAccrualFailureDetectorOptions, - - heartbeat_tx: Sender, - heartbeat_rx: Option>, - - control_tx: Sender, - control_rx: Option>, - - receiver_handle: Option>, - runner_handle: Option>, -} - -impl FailureDetectRunner { - pub(super) fn new( - election: Option, - region_failover_manager: Arc, - failure_detector_options: PhiAccrualFailureDetectorOptions, - ) -> Self { - let (heartbeat_tx, heartbeat_rx) = mpsc::channel::(1024); - let (control_tx, control_rx) = mpsc::channel::(1024); - Self { - election, - region_failover_manager, - failure_detector_options, - heartbeat_tx, - heartbeat_rx: Some(heartbeat_rx), - control_tx, - control_rx: Some(control_rx), - receiver_handle: None, - runner_handle: None, - } - } - - pub(crate) async fn send_heartbeat(&self, heartbeat: DatanodeHeartbeat) { - if let Err(e) = self.heartbeat_tx.send(heartbeat).await { - error!(e; "FailureDetectRunner is stop receiving heartbeats") - } - } - - pub(crate) async fn send_control(&self, control: FailureDetectControl) { - if let Err(e) = self.control_tx.send(control).await { - error!(e; "FailureDetectRunner is stop receiving controls") - } - } - - pub(crate) async fn start(&mut self) { - let failure_detectors = Arc::new(FailureDetectorContainer { - detectors: DashMap::new(), - options: self.failure_detector_options.clone(), - }); - self.start_with(failure_detectors).await - } - - async fn start_with(&mut self, failure_detectors: Arc) { - let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { - return; - }; - let Some(mut control_rx) = self.control_rx.take() else { - return; - }; - - let container = failure_detectors.clone(); - let receiver_handle = common_runtime::spawn_bg(async move { - loop { - tokio::select! { - Some(control) = control_rx.recv() => { - match control { - FailureDetectControl::Purge => container.clear(), - - #[cfg(test)] - FailureDetectControl::Dump(tx) => { - // Drain any heartbeats that are not handled before dump. - while let Ok(heartbeat) = heartbeat_rx.try_recv() { - for ident in heartbeat.region_idents { - let mut detector = container.get_failure_detector(ident); - detector.heartbeat(heartbeat.heartbeat_time); - } - } - let _ = tx.send(container.dump()); - } - } - } - Some(heartbeat) = heartbeat_rx.recv() => { - for ident in heartbeat.region_idents { - let mut detector = container.get_failure_detector(ident); - detector.heartbeat(heartbeat.heartbeat_time); - } - } - else => { - warn!("Both control and heartbeat senders are closed, quit receiving."); - break; - } - } - } - }); - self.receiver_handle = Some(receiver_handle); - - let election = self.election.clone(); - let region_failover_manager = self.region_failover_manager.clone(); - let runner_handle = common_runtime::spawn_bg(async move { - async fn maybe_region_failover( - failure_detectors: &Arc, - region_failover_manager: &Arc, - ) { - match region_failover_manager.is_maintenance_mode().await { - Ok(false) => {} - Ok(true) => { - info!("Maintenance mode is enabled, skip failover"); - return; - } - Err(err) => { - error!(err; "Failed to check maintenance mode"); - return; - } - } - - let failed_regions = failure_detectors - .iter() - .filter_map(|e| { - // Intentionally not place `current_time_millis()` out of the iteration. - // The failure detection determination should be happened "just in time", - // i.e., failed or not has to be compared with the most recent "now". - // Besides, it might reduce the false positive of failure detection, - // because during the iteration, heartbeats are coming in as usual, - // and the `phi`s are still updating. - if !e.failure_detector().is_available(current_time_millis()) { - Some(e.region_ident().clone()) - } else { - None - } - }) - .collect::>(); - - for r in failed_regions { - if let Err(e) = region_failover_manager.do_region_failover(&r).await { - error!(e; "Failed to do region failover for {r}"); - } else { - // Now that we know the region is starting to do failover, remove it - // from the failure detectors, avoiding the failover procedure to be - // triggered again. - // If the region is back alive (the failover procedure runs successfully), - // it will be added back to the failure detectors again. - failure_detectors.remove(&r); - } - } - } - - loop { - let start = Instant::now(); - - let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true); - if is_leader { - maybe_region_failover(&failure_detectors, ®ion_failover_manager).await; - } - - let elapsed = Instant::now().duration_since(start); - if let Some(sleep) = Duration::from_secs(1).checked_sub(elapsed) { - tokio::time::sleep(sleep).await; - } // else the elapsed time is exceeding one second, we should continue working immediately - } - }); - self.runner_handle = Some(runner_handle); - } - - #[cfg(test)] - pub(crate) async fn dump(&self) -> FailureDetectorContainer { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.send_control(FailureDetectControl::Dump(tx)).await; - rx.await.unwrap() - } -} - -impl Drop for FailureDetectRunner { - fn drop(&mut self) { - if let Some(handle) = self.receiver_handle.take() { - handle.abort(); - info!("Heartbeat receiver in FailureDetectRunner is stopped."); - } - - if let Some(handle) = self.runner_handle.take() { - handle.abort(); - info!("Failure detector in FailureDetectRunner is stopped."); - } - } -} - -pub(crate) struct FailureDetectorEntry<'a> { - e: RefMulti<'a, RegionIdent, PhiAccrualFailureDetector>, -} - -impl FailureDetectorEntry<'_> { - fn region_ident(&self) -> &RegionIdent { - self.e.key() - } - - fn failure_detector(&self) -> &PhiAccrualFailureDetector { - self.e.value() - } -} - -pub(crate) struct FailureDetectorContainer { - options: PhiAccrualFailureDetectorOptions, - detectors: DashMap, -} - -impl FailureDetectorContainer { - fn get_failure_detector( - &self, - ident: RegionIdent, - ) -> impl DerefMut + '_ { - self.detectors - .entry(ident) - .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options.clone())) - } - - pub(crate) fn iter(&self) -> Box + '_> { - Box::new( - self.detectors - .iter() - .map(move |e| FailureDetectorEntry { e }), - ) as _ - } - - fn remove(&self, ident: &RegionIdent) { - let _ = self.detectors.remove(ident); - } - - fn clear(&self) { - self.detectors.clear() - } - - #[cfg(test)] - fn dump(&self) -> FailureDetectorContainer { - let mut m = DashMap::with_capacity(self.detectors.len()); - m.extend( - self.detectors - .iter() - .map(|x| (x.key().clone(), x.value().clone())), - ); - Self { - detectors: m, - options: self.options.clone(), - } - } -} - -#[cfg(test)] -mod tests { - use rand::Rng; - - use super::*; - use crate::test_util::create_region_failover_manager; - - #[test] - fn test_default_failure_detector_container() { - let container = FailureDetectorContainer { - detectors: DashMap::new(), - options: PhiAccrualFailureDetectorOptions::default(), - }; - let ident = RegionIdent { - table_id: 1, - cluster_id: 3, - datanode_id: 2, - region_number: 1, - engine: "mito2".to_string(), - }; - let _ = container.get_failure_detector(ident.clone()); - assert!(container.detectors.contains_key(&ident)); - - { - let mut iter = container.iter(); - let _ = iter.next().unwrap(); - assert!(iter.next().is_none()); - } - - container.clear(); - assert!(container.detectors.is_empty()); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_control() { - let container = FailureDetectorContainer { - detectors: DashMap::new(), - options: PhiAccrualFailureDetectorOptions::default(), - }; - - let ident = RegionIdent { - table_id: 1, - cluster_id: 3, - datanode_id: 2, - region_number: 1, - engine: "mito2".to_string(), - }; - let _ = container.get_failure_detector(ident.clone()); - - let region_failover_manager = create_region_failover_manager(); - let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); - let mut runner = - FailureDetectRunner::new(None, region_failover_manager, failure_detector_options); - runner.start_with(Arc::new(container)).await; - - let dump = runner.dump().await; - assert_eq!(dump.iter().collect::>().len(), 1); - - runner.send_control(FailureDetectControl::Purge).await; - - let dump = runner.dump().await; - assert_eq!(dump.iter().collect::>().len(), 0); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_heartbeat() { - let region_failover_manager = create_region_failover_manager(); - let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); - let mut runner = - FailureDetectRunner::new(None, region_failover_manager, failure_detector_options); - runner.start().await; - - // Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some random millis. - fn generate_heartbeats(datanode_id: u64, region_ids: Vec) -> Vec { - let mut rng = rand::thread_rng(); - let start = current_time_millis(); - (0..2000) - .map(|i| DatanodeHeartbeat { - region_idents: region_ids - .iter() - .map(|®ion_number| RegionIdent { - table_id: 0, - cluster_id: 1, - datanode_id, - region_number, - engine: "mito2".to_string(), - }) - .collect(), - heartbeat_time: start + i * 1000 + rng.gen_range(0..100), - }) - .collect::>() - } - - let heartbeats = generate_heartbeats(100, vec![1, 2, 3]); - let last_heartbeat_time = heartbeats.last().unwrap().heartbeat_time; - for heartbeat in heartbeats { - runner.send_heartbeat(heartbeat).await; - } - - let dump = runner.dump().await; - let failure_detectors = dump.iter().collect::>(); - assert_eq!(failure_detectors.len(), 3); - - failure_detectors.iter().for_each(|e| { - let fd = e.failure_detector(); - let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64; - let start = last_heartbeat_time; - - // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ... - for i in 1..=acceptable_heartbeat_pause_millis / 1000 { - let now = start + i * 1000; - assert_eq!(fd.phi(now), 0.0); - } - - // ... then in less than two seconds, phi is above the threshold. - // The same effect can be seen in the diagrams in Akka's document. - let now = start + acceptable_heartbeat_pause_millis + 1000; - assert!(fd.phi(now) < fd.threshold() as _); - let now = start + acceptable_heartbeat_pause_millis + 2000; - assert!(fd.phi(now) > fd.threshold() as _); - }); - } -} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0dbc4235fb9d..9e1727f5cf44 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -54,6 +54,7 @@ use crate::lease::lookup_datanode_peer; use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::pubsub::{PublisherRef, SubscriptionManagerRef}; +use crate::region::supervisor::RegionSupervisorTickerRef; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; @@ -266,6 +267,7 @@ pub struct MetaStateHandler { subscribe_manager: Option, greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, + region_supervisor_ticker: Option, state: StateRef, } @@ -279,6 +281,10 @@ impl MetaStateHandler { self.state.write().unwrap().next_state(become_leader(true)); } + if let Some(ticker) = self.region_supervisor_ticker.as_ref() { + ticker.start(); + } + if let Err(e) = self.procedure_manager.start().await { error!(e; "Failed to start procedure manager"); } @@ -297,6 +303,12 @@ impl MetaStateHandler { if let Err(e) = self.procedure_manager.stop().await { error!(e; "Failed to stop procedure manager"); } + + if let Some(ticker) = self.region_supervisor_ticker.as_ref() { + // Stops the supervisor ticker. + ticker.stop(); + } + // Suspends reporting. self.greptimedb_telemetry_task.should_report(false); @@ -336,6 +348,7 @@ pub struct Metasrv { memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, + region_supervisor_ticker: Option, plugins: Plugins, } @@ -367,6 +380,7 @@ impl Metasrv { greptimedb_telemetry_task .start() .context(StartTelemetryTaskSnafu)?; + let region_supervisor_ticker = self.region_supervisor_ticker.clone(); let state_handler = MetaStateHandler { greptimedb_telemetry_task, subscribe_manager, @@ -374,6 +388,7 @@ impl Metasrv { wal_options_allocator: self.wal_options_allocator.clone(), state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), + region_supervisor_ticker, }; let _handle = common_runtime::spawn_bg(async move { loop { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 844e2f786d3d..3f3a86b2e8bf 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -65,10 +65,10 @@ use crate::lock::DistLockRef; use crate::metasrv::{ ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; -use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::pubsub::PublisherRef; +use crate::region::supervisor::{RegionSupervisor, DEFAULT_TICK_INTERVAL}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::service::mailbox::MailboxRef; @@ -225,6 +225,7 @@ impl MetasrvBuilder { options.wal.clone(), kv_backend.clone(), )); + let is_remote_wal = wal_options_allocator.is_remote_wal(); let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -280,6 +281,7 @@ impl MetasrvBuilder { server_addr: options.server_addr.clone(), }, )); + let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); let ddl_manager = Arc::new( DdlManager::try_new( DdlContext { @@ -290,9 +292,7 @@ impl MetasrvBuilder { table_metadata_allocator: table_metadata_allocator.clone(), flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator: flow_metadata_allocator.clone(), - peer_lookup_service: Arc::new(MetaPeerLookupService::new( - meta_peer_client.clone(), - )), + peer_lookup_service: peer_lookup_service.clone(), }, procedure_manager.clone(), true, @@ -311,32 +311,36 @@ impl MetasrvBuilder { )); region_migration_manager.try_start()?; + if !is_remote_wal && options.enable_region_failover { + return error::UnexpectedSnafu { + violated: "Region failover is not supported in the local WAL implementation!", + } + .fail(); + } + + let (region_failover_handler, region_supervisor_ticker) = + if options.enable_region_failover && is_remote_wal { + let region_supervisor = RegionSupervisor::new( + options.failure_detector, + DEFAULT_TICK_INTERVAL, + selector_ctx.clone(), + selector.clone(), + region_migration_manager.clone(), + leader_cached_kv_backend.clone() as _, + peer_lookup_service, + ); + let region_supervisor_ticker = region_supervisor.ticker(); + ( + Some(RegionFailureHandler::new(region_supervisor)), + Some(region_supervisor_ticker), + ) + } else { + (None, None) + }; + let handler_group = match handler_group { Some(handler_group) => handler_group, None => { - let region_failover_handler = if options.enable_region_failover { - let region_failover_manager = Arc::new(RegionFailoverManager::new( - distributed_time_constants::REGION_LEASE_SECS, - in_memory.clone(), - kv_backend.clone(), - mailbox.clone(), - procedure_manager.clone(), - (selector.clone(), selector_ctx.clone()), - lock.clone(), - table_metadata_manager.clone(), - )); - Some( - RegionFailureHandler::try_new( - election.clone(), - region_failover_manager, - options.failure_detector.clone(), - ) - .await?, - ) - } else { - None - }; - let publish_heartbeat_handler = plugins .clone() .and_then(|plugins| plugins.get::()) @@ -406,6 +410,7 @@ impl MetasrvBuilder { plugins: plugins.unwrap_or_else(Plugins::default), memory_region_keeper, region_migration_manager, + region_supervisor_ticker, }) } } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 009915f7d993..dbe63b762c9f 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// TODO(weny): remove it. +#[allow(unused)] pub mod region_failover; pub mod region_migration; #[cfg(test)] diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index b1c5a22f69e5..da9a5641a5ac 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -19,7 +19,7 @@ pub(crate) mod migration_end; pub(crate) mod migration_start; pub(crate) mod open_candidate_region; #[cfg(test)] -pub(crate) mod test_util; +pub mod test_util; pub(crate) mod update_metadata; pub(crate) mod upgrade_candidate_region; @@ -43,6 +43,7 @@ use common_procedure::error::{ }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey}; pub use manager::RegionMigrationProcedureTask; +use manager::{RegionMigrationProcedureGuard, RegionMigrationProcedureTracker}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -364,43 +365,61 @@ pub struct RegionMigrationData<'a> { state: &'a dyn State, } -pub struct RegionMigrationProcedure { +pub(crate) struct RegionMigrationProcedure { state: Box, context: Context, + _guard: Option, } -#[allow(dead_code)] impl RegionMigrationProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration"; pub fn new( persistent_context: PersistentContext, context_factory: impl ContextFactory, + guard: Option, ) -> Self { let state = Box::new(RegionMigrationStart {}); - Self::new_inner(state, persistent_context, context_factory) + Self::new_inner(state, persistent_context, context_factory, guard) } fn new_inner( state: Box, persistent_context: PersistentContext, context_factory: impl ContextFactory, + guard: Option, ) -> Self { Self { state, context: context_factory.new_context(persistent_context), + _guard: guard, } } - fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult { + fn from_json( + json: &str, + context_factory: impl ContextFactory, + tracker: RegionMigrationProcedureTracker, + ) -> ProcedureResult { let RegionMigrationDataOwned { persistent_ctx, state, } = serde_json::from_str(json).context(FromJsonSnafu)?; + let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask { + cluster_id: persistent_ctx.cluster_id, + region_id: persistent_ctx.region_id, + from_peer: persistent_ctx.from_peer.clone(), + to_peer: persistent_ctx.to_peer.clone(), + replay_timeout: persistent_ctx.replay_timeout, + }); let context = context_factory.new_context(persistent_ctx); - Ok(Self { state, context }) + Ok(Self { + state, + context, + _guard: guard, + }) } } @@ -467,7 +486,7 @@ mod tests { let env = TestingEnv::new(); let context = env.context_factory(); - let procedure = RegionMigrationProcedure::new(persistent_context, context); + let procedure = RegionMigrationProcedure::new(persistent_context, context, None); let key = procedure.lock_key(); let keys = key.keys_to_lock().cloned().collect::>(); @@ -484,7 +503,7 @@ mod tests { let env = TestingEnv::new(); let context = env.context_factory(); - let procedure = RegionMigrationProcedure::new(persistent_context, context); + let procedure = RegionMigrationProcedure::new(persistent_context, context, None); let serialized = procedure.dump().unwrap(); let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; @@ -531,7 +550,7 @@ mod tests { let persistent_context = new_persistent_context(); let context_factory = env.context_factory(); let state = Box::::default(); - RegionMigrationProcedure::new_inner(state, persistent_context, context_factory) + RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None) } let ctx = TestingEnv::procedure_context(); @@ -550,8 +569,11 @@ mod tests { let serialized = procedure.dump().unwrap(); let context_factory = env.context_factory(); + let tracker = env.tracker(); let mut procedure = - RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap(); + RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone()) + .unwrap(); + assert!(tracker.contains(procedure.context.persistent_ctx.region_id)); for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 871342fd4fef..e9080e7fd5ce 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -39,8 +39,41 @@ pub type RegionMigrationManagerRef = Arc; /// Manager of region migration procedure. pub struct RegionMigrationManager { procedure_manager: ProcedureManagerRef, - running_procedures: Arc>>, context_factory: DefaultContextFactory, + tracker: RegionMigrationProcedureTracker, +} + +#[derive(Default, Clone)] +pub(crate) struct RegionMigrationProcedureTracker { + running_procedures: Arc>>, +} + +impl RegionMigrationProcedureTracker { + /// Returns the [RegionMigrationProcedureGuard] if current region isn't migrating. + pub(crate) fn insert_running_procedure( + &self, + task: &RegionMigrationProcedureTask, + ) -> Option { + let mut procedures = self.running_procedures.write().unwrap(); + match procedures.entry(task.region_id) { + Entry::Occupied(_) => None, + Entry::Vacant(v) => { + v.insert(task.clone()); + Some(RegionMigrationProcedureGuard { + region_id: task.region_id, + running_procedures: self.running_procedures.clone(), + }) + } + } + } + + /// Returns true if it contains the specific region(`region_id`). + pub(crate) fn contains(&self, region_id: RegionId) -> bool { + self.running_procedures + .read() + .unwrap() + .contains_key(®ion_id) + } } /// The guard of running [RegionMigrationProcedureTask]. @@ -51,10 +84,17 @@ pub(crate) struct RegionMigrationProcedureGuard { impl Drop for RegionMigrationProcedureGuard { fn drop(&mut self) { - self.running_procedures - .write() + let exists = self + .running_procedures + .read() .unwrap() - .remove(&self.region_id); + .contains_key(&self.region_id); + if exists { + self.running_procedures + .write() + .unwrap() + .remove(&self.region_id); + } } } @@ -96,27 +136,34 @@ impl Display for RegionMigrationProcedureTask { } impl RegionMigrationManager { - /// Returns new [RegionMigrationManager] + /// Returns new [`RegionMigrationManager`] pub(crate) fn new( procedure_manager: ProcedureManagerRef, context_factory: DefaultContextFactory, ) -> Self { Self { procedure_manager, - running_procedures: Arc::new(RwLock::new(HashMap::new())), context_factory, + tracker: RegionMigrationProcedureTracker::default(), } } + /// Returns the [`RegionMigrationProcedureTracker`]. + pub(crate) fn tracker(&self) -> &RegionMigrationProcedureTracker { + &self.tracker + } + /// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`. pub(crate) fn try_start(&self) -> Result<()> { let context_factory = self.context_factory.clone(); + let tracker = self.tracker.clone(); self.procedure_manager .register_loader( RegionMigrationProcedure::TYPE_NAME, Box::new(move |json| { let context_factory = context_factory.clone(); - RegionMigrationProcedure::from_json(json, context_factory) + let tracker = tracker.clone(); + RegionMigrationProcedure::from_json(json, context_factory, tracker) .map(|p| Box::new(p) as _) }), ) @@ -129,18 +176,7 @@ impl RegionMigrationManager { &self, task: &RegionMigrationProcedureTask, ) -> Option { - let mut procedures = self.running_procedures.write().unwrap(); - - match procedures.entry(task.region_id) { - Entry::Occupied(_) => None, - Entry::Vacant(v) => { - v.insert(task.clone()); - Some(RegionMigrationProcedureGuard { - region_id: task.region_id, - running_procedures: self.running_procedures.clone(), - }) - } - } + self.tracker.insert_running_procedure(task) } fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> { @@ -210,6 +246,10 @@ impl RegionMigrationManager { region_route: &RegionRoute, task: &RegionMigrationProcedureTask, ) -> Result { + if region_route.is_leader_downgraded() { + return Ok(false); + } + let leader_peer = region_route .leader_peer .as_ref() @@ -301,15 +341,13 @@ impl RegionMigrationManager { replay_timeout, }, self.context_factory.clone(), + Some(guard), ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_id = procedure_with_id.id; info!("Starting region migration procedure {procedure_id} for {task}"); - let procedure_manager = self.procedure_manager.clone(); - common_runtime::spawn_bg(async move { - let _ = guard; let watcher = &mut match procedure_manager.submit(procedure_with_id).await { Ok(watcher) => watcher, Err(e) => { @@ -356,6 +394,7 @@ mod test { }; // Inserts one manager + .tracker .running_procedures .write() .unwrap() diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 55cc9003b69b..2f3638ef9323 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -26,6 +26,7 @@ use common_meta::instruction::{ use common_meta::key::table_route::TableRouteValue; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::rpc::router::RegionRoute; @@ -42,6 +43,7 @@ use store_api::storage::RegionId; use table::metadata::RawTableInfo; use tokio::sync::mpsc::{Receiver, Sender}; +use super::manager::RegionMigrationProcedureTracker; use super::migration_abort::RegionMigrationAbort; use super::upgrade_candidate_region::UpgradeCandidateRegion; use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; @@ -94,6 +96,14 @@ pub struct TestingEnv { opening_region_keeper: MemoryRegionKeeperRef, server_addr: String, procedure_manager: ProcedureManagerRef, + tracker: RegionMigrationProcedureTracker, + kv_backend: KvBackendRef, +} + +impl Default for TestingEnv { + fn default() -> Self { + Self::new() + } } impl TestingEnv { @@ -117,9 +127,21 @@ impl TestingEnv { mailbox_ctx, server_addr: "localhost".to_string(), procedure_manager, + tracker: Default::default(), + kv_backend, } } + /// Returns the [KvBackendRef]. + pub fn kv_backend(&self) -> KvBackendRef { + self.kv_backend.clone() + } + + /// Returns the [RegionMigrationProcedureTracker]. + pub(crate) fn tracker(&self) -> RegionMigrationProcedureTracker { + self.tracker.clone() + } + /// Returns a context of region migration procedure. pub fn context_factory(&self) -> DefaultContextFactory { DefaultContextFactory { @@ -431,7 +453,7 @@ impl ProcedureMigrationTestSuite { /// The step of test. #[derive(Clone)] -pub enum Step { +pub(crate) enum Step { Setup((String, BeforeTest)), Next((String, Option, Assertion)), } diff --git a/src/meta-srv/src/region.rs b/src/meta-srv/src/region.rs index a3106bd57935..d4f64af7ce1e 100644 --- a/src/meta-srv/src/region.rs +++ b/src/meta-srv/src/region.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod failure_detector; pub mod lease_keeper; +pub mod supervisor; pub use lease_keeper::RegionLeaseKeeper; diff --git a/src/meta-srv/src/region/failure_detector.rs b/src/meta-srv/src/region/failure_detector.rs new file mode 100644 index 000000000000..e9c574cadd3e --- /dev/null +++ b/src/meta-srv/src/region/failure_detector.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::DerefMut; + +use common_meta::{ClusterId, DatanodeId}; +use dashmap::mapref::multiple::RefMulti; +use dashmap::DashMap; +use store_api::storage::RegionId; + +use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions}; + +pub(crate) type Ident = (ClusterId, DatanodeId, RegionId); + +/// Detects the region failures. +pub(crate) struct RegionFailureDetector { + options: PhiAccrualFailureDetectorOptions, + detectors: DashMap, +} + +pub(crate) struct FailureDetectorEntry<'a> { + e: RefMulti<'a, Ident, PhiAccrualFailureDetector>, +} + +impl FailureDetectorEntry<'_> { + pub(crate) fn region_ident(&self) -> &Ident { + self.e.key() + } + + pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector { + self.e.value() + } +} + +impl RegionFailureDetector { + pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self { + Self { + options, + detectors: DashMap::new(), + } + } + + /// Returns [PhiAccrualFailureDetector] of the specific ([DatanodeId],[RegionId]). + pub(crate) fn region_failure_detector( + &self, + ident: Ident, + ) -> impl DerefMut + '_ { + self.detectors + .entry(ident) + .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options)) + } + + /// Returns a [FailureDetectorEntry] iterator. + pub(crate) fn iter(&self) -> impl Iterator + '_ { + self.detectors + .iter() + .map(move |e| FailureDetectorEntry { e }) + } + + /// Removes the specific [PhiAccrualFailureDetector] if exists. + pub(crate) fn remove(&self, ident: &Ident) { + self.detectors.remove(ident); + } + + /// Removes all [PhiAccrualFailureDetector]s. + pub(crate) fn clear(&self) { + self.detectors.clear() + } + + /// Returns true if the specific `ident` exists. + #[cfg(test)] + pub(crate) fn contains(&self, ident: &Ident) -> bool { + self.detectors.contains_key(ident) + } + + /// Returns the length + #[cfg(test)] + pub(crate) fn len(&self) -> usize { + self.detectors.len() + } + + /// Returns true if it's empty + #[cfg(test)] + pub(crate) fn is_empty(&self) -> bool { + self.detectors.is_empty() + } + + #[cfg(test)] + pub(crate) fn dump(&self) -> RegionFailureDetector { + let mut m = DashMap::with_capacity(self.detectors.len()); + m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone()))); + Self { + detectors: m, + options: self.options, + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_default_failure_detector_container() { + let container = RegionFailureDetector::new(Default::default()); + let ident = (0, 2, RegionId::new(1, 1)); + let _ = container.region_failure_detector(ident); + assert!(container.contains(&ident)); + + { + let mut iter = container.iter(); + let _ = iter.next().unwrap(); + assert!(iter.next().is_none()); + } + + container.clear(); + assert!(container.is_empty()); + } +} diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs new file mode 100644 index 000000000000..83b264eaa4ba --- /dev/null +++ b/src/meta-srv/src/region/supervisor.rs @@ -0,0 +1,529 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use common_meta::key::MAINTENANCE_KEY; +use common_meta::kv_backend::KvBackendRef; +use common_meta::peer::PeerLookupServiceRef; +use common_meta::{ClusterId, DatanodeId}; +use common_runtime::JoinHandle; +use common_telemetry::{error, info, warn}; +use common_time::util::current_time_millis; +use error::Error::{MigrationRunning, TableRouteNotFound}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::time::{interval, MissedTickBehavior}; + +use super::failure_detector::RegionFailureDetector; +use crate::error::{self, Result}; +use crate::failure_detector::PhiAccrualFailureDetectorOptions; +use crate::handler::node_stat::Stat; +use crate::metasrv::{SelectorContext, SelectorRef}; +use crate::procedure::region_migration::manager::RegionMigrationManagerRef; +use crate::procedure::region_migration::RegionMigrationProcedureTask; +use crate::selector::SelectorOptions; + +/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode. +/// It includes identifiers for the cluster and datanode, a list of regions being monitored, +/// and a timestamp indicating when the heartbeat was sent. +#[derive(Debug)] +pub(crate) struct DatanodeHeartbeat { + cluster_id: ClusterId, + datanode_id: DatanodeId, + // TODO(weny): Considers collecting the memtable size in regions. + regions: Vec, + timestamp: i64, +} + +impl From<&Stat> for DatanodeHeartbeat { + fn from(value: &Stat) -> Self { + DatanodeHeartbeat { + cluster_id: value.cluster_id, + datanode_id: value.id, + regions: value.region_stats.iter().map(|x| x.id).collect(), + timestamp: value.timestamp_millis, + } + } +} + +/// `Event` represents various types of events that can be processed by the region supervisor. +/// These events are crucial for managing state transitions and handling specific scenarios +/// in the region lifecycle. +/// +/// Variants: +/// - `Tick`: This event is used to trigger region failure detection periodically. +/// - `HeartbeatArrived`: This event presents the metasrv received [`DatanodeHeartbeat`] from the datanodes. +/// - `Clear`: This event is used to reset the state of the supervisor, typically used +/// when a system-wide reset or reinitialization is needed. +/// - `Dump`: (Available only in test) This event triggers a dump of the +/// current state for debugging purposes. It allows developers to inspect the internal state +/// of the supervisor during tests. +pub(crate) enum Event { + Tick, + HeartbeatArrived(DatanodeHeartbeat), + Clear, + #[cfg(test)] + Dump(tokio::sync::oneshot::Sender), +} + +impl Debug for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Tick => write!(f, "Tick"), + Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(), + Self::Clear => write!(f, "Clear"), + #[cfg(test)] + Self::Dump(_) => f.debug_struct("Dump").finish(), + } + } +} + +pub type RegionSupervisorTickerRef = Arc; + +/// A background job to generate [`Event::Tick`] type events. +#[derive(Debug)] +pub struct RegionSupervisorTicker { + /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`]. + tick_handle: Mutex>>, + + /// The interval of tick. + tick_interval: Duration, + + /// Sends [Event]s. + sender: Sender, +} + +impl RegionSupervisorTicker { + /// Starts the ticker. + pub fn start(&self) { + let mut handle = self.tick_handle.lock().unwrap(); + if handle.is_none() { + let sender = self.sender.clone(); + let tick_interval = self.tick_interval; + let ticker_loop = tokio::spawn(async move { + let mut interval = interval(tick_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + if let Err(err) = sender.send(Event::Clear).await { + warn!(err; "EventReceiver is dropped, failed to send Event::Clear"); + return; + } + loop { + interval.tick().await; + if sender.send(Event::Tick).await.is_err() { + info!("EventReceiver is dropped, tick loop is stopped"); + break; + } + } + }); + *handle = Some(ticker_loop); + } + } + + /// Stops the ticker. + pub fn stop(&self) { + let handle = self.tick_handle.lock().unwrap().take(); + if let Some(handle) = handle { + handle.abort(); + info!("The tick loop is stopped."); + } + } +} + +impl Drop for RegionSupervisorTicker { + fn drop(&mut self) { + self.stop(); + } +} + +pub type RegionSupervisorRef = Arc; + +/// The default tick interval. +pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); + +/// The [`RegionSupervisor`] is used to detect Region failures +/// and initiate Region failover upon detection, ensuring uninterrupted region service. +pub struct RegionSupervisor { + /// Used to detect the failure of regions. + failure_detector: RegionFailureDetector, + /// The interval of tick + tick_interval: Duration, + /// Receives [Event]s. + receiver: Receiver, + /// [Event] Sender. + sender: Sender, + /// The context of [`SelectorRef`] + selector_context: SelectorContext, + /// Candidate node selector. + selector: SelectorRef, + /// Region migration manager. + region_migration_manager: RegionMigrationManagerRef, + // TODO(weny): find a better way + kv_backend: KvBackendRef, + /// Peer lookup service + peer_lookup: PeerLookupServiceRef, +} + +/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`]. +pub(crate) struct HeartbeatAcceptor { + sender: Sender, +} + +impl HeartbeatAcceptor { + /// Accepts heartbeats from datanodes. + pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) { + if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await { + error!(e; "RegionSupervisor is stop receiving heartbeat"); + } + } +} + +#[cfg(test)] +impl RegionSupervisor { + /// Returns the [Event] sender. + pub(crate) fn sender(&self) -> Sender { + self.sender.clone() + } +} + +impl RegionSupervisor { + pub(crate) fn new( + options: PhiAccrualFailureDetectorOptions, + tick_interval: Duration, + selector_context: SelectorContext, + selector: SelectorRef, + region_migration_manager: RegionMigrationManagerRef, + kv_backend: KvBackendRef, + peer_lookup: PeerLookupServiceRef, + ) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(1024); + Self { + failure_detector: RegionFailureDetector::new(options), + tick_interval, + receiver: rx, + sender: tx, + selector_context, + selector, + region_migration_manager, + kv_backend, + peer_lookup, + } + } + + /// Returns the [`HeartbeatAcceptor`]. + pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor { + HeartbeatAcceptor { + sender: self.sender.clone(), + } + } + + /// Returns the [`RegionSupervisorTicker`]. + pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef { + Arc::new(RegionSupervisorTicker { + tick_interval: self.tick_interval, + sender: self.sender.clone(), + tick_handle: Mutex::new(None), + }) + } + + /// Runs the main loop. + pub(crate) async fn run(&mut self) { + while let Some(event) = self.receiver.recv().await { + match event { + Event::Tick => { + let regions = self.detect_region_failure(); + self.handle_region_failures(regions).await; + } + Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat), + Event::Clear => self.clear(), + #[cfg(test)] + Event::Dump(sender) => { + let _ = sender.send(self.failure_detector.dump()); + } + } + } + info!("RegionSupervisor is stopped!"); + } + + async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) { + if regions.is_empty() { + return; + } + match self.is_maintenance_mode().await { + Ok(false) => {} + Ok(true) => { + info!("Maintenance mode is enabled, skip failover"); + return; + } + Err(err) => { + error!(err; "Failed to check maintenance mode"); + return; + } + } + + let migrating_regions = regions + .extract_if(|(_, _, region_id)| { + self.region_migration_manager.tracker().contains(*region_id) + }) + .collect::>(); + + for (cluster_id, datanode_id, region_id) in migrating_regions { + self.failure_detector + .remove(&(cluster_id, datanode_id, region_id)); + } + + warn!("Detects region failures: {:?}", regions); + for (cluster_id, datanode_id, region_id) in regions { + match self.do_failover(cluster_id, datanode_id, region_id).await { + Ok(_) => self + .failure_detector + .remove(&(cluster_id, datanode_id, region_id)), + Err(err) => { + error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}"); + } + } + } + } + + pub(crate) async fn is_maintenance_mode(&self) -> Result { + self.kv_backend + .exists(MAINTENANCE_KEY.as_bytes()) + .await + .context(error::KvBackendSnafu) + } + + async fn do_failover( + &self, + cluster_id: ClusterId, + datanode_id: DatanodeId, + region_id: RegionId, + ) -> Result<()> { + let from_peer = self + .peer_lookup + .datanode(cluster_id, datanode_id) + .await + .context(error::LookupPeerSnafu { + peer_id: datanode_id, + })? + .context(error::PeerUnavailableSnafu { + peer_id: datanode_id, + })?; + let mut peers = self + .selector + .select( + cluster_id, + &self.selector_context, + SelectorOptions { + min_required_items: 1, + allow_duplication: false, + }, + ) + .await?; + let to_peer = peers.remove(0); + let task = RegionMigrationProcedureTask { + cluster_id, + region_id, + from_peer, + to_peer, + replay_timeout: Duration::from_secs(60), + }; + + if let Err(err) = self.region_migration_manager.submit_procedure(task).await { + return match err { + // Returns Ok if it's running or table is dropped. + MigrationRunning { .. } | TableRouteNotFound { .. } => Ok(()), + err => Err(err), + }; + }; + + Ok(()) + } + + /// Detects the failure of regions. + fn detect_region_failure(&self) -> Vec<(ClusterId, DatanodeId, RegionId)> { + self.failure_detector + .iter() + .filter_map(|e| { + // Intentionally not place `current_time_millis()` out of the iteration. + // The failure detection determination should be happened "just in time", + // i.e., failed or not has to be compared with the most recent "now". + // Besides, it might reduce the false positive of failure detection, + // because during the iteration, heartbeats are coming in as usual, + // and the `phi`s are still updating. + if !e.failure_detector().is_available(current_time_millis()) { + Some(*e.region_ident()) + } else { + None + } + }) + .collect::>() + } + + /// Updates the state of corresponding failure detectors. + fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) { + for region_id in heartbeat.regions { + let ident = (heartbeat.cluster_id, heartbeat.datanode_id, region_id); + let mut detector = self.failure_detector.region_failure_detector(ident); + detector.heartbeat(heartbeat.timestamp); + } + } + + fn clear(&self) { + self.failure_detector.clear(); + } +} + +#[cfg(test)] +pub(crate) mod tests { + use std::assert_matches::assert_matches; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use common_meta::peer::Peer; + use common_meta::test_util::NoopPeerLookupService; + use common_time::util::current_time_millis; + use rand::Rng; + use store_api::storage::RegionId; + use tokio::sync::oneshot; + use tokio::time::sleep; + + use crate::procedure::region_migration::manager::RegionMigrationManager; + use crate::procedure::region_migration::test_util::TestingEnv; + use crate::region::supervisor::{ + DatanodeHeartbeat, Event, RegionSupervisor, RegionSupervisorTicker, + }; + use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector}; + + pub(crate) fn new_test_supervisor() -> RegionSupervisor { + let env = TestingEnv::new(); + let selector_context = new_test_selector_context(); + let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)])); + let context_factory = env.context_factory(); + let region_migration_manager = Arc::new(RegionMigrationManager::new( + env.procedure_manager().clone(), + context_factory, + )); + let kv_backend = env.kv_backend(); + let peer_lookup = Arc::new(NoopPeerLookupService); + + RegionSupervisor::new( + Default::default(), + Duration::from_secs(1), + selector_context, + selector, + region_migration_manager, + kv_backend, + peer_lookup, + ) + } + + #[tokio::test] + async fn test_heartbeat() { + let mut supervisor = new_test_supervisor(); + let sender = supervisor.sender(); + tokio::spawn(async move { supervisor.run().await }); + + sender + .send(Event::HeartbeatArrived(DatanodeHeartbeat { + cluster_id: 0, + datanode_id: 0, + regions: vec![RegionId::new(1, 1)], + timestamp: 100, + })) + .await + .unwrap(); + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + assert!(detector.contains(&(0, 0, RegionId::new(1, 1)))); + + // Clear up + sender.send(Event::Clear).await.unwrap(); + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + assert!(rx.await.unwrap().is_empty()); + + fn generate_heartbeats(datanode_id: u64, region_ids: Vec) -> Vec { + let mut rng = rand::thread_rng(); + let start = current_time_millis(); + (0..2000) + .map(|i| DatanodeHeartbeat { + timestamp: start + i * 1000 + rng.gen_range(0..100), + cluster_id: 0, + datanode_id, + regions: region_ids + .iter() + .map(|number| RegionId::new(0, *number)) + .collect(), + }) + .collect::>() + } + + let heartbeats = generate_heartbeats(100, vec![1, 2, 3]); + let last_heartbeat_time = heartbeats.last().unwrap().timestamp; + for heartbeat in heartbeats { + sender + .send(Event::HeartbeatArrived(heartbeat)) + .await + .unwrap(); + } + + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + assert_eq!(detector.len(), 3); + + for e in detector.iter() { + let fd = e.failure_detector(); + let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64; + let start = last_heartbeat_time; + + // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ... + for i in 1..=acceptable_heartbeat_pause_millis / 1000 { + let now = start + i * 1000; + assert_eq!(fd.phi(now), 0.0); + } + + // ... then in less than two seconds, phi is above the threshold. + // The same effect can be seen in the diagrams in Akka's document. + let now = start + acceptable_heartbeat_pause_millis + 1000; + assert!(fd.phi(now) < fd.threshold() as _); + let now = start + acceptable_heartbeat_pause_millis + 2000; + assert!(fd.phi(now) > fd.threshold() as _); + } + } + + #[tokio::test] + async fn test_supervisor_ticker() { + let (tx, mut rx) = tokio::sync::mpsc::channel(128); + let ticker = RegionSupervisorTicker { + tick_handle: Mutex::new(None), + tick_interval: Duration::from_millis(10), + sender: tx, + }; + // It's ok if we start the ticker again. + for _ in 0..2 { + ticker.start(); + sleep(Duration::from_millis(100)).await; + ticker.stop(); + assert!(!rx.is_empty()); + while let Ok(event) = rx.try_recv() { + assert_matches!(event, Event::Tick | Event::Clear); + } + } + } +} diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 8cc159445844..d69f0ca5ead2 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -16,9 +16,10 @@ mod common; pub mod lease_based; pub mod load_based; pub mod round_robin; +#[cfg(test)] +pub(crate) mod test_utils; mod weight_compute; mod weighted_choose; - use serde::{Deserialize, Serialize}; use crate::error; diff --git a/src/meta-srv/src/selector/test_utils.rs b/src/meta-srv/src/selector/test_utils.rs new file mode 100644 index 000000000000..0c3b4e3f21d6 --- /dev/null +++ b/src/meta-srv/src/selector/test_utils.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::peer::Peer; +use rand::prelude::SliceRandom; + +use crate::cluster::MetaPeerClientBuilder; +use crate::error::Result; +use crate::metasrv::SelectorContext; +use crate::selector::{Namespace, Selector, SelectorOptions}; + +/// Returns [SelectorContext] for test purpose. +pub fn new_test_selector_context() -> SelectorContext { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(kv_backend.clone()) + .build() + .map(Arc::new) + .unwrap(); + + SelectorContext { + server_addr: "127.0.0.1:3002".to_string(), + datanode_lease_secs: REGION_LEASE_SECS, + flownode_lease_secs: FLOWNODE_LEASE_SECS, + kv_backend, + meta_peer_client, + table_id: None, + } +} + +/// It always returns shuffled `nodes`. +pub struct RandomNodeSelector { + nodes: Vec, +} + +impl RandomNodeSelector { + pub fn new(nodes: Vec) -> Self { + Self { nodes } + } +} + +#[async_trait::async_trait] +impl Selector for RandomNodeSelector { + type Context = SelectorContext; + type Output = Vec; + + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> Result { + let mut rng = rand::thread_rng(); + let mut nodes = self.nodes.clone(); + nodes.shuffle(&mut rng); + Ok(nodes) + } +} diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 1553236406be..9f9f119e7eca 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -18,14 +18,11 @@ use std::sync::Arc; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::key::table_route::TableRouteValue; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::sequence::SequenceBuilder; -use common_meta::state_store::KvStateStore; use common_meta::ClusterId; -use common_procedure::local::{LocalManager, ManagerConfig}; use common_time::util as time_util; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -33,12 +30,8 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::handler::{HeartbeatMailbox, Pushers}; use crate::key::{DatanodeLeaseKey, LeaseValue}; -use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; -use crate::procedure::region_failover::RegionFailoverManager; -use crate::selector::lease_based::LeaseBasedSelector; pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute { let region = Region { @@ -79,33 +72,6 @@ pub(crate) fn create_selector_context() -> SelectorContext { } } -pub(crate) fn create_region_failover_manager() -> Arc { - let kv_backend = Arc::new(MemoryKvBackend::new()); - - let pushers = Pushers::default(); - let mailbox_sequence = - SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); - - let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - - let selector = Arc::new(LeaseBasedSelector); - let selector_ctx = create_selector_context(); - - let in_memory = Arc::new(MemoryKvBackend::new()); - Arc::new(RegionFailoverManager::new( - 10, - in_memory, - kv_backend.clone(), - mailbox, - procedure_manager, - (selector, selector_ctx), - Arc::new(MemLock::default()), - Arc::new(TableMetadataManager::new(kv_backend)), - )) -} - pub(crate) async fn prepare_table_region_and_info_value( table_metadata_manager: &TableMetadataManagerRef, table: &str,