Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(source): fix panic for ALTER SOURCE with schema registry (#17293) #17353

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- Install dependencies"
python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
Expand Down Expand Up @@ -134,21 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-1cn-1fe, protobuf schema registry"
export RISINGWAVE_CI=true
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user
echo "make sure google/protobuf/source_context.proto is NOT in schema registry"
curl --silent 'http://schemaregistry:8082/subjects'; echo
# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404
curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto'
risedev slt './e2e_test/schema_registry/pb.slt'
risedev slt './e2e_test/schema_registry/alter_sr.slt'

echo "--- Kill cluster"
risedev ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
Expand Down
80 changes: 0 additions & 80 deletions e2e_test/schema_registry/alter_sr.slt

This file was deleted.

50 changes: 0 additions & 50 deletions e2e_test/schema_registry/pb.slt

This file was deleted.

6 changes: 6 additions & 0 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
rpk sr subject list | while read -r subject; do
echo "Deleting schema registry subject: $subject"
rpk sr subject delete "$subject"
rpk sr subject delete "$subject" --permanent
done
else
echo "No Kafka to clean."
fi
Expand Down
70 changes: 70 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
control substitution on

# https://github.com/risingwavelabs/risingwave/issues/16486

# cleanup
system ok
rpk topic delete 'avro_alter_source_test' || true; \\
(rpk sr subject delete 'avro_alter_source_test-value' && rpk sr subject delete 'avro_alter_source_test-value' --permanent) || true;

# create topic and sr subject
system ok
rpk topic create 'avro_alter_source_test'

system ok
echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

statement ok
create source s
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro_alter_source_test'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);

# create a new version of schema and produce a message
system ok
echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\
| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test

query ?
select * from s
----
ABC

statement error
alter source s format plain encode json;
----
db error: ERROR: Failed to run the query

Caused by:
Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Avro, and altering them is not supported yet
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml


statement ok
alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

query ??
select * from s
----
ABC 1

statement ok
create materialized view mv as select * from s;

sleep 2s

query ??
select * from mv
----
ABC 1

statement ok
drop source s cascade;
91 changes: 91 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
control substitution on

system ok
rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

statement ok
CREATE SOURCE src_user
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement error
SELECT age FROM mv_user;

statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

# Refresh table schema. It consume new data before refresh, so the new fields are NULLs
statement ok
ALTER TABLE t_user REFRESH SCHEMA;

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 104 0 510

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25 NULL NULL NULL

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields

sleep 5s

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
30 104 100 510

statement ok
DROP MATERIALIZED VIEW mv_user_more;

statement ok
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;
58 changes: 58 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

system ok
rpk topic delete sr_pb_test || true; \\
(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user

# make sure google/protobuf/source_context.proto is NOT in schema registry
system ok
curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto'

# Create a table.
statement ok
create table sr_pb_test with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# for multiple schema registry nodes
statement ok
create table sr_pb_test_bk with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'sr_pb_test',
scan.startup.mode = 'earliest')
FORMAT plain ENCODE protobuf(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Wait for source
sleep 2s

# Flush into storage
statement ok
flush;

query I
select count(*) from sr_pb_test;
----
20

query IT
select min(id), max(id), max((sc).file_name) from sr_pb_test;
----
0 19 source/context_019.proto


statement ok
drop table sr_pb_test;

statement ok
drop table sr_pb_test_bk;
Loading
Loading