diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
new file mode 100644
index 0000000000000..f6e38a25a4595
--- /dev/null
+++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
@@ -0,0 +1,138 @@
+control substitution on
+
+system ok
+rpk topic create shared_source_alter -p 4
+
+system ok
+cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
+0 {"v1": 1, "v2": "a", "v3": "a1"}
+1 {"v1": 2, "v2": "b", "v3": "b1"}
+2 {"v1": 3, "v2": "c", "v3": "c1"}
+3 {"v1": 4, "v2": "d", "v3": "d1"}
+EOF
+
+statement ok
+create source s (v1 int, v2 varchar) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'shared_source_alter',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
+
+
+statement ok
+create materialized view mv_before_alter as select * from s;
+
+sleep 2s
+
+query ?? rowsort
+select * from s;
+----
+1 a
+2 b
+3 c
+4 d
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+
+
+statement ok
+alter source s add column v3 varchar;
+
+# New MV will have v3.
+
+statement ok
+create materialized view mv_after_alter as select * from s;
+
+query ??? rowsort
+select * from mv_after_alter;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+
+# Batch select from source will have v3.
+
+query ??? rowsort
+select * from s;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+
+# Old MV is not affected.
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+
+# Produce new data.
+
+system ok
+cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
+0 {"v1": 5, "v2": "e", "v3": "e1"}
+1 {"v1": 6, "v2": "f", "v3": "f1"}
+2 {"v1": 7, "v2": "g", "v3": "g1"}
+3 {"v1": 8, "v2": "h", "v3": "h1"}
+EOF
+
+sleep 2s
+
+
+query ??? rowsort
+select * from mv_after_alter;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+
+
+# Batch select from source will have v3.
+
+query ??? rowsort
+select * from s;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+
+# Old MV is not affected.
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+5 e
+6 f
+7 g
+8 h
+
+
+statement ok
+drop source s cascade;
+
+# TODO: test alter source with schema registry
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 32e94fb06b46b..10334a2f9fe17 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -72,10 +72,12 @@ message DropSourceResponse {
   WaitVersion version = 2;
 }
 
+// Only for non-shared sources. Shared sources are altered via ReplaceJobPlan.
 message AlterSourceRequest {
   catalog.Source source = 1;
 }
 
+// Only for non-shared sources. Shared sources are altered via ReplaceJobPlan.
 message AlterSourceResponse {
   common.Status status = 1;
   WaitVersion version = 2;
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 21002f0121b0b..4c807493c8691 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -25,6 +25,7 @@ use risingwave_pb::catalog::{
     PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
     PbSubscription, PbTable, PbView,
 };
+use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
     create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType,
@@ -99,6 +100,13 @@ pub trait CatalogWriter: Send + Sync {
         job_type: TableJobType,
     ) -> Result<()>;
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        graph: StreamFragmentGraph,
+        mapping: ColIndexMapping,
+    ) -> Result<()>;
+
     async fn create_index(
         &self,
         index: PbIndex,
@@ -311,7 +319,34 @@ impl CatalogWriter for CatalogWriterImpl {
     ) -> Result<()> {
         let version = self
             .meta_client
-            .replace_table(source, table, graph, mapping, job_type)
+            .replace_job(
+                graph,
+                mapping,
+                ReplaceJob::ReplaceTable(ReplaceTable {
+                    source,
+                    table: Some(table),
+                    job_type: job_type as _,
+                }),
+            )
             .await?;
         self.wait_version(version).await
     }
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        graph: StreamFragmentGraph,
+        mapping: ColIndexMapping,
+    ) -> Result<()> {
+        let version = self
+            .meta_client
+            .replace_job(
+                graph,
+                mapping,
+                ReplaceJob::ReplaceSource(ReplaceSource {
+                    source: Some(source),
+                }),
+            )
+            .await?;
+        self.wait_version(version).await
+    }
+
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
index 2d2e2c6698282..9dcfcd75a7cc1 100644
--- a/src/frontend/src/handler/alter_source_column.rs
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -12,19 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::rc::Rc;
+
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::max_column_id;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
 use risingwave_sqlparser::ast::{
     AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
 };
 use risingwave_sqlparser::parser::Parser;
 
+use super::create_source::generate_stream_graph_for_source;
 use super::create_table::bind_sql_columns;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
 use crate::error::{ErrorCode, Result, RwError};
 use crate::Binder;
 
@@ -66,9 +70,6 @@ pub async fn handle_alter_source_column(
         )
         .into());
     };
-    if catalog.info.is_shared() {
-        bail_not_implemented!(issue = 16003, "alter shared source");
-    }
 
     // Currently only allow source without schema registry
     let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
@@ -96,6 +97,7 @@ pub async fn handle_alter_source_column(
         SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
     }
 
+    let old_columns = catalog.columns.clone();
     let columns = &mut catalog.columns;
     match operation {
         AlterSourceOperation::AddColumn { column_def } => {
@@ -121,9 +123,30 @@ pub async fn handle_alter_source_column(
     catalog.version += 1;
 
     let catalog_writer = session.catalog_writer()?;
-    catalog_writer
-        .alter_source(catalog.to_prost(schema_id, db_id))
-        .await?;
+    if catalog.info.is_shared() {
+        let graph = generate_stream_graph_for_source(handler_args, Rc::new(catalog.clone()))?;
+
+        // Calculate the mapping from the original columns to the new columns.
+        let col_index_mapping = ColIndexMapping::new(
+            old_columns
+                .iter()
+                .map(|old_c| {
+                    catalog
+                        .columns
+                        .iter()
+                        .position(|new_c| new_c.column_id() == old_c.column_id())
+                })
+                .collect(),
+            catalog.columns.len(),
+        );
+        catalog_writer
+            .replace_source(catalog.to_prost(schema_id, db_id), graph, col_index_mapping)
+            .await?
+    } else {
+        catalog_writer
+            .alter_source(catalog.to_prost(schema_id, db_id))
+            .await?
+    };
 
     Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
 }
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index e910053ba136e..5dbc4e39d1c0e 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -65,6 +65,7 @@ use risingwave_pb::catalog::connection_params::PbConnectionType;
 use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{EncodeType, FormatType};
+use risingwave_pb::stream_plan::PbStreamFragmentGraph;
 use risingwave_pb::telemetry::TelemetryDatabaseObject;
 use risingwave_sqlparser::ast::{
     get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format,
@@ -1802,18 +1803,7 @@ pub async fn handle_create_source(
     let catalog_writer = session.catalog_writer()?;
 
     if is_shared {
-        let graph = {
-            let context = OptimizerContext::from_handler_args(handler_args);
-            let source_node = LogicalSource::with_catalog(
-                Rc::new(source_catalog),
-                SourceNodeKind::CreateSharedSource,
-                context.into(),
-                None,
-            )?;
-
-            let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
-            build_graph(stream_plan)?
-        };
+        let graph = generate_stream_graph_for_source(handler_args, Rc::new(source_catalog))?;
         catalog_writer.create_source(source, Some(graph)).await?;
     } else {
         // For other sources we don't create a streaming job
@@ -1823,6 +1813,22 @@ pub async fn handle_create_source(
     Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
 }
 
+pub(super) fn generate_stream_graph_for_source(
+    handler_args: HandlerArgs,
+    source_catalog: Rc<SourceCatalog>,
+) -> Result<PbStreamFragmentGraph> {
+    let context = OptimizerContext::from_handler_args(handler_args);
+    let source_node = LogicalSource::with_catalog(
+        source_catalog,
+        SourceNodeKind::CreateSharedSource,
+        context.into(),
+        None,
+    )?;
+
+    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
+    build_graph(stream_plan)
+}
+
 fn format_to_prost(format: &Format) -> FormatType {
     match format {
         Format::Native => FormatType::Native,
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 24e4744450241..9a2515ba10f88 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -331,6 +331,16 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        _graph: StreamFragmentGraph,
+        _mapping: ColIndexMapping,
+    ) -> Result<()> {
+        self.catalog.write().update_source(&source);
+        Ok(())
+    }
+
     async fn create_source(
         &self,
         source: PbSource,
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index f8e45f69cd982..c2c5fa41c5916 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -27,7 +27,7 @@ use either::Either;
 use futures::stream::BoxStream;
 use list_rate_limits_response::RateLimitInfo;
 use lru::LruCache;
-use replace_job_plan::{ReplaceJob, ReplaceTable};
+use replace_job_plan::ReplaceJob;
 use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
@@ -614,23 +614,17 @@ impl MetaClient {
             .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn replace_table(
+    pub async fn replace_job(
        &self,
-        source: Option<PbSource>,
-        table: PbTable,
         graph: StreamFragmentGraph,
         table_col_index_mapping: ColIndexMapping,
-        job_type: PbTableJobType,
+        replace_job: ReplaceJob,
     ) -> Result<WaitVersion> {
         let request = ReplaceJobPlanRequest {
             plan: Some(ReplaceJobPlan {
                 fragment_graph: Some(graph),
                 table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
-                replace_job: Some(ReplaceJob::ReplaceTable(ReplaceTable {
-                    source,
-                    table: Some(table),
-                    job_type: job_type as _,
-                })),
+                replace_job: Some(replace_job),
             }),
         };
         let resp = self.inner.replace_job_plan(request).await?;
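
Note on the ColIndexMapping built in alter_source_column.rs: it maps each original column index to that column's position in the post-ALTER schema, so the replace path knows how old columns line up with new ones. Below is a minimal, self-contained sketch of that semantics for ADD COLUMN, using hypothetical Column/ColumnId stand-ins rather than the real catalog types:

// Standalone sketch (hypothetical types, not the actual RisingWave catalog
// structs) mirroring the mapping computed in alter_source_column.rs.
#[derive(Clone, Copy, PartialEq, Debug)]
struct ColumnId(i32);

struct Column {
    id: ColumnId,
}

/// Returns (mapping, target_size): mapping[i] is the position of old column i
/// in the new column list, found by column id; None would mean it was dropped.
fn col_index_mapping(old: &[Column], new: &[Column]) -> (Vec<Option<usize>>, usize) {
    let mapping = old
        .iter()
        .map(|old_c| new.iter().position(|new_c| new_c.id == old_c.id))
        .collect();
    (mapping, new.len())
}

fn main() {
    // v1, v2 existed before; ALTER SOURCE ... ADD COLUMN v3 appends a column.
    let old = [Column { id: ColumnId(1) }, Column { id: ColumnId(2) }];
    let new = [
        Column { id: ColumnId(1) },
        Column { id: ColumnId(2) },
        Column { id: ColumnId(3) },
    ];
    let (mapping, target_size) = col_index_mapping(&old, &new);
    // Old columns keep their indices; the target schema has one extra column.
    assert_eq!(mapping, vec![Some(0), Some(1)]);
    assert_eq!(target_size, 3);
}

Since ADD COLUMN only appends, every old column keeps its index and the mapping is total; an entry would only be None if a column were dropped, which this code path never produces.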