diff --git a/e2e_test/ddl/alter_parallelism.slt b/e2e_test/ddl/alter_parallelism.slt index e030b44273575..025496ca1c571 100644 --- a/e2e_test/ddl/alter_parallelism.slt +++ b/e2e_test/ddl/alter_parallelism.slt @@ -22,7 +22,7 @@ create table t (v int); query T select parallelism from table_parallelism where name = 't'; ---- -AUTO +ADAPTIVE statement ok alter table t set parallelism = 2; @@ -39,12 +39,12 @@ select parallelism from fragment_parallelism where table_name = 't'; 2 statement ok -alter table t set parallelism = auto; +alter table t set parallelism = adaptive; query T select parallelism from table_parallelism where name = 't'; ---- -AUTO +ADAPTIVE statement ok create materialized view m_simple as select * from t; @@ -52,7 +52,7 @@ create materialized view m_simple as select * from t; query T select parallelism from mview_parallelism where name = 'm_simple'; ---- -AUTO +ADAPTIVE statement ok alter materialized view m_simple set parallelism = 3; @@ -68,7 +68,7 @@ create materialized view m_join as select t1.v as t1v, t2.v as t2v from t t1, t query T select parallelism from mview_parallelism where name = 'm_join'; ---- -AUTO +ADAPTIVE statement ok alter materialized view m_join set parallelism = 3; @@ -84,7 +84,7 @@ create sink s as select t1.v as t1v, t2.v as t2v from t t1, t t2 where t1.v = t2 query T select parallelism from sink_parallelism where name = 's'; ---- -AUTO +ADAPTIVE statement ok alter sink s set parallelism = 4; @@ -121,7 +121,7 @@ statement ok drop table t; statement ok -set streaming_parallelism to auto; +set streaming_parallelism to adaptive; statement ok create table t (v1 int); @@ -129,7 +129,7 @@ create table t (v1 int); query T select parallelism from table_parallelism where name = 't'; ---- -AUTO +ADAPTIVE statement ok drop table t; @@ -143,7 +143,7 @@ create table t (v1 int); query T select parallelism from table_parallelism where name = 't'; ---- -AUTO +ADAPTIVE statement ok drop table t; diff --git a/proto/meta.proto b/proto/meta.proto index 3e41032b33bfa..01492cc0c4fff 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -469,14 +469,18 @@ message TableParallelism { uint32 parallelism = 1; } + // deprecated, treated as AdaptiveParallelism message AutoParallelism {} + message AdaptiveParallelism {} + message CustomParallelism {} oneof parallelism { FixedParallelism fixed = 1; AutoParallelism auto = 2; CustomParallelism custom = 3; + AdaptiveParallelism adaptive = 4; } } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 63d5f8c7dfb4f..52462d9111ad9 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -212,19 +212,18 @@ fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String { } fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String { - let parallelism = match state + match state .parallelism .as_ref() .and_then(|parallelism| parallelism.parallelism.as_ref()) { - None => "unknown".to_string(), - Some(PbParallelism::Auto(_)) => "auto".to_string(), + Some(PbParallelism::Auto(_)) | Some(PbParallelism::Adaptive(_)) => "adaptive".to_string(), Some(PbParallelism::Fixed(PbFixedParallelism { parallelism })) => { format!("fixed({parallelism})") } Some(PbParallelism::Custom(_)) => "custom".to_string(), - }; - parallelism + None => "unknown".to_string(), + } } /// get acl items of `object` in string, ignore public. diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 586725c564885..2e1f1facea0f2 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -15,7 +15,9 @@ use pgwire::pg_response::StatementType; use risingwave_common::bail; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_pb::meta::table_parallelism::{AutoParallelism, FixedParallelism, PbParallelism}; +use risingwave_pb::meta::table_parallelism::{ + AdaptiveParallelism, FixedParallelism, PbParallelism, +}; use risingwave_pb::meta::{PbTableParallelism, TableParallelism}; use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value}; use risingwave_sqlparser::keywords::Keyword; @@ -107,8 +109,8 @@ pub async fn handle_alter_parallelism( } fn extract_table_parallelism(parallelism: SetVariableValue) -> Result { - let auto_parallelism = PbTableParallelism { - parallelism: Some(PbParallelism::Auto(AutoParallelism {})), + let adaptive_parallelism = PbTableParallelism { + parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})), }; // If the target parallelism is set to 0/auto/default, we would consider it as auto parallelism. @@ -116,22 +118,22 @@ fn extract_table_parallelism(parallelism: SetVariableValue) -> Result + .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) => { - auto_parallelism + adaptive_parallelism } - SetVariableValue::Default => auto_parallelism, + SetVariableValue::Default => adaptive_parallelism, SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => { let fixed_parallelism = v.parse::().map_err(|e| { ErrorCode::InvalidInputSyntax(format!( - "target parallelism must be a valid number or auto: {}", + "target parallelism must be a valid number or adaptive: {}", e.as_report() )) })?; if fixed_parallelism == 0 { - auto_parallelism + adaptive_parallelism } else { PbTableParallelism { parallelism: Some(PbParallelism::Fixed(FixedParallelism { @@ -143,7 +145,7 @@ fn extract_table_parallelism(parallelism: SetVariableValue) -> Result { return Err(ErrorCode::InvalidInputSyntax( - "target parallelism must be a valid number or auto".to_string(), + "target parallelism must be a valid number or adaptive".to_string(), ) .into()); } diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index c317bfde004b2..dd633449050ce 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -22,6 +22,7 @@ use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP} use risingwave_common::system_param::is_mutable; use risingwave_common::types::{DataType, ScalarRefImpl}; use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value}; +use risingwave_sqlparser::keywords::Keyword; use super::RwPgResponse; use crate::handler::HandlerArgs; @@ -68,7 +69,7 @@ pub fn handle_set( .eq_ignore_ascii_case("streaming_parallelism") && string_val .as_ref() - .map(|val| val.eq_ignore_ascii_case("auto")) + .map(|val| val.eq_ignore_ascii_case(Keyword::ADAPTIVE.to_string().as_str())) .unwrap_or(false) { string_val = None; diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 58397cc069ada..8c85cad2a6597 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -230,7 +230,7 @@ derive_from_json_struct!( #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)] pub enum StreamingParallelism { - Auto, + Adaptive, Fixed(usize), } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index f324e083b4c43..88f0f22b0e07e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -625,7 +625,7 @@ impl GlobalBarrierManagerContext { .map(|(table_id, parallelism)| { // no custom for sql backend let table_parallelism = match parallelism { - StreamingParallelism::Auto => TableParallelism::Auto, + StreamingParallelism::Adaptive => TableParallelism::Adaptive, StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n), }; @@ -725,12 +725,12 @@ impl GlobalBarrierManagerContext { if let Some(fragment) = derive_from_fragment { let fragment_parallelism = fragment.get_actors().len(); if fragment_parallelism >= current_parallelism { - TableParallelism::Auto + TableParallelism::Adaptive } else { TableParallelism::Fixed(fragment_parallelism) } } else { - TableParallelism::Auto + TableParallelism::Adaptive } } else { table.assigned_parallelism diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 37b31d047cec7..62b532c6d2f46 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -108,7 +108,7 @@ impl CatalogController { let create_type = streaming_job.create_type(); let streaming_parallelism = match parallelism { - None => StreamingParallelism::Auto, + None => StreamingParallelism::Adaptive, Some(n) => StreamingParallelism::Fixed(n.parallelism as _), }; @@ -493,7 +493,7 @@ impl CatalogController { } let parallelism = match default_parallelism { - None => StreamingParallelism::Auto, + None => StreamingParallelism::Adaptive, Some(n) => StreamingParallelism::Fixed(n.get() as _), }; @@ -1185,7 +1185,7 @@ impl CatalogController { .into_active_model(); streaming_job.parallelism = Set(match parallelism { - TableParallelism::Auto => StreamingParallelism::Auto, + TableParallelism::Adaptive => StreamingParallelism::Adaptive, TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _), TableParallelism::Custom => { unreachable!("sql backend does't support custom parallelism") diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 54993d8fee805..ef55f78493f85 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -23,7 +23,7 @@ use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; use risingwave_pb::meta::table_parallelism::{ - FixedParallelism, Parallelism, PbAutoParallelism, PbCustomParallelism, PbFixedParallelism, + FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism, PbParallelism, }; use risingwave_pb::meta::{PbTableFragments, PbTableParallelism}; @@ -45,7 +45,7 @@ const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments"; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TableParallelism { /// This is when the system decides the parallelism, based on the available parallel units. - Auto, + Adaptive, /// We set this when the `TableFragments` parallelism is changed. /// All fragments which are part of the `TableFragment` will have the same parallelism as this. Fixed(usize), @@ -63,7 +63,7 @@ impl From for TableParallelism { use Parallelism::*; match &value.parallelism { Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize), - Some(Auto(_)) => Self::Auto, + Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive, Some(Custom(_)) => Self::Custom, _ => unreachable!(), } @@ -75,7 +75,7 @@ impl From for PbTableParallelism { use TableParallelism::*; let parallelism = match value { - Auto => PbParallelism::Auto(PbAutoParallelism {}), + Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}), Fixed(n) => PbParallelism::Fixed(PbFixedParallelism { parallelism: n as u32, }), @@ -198,7 +198,7 @@ impl TableFragments { fragments, &BTreeMap::new(), StreamContext::default(), - TableParallelism::Auto, + TableParallelism::Adaptive, ) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index bb7ffefb1f3ba..52459bce9ddc2 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1370,7 +1370,7 @@ impl DdlController { // actors on the compute nodes. let table_parallelism = match default_parallelism { - None => TableParallelism::Auto, + None => TableParallelism::Adaptive, Some(parallelism) => TableParallelism::Fixed(parallelism.get()), }; @@ -1801,7 +1801,7 @@ impl DdlController { assert!(dispatchers.is_empty()); let table_parallelism = match default_parallelism { - None => TableParallelism::Auto, + None => TableParallelism::Adaptive, Some(parallelism) => TableParallelism::Fixed(parallelism.get()), }; diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index b56757ac322c6..d73d38841322b 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1794,7 +1794,7 @@ impl ScaleController { ); } FragmentDistributionType::Hash => match parallelism { - TableParallelism::Auto => { + TableParallelism::Adaptive => { target_plan.insert( fragment_id, Self::diff_parallel_unit_change( diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 057a73ea487eb..d7a35fc1d90f4 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -1111,7 +1111,7 @@ mod tests { fragments, &locations.actor_locations, Default::default(), - TableParallelism::Auto, + TableParallelism::Adaptive, ); let ctx = CreateStreamingJobContext { building_locations: locations, diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index b73cacaa6a402..a82c1c1c04c6b 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -70,6 +70,7 @@ define_keywords!( ABORT, ABS, ACTION, + ADAPTIVE, ADD, AGGREGATE, ALL, diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index c042451f286fe..9ec41df977c3b 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -324,7 +324,7 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper( session .run("select parallelism from rw_table_fragments") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); async fn locate_table_fragment(cluster: &mut Cluster) -> Result { cluster @@ -434,12 +434,14 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper( // We alter parallelism back to auto - session.run("alter table t set parallelism = auto").await?; + session + .run("alter table t set parallelism = adaptive") + .await?; session .run("select parallelism from rw_table_fragments") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); let table_mat_fragment = locate_table_fragment(&mut cluster).await?; @@ -523,7 +525,7 @@ async fn test_compatibility_with_low_level() -> Result<()> { session .run("select parallelism from table_parallelism") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); let table_mat_fragment = cluster .locate_one_fragment(vec![ @@ -554,7 +556,7 @@ async fn test_compatibility_with_low_level() -> Result<()> { session .run("select parallelism from mview_parallelism where name = 'm_simple'") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); let simple_mv_fragment = cluster .locate_one_fragment(vec![ @@ -575,16 +577,16 @@ async fn test_compatibility_with_low_level() -> Result<()> { // Since `m_simple` only has 1 fragment, and this fragment is a downstream of NO_SHUFFLE relation, // in reality, `m_simple` does not have a fragment of its own. // Therefore, any low-level modifications to this fragment will only be passed up to the highest level through the NO_SHUFFLE relationship and then passed back down. - // Hence, the parallelism of `m_simple` should still be equivalent to AUTO of 0 fragment. + // Hence, the parallelism of `m_simple` should still be equivalent to ADAPTIVE of 0 fragment. session .run("select parallelism from mview_parallelism where name = 'm_simple'") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); session .run("select parallelism from mview_parallelism where name = 'm_join'") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); let hash_join_fragment = cluster .locate_one_fragment(vec![identity_contains("hashJoin")]) @@ -660,7 +662,7 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result< session .run("select parallelism from table_parallelism") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); // Find the table materialize fragment let table_mat_fragment = cluster @@ -693,7 +695,7 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result< session .run("select parallelism from mview_parallelism where name = 'm_simple'") .await? - .assert_result_eq("AUTO"); + .assert_result_eq("ADAPTIVE"); // Find the table fragment for materialized view let simple_mv_fragment = cluster