From 8f0588f37372e789e22033e5b9bbe8a2bf611618 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 26 Jan 2024 19:17:31 +0800 Subject: [PATCH 01/11] init Signed-off-by: Shanicky Chen --- src/meta/src/manager/cluster.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index a8cf96d95a8b5..bf4b694cf7d82 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -108,7 +108,7 @@ impl ClusterManager { property: AddNodeProperty, resource: risingwave_pb::common::worker_node::Resource, ) -> MetaResult { - let worker_node_parallelism = property.worker_node_parallelism as usize; + let new_worker_parallelism = property.worker_node_parallelism as usize; let mut property = self.parse_property(r#type, property); let mut core = self.core.write().await; @@ -124,8 +124,8 @@ impl ClusterManager { .unwrap_or_default(); } - let current_parallelism = worker.worker_node.parallel_units.len(); - if current_parallelism == worker_node_parallelism + let old_worker_parallelism = worker.worker_node.parallel_units.len(); + if old_worker_parallelism == new_worker_parallelism && worker.worker_node.property == property { worker.update_expire_at(self.max_heartbeat_interval); @@ -133,32 +133,26 @@ impl ClusterManager { } let mut new_worker = worker.clone(); - match current_parallelism.cmp(&worker_node_parallelism) { + match old_worker_parallelism.cmp(&new_worker_parallelism) { Ordering::Less => { tracing::info!( "worker {} parallelism updated from {} to {}", new_worker.worker_node.id, - current_parallelism, - worker_node_parallelism + old_worker_parallelism, + new_worker_parallelism ); let parallel_units = self .generate_cn_parallel_units( - worker_node_parallelism - current_parallelism, + new_worker_parallelism - old_worker_parallelism, new_worker.worker_id(), ) .await?; new_worker.worker_node.parallel_units.extend(parallel_units); } - Ordering::Greater => { - // Warn and keep the original parallelism if the worker registered with a - // smaller parallelism. - tracing::warn!( - "worker {} parallelism is less than current, current is {}, but received {}", - new_worker.worker_id(), - current_parallelism, - worker_node_parallelism - ); - } + Ordering::Greater => new_worker + .worker_node + .parallel_units + .truncate(new_worker_parallelism), Ordering::Equal => {} } if property != new_worker.worker_node.property { @@ -194,7 +188,7 @@ impl ClusterManager { // Generate parallel units. let parallel_units = if r#type == WorkerType::ComputeNode { - self.generate_cn_parallel_units(worker_node_parallelism, worker_id) + self.generate_cn_parallel_units(new_worker_parallelism, worker_id) .await? } else { vec![] From 7b263e8123d78e96b73e15f8454b2d2c3cf701fe Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:14:56 +0800 Subject: [PATCH 02/11] Update recovery & scaling logic --- risedev.yml | 22 ++++++++++++++++++++ src/meta/src/barrier/recovery.rs | 35 ++++++++++++++++++++++++-------- src/meta/src/manager/cluster.rs | 30 ++++++++++++++++++++++----- src/meta/src/stream/scale.rs | 2 +- 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/risedev.yml b/risedev.yml index 7be1334deb4b4..38ed00e15fc63 100644 --- a/risedev.yml +++ b/risedev.yml @@ -809,6 +809,28 @@ profile: - use: frontend - use: compactor + ci-3cn-1fe-with-recovery: + config-path: src/config/ci-recovery.toml + steps: + - use: minio + - use: etcd + unsafe-no-fsync: true + - use: meta-node + - use: compute-node + port: 5687 + exporter-port: 1222 + enable-tiered-cache: true + - use: compute-node + port: 5688 + exporter-port: 1223 + enable-tiered-cache: true + - use: compute-node + port: 5689 + exporter-port: 1224 + enable-tiered-cache: true + - use: frontend + - use: compactor + ci-1cn-1fe-kafka-with-recovery: config-path: src/config/ci-recovery.toml steps: diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e0ace5f9678a4..30e6e9ccf29db 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -22,6 +22,7 @@ use futures::stream::FuturesUnordered; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::hash::ParallelUnitId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::PausedReason; @@ -34,7 +35,7 @@ use risingwave_pb::stream_service::{ use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use tracing::{debug, warn, Instrument}; +use tracing::{debug, info, warn, Instrument}; use uuid::Uuid; use super::TracedEpoch; @@ -603,18 +604,36 @@ impl GlobalBarrierManagerContext { }; debug!("start scaling-in offline actors."); - let expired_workers: HashSet = info - .actor_map + let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await; + + let curr_worker_parallel_units: HashMap> = info + .node_map .iter() - .filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker)) - .map(|(&worker, _)| worker) + .map(|(worker_id, worker_node)| { + ( + *worker_id, + worker_node + .parallel_units + .iter() + .map(|parallel_unit| parallel_unit.id) + .collect(), + ) + }) .collect(); - if expired_workers.is_empty() { - debug!("no expired workers, skipping."); + // todo: maybe we can only check the reduced workers + if curr_worker_parallel_units == prev_worker_parallel_units { + debug!("no changed workers, skipping."); return Ok(false); } + info!("parallel unit has changed, triggering a forced reschedule."); + + debug!( + "previous worker parallel units {:?}, current worker parallel units {:?}", + prev_worker_parallel_units, curr_worker_parallel_units + ); + let table_parallelisms = { let guard = mgr.fragment_manager.get_fragment_read_guard().await; diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index bf4b694cf7d82..6ee3db33d2956 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -149,10 +149,30 @@ impl ClusterManager { .await?; new_worker.worker_node.parallel_units.extend(parallel_units); } - Ordering::Greater => new_worker - .worker_node - .parallel_units - .truncate(new_worker_parallelism), + Ordering::Greater => { + if self.env.opts.enable_scale_in_when_recovery { + // Handing over to the subsequent recovery loop for a forced reschedule. + tracing::info!( + "worker {} parallelism reduced from {} to {}", + new_worker.worker_node.id, + old_worker_parallelism, + new_worker_parallelism + ); + new_worker + .worker_node + .parallel_units + .truncate(new_worker_parallelism) + } else { + // Warn and keep the original parallelism if the worker registered with a + // smaller parallelism, entering compatibility mode. + tracing::warn!( + "worker {} parallelism is less than current, current is {}, but received {}", + new_worker.worker_id(), + new_worker_parallelism, + old_worker_parallelism, + ); + } + } Ordering::Equal => {} } if property != new_worker.worker_node.property { @@ -572,7 +592,7 @@ impl ClusterManagerCore { None => { return Err(MetaError::unavailable( "no available transactional id for worker", - )) + )); } Some(id) => id, }; diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 2674975488e45..e0a2fb390f42d 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2420,7 +2420,7 @@ impl GlobalStreamManager { .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut()) .await?; - tracing::debug!("reschedule plan: {:#?}", reschedule_fragment); + tracing::debug!("reschedule plan: {:?}", reschedule_fragment); let command = Command::RescheduleFragment { reschedules: reschedule_fragment, From 570c20eb018c67149f32f515353accf47d37719f Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:28:33 +0800 Subject: [PATCH 03/11] Remove BTreeMap from recovery.rs --- src/meta/src/barrier/recovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 30e6e9ccf29db..c9e38bcad7d69 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; From 14961c6a505a6d1e257630c3fa8f1764ab87aef6 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:14:56 +0800 Subject: [PATCH 04/11] Update recovery & scaling logic --- src/meta/src/barrier/recovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index c9e38bcad7d69..30e6e9ccf29db 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; From e2411ce6e76aadccf9b8b1da956678ebd156c276 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:28:33 +0800 Subject: [PATCH 05/11] Remove BTreeMap from recovery.rs --- src/meta/src/barrier/recovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 30e6e9ccf29db..c9e38bcad7d69 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; From 55fc99897216106f333e06ec9d8da6dd06808a18 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:14:56 +0800 Subject: [PATCH 06/11] Update recovery & scaling logic --- src/meta/src/barrier/recovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index c9e38bcad7d69..30e6e9ccf29db 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; From c887863c418d4988b3fb9ce1b32d51eda28d0f89 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 01:28:33 +0800 Subject: [PATCH 07/11] Remove BTreeMap from recovery.rs --- src/meta/src/barrier/recovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 30e6e9ccf29db..c9e38bcad7d69 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; From 77146e3bdb8d903488149669488030fde26f9b21 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 02:06:51 +0800 Subject: [PATCH 08/11] Added deferred flag for parallelism --- proto/ddl_service.proto | 1 + src/frontend/src/catalog/catalog_service.rs | 11 +++- src/frontend/src/handler/alter_parallelism.rs | 11 +++- src/frontend/src/handler/mod.rs | 28 ++++++-- src/frontend/src/test_utils.rs | 1 + src/meta/service/src/ddl_service.rs | 3 +- src/meta/src/rpc/ddl_controller.rs | 3 +- src/meta/src/stream/stream_manager.rs | 22 ++++--- src/rpc_client/src/meta_client.rs | 2 + src/sqlparser/src/ast/ddl.rs | 64 +++++++++++++++---- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 27 ++++++-- 12 files changed, 137 insertions(+), 37 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 1b584a7df78e1..6e66543627639 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -220,6 +220,7 @@ message AlterSetSchemaResponse { message AlterParallelismRequest { uint32 table_id = 1; meta.TableParallelism parallelism = 2; + bool deferred = 3; } message AlterParallelismResponse {} diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 28b2c64b28551..a785ed9ac0282 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -178,8 +178,12 @@ pub trait CatalogWriter: Send + Sync { async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>; - async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism) - -> Result<()>; + async fn alter_parallelism( + &self, + table_id: u32, + parallelism: PbTableParallelism, + deferred: bool, + ) -> Result<()>; async fn alter_set_schema( &self, @@ -506,9 +510,10 @@ impl CatalogWriter for CatalogWriterImpl { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> Result<()> { self.meta_client - .alter_parallelism(table_id, parallelism) + .alter_parallelism(table_id, parallelism, deferred) .await .map_err(|e| anyhow!(e))?; diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 50bbb1792ff9a..586725c564885 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -32,6 +32,7 @@ pub async fn handle_alter_parallelism( obj_name: ObjectName, parallelism: SetVariableValue, stmt_type: StatementType, + deferred: bool, ) -> Result { let session = handler_args.session; let db_name = session.database(); @@ -93,10 +94,16 @@ pub async fn handle_alter_parallelism( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_parallelism(table_id, target_parallelism) + .alter_parallelism(table_id, target_parallelism, deferred) .await?; - Ok(RwPgResponse::empty_result(stmt_type)) + let mut builder = RwPgResponse::builder(stmt_type); + + if deferred { + builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string()); + } + + Ok(builder.into()) } fn extract_table_parallelism(parallelism: SetVariableValue) -> Result { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 806daa89ce026..5becea016e108 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -528,13 +528,18 @@ pub async fn handle( } Statement::AlterTable { name, - operation: AlterTableOperation::SetParallelism { parallelism }, + operation: + AlterTableOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_TABLE, + deferred, ) .await } @@ -557,13 +562,18 @@ pub async fn handle( } => alter_rename::handle_rename_index(handler_args, name, index_name).await, Statement::AlterIndex { name, - operation: AlterIndexOperation::SetParallelism { parallelism }, + operation: + AlterIndexOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_INDEX, + deferred, ) .await } @@ -587,13 +597,18 @@ pub async fn handle( Statement::AlterView { materialized, name, - operation: AlterViewOperation::SetParallelism { parallelism }, + operation: + AlterViewOperation::SetParallelism { + parallelism, + deferred, + }, } if materialized => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_MATERIALIZED_VIEW, + deferred, ) .await } @@ -677,13 +692,18 @@ pub async fn handle( } Statement::AlterSink { name, - operation: AlterSinkOperation::SetParallelism { parallelism }, + operation: + AlterSinkOperation::SetParallelism { + parallelism, + deferred, + }, } => { alter_parallelism::handle_alter_parallelism( handler_args, name, parallelism, StatementType::ALTER_SINK, + deferred, ) .await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 223fc739f6453..c7fbea9d401f7 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -598,6 +598,7 @@ impl CatalogWriter for MockCatalogWriter { &self, _table_id: u32, _parallelism: PbTableParallelism, + _deferred: bool, ) -> Result<()> { todo!() } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 7b7d46260052b..223ee2238032c 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -841,9 +841,10 @@ impl DdlService for DdlServiceImpl { let table_id = req.get_table_id(); let parallelism = req.get_parallelism()?.clone(); + let deferred = req.get_deferred(); self.ddl_controller - .alter_parallelism(table_id, parallelism) + .alter_parallelism(table_id, parallelism, deferred) .await?; Ok(Response::new(AlterParallelismResponse {})) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index e4296f7f403c2..0f9a537729b98 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -354,9 +354,10 @@ impl DdlController { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> MetaResult<()> { self.stream_manager - .alter_table_parallelism(table_id, parallelism.into()) + .alter_table_parallelism(table_id, parallelism.into(), deferred) .await } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index ff1758aa20b96..8aef7a68d5483 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -737,6 +737,7 @@ impl GlobalStreamManager { &self, table_id: u32, parallelism: TableParallelism, + deferred: bool, ) -> MetaResult<()> { let MetadataManager::V1(mgr) = &self.metadata_manager else { unimplemented!("support alter table parallelism in v2"); @@ -754,15 +755,18 @@ impl GlobalStreamManager { .map(|node| node.id) .collect::>(); - let reschedules = self - .scale_controller - .as_ref() - .unwrap() - .generate_table_resize_plan(TableResizePolicy { - worker_ids, - table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), - }) - .await?; + let reschedules = if deferred { + HashMap::new() + } else { + self.scale_controller + .as_ref() + .unwrap() + .generate_table_resize_plan(TableResizePolicy { + worker_ids, + table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), + }) + .await? + }; self.reschedule_actors( reschedules, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 812c522e5e033..4d0b3b6b7a673 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -470,10 +470,12 @@ impl MetaClient { &self, table_id: u32, parallelism: PbTableParallelism, + deferred: bool, ) -> Result<()> { let request = AlterParallelismRequest { table_id, parallelism: Some(parallelism), + deferred, }; self.inner.alter_parallelism(request).await?; diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index b0a95cacc1e16..59f74a3ad3acb 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -85,8 +85,11 @@ pub enum AlterTableOperation { ChangeOwner { new_owner_name: Ident }, /// `SET SCHEMA ` SetSchema { new_schema_name: ObjectName }, - /// `SET PARALLELISM TO ` - SetParallelism { parallelism: SetVariableValue }, + /// `SET PARALLELISM TO [ DEFERRED ]` + SetParallelism { + parallelism: SetVariableValue, + deferred: bool, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -96,9 +99,10 @@ pub enum AlterIndexOperation { RenameIndex { index_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -115,9 +119,10 @@ pub enum AlterViewOperation { SetSchema { new_schema_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -134,9 +139,10 @@ pub enum AlterSinkOperation { SetSchema { new_schema_name: ObjectName, }, - /// `SET PARALLELISM TO ` + /// `SET PARALLELISM TO [ DEFERRED ]` SetParallelism { parallelism: SetVariableValue, + deferred: bool, }, } @@ -246,8 +252,16 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterTableOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterTableOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -259,8 +273,16 @@ impl fmt::Display for AlterIndexOperation { AlterIndexOperation::RenameIndex { index_name } => { write!(f, "RENAME TO {index_name}") } - AlterIndexOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterIndexOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -278,8 +300,16 @@ impl fmt::Display for AlterViewOperation { AlterViewOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterViewOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterViewOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } @@ -297,8 +327,16 @@ impl fmt::Display for AlterSinkOperation { AlterSinkOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } - AlterSinkOperation::SetParallelism { parallelism } => { - write!(f, "SET PARALLELISM TO {}", parallelism) + AlterSinkOperation::SetParallelism { + parallelism, + deferred, + } => { + write!( + f, + "SET PARALLELISM TO {} {}", + parallelism, + if *deferred { " DEFERRED" } else { "" } + ) } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index dae6529376c4c..b73cacaa6a402 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -183,6 +183,7 @@ define_keywords!( DECLARE, DEFAULT, DEFERRABLE, + DEFERRED, DELETE, DELIMITED, DENSE_RANK, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index dfb0a030125de..9257c28e94d39 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3017,7 +3017,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterTableOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterTableOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); } @@ -3096,7 +3101,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterIndexOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterIndexOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("PARALLELISM after SET", self.peek_token()); } @@ -3142,7 +3152,12 @@ impl Parser { let value = self.parse_set_variable()?; - AlterViewOperation::SetParallelism { parallelism: value } + let deferred = self.parse_keyword(Keyword::DEFERRED); + + AlterViewOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); } @@ -3194,8 +3209,12 @@ impl Parser { } let value = self.parse_set_variable()?; + let deferred = self.parse_keyword(Keyword::DEFERRED); - AlterSinkOperation::SetParallelism { parallelism: value } + AlterSinkOperation::SetParallelism { + parallelism: value, + deferred, + } } else { return self.expected("SCHEMA/PARALLELISM after SET", self.peek_token()); } From c17766e2ae8810cd7538db5aa0337af45105681b Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Sat, 27 Jan 2024 14:10:53 +0800 Subject: [PATCH 09/11] Update stream scaling logic & logging --- src/meta/src/stream/scale.rs | 3 ++ src/meta/src/stream/stream_manager.rs | 52 +++++++++++++++++++-------- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index e0a2fb390f42d..6d2dc256ec54a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2435,12 +2435,15 @@ impl GlobalStreamManager { .await; })); + tracing::debug!("pausing tick lock in source manager"); let _source_pause_guard = self.source_manager.paused.lock().await; self.barrier_scheduler .run_config_change_command_with_pause(command) .await?; + tracing::info!("reschedule done"); + Ok(()) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 8aef7a68d5483..14922ac3e3366 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -755,27 +755,51 @@ impl GlobalStreamManager { .map(|node| node.id) .collect::>(); - let reschedules = if deferred { - HashMap::new() - } else { + let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]); + + if deferred { + tracing::debug!( + "deferred mode enabled for job {}, set the parallelism directly to {:?}", + table_id, + parallelism + ); self.scale_controller + .as_ref() + .unwrap() + .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) + .await?; + } else { + let reschedules = self + .scale_controller .as_ref() .unwrap() .generate_table_resize_plan(TableResizePolicy { worker_ids, - table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(), + table_parallelisms: table_parallelism_assignment + .iter() + .map(|(id, parallelism)| (id.table_id, *parallelism)) + .collect(), }) - .await? - }; + .await?; - self.reschedule_actors( - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: false, - }, - Some(HashMap::from([(TableId::new(table_id), parallelism)])), - ) - .await?; + if reschedules.is_empty() { + tracing::debug!("empty reschedule plan generated for job {}, set the parallelism directly to {:?}", table_id, parallelism); + self.scale_controller + .as_ref() + .unwrap() + .post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment) + .await?; + } else { + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: false, + }, + Some(table_parallelism_assignment), + ) + .await?; + } + }; Ok(()) } From f996893d2cd5ce41ac65b01350ed93c11fe57fb4 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 30 Jan 2024 21:56:59 +0800 Subject: [PATCH 10/11] Removed unused imports in recovery.rs --- src/meta/src/barrier/recovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index f4d22baa4e170..eb89f74c998cb 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,7 +22,7 @@ use futures::stream::FuturesUnordered; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::hash::ParallelUnitId; + use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::PausedReason; @@ -35,7 +35,7 @@ use risingwave_pb::stream_service::{ use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use tracing::{debug, info, warn, Instrument}; +use tracing::{debug, warn, Instrument}; use uuid::Uuid; use super::TracedEpoch; From aabfbc321f296c849cbdd925c9f8b36fa6c952f0 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 30 Jan 2024 22:00:25 +0800 Subject: [PATCH 11/11] tmp Signed-off-by: Shanicky Chen --- src/meta/src/barrier/recovery.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index eb89f74c998cb..1958ab03a166d 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,7 +22,6 @@ use futures::stream::FuturesUnordered; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; - use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::PausedReason;