recover backfill row count on init + add tests for progress
kwannoel committed Jan 29, 2024
1 parent 9cb25db commit 8995fce
Showing 4 changed files with 83 additions and 6 deletions.
5 changes: 2 additions & 3 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
```diff
@@ -184,7 +184,7 @@ where
         let mut snapshot_read_epoch;

         // Keep track of rows from the snapshot.
-        let mut total_snapshot_processed_rows: u64 = 0;
+        let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

         // Arrangement Backfill Algorithm:
         //
@@ -276,9 +276,8 @@ where
                         // mark.
                         for chunk in upstream_chunk_buffer.drain(..) {
                             let chunk_cardinality = chunk.cardinality() as u64;
-                            cur_barrier_snapshot_processed_rows +=
+                            cur_barrier_upstream_processed_rows +=
                                 chunk_cardinality;
-                            total_snapshot_processed_rows += chunk_cardinality;
                             yield Message::Chunk(mapping_chunk(
                                 chunk,
                                 &self.output_indices,
```
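The first hunk is the heart of the commit: on initialization, `total_snapshot_processed_rows` is seeded from the recovered per-vnode backfill state instead of restarting at 0, so the reported row count survives recovery. The second hunk fixes the related accounting: chunks drained from the upstream buffer are upstream rows, so they now increment `cur_barrier_upstream_processed_rows` and no longer inflate the snapshot total. A minimal sketch of the seeding idea, with `RecoveredState` as a hypothetical stand-in for `BackfillState`:

```rust
// Sketch only: seed a running total from persisted per-vnode state
// instead of starting from 0 on every (re)start.
use std::collections::HashMap;

struct RecoveredState {
    // vnode -> snapshot rows already processed, as persisted in state.
    per_vnode: HashMap<u16, u64>,
}

impl RecoveredState {
    fn get_snapshot_row_count(&self) -> u64 {
        self.per_vnode.values().sum()
    }
}

fn main() {
    let state = RecoveredState {
        per_vnode: HashMap::from([(0, 120), (1, 95), (2, 130)]),
    };
    // Before the fix: `let mut total = 0;` lost all progress across recovery.
    let total_snapshot_processed_rows: u64 = state.get_snapshot_row_count();
    assert_eq!(total_snapshot_processed_rows, 345);
}
```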
25 changes: 25 additions & 0 deletions src/stream/src/executor/backfill/utils.rs
```diff
@@ -218,6 +218,13 @@ impl BackfillState {
         ));
         *committed_state = current_state.clone();
     }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.inner
+            .values()
+            .map(|p| p.get_snapshot_row_count())
+            .sum()
+    }
 }

 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -244,6 +251,10 @@ impl BackfillStatePerVnode {
     pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
         &self.current_state
     }
+
+    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
+        self.current_state().get_snapshot_row_count()
+    }
 }

 impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
@@ -274,6 +285,20 @@ pub enum BackfillProgressPerVnode {
     },
 }

+impl BackfillProgressPerVnode {
+    fn get_snapshot_row_count(&self) -> u64 {
+        match self {
+            BackfillProgressPerVnode::NotStarted => 0,
+            BackfillProgressPerVnode::InProgress {
+                snapshot_row_count, ..
+            }
+            | BackfillProgressPerVnode::Completed {
+                snapshot_row_count, ..
+            } => *snapshot_row_count,
+        }
+    }
+}
+
 pub(crate) fn mark_chunk(
     chunk: StreamChunk,
     current_pos: &OwnedRow,
```
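The new accessor is delegated down three levels: `BackfillState` sums across all vnodes, `BackfillStatePerVnode` forwards to its current progress, and `BackfillProgressPerVnode` reports 0 until backfill has started. A compilable sketch of that shape, with fields simplified relative to the real enum:

```rust
// Minimal sketch of the per-vnode progress enum and its row-count accessor.
enum Progress {
    NotStarted,
    InProgress { snapshot_row_count: u64 },
    Completed { snapshot_row_count: u64 },
}

impl Progress {
    fn get_snapshot_row_count(&self) -> u64 {
        match self {
            Progress::NotStarted => 0,
            // An or-pattern lets both row-carrying variants share one arm.
            Progress::InProgress { snapshot_row_count }
            | Progress::Completed { snapshot_row_count } => *snapshot_row_count,
        }
    }
}

fn main() {
    let per_vnode = [
        Progress::NotStarted,
        Progress::InProgress { snapshot_row_count: 42 },
        Progress::Completed { snapshot_row_count: 100 },
    ];
    // Summing per-vnode counts mirrors BackfillState::get_snapshot_row_count.
    let total: u64 = per_vnode.iter().map(Progress::get_snapshot_row_count).sum();
    assert_eq!(total, 142);
}
```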
2 changes: 2 additions & 0 deletions src/tests/simulation/src/cluster.rs
```diff
@@ -259,6 +259,8 @@ metrics_level = "Disabled"
             meta_nodes: 1,
             compactor_nodes: 1,
             compute_node_cores: 1,
+            per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = true;".into()]
+                .into(),
             ..Default::default()
         }
     }
```
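This wires the session variable into the simulation config (presumably the constructor behind `Configuration::for_arrangement_backfill()`, which the new test uses), so every session run against that cluster enables arrangement backfill without repeating the `SET`. A hypothetical sketch of the mechanism; the types are stand-ins, not the simulation crate's actual API:

```rust
// Sketch: a per-session query list is replayed when each session starts,
// so every test session picks up the setting automatically.
struct Configuration {
    per_session_queries: Vec<String>,
}

struct Session {
    executed: Vec<String>,
}

impl Session {
    fn start(config: &Configuration) -> Self {
        // Run the configured statements before any user query.
        Session {
            executed: config.per_session_queries.clone(),
        }
    }
}

fn main() {
    let config = Configuration {
        per_session_queries: vec![
            "SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = true;".into(),
        ],
    };
    let session = Session::start(&config);
    assert_eq!(session.executed.len(), 1);
}
```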
57 changes: 54 additions & 3 deletions src/tests/simulation/tests/integration_tests/backfill_tests.rs
```diff
@@ -12,13 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::thread::sleep;
 use std::time::Duration;

 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration};
-use tokio::time::timeout;
+use tokio::time::{sleep, timeout};

 const SET_PARALLELISM: &str = "SET STREAMING_PARALLELISM=1;";
 const ROOT_TABLE_CREATE: &str = "create table t1 (_id int, data jsonb);";
@@ -49,6 +48,13 @@ select
 from p2;
 "#;

+async fn kill_cn_and_wait_recover(cluster: &Cluster) {
+    cluster
+        .kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
+        .await;
+    sleep(Duration::from_secs(10)).await;
+}
+
 #[tokio::test]
 async fn test_backfill_with_upstream_and_snapshot_read() -> Result<()> {
     let mut cluster = Cluster::start(Configuration::for_backfill()).await?;
@@ -151,7 +157,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> {
     let upstream_task = tokio::spawn(async move {
         // The initial 100 records will take approx 3s
         // After that we start ingesting upstream records.
-        sleep(Duration::from_secs(3));
+        sleep(Duration::from_secs(3)).await;
         for i in 101..=200 {
             session2
                 .run(format!("insert into t values ({})", i))
```
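The import swap above and this hunk fix a subtle bug together: the old call compiled because `std::thread::sleep` was in scope, but it blocked the runtime's worker thread instead of yielding; `tokio::time::sleep` returns a future and needs the added `.await`. A minimal standalone illustration:

```rust
// Sketch: std::thread::sleep blocks the OS thread (stalling the async
// runtime's worker), while tokio::time::sleep suspends only this task
// and must be awaited.
use std::time::Duration;

#[tokio::main]
async fn main() {
    // The old code compiled because `sleep(...)` without `.await` resolved
    // to std::thread::sleep; the task never actually yielded to the runtime.
    // std::thread::sleep(Duration::from_secs(3));

    // The fixed version: an async sleep, explicitly awaited.
    tokio::time::sleep(Duration::from_secs(3)).await;
    println!("slept asynchronously");
}
```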
```diff
@@ -233,3 +239,48 @@ async fn test_backfill_backpressure() -> Result<()> {
 // distribution MUST also be single, and arrangement backfill should just use Simple.

 // TODO(kwannoel): Test arrangement backfill background recovery.
+#[tokio::test]
+async fn test_arrangement_backfill_progress() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_arrangement_backfill()).await?;
+    let mut session = cluster.start_session();
+
+    // Create base table
+    session.run("CREATE TABLE t (v1 int primary key)").await?;
+
+    // Ingest data
+    session
+        .run("INSERT INTO t SELECT * FROM generate_series(1, 1000)")
+        .await?;
+    session.run("FLUSH;").await?;
+
+    // Create arrangement backfill with rate limit
+    session.run("SET STREAMING_PARALLELISM=1").await?;
+    session.run("SET BACKGROUND_DDL=true").await?;
+    session.run("SET STREAMING_RATE_LIMIT=1").await?;
+    session
+        .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
+        .await?;
+
+    // Verify arrangement backfill progress after 10s; it should be at least 1%.
+    sleep(Duration::from_secs(10)).await;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    println!("progress: {}", progress);
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!((1.0..2.0).contains(&progress));
+
+    // Trigger recovery and test it again.
+    kill_cn_and_wait_recover(&cluster).await;
+    let prev_progress = progress;
+    let progress = session
+        .run("SELECT progress FROM rw_catalog.rw_ddl_progress")
+        .await?;
+    println!("progress: {}", progress);
+    let progress = progress.replace('%', "");
+    let progress = progress.parse::<f64>().unwrap();
+    assert!((prev_progress..prev_progress + 1.5).contains(&progress));
+
+    Ok(())
+}
```
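The two assertion windows encode expected throughput: 1000 rows at the configured rate limit gives roughly 1% progress after the 10s sleep, and the post-recovery read must be at least the pre-kill value, confirming the count was recovered rather than reset to 0. A back-of-envelope check (the 1 row/s interpretation of `STREAMING_RATE_LIMIT=1` is an assumption):

```rust
// Back-of-envelope for the test's assertion windows. Assumption:
// STREAMING_RATE_LIMIT=1 throttles the snapshot read to ~1 row/s here.
fn expected_progress_percent(elapsed_secs: u64, rows_per_sec: u64, total_rows: u64) -> f64 {
    (elapsed_secs * rows_per_sec) as f64 / total_rows as f64 * 100.0
}

fn main() {
    // ~10 of 1000 rows after 10s => ~1%, hence the (1.0..2.0) window.
    let before_kill = expected_progress_percent(10, 1, 1000);
    assert!((1.0..2.0).contains(&before_kill));

    // After recovery the counter is seeded from persisted state, so the
    // second reading must land in (prev_progress..prev_progress + 1.5)
    // rather than dropping back toward 0.
}
```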
