From f062d3278cd98dec4602660843ab20c1ca882c18 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Mon, 6 Nov 2023 23:16:22 -0800 Subject: [PATCH] merged_tree: drop outer loop in `TreeDiffStreamImpl::poll_next()` As suggested by Yuya. I also added a comment and an assertion in the case where return `Poll::Pending`. --- lib/src/merged_tree.rs | 96 +++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index c98d559d1a..a9135dceec 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -1176,59 +1176,59 @@ impl Stream for TreeDiffStreamImpl<'_> { type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while !(self.items.is_empty() && self.pending_trees.is_empty()) { - // Go through all pending tree futures and poll them. - let mut pending_index = 0; - while pending_index < self.pending_trees.len() - && (pending_index < self.max_concurrent_reads - || self.items.len() < self.max_queued_items) - { - let (_, future) = &mut self.pending_trees[pending_index]; - if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) { - let (dir, _) = self.pending_trees.remove(pending_index).unwrap(); - let key = DiffStreamKey::normal(dir); - // Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry - // to `self.items`. - let (before, after) = self.items.remove(&key).unwrap().unwrap(); - // If this was a transition from file to tree or vice versa, add back an item - // for just the removal/addition of the file. - if before.is_present() && !before.is_tree() { - self.items - .insert(key.clone(), Ok((before, Merge::absent()))); - } else if after.is_present() && !after.is_tree() { - self.items.insert( - DiffStreamKey::file_after_dir(key.path.clone()), - Ok((Merge::absent(), after)), - ); - } - self.add_dir_diff_items(key.path, tree_diff); - } else { - pending_index += 1; + // Go through all pending tree futures and poll them. + let mut pending_index = 0; + while pending_index < self.pending_trees.len() + && (pending_index < self.max_concurrent_reads + || self.items.len() < self.max_queued_items) + { + let (_, future) = &mut self.pending_trees[pending_index]; + if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) { + let (dir, _) = self.pending_trees.remove(pending_index).unwrap(); + let key = DiffStreamKey::normal(dir); + // Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry + // to `self.items`. + let (before, after) = self.items.remove(&key).unwrap().unwrap(); + // If this was a transition from file to tree or vice versa, add back an item + // for just the removal/addition of the file. + if before.is_present() && !before.is_tree() { + self.items + .insert(key.clone(), Ok((before, Merge::absent()))); + } else if after.is_present() && !after.is_tree() { + self.items.insert( + DiffStreamKey::file_after_dir(key.path.clone()), + Ok((Merge::absent(), after)), + ); } + self.add_dir_diff_items(key.path, tree_diff); + } else { + pending_index += 1; } + } - // Now emit the first file, or the first tree that completed with an error - while let Some(entry) = self.items.first_entry() { - match entry.get() { - Err(_) => { - // File or tree with error - let (key, result) = entry.remove_entry(); - return Poll::Ready(Some((key.path, result))); - } - Ok((before, after)) if !before.is_tree() && !after.is_tree() => { - let (key, result) = entry.remove_entry(); - return Poll::Ready(Some((key.path, result))); - } - _ => { - if !self.pending_trees.is_empty() { - return Poll::Pending; - } - } - }; + // Now emit the first file, or the first tree that completed with an error + if let Some(entry) = self.items.first_entry() { + match entry.get() { + Err(_) => { + // File or tree with error + let (key, result) = entry.remove_entry(); + Poll::Ready(Some((key.path, result))) + } + Ok((before, after)) if !before.is_tree() && !after.is_tree() => { + // A diff with no trees involved + let (key, result) = entry.remove_entry(); + Poll::Ready(Some((key.path, result))) + } + _ => { + // The first entry has a tree on at least one side (before or after). We need to + // wait for that future to complete. + assert!(!self.pending_trees.is_empty()); + Poll::Pending + } } + } else { + Poll::Ready(None) } - - Poll::Ready(None) } }