diff --git a/proto/catalog.proto b/proto/catalog.proto
index 7dfefa003217d..1b37ed7999cc5 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -182,6 +182,8 @@ message Sink {
   // The secrets that the sink relies on. The key is the property name and the value is the secret id.
   map<string, uint32> secret_ref = 25;
+
+  repeated plan_common.ColumnCatalog original_target_columns = 26;
 }

 message Subscription {
diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs
index 87ccc7b96caf7..bb8bacc5ccdf3 100644
--- a/src/connector/src/sink/catalog/desc.rs
+++ b/src/connector/src/sink/catalog/desc.rs
@@ -110,6 +110,7 @@ impl SinkDesc {
             initialized_at_cluster_version: None,
             create_type: self.create_type,
             secret_ref,
+            original_target_columns: vec![],
         }
     }
diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs
index f02eb2cdcf9e9..24b184a2044e1 100644
--- a/src/connector/src/sink/catalog/mod.rs
+++ b/src/connector/src/sink/catalog/mod.rs
@@ -340,6 +340,8 @@ pub struct SinkCatalog {
     /// The secret reference for the sink, mapping from property name to secret id.
     pub secret_ref: BTreeMap<String, u32>,
+
+    pub original_target_columns: Vec<ColumnCatalog>,
 }

 impl SinkCatalog {
@@ -382,6 +384,11 @@ impl SinkCatalog {
             initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
             create_type: self.create_type.to_proto() as i32,
             secret_ref: self.secret_ref.clone(),
+            original_target_columns: self
+                .original_target_columns
+                .iter()
+                .map(|c| c.to_protobuf())
+                .collect_vec(),
         }
     }
@@ -476,6 +483,11 @@ impl From<PbSink> for SinkCatalog {
             created_at_cluster_version: pb.created_at_cluster_version,
             create_type: CreateType::from_proto(create_type),
             secret_ref: pb.secret_ref,
+            original_target_columns: pb
+                .original_target_columns
+                .into_iter()
+                .map(ColumnCatalog::from)
+                .collect_vec(),
         }
     }
 }
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index a0d21c0fb4a3e..dea5d6b453dd1 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -13,18 +13,17 @@
 // limitations under the License.

 use std::collections::HashSet;
-use std::rc::Rc;
 use std::sync::Arc;

 use anyhow::Context;
-use create_sink::derive_default_column_project_for_sink;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::ColumnCatalog;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::sink::catalog::SinkCatalog;
-use risingwave_pb::stream_plan::StreamFragmentGraph;
+use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
     AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement,
 };
@@ -33,18 +32,14 @@ use risingwave_sqlparser::parser::Parser;
 use super::create_source::get_json_schema_location;
 use super::create_table::{bind_sql_columns, generate_stream_graph_for_table, ColumnIdGenerator};
 use super::util::SourceSchemaCompatExt;
-use super::{create_sink, HandlerArgs, RwPgResponse};
+use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
 use crate::catalog::table_catalog::TableType;
 use crate::error::{ErrorCode, Result, RwError};
-use crate::expr::ExprImpl;
+use crate::expr::{Expr, ExprImpl, InputRef, Literal};
 use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
-use crate::optimizer::plan_node::generic::SourceNodeKind;
-use crate::optimizer::plan_node::{
-    generic, LogicalSource, StreamProject, ToStream, ToStreamContext,
-};
 use crate::session::SessionImpl;
-use crate::{Binder, OptimizerContext, TableCatalog, WithOptions};
+use crate::{Binder, TableCatalog, WithOptions};

 pub async fn replace_table_with_definition(
     session: &Arc<SessionImpl>,
@@ -102,19 +97,10 @@ pub async fn replace_table_with_definition(
     );

     let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect();
-    let incoming_sinks = fetch_incoming_sinks(session, &incoming_sink_ids)?;
     let target_columns = bind_sql_columns(&columns)?;
-    let default_columns: Vec<ExprImpl> = TableCatalog::default_column_exprs(&target_columns);
-
-    for sink in incoming_sinks {
-        let context = Rc::new(OptimizerContext::from_handler_args(handler_args.clone()));
-        hijack_merger_for_target_table(
-            &mut graph,
-            &target_columns,
-            &default_columns,
-            &sink,
-            context,
-        )?;
+
+    for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
+        hijack_merger_for_target_table(&mut graph, &target_columns, &sink)?;
     }

     table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
@@ -130,31 +116,40 @@ pub(crate) fn hijack_merger_for_target_table(
     graph: &mut StreamFragmentGraph,
     target_columns: &[ColumnCatalog],
-    default_columns: &[ExprImpl],
     sink: &SinkCatalog,
-    context: Rc<OptimizerContext>,
 ) -> Result<()> {
-    let exprs = derive_default_column_project_for_sink(
-        sink,
-        &sink.full_schema(),
-        target_columns,
-        default_columns,
-        false, // todo
-    )?;
-
-    let pb_project = StreamProject::new(generic::Project::new(
-        exprs,
-        LogicalSource::new(
-            None,
-            sink.full_columns().to_vec(),
-            None,
-            SourceNodeKind::CreateTable,
-            context,
-            None,
-        )
-        .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?,
-    ))
-    .to_stream_prost_body_inner();
+    let mut sink_columns = sink.original_target_columns.clone();
+    if sink_columns.is_empty() {
+        sink_columns = target_columns.to_vec();
+    }
+
+    let mut i = 0;
+    let mut j = 0;
+    let mut exprs = Vec::new();
+
+    while j < target_columns.len() {
+        if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
+            exprs.push(ExprImpl::InputRef(Box::new(InputRef {
+                data_type: sink_columns[i].data_type().clone(),
+                index: i,
+            })));
+
+            i += 1;
+            j += 1;
+        } else {
+            exprs.push(ExprImpl::Literal(Box::new(Literal::new(
+                None,
+                target_columns[j].data_type().clone(),
+            ))));
+
+            j += 1;
+        }
+    }
+
+    let pb_project = PbNodeBody::Project(ProjectNode {
+        select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
+        ..Default::default()
+    });

     for fragment in graph.fragments.values_mut() {
         if let Some(node) = &mut fragment.node {
@@ -181,9 +176,6 @@ pub async fn handle_alter_table_column(
 ) -> Result<RwPgResponse> {
     let session = handler_args.session;
     let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

-    // if !original_catalog.incoming_sinks.is_empty() {
-    //     bail_not_implemented!("alter table with incoming sinks");
-    // }
     // TODO(yuhao): alter table with generated columns.
     if original_catalog.has_generated_column() {
@@ -219,6 +211,14 @@ pub async fn handle_alter_table_column(
         }
     }

+    if !original_catalog.incoming_sinks.is_empty()
+        && matches!(operation, AlterTableOperation::DropColumn { .. })
}) + { + return Err(ErrorCode::InvalidInputSyntax( + "dropping columns in target table of sinks is not supported".to_string(), + ))?; + } + match operation { AlterTableOperation::AddColumn { column_def: new_column, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index b21b1f504285b..2801de351960e 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -285,7 +285,6 @@ pub fn gen_sink_plan( &sink_catalog, sink_plan.schema(), table_catalog.columns(), - TableCatalog::default_column_exprs(table_catalog.columns()).as_ref(), user_specified_columns, )?; @@ -416,7 +415,7 @@ pub async fn handle_create_sink( let partition_info = get_partition_compute_info(&handle_args.with_options).await?; - let (sink, graph, target_table_catalog) = { + let (mut sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args.clone())); let SinkPlanContext { @@ -454,30 +453,24 @@ pub async fn handle_create_sink( let (mut graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?; + sink.original_target_columns = table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone())) + .collect_vec(); + table .incoming_sinks .clone_from(&table_catalog.incoming_sinks); - // let target_columns = bind_sql_columns(&columns)?; - - let default_columns: Vec = - TableCatalog::default_column_exprs(table_catalog.columns()); - // let default_columns: Vec = TableCatalog::default_column_exprs(&target_columns); - let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect(); - let mut incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?; - incoming_sinks.push(Arc::new(sink.clone())); - - let context = Rc::new(OptimizerContext::from_handler_args(handle_args.clone())); for sink in incoming_sinks { crate::handler::alter_table_column::hijack_merger_for_target_table( &mut graph, table_catalog.columns(), - &default_columns, &sink, - context.clone(), )?; } @@ -746,11 +739,12 @@ pub(crate) fn derive_default_column_project_for_sink( sink: &SinkCatalog, sink_schema: &Schema, columns: &[ColumnCatalog], - default_column_exprs: &[ExprImpl], user_specified_columns: bool, ) -> Result> { assert_eq!(sink.full_schema().len(), sink_schema.len()); + let default_column_exprs = TableCatalog::default_column_exprs(columns); + let mut exprs = vec![]; let sink_visible_col_idxes = sink diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index b661a2c27f403..ef9a13491fcc6 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -13,7 +13,6 @@ // limitations under the License. 
 use std::collections::HashSet;
-use std::rc::Rc;

 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_pb::ddl_service::ReplaceTablePlan;
@@ -23,10 +22,9 @@ use super::RwPgResponse;
 use crate::binder::Binder;
 use crate::catalog::root_catalog::SchemaPath;
 use crate::error::Result;
-use crate::expr::ExprImpl;
+use crate::handler::alter_table_column::hijack_merger_for_target_table;
 use crate::handler::create_sink::{fetch_incoming_sinks, reparse_table_for_sink};
 use crate::handler::HandlerArgs;
-use crate::{OptimizerContext, TableCatalog};

 pub async fn handle_drop_sink(
     handler_args: HandlerArgs,
@@ -81,25 +79,13 @@ pub async fn handle_drop_sink(
             .incoming_sinks
             .clone_from(&table_catalog.incoming_sinks);

-        let default_columns: Vec<ExprImpl> =
-            TableCatalog::default_column_exprs(table_catalog.columns());
-
         let mut incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();

         assert!(incoming_sink_ids.remove(&sink_id.sink_id));

-        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;
-
-        for sink in incoming_sinks {
-            let context = Rc::new(OptimizerContext::from_handler_args(handler_args.clone()));
-            crate::handler::alter_table_column::hijack_merger_for_target_table(
-                &mut graph,
-                table_catalog.columns(),
-                &default_columns,
-                &sink,
-                context,
-            )?;
+        for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
+            hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink)?;
         }

         affected_table_change = Some(ReplaceTablePlan {
diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs
index 78d0806f98a5e..7bfa9f6ca855a 100644
--- a/src/meta/model_v2/src/sink.rs
+++ b/src/meta/model_v2/src/sink.rs
@@ -74,6 +74,7 @@ pub struct Model {
     pub target_table: Option<TableId>,
     // `secret_ref` stores a json string, mapping from property name to secret id.
     pub secret_ref: Option<SecretRef>,
+    pub original_target_columns: ColumnCatalogArray,
 }

 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -130,6 +131,7 @@ impl From<PbSink> for ActiveModel {
             sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
             target_table: Set(pb_sink.target_table.map(|x| x as _)),
             secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))),
+            original_target_columns: Set(pb_sink.original_target_columns.into()),
         }
     }
 }
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index ac61ceb67b77f..38c6ec18c5d15 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -235,6 +235,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
             created_at_cluster_version: value.1.created_at_cluster_version,
             create_type: PbCreateType::Foreground as _,
             secret_ref: secret_ref_map,
+            original_target_columns: value.0.original_target_columns.to_protobuf(),
         }
     }
 }
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 3066ce223785e..9a64383aaf5dd 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -30,9 +30,9 @@ use risingwave_meta_model_v2::prelude::{
 };
 use risingwave_meta_model_v2::{
     actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source,
-    streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray,
-    FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode,
-    StreamingParallelism, TableId, TableVersion, UserId,
+    streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId,
+    ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId,
+    StreamNode, StreamingParallelism, TableId, TableVersion, UserId,
 };
 use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
 use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
@@ -770,6 +770,7 @@ impl CatalogController {
                 None,
                 Some(incoming_sink_id as _),
                 None,
+                vec![],
                 &txn,
                 streaming_job,
             )
@@ -814,6 +815,7 @@ impl CatalogController {
         table_col_index_mapping: Option<ColIndexMapping>,
         creating_sink_id: Option,
         dropping_sink_id: Option,
+        updated_sink_catalogs: Vec,
     ) -> MetaResult<NotificationVersion> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
@@ -824,6 +826,7 @@ impl CatalogController {
             table_col_index_mapping,
             creating_sink_id,
             dropping_sink_id,
+            updated_sink_catalogs,
             &txn,
             streaming_job,
         )
@@ -854,6 +857,7 @@ impl CatalogController {
         table_col_index_mapping: Option<ColIndexMapping>,
         creating_sink_id: Option,
         dropping_sink_id: Option,
+        updated_sink_catalogs: Vec,
         txn: &DatabaseTransaction,
         streaming_job: StreamingJob,
     ) -> MetaResult<(Vec, Vec)> {
@@ -864,6 +868,24 @@ impl CatalogController {

         let job_id = table.id as ObjectId;

+        let original_table_catalogs = Table::find_by_id(job_id)
+            .select_only()
+            .columns([table::Column::Columns])
+            .into_tuple::<ColumnCatalogArray>()
+            .one(txn)
+            .await?
+            .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
+
+        for sink_id in updated_sink_catalogs {
+            sink::ActiveModel {
+                sink_id: Set(sink_id as _),
+                original_target_columns: Set(original_table_catalogs.clone()),
+                ..Default::default()
+            }
+            .update(txn)
+            .await?;
+        }
+
         let mut table = table::ActiveModel::from(table);
         let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
         if let Some(sink_id) = creating_sink_id {
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 05486f5a6c48b..9ccebd9beb1f9 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -3375,12 +3375,14 @@ impl CatalogManager {
         table_col_index_mapping: Option<ColIndexMapping>,
         creating_sink_id: Option,
         dropping_sink_id: Option,
+        updated_sink_ids: Vec,
     ) -> MetaResult<NotificationVersion> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
         let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
         let mut sources = BTreeMapTransaction::new(&mut database_core.sources);
         let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
+        let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks);

         let key = (table.database_id, table.schema_id, table.name.clone());
         assert!(
@@ -3389,6 +3391,16 @@ impl CatalogManager {
             "table must exist and be in altering procedure"
         );

+        let original_table = tables.get(&table.id).unwrap();
+        let mut updated_sinks = vec![];
+        for sink_id in updated_sink_ids {
+            let mut sink = sinks.get_mut(sink_id).unwrap().clone();
+            sink.original_target_columns
+                .clone_from(&original_table.columns);
+            sinks.insert(sink.id, sink.clone());
+            updated_sinks.push(sink);
+        }
+
         if let Some(source) = source {
             let source_key = (source.database_id, source.schema_id, source.name.clone());
             assert!(
@@ -3463,6 +3475,9 @@ impl CatalogManager {
                 .chain(updated_indexes.into_iter().map(|index| Relation {
                     relation_info: RelationInfo::Index(index).into(),
                 }))
+                .chain(updated_sinks.into_iter().map(|sink| Relation {
+                    relation_info: RelationInfo::Sink(sink).into(),
+                }))
                 .collect_vec(),
         }),
     )
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index c072ae4b2a2e2..75d7556c30fe7 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1171,7 +1171,7 @@ impl DdlController {
         Ok((replace_table_ctx, table_fragments))
     }

-    fn inject_replace_table_plan_for_sink(
+    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: Option,
        sink_fragment: &PbFragment,
        table: &Table,
@@ -1252,12 +1252,12 @@ impl DdlController {
            if let Some(node) = &mut actor.nodes {
                visit_stream_node_cont_mut(node, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
-                        for input in &mut node.input {
-                            if let Some(NodeBody::Project(_)) = &mut input.node_body {
+                        for input_project_node in &mut node.input {
+                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
-                                    input.input.iter_mut().exactly_one().unwrap();
+                                    input_project_node.input.iter_mut().exactly_one().unwrap();

-                                if input.identity.as_str() != uniq_name {
+                                if input_project_node.identity.as_str() != uniq_name {
                                    continue;
                                }
@@ -1279,7 +1279,7 @@ impl DdlController {

                                merge_stream_node.fields = sink_fields.to_vec();

-                                input.fields.clone_from(&node.fields);
+                                input_project_node.fields.clone_from(&node.fields);

                                return false;
                            }
@@ -1502,6 +1502,7 @@ impl DdlController {
                    None,
                    None,
                    Some(sink_id),
+                    vec![],
                )
                .await?;
            }
@@ -1834,6 +1835,7 @@ impl DdlController {
                    None,
                    Some(sink_id),
                    None,
+                    vec![],
                )
                .await?;
            }
@@ -1931,6 +1933,8 @@ impl DdlController {
            .generate::<{ IdCategory::Table }>()
            .await? as u32;

+        let mut updated_sink_catalogs = vec![];
+
        let result: MetaResult<()> = try {
            let (mut ctx, mut table_fragments) = self
                .build_replace_table(
@@ -1994,6 +1998,10 @@ impl DdlController {
                    target_fragment_id,
                    uniq_name,
                );
+
+                if sink.original_target_columns.is_empty() {
+                    updated_sink_catalogs.push(sink.id);
+                }
            }

            // Add table fragments to meta store with state: `State::Initial`.
@@ -2014,6 +2022,7 @@ impl DdlController {
                table_col_index_mapping,
                None,
                None,
+                updated_sink_catalogs,
            )
            .await
    }
@@ -2159,6 +2168,7 @@ impl DdlController {
        table_col_index_mapping: Option<ColIndexMapping>,
        creating_sink_id: Option,
        dropping_sink_id: Option,
+        updated_sink_ids: Vec,
    ) -> MetaResult<NotificationVersion> {
        let StreamingJob::Table(source, table, ..) = stream_job else {
            unreachable!("unexpected job: {stream_job:?}")
        };
@@ -2171,6 +2181,7 @@ impl DdlController {
                table_col_index_mapping,
                creating_sink_id,
                dropping_sink_id,
+                updated_sink_ids,
            )
            .await
    }
}
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 13ecfca13c782..6ba49a3ffa102 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -14,7 +14,7 @@
 use itertools::Itertools;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
-use risingwave_common::util::stream_graph_visitor::visit_fragment;
+use risingwave_common::util::stream_graph_visitor::{visit_fragment, visit_stream_node};
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::ObjectId;
 use risingwave_pb::catalog::CreateType;
@@ -359,6 +359,7 @@ impl DdlController {
                    None,
                    None,
                    Some(sink_id),
+                    vec![],
                )
                .await?;
            Ok(version)
@@ -463,8 +464,10 @@ impl DdlController {
            .await?;

        tracing::debug!(id = streaming_job.id(), "building replace streaming job");

+        let mut updated_sink_catalogs = vec![];
+
        let result: MetaResult> = try {
-            let (ctx, table_fragments) = self
+            let (mut ctx, mut table_fragments) = self
                .build_replace_table(
                    ctx,
                    &streaming_job,
@@ -473,6 +476,61 @@ impl DdlController {
                    dummy_id as _,
                )
                .await?;
+
+            let mut union_fragment_id = None;
+
+            for (fragment_id, fragment) in &mut table_fragments.fragments {
+                for actor in &mut fragment.actors {
+                    if let Some(node) = &mut actor.nodes {
+                        visit_stream_node(node, |body| {
+                            if let NodeBody::Union(_) = body {
+                                if let Some(union_fragment_id) = union_fragment_id.as_mut() {
+                                    // The union fragment should be unique.
+                                    assert_eq!(*union_fragment_id, *fragment_id);
+                                } else {
+                                    union_fragment_id = Some(*fragment_id);
+                                }
+                            }
+                        })
+                    };
+                }
+            }
+
+            let target_fragment_id =
+                union_fragment_id.expect("fragment of placeholder merger not found");
+
+            let catalogs = self
+                .metadata_manager
+                .get_sink_catalog_by_ids(&table.incoming_sinks)
+                .await?;
+
+            for sink in catalogs {
+                let sink_id = &sink.id;
+
+                let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);
+
+                let sink_table_fragments = self
+                    .metadata_manager
+                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
+                    .await?;
+
+                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
+
+                Self::inject_replace_table_plan_for_sink(
+                    Some(*sink_id),
+                    &sink_fragment,
+                    table,
+                    &mut ctx,
+                    &mut table_fragments,
+                    target_fragment_id,
+                    uniq_name,
+                );
+
+                if sink.original_target_columns.is_empty() {
+                    updated_sink_catalogs.push(sink.id);
+                }
+            }
+
            let merge_updates = ctx.merge_updates.clone();
            mgr.catalog_controller
@@ -496,6 +554,7 @@ impl DdlController {
                table_col_index_mapping,
                None,
                None,
+                updated_sink_catalogs,
            )
            .await?;
        Ok(version)
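
Note: the projection that the new hijack_merger_for_target_table builds can be illustrated in isolation. The following is a minimal, standalone sketch; the names DataType, ProjectExpr, and align_sink_columns are hypothetical and are not RisingWave APIs. It walks the new target columns, forwards a sink column by input reference when the data types line up, and pads every remaining column with a NULL literal of the target type, mirroring the InputRef/Literal loop added in alter_table_column.rs above.

// Standalone sketch, not RisingWave code.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    Varchar,
    Timestamp,
}

#[derive(Debug, PartialEq)]
enum ProjectExpr {
    // Forward the i-th column produced by the sink.
    InputRef { index: usize, data_type: DataType },
    // The sink does not produce this column; pad it with NULL of the target type.
    NullLiteral { data_type: DataType },
}

fn align_sink_columns(sink_columns: &[DataType], target_columns: &[DataType]) -> Vec<ProjectExpr> {
    let (mut i, mut j) = (0, 0);
    let mut exprs = Vec::with_capacity(target_columns.len());
    while j < target_columns.len() {
        if i < sink_columns.len() && sink_columns[i] == target_columns[j] {
            exprs.push(ProjectExpr::InputRef { index: i, data_type: sink_columns[i].clone() });
            i += 1;
        } else {
            exprs.push(ProjectExpr::NullLiteral { data_type: target_columns[j].clone() });
        }
        j += 1;
    }
    exprs
}

fn main() {
    // The sink was created when the table had (Int32, Varchar); the table later
    // gained a Timestamp column via ALTER TABLE ... ADD COLUMN.
    let sink = vec![DataType::Int32, DataType::Varchar];
    let table = vec![DataType::Int32, DataType::Varchar, DataType::Timestamp];

    let exprs = align_sink_columns(&sink, &table);
    assert_eq!(
        exprs,
        vec![
            ProjectExpr::InputRef { index: 0, data_type: DataType::Int32 },
            ProjectExpr::InputRef { index: 1, data_type: DataType::Varchar },
            ProjectExpr::NullLiteral { data_type: DataType::Timestamp },
        ]
    );
    println!("{exprs:?}");
}

Because handle_alter_table_column now rejects DROP COLUMN on a table with incoming sinks, the sink's original_target_columns are expected to remain a prefix of the new target schema, so the projection can be rebuilt from the persisted catalog alone instead of replanning the sink, which is why the StreamProject/LogicalSource construction is removed above.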