feat(telemetry): report connector name and table_id along with meta #15663

Merged · 8 commits · Mar 17, 2024
13 changes: 13 additions & 0 deletions proto/telemetry.proto
@@ -85,6 +85,19 @@ message MetaReport {
// This field represents the "number of running streaming jobs"
// and is used to indicate whether the cluster is active.
uint32 stream_job_count = 5;
// stream_jobs is the list of running streaming jobs
// and is used to collect the table_id, connector_name and plan_optimizations
repeated StreamJobDesc stream_jobs = 6;
}

enum PlanOptimization {
TABLE_OPTIMIZATION_UNSPECIFIED = 0;
}

message StreamJobDesc {
int32 table_id = 1;
optional string connector_name = 2;
repeated PlanOptimization plan_optimizations = 3;
}

message ComputeReport {
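For illustration, the new message can be populated via the prost-generated Rust types (a minimal sketch with made-up values; the risingwave_pb::telemetry paths follow the From impl later in this diff):

use risingwave_pb::telemetry::{PlanOptimization, StreamJobDesc};

// A StreamJobDesc as the meta node might emit it: a table fed by a Kafka
// source, with no concrete plan optimizations recorded yet.
fn example_job() -> StreamJobDesc {
    StreamJobDesc {
        table_id: 1001,
        connector_name: Some("kafka".to_owned()),
        plan_optimizations: vec![PlanOptimization::TableOptimizationUnspecified as i32],
    }
}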
44 changes: 42 additions & 2 deletions src/meta/src/controller/catalog.rs
@@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
@@ -29,8 +30,8 @@ use risingwave_meta_model_v2::{
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, subscription, table, user_privilege, view, ActorId,
ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId,
FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId,
SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId,
FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId,
SinkId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
@@ -69,6 +70,7 @@ use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::rpc::ddl_controller::DropMode;
use crate::stream::SourceManagerRef;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};

pub type CatalogControllerRef = Arc<CatalogController>;
@@ -2395,6 +2397,44 @@ impl CatalogController {
Ok(version)
}

pub async fn list_stream_job_desc_for_telemetry(
&self,
) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
let inner = self.inner.read().await;
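// Left-join each table with its (optional) associated source so the
// connector name can be read from the source's WITH properties in one query.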
let info: Vec<(TableId, Option<Property>)> = Table::find()
.select_only()
.column(table::Column::TableId)
.column(source::Column::WithProperties)
.join(JoinType::LeftJoin, table::Relation::Source.def())
.filter(
table::Column::TableType
.eq(TableType::Table)
.or(table::Column::TableType.eq(TableType::MaterializedView)),
)
.into_tuple()
.all(&inner.db)
.await?;

Ok(info
.into_iter()
.map(|(table_id, properties)| {
let connector_info = if let Some(inner_props) = properties {
inner_props
.inner_ref()
.get(UPSTREAM_SOURCE_KEY)
.map(|v| v.to_lowercase())
} else {
None
};
MetaTelemetryJobDesc {
table_id,
connector: connector_info,
optimization: vec![],
}
})
.collect())
}
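For context, UPSTREAM_SOURCE_KEY names the connector entry of a source's WITH clause, so the extraction above reduces to roughly the following (a standalone sketch; the constant's value is an assumption about risingwave_connector, not part of this diff):

use std::collections::BTreeMap;

// Assumed value: identifies the `connector` key of
// CREATE SOURCE/TABLE ... WITH (...).
const UPSTREAM_SOURCE_KEY: &str = "connector";

fn connector_of(with_properties: &BTreeMap<String, String>) -> Option<String> {
    with_properties
        .get(UPSTREAM_SOURCE_KEY)
        .map(|v| v.to_lowercase())
}

fn main() {
    let props = BTreeMap::from([("connector".to_owned(), "Kafka".to_owned())]);
    assert_eq!(connector_of(&props), Some("kafka".to_owned()));
}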

pub async fn alter_source_column(
&self,
pb_source: PbSource,
36 changes: 36 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -131,6 +131,7 @@ use crate::manager::catalog::utils::{
refcnt_inc_connection, ReplaceTableExprRewriter,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;

pub type CatalogManagerRef = Arc<CatalogManager>;

@@ -3532,6 +3533,41 @@ impl CatalogManager {
self.core.lock().await.database.list_tables()
}

pub async fn list_stream_job_for_telemetry(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
let tables = self.list_tables().await;
let mut res = Vec::with_capacity(tables.len());
let source_read_lock = self.core.lock().await;
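// With the catalog core locked, resolve each table's associated source to
// recover the connector name for jobs that ingest from external systems.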
for table_def in tables {
// filter out internal tables, only allow Table and MaterializedView
if !(table_def.table_type == TableType::Table as i32
|| table_def.table_type == TableType::MaterializedView as i32)
{
continue;
}
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table_def.optional_associated_source_id
&& let Some(source) = source_read_lock.database.sources.get(&source_id)
{
res.push(MetaTelemetryJobDesc {
table_id: table_def.id as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|v| v.to_lowercase()),
optimization: vec![],
})
} else {
res.push(MetaTelemetryJobDesc {
table_id: table_def.id as i32,
connector: None,
optimization: vec![],
})
}
}

Ok(res)
}

pub async fn list_tables_by_type(&self, table_type: TableType) -> Vec<Table> {
self.core
.lock()
12 changes: 12 additions & 0 deletions src/meta/src/manager/metadata.rs
@@ -34,6 +34,7 @@ use crate::manager::{
};
use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::MetaResult;

#[derive(Clone)]
Expand Down Expand Up @@ -684,6 +685,17 @@ impl MetadataManager {
}
}

pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
match self {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_stream_job_for_telemetry().await,
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.list_stream_job_desc_for_telemetry()
.await
}
}
}

pub async fn update_source_rate_limit_by_source_id(
&self,
source_id: SourceId,
41 changes: 41 additions & 0 deletions src/meta/src/telemetry.rs
@@ -43,6 +43,19 @@ struct RwVersion {
git_sha: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum PlanOptimization {
// TODO: record the optimizations applied to each job
Placeholder,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MetaTelemetryJobDesc {
pub table_id: i32,
pub connector: Option<String>,
pub optimization: Vec<PlanOptimization>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MetaTelemetryReport {
#[serde(flatten)]
@@ -52,6 +65,26 @@ pub struct MetaTelemetryReport {
// At this point, it will always be etcd, but we will enable telemetry when using memory.
meta_backend: MetaBackend,
rw_version: RwVersion,
job_desc: Vec<MetaTelemetryJobDesc>,
}

impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
fn from(val: MetaTelemetryJobDesc) -> Self {
risingwave_pb::telemetry::StreamJobDesc {
table_id: val.table_id,
connector_name: val.connector,
plan_optimizations: val
.optimization
.iter()
.map(|opt| match opt {
PlanOptimization::Placeholder => {
risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
as i32
}
})
.collect(),
}
}
}
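A quick sanity check of this conversion might look like the following (a sketch; it could sit in the mod test block further down):

#[test]
fn stream_job_desc_into_pb() {
    let desc = MetaTelemetryJobDesc {
        table_id: 42,
        connector: Some("kafka".to_owned()),
        optimization: vec![PlanOptimization::Placeholder],
    };
    let pb: risingwave_pb::telemetry::StreamJobDesc = desc.into();
    assert_eq!(pb.table_id, 42);
    assert_eq!(pb.connector_name.as_deref(), Some("kafka"));
    // Placeholder maps to TABLE_OPTIMIZATION_UNSPECIFIED (= 0) on the wire.
    assert_eq!(pb.plan_optimizations, vec![0]);
}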

impl TelemetryToProtobuf for MetaTelemetryReport {
@@ -74,6 +107,7 @@ impl TelemetryToProtobuf for MetaTelemetryReport {
git_sha: self.rw_version.git_sha,
}),
stream_job_count: self.streaming_job_count as u32,
stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
};
pb_report.encode_to_vec()
}
@@ -131,6 +165,11 @@ impl TelemetryReportCreator for MetaReportCreator {
.count_streaming_job()
.await
.map_err(|err| err.as_report().to_string())? as u64;
let stream_job_desc = self
.metadata_manager
.list_stream_job_desc()
.await
.map_err(|err| err.as_report().to_string())?;

Ok(MetaTelemetryReport {
rw_version: RwVersion {
@@ -154,6 +193,7 @@
},
streaming_job_count,
meta_backend: self.meta_backend,
job_desc: stream_job_desc,
})
}

@@ -202,6 +242,7 @@ mod test {
version: "version".to_owned(),
git_sha: "git_sha".to_owned(),
},
job_desc: vec![],
};

let pb_bytes = report.to_pb_bytes();
Expand Down
Loading