diff --git a/proto/catalog.proto b/proto/catalog.proto index c6d83f51612c4..4e410472128ea 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -170,6 +170,9 @@ message Sink { // Cluster version (tracked by git commit) when initialized/created optional string initialized_at_cluster_version = 22; optional string created_at_cluster_version = 23; + + // Whether it should use background ddl or block until backfill finishes. + CreateType create_type = 24; } message Subscription { diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 10ab490278952..a7bf6f980f36d 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -27,7 +27,9 @@ pub use external_table::*; pub use internal_table::*; use parse_display::Display; pub use physical_table::*; -use risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior; +use risingwave_pb::catalog::{ + CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior, +}; use risingwave_pb::plan_common::ColumnDescVersion; pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema}; @@ -475,3 +477,31 @@ impl ConflictBehavior { } } } + +#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)] +pub enum CreateType { + Foreground, + Background, +} + +impl Default for CreateType { + fn default() -> Self { + Self::Foreground + } +} + +impl CreateType { + pub fn from_proto(pb_create_type: PbCreateType) -> Self { + match pb_create_type { + PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground, + PbCreateType::Background => CreateType::Background, + } + } + + pub fn to_proto(self) -> PbCreateType { + match self { + CreateType::Foreground => PbCreateType::Foreground, + CreateType::Background => PbCreateType::Background, + } + } +} diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 37d35e1d7edde..ed17ebd1504ed 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use itertools::Itertools; use risingwave_common::catalog::{ - ColumnCatalog, ConnectionId, DatabaseId, SchemaId, TableId, UserId, + ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId, }; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::stream_plan::PbSinkDesc; @@ -70,6 +70,9 @@ pub struct SinkDesc { /// See the same name field in `SinkWriterParam`. pub extra_partition_col_idx: Option, + + /// Whether the sink job should run in foreground or background. + pub create_type: CreateType, } impl SinkDesc { @@ -104,6 +107,7 @@ impl SinkDesc { target_table: self.target_table, created_at_cluster_version: None, initialized_at_cluster_version: None, + create_type: CreateType::Foreground, } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index e6a654f75a5fd..c0c0da9bc2047 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -19,12 +19,14 @@ use std::collections::{BTreeMap, HashMap}; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{ - ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId, + ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, TableId, UserId, OBJECT_ID_PLACEHOLDER, }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus}; +use risingwave_pb::catalog::{ + PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, +}; use super::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, @@ -307,6 +309,7 @@ pub struct SinkCatalog { pub created_at_cluster_version: Option, pub initialized_at_cluster_version: Option, + pub create_type: CreateType, } impl SinkCatalog { @@ -347,6 +350,7 @@ impl SinkCatalog { target_table: self.target_table.map(|table_id| table_id.table_id()), created_at_cluster_version: self.created_at_cluster_version.clone(), initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), + create_type: self.create_type.to_proto() as i32, } } @@ -388,6 +392,7 @@ impl SinkCatalog { impl From for SinkCatalog { fn from(pb: PbSink) -> Self { let sink_type = pb.get_sink_type().unwrap(); + let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground); let format_desc = match pb.format_desc { Some(f) => f.try_into().ok(), None => { @@ -438,6 +443,7 @@ impl From for SinkCatalog { target_table: pb.target_table.map(TableId::new), initialized_at_cluster_version: pb.initialized_at_cluster_version, created_at_cluster_version: pb.created_at_cluster_version, + create_type: CreateType::from_proto(create_type), } } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 59bc3f8b4b343..145d0c78ea479 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet}; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{ - ColumnCatalog, ConflictBehavior, Field, Schema, TableDesc, TableId, TableVersionId, + ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, TableDesc, TableId, TableVersionId, }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; @@ -163,38 +163,6 @@ pub struct TableCatalog { pub initialized_at_cluster_version: Option, } -// How the stream job was created will determine -// whether they are persisted. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum CreateType { - Background, - Foreground, -} - -#[cfg(test)] -impl Default for CreateType { - fn default() -> Self { - Self::Foreground - } -} - -impl CreateType { - fn from_prost(prost: PbCreateType) -> Self { - match prost { - PbCreateType::Background => Self::Background, - PbCreateType::Foreground => Self::Foreground, - PbCreateType::Unspecified => unreachable!(), - } - } - - pub(crate) fn to_prost(self) -> PbCreateType { - match self { - Self::Background => PbCreateType::Background, - Self::Foreground => PbCreateType::Foreground, - } - } -} - #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum TableType { /// Tables created by `CREATE TABLE`. @@ -443,7 +411,7 @@ impl TableCatalog { created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Creating.into(), - create_type: self.create_type.to_prost().into(), + create_type: self.create_type.to_proto().into(), description: self.description.clone(), incoming_sinks: self.incoming_sinks.clone(), created_at_cluster_version: self.created_at_cluster_version.clone(), @@ -569,7 +537,7 @@ impl From for TableCatalog { created_at_epoch: tb.created_at_epoch.map(Epoch::from), initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from), cleaned_by_watermark: tb.cleaned_by_watermark, - create_type: CreateType::from_prost(create_type), + create_type: CreateType::from_proto(create_type), description: tb.description, incoming_sinks: tb.incoming_sinks.clone(), created_at_cluster_version: tb.created_at_cluster_version.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 792eaf4d7a068..3e7e24582f9e4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -18,7 +18,9 @@ use std::num::NonZeroU32; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId, OBJECT_ID_PLACEHOLDER}; +use risingwave_common::catalog::{ + ColumnCatalog, ConflictBehavior, CreateType, TableId, OBJECT_ID_PLACEHOLDER, +}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -28,7 +30,7 @@ use super::stream::prelude::*; use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill}; use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion}; +use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; use crate::error::Result; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 7a724a20be46a..e0bd1eb225e07 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -20,7 +20,7 @@ use fixedbitset::FixedBitSet; use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, TableId}; +use risingwave_common::catalog::{ColumnCatalog, CreateType, TableId}; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::match_sink_name_str; @@ -371,6 +371,11 @@ impl StreamSink { }; let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; let distribution_key = input.distribution().dist_column_indices().to_vec(); + let create_type = if input.ctx().session_ctx().config().background_ddl() { + CreateType::Background + } else { + CreateType::Foreground + }; let sink_desc = SinkDesc { id: SinkId::placeholder(), name, @@ -386,6 +391,7 @@ impl StreamSink { format_desc, target_table, extra_partition_col_idx, + create_type, }; Ok((input, sink_desc)) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 6d86e895bcb18..1c979dcee2ee8 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -20,14 +20,15 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode}; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema, OBJECT_ID_PLACEHOLDER, + ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Field, FieldDisplay, Schema, + OBJECT_ID_PLACEHOLDER, }; use risingwave_common::constants::log_store::v2::{ KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX, }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use crate::catalog::table_catalog::{CreateType, TableType}; +use crate::catalog::table_catalog::TableType; use crate::catalog::{ColumnId, TableCatalog, TableId}; use crate::optimizer::property::{Cardinality, Order, RequiredDist}; use crate::utils::{Condition, IndexSet}; diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 2aa52fa96cebb..40f2273da4847 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -474,7 +474,7 @@ pub(crate) mod tests { WorkerNodeManager, WorkerNodeSelector, }; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID, + ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, DEFAULT_SUPER_USER_ID, }; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::types::DataType; @@ -485,7 +485,7 @@ pub(crate) mod tests { use crate::catalog::catalog_service::CatalogReader; use crate::catalog::root_catalog::Catalog; - use crate::catalog::table_catalog::{CreateType, TableType}; + use crate::catalog::table_catalog::TableType; use crate::expr::InputRef; use crate::optimizer::plan_node::{ generic, BatchExchange, BatchFilter, BatchHashJoin, EqJoinPredicate, LogicalScan, ToBatch, diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 42d6b27771edb..eff4a31c9e90b 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -206,6 +206,7 @@ impl From> for PbSink { target_table: value.0.target_table.map(|id| id as _), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, + create_type: PbCreateType::Foreground as _, } } }