From 7ab6d16e03521e62ff8cf53ee83f9ab5efaef377 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Fri, 2 Feb 2024 17:05:07 +0800 Subject: [PATCH] add option --- src/meta/src/controller/catalog.rs | 5 +++-- .../src/hummock/manager/compaction_group_manager.rs | 7 ++++--- src/meta/src/rpc/ddl_controller.rs | 8 +++++--- src/meta/src/stream/stream_manager.rs | 12 +++++++++--- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 550edabbb9ed8..bc478b4184213 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -26,8 +26,9 @@ use risingwave_meta_model_v2::table::TableType; use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId, - CreateType, DatabaseId, FunctionId, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, - SourceId, StreamSourceInfo, TableId, UserId, + CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId, + PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, + UserId, }; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index de27c8eca28dc..44708d3bda624 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -56,6 +56,7 @@ use crate::model::{ BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction, }; use crate::storage::MetaStore; +use crate::stream::CreateStreamingJobOption; impl HummockManager { pub(super) async fn build_compaction_group_manager( @@ -106,7 +107,7 @@ impl HummockManager { &self, mv_table: Option, mut internal_tables: Vec, - new_independent_compaction_group: bool, + create_stream_job_option: CreateStreamingJobOption, ) -> Result> { let mut pairs = vec![]; if let Some(mv_table) = mv_table { @@ -116,7 +117,7 @@ impl HummockManager { // materialized_view pairs.push(( mv_table, - if new_independent_compaction_group { + if create_stream_job_option.new_independent_compaction_group { CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) } else { CompactionGroupId::from(StaticCompactionGroupId::MaterializedView) @@ -127,7 +128,7 @@ impl HummockManager { for table_id in internal_tables { pairs.push(( table_id, - if new_independent_compaction_group { + if create_stream_job_option.new_independent_compaction_group { CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup) } else { CompactionGroupId::from(StaticCompactionGroupId::StateDefault) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index eed67dbca850c..84ff4ea1de682 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -74,8 +74,8 @@ use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, - CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, - StreamFragmentGraph, + CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef, + ReplaceTableContext, SourceManagerRef, StreamFragmentGraph, }; use crate::{MetaError, MetaResult}; @@ -1420,7 +1420,9 @@ impl DdlController { ddl_type: stream_job.into(), replace_table_job_info, // TODO: https://github.com/risingwavelabs/risingwave/issues/14793 - new_independent_compaction_group: false, + option: CreateStreamingJobOption { + new_independent_compaction_group: false, + }, }; // 4. Mark tables as creating, including internal tables and the table of the stream job. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 4eda2c31bad7d..d2f5c6ca21f05 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -37,6 +37,12 @@ use crate::{MetaError, MetaResult}; pub type GlobalStreamManagerRef = Arc; +pub struct CreateStreamingJobOption { + + pub new_independent_compaction_group: bool, + +} + /// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job. /// /// Note: for better readability, keep this struct complete and immutable once created. @@ -69,7 +75,7 @@ pub struct CreateStreamingJobContext { /// Context provided for potential replace table, typically used when sinking into a table. pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, - pub new_independent_compaction_group: bool, + pub option: CreateStreamingJobOption, } impl CreateStreamingJobContext { @@ -408,7 +414,7 @@ impl GlobalStreamManager { create_type, ddl_type, replace_table_job_info, - new_independent_compaction_group, + option, }: CreateStreamingJobContext, ) -> MetaResult<()> { let mut replace_table_command = None; @@ -420,7 +426,7 @@ impl GlobalStreamManager { .register_table_fragments( mv_table_id, internal_tables.keys().copied().collect(), - new_independent_compaction_group, + option, ) .await?; debug_assert_eq!(