refactor: additional column logic #18633

Closed · wants to merge 3 commits · changes shown from 2 commits
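For orientation before the diff: the core change in `src/connector/src/parser/additional_columns.rs` is that the compatibility map gains a per-FORMAT level, going from connector → column set to connector → format → column set. A minimal sketch of the shape change (types simplified; the `FormatType` below is a stand-in for the protobuf-generated enum):

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-in for the protobuf-generated enum referenced in the PR.
#[derive(PartialEq, Eq, Hash)]
enum FormatType {
    Plain,
    Upsert,
    Debezium,
    // ...
}

// Before: which additional columns each connector supports.
type CompatBefore = HashMap<&'static str, HashSet<&'static str>>;

// After: which additional columns each connector supports, per FORMAT.
type CompatAfter = HashMap<&'static str, HashMap<FormatType, HashSet<&'static str>>>;
```

With the nested map, a lookup becomes the two-step `get(connector_name)` followed by `get(format)` chain that the review thread further down debates.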
4 changes: 3 additions & 1 deletion e2e_test/source/basic/nosim_kafka.slt
@@ -15,7 +15,9 @@ FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id))
INCLUDE TIMESTAMP
with (
connector = 'kafka',
kafka.topic = 'debezium_non_compact_avro_json',
kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
169 changes: 122 additions & 47 deletions src/connector/src/parser/additional_columns.rs
@@ -25,7 +25,7 @@ use risingwave_pb::plan_common::{
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName,
AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, FormatType,
};

use crate::error::ConnectorResult;
@@ -39,54 +39,122 @@ use crate::source::{
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
LazyLock::new(|| HashSet::from(["partition", "offset"]));

pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
LazyLock::new(|| {
HashMap::from([
(
KAFKA_CONNECTOR,
HashSet::from([
"key",
"timestamp",
"partition",
"offset",
"header",
"payload",
]),
),
(
PULSAR_CONNECTOR,
HashSet::from(["key", "partition", "offset", "payload"]),
),
(
KINESIS_CONNECTOR,
HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
),
(
OPENDAL_S3_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
(GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
(
AZBLOB_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
(
POSIX_FS_CONNECTOR,
HashSet::from(["file", "offset", "payload"]),
),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
static KAFKA_COMMON_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
HashSet::from([
"key",
"timestamp",
"partition",
"offset",
"header",
"payload",
])
});
static PULSAR_COMMON_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
LazyLock::new(|| HashSet::from(["key", "partition", "offset", "payload"]));
static KINESIS_COMMON_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
LazyLock::new(|| HashSet::from(["key", "partition", "offset", "timestamp", "payload"]));
static FS_COMMON_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
LazyLock::new(|| HashSet::from(["file", "offset", "payload"]));

pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<
HashMap<&'static str, HashMap<FormatType, HashSet<&'static str>>>,
> = LazyLock::new(|| {
HashMap::from([
(
KAFKA_CONNECTOR,
HashMap::from([
(FormatType::Plain, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()),
(FormatType::Upsert, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()),
(FormatType::Maxwell, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()),
(
// does not include `key` column because it is specified in the Debezium protocol
FormatType::Debezium,
HashSet::from(["timestamp", "partition", "offset", "header", "payload"]),
),
(FormatType::Canal, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()),
(
FormatType::DebeziumMongo,
HashSet::from(["timestamp", "partition", "offset", "header", "payload"]),
),
]),
),
(
PULSAR_CONNECTOR,
HashMap::from([
(FormatType::Plain, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()),
(FormatType::Upsert, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()),
(
FormatType::Maxwell,
PULSAR_COMMON_ADDITIONAL_COLUMNS.clone(),
),
(
// does not include `key` column because it is specified in the Debezium protocol
FormatType::Debezium,
HashSet::from(["partition", "offset", "payload"]),
),
(FormatType::Canal, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()),
(
FormatType::DebeziumMongo,
HashSet::from(["partition", "offset", "payload"]),
),
]),
),
(
KINESIS_CONNECTOR,
HashMap::from([
(FormatType::Plain, KINESIS_COMMON_ADDITIONAL_COLUMNS.clone()),
(
FormatType::Upsert,
KINESIS_COMMON_ADDITIONAL_COLUMNS.clone(),
),
(
FormatType::Maxwell,
KINESIS_COMMON_ADDITIONAL_COLUMNS.clone(),
),
(
// does not include `key` column because it is specified in the Debezium protocol
FormatType::Debezium,
HashSet::from(["timestamp", "partition", "offset", "payload"]),
),
(FormatType::Canal, KINESIS_COMMON_ADDITIONAL_COLUMNS.clone()),
(
FormatType::DebeziumMongo,
HashSet::from(["timestamp", "partition", "offset", "payload"]),
),
]),
),
(
OPENDAL_S3_CONNECTOR,
HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]),
),
(
GCS_CONNECTOR,
HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]),
),
(
AZBLOB_CONNECTOR,
HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]),
),
(
POSIX_FS_CONNECTOR,
HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]),
),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
HashMap::from([(
FormatType::DebeziumMongo,
HashSet::from([
"timestamp",
"partition",
"offset",
"database_name",
"collection_name",
]),
),
])
});
)]),
),
])
});

// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
@@ -99,14 +167,17 @@ pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'stat
]))
});

pub fn get_supported_additional_columns(
connector_name: &str,
pub fn get_supported_additional_columns<'a>(
connector_name: &'a str,
format: &FormatType,
is_cdc_backfill: bool,
) -> Option<&HashSet<&'static str>> {
) -> Option<&'a HashSet<&'static str>> {
if is_cdc_backfill {
CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
} else {
COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
COMPATIBLE_ADDITIONAL_COLUMNS
.get(connector_name)
.and_then(|map| map.get(format))
Comment on lines +178 to +180

Member:

I think adding the format to the hashmap makes it harder to manage.

Suggested change: instead of

    COMPATIBLE_ADDITIONAL_COLUMNS
        .get(connector_name)
        .and_then(|map| map.get(format))

something like

    let compatible_additional_columns = COMPATIBLE_ADDITIONAL_COLUMNS
        .get(connector_name)?;
    match format { /* check format ... */ }

may be better.

Member:

Because "connector metadata" and "format metadata" are relatively orthogonal.

Contributor (author):

Makes sense. But I am not sure whether we should allow including `key` for the Debezium format; that is the only concern that made me use the nested structure.
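To make the alternative concrete, here is an editorial sketch (not code from the PR; the function name and the filtering rule are assumptions) of keeping one flat column set per connector and applying the FORMAT restriction in a separate step, which would also make the `key` question above an explicit decision:

```rust
use std::collections::HashSet;

// Local stand-in for the protobuf-generated enum; illustrative only.
enum FormatType {
    Plain,
    Upsert,
    Debezium,
    DebeziumMongo,
}

// Start from the connector's flat column set and strip the columns a given
// FORMAT cannot expose, e.g. the standalone `key` column for the Debezium
// formats, where the key is carried by the protocol itself.
fn columns_for_format(
    connector_columns: &HashSet<&'static str>,
    format: &FormatType,
) -> HashSet<&'static str> {
    connector_columns
        .iter()
        .copied()
        .filter(|col| {
            !(matches!(format, FormatType::Debezium | FormatType::DebeziumMongo) && *col == "key")
        })
        .collect()
}
```

Either shape can express the same compatibility table; the trade-off discussed here is whether the per-format knowledge lives in the map itself or in a separate filtering step.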

}
}

@@ -140,9 +211,10 @@ pub fn build_additional_column_desc(
data_type: Option<&str>,
reject_unknown_connector: bool,
is_cdc_backfill_table: bool,
format_type: &FormatType,
) -> ConnectorResult<ColumnDesc> {
let compatible_columns = match (
get_supported_additional_columns(connector_name, is_cdc_backfill_table),
get_supported_additional_columns(connector_name, format_type, is_cdc_backfill_table),
reject_unknown_connector,
) {
(Some(compat_cols), _) => compat_cols,
@@ -277,13 +349,15 @@ pub fn build_additional_column_desc(
pub fn source_add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
format_type: &FormatType,
) -> ([bool; 2], [ColumnDesc; 2]) {
let mut columns_exist = [false; 2];
let mut last_column_id = max_column_id(columns);

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
.get(connector_name)
.and_then(|format_col_mapping| format_col_mapping.get(format_type))
.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
["partition", "file", "offset"]
.iter()
@@ -300,6 +374,7 @@ pub fn source_add_partition_offset_cols(
None,
false,
false,
format_type,
)
.unwrap(),
)
7 changes: 5 additions & 2 deletions src/connector/src/source/reader/desc.rs
@@ -90,8 +90,11 @@ impl SourceDescBuilder {
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.to_lowercase())
.unwrap();
let (columns_exist, additional_columns) =
source_add_partition_offset_cols(&self.columns, &connector_name);
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&self.columns,
&connector_name,
self.source_info.get_format().as_ref().unwrap(),
);

let mut columns: Vec<_> = self
.columns
29 changes: 20 additions & 9 deletions src/frontend/src/handler/create_source.rs
@@ -637,7 +637,14 @@ pub fn handle_addition_columns(
) -> Result<()> {
let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source

if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none()
// The CDC source does not pass `source_schema`; it takes its additional column set from
// `CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS` and does not check FORMAT compatibility,
// so `FormatType::Unspecified` is used in that case.
let format_type = &source_schema
.and_then(|schema| format_to_prost(&schema.format).into())
.unwrap_or(FormatType::Unspecified);
if get_supported_additional_columns(connector_name.as_str(), format_type, is_cdc_backfill_table)
.is_none()
&& !additional_columns.is_empty()
{
return Err(RwError::from(ProtocolError(format!(
@@ -667,6 +674,7 @@ pub fn handle_addition_columns(
data_type_name.as_deref(),
true,
is_cdc_backfill_table,
format_type,
)?;
columns.push(ColumnCatalog::visible(col));
}
@@ -899,12 +907,6 @@ pub(crate) async fn bind_source_pk(
}

(Format::Debezium, Encode::Json) => {
if !additional_column_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"FORMAT DEBEZIUM forbids additional columns, but got {:?}",
additional_column_names
))));
}
Comment on lines -902 to -907

Member:

There are other branches not covered: "(debezium, avro)", "(maxwell, json)", "(canal, json)", etc.
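The removed check guarded only the `(Debezium, Json)` branch. As a self-contained sketch (local stand-in enums and an assumed function name, not the PR's code) of treating the reviewer's list of FORMAT/ENCODE pairs uniformly, a single table-driven guard could replace the per-branch checks, assuming those pairs are meant to keep forbidding additional columns:

```rust
// Local stand-in enums; the real ones are the frontend's AST Format/Encode types.
#[derive(Debug)]
enum Format {
    Plain,
    Debezium,
    Maxwell,
    Canal,
}
#[derive(Debug)]
enum Encode {
    Json,
    Avro,
}

// One guard for every FORMAT/ENCODE pair that forbids user-declared additional
// columns, instead of repeating the same check in each match arm.
fn reject_additional_columns_if_unsupported(
    format: &Format,
    encode: &Encode,
    additional_column_names: &[String],
) -> Result<(), String> {
    let forbidden = matches!(
        (format, encode),
        (Format::Debezium, Encode::Json)
            | (Format::Debezium, Encode::Avro)
            | (Format::Maxwell, Encode::Json)
            | (Format::Canal, Encode::Json)
    );
    if forbidden && !additional_column_names.is_empty() {
        return Err(format!(
            "FORMAT {:?} ENCODE {:?} forbids additional columns, but got {:?}",
            format, encode, additional_column_names
        ));
    }
    Ok(())
}
```

Whether each of those pairs should instead now allow additional columns, as this PR does for the Debezium-on-Kafka case via the per-format map, is the open question the comment raises.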

if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT DEBEZIUM."
@@ -990,7 +992,11 @@
}

// Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source.
fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut Vec<ColumnCatalog>) {
fn check_and_add_timestamp_column(
with_properties: &WithOptions,
columns: &mut Vec<ColumnCatalog>,
format_type: &FormatType,
) {
if with_properties.is_kafka_connector() {
if columns.iter().any(|col| {
matches!(
@@ -1012,6 +1018,7 @@ fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut V
None,
true,
false,
format_type,
)
.unwrap();
columns.push(ColumnCatalog::hidden(col));
@@ -1542,7 +1549,11 @@ pub async fn bind_create_source_or_table_with_connector(
// compatible with the behavior that adds a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
if is_create_source {
// must come after `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);
check_and_add_timestamp_column(
&with_properties,
&mut columns,
&format_to_prost(&source_schema.format),
);
}

// resolve privatelink connection for Kafka
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -50,6 +50,7 @@ impl StreamSource {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&core.column_catalog,
&source_catalog.connector_name(),
&source_catalog.info.get_format().unwrap(),
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -61,6 +61,7 @@ impl StreamSourceScan {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&core.column_catalog,
&source_catalog.connector_name(),
source_catalog.info.get_format().as_ref().unwrap(),
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {