feat(telemetry): report connector name and table_id along with meta (#…
tabVersion authored Mar 17, 2024
1 parent 9b84ae0 commit 1542b88
Showing 5 changed files with 144 additions and 2 deletions.
13 changes: 13 additions & 0 deletions proto/telemetry.proto
@@ -85,6 +85,19 @@ message MetaReport {
// This field represents the "number of running streaming jobs"
// and is used to indicate whether the cluster is active.
uint32 stream_job_count = 5;
// stream_jobs lists the running streaming jobs and is used to collect
// each job's table_id, connector_name and plan_optimizations.
repeated StreamJobDesc stream_jobs = 6;
}

enum PlanOptimization {
TABLE_OPTIMIZATION_UNSPECIFIED = 0;
}

message StreamJobDesc {
int32 table_id = 1;
optional string connector_name = 2;
repeated PlanOptimization plan_optimizations = 3;
}

message ComputeReport {
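For reference, the prost-generated Rust for the new messages carries the same fields (table_id: i32, connector_name: Option<String>, plan_optimizations: Vec<i32>). A minimal sketch of how one entry could be populated; the values below are illustrative and not taken from the commit:

use risingwave_pb::telemetry::{PlanOptimization, StreamJobDesc};

fn main() {
    // A streaming job backed by a Kafka source, with no recorded plan optimizations yet.
    let job = StreamJobDesc {
        table_id: 1001,                           // illustrative table id
        connector_name: Some("kafka".to_owned()), // lower-cased connector name
        plan_optimizations: vec![PlanOptimization::TableOptimizationUnspecified as i32],
    };
    println!("{job:?}");
}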
44 changes: 42 additions & 2 deletions src/meta/src/controller/catalog.rs
@@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
@@ -29,8 +30,8 @@ use risingwave_meta_model_v2::{
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, subscription, table, user_privilege, view, ActorId,
ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId,
- FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId,
- SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId,
+ FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId,
+ SinkId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
@@ -69,6 +70,7 @@ use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::rpc::ddl_controller::DropMode;
use crate::stream::SourceManagerRef;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};

pub type CatalogControllerRef = Arc<CatalogController>;
@@ -2395,6 +2397,44 @@ impl CatalogController {
Ok(version)
}

/// Lists running streaming jobs (user tables and materialized views) together with the
/// connector name of any associated source, for telemetry reporting.
pub async fn list_stream_job_desc_for_telemetry(
&self,
) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
let inner = self.inner.read().await;
let info: Vec<(TableId, Option<Property>)> = Table::find()
.select_only()
.column(table::Column::TableId)
.column(source::Column::WithProperties)
.join(JoinType::LeftJoin, table::Relation::Source.def())
.filter(
table::Column::TableType
.eq(TableType::Table)
.or(table::Column::TableType.eq(TableType::MaterializedView)),
)
.into_tuple()
.all(&inner.db)
.await?;

Ok(info
.into_iter()
.map(|(table_id, properties)| {
let connector_info = if let Some(inner_props) = properties {
inner_props
.inner_ref()
.get(UPSTREAM_SOURCE_KEY)
.map(|v| v.to_lowercase())
} else {
None
};
MetaTelemetryJobDesc {
table_id,
connector: connector_info,
optimization: vec![],
}
})
.collect())
}

pub async fn alter_source_column(
&self,
pb_source: PbSource,
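A note on the query in list_stream_job_desc_for_telemetry above: because of the left join against the source table, jobs without an associated source come back with None for the WITH properties, which maps to connector: None. A self-contained sketch of that lookup, assuming UPSTREAM_SOURCE_KEY resolves to the "connector" key of a source's WITH clause and using a plain HashMap in place of the Property model:

use std::collections::HashMap;

// Assumed value: the WITH-clause key that names the connector.
const UPSTREAM_SOURCE_KEY: &str = "connector";

fn connector_name(with_properties: Option<&HashMap<String, String>>) -> Option<String> {
    // No associated source row from the left join means there is no connector to report.
    with_properties.and_then(|props| props.get(UPSTREAM_SOURCE_KEY).map(|v| v.to_lowercase()))
}

fn main() {
    let kafka_props: HashMap<String, String> =
        [("connector".to_owned(), "KAFKA".to_owned())].into_iter().collect();
    assert_eq!(connector_name(Some(&kafka_props)), Some("kafka".to_owned()));
    // A materialized view with no upstream source reports no connector.
    assert_eq!(connector_name(None), None);
}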
36 changes: 36 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -131,6 +131,7 @@ use crate::manager::catalog::utils::{
refcnt_inc_connection, ReplaceTableExprRewriter,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;

pub type CatalogManagerRef = Arc<CatalogManager>;

@@ -3532,6 +3533,41 @@ impl CatalogManager {
self.core.lock().await.database.list_tables()
}

/// Lists running streaming jobs (user tables and materialized views) together with the
/// connector name of any associated source, for telemetry reporting.
pub async fn list_stream_job_for_telemetry(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
let tables = self.list_tables().await;
let mut res = Vec::with_capacity(tables.len());
let source_read_lock = self.core.lock().await;
for table_def in tables {
// skip internal tables; only Table and MaterializedView jobs are reported
if !(table_def.table_type == TableType::Table as i32
|| table_def.table_type == TableType::MaterializedView as i32)
{
continue;
}
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table_def.optional_associated_source_id
&& let Some(source) = source_read_lock.database.sources.get(&source_id)
{
res.push(MetaTelemetryJobDesc {
table_id: table_def.id as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|v| v.to_lowercase()),
optimization: vec![],
})
} else {
res.push(MetaTelemetryJobDesc {
table_id: table_def.id as i32,
connector: None,
optimization: vec![],
})
}
}

Ok(res)
}

pub async fn list_tables_by_type(&self, table_type: TableType) -> Vec<Table> {
self.core
.lock()
12 changes: 12 additions & 0 deletions src/meta/src/manager/metadata.rs
@@ -34,6 +34,7 @@ use crate::manager::{
};
use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::MetaResult;

#[derive(Clone)]
@@ -684,6 +685,17 @@ impl MetadataManager {
}
}

pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
match self {
MetadataManager::V1(mgr) => mgr.catalog_manager.list_stream_job_for_telemetry().await,
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.list_stream_job_desc_for_telemetry()
.await
}
}
}

pub async fn update_source_rate_limit_by_source_id(
&self,
source_id: SourceId,
41 changes: 41 additions & 0 deletions src/meta/src/telemetry.rs
@@ -43,6 +43,19 @@ struct RwVersion {
git_sha: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum PlanOptimization {
// TODO: record the optimizations applied to each streaming job
Placeholder,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MetaTelemetryJobDesc {
pub table_id: i32,
pub connector: Option<String>,
pub optimization: Vec<PlanOptimization>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MetaTelemetryReport {
#[serde(flatten)]
@@ -52,6 +65,26 @@ pub struct MetaTelemetryReport {
// At this point, it will always be etcd, but we will enable telemetry when using memory.
meta_backend: MetaBackend,
rw_version: RwVersion,
job_desc: Vec<MetaTelemetryJobDesc>,
}

impl From<MetaTelemetryJobDesc> for risingwave_pb::telemetry::StreamJobDesc {
fn from(val: MetaTelemetryJobDesc) -> Self {
risingwave_pb::telemetry::StreamJobDesc {
table_id: val.table_id,
connector_name: val.connector,
plan_optimizations: val
.optimization
.iter()
.map(|opt| match opt {
PlanOptimization::Placeholder => {
risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified
as i32
}
})
.collect(),
}
}
}

impl TelemetryToProtobuf for MetaTelemetryReport {
@@ -74,6 +107,7 @@ impl TelemetryToProtobuf for MetaTelemetryReport {
git_sha: self.rw_version.git_sha,
}),
stream_job_count: self.streaming_job_count as u32,
stream_jobs: self.job_desc.into_iter().map(|job| job.into()).collect(),
};
pb_report.encode_to_vec()
}
@@ -131,6 +165,11 @@ impl TelemetryReportCreator for MetaReportCreator {
.count_streaming_job()
.await
.map_err(|err| err.as_report().to_string())? as u64;
let stream_job_desc = self
.metadata_manager
.list_stream_job_desc()
.await
.map_err(|err| err.as_report().to_string())?;

Ok(MetaTelemetryReport {
rw_version: RwVersion {
@@ -154,6 +193,7 @@ impl TelemetryReportCreator for MetaReportCreator {
},
streaming_job_count,
meta_backend: self.meta_backend,
job_desc: stream_job_desc,
})
}

@@ -202,6 +242,7 @@ mod test {
version: "version".to_owned(),
git_sha: "git_sha".to_owned(),
},
job_desc: vec![],
};

let pb_bytes = report.to_pb_bytes();
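As a closing illustration of the conversion path added in this file, here is a sketch of a unit test for the From<MetaTelemetryJobDesc> impl that could sit alongside the existing mod test; the test name and values are illustrative and not part of the commit:

#[test]
fn test_stream_job_desc_to_pb() {
    let desc = MetaTelemetryJobDesc {
        table_id: 42,
        connector: Some("kafka".to_owned()),
        optimization: vec![PlanOptimization::Placeholder],
    };
    let pb: risingwave_pb::telemetry::StreamJobDesc = desc.into();
    assert_eq!(pb.table_id, 42);
    assert_eq!(pb.connector_name, Some("kafka".to_owned()));
    assert_eq!(
        pb.plan_optimizations,
        vec![risingwave_pb::telemetry::PlanOptimization::TableOptimizationUnspecified as i32]
    );
}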
