fix(source): fix panic for alter source with sr

xxchan committed Jun 18, 2024
1 parent f7d5a3f commit 8ccdb38
Showing 8 changed files with 69 additions and 76 deletions.
82 changes: 12 additions & 70 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
@@ -51,79 +51,21 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
 statement ok
 alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');
 
 
-query ?
-explain (verbose, trace) select * from s;
+query ??
+select * from s
 ----
-Begin:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalSource { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar] }
-(empty)
-Rewrite Source For Batch:
-(empty)
-apply SourceToKafkaScanRule 1 time(s)
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Predicate Push Down:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Prune Columns:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalProject { exprs: [foo, bar] }
-  └─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Project Remove:
-(empty)
-apply ProjectMergeRule 1 time(s)
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-Const eval exprs:
-(empty)
-LogicalProject { exprs: [foo, bar] }
-└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) }
-(empty)
-To Batch Plan:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-Inline Session Timezone:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-To Batch Physical Plan:
-(empty)
-BatchProject { exprs: [foo, bar] }
-└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-To Batch Distributed Plan:
-(empty)
-BatchExchange { order: [], dist: Single }
-└─BatchProject { exprs: [foo, bar] }
-  └─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) }
-(empty)
-
-# query error
-# select * from s
-# ----
-# db error: ERROR: Panicked when handling the request: assertion `left == right` failed
-#  left: [Varchar, Varchar]
-#  right: [Varchar, Int32]
-# This is a bug. We would appreciate a bug report at:
#   https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
+ABC 1
 
 statement ok
 create materialized view mv as select * from s;
 
-
-# statement ok
-# drop source s;
+sleep 2s
+
+query ??
+select * from mv
+----
+ABC 1
+
+statement ok
+drop source s;
1 change: 1 addition & 0 deletions src/connector/src/parser/mod.rs
@@ -1028,6 +1028,7 @@ pub mod test_utils {
     }
 }
 
+/// Note: this is created in `SourceReader::build_stream`
 #[derive(Debug, Clone, Default)]
 pub struct ParserConfig {
     pub common: CommonParserConfig,
13 changes: 13 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
@@ -221,6 +221,19 @@ impl Binder {
         source_catalog: &SourceCatalog,
         as_of: Option<AsOf>,
     ) -> (Relation, Vec<(bool, Field)>) {
+        if cfg!(debug_assertions) {
+            let columns = &source_catalog.columns;
+            assert!(
+                columns
+                    .iter()
+                    .map(|c| c.column_id())
+                    .duplicates()
+                    .next()
+                    .is_none(),
+                "duplicate ColumnId found in source catalog. Columns: {:?}",
+                columns
+            );
+        }
         self.included_relations.insert(source_catalog.id.into());
         (
             Relation::Source(Box::new(BoundSource {
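The check above relies on `Itertools::duplicates`, which lazily yields each value that occurs more than once, so `next().is_none()` means every ID is unique. A minimal standalone sketch of the same check, with plain `i32` IDs standing in for the real `ColumnId`:

```rust
use itertools::Itertools;

// Sketch of the duplicate-ColumnId check; plain i32 stands in for ColumnId.
fn assert_no_duplicate_ids(ids: &[i32]) {
    // `duplicates()` lazily yields each value that appears more than once,
    // so an empty iterator means every id is unique.
    assert!(
        ids.iter().duplicates().next().is_none(),
        "duplicate ColumnId found: {:?}",
        ids
    );
}

fn main() {
    assert_no_duplicate_ids(&[1, 2, 3]); // ok
    // assert_no_duplicate_ids(&[1, 2, 2]); // would panic, like the debug-only check above
}
```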
14 changes: 9 additions & 5 deletions src/frontend/src/handler/alter_source_column.rs
@@ -14,7 +14,7 @@

 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::ColumnId;
+use risingwave_common::catalog::{ColumnCatalog, ColumnId};
 use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
 use risingwave_sqlparser::ast::{
     AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
@@ -106,10 +106,7 @@ pub async fn handle_alter_source_column(
             catalog.definition =
                 alter_definition_add_column(&catalog.definition, column_def.clone())?;
             let mut bound_column = bind_sql_columns(&[column_def])?.remove(0);
-            bound_column.column_desc.column_id = columns
-                .iter()
-                .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
-                .next();
+            bound_column.column_desc.column_id = max_column_id(columns).next();
             columns.push(bound_column);
         }
         _ => unreachable!(),
@@ -147,6 +144,13 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul
     Ok(stmt.to_string())
 }
 
+pub fn max_column_id(columns: &Vec<ColumnCatalog>) -> ColumnId {
+    // XXX: should we check the column IDs of struct fields here?
+    columns
+        .iter()
+        .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;
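The new `max_column_id` helper folds over the catalog to find the largest existing `ColumnId`; callers then take `.next()` to allocate a fresh, non-conflicting ID. A standalone sketch of the same allocation scheme, using a hypothetical `ColumnId` newtype rather than the real catalog types:

```rust
// Hypothetical ColumnId newtype standing in for risingwave_common::catalog::ColumnId.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ColumnId(i32);

impl ColumnId {
    fn next(self) -> ColumnId {
        ColumnId(self.0 + 1)
    }
}

fn max_column_id(ids: &[ColumnId]) -> ColumnId {
    // Folding from i32::MIN mirrors the helper above; IDs are non-negative in practice.
    ids.iter().fold(ColumnId(i32::MIN), |a, &b| a.max(b))
}

fn main() {
    let existing = [ColumnId(1), ColumnId(4), ColumnId(2)];
    // A column added via ALTER SOURCE ... ADD COLUMN would get the next free id, 5.
    assert_eq!(max_column_id(&existing).next(), ColumnId(5));
}
```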
9 changes: 8 additions & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
@@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::{
 };
 use risingwave_sqlparser::parser::Parser;
 
+use super::alter_source_column::max_column_id;
 use super::alter_table_column::schema_has_schema_registry;
 use super::create_source::{bind_columns_from_source, validate_compatibility};
 use super::util::SourceSchemaCompatExt;
@@ -162,7 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff(
         unreachable!("source without schema registry is rejected")
     };
 
-    let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
+    let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns);
+    // The newly resolved columns' column IDs also start from 0, so they cannot be used directly.
+    let mut next_col_id = max_column_id(&original_source.columns).next();
+    for col in &mut added_columns {
+        col.column_desc.column_id = next_col_id;
+        next_col_id = next_col_id.next();
+    }
    let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source);
 
     Ok((source_info, added_columns, dropped_columns))
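This renumbering is the core of the fix: columns re-resolved from the schema registry are numbered from 0 again, so reusing those IDs verbatim would collide with IDs already recorded in the catalog, which is what led to the panic in the old test. A minimal sketch of the renumbering, reusing the same hypothetical `ColumnId` newtype as the previous sketch:

```rust
// Same hypothetical ColumnId newtype as in the previous sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ColumnId(i32);

impl ColumnId {
    fn next(self) -> ColumnId {
        ColumnId(self.0 + 1)
    }
}

fn main() {
    // IDs already in the catalog, e.g. foo=1, _rw_kafka_timestamp=2, _row_id=3.
    let existing = [ColumnId(1), ColumnId(2), ColumnId(3)];
    // The registry re-resolves the schema and numbers new columns from 0 again.
    let mut added = vec![ColumnId(0)]; // e.g. the newly added `bar`
    // Renumber added columns starting just past the catalog's max id.
    let mut next = existing.iter().copied().max().unwrap().next();
    for id in &mut added {
        *id = next;
        next = next.next();
    }
    assert_eq!(added, vec![ColumnId(4)]); // no collision with existing ids
}
```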
22 changes: 22 additions & 0 deletions src/frontend/src/handler/create_source.rs
@@ -489,6 +489,28 @@ pub(crate) async fn bind_columns_from_source(
         }
     };
 
+    if cfg!(debug_assertions) {
+        // Validate column IDs.
+        // Note: this just documents how it currently works; it does not claim the behavior is reasonable.
+        if let Some(ref columns) = columns {
+            let mut i = 1;
+            fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec<ColumnCatalog>) {
+                assert!(
+                    col.column_id.get_id() == *i as i32,
+                    "unexpected column id, col: {col:?}, i: {i}, columns: {columns:?}"
+                );
+                *i += 1;
+            }
+            for col in columns {
+                check_col(&col.column_desc, &mut i, columns);
+                for nested_col in &col.column_desc.field_descs {
+                    // What's the usage of struct fields' column IDs?
+                    check_col(nested_col, &mut i, columns);
+                }
+            }
+        }
+    }
+
     if !format_encode_options_to_consume.is_empty() {
         let err_string = format!(
             "Get unknown format_encode_options for {:?} {:?}: {}",
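The debug check encodes the current invariant: IDs are handed out depth-first starting at 1, with a struct column's nested fields numbered immediately after the column itself. A hedged sketch of that numbering scheme, with simplified stand-in types rather than the real `ColumnDesc`/`ColumnCatalog`:

```rust
// Simplified stand-ins for ColumnCatalog/ColumnDesc, for illustration only.
struct Column {
    id: i32,
    field_ids: Vec<i32>, // ids of nested struct fields, if any
}

fn check_sequential(columns: &[Column]) {
    let mut expected = 1;
    for col in columns {
        assert_eq!(col.id, expected, "unexpected column id");
        expected += 1;
        for &field_id in &col.field_ids {
            // Struct fields consume the ids immediately after their parent column.
            assert_eq!(field_id, expected, "unexpected nested column id");
            expected += 1;
        }
    }
}

fn main() {
    // e.g. a struct column with two fields, followed by a plain column:
    check_sequential(&[
        Column { id: 1, field_ids: vec![2, 3] },
        Column { id: 4, field_ids: vec![] },
    ]);
}
```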
1 change: 1 addition & 0 deletions src/frontend/src/handler/util.rs
@@ -151,6 +151,7 @@ fn to_pg_rows(
     session_data: &StaticSessionData,
 ) -> RwResult<Vec<Row>> {
     assert_eq!(chunk.dimension(), column_types.len());
+    debug_assert_eq!(chunk.data_types(), column_types);
 
     chunk
         .rows()
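This one-line `debug_assert_eq!` guards the boundary where the old bug surfaced: the commented-out test expectation above showed a panic with `left: [Varchar, Varchar]` vs `right: [Varchar, Int32]`, and comparing the chunk's data types against the declared column types reports such mismatches here in debug builds. A small sketch of the idea with a hypothetical two-variant `DataType` enum:

```rust
// Hypothetical DataType enum standing in for RisingWave's DataType.
#[derive(Debug, PartialEq, Clone, Copy)]
enum DataType {
    Varchar,
    Int32,
}

fn to_pg_rows(chunk_types: &[DataType], column_types: &[DataType]) {
    // The cheap length check stays in release builds;
    // the element-wise type comparison runs only under debug_assertions.
    assert_eq!(chunk_types.len(), column_types.len());
    debug_assert_eq!(chunk_types, column_types);
    // ... row conversion would follow here ...
}

fn main() {
    to_pg_rows(
        &[DataType::Varchar, DataType::Int32],
        &[DataType::Varchar, DataType::Int32],
    );
    // With the stale catalog from the old bug, this would have been called with
    // chunk types [Varchar, Varchar] vs. column types [Varchar, Int32],
    // which the debug assert now reports at this boundary.
}
```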
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -70,6 +70,9 @@ impl LogicalSource {
         ctx: OptimizerContextRef,
         as_of: Option<AsOf>,
     ) -> Result<Self> {
+        // XXX: should we reorder the columns?
+        // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32]
+        // related: https://github.com/risingwavelabs/risingwave/issues/16486
         let core = generic::Source {
             catalog: source_catalog,
             column_catalog,
