From e12dcd9af4b81ea852ccd92490dab0523086ce3d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 16:32:29 +0800 Subject: [PATCH 01/22] reduce workload of background_mv_barrier_recovery --- src/stream/src/executor/flow_control.rs | 1 + .../recovery/background_ddl.rs | 70 ++++++++----------- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index c48fdc8a4392f..fcc9fd2b6f056 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -55,6 +55,7 @@ impl FlowControlExecutor { match msg { Message::Chunk(chunk) => { let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else { + tracing::trace!(?chunk, "empty chunk"); // Handle case where chunk is empty continue; }; diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index b4849c877f8ec..6cd7a0b5d903e 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -21,12 +21,14 @@ use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);"; +const DROP_TABLE: &str = "DROP TABLE t;"; const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);"; const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;"; const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;"; const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;"; const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;"; const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;"; +const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;"; const WAIT: &str = "WAIT;"; async fn kill_cn_and_wait_recover(cluster: &Cluster) { @@ -80,80 +82,70 @@ fn init_logger() { .try_init(); } +async fn create_mv(session: &mut Session) -> Result<()> { + session.run(CREATE_MV1).await?; + sleep(Duration::from_secs(2)).await; + Ok(()) +} + #[tokio::test] async fn test_background_mv_barrier_recovery() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + init_logger(); + let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; let mut session = cluster.start_session(); - session.run("CREATE TABLE t1 (v1 int);").await?; - session - .run("INSERT INTO t1 select * from generate_series(1, 400000)") - .await?; - session.run("flush").await?; - session.run("SET BACKGROUND_DDL=true;").await?; - session - .run("create materialized view m1 as select * from t1;") - .await?; + session.run(CREATE_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; + session.run(SET_RATE_LIMIT_1).await?; + session.run(SET_BACKGROUND_DDL).await?; + create_mv(&mut session).await?; // If the CN is killed before first barrier pass for the MV, the MV will be dropped. // This is because it's table fragments will NOT be committed until first barrier pass. - sleep(Duration::from_secs(5)).await; - kill_cn_and_wait_recover(&cluster).await; - - // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; - kill_cn_and_wait_recover(&cluster).await; kill_and_wait_recover(&cluster).await; // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; + session.run(SEED_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; kill_and_wait_recover(&cluster).await; kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; + session.run(SEED_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; - // Now just wait for it to complete. + kill_cn_and_wait_recover(&cluster).await; + kill_and_wait_recover(&cluster).await; - sleep(Duration::from_secs(10)).await; + // Now just wait for it to complete. + session.run(WAIT).await?; session - .run("SELECT COUNT(v1) FROM m1") + .run("SELECT COUNT(v1) FROM mv1") .await? - .assert_result_eq("700000"); + .assert_result_eq("3500"); // Make sure that if MV killed and restarted // it will not be dropped. - - session.run("DROP MATERIALIZED VIEW m1").await?; - session.run("DROP TABLE t1").await?; + session.run(DROP_MV1).await?; + session.run(DROP_TABLE).await?; Ok(()) } #[tokio::test] async fn test_background_ddl_cancel() -> Result<()> { - async fn create_mv(session: &mut Session) -> Result<()> { - session.run(CREATE_MV1).await?; - sleep(Duration::from_secs(2)).await; - Ok(()) - } init_logger(); let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; let mut session = cluster.start_session(); session.run(CREATE_TABLE).await?; session.run(SEED_TABLE).await?; + session.flush().await?; session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?; From a2251738fa422125cbd1d085f50ac42cc81b23ad Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 17:54:20 +0800 Subject: [PATCH 02/22] simplify background_ddl test --- .../recovery/background_ddl.rs | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 6cd7a0b5d903e..880e77853483b 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -97,38 +97,30 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { session.run(CREATE_TABLE).await?; session.run(SEED_TABLE).await?; session.flush().await?; - session.run(SET_RATE_LIMIT_1).await?; + session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?; create_mv(&mut session).await?; // If the CN is killed before first barrier pass for the MV, the MV will be dropped. // This is because it's table fragments will NOT be committed until first barrier pass. kill_cn_and_wait_recover(&cluster).await; - kill_and_wait_recover(&cluster).await; - - // Send some upstream updates. - session.run(SEED_TABLE).await?; - session.run(SEED_TABLE).await?; - session.flush().await?; - - kill_and_wait_recover(&cluster).await; - kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. session.run(SEED_TABLE).await?; - session.run(SEED_TABLE).await?; session.flush().await?; - kill_cn_and_wait_recover(&cluster).await; kill_and_wait_recover(&cluster).await; // Now just wait for it to complete. session.run(WAIT).await?; - session + let t_count = session.run("SELECT COUNT(v1) FROM t").await?; + + let mv1_count = session .run("SELECT COUNT(v1) FROM mv1") - .await? - .assert_result_eq("3500"); + .await?; + + assert_eq!(t_count, mv1_count); // Make sure that if MV killed and restarted // it will not be dropped. From 967d2d909366f68346dd6c4ccf574193b7814e81 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 18:05:39 +0800 Subject: [PATCH 03/22] fix flow_control insufficient capacity trace --- src/stream/src/executor/flow_control.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index fcc9fd2b6f056..59a46356101cd 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -54,16 +54,16 @@ impl FlowControlExecutor { let msg = msg?; match msg { Message::Chunk(chunk) => { - let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else { - tracing::trace!(?chunk, "empty chunk"); + let chunk_cardinality = chunk.cardinality(); + let Some(n) = NonZeroU32::new(chunk_cardinality as u32) else { // Handle case where chunk is empty continue; }; if let Some(rate_limiter) = &rate_limiter { let result = rate_limiter.until_n_ready(n).await; - if let Err(InsufficientCapacity(n)) = result { + if let Err(InsufficientCapacity(_max_cells)) = result { tracing::error!( - "Rate Limit {:?} smaller than chunk cardinality {n}", + "Rate Limit {:?} smaller than chunk cardinality {chunk_cardinality}", self.rate_limit, ); } From d0ce9cd9e55f1835c113a970639bfee3474bd74b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 18:17:28 +0800 Subject: [PATCH 04/22] upload failing logs for integration tests --- ci/scripts/deterministic-it-test.sh | 5 ++++- ci/workflows/main-cron.yml | 3 +++ ci/workflows/pull-request.yml | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index f281eaa467bfd..1eb0b0e9f91a6 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -5,6 +5,9 @@ set -euo pipefail source ci/scripts/common.sh +export LOGDIR=.risingwave/log +mkdir -p $LOGDIR + echo "--- Download artifacts" buildkite-agent artifact download simulation-it-test.tar.zst . @@ -19,4 +22,4 @@ seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ --no-fail-fast \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ - "$@" + "$@" 2>$LOGDIR/deterministic-it-test-{}.log && rm $LOGDIR/deterministic-it-test-{}.log diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 6f58222424e18..891cb69ecb98f 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -193,6 +193,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs timeout_in_minutes: 70 retry: *auto-retry @@ -205,6 +206,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs timeout_in_minutes: 70 retry: *auto-retry @@ -217,6 +219,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs timeout_in_minutes: 70 retry: *auto-retry diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 3aaa09f0d7716..bd71c7f7044ef 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -344,6 +344,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs timeout_in_minutes: 20 retry: *auto-retry From f8bbc2d4fd24362ad1184ee8ba1b5e55781bce38 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 18:35:11 +0800 Subject: [PATCH 05/22] trace no_shuffle_backfill recovery --- src/stream/src/executor/backfill/no_shuffle_backfill.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 97a9da0ff6a99..550fab11e9e9e 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -164,6 +164,7 @@ where mut old_state, } = Self::recover_backfill_state(self.state_table.as_ref(), pk_in_output_indices.len()) .await?; + tracing::trace!(is_finished, row_count, "backfill state recovered"); let mut builder = DataChunkBuilder::new(self.upstream_table.schema().data_types(), self.chunk_size); From 2271c492a968b95a834a5afae0d8afe28d03b203 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 18:58:36 +0800 Subject: [PATCH 06/22] migrate foreground_ddl_no_recovery test --- .../recovery/background_ddl.rs | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 880e77853483b..32587a7069bc6 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -22,7 +22,8 @@ use tokio::time::sleep; const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);"; const DROP_TABLE: &str = "DROP TABLE t;"; -const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);"; +const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);"; +const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);"; const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;"; const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;"; const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;"; @@ -95,7 +96,7 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { let mut session = cluster.start_session(); session.run(CREATE_TABLE).await?; - session.run(SEED_TABLE).await?; + session.run(SEED_TABLE_500).await?; session.flush().await?; session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?; @@ -106,7 +107,7 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - session.run(SEED_TABLE).await?; + session.run(SEED_TABLE_500).await?; session.flush().await?; kill_and_wait_recover(&cluster).await; @@ -136,7 +137,7 @@ async fn test_background_ddl_cancel() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; let mut session = cluster.start_session(); session.run(CREATE_TABLE).await?; - session.run(SEED_TABLE).await?; + session.run(SEED_TABLE_500).await?; session.flush().await?; session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?; @@ -178,3 +179,37 @@ async fn test_background_ddl_cancel() -> Result<()> { Ok(()) } + +// When cluster stop, foreground ddl job must be cancelled. +#[tokio::test] +async fn test_foreground_ddl_no_recovery() -> Result<()> { + init_logger(); + let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; + let mut session = cluster.start_session(); + session.run(CREATE_TABLE).await?; + session.run(SEED_TABLE_100).await?; + session.flush().await?; + + let mut session2 = cluster.start_session(); + tokio::spawn(async move { + session2.run(SET_RATE_LIMIT_2).await.unwrap(); + let result = create_mv(&mut session2).await; + assert!(result.is_err()); + }); + + // Wait for job to start + sleep(Duration::from_secs(2)).await; + + // Kill CN should stop the job + cluster.kill_nodes(["compute-1", "compute-2", "compute-3"], 0).await; + sleep(Duration::from_secs(10)).await; + + // Create MV should succeed, since the previous foreground job should be cancelled. + session.run(SET_RATE_LIMIT_2).await?; + create_mv(&mut session).await?; + + session.run(DROP_MV1).await?; + session.run(DROP_TABLE).await?; + + Ok(()) +} \ No newline at end of file From 3ac170e15523a00e8590f8c5569582d48e0b9cf3 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 19:05:02 +0800 Subject: [PATCH 07/22] add foreground index and sink cancel --- .../recovery/background_ddl.rs | 73 +++++++++++++++++-- 1 file changed, 68 insertions(+), 5 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 32587a7069bc6..010b06389325c 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -17,7 +17,6 @@ use std::time::Duration; use anyhow::Result; use itertools::Itertools; use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session}; -use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);"; @@ -117,9 +116,7 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { let t_count = session.run("SELECT COUNT(v1) FROM t").await?; - let mv1_count = session - .run("SELECT COUNT(v1) FROM mv1") - .await?; + let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?; assert_eq!(t_count, mv1_count); @@ -201,7 +198,9 @@ async fn test_foreground_ddl_no_recovery() -> Result<()> { sleep(Duration::from_secs(2)).await; // Kill CN should stop the job - cluster.kill_nodes(["compute-1", "compute-2", "compute-3"], 0).await; + cluster + .kill_nodes(["compute-1", "compute-2", "compute-3"], 0) + .await; sleep(Duration::from_secs(10)).await; // Create MV should succeed, since the previous foreground job should be cancelled. @@ -211,5 +210,69 @@ async fn test_foreground_ddl_no_recovery() -> Result<()> { session.run(DROP_MV1).await?; session.run(DROP_TABLE).await?; + Ok(()) +} + +#[tokio::test] +async fn test_foreground_index_cancel() -> Result<()> { + init_logger(); + let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; + let mut session = cluster.start_session(); + session.run(CREATE_TABLE).await?; + session.run(SEED_TABLE_100).await?; + session.flush().await?; + + let mut session2 = cluster.start_session(); + tokio::spawn(async move { + session2.run(SET_RATE_LIMIT_2).await.unwrap(); + let result = session2.run("CREATE INDEX i ON t (v1);").await; + assert!(result.is_err()); + }); + + // Wait for job to start + sleep(Duration::from_secs(2)).await; + + // Kill CN should stop the job + cancel_stream_jobs(&mut session).await?; + + // Create MV should succeed, since the previous foreground job should be cancelled. + session.run(SET_RATE_LIMIT_2).await?; + session.run("CREATE INDEX i ON t (v1);").await?; + + session.run("DROP INDEX i;").await?; + session.run(DROP_TABLE).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_foreground_sink_cancel() -> Result<()> { + init_logger(); + let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; + let mut session = cluster.start_session(); + session.run(CREATE_TABLE).await?; + session.run(SEED_TABLE_100).await?; + session.flush().await?; + + let mut session2 = cluster.start_session(); + tokio::spawn(async move { + session2.run(SET_RATE_LIMIT_2).await.unwrap(); + let result = session2.run("CREATE SINK s ON t (v1);").await; + assert!(result.is_err()); + }); + + // Wait for job to start + sleep(Duration::from_secs(2)).await; + + // Kill CN should stop the job + cancel_stream_jobs(&mut session).await?; + + // Create MV should succeed, since the previous foreground job should be cancelled. + session.run(SET_RATE_LIMIT_2).await?; + session.run("CREATE SINK s ON t (v1);").await?; + + session.run("DROP SINK s;").await?; + session.run(DROP_TABLE).await?; + Ok(()) } \ No newline at end of file From ee9abfd51b1c1e07543de26d41b93f2a2bb08813 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 19:26:11 +0800 Subject: [PATCH 08/22] fix sink test + deterministically kill cn --- .../recovery/background_ddl.rs | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 010b06389325c..248c7cdd7f051 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -32,20 +32,9 @@ const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;"; const WAIT: &str = "WAIT;"; async fn kill_cn_and_wait_recover(cluster: &Cluster) { - // Kill it again - for _ in 0..5 { - cluster - .kill_node(&KillOpts { - kill_rate: 1.0, - kill_meta: false, - kill_frontend: false, - kill_compute: true, - kill_compactor: false, - restart_delay_secs: 1, - }) - .await; - sleep(Duration::from_secs(2)).await; - } + cluster + .kill_nodes(["compute-1", "compute-2", "compute-3"], 0) + .await; sleep(Duration::from_secs(10)).await; } @@ -198,10 +187,7 @@ async fn test_foreground_ddl_no_recovery() -> Result<()> { sleep(Duration::from_secs(2)).await; // Kill CN should stop the job - cluster - .kill_nodes(["compute-1", "compute-2", "compute-3"], 0) - .await; - sleep(Duration::from_secs(10)).await; + kill_cn_and_wait_recover(&cluster).await; // Create MV should succeed, since the previous foreground job should be cancelled. session.run(SET_RATE_LIMIT_2).await?; @@ -257,7 +243,9 @@ async fn test_foreground_sink_cancel() -> Result<()> { let mut session2 = cluster.start_session(); tokio::spawn(async move { session2.run(SET_RATE_LIMIT_2).await.unwrap(); - let result = session2.run("CREATE SINK s ON t (v1);").await; + let result = session2 + .run("CREATE SINK s FROM t WITH (connector='blackhole');") + .await; assert!(result.is_err()); }); @@ -269,10 +257,12 @@ async fn test_foreground_sink_cancel() -> Result<()> { // Create MV should succeed, since the previous foreground job should be cancelled. session.run(SET_RATE_LIMIT_2).await?; - session.run("CREATE SINK s ON t (v1);").await?; + session + .run("CREATE SINK s FROM t WITH (connector='blackhole');") + .await?; session.run("DROP SINK s;").await?; session.run(DROP_TABLE).await?; Ok(()) -} \ No newline at end of file +} From 63869c2a074ff9f137afcd205c8ceebda085b477 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 19:28:38 +0800 Subject: [PATCH 09/22] prune background / foreground ddl tests, they have been migrated --- ci/scripts/run-backfill-tests.sh | 283 ------------------------------- 1 file changed, 283 deletions(-) diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index d0d5eafb3c917..a5bac061e8831 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -117,214 +117,6 @@ test_snapshot_and_upstream_read() { cargo make wait-processes-exit } -# Test background ddl recovery -test_background_ddl_recovery() { - echo "--- e2e, $CLUSTER_PROFILE, test_background_ddl_recovery" - cargo make ci-start $CLUSTER_PROFILE - - # Test before recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - sleep 1 - OLD_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") - - # Restart - restart_cluster - - # Test after recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - # Recover the mview progress - sleep 5 - - NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") - - if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then - echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS" - else - echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS" - exit 1 - fi - - sleep 60 - - # Test after backfill finished - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - - # After cluster restart(s), backfilled mv should still be present. - restart_cluster - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - restart_cluster - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - -test_background_ddl_cancel() { - echo "--- e2e, $CLUSTER_PROFILE, test background ddl" - cargo make ci-start $CLUSTER_PROFILE - - # Test before recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - sleep 1 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - - # Restart - restart_cluster - - # Recover - sleep 3 - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - sleep 1 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - # After cancel should be able to create MV - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - sleep 1 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - -# Test foreground ddl should not recover -test_foreground_ddl_cancel() { - echo "--- e2e, $CLUSTER_PROFILE, test_foreground_ddl_cancel" - cargo make ci-start $CLUSTER_PROFILE - - # Test before recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;" & - sleep 1 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_fg_mv.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - -# Test foreground ddl should not recover -test_foreground_ddl_no_recover() { - echo "--- e2e, $CLUSTER_PROFILE, test_foreground_ddl_no_recover" - cargo make ci-start $CLUSTER_PROFILE - - # Test before recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;" & - sleep 3 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - - # Restart - restart_cluster - - # Leave sometime for recovery - sleep 5 - - # Test after recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - sleep 30 - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - -test_foreground_index_cancel() { - echo "--- e2e, $CLUSTER_PROFILE, test_foreground_index_cancel" - cargo make ci-start $CLUSTER_PROFILE - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - - # Test cancel - run_sql "CREATE INDEX i ON t (v1);" & - sleep 3 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - # Test index over recovery - run_sql "CREATE INDEX i ON t (v1);" & - sleep 3 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - - # Restart - restart_cluster - - # Leave sometime for recovery - sleep 5 - - # Test after recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_index.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_index.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - -test_foreground_sink_cancel() { - echo "--- e2e, $CLUSTER_PROFILE, test_foreground_sink_ddl_cancel" - cargo make ci-start $CLUSTER_PROFILE - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - - # Test cancel - run_sql "CREATE SINK i FROM t WITH (connector='blackhole');" & - sleep 3 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - cancel_stream_jobs - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - - # Test sink over recovery - run_sql "CREATE SINK i FROM t WITH (connector='blackhole');" & - sleep 3 - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - - # Restart - restart_cluster - - # Leave sometime for recovery - sleep 5 - - # Test after recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_sink.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_sink.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - # Lots of upstream tombstone, backfill should still proceed. test_backfill_tombstone() { echo "--- e2e, test_backfill_tombstone" @@ -358,85 +150,10 @@ test_backfill_tombstone() { wait } -test_backfill_restart_cn_recovery() { - echo "--- e2e, $CLUSTER_PROFILE, test_background_restart_cn_recovery" - cargo make ci-start $CLUSTER_PROFILE - - # Test before recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt" - sleep 1 - OLD_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") - - # Restart 1 CN - restart_cn - - # Give some time to recover. - sleep 3 - - # Test after recovery - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt" - - # Recover the mview progress - sleep 5 - - NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") - - if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then - echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS" - else - echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS" - exit 1 - fi - - # Trigger a bootstrap recovery - pkill compute-node - kill_cluster - rename_logs_with_prefix "before-restart" - sleep 10 - cargo make dev $CLUSTER_PROFILE - - # Recover mview progress - sleep 5 - - OLD_PROGRESS=$NEW_PROGRESS - NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") - - if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then - echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS" - else - echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS" - exit 1 - fi - - sleep 60 - - # Test after backfill finished - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - - # After cluster restart(s), backfilled mv should still be present. - restart_cluster - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - restart_cluster - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt" - - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt" - sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt" - - kill_cluster -} - main() { set -euo pipefail test_snapshot_and_upstream_read test_backfill_tombstone - test_background_ddl_recovery - test_background_ddl_cancel - test_foreground_ddl_no_recover - test_foreground_ddl_cancel - test_foreground_index_cancel - test_foreground_sink_cancel - test_backfill_restart_cn_recovery } main From fd3e33cd2a6b1cae9df6e3fa424c3961d5b56e3e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 22:34:55 +0800 Subject: [PATCH 10/22] add logs --- ci/scripts/deterministic-it-test.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index 1eb0b0e9f91a6..4e6fc9d5bcbb8 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -16,10 +16,12 @@ tar -xvf simulation-it-test.tar.zst mkdir target/sim mv target/ci-sim target/sim +TEST_PATTERN="$@" + echo "--- Run integration tests in deterministic simulation mode" seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ - cargo nextest run \ + 'cargo nextest run \ --no-fail-fast \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ - "$@" 2>$LOGDIR/deterministic-it-test-{}.log && rm $LOGDIR/deterministic-it-test-{}.log + "$TEST_PATTERN" 2>$LOGDIR/deterministic-it-test-{}.log && rm $LOGDIR/deterministic-it-test-{}.log' From 3ac2895268011fd8d525877367ed062f5d78b0ef Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 00:24:24 +0800 Subject: [PATCH 11/22] improve error message for WAIT --- src/meta/src/rpc/ddl_controller.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 780f96552115a..9866061a14628 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1128,7 +1128,8 @@ impl DdlController { } pub async fn wait(&self) -> MetaResult<()> { - for _ in 0..30 * 60 { + let timeout_secs = 30 * 60; + for _ in 0..timeout_secs { if self .catalog_manager .list_creating_background_mvs() @@ -1139,7 +1140,9 @@ impl DdlController { } sleep(Duration::from_secs(1)).await; } - Err(MetaError::cancelled("timeout".into())) + Err(MetaError::cancelled(format!( + "timeout after {timeout_secs}s" + ))) } async fn comment_on(&self, comment: Comment) -> MetaResult { From f316af962a97c8f1ebd1c9429374a169a68bec7f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 00:31:41 +0800 Subject: [PATCH 12/22] add more trace logs + fix progress.finish bug + improve test --- .../src/executor/backfill/cdc/cdc_backfill.rs | 29 ++++++++-- .../executor/backfill/no_shuffle_backfill.rs | 54 ++++++++++++++----- .../recovery/background_ddl.rs | 9 +++- 3 files changed, 74 insertions(+), 18 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 333a1ad106340..ef671fa78e49c 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -464,11 +464,6 @@ impl CdcBackfillExecutor { if let Message::Barrier(barrier) = &msg { // persist the backfill state state_impl.commit_state(barrier.epoch).await?; - - // mark progress as finished - if let Some(progress) = self.progress.as_mut() { - progress.finish(barrier.epoch.curr, total_snapshot_processed_rows); - } yield msg; // break after the state have been saved break; @@ -477,6 +472,30 @@ impl CdcBackfillExecutor { } } + // NOTE(kwannoel): + // Progress can only be finished after at least 1 barrier. + // This is to make sure that downstream state table is flushed, + // before the mview is made visible. + // Otherwise the mview could be inconsistent with upstream. + // It also cannot be immediately `finished()` after yielding the barrier, + // by using the current_epoch of that barrier, since that will now be the previous epoch. + // When notifying the global barrier manager, local barrier manager always uses the + // current epoch, so it won't see the `finished` state when that happens, + // leading to the stream job never finishing. + // Instead we must wait for the next barrier, and finish the progress there. + if let Some(progress) = self.progress.as_mut() { + while let Some(Ok(msg)) = upstream.next().await { + if let Message::Barrier(barrier) = &msg { + let epoch = barrier.epoch; + progress.finish(epoch.curr, total_snapshot_processed_rows); + yield msg; + break; + } else { + yield msg; + } + } + } + // After backfill progress finished // we can forward messages directly to the downstream, // as backfill is finished. diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 550fab11e9e9e..25ff35bc9f592 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -301,6 +301,11 @@ where // just use the last row to update `current_pos`. current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + tracing::trace!( + epoch = snapshot_read_epoch, + ?current_pos, + "snapshot_read: update current_pos" + ); let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; @@ -338,6 +343,11 @@ where // As snapshot read streams are ordered by pk, so we can // just use the last row to update `current_pos`. current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + tracing::trace!( + epoch = snapshot_read_epoch, + ?current_pos, + "snapshot_read_before_barrier: update current_pos" + ); let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; @@ -372,6 +382,11 @@ where let ops = vec![Op::Insert; chunk.capacity()]; let chunk = StreamChunk::from_parts(ops, chunk); current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + tracing::trace!( + epoch = ?barrier.epoch, + ?current_pos, + "barrier: update current_pos from residual snapshot rows" + ); cur_barrier_snapshot_processed_rows += chunk_cardinality; total_snapshot_processed_rows += chunk_cardinality; @@ -456,6 +471,7 @@ where if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { + let epoch = barrier.epoch; if is_finished { // If already finished, no need persist any state. } else { @@ -476,7 +492,7 @@ where debug_assert_ne!(current_pos, None); Self::persist_state( - barrier.epoch, + epoch, &mut self.state_table, true, ¤t_pos, @@ -486,22 +502,12 @@ where ) .await?; tracing::trace!( - epoch = ?barrier.epoch, + ?epoch, ?current_pos, total_snapshot_processed_rows, "Backfill position persisted after completion" ); } - - // For both backfill finished before recovery, - // and backfill which just finished, we need to update mview tracker, - // it does not persist this information. - self.progress - .finish(barrier.epoch.curr, total_snapshot_processed_rows); - tracing::trace!( - epoch = ?barrier.epoch, - "Updated CreateMaterializedTracker" - ); yield msg; break; } @@ -509,6 +515,30 @@ where } } + // NOTE(kwannoel): + // Progress can only be finished after at least 1 barrier. + // This is to make sure that downstream state table is flushed, + // before the mview is made visible. + // Otherwise the mview could be inconsistent with upstream. + // It also cannot be immediately `finished()` after yielding the barrier, + // by using the current_epoch of that barrier, since that will now be the previous epoch. + // When notifying the global barrier manager, local barrier manager always uses the + // current epoch, so it won't see the `finished` state when that happens, + // leading to the stream job never finishing. + // Instead we must wait for the next barrier, and finish the progress there. + while let Some(Ok(msg)) = upstream.next().await { + if let Message::Barrier(barrier) = &msg { + let epoch = barrier.epoch; + self.progress + .finish(epoch.curr, total_snapshot_processed_rows); + tracing::trace!(?epoch, "Updated CreateMaterializedTracker"); + yield msg; + break; + } else { + yield msg; + } + } + tracing::trace!( "Backfill has already finished and forward messages directly to the downstream" ); diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 248c7cdd7f051..c700afa569134 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -95,7 +95,9 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - session.run(SEED_TABLE_500).await?; + session + .run("INSERT INTO t SELECT generate_series FROM generate_series(501, 1000);") + .await?; session.flush().await?; kill_and_wait_recover(&cluster).await; @@ -107,6 +109,11 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?; + let missing_rows = session + .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") + .await?; + tracing::debug!(missing_rows); + assert_eq!(t_count, mv1_count); // Make sure that if MV killed and restarted From 4c6dc08601775a5568549ff005ba8e30b5e9726f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 11:03:51 +0800 Subject: [PATCH 13/22] bump e2e test timeout --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index bd71c7f7044ef..6ee13ec730921 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -82,7 +82,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 16 retry: *auto-retry - label: "end-to-end test (parallel)" From 2745d99d3d8a21f310514cc28f8f7a8477fee680 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 12:09:58 +0800 Subject: [PATCH 14/22] revert progress.finish --- .../src/executor/backfill/cdc/cdc_backfill.rs | 29 ++-------- .../executor/backfill/no_shuffle_backfill.rs | 54 +++++-------------- 2 files changed, 17 insertions(+), 66 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index ef671fa78e49c..333a1ad106340 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -464,6 +464,11 @@ impl CdcBackfillExecutor { if let Message::Barrier(barrier) = &msg { // persist the backfill state state_impl.commit_state(barrier.epoch).await?; + + // mark progress as finished + if let Some(progress) = self.progress.as_mut() { + progress.finish(barrier.epoch.curr, total_snapshot_processed_rows); + } yield msg; // break after the state have been saved break; @@ -472,30 +477,6 @@ impl CdcBackfillExecutor { } } - // NOTE(kwannoel): - // Progress can only be finished after at least 1 barrier. - // This is to make sure that downstream state table is flushed, - // before the mview is made visible. - // Otherwise the mview could be inconsistent with upstream. - // It also cannot be immediately `finished()` after yielding the barrier, - // by using the current_epoch of that barrier, since that will now be the previous epoch. - // When notifying the global barrier manager, local barrier manager always uses the - // current epoch, so it won't see the `finished` state when that happens, - // leading to the stream job never finishing. - // Instead we must wait for the next barrier, and finish the progress there. - if let Some(progress) = self.progress.as_mut() { - while let Some(Ok(msg)) = upstream.next().await { - if let Message::Barrier(barrier) = &msg { - let epoch = barrier.epoch; - progress.finish(epoch.curr, total_snapshot_processed_rows); - yield msg; - break; - } else { - yield msg; - } - } - } - // After backfill progress finished // we can forward messages directly to the downstream, // as backfill is finished. diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 25ff35bc9f592..550fab11e9e9e 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -301,11 +301,6 @@ where // just use the last row to update `current_pos`. current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); - tracing::trace!( - epoch = snapshot_read_epoch, - ?current_pos, - "snapshot_read: update current_pos" - ); let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; @@ -343,11 +338,6 @@ where // As snapshot read streams are ordered by pk, so we can // just use the last row to update `current_pos`. current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); - tracing::trace!( - epoch = snapshot_read_epoch, - ?current_pos, - "snapshot_read_before_barrier: update current_pos" - ); let chunk_cardinality = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += chunk_cardinality; @@ -382,11 +372,6 @@ where let ops = vec![Op::Insert; chunk.capacity()]; let chunk = StreamChunk::from_parts(ops, chunk); current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); - tracing::trace!( - epoch = ?barrier.epoch, - ?current_pos, - "barrier: update current_pos from residual snapshot rows" - ); cur_barrier_snapshot_processed_rows += chunk_cardinality; total_snapshot_processed_rows += chunk_cardinality; @@ -471,7 +456,6 @@ where if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { - let epoch = barrier.epoch; if is_finished { // If already finished, no need persist any state. } else { @@ -492,7 +476,7 @@ where debug_assert_ne!(current_pos, None); Self::persist_state( - epoch, + barrier.epoch, &mut self.state_table, true, ¤t_pos, @@ -502,12 +486,22 @@ where ) .await?; tracing::trace!( - ?epoch, + epoch = ?barrier.epoch, ?current_pos, total_snapshot_processed_rows, "Backfill position persisted after completion" ); } + + // For both backfill finished before recovery, + // and backfill which just finished, we need to update mview tracker, + // it does not persist this information. + self.progress + .finish(barrier.epoch.curr, total_snapshot_processed_rows); + tracing::trace!( + epoch = ?barrier.epoch, + "Updated CreateMaterializedTracker" + ); yield msg; break; } @@ -515,30 +509,6 @@ where } } - // NOTE(kwannoel): - // Progress can only be finished after at least 1 barrier. - // This is to make sure that downstream state table is flushed, - // before the mview is made visible. - // Otherwise the mview could be inconsistent with upstream. - // It also cannot be immediately `finished()` after yielding the barrier, - // by using the current_epoch of that barrier, since that will now be the previous epoch. - // When notifying the global barrier manager, local barrier manager always uses the - // current epoch, so it won't see the `finished` state when that happens, - // leading to the stream job never finishing. - // Instead we must wait for the next barrier, and finish the progress there. - while let Some(Ok(msg)) = upstream.next().await { - if let Message::Barrier(barrier) = &msg { - let epoch = barrier.epoch; - self.progress - .finish(epoch.curr, total_snapshot_processed_rows); - tracing::trace!(?epoch, "Updated CreateMaterializedTracker"); - yield msg; - break; - } else { - yield msg; - } - } - tracing::trace!( "Backfill has already finished and forward messages directly to the downstream" ); From 058b343d63f1571f7dd8c95515852fa500dd8aac Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 13:32:58 +0800 Subject: [PATCH 15/22] recovered stream job requires checkpoint barrier --- src/meta/src/barrier/mod.rs | 2 +- src/meta/src/barrier/progress.rs | 6 ++++-- .../tests/integration_tests/recovery/background_ddl.rs | 4 ++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index d39dde51399d8..bd9ee3ecb9bd1 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -234,7 +234,7 @@ impl CheckpointControl { async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult { for command in self .finished_commands - .extract_if(|c| checkpoint || c.is_barrier()) + .extract_if(|c| checkpoint || !c.is_checkpoint()) { // The command is ready to finish. We can now call `pre_finish`. command.pre_finish().await?; diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 22cd6f8d9e200..66209a18812fe 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -159,10 +159,12 @@ impl TrackingJob { } } - pub(crate) fn is_barrier(&self) -> bool { + pub(crate) fn is_checkpoint(&self) -> bool { match self { TrackingJob::Recovered(_) => true, - TrackingJob::New(command) => command.context.kind.is_barrier(), + TrackingJob::New(command) => { + command.context.kind.is_initial() || command.context.kind.is_checkpoint() + } } } diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index c700afa569134..e0182ea6e55bb 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -113,6 +113,10 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") .await?; tracing::debug!(missing_rows); + let missing_rows_with_row_id = session + .run("SELECT _row_id, v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") + .await?; + tracing::debug!(missing_rows_with_row_id); assert_eq!(t_count, mv1_count); From 7e1ea160495f4fc927e2fbf39afd95b23436449f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 15:42:45 +0800 Subject: [PATCH 16/22] dont run batch query if necessary --- .../recovery/background_ddl.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index e0182ea6e55bb..18cb6052ebcfd 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -106,18 +106,17 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { session.run(WAIT).await?; let t_count = session.run("SELECT COUNT(v1) FROM t").await?; - let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?; - - let missing_rows = session - .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") - .await?; - tracing::debug!(missing_rows); - let missing_rows_with_row_id = session - .run("SELECT _row_id, v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") - .await?; - tracing::debug!(missing_rows_with_row_id); - + if t_count != mv1_count { + let missing_rows = session + .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") + .await?; + tracing::debug!(missing_rows); + let missing_rows_with_row_id = session + .run("SELECT _row_id, v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)") + .await?; + tracing::debug!(missing_rows_with_row_id); + } assert_eq!(t_count, mv1_count); // Make sure that if MV killed and restarted From 4d487f0e0e225a58ded5c17876e7ce47943940ee Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 16:43:46 +0800 Subject: [PATCH 17/22] decrease workload --- .../tests/integration_tests/recovery/background_ddl.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 18cb6052ebcfd..0f8fceb46fd58 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -84,7 +84,9 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { let mut session = cluster.start_session(); session.run(CREATE_TABLE).await?; - session.run(SEED_TABLE_500).await?; + session + .run("INSERT INTO t SELECT generate_series FROM generate_series(1, 200);") + .await?; session.flush().await?; session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?; @@ -96,7 +98,7 @@ async fn test_background_mv_barrier_recovery() -> Result<()> { // Send some upstream updates. session - .run("INSERT INTO t SELECT generate_series FROM generate_series(501, 1000);") + .run("INSERT INTO t SELECT generate_series FROM generate_series(201, 400);") .await?; session.flush().await?; From 54bd7b62552abe7bf116edd5f54d189ee65d64dd Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 16:50:13 +0800 Subject: [PATCH 18/22] docs --- src/meta/src/barrier/progress.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 66209a18812fe..25a4d4d91dff3 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -159,8 +159,11 @@ impl TrackingJob { } } + /// Returns whether the `TrackingJob` requires a checkpoint to complete. pub(crate) fn is_checkpoint(&self) -> bool { match self { + // Recovered tracking job is always a streaming job, + // It requires a checkpoint to complete. TrackingJob::Recovered(_) => true, TrackingJob::New(command) => { command.context.kind.is_initial() || command.context.kind.is_checkpoint() From 35f82d2f5a2aa4013dcab8eea19422bebd9c2ded Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 19:39:03 +0800 Subject: [PATCH 19/22] redirect stdout as well --- ci/scripts/deterministic-it-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index 4e6fc9d5bcbb8..0cdadf017fc60 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -24,4 +24,4 @@ seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ --no-fail-fast \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ - "$TEST_PATTERN" 2>$LOGDIR/deterministic-it-test-{}.log && rm $LOGDIR/deterministic-it-test-{}.log' + "$TEST_PATTERN" 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log' From 1f5e493e482cc9f303325928e40062173e76d334 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 20:40:24 +0800 Subject: [PATCH 20/22] fix deterministic-it-test expression --- ci/scripts/deterministic-it-test.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index 0cdadf017fc60..1c43bd3faae97 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -19,9 +19,9 @@ mv target/ci-sim target/sim TEST_PATTERN="$@" echo "--- Run integration tests in deterministic simulation mode" -seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ - 'cargo nextest run \ +seq $TEST_NUM | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ + cargo nextest run \ --no-fail-fast \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ - "$TEST_PATTERN" 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log' + $TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log" From e168f224fdc66b9be6a8ea6c4386d45f6e9a4044 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 22:10:23 +0800 Subject: [PATCH 21/22] revert timeout change --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 6ee13ec730921..bd71c7f7044ef 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -82,7 +82,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 16 + timeout_in_minutes: 15 retry: *auto-retry - label: "end-to-end test (parallel)" From ba023ea5e1972ab95a4925c174ecc863f648bdc1 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 3 Nov 2023 23:28:29 +0800 Subject: [PATCH 22/22] improve readability --- src/meta/src/barrier/mod.rs | 36 +++++++++++++++----------------- src/meta/src/barrier/progress.rs | 2 +- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index bd9ee3ecb9bd1..654c6359ca1bf 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -206,7 +206,7 @@ struct CheckpointControl { metrics: Arc, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_commands: Vec, + finished_jobs: Vec, } impl CheckpointControl { @@ -218,30 +218,30 @@ impl CheckpointControl { adding_actors: Default::default(), removing_actors: Default::default(), metrics, - finished_commands: Default::default(), + finished_jobs: Default::default(), } } /// Stash a command to finish later. fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_commands.push(finished_job); + self.finished_jobs.push(finished_job); } - /// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish - /// the commands that requires a checkpoint, else we will finish all the commands. + /// Finish stashed jobs. + /// If checkpoint, means all jobs can be finished. + /// If not checkpoint, jobs which do not require checkpoint can be finished. /// - /// Returns whether there are still remaining stashed commands to finish. - async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult { - for command in self - .finished_commands - .extract_if(|c| checkpoint || !c.is_checkpoint()) + /// Returns whether there are still remaining stashed jobs to finish. + async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + for job in self + .finished_jobs + .extract_if(|job| checkpoint || !job.is_checkpoint_required()) { // The command is ready to finish. We can now call `pre_finish`. - command.pre_finish().await?; - command.notify_finished(); + job.pre_finish().await?; + job.notify_finished(); } - - Ok(!self.finished_commands.is_empty()) + Ok(!self.finished_jobs.is_empty()) } fn cancel_command(&mut self, cancelled_job: TrackingJob) { @@ -258,7 +258,7 @@ impl CheckpointControl { } fn cancel_stashed_command(&mut self, id: TableId) { - self.finished_commands + self.finished_jobs .retain(|x| x.table_to_create() != Some(id)); } @@ -476,7 +476,7 @@ impl CheckpointControl { tracing::warn!("there are some changes in dropping_tables"); self.dropping_tables.clear(); } - self.finished_commands.clear(); + self.finished_jobs.clear(); } } @@ -1092,9 +1092,7 @@ impl GlobalBarrierManager { checkpoint_control.cancel_stashed_command(table_id); } - let remaining = checkpoint_control - .finish_commands(kind.is_checkpoint()) - .await?; + let remaining = checkpoint_control.finish_jobs(kind.is_checkpoint()).await?; // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. if remaining { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 25a4d4d91dff3..2d5a745eaef5e 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -160,7 +160,7 @@ impl TrackingJob { } /// Returns whether the `TrackingJob` requires a checkpoint to complete. - pub(crate) fn is_checkpoint(&self) -> bool { + pub(crate) fn is_checkpoint_required(&self) -> bool { match self { // Recovered tracking job is always a streaming job, // It requires a checkpoint to complete.