From 25044ad0eaa1d712b213d90daff2840c89ad0299 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Mon, 6 Nov 2023 19:36:26 +0800
Subject: [PATCH] Fix error handling in `GlobalBarrierManager` during recovery
 scale-in, and strengthen the `scale_in_when_recovery` simulation test

---
 src/meta/src/barrier/recovery.rs              |  20 +--
 .../recovery/scale_in_when_recovery.rs        | 116 ++++++++++++++++--
 2 files changed, 118 insertions(+), 18 deletions(-)

diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index faa5fbe72b9e6..fc18ba4fbb612 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -456,20 +456,24 @@ impl GlobalBarrierManager {
             )
             .await?;
 
-        match self
+        if let Err(e) = self
             .scale_controller
             .post_apply_reschedule(&reschedule_fragment)
             .await
         {
-            Ok(_) => {}
-            Err(_e) => {
-                self.fragment_manager
-                    .cancel_apply_reschedules(applied_reschedules)
-                    .await;
-            }
+            tracing::error!(
+                "failed to apply reschedule for offline scaling during recovery: {}",
+                e.to_string()
+            );
+
+            self.fragment_manager
+                .cancel_apply_reschedules(applied_reschedules)
+                .await;
+
+            return Err(e);
         }
 
-        debug!("migrate actors succeed.");
+        debug!("scale-in actors succeeded.");
 
         Ok(true)
     }
diff --git a/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs b/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs
index 9fe792b25b3b0..e397fc934f7f2 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs
@@ -15,8 +15,11 @@
 use std::time::Duration;
 
 use anyhow::Result;
+use itertools::Itertools;
+use risingwave_pb::common::{WorkerNode, WorkerType};
 use risingwave_simulation::cluster::{Cluster, Configuration};
-use risingwave_simulation::ctl_ext::predicate::identity_contains;
+use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
+use risingwave_simulation::utils::AssertResult;
 use tokio::time::sleep;
 
 #[tokio::test]
@@ -25,19 +28,59 @@ async fn test_scale_in_when_recovery() -> Result<()> {
     let mut cluster = Cluster::start(config.clone()).await?;
     let mut session = cluster.start_session();
 
-    session.run("CREATE TABLE t1 (v1 int);").await?;
+    session.run("create table t (v1 int);").await?;
     session
-        .run("INSERT INTO t1 select * from generate_series(1, 100)")
+        .run("create materialized view m as select count(*) from t;")
+        .await?;
+
+    session
+        .run("insert into t select * from generate_series(1, 100)")
         .await?;
     session.run("flush").await?;
 
     sleep(Duration::from_secs(5)).await;
 
-    let fragment = cluster
-        .locate_one_fragment(vec![identity_contains("materialize")])
+    let table_mat_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("materialize"),
+            no_identity_contains("simpleAgg"),
+        ])
         .await?;
+
+    let single_agg_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("simpleAgg"),
+            identity_contains("materialize"),
+        ])
+        .await?;
 
-    let (all_parallel_units, used_parallel_units) = fragment.parallel_unit_usage();
+    let (_, single_used_parallel_unit_ids) = single_agg_fragment.parallel_unit_usage();
+
+    let used_parallel_unit_id = single_used_parallel_unit_ids.iter().next().unwrap();
+
+    let mut workers: Vec<WorkerNode> = cluster
+        .get_cluster_info()
+        .await?
+        .worker_nodes
+        .into_iter()
+        .filter(|worker| worker.r#type() == WorkerType::ComputeNode)
+        .collect();
+
+    let prev_workers = workers
+        .extract_if(|worker| {
+            worker
+                .parallel_units
+                .iter()
+                .map(|parallel_unit| parallel_unit.id)
+                .contains(used_parallel_unit_id)
+        })
+        .collect_vec();
+
+    let prev_worker = prev_workers.into_iter().exactly_one().unwrap();
+    let host = prev_worker.host.unwrap().host;
+    let host_name = format!("compute-{}", host.split('.').last().unwrap());
+
+    let (all_parallel_units, used_parallel_units) = table_mat_fragment.parallel_unit_usage();
 
     assert_eq!(all_parallel_units.len(), used_parallel_units.len());
 
@@ -52,19 +95,72 @@ async fn test_scale_in_when_recovery() -> Result<()> {
     let restart_delay = 30;
 
     cluster
-        .kill_nodes_and_restart(vec!["compute-1"], restart_delay)
+        .kill_nodes_and_restart(vec![host_name], restart_delay)
         .await;
 
-    let fragment = cluster
-        .locate_one_fragment(vec![identity_contains("materialize")])
+    let table_mat_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("materialize"),
+            no_identity_contains("simpleAgg"),
+        ])
         .await?;
+
+    let (_, used_parallel_units) = table_mat_fragment.parallel_unit_usage();
+
+    assert_eq!(
+        initialized_parallelism - config.compute_node_cores,
+        used_parallel_units.len()
+    );
+
+    let chain_fragment = cluster
+        .locate_one_fragment(vec![identity_contains("chain")])
+        .await?;
 
-    let (_, used_parallel_units) = fragment.parallel_unit_usage();
+    let (_, used_parallel_units) = chain_fragment.parallel_unit_usage();
 
     assert_eq!(
         initialized_parallelism - config.compute_node_cores,
         used_parallel_units.len()
     );
 
+    let single_agg_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("simpleAgg"),
+            identity_contains("materialize"),
+        ])
+        .await?;
+
+    let (_, used_parallel_units_ids) = single_agg_fragment.parallel_unit_usage();
+
+    assert_eq!(used_parallel_units_ids.len(), 1);
+
+    assert_ne!(single_used_parallel_unit_ids, used_parallel_units_ids);
+
+    session
+        .run("select count(*) from t")
+        .await?
+        .assert_result_eq("100");
+
+    session
+        .run("select * from m")
+        .await?
+        .assert_result_eq("100");
+
+    session
+        .run("insert into t select * from generate_series(101, 150)")
+        .await?;
+
+    session.run("flush").await?;
+
+    session
+        .run("select count(*) from t")
+        .await?
+        .assert_result_eq("150");
+
+    session
+        .run("select * from m")
+        .await?
+        .assert_result_eq("150");
+
     Ok(())
 }
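
Note on the recovery.rs hunk: the old `match` rolled back the applied
reschedules but swallowed the error and let recovery report success; the new
code logs the failure, rolls back, and propagates the error. The standalone
sketch below isolates that log/rollback/propagate pattern. Every name in it
(`FragmentManager`, `ScaleController`, `AppliedReschedules`,
`scale_in_during_recovery`) is a simplified stand-in for illustration, not
RisingWave's actual meta-node API.

    #[derive(Debug)]
    struct RescheduleError(String);

    struct AppliedReschedules;

    struct FragmentManager;

    impl FragmentManager {
        // Stage reschedules and hand back a token that can undo them.
        fn apply_reschedules(&self) -> AppliedReschedules {
            AppliedReschedules
        }

        // Undo previously staged reschedules.
        fn cancel_apply_reschedules(&self, _applied: AppliedReschedules) {
            eprintln!("rolled back applied reschedules");
        }
    }

    struct ScaleController {
        fail_post_apply: bool,
    }

    impl ScaleController {
        // Commit the staged reschedules; this step can fail.
        fn post_apply_reschedule(&self) -> Result<(), RescheduleError> {
            if self.fail_post_apply {
                Err(RescheduleError("post-apply failed".to_string()))
            } else {
                Ok(())
            }
        }
    }

    // Mirrors the patched control flow: log, roll back, then propagate.
    fn scale_in_during_recovery(
        fragment_manager: &FragmentManager,
        scale_controller: &ScaleController,
    ) -> Result<bool, RescheduleError> {
        let applied = fragment_manager.apply_reschedules();

        if let Err(e) = scale_controller.post_apply_reschedule() {
            eprintln!("failed to apply reschedule during recovery: {:?}", e);
            fragment_manager.cancel_apply_reschedules(applied);
            return Err(e); // the old code dropped this error on the floor
        }

        Ok(true)
    }

    fn main() {
        let fm = FragmentManager;
        assert!(scale_in_during_recovery(&fm, &ScaleController { fail_post_apply: false }).is_ok());
        assert!(scale_in_during_recovery(&fm, &ScaleController { fail_post_apply: true }).is_err());
    }

Propagating the error matters here because recovery runs in a retry loop:
returning `Err` lets the caller attempt recovery again rather than resuming
with a half-applied reschedule.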
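Note on the test hunk: instead of always killing "compute-1", the test now
looks up which compute node owns the parallel unit running the
single-parallelism `simpleAgg` fragment and kills exactly that node. Below is
a stable-Rust sketch of that lookup (the real test uses the nightly
`Vec::extract_if` plus `Itertools::exactly_one`); the `ParallelUnit`,
`HostAddress`, and `WorkerNode` structs and the `kill_target` helper are
simplified stand-ins, and the IP-style host string is an assumption mirroring
how the test derives node names like "compute-2" from the last
dot-separated segment of the host.

    // Simplified stand-ins for the pb worker metadata used by the test.
    struct ParallelUnit {
        id: u32,
    }

    struct HostAddress {
        host: String,
    }

    struct WorkerNode {
        host: HostAddress,
        parallel_units: Vec<ParallelUnit>,
    }

    // Find the worker owning `used_parallel_unit_id` and turn its address
    // into the simulation node name to kill, e.g. "192.168.1.2" -> "compute-2".
    fn kill_target(workers: &[WorkerNode], used_parallel_unit_id: u32) -> Option<String> {
        let owner = workers
            .iter()
            .find(|w| w.parallel_units.iter().any(|pu| pu.id == used_parallel_unit_id))?;
        let last_segment = owner.host.host.split('.').last()?;
        Some(format!("compute-{last_segment}"))
    }

    fn main() {
        let workers = vec![
            WorkerNode {
                host: HostAddress { host: "192.168.1.1".to_string() },
                parallel_units: vec![ParallelUnit { id: 0 }, ParallelUnit { id: 1 }],
            },
            WorkerNode {
                host: HostAddress { host: "192.168.1.2".to_string() },
                parallel_units: vec![ParallelUnit { id: 2 }],
            },
        ];

        assert_eq!(kill_target(&workers, 2).as_deref(), Some("compute-2"));
    }

Targeting the owner of the single-parallelism fragment is what makes the
test deterministic: after the kill and recovery, the fragment must have been
rescheduled, so the test can assert its parallel unit changed
(`assert_ne!`) and that the materialized view still returns correct counts.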