
fix(source): REFRESH SCHEMA shall keep INCLUDE pk for UPSERT #19384

Merged
merged 2 commits on Nov 20, 2024
17 changes: 17 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
@@ -78,3 +78,20 @@ ABC

statement ok
drop table t;

statement ok
create table t (primary key (kafka_key))
INCLUDE key as kafka_key
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_table_test'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

statement ok
ALTER TABLE t REFRESH SCHEMA;

statement ok
drop table t;
Comment on lines +93 to +97
Contributor

no need to check the new schema meets expectation?

Contributor Author

This is actually a no-op refresh: we refresh immediately after creation, without any schema change in between. But yes, it is always good to test more:

  • schema meets expectation after refresh
  • actual data meets schema after refresh (rather than panic due to mismatch)
  • works for protobuf in addition to avro
  • works for debezium in addition to plain or upsert
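
A follow-up covering the first two bullets might look like this in the same sqllogictest format (a sketch only, not part of this PR; the evolved column name `bar` is an illustrative assumption):

```
# Sketch (hypothetical): assumes an evolved Avro schema adding a column `bar`
# has been registered for the 'avro_alter_table_test' topic before the refresh.
statement ok
ALTER TABLE t REFRESH SCHEMA;

# the newly added column is visible in the catalog and queryable,
# i.e. actual data decodes against the new schema instead of panicking
statement ok
SELECT bar FROM t;
```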

2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
@@ -180,6 +180,7 @@ pub async fn get_replace_table_plan(
wildcard_idx,
cdc_table_info,
format_encode,
include_column_options,
..
} = new_definition
else {
@@ -206,6 +207,7 @@
with_version_column,
cdc_table_info,
new_version_columns,
include_column_options,
)
.await?;

2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
@@ -559,6 +559,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
include_column_options,
..
} = definition
else {
@@ -581,6 +582,7 @@
with_version_column,
None,
None,
include_column_options,
)
.await?;

3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table.rs
@@ -1310,6 +1310,7 @@ pub async fn generate_stream_graph_for_replace_table(
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
new_version_columns: Option<Vec<ColumnCatalog>>,
include_column_options: IncludeOption,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

@@ -1328,7 +1329,7 @@
append_only,
on_conflict,
with_version_column,
vec![],
include_column_options,
)
.await?,
TableJobType::General,