Skip to content

Commit

Permalink
refactor(test): refactor scaling tests for arrangement backfill (#14577)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Jan 16, 2024
1 parent 83e829e commit 08a08b4
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 18 deletions.
10 changes: 10 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,20 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split
/// Column family name for table fragments.
const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments";

/// The parallelism for a `TableFragments`.
///
/// Describes how the parallelism of the fragments belonging to one
/// `TableFragments` is determined.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
/// The system decides the parallelism, based on the available parallel units.
Auto,
/// Set when the parallelism of the `TableFragments` is changed.
/// All fragments which are part of the `TableFragments` have the same
/// parallelism as the value carried here.
Fixed(usize),
/// Set when the individual parallelisms of the `Fragments` can differ
/// within a `TableFragments`.
/// This is set for `risectl`: its low-level interface can scale individual
/// `Fragments` within a `TableFragments`.
/// When that happens, the `TableFragments` no longer has a consistent
/// parallelism, so this variant is used to indicate that.
Custom,
}

Expand Down
40 changes: 37 additions & 3 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Default for Configuration {
}

impl Configuration {
/// Returns the config for scale test.
/// Returns the configuration for scale test.
pub fn for_scale() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
Expand All @@ -128,7 +128,31 @@ impl Configuration {
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
..Default::default()
}
}

/// Returns the configuration for scale tests with arrangement backfill disabled,
/// so that table scans use `no_shuffle`.
pub fn for_scale_no_shuffle() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
let config_path = {
let mut file =
tempfile::NamedTempFile::new().expect("failed to create temp config file");
file.write_all(include_bytes!("risingwave-scale.toml"))
.expect("failed to write config file");
file.into_temp_path()
};

Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 2,
compute_nodes: 3,
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()]
.into(),
..Default::default()
}
Expand Down Expand Up @@ -170,7 +194,10 @@ metrics_level = "Disabled"
meta_nodes: 1,
compactor_nodes: 1,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
per_session_queries: vec![
"create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
"create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
]
.into(),
..Default::default()
}
Expand Down Expand Up @@ -819,6 +846,13 @@ impl Session {
self.run("FLUSH").await?;
Ok(())
}

/// Reports whether arrangement backfill is enabled for this session.
///
/// Runs `show streaming_enable_arrangement_backfill` and returns `true` only
/// when the output is exactly the string `"true"`.
///
/// # Errors
///
/// Propagates any error returned by running the query.
pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
    let setting = self.run("show streaming_enable_arrangement_backfill").await?;
    let enabled = setting == "true";
    Ok(enabled)
}
}

/// Options for killing nodes.
Expand Down
123 changes: 118 additions & 5 deletions src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ async fn test_compatibility_with_low_level() -> Result<()> {
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();
session
.run("SET streaming_enable_arrangement_backfill = false;")
.await?;

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
Expand All @@ -505,10 +508,6 @@ async fn test_compatibility_with_low_level() -> Result<()> {
))
.await;

// helper views
session.run("create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;").await?;
session.run("create view mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;").await?;

session.run("create table t(v int);").await?;

// single fragment downstream
Expand Down Expand Up @@ -593,7 +592,7 @@ async fn test_compatibility_with_low_level() -> Result<()> {

let hash_join_fragment_id = hash_join_fragment.id();

// manual scale in m_simple materialize fragment
// manual scale in m_join materialize fragment
cluster
.reschedule_resolve_no_shuffle(format!(
"{hash_join_fragment_id}-[{chosen_parallel_unit_a}]"
Expand Down Expand Up @@ -626,3 +625,117 @@ async fn test_compatibility_with_low_level() -> Result<()> {

Ok(())
}

/// Exercises low-level (per-fragment) rescheduling with arrangement backfill
/// enabled.
///
/// The test manually scales in the table's fragment and then the fragment of
/// the materialized view `m_simple`, checking after each step which of them
/// report `CUSTOM` parallelism. Finally it restarts a previously killed
/// worker and asserts that the per-fragment parallelism reported by
/// `rw_fragments` is unchanged.
#[tokio::test]
async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> {
// NOTE(review): the two boolean arguments are positional; confirm their
// meaning against `Configuration::for_auto_parallelism`.
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
false,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
cluster
.simple_kill_nodes(vec![select_worker.to_string()])
.await;

// Wait for twice the configured maximum heartbeat interval.
// Presumably this gives the cluster time to notice that the worker is
// gone -- TODO confirm.
sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

session.run("create table t(v int);").await?;

// Enable arrangement backfill for this session before creating the
// materialized view on top of `t`.
session
.run("SET streaming_enable_arrangement_backfill = true;")
.await?;
session
.run("create materialized view m_simple as select * from t;")
.await?;

// The table starts out with `AUTO` parallelism.
// NOTE(review): the `table_parallelism` / `mview_parallelism` views are not
// created in this test; presumably they come from the configuration's
// per-session queries -- confirm.
session
.run("select parallelism from table_parallelism")
.await?
.assert_result_eq("AUTO");

// Find the table materialize fragment
let table_mat_fragment = cluster
.locate_one_fragment(vec![
identity_contains("materialize"),
identity_contains("union"),
])
.await?;

let (mut all_parallel_units, _) = table_mat_fragment.parallel_unit_usage();

// Pick two distinct parallel units: `a` is used when rescheduling the table
// fragment, `b` when rescheduling the materialized view fragment below.
let chosen_parallel_unit_a = all_parallel_units.pop().unwrap();
let chosen_parallel_unit_b = all_parallel_units.pop().unwrap();

let table_mat_fragment_id = table_mat_fragment.id();

// manual scale in table materialize fragment
cluster
.reschedule(format!(
"{table_mat_fragment_id}-[{chosen_parallel_unit_a}]",
))
.await?;

// After the manual reschedule the table reports `CUSTOM` parallelism.
session
.run("select parallelism from table_parallelism")
.await?
.assert_result_eq("CUSTOM");

// Upstream changes should not affect downstream.
session
.run("select parallelism from mview_parallelism where name = 'm_simple'")
.await?
.assert_result_eq("AUTO");

// Find the table fragment for materialized view
let simple_mv_fragment = cluster
.locate_one_fragment(vec![
identity_contains("materialize"),
identity_contains("StreamTableScan"),
])
.await?;

let simple_mv_fragment_id = simple_mv_fragment.id();

// manual scale in m_simple materialize fragment
cluster
.reschedule_resolve_no_shuffle(format!(
"{simple_mv_fragment_id}-[{chosen_parallel_unit_b}]",
))
.await?;

// The downstream table fragment should be separate from the upstream table fragment.
// Once it has been rescheduled manually, `m_simple` reports `CUSTOM` as well.
session
.run("select parallelism from mview_parallelism where name = 'm_simple'")
.await?
.assert_result_eq("CUSTOM");

// Snapshot the per-fragment parallelism before the reserved worker returns.
let before_fragment_parallelism = session
.run("select fragment_id, parallelism from rw_fragments order by fragment_id;")
.await?;

// Bring the reserved worker back and wait for twice the configured maximum
// heartbeat interval before taking the second snapshot.
cluster
.simple_restart_nodes(vec![select_worker.to_string()])
.await;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

let after_fragment_parallelism = session
.run("select fragment_id, parallelism from rw_fragments order by fragment_id;")
.await?;

// The worker coming back must not have changed any fragment's parallelism.
assert_eq!(before_fragment_parallelism, after_fragment_parallelism);

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const MV5: &str = "create materialized view m5 as select * from m4;";
async fn test_simple_cascade_materialized_view() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();
let arrangement_backfill_is_enabled = session.is_arrangement_backfill_enabled().await?;

session.run(ROOT_TABLE_CREATE).await?;
session.run(MV1).await?;
Expand All @@ -57,10 +58,19 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
.locate_one_fragment([identity_contains("StreamTableScan")])
.await?;

assert_eq!(
chain_fragment.inner.actors.len(),
fragment.inner.actors.len()
);
if arrangement_backfill_is_enabled {
// The chain fragment is in a different table fragment.
assert_eq!(chain_fragment.inner.actors.len(), 6,);
// The upstream materialized fragment should be scaled in
assert_eq!(fragment.inner.actors.len(), 1,);
} else {
// No shuffle, so the fragment of upstream materialized node is the same
// as stream table scan.
assert_eq!(
chain_fragment.inner.actors.len(),
fragment.inner.actors.len()
);
}

session
.run(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_simulation::utils::AssertResult;

#[tokio::test]
async fn test_delta_join() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("set rw_implicit_flush = true;").await?;
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn test_delta_join() -> Result<()> {

#[tokio::test]
async fn test_share_multiple_no_shuffle_upstream() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (a int, b int);").await?;
Expand All @@ -145,7 +145,7 @@ async fn test_share_multiple_no_shuffle_upstream() -> Result<()> {

#[tokio::test]
async fn test_resolve_no_shuffle_upstream() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
Expand Down
4 changes: 2 additions & 2 deletions src/tests/simulation/tests/integration_tests/scale/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn test_resize_single() -> Result<()> {

#[tokio::test]
async fn test_resize_single_failed() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
Expand Down Expand Up @@ -245,7 +245,7 @@ async fn test_resize_single_failed() -> Result<()> {
}
#[tokio::test]
async fn test_resize_no_shuffle() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn test_mv_on_scaled_table() -> Result<()> {

#[tokio::test]
async fn test_scale_on_schema_change() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
cluster.run(ROOT_TABLE_CREATE).await?;

cluster.run(MV1).await?;
Expand Down

0 comments on commit 08a08b4

Please sign in to comment.