From 31ff3707874575723fa69bef5c919eb8a0999a49 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 22 Sep 2023 15:26:52 +0800
Subject: [PATCH 1/9] docs

---
 src/stream/src/executor/backfill/no_shuffle_backfill.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index b6241acdea569..096650cc03896 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -201,6 +201,8 @@ where
 // However, they do not contain the vnode key at index 0.
 // That is filled in when we flush the state table.
 let mut current_state: Vec<Datum> = vec![None; state_len];
+ // Old state is just the previous current_state.
+ // We need it to update the state per vnode.
 let mut old_state: Option<Vec<Datum>> = None;

 // The first barrier message should be propagated.

From bd4149426737e1bb5b19489059e49689b6a1d0b7 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 22 Sep 2023 16:40:35 +0800
Subject: [PATCH 2/9] recover backfill state

---
 .../executor/backfill/no_shuffle_backfill.rs  | 119 ++++++++++++------
 src/stream/src/executor/backfill/utils.rs     |  73 -----------
 2 files changed, 84 insertions(+), 108 deletions(-)

diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 096650cc03896..4d5e5c81e6175 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -21,7 +21,8 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::catalog::Schema;
-use risingwave_common::row::OwnedRow;
+use risingwave_common::hash::VnodeBitmapExt;
+use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::Datum;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::epoch::EpochPair;
@@ -34,8 +35,8 @@ use risingwave_storage::StateStore;
 use crate::common::table::state_table::StateTable;
 use crate::executor::backfill::utils;
 use crate::executor::backfill::utils::{
- check_all_vnode_finished, compute_bounds, construct_initial_finished_state, get_new_pos,
- get_row_count_state, iter_chunks, mapping_chunk, mapping_message, mark_chunk, owned_row_iter,
+ compute_bounds, construct_initial_finished_state, get_new_pos, iter_chunks, mapping_chunk,
+ mapping_message, mark_chunk, owned_row_iter,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -44,6 +45,18 @@ use crate::executor::{
 };
 use crate::task::{ActorId, CreateMviewProgress};

+// schema: | vnode | pk ... | backfill_finished | row_count |
+// +1 for vnode, +1 for backfill_finished, +1 for row_count.
+const METADATA_STATE_LEN: usize = 3;
+
+#[derive(Debug, Eq, PartialEq)]
+pub struct BackfillState {
+ current_pos: Option<OwnedRow>,
+ old_state: Option<Vec<Datum>>,
+ is_finished: bool,
+ row_count: u64,
+}
+
 /// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
 /// `BackfillExecutor` is used to create a materialized view on another materialized view.
 ///
@@ -128,9 +141,7 @@ where
 // The primary key columns, in the output columns of the upstream_table scan.
 let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();

- // schema: | vnode | pk ... | backfill_finished | row_count |
- // +1 for vnode, +1 for backfill_finished, +1 for row_count.
- let state_len = pk_in_output_indices.len() + 3;
+ let state_len = pk_in_output_indices.len() + METADATA_STATE_LEN;

 let pk_order = self.upstream_table.pk_serializer().get_order_types();

@@ -145,16 +156,13 @@ where
 state_table.init_epoch(first_barrier.epoch);
 }

- let is_finished = if let Some(state_table) = self.state_table.as_ref() {
- let is_finished = check_all_vnode_finished(state_table, state_len).await?;
- if is_finished {
- assert!(!first_barrier.is_newly_added(self.actor_id));
- }
- is_finished
- } else {
- // Maintain backwards compatibility with no state table
- !first_barrier.is_newly_added(self.actor_id)
- };
+ let BackfillState {
+ mut current_pos,
+ is_finished,
+ row_count,
+ mut old_state,
+ } = Self::recover_backfill_state(self.state_table.as_ref(), pk_in_output_indices.len())
+ .await?;

 let mut builder =
 DataChunkBuilder::new(self.upstream_table.schema().data_types(), self.chunk_size);
@@ -191,19 +199,9 @@ where
 // | f | f | t |
 let to_backfill = !is_finished && !is_snapshot_empty;

- // Current position of the upstream_table storage primary key.
- // `None` means it starts from the beginning.
- let mut current_pos: Option<OwnedRow> = None;
-
- // Use these to persist state.
- // They contain the backfill position,
- // as well as the progress.
- // However, they do not contain the vnode key at index 0.
- // That is filled in when we flush the state table.
+ // Use this buffer to construct state,
+ // which will then be persisted.
 let mut current_state: Vec<Datum> = vec![None; state_len];
- // Old state is just the previous current_state.
- // We need it to update the state per vnode.
- let mut old_state: Option<Vec<Datum>> = None;

 // The first barrier message should be propagated.
 yield Message::Barrier(first_barrier);
@@ -222,13 +220,7 @@ where
 let mut snapshot_read_epoch = init_epoch;

 // Keep track of rows from the snapshot.
- let mut total_snapshot_processed_rows: u64 =
- if let Some(state_table) = self.state_table.as_ref() {
- get_row_count_state(state_table, state_len).await?
- } else {
- // Maintain backwards compatibility with no state_table.
- 0
- };
+ let mut total_snapshot_processed_rows: u64 = row_count;

 // Backfill Algorithm:
 //
@@ -510,6 +502,63 @@ where
 }
 }

+ async fn recover_backfill_state(
+ state_table: Option<&StateTable<S>>,
+ pk_len: usize,
+ ) -> StreamExecutorResult<BackfillState> {
+ let Some(state_table) = state_table else {
+ // If no state table, but backfill is present, it must be from an old cluster.
+ // In that case backfill must be finished, otherwise it wouldn't have been persisted.
+ return Ok(BackfillState {
+ current_pos: None,
+ is_finished: true,
+ row_count: 0,
+ old_state: None,
+ });
+ };
+ let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
+ let first_vnode = vnodes.next().unwrap();
+ let key: &[Datum] = &[Some(first_vnode.into())];
+ let row = state_table.get_row(key).await?;
+ let expected_state = Self::deserialize_backfill_state(row, pk_len);
+
+ // All vnode partitions should have the same state (no scale-in supported).
+ for vnode in vnodes {
+ let key: &[Datum] = &[Some(vnode.into())];
+ let row = state_table.get_row(key).await?;
+ let state = Self::deserialize_backfill_state(row, pk_len);
+ assert_eq!(state, expected_state);
+ }
+ Ok(expected_state)
+ }
+
+ fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
+ let Some(row) = row else {
+ return BackfillState {
+ current_pos: None,
+ is_finished: false,
+ row_count: 0,
+ old_state: None,
+ };
+ };
+ let row = row.into_inner();
+ let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
+ old_state[1..row.len() + 1].clone_from_slice(&row);
+ let current_pos = Some((&row[0..pk_len]).into_owned_row());
+ let is_finished = row[pk_len].clone().map_or(false, |d| d.into_bool());
+ let row_count = row
+ .get(pk_len + 1)
+ .cloned()
+ .unwrap_or(None)
+ .map_or(0, |d| d.into_int64() as u64);
+ BackfillState {
+ current_pos,
+ is_finished,
+ row_count,
+ old_state: Some(old_state),
+ }
+ }
+
 /// Snapshot read the upstream mv.
 /// The rows from upstream snapshot read will be buffered inside the `builder`.
 /// If snapshot is dropped before its rows are consumed,
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 8a2cded5ca8d3..259b67d5f202b 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -308,79 +308,6 @@ pub(crate) async fn get_progress_per_vnode<S: StateStore, SD: ValueRowSerde>(
-pub(crate) fn get_backfill_finished(row: OwnedRow, state_len: usize) -> Option<bool> {
- let datum = if row.len() == state_len - 2 {
- // Handle backwards compatibility case where
- // we did not have row count (-1 for this).
- // -1 to exclude `vnode` as well.
- row.last()
- } else {
- row.datum_at(row.len() - 2)
- };
- datum.map(|d| d.into_bool())
-}
-
-/// The row here does not include `vnode`,
-/// it should have been excluded by setting `value_indices`.
-/// Row schema: | `pk_indices` ... | `backfill_finished` | `row_count`
-pub(crate) fn get_row_count(row: OwnedRow, state_len: usize) -> u64 {
- if row.len() == state_len - 2 {
- // Handle backwards compatibility case where
- // we did not have row count (-1 for this).
- // -1 to exclude `vnode` as well.
- return 0;
- }
- match row.last() {
- None => 0,
- Some(d) => d.into_int64() as u64,
- }
-}
-
-pub(crate) async fn get_row_count_state<S: StateStore, SD: ValueRowSerde>(
- state_table: &StateTableInner<S, SD>,
- state_len: usize,
-) -> StreamExecutorResult<u64> {
- let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
- let vnode = vnodes.next().unwrap();
- let key: &[Datum] = &[Some(vnode.into())];
- let row = state_table.get_row(key).await?;
- let row_count = match row {
- None => 0,
- Some(row) => get_row_count(row, state_len),
- };
- Ok(row_count)
-}
-
-/// All vnodes should be persisted with status finished.
-pub(crate) async fn check_all_vnode_finished<S: StateStore, SD: ValueRowSerde>(
- state_table: &StateTableInner<S, SD>,
- state_len: usize,
-) -> StreamExecutorResult<bool> {
- debug_assert!(!state_table.vnode_bitmap().is_empty());
- let vnodes = state_table.vnodes().iter_vnodes_scalar();
- let mut is_finished = true;
- for vnode in vnodes {
- let key: &[Datum] = &[Some(vnode.into())];
- let row = state_table.get_row(key).await?;
-
- let vnode_is_finished = if let Some(row) = row
- && let Some(vnode_is_finished) = get_backfill_finished(row, state_len)
- {
- vnode_is_finished
- } else {
- false
- };
- if !vnode_is_finished {
- is_finished = false;
- break;
- }
- }
- Ok(is_finished)
-}
-
 /// Flush the data
 // This is a clippy bug, see https://github.com/rust-lang/rust-clippy/issues/11380.
 // TODO: remove `allow` here after the issue is closed.
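A note on the recovery format introduced in PATCH 2/9: the state table row read by `recover_backfill_state` has the layout `| vnode | pk ... | backfill_finished | row_count |`, and every vnode partition is expected to hold the same state, which is why the first vnode's row is decoded and all remaining vnodes are asserted equal against it. The sketch below is a minimal, self-contained illustration of that decoding, not RisingWave's actual API: `Datum` is simplified to `Option<i64>`, rows are plain vectors, and the `old_state` buffer is omitted.

```rust
// Simplified stand-in types; the real code uses RisingWave's Datum/OwnedRow.
type Datum = Option<i64>;

#[derive(Debug, PartialEq)]
struct BackfillState {
    current_pos: Option<Vec<Datum>>, // pk prefix of the last checkpointed row
    is_finished: bool,
    row_count: u64,
}

// Row layout (vnode key already stripped): | pk ... | backfill_finished | row_count |
fn deserialize_backfill_state(row: Option<Vec<Datum>>, pk_len: usize) -> BackfillState {
    let Some(row) = row else {
        // No row persisted for this vnode: backfill never checkpointed, start fresh.
        return BackfillState { current_pos: None, is_finished: false, row_count: 0 };
    };
    BackfillState {
        current_pos: Some(row[..pk_len].to_vec()),
        // The finished flag is itself a datum; here a bool is encoded as 0/1.
        is_finished: row[pk_len] == Some(1),
        // Rows written by older versions may lack the trailing row_count column.
        row_count: row.get(pk_len + 1).cloned().flatten().unwrap_or(0) as u64,
    }
}

fn main() {
    // A vnode that checkpointed at pk = (42,), not yet finished, with 100 rows read.
    let state = deserialize_backfill_state(Some(vec![Some(42), Some(0), Some(100)]), 1);
    assert_eq!(
        state,
        BackfillState {
            current_pos: Some(vec![Some(42)]),
            is_finished: false,
            row_count: 100,
        }
    );
}
```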
From ba9685295436bc03828e7ac2d6e12adcf561d6b2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 22 Sep 2023 17:31:40 +0800 Subject: [PATCH 3/9] add more trace --- .../executor/backfill/no_shuffle_backfill.rs | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 4d5e5c81e6175..f24a1910832a8 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -45,10 +45,11 @@ use crate::executor::{ }; use crate::task::{ActorId, CreateMviewProgress}; -// schema: | vnode | pk ... | backfill_finished | row_count | -// +1 for vnode, +1 for backfill_finished, +1 for row_count. +/// vnode, `is_finished`, `row_count`, all occupy 1 column each. const METADATA_STATE_LEN: usize = 3; +/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` | +/// We can decode that into `BackfillState` on recovery. #[derive(Debug, Eq, PartialEq)] pub struct BackfillState { current_pos: Option, @@ -436,7 +437,7 @@ where tracing::trace!( actor = self.actor_id, - "Backfill has already finished and forward messages directly to the downstream" + "Backfill has finished, waiting for checkpoint barrier" ); // Wait for first barrier to come after backfill is finished. @@ -461,13 +462,6 @@ where // Or snapshot was empty and we construct a placeholder state. debug_assert_ne!(current_pos, None); - tracing::trace!( - actor = self.actor_id, - epoch = ?barrier.epoch, - ?current_pos, - total_snapshot_processed_rows, - "Backfill position persisted after completion" - ); Self::persist_state( barrier.epoch, &mut self.state_table, @@ -479,6 +473,13 @@ where ) .await?; self.progress.finish(barrier.epoch.curr); + tracing::trace!( + actor = self.actor_id, + epoch = ?barrier.epoch, + ?current_pos, + total_snapshot_processed_rows, + "Backfill position persisted after completion" + ); yield msg; break; } @@ -486,6 +487,11 @@ where } } + tracing::trace!( + actor = self.actor_id, + "Backfill has already finished and forward messages directly to the downstream" + ); + // After progress finished + state persisted, // we can forward messages directly to the downstream, // as backfill is finished. From a4da77dca276409d3fa3d38c1ee12338c331d981 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 22 Sep 2023 18:39:03 +0800 Subject: [PATCH 4/9] update mview tracker on recovery --- .../executor/backfill/no_shuffle_backfill.rs | 79 ++++++++++++------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index f24a1910832a8..59743efc42f73 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -445,40 +445,62 @@ where while let Some(Ok(msg)) = upstream.next().await { if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. - if let Message::Barrier(barrier) = &msg && !is_finished { - // If snapshot was empty, we do not need to backfill, - // but we still need to persist the finished state. - // We currently persist it on the second barrier here rather than first. - // This is because we can't update state table in first epoch, - // since it expects to have been initialized in previous epoch - // (there's no epoch before the first epoch). 
- if is_snapshot_empty {
- current_pos =
- Some(construct_initial_finished_state(pk_in_output_indices.len()))
- }
+ if let Message::Barrier(barrier) = &msg {
+ if is_finished {
+ // No need to persist any state, we already finished before.
+ } else {
+ // If snapshot was empty, we do not need to backfill,
+ // but we still need to persist the finished state.
+ // We currently persist it on the second barrier here rather than first.
+ // This is because we can't update state table in first epoch,
+ // since it expects to have been initialized in previous epoch
+ // (there's no epoch before the first epoch).
+ if is_snapshot_empty {
+ current_pos =
+ Some(construct_initial_finished_state(pk_in_output_indices.len()))
+ }

- // We will update current_pos at least once,
- // since snapshot read has to be non-empty,
- // Or snapshot was empty and we construct a placeholder state.
- debug_assert_ne!(current_pos, None);
+ // We will update current_pos at least once,
+ // since snapshot read has to be non-empty,
+ // Or snapshot was empty and we construct a placeholder state.
+ debug_assert_ne!(current_pos, None);
+
+ Self::persist_state(
+ barrier.epoch,
+ &mut self.state_table,
+ true,
+ &current_pos,
+ total_snapshot_processed_rows,
+ &mut old_state,
+ &mut current_state,
+ )
+ .await?;
+ tracing::trace!(
+ actor = self.actor_id,
+ epoch = ?barrier.epoch,
+ ?current_pos,
+ total_snapshot_processed_rows,
+ "Backfill position persisted after completion"
+ );
+ }

- Self::persist_state(
- barrier.epoch,
- &mut self.state_table,
- true,
- &current_pos,
+ // In the event that some actors already finished,
+ // on recovery we want to recover the backfill progress for them too,
+ // so that we can provide an accurate estimate.
+ // So we call it here, before finally completing the backfill progress.
+ self.progress.update(
+ barrier.epoch.curr,
+ snapshot_read_epoch,
 total_snapshot_processed_rows,
- &mut old_state,
- &mut current_state,
- )
- .await?;
+ );
+ // For both backfill finished before recovery,
+ // and backfill which just finished, we need to update mview tracker,
+ // it does not persist this information.
self.progress.finish(barrier.epoch.curr); tracing::trace!( actor = self.actor_id, epoch = ?barrier.epoch, - ?current_pos, - total_snapshot_processed_rows, - "Backfill position persisted after completion" + "Updated CreateMaterializedTracker" ); yield msg; break; @@ -500,9 +522,6 @@ where #[for_await] for msg in upstream { if let Some(msg) = mapping_message(msg?, &self.output_indices) { - if let Some(state_table) = self.state_table.as_mut() && let Message::Barrier(barrier) = &msg { - state_table.commit_no_data_expected(barrier.epoch); - } yield msg; } } From 8f2abc07ff029db4d659a77cab55a023fbb72ad0 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 22 Sep 2023 19:06:12 +0800 Subject: [PATCH 5/9] improve trace --- .../executor/backfill/no_shuffle_backfill.rs | 29 +++++++------------ .../src/task/barrier_manager/progress.rs | 7 ++++- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 59743efc42f73..49dda9cac455c 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -410,13 +410,6 @@ where total_snapshot_processed_rows, ); - tracing::trace!( - actor = self.actor_id, - epoch = ?barrier.epoch, - ?current_pos, - total_snapshot_processed_rows, - "Backfill position persisted" - ); // Persist state on barrier Self::persist_state( barrier.epoch, @@ -429,16 +422,19 @@ where ) .await?; + tracing::trace!( + ?current_pos, + total_snapshot_processed_rows, + "Backfill state persisted" + ); + yield Message::Barrier(barrier); // We will switch snapshot at the start of the next iteration of the backfill loop. } } - tracing::trace!( - actor = self.actor_id, - "Backfill has finished, waiting for checkpoint barrier" - ); + tracing::trace!("Backfill has finished, waiting for barrier"); // Wait for first barrier to come after backfill is finished. // So we can update our progress + persist the status. @@ -446,6 +442,8 @@ where if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { + tracing::trace!("Backfill has received barrier for preserving its progress"); + if is_finished { // No need to persist any state, we already finished before. } else { @@ -476,8 +474,6 @@ where ) .await?; tracing::trace!( - actor = self.actor_id, - epoch = ?barrier.epoch, ?current_pos, total_snapshot_processed_rows, "Backfill position persisted after completion" @@ -497,11 +493,7 @@ where // and backfill which just finished, we need to update mview tracker, // it does not persist this information. 
 self.progress.finish(barrier.epoch.curr);
- tracing::trace!(
- actor = self.actor_id,
- epoch = ?barrier.epoch,
- "Updated CreateMaterializedTracker"
- );
+ tracing::trace!("Updated CreateMaterializedTracker");
 yield msg;
 break;
 }
@@ -510,7 +502,6 @@ where
 }

 tracing::trace!(
- actor = self.actor_id,
 "Backfill has already finished and forward messages directly to the downstream"
 );

diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs
index adea59cdf656a..698e00a5d76fc 100644
--- a/src/stream/src/task/barrier_manager/progress.rs
+++ b/src/stream/src/task/barrier_manager/progress.rs
@@ -129,7 +129,12 @@ impl CreateMviewProgress {
 ) {
 match self.state {
 Some(ChainState::ConsumingUpstream(last, last_consumed_rows)) => {
- assert!(last < consumed_epoch);
+ assert!(
+ last < consumed_epoch,
+ "last_epoch: {:#?} must be less than consumed epoch: {:#?}",
+ last,
+ consumed_epoch
+ );
 assert!(last_consumed_rows <= current_consumed_rows);
 }
 Some(ChainState::Done) => unreachable!(),

From 1461c29ec13712ecefde910dd6851134aa073981 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 22 Sep 2023 21:31:53 +0800
Subject: [PATCH 6/9] update mview count if finished

---
 .../executor/backfill/no_shuffle_backfill.rs  | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 49dda9cac455c..cde602f2e272f 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -423,6 +423,7 @@ where
 .await?;

 tracing::trace!(
+ epoch = ?barrier.epoch,
 ?current_pos,
 total_snapshot_processed_rows,
 "Backfill state persisted"
@@ -442,10 +443,16 @@ where
 if let Some(msg) = mapping_message(msg, &self.output_indices) {
 // If not finished then we need to update state, otherwise no need.
 if let Message::Barrier(barrier) = &msg {
- tracing::trace!("Backfill has received barrier for preserving its progress");
-
 if is_finished {
- // No need to persist any state, we already finished before.
+ // In the event that some actors already finished,
+ // on recovery we want to recover the backfill progress for them too,
+ // so that we can provide an accurate estimate.
+ // So we call it here, before finally completing the backfill progress.
+ self.progress.update(
+ barrier.epoch.curr,
+ snapshot_read_epoch,
+ total_snapshot_processed_rows,
+ );
 } else {
 // If snapshot was empty, we do not need to backfill,
 // but we still need to persist the finished state.
@@ -474,26 +481,21 @@ where
 tracing::trace!(
+ epoch = ?barrier.epoch,
 ?current_pos,
 total_snapshot_processed_rows,
 "Backfill position persisted after completion"
 );
 }

- // In the event that some actors already finished,
- // on recovery we want to recover the backfill progress for them too,
- // so that we can provide an accurate estimate.
- // So we call it here, before finally completing the backfill progress.
- self.progress.update(
- barrier.epoch.curr,
- snapshot_read_epoch,
- total_snapshot_processed_rows,
- );
 // For both backfill finished before recovery,
 // and backfill which just finished, we need to update mview tracker,
 // it does not persist this information.
self.progress.finish(barrier.epoch.curr); - tracing::trace!("Updated CreateMaterializedTracker"); + tracing::trace!( + epoch = ?barrier.epoch, + "Updated CreateMaterializedTracker" + ); yield msg; break; } From 27af793d95a9eb92d701d0bbe9e01ecf96f97e71 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 22 Sep 2023 22:13:22 +0800 Subject: [PATCH 7/9] fix --- src/meta/src/barrier/progress.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index d484e471f4a31..75d13d8379ca9 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -281,10 +281,17 @@ impl CreateMviewProgressTracker { ) -> Option { let actor = progress.chain_actor_id; let Some(epoch) = self.actor_map.get(&actor).copied() else { - panic!( - "no tracked progress for actor {}, is it already finished?", + // On restart, backfill will ALWAYS notify CreateMviewProgressTracker, + // even if backfill is finished on recovery. + // This is because we don't know if only this actor is finished, + // OR the entire stream job is finished. + // For the first case, we must notify meta. + // For the second case, we can still notify meta, but ignore it here. + tracing::info!( + "no tracked progress for actor {}, the stream job could already be finished", actor ); + return None; }; let new_state = if progress.done { From db9265b4232f89df5d4dd58105d535e114802e13 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 22 Sep 2023 23:43:55 +0800 Subject: [PATCH 8/9] reduce to 100,000 rows --- ci/scripts/run-backfill-tests.sh | 3 +-- ci/scripts/sql/backfill/insert.sql | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index 6c02442a06255..77a6ef1a5510c 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -7,9 +7,8 @@ # Hence keeping it in case we ever need to debug backfill again. # USAGE: -# Start a rw cluster then run this script. 
# ```sh -# ./risedev d +# cargo make ci-start ci-backfill # ./ci/scripts/run-backfill-tests.sh # ``` diff --git a/ci/scripts/sql/backfill/insert.sql b/ci/scripts/sql/backfill/insert.sql index 18ed763429231..6c12a3e0b897e 100644 --- a/ci/scripts/sql/backfill/insert.sql +++ b/ci/scripts/sql/backfill/insert.sql @@ -2,5 +2,5 @@ insert into t1 SELECT generate_series, '{"orders": {"id": 1, "price": "2.30", "customer_id": 2}}'::jsonb -FROM generate_series(1, 200000); +FROM generate_series(1, 100000); FLUSH; \ No newline at end of file From cd4e3d55501285e500fa05e9df1f330b8a3efbef Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 26 Sep 2023 12:39:41 +0800 Subject: [PATCH 9/9] refactor: include consumed rows in ChainState::Done --- src/meta/src/barrier/progress.rs | 11 ++++++----- .../src/executor/backfill/arrangement_backfill.rs | 2 +- .../src/executor/backfill/no_shuffle_backfill.rs | 13 +++---------- src/stream/src/executor/chain.rs | 4 ++-- src/stream/src/executor/rearranged_chain.rs | 8 ++++---- src/stream/src/executor/values.rs | 2 +- .../src/task/barrier_manager/managed_state.rs | 6 +++--- src/stream/src/task/barrier_manager/progress.rs | 10 +++++----- 8 files changed, 25 insertions(+), 31 deletions(-) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 75d13d8379ca9..057e0646279dd 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -35,7 +35,7 @@ type ConsumedRows = u64; enum ChainState { Init, ConsumingUpstream(Epoch, ConsumedRows), - Done, + Done(ConsumedRows), } /// Progress of all actors containing chain nodes while creating mview. @@ -93,18 +93,19 @@ impl Progress { match self.states.remove(&actor).unwrap() { ChainState::Init => {} ChainState::ConsumingUpstream(_, old_consumed_rows) => { - if !matches!(new_state, ChainState::Done) { + if !matches!(new_state, ChainState::Done(_)) { self.consumed_rows -= old_consumed_rows; } } - ChainState::Done => panic!("should not report done multiple times"), + ChainState::Done(_) => panic!("should not report done multiple times"), }; match &new_state { ChainState::Init => {} ChainState::ConsumingUpstream(_, new_consumed_rows) => { self.consumed_rows += new_consumed_rows; } - ChainState::Done => { + ChainState::Done(new_consumed_rows) => { + self.consumed_rows += new_consumed_rows; self.done_count += 1; } }; @@ -295,7 +296,7 @@ impl CreateMviewProgressTracker { }; let new_state = if progress.done { - ChainState::Done + ChainState::Done(progress.consumed_rows) } else { ChainState::ConsumingUpstream(progress.consumed_epoch.into(), progress.consumed_rows) }; diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index d33aed6d6c441..951e4a4235d3d 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -473,7 +473,7 @@ where &mut temporary_state, ).await?; - self.progress.finish(barrier.epoch.curr); + self.progress.finish(barrier.epoch.curr, total_snapshot_processed_rows); yield msg; break; } diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index cde602f2e272f..bd40ea8b34e7d 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -444,15 +444,7 @@ where // If not finished then we need to update state, otherwise no need. 
 if let Message::Barrier(barrier) = &msg {
 if is_finished {
- // In the event that some actors already finished,
- // on recovery we want to recover the backfill progress for them too,
- // so that we can provide an accurate estimate.
- // So we call it here, before finally completing the backfill progress.
- self.progress.update(
- barrier.epoch.curr,
- snapshot_read_epoch,
- total_snapshot_processed_rows,
- );
+ // If already finished, no need to persist any state.
 } else {
 // If snapshot was empty, we do not need to backfill,
 // but we still need to persist the finished state.
@@ -491,7 +483,8 @@ where
 // For both backfill finished before recovery,
 // and backfill which just finished, we need to update mview tracker,
 // it does not persist this information.
- self.progress.finish(barrier.epoch.curr);
+ self.progress
+ .finish(barrier.epoch.curr, total_snapshot_processed_rows);
 tracing::trace!(
 epoch = ?barrier.epoch,
 "Updated CreateMaterializedTracker"
 );
diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs
index ab3ef9ae44973..a51c9e95abbb1 100644
--- a/src/stream/src/executor/chain.rs
+++ b/src/stream/src/executor/chain.rs
@@ -79,7 +79,7 @@ impl ChainExecutor {
 // If the barrier is a conf change of creating this mview, and the snapshot is not to be
 // consumed, we can finish the progress immediately.
 if barrier.is_newly_added(self.actor_id) && self.upstream_only {
- self.progress.finish(barrier.epoch.curr);
+ self.progress.finish(barrier.epoch.curr, 0);
 }

 // The first barrier message should be propagated.
@@ -103,7 +103,7 @@ impl ChainExecutor {
 for msg in upstream {
 let msg = msg?;
 if to_consume_snapshot && let Message::Barrier(barrier) = &msg {
- self.progress.finish(barrier.epoch.curr);
+ self.progress.finish(barrier.epoch.curr, 0);
 }
 yield msg;
 }
diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs
index 1ad43de432551..d2aaae9fd5025 100644
--- a/src/stream/src/executor/rearranged_chain.rs
+++ b/src/stream/src/executor/rearranged_chain.rs
@@ -135,6 +135,8 @@ impl RearrangedChainExecutor {
 .unbounded_send(RearrangedMessage::PhantomBarrier(first_barrier))
 .unwrap();

+ let mut processed_rows: u64 = 0;
+
 {
 // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange.
 let rearranged_barrier =
@@ -162,8 +164,6 @@ impl RearrangedChainExecutor {
 let mut last_rearranged_epoch = create_epoch;
 let mut stop_rearrange_tx = Some(stop_rearrange_tx);

- let mut processed_rows: u64 = 0;
-
 #[for_await]
 for rearranged_msg in &mut rearranged {
 match rearranged_msg? {
@@ -223,7 +223,7 @@ impl RearrangedChainExecutor {
 continue;
 };
 if let Some(barrier) = msg.as_barrier() {
- self.progress.finish(barrier.epoch.curr);
+ self.progress.finish(barrier.epoch.curr, processed_rows);
 }
 yield msg;
 }
@@ -236,7 +236,7 @@ impl RearrangedChainExecutor {
 for msg in upstream {
 let msg: Message = msg?;
 if let Some(barrier) = msg.as_barrier() {
- self.progress.finish(barrier.epoch.curr);
+ self.progress.finish(barrier.epoch.curr, processed_rows);
 }
 yield msg;
 }
diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs
index 512e9f6c28da3..1274acee3dac7 100644
--- a/src/stream/src/executor/values.rs
+++ b/src/stream/src/executor/values.rs
@@ -123,7 +123,7 @@ impl ValuesExecutor {

 while let Some(barrier) = barrier_receiver.recv().await {
 if emit {
- progress.finish(barrier.epoch.curr);
+ progress.finish(barrier.epoch.curr, 0);
 }
 yield Message::Barrier(barrier);
 }
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index c438272033831..2c14d6672eb69 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -112,14 +112,14 @@ impl ManagedBarrierState {
 .into_iter()
 .map(|(actor, state)| CreateMviewProgress {
 chain_actor_id: actor,
- done: matches!(state, ChainState::Done),
+ done: matches!(state, ChainState::Done(_)),
 consumed_epoch: match state {
 ChainState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch,
- ChainState::Done => epoch,
+ ChainState::Done(_) => epoch,
 },
 consumed_rows: match state {
 ChainState::ConsumingUpstream(_, consumed_rows) => consumed_rows,
- ChainState::Done => 0,
+ ChainState::Done(consumed_rows) => consumed_rows,
 },
 })
 .collect();
diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs
index 698e00a5d76fc..5abeab216cd00 100644
--- a/src/stream/src/task/barrier_manager/progress.rs
+++ b/src/stream/src/task/barrier_manager/progress.rs
@@ -23,7 +23,7 @@ type ConsumedRows = u64;
 #[derive(Debug, Clone, Copy)]
 pub(super) enum ChainState {
 ConsumingUpstream(ConsumedEpoch, ConsumedRows),
- Done,
+ Done(ConsumedRows),
 }

 impl LocalBarrierManager {
@@ -137,7 +137,7 @@ impl CreateMviewProgress {
 );
 assert!(last_consumed_rows <= current_consumed_rows);
 }
- Some(ChainState::Done) => unreachable!(),
+ Some(ChainState::Done(_)) => unreachable!(),
 None => {}
 };
 self.update_inner(
@@ -148,11 +148,11 @@ impl CreateMviewProgress {

 /// Finish the progress. If the progress is already finished, then perform no-op.
 /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
- pub fn finish(&mut self, current_epoch: u64) {
- if let Some(ChainState::Done) = self.state {
+ pub fn finish(&mut self, current_epoch: u64, current_consumed_rows: ConsumedRows) {
+ if let Some(ChainState::Done(_)) = self.state {
 return;
 }
- self.update_inner(current_epoch, ChainState::Done);
+ self.update_inner(current_epoch, ChainState::Done(current_consumed_rows));
 }
 }
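A closing note on PATCH 9/9: carrying the consumed row count inside `ChainState::Done` means the meta-side tracker no longer loses the rows reported by actors that have already finished, including actors that report done immediately on recovery. The sketch below illustrates that bookkeeping with a simplified, hypothetical `Progress` type; it is not the real tracker in src/meta/src/barrier/progress.rs, which has more states and transition rules.

```rust
use std::collections::HashMap;

// Simplified mirror of the patched enum: Done now carries consumed rows.
#[derive(Clone, Copy)]
enum ChainState {
    ConsumingUpstream(u64, u64), // (consumed epoch, consumed rows)
    Done(u64),                   // consumed rows, kept even after finishing
}

struct Progress {
    states: HashMap<u32, ChainState>, // actor id -> latest reported state
    consumed_rows: u64,
    done_count: usize,
}

impl Progress {
    fn update(&mut self, actor: u32, new_state: ChainState) {
        // Retract the actor's previous contribution to the running total...
        match self.states.get(&actor) {
            Some(ChainState::ConsumingUpstream(_, old)) => self.consumed_rows -= old,
            Some(ChainState::Done(_)) => panic!("should not report after done"),
            None => {}
        }
        // ...then add the new one. Done(rows) keeps finished actors in the
        // total, which is exactly what the Done(ConsumedRows) refactor enables.
        match new_state {
            ChainState::ConsumingUpstream(_, rows) => self.consumed_rows += rows,
            ChainState::Done(rows) => {
                self.consumed_rows += rows;
                self.done_count += 1;
            }
        }
        self.states.insert(actor, new_state);
    }
}

fn main() {
    let mut p = Progress { states: HashMap::new(), consumed_rows: 0, done_count: 0 };
    p.update(1, ChainState::ConsumingUpstream(100, 40));
    p.update(1, ChainState::Done(50)); // the finished actor still contributes 50 rows
    assert_eq!((p.consumed_rows, p.done_count), (50, 1));
}
```

Under the old payload-less `Done`, a finished actor's final report carried no row count, so progress estimates could undercount after recovery; `Done(ConsumedRows)` keeps each finished actor's total in the sum.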