diff --git a/cli/examples/custom-backend/main.rs b/cli/examples/custom-backend/main.rs index dddb71586b..d3d3825bd1 100644 --- a/cli/examples/custom-backend/main.rs +++ b/cli/examples/custom-backend/main.rs @@ -117,6 +117,10 @@ impl Backend for JitBackend { self.inner.empty_tree_id() } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { self.inner.read_file(path, id).await } diff --git a/lib/src/backend.rs b/lib/src/backend.rs index 91c0f6a01b..57c3e237f9 100644 --- a/lib/src/backend.rs +++ b/lib/src/backend.rs @@ -473,6 +473,15 @@ pub trait Backend: Send + Sync + Debug { fn empty_tree_id(&self) -> &TreeId; + /// An estimate of how many concurrent requests this backend handles well. A + /// local backend like the Git backend (at until it supports partial clones) + /// may want to set this to 1. A cloud-backed backend may want to set it to + /// 100 or so. + /// + /// It is not guaranteed that at most this number of concurrent requests are + /// sent. + fn concurrency(&self) -> usize; + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult>; fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult; diff --git a/lib/src/git_backend.rs b/lib/src/git_backend.rs index 36057de1b3..656607f10c 100644 --- a/lib/src/git_backend.rs +++ b/lib/src/git_backend.rs @@ -612,6 +612,10 @@ impl Backend for GitBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { self.read_file_sync(id) } diff --git a/lib/src/local_backend.rs b/lib/src/local_backend.rs index e1ad0f8360..a510f58a61 100644 --- a/lib/src/local_backend.rs +++ b/lib/src/local_backend.rs @@ -149,6 +149,10 @@ impl Backend for LocalBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + 1 + } + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { let path = self.file_path(id); let file = File::open(path).map_err(|err| map_not_found_err(err, id))?; diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 879b74f1f7..c98d559d1a 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -342,11 +342,21 @@ impl MergedTree { other: &MergedTree, matcher: &'matcher dyn Matcher, ) -> TreeDiffStream<'matcher> { - Box::pin(futures::stream::iter(TreeDiffIterator::new( - self.clone(), - other.clone(), - matcher, - ))) + let concurrency = self.store().concurrency(); + if concurrency <= 1 { + Box::pin(futures::stream::iter(TreeDiffIterator::new( + self.clone(), + other.clone(), + matcher, + ))) + } else { + Box::pin(TreeDiffStreamImpl::new( + self.clone(), + other.clone(), + matcher, + concurrency, + )) + } } /// Collects lists of modified, added, and removed files between this tree @@ -1048,15 +1058,19 @@ impl Ord for DiffStreamKey { impl<'matcher> TreeDiffStreamImpl<'matcher> { /// Creates a iterator over the differences between two trees. Generally /// prefer `MergedTree::diff_stream()` of calling this directly. - pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self { + pub fn new( + tree1: MergedTree, + tree2: MergedTree, + matcher: &'matcher dyn Matcher, + max_concurrent_reads: usize, + ) -> Self { let mut stream = Self { matcher, legacy_format_before: matches!(tree1, MergedTree::Legacy(_)), legacy_format_after: matches!(tree2, MergedTree::Legacy(_)), items: BTreeMap::new(), pending_trees: VecDeque::new(), - // TODO: maybe the backends can suggest the conurrency limit? - max_concurrent_reads: 100, + max_concurrent_reads, max_queued_items: 10000, }; stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2))); diff --git a/lib/src/store.rs b/lib/src/store.rs index 7291a9254f..832c2c381b 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -89,6 +89,10 @@ impl Store { self.backend.empty_tree_id() } + pub fn concurrency(&self) -> usize { + self.backend.concurrency() + } + pub fn empty_merged_tree_id(&self) -> MergedTreeId { MergedTreeId::Legacy(self.backend.empty_tree_id().clone()) } diff --git a/lib/tests/test_merged_tree.rs b/lib/tests/test_merged_tree.rs index 9e7ec1361b..b3a52deb1b 100644 --- a/lib/tests/test_merged_tree.rs +++ b/lib/tests/test_merged_tree.rs @@ -39,8 +39,9 @@ fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher) .map(|(path, diff)| (path, diff.unwrap())) .collect(); + let max_concurrent_reads = 10; let stream_diff: Vec<_> = block_on( - TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher) + TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher, max_concurrent_reads) .map(|(path, diff)| (path, diff.unwrap())) .collect(), ); diff --git a/lib/testutils/src/test_backend.rs b/lib/testutils/src/test_backend.rs index fcc88b4110..f0d263fc14 100644 --- a/lib/testutils/src/test_backend.rs +++ b/lib/testutils/src/test_backend.rs @@ -138,6 +138,11 @@ impl Backend for TestBackend { &self.empty_tree_id } + fn concurrency(&self) -> usize { + // Not optimal, just for testing the async code more + 10 + } + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { match self .locked_data()