From 1542b882b2bb86c7d81d01fa576ae0a5d29794dd Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Sun, 17 Mar 2024 15:37:34 +0800 Subject: [PATCH] feat(telemetry): report connector name and table_id along with meta (#15663) --- proto/telemetry.proto | 13 +++++++++ src/meta/src/controller/catalog.rs | 44 +++++++++++++++++++++++++++-- src/meta/src/manager/catalog/mod.rs | 36 +++++++++++++++++++++++ src/meta/src/manager/metadata.rs | 12 ++++++++ src/meta/src/telemetry.rs | 41 +++++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 2 deletions(-) diff --git a/proto/telemetry.proto b/proto/telemetry.proto index 563ff3cb965a3..9890f48250538 100644 --- a/proto/telemetry.proto +++ b/proto/telemetry.proto @@ -85,6 +85,19 @@ message MetaReport { // This field represents the "number of running streaming jobs" // and is used to indicate whether the cluster is active. uint32 stream_job_count = 5; + // stream_jobs is the list of running streaming jobs + // and is used to collect the table_id, connector_name and table_optimizations + repeated StreamJobDesc stream_jobs = 6; +} + +enum PlanOptimization { + TABLE_OPTIMIZATION_UNSPECIFIED = 0; +} + +message StreamJobDesc { + int32 table_id = 1; + optional string connector_name = 2; + repeated PlanOptimization plan_optimizations = 3; } message ComputeReport { diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 9c85afc241401..95cf53b3e4fe1 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -21,6 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; +use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; @@ -29,8 +30,8 @@ use risingwave_meta_model_v2::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, - FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, - SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, + FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, + SinkId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -69,6 +70,7 @@ use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; use crate::rpc::ddl_controller::DropMode; use crate::stream::SourceManagerRef; +use crate::telemetry::MetaTelemetryJobDesc; use crate::{MetaError, MetaResult}; pub type CatalogControllerRef = Arc; @@ -2395,6 +2397,44 @@ impl CatalogController { Ok(version) } + pub async fn list_stream_job_desc_for_telemetry( + &self, + ) -> MetaResult> { + let inner = self.inner.read().await; + let info: Vec<(TableId, Option)> = Table::find() + .select_only() + .column(table::Column::TableId) + .column(source::Column::WithProperties) + .join(JoinType::LeftJoin, table::Relation::Source.def()) + .filter( + table::Column::TableType + .eq(TableType::Table) + .or(table::Column::TableType.eq(TableType::MaterializedView)), + ) + .into_tuple() + .all(&inner.db) + .await?; + + Ok(info + .into_iter() + .map(|(table_id, properties)| { + let connector_info = if let Some(inner_props) = properties { + inner_props + .inner_ref() + .get(UPSTREAM_SOURCE_KEY) + .map(|v| v.to_lowercase()) + } else { + None + }; + MetaTelemetryJobDesc { + table_id, + connector: connector_info, + optimization: vec![], + } + }) + .collect()) + } + pub async fn alter_source_column( &self, pb_source: PbSource, diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3db6828df3b09..b6d0c856a1010 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -131,6 +131,7 @@ use crate::manager::catalog::utils::{ refcnt_inc_connection, ReplaceTableExprRewriter, }; use crate::rpc::ddl_controller::DropMode; +use crate::telemetry::MetaTelemetryJobDesc; pub type CatalogManagerRef = Arc; @@ -3532,6 +3533,41 @@ impl CatalogManager { self.core.lock().await.database.list_tables() } + pub async fn list_stream_job_for_telemetry(&self) -> MetaResult> { + let tables = self.list_tables().await; + let mut res = Vec::with_capacity(tables.len()); + let source_read_lock = self.core.lock().await; + for table_def in tables { + // filter out internal tables, only allow Table and MaterializedView + if !(table_def.table_type == TableType::Table as i32 + || table_def.table_type == TableType::MaterializedView as i32) + { + continue; + } + if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = + table_def.optional_associated_source_id + && let Some(source) = source_read_lock.database.sources.get(&source_id) + { + res.push(MetaTelemetryJobDesc { + table_id: table_def.id as i32, + connector: source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .map(|v| v.to_lowercase()), + optimization: vec![], + }) + } else { + res.push(MetaTelemetryJobDesc { + table_id: table_def.id as i32, + connector: None, + optimization: vec![], + }) + } + } + + Ok(res) + } + pub async fn list_tables_by_type(&self, table_type: TableType) -> Vec { self.core .lock() diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 66d3fcc9a41e9..cb58eef48cd60 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -34,6 +34,7 @@ use crate::manager::{ }; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SplitAssignment; +use crate::telemetry::MetaTelemetryJobDesc; use crate::MetaResult; #[derive(Clone)] @@ -684,6 +685,17 @@ impl MetadataManager { } } + pub async fn list_stream_job_desc(&self) -> MetaResult> { + match self { + MetadataManager::V1(mgr) => mgr.catalog_manager.list_stream_job_for_telemetry().await, + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .list_stream_job_desc_for_telemetry() + .await + } + } + } + pub async fn update_source_rate_limit_by_source_id( &self, source_id: SourceId, diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index 19e7d46f3c048..ece7aec5d7a2d 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -43,6 +43,19 @@ struct RwVersion { git_sha: String, } +#[derive(Debug, Serialize, Deserialize)] +pub enum PlanOptimization { + // todo: add optimization applied to each job + Placeholder, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MetaTelemetryJobDesc { + pub table_id: i32, + pub connector: Option, + pub optimization: Vec, +} + #[derive(Debug, Serialize, Deserialize)] pub struct MetaTelemetryReport { #[serde(flatten)] @@ -52,6 +65,26 @@ pub struct MetaTelemetryReport { // At this point, it will always be etcd, but we will enable telemetry when using memory. meta_backend: MetaBackend, rw_version: RwVersion, + job_desc: Vec, +} + +impl From for risingwave_pb::telemetry::StreamJobDesc { + fn from(val: MetaTelemetryJobDesc) -> Self { + risingwave_pb::telemetry::StreamJobDesc { + table_id: val.table_id, + connector_name: val.connector, + plan_optimizations: val + .optimization + .iter() + .map(|opt| match opt { + PlanOptimization::Placeholder => { + risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified + as i32 + } + }) + .collect(), + } + } } impl TelemetryToProtobuf for MetaTelemetryReport { @@ -74,6 +107,7 @@ impl TelemetryToProtobuf for MetaTelemetryReport { git_sha: self.rw_version.git_sha, }), stream_job_count: self.streaming_job_count as u32, + stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(), }; pb_report.encode_to_vec() } @@ -131,6 +165,11 @@ impl TelemetryReportCreator for MetaReportCreator { .count_streaming_job() .await .map_err(|err| err.as_report().to_string())? as u64; + let stream_job_desc = self + .metadata_manager + .list_stream_job_desc() + .await + .map_err(|err| err.as_report().to_string())?; Ok(MetaTelemetryReport { rw_version: RwVersion { @@ -154,6 +193,7 @@ impl TelemetryReportCreator for MetaReportCreator { }, streaming_job_count, meta_backend: self.meta_backend, + job_desc: stream_job_desc, }) } @@ -202,6 +242,7 @@ mod test { version: "version".to_owned(), git_sha: "git_sha".to_owned(), }, + job_desc: vec![], }; let pb_bytes = report.to_pb_bytes();