From 3b2f9640af562740507e7ba0d6140be8fa9b1453 Mon Sep 17 00:00:00 2001
From: "Zhanxiang (Patrick) Huang"
Date: Tue, 19 Nov 2024 10:50:40 +0800
Subject: [PATCH] fix(batch): unpin snapshot until the end of distributed
 batch query (#19429)

---
 src/frontend/src/scheduler/distributed/query.rs | 16 +---------------
 src/frontend/src/scheduler/plan_fragmenter.rs   | 14 --------------
 2 files changed, 1 insertion(+), 29 deletions(-)

diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs
index e8ebaf8256388..18ace169da129 100644
--- a/src/frontend/src/scheduler/distributed/query.rs
+++ b/src/frontend/src/scheduler/distributed/query.rs
@@ -296,7 +296,7 @@ impl Debug for QueryRunner {
 }
 
 impl QueryRunner {
-    async fn run(mut self, pinned_snapshot: ReadSnapshot) {
+    async fn run(mut self, _pinned_snapshot: ReadSnapshot) {
         self.query_metrics.running_query_num.inc();
         // Start leaf stages.
         let leaf_stages = self.query.leaf_stages();
@@ -308,10 +308,6 @@ impl QueryRunner {
                 stage_id
             );
         }
-        let mut stages_with_table_scan = self.query.stages_with_table_scan();
-        let has_lookup_join_stage = self.query.has_lookup_join_stage();
-        // To convince the compiler that `pinned_snapshot` will only be dropped once.
-        let mut pinned_snapshot_to_drop = Some(pinned_snapshot);
 
         let mut finished_stage_cnt = 0usize;
         while let Some(msg_inner) = self.msg_receiver.recv().await {
@@ -323,16 +319,6 @@ impl QueryRunner {
                         stage_id
                     );
                     self.scheduled_stages_count += 1;
-                    stages_with_table_scan.remove(&stage_id);
-                    // If query contains lookup join we need to delay epoch unpin util the end of
-                    // the query.
-                    if !has_lookup_join_stage && stages_with_table_scan.is_empty() {
-                        // We can be sure here that all the Hummock iterators have been created,
-                        // thus they all successfully pinned a HummockVersion.
-                        // So we can now unpin their epoch.
-                        tracing::trace!("Query {:?} has scheduled all of its stages that have table scan (iterator creation).", self.query.query_id);
-                        pinned_snapshot_to_drop.take();
-                    }
 
                     // For root stage, we execute in frontend local. We will pass the root fragment
                     // to QueryResultFetcher and execute to get a Chunk stream.
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 74ade3fdab836..d9c563c4406cf 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -245,20 +245,6 @@ impl Query {
         &self.query_id
     }
 
-    pub fn stages_with_table_scan(&self) -> HashSet<StageId> {
-        self.stage_graph
-            .stages
-            .iter()
-            .filter_map(|(stage_id, stage_query)| {
-                if stage_query.has_table_scan() {
-                    Some(*stage_id)
-                } else {
-                    None
-                }
-            })
-            .collect()
-    }
-
     pub fn has_lookup_join_stage(&self) -> bool {
         self.stage_graph
             .stages
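
The patch relies on Rust's scope-based drop rules rather than explicit bookkeeping: with the snapshot bound as the `_pinned_snapshot` parameter, the `ReadSnapshot` guard stays alive for the whole body of `QueryRunner::run` and is unpinned only when the function returns, i.e. after the distributed query has fully finished. A leading underscore silences the unused-variable lint but, unlike a bare `_` pattern, still keeps the value bound for the function's duration. Below is a minimal, self-contained sketch of that pattern; `PinnedSnapshot` and `run` are hypothetical stand-ins, not the actual scheduler types:

    // Hypothetical stand-in for the scheduler's pinned snapshot guard.
    struct PinnedSnapshot;

    impl Drop for PinnedSnapshot {
        fn drop(&mut self) {
            // In the real scheduler, dropping the guard unpins the snapshot.
            println!("snapshot unpinned");
        }
    }

    // Binding the guard as a `_`-prefixed parameter ties its lifetime to
    // the whole function body: it is dropped only when `run` returns.
    fn run(_pinned_snapshot: PinnedSnapshot) {
        // ... schedule stages and drive the query to completion ...
        println!("query finished");
    } // `_pinned_snapshot` is dropped here

    fn main() {
        run(PinnedSnapshot);
        // Prints "query finished" before "snapshot unpinned".
    }

This is also why the removed `pinned_snapshot_to_drop: Option<ReadSnapshot>` dance is no longer needed: once there is no early, conditional unpin, the compiler can see a single drop point at the end of `run` without any `Option::take` trick.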