diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 2dd277449be1..dbd4a0848a6b 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -34,8 +34,8 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
     compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
-    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
-    BackfillProgressPerVnode, BackfillState,
+    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_backfill_metrics,
+    update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -300,7 +300,13 @@ where
                         &self.output_indices,
                     ));
                 }
-
+                update_backfill_metrics(
+                    &self.metrics,
+                    self.actor_id,
+                    upstream_table_id,
+                    cur_barrier_snapshot_processed_rows,
+                    cur_barrier_upstream_processed_rows,
+                );
                 break 'backfill_loop;
             }
             Some((vnode, row)) => {
@@ -447,21 +453,13 @@ where
 
             upstream_table.commit(barrier.epoch).await?;
 
-            self.metrics
-                .arrangement_backfill_snapshot_read_row_count
-                .with_label_values(&[
-                    upstream_table_id.to_string().as_str(),
-                    self.actor_id.to_string().as_str(),
-                ])
-                .inc_by(cur_barrier_snapshot_processed_rows);
-
-            self.metrics
-                .arrangement_backfill_upstream_output_row_count
-                .with_label_values(&[
-                    upstream_table_id.to_string().as_str(),
-                    self.actor_id.to_string().as_str(),
-                ])
-                .inc_by(cur_barrier_upstream_processed_rows);
+            update_backfill_metrics(
+                &self.metrics,
+                self.actor_id,
+                upstream_table_id,
+                cur_barrier_snapshot_processed_rows,
+                cur_barrier_upstream_processed_rows,
+            );
 
             // Update snapshot read epoch.
             snapshot_read_epoch = barrier.epoch.prev;
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 4aea8f63b26e..d9ee88b2d226 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -34,7 +34,7 @@ use crate::common::table::state_table::StateTable;
 use crate::executor::backfill::utils;
 use crate::executor::backfill::utils::{
     compute_bounds, construct_initial_finished_state, create_builder, get_new_pos, mapping_chunk,
-    mapping_message, mark_chunk, owned_row_iter, METADATA_STATE_LEN,
+    mapping_message, mark_chunk, owned_row_iter, update_backfill_metrics, METADATA_STATE_LEN,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -301,7 +301,13 @@ where
                         &self.output_indices,
                     ));
                 }
-
+                update_backfill_metrics(
+                    &self.metrics,
+                    self.actor_id,
+                    upstream_table_id,
+                    cur_barrier_snapshot_processed_rows,
+                    cur_barrier_upstream_processed_rows,
+                );
                 break 'backfill_loop;
             }
             Some(record) => {
@@ -403,21 +409,13 @@ where
                 upstream_chunk_buffer.clear()
             }
 
-            self.metrics
-                .backfill_snapshot_read_row_count
-                .with_label_values(&[
-                    upstream_table_id.to_string().as_str(),
-                    self.actor_id.to_string().as_str(),
-                ])
-                .inc_by(cur_barrier_snapshot_processed_rows);
-
-            self.metrics
-                .backfill_upstream_output_row_count
-                .with_label_values(&[
-                    upstream_table_id.to_string().as_str(),
-                    self.actor_id.to_string().as_str(),
-                ])
-                .inc_by(cur_barrier_upstream_processed_rows);
+            update_backfill_metrics(
+                &self.metrics,
+                self.actor_id,
+                upstream_table_id,
+                cur_barrier_snapshot_processed_rows,
+                cur_barrier_upstream_processed_rows,
+            );
 
             // Update snapshot read epoch.
             snapshot_read_epoch = barrier.epoch.prev;
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index c11dd6847125..f7b28e2adce3 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -39,9 +39,11 @@ use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
 use risingwave_storage::StateStore;
 
 use crate::common::table::state_table::StateTableInner;
+use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
     Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
 };
+use crate::task::ActorId;
 
 /// `vnode`, `is_finished`, `row_count`, all occupy 1 column each.
 pub const METADATA_STATE_LEN: usize = 3;
@@ -815,3 +817,27 @@
         DataChunkBuilder::new(data_types, chunk_size)
     }
 }
+
+pub fn update_backfill_metrics(
+    metrics: &StreamingMetrics,
+    actor_id: ActorId,
+    upstream_table_id: u32,
+    cur_barrier_snapshot_processed_rows: u64,
+    cur_barrier_upstream_processed_rows: u64,
+) {
+    metrics
+        .backfill_snapshot_read_row_count
+        .with_label_values(&[
+            upstream_table_id.to_string().as_str(),
+            actor_id.to_string().as_str(),
+        ])
+        .inc_by(cur_barrier_snapshot_processed_rows);
+
+    metrics
+        .backfill_upstream_output_row_count
+        .with_label_values(&[
+            upstream_table_id.to_string().as_str(),
+            actor_id.to_string().as_str(),
+        ])
+        .inc_by(cur_barrier_upstream_processed_rows);
+}