From 8ccdb38acf7f177309550000a3bbf2a9ee1b0e05 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 18 Jun 2024 13:28:17 +0800
Subject: [PATCH] fix(source): fix panic for alter source with sr

---
 .../source_inline/kafka/avro/alter_source.slt | 82 +++----------------
 src/connector/src/parser/mod.rs               |  1 +
 .../src/binder/relation/table_or_source.rs    | 13 +++
 .../src/handler/alter_source_column.rs        | 14 ++--
 .../src/handler/alter_source_with_sr.rs       |  9 +-
 src/frontend/src/handler/create_source.rs     | 22 +++++
 src/frontend/src/handler/util.rs              |  1 +
 .../src/optimizer/plan_node/logical_source.rs |  3 +
 8 files changed, 69 insertions(+), 76 deletions(-)

diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt
index 15d48d9df0a18..75dc8a7d03778 100644
--- a/e2e_test/source_inline/kafka/avro/alter_source.slt
+++ b/e2e_test/source_inline/kafka/avro/alter_source.slt
@@ -51,79 +51,21 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
 statement ok
 alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');
 
-
-query ?
-explain (verbose, trace) select * from s;
+query ??
+select * from s
 ----
-Begin:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalSource { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar] }
-(empty)
-Rewrite Source For Batch:
-(empty)
-apply SourceToKafkaScanRule 1 time(s)
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Predicate Push Down:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Prune Columns:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalProject { exprs: [foo, bar] }
-  └─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Project Remove:
-(empty)
-apply ProjectMergeRule 1 time(s)
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Const eval exprs:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-To Batch Plan:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-Inline Session Timezone:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-To Batch Physical Plan:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-To Batch Distributed Plan:
-(empty)
-BatchExchange { order: [], dist: Single }
-└─BatchProject { exprs: [foo, bar] }
-  └─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-
-# query error
-# select * from s
-# ----
-# db error: ERROR: Panicked when handling the request: assertion `left == right` failed
-#   left: [Varchar, Varchar]
-#  right: [Varchar, Int32]
-# This is a bug. We would appreciate a bug report at:
-#   https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
+ABC 1
 
 statement ok
 create materialized view mv as select * from s;
 
-# statement ok
-# drop source s;
+sleep 2s
+
+query ??
+select * from mv
+----
+ABC 1
+
+statement ok
+drop source s;
 
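The expected-panic block removed above shows the symptom: after ALTER SOURCE re-resolved the schema, the new `bar` column got an ID that collided with an existing column, so the data chunk carried [Varchar, Varchar] while the planner expected [Varchar, Int32]. The binder hunk below guards against that root cause with a duplicate-ID assertion. Here is a self-contained sketch of the same check, assuming only the `itertools` crate; the `assert_no_duplicate_ids` helper and the literal ID values are illustrative, not RisingWave code:

    use itertools::Itertools;

    fn assert_no_duplicate_ids(ids: &[i32]) {
        // `duplicates()` yields each value that occurs more than once; an
        // empty iterator means all IDs are distinct.
        assert!(
            ids.iter().duplicates().next().is_none(),
            "duplicate ColumnId found: {ids:?}"
        );
    }

    fn main() {
        assert_no_duplicate_ids(&[1, 2, 3, 4]); // distinct: fine
        // Before the fix, a re-resolved column could reuse an existing ID,
        // e.g. [1, 2, 3, 2] (hypothetical values) -- uncommenting panics:
        // assert_no_duplicate_ids(&[1, 2, 3, 2]);
    }
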
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index f7667a66a3747..26cf746b535dc 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -1028,6 +1028,7 @@ pub mod test_utils {
     }
 }
 
+/// Note: this is created in `SourceReader::build_stream`
 #[derive(Debug, Clone, Default)]
 pub struct ParserConfig {
     pub common: CommonParserConfig,
diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs
index c5283a2cc592a..067641178fdc9 100644
--- a/src/frontend/src/binder/relation/table_or_source.rs
+++ b/src/frontend/src/binder/relation/table_or_source.rs
@@ -221,6 +221,19 @@ impl Binder {
         source_catalog: &SourceCatalog,
         as_of: Option<AsOf>,
     ) -> (Relation, Vec<(bool, Field)>) {
+        if cfg!(debug_assertions) {
+            let columns = &source_catalog.columns;
+            assert!(
+                columns
+                    .iter()
+                    .map(|c| c.column_id())
+                    .duplicates()
+                    .next()
+                    .is_none(),
+                "duplicate ColumnId found in source catalog. Columns: {:?}",
+                columns
+            );
+        }
         self.included_relations.insert(source_catalog.id.into());
         (
             Relation::Source(Box::new(BoundSource {
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
index fcabedc1149c4..45a22405499fd 100644
--- a/src/frontend/src/handler/alter_source_column.rs
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -14,7 +14,7 @@
 
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::ColumnId;
+use risingwave_common::catalog::{ColumnCatalog, ColumnId};
 use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
 use risingwave_sqlparser::ast::{
     AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
@@ -106,10 +106,7 @@ pub async fn handle_alter_source_column(
             catalog.definition =
                 alter_definition_add_column(&catalog.definition, column_def.clone())?;
             let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
-            bound_column.column_desc.column_id = columns
-                .iter()
-                .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
-                .next();
+            bound_column.column_desc.column_id = max_column_id(columns).next();
             columns.push(bound_column);
         }
         _ => unreachable!(),
@@ -147,6 +144,13 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul
     Ok(stmt.to_string())
 }
 
+pub fn max_column_id(columns: &Vec<ColumnCatalog>) -> ColumnId {
+    // XXX: should we check the column IDs of struct fields here?
+    columns
+        .iter()
+        .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;
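`max_column_id` plus `.next()` is the allocation rule the rest of the patch reuses: a fresh column always gets one more than the largest existing ID. A standalone sketch of that pattern; the `ColumnId` here is a simplified stand-in for `risingwave_common::catalog::ColumnId`, and the ID values are made up:

    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct ColumnId(i32);

    impl ColumnId {
        fn new(id: i32) -> Self {
            Self(id)
        }
        fn next(self) -> Self {
            Self(self.0 + 1)
        }
    }

    fn max_column_id(ids: &[ColumnId]) -> ColumnId {
        // Fold from i32::MIN so the result is well-defined even with gaps.
        ids.iter().fold(ColumnId::new(i32::MIN), |a, &b| a.max(b))
    }

    fn main() {
        // Gaps (e.g. after a dropped column) don't matter: 4 is the max,
        // so the next column gets 5 and can never collide.
        let existing = [ColumnId::new(1), ColumnId::new(2), ColumnId::new(4)];
        assert_eq!(max_column_id(&existing).next(), ColumnId::new(5));
    }

Note the fold's identity: on an empty list this would return `i32::MIN`; in practice a source catalog always has at least its hidden columns.
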
diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs
index c72cf547365d7..6452e820b13e1 100644
--- a/src/frontend/src/handler/alter_source_with_sr.rs
+++ b/src/frontend/src/handler/alter_source_with_sr.rs
@@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::{
 };
 use risingwave_sqlparser::parser::Parser;
 
+use super::alter_source_column::max_column_id;
 use super::alter_table_column::schema_has_schema_registry;
 use super::create_source::{bind_columns_from_source, validate_compatibility};
 use super::util::SourceSchemaCompatExt;
@@ -162,7 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff(
         unreachable!("source without schema registry is rejected")
     };
 
-    let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
+    let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
+    // The newly resolved columns' column IDs also start from 0, so they cannot be used directly.
+    let mut next_col_id = max_column_id(&original_source.columns).next();
+    for col in &mut added_columns {
+        col.column_desc.column_id = next_col_id;
+        next_col_id = next_col_id.next();
+    }
     let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source);
 
     Ok((source_info, added_columns, dropped_columns))
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index a29aa86907e0f..31df775c0f43c 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -489,6 +489,28 @@ pub(crate) async fn bind_columns_from_source(
         }
     };
 
+    if cfg!(debug_assertions) {
+        // Validate column IDs.
+        // Note: this just documents how it works currently; it doesn't imply the behavior is reasonable.
+        if let Some(ref columns) = columns {
+            let mut i = 1;
+            fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec<ColumnCatalog>) {
+                assert!(
+                    col.column_id.get_id() == *i as i32,
+                    "unexpected column id, col: {col:?}, i: {i}, columns: {columns:?}"
+                );
+                *i += 1;
+            }
+            for col in columns {
+                check_col(&col.column_desc, &mut i, columns);
+                for nested_col in &col.column_desc.field_descs {
+                    // What's the usage of struct fields' column IDs?
+                    check_col(nested_col, &mut i, columns);
+                }
+            }
+        }
+    }
+
     if !format_encode_options_to_consume.is_empty() {
         let err_string = format!(
             "Get unknown format_encode_options for {:?} {:?}: {}",
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 7fd4f0b92822b..42e63eed38545 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -151,6 +151,7 @@ fn to_pg_rows(
     session_data: &StaticSessionData,
 ) -> RwResult<Vec<Row>> {
     assert_eq!(chunk.dimension(), column_types.len());
+    debug_assert_eq!(chunk.data_types(), column_types);
 
     chunk
         .rows()
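The heart of the fix is the renumbering loop in `refresh_sr_and_get_columns_diff` above: columns newly resolved from the schema registry keep their names but receive fresh IDs allocated after the catalog's current maximum. A self-contained sketch of that flow under simplified assumptions; the `Column` struct, the by-name `columns_minus` (a crude stand-in for the frontend helper of the same name), and all ID values are illustrative:

    #[derive(Clone, Debug, PartialEq)]
    struct Column {
        name: String,
        column_id: i32,
    }

    /// Set difference by name, standing in for the frontend's `columns_minus`.
    fn columns_minus(a: &[Column], b: &[Column]) -> Vec<Column> {
        a.iter()
            .filter(|c| b.iter().all(|o| o.name != c.name))
            .cloned()
            .collect()
    }

    fn main() {
        // Catalog as created: user column foo plus hidden columns (IDs assumed).
        let original = vec![
            Column { name: "foo".into(), column_id: 1 },
            Column { name: "_rw_kafka_timestamp".into(), column_id: 2 },
            Column { name: "_row_id".into(), column_id: 3 },
        ];
        // Re-resolving the registry schema numbers columns from scratch, so the
        // new `bar` arrives as 2, colliding with `_rw_kafka_timestamp`.
        let resolved = vec![
            Column { name: "foo".into(), column_id: 1 },
            Column { name: "bar".into(), column_id: 2 },
        ];
        let mut added = columns_minus(&resolved, &original);
        assert_eq!(added[0].column_id, 2); // the colliding ID, pre-fix
        // The fix: renumber added columns after the existing maximum.
        let mut next = original.iter().map(|c| c.column_id).max().unwrap() + 1;
        for col in &mut added {
            col.column_id = next;
            next += 1;
        }
        assert_eq!(added, vec![Column { name: "bar".into(), column_id: 4 }]);
    }

This is the same scenario the slt test exercises: `bar` added via the schema registry, which previously shadowed a hidden column and produced the [Varchar, Varchar] vs [Varchar, Int32] panic.
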
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 0310fdbbd439b..faf057d2a3a84 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -70,6 +70,9 @@ impl LogicalSource {
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
     ) -> Result<Self> {
+        // XXX: should we reorder the columns?
+        // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32]
+        // related: https://github.com/risingwavelabs/risingwave/issues/16486
         let core = generic::Source {
             catalog: source_catalog,
             column_catalog,
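The XXX note above is about cosmetics rather than correctness: since the new `bar` is appended after the hidden columns, the catalog order interleaves user and system columns. A tiny illustration of why users rarely notice; hidden columns are skipped when `SELECT *` is bound, and the leading-underscore filter below is only a crude stand-in for the real hidden-column flag:

    fn main() {
        // Catalog order after ALTER SOURCE, per the comment above.
        let columns = ["foo", "_rw_kafka_timestamp", "_row_id", "bar"];
        // `SELECT *` only binds non-hidden columns, so the user still sees
        // [foo, bar] -- matching the `query ??` results in the slt test.
        let visible: Vec<&str> = columns
            .iter()
            .copied()
            .filter(|c| !c.starts_with('_'))
            .collect();
        assert_eq!(visible, ["foo", "bar"]);
    }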