Skip to content

Commit

Permalink
fix(metric): fix backfill snapshot read metric (#15414)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Mar 4, 2024
1 parent bba2374 commit 56af4dd
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 35 deletions.
34 changes: 16 additions & 18 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
BackfillProgressPerVnode, BackfillState,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_backfill_metrics,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Expand Down Expand Up @@ -300,7 +300,13 @@ where
&self.output_indices,
));
}

update_backfill_metrics(
&self.metrics,
self.actor_id,
upstream_table_id,
cur_barrier_snapshot_processed_rows,
cur_barrier_upstream_processed_rows,
);
break 'backfill_loop;
}
Some((vnode, row)) => {
Expand Down Expand Up @@ -447,21 +453,13 @@ where

upstream_table.commit(barrier.epoch).await?;

self.metrics
.arrangement_backfill_snapshot_read_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_snapshot_processed_rows);

self.metrics
.arrangement_backfill_upstream_output_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_upstream_processed_rows);
update_backfill_metrics(
&self.metrics,
self.actor_id,
upstream_table_id,
cur_barrier_snapshot_processed_rows,
cur_barrier_upstream_processed_rows,
);

// Update snapshot read epoch.
snapshot_read_epoch = barrier.epoch.prev;
Expand Down
32 changes: 15 additions & 17 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::common::table::state_table::StateTable;
use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
compute_bounds, construct_initial_finished_state, create_builder, get_new_pos, mapping_chunk,
mapping_message, mark_chunk, owned_row_iter, METADATA_STATE_LEN,
mapping_message, mark_chunk, owned_row_iter, update_backfill_metrics, METADATA_STATE_LEN,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Expand Down Expand Up @@ -301,7 +301,13 @@ where
&self.output_indices,
));
}

update_backfill_metrics(
&self.metrics,
self.actor_id,
upstream_table_id,
cur_barrier_snapshot_processed_rows,
cur_barrier_upstream_processed_rows,
);
break 'backfill_loop;
}
Some(record) => {
Expand Down Expand Up @@ -403,21 +409,13 @@ where
upstream_chunk_buffer.clear()
}

self.metrics
.backfill_snapshot_read_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_snapshot_processed_rows);

self.metrics
.backfill_upstream_output_row_count
.with_label_values(&[
upstream_table_id.to_string().as_str(),
self.actor_id.to_string().as_str(),
])
.inc_by(cur_barrier_upstream_processed_rows);
update_backfill_metrics(
&self.metrics,
self.actor_id,
upstream_table_id,
cur_barrier_snapshot_processed_rows,
cur_barrier_upstream_processed_rows,
);

// Update snapshot read epoch.
snapshot_read_epoch = barrier.epoch.prev;
Expand Down
26 changes: 26 additions & 0 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTableInner;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};
use crate::task::ActorId;

/// `vnode`, `is_finished`, `row_count`, all occupy 1 column each.
pub const METADATA_STATE_LEN: usize = 3;
Expand Down Expand Up @@ -815,3 +817,27 @@ pub fn create_builder(
DataChunkBuilder::new(data_types, chunk_size)
}
}

/// Folds one barrier's worth of backfill progress into the streaming metrics.
///
/// Both counters are labeled by `(upstream_table_id, actor_id)`:
/// - `backfill_snapshot_read_row_count` — rows read from the upstream table snapshot.
/// - `backfill_upstream_output_row_count` — rows forwarded from the upstream changelog.
///
/// Called once per barrier (and once more on backfill completion) by the
/// backfill executors, which reset their per-barrier row counters afterwards.
pub fn update_backfill_metrics(
    metrics: &StreamingMetrics,
    actor_id: ActorId,
    upstream_table_id: u32,
    cur_barrier_snapshot_processed_rows: u64,
    cur_barrier_upstream_processed_rows: u64,
) {
    // Build the label set once; both counters share the same label dimensions.
    let table_label = upstream_table_id.to_string();
    let actor_label = actor_id.to_string();
    let labels = [table_label.as_str(), actor_label.as_str()];

    metrics
        .backfill_snapshot_read_row_count
        .with_label_values(&labels)
        .inc_by(cur_barrier_snapshot_processed_rows);

    metrics
        .backfill_upstream_output_row_count
        .with_label_values(&labels)
        .inc_by(cur_barrier_upstream_processed_rows);
}

0 comments on commit 56af4dd

Please sign in to comment.