diff --git a/components/health_controller/src/lib.rs b/components/health_controller/src/lib.rs index baf7f794b85..75427cd8e7c 100644 --- a/components/health_controller/src/lib.rs +++ b/components/health_controller/src/lib.rs @@ -30,6 +30,8 @@ //! that are specific to different modules, increasing the complexity and //! possibility to misuse of `HealthController`. +#![feature(div_duration)] + pub mod reporters; pub mod slow_score; pub mod trend; diff --git a/components/health_controller/src/reporters.rs b/components/health_controller/src/reporters.rs index 96514cf5414..56624c37d64 100644 --- a/components/health_controller/src/reporters.rs +++ b/components/health_controller/src/reporters.rs @@ -12,6 +12,7 @@ use prometheus::IntGauge; use crate::{ slow_score::{SlowScore, SlowScoreTickResult}, trend::{RequestPerSecRecorder, Trend}, + types::InspectFactor, HealthController, HealthControllerInner, RaftstoreDuration, }; @@ -27,6 +28,7 @@ pub struct RaftstoreReporterConfig { /// worker) is expected to tick it. But the interval is necessary in /// some internal calculations. pub inspect_interval: Duration, + pub inspect_kvdb_interval: Duration, pub unsensitive_cause: f64, pub unsensitive_result: f64, @@ -43,9 +45,72 @@ pub struct RaftstoreReporterConfig { pub result_l2_gap_gauges: IntGauge, } +/// A unified slow score that combines multiple slow scores. +/// +/// It calculates the final slow score of a store by picking the maximum +/// score among multiple factors. Each factor represents a different aspect of +/// the store's performance. Typically, we have two factors: Raft Disk I/O and +/// KvDB Disk I/O. If there are more factors in the future, we can add them +/// here. +#[derive(Default)] +pub struct UnifiedSlowScore { + factors: Vec, +} + +impl UnifiedSlowScore { + pub fn new(cfg: &RaftstoreReporterConfig) -> Self { + let mut unified_slow_score = UnifiedSlowScore::default(); + // The first factor is for Raft Disk I/O. + unified_slow_score + .factors + .push(SlowScore::new(cfg.inspect_interval)); + // The second factor is for KvDB Disk I/O. + unified_slow_score + .factors + .push(SlowScore::new_with_extra_config( + cfg.inspect_kvdb_interval, + 0.6, + )); + unified_slow_score + } + + #[inline] + pub fn record( + &mut self, + id: u64, + factor: InspectFactor, + duration: &RaftstoreDuration, + not_busy: bool, + ) { + self.factors[factor as usize].record(id, duration.delays_on_disk_io(false), not_busy); + } + + #[inline] + pub fn get(&self, factor: InspectFactor) -> &SlowScore { + &self.factors[factor as usize] + } + + #[inline] + pub fn get_mut(&mut self, factor: InspectFactor) -> &mut SlowScore { + &mut self.factors[factor as usize] + } + + // Returns the maximum score of all factors. + pub fn get_score(&self) -> f64 { + self.factors + .iter() + .map(|factor| factor.get()) + .fold(1.0, f64::max) + } + + pub fn last_tick_finished(&self) -> bool { + self.factors.iter().all(SlowScore::last_tick_finished) + } +} + pub struct RaftstoreReporter { health_controller_inner: Arc, - slow_score: SlowScore, + slow_score: UnifiedSlowScore, slow_trend: SlowTrendStatistics, is_healthy: bool, } @@ -56,18 +121,14 @@ impl RaftstoreReporter { pub fn new(health_controller: &HealthController, cfg: RaftstoreReporterConfig) -> Self { Self { health_controller_inner: health_controller.inner.clone(), - slow_score: SlowScore::new(cfg.inspect_interval), + slow_score: UnifiedSlowScore::new(&cfg), slow_trend: SlowTrendStatistics::new(cfg), is_healthy: true, } } - pub fn get_tick_interval(&self) -> Duration { - self.slow_score.get_inspect_interval() - } - pub fn get_slow_score(&self) -> f64 { - self.slow_score.get() + self.slow_score.get_score() } pub fn get_slow_trend(&self) -> &SlowTrendStatistics { @@ -77,17 +138,18 @@ impl RaftstoreReporter { pub fn record_raftstore_duration( &mut self, id: u64, + factor: InspectFactor, duration: RaftstoreDuration, store_not_busy: bool, ) { // Fine-tuned, `SlowScore` only takes the I/O jitters on the disk into account. self.slow_score - .record(id, duration.delays_on_disk_io(false), store_not_busy); + .record(id, factor, &duration, store_not_busy); self.slow_trend.record(duration); // Publish slow score to health controller self.health_controller_inner - .update_raftstore_slow_score(self.slow_score.get()); + .update_raftstore_slow_score(self.slow_score.get_score()); } fn is_healthy(&self) -> bool { @@ -109,34 +171,42 @@ impl RaftstoreReporter { } } - pub fn tick(&mut self, store_maybe_busy: bool) -> SlowScoreTickResult { + pub fn tick(&mut self, store_maybe_busy: bool, factor: InspectFactor) -> SlowScoreTickResult { // Record a fairly great value when timeout self.slow_trend.slow_cause.record(500_000, Instant::now()); + // healthy: The health status of the current store. + // all_ticks_finished: The last tick of all factors is finished. + // factor_tick_finished: The last tick of the current factor is finished. + let (healthy, all_ticks_finished, factor_tick_finished) = ( + self.is_healthy(), + self.slow_score.last_tick_finished(), + self.slow_score.get(factor).last_tick_finished(), + ); // The health status is recovered to serving as long as any tick // does not timeout. - if !self.is_healthy() && self.slow_score.last_tick_finished() { + if !healthy && all_ticks_finished { self.set_is_healthy(true); } - if !self.slow_score.last_tick_finished() { + if !all_ticks_finished { // If the last tick is not finished, it means that the current store might // be busy on handling requests or delayed on I/O operations. And only when // the current store is not busy, it should record the last_tick as a timeout. - if !store_maybe_busy { - self.slow_score.record_timeout(); + if !store_maybe_busy && !factor_tick_finished { + self.slow_score.get_mut(factor).record_timeout(); } } - let slow_score_tick_result = self.slow_score.tick(); + let slow_score_tick_result = self.slow_score.get_mut(factor).tick(); if slow_score_tick_result.updated_score.is_some() && !slow_score_tick_result.has_new_record { self.set_is_healthy(false); } // Publish the slow score to health controller - if let Some(slow_score_value) = slow_score_tick_result.updated_score { + if slow_score_tick_result.updated_score.is_some() { self.health_controller_inner - .update_raftstore_slow_score(slow_score_value); + .update_raftstore_slow_score(self.slow_score.get_score()); } slow_score_tick_result diff --git a/components/health_controller/src/slow_score.rs b/components/health_controller/src/slow_score.rs index 12e043b5668..846e3f98517 100644 --- a/components/health_controller/src/slow_score.rs +++ b/components/health_controller/src/slow_score.rs @@ -7,6 +7,12 @@ use std::{ use ordered_float::OrderedFloat; +/// Interval for updating the slow score. +const UPDATE_INTERVALS: Duration = Duration::from_secs(10); +/// Recovery intervals for the slow score. +/// If the score has reached 100 and there is no timeout inspecting requests +/// during this interval, the score will go back to 1 after 5min. +const RECOVERY_INTERVALS: Duration = Duration::from_secs(60 * 5); // Slow score is a value that represents the speed of a store and ranges in [1, // 100]. It is maintained in the AIMD way. // If there are some inspecting requests timeout during a round, by default the @@ -45,7 +51,7 @@ impl SlowScore { inspect_interval, ratio_thresh: OrderedFloat(0.1), - min_ttr: Duration::from_secs(5 * 60), + min_ttr: RECOVERY_INTERVALS, last_record_time: Instant::now(), last_update_time: Instant::now(), round_ticks: 30, @@ -54,6 +60,29 @@ impl SlowScore { } } + // Only for kvdb. + pub fn new_with_extra_config(inspect_interval: Duration, timeout_ratio: f64) -> SlowScore { + SlowScore { + value: OrderedFloat(1.0), + + timeout_requests: 0, + total_requests: 0, + + inspect_interval, + ratio_thresh: OrderedFloat(timeout_ratio), + min_ttr: RECOVERY_INTERVALS, + last_record_time: Instant::now(), + last_update_time: Instant::now(), + // The minimal round ticks is 1 for kvdb. + round_ticks: cmp::max( + UPDATE_INTERVALS.div_duration_f64(inspect_interval) as u64, + 1_u64, + ), + last_tick_id: 0, + last_tick_finished: true, + } + } + pub fn record(&mut self, id: u64, duration: Duration, not_busy: bool) { self.last_record_time = Instant::now(); if id != self.last_tick_id { @@ -207,4 +236,52 @@ mod tests { slow_score.update_impl(Duration::from_secs(57)) ); } + + #[test] + fn test_slow_score_extra() { + let mut slow_score = SlowScore::new_with_extra_config(Duration::from_millis(1000), 0.6); + slow_score.timeout_requests = 1; + slow_score.total_requests = 10; + let score = slow_score.update_impl(Duration::from_secs(10)); + assert!(score > OrderedFloat(1.16)); + assert!(score < OrderedFloat(1.17)); + + slow_score.timeout_requests = 2; + slow_score.total_requests = 10; + let score = slow_score.update_impl(Duration::from_secs(10)); + assert!(score > OrderedFloat(1.5)); + assert!(score < OrderedFloat(1.6)); + + slow_score.timeout_requests = 0; + slow_score.total_requests = 100; + assert_eq!( + OrderedFloat(1.0), + slow_score.update_impl(Duration::from_secs(57)) + ); + + slow_score.timeout_requests = 3; + slow_score.total_requests = 10; + assert_eq!( + OrderedFloat(1.5), + slow_score.update_impl(Duration::from_secs(10)) + ); + + slow_score.timeout_requests = 6; + slow_score.total_requests = 10; + assert_eq!( + OrderedFloat(3.0), + slow_score.update_impl(Duration::from_secs(10)) + ); + + slow_score.timeout_requests = 10; + slow_score.total_requests = 10; + assert_eq!( + OrderedFloat(6.0), + slow_score.update_impl(Duration::from_secs(10)) + ); + + // Test too large inspect interval. + let slow_score = SlowScore::new_with_extra_config(Duration::from_secs(11), 0.1); + assert_eq!(slow_score.round_ticks, 1); + } } diff --git a/components/health_controller/src/types.rs b/components/health_controller/src/types.rs index 5cbf5490511..7342273e972 100644 --- a/components/health_controller/src/types.rs +++ b/components/health_controller/src/types.rs @@ -50,6 +50,22 @@ impl RaftstoreDuration { } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum InspectFactor { + RaftDisk = 0, + KvDisk, + // TODO: Add more factors, like network io. +} + +impl InspectFactor { + pub fn as_str(&self) -> &str { + match *self { + InspectFactor::RaftDisk => "raft", + InspectFactor::KvDisk => "kvdb", + } + } +} + /// Used to inspect the latency of all stages of raftstore. pub struct LatencyInspector { id: u64, diff --git a/components/raftstore-v2/src/worker/pd/mod.rs b/components/raftstore-v2/src/worker/pd/mod.rs index 3ae31083d9f..7917ed5cd73 100644 --- a/components/raftstore-v2/src/worker/pd/mod.rs +++ b/components/raftstore-v2/src/worker/pd/mod.rs @@ -9,7 +9,7 @@ use causal_ts::CausalTsProviderImpl; use collections::HashMap; use concurrency_manager::ConcurrencyManager; use engine_traits::{KvEngine, RaftEngine, TabletRegistry}; -use health_controller::types::{LatencyInspector, RaftstoreDuration}; +use health_controller::types::{InspectFactor, LatencyInspector, RaftstoreDuration}; use kvproto::{metapb, pdpb}; use pd_client::{BucketStat, PdClient}; use raftstore::store::{ @@ -254,6 +254,7 @@ where let mut stats_monitor = PdStatsMonitor::new( store_heartbeat_interval / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT, cfg.value().inspect_interval.0, + std::time::Duration::default(), PdReporter::new(pd_scheduler, logger.clone()), ); stats_monitor.start(auto_split_controller, collector_reg_handle)?; @@ -428,7 +429,7 @@ impl StoreStatsReporter for PdReporter { } } - fn update_latency_stats(&self, timer_tick: u64) { + fn update_latency_stats(&self, timer_tick: u64, _factor: InspectFactor) { // Tick slowness statistics. { if let Err(e) = self.scheduler.schedule(Task::TickSlownessStats) { diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 005896ef6de..3832adac060 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -367,16 +367,30 @@ pub struct Config { #[deprecated = "The configuration has been removed. The time to clean stale peer safely can be decided based on RocksDB snapshot sequence number."] pub clean_stale_peer_delay: ReadableDuration, - // Interval to inspect the latency of raftstore for slow store detection. + #[online_config(hidden)] + // Interval to inspect the latency of flushing raft logs for slow store detection. pub inspect_interval: ReadableDuration, + // Interval to inspect the latency of flushes on kvdb for slow store detection. + // If the kvdb uses the same mount path with raftdb, the default value will be + // optimized to `0` to avoid duplicated inspection. + #[doc(hidden)] + #[online_config(hidden)] + pub inspect_kvdb_interval: ReadableDuration, /// Threshold of CPU utilization to inspect for slow store detection. #[doc(hidden)] + #[online_config(hidden)] pub inspect_cpu_util_thd: f64, + #[doc(hidden)] + #[online_config(hidden)] // The unsensitive(increase it to reduce sensitiveness) of the cause-trend detection pub slow_trend_unsensitive_cause: f64, + #[doc(hidden)] + #[online_config(hidden)] // The unsensitive(increase it to reduce sensitiveness) of the result-trend detection pub slow_trend_unsensitive_result: f64, + #[doc(hidden)] + #[online_config(hidden)] // The sensitiveness of slowness on network-io. pub slow_trend_network_io_factor: f64, @@ -552,6 +566,7 @@ impl Default for Config { region_split_size: ReadableSize(0), clean_stale_peer_delay: ReadableDuration::minutes(0), inspect_interval: ReadableDuration::millis(100), + inspect_kvdb_interval: ReadableDuration::secs(2), // The default value of `inspect_cpu_util_thd` is 0.4, which means // when the cpu utilization is greater than 40%, the store might be // regarded as a slow node if there exists delayed inspected messages. @@ -685,6 +700,29 @@ impl Config { } } + /// Optimize the interval of different inspectors according to the + /// configuration. + pub fn optimize_inspector(&mut self, separated_raft_mount_path: bool) { + // If the kvdb uses the same mount path with raftdb, the health status + // of kvdb will be inspected by raftstore automatically. So it's not necessary + // to inspect kvdb. + if !separated_raft_mount_path { + self.inspect_kvdb_interval = ReadableDuration::ZERO; + } else { + // If the inspect_kvdb_interval is less than inspect_interval, it should + // use `inspect_interval` * 10 as an empirical inspect interval for KvDB Disk + // I/O. + let inspect_kvdb_interval = if self.inspect_kvdb_interval < self.inspect_interval + && self.inspect_kvdb_interval != ReadableDuration::ZERO + { + self.inspect_interval * 10 + } else { + self.inspect_kvdb_interval + }; + self.inspect_kvdb_interval = inspect_kvdb_interval; + } + } + pub fn validate( &mut self, region_split_size: ReadableSize, @@ -1629,5 +1667,26 @@ mod tests { cfg.raft_write_wait_duration = ReadableDuration::micros(1001); cfg.validate(split_size, true, split_size / 20, false) .unwrap_err(); + + cfg = Config::new(); + cfg.optimize_inspector(false); + assert_eq!(cfg.inspect_kvdb_interval, ReadableDuration::ZERO); + + cfg = Config::new(); + cfg.inspect_kvdb_interval = ReadableDuration::secs(1); + cfg.optimize_inspector(false); + assert_eq!(cfg.inspect_kvdb_interval, ReadableDuration::ZERO); + cfg.optimize_inspector(true); + assert_eq!(cfg.inspect_kvdb_interval, ReadableDuration::ZERO); + + cfg.inspect_kvdb_interval = ReadableDuration::secs(1); + cfg.optimize_inspector(true); + assert_eq!(cfg.inspect_kvdb_interval, ReadableDuration::secs(1)); + + cfg = Config::new(); + cfg.inspect_kvdb_interval = ReadableDuration::millis(1); + cfg.inspect_interval = ReadableDuration::millis(100); + cfg.optimize_inspector(true); + assert_eq!(cfg.inspect_kvdb_interval, ReadableDuration::secs(1)); } } diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index bb12e8c0ed7..25faa03ae72 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -33,7 +33,10 @@ use engine_traits::{ use fail::fail_point; use file_system::{IoType, WithIoType}; use futures::{compat::Future01CompatExt, FutureExt}; -use health_controller::{types::LatencyInspector, HealthController}; +use health_controller::{ + types::{InspectFactor, LatencyInspector}, + HealthController, +}; use itertools::Itertools; use keys::{self, data_end_key, data_key, enc_end_key, enc_start_key}; use kvproto::{ @@ -105,9 +108,10 @@ use crate::{ worker::{ AutoSplitController, CleanupRunner, CleanupSstRunner, CleanupSstTask, CleanupTask, CompactRunner, CompactTask, ConsistencyCheckRunner, ConsistencyCheckTask, - GcSnapshotRunner, GcSnapshotTask, PdRunner, RaftlogGcRunner, RaftlogGcTask, - ReadDelegate, RefreshConfigRunner, RefreshConfigTask, RegionRunner, RegionTask, - SnapGenRunner, SnapGenTask, SplitCheckTask, SNAP_GENERATOR_MAX_POOL_SIZE, + DiskCheckRunner, DiskCheckTask, GcSnapshotRunner, GcSnapshotTask, PdRunner, + RaftlogGcRunner, RaftlogGcTask, ReadDelegate, RefreshConfigRunner, RefreshConfigTask, + RegionRunner, RegionTask, SnapGenRunner, SnapGenTask, SplitCheckTask, + SNAP_GENERATOR_MAX_POOL_SIZE, }, worker_metrics::PROCESS_STAT_CPU_USAGE, Callback, CasualMessage, CompactThreshold, FullCompactController, GlobalReplicationState, @@ -564,6 +568,7 @@ where pub raftlog_gc_scheduler: Scheduler, pub raftlog_fetch_scheduler: Scheduler>, pub region_scheduler: Scheduler, + pub disk_check_scheduler: Scheduler, pub apply_router: ApplyRouter, pub router: RaftRouter, pub importer: Arc>, @@ -886,19 +891,38 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport> #[cfg(any(test, feature = "testexport"))] StoreMsg::Validate(f) => f(&self.ctx.cfg), StoreMsg::LatencyInspect { + factor, send_time, mut inspector, } => { - inspector.record_store_wait(send_time.saturating_elapsed()); - inspector.record_store_commit( - self.ctx - .raft_metrics - .health_stats - .avg(InspectIoType::Network), - ); - // Reset the health_stats and wait it to be refreshed in the next tick. - self.ctx.raft_metrics.health_stats.reset(); - self.ctx.pending_latency_inspect.push(inspector); + match factor { + InspectFactor::RaftDisk => { + inspector.record_store_wait(send_time.saturating_elapsed()); + inspector.record_store_commit( + self.ctx + .raft_metrics + .health_stats + .avg(InspectIoType::Network), + ); + // Reset the health_stats and wait it to be refreshed in the next tick. + self.ctx.raft_metrics.health_stats.reset(); + self.ctx.pending_latency_inspect.push(inspector); + } + InspectFactor::KvDisk => { + // Send LatencyInspector to disk_check_scheduler to inspect latency. + if let Err(e) = self + .ctx + .disk_check_scheduler + .schedule(DiskCheckTask::InspectLatency { inspector }) + { + warn!( + "Failed to schedule disk check task"; + "error" => ?e, + "store_id" => self.fsm.store.id + ); + } + } + } } StoreMsg::UnsafeRecoveryReport(report) => self.store_heartbeat_pd(Some(report)), StoreMsg::UnsafeRecoveryCreatePeer { syncer, create } => { @@ -1258,6 +1282,7 @@ pub struct RaftPollerBuilder { raftlog_gc_scheduler: Scheduler, raftlog_fetch_scheduler: Scheduler>, pub snap_gen_scheduler: Scheduler>, + disk_check_scheduler: Scheduler, pub region_scheduler: Scheduler, apply_router: ApplyRouter, pub router: RaftRouter, @@ -1493,6 +1518,7 @@ where store: self.store.clone(), pd_scheduler: self.pd_scheduler.clone(), consistency_check_scheduler: self.consistency_check_scheduler.clone(), + disk_check_scheduler: self.disk_check_scheduler.clone(), split_check_scheduler: self.split_check_scheduler.clone(), region_scheduler: self.region_scheduler.clone(), apply_router: self.apply_router.clone(), @@ -1572,6 +1598,7 @@ where raftlog_gc_scheduler: self.raftlog_gc_scheduler.clone(), raftlog_fetch_scheduler: self.raftlog_fetch_scheduler.clone(), snap_gen_scheduler: self.snap_gen_scheduler.clone(), + disk_check_scheduler: self.disk_check_scheduler.clone(), region_scheduler: self.region_scheduler.clone(), apply_router: self.apply_router.clone(), router: self.router.clone(), @@ -1663,6 +1690,7 @@ impl RaftBatchSystem { collector_reg_handle: CollectorRegHandle, health_controller: HealthController, causal_ts_provider: Option>, // used for rawkv apiv2 + mut disk_check_runner: DiskCheckRunner, grpc_service_mgr: GrpcServiceManager, safe_point: Arc, ) -> Result<()> { @@ -1771,6 +1799,12 @@ impl RaftBatchSystem { let consistency_check_scheduler = workers .background_worker .start("consistency-check", consistency_check_runner); + // The scheduler dedicated to health checking the KvEngine disk when it's using + // a separate disk from RaftEngine. + disk_check_runner.bind_background_worker(workers.background_worker.clone()); + let disk_check_scheduler = workers + .background_worker + .start("disk-check-worker", disk_check_runner); self.store_writers.spawn( meta.get_id(), @@ -1789,6 +1823,7 @@ impl RaftBatchSystem { split_check_scheduler, region_scheduler, snap_gen_scheduler, + disk_check_scheduler, pd_scheduler: workers.pd_worker.scheduler(), consistency_check_scheduler, cleanup_scheduler, @@ -1932,7 +1967,7 @@ impl RaftBatchSystem { causal_ts_provider, grpc_service_mgr, ); - assert!(workers.pd_worker.start_with_timer(pd_runner)); + assert!(workers.pd_worker.start(pd_runner)); if let Err(e) = sys_util::thread::set_priority(sys_util::HIGH_PRI) { warn!("set thread priority for raftstore failed"; "error" => ?e); diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 762ce4d3001..9428c5025db 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -858,8 +858,11 @@ lazy_static! { exponential_buckets(0.00001, 2.0, 26).unwrap() ).unwrap(); - pub static ref STORE_SLOW_SCORE_GAUGE: Gauge = - register_gauge!("tikv_raftstore_slow_score", "Slow score of the store.").unwrap(); + pub static ref STORE_SLOW_SCORE_GAUGE: IntGaugeVec = register_int_gauge_vec!( + "tikv_raftstore_slow_score", + "Slow score of the store.", + &["type"] + ).unwrap(); pub static ref STORE_SLOW_TREND_GAUGE: Gauge = register_gauge!("tikv_raftstore_slow_trend", "Slow trend changing rate.").unwrap(); diff --git a/components/raftstore/src/store/mod.rs b/components/raftstore/src/store/mod.rs index 9e8e66b7522..2c9c92ebbe3 100644 --- a/components/raftstore/src/store/mod.rs +++ b/components/raftstore/src/store/mod.rs @@ -88,13 +88,14 @@ pub use self::{ worker::{ metrics as worker_metrics, need_compact, AutoSplitController, BatchComponent, Bucket, BucketRange, BucketStatsInfo, CachedReadDelegate, CheckLeaderRunner, CheckLeaderTask, - CompactThreshold, FlowStatistics, FlowStatsReporter, FullCompactController, KeyEntry, - LocalReadContext, LocalReader, LocalReaderCore, PdStatsMonitor, PdTask, ReadDelegate, - ReadExecutor, ReadExecutorProvider, ReadProgress, ReadStats, RefreshConfigTask, RegionTask, - SnapGenTask, SplitCheckRunner, SplitCheckTask, SplitConfig, SplitConfigManager, SplitInfo, - StoreMetaDelegate, StoreStatsReporter, TrackVer, WriteStats, WriterContoller, - BIG_REGION_CPU_OVERLOAD_THRESHOLD_RATIO, DEFAULT_BIG_REGION_BYTE_THRESHOLD, - DEFAULT_BIG_REGION_QPS_THRESHOLD, DEFAULT_BYTE_THRESHOLD, DEFAULT_QPS_THRESHOLD, - NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT, REGION_CPU_OVERLOAD_THRESHOLD_RATIO, + CompactThreshold, DiskCheckRunner, FlowStatistics, FlowStatsReporter, + FullCompactController, KeyEntry, LocalReadContext, LocalReader, LocalReaderCore, + PdStatsMonitor, PdTask, ReadDelegate, ReadExecutor, ReadExecutorProvider, ReadProgress, + ReadStats, RefreshConfigTask, RegionTask, SnapGenTask, SplitCheckRunner, SplitCheckTask, + SplitConfig, SplitConfigManager, SplitInfo, StoreMetaDelegate, StoreStatsReporter, + TrackVer, WriteStats, WriterContoller, BIG_REGION_CPU_OVERLOAD_THRESHOLD_RATIO, + DEFAULT_BIG_REGION_BYTE_THRESHOLD, DEFAULT_BIG_REGION_QPS_THRESHOLD, + DEFAULT_BYTE_THRESHOLD, DEFAULT_QPS_THRESHOLD, NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT, + REGION_CPU_OVERLOAD_THRESHOLD_RATIO, }, }; diff --git a/components/raftstore/src/store/msg.rs b/components/raftstore/src/store/msg.rs index 4a2229949f2..dc88e881b20 100644 --- a/components/raftstore/src/store/msg.rs +++ b/components/raftstore/src/store/msg.rs @@ -8,7 +8,7 @@ use std::{borrow::Cow, fmt}; use collections::HashSet; use engine_traits::{CompactedEvent, KvEngine, Snapshot}; use futures::channel::mpsc::UnboundedSender; -use health_controller::types::LatencyInspector; +use health_controller::types::{InspectFactor, LatencyInspector}; use kvproto::{ brpb::CheckAdminResponse, kvrpcpb::{DiskFullOpt, ExtraOp as TxnExtraOp}, @@ -961,6 +961,7 @@ where /// Inspect the latency of raftstore. LatencyInspect { + factor: InspectFactor, send_time: Instant, inspector: LatencyInspector, }, diff --git a/components/raftstore/src/store/worker/disk_check.rs b/components/raftstore/src/store/worker/disk_check.rs new file mode 100644 index 00000000000..44c66892041 --- /dev/null +++ b/components/raftstore/src/store/worker/disk_check.rs @@ -0,0 +1,178 @@ +// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + fmt::{self, Display, Formatter}, + io::Write, + path::PathBuf, + time::Duration, +}; + +use crossbeam::channel::{bounded, Receiver, Sender}; +use health_controller::types::LatencyInspector; +use tikv_util::{ + time::Instant, + warn, + worker::{Runnable, Worker}, +}; + +#[derive(Debug)] +pub enum Task { + InspectLatency { inspector: LatencyInspector }, +} + +impl Display for Task { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match *self { + Task::InspectLatency { .. } => write!(f, "InspectLatency"), + } + } +} + +#[derive(Clone)] +/// A simple inspector to measure the latency of disk IO. +/// +/// This is used to measure the latency of disk IO, which is used to determine +/// the health status of the TiKV server. +/// The inspector writes a file to the disk and measures the time it takes to +/// complete the write operation. +pub struct Runner { + target: PathBuf, + notifier: Sender, + receiver: Receiver, + bg_worker: Option, +} + +impl Runner { + /// The filename to write to the disk to measure the latency. + const DISK_IO_LATENCY_INSPECT_FILENAME: &'static str = ".disk_latency_inspector.tmp"; + /// The content to write to the file to measure the latency. + const DISK_IO_LATENCY_INSPECT_FLUSH_STR: &'static [u8] = b"inspect disk io latency"; + + #[inline] + fn build(target: PathBuf) -> Self { + // The disk check mechanism only cares about the latency of the most + // recent request; older requests become stale and irrelevant. To avoid + // unnecessary accumulation of multiple requests, we set a small + // `capacity` for the disk check worker. + let (notifier, receiver) = bounded(3); + Self { + target, + notifier, + receiver, + bg_worker: None, + } + } + + #[inline] + pub fn new(inspect_dir: PathBuf) -> Self { + Self::build(inspect_dir.join(Self::DISK_IO_LATENCY_INSPECT_FILENAME)) + } + + #[inline] + /// Only for test. + /// Generate a dummy Runner. + pub fn dummy() -> Self { + Self::build(PathBuf::from("./").join(Self::DISK_IO_LATENCY_INSPECT_FILENAME)) + } + + #[inline] + pub fn bind_background_worker(&mut self, bg_worker: Worker) { + self.bg_worker = Some(bg_worker); + } + + fn inspect(&self) -> Option { + let mut file = std::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&self.target) + .ok()?; + + let start = Instant::now(); + // Ignore the error + file.write_all(Self::DISK_IO_LATENCY_INSPECT_FLUSH_STR) + .ok()?; + file.sync_all().ok()?; + Some(start.saturating_elapsed()) + } + + fn execute(&self) { + if let Ok(task) = self.receiver.try_recv() { + match task { + Task::InspectLatency { mut inspector } => { + if let Some(latency) = self.inspect() { + inspector.record_apply_process(latency); + inspector.finish(); + } else { + warn!("failed to inspect disk io latency"); + } + } + } + } + } +} + +impl Runnable for Runner { + type Task = Task; + + fn run(&mut self, task: Task) { + // Send the task to the limited capacity channel. + if let Err(e) = self.notifier.try_send(task) { + warn!("failed to send task to disk check bg_worker: {:?}", e); + } else { + let runner = self.clone(); + if let Some(bg_worker) = self.bg_worker.as_ref() { + bg_worker.spawn_async_task(async move { + runner.execute(); + }); + } + } + } +} + +#[cfg(test)] +mod tests { + use tikv_util::worker::Builder; + + use super::*; + + #[test] + fn test_disk_check_runner() { + let background_worker = Builder::new("disk-check-worker") + .pending_capacity(256) + .create(); + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let mut runner = Runner::dummy(); + runner.bind_background_worker(background_worker); + // Validate the disk check runner. + { + let tx_1 = tx.clone(); + let inspector = LatencyInspector::new( + 1, + Box::new(move |_, duration| { + let dur = duration.sum(); + tx_1.send(dur).unwrap(); + }), + ); + runner.run(Task::InspectLatency { inspector }); + let latency = rx.recv().unwrap(); + assert!(latency > Duration::from_secs(0)); + } + // Invalid bg_worker and out of capacity + { + runner.bg_worker = None; + for i in 2..=10 { + let tx_2 = tx.clone(); + let inspector = LatencyInspector::new( + i as u64, + Box::new(move |_, duration| { + let dur = duration.sum(); + tx_2.send(dur).unwrap(); + }), + ); + runner.run(Task::InspectLatency { inspector }); + rx.recv_timeout(Duration::from_secs(1)).unwrap_err(); + } + } + } +} diff --git a/components/raftstore/src/store/worker/mod.rs b/components/raftstore/src/store/worker/mod.rs index af620bdef6e..3cd4534b8f0 100644 --- a/components/raftstore/src/store/worker/mod.rs +++ b/components/raftstore/src/store/worker/mod.rs @@ -6,6 +6,7 @@ mod cleanup_snapshot; mod cleanup_sst; mod compact; mod consistency_check; +mod disk_check; pub mod metrics; mod pd; mod raftlog_gc; @@ -27,6 +28,7 @@ pub use self::{ Task as CompactTask, }, consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask}, + disk_check::{Runner as DiskCheckRunner, Task as DiskCheckTask}, pd::{ new_change_peer_v2_request, FlowStatistics, FlowStatsReporter, HeartbeatTask, Runner as PdRunner, StatsMonitor as PdStatsMonitor, StoreStatsReporter, Task as PdTask, diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index 4f4d6b85034..5665318c259 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -22,7 +22,7 @@ use fail::fail_point; use futures::{compat::Future01CompatExt, FutureExt}; use health_controller::{ reporters::{RaftstoreReporter, RaftstoreReporterConfig}, - types::{LatencyInspector, RaftstoreDuration}, + types::{InspectFactor, LatencyInspector, RaftstoreDuration}, HealthController, }; use kvproto::{ @@ -50,7 +50,7 @@ use tikv_util::{ timer::GLOBAL_TIMER_HANDLE, topn::TopN, warn, - worker::{Runnable, RunnableWithTimer, ScheduleError, Scheduler}, + worker::{Runnable, ScheduleError, Scheduler}, }; use txn_types::TimeStamp; use yatp::Remote; @@ -201,6 +201,7 @@ where }, UpdateSlowScore { id: u64, + factor: InspectFactor, duration: RaftstoreDuration, }, RegionCpuRecords(Arc), @@ -210,6 +211,9 @@ where }, ReportBuckets(BucketStat), ControlGrpcServer(pdpb::ControlGrpcEvent), + InspectLatency { + factor: InspectFactor, + }, } pub struct StoreStat { @@ -449,8 +453,16 @@ where Task::QueryRegionLeader { region_id } => { write!(f, "query the leader of region {}", region_id) } - Task::UpdateSlowScore { id, ref duration } => { - write!(f, "compute slow score: id {}, duration {:?}", id, duration) + Task::UpdateSlowScore { + id, + factor, + ref duration, + } => { + write!( + f, + "compute slow score: id {}, factor: {:?}, duration {:?}", + id, factor, duration + ) } Task::RegionCpuRecords(ref cpu_records) => { write!(f, "get region cpu records: {:?}", cpu_records) @@ -471,6 +483,9 @@ where Task::ControlGrpcServer(ref event) => { write!(f, "control grpc server: {:?}", event) } + Task::InspectLatency { factor } => { + write!(f, "inspect raftstore latency: {:?}", factor) + } } } } @@ -519,7 +534,7 @@ pub trait StoreStatsReporter: Send + Clone + Sync + 'static + Collector { ); fn report_min_resolved_ts(&self, store_id: u64, min_resolved_ts: u64); fn auto_split(&self, split_infos: Vec); - fn update_latency_stats(&self, timer_tick: u64); + fn update_latency_stats(&self, timer_tick: u64, factor: InspectFactor); } impl StoreStatsReporter for WrappedScheduler @@ -569,9 +584,16 @@ where } } - fn update_latency_stats(&self, timer_tick: u64) { - debug!("update latency statistics not implemented for raftstore-v1"; + fn update_latency_stats(&self, timer_tick: u64, factor: InspectFactor) { + debug!("update latency statistics for raftstore-v1"; "tick" => timer_tick); + let task = Task::InspectLatency { factor }; + if let Err(e) = self.0.schedule(task) { + error!( + "failed to send inspect raftstore latency task to pd worker"; + "err" => ?e, + ); + } } } @@ -588,13 +610,19 @@ where load_base_split_check_interval: Duration, collect_tick_interval: Duration, inspect_latency_interval: Duration, + inspect_kvdb_latency_interval: Duration, } impl StatsMonitor where T: StoreStatsReporter, { - pub fn new(interval: Duration, inspect_latency_interval: Duration, reporter: T) -> Self { + pub fn new( + interval: Duration, + inspect_latency_interval: Duration, + inspect_kvdb_latency_interval: Duration, + reporter: T, + ) -> Self { StatsMonitor { reporter, handle: None, @@ -612,6 +640,7 @@ where cmp::min(default_collect_tick_interval(), interval), ), inspect_latency_interval, + inspect_kvdb_latency_interval, } } @@ -641,9 +670,12 @@ where let load_base_split_check_interval = self .load_base_split_check_interval .div_duration_f64(tick_interval) as u64; - let update_latency_stats_interval = self - .inspect_latency_interval - .div_duration_f64(tick_interval) as u64; + let update_raftdisk_latency_stats_interval = + self.inspect_latency_interval + .div_duration_f64(tick_interval) as u64; + let update_kvdisk_latency_stats_interval = + self.inspect_kvdb_latency_interval + .div_duration_f64(tick_interval) as u64; let (timer_tx, timer_rx) = mpsc::channel(); self.timer = Some(timer_tx); @@ -704,8 +736,11 @@ where &mut region_cpu_records_collector, ); } - if is_enable_tick(timer_cnt, update_latency_stats_interval) { - reporter.update_latency_stats(timer_cnt); + if is_enable_tick(timer_cnt, update_raftdisk_latency_stats_interval) { + reporter.update_latency_stats(timer_cnt, InspectFactor::RaftDisk); + } + if is_enable_tick(timer_cnt, update_kvdisk_latency_stats_interval) { + reporter.update_latency_stats(timer_cnt, InspectFactor::KvDisk); } timer_cnt += 1; } @@ -895,6 +930,7 @@ where let mut stats_monitor = StatsMonitor::new( interval, cfg.inspect_interval.0, + cfg.inspect_kvdb_interval.0, WrappedScheduler(scheduler.clone()), ); if let Err(e) = stats_monitor.start(auto_split_controller, collector_reg_handle) { @@ -903,6 +939,7 @@ where let health_reporter_config = RaftstoreReporterConfig { inspect_interval: cfg.inspect_interval.0, + inspect_kvdb_interval: cfg.inspect_kvdb_interval.0, unsensitive_cause: cfg.slow_trend_unsensitive_cause, unsensitive_result: cfg.slow_trend_unsensitive_result, @@ -1890,6 +1927,89 @@ where } } } + + fn handle_inspect_latency(&mut self, factor: InspectFactor) { + let slow_score_tick_result = self + .health_reporter + .tick(self.store_stat.maybe_busy(), factor); + if let Some(score) = slow_score_tick_result.updated_score { + STORE_SLOW_SCORE_GAUGE + .with_label_values(&[factor.as_str()]) + .set(score as i64); + } + let id = slow_score_tick_result.tick_id; + let scheduler = self.scheduler.clone(); + let inspector = { + match factor { + InspectFactor::RaftDisk => { + // If the last slow_score already reached abnormal state and was delayed for + // reporting by `store-heartbeat` to PD, we should report it here manually as + // a FAKE `store-heartbeat`. + if slow_score_tick_result.should_force_report_slow_store + && self.is_store_heartbeat_delayed() + { + self.handle_fake_store_heartbeat(); + } + LatencyInspector::new( + id, + Box::new(move |id, duration| { + STORE_INSPECT_DURATION_HISTOGRAM + .with_label_values(&["store_wait"]) + .observe(tikv_util::time::duration_to_sec( + duration.store_wait_duration.unwrap_or_default(), + )); + STORE_INSPECT_DURATION_HISTOGRAM + .with_label_values(&["store_commit"]) + .observe(tikv_util::time::duration_to_sec( + duration.store_commit_duration.unwrap_or_default(), + )); + + STORE_INSPECT_DURATION_HISTOGRAM + .with_label_values(&["all"]) + .observe(tikv_util::time::duration_to_sec(duration.sum())); + if let Err(e) = scheduler.schedule(Task::UpdateSlowScore { + id, + factor, + duration, + }) { + warn!("schedule pd task failed"; "err" => ?e); + } + }), + ) + } + InspectFactor::KvDisk => LatencyInspector::new( + id, + Box::new(move |id, duration| { + STORE_INSPECT_DURATION_HISTOGRAM + .with_label_values(&["apply_wait"]) + .observe(tikv_util::time::duration_to_sec( + duration.apply_wait_duration.unwrap_or_default(), + )); + STORE_INSPECT_DURATION_HISTOGRAM + .with_label_values(&["apply_process"]) + .observe(tikv_util::time::duration_to_sec( + duration.apply_process_duration.unwrap_or_default(), + )); + if let Err(e) = scheduler.schedule(Task::UpdateSlowScore { + id, + factor, + duration, + }) { + warn!("schedule pd task failed"; "err" => ?e); + } + }), + ), + } + }; + let msg = StoreMsg::LatencyInspect { + factor, + send_time: TiInstant::now(), + inspector, + }; + if let Err(e) = self.router.send_control(msg) { + warn!("pd worker send latency inspecter failed"; "err" => ?e); + } + } } fn calculate_region_cpu_records( @@ -2140,9 +2260,14 @@ where txn_ext, } => self.handle_update_max_timestamp(region_id, initial_status, txn_ext), Task::QueryRegionLeader { region_id } => self.handle_query_region_leader(region_id), - Task::UpdateSlowScore { id, duration } => { + Task::UpdateSlowScore { + id, + factor, + duration, + } => { self.health_reporter.record_raftstore_duration( id, + factor, duration, !self.store_stat.maybe_busy(), ); @@ -2158,6 +2283,9 @@ where Task::ControlGrpcServer(event) => { self.handle_control_grpc_server(event); } + Task::InspectLatency { factor } => { + self.handle_inspect_latency(factor); + } }; } @@ -2166,71 +2294,6 @@ where } } -impl RunnableWithTimer for Runner -where - EK: KvEngine, - ER: RaftEngine, - T: PdClient + 'static, -{ - fn on_timeout(&mut self) { - let slow_score_tick_result = self.health_reporter.tick(self.store_stat.maybe_busy()); - if let Some(score) = slow_score_tick_result.updated_score { - STORE_SLOW_SCORE_GAUGE.set(score); - } - - // If the last slow_score already reached abnormal state and was delayed for - // reporting by `store-heartbeat` to PD, we should report it here manually as - // a FAKE `store-heartbeat`. - if slow_score_tick_result.should_force_report_slow_store - && self.is_store_heartbeat_delayed() - { - self.handle_fake_store_heartbeat(); - } - - let id = slow_score_tick_result.tick_id; - - let scheduler = self.scheduler.clone(); - let inspector = LatencyInspector::new( - id, - Box::new(move |id, duration| { - STORE_INSPECT_DURATION_HISTOGRAM - .with_label_values(&["store_process"]) - .observe(tikv_util::time::duration_to_sec( - duration.store_process_duration.unwrap_or_default(), - )); - STORE_INSPECT_DURATION_HISTOGRAM - .with_label_values(&["store_wait"]) - .observe(tikv_util::time::duration_to_sec( - duration.store_wait_duration.unwrap_or_default(), - )); - STORE_INSPECT_DURATION_HISTOGRAM - .with_label_values(&["store_commit"]) - .observe(tikv_util::time::duration_to_sec( - duration.store_commit_duration.unwrap_or_default(), - )); - - STORE_INSPECT_DURATION_HISTOGRAM - .with_label_values(&["all"]) - .observe(tikv_util::time::duration_to_sec(duration.sum())); - if let Err(e) = scheduler.schedule(Task::UpdateSlowScore { id, duration }) { - warn!("schedule pd task failed"; "err" => ?e); - } - }), - ); - let msg = StoreMsg::LatencyInspect { - send_time: TiInstant::now(), - inspector, - }; - if let Err(e) = self.router.send_control(msg) { - warn!("pd worker send latency inspecter failed"; "err" => ?e); - } - } - - fn get_interval(&self) -> Duration { - self.health_reporter.get_tick_interval() - } -} - fn new_change_peer_request(change_type: ConfChangeType, peer: metapb::Peer) -> AdminRequest { let mut req = AdminRequest::default(); req.set_cmd_type(AdminCmdType::ChangePeer); @@ -2519,6 +2582,7 @@ mod tests { let mut stats_monitor = StatsMonitor::new( Duration::from_secs(interval), Duration::from_secs(interval), + Duration::default(), WrappedScheduler(scheduler), ); if let Err(e) = stats_monitor.start( @@ -2767,6 +2831,7 @@ mod tests { let mut stats_monitor = StatsMonitor::new( Duration::from_secs(interval), Duration::from_secs(interval), + Duration::default(), WrappedScheduler(pd_worker.scheduler()), ); stats_monitor diff --git a/components/server/src/server.rs b/components/server/src/server.rs index da6a7a85b76..35f160de2fd 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -74,8 +74,8 @@ use raftstore::{ }, memory::MEMTRACE_ROOT as MEMTRACE_RAFTSTORE, snapshot_backup::PrepareDiskSnapObserver, - AutoSplitController, CheckLeaderRunner, LocalReader, SnapManager, SnapManagerBuilder, - SplitCheckRunner, SplitConfigManager, StoreMetaDelegate, + AutoSplitController, CheckLeaderRunner, DiskCheckRunner, LocalReader, SnapManager, + SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMetaDelegate, }, RaftRouterCompactedEventSender, }; @@ -811,6 +811,13 @@ where let server_config = Arc::new(VersionTrack::new(self.core.config.server.clone())); self.core.config.raft_store.optimize_for(false); + self.core + .config + .raft_store + .optimize_inspector(path_in_diff_mount_point( + engines.engines.raft.get_engine_path().to_string().as_str(), + engines.engines.kv.path(), + )); self.core .config .raft_store @@ -1024,6 +1031,8 @@ where .registry .register_consistency_check_observer(100, observer); + let disk_check_runner = DiskCheckRunner::new(self.core.store_path.clone()); + raft_server .start( engines.engines.clone(), @@ -1038,6 +1047,7 @@ where self.concurrency_manager.clone(), collector_reg_handle, self.causal_ts_provider.clone(), + disk_check_runner, self.grpc_service_mgr.clone(), safe_point.clone(), ) diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 26319d43e27..c87609a0c02 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -346,6 +346,7 @@ impl Simulator for NodeCluster { cm, CollectorRegHandle::new_for_test(), None, + DiskCheckRunner::dummy(), GrpcServiceManager::dummy(), Arc::new(AtomicU64::new(0)), )?; diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index d73157c51ac..a50e226f640 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -42,8 +42,9 @@ use raftstore::{ store::{ fsm::{store::StoreMeta, ApplyRouter, RaftBatchSystem, RaftRouter}, msg::RaftCmdExtraOpts, - AutoSplitController, Callback, CheckLeaderRunner, LocalReader, RegionSnapshot, SnapManager, - SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMetaDelegate, + AutoSplitController, Callback, CheckLeaderRunner, DiskCheckRunner, LocalReader, + RegionSnapshot, SnapManager, SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, + StoreMetaDelegate, }, Result, }; @@ -681,6 +682,7 @@ impl ServerCluster { concurrency_manager.clone(), collector_reg_handle, causal_ts_provider, + DiskCheckRunner::dummy(), GrpcServiceManager::dummy(), Arc::new(AtomicU64::new(0)), )?; diff --git a/metrics/grafana/tikv_details.dashboard.py b/metrics/grafana/tikv_details.dashboard.py index 88d4d09a5e0..d647293b38e 100644 --- a/metrics/grafana/tikv_details.dashboard.py +++ b/metrics/grafana/tikv_details.dashboard.py @@ -9841,6 +9841,7 @@ def SlowTrendStatistics() -> RowPanel: target( expr=expr_sum( "tikv_raftstore_slow_score", + by_labels=["instance", "type"], ), ), ], diff --git a/metrics/grafana/tikv_details.json b/metrics/grafana/tikv_details.json index 17b1984f098..c630c914cc6 100644 --- a/metrics/grafana/tikv_details.json +++ b/metrics/grafana/tikv_details.json @@ -57183,15 +57183,15 @@ "targets": [ { "datasource": "${DS_TEST-CLUSTER}", - "expr": "sum((\n tikv_raftstore_slow_score\n {k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",instance=~\"$instance\"}\n \n)) by (instance) ", + "expr": "sum((\n tikv_raftstore_slow_score\n {k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",instance=~\"$instance\"}\n \n)) by (instance, type) ", "format": "time_series", "hide": false, "instant": false, "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}", + "legendFormat": "{{instance}}-{{type}}", "metric": "", - "query": "sum((\n tikv_raftstore_slow_score\n {k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",instance=~\"$instance\"}\n \n)) by (instance) ", + "query": "sum((\n tikv_raftstore_slow_score\n {k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",instance=~\"$instance\"}\n \n)) by (instance, type) ", "refId": "", "step": 10, "target": "" diff --git a/metrics/grafana/tikv_details.json.sha256 b/metrics/grafana/tikv_details.json.sha256 index 33425b3b570..a265e768e7e 100644 --- a/metrics/grafana/tikv_details.json.sha256 +++ b/metrics/grafana/tikv_details.json.sha256 @@ -1 +1 @@ -857f7b413acdc5cafdcb68bf40ec6184d6c30ad0a4bc8a351431d8f25d552752 ./metrics/grafana/tikv_details.json +cb8f293df9c6a35f210f292e493bcf95571c6dd7be22eaea258fb2edc42b5007 ./metrics/grafana/tikv_details.json diff --git a/src/server/raft_server.rs b/src/server/raft_server.rs index 36c0cab22fc..15e98441583 100644 --- a/src/server/raft_server.rs +++ b/src/server/raft_server.rs @@ -20,8 +20,8 @@ use raftstore::{ store::{ self, fsm::{store::StoreMeta, ApplyRouter, RaftBatchSystem, RaftRouter}, - initial_region, AutoSplitController, Config as StoreConfig, GlobalReplicationState, PdTask, - RefreshConfigTask, SnapManager, SplitCheckTask, Transport, + initial_region, AutoSplitController, Config as StoreConfig, DiskCheckRunner, + GlobalReplicationState, PdTask, RefreshConfigTask, SnapManager, SplitCheckTask, Transport, }, }; use resource_metering::CollectorRegHandle; @@ -172,6 +172,7 @@ where concurrency_manager: ConcurrencyManager, collector_reg_handle: CollectorRegHandle, causal_ts_provider: Option>, // used for rawkv apiv2 + disk_check_runner: DiskCheckRunner, grpc_service_mgr: GrpcServiceManager, safe_point: Arc, ) -> Result<()> @@ -211,6 +212,7 @@ where concurrency_manager, collector_reg_handle, causal_ts_provider, + disk_check_runner, grpc_service_mgr, safe_point, )?; @@ -460,6 +462,7 @@ where concurrency_manager: ConcurrencyManager, collector_reg_handle: CollectorRegHandle, causal_ts_provider: Option>, // used for rawkv apiv2 + disk_check_runner: DiskCheckRunner, grpc_service_mgr: GrpcServiceManager, safe_point: Arc, ) -> Result<()> @@ -495,6 +498,7 @@ where collector_reg_handle, self.health_controller.clone(), causal_ts_provider, + disk_check_runner, grpc_service_mgr, safe_point, )?; diff --git a/tests/integrations/config/dynamic/raftstore.rs b/tests/integrations/config/dynamic/raftstore.rs index 003f9851642..7c39487d4dd 100644 --- a/tests/integrations/config/dynamic/raftstore.rs +++ b/tests/integrations/config/dynamic/raftstore.rs @@ -16,7 +16,7 @@ use raftstore::{ store::{ config::{Config, RaftstoreConfigManager}, fsm::{StoreMeta, *}, - AutoSplitController, SnapManager, StoreMsg, Transport, + AutoSplitController, DiskCheckRunner, SnapManager, StoreMsg, Transport, }, Result, }; @@ -114,6 +114,7 @@ fn start_raftstore( CollectorRegHandle::new_for_test(), HealthController::new(), None, + DiskCheckRunner::dummy(), GrpcServiceManager::dummy(), Arc::new(AtomicU64::new(0)), ) diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs index 99ad19c21c1..e994b7cfc68 100644 --- a/tests/integrations/raftstore/test_bootstrap.rs +++ b/tests/integrations/raftstore/test_bootstrap.rs @@ -14,7 +14,10 @@ use health_controller::HealthController; use kvproto::{kvrpcpb::ApiVersion, metapb, raft_serverpb::RegionLocalState}; use raftstore::{ coprocessor::CoprocessorHost, - store::{bootstrap_store, fsm, fsm::store::StoreMeta, AutoSplitController, SnapManager}, + store::{ + bootstrap_store, fsm, fsm::store::StoreMeta, AutoSplitController, DiskCheckRunner, + SnapManager, + }, }; use raftstore_v2::router::PeerMsg; use resource_metering::CollectorRegHandle; @@ -122,6 +125,7 @@ fn test_node_bootstrap_with_prepared_data() { ConcurrencyManager::new(1.into()), CollectorRegHandle::new_for_test(), None, + DiskCheckRunner::dummy(), GrpcServiceManager::dummy(), Arc::new(AtomicU64::new(0)), ) diff --git a/tests/integrations/raftstore/test_status_command.rs b/tests/integrations/raftstore/test_status_command.rs index 37e78de3d50..0d42c1ec869 100644 --- a/tests/integrations/raftstore/test_status_command.rs +++ b/tests/integrations/raftstore/test_status_command.rs @@ -1,11 +1,11 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. -use health_controller::types::LatencyInspector; +use health_controller::types::{InspectFactor, LatencyInspector}; use raftstore::store::msg::StoreMsg as StoreMsgV1; use raftstore_v2::router::StoreMsg as StoreMsgV2; use test_raftstore::Simulator as S1; use test_raftstore_v2::Simulator as S2; -use tikv_util::{time::Instant, HandyRwLock}; +use tikv_util::{config::ReadableDuration, time::Instant, HandyRwLock}; #[test] fn test_region_detail() { @@ -33,6 +33,7 @@ fn test_region_detail() { fn test_latency_inspect() { let mut cluster_v1 = test_raftstore::new_node_cluster(0, 1); cluster_v1.cfg.raft_store.store_io_pool_size = 2; + cluster_v1.cfg.raft_store.inspect_kvdb_interval = ReadableDuration::millis(500); cluster_v1.run(); let mut cluster_v2 = test_raftstore_v2::new_node_cluster(0, 1); cluster_v2.run(); @@ -43,19 +44,24 @@ fn test_latency_inspect() { { // Test send LatencyInspect to V1. let (tx, rx) = std::sync::mpsc::sync_channel(10); - let inspector = LatencyInspector::new( - 1, - Box::new(move |_, duration| { - let dur = duration.sum(); - tx.send(dur).unwrap(); - }), - ); - let msg = StoreMsgV1::LatencyInspect { - send_time: Instant::now(), - inspector, - }; - router_v1.send_control(msg).unwrap(); - rx.recv_timeout(std::time::Duration::from_secs(2)).unwrap(); + // Inspect different factors. + for factor in [InspectFactor::RaftDisk, InspectFactor::KvDisk].iter() { + let cloned_tx = tx.clone(); + let inspector = LatencyInspector::new( + 1, + Box::new(move |_, duration| { + let dur = duration.sum(); + cloned_tx.send(dur).unwrap(); + }), + ); + let msg = StoreMsgV1::LatencyInspect { + factor: *factor, + send_time: Instant::now(), + inspector, + }; + router_v1.send_control(msg).unwrap(); + rx.recv_timeout(std::time::Duration::from_secs(2)).unwrap(); + } } { // Test send LatencyInspect to V2. @@ -83,17 +89,22 @@ fn test_sync_latency_inspect() { cluster.run(); let router = cluster.sim.wl().get_router(1).unwrap(); let (tx, rx) = std::sync::mpsc::sync_channel(10); - let inspector = LatencyInspector::new( - 1, - Box::new(move |_, duration| { - let dur = duration.sum(); - tx.send(dur).unwrap(); - }), - ); - let msg = StoreMsgV1::LatencyInspect { - send_time: Instant::now(), - inspector, - }; - router.send_control(msg).unwrap(); - rx.recv_timeout(std::time::Duration::from_secs(2)).unwrap(); + // Inspect different factors. + for factor in [InspectFactor::RaftDisk, InspectFactor::KvDisk].iter() { + let cloned_tx = tx.clone(); + let inspector = LatencyInspector::new( + 1, + Box::new(move |_, duration| { + let dur = duration.sum(); + cloned_tx.send(dur).unwrap(); + }), + ); + let msg = StoreMsgV1::LatencyInspect { + factor: *factor, + send_time: Instant::now(), + inspector, + }; + router.send_control(msg).unwrap(); + rx.recv_timeout(std::time::Duration::from_secs(2)).unwrap(); + } } diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index 3fc08306688..52eb3563dff 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -30,7 +30,7 @@ use pd_client::PdClient; use raft::eraftpb; use raftstore::{ coprocessor::CoprocessorHost, - store::{fsm::store::StoreMeta, AutoSplitController, SnapManager}, + store::{fsm::store::StoreMeta, AutoSplitController, DiskCheckRunner, SnapManager}, }; use resource_metering::CollectorRegHandle; use service::service_manager::GrpcServiceManager; @@ -1411,6 +1411,7 @@ fn test_double_run_node() { ConcurrencyManager::new(1.into()), CollectorRegHandle::new_for_test(), None, + DiskCheckRunner::dummy(), GrpcServiceManager::dummy(), Arc::new(AtomicU64::new(0)), )