diff --git a/e2e_test/source_inline/kafka/shared_source_alter.slt b/e2e_test/source_inline/kafka/shared_source_alter.slt new file mode 100644 index 0000000000000..eee79e3168c1e --- /dev/null +++ b/e2e_test/source_inline/kafka/shared_source_alter.slt @@ -0,0 +1,140 @@ +control substitution on + +system ok +rpk topic create shared_source_alter -p 4 + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 1, "v2": "a", "v3": "a1"} +1 {"v1": 2, "v2": "b", "v3": "b1"} +2 {"v1": 3, "v2": "c", "v3": "c1"} +3 {"v1": 4, "v2": "d", "v3": "d1"} +EOF + +statement ok +create source s (v1 int, v2 varchar, v3 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + +statement ok +create materialized view mv_before_alter as select * from s; + +sleep 2s + +query ?? rowsort +select * from s; +---- +1 a +2 b +3 c +4 d + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + + +statement ok +alter source s add column v3 varchar; + +# New MV will have v3. + +statement ok +create materialized view mv_after_alter as select * from s; + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + +# Produce new data. + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 5, "v2": "e", "v3": "e1"} +1 {"v1": 6, "v2": "f", "v3": "f1"} +2 {"v1": 7, "v2": "g", "v3": "g1"} +3 {"v1": 8, "v2": "h", "v3": "h1"} +EOF + +sleep 2s + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + +statement ok +drop source s cascade; + +# TODO: test alter source with schema registry + +# TODO: test alter source rename, change owner, etc. diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index de860593e8105..9dfdb53b5dee3 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -74,6 +74,8 @@ message DropSourceResponse { message AlterSourceRequest { catalog.Source source = 1; + // for shared source, we need to replace the streaming job + optional ReplaceStreamingJobPlan plan = 2; } message AlterSourceResponse { @@ -343,6 +345,18 @@ message ReplaceTablePlan { TableJobType job_type = 5; } +// Replace a streaming job, but not a table. e.g., alter a shared source. +message ReplaceStreamingJobPlan { + // The new materialization plan, where all schema are updated. + stream_plan.StreamFragmentGraph fragment_graph = 2; + // The mapping from the old columns to the new columns of the table. + // If no column modifications occur (such as for sinking into table), this will be None. + catalog.ColIndexMapping table_col_index_mapping = 3; + // Source catalog of table's associated source + catalog.Source source = 4; + TableJobType job_type = 5; +} + message ReplaceTablePlanRequest { ReplaceTablePlan plan = 1; } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 2d3fbd7e0178a..3fd1f7c08541c 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -26,7 +26,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::{ alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request, - PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, + PbReplaceStreamingJobPlan, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, + WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -183,7 +184,11 @@ pub trait CatalogWriter: Send + Sync { async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>; /// Replace the source in the catalog. - async fn alter_source(&self, source: PbSource) -> Result<()>; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()>; async fn alter_parallelism( &self, @@ -480,8 +485,15 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn alter_source(&self, source: PbSource) -> Result<()> { - let version = self.meta_client.alter_source(source).await?; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()> { + let version = self + .meta_client + .alter_source(source, replace_streaming_job_plan) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2d2e2c6698282..2354bfa822b2b 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -122,7 +122,7 @@ pub async fn handle_alter_source_column( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id)) + .alter_source(catalog.to_prost(schema_id, db_id), todo!()) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index bf8cf991d1a4f..2331f2e81f19d 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -281,7 +281,7 @@ pub async fn handle_alter_source_with_sr( pb_source.version += 1; let catalog_writer = session.catalog_writer()?; - catalog_writer.alter_source(pb_source).await?; + catalog_writer.alter_source(pb_source, todo!()).await?; Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 14befbaeb7357..c4805b703f3e9 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -45,7 +45,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress, - PbTableJobType, ReplaceTablePlan, TableJobType, + PbReplaceStreamingJobPlan, PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -586,7 +586,11 @@ impl CatalogWriter for MockCatalogWriter { } } - async fn alter_source(&self, source: PbSource) -> Result<()> { + async fn alter_source( + &self, + source: PbSource, + _replace_streaming_job_plan: Option, + ) -> Result<()> { self.catalog.write().update_source(&source); Ok(()) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1578813e2ead9..097f3f6d93be8 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -682,10 +682,10 @@ impl DdlService for DdlServiceImpl { &self, request: Request, ) -> Result, Status> { - let AlterSourceRequest { source } = request.into_inner(); + let AlterSourceRequest { source, plan } = request.into_inner(); let version = self .ddl_controller - .run_command(DdlCommand::AlterSourceColumn(source.unwrap())) + .run_command(DdlCommand::AlterSource(source.unwrap(), plan)) .await?; Ok(Response::new(AlterSourceResponse { status: None, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 516adbf8b6c90..eb70b6b5641b1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -395,6 +395,7 @@ impl CatalogController { Ok(table_id_map) } + /// Insert pub async fn prepare_streaming_job( &self, table_fragment: PbTableFragments, @@ -451,7 +452,7 @@ impl CatalogController { } if !for_replace { - // // Update dml fragment id. + // Update dml fragment id. if let StreamingJob::Table(_, table, ..) = streaming_job { Table::update(table::ActiveModel { table_id: Set(table.id as _), diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index baffdbd72fba9..c9809dc1dce4b 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -53,7 +53,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion, + alter_name_request, alter_set_schema_request, DdlProgress, PbReplaceStreamingJobPlan, + TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -149,7 +150,7 @@ pub enum DdlCommand { DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), ReplaceTable(ReplaceTableInfo), - AlterSourceColumn(Source), + AlterSource(Source, Option), AlterObjectOwner(Object, UserId), AlterSetSchema(alter_set_schema_request::Object, SchemaId), CreateConnection(Connection), @@ -341,7 +342,7 @@ impl DdlController { } DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await, DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await, - DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await, + DdlCommand::AlterSource(source, plan) => ctrl.alter_source(source, plan).await, DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await, DdlCommand::CreateSubscription(subscription) => { ctrl.create_subscription(subscription).await @@ -458,11 +459,22 @@ impl DdlController { /// This replaces the source in the catalog. /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated. - async fn alter_source(&self, source: Source) -> MetaResult { - self.metadata_manager + async fn alter_source( + &self, + source: Source, + plan: Option, + ) -> MetaResult { + let version = self + .metadata_manager .catalog_controller .alter_source(source) - .await + .await?; + if let Some(plan) = plan { + // finish_replace_streaming_job + } else { + // Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated. + } + Ok(version) } async fn create_function(&self, function: Function) -> MetaResult { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 6e5dd813a240b..9a9572fd0160d 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -547,9 +547,14 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn alter_source(&self, source: PbSource) -> Result { + pub async fn alter_source( + &self, + source: PbSource, + plan: Option, + ) -> Result { let request = AlterSourceRequest { source: Some(source), + plan, }; let resp = self.inner.alter_source(request).await?; Ok(resp