Skip to content

Commit

Permalink
add option
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Feb 2, 2024
1 parent d0e4ca7 commit 7ab6d16
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use risingwave_meta_model_v2::table::TableType;
use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source,
streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId,
CreateType, DatabaseId, FunctionId, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId,
SourceId, StreamSourceInfo, TableId, UserId,
CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId,
UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::model::{
BTreeMapTransactionWrapper, MetadataModel, MetadataModelError, TableFragments, ValTransaction,
};
use crate::storage::MetaStore;
use crate::stream::CreateStreamingJobOption;

impl HummockManager {
pub(super) async fn build_compaction_group_manager(
Expand Down Expand Up @@ -106,7 +107,7 @@ impl HummockManager {
&self,
mv_table: Option<u32>,
mut internal_tables: Vec<u32>,
new_independent_compaction_group: bool,
create_stream_job_option: CreateStreamingJobOption,
) -> Result<Vec<StateTableId>> {
let mut pairs = vec![];
if let Some(mv_table) = mv_table {
Expand All @@ -116,7 +117,7 @@ impl HummockManager {
// materialized_view
pairs.push((
mv_table,
if new_independent_compaction_group {
if create_stream_job_option.new_independent_compaction_group {
CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup)
} else {
CompactionGroupId::from(StaticCompactionGroupId::MaterializedView)
Expand All @@ -127,7 +128,7 @@ impl HummockManager {
for table_id in internal_tables {
pairs.push((
table_id,
if new_independent_compaction_group {
if create_stream_job_option.new_independent_compaction_group {
CompactionGroupId::from(StaticCompactionGroupId::NewCompactionGroup)
} else {
CompactionGroupId::from(StaticCompactionGroupId::StateDefault)
Expand Down
8 changes: 5 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::stream::{
validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef,
StreamFragmentGraph,
CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
ReplaceTableContext, SourceManagerRef, StreamFragmentGraph,
};
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -1420,7 +1420,9 @@ impl DdlController {
ddl_type: stream_job.into(),
replace_table_job_info,
// TODO: https://github.com/risingwavelabs/risingwave/issues/14793
new_independent_compaction_group: false,
option: CreateStreamingJobOption {
new_independent_compaction_group: false,
},
};

// 4. Mark tables as creating, including internal tables and the table of the stream job.
Expand Down
12 changes: 9 additions & 3 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ use crate::{MetaError, MetaResult};

pub type GlobalStreamManagerRef = Arc<GlobalStreamManager>;

pub struct CreateStreamingJobOption {

pub new_independent_compaction_group: bool,

}

/// [`CreateStreamingJobContext`] carries one-time infos for creating a streaming job.
///
/// Note: for better readability, keep this struct complete and immutable once created.
Expand Down Expand Up @@ -69,7 +75,7 @@ pub struct CreateStreamingJobContext {
/// Context provided for potential replace table, typically used when sinking into a table.
pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>,

pub new_independent_compaction_group: bool,
pub option: CreateStreamingJobOption,
}

impl CreateStreamingJobContext {
Expand Down Expand Up @@ -408,7 +414,7 @@ impl GlobalStreamManager {
create_type,
ddl_type,
replace_table_job_info,
new_independent_compaction_group,
option,
}: CreateStreamingJobContext,
) -> MetaResult<()> {
let mut replace_table_command = None;
Expand All @@ -420,7 +426,7 @@ impl GlobalStreamManager {
.register_table_fragments(
mv_table_id,
internal_tables.keys().copied().collect(),
new_independent_compaction_group,
option,
)
.await?;
debug_assert_eq!(
Expand Down

0 comments on commit 7ab6d16

Please sign in to comment.