Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up concurrent tree diff a bit #2539

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 54 additions & 50 deletions lib/src/merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1170,65 +1170,69 @@ impl<'matcher> TreeDiffStreamImpl<'matcher> {
}
}
}

fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
}
}
}
}

impl Stream for TreeDiffStreamImpl<'_> {
type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while !(self.items.is_empty() && self.pending_trees.is_empty()) {
// Go through all pending tree futures and poll them.
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
// Go through all pending tree futures and poll them.
self.poll_tree_futures(cx);

// Now emit the first file, or the first tree that completed with an error
if let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
Poll::Ready(Some((key.path, result)))
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
// A diff with no trees involved
let (key, result) = entry.remove_entry();
Poll::Ready(Some((key.path, result)))
}
_ => {
// The first entry has a tree on at least one side (before or after). We need to
// wait for that future to complete.
assert!(!self.pending_trees.is_empty());
Poll::Pending
}
}

// Now emit the first file, or the first tree that completed with an error
while let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
_ => {
if !self.pending_trees.is_empty() {
return Poll::Pending;
}
}
};
}
} else {
Poll::Ready(None)
}

Poll::Ready(None)
}
}

Expand Down