From e6a1ed22d425a653e858cefbf20c49173c28bac8 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Thu, 19 Oct 2023 11:27:55 -0700 Subject: [PATCH] merged_tree: let backend influence whether to use new diff algo Since the concurrent diff algorithm is significantly slower when using the Git backend, I think we'll have to use switch between the two algorithms depending on backend. Even if the concurrent version always performed as well as the sequential version, exactly how concurrent it should be probably still depends on the backend. This commit therefore adds a function to the `Backend` trait, so each backend can say how much concurrency they deal well with. I then use that number for choosing between the sequential and concurrent versions in `MergedTree::diff_stream()`, and also to decide the number of concurrent reads to do in the concurrent version. --- cli/examples/custom-backend/main.rs | 4 ++++ lib/src/backend.rs | 9 +++++++++ lib/src/git_backend.rs | 4 ++++ lib/src/local_backend.rs | 4 ++++ lib/src/merged_tree.rs | 30 +++++++++++++++++++++-------- lib/src/store.rs | 4 ++++ lib/tests/test_merged_tree.rs | 3 ++- lib/testutils/src/test_backend.rs | 5 +++++ 8 files changed, 54 insertions(+), 9 deletions(-) diff --git a/cli/examples/custom-backend/main.rs b/cli/examples/custom-backend/main.rs index dddb71586b..d3d3825bd1 100644 --- a/cli/examples/custom-backend/main.rs +++ b/cli/examples/custom-backend/main.rs @@ -117,6 +117,10 @@ impl Backend for JitBackend { self.inner.empty_tree_id() } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { self.inner.read_file(path, id).await } diff --git a/lib/src/backend.rs b/lib/src/backend.rs index 91c0f6a01b..57c3e237f9 100644 --- a/lib/src/backend.rs +++ b/lib/src/backend.rs @@ -473,6 +473,15 @@ pub trait Backend: Send + Sync + Debug { fn empty_tree_id(&self) -> &TreeId; + /// An estimate of how many concurrent requests this backend handles well. A + /// local backend like the Git backend (at until it supports partial clones) + /// may want to set this to 1. A cloud-backed backend may want to set it to + /// 100 or so. + /// + /// It is not guaranteed that at most this number of concurrent requests are + /// sent. + fn concurrency(&self) -> usize; + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult>; fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult; diff --git a/lib/src/git_backend.rs b/lib/src/git_backend.rs index 36057de1b3..656607f10c 100644 --- a/lib/src/git_backend.rs +++ b/lib/src/git_backend.rs @@ -612,6 +612,10 @@ impl Backend for GitBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { self.read_file_sync(id) } diff --git a/lib/src/local_backend.rs b/lib/src/local_backend.rs index e1ad0f8360..a510f58a61 100644 --- a/lib/src/local_backend.rs +++ b/lib/src/local_backend.rs @@ -149,6 +149,10 @@ impl Backend for LocalBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { let path = self.file_path(id); let file = File::open(path).map_err(|err| map_not_found_err(err, id))?; diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 879b74f1f7..c98d559d1a 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -342,11 +342,21 @@ impl MergedTree { other: &MergedTree, matcher: &'matcher dyn Matcher, ) -> TreeDiffStream<'matcher> { - Box::pin(futures::stream::iter(TreeDiffIterator::new( - self.clone(), - other.clone(), - matcher, - ))) + let concurrency = self.store().concurrency(); + if concurrency <= 1 { + Box::pin(futures::stream::iter(TreeDiffIterator::new( + self.clone(), + other.clone(), + matcher, + ))) + } else { + Box::pin(TreeDiffStreamImpl::new( + self.clone(), + other.clone(), + matcher, + concurrency, + )) + } } /// Collects lists of modified, added, and removed files between this tree @@ -1048,15 +1058,19 @@ impl Ord for DiffStreamKey { impl<'matcher> TreeDiffStreamImpl<'matcher> { /// Creates a iterator over the differences between two trees. Generally /// prefer `MergedTree::diff_stream()` of calling this directly. - pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + pub fn new( + tree1: MergedTree, + tree2: MergedTree, + matcher: &'matcher dyn Matcher, + max_concurrent_reads: usize, + ) -> Self { let mut stream = Self { matcher, legacy_format_before: matches!(tree1, MergedTree::Legacy(_)), legacy_format_after: matches!(tree2, MergedTree::Legacy(_)), items: BTreeMap::new(), pending_trees: VecDeque::new(), - // TODO: maybe the backends can suggest the conurrency limit? - max_concurrent_reads: 100, + max_concurrent_reads, max_queued_items: 10000, }; stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2))); diff --git a/lib/src/store.rs b/lib/src/store.rs index 7291a9254f..832c2c381b 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -89,6 +89,10 @@ impl Store { self.backend.empty_tree_id() } + pub fn concurrency(&self) -> usize { + self.backend.concurrency() + } + pub fn empty_merged_tree_id(&self) -> MergedTreeId { MergedTreeId::Legacy(self.backend.empty_tree_id().clone()) } diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 9e7ec1361b..b3a52deb1b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -39,8 +39,9 @@ fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect(); + let max_concurrent_reads = 10; let stream_diff: Vec<_> = block_on( - TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher) + TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher, max_concurrent_reads) .map(|(path, diff)| (path, diff.unwrap())) .collect(), ); diff --git a/lib/testutils/src/test_backend.rs b/lib/testutils/src/test_backend.rs index fcc88b4110..f0d263fc14 100644 --- a/lib/testutils/src/test_backend.rs +++ b/lib/testutils/src/test_backend.rs @@ -138,6 +138,11 @@ impl Backend for TestBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + // Not optimal, just for testing the async code more + 10 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { match self .locked_data()