Skip to content

Commit

Permalink
feat(frontend): update SinkCatalog to support background ddl (#16156)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Apr 9, 2024
1 parent 3f50c60 commit 5ee8bb8
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 46 deletions.
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ message Sink {
// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 22;
optional string created_at_cluster_version = 23;

// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;
}

message Subscription {
Expand Down
32 changes: 31 additions & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub use external_table::*;
pub use internal_table::*;
use parse_display::Display;
pub use physical_table::*;
use risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior;
use risingwave_pb::catalog::{
CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
};
use risingwave_pb::plan_common::ColumnDescVersion;
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema};

Expand Down Expand Up @@ -475,3 +477,31 @@ impl ConflictBehavior {
}
}
}

#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum CreateType {
Foreground,
Background,
}

impl Default for CreateType {
fn default() -> Self {
Self::Foreground
}
}

impl CreateType {
pub fn from_proto(pb_create_type: PbCreateType) -> Self {
match pb_create_type {
PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
PbCreateType::Background => CreateType::Background,
}
}

pub fn to_proto(self) -> PbCreateType {
match self {
CreateType::Foreground => PbCreateType::Foreground,
CreateType::Background => PbCreateType::Background,
}
}
}
6 changes: 5 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::BTreeMap;

use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, SchemaId, TableId, UserId,
ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::stream_plan::PbSinkDesc;
Expand Down Expand Up @@ -70,6 +70,9 @@ pub struct SinkDesc {

/// See the same name field in `SinkWriterParam`.
pub extra_partition_col_idx: Option<usize>,

/// Whether the sink job should run in foreground or background.
pub create_type: CreateType,
}

impl SinkDesc {
Expand Down Expand Up @@ -104,6 +107,7 @@ impl SinkDesc {
target_table: self.target_table,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: CreateType::Foreground,
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use std::collections::{BTreeMap, HashMap};
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus};
use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
Expand Down Expand Up @@ -307,6 +309,7 @@ pub struct SinkCatalog {

pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
pub create_type: CreateType,
}

impl SinkCatalog {
Expand Down Expand Up @@ -347,6 +350,7 @@ impl SinkCatalog {
target_table: self.target_table.map(|table_id| table_id.table_id()),
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
}
}

Expand Down Expand Up @@ -388,6 +392,7 @@ impl SinkCatalog {
impl From<PbSink> for SinkCatalog {
fn from(pb: PbSink) -> Self {
let sink_type = pb.get_sink_type().unwrap();
let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
let format_desc = match pb.format_desc {
Some(f) => f.try_into().ok(),
None => {
Expand Down Expand Up @@ -438,6 +443,7 @@ impl From<PbSink> for SinkCatalog {
target_table: pb.target_table.map(TableId::new),
initialized_at_cluster_version: pb.initialized_at_cluster_version,
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
}
}
}
Expand Down
38 changes: 3 additions & 35 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, Field, Schema, TableDesc, TableId, TableVersionId,
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, TableDesc, TableId, TableVersionId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
Expand Down Expand Up @@ -163,38 +163,6 @@ pub struct TableCatalog {
pub initialized_at_cluster_version: Option<String>,
}

// How the stream job was created will determine
// whether they are persisted.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum CreateType {
Background,
Foreground,
}

#[cfg(test)]
impl Default for CreateType {
fn default() -> Self {
Self::Foreground
}
}

impl CreateType {
fn from_prost(prost: PbCreateType) -> Self {
match prost {
PbCreateType::Background => Self::Background,
PbCreateType::Foreground => Self::Foreground,
PbCreateType::Unspecified => unreachable!(),
}
}

pub(crate) fn to_prost(self) -> PbCreateType {
match self {
Self::Background => PbCreateType::Background,
Self::Foreground => PbCreateType::Foreground,
}
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
/// Tables created by `CREATE TABLE`.
Expand Down Expand Up @@ -443,7 +411,7 @@ impl TableCatalog {
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: self.create_type.to_prost().into(),
create_type: self.create_type.to_proto().into(),
description: self.description.clone(),
incoming_sinks: self.incoming_sinks.clone(),
created_at_cluster_version: self.created_at_cluster_version.clone(),
Expand Down Expand Up @@ -569,7 +537,7 @@ impl From<PbTable> for TableCatalog {
created_at_epoch: tb.created_at_epoch.map(Epoch::from),
initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
cleaned_by_watermark: tb.cleaned_by_watermark,
create_type: CreateType::from_prost(create_type),
create_type: CreateType::from_proto(create_type),
description: tb.description,
incoming_sinks: tb.incoming_sinks.clone(),
created_at_cluster_version: tb.created_at_cluster_version.clone(),
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use std::num::NonZeroU32;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId, OBJECT_ID_PLACEHOLDER};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand All @@ -28,7 +30,7 @@ use super::stream::prelude::*;
use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion};
use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::error::Result;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fixedbitset::FixedBitSet;
use icelake::types::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, TableId};
use risingwave_common::catalog::{ColumnCatalog, CreateType, TableId};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
Expand Down Expand Up @@ -371,6 +371,11 @@ impl StreamSink {
};
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
let distribution_key = input.distribution().dist_column_indices().to_vec();
let create_type = if input.ctx().session_ctx().config().background_ddl() {
CreateType::Background
} else {
CreateType::Foreground
};
let sink_desc = SinkDesc {
id: SinkId::placeholder(),
name,
Expand All @@ -386,6 +391,7 @@ impl StreamSink {
format_desc,
target_table,
extra_partition_col_idx,
create_type,
};
Ok((input, sink_desc))
}
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema, OBJECT_ID_PLACEHOLDER,
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Field, FieldDisplay, Schema,
OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::{CreateType, TableType};
use crate::catalog::table_catalog::TableType;
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::optimizer::property::{Cardinality, Order, RequiredDist};
use crate::utils::{Condition, IndexSet};
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ pub(crate) mod tests {
WorkerNodeManager, WorkerNodeSelector,
};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID,
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, DEFAULT_SUPER_USER_ID,
};
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::types::DataType;
Expand All @@ -485,7 +485,7 @@ pub(crate) mod tests {

use crate::catalog::catalog_service::CatalogReader;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::table_catalog::{CreateType, TableType};
use crate::catalog::table_catalog::TableType;
use crate::expr::InputRef;
use crate::optimizer::plan_node::{
generic, BatchExchange, BatchFilter, BatchHashJoin, EqJoinPredicate, LogicalScan, ToBatch,
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
target_table: value.0.target_table.map(|id| id as _),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
create_type: PbCreateType::Foreground as _,
}
}
}
Expand Down

0 comments on commit 5ee8bb8

Please sign in to comment.