
Commit

Modified GlobalBarrierManager and scale_in_when_recovery.rs for error handling during recovery and scaling in.
shanicky committed Nov 6, 2023
1 parent 766f79c commit 25044ad
Showing 2 changed files with 118 additions and 18 deletions.
20 changes: 12 additions & 8 deletions src/meta/src/barrier/recovery.rs
@@ -456,20 +456,24 @@ impl GlobalBarrierManager {
         )
         .await?;
 
-        match self
+        if let Err(e) = self
             .scale_controller
             .post_apply_reschedule(&reschedule_fragment)
             .await
         {
-            Ok(_) => {}
-            Err(_e) => {
-                self.fragment_manager
-                    .cancel_apply_reschedules(applied_reschedules)
-                    .await;
-            }
+            tracing::error!(
+                "failed to apply reschedule for offline scaling in recovery: {}",
+                e.to_string()
+            );
+
+            self.fragment_manager
+                .cancel_apply_reschedules(applied_reschedules)
+                .await;
+
+            return Err(e);
         }
 
-        debug!("migrate actors succeed.");
+        debug!("scaling-in actors succeed.");
         Ok(true)
     }
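
In effect, the `match` that silently swallowed the `post_apply_reschedule` error (rolling back but still returning `Ok(true)`) becomes an `if let Err(e)` that logs the failure, cancels the applied reschedules, and propagates the error, presumably so the recovery loop retries instead of proceeding with a half-applied reschedule. Below is a minimal, self-contained sketch of that rollback-then-propagate pattern; `Store`, `post_apply`, and `cancel_applied` are hypothetical stand-ins for the meta-node components, not RisingWave APIs.

```rust
struct Store {
    applied: Vec<String>,
}

impl Store {
    fn post_apply(&mut self, change: &str, fail: bool) -> Result<(), String> {
        if fail {
            return Err(format!("post-apply of {change} failed"));
        }
        self.applied.push(change.to_owned());
        Ok(())
    }

    /// Undo every change applied so far, so a later recovery attempt
    /// starts from a clean slate.
    fn cancel_applied(&mut self) {
        self.applied.clear();
    }
}

fn apply_with_rollback(store: &mut Store, change: &str, fail: bool) -> Result<bool, String> {
    // Before this commit the error arm swallowed the failure, so the caller
    // still saw success; now we log, roll back, and propagate the error.
    if let Err(e) = store.post_apply(change, fail) {
        eprintln!("failed to apply reschedule: {e}");
        store.cancel_applied();
        return Err(e);
    }
    Ok(true)
}

fn main() {
    let mut store = Store { applied: Vec::new() };
    assert!(apply_with_rollback(&mut store, "reschedule-1", false).is_ok());
    assert!(apply_with_rollback(&mut store, "reschedule-2", true).is_err());
    assert!(store.applied.is_empty()); // rollback removed the partial state
}
```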

116 changes: 106 additions & 10 deletions scale_in_when_recovery.rs
@@ -15,8 +15,11 @@
 use std::time::Duration;
 
 use anyhow::Result;
+use itertools::Itertools;
+use risingwave_pb::common::{WorkerNode, WorkerType};
 use risingwave_simulation::cluster::{Cluster, Configuration};
-use risingwave_simulation::ctl_ext::predicate::identity_contains;
+use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
+use risingwave_simulation::utils::AssertResult;
 use tokio::time::sleep;
 
 #[tokio::test]
@@ -25,19 +28,59 @@ async fn test_scale_in_when_recovery() -> Result<()> {
     let mut cluster = Cluster::start(config.clone()).await?;
     let mut session = cluster.start_session();
 
-    session.run("CREATE TABLE t1 (v1 int);").await?;
+    session.run("create table t (v1 int);").await?;
     session
-        .run("INSERT INTO t1 select * from generate_series(1, 100)")
+        .run("create materialized view m as select count(*) from t;")
+        .await?;
+
+    session
+        .run("insert into t select * from generate_series(1, 100)")
         .await?;
     session.run("flush").await?;
 
     sleep(Duration::from_secs(5)).await;
 
-    let fragment = cluster
-        .locate_one_fragment(vec![identity_contains("materialize")])
+    let table_mat_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("materialize"),
+            no_identity_contains("simpleAgg"),
+        ])
         .await?;
 
+    let single_agg_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("simpleAgg"),
+            identity_contains("materialize"),
+        ])
+        .await?;
+
-    let (all_parallel_units, used_parallel_units) = fragment.parallel_unit_usage();
+    let (_, single_used_parallel_unit_ids) = single_agg_fragment.parallel_unit_usage();
+
+    let used_parallel_unit_id = single_used_parallel_unit_ids.iter().next().unwrap();
+
+    let mut workers: Vec<WorkerNode> = cluster
+        .get_cluster_info()
+        .await?
+        .worker_nodes
+        .into_iter()
+        .filter(|worker| worker.r#type() == WorkerType::ComputeNode)
+        .collect();
+
+    let prev_workers = workers
+        .extract_if(|worker| {
+            worker
+                .parallel_units
+                .iter()
+                .map(|parallel_unit| parallel_unit.id)
+                .contains(used_parallel_unit_id)
+        })
+        .collect_vec();
+
+    let prev_worker = prev_workers.into_iter().exactly_one().unwrap();
+    let host = prev_worker.host.unwrap().host;
+    let host_name = format!("compute-{}", host.split('.').last().unwrap());
+
+    let (all_parallel_units, used_parallel_units) = table_mat_fragment.parallel_unit_usage();
 
     assert_eq!(all_parallel_units.len(), used_parallel_units.len());
@@ -52,19 +95,72 @@ async fn test_scale_in_when_recovery() -> Result<()> {
     let restart_delay = 30;
 
     cluster
-        .kill_nodes_and_restart(vec!["compute-1"], restart_delay)
+        .kill_nodes_and_restart(vec![host_name], restart_delay)
         .await;
 
-    let fragment = cluster
-        .locate_one_fragment(vec![identity_contains("materialize")])
+    let table_mat_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("materialize"),
+            no_identity_contains("simpleAgg"),
+        ])
         .await?;
 
+    let (_, used_parallel_units) = table_mat_fragment.parallel_unit_usage();
+
+    assert_eq!(
+        initialized_parallelism - config.compute_node_cores,
+        used_parallel_units.len()
+    );
+
+    let chain_fragment = cluster
+        .locate_one_fragment(vec![identity_contains("chain")])
+        .await?;
+
-    let (_, used_parallel_units) = fragment.parallel_unit_usage();
+    let (_, used_parallel_units) = chain_fragment.parallel_unit_usage();
 
     assert_eq!(
         initialized_parallelism - config.compute_node_cores,
         used_parallel_units.len()
     );
+
+    let single_agg_fragment = cluster
+        .locate_one_fragment(vec![
+            identity_contains("simpleAgg"),
+            identity_contains("materialize"),
+        ])
+        .await?;
+
+    let (_, used_parallel_units_ids) = single_agg_fragment.parallel_unit_usage();
+
+    assert_eq!(used_parallel_units_ids.len(), 1);
+
+    assert_ne!(single_used_parallel_unit_ids, used_parallel_units_ids);
+
+    session
+        .run("select count(*) from t")
+        .await?
+        .assert_result_eq("100");
+
+    session
+        .run("select * from m")
+        .await?
+        .assert_result_eq("100");
+
+    session
+        .run("INSERT INTO t select * from generate_series(101, 150)")
+        .await?;
+
+    session.run("flush").await?;
+
+    session
+        .run("select count(*) from t")
+        .await?
+        .assert_result_eq("150");
+
+    session
+        .run("select * from m")
+        .await?
+        .assert_result_eq("150");
 
     Ok(())
 }
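
The trickiest part of the new test is choosing which node to kill: it locates the single parallel unit running the `simpleAgg` fragment, finds the one compute node that owns it, and derives the simulation node name from that worker's host. Below is a stable-Rust sketch of the same selection logic; the `Worker` type and the sample addresses are made up for illustration, while the test itself uses the nightly `Vec::extract_if` plus itertools' `contains`/`exactly_one` on real `WorkerNode`s.

```rust
struct Worker {
    host: String,              // sample address, made up for this sketch
    parallel_unit_ids: Vec<u32>,
}

fn node_to_kill(workers: &[Worker], used_parallel_unit_id: u32) -> String {
    let matching: Vec<&Worker> = workers
        .iter()
        .filter(|w| w.parallel_unit_ids.contains(&used_parallel_unit_id))
        .collect();
    // A parallel unit lives on exactly one worker, so exactly one must match.
    assert_eq!(matching.len(), 1);
    let host = &matching[0].host;
    // Mirror the test: the node name is "compute-" plus the last host segment.
    format!("compute-{}", host.split('.').last().unwrap())
}

fn main() {
    let workers = vec![
        Worker { host: "192.168.1.2".into(), parallel_unit_ids: vec![0, 1] },
        Worker { host: "192.168.1.3".into(), parallel_unit_ids: vec![2, 3] },
    ];
    assert_eq!(node_to_kill(&workers, 2), "compute-3");
}
```

Killing exactly the node that hosts the single-parallelism aggregation (rather than the hardcoded "compute-1") is what makes the test deterministic: after recovery, the `assert_ne!` verifies the simpleAgg fragment was rescheduled onto a different parallel unit.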
