Skip to content

Commit

Permalink
fix(source): REFRESH SCHEMA shall keep INCLUDE pk for UPSERT (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Nov 21, 2024
1 parent efbd18a commit 299c977
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
17 changes: 17 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,20 @@ ABC

statement ok
drop table t;

statement ok
create table t (primary key (kafka_key))
INCLUDE key as kafka_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_table_test'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

statement ok
ALTER TABLE t REFRESH SCHEMA;

statement ok
drop table t;
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub async fn get_replace_table_plan(
with_version_column,
wildcard_idx,
cdc_table_info,
include_column_options,
..
} = definition
else {
Expand All @@ -207,6 +208,7 @@ pub async fn get_replace_table_plan(
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
include_column_options,
..
} = definition
else {
Expand All @@ -696,6 +697,7 @@ pub(crate) async fn reparse_table_for_sink(
with_version_column,
None,
None,
include_column_options,
)
.await?;

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ pub async fn generate_stream_graph_for_table(
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1353,7 +1354,7 @@ pub async fn generate_stream_graph_for_table(
append_only,
on_conflict,
with_version_column,
vec![],
include_column_options,
)
.await?,
TableJobType::General,
Expand Down

0 comments on commit 299c977

Please sign in to comment.