From f8aee748c0c1848b8f2947c9bbaf3c24434a3ae2 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 5 Jan 2024 14:53:12 +0800 Subject: [PATCH 1/2] fix: Do not recover background streaming jobs whose table fragments have been marked as created (#14367) (#14378) Co-authored-by: zwang28 <84491488@qq.com> --- ci/scripts/cron-e2e-test.sh | 2 +- src/meta/src/barrier/recovery.rs | 58 ++++++++++---------- src/meta/src/manager/catalog/fragment.rs | 69 ------------------------ 3 files changed, 28 insertions(+), 101 deletions(-) diff --git a/ci/scripts/cron-e2e-test.sh b/ci/scripts/cron-e2e-test.sh index 2e8c56c3c1d5f..52c7a6faa366c 100755 --- a/ci/scripts/cron-e2e-test.sh +++ b/ci/scripts/cron-e2e-test.sh @@ -4,7 +4,7 @@ set -euo pipefail source ci/scripts/common.sh -export RUN_COMPACTION=1; +export RUN_COMPACTION=0; export RUN_META_BACKUP=1; export RUN_DELETE_RANGE=1; source ci/scripts/run-e2e-test.sh diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 53fc94e7c7c5e..82dd5509a5845 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -125,48 +125,44 @@ impl GlobalBarrierManager { unreachable!() }; let mviews = mgr.catalog_manager.list_creating_background_mvs().await; - let creating_mview_ids = mviews.iter().map(|m| TableId::new(m.id)).collect_vec(); - let mview_definitions = mviews - .into_iter() - .map(|m| (TableId::new(m.id), m.definition)) - .collect::>(); + let mut mview_definitions = HashMap::new(); + let mut table_map = HashMap::new(); + let mut table_fragment_map = HashMap::new(); + let mut upstream_mv_counts = HashMap::new(); let mut senders = HashMap::new(); let mut receivers = Vec::new(); - for table_id in creating_mview_ids.iter().copied() { - let (finished_tx, finished_rx) = oneshot::channel(); - senders.insert( - table_id, - Notifier { - finished: Some(finished_tx), - ..Default::default() - }, - ); - + for mview in mviews { + let table_id = TableId::new(mview.id); let fragments = mgr .fragment_manager .select_table_fragments_by_table_id(&table_id) .await?; let internal_table_ids = fragments.internal_table_ids(); let internal_tables = mgr.catalog_manager.get_tables(&internal_table_ids).await; - let table = mgr.catalog_manager.get_tables(&[table_id.table_id]).await; - assert_eq!(table.len(), 1, "should only have 1 materialized table"); - let table = table.into_iter().next().unwrap(); - receivers.push((table, internal_tables, finished_rx)); + if fragments.is_created() { + // If the mview is already created, we don't need to recover it. + mgr.catalog_manager + .finish_create_table_procedure(internal_tables, mview) + .await?; + tracing::debug!("notified frontend for stream job {}", table_id.table_id); + } else { + table_map.insert(table_id, fragments.backfill_actor_ids()); + mview_definitions.insert(table_id, mview.definition.clone()); + upstream_mv_counts.insert(table_id, fragments.dependent_table_ids()); + table_fragment_map.insert(table_id, fragments); + let (finished_tx, finished_rx) = oneshot::channel(); + senders.insert( + table_id, + Notifier { + finished: Some(finished_tx), + ..Default::default() + }, + ); + receivers.push((mview, internal_tables, finished_rx)); + } } - let table_map = mgr - .fragment_manager - .get_table_id_stream_scan_actor_mapping(&creating_mview_ids) - .await; - let table_fragment_map = mgr - .fragment_manager - .get_table_id_table_fragment_map(&creating_mview_ids) - .await?; - let upstream_mv_counts = mgr - .fragment_manager - .get_upstream_relation_counts(&creating_mview_ids) - .await; let version_stats = self.hummock_manager.get_version_stats().await; // If failed, enter recovery mode. { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index f093bbc9081de..46fb7fc7bd36c 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -33,7 +33,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode, - StreamScanType, }; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -170,74 +169,6 @@ impl FragmentManager { map.values().cloned().collect() } - /// The `table_ids` here should correspond to stream jobs. - /// We get their corresponding table fragment, and from there, - /// we get the actors that are in the table fragment. - pub async fn get_table_id_stream_scan_actor_mapping( - &self, - table_ids: &[TableId], - ) -> HashMap> { - let map = &self.core.read().await.table_fragments; - let mut table_map = HashMap::new(); - // TODO(kwannoel): Can this be unified with `PlanVisitor`? - fn has_backfill(stream_node: &StreamNode) -> bool { - let is_backfill = if let Some(node) = &stream_node.node_body - && let Some(node) = node.as_stream_scan() - { - node.stream_scan_type == StreamScanType::Backfill as i32 - || node.stream_scan_type == StreamScanType::ArrangementBackfill as i32 - } else { - false - }; - is_backfill || stream_node.get_input().iter().any(has_backfill) - } - for table_id in table_ids { - if let Some(table_fragment) = map.get(table_id) { - let mut actors = HashSet::new(); - for fragment in table_fragment.fragments.values() { - for actor in &fragment.actors { - if let Some(node) = &actor.nodes - && has_backfill(node) - { - actors.insert(actor.actor_id); - } else { - tracing::trace!("ignoring actor: {:?}", actor); - } - } - } - table_map.insert(*table_id, actors); - } - } - table_map - } - - /// Gets the counts for each upstream relation that each stream job - /// indicated by `table_ids` depends on. - /// For example in the following query: - /// ```sql - /// CREATE MATERIALIZED VIEW m1 AS - /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b - /// ``` - /// - /// We have t1 occurring once, and t2 occurring once. - pub async fn get_upstream_relation_counts( - &self, - table_ids: &[TableId], - ) -> HashMap> { - let map = &self.core.read().await.table_fragments; - let mut upstream_relation_counts = HashMap::new(); - for table_id in table_ids { - if let Some(table_fragments) = map.get(table_id) { - let dependent_ids = table_fragments.dependent_table_ids(); - let r = upstream_relation_counts.insert(*table_id, dependent_ids); - assert!(r.is_none(), "Each table_id should be unique!") - } else { - upstream_relation_counts.insert(*table_id, HashMap::new()); - } - } - upstream_relation_counts - } - pub fn get_mv_id_to_internal_table_ids_mapping(&self) -> Option)>> { match self.core.try_read() { Ok(core) => Some( From d1fc91ac06d87044a6dac4197b546464247e8b36 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 5 Jan 2024 08:38:51 +0000 Subject: [PATCH 2/2] fix: ignore dependent table_fragment not exist (#14308) (#14359) Co-authored-by: Eric Fu Co-authored-by: August --- src/meta/src/manager/catalog/fragment.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 46fb7fc7bd36c..89a9cc14c25af 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -545,14 +545,16 @@ impl FragmentManager { if table_ids.contains(&dependent_table_id) { continue; } - let mut dependent_table = table_fragments - .get_mut(dependent_table_id) - .with_context(|| { - format!( + let mut dependent_table = + if let Some(dependent_table) = table_fragments.get_mut(dependent_table_id) { + dependent_table + } else { + tracing::error!( "dependent table_fragment not exist: id={}", dependent_table_id - ) - })?; + ); + continue; + }; dependent_table .fragments