From d78be0a78f82cf5a044f8370449244eaeaff8f01 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 02:06:51 +0800 Subject: [PATCH] Added deferred flag for parallelism --- proto/ddl_service.proto | 1 + src/frontend/src/catalog/catalog_service.rs | 11 +++- src/frontend/src/handler/alter_parallelism.rs | 11 +++- src/frontend/src/handler/mod.rs | 28 ++++++-- src/frontend/src/test_utils.rs | 1 + src/meta/service/src/ddl_service.rs | 3 +- src/meta/src/rpc/ddl_controller.rs | 3 +- src/meta/src/stream/stream_manager.rs | 22 ++++--- src/rpc_client/src/meta_client.rs | 2 + src/sqlparser/src/ast/ddl.rs | 64 +++++++++++++++---- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 27 ++++++-- 12 files changed, 137 insertions(+), 37 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 1b584a7df78e1..6e66543627639 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -220,6 +220,7 @@ message AlterSetSchemaResponse { message AlterParallelismRequest { uint32 table_id = 1; meta.TableParallelism parallelism = 2; + bool deferred = 3; } message AlterParallelismResponse {} diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 28b2c64b28551..a785ed9ac0282 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -178,8 +178,12 @@ pub trait CatalogWriter: Send + Sync { async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>; - async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism) - -> Result<()>; + async fn alter_parallelism( + &self, + table_id: u32, + parallelism: PbTableParallelism, + deferred: bool, + ) -> Result<()>; async fn alter_set_schema( &self, @@ -506,9 +510,10 @@ impl CatalogWriter for CatalogWriterImpl { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> Result<()> { self.meta_client - .alter_parallelism(table_id, parallelism) + .alter_parallelism(table_id, parallelism, deferred) .await .map_err(|e| anyhow!(e))?; diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 50bbb1792ff9a..586725c564885 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -32,6 +32,7 @@ pub async fn handle_alter_parallelism( obj_name: ObjectName, parallelism: SetVariableValue, stmt_type: StatementType, + deferred: bool, ) -> Result { let session = handler_args.session; let db_name = session.database(); @@ -93,10 +94,16 @@ pub async fn handle_alter_parallelism( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_parallelism(table_id, target_parallelism) + .alter_parallelism(table_id, target_parallelism, deferred) .await?; - Ok(RwPgResponse::empty_result(stmt_type)) + let mut builder = RwPgResponse::builder(stmt_type); + + if deferred { + builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string()); + } + + Ok(builder.into()) } fn extract_table_parallelism(parallelism: SetVariableValue) -> Result { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 806daa89ce026..5becea016e108 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -528,13 +528,18 @@ pub async fn handle( } Statement::AlterTable { name, - operation: AlterTableOperation::SetParallelism { parallelism }, + operation: + AlterTableOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_TABLE, + deferred, ) .await } @@ -557,13 +562,18 @@ pub async fn handle( } => alter_rename::handle_rename_index(handler_args, name, index_name).await, Statement::AlterIndex { name, - operation: AlterIndexOperation::SetParallelism { parallelism }, + operation: + AlterIndexOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_INDEX, + deferred, ) .await } @@ -587,13 +597,18 @@ pub async fn handle( Statement::AlterView { materialized, name, - operation: AlterViewOperation::SetParallelism { parallelism }, + operation: + AlterViewOperation::SetParallelism { + parallelism, + deferred, + }, } if materialized => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_MATERIALIZED_VIEW, + deferred, ) .await } @@ -677,13 +692,18 @@ pub async fn handle( } Statement::AlterSink { name, - operation: AlterSinkOperation::SetParallelism { parallelism }, + operation: + AlterSinkOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_SINK, + deferred, ) .await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 223fc739f6453..c7fbea9d401f7 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -598,6 +598,7 @@ impl CatalogWriter for MockCatalogWriter { &self, _table_id: u32, _parallelism: PbTableParallelism, + _deferred: bool, ) -> Result<()> { todo!() } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 7b7d46260052b..223ee2238032c 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -841,9 +841,10 @@ impl DdlService for DdlServiceImpl { let table_id = req.get_table_id(); let parallelism = req.get_parallelism()?.clone(); + let deferred = req.get_deferred(); self.ddl_controller - .alter_parallelism(table_id, parallelism) + .alter_parallelism(table_id, parallelism, deferred) .await?; Ok(Response::new(AlterParallelismResponse {})) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e4296f7f403c2..0f9a537729b98 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -354,9 +354,10 @@ impl DdlController { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> MetaResult<()> { self.stream_manager - .alter_table_parallelism(table_id, parallelism.into()) + .alter_table_parallelism(table_id, parallelism.into(), deferred) .await } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index ff1758aa20b96..8aef7a68d5483 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -737,6 +737,7 @@ impl GlobalStreamManager { &self, table_id: u32, parallelism: TableParallelism, + deferred: bool, ) -> MetaResult<()> { let MetadataManager::V1(mgr) = &self.metadata_manager else { unimplemented!("support alter table parallelism in v2"); @@ -754,15 +755,18 @@ impl GlobalStreamManager { .map(|node| node.id) .collect::>(); - let reschedules = self - .scale_controller - .as_ref() - .unwrap() - .generate_table_resize_plan(TableResizePolicy { - worker_ids, - table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), - }) - .await?; + let reschedules = if deferred { + HashMap::new() + } else { + self.scale_controller + .as_ref() + .unwrap() + .generate_table_resize_plan(TableResizePolicy { + worker_ids, + table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), + }) + .await? + }; self.reschedule_actors( reschedules, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 812c522e5e033..4d0b3b6b7a673 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -470,10 +470,12 @@ impl MetaClient { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> Result<()> { let request = AlterParallelismRequest { table_id, parallelism: Some(parallelism), + deferred, }; self.inner.alter_parallelism(request).await?; diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index b0a95cacc1e16..59f74a3ad3acb 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -85,8 +85,11 @@ pub enum AlterTableOperation { ChangeOwner { new_owner_name: Ident }, /// `SET SCHEMA ` SetSchema { new_schema_name: ObjectName }, - /// `SET PARALLELISM TO ` - SetParallelism { parallelism: SetVariableValue }, + /// `SET PARALLELISM TO [ DEFERRED ]` + SetParallelism { + parallelism: SetVariableValue, + deferred: bool, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -96,9 +99,10 @@ pub enum AlterIndexOperation { RenameIndex { index_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -115,9 +119,10 @@ pub enum AlterViewOperation { SetSchema { new_schema_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -134,9 +139,10 @@ pub enum AlterSinkOperation { SetSchema { new_schema_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -246,8 +252,16 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterTableOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterTableOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -259,8 +273,16 @@ impl fmt::Display for AlterIndexOperation { AlterIndexOperation::RenameIndex { index_name } => { write!(f, "RENAME TO {index_name}") } - AlterIndexOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterIndexOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -278,8 +300,16 @@ impl fmt::Display for AlterViewOperation { AlterViewOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterViewOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterViewOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -297,8 +327,16 @@ impl fmt::Display for AlterSinkOperation { AlterSinkOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterSinkOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterSinkOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index dae6529376c4c..b73cacaa6a402 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -183,6 +183,7 @@ define_keywords!( DECLARE, DEFAULT, DEFERRABLE, + DEFERRED, DELETE, DELIMITED, DENSE_RANK, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index dfb0a030125de..9257c28e94d39 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3017,7 +3017,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterTableOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterTableOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); } @@ -3096,7 +3101,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterIndexOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterIndexOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("PARALLELISM after SET", self.peek_token()); } @@ -3142,7 +3152,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterViewOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterViewOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); } @@ -3194,8 +3209,12 @@ impl Parser { } let value = self.parse_set_variable()?; + let deferred = self.parse_keyword(Keyword::DEFERRED); - AlterSinkOperation::SetParallelism { parallelism: value } + AlterSinkOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); }