fix(planner): correctly handle hidden columns for SourceBackfill (#19578)

Signed-off-by: xxchan <[email protected]>
xxchan committed Nov 28, 2024
1 parent 633b6aa commit c551bd2
Showing 13 changed files with 176 additions and 104 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -6,6 +6,7 @@ repos:
rev: v2.3.0
hooks:
- id: end-of-file-fixer
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
@@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
6 changes: 3 additions & 3 deletions e2e_test/source_inline/kafka/issue_19563.slt.serial
@@ -24,9 +24,9 @@ explain create materialized view mv1 as select v1 from kafkasource where v1 betw
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true }
├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 2 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 4 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow
└─StreamExchange { dist: Broadcast }
14 changes: 12 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -281,9 +281,20 @@ pub fn build_additional_column_desc(
pub fn source_add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
skip_col_id: bool,
) -> ([bool; 2], [ColumnDesc; 2]) {
let mut columns_exist = [false; 2];

let mut last_column_id = max_column_id(columns);
let mut assign_col_id = || {
if skip_col_id {
// col id will be filled outside later. Here just use a placeholder.
ColumnId::placeholder()
} else {
last_column_id = last_column_id.next();
last_column_id
}
};

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
@@ -292,11 +303,10 @@ pub fn source_add_partition_offset_cols(
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_desc(
last_column_id,
assign_col_id(),
connector_name,
col_type,
None,
2 changes: 1 addition & 1 deletion src/connector/src/source/reader/desc.rs
@@ -91,7 +91,7 @@ impl SourceDescBuilder {
.map(|s| s.to_lowercase())
.unwrap();
let (columns_exist, additional_columns) =
source_add_partition_offset_cols(&self.columns, &connector_name);
source_add_partition_offset_cols(&self.columns, &connector_name, false);

let mut columns: Vec<_> = self
.columns
@@ -3,11 +3,11 @@
select * from s
logical_plan: |-
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
create_source:
format: plain
encode: protobuf
@@ -68,12 +68,12 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
with_config_map:
streaming_use_shared_source: 'true'
- before:
@@ -84,11 +84,11 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
with_config_map:
streaming_use_shared_source: 'true'
113 changes: 85 additions & 28 deletions src/frontend/src/handler/create_source.rs
@@ -30,8 +30,10 @@ use risingwave_common::catalog::{
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::additional_columns::{
build_additional_column_desc, get_supported_additional_columns,
source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
@@ -1476,6 +1478,7 @@ pub async fn bind_create_source_or_table_with_connector(
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
is_shared_non_cdc: bool,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
@@ -1538,6 +1541,21 @@
if is_create_source {
// must behind `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);

// For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
// For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
if is_shared_non_cdc {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&columns,
&with_properties.get_connector().unwrap(),
true, // col_id filled below at col_id_gen.generate
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {
columns.push(ColumnCatalog::hidden(c));
}
}
}
}

// resolve privatelink connection for Kafka
@@ -1655,14 +1673,14 @@ pub async fn handle_create_source(
let with_properties = bind_connector_props(&handler_args, &source_schema, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());
let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source();
let is_shared = create_cdc_source_job || is_shared_non_cdc;

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema)?
@@ -1690,6 +1708,7 @@
stmt.include_column_options,
&mut col_id_gen,
true,
is_shared_non_cdc,
overwrite_options.source_rate_limit,
)
.await?;
@@ -1762,8 +1781,7 @@ pub mod tests {
use std::sync::Arc;

use risingwave_common::catalog::{
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX,
};
use risingwave_common::types::DataType;

@@ -1917,15 +1935,29 @@
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>();
.collect::<Vec<(&str, DataType)>>();

let expected_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"payload" => DataType::Jsonb,
OFFSET_COLUMN_NAME => DataType::Varchar,
TABLE_NAME_COLUMN_NAME => DataType::Varchar,
};
assert_eq!(columns, expected_columns);
expect_test::expect![[r#"
[
(
"payload",
Jsonb,
),
(
"_rw_offset",
Varchar,
),
(
"_rw_table_name",
Varchar,
),
(
"_row_id",
Serial,
),
]
"#]]
.assert_debug_eq(&columns);
}

#[tokio::test]
@@ -1954,16 +1986,41 @@
.unwrap();
assert_eq!(source.name, "s");

let columns = GET_COLUMN_FROM_CATALOG(source);
let expect_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"v1" => DataType::Int32,
"_rw_kafka_key" => DataType::Bytea,
// todo: kafka connector will automatically derive the column
// will change to a required field in the include clause
"_rw_kafka_timestamp" => DataType::Timestamptz,
};
assert_eq!(columns, expect_columns);
let columns = source
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<Vec<(&str, DataType)>>();

expect_test::expect![[r#"
[
(
"v1",
Int32,
),
(
"_rw_kafka_key",
Bytea,
),
(
"_rw_kafka_timestamp",
Timestamptz,
),
(
"_rw_kafka_partition",
Varchar,
),
(
"_rw_kafka_offset",
Varchar,
),
(
"_row_id",
Serial,
),
]
"#]]
.assert_debug_eq(&columns);

let sql =
"CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json"
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_table.rs
@@ -509,6 +509,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
include_column_options,
&mut col_id_gen,
false,
false,
rate_limit,
)
.await?;