diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt new file mode 100644 index 0000000000000..f6e38a25a4595 --- /dev/null +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -0,0 +1,138 @@ +control substitution on + +system ok +rpk topic create shared_source_alter -p 4 + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 1, "v2": "a", "v3": "a1"} +1 {"v1": 2, "v2": "b", "v3": "b1"} +2 {"v1": 3, "v2": "c", "v3": "c1"} +3 {"v1": 4, "v2": "d", "v3": "d1"} +EOF + +statement ok +create source s (v1 int, v2 varchar, v3 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + +statement ok +create materialized view mv_before_alter as select * from s; + +sleep 2s + +query ?? rowsort +select * from s; +---- +1 a +2 b +3 c +4 d + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + + +statement ok +alter source s add column v3 varchar; + +# New MV will have v3. + +statement ok +create materialized view mv_after_alter as select * from s; + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + +# Produce new data. + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 5, "v2": "e", "v3": "e1"} +1 {"v1": 6, "v2": "f", "v3": "f1"} +2 {"v1": 7, "v2": "g", "v3": "g1"} +3 {"v1": 8, "v2": "h", "v3": "h1"} +EOF + +sleep 2s + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + +statement ok +drop source s cascade; + +# TODO: test alter source with schema registry diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 3b94b4d9f2bd9..d117f0e3a346a 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -74,6 +74,8 @@ message DropSourceResponse { message AlterSourceRequest { catalog.Source source = 1; + // for shared source, we need to replace the streaming job + optional ReplaceStreamingJobPlan plan = 2; } message AlterSourceResponse { @@ -368,6 +370,18 @@ message ReplaceTablePlan { TableJobType job_type = 5; } +// Replace a streaming job, but not a table. e.g., alter a shared source. +message ReplaceStreamingJobPlan { + // The new materialization plan, where all schema are updated. + stream_plan.StreamFragmentGraph fragment_graph = 2; + // The mapping from the old columns to the new columns of the table. + // If no column modifications occur (such as for sinking into table), this will be None. + catalog.ColIndexMapping table_col_index_mapping = 3; + // Source catalog of table's associated source + catalog.Source source = 4; + TableJobType job_type = 5; +} + message ReplaceTablePlanRequest { ReplaceTablePlan plan = 1; } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index f7dcb919a6ad7..87486698c7968 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -27,8 +27,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::{ alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request, - create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, - WaitVersion, + create_connection_request, PbReplaceStreamingJobPlan, PbReplaceTablePlan, PbTableJobType, + ReplaceTablePlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -197,7 +197,11 @@ pub trait CatalogWriter: Send + Sync { async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>; /// Replace the source in the catalog. - async fn alter_source(&self, source: PbSource) -> Result<()>; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()>; async fn alter_parallelism( &self, @@ -498,8 +502,15 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn alter_source(&self, source: PbSource) -> Result<()> { - let version = self.meta_client.alter_source(source).await?; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()> { + let version = self + .meta_client + .alter_source(source, replace_streaming_job_plan) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2d2e2c6698282..b4f5b52511da4 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -121,8 +121,9 @@ pub async fn handle_alter_source_column( catalog.version += 1; let catalog_writer = session.catalog_writer()?; + let replace_plan = todo!(); catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id)) + .alter_source(catalog.to_prost(schema_id, db_id), replace_plan) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 5e889ef9f0d7e..377e0256c3d7f 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr( pb_source.version += 1; let catalog_writer = session.catalog_writer()?; - catalog_writer.alter_source(pb_source).await?; + catalog_writer.alter_source(pb_source, todo!()).await?; Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 77a9bc23d5a53..089492c71b5d0 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -46,7 +46,8 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, alter_swap_rename_request, - create_connection_request, DdlProgress, PbTableJobType, ReplaceTablePlan, TableJobType, + create_connection_request, DdlProgress, PbReplaceStreamingJobPlan, PbTableJobType, + ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -591,7 +592,11 @@ impl CatalogWriter for MockCatalogWriter { } } - async fn alter_source(&self, source: PbSource) -> Result<()> { + async fn alter_source( + &self, + source: PbSource, + _replace_streaming_job_plan: Option, + ) -> Result<()> { self.catalog.write().update_source(&source); Ok(()) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 9527cffa5c773..60ab37ef1adf9 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -48,7 +48,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, - TableJobType, WaitVersion, + PbReplaceStreamingJobPlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -147,6 +147,7 @@ pub enum DdlCommand { AlterSwapRename(alter_swap_rename_request::Object), ReplaceTable(ReplaceStreamJobInfo), AlterNonSharedSource(Source), + AlterSourceColumn(Source), AlterObjectOwner(Object, UserId), AlterSetSchema(alter_set_schema_request::Object, SchemaId), CreateConnection(Connection), @@ -354,7 +355,9 @@ impl DdlController { DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await, DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await, DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await, - DdlCommand::AlterNonSharedSource(source) => ctrl.alter_source(source).await, + DdlCommand::AlterNonSharedSource(source, plan) => { + ctrl.alter_non_shared_source(source, plan).await + } DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, DdlCommand::CreateSubscription(subscription) => { ctrl.create_subscription(subscription).await @@ -464,7 +467,11 @@ impl DdlController { /// This replaces the source in the catalog. /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated. - async fn alter_source(&self, source: Source) -> MetaResult { + async fn alter_non_shared_source( + &self, + source: Source, + plan: Option, + ) -> MetaResult { self.metadata_manager .catalog_controller .alter_non_shared_source(source) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index edf83355651aa..3e249f512c2bf 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -551,9 +551,14 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn alter_source(&self, source: PbSource) -> Result { + pub async fn alter_source( + &self, + source: PbSource, + plan: Option, + ) -> Result { let request = AlterSourceRequest { source: Some(source), + plan, }; let resp = self.inner.alter_source(request).await?; Ok(resp