From 7cef4e11183a67efcc24b61af24735c17d0ae0b7 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Sun, 29 Oct 2023 07:03:15 -0700 Subject: [PATCH 1/6] merged_tree: rename `all_tree_conflict_names()` since it's not about conflicts --- lib/src/merged_tree.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index b8708a970c..f86cd29864 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -170,7 +170,7 @@ impl MergedTree { pub fn names<'a>(&'a self) -> Box + 'a> { match self { MergedTree::Legacy(tree) => Box::new(tree.data().names()), - MergedTree::Merge(conflict) => Box::new(all_tree_conflict_names(conflict)), + MergedTree::Merge(conflict) => Box::new(all_tree_basenames(conflict)), } } @@ -434,7 +434,7 @@ pub type TreeDiffStream<'matcher> = Pin< >, >; -fn all_tree_conflict_names(trees: &Merge) -> impl Iterator { +fn all_tree_basenames(trees: &Merge) -> impl Iterator { trees .iter() .map(|tree| tree.data().names()) @@ -455,7 +455,7 @@ fn merge_trees(merge: &Merge) -> Result, TreeMergeError> { // any conflicts. 
let mut new_tree = backend::Tree::default(); let mut conflicts = vec![]; - for basename in all_tree_conflict_names(merge) { + for basename in all_tree_basenames(merge) { let path_merge = merge.map(|tree| tree.value(basename).cloned()); let path = dir.join(basename); let path_merge = merge_tree_values(store, &path, path_merge)?; @@ -638,7 +638,7 @@ impl<'a> ConflictEntriesNonRecursiveIterator<'a> { if trees.is_resolved() { Box::new(iter::empty()) } else { - Box::new(all_tree_conflict_names(trees)) + Box::new(all_tree_basenames(trees)) } } }; @@ -774,15 +774,14 @@ impl<'a> TreeEntryDiffIterator<'a> { merge_iters(before.data().names(), after.data().names()) } (MergedTree::Merge(before), MergedTree::Legacy(after)) => { - merge_iters(all_tree_conflict_names(before), after.data().names()) + merge_iters(all_tree_basenames(before), after.data().names()) } (MergedTree::Legacy(before), MergedTree::Merge(after)) => { - merge_iters(before.data().names(), all_tree_conflict_names(after)) + merge_iters(before.data().names(), all_tree_basenames(after)) + } + (MergedTree::Merge(before), MergedTree::Merge(after)) => { + merge_iters(all_tree_basenames(before), all_tree_basenames(after)) } - (MergedTree::Merge(before), MergedTree::Merge(after)) => merge_iters( - all_tree_conflict_names(before), - all_tree_conflict_names(after), - ), }; TreeEntryDiffIterator { before, From a9d70e1c6dd5311713bd5f40afcae2ead355f5c1 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Sun, 29 Oct 2023 07:17:54 -0700 Subject: [PATCH 2/6] merged_tree: extract function for merged iterator of basenames in diff I'm going to reuse this for stream/async diffing. 
--- lib/src/merged_tree.rs | 48 ++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index f86cd29864..3c4a7c93dd 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -442,6 +442,32 @@ fn all_tree_basenames(trees: &Merge) -> impl Iterator( + tree1: &'a MergedTree, + tree2: &'a MergedTree, +) -> Box + 'a> { + fn merge_iters<'a>( + iter1: impl Iterator + 'a, + iter2: impl Iterator + 'a, + ) -> Box + 'a> { + Box::new(iter1.merge(iter2).dedup()) + } + match (&tree1, &tree2) { + (MergedTree::Legacy(before), MergedTree::Legacy(after)) => { + merge_iters(before.data().names(), after.data().names()) + } + (MergedTree::Merge(before), MergedTree::Legacy(after)) => { + merge_iters(all_tree_basenames(before), after.data().names()) + } + (MergedTree::Legacy(before), MergedTree::Merge(after)) => { + merge_iters(before.data().names(), all_tree_basenames(after)) + } + (MergedTree::Merge(before), MergedTree::Merge(after)) => { + merge_iters(all_tree_basenames(before), all_tree_basenames(after)) + } + } +} + fn merge_trees(merge: &Merge) -> Result, TreeMergeError> { if let Some(tree) = merge.resolve_trivial() { return Ok(Merge::resolved(tree.clone())); @@ -762,27 +788,7 @@ struct TreeEntryDiffIterator<'a> { impl<'a> TreeEntryDiffIterator<'a> { fn new(before: &'a MergedTree, after: &'a MergedTree) -> Self { - fn merge_iters<'a>( - iter1: impl Iterator + 'a, - iter2: impl Iterator + 'a, - ) -> Box + 'a> { - Box::new(iter1.merge(iter2).dedup()) - } - let basename_iter: Box + 'a> = - match (before, after) { - (MergedTree::Legacy(before), MergedTree::Legacy(after)) => { - merge_iters(before.data().names(), after.data().names()) - } - (MergedTree::Merge(before), MergedTree::Legacy(after)) => { - merge_iters(all_tree_basenames(before), after.data().names()) - } - (MergedTree::Legacy(before), MergedTree::Merge(after)) => { - merge_iters(before.data().names(), 
all_tree_basenames(after)) - } - (MergedTree::Merge(before), MergedTree::Merge(after)) => { - merge_iters(all_tree_basenames(before), all_tree_basenames(after)) - } - }; + let basename_iter = merged_tree_basenames(before, after); TreeEntryDiffIterator { before, after, From 6fcc6b997939b2625cecb85062ee647084d3dcd9 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Sat, 4 Nov 2023 15:51:39 -0700 Subject: [PATCH 3/6] test_merged_tree: nest each part of `test_diff_dir_file()` I'm about to add a few more checks for diffing with a matcher. I think it will help make it readable and reduce the risk of mixing up variables between each part of the test if we use some nested blocks. I also removed some unnecessary `.clone()` calls while at it. --- lib/tests/test_merged_tree.rs | 248 +++++++++++++++++----------------- 1 file changed, 124 insertions(+), 124 deletions(-) diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 545dfdea7f..1bf72b994b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -906,137 +906,137 @@ fn test_diff_dir_file() { (&path6.join(&file), "right"), ], ); - let left_merged = MergedTree::new(Merge::new( - vec![left_base.clone()], - vec![left_side1.clone(), left_side2.clone()], - )); - let right_merged = MergedTree::new(Merge::new( - vec![right_base.clone()], - vec![right_side1.clone(), right_side2.clone()], - )); + let left_merged = MergedTree::new(Merge::new(vec![left_base], vec![left_side1, left_side2])); + let right_merged = + MergedTree::new(Merge::new(vec![right_base], vec![right_side1, right_side2])); // Test the forwards diff - let actual_diff = left_merged - .diff(&right_merged, &EverythingMatcher) - .map(|(path, diff)| (path, diff.unwrap())) - .collect_vec(); - let expected_diff = vec![ - // path1: file1 -> directory1 - ( - path1.clone(), - (left_merged.path_value(&path1), Merge::absent()), - ), - ( - path1.join(&file), - (Merge::absent(), 
right_merged.path_value(&path1.join(&file))), - ), - // path2: file1 -> directory1+(directory2-absent) - ( - path2.clone(), - (left_merged.path_value(&path2), Merge::absent()), - ), - ( - path2.join(&file), - (Merge::absent(), right_merged.path_value(&path2.join(&file))), - ), - // path3: file1 -> directory1+(file1-absent) - ( - path3.clone(), + { + let actual_diff = left_merged + .diff(&right_merged, &EverythingMatcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![ + // path1: file1 -> directory1 ( - left_merged.path_value(&path3), - right_merged.path_value(&path3), + path1.clone(), + (left_merged.path_value(&path1), Merge::absent()), ), - ), - // path4: file1+(file2-file3) -> directory1+(directory2-directory3) - ( - path4.clone(), - (left_merged.path_value(&path4), Merge::absent()), - ), - ( - path4.join(&file), - (Merge::absent(), right_merged.path_value(&path4.join(&file))), - ), - // path5: directory1 -> file1+(file2-absent) - ( - path5.join(&file), - (left_merged.path_value(&path5.join(&file)), Merge::absent()), - ), - ( - path5.clone(), - (Merge::absent(), right_merged.path_value(&path5)), - ), - // path6: directory1 -> file1+(directory1-absent) - ( - path6.join(&file), - (left_merged.path_value(&path6.join(&file)), Merge::absent()), - ), - ( - path6.clone(), - (Merge::absent(), right_merged.path_value(&path6)), - ), - ]; - assert_eq!(actual_diff, expected_diff); + ( + path1.join(&file), + (Merge::absent(), right_merged.path_value(&path1.join(&file))), + ), + // path2: file1 -> directory1+(directory2-absent) + ( + path2.clone(), + (left_merged.path_value(&path2), Merge::absent()), + ), + ( + path2.join(&file), + (Merge::absent(), right_merged.path_value(&path2.join(&file))), + ), + // path3: file1 -> directory1+(file1-absent) + ( + path3.clone(), + ( + left_merged.path_value(&path3), + right_merged.path_value(&path3), + ), + ), + // path4: file1+(file2-file3) -> directory1+(directory2-directory3) + ( + 
path4.clone(), + (left_merged.path_value(&path4), Merge::absent()), + ), + ( + path4.join(&file), + (Merge::absent(), right_merged.path_value(&path4.join(&file))), + ), + // path5: directory1 -> file1+(file2-absent) + ( + path5.join(&file), + (left_merged.path_value(&path5.join(&file)), Merge::absent()), + ), + ( + path5.clone(), + (Merge::absent(), right_merged.path_value(&path5)), + ), + // path6: directory1 -> file1+(directory1-absent) + ( + path6.join(&file), + (left_merged.path_value(&path6.join(&file)), Merge::absent()), + ), + ( + path6.clone(), + (Merge::absent(), right_merged.path_value(&path6)), + ), + ]; + assert_eq!(actual_diff, expected_diff); + } + // Test the reverse diff - let actual_diff = right_merged - .diff(&left_merged, &EverythingMatcher) - .map(|(path, diff)| (path, diff.unwrap())) - .collect_vec(); - let expected_diff = vec![ - // path1: file1 -> directory1 - ( - path1.join(&file), - (right_merged.path_value(&path1.join(&file)), Merge::absent()), - ), - ( - path1.clone(), - (Merge::absent(), left_merged.path_value(&path1)), - ), - // path2: file1 -> directory1+(directory2-absent) - ( - path2.join(&file), - (right_merged.path_value(&path2.join(&file)), Merge::absent()), - ), - ( - path2.clone(), - (Merge::absent(), left_merged.path_value(&path2)), - ), - // path3: file1 -> directory1+(file1-absent) - ( - path3.clone(), + { + let actual_diff = right_merged + .diff(&left_merged, &EverythingMatcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![ + // path1: file1 -> directory1 ( - right_merged.path_value(&path3), - left_merged.path_value(&path3), + path1.join(&file), + (right_merged.path_value(&path1.join(&file)), Merge::absent()), ), - ), - // path4: file1+(file2-file3) -> directory1+(directory2-directory3) - ( - path4.join(&file), - (right_merged.path_value(&path4.join(&file)), Merge::absent()), - ), - ( - path4.clone(), - (Merge::absent(), left_merged.path_value(&path4)), - ), - // path5: directory1 
-> file1+(file2-absent) - ( - path5.clone(), - (right_merged.path_value(&path5), Merge::absent()), - ), - ( - path5.join(&file), - (Merge::absent(), left_merged.path_value(&path5.join(&file))), - ), - // path6: directory1 -> file1+(directory1-absent) - ( - path6.clone(), - (right_merged.path_value(&path6), Merge::absent()), - ), - ( - path6.join(&file), - (Merge::absent(), left_merged.path_value(&path6.join(&file))), - ), - ]; - assert_eq!(actual_diff, expected_diff); + ( + path1.clone(), + (Merge::absent(), left_merged.path_value(&path1)), + ), + // path2: file1 -> directory1+(directory2-absent) + ( + path2.join(&file), + (right_merged.path_value(&path2.join(&file)), Merge::absent()), + ), + ( + path2.clone(), + (Merge::absent(), left_merged.path_value(&path2)), + ), + // path3: file1 -> directory1+(file1-absent) + ( + path3.clone(), + ( + right_merged.path_value(&path3), + left_merged.path_value(&path3), + ), + ), + // path4: file1+(file2-file3) -> directory1+(directory2-directory3) + ( + path4.join(&file), + (right_merged.path_value(&path4.join(&file)), Merge::absent()), + ), + ( + path4.clone(), + (Merge::absent(), left_merged.path_value(&path4)), + ), + // path5: directory1 -> file1+(file2-absent) + ( + path5.clone(), + (right_merged.path_value(&path5), Merge::absent()), + ), + ( + path5.join(&file), + (Merge::absent(), left_merged.path_value(&path5.join(&file))), + ), + // path6: directory1 -> file1+(directory1-absent) + ( + path6.clone(), + (right_merged.path_value(&path6), Merge::absent()), + ), + ( + path6.join(&file), + (Merge::absent(), left_merged.path_value(&path6.join(&file))), + ), + ]; + assert_eq!(actual_diff, expected_diff); + } } /// Merge 3 resolved trees that can be resolved From 41a6511bf7b829e69dd00342fad05a3eca788e48 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Sun, 5 Nov 2023 10:06:47 -0800 Subject: [PATCH 4/6] test_merged_tree: test diffing with a matcher We didn't have any tests at all for `MergedTree::diff()` with a matcher
other than `EverythingMatcher`. This patch adds a few. --- lib/tests/test_merged_tree.rs | 69 ++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 1bf72b994b..0216e1968c 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use jj_lib::backend::{FileId, MergedTreeId, TreeValue}; use jj_lib::files::MergeResult; -use jj_lib::matchers::{EverythingMatcher, FilesMatcher}; +use jj_lib::matchers::{EverythingMatcher, FilesMatcher, PrefixMatcher}; use jj_lib::merge::{Merge, MergeBuilder}; use jj_lib::merged_tree::{MergedTree, MergedTreeBuilder, MergedTreeVal}; use jj_lib::repo::Repo; @@ -1037,6 +1037,73 @@ fn test_diff_dir_file() { ]; assert_eq!(actual_diff, expected_diff); } + + // Diff while filtering by `path1` (file1 -> directory1) as a file + { + let actual_diff = left_merged + .diff(&right_merged, &FilesMatcher::new(&[path1.clone()])) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![ + // path1: file1 -> directory1 + ( + path1.clone(), + (left_merged.path_value(&path1), Merge::absent()), + ), + ]; + assert_eq!(actual_diff, expected_diff); + } + + // Diff while filtering by `path1/file` (file1 -> directory1) as a file + { + let actual_diff = left_merged + .diff(&right_merged, &FilesMatcher::new(&[path1.join(&file)])) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![ + // path1: file1 -> directory1 + ( + path1.join(&file), + (Merge::absent(), right_merged.path_value(&path1.join(&file))), + ), + ]; + assert_eq!(actual_diff, expected_diff); + } + + // Diff while filtering by `path1` (file1 -> directory1) as a prefix + { + let actual_diff = left_merged + .diff(&right_merged, &PrefixMatcher::new(&[path1.clone()])) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![ + ( + 
path1.clone(), + (left_merged.path_value(&path1), Merge::absent()), + ), + ( + path1.join(&file), + (Merge::absent(), right_merged.path_value(&path1.join(&file))), + ), + ]; + assert_eq!(actual_diff, expected_diff); + } + + // Diff while filtering by `path6` (directory1 -> file1+(directory1-absent)) as + // a file. We don't see the directory at `path6` on the left side, but we + // do see the directory that's included in the conflict with a file on the right + // side. + { + let actual_diff = left_merged + .diff(&right_merged, &FilesMatcher::new(&[path6.clone()])) + .map(|(path, diff)| (path, diff.unwrap())) + .collect_vec(); + let expected_diff = vec![( + path6.clone(), + (Merge::absent(), right_merged.path_value(&path6)), + )]; + assert_eq!(actual_diff, expected_diff); + } } /// Merge 3 resolved trees that can be resolved From 8c8ef1da70cc6b318a5d0d41396b23d0120ff663 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Thu, 19 Oct 2023 11:27:55 -0700 Subject: [PATCH 5/6] merged_tree: add a `Stream` for concurrent diff of trees When diffing two trees, we currently start at the root and diff those trees. Then we diff each subtree, one at a time, recursively. When using a commit backend that uses remote storage, like our backend at Google does, diffing the subtrees one at a time gets very slow. We should be able to diff subtrees concurrently. That way, the number of roundtrips to a server becomes determined by the depth of the deepest difference instead of by the number of differing trees (times 2, even). This patch implements such an algorithm behind a `Stream` interface. It's not hooked in to `MergedTree::diff_stream()` yet; that will happen in the next commit. I timed the new implementation by updating `jj diff -s` to use the new diff stream and then ran it on the Linux repo with `jj diff --ignore-working-copy -s --from v5.0 --to v6.0`. That slowed down by ~20%, from ~750 ms to ~900 ms.
Maybe we can get some of that performance back but I think it'll be hard to match `MergedTree::diff()`. We can decide later if we're okay with the difference (after hopefully reducing the gap a bit) or if we want to keep both implementations. I also timed the new implementation on our cloud-based repo at Google. As expected, it made some diffs much faster (I'm not sure if I'm allowed to share figures). --- lib/src/merged_tree.rs | 256 +++++++++++++++++++++++++++++++++- lib/tests/test_merged_tree.rs | 41 +++++- 2 files changed, 287 insertions(+), 10 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 3c4a7c93dd..879b74f1f7 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -14,15 +14,16 @@ //! A lazily merged view of a set of trees. -use std::cmp::max; -use std::collections::BTreeMap; +use std::cmp::{max, Ordering}; +use std::collections::{BTreeMap, VecDeque}; use std::iter::zip; use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::{iter, vec}; use futures::stream::StreamExt; -use futures::{Stream, TryStreamExt}; +use futures::{Future, Stream, TryStreamExt}; use itertools::Itertools; use pollster::FutureExt; @@ -836,7 +837,9 @@ enum TreeDiffItem { } impl<'matcher> TreeDiffIterator<'matcher> { - fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + /// Creates a iterator over the differences between two trees. Generally + /// prefer `MergedTree::diff()` of calling this directly. + pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { let root_dir = RepoPath::root(); let mut stack = Vec::new(); if !matcher.visit(&root_dir).is_nothing() { @@ -970,6 +973,251 @@ impl Iterator for TreeDiffIterator<'_> { } } +/// Stream of differences between two trees. 
+pub struct TreeDiffStreamImpl<'matcher> { + matcher: &'matcher dyn Matcher, + legacy_format_before: bool, + legacy_format_after: bool, + /// Pairs of tree values that may or may not be ready to emit, sorted in the + /// order we want to emit them. If either side is a tree, there will be + /// a corresponding entry in `pending_trees`. + items: BTreeMap>, + // TODO: Is it better to combine this and `items` into a single map? + #[allow(clippy::type_complexity)] + pending_trees: VecDeque<( + RepoPath, + Pin> + 'matcher>>, + )>, + /// The maximum number of trees to request concurrently. However, we do the + /// accounting per path, so for there will often be twice as many pending + /// `Backend::read_tree()` calls - for the "before" and "after" sides. For + /// conflicts, there will be even more. + max_concurrent_reads: usize, + /// The maximum number of items in `items`. However, we will always add the + /// full differences from a particular pair of trees, so it may temporarily + /// go over the limit (until we emit those items). It may also go over the + /// limit because we have a file item that's blocked by pending subdirectory + /// items. + max_queued_items: usize, +} + +/// A wrapper around `RepoPath` that allows us to optionally sort files after +/// directories that have the file as a prefix. 
+#[derive(PartialEq, Eq, Clone, Debug)] +struct DiffStreamKey { + path: RepoPath, + file_after_dir: bool, +} + +impl DiffStreamKey { + fn normal(path: RepoPath) -> Self { + DiffStreamKey { + path, + file_after_dir: false, + } + } + + fn file_after_dir(path: RepoPath) -> Self { + DiffStreamKey { + path, + file_after_dir: true, + } + } +} + +impl PartialOrd for DiffStreamKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DiffStreamKey { + fn cmp(&self, other: &Self) -> Ordering { + if self == other { + Ordering::Equal + } else if self.file_after_dir && self.path.contains(&other.path) { + Ordering::Greater + } else if other.file_after_dir && other.path.contains(&self.path) { + Ordering::Less + } else { + self.path.cmp(&other.path) + } + } +} + +impl<'matcher> TreeDiffStreamImpl<'matcher> { + /// Creates a iterator over the differences between two trees. Generally + /// prefer `MergedTree::diff_stream()` of calling this directly. + pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + let mut stream = Self { + matcher, + legacy_format_before: matches!(tree1, MergedTree::Legacy(_)), + legacy_format_after: matches!(tree2, MergedTree::Legacy(_)), + items: BTreeMap::new(), + pending_trees: VecDeque::new(), + // TODO: maybe the backends can suggest the conurrency limit? + max_concurrent_reads: 100, + max_queued_items: 10000, + }; + stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2))); + stream + } + + /// Gets the given tree if `value` is a tree, otherwise an empty tree. 
+ async fn tree( + store: Arc, + legacy_format: bool, + dir: RepoPath, + values: MergedTreeValue, + ) -> BackendResult { + let trees = if values.is_tree() { + let builder: MergeBuilder = futures::stream::iter(values.iter()) + .then(|value| TreeDiffIterator::single_tree(&store, &dir, value.as_ref())) + .try_collect() + .await?; + builder.build() + } else { + Merge::resolved(Tree::null(store, dir.clone())) + }; + // Maintain the type of tree, so we resolve `TreeValue::Conflict` as necessary + // in the subtree + if legacy_format { + Ok(MergedTree::Legacy(trees.into_resolved().unwrap())) + } else { + Ok(MergedTree::Merge(trees)) + } + } + + fn add_dir_diff_items( + &mut self, + dir: RepoPath, + tree_diff: BackendResult<(MergedTree, MergedTree)>, + ) { + let (tree1, tree2) = match tree_diff { + Ok(trees) => trees, + Err(err) => { + self.items.insert(DiffStreamKey::normal(dir), Err(err)); + return; + } + }; + + for basename in merged_tree_basenames(&tree1, &tree2) { + let value_before = tree1.value(basename); + let value_after = tree2.value(basename); + if value_after != value_before { + let path = dir.join(basename); + let before = value_before.to_merge(); + let after = value_after.to_merge(); + let tree_before = before.is_tree(); + let tree_after = after.is_tree(); + // Check if trees and files match, but only if either side is a tree or a file + // (don't query the matcher unnecessarily). 
+ let tree_matches = + (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing(); + let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path); + + // Replace trees or files that don't match by `Merge::absent()` + let before = if (tree_before && tree_matches) || (!tree_before && file_matches) { + before + } else { + Merge::absent() + }; + let after = if (tree_after && tree_matches) || (!tree_after && file_matches) { + after + } else { + Merge::absent() + }; + if before.is_absent() && after.is_absent() { + continue; + } + + // If the path was a tree on either side of the diff, read those trees. + if tree_matches { + let before_tree_future = Self::tree( + tree1.store().clone(), + self.legacy_format_before, + path.clone(), + before.clone(), + ); + let after_tree_future = Self::tree( + tree2.store().clone(), + self.legacy_format_after, + path.clone(), + after.clone(), + ); + let both_trees_future = + async { futures::try_join!(before_tree_future, after_tree_future) }; + self.pending_trees + .push_back((path.clone(), Box::pin(both_trees_future))); + } + + self.items + .insert(DiffStreamKey::normal(path), Ok((before, after))); + } + } + } +} + +impl Stream for TreeDiffStreamImpl<'_> { + type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + while !(self.items.is_empty() && self.pending_trees.is_empty()) { + // Go through all pending tree futures and poll them. 
+ let mut pending_index = 0; + while pending_index < self.pending_trees.len() + && (pending_index < self.max_concurrent_reads + || self.items.len() < self.max_queued_items) + { + let (_, future) = &mut self.pending_trees[pending_index]; + if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) { + let (dir, _) = self.pending_trees.remove(pending_index).unwrap(); + let key = DiffStreamKey::normal(dir); + // Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry + // to `self.items`. + let (before, after) = self.items.remove(&key).unwrap().unwrap(); + // If this was a transition from file to tree or vice versa, add back an item + // for just the removal/addition of the file. + if before.is_present() && !before.is_tree() { + self.items + .insert(key.clone(), Ok((before, Merge::absent()))); + } else if after.is_present() && !after.is_tree() { + self.items.insert( + DiffStreamKey::file_after_dir(key.path.clone()), + Ok((Merge::absent(), after)), + ); + } + self.add_dir_diff_items(key.path, tree_diff); + } else { + pending_index += 1; + } + } + + // Now emit the first file, or the first tree that completed with an error + while let Some(entry) = self.items.first_entry() { + match entry.get() { + Err(_) => { + // File or tree with error + let (key, result) = entry.remove_entry(); + return Poll::Ready(Some((key.path, result))); + } + Ok((before, after)) if !before.is_tree() && !after.is_tree() => { + let (key, result) = entry.remove_entry(); + return Poll::Ready(Some((key.path, result))); + } + _ => { + if !self.pending_trees.is_empty() { + return Poll::Pending; + } + } + }; + } + } + + Poll::Ready(None) + } +} + /// Helps with writing trees with conflicts. You start by creating an instance /// of this type with one or more base trees. You then add overrides on top. The /// overrides may be conflicts. 
Then you can write the result as a legacy tree diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 0216e1968c..9e7ec1361b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures::executor::block_on; +use futures::StreamExt; use itertools::Itertools; use jj_lib::backend::{FileId, MergedTreeId, TreeValue}; use jj_lib::files::MergeResult; -use jj_lib::matchers::{EverythingMatcher, FilesMatcher, PrefixMatcher}; +use jj_lib::matchers::{EverythingMatcher, FilesMatcher, Matcher, PrefixMatcher}; use jj_lib::merge::{Merge, MergeBuilder}; -use jj_lib::merged_tree::{MergedTree, MergedTreeBuilder, MergedTreeVal}; +use jj_lib::merged_tree::{ + MergedTree, MergedTreeBuilder, MergedTreeVal, TreeDiffIterator, TreeDiffStreamImpl, +}; use jj_lib::repo::Repo; use jj_lib::repo_path::{RepoPath, RepoPathComponent, RepoPathJoin}; use jj_lib::tree::merge_trees; @@ -31,6 +35,18 @@ fn file_value(file_id: &FileId) -> TreeValue { } } +fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn Matcher) { + let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect(); + let stream_diff: Vec<_> = block_on( + TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher) + .map(|(path, diff)| (path, diff.unwrap())) + .collect(), + ); + assert_eq!(stream_diff, iter_diff); +} + #[test] fn test_from_legacy_tree() { let test_repo = TestRepo::init(); @@ -714,6 +730,7 @@ fn test_diff_resolved() { ), ) ); + diff_stream_equals_iter(&before_merged, &after_merged, &EverythingMatcher); } /// Diff two conflicted trees @@ -805,6 +822,7 @@ fn test_diff_conflicted() { }) .collect_vec(); assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher); // Test the reverse 
diff let actual_diff = right_merged .diff(&left_merged, &EverythingMatcher) @@ -820,6 +838,7 @@ fn test_diff_conflicted() { }) .collect_vec(); assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher); } #[test] @@ -972,6 +991,7 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher); } // Test the reverse diff @@ -1036,12 +1056,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher); } // Diff while filtering by `path1` (file1 -> directory1) as a file { + let matcher = FilesMatcher::new(&[path1.clone()]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path1.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1052,12 +1074,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path1/file` (file1 -> directory1) as a file { + let matcher = FilesMatcher::new(&[path1.join(&file)]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path1.join(&file)])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1068,12 +1092,14 @@ fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path1` (file1 -> directory1) as a prefix { + let matcher = PrefixMatcher::new(&[path1.clone()]); let actual_diff = left_merged - .diff(&right_merged, &PrefixMatcher::new(&[path1.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![ @@ -1087,6 +1113,7 @@ 
fn test_diff_dir_file() { ), ]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } // Diff while filtering by `path6` (directory1 -> file1+(directory1-absent)) as @@ -1094,8 +1121,9 @@ fn test_diff_dir_file() { // do see the directory that's included in the conflict with a file on the right // side. { + let matcher = FilesMatcher::new(&[path6.clone()]); let actual_diff = left_merged - .diff(&right_merged, &FilesMatcher::new(&[path6.clone()])) + .diff(&right_merged, &matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect_vec(); let expected_diff = vec![( @@ -1103,6 +1131,7 @@ fn test_diff_dir_file() { (Merge::absent(), right_merged.path_value(&path6)), )]; assert_eq!(actual_diff, expected_diff); + diff_stream_equals_iter(&left_merged, &right_merged, &matcher); } } From e6a1ed22d425a653e858cefbf20c49173c28bac8 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Thu, 19 Oct 2023 11:27:55 -0700 Subject: [PATCH 6/6] merged_tree: let backend influence whether to use new diff algo Since the concurrent diff algorithm is significantly slower when using the Git backend, I think we'll have to switch between the two algorithms depending on backend. Even if the concurrent version always performed as well as the sequential version, exactly how concurrent it should be probably still depends on the backend. This commit therefore adds a function to the `Backend` trait, so each backend can say how much concurrency they deal well with. I then use that number for choosing between the sequential and concurrent versions in `MergedTree::diff_stream()`, and also to decide the number of concurrent reads to do in the concurrent version.
--- cli/examples/custom-backend/main.rs | 4 ++++ lib/src/backend.rs | 9 +++++++++ lib/src/git_backend.rs | 4 ++++ lib/src/local_backend.rs | 4 ++++ lib/src/merged_tree.rs | 30 +++++++++++++++++++++-------- lib/src/store.rs | 4 ++++ lib/tests/test_merged_tree.rs | 3 ++- lib/testutils/src/test_backend.rs | 5 +++++ 8 files changed, 54 insertions(+), 9 deletions(-) diff --git a/cli/examples/custom-backend/main.rs b/cli/examples/custom-backend/main.rs index dddb71586b..d3d3825bd1 100644 --- a/cli/examples/custom-backend/main.rs +++ b/cli/examples/custom-backend/main.rs @@ -117,6 +117,10 @@ impl Backend for JitBackend { self.inner.empty_tree_id() } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> { self.inner.read_file(path, id).await } diff --git a/lib/src/backend.rs b/lib/src/backend.rs index 91c0f6a01b..57c3e237f9 100644 --- a/lib/src/backend.rs +++ b/lib/src/backend.rs @@ -473,6 +473,15 @@ pub trait Backend: Send + Sync + Debug { fn empty_tree_id(&self) -> &TreeId; + /// An estimate of how many concurrent requests this backend handles well. A + /// local backend like the Git backend (at least until it supports partial + /// clones) may want to set this to 1. A cloud-backed backend may want to + /// set it to 100 or so. + /// + /// It is not guaranteed that at most this number of concurrent requests are + /// sent.
+ fn concurrency(&self) -> usize; + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>>; fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult<FileId>; diff --git a/lib/src/git_backend.rs b/lib/src/git_backend.rs index 36057de1b3..656607f10c 100644 --- a/lib/src/git_backend.rs +++ b/lib/src/git_backend.rs @@ -612,6 +612,10 @@ impl Backend for GitBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> { self.read_file_sync(id) } diff --git a/lib/src/local_backend.rs b/lib/src/local_backend.rs index e1ad0f8360..a510f58a61 100644 --- a/lib/src/local_backend.rs +++ b/lib/src/local_backend.rs @@ -149,6 +149,10 @@ impl Backend for LocalBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> { let path = self.file_path(id); let file = File::open(path).map_err(|err| map_not_found_err(err, id))?; diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 879b74f1f7..c98d559d1a 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -342,11 +342,21 @@ impl MergedTree { other: &MergedTree, matcher: &'matcher dyn Matcher, ) -> TreeDiffStream<'matcher> { - Box::pin(futures::stream::iter(TreeDiffIterator::new( - self.clone(), - other.clone(), - matcher, - ))) + let concurrency = self.store().concurrency(); + if concurrency <= 1 { + Box::pin(futures::stream::iter(TreeDiffIterator::new( + self.clone(), + other.clone(), + matcher, + ))) + } else { + Box::pin(TreeDiffStreamImpl::new( + self.clone(), + other.clone(), + matcher, + concurrency, + )) + } } /// Collects lists of modified, added, and removed files between this tree @@ -1048,15 +1058,19 @@ impl Ord for DiffStreamKey { impl<'matcher> TreeDiffStreamImpl<'matcher> { /// Creates a iterator over the differences between two trees.
Generally /// prefer `MergedTree::diff_stream()` of calling this directly. - pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + pub fn new( + tree1: MergedTree, + tree2: MergedTree, + matcher: &'matcher dyn Matcher, + max_concurrent_reads: usize, + ) -> Self { let mut stream = Self { matcher, legacy_format_before: matches!(tree1, MergedTree::Legacy(_)), legacy_format_after: matches!(tree2, MergedTree::Legacy(_)), items: BTreeMap::new(), pending_trees: VecDeque::new(), - // TODO: maybe the backends can suggest the conurrency limit? - max_concurrent_reads: 100, + max_concurrent_reads, max_queued_items: 10000, }; stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2))); diff --git a/lib/src/store.rs b/lib/src/store.rs index 7291a9254f..832c2c381b 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -89,6 +89,10 @@ impl Store { self.backend.empty_tree_id() } + pub fn concurrency(&self) -> usize { + self.backend.concurrency() + } + pub fn empty_merged_tree_id(&self) -> MergedTreeId { MergedTreeId::Legacy(self.backend.empty_tree_id().clone()) } diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 9e7ec1361b..b3a52deb1b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -39,8 +39,9 @@ fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect(); + let max_concurrent_reads = 10; let stream_diff: Vec<_> = block_on( - TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher) + TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher, max_concurrent_reads) .map(|(path, diff)| (path, diff.unwrap())) .collect(), ); diff --git a/lib/testutils/src/test_backend.rs b/lib/testutils/src/test_backend.rs index fcc88b4110..f0d263fc14 100644 --- a/lib/testutils/src/test_backend.rs +++ 
b/lib/testutils/src/test_backend.rs @@ -138,6 +138,11 @@ impl Backend for TestBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + // Not optimal, just for testing the async code more + 10 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> { match self .locked_data()