refactor(test): refactor scaling tests for arrangement backfill #14577

Merged · 10 commits · Jan 16, 2024
Changes from 9 commits
10 changes: 10 additions & 0 deletions src/meta/src/model/stream.rs
@@ -41,10 +41,20 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split
/// Column family name for table fragments.
const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments";

/// The parallelism for a `TableFragments`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
/// This is when the system decides the parallelism, based on the available parallel units.
Auto,
/// We set this when the `TableFragments` parallelism is changed.
/// All fragments which are part of the `TableFragments` will have the same parallelism as this.
Fixed(usize),
/// We set this when the individual parallelisms of the `Fragments`
/// can differ within a `TableFragments`.
/// This is set for `risectl`, since its low-level interface can
/// scale individual `Fragments` within a `TableFragments`.
/// When that happens, the `TableFragments` no longer has a consistent
/// parallelism, so we set this to indicate that.
Custom,
}

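To make the three variants concrete, here is a minimal, hypothetical sketch (not part of this diff) of how a consumer might branch on `TableParallelism`, assuming the enum above is in scope. The `describe` helper and the exact display strings are illustrative assumptions, chosen to mirror the "AUTO" / "CUSTOM" values asserted by the simulation tests further down.

// Hypothetical illustration only; `describe` is not introduced by this PR.
fn describe(parallelism: TableParallelism) -> String {
    match parallelism {
        // Parallelism chosen by the system from the available parallel units.
        TableParallelism::Auto => "AUTO".to_string(),
        // Parallelism pinned by the user; every fragment of the table uses `n`.
        TableParallelism::Fixed(n) => format!("FIXED({n})"),
        // Fragments were rescheduled individually (e.g. via `risectl`), so the
        // table no longer has one consistent parallelism.
        TableParallelism::Custom => "CUSTOM".to_string(),
    }
}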
36 changes: 34 additions & 2 deletions src/tests/simulation/src/cluster.rs
@@ -128,7 +128,29 @@ impl Configuration {
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
..Default::default()
}
}

pub fn for_scale_no_shuffle() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
let config_path = {
let mut file =
tempfile::NamedTempFile::new().expect("failed to create temp config file");
file.write_all(include_bytes!("risingwave-scale.toml"))
.expect("failed to write config file");
file.into_temp_path()
};

Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 2,
compute_nodes: 3,
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()]
.into(),
..Default::default()
}
@@ -170,7 +192,10 @@ metrics_level = "Disabled"
meta_nodes: 1,
compactor_nodes: 1,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()]
per_session_queries: vec![
"create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
"create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
]
.into(),
..Default::default()
}
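Because these two helper views are now created per session for the auto-parallelism configuration, the scaling tests below can query a table's reported parallelism directly instead of recreating the views themselves. A minimal sketch of the pattern, assuming a `Session` handle like the ones used in the tests; the `assert_table_parallelism` helper itself is hypothetical:

// Hypothetical helper, not part of the diff: check the parallelism that the
// per-session `table_parallelism` view reports for a given table name.
async fn assert_table_parallelism(session: &mut Session, table: &str, expected: &str) -> Result<()> {
    session
        .run(&format!(
            "select parallelism from table_parallelism where name = '{table}';"
        ))
        .await?
        .assert_result_eq(expected);
    Ok(())
}

A test would then call, for example, `assert_table_parallelism(&mut session, "t", "AUTO").await?` right after creating the table, and expect "CUSTOM" after a manual reschedule.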
@@ -819,6 +844,13 @@ impl Session {
self.run("FLUSH").await?;
Ok(())
}

pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
let result = self
.run("show streaming_enable_arrangement_backfill")
.await?;
Ok(result == "true")
}
}

/// Options for killing nodes.
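The new `is_arrangement_backfill_enabled` helper simply compares the output of `show streaming_enable_arrangement_backfill` against "true". Its purpose is to let a test adapt its expectations to the session default instead of hard-coding one backfill mode, as `test_simple_cascade_materialized_view` does further down. A rough, hypothetical condensation of that pattern, where `chain_fragment` and `fragment` are assumed to have been located earlier exactly as in that test:

// Hypothetical usage, not part of the diff: only assert the actor-count
// equality that holds for the default no-shuffle backfill.
let mut session = cluster.start_session();
let arrangement_backfill_is_enabled = session.is_arrangement_backfill_enabled().await?;
if !arrangement_backfill_is_enabled {
    // With no-shuffle backfill, the scan fragment keeps the same actor count
    // as its upstream materialize fragment.
    assert_eq!(
        chain_fragment.inner.actors.len(),
        fragment.inner.actors.len()
    );
}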
123 changes: 118 additions & 5 deletions src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs
@@ -493,6 +493,9 @@ async fn test_compatibility_with_low_level() -> Result<()> {
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();
session
.run("SET streaming_enable_arrangement_backfill = false;")
.await?;

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
@@ -505,10 +508,6 @@ async fn test_compatibility_with_low_level() -> Result<()> {
))
.await;

// helper views
session.run("create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;").await?;
session.run("create view mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;").await?;

session.run("create table t(v int);").await?;

// single fragment downstream
@@ -593,7 +592,7 @@ async fn test_compatibility_with_low_level() -> Result<()> {

let hash_join_fragment_id = hash_join_fragment.id();

// manual scale in m_simple materialize fragment
// manual scale in m_join materialize fragment
cluster
.reschedule_resolve_no_shuffle(format!(
"{hash_join_fragment_id}-[{chosen_parallel_unit_a}]"
@@ -626,3 +625,117 @@ async fn test_compatibility_with_low_level() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> {
let config = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
false,
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
cluster
.simple_kill_nodes(vec![select_worker.to_string()])
.await;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

session.run("create table t(v int);").await?;

// Streaming arrangement backfill
session
.run("SET streaming_enable_arrangement_backfill = true;")
.await?;
session
.run("create materialized view m_simple as select * from t;")
.await?;

session
.run("select parallelism from table_parallelism")
.await?
.assert_result_eq("AUTO");

// Find the table materialize fragment
let table_mat_fragment = cluster
.locate_one_fragment(vec![
identity_contains("materialize"),
identity_contains("union"),
])
.await?;

let (mut all_parallel_units, _) = table_mat_fragment.parallel_unit_usage();

let chosen_parallel_unit_a = all_parallel_units.pop().unwrap();
let chosen_parallel_unit_b = all_parallel_units.pop().unwrap();

let table_mat_fragment_id = table_mat_fragment.id();

// manual scale in table materialize fragment
cluster
.reschedule(format!(
"{table_mat_fragment_id}-[{chosen_parallel_unit_a}]",
))
.await?;

session
.run("select parallelism from table_parallelism")
.await?
.assert_result_eq("CUSTOM");

// Upstream changes should not affect downstream.
session
.run("select parallelism from mview_parallelism where name = 'm_simple'")
.await?
.assert_result_eq("AUTO");

// Find the table fragment for the materialized view
let simple_mv_fragment = cluster
.locate_one_fragment(vec![
identity_contains("materialize"),
identity_contains("StreamTableScan"),
])
.await?;

let simple_mv_fragment_id = simple_mv_fragment.id();

// manual scale in m_simple materialize fragment
cluster
.reschedule_resolve_no_shuffle(format!(
"{simple_mv_fragment_id}-[{chosen_parallel_unit_b}]",
))
.await?;

// The downstream table fragment should be separate from the upstream table fragment.
session
.run("select parallelism from mview_parallelism where name = 'm_simple'")
.await?
.assert_result_eq("CUSTOM");

let before_fragment_parallelism = session
.run("select fragment_id, parallelism from rw_fragments order by fragment_id;")
.await?;

cluster
.simple_restart_nodes(vec![select_worker.to_string()])
.await;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

let after_fragment_parallelism = session
.run("select fragment_id, parallelism from rw_fragments order by fragment_id;")
.await?;

assert_eq!(before_fragment_parallelism, after_fragment_parallelism);

Ok(())
}
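A note on the reschedule strings used above, since the diff never spells the format out: to my reading, a plan of the form `<fragment_id>-[<parallel_unit_id>, ...]` removes the listed parallel units from that fragment (a `+[...]` suffix would add units), and `reschedule_resolve_no_shuffle` applies the same plan after resolving no-shuffle dependencies, so it lands on the root fragment of the no-shuffle chain. A hedged recap of the two calls in the test above, with explanatory comments; this is not an authoritative spec of the DSL:

// Shrink the table's materialize fragment by one parallel unit; the owning
// table's parallelism flips from AUTO to CUSTOM, as asserted above.
cluster
    .reschedule(format!("{table_mat_fragment_id}-[{chosen_parallel_unit_a}]"))
    .await?;
// Same plan format, but no-shuffle dependencies are resolved first, so the
// reschedule is applied at the root of the no-shuffle chain that
// `simple_mv_fragment_id` belongs to.
cluster
    .reschedule_resolve_no_shuffle(format!(
        "{simple_mv_fragment_id}-[{chosen_parallel_unit_b}]"
    ))
    .await?;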
@@ -32,6 +32,7 @@ const MV5: &str = "create materialized view m5 as select * from m4;";
async fn test_simple_cascade_materialized_view() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();
let arrangement_backfill_is_enabled = session.is_arrangement_backfill_enabled().await?;
Review thread on the line above:

Member: Why could this vary during runtime? Didn't quite get it. 😕

Member: I assume it's for the tests to pass when the default value for enable_arrangement_backfill is manually set to true. Can we integrate the tests with arrangement backfill into CI?

Contributor (author): Yeah, that's correct. Will do in a separate PR; it requires more refactoring.


session.run(ROOT_TABLE_CREATE).await?;
session.run(MV1).await?;
@@ -57,10 +57,19 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
.locate_one_fragment([identity_contains("StreamTableScan")])
.await?;

assert_eq!(
chain_fragment.inner.actors.len(),
fragment.inner.actors.len()
);
if arrangement_backfill_is_enabled {
// The chain fragment is in a different table fragment.
assert_eq!(chain_fragment.inner.actors.len(), 6,);
// The upstream materialized fragment should be scaled in
assert_eq!(fragment.inner.actors.len(), 1,);
} else {
// No shuffle, so the fragment of upstream materialized node is the same
// as stream table scan.
assert_eq!(
chain_fragment.inner.actors.len(),
fragment.inner.actors.len()
);
}

session
.run(&format!(
@@ -20,7 +20,7 @@ use risingwave_simulation::utils::AssertResult;

#[tokio::test]
async fn test_delta_join() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("set rw_implicit_flush = true;").await?;
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn test_delta_join() -> Result<()> {

#[tokio::test]
async fn test_share_multiple_no_shuffle_upstream() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (a int, b int);").await?;
@@ -145,7 +145,7 @@ async fn test_share_multiple_no_shuffle_upstream() -> Result<()> {

#[tokio::test]
async fn test_resolve_no_shuffle_upstream() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
4 changes: 2 additions & 2 deletions src/tests/simulation/tests/integration_tests/scale/plan.rs
@@ -175,7 +175,7 @@ async fn test_resize_single() -> Result<()> {

#[tokio::test]
async fn test_resize_single_failed() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
@@ -245,7 +245,7 @@ async fn test_resize_single_failed() -> Result<()> {
}
#[tokio::test]
async fn test_resize_no_shuffle() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
let mut session = cluster.start_session();

session.run("create table t (v int);").await?;
@@ -88,7 +88,7 @@ async fn test_mv_on_scaled_table() -> Result<()> {

#[tokio::test]
async fn test_scale_on_schema_change() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?;
cluster.run(ROOT_TABLE_CREATE).await?;

cluster.run(MV1).await?;