From a97a3aaf62a333573a77dac35744989b466c474b Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Nov 2024 17:29:41 +0800 Subject: [PATCH] a little more changes Signed-off-by: xxchan --- src/meta/src/controller/streaming_job.rs | 76 +++++++++++++----------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index cf4b283e60a08..b5c7633d475ea 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -73,7 +73,7 @@ use crate::controller::utils::{ use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; -use crate::stream::SplitAssignment; +use crate::stream::{self, SplitAssignment}; use crate::{MetaError, MetaResult}; impl CatalogController { @@ -1014,47 +1014,51 @@ impl CatalogController { txn: &DatabaseTransaction, streaming_job: StreamingJob, ) -> MetaResult<(Vec, Vec)> { - // Question: The source catalog should be remain unchanged? - let StreamingJob::Table(_, table, ..) = streaming_job else { - unreachable!("unexpected job: {streaming_job:?}") - }; + let original_job_id = streaming_job.id() as ObjectId; - let original_job_id = table.id as ObjectId; + match streaming_job { + StreamingJob::Table(_source, table, _table_job_type) => { + // The source catalog should be remain unchanged - let original_table_catalogs = Table::find_by_id(original_job_id) - .select_only() - .columns([table::Column::Columns]) - .into_tuple::() - .one(txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?; + let original_table_catalogs = Table::find_by_id(original_job_id) + .select_only() + .columns([table::Column::Columns]) + .into_tuple::() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?; - // For sinks created in earlier versions, we need to set the original_target_columns. - for sink_id in updated_sink_catalogs { - sink::ActiveModel { - sink_id: Set(sink_id as _), - original_target_columns: Set(Some(original_table_catalogs.clone())), - ..Default::default() - } - .update(txn) - .await?; - } + // For sinks created in earlier versions, we need to set the original_target_columns. + for sink_id in updated_sink_catalogs { + sink::ActiveModel { + sink_id: Set(sink_id as _), + original_target_columns: Set(Some(original_table_catalogs.clone())), + ..Default::default() + } + .update(txn) + .await?; + } - let mut table = table::ActiveModel::from(table); - let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); - if let Some(sink_id) = creating_sink_id { - debug_assert!(!incoming_sinks.contains(&{ sink_id })); - incoming_sinks.push(sink_id as _); - } + let mut table = table::ActiveModel::from(table); + let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); + if let Some(sink_id) = creating_sink_id { + debug_assert!(!incoming_sinks.contains(&{ sink_id })); + incoming_sinks.push(sink_id as _); + } - if let Some(sink_id) = dropping_sink_id { - let drained = incoming_sinks.extract_if(|id| *id == sink_id).collect_vec(); - debug_assert_eq!(drained, vec![sink_id]); - } + if let Some(sink_id) = dropping_sink_id { + let drained = incoming_sinks.extract_if(|id| *id == sink_id).collect_vec(); + debug_assert_eq!(drained, vec![sink_id]); + } - table.incoming_sinks = Set(incoming_sinks.into()); - let table = table.update(txn).await?; + table.incoming_sinks = Set(incoming_sinks.into()); + let table = table.update(txn).await?; + } + // TODO: support other streaming jobs + _ => unreachable!("invalid streaming job type: {:?}", streaming_job.job_type()), + } + // 0. update internal tables // Fields including `fragment_id` were placeholder values before. // After table fragments are created, update them for all internal tables. let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find() @@ -1107,6 +1111,7 @@ impl CatalogController { // TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table. let mut to_update_fragment_ids = HashSet::new(); + // 2.1 update downstream actor's upstream_actor_ids for merge_update in merge_updates { assert!(merge_update.removed_upstream_actor_id.is_empty()); assert!(merge_update.new_upstream_fragment_id.is_some()); @@ -1147,6 +1152,7 @@ impl CatalogController { to_update_fragment_ids.insert(fragment_id); } + // 2.2 update downstream fragment's Merge node, and upstream_fragment_id for fragment_id in to_update_fragment_ids { let (fragment_id, mut stream_node, mut upstream_fragment_id) = Fragment::find_by_id(fragment_id)