
Commit 7a784cc

fix: Do not recover background streaming jobs whose table fragments have been marked as created (#14367)

Co-authored-by: zwang28 <[email protected]>
2 people authored and Li0k committed Jan 10, 2024
1 parent abee759 commit 7a784cc
Showing 3 changed files with 31 additions and 106 deletions.
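
The gist of the fix, as a minimal standalone sketch. Every type and function below (Fragments, Mview, RecoveryAction, plan_recovery) is a hypothetical simplification for illustration only; the real code in src/meta/src/barrier/recovery.rs works with TableFragments, the catalog manager, and progress notifiers, as shown in the diff that follows. The idea: when recovery walks the creating background materialized views, a job whose table fragments are already marked as created is finalized and skipped rather than re-registered for backfill progress tracking.

// Hypothetical, simplified sketch of the recovery decision introduced by this
// commit; not the actual meta-service code.
struct Fragments {
    created: bool,
}

struct Mview {
    id: u32,
    fragments: Fragments,
}

enum RecoveryAction {
    // The job already finished: notify the frontend, nothing to recover.
    FinishCreate { table_id: u32 },
    // The job is still backfilling: track it again so progress is recovered.
    RecoverBackfill { table_id: u32 },
}

fn plan_recovery(mviews: &[Mview]) -> Vec<RecoveryAction> {
    mviews
        .iter()
        .map(|m| {
            if m.fragments.created {
                RecoveryAction::FinishCreate { table_id: m.id }
            } else {
                RecoveryAction::RecoverBackfill { table_id: m.id }
            }
        })
        .collect()
}

fn main() {
    let mviews = vec![
        Mview { id: 1, fragments: Fragments { created: true } },
        Mview { id: 2, fragments: Fragments { created: false } },
    ];
    for action in plan_recovery(&mviews) {
        match action {
            RecoveryAction::FinishCreate { table_id } => {
                println!("table {table_id}: already created, finish and skip recovery")
            }
            RecoveryAction::RecoverBackfill { table_id } => {
                println!("table {table_id}: still backfilling, recover progress")
            }
        }
    }
}

Because the real per-mview loop now derives backfill_actor_ids() and dependent_table_ids() directly from each job's TableFragments, the batch helpers get_table_id_stream_scan_actor_mapping and get_upstream_relation_counts in fragment.rs become unused and are deleted in this commit.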
2 changes: 1 addition & 1 deletion ci/scripts/cron-e2e-test.sh
@@ -4,7 +4,7 @@
 set -euo pipefail
 
 source ci/scripts/common.sh
-export RUN_COMPACTION=1;
+export RUN_COMPACTION=0;
 export RUN_META_BACKUP=1;
 export RUN_DELETE_RANGE=1;
 source ci/scripts/run-e2e-test.sh
66 changes: 30 additions & 36 deletions src/meta/src/barrier/recovery.rs
Expand Up @@ -44,7 +44,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
use crate::controller::catalog::ReleaseContext;
use crate::manager::{MetadataManager, WorkerId};
use crate::model::{ActorId, BarrierManagerState, MetadataModel, MigrationPlan, TableFragments};
use crate::model::{BarrierManagerState, MetadataModel, MigrationPlan, TableFragments};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::MetaResult;

@@ -149,48 +149,44 @@ impl GlobalBarrierManager {
             unreachable!()
         };
         let mviews = mgr.catalog_manager.list_creating_background_mvs().await;
-        let creating_mview_ids = mviews.iter().map(|m| TableId::new(m.id)).collect_vec();
-        let mview_definitions = mviews
-            .into_iter()
-            .map(|m| (TableId::new(m.id), m.definition))
-            .collect::<HashMap<_, _>>();
 
+        let mut mview_definitions = HashMap::new();
+        let mut table_map = HashMap::new();
+        let mut table_fragment_map = HashMap::new();
+        let mut upstream_mv_counts = HashMap::new();
         let mut senders = HashMap::new();
         let mut receivers = Vec::new();
-        for table_id in creating_mview_ids.iter().copied() {
-            let (finished_tx, finished_rx) = oneshot::channel();
-            senders.insert(
-                table_id,
-                Notifier {
-                    finished: Some(finished_tx),
-                    ..Default::default()
-                },
-            );
-
+        for mview in mviews {
+            let table_id = TableId::new(mview.id);
             let fragments = mgr
                 .fragment_manager
                 .select_table_fragments_by_table_id(&table_id)
                 .await?;
             let internal_table_ids = fragments.internal_table_ids();
             let internal_tables = mgr.catalog_manager.get_tables(&internal_table_ids).await;
-            let table = mgr.catalog_manager.get_tables(&[table_id.table_id]).await;
-            assert_eq!(table.len(), 1, "should only have 1 materialized table");
-            let table = table.into_iter().next().unwrap();
-            receivers.push((table, internal_tables, finished_rx));
+            if fragments.is_created() {
+                // If the mview is already created, we don't need to recover it.
+                mgr.catalog_manager
+                    .finish_create_table_procedure(internal_tables, mview)
+                    .await?;
+                tracing::debug!("notified frontend for stream job {}", table_id.table_id);
+            } else {
+                table_map.insert(table_id, fragments.backfill_actor_ids());
+                mview_definitions.insert(table_id, mview.definition.clone());
+                upstream_mv_counts.insert(table_id, fragments.dependent_table_ids());
+                table_fragment_map.insert(table_id, fragments);
+                let (finished_tx, finished_rx) = oneshot::channel();
+                senders.insert(
+                    table_id,
+                    Notifier {
+                        finished: Some(finished_tx),
+                        ..Default::default()
+                    },
+                );
+                receivers.push((mview, internal_tables, finished_rx));
+            }
         }
 
-        let table_map = mgr
-            .fragment_manager
-            .get_table_id_stream_scan_actor_mapping(&creating_mview_ids)
-            .await;
-        let table_fragment_map = mgr
-            .fragment_manager
-            .get_table_id_table_fragment_map(&creating_mview_ids)
-            .await?;
-        let upstream_mv_counts = mgr
-            .fragment_manager
-            .get_upstream_relation_counts(&creating_mview_ids)
-            .await;
         let version_stats = self.hummock_manager.get_version_stats().await;
         // If failed, enter recovery mode.
         {
@@ -252,6 +248,7 @@ impl GlobalBarrierManager {
         let mut receivers = Vec::new();
         let mut table_fragment_map = HashMap::new();
         let mut mview_definitions = HashMap::new();
+        let mut table_map = HashMap::new();
         let mut upstream_mv_counts = HashMap::new();
         for mview in &mviews {
             let (finished_tx, finished_rx) = oneshot::channel();
@@ -270,15 +267,12 @@ impl GlobalBarrierManager {
                 .await?;
             let table_fragments = TableFragments::from_protobuf(table_fragments);
             upstream_mv_counts.insert(table_id, table_fragments.dependent_table_ids());
+            table_map.insert(table_id, table_fragments.backfill_actor_ids());
             table_fragment_map.insert(table_id, table_fragments);
             mview_definitions.insert(table_id, mview.definition.clone());
             receivers.push((mview.table_id, finished_rx));
         }
 
-        let table_map: HashMap<TableId, HashSet<ActorId>> = table_fragment_map
-            .iter()
-            .map(|(id, tf)| (*id, tf.actor_ids().into_iter().collect()))
-            .collect();
         let version_stats = self.hummock_manager.get_version_stats().await;
         // If failed, enter recovery mode.
         {
69 changes: 0 additions & 69 deletions src/meta/src/manager/catalog/fragment.rs
@@ -33,7 +33,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode,
-    StreamScanType,
 };
 use tokio::sync::{RwLock, RwLockReadGuard};
 
@@ -170,74 +169,6 @@ impl FragmentManager {
         map.values().cloned().collect()
     }
 
-    /// The `table_ids` here should correspond to stream jobs.
-    /// We get their corresponding table fragment, and from there,
-    /// we get the actors that are in the table fragment.
-    pub async fn get_table_id_stream_scan_actor_mapping(
-        &self,
-        table_ids: &[TableId],
-    ) -> HashMap<TableId, HashSet<ActorId>> {
-        let map = &self.core.read().await.table_fragments;
-        let mut table_map = HashMap::new();
-        // TODO(kwannoel): Can this be unified with `PlanVisitor`?
-        fn has_backfill(stream_node: &StreamNode) -> bool {
-            let is_backfill = if let Some(node) = &stream_node.node_body
-                && let Some(node) = node.as_stream_scan()
-            {
-                node.stream_scan_type == StreamScanType::Backfill as i32
-                    || node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
-            } else {
-                false
-            };
-            is_backfill || stream_node.get_input().iter().any(has_backfill)
-        }
-        for table_id in table_ids {
-            if let Some(table_fragment) = map.get(table_id) {
-                let mut actors = HashSet::new();
-                for fragment in table_fragment.fragments.values() {
-                    for actor in &fragment.actors {
-                        if let Some(node) = &actor.nodes
-                            && has_backfill(node)
-                        {
-                            actors.insert(actor.actor_id);
-                        } else {
-                            tracing::trace!("ignoring actor: {:?}", actor);
-                        }
-                    }
-                }
-                table_map.insert(*table_id, actors);
-            }
-        }
-        table_map
-    }
-
-    /// Gets the counts for each upstream relation that each stream job
-    /// indicated by `table_ids` depends on.
-    /// For example in the following query:
-    /// ```sql
-    /// CREATE MATERIALIZED VIEW m1 AS
-    /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b
-    /// ```
-    ///
-    /// We have t1 occurring once, and t2 occurring once.
-    pub async fn get_upstream_relation_counts(
-        &self,
-        table_ids: &[TableId],
-    ) -> HashMap<TableId, HashMap<TableId, usize>> {
-        let map = &self.core.read().await.table_fragments;
-        let mut upstream_relation_counts = HashMap::new();
-        for table_id in table_ids {
-            if let Some(table_fragments) = map.get(table_id) {
-                let dependent_ids = table_fragments.dependent_table_ids();
-                let r = upstream_relation_counts.insert(*table_id, dependent_ids);
-                assert!(r.is_none(), "Each table_id should be unique!")
-            } else {
-                upstream_relation_counts.insert(*table_id, HashMap::new());
-            }
-        }
-        upstream_relation_counts
-    }
-
     pub fn get_mv_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
         match self.core.try_read() {
             Ok(core) => Some(
