diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
new file mode 100644
index 0000000000000..45454df818afb
--- /dev/null
+++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt
@@ -0,0 +1,199 @@
+control substitution on
+
+system ok
+rpk topic create shared_source_alter -p 4
+
+system ok
+cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
+0 {"v1": 1, "v2": "a", "v3": "a1"}
+1 {"v1": 2, "v2": "b", "v3": "b1"}
+2 {"v1": 3, "v2": "c", "v3": "c1"}
+3 {"v1": 4, "v2": "d", "v3": "d1"}
+EOF
+
+statement ok
+create source s (v1 int, v2 varchar) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'shared_source_alter',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
+
+
+statement ok
+create materialized view mv_before_alter as select * from s;
+
+statement ok
+create materialized view mv_before_alter_2 as select * from s;
+
+
+sleep 2s
+
+query ?? rowsort
+select * from s;
+----
+1 a
+2 b
+3 c
+4 d
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+
+
+statement ok
+alter source s add column v3 varchar;
+
+# New MV will have v3.
+
+statement ok
+create materialized view mv_after_alter as select * from s;
+
+query ??? rowsort
+select * from mv_after_alter;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+
+# Batch select from source will have v3.
+
+query ??? rowsort
+select * from s;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+
+# Old MV is not affected.
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+
+# Produce new data.
+
+system ok
+cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
+0 {"v1": 5, "v2": "e", "v3": "e1"}
+1 {"v1": 6, "v2": "f", "v3": "f1"}
+2 {"v1": 7, "v2": "g", "v3": "g1"}
+3 {"v1": 8, "v2": "h", "v3": "h1"}
+EOF
+
+sleep 2s
+
+
+query ??? rowsort
+select * from mv_after_alter;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+
+query error
+select * from mv_after_alter_2;
+----
+db error: ERROR: Failed to run the query
+
+Caused by these errors (recent errors listed first):
+  1: Catalog error
+  2: table or source not found: mv_after_alter_2
+
+
+
+# Batch select from source will have v3.
+
+query ??? rowsort
+select * from s;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+
+# Old MV is not affected.
+
+query ?? rowsort
+select * from mv_before_alter;
+----
+1 a
+2 b
+3 c
+4 d
+5 e
+6 f
+7 g
+8 h
+
+
+statement ok
+drop source s cascade;
+
+# Test alter source without downstream
+
+statement ok
+create source s (v1 int, v2 varchar) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'shared_source_alter',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON;
+
+statement ok
+alter source s add column v3 varchar;
+
+
+statement ok
+create materialized view mv_after_alter as select * from s;
+
+
+query ??? rowsort
+select * from mv_after_alter;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+query ??? rowsort
+select * from s;
+----
+1 a a1
+2 b b1
+3 c c1
+4 d d1
+5 e e1
+6 f f1
+7 g g1
+8 h h1
+
+statement ok
+drop source s cascade;
+
+system ok
+rpk topic delete shared_source_alter;
+
+# TODO: test alter source with schema registry
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 32e94fb06b46b..10334a2f9fe17 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -72,10 +72,12 @@ message DropSourceResponse {
   WaitVersion version = 2;
 }
 
+// Only for non-shared source
 message AlterSourceRequest {
   catalog.Source source = 1;
 }
 
+// Only for non-shared source
 message AlterSourceResponse {
   common.Status status = 1;
   WaitVersion version = 2;
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 21002f0121b0b..6af775d3584a9 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -25,6 +25,7 @@ use risingwave_pb::catalog::{
     PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
     PbSubscription, PbTable, PbView,
 };
+use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
     create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType,
@@ -99,6 +100,13 @@ pub trait CatalogWriter: Send + Sync {
         job_type: TableJobType,
     ) -> Result<()>;
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        graph: StreamFragmentGraph,
+        mapping: ColIndexMapping,
+    ) -> Result<()>;
+
     async fn create_index(
         &self,
         index: PbIndex,
@@ -311,7 +319,34 @@ impl CatalogWriter for CatalogWriterImpl {
     ) -> Result<()> {
         let version = self
             .meta_client
-            .replace_table(source, table, graph, mapping, job_type)
+            .replace_job(
+                graph,
+                mapping,
+                ReplaceJob::ReplaceTable(ReplaceTable {
+                    source,
+                    table: Some(table),
+                    job_type: job_type as _,
+                }),
+            )
             .await?;
         self.wait_version(version).await
     }
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        graph: StreamFragmentGraph,
+        mapping: ColIndexMapping,
+    ) -> Result<()> {
+        let version = self
+            .meta_client
+            .replace_job(
+                graph,
+                mapping,
+                ReplaceJob::ReplaceSource(ReplaceSource {
+                    source: Some(source),
+                }),
+            )
+            .await?;
+        self.wait_version(version).await
+    }
+
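For orientation, the new `CatalogWriter::replace_source` hook mirrors the
existing `replace_table` path but carries only the source catalog. A minimal
sketch of how a handler drives it (variable names here are placeholders, not
exact frontend code):

    // Re-plan the shared source's streaming job and swap it in:
    // `graph` is the rebuilt stream fragment graph for the source, and
    // `mapping` maps each old column index to its new position.
    catalog_writer
        .replace_source(catalog.to_prost(schema_id, db_id), graph, mapping)
        .await?;
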
diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs
index 2d2e2c6698282..385fc34dba44a 100644
--- a/src/frontend/src/handler/alter_source_column.rs
+++ b/src/frontend/src/handler/alter_source_column.rs
@@ -14,14 +14,15 @@
 
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::max_column_id;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
 use risingwave_sqlparser::ast::{
     AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
 };
 use risingwave_sqlparser::parser::Parser;
 
+use super::create_source::generate_stream_graph_for_source;
 use super::create_table::bind_sql_columns;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
@@ -38,7 +39,7 @@ pub async fn handle_alter_source_column(
     operation: AlterSourceOperation,
 ) -> Result<RwPgResponse> {
     // Get original definition
-    let session = handler_args.session;
+    let session = handler_args.session.clone();
     let db_name = session.database();
     let (schema_name, real_source_name) =
         Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
@@ -66,9 +67,6 @@ pub async fn handle_alter_source_column(
         )
         .into());
     };
-    if catalog.info.is_shared() {
-        bail_not_implemented!(issue = 16003, "alter shared source");
-    }
 
     // Currently only allow source without schema registry
     let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
@@ -96,6 +94,7 @@ pub async fn handle_alter_source_column(
         SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
     }
 
+    let old_columns = catalog.columns.clone();
     let columns = &mut catalog.columns;
     match operation {
         AlterSourceOperation::AddColumn { column_def } => {
@@ -121,9 +120,30 @@ pub async fn handle_alter_source_column(
     catalog.version += 1;
 
     let catalog_writer = session.catalog_writer()?;
-    catalog_writer
-        .alter_source(catalog.to_prost(schema_id, db_id))
-        .await?;
+    if catalog.info.is_shared() {
+        let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;
+
+        // Calculate the mapping from the original columns to the new columns.
+        let col_index_mapping = ColIndexMapping::new(
+            old_columns
+                .iter()
+                .map(|old_c| {
+                    catalog
+                        .columns
+                        .iter()
+                        .position(|new_c| new_c.column_id() == old_c.column_id())
+                })
+                .collect(),
+            catalog.columns.len(),
+        );
+        catalog_writer
+            .replace_source(catalog.to_prost(schema_id, db_id), graph, col_index_mapping)
+            .await?
+    } else {
+        catalog_writer
+            .alter_source(catalog.to_prost(schema_id, db_id))
+            .await?
+    };
 
     Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
 }
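To make the mapping concrete: ADD COLUMN appends the new column, so every
existing column keeps its index and the added column has no pre-image. A
worked sketch of what the `ColIndexMapping::new` call above computes, assuming
the constructor takes a `Vec<Option<usize>>` of targets plus the new column
count as used there (column lists simplified; real catalogs also carry hidden
columns such as `_row_id`):

    // Old columns: [v1, v2]; new columns after ALTER: [v1, v2, v3].
    // For each old column, find its position in the new column list.
    let target: Vec<Option<usize>> = vec![Some(0), Some(1)];
    // Maps old index i -> new index i; the meta service uses this to keep
    // existing downstream jobs' column references consistent, while only
    // newly created jobs see v3.
    let mapping = ColIndexMapping::new(target, 3);
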
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index e910053ba136e..275559280291d 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -65,6 +65,7 @@ use risingwave_pb::catalog::connection_params::PbConnectionType;
 use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{EncodeType, FormatType};
+use risingwave_pb::stream_plan::PbStreamFragmentGraph;
 use risingwave_pb::telemetry::TelemetryDatabaseObject;
 use risingwave_sqlparser::ast::{
     get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format,
@@ -1802,18 +1803,7 @@ pub async fn handle_create_source(
     let catalog_writer = session.catalog_writer()?;
 
     if is_shared {
-        let graph = {
-            let context = OptimizerContext::from_handler_args(handler_args);
-            let source_node = LogicalSource::with_catalog(
-                Rc::new(source_catalog),
-                SourceNodeKind::CreateSharedSource,
-                context.into(),
-                None,
-            )?;
-
-            let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
-            build_graph(stream_plan)?
-        };
+        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
         catalog_writer.create_source(source, Some(graph)).await?;
     } else {
         // For other sources we don't create a streaming job
@@ -1823,6 +1813,23 @@ pub async fn handle_create_source(
     Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
 }
 
+pub(super) fn generate_stream_graph_for_source(
+    handler_args: HandlerArgs,
+    source_catalog: SourceCatalog,
+) -> Result<PbStreamFragmentGraph> {
+    let context = OptimizerContext::from_handler_args(handler_args);
+    let source_node = LogicalSource::with_catalog(
+        Rc::new(source_catalog),
+        SourceNodeKind::CreateSharedSource,
+        context.into(),
+        None,
+    )?;
+
+    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
+    let graph = build_graph(stream_plan)?;
+    Ok(graph)
+}
+
 fn format_to_prost(format: &Format) -> FormatType {
     match format {
         Format::Native => FormatType::Native,
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 24e4744450241..9a2515ba10f88 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -331,6 +331,16 @@ impl CatalogWriter for MockCatalogWriter {
         Ok(())
     }
 
+    async fn replace_source(
+        &self,
+        source: PbSource,
+        _graph: StreamFragmentGraph,
+        _mapping: ColIndexMapping,
+    ) -> Result<()> {
+        self.catalog.write().update_source(&source);
+        Ok(())
+    }
+
     async fn create_source(
         &self,
         source: PbSource,
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index f8e45f69cd982..c2c5fa41c5916 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -27,7 +27,7 @@ use either::Either;
 use futures::stream::BoxStream;
 use list_rate_limits_response::RateLimitInfo;
 use lru::LruCache;
-use replace_job_plan::{ReplaceJob, ReplaceTable};
+use replace_job_plan::ReplaceJob;
 use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
@@ -614,23 +614,17 @@ impl MetaClient {
             .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn replace_table(
+    pub async fn replace_job(
         &self,
-        source: Option<PbSource>,
-        table: PbTable,
         graph: StreamFragmentGraph,
         table_col_index_mapping: ColIndexMapping,
-        job_type: PbTableJobType,
+        replace_job: ReplaceJob,
     ) -> Result<WaitVersion> {
         let request = ReplaceJobPlanRequest {
             plan: Some(ReplaceJobPlan {
                 fragment_graph: Some(graph),
                 table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
-                replace_job: Some(ReplaceJob::ReplaceTable(ReplaceTable {
-                    source,
-                    table: Some(table),
-                    job_type: job_type as _,
-                })),
+                replace_job: Some(replace_job),
             }),
         };
         let resp = self.inner.replace_job_plan(request).await?;
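With `replace_table` generalized into `replace_job`, both DDL paths now go
through a single RPC; roughly, the two call shapes look like this (a sketch
with arguments as in the call sites above, not exact code):

    // ALTER TABLE: the payload carries the table, its optional associated
    // source, and the job type.
    meta_client
        .replace_job(
            graph,
            mapping,
            ReplaceJob::ReplaceTable(ReplaceTable {
                source,
                table: Some(table),
                job_type: job_type as _,
            }),
        )
        .await?;

    // ALTER SOURCE on a shared source: the payload is just the new source.
    meta_client
        .replace_job(
            graph,
            mapping,
            ReplaceJob::ReplaceSource(ReplaceSource { source: Some(source) }),
        )
        .await?;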