Skip to content

Commit

Permalink
merged_tree: drop outer loop in TreeDiffStreamImpl::poll_next()
Browse files Browse the repository at this point in the history
As suggested by Yuya. I also added a comment and an assertion in the
case where return `Poll::Pending`.
  • Loading branch information
martinvonz committed Nov 7, 2023
1 parent d989d40 commit f062d32
Showing 1 changed file with 48 additions and 48 deletions.
96 changes: 48 additions & 48 deletions lib/src/merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,59 +1176,59 @@ impl Stream for TreeDiffStreamImpl<'_> {
type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while !(self.items.is_empty() && self.pending_trees.is_empty()) {
// Go through all pending tree futures and poll them.
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
// Go through all pending tree futures and poll them.
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
}
}

// Now emit the first file, or the first tree that completed with an error
while let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
_ => {
if !self.pending_trees.is_empty() {
return Poll::Pending;
}
}
};
// Now emit the first file, or the first tree that completed with an error
if let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
Poll::Ready(Some((key.path, result)))
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
// A diff with no trees involved
let (key, result) = entry.remove_entry();
Poll::Ready(Some((key.path, result)))
}
_ => {
// The first entry has a tree on at least one side (before or after). We need to
// wait for that future to complete.
assert!(!self.pending_trees.is_empty());
Poll::Pending
}
}
} else {
Poll::Ready(None)
}

Poll::Ready(None)
}
}

Expand Down

0 comments on commit f062d32

Please sign in to comment.