Skip to content

Commit

Permalink
merged_tree: use Merge<Tree> to represent pending trees in TreeDiffSt…
Browse files Browse the repository at this point in the history
…reamImpl

This seems a slightly better in that MergedTree no longer represent a subtree.
  • Loading branch information
yuja committed Nov 28, 2024
1 parent 0ba6d84 commit f737690
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
39 changes: 19 additions & 20 deletions lib/src/merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ impl MergedTree {
)))
} else {
Box::pin(TreeDiffStreamImpl::new(
self.clone(),
other.clone(),
self.trees.clone(),
other.trees.clone(),
matcher,
concurrency,
))
Expand Down Expand Up @@ -859,7 +859,7 @@ pub struct TreeDiffStreamImpl<'matcher> {
#[allow(clippy::type_complexity)]
pending_trees: VecDeque<(
RepoPathBuf,
BoxFuture<'matcher, BackendResult<(MergedTree, MergedTree)>>,
BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>,
)>,
/// The maximum number of trees to request concurrently. However, we do the
/// accounting per path, so for there will often be twice as many pending
Expand Down Expand Up @@ -922,21 +922,21 @@ impl<'matcher> TreeDiffStreamImpl<'matcher> {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff_stream()` of calling this directly.
pub fn new(
tree1: MergedTree,
tree2: MergedTree,
trees1: Merge<Tree>,
trees2: Merge<Tree>,
matcher: &'matcher dyn Matcher,
max_concurrent_reads: usize,
) -> Self {
assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
let mut stream = Self {
store: tree1.store().clone(),
store: trees1.first().store().clone(),
matcher,
items: BTreeMap::new(),
pending_trees: VecDeque::new(),
max_concurrent_reads,
max_queued_items: 10000,
};
stream.add_dir_diff_items(RepoPathBuf::root(), Ok((tree1, tree2)));
stream.add_dir_diff_items(RepoPathBuf::root(), Ok((trees1, trees2)));
stream
}

Expand All @@ -952,37 +952,36 @@ impl<'matcher> TreeDiffStreamImpl<'matcher> {
}

/// Gets the given tree if `value` is a tree, otherwise an empty tree.
async fn tree(
async fn trees(
store: Arc<Store>,
dir: RepoPathBuf,
values: MergedTreeValue,
) -> BackendResult<MergedTree> {
let trees = if values.is_tree() {
) -> BackendResult<Merge<Tree>> {
if values.is_tree() {
let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter())
.then(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
.try_collect()
.await?;
builder.build()
Ok(builder.build())
} else {
Merge::resolved(Tree::empty(store, dir))
};
Ok(MergedTree { trees })
Ok(Merge::resolved(Tree::empty(store, dir)))
}
}

fn add_dir_diff_items(
&mut self,
dir: RepoPathBuf,
tree_diff: BackendResult<(MergedTree, MergedTree)>,
tree_diff: BackendResult<(Merge<Tree>, Merge<Tree>)>,
) {
let (tree1, tree2) = match tree_diff {
let (trees1, trees2) = match tree_diff {
Ok(trees) => trees,
Err(err) => {
self.items.insert(DiffStreamKey::normal(dir), Err(err));
return;
}
};

for (basename, before, after) in merged_tree_entry_diff(&tree1.trees, &tree2.trees) {
for (basename, before, after) in merged_tree_entry_diff(&trees1, &trees2) {
let path = dir.join(basename);
let tree_before = before.is_tree();
let tree_after = after.is_tree();
Expand Down Expand Up @@ -1010,9 +1009,9 @@ impl<'matcher> TreeDiffStreamImpl<'matcher> {
// If the path was a tree on either side of the diff, read those trees.
if tree_matches {
let before_tree_future =
Self::tree(self.store.clone(), path.clone(), before.cloned());
Self::trees(self.store.clone(), path.clone(), before.cloned());
let after_tree_future =
Self::tree(self.store.clone(), path.clone(), after.cloned());
Self::trees(self.store.clone(), path.clone(), after.cloned());
let both_trees_future =
async { futures::try_join!(before_tree_future, after_tree_future) };
self.pending_trees
Expand Down
18 changes: 12 additions & 6 deletions lib/tests/test_merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,21 @@ fn diff_entry_tuple(diff: TreeDiffEntry) -> (RepoPathBuf, (MergedTreeValue, Merg
}

fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn Matcher) {
let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.as_merge(), tree2.as_merge(), matcher)
let trees1 = tree1.as_merge();
let trees2 = tree2.as_merge();
let iter_diff: Vec<_> = TreeDiffIterator::new(trees1, trees2, matcher)
.map(|diff| (diff.path, diff.values.unwrap()))
.collect();
let max_concurrent_reads = 10;
let stream_diff: Vec<_> =
TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher, max_concurrent_reads)
.map(|diff| (diff.path, diff.values.unwrap()))
.collect()
.block_on();
let stream_diff: Vec<_> = TreeDiffStreamImpl::new(
trees1.clone(),
trees2.clone(),
matcher,
max_concurrent_reads,
)
.map(|diff| (diff.path, diff.values.unwrap()))
.collect()
.block_on();
assert_eq!(stream_diff, iter_diff);
}

Expand Down

0 comments on commit f737690

Please sign in to comment.