From 60a8074dacf067194c3ba37c3c02dd15084f4b60 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 3 Dec 2024 12:42:53 +0800 Subject: [PATCH 1/2] feat(frontend): support alter shared source --- .../kafka/alter/add_column_shared.slt | 199 ++++++++++++++++++ proto/ddl_service.proto | 2 + src/frontend/src/catalog/catalog_service.rs | 37 +++- .../src/handler/alter_source_column.rs | 36 +++- src/frontend/src/handler/create_source.rs | 31 +-- src/frontend/src/test_utils.rs | 10 + src/rpc_client/src/meta_client.rs | 14 +- 7 files changed, 298 insertions(+), 31 deletions(-) create mode 100644 e2e_test/source_inline/kafka/alter/add_column_shared.slt diff --git a/e2e_test/source_inline/kafka/alter/add_column_shared.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt new file mode 100644 index 0000000000000..45454df818afb --- /dev/null +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -0,0 +1,199 @@ +control substitution on + +system ok +rpk topic create shared_source_alter -p 4 + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 1, "v2": "a", "v3": "a1"} +1 {"v1": 2, "v2": "b", "v3": "b1"} +2 {"v1": 3, "v2": "c", "v3": "c1"} +3 {"v1": 4, "v2": "d", "v3": "d1"} +EOF + +statement ok +create source s (v1 int, v2 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + +statement ok +create materialized view mv_before_alter as select * from s; + +statement ok +create materialized view mv_before_alter_2 as select * from s; + + +sleep 2s + +query ?? rowsort +select * from s; +---- +1 a +2 b +3 c +4 d + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + + +statement ok +alter source s add column v3 varchar; + +# New MV will have v3. + +statement ok +create materialized view mv_after_alter as select * from s; + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d + +# Produce new data. + +system ok +cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0 +0 {"v1": 5, "v2": "e", "v3": "e1"} +1 {"v1": 6, "v2": "f", "v3": "f1"} +2 {"v1": 7, "v2": "g", "v3": "g1"} +3 {"v1": 8, "v2": "h", "v3": "h1"} +EOF + +sleep 2s + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +query error +select * from mv_after_alter_2; +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: Catalog error + 2: table or source not found: mv_after_alter_2 + + + +# Batch select from source will have v3. + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +# Old MV is not affected. + +query ?? rowsort +select * from mv_before_alter; +---- +1 a +2 b +3 c +4 d +5 e +6 f +7 g +8 h + + +statement ok +drop source s cascade; + +# Test alter source without downstream + +statement ok +create source s (v1 int, v2 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source_alter', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + +statement ok +alter source s add column v3 varchar; + + +statement ok +create materialized view mv_after_alter as select * from s; + + +query ??? rowsort +select * from mv_after_alter; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +query ??? rowsort +select * from s; +---- +1 a a1 +2 b b1 +3 c c1 +4 d d1 +5 e e1 +6 f f1 +7 g g1 +8 h h1 + +statement ok +drop source s cascade; + +system ok +rpk topic delete shared_source_alter; + +# TODO: test alter source with schema registry diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 0b623e840bfa0..19bcc69798941 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -72,10 +72,12 @@ message DropSourceResponse { WaitVersion version = 2; } +// Only for non-shared source message AlterSourceRequest { catalog.Source source = 1; } +// Only for non-shared source message AlterSourceResponse { common.Status status = 1; WaitVersion version = 2; diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 21002f0121b0b..6af775d3584a9 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -25,6 +25,7 @@ use risingwave_pb::catalog::{ PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, PbTable, PbView, }; +use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable}; use risingwave_pb::ddl_service::{ alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request, create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, @@ -99,6 +100,13 @@ pub trait CatalogWriter: Send + Sync { job_type: TableJobType, ) -> Result<()>; + async fn replace_source( + &self, + source: PbSource, + graph: StreamFragmentGraph, + mapping: ColIndexMapping, + ) -> Result<()>; + async fn create_index( &self, index: PbIndex, @@ -311,7 +319,34 @@ impl CatalogWriter for CatalogWriterImpl { ) -> Result<()> { let version = self .meta_client - .replace_table(source, table, graph, mapping, job_type) + .replace_job( + graph, + mapping, + ReplaceJob::ReplaceTable(ReplaceTable { + source, + table: Some(table), + job_type: job_type as _, + }), + ) + .await?; + self.wait_version(version).await + } + + async fn replace_source( + &self, + source: PbSource, + graph: StreamFragmentGraph, + mapping: ColIndexMapping, + ) -> Result<()> { + let version = self + .meta_client + .replace_job( + graph, + mapping, + ReplaceJob::ReplaceSource(ReplaceSource { + source: Some(source), + }), + ) .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2d2e2c6698282..385fc34dba44a 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,14 +14,15 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::bail_not_implemented; use risingwave_common::catalog::max_column_id; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, }; use risingwave_sqlparser::parser::Parser; +use super::create_source::generate_stream_graph_for_source; use super::create_table::bind_sql_columns; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; @@ -38,7 +39,7 @@ pub async fn handle_alter_source_column( operation: AlterSourceOperation, ) -> Result { // Get original definition - let session = handler_args.session; + let session = handler_args.session.clone(); let db_name = session.database(); let (schema_name, real_source_name) = Binder::resolve_schema_qualified_name(db_name, source_name.clone())?; @@ -66,9 +67,6 @@ pub async fn handle_alter_source_column( ) .into()); }; - if catalog.info.is_shared() { - bail_not_implemented!(issue = 16003, "alter shared source"); - } // Currently only allow source without schema registry let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?; @@ -96,6 +94,7 @@ pub async fn handle_alter_source_column( SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {} } + let old_columns = catalog.columns.clone(); let columns = &mut catalog.columns; match operation { AlterSourceOperation::AddColumn { column_def } => { @@ -121,9 +120,30 @@ pub async fn handle_alter_source_column( catalog.version += 1; let catalog_writer = session.catalog_writer()?; - catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id)) - .await?; + if catalog.info.is_shared() { + let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?; + + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + old_columns + .iter() + .map(|old_c| { + catalog + .columns + .iter() + .position(|new_c| new_c.column_id() == old_c.column_id()) + }) + .collect(), + catalog.columns.len(), + ); + catalog_writer + .replace_source(catalog.to_prost(schema_id, db_id), graph, col_index_mapping) + .await? + } else { + catalog_writer + .alter_source(catalog.to_prost(schema_id, db_id)) + .await? + }; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 7102e44f9cb5e..e8283a8a78f06 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -66,6 +66,7 @@ use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{EncodeType, FormatType}; +use risingwave_pb::stream_plan::PbStreamFragmentGraph; use risingwave_pb::telemetry::TelemetryDatabaseObject; use risingwave_sqlparser::ast::{ get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format, @@ -1805,18 +1806,7 @@ pub async fn handle_create_source( let catalog_writer = session.catalog_writer()?; if is_shared { - let graph = { - let context = OptimizerContext::from_handler_args(handler_args); - let source_node = LogicalSource::with_catalog( - Rc::new(source_catalog), - SourceNodeKind::CreateSharedSource, - context.into(), - None, - )?; - - let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?; - build_graph(stream_plan)? - }; + let graph = generate_stream_graph_for_source(handler_args, source_catalog)?; catalog_writer.create_source(source, Some(graph)).await?; } else { // For other sources we don't create a streaming job @@ -1826,6 +1816,23 @@ pub async fn handle_create_source( Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE)) } +pub(super) fn generate_stream_graph_for_source( + handler_args: HandlerArgs, + source_catalog: SourceCatalog, +) -> Result { + let context = OptimizerContext::from_handler_args(handler_args); + let source_node = LogicalSource::with_catalog( + Rc::new(source_catalog), + SourceNodeKind::CreateSharedSource, + context.into(), + None, + )?; + + let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?; + let graph = build_graph(stream_plan)?; + Ok(graph) +} + fn format_to_prost(format: &Format) -> FormatType { match format { Format::Native => FormatType::Native, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index fc36500415815..d781a58356787 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -331,6 +331,16 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } + async fn replace_source( + &self, + source: PbSource, + _graph: StreamFragmentGraph, + _mapping: ColIndexMapping, + ) -> Result<()> { + self.catalog.write().update_source(&source); + Ok(()) + } + async fn create_source( &self, source: PbSource, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f8e45f69cd982..c2c5fa41c5916 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -27,7 +27,7 @@ use either::Either; use futures::stream::BoxStream; use list_rate_limits_response::RateLimitInfo; use lru::LruCache; -use replace_job_plan::{ReplaceJob, ReplaceTable}; +use replace_job_plan::ReplaceJob; use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::hash::WorkerSlotMapping; @@ -614,23 +614,17 @@ impl MetaClient { .ok_or_else(|| anyhow!("wait version not set"))?) } - pub async fn replace_table( + pub async fn replace_job( &self, - source: Option, - table: PbTable, graph: StreamFragmentGraph, table_col_index_mapping: ColIndexMapping, - job_type: PbTableJobType, + replace_job: ReplaceJob, ) -> Result { let request = ReplaceJobPlanRequest { plan: Some(ReplaceJobPlan { fragment_graph: Some(graph), table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), - replace_job: Some(ReplaceJob::ReplaceTable(ReplaceTable { - source, - table: Some(table), - job_type: job_type as _, - })), + replace_job: Some(replace_job), }), }; let resp = self.inner.replace_job_plan(request).await?; From d2a8cc2855fd8107665cfec77b07e8b7d451eb32 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 5 Dec 2024 17:49:17 +0800 Subject: [PATCH 2/2] fix test --- .../src/handler/alter_source_column.rs | 91 +++++++++++++++---- 1 file changed, 73 insertions(+), 18 deletions(-) diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 385fc34dba44a..ef91e011365e7 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -171,10 +171,9 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul #[cfg(test)] pub mod tests { - use std::collections::HashMap; + use std::collections::BTreeMap; use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; - use risingwave_common::types::DataType; use crate::catalog::root_catalog::SchemaPath; use crate::test_utils::LocalFrontend; @@ -211,47 +210,103 @@ pub mod tests { .await .unwrap(); - let get_source = || { + let get_source = |name: &str| { let catalog_reader = session.env().catalog_reader().read_guard(); catalog_reader - .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "s") + .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, name) .unwrap() .0 .clone() }; - let source = get_source(); - let columns: HashMap<_, _> = source + let source = get_source("s"); + + let sql = "alter source s_shared add column v2 varchar;"; + frontend.run_sql(sql).await.unwrap(); + + let altered_source = get_source("s_shared"); + let altered_columns: BTreeMap<_, _> = altered_source .columns .iter() .map(|col| (col.name(), (col.data_type().clone(), col.column_id()))) .collect(); - let sql = "alter source s_shared add column v2 varchar;"; - assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string()); + // Check the new column is added. + // Check the old columns and IDs are not changed. + expect_test::expect![[r#" + { + "_row_id": ( + Serial, + #0, + ), + "_rw_kafka_offset": ( + Varchar, + #4, + ), + "_rw_kafka_partition": ( + Varchar, + #3, + ), + "_rw_kafka_timestamp": ( + Timestamptz, + #2, + ), + "v1": ( + Int32, + #1, + ), + "v2": ( + Varchar, + #5, + ), + } + "#]] + .assert_debug_eq(&altered_columns); + + // Check version + assert_eq!(source.version + 1, altered_source.version); + + // Check definition + expect_test::expect!["CREATE SOURCE s_shared (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition); + let sql = "alter source s add column v2 varchar;"; frontend.run_sql(sql).await.unwrap(); - let altered_source = get_source(); - - let altered_columns: HashMap<_, _> = altered_source + let altered_source = get_source("s"); + let altered_columns: BTreeMap<_, _> = altered_source .columns .iter() .map(|col| (col.name(), (col.data_type().clone(), col.column_id()))) .collect(); - // Check the new column. - assert_eq!(columns.len() + 1, altered_columns.len()); - assert_eq!(altered_columns["v2"].0, DataType::Varchar); - + // Check the new column is added. // Check the old columns and IDs are not changed. - assert_eq!(columns["v1"], altered_columns["v1"]); + expect_test::expect![[r#" + { + "_row_id": ( + Serial, + #0, + ), + "_rw_kafka_timestamp": ( + Timestamptz, + #2, + ), + "v1": ( + Int32, + #1, + ), + "v2": ( + Varchar, + #3, + ), + } + "#]] + .assert_debug_eq(&altered_columns); // Check version assert_eq!(source.version + 1, altered_source.version); // Check definition - let altered_sql = r#"CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"#; - assert_eq!(altered_sql, altered_source.definition); + expect_test::expect!["CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092') FORMAT PLAIN ENCODE JSON"].assert_eq(&altered_source.definition); } }