diff --git a/risedev.yml b/risedev.yml
index 38ed00e15fc63..93c7c0c1e90d0 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -570,7 +570,7 @@ profile:
     steps:
       - use: minio
         api-requests-max: 30
-        api-requests-deadline: 2s
+        api-requests-deadline: 3s
      - use: etcd
        unsafe-no-fsync: true
      - use: meta-node
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index a7d2ab5c863eb..76f9271862e29 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -185,7 +185,7 @@ where
         let mut snapshot_read_epoch;

         // Keep track of rows from the snapshot.
-        let mut total_snapshot_processed_rows: u64 = 0;
+        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

         // Arrangement Backfill Algorithm:
         //
@@ -278,9 +278,8 @@ where
                                 // mark.
                                 for chunk in upstream_chunk_buffer.drain(..) {
                                     let chunk_cardinality = chunk.cardinality() as u64;
-                                    cur_barrier_snapshot_processed_rows +=
+                                    cur_barrier_upstream_processed_rows +=
                                         chunk_cardinality;
-                                    total_snapshot_processed_rows += chunk_cardinality;
                                     yield Message::Chunk(mapping_chunk(
                                         chunk,
                                         &self.output_indices,
@@ -290,6 +289,8 @@ where
                                 break 'backfill_loop;
                             }
                             Some((vnode, chunk)) => {
+                                let chunk_cardinality = chunk.cardinality() as u64;
+
                                 // Raise the current position.
                                 // As snapshot read streams are ordered by pk, so we can
                                 // just use the last row to update `current_pos`.
@@ -298,9 +299,9 @@ where
                                     &chunk,
                                     &pk_in_output_indices,
                                     &mut backfill_state,
+                                    chunk_cardinality,
                                 )?;

-                                let chunk_cardinality = chunk.cardinality() as u64;
                                 cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                 total_snapshot_processed_rows += chunk_cardinality;
                                 let chunk = Message::Chunk(mapping_chunk(
@@ -354,6 +355,7 @@ where
                 })
             })) {
                 if let Some(chunk) = chunk {
+                    let chunk_cardinality = chunk.cardinality() as u64;
                     // Raise the current position.
                     // As snapshot read streams are ordered by pk, so we can
                     // just use the last row to update `current_pos`.
@@ -362,9 +364,9 @@ where
                         &chunk,
                         &pk_in_output_indices,
                         &mut backfill_state,
+                        chunk_cardinality,
                     )?;

-                    let chunk_cardinality = chunk.cardinality() as u64;
                     cur_barrier_snapshot_processed_rows += chunk_cardinality;
                     total_snapshot_processed_rows += chunk_cardinality;
                     yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
@@ -585,8 +587,10 @@ where
             let backfill_progress = backfill_state.get_progress(&vnode)?;
             let current_pos = match backfill_progress {
                 BackfillProgressPerVnode::NotStarted => None,
-                BackfillProgressPerVnode::Completed(current_pos)
-                | BackfillProgressPerVnode::InProgress(current_pos) => Some(current_pos.clone()),
+                BackfillProgressPerVnode::Completed { current_pos, .. }
+                | BackfillProgressPerVnode::InProgress { current_pos, .. } => {
+                    Some(current_pos.clone())
+                }
             };
             let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
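Note on the hunk at @@ -185 above, which is the heart of the fix: on recovery, `total_snapshot_processed_rows` now resumes from the per-vnode counts persisted in the state table instead of restarting at zero, so the reported progress no longer snaps back after a restart. Below is a minimal standalone sketch of that resume-then-report flow; the types and the percentage formula are illustrative assumptions, not RisingWave's actual code.

```rust
use std::collections::HashMap;

// Illustrative stand-in for `BackfillProgressPerVnode`.
enum Progress {
    NotStarted,
    InProgress { snapshot_row_count: u64 },
    Completed { snapshot_row_count: u64 },
}

// Mirrors `BackfillState::get_snapshot_row_count`: sum the persisted
// per-vnode counts rather than restarting the total from zero.
fn resumed_total(per_vnode: &HashMap<u16, Progress>) -> u64 {
    per_vnode
        .values()
        .map(|p| match p {
            Progress::NotStarted => 0,
            Progress::InProgress { snapshot_row_count }
            | Progress::Completed { snapshot_row_count } => *snapshot_row_count,
        })
        .sum()
}

// Progress as a percentage of the upstream total, clamped to 100%.
fn progress_percent(processed: u64, upstream_total: u64) -> f64 {
    if upstream_total == 0 {
        100.0
    } else {
        (processed as f64 / upstream_total as f64 * 100.0).min(100.0)
    }
}

fn main() {
    let mut state = HashMap::new();
    state.insert(0u16, Progress::InProgress { snapshot_row_count: 40 });
    state.insert(1u16, Progress::Completed { snapshot_row_count: 60 });
    // After a restart the counter resumes at 100 instead of 0.
    let total = resumed_total(&state);
    println!("{:.1}%", progress_percent(total, 1000)); // 10.0%
}
```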
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 1a19a3fe201fc..8937d52607748 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -90,12 +90,24 @@ impl BackfillState {
         &mut self,
         vnode: VirtualNode,
         new_pos: OwnedRow,
+        snapshot_row_count_delta: u64,
     ) -> StreamExecutorResult<()> {
         let state = self.get_current_state(&vnode);
-        let new_state = BackfillProgressPerVnode::InProgress(new_pos);
         match state {
-            BackfillProgressPerVnode::NotStarted => *state = new_state,
-            BackfillProgressPerVnode::InProgress(_current_pos) => *state = new_state,
+            BackfillProgressPerVnode::NotStarted => {
+                *state = BackfillProgressPerVnode::InProgress {
+                    current_pos: new_pos,
+                    snapshot_row_count: snapshot_row_count_delta,
+                };
+            }
+            BackfillProgressPerVnode::InProgress {
+                snapshot_row_count, ..
+            } => {
+                *state = BackfillProgressPerVnode::InProgress {
+                    current_pos: new_pos,
+                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
+                };
+            }
             BackfillProgressPerVnode::Completed { .. } => unreachable!(),
         }
         Ok(())
     }
@@ -104,14 +116,20 @@ impl BackfillState {
     pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
         let finished_placeholder_position = construct_initial_finished_state(pos_len);
         let current_state = self.get_current_state(&vnode);
-        let new_pos = match current_state {
-            BackfillProgressPerVnode::NotStarted => finished_placeholder_position,
-            BackfillProgressPerVnode::InProgress(current_pos) => current_pos.clone(),
+        let (new_pos, snapshot_row_count) = match current_state {
+            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
+            BackfillProgressPerVnode::InProgress {
+                current_pos,
+                snapshot_row_count,
+            } => (current_pos.clone(), *snapshot_row_count),
             BackfillProgressPerVnode::Completed { .. } => {
                 return;
             }
         };
-        *current_state = BackfillProgressPerVnode::Completed(new_pos);
+        *current_state = BackfillProgressPerVnode::Completed {
+            current_pos: new_pos,
+            snapshot_row_count,
+        };
     }

     /// Return state to be committed.
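`update_progress` now threads a `snapshot_row_count_delta` through every position update: the first chunk for a vnode seeds the count, later chunks accumulate into it, and a `Completed` vnode must never see another snapshot chunk. A sketch of that rule with plain types standing in for `OwnedRow` and `VirtualNode` (illustrative only):

```rust
#[derive(Debug, PartialEq)]
enum VnodeProgress {
    NotStarted,
    InProgress { current_pos: Vec<i64>, snapshot_row_count: u64 },
    Completed { current_pos: Vec<i64>, snapshot_row_count: u64 },
}

fn update_progress(state: &mut VnodeProgress, new_pos: Vec<i64>, delta: u64) {
    let prev_count = match state {
        // First chunk for this vnode: the delta seeds the count.
        VnodeProgress::NotStarted => 0,
        // Later chunks: accumulate on top of the running count.
        VnodeProgress::InProgress { snapshot_row_count, .. } => *snapshot_row_count,
        // A completed vnode must not receive further snapshot chunks.
        VnodeProgress::Completed { .. } => unreachable!(),
    };
    *state = VnodeProgress::InProgress {
        current_pos: new_pos,
        snapshot_row_count: prev_count + delta,
    };
}

fn main() {
    let mut p = VnodeProgress::NotStarted;
    update_progress(&mut p, vec![10], 10); // count = 10
    update_progress(&mut p, vec![25], 15); // count = 25
    assert_eq!(
        p,
        VnodeProgress::InProgress { current_pos: vec![25], snapshot_row_count: 25 }
    );
}
```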
@@ -119,42 +137,56 @@ impl BackfillState {
         let new_state = self.inner.get(vnode).unwrap().current_state().clone();
         let new_encoded_state = match new_state {
             BackfillProgressPerVnode::NotStarted => unreachable!(),
-            BackfillProgressPerVnode::InProgress(current_pos) => {
+            BackfillProgressPerVnode::InProgress {
+                current_pos,
+                snapshot_row_count,
+            } => {
                 let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                 encoded_state[0] = Some(vnode.to_scalar().into());
                 encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                 encoded_state[current_pos.len() + 1] = Some(false.into());
-                encoded_state[current_pos.len() + 2] = Some(0i64.into());
+                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                 encoded_state
             }
-            BackfillProgressPerVnode::Completed(current_pos) => {
+            BackfillProgressPerVnode::Completed {
+                current_pos,
+                snapshot_row_count,
+            } => {
                 let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                 encoded_state[0] = Some(vnode.to_scalar().into());
                 encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                 encoded_state[current_pos.len() + 1] = Some(true.into());
-                encoded_state[current_pos.len() + 2] = Some(0i64.into());
+                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                 encoded_state
             }
         };
         let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
         let old_encoded_state = match old_state {
             BackfillProgressPerVnode::NotStarted => None,
-            BackfillProgressPerVnode::InProgress(committed_pos) => {
+            BackfillProgressPerVnode::InProgress {
+                current_pos,
+                snapshot_row_count,
+            } => {
+                let committed_pos = current_pos;
                 let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                 encoded_state[0] = Some(vnode.to_scalar().into());
                 encoded_state[1..committed_pos.len() + 1]
                     .clone_from_slice(committed_pos.as_inner());
                 encoded_state[committed_pos.len() + 1] = Some(false.into());
-                encoded_state[committed_pos.len() + 2] = Some(0i64.into());
+                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                 Some(encoded_state)
             }
-            BackfillProgressPerVnode::Completed(committed_pos) => {
+            BackfillProgressPerVnode::Completed {
+                current_pos,
+                snapshot_row_count,
+            } => {
+                let committed_pos = current_pos;
                 let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                 encoded_state[0] = Some(vnode.to_scalar().into());
                 encoded_state[1..committed_pos.len() + 1]
                     .clone_from_slice(committed_pos.as_inner());
                 encoded_state[committed_pos.len() + 1] = Some(true.into());
-                encoded_state[committed_pos.len() + 2] = Some(0i64.into());
+                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                 Some(encoded_state)
             }
         };
@@ -167,8 +199,8 @@ impl BackfillState {
         let state = self.inner.get(vnode).unwrap();
         match state.current_state() {
             // If current state and committed state are the same, we don't need to commit.
-            s @ BackfillProgressPerVnode::InProgress(_current_pos)
-            | s @ BackfillProgressPerVnode::Completed(_current_pos) => s != state.committed_state(),
+            s @ BackfillProgressPerVnode::InProgress { .. }
+            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
             BackfillProgressPerVnode::NotStarted => false,
         }
     }
@@ -181,10 +213,18 @@
         assert!(matches!(
             current_state,
-            BackfillProgressPerVnode::InProgress(_) | BackfillProgressPerVnode::Completed(_)
+            BackfillProgressPerVnode::InProgress { .. }
+                | BackfillProgressPerVnode::Completed { .. }
         ));
         *committed_state = current_state.clone();
     }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.inner
+            .values()
+            .map(|p| p.get_snapshot_row_count())
+            .sum()
+    }
 }

 #[derive(Clone, Debug, PartialEq, Eq)]
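Both commit branches above write the same row shape, and the only behavioural change is the last column: it now carries the real per-vnode count instead of the hard-coded `0i64`. A simplified sketch of that layout follows; `Option<i64>` stands in for RisingWave's `Datum`, and `METADATA_STATE_LEN = 3` is inferred from the indices used above (vnode, finished flag, row count).

```rust
// Layout produced by the commit encoding above:
// | vnode | pk (variable length) | backfill_finished | row_count |
type Datum = Option<i64>; // simplified stand-in for a nullable scalar

const METADATA_STATE_LEN: usize = 3; // vnode + finished flag + row count

fn encode_state(vnode: i64, current_pos: &[i64], finished: bool, row_count: u64) -> Vec<Datum> {
    let mut encoded = vec![None; current_pos.len() + METADATA_STATE_LEN];
    encoded[0] = Some(vnode);
    // The pk occupies the variable-length middle section.
    for (i, v) in current_pos.iter().enumerate() {
        encoded[1 + i] = Some(*v);
    }
    encoded[current_pos.len() + 1] = Some(finished as i64);
    // Previously hard-coded to 0; now persists the real per-vnode count.
    encoded[current_pos.len() + 2] = Some(row_count as i64);
    encoded
}

fn main() {
    let row = encode_state(3, &[42, 7], false, 1234);
    assert_eq!(row, vec![Some(3), Some(42), Some(7), Some(0), Some(1234)]);
}
```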
@@ -211,6 +251,10 @@ impl BackfillStatePerVnode {
     pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
         &self.current_state
     }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.current_state().get_snapshot_row_count()
+    }
 }

 impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
@@ -227,8 +271,32 @@ impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
 pub enum BackfillProgressPerVnode {
     /// no entry exists for a vnode, or on initialization of the executor.
     NotStarted,
-    InProgress(OwnedRow),
-    Completed(OwnedRow),
+    InProgress {
+        /// The current snapshot offset
+        current_pos: OwnedRow,
+        /// Number of snapshot records read for this vnode.
+        snapshot_row_count: u64,
+    },
+    Completed {
+        /// The current snapshot offset
+        current_pos: OwnedRow,
+        /// Number of snapshot records read for this vnode.
+        snapshot_row_count: u64,
+    },
+}
+
+impl BackfillProgressPerVnode {
+    fn get_snapshot_row_count(&self) -> u64 {
+        match self {
+            BackfillProgressPerVnode::NotStarted => 0,
+            BackfillProgressPerVnode::InProgress {
+                snapshot_row_count, ..
+            }
+            | BackfillProgressPerVnode::Completed {
+                snapshot_row_count, ..
+            } => *snapshot_row_count,
+        }
+    }
 }

 pub(crate) fn mark_chunk(
@@ -280,11 +348,11 @@ pub(crate) fn mark_chunk_ref_by_vnode(
         let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
-            BackfillProgressPerVnode::Completed(_) => true,
+            BackfillProgressPerVnode::Completed { .. } => true,
             // If not started, no need to forward.
             BackfillProgressPerVnode::NotStarted => false,
             // If in progress, we need to check row <= current_pos.
-            BackfillProgressPerVnode::InProgress(current_pos) => {
+            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                 let lhs = row.project(pk_in_output_indices);
                 let rhs = current_pos;
                 let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
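With per-vnode progress, upstream rows are filtered against each vnode's own position rather than a single global one. The decision in `mark_chunk_ref_by_vnode` reduces to the rule sketched below, with integers standing in for pk rows (illustrative, not the real pk comparator):

```rust
enum VnodeProgress {
    NotStarted,
    InProgress { current_pos: i64 },
    Completed,
}

/// Should an upstream row be forwarded while backfill is running?
fn forward_upstream_row(progress: &VnodeProgress, row_pk: i64) -> bool {
    match progress {
        // Vnode fully backfilled: the row is already visible downstream.
        VnodeProgress::Completed => true,
        // Snapshot read has not started for this vnode: the snapshot side
        // will emit this row later, so forwarding now would duplicate it.
        VnodeProgress::NotStarted => false,
        // Mid-backfill: forward only rows at or before the current position,
        // i.e. rows the snapshot side has already emitted.
        VnodeProgress::InProgress { current_pos } => row_pk <= *current_pos,
    }
}

fn main() {
    let in_progress = VnodeProgress::InProgress { current_pos: 100 };
    assert!(forward_upstream_row(&in_progress, 99)); // already emitted by snapshot
    assert!(!forward_upstream_row(&in_progress, 101)); // snapshot will emit it later
    assert!(forward_upstream_row(&VnodeProgress::Completed, 101));
    assert!(!forward_upstream_row(&VnodeProgress::NotStarted, 1));
}
```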
@@ -402,45 +470,71 @@ pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
 }

 /// Recovers progress per vnode, so we know which to backfill.
+/// See the inline comments for how the state is decoded.
 pub(crate) async fn get_progress_per_vnode(
     state_table: &StateTableInner,
 ) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
     debug_assert!(!state_table.vnodes().is_empty());
     let vnodes = state_table.vnodes().iter_vnodes();
     let mut result = Vec::with_capacity(state_table.vnodes().len());
+    // 1. Get the vnode keys, so we can get the state per vnode.
     let vnode_keys = vnodes.map(|vnode| {
         let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
         datum
     });
     let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
-    let states_for_vnode_keys = try_join_all(tasks).await?;
-    for (vnode, state_for_vnode_key) in state_table
+    // 2. Fetch the state for each vnode.
+    //    It should have the following schema; note that it does not contain the vnode:
+    //    | pk | `backfill_finished` | `row_count` |
+    let state_for_vnodes = try_join_all(tasks).await?;
+    for (vnode, state_for_vnode) in state_table
         .vnodes()
         .iter_vnodes()
-        .zip_eq_debug(states_for_vnode_keys)
+        .zip_eq_debug(state_for_vnodes)
     {
-        // NOTE(kwannoel): state_for_vnode_key does not include the vnode prefix.
-        let backfill_progress = match state_for_vnode_key {
+        let backfill_progress = match state_for_vnode {
+            // There's some state, which means progress was made.
+            // It's either finished or in progress.
             Some(row) => {
+                // 3. Decode the `snapshot_row_count`. Decode from the back, since
+                //    pk is variable length.
                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
+                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;
+
+                // 4. Decode the `is_finished` flag (whether backfill has finished).
+                //    Decode from the back, since pk is variable length.
                 let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
                 let vnode_is_finished = vnode_is_finished.as_ref().unwrap();
-                // Only the current pos should be contained in the in-memory backfill state.
-                // Row count will be added later.
+                // 5. Decode the `current_pos`.
                 let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
                 let current_pos = current_pos.into_owned_row();
+
+                // 6. Construct the in-memory state per vnode, based on the decoded state.
                 if *vnode_is_finished.as_bool() {
                     BackfillStatePerVnode::new(
-                        BackfillProgressPerVnode::Completed(current_pos.clone()),
-                        BackfillProgressPerVnode::Completed(current_pos),
+                        BackfillProgressPerVnode::Completed {
+                            current_pos: current_pos.clone(),
+                            snapshot_row_count,
+                        },
+                        BackfillProgressPerVnode::Completed {
+                            current_pos,
+                            snapshot_row_count,
+                        },
                     )
                 } else {
                     BackfillStatePerVnode::new(
-                        BackfillProgressPerVnode::InProgress(current_pos.clone()),
-                        BackfillProgressPerVnode::InProgress(current_pos),
+                        BackfillProgressPerVnode::InProgress {
+                            current_pos: current_pos.clone(),
+                            snapshot_row_count,
+                        },
+                        BackfillProgressPerVnode::InProgress {
+                            current_pos,
+                            snapshot_row_count,
+                        },
                     )
                 }
             }
+            // No state means no progress was made.
             None => BackfillStatePerVnode::new(
                 BackfillProgressPerVnode::NotStarted,
                 BackfillProgressPerVnode::NotStarted,
@@ -512,10 +606,11 @@ pub(crate) fn update_pos_by_vnode(
     chunk: &StreamChunk,
     pk_in_output_indices: &[usize],
     backfill_state: &mut BackfillState,
+    snapshot_row_count_delta: u64,
 ) -> StreamExecutorResult<()> {
     let new_pos = get_new_pos(chunk, pk_in_output_indices);
     assert_eq!(new_pos.len(), pk_in_output_indices.len());
-    backfill_state.update_progress(vnode, new_pos)?;
+    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
     Ok(())
 }
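The decode path in `get_progress_per_vnode` reads the fixed-width metadata from the back of the row because the pk prefix is variable length. A simplified mirror of decoding steps 3-5, again with `Option<i64>` standing in for `Datum` and the vnode prefix already stripped:

```rust
type Datum = Option<i64>; // simplified stand-in for a nullable scalar

/// Decode `(current_pos, is_finished, row_count)` from a persisted row that
/// excludes the vnode prefix: | pk ... | `backfill_finished` | `row_count` |
fn decode_state(row: &[Datum]) -> (Vec<i64>, bool, u64) {
    // Metadata is read from the back; the pk occupies a variable-length prefix.
    let row_count = row[row.len() - 1].unwrap() as u64;
    let finished = row[row.len() - 2].unwrap() != 0;
    let current_pos: Vec<i64> = row[..row.len() - 2].iter().map(|d| d.unwrap()).collect();
    (current_pos, finished, row_count)
}

fn main() {
    // pk = (42, 7), not yet finished, 1234 snapshot rows read so far.
    let row = vec![Some(42), Some(7), Some(0), Some(1234)];
    let (pos, finished, count) = decode_state(&row);
    assert_eq!(pos, vec![42, 7]);
    assert!(!finished);
    assert_eq!(count, 1234);
}
```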
@@ -604,7 +699,7 @@ where
 }

 /// Schema
-/// | vnode | pk | `backfill_finished` |
+/// | vnode | pk | `backfill_finished` | `row_count` |
 /// Persists the state per vnode based on `BackfillState`.
 /// We track the current committed state via `committed_progress`
 /// so we know whether we need to persist the state or not.
@@ -614,12 +709,12 @@ where
 /// - Not persist to store at all.
 ///
 /// `InProgress`:
-/// - Format: | vnode | pk | false |
+/// - Format: | vnode | pk | false | `row_count` |
 /// - If change in current pos: Persist.
 /// - No change in current pos: Do not persist.
 ///
 /// Completed
-/// - Format: | vnode | pk | true |
+/// - Format: | vnode | pk | true | `row_count` |
 /// - If previous state is `InProgress` / `NotStarted`: Persist.
 /// - If previous state is Completed: Do not persist.
 /// TODO(kwannoel): we should check committed state to be all `finished` in the tests.
@@ -693,7 +788,7 @@ pub(crate) async fn persist_state_per_vnode
Result<()> {
     let upstream_task = tokio::spawn(async move {
         // The initial 100 records will take approx 3s
         // After that we start ingesting upstream records.
-        sleep(Duration::from_secs(3));
+        sleep(Duration::from_secs(3)).await;
         for i in 101..=200 {
             session2
                 .run(format!("insert into t values ({})", i))
                 .await
@@ -233,3 +234,54 @@
 // distribution MUST also be single, and arrangement backfill should just use Simple.

 // TODO(kwannoel): Test arrangement backfill background recovery.
+#[tokio::test]
+async fn test_arrangement_backfill_progress() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create base table
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+
+    // Ingest data
+    session
+        .run("INSERT INTO t SELECT * FROM generate_series(1, 1000)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Create arrangement backfill with rate limit
+    session.run("SET STREAMING_PARALLELISM=1").await?;
+    session.run("SET BACKGROUND_DDL=true").await?;
+    session.run("SET STREAMING_RATE_LIMIT=1").await?;
+    session
+        .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
+        .await?;
+
+    // Verify arrangement backfill progress after 10s; it should be at least 1%.
+    sleep(Duration::from_secs(10)).await;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!(
+        (1.0..2.0).contains(&progress),
+        "progress not within bounds {}",
+        progress
+    );
+
+    // Trigger recovery and test it again.
+    kill_cn_and_wait_recover(&cluster).await;
+    let prev_progress = progress;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!(
+        (prev_progress - 0.5..prev_progress + 1.5).contains(&progress),
+        "progress not within bounds {}",
+        progress
+    );
+
+    Ok(())
+}
diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs
index 43262cd7b52a2..475793a88b709 100644
--- a/src/tests/simulation/tests/integration_tests/main.rs
+++ b/src/tests/simulation/tests/integration_tests/main.rs
@@ -28,3 +28,5 @@ mod scale;
 mod sink;
 mod storage;
 mod throttle;
+
+mod utils;
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 7f6c2da07d64e..e7792d5930e03 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -16,9 +16,13 @@ use std::time::Duration;

 use anyhow::Result;
 use risingwave_common::error::anyhow_error;
-use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
+use risingwave_simulation::cluster::{Cluster, Configuration, Session};
 use tokio::time::sleep;

+use crate::utils::{
+    kill_cn_and_meta_and_wait_recover, kill_cn_and_wait_recover, kill_random_and_wait_recover,
+};
+
 const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
 const DROP_TABLE: &str = "DROP TABLE t;";
 const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
@@ -31,39 +35,6 @@ const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
 const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
 const WAIT: &str = "WAIT;";

-async fn kill_cn_and_wait_recover(cluster: &Cluster) {
-    cluster
-        .kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
-        .await;
-    sleep(Duration::from_secs(10)).await;
-}
-
-async fn kill_cn_and_meta_and_wait_recover(cluster: &Cluster) {
-    cluster
-        .kill_nodes(
-            [
-                "compute-1",
-                "compute-2",
-                "compute-3",
-                "meta-1",
-                "meta-2",
-                "meta-3",
-            ],
-            0,
-        )
-        .await;
-    sleep(Duration::from_secs(10)).await;
-}
-
-async fn kill_random_and_wait_recover(cluster: &Cluster) {
-    // Kill it again
-    for _ in 0..3 {
-        sleep(Duration::from_secs(2)).await;
-        cluster.kill_node(&KillOpts::ALL_FAST).await;
-    }
-    sleep(Duration::from_secs(10)).await;
-}
-
 async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
     tracing::info!("finding streaming jobs to cancel");
     let ids = session
diff --git a/src/tests/simulation/tests/integration_tests/utils.rs b/src/tests/simulation/tests/integration_tests/utils.rs
new file mode 100644
index 0000000000000..8f06d0acbea2f
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/utils.rs
@@ -0,0 +1,51 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use risingwave_simulation::cluster::{Cluster, KillOpts};
+use tokio::time::sleep;
+
+pub(crate) async fn kill_cn_and_wait_recover(cluster: &Cluster) {
+    cluster
+        .kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
+        .await;
+    sleep(Duration::from_secs(10)).await;
+}
+
+pub(crate) async fn kill_cn_and_meta_and_wait_recover(cluster: &Cluster) {
+    cluster
+        .kill_nodes(
+            [
+                "compute-1",
+                "compute-2",
+                "compute-3",
+                "meta-1",
+                "meta-2",
+                "meta-3",
+            ],
+            0,
+        )
+        .await;
+    sleep(Duration::from_secs(10)).await;
+}
+
+pub(crate) async fn kill_random_and_wait_recover(cluster: &Cluster) {
+    // Kill random nodes a few times, waiting briefly between kills.
+    for _ in 0..3 {
+        sleep(Duration::from_secs(2)).await;
+        cluster.kill_node(&KillOpts::ALL_FAST).await;
+    }
+    sleep(Duration::from_secs(10)).await;
+}
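The new test polls `rw_catalog.rw_ddl_progress` and compares percentage strings before and after recovery. That parse-and-check pattern could be distilled into a small helper along the following lines; `parse_progress` and `assert_progress_in` are hypothetical names, not part of this diff:

```rust
// rw_ddl_progress reports progress as a string such as "1.20%"; strip the
// suffix before parsing it as a float.
fn parse_progress(raw: &str) -> f64 {
    raw.trim().trim_end_matches('%').parse::<f64>().unwrap()
}

fn assert_progress_in(raw: &str, lo: f64, hi: f64) {
    let progress = parse_progress(raw);
    assert!(
        (lo..hi).contains(&progress),
        "progress not within bounds: {progress}"
    );
}

fn main() {
    // With a rate limit of 1 row/s over 1000 rows, ~10s should yield ~1%.
    assert_progress_in("1.20%", 1.0, 2.0);
    // After recovery, progress should stay close to the pre-recovery reading,
    // since the per-vnode row count is now persisted.
    let prev = 1.2;
    assert_progress_in("1.35%", prev - 0.5, prev + 1.5);
}
```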