diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 1a95d84371cb..9cbc1f64babe 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -21,7 +21,7 @@ use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}; -use risingwave_common::util::stream_graph_visitor::visit_stream_node; +use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; use risingwave_pb::ddl_service::TableJobType; @@ -603,6 +603,7 @@ impl FragmentManager { .filter_map(|table_id| map.get(table_id).cloned()) .collect_vec(); + let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new(); let mut table_fragments = BTreeMapTransaction::new(map); for table_fragment in &to_delete_table_fragments { table_fragments.remove(table_fragment.table_id()); @@ -634,6 +635,31 @@ impl FragmentManager { }) }); } + + if let Some(sink_fragment) = table_fragment.sink_fragment() { + let dispatchers = sink_fragment + .get_actors() + .iter() + .map(|actor| actor.get_dispatcher()) + .collect_vec(); + + if !dispatchers.is_empty() { + dirty_sink_into_table_upstream_fragment_id.insert(sink_fragment.fragment_id); + } + } + } + + if !dirty_sink_into_table_upstream_fragment_id.is_empty() { + let to_delete_table_ids: HashSet<_> = to_delete_table_fragments + .iter() + .map(|table| table.table_id()) + .collect(); + + Self::clean_dirty_table_sink_downstreams( + dirty_sink_into_table_upstream_fragment_id, + to_delete_table_ids, + &mut table_fragments, + )?; } if table_ids.is_empty() { @@ -656,6 +682,83 @@ impl FragmentManager { Ok(()) } + // When dropping sink into a table, there could be an unexpected meta reboot. 
At this time, the sink’s catalog might have been deleted, + // but the union branch that attaches the downstream table to the sink fragment may still exist. + // This could lead to issues. Therefore, we need to find the sink fragment’s downstream, then locate its union node and delete the dirty merge. + fn clean_dirty_table_sink_downstreams( + dirty_sink_into_table_upstream_fragment_id: HashSet<FragmentId>, + to_delete_table_ids: HashSet<TableId>, + table_fragments: &mut BTreeMapTransaction<'_, TableId, TableFragments>, + ) -> MetaResult<()> { + tracing::info!("cleaning dirty downstream merge nodes for table sink"); + + let mut dirty_downstream_table_ids = HashMap::new(); + for (table_id, table_fragment) in table_fragments.tree_mut() { + if to_delete_table_ids.contains(table_id) { + continue; + } + + for fragment in table_fragment.fragments.values_mut() { + if fragment + .get_upstream_fragment_ids() + .iter() + .all(|upstream_fragment_id| { + !dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id) + }) + { + continue; + } + + for actor in &mut fragment.actors { + visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| { + if let Some(NodeBody::Union(_)) = node.node_body { + for input in &mut node.input { + if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && dirty_sink_into_table_upstream_fragment_id.contains(&merge_node.upstream_fragment_id) { + dirty_downstream_table_ids.insert(*table_id, fragment.fragment_id); + return false; + } + } + } + true + }) + } + + fragment + .upstream_fragment_ids + .retain(|upstream_fragment_id| { + !dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id) + }); + } + } + + for (table_id, fragment_id) in dirty_downstream_table_ids { + let mut table_fragment = table_fragments + .get_mut(table_id) + .with_context(|| format!("table_fragment not exist: id={}", table_id))?; + + let fragment = table_fragment + .fragments + .get_mut(&fragment_id) + .with_context(|| format!("fragment not exist: id={}", 
fragment_id))?; + + for actor in &mut fragment.actors { + visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| { + if let Some(NodeBody::Union(_)) = node.node_body { + node.input.retain_mut(|input| { + if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && dirty_sink_into_table_upstream_fragment_id.contains(&merge_node.upstream_fragment_id) { + false + } else { + true + } + }); + } + true + }) + } + } + Ok(()) + } + /// Used in [`crate::barrier::GlobalBarrierManager`], load all actor that need to be sent or /// collected pub async fn load_all_actors( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index a8b4757203b1..8ee3f9019c3f 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -899,6 +899,27 @@ impl CatalogManager { } } + let mut tables_to_update = vec![]; + for table in database_core.tables.values() { + if table.incoming_sinks.is_empty() { + continue; + } + + if table + .incoming_sinks + .iter() + .all(|sink_id| database_core.sinks.contains_key(sink_id)) + { + continue; + } + + let mut table = table.clone(); + table + .incoming_sinks + .retain(|sink_id| database_core.sinks.contains_key(sink_id)); + tables_to_update.push(table); + } + let tables = &mut database_core.tables; let mut tables = BTreeMapTransaction::new(tables); for table in &tables_to_clean { @@ -907,8 +928,33 @@ impl CatalogManager { let table = tables.remove(table_id); assert!(table.is_some(), "table_id {} missing", table_id) } + + for table in &tables_to_update { + let table_id = table.id; + if tables.contains_key(&table_id) { + tracing::debug!("updating sink target table_id: {}", table_id); + tables.insert(table_id, table.clone()); + } + } + commit_meta!(self, tables)?; + if !tables_to_update.is_empty() { + let _ = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: tables_to_update + .into_iter() + .map(|table| Relation { + relation_info: 
RelationInfo::Table(table).into(), + }) + .collect(), + }), + ) + .await; + } + // Note that `tables_to_clean` doesn't include sink/index/table_with_source creation, // because their states are not persisted in the first place, see `start_create_stream_job_procedure`. let event_logs = tables_to_clean diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 870b82d7c0a5..6707108974c6 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -473,6 +473,10 @@ impl<'a, K: Ord + Debug, V: Clone, TXN> BTreeMapTransaction<'a, K, V, TXN> { self.tree_ref } + pub fn tree_mut(&mut self) -> &mut BTreeMap<K, V> { + self.tree_ref + } + /// Get the value of the provided key by merging the staging value and the original value pub fn get(&self, key: &K) -> Option<&V> { self.staging diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 1b33922e0ea5..3bce1208b596 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -826,6 +826,10 @@ impl DdlController { }); } } + + union_fragment + .upstream_fragment_ids + .push(upstream_fragment_id); + } /// Let the stream manager to create the actors, and do some cleanup work after it fails or finishes.