decouple cdc backfill table addi cols
StrikeW committed May 24, 2024
1 parent 0089cc0 commit 4cb7cb6
Showing 6 changed files with 45 additions and 25 deletions.
32 changes: 21 additions & 11 deletions src/connector/src/parser/additional_columns.rs
@@ -28,7 +28,7 @@ use risingwave_pb::plan_common::{
 };
 
 use crate::error::ConnectorResult;
-use crate::source::cdc::{MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR};
+use crate::source::cdc::MONGODB_CDC_CONNECTOR;
 use crate::source::{
     GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
     S3_CONNECTOR,
@@ -56,21 +56,29 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
             (OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
             (S3_CONNECTOR, HashSet::from(["file", "offset"])),
             (GCS_CONNECTOR, HashSet::from(["file", "offset"])),
-            (
-                MYSQL_CDC_CONNECTOR,
-                HashSet::from(["timestamp", "partition", "offset"]),
-            ),
-            (
-                POSTGRES_CDC_CONNECTOR,
-                HashSet::from(["timestamp", "partition", "offset"]),
-            ),
+            // mongodb-cdc doesn't support cdc backfill table
             (
                 MONGODB_CDC_CONNECTOR,
                 HashSet::from(["timestamp", "partition", "offset"]),
             ),
         ])
     });
 
+// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
+pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
+    LazyLock::new(|| Some(HashSet::from(["timestamp"])));
+
+pub fn get_supported_additional_columns(
+    connector_name: &str,
+    is_cdc_backfill: bool,
+) -> Option<&HashSet<&'static str>> {
+    if is_cdc_backfill {
+        CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
+    } else {
+        COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
+    }
+}
+
 pub fn gen_default_addition_col_name(
     connector_name: &str,
     additional_col_type: &str,
@@ -100,9 +108,10 @@ pub fn build_additional_column_catalog(
     inner_field_name: Option<&str>,
     data_type: Option<&str>,
     reject_unknown_connector: bool,
+    is_cdc_backfill_table: bool,
 ) -> ConnectorResult<ColumnCatalog> {
     let compatible_columns = match (
-        COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name),
+        get_supported_additional_columns(connector_name, is_cdc_backfill_table),
         reject_unknown_connector,
     ) {
         (Some(compat_cols), _) => compat_cols,
@@ -203,7 +212,7 @@ pub fn build_additional_column_catalog(
 /// ## Returns
 /// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
 /// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns.
-pub fn add_partition_offset_cols(
+pub fn source_add_partition_offset_cols(
     columns: &[ColumnCatalog],
     connector_name: &str,
 ) -> ([bool; 2], [ColumnCatalog; 2]) {
@@ -232,6 +241,7 @@ pub fn add_partition_offset_cols(
                 None,
                 None,
                 false,
+                false,
             )
             .unwrap(),
         )
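
The net effect in this file: `mysql-cdc` and `postgres-cdc` no longer appear in `COMPATIBLE_ADDITIONAL_COLUMNS`; CDC backfill tables instead resolve their additional columns through the new `get_supported_additional_columns`, which only permits `timestamp`. A minimal standalone sketch of that dispatch, with illustrative connector entries rather than the full map:

use std::collections::{HashMap, HashSet};

// Standalone mirror of get_supported_additional_columns: CDC backfill
// tables get one fixed column set; everything else is looked up in the
// per-connector compatibility map.
fn supported_additional_columns<'a>(
    compatible: &'a HashMap<&'static str, HashSet<&'static str>>,
    cdc_backfill: &'a HashSet<&'static str>,
    connector_name: &str,
    is_cdc_backfill: bool,
) -> Option<&'a HashSet<&'static str>> {
    if is_cdc_backfill {
        Some(cdc_backfill)
    } else {
        compatible.get(connector_name)
    }
}

fn main() {
    // Illustrative entries only; the real map lives in additional_columns.rs.
    let compatible = HashMap::from([(
        "mongodb-cdc",
        HashSet::from(["timestamp", "partition", "offset"]),
    )]);
    let cdc_backfill = HashSet::from(["timestamp"]);

    // A CDC backfill table accepts only `timestamp`, whatever the connector.
    assert_eq!(
        supported_additional_columns(&compatible, &cdc_backfill, "mysql-cdc", true),
        Some(&cdc_backfill)
    );
    // A plain `mysql-cdc` source no longer matches the compatibility map.
    assert!(supported_additional_columns(&compatible, &cdc_backfill, "mysql-cdc", false).is_none());
}
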
4 changes: 2 additions & 2 deletions src/connector/src/source/reader/desc.rs
@@ -26,7 +26,7 @@ use risingwave_pb::plan_common::PbColumnCatalog;
 use super::fs_reader::FsSourceReader;
 use super::reader::SourceReader;
 use crate::error::ConnectorResult;
-use crate::parser::additional_columns::add_partition_offset_cols;
+use crate::parser::additional_columns::source_add_partition_offset_cols;
 use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
 use crate::source::monitor::SourceMetrics;
 use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
@@ -98,7 +98,7 @@ impl SourceDescBuilder {
             .map(|c| ColumnCatalog::from(c.clone()))
             .collect_vec();
         let (columns_exist, additional_columns) =
-            add_partition_offset_cols(&columns, &connector_name);
+            source_add_partition_offset_cols(&columns, &connector_name);
 
         let mut columns: Vec<_> = self
             .columns
16 changes: 11 additions & 5 deletions src/frontend/src/handler/create_source.rs
@@ -29,7 +29,7 @@ use risingwave_common::catalog::{
 };
 use risingwave_common::types::DataType;
 use risingwave_connector::parser::additional_columns::{
-    build_additional_column_catalog, COMPATIBLE_ADDITIONAL_COLUMNS,
+    build_additional_column_catalog, get_supported_additional_columns,
 };
 use risingwave_connector::parser::{
     schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
@@ -551,12 +551,11 @@ pub fn handle_addition_columns(
     with_properties: &HashMap<String, String>,
     mut additional_columns: IncludeOption,
     columns: &mut Vec<ColumnCatalog>,
+    is_cdc_backfill_table: bool,
 ) -> Result<()> {
     let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source
 
-    if COMPATIBLE_ADDITIONAL_COLUMNS
-        .get(connector_name.as_str())
-        .is_none()
+    if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none()
         && !additional_columns.is_empty()
     {
         return Err(RwError::from(ProtocolError(format!(
@@ -595,6 +594,7 @@
             item.inner_field.as_deref(),
             data_type_name.as_deref(),
             true,
+            is_cdc_backfill_table,
         )?);
     }
 
Expand Down Expand Up @@ -911,6 +911,7 @@ fn check_and_add_timestamp_column(
None,
None,
true,
false,
)
.unwrap();
catalog.is_hidden = true;
Expand Down Expand Up @@ -1355,7 +1356,12 @@ pub async fn bind_create_source(
)?;

// add additional columns before bind pk, because `format upsert` requires the key column
handle_addition_columns(&with_properties, include_column_options, &mut columns)?;
handle_addition_columns(
&with_properties,
include_column_options,
&mut columns,
false,
)?;
// compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
if is_create_source {
// must behind `handle_addition_columns`
Expand Down
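
With `is_cdc_backfill_table` threaded through, `handle_addition_columns` can reject unsupported `INCLUDE` options up front: `bind_create_source` above passes `false` for regular sources, while the CDC table planner in `create_table.rs` below passes `true`. A hedged sketch of that early check, with simplified stand-in types (the real function works on an `IncludeOption` list and returns `RwError`):

use std::collections::HashSet;

// Simplified stand-in for the early validation in handle_addition_columns:
// if no column set is supported in this context but the user still wrote
// INCLUDE options, fail before building any column catalogs.
fn validate_include_options(
    supported: Option<&HashSet<&'static str>>,
    requested: &[&str],
) -> Result<(), String> {
    if supported.is_none() && !requested.is_empty() {
        return Err(format!(
            "connector does not accept INCLUDE options here: {requested:?}"
        ));
    }
    Ok(())
}

fn main() {
    // No supported set (e.g. an unknown connector) plus a requested column => error.
    assert!(validate_include_options(None, &["timestamp"]).is_err());
    // Requesting nothing is always fine.
    assert!(validate_include_options(None, &[]).is_ok());
}
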
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table.rs
@@ -769,7 +769,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
     let mut columns = bind_sql_columns(&column_defs)?;
     let with_properties = source.with_properties.clone().into_iter().collect();
     // append additional columns to the end
-    handle_addition_columns(&with_properties, include_column_options, &mut columns)?;
+    handle_addition_columns(&with_properties, include_column_options, &mut columns, true)?;
 
     for c in &mut columns {
         c.column_desc.column_id = col_id_gen.generate(c.name())
8 changes: 5 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
 use itertools::Itertools;
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::parser::additional_columns::add_partition_offset_cols;
+use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{PbStreamSource, SourceNode};
 
@@ -46,8 +46,10 @@ impl StreamSource {
         if let Some(source_catalog) = &core.catalog
             && source_catalog.info.is_shared()
         {
-            let (columns_exist, additional_columns) =
-                add_partition_offset_cols(&core.column_catalog, &source_catalog.connector_name());
+            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
+                &core.column_catalog,
+                &source_catalog.connector_name(),
+            );
             for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                 c.is_hidden = true;
                 if !existed {
8 changes: 5 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -21,7 +21,7 @@ use risingwave_common::catalog::Field;
 use risingwave_common::types::DataType;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_connector::parser::additional_columns::add_partition_offset_cols;
+use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols;
 use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
 use risingwave_pb::stream_plan::PbStreamNode;
 
@@ -58,8 +58,10 @@ impl StreamSourceScan {
         if let Some(source_catalog) = &core.catalog
             && source_catalog.info.is_shared()
         {
-            let (columns_exist, additional_columns) =
-                add_partition_offset_cols(&core.column_catalog, &source_catalog.connector_name());
+            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
+                &core.column_catalog,
+                &source_catalog.connector_name(),
+            );
             for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                 c.is_hidden = true;
                 if !existed {
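
Both planner nodes share the same shape after the renamed helper returns: each generated partition/offset column is forced hidden and appended only if the user has not already declared it. A self-contained sketch of that merge loop, using plain `zip` in place of the internal `zip_eq_fast` and illustrative column names:

// Minimal stand-in for the catalog type used by the loop.
#[derive(Clone, Debug)]
struct ColumnCatalog {
    name: String,
    is_hidden: bool,
}

// Mirror of the loop in StreamSource / StreamSourceScan: hide every
// generated column and append it only when it wasn't already present.
fn merge_additional_cols(
    columns: &mut Vec<ColumnCatalog>,
    columns_exist: [bool; 2],
    additional_columns: [ColumnCatalog; 2],
) {
    for (existed, mut c) in columns_exist.into_iter().zip(additional_columns) {
        c.is_hidden = true;
        if !existed {
            columns.push(c);
        }
    }
}

fn main() {
    // Illustrative column names; the real ones come from the connector.
    let mut columns = vec![ColumnCatalog { name: "_rw_kafka_partition".into(), is_hidden: false }];
    let generated = [
        ColumnCatalog { name: "_rw_kafka_partition".into(), is_hidden: false },
        ColumnCatalog { name: "_rw_kafka_offset".into(), is_hidden: false },
    ];
    // Partition already exists, so only the offset column is appended (hidden).
    merge_additional_cols(&mut columns, [true, false], generated);
    assert_eq!(columns.len(), 2);
    assert_eq!(columns[1].name, "_rw_kafka_offset");
    assert!(columns[1].is_hidden);
}
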
