From ad3bf1d852f71b23a278c0ddb608e1ed755f8a8d Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 21 Feb 2024 16:31:30 +0800 Subject: [PATCH] Refactor streaming job ctrl: Clean imports, remove cycle check, adjust types --- src/meta/src/controller/streaming_job.rs | 70 +++++------------------- src/meta/src/controller/utils.rs | 3 +- 2 files changed, 15 insertions(+), 58 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 5ba6de0ba1fad..7c4360a92f285 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use itertools::Itertools; @@ -35,7 +35,6 @@ use risingwave_meta_model_v2::{ FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism, TableId, TableVersion, UserId, }; -use risingwave_pb::catalog; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; @@ -66,8 +65,8 @@ use crate::barrier::Reschedule; use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ - check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids, - get_fragment_mappings, + check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id, + get_fragment_actor_ids, get_fragment_mappings, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; @@ -99,58 +98,6 @@ impl CatalogController { Ok(obj.oid) } - async fn check_cycle_for_table_sink( - table: u32, - sink: &catalog::Sink, - txn: &DatabaseTransaction, - ) -> MetaResult { - let mut queue: VecDeque<(ObjectId, ObjectType)> = VecDeque::new(); - - let mut visited_objects = HashSet::new(); - - visited_objects.insert(table as ObjectId); - - for table_id in &sink.dependent_relations { - queue.push_front((*table_id as ObjectId, ObjectType::Table)); - } - - while let Some((object_id, object_type)) = queue.pop_front() { - if visited_objects.contains(&object_id) { - return Ok(true); - } - - visited_objects.insert(object_id); - - if object_type == ObjectType::Table { - let table = Table::find_by_id(object_id) - .one(txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; - - for sink_id in table.incoming_sinks.inner_ref() { - queue.push_front((*sink_id, ObjectType::Sink)); - } - } - - let dependent_relations: Vec<(ObjectId, ObjectType)> = ObjectDependency::find() - .select_only() - .column(object_dependency::Column::Oid) - .column(object::Column::ObjType) - .join( - JoinType::InnerJoin, - object_dependency::Relation::Object2.def(), - ) - .filter(object_dependency::Column::UsedBy.eq(object_id)) - .into_tuple() - .all(txn) - .await?; - - queue.extend(dependent_relations.into_iter()); - } - - Ok(false) - } - pub async fn create_job_catalog( &self, streaming_job: &mut StreamingJob, @@ -196,7 +143,16 @@ impl CatalogController { } StreamingJob::Sink(sink, _) => { if let Some(target_table_id) = sink.target_table { - if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? { + if check_sink_into_table_cycle( + target_table_id as ObjectId, + sink.dependent_relations + .iter() + .map(|id| *id as ObjectId) + .collect(), + &txn, + ) + .await? + { bail!("Creating such a sink will result in circular dependency."); } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index fe22d6dc079d6..6c7e61a316add 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -300,7 +300,8 @@ where )) .await? .unwrap(); - let cnt: i32 = res.try_get_by(0)?; + + let cnt: i64 = res.try_get_by(0)?; Ok(cnt != 0) }