From a3ed03161dfd4584b00f5c0fd8235dc06df12610 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Wed, 7 Feb 2024 23:20:21 +0800
Subject: [PATCH] rebase

Signed-off-by: Shanicky Chen
---
 src/meta/src/controller/catalog.rs       | 276 +++++++++++++++++++++--
 src/meta/src/controller/rename.rs        |  19 ++
 src/meta/src/controller/streaming_job.rs |  62 ++++-
 3 files changed, 335 insertions(+), 22 deletions(-)

diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 6077efa7f88c1..e26e1af0f0cff 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -19,16 +19,18 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use itertools::Itertools;
 use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
+use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
 use risingwave_common::{bail, current_cluster_version};
+use risingwave_meta_model_v2::fragment::StreamNode;
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::prelude::*;
 use risingwave_meta_model_v2::table::TableType;
 use risingwave_meta_model_v2::{
-    connection, database, function, index, object, object_dependency, schema, sink, source,
-    streaming_job, table, user_privilege, view, ActorId, ColumnCatalogArray, ConnectionId,
-    CreateType, DatabaseId, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
-    PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId,
-    UserId,
+    actor, connection, database, fragment, function, index, object, object_dependency, schema,
+    sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
+    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
+    IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo,
+    StreamingParallelism, TableId, UserId,
 };
 use risingwave_pb::catalog::table::PbTableType;
 use risingwave_pb::catalog::{
@@ -41,6 +43,8 @@ use risingwave_pb::meta::subscribe_response::{
     Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
 };
 use risingwave_pb::meta::{PbRelation, PbRelationGroup};
+use risingwave_pb::stream_plan::stream_node::NodeBody;
+use risingwave_pb::stream_plan::FragmentTypeFlag;
 use risingwave_pb::user::PbUserInfo;
 use sea_orm::sea_query::{Expr, SimpleExpr};
 use sea_orm::ActiveValue::Set;
@@ -423,6 +427,7 @@ impl CatalogController {
     pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<ReleaseContext> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
+
         let creating_job_ids: Vec<ObjectId> = streaming_job::Entity::find()
             .select_only()
             .column(streaming_job::Column::JobId)
@@ -436,7 +441,14 @@ impl CatalogController {
             .into_tuple()
             .all(&txn)
             .await?;
+
+        let changed = Self::clean_dirty_sink_downstreams(&txn).await?;
+
         if creating_job_ids.is_empty() {
+            if changed {
+                txn.commit().await?;
+            }
+
             return Ok(ReleaseContext::default());
         }
 
@@ -476,6 +488,7 @@ impl CatalogController {
             .exec(&txn)
             .await?;
         assert!(res.rows_affected > 0);
+
         txn.commit().await?;
 
         Ok(ReleaseContext {
@@ -485,6 +498,175 @@ impl CatalogController {
         })
     }
 
+    async fn clean_dirty_sink_downstreams(txn: &DatabaseTransaction) -> MetaResult<bool> {
+        // clean incoming sink from (table)
+        // clean upstream fragment ids from (fragment)
+        // clean stream node from (fragment)
+        // clean upstream actor ids from (actor)
+        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
+            .select_only()
+            .columns(vec![fragment::Column::FragmentId])
+            .into_tuple()
+            .all(txn)
+            .await?;
+
+        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();
+
+        let table_sink_ids: Vec<ObjectId> = Sink::find()
+            .select_only()
+            .column(sink::Column::SinkId)
+            .filter(sink::Column::TargetTable.is_not_null())
+            .into_tuple()
+            .all(txn)
+            .await?;
+
+        let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
+            .select_only()
+            .columns(vec![table::Column::TableId, table::Column::IncomingSinks])
+            .into_tuple()
+            .all(txn)
+            .await?;
+
+        let table_incoming_sinks_to_update = all_table_with_incoming_sinks
+            .into_iter()
+            .filter(|(_, incoming_sinks)| {
+                let inner_ref = incoming_sinks.inner_ref();
+                !inner_ref.is_empty()
+                    && inner_ref
+                        .iter()
+                        .any(|sink_id| !table_sink_ids.contains(sink_id))
+            })
+            .collect_vec();
+
+        let new_table_incoming_sinks = table_incoming_sinks_to_update
+            .into_iter()
+            .map(|(table_id, incoming_sinks)| {
+                let new_incoming_sinks = incoming_sinks
+                    .into_inner()
+                    .extract_if(|id| table_sink_ids.contains(id))
+                    .collect_vec();
+                (table_id, I32Array::from(new_incoming_sinks))
+            })
+            .collect_vec();
+
+        // no need to update, returning
+        if new_table_incoming_sinks.is_empty() {
+            return Ok(false);
+        }
+
+        for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
+            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
+            Table::update_many()
+                .col_expr(table::Column::IncomingSinks, new_incoming_sinks.into())
+                .filter(table::Column::TableId.eq(table_id))
+                .exec(txn)
+                .await?;
+
+            let fragments: Vec<(FragmentId, I32Array, StreamNode, i32)> = Fragment::find()
+                .select_only()
+                .columns(vec![
+                    fragment::Column::FragmentId,
+                    fragment::Column::UpstreamFragmentId,
+                    fragment::Column::StreamNode,
+                    fragment::Column::FragmentTypeMask,
+                ])
+                .filter(fragment::Column::JobId.eq(table_id))
+                .into_tuple()
+                .all(txn)
+                .await?;
+
+            for (fragment_id, upstream_fragment_ids, stream_node, fragment_mask) in fragments {
+                let mut upstream_fragment_ids = upstream_fragment_ids.into_inner();
+
+                let dirty_upstream_fragment_ids = upstream_fragment_ids
+                    .extract_if(|id| !all_fragment_ids.contains(id))
+                    .collect_vec();
+
+                if !dirty_upstream_fragment_ids.is_empty() {
+                    // dirty downstream should be materialize fragment of table
+                    assert!(fragment_mask & FragmentTypeFlag::Mview as i32 > 0);
+
+                    tracing::info!(
+                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
+                        dirty_upstream_fragment_ids,
+                        fragment_id
+                    );
+
+                    let mut pb_stream_node = stream_node.to_protobuf();
+
+                    visit_stream_node_cont(&mut pb_stream_node, |node| {
+                        if let Some(NodeBody::Union(_)) = node.node_body {
+                            node.input.retain_mut(|input| {
+                                if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
+                                    && all_fragment_ids
+                                        .contains(&(merge_node.upstream_fragment_id as i32))
+                                {
+                                    true
+                                } else {
+                                    false
+                                }
+                            });
+                        }
+                        true
+                    });
+
+                    Fragment::update_many()
+                        .col_expr(
+                            fragment::Column::UpstreamFragmentId,
+                            I32Array::from(upstream_fragment_ids).into(),
+                        )
+                        .col_expr(
+                            fragment::Column::StreamNode,
+                            StreamNode::from_protobuf(&pb_stream_node).into(),
+                        )
+                        .filter(fragment::Column::FragmentId.eq(fragment_id))
+                        .exec(txn)
+                        .await?;
+
+                    let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
+                        .select_only()
+                        .columns(vec![
+                            actor::Column::ActorId,
+                            actor::Column::UpstreamActorIds,
+                        ])
+                        .filter(actor::Column::FragmentId.eq(fragment_id))
+                        .into_tuple()
+                        .all(txn)
+                        .await?;
+
+                    for (actor_id, upstream_actor_ids) in actors {
+                        let mut upstream_actor_ids = upstream_actor_ids.into_inner();
+
+                        let dirty_actor_upstreams = upstream_actor_ids
+                            .extract_if(|id, _| !all_fragment_ids.contains(id))
+                            .map(|(id, _)| id)
+                            .collect_vec();
+
+                        if !dirty_actor_upstreams.is_empty() {
+                            tracing::debug!(
+                                "cleaning dirty table sink fragment {:?} from downstream fragment {} actor {}",
+                                dirty_actor_upstreams,
+                                fragment_id,
+                                actor_id,
+                            );
+
+                            Actor::update_many()
+                                .col_expr(
+                                    actor::Column::UpstreamActorIds,
+                                    ActorUpstreamActors::from(upstream_actor_ids).into(),
+                                )
+                                .filter(actor::Column::ActorId.eq(actor_id))
+                                .exec(txn)
+                                .await?;
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(true)
+    }
+
     /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
     pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<NotificationVersion> {
         let inner = self.inner.write().await;
@@ -1487,6 +1669,52 @@ impl CatalogController {
         );
         to_drop_objects.push(obj);
 
+        // Special handling for 'sink into table'.
+        if object_type != ObjectType::Sink {
+            // When dropping a table downstream, all incoming sinks of the table should be dropped as well.
+            if object_type == ObjectType::Table {
+                let table = Table::find_by_id(object_id)
+                    .one(&txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
+
+                let incoming_sinks = table.incoming_sinks.into_inner();
+
+                if !incoming_sinks.is_empty() {
+                    let objs: Vec<PartialObject> = Object::find()
+                        .filter(object::Column::Oid.is_in(incoming_sinks))
+                        .into_partial_model()
+                        .all(&txn)
+                        .await?;
+
+                    to_drop_objects.extend(objs);
+                }
+            }
+
+            let to_drop_object_ids: HashSet<_> =
+                to_drop_objects.iter().map(|obj| obj.oid).collect();
+
+            // When there is a table sink in the dependency chain of drop cascade, an error message needs to be returned currently to manually drop the sink.
+            for obj in &to_drop_objects {
+                if obj.obj_type == ObjectType::Sink {
+                    let sink = Sink::find_by_id(obj.oid)
+                        .one(&txn)
+                        .await?
+                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
+
+                    // Since dropping the sink into the table requires the frontend to handle some of the logic (regenerating the plan), it’s not compatible with the current cascade dropping.
+                    if let Some(target_table) = sink.target_table
+                        && !to_drop_object_ids.contains(&target_table)
+                    {
+                        bail!(
+                            "Found sink into table with sink id {} in dependency, please drop them manually",
+                            obj.oid,
+                        );
+                    }
+                }
+            }
+        }
+
         let to_drop_table_ids = to_drop_objects
             .iter()
             .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
@@ -1856,22 +2084,28 @@ impl CatalogController {
                 });
             }};
         }
-        let objs = get_referring_objects(object_id, &txn).await?;
-        // TODO: For sink into table. when sink into table is ready.
-        // if object_type == ObjectType::Table {
-        //     let incoming_sinks: Vec<_> = Table::find_by_id(object_id)
-        //         .select_only()
-        //         .column(table::Column::IncomingSinks)
-        //         .into_tuple()
-        //         .one(&txn)
-        //         .await?
-        //         .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
-        //     objs.extend(incoming_sinks.into_iter().map(|id| PartialObject {
-        //         oid: id as _,
-        //         obj_type: ObjectType::Sink,
-        //         ..Default::default()
-        //     }));
-        // }
+        let mut objs = get_referring_objects(object_id, &txn).await?;
+        if object_type == ObjectType::Table {
+            let incoming_sinks: I32Array = Table::find_by_id(object_id)
+                .select_only()
+                .column(table::Column::IncomingSinks)
+                .into_tuple()
+                .one(&txn)
+                .await?
+                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
+
+            objs.extend(
+                incoming_sinks
+                    .into_inner()
+                    .into_iter()
+                    .map(|id| PartialObject {
+                        oid: id,
+                        obj_type: ObjectType::Sink,
+                        schema_id: None,
+                        database_id: None,
+                    }),
+            );
+        }
 
         for obj in objs {
             match obj.obj_type {
diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs
index bde954a587fdf..15be4d7ef83b8 100644
--- a/src/meta/src/controller/rename.rs
+++ b/src/meta/src/controller/rename.rs
@@ -79,6 +79,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
             stmt:
                 CreateSinkStatement {
                     sink_from: CreateSink::AsQuery(query),
+                    into_table_name: None,
                     ..
                 },
         } => {
@@ -89,9 +90,27 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
             stmt:
                 CreateSinkStatement {
                     sink_from: CreateSink::From(table_name),
+                    into_table_name: None,
                     ..
                 },
         } => replace_table_name(table_name, to),
+        Statement::CreateSink {
+            stmt: CreateSinkStatement {
+                sink_from,
+                into_table_name: Some(table_name),
+                ..
+            }
+        } => {
+            let idx = table_name.0.len() - 1;
+            if table_name.0[idx].real_value() == from {
+                table_name.0[idx] = Ident::new_unchecked(to);
+            } else {
+                match sink_from {
+                    CreateSink::From(table_name) => replace_table_name(table_name, to),
+                    CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to),
+                }
+            }
+        }
         _ => unreachable!(),
     };
     stmt.to_string()
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 9bb8af6172469..5ba6de0ba1fad 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
 use std::num::NonZeroUsize;
 
 use itertools::Itertools;
+use risingwave_common::bail;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -34,6 +35,7 @@ use risingwave_meta_model_v2::{
     FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism,
     TableId, TableVersion, UserId,
 };
+use risingwave_pb::catalog;
 use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
 use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
 use risingwave_pb::catalog::{PbCreateType, PbTable};
@@ -97,6 +99,58 @@ impl CatalogController {
         Ok(obj.oid)
     }
 
+    async fn check_cycle_for_table_sink(
+        table: u32,
+        sink: &catalog::Sink,
+        txn: &DatabaseTransaction,
+    ) -> MetaResult<bool> {
+        let mut queue: VecDeque<(ObjectId, ObjectType)> = VecDeque::new();
+
+        let mut visited_objects = HashSet::new();
+
+        visited_objects.insert(table as ObjectId);
+
+        for table_id in &sink.dependent_relations {
+            queue.push_front((*table_id as ObjectId, ObjectType::Table));
+        }
+
+        while let Some((object_id, object_type)) = queue.pop_front() {
+            if visited_objects.contains(&object_id) {
+                return Ok(true);
+            }
+
+            visited_objects.insert(object_id);
+
+            if object_type == ObjectType::Table {
+                let table = Table::find_by_id(object_id)
+                    .one(txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
+
+                for sink_id in table.incoming_sinks.inner_ref() {
+                    queue.push_front((*sink_id, ObjectType::Sink));
+                }
+            }
+
+            let dependent_relations: Vec<(ObjectId, ObjectType)> = ObjectDependency::find()
+                .select_only()
+                .column(object_dependency::Column::Oid)
+                .column(object::Column::ObjType)
+                .join(
+                    JoinType::InnerJoin,
+                    object_dependency::Relation::Object2.def(),
+                )
+                .filter(object_dependency::Column::UsedBy.eq(object_id))
+                .into_tuple()
+                .all(txn)
+                .await?;
+
+            queue.extend(dependent_relations.into_iter());
+        }
+
+        Ok(false)
+    }
+
     pub async fn create_job_catalog(
         &self,
         streaming_job: &mut StreamingJob,
@@ -141,6 +195,12 @@ impl CatalogController {
                 Table::insert(table).exec(&txn).await?;
             }
             StreamingJob::Sink(sink, _) => {
+                if let Some(target_table_id) = sink.target_table {
+                    if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? {
+                        bail!("Creating such a sink will result in circular dependency.");
+                    }
+                }
+
                 let job_id = Self::create_streaming_job_obj(
                     &txn,
                     ObjectType::Sink,