diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index c300e0e8a424e..b4c00cec53fe8 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -27,6 +27,7 @@ if [[ $mode == "standalone" ]]; then fi if [[ $mode == "single-node" ]]; then + export RUST_MIN_STACK=4194304 source ci/scripts/single-node-utils.sh fi diff --git a/e2e_test/sink/sink_into_table/alter_column.slt b/e2e_test/sink/sink_into_table/alter_column.slt new file mode 100644 index 0000000000000..ce003ebef3fa2 --- /dev/null +++ b/e2e_test/sink/sink_into_table/alter_column.slt @@ -0,0 +1,201 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t_simple_1 (v1 int); + +statement ok +create table m_simple (v1 int primary key); + +statement ok +create sink s_simple_1 into m_simple as select v1 from t_simple_1; + +statement ok +insert into t_simple_1 values (1), (2), (3); + +statement ok +flush; + +query I rowsort +select * from m_simple; +---- +1 +2 +3 + +statement ok +alter table m_simple add column v2 int; + +statement ok +insert into t_simple_1 values (4); + +statement ok +flush; + +query II rowsort +select * from m_simple; +---- +1 NULL +2 NULL +3 NULL +4 NULL + +statement ok +create table t_simple_2 (v1 int, v2 int); + +statement ok +create sink s_simple_2 into m_simple as select v1, v2 from t_simple_2; + +statement ok +insert into t_simple_2 values (100, 101), (200, 201), (300, 301); + +statement ok +flush; + +query II rowsort +select * from m_simple; +---- +1 NULL +100 101 +2 NULL +200 201 +3 NULL +300 301 +4 NULL + +statement error dropping columns in target table of sinks is not supported +alter table m_simple drop column v2; + +statement ok +drop sink s_simple_1; + +statement ok +drop sink s_simple_2; + +statement ok +drop table t_simple_1; + +statement ok +drop table t_simple_2; + +statement ok +drop table m_simple; + +# target table with row_id as primary key +statement ok +create table t_s1 (v1 int); + +statement ok +insert into t_s1 values (1), (2), (3); + +statement ok +create table t_row_id_as_primary_key (v1 int, v2 int default 1000); + +statement ok +create sink s1 into t_row_id_as_primary_key as select v1 from t_s1 with (type = 'append-only', force_append_only = 'true'); + +statement ok +flush; + +query II rowsort +select * from t_row_id_as_primary_key; +---- +1 1000 +2 1000 +3 1000 + +statement ok +alter table t_row_id_as_primary_key add column v3 int; + +query III rowsort +select * from t_row_id_as_primary_key; +---- +1 1000 NULL +2 1000 NULL +3 1000 NULL + +statement ok +create sink s11 into t_row_id_as_primary_key as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s1 with (type = 'append-only', force_append_only = 'true'); + +statement ok +flush; + +query III rowsort +select * from t_row_id_as_primary_key; +---- +1 1000 NULL +1001 2001 3001 +1002 2002 3002 +1003 2003 3003 +2 1000 NULL +3 1000 NULL + +statement ok +drop sink s1; + +statement ok +drop sink s11; + +statement ok +drop table t_row_id_as_primary_key; + +statement ok +drop table t_s1; + +# target table with append only +statement ok +create table t_s2 (v1 int); + +statement ok +insert into t_s2 values (1), (2), (3); + +statement ok +create table t_append_only (v1 int, v2 int default 1000) append only; + +statement ok +create sink s2 into t_append_only as select v1 from t_s2 with (type = 'append-only', force_append_only = 'true'); + +statement ok +flush; + +query II rowsort +select * from t_append_only; +---- +1 1000 +2 1000 +3 1000 + +statement ok +alter table t_append_only add column v3 int; + +query III rowsort +select * from t_append_only; +---- +1 1000 NULL +2 1000 NULL +3 1000 NULL + +statement ok +create sink s21 into t_append_only as select v1+1000 as v1, v1+2000 as v2, v1+3000 as v3 from t_s2 with (type = 'append-only', force_append_only = 'true'); + +query III rowsort +select * from t_append_only; +---- +1 1000 NULL +1001 2001 3001 +1002 2002 3002 +1003 2003 3003 +2 1000 NULL +3 1000 NULL + +statement ok +drop sink s21; + +statement ok +drop sink s2; + +statement ok +drop table t_append_only; + +statement ok +drop table t_s2; diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index 59e43773560f0..e2a10d46fbf37 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -13,10 +13,6 @@ create table m_simple (v1 int primary key, v2 int); statement ok create sink s_simple_1 into m_simple as select v1, v2 from t_simple; -# we can't alter a table with incoming sinks -statement error Feature is not yet implemented: alter table with incoming sinks -alter table m_simple add column v3 int; - statement ok drop sink s_simple_1; diff --git a/proto/catalog.proto b/proto/catalog.proto index b18275e32d4ce..e60c99814d9ec 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -195,6 +195,9 @@ message Sink { // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type. // Used for connect options. map secret_refs = 25; + + // only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan. + repeated plan_common.ColumnCatalog original_target_columns = 26; } message Subscription { diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 614f5d09516e3..cfd632170d3fb 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -113,6 +113,7 @@ impl SinkDesc { created_at_cluster_version: None, initialized_at_cluster_version: None, create_type: self.create_type, + original_target_columns: vec![], } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 501d20b43cd1e..7a9dc5d564ca7 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -364,6 +364,9 @@ pub struct SinkCatalog { /// The secret reference for the sink, mapping from property name to secret id. pub secret_refs: BTreeMap, + + /// Only for the sink whose target is a table. Columns of the target table when the sink is created. At this point all the default columns of the target table are all handled by the project operator in the sink plan. + pub original_target_columns: Vec, } impl SinkCatalog { @@ -406,6 +409,11 @@ impl SinkCatalog { initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), create_type: self.create_type.to_proto() as i32, secret_refs: self.secret_refs.clone(), + original_target_columns: self + .original_target_columns + .iter() + .map(|c| c.to_protobuf()) + .collect_vec(), } } @@ -442,6 +450,11 @@ impl SinkCatalog { pub fn downstream_pk_indices(&self) -> Vec { self.downstream_pk.clone() } + + pub fn unique_identity(&self) -> String { + // We need to align with meta here, so we've utilized the proto method. + self.to_proto().unique_identity() + } } impl From for SinkCatalog { @@ -500,6 +513,11 @@ impl From for SinkCatalog { created_at_cluster_version: pb.created_at_cluster_version, create_type: CreateType::from_proto(create_type), secret_refs: pb.secret_refs, + original_target_columns: pb + .original_target_columns + .into_iter() + .map(ColumnCatalog::from) + .collect_vec(), } } } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 29c74a64e2551..7d7d05fe92eef 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -464,6 +464,24 @@ impl TableCatalog { } } + pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec { + columns + .iter() + .map(|c| { + if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { + expr, + .. + })) = c.column_desc.generated_or_default_column.as_ref() + { + ExprImpl::from_expr_proto(expr.as_ref().unwrap()) + .expect("expr in default columns corrupted") + } else { + ExprImpl::literal_null(c.data_type().clone()) + } + }) + .collect() + } + pub fn default_columns(&self) -> impl Iterator + '_ { self.columns.iter().enumerate().filter_map(|(i, c)| { if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index bd102f88553b6..ba3ae95399783 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -12,13 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use anyhow::Context; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::{bail, bail_not_implemented}; +use risingwave_connector::sink::catalog::SinkCatalog; +use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, }; @@ -30,8 +35,9 @@ use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; -use crate::error::{ErrorCode, Result}; -use crate::expr::ExprImpl; +use crate::error::{ErrorCode, Result, RwError}; +use crate::expr::{Expr, ExprImpl, InputRef, Literal}; +use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; use crate::session::SessionImpl; use crate::{Binder, TableCatalog, WithOptions}; @@ -60,14 +66,14 @@ pub async fn replace_table_with_definition( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source, job_type) = generate_stream_graph_for_table( + let (mut graph, mut table, source, job_type) = generate_stream_graph_for_table( session, table_name, original_catalog, source_schema, - handler_args, + handler_args.clone(), col_id_gen, - columns, + columns.clone(), wildcard_idx, constraints, source_watermarks, @@ -92,6 +98,25 @@ pub async fn replace_table_with_definition( table.columns.len(), ); + let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect(); + + let target_columns = table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone())) + .collect_vec(); + + for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? { + hijack_merger_for_target_table( + &mut graph, + &target_columns, + &sink, + Some(&sink.unique_identity()), + )?; + } + + table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); + let catalog_writer = session.catalog_writer()?; catalog_writer @@ -100,6 +125,58 @@ pub async fn replace_table_with_definition( Ok(()) } +pub(crate) fn hijack_merger_for_target_table( + graph: &mut StreamFragmentGraph, + target_columns: &[ColumnCatalog], + sink: &SinkCatalog, + uniq_identify: Option<&str>, +) -> Result<()> { + let mut sink_columns = sink.original_target_columns.clone(); + if sink_columns.is_empty() { + // This is due to the fact that the value did not exist in earlier versions, + // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table. + // Therefore the columns of the table at this point are `original_target_columns`. + // This value of sink will be filled on the meta. + sink_columns = target_columns.to_vec(); + } + + let mut i = 0; + let mut j = 0; + let mut exprs = Vec::new(); + + while j < target_columns.len() { + if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() { + exprs.push(ExprImpl::InputRef(Box::new(InputRef { + data_type: sink_columns[i].data_type().clone(), + index: i, + }))); + + i += 1; + j += 1; + } else { + exprs.push(ExprImpl::Literal(Box::new(Literal::new( + None, + target_columns[j].data_type().clone(), + )))); + + j += 1; + } + } + + let pb_project = PbNodeBody::Project(ProjectNode { + select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(), + ..Default::default() + }); + + for fragment in graph.fragments.values_mut() { + if let Some(node) = &mut fragment.node { + insert_merger_to_union_with_project(node, &pb_project, uniq_identify); + } + } + + Ok(()) +} + /// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or /// `DropColumn`. pub async fn handle_alter_table_column( @@ -110,8 +187,11 @@ pub async fn handle_alter_table_column( let session = handler_args.session; let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; - if !original_catalog.incoming_sinks.is_empty() { - bail_not_implemented!("alter table with incoming sinks"); + if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() { + return Err(RwError::from(ErrorCode::BindError( + "Alter a table with incoming sink and generated column has not been implemented." + .to_string(), + ))); } // Retrieve the original table definition and parse it to AST. @@ -151,6 +231,14 @@ pub async fn handle_alter_table_column( ))? } + if !original_catalog.incoming_sinks.is_empty() + && matches!(operation, AlterTableOperation::DropColumn { .. }) + { + return Err(ErrorCode::InvalidInputSyntax( + "dropping columns in target table of sinks is not supported".to_string(), + ))?; + } + match operation { AlterTableOperation::AddColumn { column_def: new_column, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index c44e6a2367bb1..1ca8e39adad69 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::{Arc, LazyLock}; use anyhow::Context; @@ -22,7 +22,9 @@ use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::IcebergArrowConvert; -use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId}; +use risingwave_common::catalog::{ + ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId, +}; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; @@ -31,11 +33,11 @@ use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; -use risingwave_pb::catalog::{PbSource, Table}; +use risingwave_pb::catalog::{PbSink, PbSource, Table}; use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; -use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; +use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; +use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode}; use risingwave_sqlparser::ast::{ ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, Query, Statement, @@ -50,6 +52,7 @@ use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::view_catalog::ViewCatalog; +use crate::catalog::SinkId; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{rewrite_now_to_proctime, ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; @@ -290,7 +293,7 @@ pub async fn gen_sink_plan( let exprs = derive_default_column_project_for_sink( &sink_catalog, sink_plan.schema(), - table_catalog, + table_catalog.columns(), user_specified_columns, )?; @@ -300,6 +303,7 @@ pub async fn gen_sink_plan( let exprs = LogicalSource::derive_output_exprs_from_generated_columns(table_catalog.columns())?; + if let Some(exprs) = exprs { let logical_project = generic::Project::new(exprs, sink_plan); sink_plan = StreamProject::new(logical_project).into(); @@ -419,7 +423,7 @@ pub async fn handle_create_sink( return Ok(resp); } - let (sink, graph, target_table_catalog) = { + let (mut sink, graph, target_table_catalog) = { let SinkPlanContext { query, sink_plan: plan, @@ -450,23 +454,38 @@ pub async fn handle_create_sink( let mut target_table_replace_plan = None; if let Some(table_catalog) = target_table_catalog { + use crate::handler::alter_table_column::hijack_merger_for_target_table; + check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?; let (mut graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?; + sink.original_target_columns = table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone())) + .collect_vec(); + table .incoming_sinks .clone_from(&table_catalog.incoming_sinks); - for _ in 0..(table_catalog.incoming_sinks.len() + 1) { - for fragment in graph.fragments.values_mut() { - if let Some(node) = &mut fragment.node { - insert_merger_to_union(node); - } - } + let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect(); + let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?; + + for existing_sink in incoming_sinks { + hijack_merger_for_target_table( + &mut graph, + table_catalog.columns(), + &existing_sink, + Some(&existing_sink.unique_identity()), + )?; } + // for new creating sink, we don't have a unique identity because the sink id is not generated yet. + hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink, None)?; + target_table_replace_plan = Some(ReplaceTablePlan { source, table: Some(table), @@ -495,6 +514,24 @@ pub async fn handle_create_sink( Ok(PgResponse::empty_result(StatementType::CREATE_SINK)) } +pub fn fetch_incoming_sinks( + session: &Arc, + incoming_sink_ids: &HashSet, +) -> Result>> { + let reader = session.env().catalog_reader().read_guard(); + let mut sinks = Vec::with_capacity(incoming_sink_ids.len()); + let db_name = session.database(); + for schema in reader.iter_schemas(db_name)? { + for sink in schema.iter_sink() { + if incoming_sink_ids.contains(&sink.id.sink_id) { + sinks.push(sink.clone()); + } + } + } + + Ok(sinks) +} + fn check_cycle_for_sink( session: &SessionImpl, sink_catalog: SinkCatalog, @@ -662,15 +699,25 @@ pub(crate) async fn reparse_table_for_sink( Ok((graph, table, source)) } -pub(crate) fn insert_merger_to_union(node: &mut StreamNode) { +pub(crate) fn insert_merger_to_union_with_project( + node: &mut StreamNode, + project_node: &PbNodeBody, + uniq_identity: Option<&str>, +) { if let Some(NodeBody::Union(_union_node)) = &mut node.node_body { + // TODO: MergeNode is used as a placeholder, see issue #17658 node.input.push(StreamNode { - identity: "Merge (sink into table)".to_string(), - fields: node.fields.clone(), - node_body: Some(NodeBody::Merge(MergeNode { - upstream_dispatcher_type: DispatcherType::Hash as _, + input: vec![StreamNode { + node_body: Some(NodeBody::Merge(MergeNode { + ..Default::default() + })), ..Default::default() - })), + }], + identity: uniq_identity + .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK) + .to_string(), + fields: node.fields.clone(), + node_body: Some(project_node.clone()), ..Default::default() }); @@ -678,7 +725,7 @@ pub(crate) fn insert_merger_to_union(node: &mut StreamNode) { } for input in &mut node.input { - insert_merger_to_union(input); + insert_merger_to_union_with_project(input, project_node, uniq_identity); } } @@ -703,14 +750,16 @@ fn derive_sink_to_table_expr( } } -fn derive_default_column_project_for_sink( +pub(crate) fn derive_default_column_project_for_sink( sink: &SinkCatalog, sink_schema: &Schema, - target_table_catalog: &Arc, + columns: &[ColumnCatalog], user_specified_columns: bool, ) -> Result> { assert_eq!(sink.full_schema().len(), sink_schema.len()); + let default_column_exprs = TableCatalog::default_column_exprs(columns); + let mut exprs = vec![]; let sink_visible_col_idxes = sink @@ -726,17 +775,16 @@ fn derive_default_column_project_for_sink( .map(|(i, c)| (c.name(), i)) .collect::>(); - for (idx, table_column) in target_table_catalog.columns().iter().enumerate() { - if table_column.is_generated() { + for (idx, column) in columns.iter().enumerate() { + if column.is_generated() { continue; } - let default_col_expr = || -> ExprImpl { - rewrite_now_to_proctime(target_table_catalog.default_column_expr(idx)) - }; + let default_col_expr = + || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) }; let sink_col_expr = |sink_col_idx: usize| -> Result { - derive_sink_to_table_expr(sink_schema, sink_col_idx, table_column.data_type()) + derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type()) }; // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly. @@ -744,7 +792,7 @@ fn derive_default_column_project_for_sink( // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table. #[allow(clippy::collapsible_else_if)] if user_specified_columns { - if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) { + if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) { exprs.push(sink_col_expr(*idx)?); } else { exprs.push(default_col_expr()); diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index a209a16c88ae3..967fe700dfcde 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType}; use risingwave_sqlparser::ast::ObjectName; @@ -20,7 +22,8 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; use crate::error::Result; -use crate::handler::create_sink::{insert_merger_to_union, reparse_table_for_sink}; +use crate::handler::alter_table_column::hijack_merger_for_target_table; +use crate::handler::create_sink::{fetch_incoming_sinks, reparse_table_for_sink}; use crate::handler::HandlerArgs; pub async fn handle_drop_sink( @@ -29,7 +32,7 @@ pub async fn handle_drop_sink( if_exists: bool, cascade: bool, ) -> Result { - let session = handler_args.session; + let session = handler_args.session.clone(); let db_name = session.database(); let (schema_name, sink_name) = Binder::resolve_schema_qualified_name(db_name, sink_name)?; let search_path = session.config().search_path(); @@ -76,12 +79,18 @@ pub async fn handle_drop_sink( .incoming_sinks .clone_from(&table_catalog.incoming_sinks); - for _ in 0..(table_catalog.incoming_sinks.len() - 1) { - for fragment in graph.fragments.values_mut() { - if let Some(node) = &mut fragment.node { - insert_merger_to_union(node); - } - } + let mut incoming_sink_ids: HashSet<_> = + table_catalog.incoming_sinks.iter().copied().collect(); + + assert!(incoming_sink_ids.remove(&sink_id.sink_id)); + + for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? { + hijack_merger_for_target_table( + &mut graph, + table_catalog.columns(), + &sink, + Some(&sink.unique_identity()), + )?; } affected_table_change = Some(ReplaceTablePlan { diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 770d83bdabea9..79cf3c05e7e08 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -12,6 +12,7 @@ mod m20240418_142249_function_runtime; mod m20240506_112555_subscription_partial_ckpt; mod m20240525_090457_secret; mod m20240617_070131_index_column_properties; +mod m20240617_071625_sink_into_table_column; mod m20240618_072634_function_compressed_binary; mod m20240630_131430_remove_parallel_unit; mod m20240701_060504_hummock_time_travel; @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator { Box::new(m20240525_090457_secret::Migration), Box::new(m20240618_072634_function_compressed_binary::Migration), Box::new(m20240617_070131_index_column_properties::Migration), + Box::new(m20240617_071625_sink_into_table_column::Migration), Box::new(m20240630_131430_remove_parallel_unit::Migration), Box::new(m20240702_080451_system_param_value::Migration), Box::new(m20240702_084927_unnecessary_fk::Migration), diff --git a/src/meta/model_v2/migration/src/m20240617_071625_sink_into_table_column.rs b/src/meta/model_v2/migration/src/m20240617_071625_sink_into_table_column.rs new file mode 100644 index 0000000000000..4904dd3623245 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240617_071625_sink_into_table_column.rs @@ -0,0 +1,70 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +use crate::SubQueryStatement::SelectStatement; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Sink::Table) + .add_column(ColumnDef::new(Sink::OriginalTargetColumns).binary()) + .to_owned(), + ) + .await?; + + let stmt = Query::update() + .table(Sink::Table) + .value( + Sink::OriginalTargetColumns, + SimpleExpr::SubQuery( + None, + Box::new(SelectStatement( + Query::select() + .column((Table::Table, Table::Columns)) + .from(Table::Table) + .and_where( + Expr::col((Table::Table, Table::TableId)) + .equals((Sink::Table, Sink::TargetTable)), + ) + .to_owned(), + )), + ), + ) + .and_where(Expr::col((Sink::Table, Sink::TargetTable)).is_not_null()) + .to_owned(); + + manager.exec_stmt(stmt).await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Sink::Table) + .drop_column(Sink::OriginalTargetColumns) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Sink { + Table, + TargetTable, + OriginalTargetColumns, +} + +#[derive(DeriveIden)] +enum Table { + Table, + TableId, + Columns, +} diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index ad72aa3df981c..21aea7b696f4d 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -74,6 +74,7 @@ pub struct Model { pub target_table: Option, // `secret_ref` stores the mapping info mapping from property name to secret id and type. pub secret_ref: Option, + pub original_target_columns: ColumnCatalogArray, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -130,6 +131,7 @@ impl From for ActiveModel { sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())), target_table: Set(pb_sink.target_table.map(|x| x as _)), secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))), + original_target_columns: Set(pb_sink.original_target_columns.into()), } } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index cb5dc2bf41b7e..4ee0a018f5b92 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2656,6 +2656,19 @@ impl CatalogController { .collect()) } + pub async fn get_sink_by_ids(&self, sink_ids: Vec) -> MetaResult> { + let inner = self.inner.read().await; + let sink_objs = Sink::find() + .find_also_related(Object) + .filter(sink::Column::SinkId.is_in(sink_ids)) + .all(&inner.db) + .await?; + Ok(sink_objs + .into_iter() + .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into()) + .collect()) + } + pub async fn get_subscription_by_id( &self, subscription_id: SubscriptionId, diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 5ace222739d52..7228ddbb36eb9 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -240,6 +240,7 @@ impl From> for PbSink { created_at_cluster_version: value.1.created_at_cluster_version, create_type: PbCreateType::Foreground as _, secret_refs: secret_ref_map, + original_target_columns: value.0.original_target_columns.to_protobuf(), } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index ce86ca96ea507..c589e673038e1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -28,9 +28,9 @@ use risingwave_meta_model_v2::prelude::{ }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, - streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, - FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, - StreamingParallelism, TableId, TableVersion, UserId, + streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId, + ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, + StreamNode, StreamingParallelism, TableId, TableVersion, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; @@ -874,6 +874,7 @@ impl CatalogController { None, Some(incoming_sink_id as _), None, + vec![], &txn, streaming_job, ) @@ -923,6 +924,7 @@ impl CatalogController { table_col_index_mapping: Option, creating_sink_id: Option, dropping_sink_id: Option, + updated_sink_catalogs: Vec, ) -> MetaResult { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -933,6 +935,7 @@ impl CatalogController { table_col_index_mapping, creating_sink_id, dropping_sink_id, + updated_sink_catalogs, &txn, streaming_job, ) @@ -963,6 +966,7 @@ impl CatalogController { table_col_index_mapping: Option, creating_sink_id: Option, dropping_sink_id: Option, + updated_sink_catalogs: Vec, txn: &DatabaseTransaction, streaming_job: StreamingJob, ) -> MetaResult<(Vec, Vec)> { @@ -973,6 +977,25 @@ impl CatalogController { let job_id = table.id as ObjectId; + let original_table_catalogs = Table::find_by_id(job_id) + .select_only() + .columns([table::Column::Columns]) + .into_tuple::() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?; + + // For sinks created in earlier versions, we need to set the original_target_columns. + for sink_id in updated_sink_catalogs { + sink::ActiveModel { + sink_id: Set(sink_id as _), + original_target_columns: Set(original_table_catalogs.clone()), + ..Default::default() + } + .update(txn) + .await?; + } + let mut table = table::ActiveModel::from(table); let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); if let Some(sink_id) = creating_sink_id { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 02cc9ee8de0bf..62ecb766a2d74 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -264,6 +264,7 @@ impl CatalogManager { self.init_user().await?; self.init_database().await?; self.source_backward_compat_check().await?; + self.table_sink_catalog_update().await?; Ok(()) } @@ -320,6 +321,37 @@ impl CatalogManager { commit_meta!(self, sources)?; Ok(()) } + + // Fill in the original_target_columns that wasn't written in the previous version for the table sink. + async fn table_sink_catalog_update(&self) -> MetaResult<()> { + let core = &mut *self.core.lock().await; + let mut sinks = BTreeMapTransaction::new(&mut core.database.sinks); + let tables = BTreeMapTransaction::new(&mut core.database.tables); + let legacy_sinks = sinks + .tree_ref() + .iter() + .filter(|(_, sink)| { + sink.target_table.is_some() && sink.original_target_columns.is_empty() + }) + .map(|(_, sink)| sink.clone()) + .collect_vec(); + + for mut sink in legacy_sinks { + let target_table = sink.target_table(); + sink.original_target_columns + .clone_from(&tables.get(&target_table).unwrap().columns); + tracing::info!( + "updating sink {} target table columns {:?}", + sink.id, + sink.original_target_columns + ); + + sinks.insert(sink.id, sink); + } + commit_meta!(self, sinks)?; + + Ok(()) + } } // Database catalog related methods @@ -1287,7 +1319,14 @@ impl CatalogManager { if let Some((table, source)) = target_table { version = self - .finish_replace_table_procedure(&source, &table, None, Some(sink_id), None) + .finish_replace_table_procedure( + &source, + &table, + None, + Some(sink_id), + None, + vec![], + ) .await?; } @@ -3750,12 +3789,14 @@ impl CatalogManager { table_col_index_mapping: Option, creating_sink_id: Option, dropping_sink_id: Option, + updated_sink_ids: Vec, ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); + let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); let key = (table.database_id, table.schema_id, table.name.clone()); assert!( @@ -3764,6 +3805,15 @@ impl CatalogManager { "table must exist and be in altering procedure" ); + let original_table = tables.get(&table.id).unwrap(); + let mut updated_sinks = vec![]; + for sink_id in updated_sink_ids { + let mut sink = sinks.get_mut(sink_id).unwrap(); + sink.original_target_columns + .clone_from(&original_table.columns); + updated_sinks.push(sink.clone()); + } + if let Some(source) = source { let source_key = (source.database_id, source.schema_id, source.name.clone()); assert!( @@ -3821,7 +3871,7 @@ impl CatalogManager { tables.insert(table.id, table.clone()); - commit_meta!(self, tables, indexes, sources)?; + commit_meta!(self, tables, indexes, sources, sinks)?; // Group notification let version = self @@ -3838,6 +3888,9 @@ impl CatalogManager { .chain(updated_indexes.into_iter().map(|index| Relation { relation_info: RelationInfo::Index(index).into(), })) + .chain(updated_sinks.into_iter().map(|sink| Relation { + relation_info: RelationInfo::Sink(sink).into(), + })) .collect_vec(), }), ) @@ -4204,6 +4257,17 @@ impl CatalogManager { Ok(map) } + pub async fn get_sinks(&self, sink_ids: &[SinkId]) -> Vec { + let mut sinks = vec![]; + let guard = self.core.lock().await; + for sink_id in sink_ids { + if let Some(sink) = guard.database.sinks.get(sink_id) { + sinks.push(sink.clone()); + } + } + sinks + } + pub async fn get_created_table_ids(&self) -> Vec { let guard = self.core.lock().await; guard diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index cb90f2326d20d..e99259810b117 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -20,7 +20,7 @@ use anyhow::anyhow; use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::{ObjectId, SourceId}; -use risingwave_pb::catalog::{PbSource, PbTable}; +use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; @@ -554,6 +554,17 @@ impl MetadataManager { } } + pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult> { + match &self { + MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.get_sinks(ids).await), + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .get_sink_by_ids(ids.iter().map(|id| *id as _).collect()) + .await + } + } + } + pub async fn get_downstream_chain_fragments( &self, job_id: u32, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 04cb6c6d09186..3dd1e8401c226 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -46,8 +46,8 @@ use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - connection, Comment, Connection, CreateType, Database, Function, PbSource, PbTable, Schema, - Secret, Sink, Source, Subscription, Table, View, + connection, Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, + Schema, Secret, Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -1084,53 +1084,61 @@ impl DdlController { } } - let table = streaming_job.table().unwrap(); + let target_table = streaming_job.table().unwrap(); let target_fragment_id = union_fragment_id.expect("fragment of placeholder merger not found"); if let Some(creating_sink_table_fragments) = creating_sink_table_fragments { let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap(); - + let sink = sink.expect("sink not found"); Self::inject_replace_table_plan_for_sink( - sink.map(|sink| sink.id), + Some(sink.id), &sink_fragment, - table, + target_table, &mut replace_table_ctx, &mut table_fragments, target_fragment_id, + None, ); } let [table_catalog]: [_; 1] = mgr - .get_table_catalog_by_ids(vec![table.id]) + .get_table_catalog_by_ids(vec![target_table.id]) .await? .try_into() .expect("Target table should exist in sink into table"); - assert_eq!(table_catalog.incoming_sinks, table.incoming_sinks); + assert_eq!(table_catalog.incoming_sinks, target_table.incoming_sinks); { - for sink_id in &table_catalog.incoming_sinks { + let catalogs = mgr + .get_sink_catalog_by_ids(&table_catalog.incoming_sinks) + .await?; + + for sink in catalogs { + let sink_id = sink.id; + if let Some(dropping_sink_id) = dropping_sink_id - && *sink_id == dropping_sink_id + && sink_id == dropping_sink_id { continue; }; let sink_table_fragments = mgr - .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id)) .await?; let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); Self::inject_replace_table_plan_for_sink( - Some(*sink_id), + Some(sink_id), &sink_fragment, - table, + target_table, &mut replace_table_ctx, &mut table_fragments, target_fragment_id, + Some(&sink.unique_identity()), ); } } @@ -1151,13 +1159,14 @@ impl DdlController { Ok((replace_table_ctx, table_fragments)) } - fn inject_replace_table_plan_for_sink( + pub(crate) fn inject_replace_table_plan_for_sink( sink_id: Option, sink_fragment: &PbFragment, table: &Table, replace_table_ctx: &mut ReplaceTableContext, table_fragments: &mut TableFragments, target_fragment_id: FragmentId, + unique_identity: Option<&str>, ) { let sink_actor_ids = sink_fragment .actors @@ -1176,8 +1185,18 @@ impl DdlController { .map(|actor| actor.actor_id) .collect_vec(); - let output_indices = table - .columns + let mut sink_fields = None; + + for actor in &sink_fragment.actors { + if let Some(node) = &actor.nodes { + sink_fields = Some(node.fields.clone()); + break; + } + } + + let sink_fields = sink_fields.expect("sink fields not found"); + + let output_indices = sink_fields .iter() .enumerate() .map(|(idx, _)| idx as _) @@ -1222,29 +1241,47 @@ impl DdlController { } let upstream_fragment_id = sink_fragment.fragment_id; + for actor in &mut union_fragment.actors { if let Some(node) = &mut actor.nodes { - let fields = node.fields.clone(); - visit_stream_node_cont_mut(node, |node| { if let Some(NodeBody::Union(_)) = &mut node.node_body { - for input in &mut node.input { - if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body - && merge_node.upstream_actor_id.is_empty() - { - if let Some(sink_id) = sink_id { - input.identity = - format!("MergeExecutor(from sink {})", sink_id); + for input_project_node in &mut node.input { + if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body { + let merge_stream_node = + input_project_node.input.iter_mut().exactly_one().unwrap(); + + // we need to align nodes here + if input_project_node.identity.as_str() + != unique_identity + .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK) + { + continue; } - *merge_node = MergeNode { - upstream_actor_id: sink_actor_ids.clone(), - upstream_fragment_id, - upstream_dispatcher_type: DispatcherType::Hash as _, - fields: fields.clone(), - }; + if let Some(NodeBody::Merge(merge_node)) = + &mut merge_stream_node.node_body + && merge_node.upstream_actor_id.is_empty() + { + if let Some(sink_id) = sink_id { + merge_stream_node.identity = + format!("MergeExecutor(from sink {})", sink_id); + + input_project_node.identity = + format!("ProjectExecutor(from sink {})", sink_id); + } + + *merge_node = MergeNode { + upstream_actor_id: sink_actor_ids.clone(), + upstream_fragment_id, + upstream_dispatcher_type: DispatcherType::Hash as _, + fields: sink_fields.to_vec(), + }; - return false; + merge_stream_node.fields = sink_fields.to_vec(); + + return false; + } } } } @@ -1462,6 +1499,7 @@ impl DdlController { None, None, Some(sink_id), + vec![], ) .await?; } @@ -1805,6 +1843,7 @@ impl DdlController { let fragment_graph = self .prepare_replace_table(mgr.catalog_manager.clone(), &mut stream_job, fragment_graph) .await?; + let dummy_id = self .env .id_gen_manager() @@ -1812,8 +1851,10 @@ impl DdlController { .generate::<{ IdCategory::Table }>() .await? as u32; + let mut updated_sink_catalogs = vec![]; + let result: MetaResult<()> = try { - let (ctx, table_fragments) = self + let (mut ctx, mut table_fragments) = self .build_replace_table( stream_ctx, &stream_job, @@ -1823,6 +1864,62 @@ impl DdlController { ) .await?; + let StreamingJob::Table(_, table, _) = &stream_job else { + unreachable!("unexpected job: {stream_job:?}"); + }; + + let mut union_fragment_id = None; + + for (fragment_id, fragment) in &mut table_fragments.fragments { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |body| { + if let NodeBody::Union(_) = body { + if let Some(union_fragment_id) = union_fragment_id.as_mut() { + // The union fragment should be unique. + assert_eq!(*union_fragment_id, *fragment_id); + } else { + union_fragment_id = Some(*fragment_id); + } + } + }) + }; + } + } + + let target_fragment_id = + union_fragment_id.expect("fragment of placeholder merger not found"); + + let catalogs = self + .metadata_manager + .get_sink_catalog_by_ids(&table.incoming_sinks) + .await?; + + for sink in catalogs { + let sink_id = &sink.id; + + let sink_table_fragments = self + .metadata_manager + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .await?; + + let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); + + Self::inject_replace_table_plan_for_sink( + Some(*sink_id), + &sink_fragment, + table, + &mut ctx, + &mut table_fragments, + target_fragment_id, + Some(&sink.unique_identity()), + ); + + if sink.original_target_columns.is_empty() { + updated_sink_catalogs.push(sink.id); + } + } + // Add table fragments to meta store with state: `State::Initial`. mgr.fragment_manager .start_create_table_fragments(table_fragments.clone()) @@ -1841,6 +1938,7 @@ impl DdlController { table_col_index_mapping, None, None, + updated_sink_catalogs, ) .await } @@ -2029,6 +2127,7 @@ impl DdlController { table_col_index_mapping: Option, creating_sink_id: Option, dropping_sink_id: Option, + updated_sink_ids: Vec, ) -> MetaResult { let StreamingJob::Table(source, table, ..) = stream_job else { unreachable!("unexpected job: {stream_job:?}") @@ -2041,6 +2140,7 @@ impl DdlController { table_col_index_mapping, creating_sink_id, dropping_sink_id, + updated_sink_ids, ) .await } diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index d8308703ddd40..c097fa5acb5c6 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_common::util::stream_graph_visitor::visit_fragment; +use risingwave_common::util::stream_graph_visitor::{visit_fragment, visit_stream_node}; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::CreateType; @@ -337,6 +337,7 @@ impl DdlController { None, None, Some(sink_id), + vec![], ) .await?; Ok(version) @@ -443,8 +444,10 @@ impl DdlController { .await?; tracing::debug!(id = streaming_job.id(), "building replace streaming job"); + let mut updated_sink_catalogs = vec![]; + let result: MetaResult> = try { - let (ctx, table_fragments) = self + let (mut ctx, mut table_fragments) = self .build_replace_table( ctx, &streaming_job, @@ -453,6 +456,59 @@ impl DdlController { dummy_id as _, ) .await?; + + let mut union_fragment_id = None; + + for (fragment_id, fragment) in &mut table_fragments.fragments { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |body| { + if let NodeBody::Union(_) = body { + if let Some(union_fragment_id) = union_fragment_id.as_mut() { + // The union fragment should be unique. + assert_eq!(*union_fragment_id, *fragment_id); + } else { + union_fragment_id = Some(*fragment_id); + } + } + }) + }; + } + } + + let target_fragment_id = + union_fragment_id.expect("fragment of placeholder merger not found"); + + let catalogs = self + .metadata_manager + .get_sink_catalog_by_ids(&table.incoming_sinks) + .await?; + + for sink in catalogs { + let sink_id = &sink.id; + + let sink_table_fragments = self + .metadata_manager + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .await?; + + let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); + + Self::inject_replace_table_plan_for_sink( + Some(*sink_id), + &sink_fragment, + table, + &mut ctx, + &mut table_fragments, + target_fragment_id, + Some(&sink.unique_identity()), + ); + + if sink.original_target_columns.is_empty() { + updated_sink_catalogs.push(sink.id); + } + } + let merge_updates = ctx.merge_updates.clone(); mgr.catalog_controller @@ -476,6 +532,7 @@ impl DdlController { table_col_index_mapping, None, None, + updated_sink_catalogs, ) .await?; Ok(version) diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index f26f8c9f38d91..0f02033c5f003 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -296,6 +296,17 @@ impl catalog::StreamSourceInfo { } } +impl catalog::Sink { + // TODO: remove this placeholder + // creating table sink does not have an id, so we need a placeholder + pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER"; + + pub fn unique_identity(&self) -> String { + // TODO: use a more unique name + format!("{}", self.id) + } +} + #[cfg(test)] mod tests { use crate::data::{data_type, DataType};