fix: clean the downstream table of the dirty table sink during recovery. (#13957)

Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Dec 15, 2023
1 parent 0561cd7 commit 2243a02
Showing 4 changed files with 158 additions and 1 deletion.
105 changes: 104 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -21,7 +21,7 @@ use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
@@ -603,6 +603,7 @@ impl FragmentManager {
.filter_map(|table_id| map.get(table_id).cloned())
.collect_vec();

let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new();
let mut table_fragments = BTreeMapTransaction::new(map);
for table_fragment in &to_delete_table_fragments {
table_fragments.remove(table_fragment.table_id());
@@ -634,6 +635,31 @@
})
});
}

if let Some(sink_fragment) = table_fragment.sink_fragment() {
let dispatchers = sink_fragment
.get_actors()
.iter()
.map(|actor| actor.get_dispatcher())
.collect_vec();

if !dispatchers.is_empty() {
dirty_sink_into_table_upstream_fragment_id.insert(sink_fragment.fragment_id);
}
}
}

if !dirty_sink_into_table_upstream_fragment_id.is_empty() {
let to_delete_table_ids: HashSet<_> = to_delete_table_fragments
.iter()
.map(|table| table.table_id())
.collect();

Self::clean_dirty_table_sink_downstreams(
dirty_sink_into_table_upstream_fragment_id,
to_delete_table_ids,
&mut table_fragments,
)?;
}

if table_ids.is_empty() {
@@ -656,6 +682,83 @@ impl FragmentManager {
Ok(())
}

// When dropping a sink into a table, an unexpected meta reboot may happen. At that point, the sink's catalog might already have been deleted,
// but the union branch that attaches the downstream table to the sink fragment may still exist.
// This would leave the table in an inconsistent state. Therefore, we need to find the sink fragment's downstream, locate its union node, and delete the dirty merge input.
fn clean_dirty_table_sink_downstreams(
dirty_sink_into_table_upstream_fragment_id: HashSet<u32>,
to_delete_table_ids: HashSet<TableId>,
table_fragments: &mut BTreeMapTransaction<'_, TableId, TableFragments>,
) -> MetaResult<()> {
tracing::info!("cleaning dirty downstream merge nodes for table sink");

let mut dirty_downstream_table_ids = HashMap::new();
for (table_id, table_fragment) in table_fragments.tree_mut() {
if to_delete_table_ids.contains(table_id) {
continue;
}

for fragment in table_fragment.fragments.values_mut() {
if fragment
.get_upstream_fragment_ids()
.iter()
.all(|upstream_fragment_id| {
!dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id)
})
{
continue;
}

for actor in &mut fragment.actors {
visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
for input in &mut node.input {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && dirty_sink_into_table_upstream_fragment_id.contains(&merge_node.upstream_fragment_id) {
dirty_downstream_table_ids.insert(*table_id, fragment.fragment_id);
return false;
}
}
}
true
})
}

fragment
.upstream_fragment_ids
.retain(|upstream_fragment_id| {
!dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id)
});
}
}

for (table_id, fragment_id) in dirty_downstream_table_ids {
let mut table_fragment = table_fragments
.get_mut(table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;

let fragment = table_fragment
.fragments
.get_mut(&fragment_id)
.with_context(|| format!("fragment not exist: id={}", fragment_id))?;

for actor in &mut fragment.actors {
visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && dirty_sink_into_table_upstream_fragment_id.contains(&merge_node.upstream_fragment_id) {
false
} else {
true
}
});
}
true
})
}
}
Ok(())
}

/// Used in [`crate::barrier::GlobalBarrierManager`], load all actors that need to be sent or
/// collected
pub async fn load_all_actors(
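
The new cleanup walks every surviving table's fragments and, wherever a fragment is fed by one of the dirty sink fragments, prunes the Merge input under its Union node. Below is a minimal, self-contained sketch of that pruning idea; the `Body`/`Node` types and `prune_dirty_merges` are hypothetical simplifications, not RisingWave's `NodeBody`/`StreamNode` protobufs or its `visit_stream_node_cont` visitor.

use std::collections::HashSet;

enum Body {
    Union,
    Merge { upstream_fragment_id: u32 },
    Project,
}

struct Node {
    body: Body,
    input: Vec<Node>,
}

/// Under every Union node, drop Merge inputs whose upstream fragment is one of
/// the dirty sink-into-table fragments, then recurse into the remaining inputs.
fn prune_dirty_merges(node: &mut Node, dirty_upstreams: &HashSet<u32>) {
    if matches!(node.body, Body::Union) {
        node.input.retain(|input| {
            !matches!(
                input.body,
                Body::Merge { upstream_fragment_id }
                    if dirty_upstreams.contains(&upstream_fragment_id)
            )
        });
    }
    for input in &mut node.input {
        prune_dirty_merges(input, dirty_upstreams);
    }
}

fn main() {
    let mut union_node = Node {
        body: Body::Union,
        input: vec![
            Node { body: Body::Merge { upstream_fragment_id: 7 }, input: vec![] },
            Node { body: Body::Merge { upstream_fragment_id: 3 }, input: vec![] },
            Node { body: Body::Project, input: vec![] },
        ],
    };
    // Fragment 7 belonged to a sink whose catalog entry was already dropped.
    let dirty: HashSet<u32> = HashSet::from([7]);
    prune_dirty_merges(&mut union_node, &dirty);
    assert_eq!(union_node.input.len(), 2); // only the dirty Merge branch is removed
}

The commit itself does this in two passes over the real stream graph: the first visit only records which downstream fragments contain a dirty merge, and the second visit drops those merge inputs with `retain_mut`.
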
46 changes: 46 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -899,6 +899,27 @@ impl CatalogManager {
}
}

let mut tables_to_update = vec![];
for table in database_core.tables.values() {
if table.incoming_sinks.is_empty() {
continue;
}

if table
.incoming_sinks
.iter()
.all(|sink_id| database_core.sinks.contains_key(sink_id))
{
continue;
}

let mut table = table.clone();
table
.incoming_sinks
.retain(|sink_id| database_core.sinks.contains_key(sink_id));
tables_to_update.push(table);
}

let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table in &tables_to_clean {
@@ -907,8 +928,33 @@
let table = tables.remove(table_id);
assert!(table.is_some(), "table_id {} missing", table_id)
}

for table in &tables_to_update {
let table_id = table.id;
if tables.contains_key(&table_id) {
tracing::debug!("updating sink target table_id: {}", table_id);
tables.insert(table_id, table.clone());
}
}

commit_meta!(self, tables)?;

if !tables_to_update.is_empty() {
let _ = self
.notify_frontend(
Operation::Update,
Info::RelationGroup(RelationGroup {
relations: tables_to_update
.into_iter()
.map(|table| Relation {
relation_info: RelationInfo::Table(table).into(),
})
.collect(),
}),
)
.await;
}

// Note that `tables_to_clean` doesn't include sink/index/table_with_source creation,
// because their states are not persisted in the first place, see `start_create_stream_job_procedure`.
let event_logs = tables_to_clean
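
The catalog-side change mirrors this at the table level: any table whose `incoming_sinks` still references a sink that no longer exists in the catalog gets a pruned copy, which is then committed and pushed to the frontend as a relation update. A rough, self-contained sketch of the filtering step, using plain maps and a hypothetical `Table` struct instead of RisingWave's catalog types:

use std::collections::{HashMap, HashSet};

#[derive(Clone, Debug, PartialEq)]
struct Table {
    id: u32,
    incoming_sinks: Vec<u32>,
}

/// Return updated copies of the tables whose `incoming_sinks` reference a sink
/// that no longer exists; tables that are already consistent are skipped.
fn prune_dangling_incoming_sinks(
    tables: &HashMap<u32, Table>,
    existing_sinks: &HashSet<u32>,
) -> Vec<Table> {
    let mut tables_to_update = vec![];
    for table in tables.values() {
        if table.incoming_sinks.is_empty()
            || table
                .incoming_sinks
                .iter()
                .all(|sink_id| existing_sinks.contains(sink_id))
        {
            continue;
        }
        let mut table = table.clone();
        table
            .incoming_sinks
            .retain(|sink_id| existing_sinks.contains(sink_id));
        tables_to_update.push(table);
    }
    tables_to_update
}

fn main() {
    let tables = HashMap::from([(1, Table { id: 1, incoming_sinks: vec![10, 11] })]);
    let sinks = HashSet::from([10]); // sink 11 was dropped right before the reboot
    let updated = prune_dangling_incoming_sinks(&tables, &sinks);
    assert_eq!(updated, vec![Table { id: 1, incoming_sinks: vec![10] }]);
}
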
4 changes: 4 additions & 0 deletions src/meta/src/model/mod.rs
@@ -473,6 +473,10 @@ impl<'a, K: Ord + Debug, V: Clone, TXN> BTreeMapTransaction<'a, K, V, TXN> {
self.tree_ref
}

pub fn tree_mut(&mut self) -> &mut BTreeMap<K, V> {
self.tree_ref
}

/// Get the value of the provided key by merging the staging value and the original value
pub fn get(&self, key: &K) -> Option<&V> {
self.staging
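
`tree_mut` gives callers direct mutable access to the map underneath the transaction, next to the existing staged read/write API. The sketch below shows how such an accessor can sit alongside a staging layer; `MapTransaction` is a hypothetical stand-in and much simpler than the real `BTreeMapTransaction`.

use std::collections::BTreeMap;

struct MapTransaction<'a, K: Ord, V> {
    tree_ref: &'a mut BTreeMap<K, V>,
    staging: BTreeMap<K, V>,
}

impl<'a, K: Ord, V> MapTransaction<'a, K, V> {
    fn new(tree_ref: &'a mut BTreeMap<K, V>) -> Self {
        Self { tree_ref, staging: BTreeMap::new() }
    }

    /// Staged write: buffered until `commit` is called.
    fn insert(&mut self, key: K, value: V) {
        self.staging.insert(key, value);
    }

    /// Read that prefers the staged value over the original one.
    fn get(&self, key: &K) -> Option<&V> {
        self.staging.get(key).or_else(|| self.tree_ref.get(key))
    }

    /// Direct mutable access to the underlying tree, bypassing the staging layer.
    fn tree_mut(&mut self) -> &mut BTreeMap<K, V> {
        self.tree_ref
    }

    /// Apply all staged writes to the underlying tree.
    fn commit(self) {
        let MapTransaction { tree_ref, staging } = self;
        for (k, v) in staging {
            tree_ref.insert(k, v);
        }
    }
}

fn main() {
    let mut map = BTreeMap::from([(1u32, "a".to_string())]);
    let mut txn = MapTransaction::new(&mut map);
    txn.insert(2, "b".to_string());            // staged: not yet in the tree
    txn.tree_mut().insert(3, "c".to_string()); // applied to the tree immediately
    assert_eq!(txn.get(&2), Some(&"b".to_string()));
    txn.commit();
    assert_eq!(map.len(), 3);
}

In the fragment.rs change above, the recovery path uses `tree_mut()` to scan and patch fragments in place, while still going through `get_mut` where it wants the transaction wrapper.
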
4 changes: 4 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -826,6 +826,10 @@ impl DdlController {
});
}
}

union_fragment
.upstream_fragment_ids
.push(upstream_fragment_id);
}

/// Let the stream manager create the actors, and do some cleanup work after it fails or finishes.
