From 7bcb908039547d7b762d73b7d63f3d02e9b00a7b Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 16 Dec 2024 11:12:52 +0800 Subject: [PATCH] fix: alter shared source fresh schema will make it non-shared (#19802) Signed-off-by: xxchan --- .../kafka/alter/add_column_shared.slt | 35 +++--- .../kafka/protobuf/alter_source_shared.slt | 10 ++ src/connector/src/with_options.rs | 6 ++ .../src/handler/alter_source_with_sr.rs | 15 +-- src/frontend/src/handler/create_source.rs | 101 +++++++++++++----- src/frontend/src/handler/create_table.rs | 17 +-- 6 files changed, 132 insertions(+), 52 deletions(-) diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt index 45454df818afb..bbb03c178fa2f 100644 --- a/e2e_test/source_inline/kafka/alter/add_column_shared.slt +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -50,6 +50,16 @@ alter source s add column v3 varchar; # New MV will have v3. +# Check it should still be shared source +query +explain create materialized view mv_after_alter as select * from s; +---- +StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } +└─StreamProject { exprs: [v1, v2, v3, _row_id] } + └─StreamRowIdGen { row_id_index: 5 } + └─StreamSourceScan { columns: [v1, v2, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, v3] } + + statement ok create materialized view mv_after_alter as select * from s; @@ -106,16 +116,6 @@ select * from mv_after_alter; 7 g g1 8 h h1 -query error -select * from mv_after_alter_2; ----- -db error: ERROR: Failed to run the query - -Caused by these errors (recent errors listed first): - 1: Catalog error - 2: table or source not found: mv_after_alter_2 - - # Batch select from source will have v3. @@ -146,6 +146,19 @@ select * from mv_before_alter; 8 h +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + statement ok drop source s cascade; @@ -195,5 +208,3 @@ drop source s cascade; system ok rpk topic delete shared_source_alter; - -# TODO: test alter source with schema registry diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt index e4edec6d535dc..5301eda7679b1 100644 --- a/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt @@ -54,6 +54,16 @@ sleep 5s statement ok ALTER SOURCE src_user REFRESH SCHEMA; +# Check it should still be shared source +query +EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; +---- +StreamMaterialize { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } +└─StreamProject { exprs: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id] } + └─StreamRowIdGen { row_id_index: 9 } + └─StreamSourceScan { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, age] } + + statement ok CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 6c857de148fad..59310e5a15509 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -230,3 +230,9 @@ impl TryFrom<&WithOptionsSecResolved> for Option { } } } + +impl Get for WithOptionsSecResolved { + fn get(&self, key: &str) -> Option<&String> { + self.inner.get(key) + } +} diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index d4cec17b8b460..41dcc43b0f8f6 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -31,15 +31,14 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::alter_table_column::schema_has_schema_registry; -use super::create_source::{ - bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility, -}; +use super::create_source::{generate_stream_graph_for_source, validate_compatibility}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::{DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result}; +use crate::handler::create_source::{bind_columns_from_source, CreateSourceType}; use crate::session::SessionImpl; use crate::utils::resolve_secret_ref_in_with_options; use crate::{Binder, WithOptions}; @@ -164,8 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff( bail_not_implemented!("altering a cdc source is not supported"); } - let (Some(columns_from_resolve_source), source_info) = - bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await? + let (Some(columns_from_resolve_source), source_info) = bind_columns_from_source( + session, + format_encode, + Either::Right(&with_properties), + CreateSourceType::from_with_properties(session, &with_properties), + ) + .await? else { // Source without schema registry is rejected. unreachable!("source without schema registry is rejected") @@ -277,7 +281,6 @@ pub async fn handle_alter_source_with_sr( source.version += 1; let pb_source = source.to_prost(schema_id, database_id); - let catalog_writer = session.catalog_writer()?; if source.info.is_shared() { let graph = generate_stream_graph_for_source(handler_args, source.clone())?; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 09aebe2be26f0..a6e9ca9b1d93d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -302,12 +302,71 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result Self { + if with_properties.is_shareable_cdc_connector() { + CreateSourceType::SharedCdc + } else if with_properties.is_shareable_non_cdc_connector() + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source() + { + CreateSourceType::SharedNonCdc + } else { + CreateSourceType::NonShared + } + } + + pub fn is_shared(&self) -> bool { + matches!( + self, + CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc + ) + } +} + /// Resolves the schema of the source from external schema file. /// See for more information. /// /// Note: the returned schema strictly corresponds to the schema. /// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included. -pub(crate) async fn bind_columns_from_source( +pub async fn bind_columns_from_source( + session: &SessionImpl, + format_encode: &FormatEncodeOptions, + with_properties: Either<&WithOptions, &WithOptionsSecResolved>, + create_source_type: CreateSourceType, +) -> Result<(Option>, StreamSourceInfo)> { + let (columns_from_resolve_source, mut source_info) = + if create_source_type == CreateSourceType::SharedCdc { + bind_columns_from_source_for_cdc(session, format_encode)? + } else { + bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await? + }; + if create_source_type.is_shared() { + // Note: this field should be called is_shared. Check field doc for more details. + source_info.cdc_source_job = true; + source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc; + } + Ok((columns_from_resolve_source, source_info)) +} + +async fn bind_columns_from_source_for_non_cdc( session: &SessionImpl, format_encode: &FormatEncodeOptions, with_properties: Either<&WithOptions, &WithOptionsSecResolved>, @@ -1542,9 +1601,7 @@ pub async fn bind_create_source_or_table_with_connector( source_info: StreamSourceInfo, include_column_options: IncludeOption, col_id_gen: &mut ColumnIdGenerator, - // `true` for "create source", `false` for "create table with connector" - is_create_source: bool, - is_shared_non_cdc: bool, + create_source_type: CreateSourceType, source_rate_limit: Option, ) -> Result<(SourceCatalog, DatabaseId, SchemaId)> { let session = &handler_args.session; @@ -1553,6 +1610,7 @@ pub async fn bind_create_source_or_table_with_connector( let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name.clone())?; + let is_create_source = create_source_type != CreateSourceType::Table; if !is_create_source && with_properties.is_iceberg_connector() { return Err(ErrorCode::BindError( "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead" @@ -1609,7 +1667,7 @@ pub async fn bind_create_source_or_table_with_connector( // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor. // For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS - if is_shared_non_cdc { + if create_source_type == CreateSourceType::SharedNonCdc { let (columns_exist, additional_columns) = source_add_partition_offset_cols( &columns, &with_properties.get_connector().unwrap(), @@ -1748,26 +1806,14 @@ pub async fn handle_create_source( let format_encode = stmt.format_encode.into_v2_with_warning(); let with_properties = bind_connector_props(&handler_args, &format_encode, true)?; - let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); - let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector() - && session - .env() - .streaming_config() - .developer - .enable_shared_source - && session.config().streaming_use_shared_source(); - let is_shared = create_cdc_source_job || is_shared_non_cdc; - - let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { - bind_columns_from_source_for_cdc(&session, &format_encode)? - } else { - bind_columns_from_source(&session, &format_encode, Either::Left(&with_properties)).await? - }; - if is_shared { - // Note: this field should be called is_shared. Check field doc for more details. - source_info.cdc_source_job = true; - source_info.is_distributed = !create_cdc_source_job; - } + let create_source_type = CreateSourceType::from_with_properties(&session, &*with_properties); + let (columns_from_resolve_source, source_info) = bind_columns_from_source( + &session, + &format_encode, + Either::Left(&with_properties), + create_source_type, + ) + .await?; let mut col_id_gen = ColumnIdGenerator::new_initial(); let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( @@ -1783,8 +1829,7 @@ pub async fn handle_create_source( source_info, stmt.include_column_options, &mut col_id_gen, - true, - is_shared_non_cdc, + create_source_type, overwrite_options.source_rate_limit, ) .await?; @@ -1802,7 +1847,7 @@ pub async fn handle_create_source( let catalog_writer = session.catalog_writer()?; - if is_shared { + if create_source_type.is_shared() { let graph = generate_stream_graph_for_source(handler_args, source_catalog)?; catalog_writer.create_source(source, Some(graph)).await?; } else { diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1520374d7e503..1fa960da971d9 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -59,6 +59,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::{IncludeOption, Parser}; use thiserror_ext::AsReport; +use super::create_source::{bind_columns_from_source, CreateSourceType}; use super::{create_sink, create_source, RwPgResponse}; use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext}; use crate::catalog::root_catalog::SchemaPath; @@ -68,8 +69,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::handler::create_source::{ - bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector, - bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY, + bind_connector_props, bind_create_source_or_table_with_connector, bind_source_watermark, + handle_addition_columns, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; @@ -497,8 +498,13 @@ pub(crate) async fn gen_create_table_plan_with_source( let session = &handler_args.session; let with_properties = bind_connector_props(&handler_args, &format_encode, false)?; - let (columns_from_resolve_source, source_info) = - bind_columns_from_source(session, &format_encode, Either::Left(&with_properties)).await?; + let (columns_from_resolve_source, source_info) = bind_columns_from_source( + session, + &format_encode, + Either::Left(&with_properties), + CreateSourceType::Table, + ) + .await?; let overwrite_options = OverwriteOptions::new(&mut handler_args); let rate_limit = overwrite_options.source_rate_limit; @@ -515,8 +521,7 @@ pub(crate) async fn gen_create_table_plan_with_source( source_info, include_column_options, &mut col_id_gen, - false, - false, + CreateSourceType::Table, rate_limit, ) .await?;