From 8c8ef1da70cc6b318a5d0d41396b23d0120ff663 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Thu, 19 Oct 2023 11:27:55 -0700 Subject: [PATCH] merged_tree: add a `Stream` for concurrent diff off trees When diffing two trees, we currently start at the root and diff those trees. Then we diff each subtree, one at a time, recursively. When using a commit backend that uses remote storage, like our backend at Google does, diffing the subtrees one at a time gets very slow. We should be able to diff subtrees concurrently. That way, the number of roundtrips to a server becomes determined by the depth of the deepest difference instead of by the number of differing trees (times 2, even). This patch implements such an algorithm behind a `Stream` interface. It's not hooked in to `MergedTree::diff_stream()` yet; that will happen in the next commit. I timed the new implementation by updating `jj diff -s` to use the new diff stream and then ran it on the Linux repo with `jj diff --ignore-working-copy -s --from v5.0 --to v6.0`. That slowed down by ~20%, from ~750 ms to ~900 ms. Maybe we can get some of that performance back but I think it'll be hard to match `MergedTree::diff()`. We can decide later if we're okay with the difference (after hopefully reducing the gap a bit) or if we want to keep both implementations. I also timed the new implementation on our cloud-based repo at Google. As expected, it made some diffs much faster (I'm not sure if I'm allowed to share figures). --- lib/src/merged_tree.rs | 256 +++++++++++++++++++++++++++++++++- lib/tests/test_merged_tree.rs | 41 +++++- 2 files changed, 287 insertions(+), 10 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 3c4a7c93dd..879b74f1f7 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -14,15 +14,16 @@ //! A lazily merged view of a set of trees. -use std::cmp::max; -use std::collections::BTreeMap; +use std::cmp::{max, Ordering}; +use std::collections::{BTreeMap, VecDeque}; use std::iter::zip; use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::{iter, vec}; use futures::stream::StreamExt; -use futures::{Stream, TryStreamExt}; +use futures::{Future, Stream, TryStreamExt}; use itertools::Itertools; use pollster::FutureExt; @@ -836,7 +837,9 @@ enum TreeDiffItem { } impl<'matcher> TreeDiffIterator<'matcher> { - fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + /// Creates a iterator over the differences between two trees. Generally + /// prefer `MergedTree::diff()` of calling this directly. + pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { let root_dir = RepoPath::root(); let mut stack = Vec::new(); if !matcher.visit(&root_dir).is_nothing() { @@ -970,6 +973,251 @@ impl Iterator for TreeDiffIterator<'_> { } } +/// Stream of differences between two trees. +pub struct TreeDiffStreamImpl<'matcher> { + matcher: &'matcher dyn Matcher, + legacy_format_before: bool, + legacy_format_after: bool, + /// Pairs of tree values that may or may not be ready to emit, sorted in the + /// order we want to emit them. If either side is a tree, there will be + /// a corresponding entry in `pending_trees`. + items: BTreeMap>, + // TODO: Is it better to combine this and `items` into a single map? + #[allow(clippy::type_complexity)] + pending_trees: VecDeque<( + RepoPath, + Pin> + 'matcher>>, + )>, + /// The maximum number of trees to request concurrently. However, we do the + /// accounting per path, so for there will often be twice as many pending + /// `Backend::read_tree()` calls - for the "before" and "after" sides. For + /// conflicts, there will be even more. + max_concurrent_reads: usize, + /// The maximum number of items in `items`. However, we will always add the + /// full differences from a particular pair of trees, so it may temporarily + /// go over the limit (until we emit those items). It may also go over the + /// limit because we have a file item that's blocked by pending subdirectory + /// items. + max_queued_items: usize, +} + +/// A wrapper around `RepoPath` that allows us to optionally sort files after +/// directories that have the file as a prefix. +#[derive(PartialEq, Eq, Clone, Debug)] +struct DiffStreamKey { + path: RepoPath, + file_after_dir: bool, +} + +impl DiffStreamKey { + fn normal(path: RepoPath) -> Self { + DiffStreamKey { + path, + file_after_dir: false, + } + } + + fn file_after_dir(path: RepoPath) -> Self { + DiffStreamKey { + path, + file_after_dir: true, + } + } +} + +impl PartialOrd for DiffStreamKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DiffStreamKey { + fn cmp(&self, other: &Self) -> Ordering { + if self == other { + Ordering::Equal + } else if self.file_after_dir && self.path.contains(&other.path) { + Ordering::Greater + } else if other.file_after_dir && other.path.contains(&self.path) { + Ordering::Less + } else { + self.path.cmp(&other.path) + } + } +} + +impl<'matcher> TreeDiffStreamImpl<'matcher> { + /// Creates a iterator over the differences between two trees. Generally + /// prefer `MergedTree::diff_stream()` of calling this directly. + pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + let mut stream = Self { + matcher, + legacy_format_before: matches!(tree1, MergedTree::Legacy(_)), + legacy_format_after: matches!(tree2, MergedTree::Legacy(_)), + items: BTreeMap::new(), + pending_trees: VecDeque::new(), + // TODO: maybe the backends can suggest the conurrency limit? + max_concurrent_reads: 100, + max_queued_items: 10000, + }; + stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2))); + stream + } + + /// Gets the given tree if `value` is a tree, otherwise an empty tree. + async fn tree( + store: Arc, + legacy_format: bool, + dir: RepoPath, + values: MergedTreeValue, + ) -> BackendResult { + let trees = if values.is_tree() { + let builder: MergeBuilder = futures::stream::iter(values.iter()) + .then(|value| TreeDiffIterator::single_tree(&store, &dir, value.as_ref())) + .try_collect() + .await?; + builder.build() + } else { + Merge::resolved(Tree::null(store, dir.clone())) + }; + // Maintain the type of tree, so we resolve `TreeValue::Conflict` as necessary + // in the subtree + if legacy_format { + Ok(MergedTree::Legacy(trees.into_resolved().unwrap())) + } else { + Ok(MergedTree::Merge(trees)) + } + } + + fn add_dir_diff_items( + &mut self, + dir: RepoPath, + tree_diff: BackendResult<(MergedTree, MergedTree)>, + ) { + let (tree1, tree2) = match tree_diff { + Ok(trees) => trees, + Err(err) => { + self.items.insert(DiffStreamKey::normal(dir), Err(err)); + return; + } + }; + + for basename in merged_tree_basenames(&tree1, &tree2) { + let value_before = tree1.value(basename); + let value_after = tree2.value(basename); + if value_after != value_before { + let path = dir.join(basename); + let before = value_before.to_merge(); + let after = value_after.to_merge(); + let tree_before = before.is_tree(); + let tree_after = after.is_tree(); + // Check if trees and files match, but only if either side is a tree or a file + // (don't query the matcher unnecessarily). + let tree_matches = + (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing(); + let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path); + + // Replace trees or files that don't match by `Merge::absent()` + let before = if (tree_before && tree_matches) || (!tree_before && file_matches) { + before + } else { + Merge::absent() + }; + let after = if (tree_after && tree_matches) || (!tree_after && file_matches) { + after + } else { + Merge::absent() + }; + if before.is_absent() && after.is_absent() { + continue; + } + + // If the path was a tree on either side of the diff, read those trees. + if tree_matches { + let before_tree_future = Self::tree( + tree1.store().clone(), + self.legacy_format_before, + path.clone(), + before.clone(), + ); + let after_tree_future = Self::tree( + tree2.store().clone(), + self.legacy_format_after, + path.clone(), + after.clone(), + ); + let both_trees_future = + async { futures::try_join!(before_tree_future, after_tree_future) }; + self.pending_trees + .push_back((path.clone(), Box::pin(both_trees_future))); + } + + self.items + .insert(DiffStreamKey::normal(path), Ok((before, after))); + } + } + } +} + +impl Stream for TreeDiffStreamImpl<'_> { + type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + while !(self.items.is_empty() && self.pending_trees.is_empty()) { + // Go through all pending tree futures and poll them. + let mut pending_index = 0; + while pending_index < self.pending_trees.len() + && (pending_index < self.max_concurrent_reads + || self.items.len() < self.max_queued_items) + { + let (_, future) = &mut self.pending_trees[pending_index]; + if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) { + let (dir, _) = self.pending_trees.remove(pending_index).unwrap(); + let key = DiffStreamKey::normal(dir); + // Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry + // to `self.items`. + let (before, after) = self.items.remove(&key).unwrap().unwrap(); + // If this was a transition from file to tree or vice versa, add back an item + // for just the removal/addition of the file. + if before.is_present() && !before.is_tree() { + self.items + .insert(key.clone(), Ok((before, Merge::absent()))); + } else if after.is_present() && !after.is_tree() { + self.items.insert( + DiffStreamKey::file_after_dir(key.path.clone()), + Ok((Merge::absent(), after)), + ); + } + self.add_dir_diff_items(key.path, tree_diff); + } else { + pending_index += 1; + } + } + + // Now emit the first file, or the first tree that completed with an error + while let Some(entry) = self.items.first_entry() { + match entry.get() { + Err(_) => { + // File or tree with error + let (key, result) = entry.remove_entry(); + return Poll::Ready(Some((key.path, result))); + } + Ok((before, after)) if !before.is_tree() && !after.is_tree() => { + let (key, result) = entry.remove_entry(); + return Poll::Ready(Some((key.path, result))); + } + _ => { + if !self.pending_trees.is_empty() { + return Poll::Pending; + } + } + }; + } + } + + Poll::Ready(None) + } +} + /// Helps with writing trees with conflicts. You start by creating an instance /// of this type with one or more base trees. You then add overrides on top. The /// overrides may be conflicts. Then you can write the result as a legacy tree diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 0216e1968c..9e7ec1361b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures::executor::block_on; +use futures::StreamExt; use itertools::Itertools; use jj_lib::backend::{FileId, MergedTreeId, TreeValue}; use jj_lib::files::MergeResult; -use jj_lib::matchers::{EverythingMatcher, FilesMatcher, PrefixMatcher}; +use jj_lib::matchers::{EverythingMatcher, FilesMatcher, Matcher, PrefixMatcher}; use jj_lib::merge::{Merge, MergeBuilder}; -use jj_lib::merged_tree::{MergedTree, MergedTreeBuilder, MergedTreeVal}; +use jj_lib::merged_tree::{ + MergedTree, MergedTreeBuilder, MergedTreeVal, TreeDiffIterator, TreeDiffStreamImpl, +}; use jj_lib::repo::Repo; use jj_lib::repo_path::{RepoPath, RepoPathComponent, RepoPathJoin}; use jj_lib::tree::merge_trees; @@ -31,6 +35,18 @@ fn file_value(file_id: &FileId) -> TreeValue { } } +fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn Matcher) { + let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect(); + let stream_diff: Vec<_> = block_on( + TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect(), + ); + assert_eq!(stream_diff, iter_diff); +} + #[test] fn test_from_legacy_tree() { let test_repo = TestRepo::init(); @@ -714,6 +730,7 @@ fn test_diff_resolved() { ), ) ); + diff_stream_equals_iter(&before_merged, &after_merged, &EverythingMatcher); } /// Diff two conflicted trees @@ -805,6 +822,7 @@ fn test_diff_conflicted() { }) .collect_vec(); assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher); // Test the reverse diff let actual_diff = right_merged .diff(&left_merged, &EverythingMatcher) @@ -820,6 +838,7 @@ fn test_diff_conflicted() { }) .collect_vec(); assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher); } #[test] @@ -972,6 +991,7 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher); } // Test the reverse diff @@ -1036,12 +1056,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher); } // Diff while filtering by `path1` (file1 -> directory1) as a file { + let matcher = FilesMatcher::new(&[path1.clone()]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path1.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1052,12 +1074,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path1/file` (file1 -> directory1) as a file { + let matcher = FilesMatcher::new(&[path1.join(&file)]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path1.join(&file)])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1068,12 +1092,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path1` (file1 -> directory1) as a prefix { + let matcher = PrefixMatcher::new(&[path1.clone()]); let actual_diff = left_merged - .diff(&right_merged, &PrefixMatcher::new(&[path1.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1087,6 +1113,7 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path6` (directory1 -> file1+(directory1-absent)) as @@ -1094,8 +1121,9 @@ fn test_diff_dir_file() { // do see the directory that's included in the conflict with a file on the right // side. { + let matcher = FilesMatcher::new(&[path6.clone()]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path6.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![( @@ -1103,6 +1131,7 @@ fn test_diff_dir_file() { (Merge::absent(), right_merged.path_value(&path6)), )]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } }