diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt new file mode 100644 index 0000000000000..f6e38a25a4595 --- /dev/null +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -0,0 +1,138 @@ +control substitution on + +system ok +rpk topic create shared_source_alter -p 4 + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 1, "v2": "a", "v3": "a1"} +1 {"v1": 2, "v2": "b", "v3": "b1"} +2 {"v1": 3, "v2": "c", "v3": "c1"} +3 {"v1": 4, "v2": "d", "v3": "d1"} +EOF + +statement ok +create source s (v1 int, v2 varchar, v3 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + +statement ok +create materialized view mv_before_alter as select * from s; + +sleep 2s + +query ?? rowsort +select * from s; +---- +1 a +2 b +3 c +4 d + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + + +statement ok +alter source s add column v3 varchar; + +# New MV will have v3. + +statement ok +create materialized view mv_after_alter as select * from s; + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + +# Produce new data. + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 5, "v2": "e", "v3": "e1"} +1 {"v1": 6, "v2": "f", "v3": "f1"} +2 {"v1": 7, "v2": "g", "v3": "g1"} +3 {"v1": 8, "v2": "h", "v3": "h1"} +EOF + +sleep 2s + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + +statement ok +drop source s cascade; + +# TODO: test alter source with schema registry diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 32e94fb06b46b..f59b494b5ab4f 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -74,6 +74,8 @@ message DropSourceResponse { message AlterSourceRequest { catalog.Source source = 1; + // for shared source, we need to replace the streaming job + optional ReplaceStreamingJobPlan plan = 2; } message AlterSourceResponse { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 21002f0121b0b..c3b08836bbf6f 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -197,7 +197,11 @@ pub trait CatalogWriter: Send + Sync { async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>; /// Replace the source in the catalog. - async fn alter_source(&self, source: PbSource) -> Result<()>; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()>; async fn alter_parallelism( &self, @@ -498,8 +502,15 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn alter_source(&self, source: PbSource) -> Result<()> { - let version = self.meta_client.alter_source(source).await?; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()> { + let version = self + .meta_client + .alter_source(source, replace_streaming_job_plan) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2d2e2c6698282..b4f5b52511da4 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -121,8 +121,9 @@ pub async fn handle_alter_source_column( catalog.version += 1; let catalog_writer = session.catalog_writer()?; + let replace_plan = todo!(); catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id)) + .alter_source(catalog.to_prost(schema_id, db_id), replace_plan) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 5e889ef9f0d7e..377e0256c3d7f 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr( pb_source.version += 1; let catalog_writer = session.catalog_writer()?; - catalog_writer.alter_source(pb_source).await?; + catalog_writer.alter_source(pb_source, todo!()).await?; Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 24e4744450241..d5b33c5e89c80 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -591,7 +591,11 @@ impl CatalogWriter for MockCatalogWriter { } } - async fn alter_source(&self, source: PbSource) -> Result<()> { + async fn alter_source( + &self, + source: PbSource, + _replace_streaming_job_plan: Option, + ) -> Result<()> { self.catalog.write().update_source(&source); Ok(()) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 033b0e135ccc6..a6844903564fe 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -48,7 +48,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, - TableJobType, WaitVersion, + PbReplaceStreamingJobPlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f8e45f69cd982..a1ac7a4024c0a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -552,9 +552,14 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn alter_source(&self, source: PbSource) -> Result { + pub async fn alter_source( + &self, + source: PbSource, + plan: Option, + ) -> Result { let request = AlterSourceRequest { source: Some(source), + plan, }; let resp = self.inner.alter_source(request).await?; Ok(resp