Skip to content

Commit

Permalink
fix(source): REFRESH SCHEMA shall keep INCLUDE pk for UPSERT (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Nov 22, 2024
1 parent 2d1ffde commit 6aae038
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
17 changes: 17 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,20 @@ ABC

statement ok
drop table t;

statement ok
create table t (primary key (kafka_key))
INCLUDE key as kafka_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_table_test'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

statement ok
ALTER TABLE t REFRESH SCHEMA;

statement ok
drop table t;
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub async fn get_replace_table_plan(
with_version_column,
wildcard_idx,
cdc_table_info,
include_column_options,
..
} = definition
else {
Expand All @@ -208,6 +209,7 @@ pub async fn get_replace_table_plan(
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
include_column_options,
..
} = definition
else {
Expand All @@ -696,6 +697,7 @@ pub(crate) async fn reparse_table_for_sink(
with_version_column,
None,
None,
include_column_options,
)
.await?;

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ pub async fn generate_stream_graph_for_table(
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1356,7 +1357,7 @@ pub async fn generate_stream_graph_for_table(
append_only,
on_conflict,
with_version_column,
vec![],
include_column_options,
)
.await?,
TableJobType::General,
Expand Down

0 comments on commit 6aae038

Please sign in to comment.