diff --git a/e2e_test/source_inline/kafka/avro/alter_table.slt b/e2e_test/source_inline/kafka/avro/alter_table.slt index e8c43739b4746..04e565390e495 100644 --- a/e2e_test/source_inline/kafka/avro/alter_table.slt +++ b/e2e_test/source_inline/kafka/avro/alter_table.slt @@ -80,3 +80,20 @@ ABC statement ok drop table t; + +statement ok +create table t (primary key (kafka_key)) +INCLUDE key as kafka_key +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_alter_table_test' +) +FORMAT UPSERT ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +statement ok +ALTER TABLE t REFRESH SCHEMA; + +statement ok +drop table t; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 88e886ad667bf..5ba3c804ee748 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -186,6 +186,7 @@ pub async fn get_replace_table_plan( with_version_column, wildcard_idx, cdc_table_info, + include_column_options, .. } = definition else { @@ -208,6 +209,7 @@ pub async fn get_replace_table_plan( with_version_column, cdc_table_info, new_version_columns, + include_column_options, ) .await?; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 92b4cc3352785..59bbe49652a00 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -674,6 +674,7 @@ pub(crate) async fn reparse_table_for_sink( append_only, on_conflict, with_version_column, + include_column_options, .. } = definition else { @@ -696,6 +697,7 @@ pub(crate) async fn reparse_table_for_sink( with_version_column, None, None, + include_column_options, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 2c1916174e0b7..000d0c7b60514 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1338,6 +1338,7 @@ pub async fn generate_stream_graph_for_table( with_version_column: Option, cdc_table_info: Option, new_version_columns: Option>, + include_column_options: IncludeOption, ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -1356,7 +1357,7 @@ pub async fn generate_stream_graph_for_table( append_only, on_conflict, with_version_column, - vec![], + include_column_options, ) .await?, TableJobType::General,