From c1e8f9a2d177d8a9c9733e06f1c9ce77517582d3 Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:21:39 +0800 Subject: [PATCH] fix(source): `REFRESH SCHEMA` shall keep `INCLUDE` pk for `UPSERT` (#19384) --- .../source_inline/kafka/avro/alter_table.slt | 17 +++++++++++++++++ src/frontend/src/handler/alter_table_column.rs | 2 ++ src/frontend/src/handler/create_sink.rs | 2 ++ src/frontend/src/handler/create_table.rs | 3 ++- 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_table.slt b/e2e_test/source_inline/kafka/avro/alter_table.slt index 330cdc490cdba..08a98c2cca4c9 100644 --- a/e2e_test/source_inline/kafka/avro/alter_table.slt +++ b/e2e_test/source_inline/kafka/avro/alter_table.slt @@ -78,3 +78,20 @@ ABC statement ok drop table t; + +statement ok +create table t (primary key (kafka_key)) +INCLUDE key as kafka_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_alter_table_test' +) +FORMAT UPSERT ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +statement ok +ALTER TABLE t REFRESH SCHEMA; + +statement ok +drop table t; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index cf55b82a47500..19f8355a77bc1 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -180,6 +180,7 @@ pub async fn get_replace_table_plan( wildcard_idx, cdc_table_info, format_encode, + include_column_options, .. } = new_definition else { @@ -206,6 +207,7 @@ pub async fn get_replace_table_plan( with_version_column, cdc_table_info, new_version_columns, + include_column_options, ) .await?; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index a7c997d6232e4..e280f90909267 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -559,6 +559,7 @@ pub(crate) async fn reparse_table_for_sink( append_only, on_conflict, with_version_column, + include_column_options, .. } = definition else { @@ -581,6 +582,7 @@ pub(crate) async fn reparse_table_for_sink( with_version_column, None, None, + include_column_options, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6118ba5ccd36b..ff2e410370782 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1310,6 +1310,7 @@ pub async fn generate_stream_graph_for_replace_table( with_version_column: Option, cdc_table_info: Option, new_version_columns: Option>, + include_column_options: IncludeOption, ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -1328,7 +1329,7 @@ pub async fn generate_stream_graph_for_replace_table( append_only, on_conflict, with_version_column, - vec![], + include_column_options, ) .await?, TableJobType::General,