
Commit

Merge branch 'release-1.6' into auto-release-1.6-0523b41bcd07906b55117483bdd30c6e64fb534c
yezizp2012 authored Jan 5, 2024
2 parents ed4d033 + d1fc91a commit 18a149d
Showing 3 changed files with 36 additions and 107 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/cron-e2e-test.sh
@@ -4,7 +4,7 @@
 set -euo pipefail

 source ci/scripts/common.sh
-export RUN_COMPACTION=1;
+export RUN_COMPACTION=0;
 export RUN_META_BACKUP=1;
 export RUN_DELETE_RANGE=1;
 source ci/scripts/run-e2e-test.sh
58 changes: 27 additions & 31 deletions src/meta/src/barrier/recovery.rs
@@ -125,48 +125,44 @@ impl GlobalBarrierManager {
             unreachable!()
         };
         let mviews = mgr.catalog_manager.list_creating_background_mvs().await;
-        let creating_mview_ids = mviews.iter().map(|m| TableId::new(m.id)).collect_vec();
-        let mview_definitions = mviews
-            .into_iter()
-            .map(|m| (TableId::new(m.id), m.definition))
-            .collect::<HashMap<_, _>>();

+        let mut mview_definitions = HashMap::new();
+        let mut table_map = HashMap::new();
+        let mut table_fragment_map = HashMap::new();
+        let mut upstream_mv_counts = HashMap::new();
         let mut senders = HashMap::new();
         let mut receivers = Vec::new();
-        for table_id in creating_mview_ids.iter().copied() {
-            let (finished_tx, finished_rx) = oneshot::channel();
-            senders.insert(
-                table_id,
-                Notifier {
-                    finished: Some(finished_tx),
-                    ..Default::default()
-                },
-            );
-
+        for mview in mviews {
+            let table_id = TableId::new(mview.id);
             let fragments = mgr
                 .fragment_manager
                 .select_table_fragments_by_table_id(&table_id)
                 .await?;
             let internal_table_ids = fragments.internal_table_ids();
             let internal_tables = mgr.catalog_manager.get_tables(&internal_table_ids).await;
-            let table = mgr.catalog_manager.get_tables(&[table_id.table_id]).await;
-            assert_eq!(table.len(), 1, "should only have 1 materialized table");
-            let table = table.into_iter().next().unwrap();
-            receivers.push((table, internal_tables, finished_rx));
+            if fragments.is_created() {
+                // If the mview is already created, we don't need to recover it.
+                mgr.catalog_manager
+                    .finish_create_table_procedure(internal_tables, mview)
+                    .await?;
+                tracing::debug!("notified frontend for stream job {}", table_id.table_id);
+            } else {
+                table_map.insert(table_id, fragments.backfill_actor_ids());
+                mview_definitions.insert(table_id, mview.definition.clone());
+                upstream_mv_counts.insert(table_id, fragments.dependent_table_ids());
+                table_fragment_map.insert(table_id, fragments);
+                let (finished_tx, finished_rx) = oneshot::channel();
+                senders.insert(
+                    table_id,
+                    Notifier {
+                        finished: Some(finished_tx),
+                        ..Default::default()
+                    },
+                );
+                receivers.push((mview, internal_tables, finished_rx));
+            }
         }

-        let table_map = mgr
-            .fragment_manager
-            .get_table_id_stream_scan_actor_mapping(&creating_mview_ids)
-            .await;
-        let table_fragment_map = mgr
-            .fragment_manager
-            .get_table_id_table_fragment_map(&creating_mview_ids)
-            .await?;
-        let upstream_mv_counts = mgr
-            .fragment_manager
-            .get_upstream_relation_counts(&creating_mview_ids)
-            .await;
         let version_stats = self.hummock_manager.get_version_stats().await;
         // If failed, enter recovery mode.
         {
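The recovery.rs hunk above replaces the bulk FragmentManager lookups with a single pass over the creating background materialized views: jobs whose fragments are already created are finalized and the frontend is notified, while only the unfinished ones are registered for progress tracking. A minimal sketch of that branching follows; Mview, Fragments, and split_creating_mviews are hypothetical simplified stand-ins, not the actual meta-node API.

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the meta-node types in the diff above.
struct Mview {
    id: u32,
    definition: String,
}

struct Fragments {
    created: bool,
}

impl Fragments {
    fn is_created(&self) -> bool {
        self.created
    }
}

/// Single pass over the creating background mviews: jobs whose fragments are
/// already created are finalized immediately; the rest are kept for progress
/// tracking during recovery.
fn split_creating_mviews(
    mviews: Vec<Mview>,
    mut fetch_fragments: impl FnMut(u32) -> Fragments,
) -> (Vec<u32>, HashMap<u32, String>) {
    let mut finished = Vec::new();
    let mut tracked_definitions = HashMap::new();
    for mview in mviews {
        let fragments = fetch_fragments(mview.id);
        if fragments.is_created() {
            // Already created: finalize/notify instead of recovering progress.
            finished.push(mview.id);
        } else {
            // Still backfilling: keep what recovery needs (in the real code:
            // backfill actors, table fragments, upstream counts, notifiers).
            tracked_definitions.insert(mview.id, mview.definition);
        }
    }
    (finished, tracked_definitions)
}

fn main() {
    let mviews = vec![
        Mview { id: 1, definition: "CREATE MATERIALIZED VIEW m1 AS ...".to_string() },
        Mview { id: 2, definition: "CREATE MATERIALIZED VIEW m2 AS ...".to_string() },
    ];
    // Pretend mview 1 already finished its backfill before the failure.
    let (finished, tracked) = split_creating_mviews(mviews, |id| Fragments { created: id == 1 });
    assert_eq!(finished, vec![1]);
    assert!(tracked.contains_key(&2));
}

In the actual commit the else branch also records backfill actors, table fragments, upstream counts, and a Notifier per job, as shown in the diff above, which is why the helper methods removed from fragment.rs below are no longer needed.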
83 changes: 8 additions & 75 deletions src/meta/src/manager/catalog/fragment.rs
@@ -33,7 +33,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode,
-    StreamScanType,
 };
 use tokio::sync::{RwLock, RwLockReadGuard};

@@ -170,74 +169,6 @@ impl FragmentManager {
         map.values().cloned().collect()
     }

-    /// The `table_ids` here should correspond to stream jobs.
-    /// We get their corresponding table fragment, and from there,
-    /// we get the actors that are in the table fragment.
-    pub async fn get_table_id_stream_scan_actor_mapping(
-        &self,
-        table_ids: &[TableId],
-    ) -> HashMap<TableId, HashSet<ActorId>> {
-        let map = &self.core.read().await.table_fragments;
-        let mut table_map = HashMap::new();
-        // TODO(kwannoel): Can this be unified with `PlanVisitor`?
-        fn has_backfill(stream_node: &StreamNode) -> bool {
-            let is_backfill = if let Some(node) = &stream_node.node_body
-                && let Some(node) = node.as_stream_scan()
-            {
-                node.stream_scan_type == StreamScanType::Backfill as i32
-                    || node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
-            } else {
-                false
-            };
-            is_backfill || stream_node.get_input().iter().any(has_backfill)
-        }
-        for table_id in table_ids {
-            if let Some(table_fragment) = map.get(table_id) {
-                let mut actors = HashSet::new();
-                for fragment in table_fragment.fragments.values() {
-                    for actor in &fragment.actors {
-                        if let Some(node) = &actor.nodes
-                            && has_backfill(node)
-                        {
-                            actors.insert(actor.actor_id);
-                        } else {
-                            tracing::trace!("ignoring actor: {:?}", actor);
-                        }
-                    }
-                }
-                table_map.insert(*table_id, actors);
-            }
-        }
-        table_map
-    }
-
-    /// Gets the counts for each upstream relation that each stream job
-    /// indicated by `table_ids` depends on.
-    /// For example in the following query:
-    /// ```sql
-    /// CREATE MATERIALIZED VIEW m1 AS
-    /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b
-    /// ```
-    ///
-    /// We have t1 occurring once, and t2 occurring once.
-    pub async fn get_upstream_relation_counts(
-        &self,
-        table_ids: &[TableId],
-    ) -> HashMap<TableId, HashMap<TableId, usize>> {
-        let map = &self.core.read().await.table_fragments;
-        let mut upstream_relation_counts = HashMap::new();
-        for table_id in table_ids {
-            if let Some(table_fragments) = map.get(table_id) {
-                let dependent_ids = table_fragments.dependent_table_ids();
-                let r = upstream_relation_counts.insert(*table_id, dependent_ids);
-                assert!(r.is_none(), "Each table_id should be unique!")
-            } else {
-                upstream_relation_counts.insert(*table_id, HashMap::new());
-            }
-        }
-        upstream_relation_counts
-    }
-
     pub fn get_mv_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
         match self.core.try_read() {
             Ok(core) => Some(
@@ -614,14 +545,16 @@ impl FragmentManager {
                 if table_ids.contains(&dependent_table_id) {
                     continue;
                 }
-                let mut dependent_table = table_fragments
-                    .get_mut(dependent_table_id)
-                    .with_context(|| {
-                        format!(
+                let mut dependent_table =
+                    if let Some(dependent_table) = table_fragments.get_mut(dependent_table_id) {
+                        dependent_table
+                    } else {
+                        tracing::error!(
                             "dependent table_fragment not exist: id={}",
                             dependent_table_id
-                        )
-                    })?;
+                        );
+                        continue;
+                    };

                 dependent_table
                     .fragments
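The last hunk of fragment.rs turns a hard failure into a log-and-skip: a missing dependent table fragment no longer aborts the operation via `.with_context(..)?` but is reported with tracing::error! and skipped with continue. A minimal, self-contained sketch of that pattern follows; apply_updates and the HashMap shape are hypothetical, and eprintln! stands in for tracing::error! to keep the sketch dependency-free.

use std::collections::HashMap;

/// Illustration of the pattern above: instead of failing the whole update when
/// a dependent entry is missing, log it and move on to the next id.
fn apply_updates(table_fragments: &mut HashMap<u32, Vec<u32>>, dependent_ids: &[u32]) {
    for dependent_table_id in dependent_ids {
        let dependent_table = if let Some(t) = table_fragments.get_mut(dependent_table_id) {
            t
        } else {
            // The real code uses tracing::error! here.
            eprintln!("dependent table_fragment not exist: id={dependent_table_id}");
            continue;
        };
        // Mutate the entries that do exist.
        dependent_table.push(42);
    }
}

fn main() {
    let mut table_fragments = HashMap::from([(1u32, vec![]), (3u32, vec![])]);
    // Id 2 is missing: the old `.with_context(..)?` style would have returned an
    // error; now it is only logged and the remaining ids are still processed.
    apply_updates(&mut table_fragments, &[1, 2, 3]);
    assert_eq!(table_fragments[&1], vec![42]);
    assert_eq!(table_fragments[&3], vec![42]);
}

The trade-off is that the surrounding operation keeps going past an inconsistent entry instead of returning an error, leaving only an error log for the missing id.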
