Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support deferred mode for alter parallelism #14826

Merged
merged 13 commits into from
Jan 30, 2024
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ message AlterSetSchemaResponse {
message AlterParallelismRequest {
uint32 table_id = 1;
meta.TableParallelism parallelism = 2;
bool deferred = 3;
}

message AlterParallelismResponse {}
Expand Down
11 changes: 8 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;

async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism)
-> Result<()>;
async fn alter_parallelism(
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()>;

async fn alter_set_schema(
&self,
Expand Down Expand Up @@ -506,9 +510,10 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()> {
self.meta_client
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await
.map_err(|e| anyhow!(e))?;

Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub async fn handle_alter_parallelism(
obj_name: ObjectName,
parallelism: SetVariableValue,
stmt_type: StatementType,
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -93,10 +94,16 @@ pub async fn handle_alter_parallelism(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_parallelism(table_id, target_parallelism)
.alter_parallelism(table_id, target_parallelism, deferred)
.await?;

Ok(RwPgResponse::empty_result(stmt_type))
let mut builder = RwPgResponse::builder(stmt_type);

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string());
}

Ok(builder.into())
}

fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
Expand Down
28 changes: 24 additions & 4 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,18 @@ pub async fn handle(
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetParallelism { parallelism },
operation:
AlterTableOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_TABLE,
deferred,
)
.await
}
Expand All @@ -557,13 +562,18 @@ pub async fn handle(
} => alter_rename::handle_rename_index(handler_args, name, index_name).await,
Statement::AlterIndex {
name,
operation: AlterIndexOperation::SetParallelism { parallelism },
operation:
AlterIndexOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_INDEX,
deferred,
)
.await
}
Expand All @@ -587,13 +597,18 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetParallelism { parallelism },
operation:
AlterViewOperation::SetParallelism {
parallelism,
deferred,
},
} if materialized => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_MATERIALIZED_VIEW,
deferred,
)
.await
}
Expand Down Expand Up @@ -677,13 +692,18 @@ pub async fn handle(
}
Statement::AlterSink {
name,
operation: AlterSinkOperation::SetParallelism { parallelism },
operation:
AlterSinkOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_SINK,
deferred,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
_table_id: u32,
_parallelism: PbTableParallelism,
_deferred: bool,
) -> Result<()> {
todo!()
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,9 +841,10 @@ impl DdlService for DdlServiceImpl {

let table_id = req.get_table_id();
let parallelism = req.get_parallelism()?.clone();
let deferred = req.get_deferred();

self.ddl_controller
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await?;

Ok(Response::new(AlterParallelismResponse {}))
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::PausedReason;
Expand All @@ -34,7 +35,7 @@ use risingwave_pb::stream_service::{
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, warn, Instrument};
use tracing::{debug, info, warn, Instrument};
use uuid::Uuid;

use super::TracedEpoch;
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,10 @@ impl DdlController {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> MetaResult<()> {
self.stream_manager
.alter_table_parallelism(table_id, parallelism.into())
.alter_table_parallelism(table_id, parallelism.into(), deferred)
.await
}

Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2407,12 +2407,15 @@ impl GlobalStreamManager {
.await;
}));

tracing::debug!("pausing tick lock in source manager");
let _source_pause_guard = self.source_manager.paused.lock().await;

self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;

tracing::info!("reschedule done");

Ok(())
}

Expand Down
62 changes: 45 additions & 17 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ impl GlobalStreamManager {
&self,
table_id: u32,
parallelism: TableParallelism,
deferred: bool,
) -> MetaResult<()> {
let MetadataManager::V1(mgr) = &self.metadata_manager else {
unimplemented!("support alter table parallelism in v2");
Expand All @@ -755,24 +756,51 @@ impl GlobalStreamManager {
.map(|node| node.id)
.collect::<BTreeSet<_>>();

let reschedules = self
.scale_controller
.as_ref()
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids,
table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(),
})
.await?;
let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]);

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
Some(HashMap::from([(TableId::new(table_id), parallelism)])),
)
.await?;
if deferred {
tracing::debug!(
"deferred mode enabled for job {}, set the parallelism directly to {:?}",
table_id,
parallelism
);
self.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment)
.await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is acceptable to keep such a quick path to modify the streaming job parallelism, but it is better not to have any public documents. It should only be used in urgent situations when the user cluster load is too high and the response time is not timely. Because this can easily cause inconsistencies, after supporting trigger recovery manually we'd better call it here rather than rebooting the meta.

} else {
let reschedules = self
.scale_controller
.as_ref()
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids,
table_parallelisms: table_parallelism_assignment
.iter()
.map(|(id, parallelism)| (id.table_id, *parallelism))
.collect(),
})
.await?;

if reschedules.is_empty() {
tracing::debug!("empty reschedule plan generated for job {}, set the parallelism directly to {:?}", table_id, parallelism);
self.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment)
.await?;
} else {
self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
Some(table_parallelism_assignment),
)
.await?;
}
};

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,12 @@ impl MetaClient {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()> {
let request = AlterParallelismRequest {
table_id,
parallelism: Some(parallelism),
deferred,
};

self.inner.alter_parallelism(request).await?;
Expand Down
Loading
Loading