Skip to content

Commit

Permalink
fix(source): REFRESH SCHEMA shall keep INCLUDE pk for UPSERT (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Nov 20, 2024
1 parent 5f1a59b commit c1e8f9a
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
17 changes: 17 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,20 @@ ABC

statement ok
drop table t;

statement ok
create table t (primary key (kafka_key))
INCLUDE key as kafka_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_table_test'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

statement ok
ALTER TABLE t REFRESH SCHEMA;

statement ok
drop table t;
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub async fn get_replace_table_plan(
wildcard_idx,
cdc_table_info,
format_encode,
include_column_options,
..
} = new_definition
else {
Expand All @@ -206,6 +207,7 @@ pub async fn get_replace_table_plan(
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
include_column_options,
..
} = definition
else {
Expand All @@ -581,6 +582,7 @@ pub(crate) async fn reparse_table_for_sink(
with_version_column,
None,
None,
include_column_options,
)
.await?;

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,7 @@ pub async fn generate_stream_graph_for_replace_table(
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1328,7 +1329,7 @@ pub async fn generate_stream_graph_for_replace_table(
append_only,
on_conflict,
with_version_column,
vec![],
include_column_options,
)
.await?,
TableJobType::General,
Expand Down

0 comments on commit c1e8f9a

Please sign in to comment.