Skip to content

Commit

Permalink
feat(cdc): add metrics for auto schema change (#18216) (cherry-pick) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Aug 28, 2024
1 parent dcdc41c commit f07f8e1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,41 @@ def section_streaming_cdc(outer_panels):
),
],
),
panels.timeseries_count(
"Auto Schema Change Failure Count",
"Total number of failed auto schema change attempts of CDC Table",
[
panels.target(
f"sum({metric('auto_schema_change_failure_cnt')}) by (table_id, table_name)",
"{{table_id}} - {{table_name}}",
)
],
["last"],
),
panels.timeseries_count(
"Auto Schema Change Success Count",
"Total number of succeeded auto schema change of CDC Table",
[
panels.target(
f"sum({metric('auto_schema_change_success_cnt')}) by (table_id, table_name)",
"{{table_id}} - {{table_name}}",
)
],
["last"],
),
panels.timeseries_latency(
"Auto Schema Change Latency (sec)",
"Latency of Auto Schema Change Process",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('auto_schema_change_latency_bucket')}[$__rate_interval])) by (le, table_id, table_name))",
f"lag p{legend}" + "{{table_id}} - {{table_name}}",
),
[50, 99, "max"],
),
],
),
],
),
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,9 @@ message EventLog {
}
message EventAutoSchemaChangeFail {
uint32 table_id = 1;
string cdc_table_id = 2;
string upstream_ddl = 3;
string table_name = 2;
string cdc_table_id = 3;
string upstream_ddl = 4;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ pub async fn start_service_as_election_leader(
source_manager.clone(),
barrier_manager.context().clone(),
sink_manager.clone(),
meta_metrics.clone(),
)
.await;

Expand Down
31 changes: 30 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
};
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct DdlServiceImpl {
sink_manager: SinkCoordinatorManager,
ddl_controller: DdlController,
aws_client: Arc<Option<AwsEc2Client>>,
meta_metrics: Arc<MetaMetrics>,
}

impl DdlServiceImpl {
Expand All @@ -68,6 +70,7 @@ impl DdlServiceImpl {
source_manager: SourceManagerRef,
barrier_manager: BarrierManagerRef,
sink_manager: SinkCoordinatorManager,
meta_metrics: Arc<MetaMetrics>,
) -> Self {
let aws_cli_ref = Arc::new(aws_client);
let ddl_controller = DdlController::new(
Expand All @@ -85,6 +88,7 @@ impl DdlServiceImpl {
ddl_controller,
aws_client: aws_cli_ref,
sink_manager,
meta_metrics,
}
}

Expand Down Expand Up @@ -962,13 +966,18 @@ impl DdlService for DdlServiceImpl {
.await?;

for table in tables {
let latency_timer = self
.meta_metrics
.auto_schema_change_latency
.with_guarded_label_values(&[&table.id.to_string(), &table.name])
.start_timer();
// send a request to the frontend to get the ReplaceTablePlan
// will retry with exponential backoff if the request fails
let resp = client
.get_table_replace_plan(GetTableReplacePlanRequest {
database_id: table.database_id,
owner: table.owner,
table_name: table.name,
table_name: table.name.clone(),
table_change: Some(table_change.clone()),
})
.await;
Expand Down Expand Up @@ -1000,6 +1009,15 @@ impl DdlService for DdlServiceImpl {
table_id = table.id,
cdc_table_id = table.cdc_table_id,
"Table replaced success");

self.meta_metrics
.auto_schema_change_success_cnt
.with_guarded_label_values(&[
&table.id.to_string(),
&table.name,
])
.inc();
latency_timer.observe_duration();
}
Err(e) => {
tracing::error!(
Expand All @@ -1011,7 +1029,9 @@ impl DdlService for DdlServiceImpl {
"failed to replace the table",
);
add_auto_schema_change_fail_event_log(
&self.meta_metrics,
table.id,
table.name.clone(),
table_change.cdc_table_id.clone(),
table_change.upstream_ddl.clone(),
&self.env.event_log_manager_ref(),
Expand All @@ -1029,7 +1049,9 @@ impl DdlService for DdlServiceImpl {
"failed to get replace table plan",
);
add_auto_schema_change_fail_event_log(
&self.meta_metrics,
table.id,
table.name.clone(),
table_change.cdc_table_id.clone(),
table_change.upstream_ddl.clone(),
&self.env.event_log_manager_ref(),
Expand Down Expand Up @@ -1078,13 +1100,20 @@ impl DdlServiceImpl {
}

fn add_auto_schema_change_fail_event_log(
meta_metrics: &Arc<MetaMetrics>,
table_id: u32,
table_name: String,
cdc_table_id: String,
upstream_ddl: String,
event_log_manager: &EventLogManagerRef,
) {
meta_metrics
.auto_schema_change_failure_cnt
.with_guarded_label_values(&[&table_id.to_string(), &table_name])
.inc();
let event = event_log::EventAutoSchemaChangeFail {
table_id,
table_name,
cdc_table_id,
upstream_ddl,
};
Expand Down
43 changes: 41 additions & 2 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ use prometheus::{
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use risingwave_common::metrics::{LabelGuardedHistogramVec, LabelGuardedIntGaugeVec};
use risingwave_common::metrics::{
LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
register_guarded_histogram_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model_v2::WorkerId;
Expand Down Expand Up @@ -195,6 +198,11 @@ pub struct MetaMetrics {

/// Write throughput of commit epoch for each stable
pub table_write_throughput: IntCounterVec,

// ********************************** Auto Schema Change ************************************
pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec<2>,
pub auto_schema_change_latency: LabelGuardedHistogramVec<2>,
}

pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
Expand Down Expand Up @@ -573,6 +581,34 @@ impl MetaMetrics {
);
let recovery_latency = register_histogram_with_registry!(opts, registry).unwrap();

let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
"auto_schema_change_failure_cnt",
"Number of failed auto schema change",
&["table_id", "table_name"],
registry
)
.unwrap();

let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
"auto_schema_change_success_cnt",
"Number of success auto schema change",
&["table_id", "table_name"],
registry
)
.unwrap();

let opts = histogram_opts!(
"auto_schema_change_latency",
"Latency of the auto schema change process",
exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
);
let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
opts,
&["table_id", "table_name"],
registry
)
.unwrap();

let source_is_up = register_guarded_int_gauge_vec_with_registry!(
"source_status_is_up",
"source is up or not",
Expand Down Expand Up @@ -763,6 +799,9 @@ impl MetaMetrics {
branched_sst_count,
compaction_event_consumed_latency,
compaction_event_loop_iteration_latency,
auto_schema_change_failure_cnt,
auto_schema_change_success_cnt,
auto_schema_change_latency,
}
}

Expand Down

0 comments on commit f07f8e1

Please sign in to comment.