From 8995fcea8ff0bb2a1abf23dad3a39ea5b44b35d1 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Mon, 29 Jan 2024 15:16:01 +0800
Subject: [PATCH] recover backfill row count on init + add tests for progress

---
 .../executor/backfill/arrangement_backfill.rs |  5 +-
 src/stream/src/executor/backfill/utils.rs     | 25 ++++++++
 src/tests/simulation/src/cluster.rs           |  2 +
 .../tests/integration_tests/backfill_tests.rs | 57 ++++++++++++++++++-
 4 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index c4dc003db4849..3a64c1a3b2297 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -184,7 +184,7 @@ where
         let mut snapshot_read_epoch;
 
         // Keep track of rows from the snapshot.
-        let mut total_snapshot_processed_rows: u64 = 0;
+        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();
 
         // Arrangement Backfill Algorithm:
         //
@@ -276,9 +276,8 @@ where
                         // mark.
                         for chunk in upstream_chunk_buffer.drain(..) {
                             let chunk_cardinality = chunk.cardinality() as u64;
-                            cur_barrier_snapshot_processed_rows +=
+                            cur_barrier_upstream_processed_rows +=
                                 chunk_cardinality;
-                            total_snapshot_processed_rows += chunk_cardinality;
                             yield Message::Chunk(mapping_chunk(
                                 chunk,
                                 &self.output_indices,
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index e1b21bfb03845..8937d52607748 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -218,6 +218,13 @@ impl BackfillState {
         ));
         *committed_state = current_state.clone();
     }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.inner
+            .values()
+            .map(|p| p.get_snapshot_row_count())
+            .sum()
+    }
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -244,6 +251,10 @@ impl BackfillStatePerVnode {
     pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
         &self.current_state
    }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.current_state().get_snapshot_row_count()
+    }
 }
 
 impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
@@ -274,6 +285,20 @@ pub enum BackfillProgressPerVnode {
     },
 }
 
+impl BackfillProgressPerVnode {
+    fn get_snapshot_row_count(&self) -> u64 {
+        match self {
+            BackfillProgressPerVnode::NotStarted => 0,
+            BackfillProgressPerVnode::InProgress {
+                snapshot_row_count, ..
+            }
+            | BackfillProgressPerVnode::Completed {
+                snapshot_row_count, ..
+            } => *snapshot_row_count,
+        }
+    }
+}
+
 pub(crate) fn mark_chunk(
     chunk: StreamChunk,
     current_pos: &OwnedRow,
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index adb909c95c362..a81a770bf0bd9 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -259,6 +259,8 @@ metrics_level = "Disabled"
             meta_nodes: 1,
             compactor_nodes: 1,
             compute_node_cores: 1,
+            per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = true;".into()]
+                .into(),
             ..Default::default()
         }
     }
diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
index b0627144a9175..d60534b0426db 100644
--- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs
+++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -12,13 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::thread::sleep;
 use std::time::Duration;
 
 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration};
-use tokio::time::timeout;
+use tokio::time::{sleep, timeout};
 
 const SET_PARALLELISM: &str = "SET STREAMING_PARALLELISM=1;";
 const ROOT_TABLE_CREATE: &str = "create table t1 (_id int, data jsonb);";
@@ -49,6 +48,13 @@ select
 from p2;
 "#;
 
+async fn kill_cn_and_wait_recover(cluster: &Cluster) {
+    cluster
+        .kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
+        .await;
+    sleep(Duration::from_secs(10)).await;
+}
+
 #[tokio::test]
 async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> {
     let mut cluster = Cluster::start(Configuration::for_backfill()).await?;
@@ -151,7 +157,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> {
     let upstream_task = tokio::spawn(async move {
         // The initial 100 records will take approx 3s
         // After that we start ingesting upstream records.
-        sleep(Duration::from_secs(3));
+        sleep(Duration::from_secs(3)).await;
         for i in 101..=200 {
             session2
                 .run(format!("insert into t values ({})", i))
                 .await
@@ -233,3 +239,48 @@ async fn test_backfill_backpressure() -> Result<()> {
 // distribution MUST also be single, and arrangement backfill should just use Simple.
 // TODO(kwannoel): Test arrangement backfill background recovery.
 
+#[tokio::test]
+async fn test_arrangement_backfill_progress() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create base table
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+
+    // Ingest data
+    session
+        .run("INSERT INTO t SELECT * FROM generate_series(1, 1000)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Create arrangement backfill with rate limit
+    session.run("SET STREAMING_PARALLELISM=1").await?;
+    session.run("SET BACKGROUND_DDL=true").await?;
+    session.run("SET STREAMING_RATE_LIMIT=1").await?;
+    session
+        .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
+        .await?;
+
+    // Verify arrangement backfill progress after 10s, it should be 1% at least.
+    sleep(Duration::from_secs(10)).await;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    println!("progress: {}", progress);
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!((1.0..2.0).contains(&progress));
+
+    // Trigger recovery and test it again.
+    kill_cn_and_wait_recover(&cluster).await;
+    let prev_progress = progress;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    println!("progress: {}", progress);
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!((prev_progress..prev_progress + 1.5).contains(&progress));
+
+    Ok(())
+}