From fb2b17c0da400098c5e7a573daa2b6f9afd65728 Mon Sep 17 00:00:00 2001
From: August
Date: Mon, 26 Feb 2024 15:53:21 +0800
Subject: [PATCH] feat(sql-backend): support diagnose in sql-backend (#15253)

---
 src/meta/model_v2/src/table.rs           |  2 +-
 src/meta/node/src/server.rs              | 21 ++---
 src/meta/src/controller/catalog.rs       | 46 +++++++++++
 src/meta/src/controller/fragment.rs      | 17 ++++
 src/meta/src/dashboard/mod.rs            | 10 +--
 src/meta/src/manager/catalog/fragment.rs | 16 ++++
 src/meta/src/manager/diagnose.rs         | 99 +++++++++++++++---------
 src/meta/src/manager/metadata.rs         | 13 ++++
 8 files changed, 163 insertions(+), 61 deletions(-)

diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs
index 06710d42d9d25..0039b9cccc2e0 100644
--- a/src/meta/model_v2/src/table.rs
+++ b/src/meta/model_v2/src/table.rs
@@ -24,7 +24,7 @@ use crate::{
     TableId, TableVersion,
 };
 
-#[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)]
+#[derive(Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum)]
 #[sea_orm(rs_type = "String", db_type = "String(None)")]
 pub enum TableType {
     #[sea_orm(string_value = "TABLE")]
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index 4792293863237..3397317fe1939 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -478,20 +478,13 @@ pub async fn start_service_as_election_leader(
         prometheus_http_query::Client::from_str(x).unwrap()
     });
     let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
-    let diagnose_command = match &metadata_manager {
-        MetadataManager::V1(mgr) => Some(Arc::new(
-            risingwave_meta::manager::diagnose::DiagnoseCommand::new(
-                mgr.cluster_manager.clone(),
-                mgr.catalog_manager.clone(),
-                mgr.fragment_manager.clone(),
-                hummock_manager.clone(),
-                env.event_log_manager_ref(),
-                prometheus_client.clone(),
-                prometheus_selector.clone(),
-            ),
-        )),
-        MetadataManager::V2(_) => None,
-    };
+    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
+        metadata_manager.clone(),
+        hummock_manager.clone(),
+        env.event_log_manager_ref(),
+        prometheus_client.clone(),
+        prometheus_selector.clone(),
+    ));
 
     let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
         max_length: opts.cached_traces_num,
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index a1efaa756bb44..fae19874ab407 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -2446,6 +2446,18 @@ impl CatalogController {
     }
 }
 
+/// `CatalogStats` is a struct to store the statistics of all catalogs.
+pub struct CatalogStats {
+    pub table_num: u64,
+    pub mview_num: u64,
+    pub index_num: u64,
+    pub source_num: u64,
+    pub sink_num: u64,
+    pub function_num: u64,
+    pub streaming_job_num: u64,
+    pub actor_num: u64,
+}
+
 impl CatalogControllerInner {
     pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
         let databases = self.list_databases().await?;
@@ -2476,6 +2488,40 @@
         ))
     }
 
+    pub async fn stats(&self) -> MetaResult<CatalogStats> {
+        let mut table_num_map: HashMap<_, _> = Table::find()
+            .select_only()
+            .column(table::Column::TableType)
+            .column_as(table::Column::TableId.count(), "num")
+            .group_by(table::Column::TableType)
+            .having(table::Column::TableType.ne(TableType::Internal))
+            .into_tuple::<(TableType, i64)>()
+            .all(&self.db)
+            .await?
+            .into_iter()
+            .map(|(table_type, num)| (table_type, num as u64))
+            .collect();
+
+        let source_num = Source::find().count(&self.db).await?;
+        let sink_num = Sink::find().count(&self.db).await?;
+        let function_num = Function::find().count(&self.db).await?;
+        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
+        let actor_num = Actor::find().count(&self.db).await?;
+
+        Ok(CatalogStats {
+            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
+            mview_num: table_num_map
+                .remove(&TableType::MaterializedView)
+                .unwrap_or(0),
+            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
+            source_num,
+            sink_num,
+            function_num,
+            streaming_job_num,
+            actor_num,
+        })
+    }
+
     async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
         let db_objs = Database::find()
             .find_also_related(Object)
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 833e642a83e74..c9beada284d5d 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -702,6 +702,23 @@ impl CatalogController {
         Ok(count > 0)
     }
 
+    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
+        let inner = self.inner.read().await;
+        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
+            .select_only()
+            .column(actor::Column::WorkerId)
+            .column_as(actor::Column::ActorId.count(), "count")
+            .group_by(actor::Column::WorkerId)
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        Ok(actor_cnt
+            .into_iter()
+            .map(|(worker_id, count)| (worker_id, count as usize))
+            .collect())
+    }
+
     // TODO: This function is too heavy, we should avoid using it and implement others on demand.
     pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, PbTableFragments>> {
         let inner = self.inner.read().await;
diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs
index b5104e557a1b2..b4ae8e2c04b36 100644
--- a/src/meta/src/dashboard/mod.rs
+++ b/src/meta/src/dashboard/mod.rs
@@ -47,7 +47,7 @@ pub struct DashboardService {
     pub metadata_manager: MetadataManager,
     pub compute_clients: ComputeClientPool,
     pub ui_path: Option<String>,
-    pub diagnose_command: Option<DiagnoseCommandRef>,
+    pub diagnose_command: DiagnoseCommandRef,
     pub trace_state: otlp_embedded::StateRef,
 }
 
@@ -355,13 +355,7 @@ pub(super) mod handlers {
     }
 
     pub async fn diagnose(Extension(srv): Extension<Service>) -> Result<String> {
-        let report = if let Some(cmd) = &srv.diagnose_command {
-            cmd.report().await
-        } else {
-            "Not supported in sql-backend".to_string()
-        };
-
-        Ok(report)
+        Ok(srv.diagnose_command.report().await)
     }
 
     pub async fn get_back_pressure(
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index fdae16efe5a7b..ecfac68833d0e 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -858,6 +858,22 @@ impl FragmentManager {
         actor_maps
     }
 
+    pub async fn node_actor_count(&self) -> HashMap<WorkerId, usize> {
+        let mut actor_count = HashMap::new();
+
+        let map = &self.core.read().await.table_fragments;
+        for fragments in map.values() {
+            for actor_status in fragments.actor_status.values() {
+                if let Some(pu) = &actor_status.parallel_unit {
+                    let e = actor_count.entry(pu.worker_node_id).or_insert(0);
+                    *e += 1;
+                }
+            }
+        }
+
+        actor_count
+    }
+
     // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
     // return the actor_ids to be applied
     pub async fn update_source_rate_limit_by_source_id(
diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs
index a8c1d050ab748..76184de9eb7ce 100644
--- a/src/meta/src/manager/diagnose.rs
+++ b/src/meta/src/manager/diagnose.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::cmp::{Ordering, Reverse};
-use std::collections::{BinaryHeap, HashMap};
+use std::collections::BinaryHeap;
 use std::fmt::Write;
 use std::sync::Arc;
 
@@ -27,17 +27,16 @@ use risingwave_pb::meta::EventLog;
 use risingwave_pb::monitor_service::StackTraceResponse;
 use risingwave_rpc_client::ComputeClientPool;
 use serde_json::json;
+use thiserror_ext::AsReport;
 
 use crate::hummock::HummockManagerRef;
 use crate::manager::event_log::EventLogMangerRef;
-use crate::manager::{CatalogManagerRef, ClusterManagerRef, FragmentManagerRef};
+use crate::manager::MetadataManager;
 
 pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
 
 pub struct DiagnoseCommand {
-    cluster_manager: ClusterManagerRef,
-    catalog_manager: CatalogManagerRef,
-    fragment_manager: FragmentManagerRef,
+    metadata_manager: MetadataManager,
     hummock_manger: HummockManagerRef,
     event_log_manager: EventLogMangerRef,
     prometheus_client: Option<prometheus_http_query::Client>,
@@ -46,18 +45,14 @@
 
 impl DiagnoseCommand {
     pub fn new(
-        cluster_manager: ClusterManagerRef,
-        catalog_manager: CatalogManagerRef,
-        fragment_manager: FragmentManagerRef,
+        metadata_manager: MetadataManager,
         hummock_manger: HummockManagerRef,
         event_log_manager: EventLogMangerRef,
         prometheus_client: Option<prometheus_http_query::Client>,
         prometheus_selector: String,
     ) -> Self {
         Self {
-            cluster_manager,
-            catalog_manager,
-            fragment_manager,
+            metadata_manager,
             hummock_manger,
             event_log_manager,
             prometheus_client,
@@ -90,49 +85,60 @@ impl DiagnoseCommand {
 
     #[cfg_attr(coverage, coverage(off))]
     async fn write_catalog(&self, s: &mut String) {
+        match &self.metadata_manager {
+            MetadataManager::V1(_) => self.write_catalog_v1(s).await,
+            MetadataManager::V2(_) => self.write_catalog_v2(s).await,
+        }
+    }
+
+    #[cfg_attr(coverage, coverage(off))]
+    async fn write_catalog_v1(&self, s: &mut String) {
+        let mgr = self.metadata_manager.as_v1_ref();
         let _ = writeln!(s, "number of fragment: {}", self.fragment_num().await);
         let _ = writeln!(s, "number of actor: {}", self.actor_num().await);
         let _ = writeln!(
             s,
             "number of source: {}",
-            self.catalog_manager.source_count().await
+            mgr.catalog_manager.source_count().await
         );
         let _ = writeln!(
             s,
             "number of table: {}",
-            self.catalog_manager.table_count().await
+            mgr.catalog_manager.table_count().await
         );
         let _ = writeln!(
             s,
             "number of materialized view: {}",
-            self.catalog_manager.materialized_view_count().await
+            mgr.catalog_manager.materialized_view_count().await
        );
         let _ = writeln!(
             s,
             "number of sink: {}",
-            self.catalog_manager.sink_count().await
+            mgr.catalog_manager.sink_count().await
         );
         let _ = writeln!(
             s,
             "number of index: {}",
-            self.catalog_manager.index_count().await
+            mgr.catalog_manager.index_count().await
         );
         let _ = writeln!(
             s,
             "number of function: {}",
-            self.catalog_manager.function_count().await
+            mgr.catalog_manager.function_count().await
         );
     }
 
     #[cfg_attr(coverage, coverage(off))]
     async fn fragment_num(&self) -> usize {
-        let core = self.fragment_manager.get_fragment_read_guard().await;
+        let mgr = self.metadata_manager.as_v1_ref();
+        let core = mgr.fragment_manager.get_fragment_read_guard().await;
         core.table_fragments().len()
     }
 
     #[cfg_attr(coverage, coverage(off))]
     async fn actor_num(&self) -> usize {
-        let core = self.fragment_manager.get_fragment_read_guard().await;
+        let mgr = self.metadata_manager.as_v1_ref();
+        let core = mgr.fragment_manager.get_fragment_read_guard().await;
         core.table_fragments()
             .values()
             .map(|t| t.actor_status.len())
@@ -140,25 +146,38 @@ impl DiagnoseCommand {
     }
 
     #[cfg_attr(coverage, coverage(off))]
-    async fn write_worker_nodes(&self, s: &mut String) {
-        let mut worker_actor_count: HashMap<u32, usize> = HashMap::new();
-        for f in self
-            .fragment_manager
-            .get_fragment_read_guard()
-            .await
-            .table_fragments()
-            .values()
-        {
-            for a in f.actor_status.values() {
-                if let Some(pu) = &a.parallel_unit {
-                    let e = worker_actor_count.entry(pu.worker_node_id).or_insert(0);
-                    *e += 1;
-                }
+    async fn write_catalog_v2(&self, s: &mut String) {
+        let mgr = self.metadata_manager.as_v2_ref();
+        let guard = mgr.catalog_controller.get_inner_read_guard().await;
+        let stat = match guard.stats().await {
+            Ok(stat) => stat,
+            Err(err) => {
+                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
+                return;
             }
-        }
+        };
+        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
+        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
+        let _ = writeln!(s, "number of source: {}", stat.source_num);
+        let _ = writeln!(s, "number of table: {}", stat.table_num);
+        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
+        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
+        let _ = writeln!(s, "number of index: {}", stat.index_num);
+        let _ = writeln!(s, "number of function: {}", stat.function_num);
+    }
+
+    #[cfg_attr(coverage, coverage(off))]
+    async fn write_worker_nodes(&self, s: &mut String) {
+        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
+            tracing::warn!("failed to get worker actor count");
+            return;
+        };
 
         use comfy_table::{Row, Table};
-        let worker_nodes = self.cluster_manager.list_worker_node(None, None).await;
+        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
+            tracing::warn!("failed to get worker nodes");
+            return;
+        };
         let mut table = Table::new();
         table.set_header({
             let mut row = Row::new();
@@ -636,10 +655,14 @@ impl DiagnoseCommand {
     #[cfg_attr(coverage, coverage(off))]
     async fn write_await_tree(&self, s: &mut String) {
         // Most lines of code are copied from dashboard::handlers::dump_await_tree_all, because the latter cannot be called directly from here.
-        let worker_nodes = self
-            .cluster_manager
+        let Ok(worker_nodes) = self
+            .metadata_manager
             .list_worker_node(Some(WorkerType::ComputeNode), None)
-            .await;
+            .await
+        else {
+            tracing::warn!("failed to get worker nodes");
+            return;
+        };
 
         let mut all = Default::default();
 
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index eef1c0c101256..fd9b51fa8ba09 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -637,6 +637,19 @@ impl MetadataManager {
         }
     }
 
+    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
+        match &self {
+            MetadataManager::V1(mgr) => Ok(mgr.fragment_manager.node_actor_count().await),
+            MetadataManager::V2(mgr) => {
+                let actor_cnt = mgr.catalog_controller.worker_actor_count().await?;
+                Ok(actor_cnt
+                    .into_iter()
+                    .map(|(id, cnt)| (id as WorkerId, cnt))
+                    .collect())
+            }
+        }
+    }
+
     pub async fn count_streaming_job(&self) -> MetaResult<usize> {
         match self {
             MetadataManager::V1(mgr) => Ok(mgr.fragment_manager.count_streaming_job().await),