diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 84fff651b547c..c4b4713af81cc 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -33,6 +33,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +apt-get -y install jq echo "--- e2e, inline test" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ @@ -134,21 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' echo "--- Kill cluster" risedev ci-kill - -echo "--- e2e, ci-1cn-1fe, protobuf schema registry" export RISINGWAVE_CI=true -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-1cn-1fe -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user -echo "make sure google/protobuf/source_context.proto is NOT in schema registry" -curl --silent 'http://schemaregistry:8082/subjects'; echo -# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 -curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto' -risedev slt './e2e_test/schema_registry/pb.slt' -risedev slt './e2e_test/schema_registry/alter_sr.slt' - -echo "--- Kill cluster" -risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt deleted file mode 100644 index d703c0401a35e..0000000000000 --- a/e2e_test/schema_registry/alter_sr.slt +++ /dev/null @@ -1,80 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -statement ok -CREATE SOURCE src_user WITH ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest' -) -FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' -); - -statement ok -CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; - -statement ok -CREATE TABLE t_user WITH ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest' -) -FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' -); - -statement error -SELECT age FROM mv_user; - -statement error -SELECT age FROM t_user; - -# Push more events with extended fields -system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields - -sleep 5s - -# Refresh source schema -statement ok -ALTER SOURCE src_user REFRESH SCHEMA; - -statement ok -CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; - -# Refresh table schema -statement ok -ALTER TABLE t_user REFRESH SCHEMA; - -query IIII -SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; ----- -25 4 0 10 - -# Push more events with extended fields -system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields - -sleep 5s - -query IIII -SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; ----- -30 4 0 10 - -statement ok -DROP MATERIALIZED VIEW mv_user_more; - -statement ok -DROP TABLE t_user; - -statement ok -DROP MATERIALIZED VIEW mv_user; - -statement ok -DROP SOURCE src_user; diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt deleted file mode 100644 index 7b60b4fa8d7a4..0000000000000 --- a/e2e_test/schema_registry/pb.slt +++ /dev/null @@ -1,50 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -# Create a table. -statement ok -create table sr_pb_test with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' - ); - -# for multiple schema registry nodes -statement ok -create table sr_pb_test_bk with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082', - message = 'test.User' - ); - -# Wait for source -sleep 10s - -# Flush into storage -statement ok -flush; - -query I -select count(*) from sr_pb_test; ----- -20 - -query IIT -select min(id), max(id), max((sc).file_name) from sr_pb_test; ----- -0 19 source/context_019.proto - - -statement ok -drop table sr_pb_test; - -statement ok -drop table sr_pb_test_bk; diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 48342bceafd42..57d09d8237efa 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -37,6 +37,12 @@ set -e if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "Deleting all Kafka topics..." rpk topic delete -r "*" + echo "Deleting all schema registry subjects" + rpk sr subject list | while read -r subject; do + echo "Deleting schema registry subject: $subject" + rpk sr subject delete "$subject" + rpk sr subject delete "$subject" --permanent + done else echo "No Kafka to clean." fi diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt new file mode 100644 index 0000000000000..e60bf5c0295b0 --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -0,0 +1,70 @@ +control substitution on + +# https://github.com/risingwavelabs/risingwave/issues/16486 + +# cleanup +system ok +rpk topic delete 'avro_alter_source_test' || true; \\ +(rpk sr subject delete 'avro_alter_source_test-value' && rpk sr subject delete 'avro_alter_source_test-value' --permanent) || true; + +# create topic and sr subject +system ok +rpk topic create 'avro_alter_source_test' + +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +statement ok +create source s +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_alter_source_test' +) +FORMAT PLAIN ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +# create a new version of schema and produce a message +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +system ok +echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test + +query ? +select * from s +---- +ABC + +statement error +alter source s format plain encode json; +---- +db error: ERROR: Failed to run the query + +Caused by: + Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Avro, and altering them is not supported yet +No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + + +statement ok +alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); + +query ?? +select * from s +---- +ABC 1 + +statement ok +create materialized view mv as select * from s; + +sleep 2s + +query ?? +select * from mv +---- +ABC 1 + +statement ok +drop source s cascade; diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt new file mode 100644 index 0000000000000..c9db2df3ca4ee --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -0,0 +1,91 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +statement ok +CREATE SOURCE src_user +INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +statement ok +CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; + +statement ok +CREATE TABLE t_user WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +statement error +SELECT age FROM mv_user; + +statement error +SELECT age FROM t_user; + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields + +sleep 5s + +# Refresh source schema +statement ok +ALTER SOURCE src_user REFRESH SCHEMA; + +statement ok +CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; + +# Refresh table schema. It consume new data before refresh, so the new fields are NULLs +statement ok +ALTER TABLE t_user REFRESH SCHEMA; + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; +---- +25 104 0 510 + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; +---- +25 NULL NULL NULL + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields + +sleep 5s + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; +---- +30 104 100 510 + +statement ok +DROP MATERIALIZED VIEW mv_user_more; + +statement ok +DROP TABLE t_user; + +statement ok +DROP MATERIALIZED VIEW mv_user; + +statement ok +DROP SOURCE src_user; diff --git a/e2e_test/source_inline/kafka/protobuf/basic.slt b/e2e_test/source_inline/kafka/protobuf/basic.slt new file mode 100644 index 0000000000000..82eb61560aa4d --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/basic.slt @@ -0,0 +1,58 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +# make sure google/protobuf/source_context.proto is NOT in schema registry +system ok +curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto' + +# Create a table. +statement ok +create table sr_pb_test with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# for multiple schema registry nodes +statement ok +create table sr_pb_test_bk with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# Wait for source +sleep 2s + +# Flush into storage +statement ok +flush; + +query I +select count(*) from sr_pb_test; +---- +20 + +query IT +select min(id), max(id), max((sc).file_name) from sr_pb_test; +---- +0 19 source/context_019.proto + + +statement ok +drop table sr_pb_test; + +statement ok +drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/source_inline/kafka/protobuf/pb.py similarity index 73% rename from e2e_test/schema_registry/pb.py rename to e2e_test/source_inline/kafka/protobuf/pb.py index fd6e0dc478b51..4cab50f899e50 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/source_inline/kafka/protobuf/pb.py @@ -25,6 +25,7 @@ def get_user(i): sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), ) + def get_user_with_more_fields(i): return user_pb2.User( id=i, @@ -33,15 +34,18 @@ def get_user_with_more_fields(i): city="City_{}".format(i), gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE, sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), - age=i, + age=100 + i, ) -def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): + +def send_to_kafka( + producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message +): schema_registry_client = SchemaRegistryClient(schema_registry_conf) serializer = ProtobufSerializer( pb_message, schema_registry_client, - {"use.deprecated.format": False, 'skip.known.types': True}, + {"use.deprecated.format": False, "skip.known.types": True}, ) producer = Producer(producer_conf) @@ -60,7 +64,9 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u if __name__ == "__main__": if len(sys.argv) < 6: - print("pb.py ") + print( + "pb.py " + ) exit(1) broker_list = sys.argv[1] @@ -69,20 +75,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u num_records = int(sys.argv[4]) pb_message = sys.argv[5] - user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2') + user_pb2 = importlib.import_module(f"{pb_message}_pb2") all_pb_messages = { - 'user': get_user, - 'user_with_more_fields': get_user_with_more_fields, + "user": get_user, + "user_with_more_fields": get_user_with_more_fields, } - assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}' + assert ( + pb_message in all_pb_messages + ), f"pb_message must be one of {list(all_pb_messages.keys())}" schema_registry_conf = {"url": schema_registry_url} producer_conf = {"bootstrap.servers": broker_list} try: - send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User) + send_to_kafka( + producer_conf, + schema_registry_conf, + topic, + num_records, + all_pb_messages[pb_message], + user_pb2.User, + ) except Exception as e: print("Send Protobuf data to schema registry and kafka failed {}", e) exit(1) diff --git a/e2e_test/schema_registry/protobuf/user.proto b/e2e_test/source_inline/kafka/protobuf/user.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user.proto rename to e2e_test/source_inline/kafka/protobuf/user.proto diff --git a/e2e_test/schema_registry/protobuf/user_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_pb2.py diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields.proto b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields.proto rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 82d2f22f41cb4..dde746bf3200a 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, }; -use super::row_id_column_desc; +use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET}; use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID}; use crate::types::DataType; @@ -45,6 +45,10 @@ impl ColumnId { pub const fn placeholder() -> Self { Self(i32::MAX - 1) } + + pub const fn first_user_column() -> Self { + Self(USER_COLUMN_ID_OFFSET) + } } impl ColumnId { @@ -346,6 +350,11 @@ impl ColumnCatalog { self.column_desc.is_default() } + /// If the columns is an `INCLUDE ... AS ...` connector column. + pub fn is_connector_additional_column(&self) -> bool { + self.column_desc.additional_column.column_type.is_some() + } + /// Get a reference to the column desc's data type. pub fn data_type(&self) -> &DataType { &self.column_desc.data_type @@ -430,15 +439,16 @@ pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec bool { - let mut column_ids = columns - .iter() - .map(|column| column.column_id().get_id()) - .collect_vec(); - column_ids.sort(); - let original_len = column_ids.len(); - column_ids.dedup(); - column_ids.len() == original_len +pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) { + debug_assert!( + columns + .iter() + .map(|c| c.column_id()) + .duplicates() + .next() + .is_none(), + "duplicate ColumnId found in source catalog. Columns: {columns:#?}" + ); } #[cfg(test)] diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index f7667a66a3747..26cf746b535dc 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1028,6 +1028,7 @@ pub mod test_utils { } } +/// Note: this is created in `SourceReader::build_stream` #[derive(Debug, Clone, Default)] pub struct ParserConfig { pub common: CommonParserConfig, diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index c5283a2cc592a..94b2980fcb63d 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{is_system_schema, Field}; +use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; @@ -221,6 +221,7 @@ impl Binder { source_catalog: &SourceCatalog, as_of: Option, ) -> (Relation, Vec<(bool, Field)>) { + debug_assert_column_ids_distinct(&source_catalog.columns); self.included_relations.insert(source_catalog.id.into()); ( Relation::Source(Box::new(BoundSource { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index bfe4537fa7085..d7c2fd3778c79 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -210,7 +210,7 @@ impl TableType { } } -/// The version of a table, used by schema change. See [`PbTableVersion`]. +/// The version of a table, used by schema change. See [`PbTableVersion`] for more details. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TableVersion { pub version_id: TableVersionId, diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index fcabedc1149c4..db54cce612a91 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::ColumnId; +use risingwave_common::catalog::{ColumnCatalog, ColumnId}; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, @@ -106,10 +106,7 @@ pub async fn handle_alter_source_column( catalog.definition = alter_definition_add_column(&catalog.definition, column_def.clone())?; let mut bound_column = bind_sql_columns(&[column_def])?.remove(0); - bound_column.column_desc.column_id = columns - .iter() - .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) - .next(); + bound_column.column_desc.column_id = max_column_id(columns).next(); columns.push(bound_column); } _ => unreachable!(), @@ -147,6 +144,20 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul Ok(stmt.to_string()) } +/// FIXME: perhapts we should use sth like `ColumnIdGenerator::new_alter`, +/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`. +/// (But for now this isn't a large problem, since drop column is not allowed for source yet..) +/// +/// Besides, the logic of column id handling is a mess. +/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end; +/// In other places, we create column id ad-hoc. +pub fn max_column_id(columns: &Vec) -> ColumnId { + // XXX: should we check the column IDs of struct fields here? + columns + .iter() + .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id())) +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index c72cf547365d7..af87960547bc0 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::{ }; use risingwave_sqlparser::parser::Parser; +use super::alter_source_column::max_column_id; use super::alter_table_column::schema_has_schema_registry; use super::create_source::{bind_columns_from_source, validate_compatibility}; use super::util::SourceSchemaCompatExt; @@ -68,14 +69,19 @@ fn encode_type_to_encode(from: EncodeType) -> Option { }) } -/// Returns the columns in `columns_a` but not in `columns_b`, -/// where the comparison is done by name and data type, -/// and hidden columns are ignored. +/// Returns the columns in `columns_a` but not in `columns_b`. +/// +/// Note: +/// - The comparison is done by name and data type, without checking `ColumnId`. +/// - Hidden columns and `INCLUDE ... AS ...` columns are ignored. Because it's only for the special handling of alter sr. +/// For the newly resolved `columns_from_resolve_source` (created by [`bind_columns_from_source`]), it doesn't contain hidden columns (`_row_id`) and `INCLUDE ... AS ...` columns. +/// This is fragile and we should really refactor it later. fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec { columns_a .iter() .filter(|col_a| { !col_a.is_hidden() + && !col_a.is_connector_additional_column() && !columns_b.iter().any(|col_b| { col_a.name() == col_b.name() && col_a.data_type() == col_b.data_type() }) @@ -162,8 +168,20 @@ pub async fn refresh_sr_and_get_columns_diff( unreachable!("source without schema registry is rejected") }; - let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + // The newly resolved columns' column IDs also starts from 1. They cannot be used directly. + let mut next_col_id = max_column_id(&original_source.columns).next(); + for col in &mut added_columns { + col.column_desc.column_id = next_col_id; + next_col_id = next_col_id.next(); + } let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); + tracing::debug!( + ?added_columns, + ?dropped_columns, + ?columns_from_resolve_source, + original_source = ?original_source.columns + ); Ok((source_info, added_columns, dropped_columns)) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a29aa86907e0f..1f458bbbc09b5 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ - is_column_ids_dedup, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, + debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::types::DataType; @@ -73,8 +73,8 @@ use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported use crate::error::{Result, RwError}; use crate::expr::Expr; use crate::handler::create_table::{ - bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names, - ensure_table_constraints_supported, ColumnIdGenerator, + bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns, + bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -289,8 +289,11 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result for more information. -/// return `(columns, source info)` +/// Resolves the schema of the source from external schema file. +/// See for more information. +/// +/// Note: the returned schema strictly corresponds to the schema. +/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included. pub(crate) async fn bind_columns_from_source( session: &SessionImpl, source_schema: &ConnectorSchema, @@ -489,6 +492,28 @@ pub(crate) async fn bind_columns_from_source( } }; + if cfg!(debug_assertions) { + // validate column ids + // Note: this just documents how it works currently. It doesn't mean whether it's reasonable. + if let Some(ref columns) = columns { + let mut i = 1; + fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec) { + for nested_col in &col.field_descs { + // What's the usage of struct fields' column IDs? + check_col(nested_col, i, columns); + } + assert!( + col.column_id.get_id() == *i as i32, + "unexpected column id\ncol: {col:?}\ni: {i}\ncolumns: {columns:#?}" + ); + *i += 1; + } + for col in columns { + check_col(&col.column_desc, &mut i, columns); + } + } + } + if !format_encode_options_to_consume.is_empty() { let err_string = format!( "Get unknown format_encode_options for {:?} {:?}: {}", @@ -1387,10 +1412,12 @@ pub async fn bind_create_source( .into()); } + // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary. + // XXX: should we also chenge the col id for struct fields? for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - debug_assert!(is_column_ids_dedup(&columns)); + debug_assert_column_ids_distinct(&columns); let must_need_pk = if is_create_source { with_properties.connector_need_pk() @@ -1403,7 +1430,7 @@ pub async fn bind_create_source( }; let (mut columns, pk_col_ids, row_id_index) = - bind_pk_on_relation(columns, pk_names, must_need_pk)?; + bind_pk_and_row_id_on_relation(columns, pk_names, must_need_pk)?; let watermark_descs = bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0f3693653ced5..c542762702053 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, - INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, + INITIAL_TABLE_VERSION_ID, }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; @@ -110,7 +110,7 @@ impl ColumnIdGenerator { pub fn new_initial() -> Self { Self { existing: HashMap::new(), - next_column_id: ColumnId::from(USER_COLUMN_ID_OFFSET), + next_column_id: ColumnId::first_user_column(), version_id: INITIAL_TABLE_VERSION_ID, } } @@ -404,7 +404,7 @@ fn multiple_pk_definition_err() -> RwError { /// /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. -pub fn bind_pk_on_relation( +pub fn bind_pk_and_row_id_on_relation( mut columns: Vec, pk_names: Vec, must_need_pk: bool, @@ -570,7 +570,8 @@ pub(crate) fn gen_create_table_plan_without_source( ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (mut columns, pk_column_ids, row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let watermark_descs: Vec = bind_source_watermark( context.session_ctx(), @@ -762,7 +763,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (columns, pk_column_ids, _row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (columns, pk_column_ids, _row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let definition = context.normalized_sql().to_owned(); @@ -881,7 +883,6 @@ fn derive_connect_properties( pub(super) async fn handle_create_table_plan( handler_args: HandlerArgs, explain_options: ExplainOptions, - col_id_gen: ColumnIdGenerator, source_schema: Option, cdc_table_info: Option, table_name: ObjectName, @@ -894,6 +895,7 @@ pub(super) async fn handle_create_table_plan( with_version_column: Option, include_column_options: IncludeOption, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { + let col_id_gen = ColumnIdGenerator::new_initial(); let source_schema = check_create_table_with_source( &handler_args.with_options, source_schema, @@ -1148,11 +1150,9 @@ pub async fn handle_create_table( } let (graph, source, table, job_type) = { - let col_id_gen = ColumnIdGenerator::new_initial(); let (plan, source, table, job_type) = handle_create_table_plan( handler_args, ExplainOptions::default(), - col_id_gen, source_schema, cdc_table_info, table_name.clone(), @@ -1435,7 +1435,8 @@ mod tests { } ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; + let (_, pk_column_ids, _) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 9f46087c206e8..db124b373181b 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -22,7 +22,6 @@ use thiserror_ext::AsReport; use super::create_index::{gen_create_index_plan, resolve_index_schema}; use super::create_mv::gen_create_mv_plan; use super::create_sink::{gen_sink_plan, get_partition_compute_info}; -use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; use super::{RwPgResponse, RwPgResponseBuilderExt}; @@ -66,14 +65,11 @@ async fn do_handle_explain( wildcard_idx, .. } => { - let col_id_gen = ColumnIdGenerator::new_initial(); - let source_schema = source_schema.map(|s| s.into_v2_with_warning()); let (plan, _source, _table, _job_type) = handle_create_table_plan( handler_args, explain_options, - col_id_gen, source_schema, cdc_table_info, name.clone(), diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 7fd4f0b92822b..73b52b977c7a4 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -151,6 +151,15 @@ fn to_pg_rows( session_data: &StaticSessionData, ) -> RwResult> { assert_eq!(chunk.dimension(), column_types.len()); + if cfg!(debug_assertions) { + let chunk_data_types = chunk.data_types(); + for (ty1, ty2) in chunk_data_types.iter().zip_eq_fast(column_types) { + debug_assert!( + ty1.equals_datatype(ty2), + "chunk_data_types: {chunk_data_types:?}, column_types: {column_types:?}" + ) + } + } chunk .rows() diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 0310fdbbd439b..918db2919e626 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -70,6 +70,12 @@ impl LogicalSource { ctx: OptimizerContextRef, as_of: Option, ) -> Result { + // XXX: should we reorder the columns? + // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32] + // related: https://github.com/risingwavelabs/risingwave/issues/16486 + // The order does not matter much. The columns field is essentially a map indexed by the column id. + // It will affect what users will see in `SELECT *`. + // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so... let core = generic::Source { catalog: source_catalog, column_catalog, diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 943f37abf655d..bff4062f72097 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -78,13 +78,9 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } ServiceConfig::SchemaRegistry(c) => { - let address = &c.address; - let port = &c.port; - writeln!( - env, - r#"RISEDEV_SCHEMA_REGISTRY_URL="http://{address}:{port}""#, - ) - .unwrap(); + let url = format!("http://{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_SCHEMA_REGISTRY_URL="{url}""#,).unwrap(); + writeln!(env, r#"RPK_REGISTRY_HOSTS="{url}""#).unwrap(); } ServiceConfig::MySql(c) if c.application != Application::Metastore => { let host = &c.address;