From ffe63f1b20755b49f824949bab21964081a20b43 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 11 Dec 2024 15:41:43 +0800 Subject: [PATCH] feat(frontend): support alter source refresh schema for shared source (#19740) Signed-off-by: xxchan --- .../kafka/protobuf/alter_source.slt | 1 + .../kafka/protobuf/alter_source_shared.slt | 103 ++++++++++++++++++ .../src/handler/alter_source_with_sr.rs | 39 +++++-- 3 files changed, 134 insertions(+), 9 deletions(-) create mode 100644 e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt index 7f1b431a06ba..010c3a665f66 100644 --- a/e2e_test/source_inline/kafka/protobuf/alter_source.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -37,6 +37,7 @@ FORMAT PLAIN ENCODE PROTOBUF( message = 'test.User' ); +# age is new field statement error SELECT age FROM mv_user; diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt new file mode 100644 index 000000000000..46a7c1aad3d0 --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt @@ -0,0 +1,103 @@ +control substitution on + +system ok +rpk topic delete pb_alter_source_shared_test || true; \ +(rpk sr subject delete 'pb_alter_source_shared_test-value' && rpk sr subject delete 'pb_alter_source_shared_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 20 user + +statement ok +CREATE SOURCE src_user +INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'pb_alter_source_shared_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +statement ok +CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; + +statement ok +CREATE MATERIALIZED VIEW mv_user_2 AS SELECT * FROM src_user; + +statement ok +CREATE TABLE t_user WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'pb_alter_source_shared_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +# age is new field +statement error +SELECT age FROM mv_user; + +statement error +SELECT age FROM t_user; + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields + +sleep 5s + +# Refresh source schema +statement ok +ALTER SOURCE src_user REFRESH SCHEMA; + +statement ok +CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; + +statement ok +flush; + + +query ???? +SELECT COUNT(*) FROM mv_user; +---- +25 + +query ???? +SELECT COUNT(*) FROM mv_user_2; +---- +25 + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; +---- +25 104 0 510 + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields + +sleep 5s + + +query ???? +SELECT COUNT(*) FROM mv_user; +---- +30 + +query ???? +SELECT COUNT(*) FROM mv_user_2; +---- +30 + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; +---- +30 104 0 1020 + + +# statement ok +# DROP SOURCE src_user CASCADE; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 25877fd7036d..d4cec17b8b46 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -20,6 +20,7 @@ use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{max_column_id, ColumnCatalog}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::WithPropertiesExt; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::plan_common::{EncodeType, FormatType}; @@ -30,7 +31,9 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::alter_table_column::schema_has_schema_registry; -use super::create_source::{bind_columns_from_source, validate_compatibility}; +use super::create_source::{ + bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility, +}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; @@ -214,9 +217,10 @@ pub async fn handle_alter_source_with_sr( name: ObjectName, format_encode: FormatEncodeOptions, ) -> Result { - let session = handler_args.session; + let session = handler_args.session.clone(); let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?; let mut source = source.as_ref().clone(); + let old_columns = source.columns.clone(); if source.associated_table_id.is_some() { return Err(ErrorCode::NotSupported( @@ -225,9 +229,6 @@ pub async fn handle_alter_source_with_sr( ) .into()); }; - if source.info.is_shared() { - bail_not_implemented!(issue = 16003, "alter shared source"); - } check_format_encode(&source, &format_encode)?; @@ -272,14 +273,34 @@ pub async fn handle_alter_source_with_sr( .format_encode_secret_refs .extend(format_encode_secret_ref); - let mut pb_source = source.to_prost(schema_id, database_id); - // update version - pb_source.version += 1; + source.version += 1; + + let pb_source = source.to_prost(schema_id, database_id); let catalog_writer = session.catalog_writer()?; - catalog_writer.alter_source(pb_source).await?; + if source.info.is_shared() { + let graph = generate_stream_graph_for_source(handler_args, source.clone())?; + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + old_columns + .iter() + .map(|old_c| { + source + .columns + .iter() + .position(|new_c| new_c.column_id() == old_c.column_id()) + }) + .collect(), + source.columns.len(), + ); + catalog_writer + .replace_source(pb_source, graph, col_index_mapping) + .await? + } else { + catalog_writer.alter_source(pb_source).await?; + } Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) }