Skip to content

Commit

Permalink
merged_tree: add a Stream for concurrent diff off trees
Browse files Browse the repository at this point in the history
When diffing two trees, we currently start at the root and diff those
trees. Then we diff each subtree, one at a time, recursively. When
using a commit backend that uses remote storage, like our backend at
Google does, diffing the subtrees one at a time gets very slow. We
should be able to diff subtrees concurrently. That way, the number of
roundtrips to a server becomes determined by the depth of the deepest
difference instead of by the number of differing trees (times 2,
even). This patch implements such an algorithm behind a `Stream`
interface. It's not hooked in to `MergedTree::diff_stream()` yet; that
will happen in the next commit.

I timed the new implementation by updating `jj diff -s` to use the new
diff stream and then ran it on the Linux repo with `jj diff
--ignore-working-copy -s --from v5.0 --to v6.0`. That slowed down by
~20%, from ~750 ms to ~900 ms. Maybe we can get some of that
performance back but I think it'll be hard to match
`MergedTree::diff()`. We can decide later if we're okay with the
difference (after hopefully reducing the gap a bit) or if we want to
keep both implementations.

I also timed the new implementation on our cloud-based repo at
Google. As expected, it made some diffs much faster (I'm not sure if
I'm allowed to share figures).
  • Loading branch information
martinvonz committed Nov 7, 2023
1 parent 41a6511 commit 8c8ef1d
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 10 deletions.
256 changes: 252 additions & 4 deletions lib/src/merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@

//! A lazily merged view of a set of trees.
use std::cmp::max;
use std::collections::BTreeMap;
use std::cmp::{max, Ordering};
use std::collections::{BTreeMap, VecDeque};
use std::iter::zip;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{iter, vec};

use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
use futures::{Future, Stream, TryStreamExt};
use itertools::Itertools;
use pollster::FutureExt;

Expand Down Expand Up @@ -836,7 +837,9 @@ enum TreeDiffItem {
}

impl<'matcher> TreeDiffIterator<'matcher> {
fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff()` of calling this directly.
pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
let root_dir = RepoPath::root();
let mut stack = Vec::new();
if !matcher.visit(&root_dir).is_nothing() {
Expand Down Expand Up @@ -970,6 +973,251 @@ impl Iterator for TreeDiffIterator<'_> {
}
}

/// Stream of differences between two trees.
pub struct TreeDiffStreamImpl<'matcher> {
matcher: &'matcher dyn Matcher,
legacy_format_before: bool,
legacy_format_after: bool,
/// Pairs of tree values that may or may not be ready to emit, sorted in the
/// order we want to emit them. If either side is a tree, there will be
/// a corresponding entry in `pending_trees`.
items: BTreeMap<DiffStreamKey, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
// TODO: Is it better to combine this and `items` into a single map?
#[allow(clippy::type_complexity)]
pending_trees: VecDeque<(
RepoPath,
Pin<Box<dyn Future<Output = BackendResult<(MergedTree, MergedTree)>> + 'matcher>>,
)>,
/// The maximum number of trees to request concurrently. However, we do the
/// accounting per path, so for there will often be twice as many pending
/// `Backend::read_tree()` calls - for the "before" and "after" sides. For
/// conflicts, there will be even more.
max_concurrent_reads: usize,
/// The maximum number of items in `items`. However, we will always add the
/// full differences from a particular pair of trees, so it may temporarily
/// go over the limit (until we emit those items). It may also go over the
/// limit because we have a file item that's blocked by pending subdirectory
/// items.
max_queued_items: usize,
}

/// A wrapper around `RepoPath` that allows us to optionally sort files after
/// directories that have the file as a prefix.
#[derive(PartialEq, Eq, Clone, Debug)]
struct DiffStreamKey {
path: RepoPath,
file_after_dir: bool,
}

impl DiffStreamKey {
fn normal(path: RepoPath) -> Self {
DiffStreamKey {
path,
file_after_dir: false,
}
}

fn file_after_dir(path: RepoPath) -> Self {
DiffStreamKey {
path,
file_after_dir: true,
}
}
}

impl PartialOrd for DiffStreamKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for DiffStreamKey {
fn cmp(&self, other: &Self) -> Ordering {
if self == other {
Ordering::Equal
} else if self.file_after_dir && self.path.contains(&other.path) {
Ordering::Greater
} else if other.file_after_dir && other.path.contains(&self.path) {
Ordering::Less
} else {
self.path.cmp(&other.path)
}
}
}

impl<'matcher> TreeDiffStreamImpl<'matcher> {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff_stream()` of calling this directly.
pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
let mut stream = Self {
matcher,
legacy_format_before: matches!(tree1, MergedTree::Legacy(_)),
legacy_format_after: matches!(tree2, MergedTree::Legacy(_)),
items: BTreeMap::new(),
pending_trees: VecDeque::new(),
// TODO: maybe the backends can suggest the conurrency limit?
max_concurrent_reads: 100,
max_queued_items: 10000,
};
stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2)));
stream
}

/// Gets the given tree if `value` is a tree, otherwise an empty tree.
async fn tree(
store: Arc<Store>,
legacy_format: bool,
dir: RepoPath,
values: MergedTreeValue,
) -> BackendResult<MergedTree> {
let trees = if values.is_tree() {
let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter())
.then(|value| TreeDiffIterator::single_tree(&store, &dir, value.as_ref()))
.try_collect()
.await?;
builder.build()
} else {
Merge::resolved(Tree::null(store, dir.clone()))
};
// Maintain the type of tree, so we resolve `TreeValue::Conflict` as necessary
// in the subtree
if legacy_format {
Ok(MergedTree::Legacy(trees.into_resolved().unwrap()))
} else {
Ok(MergedTree::Merge(trees))
}
}

fn add_dir_diff_items(
&mut self,
dir: RepoPath,
tree_diff: BackendResult<(MergedTree, MergedTree)>,
) {
let (tree1, tree2) = match tree_diff {
Ok(trees) => trees,
Err(err) => {
self.items.insert(DiffStreamKey::normal(dir), Err(err));
return;
}
};

for basename in merged_tree_basenames(&tree1, &tree2) {
let value_before = tree1.value(basename);
let value_after = tree2.value(basename);
if value_after != value_before {
let path = dir.join(basename);
let before = value_before.to_merge();
let after = value_after.to_merge();
let tree_before = before.is_tree();
let tree_after = after.is_tree();
// Check if trees and files match, but only if either side is a tree or a file
// (don't query the matcher unnecessarily).
let tree_matches =
(tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);

// Replace trees or files that don't match by `Merge::absent()`
let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
before
} else {
Merge::absent()
};
let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
after
} else {
Merge::absent()
};
if before.is_absent() && after.is_absent() {
continue;
}

// If the path was a tree on either side of the diff, read those trees.
if tree_matches {
let before_tree_future = Self::tree(
tree1.store().clone(),
self.legacy_format_before,
path.clone(),
before.clone(),
);
let after_tree_future = Self::tree(
tree2.store().clone(),
self.legacy_format_after,
path.clone(),
after.clone(),
);
let both_trees_future =
async { futures::try_join!(before_tree_future, after_tree_future) };
self.pending_trees
.push_back((path.clone(), Box::pin(both_trees_future)));
}

self.items
.insert(DiffStreamKey::normal(path), Ok((before, after)));
}
}
}
}

impl Stream for TreeDiffStreamImpl<'_> {
type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while !(self.items.is_empty() && self.pending_trees.is_empty()) {
// Go through all pending tree futures and poll them.
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
}
}

// Now emit the first file, or the first tree that completed with an error
while let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
_ => {
if !self.pending_trees.is_empty() {
return Poll::Pending;
}
}
};
}
}

Poll::Ready(None)
}
}

/// Helps with writing trees with conflicts. You start by creating an instance
/// of this type with one or more base trees. You then add overrides on top. The
/// overrides may be conflicts. Then you can write the result as a legacy tree
Expand Down
Loading

0 comments on commit 8c8ef1d

Please sign in to comment.