Skip to content

Commit

Permalink
Added deferred flag for parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jan 29, 2024
1 parent 44d097c commit d78be0a
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 37 deletions.
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ message AlterSetSchemaResponse {
message AlterParallelismRequest {
uint32 table_id = 1;
meta.TableParallelism parallelism = 2;
bool deferred = 3;
}

message AlterParallelismResponse {}
Expand Down
11 changes: 8 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;

async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism)
-> Result<()>;
async fn alter_parallelism(
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()>;

async fn alter_set_schema(
&self,
Expand Down Expand Up @@ -506,9 +510,10 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()> {
self.meta_client
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await
.map_err(|e| anyhow!(e))?;

Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub async fn handle_alter_parallelism(
obj_name: ObjectName,
parallelism: SetVariableValue,
stmt_type: StatementType,
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -93,10 +94,16 @@ pub async fn handle_alter_parallelism(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_parallelism(table_id, target_parallelism)
.alter_parallelism(table_id, target_parallelism, deferred)
.await?;

Ok(RwPgResponse::empty_result(stmt_type))
let mut builder = RwPgResponse::builder(stmt_type);

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string());
}

Ok(builder.into())
}

fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
Expand Down
28 changes: 24 additions & 4 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,18 @@ pub async fn handle(
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetParallelism { parallelism },
operation:
AlterTableOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_TABLE,
deferred,
)
.await
}
Expand All @@ -557,13 +562,18 @@ pub async fn handle(
} => alter_rename::handle_rename_index(handler_args, name, index_name).await,
Statement::AlterIndex {
name,
operation: AlterIndexOperation::SetParallelism { parallelism },
operation:
AlterIndexOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_INDEX,
deferred,
)
.await
}
Expand All @@ -587,13 +597,18 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetParallelism { parallelism },
operation:
AlterViewOperation::SetParallelism {
parallelism,
deferred,
},
} if materialized => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_MATERIALIZED_VIEW,
deferred,
)
.await
}
Expand Down Expand Up @@ -677,13 +692,18 @@ pub async fn handle(
}
Statement::AlterSink {
name,
operation: AlterSinkOperation::SetParallelism { parallelism },
operation:
AlterSinkOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_SINK,
deferred,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
_table_id: u32,
_parallelism: PbTableParallelism,
_deferred: bool,
) -> Result<()> {
todo!()
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,9 +841,10 @@ impl DdlService for DdlServiceImpl {

let table_id = req.get_table_id();
let parallelism = req.get_parallelism()?.clone();
let deferred = req.get_deferred();

self.ddl_controller
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await?;

Ok(Response::new(AlterParallelismResponse {}))
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,10 @@ impl DdlController {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> MetaResult<()> {
self.stream_manager
.alter_table_parallelism(table_id, parallelism.into())
.alter_table_parallelism(table_id, parallelism.into(), deferred)
.await
}

Expand Down
22 changes: 13 additions & 9 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ impl GlobalStreamManager {
&self,
table_id: u32,
parallelism: TableParallelism,
deferred: bool,
) -> MetaResult<()> {
let MetadataManager::V1(mgr) = &self.metadata_manager else {
unimplemented!("support alter table parallelism in v2");
Expand All @@ -754,15 +755,18 @@ impl GlobalStreamManager {
.map(|node| node.id)
.collect::<BTreeSet<_>>();

let reschedules = self
.scale_controller
.as_ref()
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids,
table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(),
})
.await?;
let reschedules = if deferred {
HashMap::new()
} else {
self.scale_controller
.as_ref()
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids,
table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(),
})
.await?
};

self.reschedule_actors(
reschedules,
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,12 @@ impl MetaClient {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()> {
let request = AlterParallelismRequest {
table_id,
parallelism: Some(parallelism),
deferred,
};

self.inner.alter_parallelism(request).await?;
Expand Down
64 changes: 51 additions & 13 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ pub enum AlterTableOperation {
ChangeOwner { new_owner_name: Ident },
/// `SET SCHEMA <schema_name>`
SetSchema { new_schema_name: ObjectName },
/// `SET PARALLELISM TO <parallelism>`
SetParallelism { parallelism: SetVariableValue },
/// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
SetParallelism {
parallelism: SetVariableValue,
deferred: bool,
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -96,9 +99,10 @@ pub enum AlterIndexOperation {
RenameIndex {
index_name: ObjectName,
},
/// `SET PARALLELISM TO <parallelism>`
/// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
SetParallelism {
parallelism: SetVariableValue,
deferred: bool,
},
}

Expand All @@ -115,9 +119,10 @@ pub enum AlterViewOperation {
SetSchema {
new_schema_name: ObjectName,
},
/// `SET PARALLELISM TO <parallelism>`
/// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
SetParallelism {
parallelism: SetVariableValue,
deferred: bool,
},
}

Expand All @@ -134,9 +139,10 @@ pub enum AlterSinkOperation {
SetSchema {
new_schema_name: ObjectName,
},
/// `SET PARALLELISM TO <parallelism>`
/// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
SetParallelism {
parallelism: SetVariableValue,
deferred: bool,
},
}

Expand Down Expand Up @@ -246,8 +252,16 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::SetSchema { new_schema_name } => {
write!(f, "SET SCHEMA {}", new_schema_name)
}
AlterTableOperation::SetParallelism { parallelism } => {
write!(f, "SET PARALLELISM TO {}", parallelism)
AlterTableOperation::SetParallelism {
parallelism,
deferred,
} => {
write!(
f,
"SET PARALLELISM TO {} {}",
parallelism,
if *deferred { " DEFERRED" } else { "" }
)
}
}
}
Expand All @@ -259,8 +273,16 @@ impl fmt::Display for AlterIndexOperation {
AlterIndexOperation::RenameIndex { index_name } => {
write!(f, "RENAME TO {index_name}")
}
AlterIndexOperation::SetParallelism { parallelism } => {
write!(f, "SET PARALLELISM TO {}", parallelism)
AlterIndexOperation::SetParallelism {
parallelism,
deferred,
} => {
write!(
f,
"SET PARALLELISM TO {} {}",
parallelism,
if *deferred { " DEFERRED" } else { "" }
)
}
}
}
Expand All @@ -278,8 +300,16 @@ impl fmt::Display for AlterViewOperation {
AlterViewOperation::SetSchema { new_schema_name } => {
write!(f, "SET SCHEMA {}", new_schema_name)
}
AlterViewOperation::SetParallelism { parallelism } => {
write!(f, "SET PARALLELISM TO {}", parallelism)
AlterViewOperation::SetParallelism {
parallelism,
deferred,
} => {
write!(
f,
"SET PARALLELISM TO {} {}",
parallelism,
if *deferred { " DEFERRED" } else { "" }
)
}
}
}
Expand All @@ -297,8 +327,16 @@ impl fmt::Display for AlterSinkOperation {
AlterSinkOperation::SetSchema { new_schema_name } => {
write!(f, "SET SCHEMA {}", new_schema_name)
}
AlterSinkOperation::SetParallelism { parallelism } => {
write!(f, "SET PARALLELISM TO {}", parallelism)
AlterSinkOperation::SetParallelism {
parallelism,
deferred,
} => {
write!(
f,
"SET PARALLELISM TO {} {}",
parallelism,
if *deferred { " DEFERRED" } else { "" }
)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ define_keywords!(
DECLARE,
DEFAULT,
DEFERRABLE,
DEFERRED,
DELETE,
DELIMITED,
DENSE_RANK,
Expand Down
Loading

0 comments on commit d78be0a

Please sign in to comment.