Skip to content

Commit

Permalink
fix: alter shared source fresh schema will make it non-shared (#19802)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Dec 16, 2024
1 parent 680c098 commit 7bcb908
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 52 deletions.
35 changes: 23 additions & 12 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ alter source s add column v3 varchar;

# New MV will have v3.

# Check that the source is still shared after ALTER SOURCE ADD COLUMN <https://github.com/risingwavelabs/risingwave/issues/19799>
query
explain create materialized view mv_after_alter as select * from s;
----
StreamMaterialize { columns: [v1, v2, v3, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [v1, v2, v3, _row_id] }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamSourceScan { columns: [v1, v2, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, v3] }


statement ok
create materialized view mv_after_alter as select * from s;

Expand Down Expand Up @@ -106,16 +116,6 @@ select * from mv_after_alter;
7 g g1
8 h h1

query error
select * from mv_after_alter_2;
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: table or source not found: mv_after_alter_2



# Batch select from source will have v3.

Expand Down Expand Up @@ -146,6 +146,19 @@ select * from mv_before_alter;
8 h


query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h


statement ok
drop source s cascade;

Expand Down Expand Up @@ -195,5 +208,3 @@ drop source s cascade;

system ok
rpk topic delete shared_source_alter;

# TODO: test alter source with schema registry
10 changes: 10 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ sleep 5s
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

# Check that the source is still shared after REFRESH SCHEMA <https://github.com/risingwavelabs/risingwave/issues/19799>
query
EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
----
StreamMaterialize { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [id, name, address, city, gender, sc, _rw_kafka_timestamp, age, _row_id] }
└─StreamRowIdGen { row_id_index: 9 }
└─StreamSourceScan { columns: [id, name, address, city, gender, sc, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id, age] }


statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

Expand Down
6 changes: 6 additions & 0 deletions src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,9 @@ impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
}
}
}

/// Allows generic option lookup (by string key) over resolved, secret-free
/// `WITH` options, so they can be consumed by code that is generic over `Get`.
impl Get for WithOptionsSecResolved {
fn get(&self, key: &str) -> Option<&String> {
// Pure delegation — NOTE(review): presumably `inner` is the plain
// key/value option map after secret resolution; confirm in the struct def.
self.inner.get(key)
}
}
15 changes: 9 additions & 6 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::Parser;

use super::alter_table_column::schema_has_schema_registry;
use super::create_source::{
bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility,
};
use super::create_source::{generate_stream_graph_for_source, validate_compatibility};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::handler::create_source::{bind_columns_from_source, CreateSourceType};
use crate::session::SessionImpl;
use crate::utils::resolve_secret_ref_in_with_options;
use crate::{Binder, WithOptions};
Expand Down Expand Up @@ -164,8 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff(
bail_not_implemented!("altering a cdc source is not supported");
}

let (Some(columns_from_resolve_source), source_info) =
bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await?
let (Some(columns_from_resolve_source), source_info) = bind_columns_from_source(
session,
format_encode,
Either::Right(&with_properties),
CreateSourceType::from_with_properties(session, &with_properties),
)
.await?
else {
// Source without schema registry is rejected.
unreachable!("source without schema registry is rejected")
Expand Down Expand Up @@ -277,7 +281,6 @@ pub async fn handle_alter_source_with_sr(
source.version += 1;

let pb_source = source.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;
if source.info.is_shared() {
let graph = generate_stream_graph_for_source(handler_args, source.clone())?;
Expand Down
101 changes: 73 additions & 28 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,71 @@ fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Opti
}
}

/// Classifies how a `CREATE SOURCE` (or `CREATE TABLE` with connector) statement
/// should be handled — shared variants get a streaming job of their own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
/// Shared source backed by a shareable CDC connector.
SharedCdc,
/// Shared non-CDC source, e.g., a shared Kafka source.
SharedNonCdc,
/// Non-shared source (shareable-source support disabled or connector not shareable).
NonShared,
/// `CREATE TABLE` with a connector.
Table,
}

impl CreateSourceType {
    /// Classifies a `CREATE SOURCE` statement from its connector properties and
    /// the session's configuration.
    ///
    /// A non-CDC connector only becomes [`CreateSourceType::SharedNonCdc`] when
    /// both the cluster-level developer flag (`enable_shared_source`) and the
    /// session variable (`streaming_use_shared_source`) are set; otherwise it
    /// falls back to [`CreateSourceType::NonShared`].
    ///
    /// Note: this never returns [`CreateSourceType::Table`] — callers creating a
    /// table with a connector pass `Table` explicitly instead of calling this.
    pub fn from_with_properties(
        session: &SessionImpl,
        with_properties: &impl WithPropertiesExt,
    ) -> Self {
        if with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else if with_properties.is_shareable_non_cdc_connector()
            && session
                .env()
                .streaming_config()
                .developer
                .enable_shared_source
            && session.config().streaming_use_shared_source()
        {
            CreateSourceType::SharedNonCdc
        } else {
            CreateSourceType::NonShared
        }
    }

    /// Whether this source kind is shared (`SharedCdc` or `SharedNonCdc`), i.e.,
    /// a stream graph is generated for it at creation time.
    // Take `self` by value: the enum is `Copy`, so borrowing it is needless
    // indirection (clippy::trivially_copy_pass_by_ref). Method-call syntax at
    // existing call sites is unaffected.
    pub fn is_shared(self) -> bool {
        matches!(
            self,
            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
        )
    }
}

/// Resolves the schema of the source from external schema file.
/// See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
///
/// Note: the returned schema strictly corresponds to the schema.
/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included.
pub(crate) async fn bind_columns_from_source(
pub async fn bind_columns_from_source(
    session: &SessionImpl,
    format_encode: &FormatEncodeOptions,
    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
    create_source_type: CreateSourceType,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
    // Shared CDC sources resolve their schema through a dedicated path; every
    // other kind (shared non-CDC, non-shared, table-with-connector) goes
    // through the regular non-CDC binding.
    let (resolved_columns, mut source_info) = match create_source_type {
        CreateSourceType::SharedCdc => bind_columns_from_source_for_cdc(session, format_encode)?,
        CreateSourceType::SharedNonCdc | CreateSourceType::NonShared | CreateSourceType::Table => {
            bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
        }
    };
    if create_source_type.is_shared() {
        // Note: this field should be called is_shared. Check field doc for more details.
        source_info.cdc_source_job = true;
        source_info.is_distributed = matches!(create_source_type, CreateSourceType::SharedNonCdc);
    }
    Ok((resolved_columns, source_info))
}

async fn bind_columns_from_source_for_non_cdc(
session: &SessionImpl,
format_encode: &FormatEncodeOptions,
with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
Expand Down Expand Up @@ -1542,9 +1601,7 @@ pub async fn bind_create_source_or_table_with_connector(
source_info: StreamSourceInfo,
include_column_options: IncludeOption,
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
is_shared_non_cdc: bool,
create_source_type: CreateSourceType,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
Expand All @@ -1553,6 +1610,7 @@ pub async fn bind_create_source_or_table_with_connector(
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;

let is_create_source = create_source_type != CreateSourceType::Table;
if !is_create_source && with_properties.is_iceberg_connector() {
return Err(ErrorCode::BindError(
"can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
Expand Down Expand Up @@ -1609,7 +1667,7 @@ pub async fn bind_create_source_or_table_with_connector(

// For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
// For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
if is_shared_non_cdc {
if create_source_type == CreateSourceType::SharedNonCdc {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&columns,
&with_properties.get_connector().unwrap(),
Expand Down Expand Up @@ -1748,26 +1806,14 @@ pub async fn handle_create_source(
let format_encode = stmt.format_encode.into_v2_with_warning();
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source();
let is_shared = create_cdc_source_job || is_shared_non_cdc;

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &format_encode)?
} else {
bind_columns_from_source(&session, &format_encode, Either::Left(&with_properties)).await?
};
if is_shared {
// Note: this field should be called is_shared. Check field doc for more details.
source_info.cdc_source_job = true;
source_info.is_distributed = !create_cdc_source_job;
}
let create_source_type = CreateSourceType::from_with_properties(&session, &*with_properties);
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
&session,
&format_encode,
Either::Left(&with_properties),
create_source_type,
)
.await?;
let mut col_id_gen = ColumnIdGenerator::new_initial();

let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
Expand All @@ -1783,8 +1829,7 @@ pub async fn handle_create_source(
source_info,
stmt.include_column_options,
&mut col_id_gen,
true,
is_shared_non_cdc,
create_source_type,
overwrite_options.source_rate_limit,
)
.await?;
Expand All @@ -1802,7 +1847,7 @@ pub async fn handle_create_source(

let catalog_writer = session.catalog_writer()?;

if is_shared {
if create_source_type.is_shared() {
let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
catalog_writer.create_source(source, Some(graph)).await?;
} else {
Expand Down
17 changes: 11 additions & 6 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{bind_columns_from_source, CreateSourceType};
use super::{create_sink, create_source, RwPgResponse};
use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext};
use crate::catalog::root_catalog::SchemaPath;
Expand All @@ -68,8 +69,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::create_source::{
bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector,
bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY,
bind_connector_props, bind_create_source_or_table_with_connector, bind_source_watermark,
handle_addition_columns, UPSTREAM_SOURCE_KEY,
};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
Expand Down Expand Up @@ -497,8 +498,13 @@ pub(crate) async fn gen_create_table_plan_with_source(
let session = &handler_args.session;
let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;

let (columns_from_resolve_source, source_info) =
bind_columns_from_source(session, &format_encode, Either::Left(&with_properties)).await?;
let (columns_from_resolve_source, source_info) = bind_columns_from_source(
session,
&format_encode,
Either::Left(&with_properties),
CreateSourceType::Table,
)
.await?;

let overwrite_options = OverwriteOptions::new(&mut handler_args);
let rate_limit = overwrite_options.source_rate_limit;
Expand All @@ -515,8 +521,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
source_info,
include_column_options,
&mut col_id_gen,
false,
false,
CreateSourceType::Table,
rate_limit,
)
.await?;
Expand Down

0 comments on commit 7bcb908

Please sign in to comment.