Skip to content

Commit

Permalink
fix(batch): unpin snapshot until the end of distributed batch query (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Nov 19, 2024
1 parent 34a345a commit 3b2f964
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 29 deletions.
16 changes: 1 addition & 15 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Debug for QueryRunner {
}

impl QueryRunner {
async fn run(mut self, pinned_snapshot: ReadSnapshot) {
async fn run(mut self, _pinned_snapshot: ReadSnapshot) {
self.query_metrics.running_query_num.inc();
// Start leaf stages.
let leaf_stages = self.query.leaf_stages();
Expand All @@ -308,10 +308,6 @@ impl QueryRunner {
stage_id
);
}
let mut stages_with_table_scan = self.query.stages_with_table_scan();
let has_lookup_join_stage = self.query.has_lookup_join_stage();
// To convince the compiler that `pinned_snapshot` will only be dropped once.
let mut pinned_snapshot_to_drop = Some(pinned_snapshot);

let mut finished_stage_cnt = 0usize;
while let Some(msg_inner) = self.msg_receiver.recv().await {
Expand All @@ -323,16 +319,6 @@ impl QueryRunner {
stage_id
);
self.scheduled_stages_count += 1;
stages_with_table_scan.remove(&stage_id);
// If query contains lookup join we need to delay epoch unpin util the end of
// the query.
if !has_lookup_join_stage && stages_with_table_scan.is_empty() {
// We can be sure here that all the Hummock iterators have been created,
// thus they all successfully pinned a HummockVersion.
// So we can now unpin their epoch.
tracing::trace!("Query {:?} has scheduled all of its stages that have table scan (iterator creation).", self.query.query_id);
pinned_snapshot_to_drop.take();
}

// For root stage, we execute in frontend local. We will pass the root fragment
// to QueryResultFetcher and execute to get a Chunk stream.
Expand Down
14 changes: 0 additions & 14 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,20 +245,6 @@ impl Query {
&self.query_id
}

pub fn stages_with_table_scan(&self) -> HashSet<StageId> {
self.stage_graph
.stages
.iter()
.filter_map(|(stage_id, stage_query)| {
if stage_query.has_table_scan() {
Some(*stage_id)
} else {
None
}
})
.collect()
}

pub fn has_lookup_join_stage(&self) -> bool {
self.stage_graph
.stages
Expand Down

0 comments on commit 3b2f964

Please sign in to comment.