From 7c0ac3f75692ff109e7ddbcda4181854f99832c4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 3 Dec 2024 12:42:53 +0800 Subject: [PATCH] feat(frontend): support alter shared source --- .../kafka/alter/add_column_shared.slt | 138 ++++++++++++++++++ proto/ddl_service.proto | 2 + src/frontend/src/catalog/catalog_service.rs | 17 ++- .../src/handler/alter_source_column.rs | 72 ++++++++- .../src/handler/alter_source_with_sr.rs | 2 +- src/frontend/src/test_utils.rs | 6 +- src/meta/src/rpc/ddl_controller.rs | 2 +- src/rpc_client/src/meta_client.rs | 7 +- 8 files changed, 238 insertions(+), 8 deletions(-) create mode 100644 e2e_test/source_inline/kafka/alter/add_column_shared.slt diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt new file mode 100644 index 0000000000000..f6e38a25a4595 --- /dev/null +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -0,0 +1,138 @@ +control substitution on + +system ok +rpk topic create shared_source_alter -p 4 + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 1, "v2": "a", "v3": "a1"} +1 {"v1": 2, "v2": "b", "v3": "b1"} +2 {"v1": 3, "v2": "c", "v3": "c1"} +3 {"v1": 4, "v2": "d", "v3": "d1"} +EOF + +statement ok +create source s (v1 int, v2 varchar, v3 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + +statement ok +create materialized view mv_before_alter as select * from s; + +sleep 2s + +query ?? rowsort +select * from s; +---- +1 a +2 b +3 c +4 d + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + + +statement ok +alter source s add column v3 varchar; + +# New MV will have v3. + +statement ok +create materialized view mv_after_alter as select * from s; + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + +# Produce new data. + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 5, "v2": "e", "v3": "e1"} +1 {"v1": 6, "v2": "f", "v3": "f1"} +2 {"v1": 7, "v2": "g", "v3": "g1"} +3 {"v1": 8, "v2": "h", "v3": "h1"} +EOF + +sleep 2s + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + +statement ok +drop source s cascade; + +# TODO: test alter source with schema registry diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 32e94fb06b46b..f59b494b5ab4f 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -74,6 +74,8 @@ message DropSourceResponse { message AlterSourceRequest { catalog.Source source = 1; + // for shared source, we need to replace the streaming job + optional ReplaceStreamingJobPlan plan = 2; } message AlterSourceResponse { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 21002f0121b0b..c3b08836bbf6f 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -197,7 +197,11 @@ pub trait CatalogWriter: Send + Sync { async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>; /// Replace the source in the catalog. - async fn alter_source(&self, source: PbSource) -> Result<()>; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()>; async fn alter_parallelism( &self, @@ -498,8 +502,15 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn alter_source(&self, source: PbSource) -> Result<()> { - let version = self.meta_client.alter_source(source).await?; + async fn alter_source( + &self, + source: PbSource, + replace_streaming_job_plan: Option, + ) -> Result<()> { + let version = self + .meta_client + .alter_source(source, replace_streaming_job_plan) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2d2e2c6698282..f8d05e1bcf596 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -17,6 +17,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::max_column_id; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; +use risingwave_pb::catalog::PbSource; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, }; @@ -25,6 +26,7 @@ use risingwave_sqlparser::parser::Parser; use super::create_table::bind_sql_columns; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::source_catalog::SourceCatalog; use crate::error::{ErrorCode, Result, RwError}; use crate::Binder; @@ -121,8 +123,9 @@ pub async fn handle_alter_source_column( catalog.version += 1; let catalog_writer = session.catalog_writer()?; + let replace_plan = todo!(); catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id)) + .alter_source(catalog.to_prost(schema_id, db_id), replace_plan) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) @@ -149,6 +152,73 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul Ok(stmt.to_string()) } +pub async fn get_replace_source_plan( + session: &Arc, + table_name: ObjectName, + new_definition: Statement, + old_catalog: &Arc, +) -> Result<(PbSource, StreamFragmentGraph, ColIndexMapping, TableJobType)> { + // Create handler args as if we're creating a new table with the altered definition. + let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?; + // let col_id_gen = ColumnIdGenerator::new_alter(old_catalog); + let Statement::CreateTable { + columns, + constraints, + source_watermarks, + append_only, + on_conflict, + with_version_column, + wildcard_idx, + cdc_table_info, + format_encode, + include_column_options, + .. + } = new_definition + else { + panic!("unexpected statement type: {:?}", new_definition); + }; + + let format_encode = format_encode + .clone() + .map(|format_encode| format_encode.into_v2_with_warning()); + + let (mut graph, source, job_type) = generate_stream_graph_for_replace_table( + session, + table_name, + old_catalog, + format_encode, + handler_args.clone(), + col_id_gen, + columns.clone(), + wildcard_idx, + constraints, + source_watermarks, + append_only, + on_conflict, + with_version_column, + cdc_table_info, + new_version_columns, + include_column_options, + ) + .await?; + + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + old_catalog + .columns() + .iter() + .map(|old_c| { + source.columns.iter().position(|new_c| { + new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id() + }) + }) + .collect(), + table.columns.len(), + ); + + Ok((source, graph, col_index_mapping, job_type)) +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 5e889ef9f0d7e..377e0256c3d7f 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr( pb_source.version += 1; let catalog_writer = session.catalog_writer()?; - catalog_writer.alter_source(pb_source).await?; + catalog_writer.alter_source(pb_source, todo!()).await?; Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE)) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 24e4744450241..d5b33c5e89c80 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -591,7 +591,11 @@ impl CatalogWriter for MockCatalogWriter { } } - async fn alter_source(&self, source: PbSource) -> Result<()> { + async fn alter_source( + &self, + source: PbSource, + _replace_streaming_job_plan: Option, + ) -> Result<()> { self.catalog.write().update_source(&source); Ok(()) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 033b0e135ccc6..a6844903564fe 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -48,7 +48,7 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, - TableJobType, WaitVersion, + PbReplaceStreamingJobPlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f8e45f69cd982..a1ac7a4024c0a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -552,9 +552,14 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn alter_source(&self, source: PbSource) -> Result { + pub async fn alter_source( + &self, + source: PbSource, + plan: Option, + ) -> Result { let request = AlterSourceRequest { source: Some(source), + plan, }; let resp = self.inner.alter_source(request).await?; Ok(resp