Skip to content

Commit

Permalink
feat(frontend): support alter source refresh schema for shared source (
Browse files Browse the repository at this point in the history
…#19740)

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Dec 11, 2024
1 parent 61d4a79 commit ffe63f1
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 9 deletions.
1 change: 1 addition & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

# age is new field
statement error
SELECT age FROM mv_user;

Expand Down
103 changes: 103 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
control substitution on

system ok
rpk topic delete pb_alter_source_shared_test || true; \
(rpk sr subject delete 'pb_alter_source_shared_test-value' && rpk sr subject delete 'pb_alter_source_shared_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 20 user

statement ok
CREATE SOURCE src_user
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_alter_source_shared_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

statement ok
CREATE MATERIALIZED VIEW mv_user_2 AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_alter_source_shared_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# age is new field
statement error
SELECT age FROM mv_user;

statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

statement ok
flush;


query ????
SELECT COUNT(*) FROM mv_user;
----
25

query ????
SELECT COUNT(*) FROM mv_user_2;
----
25

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 104 0 510

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields

sleep 5s


query ????
SELECT COUNT(*) FROM mv_user;
----
30

query ????
SELECT COUNT(*) FROM mv_user_2;
----
30

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
30 104 0 1020


# statement ok
# DROP SOURCE src_user CASCADE;
39 changes: 30 additions & 9 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{max_column_id, ColumnCatalog};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
Expand All @@ -30,7 +31,9 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::Parser;

use super::alter_table_column::schema_has_schema_registry;
use super::create_source::{bind_columns_from_source, validate_compatibility};
use super::create_source::{
bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility,
};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
Expand Down Expand Up @@ -214,9 +217,10 @@ pub async fn handle_alter_source_with_sr(
name: ObjectName,
format_encode: FormatEncodeOptions,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let session = handler_args.session.clone();
let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?;
let mut source = source.as_ref().clone();
let old_columns = source.columns.clone();

if source.associated_table_id.is_some() {
return Err(ErrorCode::NotSupported(
Expand All @@ -225,9 +229,6 @@ pub async fn handle_alter_source_with_sr(
)
.into());
};
if source.info.is_shared() {
bail_not_implemented!(issue = 16003, "alter shared source");
}

check_format_encode(&source, &format_encode)?;

Expand Down Expand Up @@ -272,14 +273,34 @@ pub async fn handle_alter_source_with_sr(
.format_encode_secret_refs
.extend(format_encode_secret_ref);

let mut pb_source = source.to_prost(schema_id, database_id);

// update version
pb_source.version += 1;
source.version += 1;

let pb_source = source.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_source(pb_source).await?;
if source.info.is_shared() {
let graph = generate_stream_graph_for_source(handler_args, source.clone())?;

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
old_columns
.iter()
.map(|old_c| {
source
.columns
.iter()
.position(|new_c| new_c.column_id() == old_c.column_id())
})
.collect(),
source.columns.len(),
);
catalog_writer
.replace_source(pb_source, graph, col_index_mapping)
.await?
} else {
catalog_writer.alter_source(pb_source).await?;
}
Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}

Expand Down

0 comments on commit ffe63f1

Please sign in to comment.