Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): deprecate executor identity in sink metrics #17810

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

106 changes: 54 additions & 52 deletions grafana/risingwave-dev-dashboard.dashboard.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use risingwave_common::metrics::LabelGuardedIntGauge;
pub struct MonitoredBaseFileWriterBuilder<B: FileWriterBuilder> {
inner: BaseFileWriterBuilder<B>,
// metrics
unflush_data_file: LabelGuardedIntGauge<2>,
unflush_data_file: LabelGuardedIntGauge<3>,
}

impl<B: FileWriterBuilder> MonitoredBaseFileWriterBuilder<B> {
pub fn new(
inner: BaseFileWriterBuilder<B>,
unflush_data_file: LabelGuardedIntGauge<2>,
unflush_data_file: LabelGuardedIntGauge<3>,
) -> Self {
Self {
inner,
Expand Down Expand Up @@ -59,7 +59,7 @@ pub struct MonitoredBaseFileWriter<B: FileWriterBuilder> {
inner: BaseFileWriter<B>,

// metrics
unflush_data_file: LabelGuardedIntGauge<2>,
unflush_data_file: LabelGuardedIntGauge<3>,

cur_metrics: BaseFileWriterMetrics,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
pub struct MonitoredPositionDeleteWriterBuilder<B: FileWriterBuilder> {
current_cache_number: LabelGuardedIntGauge<2>,
current_cache_number: LabelGuardedIntGauge<3>,
inner: PositionDeleteWriterBuilder<B>,
}

impl<B: FileWriterBuilder> MonitoredPositionDeleteWriterBuilder<B> {
pub fn new(
inner: PositionDeleteWriterBuilder<B>,
current_cache_number: LabelGuardedIntGauge<2>,
current_cache_number: LabelGuardedIntGauge<3>,
) -> Self {
Self {
current_cache_number,
Expand Down Expand Up @@ -59,7 +59,7 @@ pub struct MonitoredPositionDeleteWriter<B: FileWriterBuilder> {
writer: PositionDeleteWriter<B>,

// metrics
cache_number: LabelGuardedIntGauge<2>,
cache_number: LabelGuardedIntGauge<3>,
current_metrics: PositionDeleteMetrics,
}

Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ pub struct BackpressureMonitoredLogReader<R: LogReader> {
inner: R,
/// Start time to wait for new future after poll ready
wait_new_future_start_time: Option<Instant>,
wait_new_future_duration_ns: LabelGuardedIntCounter<3>,
wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<3>) -> Self {
fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<4>) -> Self {
Self {
inner,
wait_new_future_start_time: None,
Expand Down
29 changes: 14 additions & 15 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,20 @@ impl SinkParam {

#[derive(Clone)]
pub struct SinkMetrics {
pub sink_commit_duration_metrics: LabelGuardedHistogram<3>,
pub connector_sink_rows_received: LabelGuardedIntCounter<2>,
pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_write_rows: LabelGuardedIntCounter<3>,
pub log_store_latest_read_epoch: LabelGuardedIntGauge<3>,
pub log_store_read_rows: LabelGuardedIntCounter<3>,

pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<3>,

pub iceberg_write_qps: LabelGuardedIntCounter<2>,
pub iceberg_write_latency: LabelGuardedHistogram<2>,
pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<2>,
pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<2>,
pub iceberg_partition_num: LabelGuardedIntGauge<2>,
pub sink_commit_duration_metrics: LabelGuardedHistogram<4>,
pub connector_sink_rows_received: LabelGuardedIntCounter<3>,
pub log_store_first_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_write_rows: LabelGuardedIntCounter<4>,
pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
pub log_store_read_rows: LabelGuardedIntCounter<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,

pub iceberg_write_qps: LabelGuardedIntCounter<3>,
pub iceberg_write_latency: LabelGuardedHistogram<3>,
pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>,
pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<3>,
pub iceberg_partition_num: LabelGuardedIntGauge<3>,
}

impl SinkMetrics {
Expand Down
56 changes: 34 additions & 22 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub(crate) type ReaderTruncationOffsetType = (u64, Option<SeqIdType>);

#[derive(Clone)]
pub(crate) struct KvLogStoreReadMetrics {
pub storage_read_count: LabelGuardedIntCounter<4>,
pub storage_read_size: LabelGuardedIntCounter<4>,
pub storage_read_count: LabelGuardedIntCounter<5>,
pub storage_read_size: LabelGuardedIntCounter<5>,
}

impl KvLogStoreReadMetrics {
Expand All @@ -72,28 +72,34 @@ impl KvLogStoreReadMetrics {

#[derive(Clone)]
pub(crate) struct KvLogStoreMetrics {
pub storage_write_count: LabelGuardedIntCounter<3>,
pub storage_write_size: LabelGuardedIntCounter<3>,
pub rewind_count: LabelGuardedIntCounter<3>,
pub rewind_delay: LabelGuardedHistogram<3>,
pub buffer_unconsumed_item_count: LabelGuardedIntGauge<3>,
pub buffer_unconsumed_row_count: LabelGuardedIntGauge<3>,
pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge<3>,
pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge<3>,
pub storage_write_count: LabelGuardedIntCounter<4>,
pub storage_write_size: LabelGuardedIntCounter<4>,
pub rewind_count: LabelGuardedIntCounter<4>,
pub rewind_delay: LabelGuardedHistogram<4>,
pub buffer_unconsumed_item_count: LabelGuardedIntGauge<4>,
pub buffer_unconsumed_row_count: LabelGuardedIntGauge<4>,
pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge<4>,
pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge<4>,
pub persistent_log_read_metrics: KvLogStoreReadMetrics,
pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
}

impl KvLogStoreMetrics {
pub(crate) fn new(
metrics: &StreamingMetrics,
identity: &String,
actor_id: ActorId,
sink_param: &SinkParam,
connector: &'static str,
) -> Self {
let executor_id = identity;
let sink_id = format!("{}", sink_param.sink_id.sink_id);
let labels = &[executor_id.as_str(), connector, sink_id.as_str()];
let actor_id_str = actor_id.to_string();
let sink_id_str = sink_param.sink_id.sink_id.to_string();

let labels = &[
&actor_id_str,
connector,
&sink_id_str,
&sink_param.sink_name,
];
let storage_write_size = metrics
.kv_log_store_storage_write_size
.with_guarded_label_values(labels);
Expand All @@ -107,34 +113,38 @@ impl KvLogStoreMetrics {
let persistent_log_read_size = metrics
.kv_log_store_storage_read_size
.with_guarded_label_values(&[
executor_id.as_str(),
&actor_id_str,
connector,
sink_id.as_str(),
&sink_id_str,
&sink_param.sink_name,
READ_PERSISTENT_LOG,
]);
let persistent_log_read_count = metrics
.kv_log_store_storage_read_count
.with_guarded_label_values(&[
executor_id.as_str(),
&actor_id_str,
connector,
sink_id.as_str(),
&sink_id_str,
&sink_param.sink_name,
READ_PERSISTENT_LOG,
]);

let flushed_buffer_read_size = metrics
.kv_log_store_storage_read_size
.with_guarded_label_values(&[
executor_id.as_str(),
&actor_id_str,
connector,
sink_id.as_str(),
&sink_id_str,
&sink_param.sink_name,
READ_FLUSHED_BUFFER,
]);
let flushed_buffer_read_count = metrics
.kv_log_store_storage_read_count
.with_guarded_label_values(&[
executor_id.as_str(),
&actor_id_str,
connector,
sink_id.as_str(),
&sink_id_str,
&sink_param.sink_name,
READ_FLUSHED_BUFFER,
]);

Expand Down Expand Up @@ -293,6 +303,8 @@ mod v1 {

pub(crate) use v2::KV_LOG_STORE_V2_INFO;

use crate::task::ActorId;

/// A new version of log store schema. Compared to v1, the v2 added a new vnode column to the log store pk,
/// becomes `epoch`, `seq_id` and `vnode`. In this way, providing a log store pk, we can get exactly one single row.
///
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ fn initial_rewind_backoff_policy() -> RewindBackoffPolicy {
struct RewindDelay {
last_rewind_truncate_offset: Option<TruncateOffset>,
backoff_policy: RewindBackoffPolicy,
rewind_count: LabelGuardedIntCounter<3>,
rewind_delay: LabelGuardedHistogram<3>,
rewind_count: LabelGuardedIntCounter<4>,
rewind_delay: LabelGuardedHistogram<4>,
}

impl RewindDelay {
Expand Down
Loading
Loading