feat(sql-backend): support diagnose in sql-backend (#15253)
yezizp2012 authored Feb 26, 2024
1 parent ead3e53 commit fb2b17c
Showing 8 changed files with 163 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/table.rs
@@ -24,7 +24,7 @@ use crate::{
     TableId, TableVersion,
 };

-#[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)]
+#[derive(Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum)]
 #[sea_orm(rs_type = "String", db_type = "String(None)")]
 pub enum TableType {
     #[sea_orm(string_value = "TABLE")]
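Note: the new `Hash` derive is what allows `TableType` to be used as a `HashMap` key in the sql-backend `stats()` method (see src/meta/src/controller/catalog.rs below), since `HashMap` keys must implement `Hash + Eq`. A minimal standalone sketch, with a hypothetical `TableType` stripped of its SeaORM attributes:

    use std::collections::HashMap;

    // Without the `Hash` derive, `HashMap<TableType, u64>` would not compile.
    #[derive(Clone, Debug, PartialEq, Hash, Copy, Eq)]
    enum TableType {
        Table,
        Index,
    }

    fn main() {
        let mut counts: HashMap<TableType, u64> = HashMap::new();
        *counts.entry(TableType::Table).or_insert(0) += 1;
        assert_eq!(counts.remove(&TableType::Table).unwrap_or(0), 1);
        assert_eq!(counts.remove(&TableType::Index).unwrap_or(0), 0);
    }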
21 changes: 7 additions & 14 deletions src/meta/node/src/server.rs
@@ -478,20 +478,13 @@ pub async fn start_service_as_election_leader(
         prometheus_http_query::Client::from_str(x).unwrap()
     });
     let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
-    let diagnose_command = match &metadata_manager {
-        MetadataManager::V1(mgr) => Some(Arc::new(
-            risingwave_meta::manager::diagnose::DiagnoseCommand::new(
-                mgr.cluster_manager.clone(),
-                mgr.catalog_manager.clone(),
-                mgr.fragment_manager.clone(),
-                hummock_manager.clone(),
-                env.event_log_manager_ref(),
-                prometheus_client.clone(),
-                prometheus_selector.clone(),
-            ),
-        )),
-        MetadataManager::V2(_) => None,
-    };
+    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
+        metadata_manager.clone(),
+        hummock_manager.clone(),
+        env.event_log_manager_ref(),
+        prometheus_client.clone(),
+        prometheus_selector.clone(),
+    ));

     let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
         max_length: opts.cached_traces_num,
46 changes: 46 additions & 0 deletions src/meta/src/controller/catalog.rs
@@ -2446,6 +2446,18 @@ impl CatalogController {
         }
     }

+/// `CatalogStats` is a struct to store the statistics of all catalogs.
+pub struct CatalogStats {
+    pub table_num: u64,
+    pub mview_num: u64,
+    pub index_num: u64,
+    pub source_num: u64,
+    pub sink_num: u64,
+    pub function_num: u64,
+    pub streaming_job_num: u64,
+    pub actor_num: u64,
+}
+
 impl CatalogControllerInner {
     pub async fn snapshot(&self) -> MetaResult<(Catalog, Vec<PbUserInfo>)> {
         let databases = self.list_databases().await?;
@@ -2476,6 +2488,40 @@ impl CatalogControllerInner {
         ))
     }

+    pub async fn stats(&self) -> MetaResult<CatalogStats> {
+        let mut table_num_map: HashMap<_, _> = Table::find()
+            .select_only()
+            .column(table::Column::TableType)
+            .column_as(table::Column::TableId.count(), "num")
+            .group_by(table::Column::TableType)
+            .having(table::Column::TableType.ne(TableType::Internal))
+            .into_tuple::<(TableType, i64)>()
+            .all(&self.db)
+            .await?
+            .into_iter()
+            .map(|(table_type, num)| (table_type, num as u64))
+            .collect();
+
+        let source_num = Source::find().count(&self.db).await?;
+        let sink_num = Sink::find().count(&self.db).await?;
+        let function_num = Function::find().count(&self.db).await?;
+        let streaming_job_num = StreamingJob::find().count(&self.db).await?;
+        let actor_num = Actor::find().count(&self.db).await?;
+
+        Ok(CatalogStats {
+            table_num: table_num_map.remove(&TableType::Table).unwrap_or(0),
+            mview_num: table_num_map
+                .remove(&TableType::MaterializedView)
+                .unwrap_or(0),
+            index_num: table_num_map.remove(&TableType::Index).unwrap_or(0),
+            source_num,
+            sink_num,
+            function_num,
+            streaming_job_num,
+            actor_num,
+        })
+    }
+
     async fn list_databases(&self) -> MetaResult<Vec<PbDatabase>> {
         let db_objs = Database::find()
             .find_also_related(Object)
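Note for readers new to SeaORM: the grouped query in `stats()` corresponds roughly to `SELECT table_type, COUNT(table_id) AS num FROM "table" GROUP BY table_type HAVING table_type <> 'INTERNAL'` (the exact SQL depends on the backend dialect), and `into_tuple::<(TableType, i64)>()` yields one `(type, count)` row per group. A sketch of the fold that follows, using hypothetical rows; table types with no rows simply fall back to 0 via `unwrap_or(0)`:

    use std::collections::HashMap;

    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    enum TableType {
        Table,
        MaterializedView,
        Index,
    }

    fn main() {
        // Hypothetical result of `.into_tuple::<(TableType, i64)>().all(..)`.
        let rows = vec![(TableType::Table, 3), (TableType::Index, 1)];
        let mut table_num_map: HashMap<TableType, u64> =
            rows.into_iter().map(|(t, n)| (t, n as u64)).collect();
        assert_eq!(table_num_map.remove(&TableType::Table).unwrap_or(0), 3);
        // No materialized views were returned, so the count defaults to 0.
        let mview_num = table_num_map
            .remove(&TableType::MaterializedView)
            .unwrap_or(0);
        assert_eq!(mview_num, 0);
    }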
17 changes: 17 additions & 0 deletions src/meta/src/controller/fragment.rs
@@ -702,6 +702,23 @@ impl CatalogController {
         Ok(count > 0)
     }

+    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
+        let inner = self.inner.read().await;
+        let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
+            .select_only()
+            .column(actor::Column::WorkerId)
+            .column_as(actor::Column::ActorId.count(), "count")
+            .group_by(actor::Column::WorkerId)
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        Ok(actor_cnt
+            .into_iter()
+            .map(|(worker_id, count)| (worker_id, count as usize))
+            .collect())
+    }
+
     // TODO: This function is too heavy, we should avoid using it and implement others on demand.
     pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, PbTableFragments>> {
         let inner = self.inner.read().await;
10 changes: 2 additions & 8 deletions src/meta/src/dashboard/mod.rs
@@ -47,7 +47,7 @@ pub struct DashboardService {
     pub metadata_manager: MetadataManager,
     pub compute_clients: ComputeClientPool,
     pub ui_path: Option<String>,
-    pub diagnose_command: Option<DiagnoseCommandRef>,
+    pub diagnose_command: DiagnoseCommandRef,
     pub trace_state: otlp_embedded::StateRef,
 }

@@ -355,13 +355,7 @@ pub(super) mod handlers {
     }

     pub async fn diagnose(Extension(srv): Extension<Service>) -> Result<String> {
-        let report = if let Some(cmd) = &srv.diagnose_command {
-            cmd.report().await
-        } else {
-            "Not supported in sql-backend".to_string()
-        };
-
-        Ok(report)
+        Ok(srv.diagnose_command.report().await)
     }

     pub async fn get_back_pressure(
16 changes: 16 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
@@ -858,6 +858,22 @@ impl FragmentManager {
         actor_maps
     }

+    pub async fn node_actor_count(&self) -> HashMap<WorkerId, usize> {
+        let mut actor_count = HashMap::new();
+
+        let map = &self.core.read().await.table_fragments;
+        for fragments in map.values() {
+            for actor_status in fragments.actor_status.values() {
+                if let Some(pu) = &actor_status.parallel_unit {
+                    let e = actor_count.entry(pu.worker_node_id).or_insert(0);
+                    *e += 1;
+                }
+            }
+        }
+
+        actor_count
+    }
+
     // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
     // return the actor_ids to be applied
     pub async fn update_source_rate_limit_by_source_id(
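Note: the two backends now compute per-worker actor counts in different places. V1's `node_actor_count` (above) walks the in-memory `table_fragments` map, while V2's `worker_actor_count` (src/meta/src/controller/fragment.rs) pushes the same aggregation into the database, roughly `SELECT worker_id, COUNT(actor_id) AS count FROM actor GROUP BY worker_id`. A reduced sketch of the equivalence, using simplified stand-in types:

    use std::collections::HashMap;

    type WorkerId = u32;

    // V1 style: count actor-to-worker assignments in memory, as
    // `node_actor_count` does over `table_fragments`.
    fn count_in_memory(assignments: &[WorkerId]) -> HashMap<WorkerId, usize> {
        let mut actor_count = HashMap::new();
        for worker in assignments {
            *actor_count.entry(*worker).or_insert(0) += 1;
        }
        actor_count
    }

    // V2 style: the database returns pre-aggregated (worker_id, count) rows.
    fn count_from_rows(rows: Vec<(WorkerId, i64)>) -> HashMap<WorkerId, usize> {
        rows.into_iter().map(|(w, c)| (w, c as usize)).collect()
    }

    fn main() {
        let in_memory = count_in_memory(&[1, 1, 2]);
        let from_sql = count_from_rows(vec![(1, 2), (2, 1)]);
        assert_eq!(in_memory, from_sql);
    }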
99 changes: 61 additions & 38 deletions src/meta/src/manager/diagnose.rs
@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::cmp::{Ordering, Reverse};
-use std::collections::{BinaryHeap, HashMap};
+use std::collections::BinaryHeap;
 use std::fmt::Write;
 use std::sync::Arc;

@@ -27,17 +27,16 @@ use risingwave_pb::meta::EventLog;
 use risingwave_pb::monitor_service::StackTraceResponse;
 use risingwave_rpc_client::ComputeClientPool;
 use serde_json::json;
+use thiserror_ext::AsReport;

 use crate::hummock::HummockManagerRef;
 use crate::manager::event_log::EventLogMangerRef;
-use crate::manager::{CatalogManagerRef, ClusterManagerRef, FragmentManagerRef};
+use crate::manager::MetadataManager;

 pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

 pub struct DiagnoseCommand {
-    cluster_manager: ClusterManagerRef,
-    catalog_manager: CatalogManagerRef,
-    fragment_manager: FragmentManagerRef,
+    metadata_manager: MetadataManager,
     hummock_manger: HummockManagerRef,
     event_log_manager: EventLogMangerRef,
     prometheus_client: Option<prometheus_http_query::Client>,
@@ -46,18 +45,14 @@ pub struct DiagnoseCommand {

 impl DiagnoseCommand {
     pub fn new(
-        cluster_manager: ClusterManagerRef,
-        catalog_manager: CatalogManagerRef,
-        fragment_manager: FragmentManagerRef,
+        metadata_manager: MetadataManager,
         hummock_manger: HummockManagerRef,
         event_log_manager: EventLogMangerRef,
         prometheus_client: Option<prometheus_http_query::Client>,
         prometheus_selector: String,
     ) -> Self {
         Self {
-            cluster_manager,
-            catalog_manager,
-            fragment_manager,
+            metadata_manager,
             hummock_manger,
             event_log_manager,
             prometheus_client,
@@ -90,75 +85,99 @@ impl DiagnoseCommand {

     #[cfg_attr(coverage, coverage(off))]
     async fn write_catalog(&self, s: &mut String) {
+        match &self.metadata_manager {
+            MetadataManager::V1(_) => self.write_catalog_v1(s).await,
+            MetadataManager::V2(_) => self.write_catalog_v2(s).await,
+        }
+    }
+
+    #[cfg_attr(coverage, coverage(off))]
+    async fn write_catalog_v1(&self, s: &mut String) {
+        let mgr = self.metadata_manager.as_v1_ref();
         let _ = writeln!(s, "number of fragment: {}", self.fragment_num().await);
         let _ = writeln!(s, "number of actor: {}", self.actor_num().await);
         let _ = writeln!(
             s,
             "number of source: {}",
-            self.catalog_manager.source_count().await
+            mgr.catalog_manager.source_count().await
         );
         let _ = writeln!(
             s,
             "number of table: {}",
-            self.catalog_manager.table_count().await
+            mgr.catalog_manager.table_count().await
         );
         let _ = writeln!(
             s,
             "number of materialized view: {}",
-            self.catalog_manager.materialized_view_count().await
+            mgr.catalog_manager.materialized_view_count().await
         );
         let _ = writeln!(
             s,
             "number of sink: {}",
-            self.catalog_manager.sink_count().await
+            mgr.catalog_manager.sink_count().await
         );
         let _ = writeln!(
             s,
             "number of index: {}",
-            self.catalog_manager.index_count().await
+            mgr.catalog_manager.index_count().await
         );
         let _ = writeln!(
             s,
             "number of function: {}",
-            self.catalog_manager.function_count().await
+            mgr.catalog_manager.function_count().await
         );
     }

     #[cfg_attr(coverage, coverage(off))]
     async fn fragment_num(&self) -> usize {
-        let core = self.fragment_manager.get_fragment_read_guard().await;
+        let mgr = self.metadata_manager.as_v1_ref();
+        let core = mgr.fragment_manager.get_fragment_read_guard().await;
         core.table_fragments().len()
     }

     #[cfg_attr(coverage, coverage(off))]
     async fn actor_num(&self) -> usize {
-        let core = self.fragment_manager.get_fragment_read_guard().await;
+        let mgr = self.metadata_manager.as_v1_ref();
+        let core = mgr.fragment_manager.get_fragment_read_guard().await;
         core.table_fragments()
             .values()
             .map(|t| t.actor_status.len())
             .sum()
     }

     #[cfg_attr(coverage, coverage(off))]
-    async fn write_worker_nodes(&self, s: &mut String) {
-        let mut worker_actor_count: HashMap<u32, usize> = HashMap::new();
-        for f in self
-            .fragment_manager
-            .get_fragment_read_guard()
-            .await
-            .table_fragments()
-            .values()
-        {
-            for a in f.actor_status.values() {
-                if let Some(pu) = &a.parallel_unit {
-                    let e = worker_actor_count.entry(pu.worker_node_id).or_insert(0);
-                    *e += 1;
-                }
-            }
-        }
+    async fn write_catalog_v2(&self, s: &mut String) {
+        let mgr = self.metadata_manager.as_v2_ref();
+        let guard = mgr.catalog_controller.get_inner_read_guard().await;
+        let stat = match guard.stats().await {
+            Ok(stat) => stat,
+            Err(err) => {
+                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
+                return;
+            }
+        };
+        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
+        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
+        let _ = writeln!(s, "number of source: {}", stat.source_num);
+        let _ = writeln!(s, "number of table: {}", stat.table_num);
+        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
+        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
+        let _ = writeln!(s, "number of index: {}", stat.index_num);
+        let _ = writeln!(s, "number of function: {}", stat.function_num);
+    }
+
+    #[cfg_attr(coverage, coverage(off))]
+    async fn write_worker_nodes(&self, s: &mut String) {
+        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
+            tracing::warn!("failed to get worker actor count");
+            return;
+        };

         use comfy_table::{Row, Table};
-        let worker_nodes = self.cluster_manager.list_worker_node(None, None).await;
+        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
+            tracing::warn!("failed to get worker nodes");
+            return;
+        };
         let mut table = Table::new();
         table.set_header({
             let mut row = Row::new();
@@ -636,10 +655,14 @@ impl DiagnoseCommand {
     #[cfg_attr(coverage, coverage(off))]
     async fn write_await_tree(&self, s: &mut String) {
         // Most lines of code are copied from dashboard::handlers::dump_await_tree_all, because the latter cannot be called directly from here.
-        let worker_nodes = self
-            .cluster_manager
+        let Ok(worker_nodes) = self
+            .metadata_manager
             .list_worker_node(Some(WorkerType::ComputeNode), None)
-            .await;
+            .await
+        else {
+            tracing::warn!("failed to get worker nodes");
+            return;
+        };

         let mut all = Default::default();
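Note the recurring shape in the rewritten helpers: metadata lookups can now fail (the unified `MetadataManager` returns `MetaResult`), so the report writers use `let ... else` to log a warning and skip just that section instead of failing the whole report. A minimal sketch of the pattern, with a hypothetical `lookup` and `eprintln!` standing in for `tracing::warn!`:

    fn lookup(key: &str) -> Result<u32, String> {
        if key == "actor" {
            Ok(42)
        } else {
            Err(format!("unknown key: {key}"))
        }
    }

    fn write_section(s: &mut String, key: &str) {
        // On Err, warn and bail out of this section; the rest still renders.
        let Ok(count) = lookup(key) else {
            eprintln!("failed to get {key} count");
            return;
        };
        s.push_str(&format!("number of {key}: {count}\n"));
    }

    fn main() {
        let mut report = String::new();
        write_section(&mut report, "actor");
        write_section(&mut report, "bogus");
        print!("{report}");
    }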
13 changes: 13 additions & 0 deletions src/meta/src/manager/metadata.rs
@@ -637,6 +637,19 @@
         }
     }

+    pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
+        match &self {
+            MetadataManager::V1(mgr) => Ok(mgr.fragment_manager.node_actor_count().await),
+            MetadataManager::V2(mgr) => {
+                let actor_cnt = mgr.catalog_controller.worker_actor_count().await?;
+                Ok(actor_cnt
+                    .into_iter()
+                    .map(|(id, cnt)| (id as WorkerId, cnt))
+                    .collect())
+            }
+        }
+    }
+
     pub async fn count_streaming_job(&self) -> MetaResult<usize> {
         match self {
             MetadataManager::V1(mgr) => Ok(mgr.fragment_manager.count_streaming_job().await),
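`MetadataManager::worker_actor_count` is the unifying facade that lets callers such as `DiagnoseCommand` stay backend-agnostic. A reduced sketch of the dispatch shape, with hypothetical stand-in types (the real managers are async and considerably richer):

    use std::collections::HashMap;

    type WorkerId = u32;
    type MetaResult<T> = Result<T, String>;

    struct FragmentManager;
    impl FragmentManager {
        fn node_actor_count(&self) -> HashMap<WorkerId, usize> {
            HashMap::from([(1, 2)])
        }
    }

    struct CatalogController;
    impl CatalogController {
        // The SQL backend models ids as signed integers, hence the cast below.
        fn worker_actor_count(&self) -> MetaResult<HashMap<i32, usize>> {
            Ok(HashMap::from([(1, 2)]))
        }
    }

    enum MetadataManager {
        V1(FragmentManager),
        V2(CatalogController),
    }

    impl MetadataManager {
        fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
            match self {
                MetadataManager::V1(mgr) => Ok(mgr.node_actor_count()),
                MetadataManager::V2(mgr) => Ok(mgr
                    .worker_actor_count()?
                    .into_iter()
                    .map(|(id, cnt)| (id as WorkerId, cnt))
                    .collect()),
            }
        }
    }

    fn main() {
        let backends = [
            MetadataManager::V1(FragmentManager),
            MetadataManager::V2(CatalogController),
        ];
        for mm in backends {
            assert_eq!(mm.worker_actor_count().unwrap()[&1], 2);
        }
    }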
