fix(meta): use only StreamTableScan actors on recovery to track progress #13529

Merged 2 commits on Nov 21, 2023
4 changes: 2 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 3 additions & 1 deletion src/meta/src/barrier/mod.rs
@@ -98,7 +98,7 @@ impl<T> From<TableMap<T>> for HashMap<TableId, T> {
}
}

pub(crate) type TableActorMap = TableMap<Vec<ActorId>>;
pub(crate) type TableActorMap = TableMap<HashSet<ActorId>>;
pub(crate) type TableUpstreamMvCountMap = TableMap<HashMap<TableId, usize>>;
pub(crate) type TableDefinitionMap = TableMap<String>;
pub(crate) type TableNotifierMap = TableMap<Notifier>;
@@ -1073,7 +1073,9 @@ impl GlobalBarrierManager {
commands.push(command);
}
for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
tracing::trace!(?progress, "update progress");
if let Some(command) = tracker.update(progress, &version_stats) {
tracing::trace!(?progress, "update progress");
commands.push(command);
}
}
9 changes: 8 additions & 1 deletion src/meta/src/barrier/progress.rs
@@ -91,6 +91,7 @@ impl Progress {
/// Update the progress of `actor`.
fn update(&mut self, actor: ActorId, new_state: ChainState, upstream_total_key_count: u64) {
self.upstream_total_key_count = upstream_total_key_count;
let total_actors = self.states.len();
match self.states.remove(&actor).unwrap() {
ChainState::Init => {}
ChainState::ConsumingUpstream(_, old_consumed_rows) => {
@@ -104,8 +105,14 @@
self.consumed_rows += new_consumed_rows;
}
ChainState::Done(new_consumed_rows) => {
tracing::debug!("actor {} done", actor);
self.consumed_rows += new_consumed_rows;
self.done_count += 1;
tracing::debug!(
"{} actors out of {} complete",
self.done_count,
total_actors,
);
}
};
self.states.insert(actor, new_state);
@@ -258,7 +265,7 @@ impl CreateMviewProgressTracker {
) -> Self {
let mut actor_map = HashMap::new();
let mut progress_map = HashMap::new();
let table_map: HashMap<_, Vec<ActorId>> = table_map.into();
let table_map: HashMap<_, HashSet<ActorId>> = table_map.into();
for (creating_table_id, actors) in table_map {
// 1. Recover `ChainState` in the tracker.
let mut states = HashMap::new();
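The `progress.rs` hunk above relies on the tracker treating a background MV as finished only once every *tracked* actor has reported `Done`. A minimal standalone sketch of that invariant (simplified stand-ins, not RisingWave's actual `Progress`/`ChainState` types): if the tracker is seeded with actors that never report, such as join or materialize actors without a backfill scan, `done_count` can never reach `states.len()` and the job looks stuck forever, which is the failure mode this PR targets on recovery.

```rust
use std::collections::HashMap;

type ActorId = u32;

// Simplified stand-ins for the real types in src/meta/src/barrier/progress.rs;
// names and fields here are illustrative only.
#[derive(Clone)]
enum ChainState {
    Init,
    ConsumingUpstream(u64), // rows consumed so far
    Done(u64),              // total rows consumed by this actor
}

struct Progress {
    states: HashMap<ActorId, ChainState>,
    done_count: usize,
}

impl Progress {
    fn new(actors: impl IntoIterator<Item = ActorId>) -> Self {
        let states = actors.into_iter().map(|a| (a, ChainState::Init)).collect();
        Self { states, done_count: 0 }
    }

    /// Record a progress report from one tracked actor.
    fn update(&mut self, actor: ActorId, new_state: ChainState) {
        let old = self
            .states
            .insert(actor, new_state.clone())
            .expect("progress report from an untracked actor");
        if matches!(new_state, ChainState::Done(_)) && !matches!(old, ChainState::Done(_)) {
            self.done_count += 1;
        }
    }

    /// The MV is finished only when *every* tracked actor is done.
    fn is_done(&self) -> bool {
        self.done_count == self.states.len()
    }
}

fn main() {
    // Track only the backfill (stream-scan) actors: the job can complete.
    let mut scan_only = Progress::new([1, 2]);
    scan_only.update(1, ChainState::ConsumingUpstream(50));
    scan_only.update(1, ChainState::Done(100));
    scan_only.update(2, ChainState::Done(100));
    assert!(scan_only.is_done());

    // Also track actor 3, which contains no stream scan and never reports:
    // done_count stays at 2 out of 3 and the job never finishes.
    let mut with_join_actor = Progress::new([1, 2, 3]);
    with_join_actor.update(1, ChainState::Done(100));
    with_join_actor.update(2, ChainState::Done(100));
    assert!(!with_join_actor.is_done());
}
```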
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
@@ -146,7 +146,7 @@ impl GlobalBarrierManager {

let table_map = self
.fragment_manager
.get_table_id_actor_mapping(&creating_table_ids)
.get_table_id_stream_scan_actor_mapping(&creating_table_ids)
.await;
let table_fragment_map = self
.fragment_manager
23 changes: 19 additions & 4 deletions src/meta/src/manager/catalog/fragment.rs
@@ -169,18 +169,33 @@ impl FragmentManager {
/// The `table_ids` here should correspond to stream jobs.
/// We get their corresponding table fragment, and from there,
/// we get the actors that are in the table fragment.
pub async fn get_table_id_actor_mapping(
pub async fn get_table_id_stream_scan_actor_mapping(
&self,
table_ids: &[TableId],
) -> HashMap<TableId, Vec<ActorId>> {
) -> HashMap<TableId, HashSet<ActorId>> {
let map = &self.core.read().await.table_fragments;
let mut table_map = HashMap::new();
// TODO(kwannoel): Can this be unified with `PlanVisitor`?
fn has_stream_scan(stream_node: &StreamNode) -> bool {
let is_node_scan = if let Some(node) = &stream_node.node_body {
node.is_chain()
} else {
false
};
is_node_scan || stream_node.get_input().iter().any(has_stream_scan)
}
for table_id in table_ids {
if let Some(table_fragment) = map.get(table_id) {
let mut actors = vec![];
let mut actors = HashSet::new();
for fragment in table_fragment.fragments.values() {
for actor in &fragment.actors {
actors.push(actor.actor_id)
if let Some(node) = &actor.nodes
&& has_stream_scan(node)
{
actors.insert(actor.actor_id);
} else {
tracing::trace!("ignoring actor: {:?}", actor);
}
}
}
table_map.insert(*table_id, actors);
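For the renamed `get_table_id_stream_scan_actor_mapping`, the important step is the recursive walk over each actor's operator tree. A toy model of that filtering (hypothetical `Node`/`Actor` structs standing in for the protobuf `StreamNode` and `StreamActor`; the real check uses `node_body.is_chain()` on the generated enum):

```rust
use std::collections::HashSet;

type ActorId = u32;

// Illustrative stand-in for the protobuf `StreamNode`.
struct Node {
    is_stream_scan: bool,
    inputs: Vec<Node>,
}

// Illustrative stand-in for a fragment's `StreamActor`.
struct Actor {
    actor_id: ActorId,
    nodes: Option<Node>, // root of this actor's operator tree
}

// Recursive predicate mirroring `has_stream_scan` in fragment.rs.
fn has_stream_scan(node: &Node) -> bool {
    node.is_stream_scan || node.inputs.iter().any(has_stream_scan)
}

// Keep only the actors whose operator tree contains a stream scan; these are
// the only actors that will ever send `create_mview_progress` reports.
fn stream_scan_actors(actors: &[Actor]) -> HashSet<ActorId> {
    actors
        .iter()
        .filter(|a| a.nodes.as_ref().is_some_and(has_stream_scan))
        .map(|a| a.actor_id)
        .collect()
}

fn main() {
    let scan = |id| Actor {
        actor_id: id,
        nodes: Some(Node { is_stream_scan: true, inputs: vec![] }),
    };
    let join = |id| Actor {
        actor_id: id,
        nodes: Some(Node {
            is_stream_scan: false,
            inputs: vec![Node { is_stream_scan: false, inputs: vec![] }],
        }),
    };
    let actors = [scan(1), join(2), scan(3)];
    assert_eq!(stream_scan_actors(&actors), HashSet::from([1u32, 3]));
}
```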
@@ -47,7 +47,24 @@ async fn kill_cn_and_wait_recover(cluster: &Cluster) {
sleep(Duration::from_secs(10)).await;
}

async fn kill_and_wait_recover(cluster: &Cluster) {
async fn kill_cn_and_meta_and_wait_recover(cluster: &Cluster) {
cluster
.kill_nodes(
[
"compute-1",
"compute-2",
"compute-3",
"meta-1",
"meta-2",
"meta-3",
],
0,
)
.await;
sleep(Duration::from_secs(10)).await;
}

async fn kill_random_and_wait_recover(cluster: &Cluster) {
// Kill it again
for _ in 0..3 {
sleep(Duration::from_secs(2)).await;
@@ -106,16 +123,14 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
.await?;
cluster.run("flush;").await?;

kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;
kill_random_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

kill_and_wait_recover(&cluster).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
@@ -142,6 +157,46 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_background_join_mv_recovery() -> Result<()> {
init_logger();
let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
let mut session = cluster.start_session();

session.run("CREATE TABLE t1 (v1 int)").await?;
session.run("CREATE TABLE t2 (v1 int)").await?;
session
.run("INSERT INTO t1 SELECT generate_series FROM generate_series(1, 200);")
.await?;
session
.run("INSERT INTO t2 SELECT generate_series FROM generate_series(1, 200);")
.await?;
session.flush().await?;
session.run(SET_RATE_LIMIT_2).await?;
session.run(SET_BACKGROUND_DDL).await?;
session
.run("CREATE MATERIALIZED VIEW mv1 as select t1.v1 from t1 join t2 on t1.v1 = t2.v1;")
.await?;
sleep(Duration::from_secs(2)).await;

kill_cn_and_meta_and_wait_recover(&cluster).await;

// Now just wait for it to complete.
session.run(WAIT).await?;

let t_count = session.run("SELECT COUNT(v1) FROM t1").await?;
let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?;
assert_eq!(t_count, mv1_count);

// Make sure that if MV killed and restarted
// it will not be dropped.
session.run("DROP MATERIALIZED VIEW mv1;").await?;
session.run("DROP TABLE t1;").await?;
session.run("DROP TABLE t2;").await?;

Ok(())
}
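
One note on the final assertion in this new test: it holds because `v1` is unique on both sides (each value in 1..=200 is inserted exactly once into `t1` and `t2`), so the inner join emits exactly one row per `t1` row and the two counts must match. A hypothetical extra check in the same style (not part of the PR; it assumes, as the surrounding assertions do, that the simulation `Session::run` returns the result set as a string) that would catch accidental duplicate join keys:

```rust
// Hypothetical extra assertion: with unique join keys, no v1 value should
// appear more than once in mv1.
let dup_count = session
    .run("SELECT COUNT(*) FROM (SELECT v1 FROM mv1 GROUP BY v1 HAVING COUNT(*) > 1) AS dups;")
    .await?;
assert_eq!(dup_count, "0");
```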

#[tokio::test]
async fn test_background_ddl_cancel() -> Result<()> {
async fn create_mv(session: &mut Session) -> Result<()> {
@@ -177,7 +232,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
create_mv(&mut session).await?;

// Test cancel after kill meta
kill_and_wait_recover(&cluster).await;
kill_random_and_wait_recover(&cluster).await;

let ids = cancel_stream_jobs(&mut session).await?;
assert_eq!(ids.len(), 1);