From 073f0cc8f4be808e54f63c370fd774eb9628d715 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 12:44:53 +0800 Subject: [PATCH 01/13] add tests Signed-off-by: xxchan --- e2e_test/source_inline/commands.toml | 6 + .../source_inline/kafka/avro/alter_source.slt | 129 ++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 e2e_test/source_inline/kafka/avro/alter_source.slt diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 48342bceafd42..57d09d8237efa 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -37,6 +37,12 @@ set -e if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "Deleting all Kafka topics..." rpk topic delete -r "*" + echo "Deleting all schema registry subjects" + rpk sr subject list | while read -r subject; do + echo "Deleting schema registry subject: $subject" + rpk sr subject delete "$subject" + rpk sr subject delete "$subject" --permanent + done else echo "No Kafka to clean." 
fi diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt new file mode 100644 index 0000000000000..15d48d9df0a18 --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -0,0 +1,129 @@ +control substitution on + +# https://github.com/risingwavelabs/risingwave/issues/16486 + +# cleanup +system ok +rpk topic delete 'avro_alter_source_test' || true; \\ +(rpk sr subject delete 'avro_alter_source_test-value' && rpk sr subject delete 'avro_alter_source_test-value' --permanent) || true; + +# create topic and sr subject +system ok +rpk topic create 'avro_alter_source_test' + +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +statement ok +create source s +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_alter_source_test' +) +FORMAT PLAIN ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +# create a new version of schema and produce a message +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +system ok +echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test + +query ? +select * from s +---- +ABC + +statement error +alter source s format plain encode json; +---- +db error: ERROR: Failed to run the query + +Caused by: + Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Avro, and altering them is not supported yet +No tracking issue yet. 
Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + + +statement ok +alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); + + +query ? +explain (verbose, trace) select * from s; +---- +Begin: +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalSource { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar] } +(empty) +Rewrite Source For Batch: +(empty) +apply SourceToKafkaScanRule 1 time(s) +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } +(empty) +Predicate Push Down: +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } +(empty) +Prune Columns: +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalProject { exprs: [foo, bar] } + └─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } +(empty) +Project Remove: +(empty) +apply ProjectMergeRule 1 time(s) +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } +(empty) +Const eval exprs: +(empty) +LogicalProject { exprs: [foo, bar] } +└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } +(empty) +To Batch Plan: +(empty) +BatchProject { exprs: [foo, bar] } +└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } +(empty) +Inline Session Timezone: +(empty) +BatchProject { exprs: [foo, bar] } +└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } +(empty) +To Batch Physical Plan: +(empty) +BatchProject { 
exprs: [foo, bar] } +└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } +(empty) +To Batch Distributed Plan: +(empty) +BatchExchange { order: [], dist: Single } +└─BatchProject { exprs: [foo, bar] } + └─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } +(empty) + +# query error +# select * from s +# ---- +# db error: ERROR: Panicked when handling the request: assertion `left == right` failed +# left: [Varchar, Varchar] +# right: [Varchar, Int32] +# This is a bug. We would appreciate a bug report at: +# https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml + +statement ok +create materialized view mv as select * from s; + + +# statement ok +# drop source s; From 12473cf168feec006d3cc9389d665d91ee9f2c82 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 13:28:17 +0800 Subject: [PATCH 02/13] fix(source): fix panic for alter source with sr --- .../source_inline/kafka/avro/alter_source.slt | 82 +++---------------- src/connector/src/parser/mod.rs | 1 + .../src/binder/relation/table_or_source.rs | 13 +++ .../src/handler/alter_source_column.rs | 14 ++-- .../src/handler/alter_source_with_sr.rs | 9 +- src/frontend/src/handler/create_source.rs | 22 +++++ src/frontend/src/handler/util.rs | 1 + .../src/optimizer/plan_node/logical_source.rs | 3 + 8 files changed, 69 insertions(+), 76 deletions(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 15d48d9df0a18..75dc8a7d03778 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -51,79 +51,21 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c statement ok alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); - -query ? 
-explain (verbose, trace) select * from s; +query ?? +select * from s ---- -Begin: -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalSource { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar] } -(empty) -Rewrite Source For Batch: -(empty) -apply SourceToKafkaScanRule 1 time(s) -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } -(empty) -Predicate Push Down: -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } -(empty) -Prune Columns: -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalProject { exprs: [foo, bar] } - └─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } -(empty) -Project Remove: -(empty) -apply ProjectMergeRule 1 time(s) -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } -(empty) -Const eval exprs: -(empty) -LogicalProject { exprs: [foo, bar] } -└─LogicalKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], time_range: (Unbounded, Unbounded) } -(empty) -To Batch Plan: -(empty) -BatchProject { exprs: [foo, bar] } -└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } -(empty) -Inline Session Timezone: -(empty) -BatchProject { exprs: [foo, bar] } -└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } -(empty) -To Batch Physical Plan: -(empty) -BatchProject { exprs: [foo, bar] } -└─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } -(empty) -To Batch Distributed Plan: -(empty) -BatchExchange { order: [], dist: Single } -└─BatchProject { exprs: [foo, bar] 
} - └─BatchKafkaScan { source: s, columns: [foo, _rw_kafka_timestamp, _row_id, bar], filter: (None, None) } -(empty) - -# query error -# select * from s -# ---- -# db error: ERROR: Panicked when handling the request: assertion `left == right` failed -# left: [Varchar, Varchar] -# right: [Varchar, Int32] -# This is a bug. We would appreciate a bug report at: -# https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml +ABC 1 statement ok create materialized view mv as select * from s; -# statement ok -# drop source s; +sleep 2s + +query ?? +select * from mv +---- +ABC 1 + +statement ok +drop source s; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index f7667a66a3747..26cf746b535dc 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1028,6 +1028,7 @@ pub mod test_utils { } } +/// Note: this is created in `SourceReader::build_stream` #[derive(Debug, Clone, Default)] pub struct ParserConfig { pub common: CommonParserConfig, diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index c5283a2cc592a..067641178fdc9 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -221,6 +221,19 @@ impl Binder { source_catalog: &SourceCatalog, as_of: Option, ) -> (Relation, Vec<(bool, Field)>) { + if cfg!(debug_assertions) { + let columns = &source_catalog.columns; + assert!( + columns + .iter() + .map(|c| c.column_id()) + .duplicates() + .next() + .is_none(), + "duplicate ColumnId found in source catalog. 
Columns: {:?}", + columns + ); + } self.included_relations.insert(source_catalog.id.into()); ( Relation::Source(Box::new(BoundSource { diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index fcabedc1149c4..45a22405499fd 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::ColumnId; +use risingwave_common::catalog::{ColumnCatalog, ColumnId}; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, @@ -106,10 +106,7 @@ pub async fn handle_alter_source_column( catalog.definition = alter_definition_add_column(&catalog.definition, column_def.clone())?; let mut bound_column = bind_sql_columns(&[column_def])?.remove(0); - bound_column.column_desc.column_id = columns - .iter() - .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) - .next(); + bound_column.column_desc.column_id = max_column_id(columns).next(); columns.push(bound_column); } _ => unreachable!(), @@ -147,6 +144,13 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul Ok(stmt.to_string()) } +pub fn max_column_id(columns: &Vec) -> ColumnId { + // XXX: should we check the column IDs of struct fields here? 
+ columns + .iter() + .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index c72cf547365d7..6452e820b13e1 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::{ }; use risingwave_sqlparser::parser::Parser; +use super::alter_source_column::max_column_id; use super::alter_table_column::schema_has_schema_registry; use super::create_source::{bind_columns_from_source, validate_compatibility}; use super::util::SourceSchemaCompatExt; @@ -162,7 +163,13 @@ pub async fn refresh_sr_and_get_columns_diff( unreachable!("source without schema registry is rejected") }; - let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + // The newly resolved columns' column IDs also starts from 0. They cannot be used directly. + let mut next_col_id = max_column_id(&original_source.columns).next(); + for col in &mut added_columns { + col.column_desc.column_id = next_col_id; + next_col_id = next_col_id.next(); + } let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); Ok((source_info, added_columns, dropped_columns)) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a29aa86907e0f..31df775c0f43c 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -489,6 +489,28 @@ pub(crate) async fn bind_columns_from_source( } }; + if cfg!(debug_assertions) { + // validate column ids + // Note: this just documents how it works currently. It doesn't mean whether it's reasonable. 
+ if let Some(ref columns) = columns { + let mut i = 1; + fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec) { + assert!( + col.column_id.get_id() == *i as i32, + "unexpected column id, col: {col:?}, i: {i}, columns: {columns:?}" + ); + *i += 1; + } + for col in columns { + check_col(&col.column_desc, &mut i, columns); + for nested_col in &col.column_desc.field_descs { + // What's the usage of struct fields' column IDs? + check_col(nested_col, &mut i, columns); + } + } + } + } + if !format_encode_options_to_consume.is_empty() { let err_string = format!( "Get unknown format_encode_options for {:?} {:?}: {}", diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 7fd4f0b92822b..42e63eed38545 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -151,6 +151,7 @@ fn to_pg_rows( session_data: &StaticSessionData, ) -> RwResult> { assert_eq!(chunk.dimension(), column_types.len()); + debug_assert_eq!(chunk.data_types(), column_types); chunk .rows() diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 0310fdbbd439b..faf057d2a3a84 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -70,6 +70,9 @@ impl LogicalSource { ctx: OptimizerContextRef, as_of: Option, ) -> Result { + // XXX: should we reorder the columns? 
+ // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32] + // related: https://github.com/risingwavelabs/risingwave/issues/16486 let core = generic::Source { catalog: source_catalog, column_catalog, From ec5cf0eba422efe090ecf2d4266b25888afdc04c Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 14:29:21 +0800 Subject: [PATCH 03/13] refine --- ci/scripts/e2e-source-test.sh | 1 - e2e_test/schema_registry/alter_sr.slt | 39 ++++++++++++++--------- e2e_test/schema_registry/pb.py | 2 +- src/frontend/src/handler/create_source.rs | 2 +- src/frontend/src/handler/util.rs | 10 +++++- 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 84fff651b547c..1c80d2b08daed 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -139,7 +139,6 @@ echo "--- e2e, ci-1cn-1fe, protobuf schema registry" export RISINGWAVE_CI=true RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user echo "make sure google/protobuf/source_context.proto is NOT in schema registry" curl --silent 'http://schemaregistry:8082/subjects'; echo # curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt index d703c0401a35e..49fdb897f2eaf 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/schema_registry/alter_sr.slt @@ -1,15 +1,20 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' 
--permanent) || true; + +system ok +python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user statement ok CREATE SOURCE src_user WITH ( - connector = 'kafka', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'test.User' ); @@ -18,13 +23,12 @@ CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; statement ok CREATE TABLE t_user WITH ( - connector = 'kafka', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'test.User' ); @@ -36,7 +40,7 @@ SELECT age FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s @@ -47,25 +51,30 @@ ALTER SOURCE src_user REFRESH SCHEMA; statement ok CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; -# Refresh table schema +# Refresh table schema. It consume new data before refresh, so the new fields are NULLs statement ok ALTER TABLE t_user REFRESH SCHEMA; -query IIII +query ???? SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; ---- -25 4 0 10 +25 104 0 510 + +query ???? 
+SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; +---- +25 NULL NULL NULL # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s -query IIII +query ???? SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; ---- -30 4 0 10 +30 104 100 510 statement ok DROP MATERIALIZED VIEW mv_user_more; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/schema_registry/pb.py index fd6e0dc478b51..f970353be56ae 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/schema_registry/pb.py @@ -33,7 +33,7 @@ def get_user_with_more_fields(i): city="City_{}".format(i), gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE, sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), - age=i, + age=100 + i, ) def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 31df775c0f43c..4a762bd75603c 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -502,11 +502,11 @@ pub(crate) async fn bind_columns_from_source( *i += 1; } for col in columns { - check_col(&col.column_desc, &mut i, columns); for nested_col in &col.column_desc.field_descs { // What's the usage of struct fields' column IDs? 
check_col(nested_col, &mut i, columns); } + check_col(&col.column_desc, &mut i, columns); } } } diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 42e63eed38545..73b52b977c7a4 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -151,7 +151,15 @@ fn to_pg_rows( session_data: &StaticSessionData, ) -> RwResult> { assert_eq!(chunk.dimension(), column_types.len()); - debug_assert_eq!(chunk.data_types(), column_types); + if cfg!(debug_assertions) { + let chunk_data_types = chunk.data_types(); + for (ty1, ty2) in chunk_data_types.iter().zip_eq_fast(column_types) { + debug_assert!( + ty1.equals_datatype(ty2), + "chunk_data_types: {chunk_data_types:?}, column_types: {column_types:?}" + ) + } + } chunk .rows() From 236c79d4c01c63bec931f0d4832d7841b94915ef Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 16:56:40 +0800 Subject: [PATCH 04/13] refine --- src/common/src/catalog/column.rs | 2 +- .../src/handler/alter_source_with_sr.rs | 6 ++++++ src/frontend/src/handler/create_source.rs | 19 ++++++++++++------- src/frontend/src/handler/create_table.rs | 15 ++++++++------- src/frontend/src/handler/explain.rs | 3 --- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 82d2f22f41cb4..33fa0dd4e7302 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -430,7 +430,7 @@ pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec bool { +pub fn is_column_ids_distinct(columns: &[ColumnCatalog]) -> bool { let mut column_ids = columns .iter() .map(|column| column.column_id().get_id()) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 6452e820b13e1..e1caeb588b145 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -171,6 +171,12 @@ pub async fn 
refresh_sr_and_get_columns_diff( next_col_id = next_col_id.next(); } let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); + tracing::debug!( + added_columns = ?added_columns, + dropped_columns = ?dropped_columns, + columns_from_resolve_source = ?columns_from_resolve_source, + original_source = ?original_source.columns + ); Ok((source_info, added_columns, dropped_columns)) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 4a762bd75603c..a022b5add3989 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ - is_column_ids_dedup, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, + is_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::types::DataType; @@ -73,8 +73,8 @@ use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported use crate::error::{Result, RwError}; use crate::expr::Expr; use crate::handler::create_table::{ - bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names, - ensure_table_constraints_supported, ColumnIdGenerator, + bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns, + bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -289,8 +289,11 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result for more information. -/// return `(columns, source info)` +/// Resolves the schema of the source from external schema file. +/// See for more information. 
+/// +/// Note: the returned schema strictly corresponds to the schema. +/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included. pub(crate) async fn bind_columns_from_source( session: &SessionImpl, source_schema: &ConnectorSchema, @@ -1409,10 +1412,12 @@ pub async fn bind_create_source( .into()); } + // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary. + // XXX: should we also chenge the col id for struct fields? for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - debug_assert!(is_column_ids_dedup(&columns)); + debug_assert!(is_column_ids_distinct(&columns)); let must_need_pk = if is_create_source { with_properties.connector_need_pk() @@ -1425,7 +1430,7 @@ pub async fn bind_create_source( }; let (mut columns, pk_col_ids, row_id_index) = - bind_pk_on_relation(columns, pk_names, must_need_pk)?; + bind_pk_and_row_id_on_relation(columns, pk_names, must_need_pk)?; let watermark_descs = bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0f3693653ced5..1c5cf382ffec7 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -404,7 +404,7 @@ fn multiple_pk_definition_err() -> RwError { /// /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. 
-pub fn bind_pk_on_relation( +pub fn bind_pk_and_row_id_on_relation( mut columns: Vec, pk_names: Vec, must_need_pk: bool, @@ -570,7 +570,8 @@ pub(crate) fn gen_create_table_plan_without_source( ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (mut columns, pk_column_ids, row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let watermark_descs: Vec = bind_source_watermark( context.session_ctx(), @@ -762,7 +763,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (columns, pk_column_ids, _row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (columns, pk_column_ids, _row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let definition = context.normalized_sql().to_owned(); @@ -881,7 +883,6 @@ fn derive_connect_properties( pub(super) async fn handle_create_table_plan( handler_args: HandlerArgs, explain_options: ExplainOptions, - col_id_gen: ColumnIdGenerator, source_schema: Option, cdc_table_info: Option, table_name: ObjectName, @@ -894,6 +895,7 @@ pub(super) async fn handle_create_table_plan( with_version_column: Option, include_column_options: IncludeOption, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { + let col_id_gen = ColumnIdGenerator::new_initial(); let source_schema = check_create_table_with_source( &handler_args.with_options, source_schema, @@ -1148,11 +1150,9 @@ pub async fn handle_create_table( } let (graph, source, table, job_type) = { - let col_id_gen = ColumnIdGenerator::new_initial(); let (plan, source, table, job_type) = handle_create_table_plan( handler_args, ExplainOptions::default(), - col_id_gen, source_schema, cdc_table_info, table_name.clone(), @@ -1435,7 +1435,8 @@ mod tests { } 
ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; + let (_, pk_column_ids, _) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 9f46087c206e8..e5ba692372d33 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -66,14 +66,11 @@ async fn do_handle_explain( wildcard_idx, .. } => { - let col_id_gen = ColumnIdGenerator::new_initial(); - let source_schema = source_schema.map(|s| s.into_v2_with_warning()); let (plan, _source, _table, _job_type) = handle_create_table_plan( handler_args, explain_options, - col_id_gen, source_schema, cdc_table_info, name.clone(), From 10ee1ace18607ce9e9644e2d164d24de8d35d0ae Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 17:10:49 +0800 Subject: [PATCH 05/13] include timestamp --- e2e_test/schema_registry/alter_sr.slt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt index 49fdb897f2eaf..17bb7f1ef23e3 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/schema_registry/alter_sr.slt @@ -8,7 +8,9 @@ system ok python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user statement ok -CREATE SOURCE src_user WITH ( +CREATE SOURCE src_user +INCLUDE timestamp +WITH ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'sr_pb_test', scan.startup.mode = 'earliest' From f7b51b5b1b5392beb9665f989fdc5b30b2d24e71 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 17:37:59 +0800 Subject: [PATCH 06/13] fix --- e2e_test/source_inline/kafka/avro/alter_source.slt | 4 ++++ src/frontend/src/handler/create_source.rs | 10 +++++----- 
src/frontend/src/handler/explain.rs | 1 - 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 75dc8a7d03778..db53c0d40a8b7 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -15,6 +15,8 @@ system ok echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ | curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' +sleep 5s + statement ok create source s WITH ( @@ -30,6 +32,8 @@ system ok echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ | curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' +sleep 5s + system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a022b5add3989..25794e7e7dc2f 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -498,17 +498,17 @@ pub(crate) async fn bind_columns_from_source( if let Some(ref columns) = columns { let mut i = 1; fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec) { + for nested_col in &col.field_descs { + // What's the usage of struct fields' column IDs? + check_col(nested_col, i, columns); + } assert!( col.column_id.get_id() == *i as i32, - "unexpected column id, col: {col:?}, i: {i}, columns: {columns:?}" + "unexpected column id\ncol: {col:?}\ni: {i}\ncolumns: {columns:#?}" ); *i += 1; } for col in columns { - for nested_col in &col.column_desc.field_descs { - // What's the usage of struct fields' column IDs? 
- check_col(nested_col, &mut i, columns); - } check_col(&col.column_desc, &mut i, columns); } } diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index e5ba692372d33..db124b373181b 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -22,7 +22,6 @@ use thiserror_ext::AsReport; use super::create_index::{gen_create_index_plan, resolve_index_schema}; use super::create_mv::gen_create_mv_plan; use super::create_sink::{gen_sink_plan, get_partition_compute_info}; -use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; use super::{RwPgResponse, RwPgResponseBuilderExt}; From c7f5f5f0972a10fe564a906ccb28c28d53d8eb26 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 17:56:50 +0800 Subject: [PATCH 07/13] fix --- ci/scripts/e2e-source-test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 1c80d2b08daed..919046bea586b 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -33,6 +33,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +apt-get -y install jq echo "--- e2e, inline test" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ From 8afeaf179790112088070eeb3ee8e420916c61e6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 18 Jun 2024 19:33:16 +0800 Subject: [PATCH 08/13] fix --- .../source_inline/kafka/avro/alter_source.slt | 6 ++---- src/common/src/catalog/column.rs | 19 ++++++++++--------- .../src/binder/relation/table_or_source.rs | 16 ++-------------- src/frontend/src/catalog/table_catalog.rs | 2 +- .../src/handler/alter_source_column.rs | 7 +++++++ .../src/handler/alter_source_with_sr.rs | 2 +- src/frontend/src/handler/create_source.rs 
| 4 ++-- 7 files changed, 25 insertions(+), 31 deletions(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index db53c0d40a8b7..c22648d794224 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -15,8 +15,6 @@ system ok echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ | curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' -sleep 5s - statement ok create source s WITH ( @@ -32,11 +30,11 @@ system ok echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ | curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' -sleep 5s - system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test +sleep 5s + query ? select * from s ---- diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 33fa0dd4e7302..8e1fb00b60779 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -430,15 +430,16 @@ pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec bool { - let mut column_ids = columns - .iter() - .map(|column| column.column_id().get_id()) - .collect_vec(); - column_ids.sort(); - let original_len = column_ids.len(); - column_ids.dedup(); - column_ids.len() == original_len +pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) { + debug_assert!( + columns + .iter() + .map(|c| c.column_id()) + .duplicates() + .next() + .is_none(), + "duplicate ColumnId found in source catalog. 
Columns: {columns:#?}" + ); } #[cfg(test)] diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 067641178fdc9..94b2980fcb63d 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{is_system_schema, Field}; +use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; @@ -221,19 +221,7 @@ impl Binder { source_catalog: &SourceCatalog, as_of: Option, ) -> (Relation, Vec<(bool, Field)>) { - if cfg!(debug_assertions) { - let columns = &source_catalog.columns; - assert!( - columns - .iter() - .map(|c| c.column_id()) - .duplicates() - .next() - .is_none(), - "duplicate ColumnId found in source catalog. Columns: {:?}", - columns - ); - } + debug_assert_column_ids_distinct(&source_catalog.columns); self.included_relations.insert(source_catalog.id.into()); ( Relation::Source(Box::new(BoundSource { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index bfe4537fa7085..d7c2fd3778c79 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -210,7 +210,7 @@ impl TableType { } } -/// The version of a table, used by schema change. See [`PbTableVersion`]. +/// The version of a table, used by schema change. See [`PbTableVersion`] for more details. 
#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TableVersion { pub version_id: TableVersionId, diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 45a22405499fd..b30bbf1264796 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -144,6 +144,13 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul Ok(stmt.to_string()) } +/// FIXME: perhaps we should use something like `ColumnIdGenerator::new_alter`, +/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`. +/// (But for now this isn't a large problem, since drop column is not allowed for source yet..) +/// +/// Besides, the logic of column id handling is a mess. +/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end; +/// In other places, we create column id ad-hoc. pub fn max_column_id(columns: &Vec) -> ColumnId { // XXX: should we check the column IDs of struct fields here? columns diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index e1caeb588b145..e0ff9fe68c7d8 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -164,7 +164,7 @@ pub async fn refresh_sr_and_get_columns_diff( }; let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); - // The newly resolved columns' column IDs also starts from 0. They cannot be used directly. + // The newly resolved columns' column IDs also start from 1. They cannot be used directly. 
let mut next_col_id = max_column_id(&original_source.columns).next(); for col in &mut added_columns { col.column_desc.column_id = next_col_id; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 25794e7e7dc2f..1f458bbbc09b5 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ - is_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, + debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::types::DataType; @@ -1417,7 +1417,7 @@ pub async fn bind_create_source( for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - debug_assert!(is_column_ids_distinct(&columns)); + debug_assert_column_ids_distinct(&columns); let must_need_pk = if is_create_source { with_properties.connector_need_pk() From c4fec4c9d87f5f5a18ac951bb0bfc9247a39d9c0 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 14:45:48 +0800 Subject: [PATCH 09/13] debug Signed-off-by: xxchan --- e2e_test/source_inline/kafka/avro/alter_source.slt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index c22648d794224..81a878bc2baca 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -35,6 +35,12 @@ echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_s sleep 5s +system ok +rpk topic consume avro_alter_source_test --offset :end | jq .value +---- +"\u0000\u0000\u0000\u0000\u0010\u0002\u0006ABC" + + query ? 
select * from s ---- @@ -70,4 +76,4 @@ select * from mv ABC 1 statement ok -drop source s; +drop source s cascade; From 02e66681bb77a0a6a2696873a90c11ae521afd35 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 15:09:35 +0800 Subject: [PATCH 10/13] resolve comments Signed-off-by: xxchan --- src/common/src/catalog/column.rs | 6 +++++- src/frontend/src/handler/alter_source_column.rs | 2 +- src/frontend/src/handler/alter_source_with_sr.rs | 4 ++-- src/frontend/src/handler/create_table.rs | 4 ++-- src/frontend/src/optimizer/plan_node/logical_source.rs | 3 +++ 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 8e1fb00b60779..cfbc91136592b 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, }; -use super::row_id_column_desc; +use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET}; use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID}; use crate::types::DataType; @@ -45,6 +45,10 @@ impl ColumnId { pub const fn placeholder() -> Self { Self(i32::MAX - 1) } + + pub const fn first_user_column() -> Self { + Self(USER_COLUMN_ID_OFFSET) + } } impl ColumnId { diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index b30bbf1264796..db54cce612a91 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -155,7 +155,7 @@ pub fn max_column_id(columns: &Vec) -> ColumnId { // XXX: should we check the column IDs of struct fields here? 
columns .iter() - .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) + .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id())) } #[cfg(test)] diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index e0ff9fe68c7d8..1ef38264dab5a 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -173,8 +173,8 @@ pub async fn refresh_sr_and_get_columns_diff( let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); tracing::debug!( added_columns = ?added_columns, - dropped_columns = ?dropped_columns, - columns_from_resolve_source = ?columns_from_resolve_source, + ?dropped_columns, + ?columns_from_resolve_source, original_source = ?original_source.columns ); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1c5cf382ffec7..c542762702053 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, - INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, + INITIAL_TABLE_VERSION_ID, }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; @@ -110,7 +110,7 @@ impl ColumnIdGenerator { pub fn new_initial() -> Self { Self { existing: HashMap::new(), - next_column_id: ColumnId::from(USER_COLUMN_ID_OFFSET), + next_column_id: ColumnId::first_user_column(), version_id: INITIAL_TABLE_VERSION_ID, } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index faf057d2a3a84..918db2919e626 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs 
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -73,6 +73,9 @@ impl LogicalSource { // XXX: should we reorder the columns? // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32] // related: https://github.com/risingwavelabs/risingwave/issues/16486 + // The order does not matter much. The columns field is essentially a map indexed by the column id. + // It will affect what users will see in `SELECT *`. + // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so... let core = generic::Source { catalog: source_catalog, column_catalog, From 6acfac5208b1457e2490b23ef32b8803b143141a Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 16:19:37 +0800 Subject: [PATCH 11/13] fix ci registry failure Signed-off-by: xxchan --- e2e_test/source_inline/kafka/avro/alter_source.slt | 11 ----------- src/risedevtool/src/risedev_env.rs | 10 +++------- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 81a878bc2baca..58baaba341592 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -33,14 +33,6 @@ echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","defau system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test -sleep 5s - -system ok -rpk topic consume avro_alter_source_test --offset :end | jq .value ----- -"\u0000\u0000\u0000\u0000\u0010\u0002\u0006ABC" - - query ? select * from s ---- @@ -67,9 +59,6 @@ ABC 1 statement ok create materialized view mv as select * from s; - -sleep 2s - query ?? 
select * from mv ---- diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 943f37abf655d..bff4062f72097 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -78,13 +78,9 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } ServiceConfig::SchemaRegistry(c) => { - let address = &c.address; - let port = &c.port; - writeln!( - env, - r#"RISEDEV_SCHEMA_REGISTRY_URL="http://{address}:{port}""#, - ) - .unwrap(); + let url = format!("http://{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_SCHEMA_REGISTRY_URL="{url}""#,).unwrap(); + writeln!(env, r#"RPK_REGISTRY_HOSTS="{url}""#).unwrap(); } ServiceConfig::MySql(c) if c.application != Application::Metastore => { let host = &c.address; From 97e0a4d0a716fc8df9d09cfba342be8b1e4a3f35 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 18:04:05 +0800 Subject: [PATCH 12/13] fix include column (hacky) --- e2e_test/schema_registry/alter_sr.slt | 2 +- e2e_test/source_inline/kafka/avro/alter_source.slt | 2 ++ src/common/src/catalog/column.rs | 5 +++++ src/frontend/src/handler/alter_source_with_sr.rs | 13 +++++++++---- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt index 17bb7f1ef23e3..051bebcd5ef32 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/schema_registry/alter_sr.slt @@ -9,7 +9,7 @@ python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${R statement ok CREATE SOURCE src_user -INCLUDE timestamp +INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293 WITH ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'sr_pb_test', diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 58baaba341592..e60bf5c0295b0 100644 --- 
a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -59,6 +59,8 @@ ABC 1 statement ok create materialized view mv as select * from s; +sleep 2s + query ?? select * from mv ---- diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index cfbc91136592b..dde746bf3200a 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -350,6 +350,11 @@ impl ColumnCatalog { self.column_desc.is_default() } + /// If the column is an `INCLUDE ... AS ...` connector column. + pub fn is_connector_additional_column(&self) -> bool { + self.column_desc.additional_column.column_type.is_some() + } + /// Get a reference to the column desc's data type. pub fn data_type(&self) -> &DataType { &self.column_desc.data_type diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 1ef38264dab5a..af87960547bc0 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -69,14 +69,19 @@ fn encode_type_to_encode(from: EncodeType) -> Option { }) } -/// Returns the columns in `columns_a` but not in `columns_b`, -/// where the comparison is done by name and data type, -/// and hidden columns are ignored. +/// Returns the columns in `columns_a` but not in `columns_b`. +/// +/// Note: +/// - The comparison is done by name and data type, without checking `ColumnId`. +/// - Hidden columns and `INCLUDE ... AS ...` columns are ignored, because it's only for the special handling of alter sr. +/// For the newly resolved `columns_from_resolve_source` (created by [`bind_columns_from_source`]), it doesn't contain hidden columns (`_row_id`) and `INCLUDE ... AS ...` columns. +/// This is fragile and we should really refactor it later. 
fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec { columns_a .iter() .filter(|col_a| { !col_a.is_hidden() + && !col_a.is_connector_additional_column() && !columns_b.iter().any(|col_b| { col_a.name() == col_b.name() && col_a.data_type() == col_b.data_type() }) @@ -172,7 +177,7 @@ pub async fn refresh_sr_and_get_columns_diff( } let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); tracing::debug!( - added_columns = ?added_columns, + ?added_columns, ?dropped_columns, ?columns_from_resolve_source, original_source = ?original_source.columns From 55a0f98f083e2e6d6c48cab243a98cd11e735387 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 19:46:20 +0800 Subject: [PATCH 13/13] move tests Signed-off-by: xxchan --- ci/scripts/e2e-source-test.sh | 13 ----- e2e_test/schema_registry/pb.slt | 50 ---------------- .../kafka/protobuf/alter_source.slt} | 6 +- .../source_inline/kafka/protobuf/basic.slt | 58 +++++++++++++++++++ .../kafka/protobuf}/pb.py | 31 +++++++--- .../kafka}/protobuf/user.proto | 0 .../kafka}/protobuf/user_pb2.py | 0 .../protobuf/user_with_more_fields.proto | 0 .../protobuf/user_with_more_fields_pb2.py | 0 9 files changed, 84 insertions(+), 74 deletions(-) delete mode 100644 e2e_test/schema_registry/pb.slt rename e2e_test/{schema_registry/alter_sr.slt => source_inline/kafka/protobuf/alter_source.slt} (80%) create mode 100644 e2e_test/source_inline/kafka/protobuf/basic.slt rename e2e_test/{schema_registry => source_inline/kafka/protobuf}/pb.py (74%) rename e2e_test/{schema_registry => source_inline/kafka}/protobuf/user.proto (100%) rename e2e_test/{schema_registry => source_inline/kafka}/protobuf/user_pb2.py (100%) rename e2e_test/{schema_registry => source_inline/kafka}/protobuf/user_with_more_fields.proto (100%) rename e2e_test/{schema_registry => source_inline/kafka}/protobuf/user_with_more_fields_pb2.py (100%) diff --git a/ci/scripts/e2e-source-test.sh 
b/ci/scripts/e2e-source-test.sh index 919046bea586b..c4b4713af81cc 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -135,20 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' echo "--- Kill cluster" risedev ci-kill - -echo "--- e2e, ci-1cn-1fe, protobuf schema registry" export RISINGWAVE_CI=true -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-1cn-1fe -echo "make sure google/protobuf/source_context.proto is NOT in schema registry" -curl --silent 'http://schemaregistry:8082/subjects'; echo -# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 -curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto' -risedev slt './e2e_test/schema_registry/pb.slt' -risedev slt './e2e_test/schema_registry/alter_sr.slt' - -echo "--- Kill cluster" -risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt deleted file mode 100644 index 7b60b4fa8d7a4..0000000000000 --- a/e2e_test/schema_registry/pb.slt +++ /dev/null @@ -1,50 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -# Create a table. 
-statement ok -create table sr_pb_test with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' - ); - -# for multiple schema registry nodes -statement ok -create table sr_pb_test_bk with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082', - message = 'test.User' - ); - -# Wait for source -sleep 10s - -# Flush into storage -statement ok -flush; - -query I -select count(*) from sr_pb_test; ----- -20 - -query IIT -select min(id), max(id), max((sc).file_name) from sr_pb_test; ----- -0 19 source/context_019.proto - - -statement ok -drop table sr_pb_test; - -statement ok -drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt similarity index 80% rename from e2e_test/schema_registry/alter_sr.slt rename to e2e_test/source_inline/kafka/protobuf/alter_source.slt index 051bebcd5ef32..c9db2df3ca4ee 100644 --- a/e2e_test/schema_registry/alter_sr.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -5,7 +5,7 @@ rpk topic delete sr_pb_test || true; \\ (rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; system ok -python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user statement ok CREATE SOURCE src_user @@ -42,7 +42,7 @@ SELECT age FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py 
"${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s @@ -69,7 +69,7 @@ SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; # Push more events with extended fields system ok -python3 e2e_test/schema_registry/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields sleep 5s diff --git a/e2e_test/source_inline/kafka/protobuf/basic.slt b/e2e_test/source_inline/kafka/protobuf/basic.slt new file mode 100644 index 0000000000000..82eb61560aa4d --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/basic.slt @@ -0,0 +1,58 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +# make sure google/protobuf/source_context.proto is NOT in schema registry +system ok +curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto' + +# Create a table. 
+statement ok +create table sr_pb_test with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# for multiple schema registry nodes +statement ok +create table sr_pb_test_bk with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# Wait for source +sleep 2s + +# Flush into storage +statement ok +flush; + +query I +select count(*) from sr_pb_test; +---- +20 + +query IT +select min(id), max(id), max((sc).file_name) from sr_pb_test; +---- +0 19 source/context_019.proto + + +statement ok +drop table sr_pb_test; + +statement ok +drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/source_inline/kafka/protobuf/pb.py similarity index 74% rename from e2e_test/schema_registry/pb.py rename to e2e_test/source_inline/kafka/protobuf/pb.py index f970353be56ae..4cab50f899e50 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/source_inline/kafka/protobuf/pb.py @@ -25,6 +25,7 @@ def get_user(i): sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), ) + def get_user_with_more_fields(i): return user_pb2.User( id=i, @@ -36,12 +37,15 @@ def get_user_with_more_fields(i): age=100 + i, ) -def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): + +def send_to_kafka( + producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message +): schema_registry_client = SchemaRegistryClient(schema_registry_conf) serializer = ProtobufSerializer( pb_message, schema_registry_client, - {"use.deprecated.format": False, 'skip.known.types': True}, + {"use.deprecated.format": False, "skip.known.types": True}, ) producer = 
Producer(producer_conf) @@ -60,7 +64,9 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u if __name__ == "__main__": if len(sys.argv) < 6: - print("pb.py ") + print( + "pb.py " + ) exit(1) broker_list = sys.argv[1] @@ -69,20 +75,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u num_records = int(sys.argv[4]) pb_message = sys.argv[5] - user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2') + user_pb2 = importlib.import_module(f"{pb_message}_pb2") all_pb_messages = { - 'user': get_user, - 'user_with_more_fields': get_user_with_more_fields, + "user": get_user, + "user_with_more_fields": get_user_with_more_fields, } - assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}' + assert ( + pb_message in all_pb_messages + ), f"pb_message must be one of {list(all_pb_messages.keys())}" schema_registry_conf = {"url": schema_registry_url} producer_conf = {"bootstrap.servers": broker_list} try: - send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User) + send_to_kafka( + producer_conf, + schema_registry_conf, + topic, + num_records, + all_pb_messages[pb_message], + user_pb2.User, + ) except Exception as e: print("Send Protobuf data to schema registry and kafka failed {}", e) exit(1) diff --git a/e2e_test/schema_registry/protobuf/user.proto b/e2e_test/source_inline/kafka/protobuf/user.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user.proto rename to e2e_test/source_inline/kafka/protobuf/user.proto diff --git a/e2e_test/schema_registry/protobuf/user_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_pb2.py diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields.proto 
b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields.proto rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py