Skip to content

Commit

Permalink
merged_tree: let backend influence whether to use new diff algo
Browse files Browse the repository at this point in the history
Since the concurrent diff algorithm is significantly slower when using
the Git backend, I think we'll have to use switch between the two
algorithms depending on backend. Even if the concurrent version always
performed as well as the sequential version, exactly how concurrent it
should be probably still depends on the backend. This commit therefore
adds a function to the `Backend` trait, so each backend can say how
much concurrency they deal well with. I then use that number for
choosing between the sequential and concurrent versions in
`MergedTree::diff_stream()`, and also to decide the number of
concurrent reads to do in the concurrent version.
  • Loading branch information
martinvonz committed Nov 7, 2023
1 parent f40adb8 commit d989d40
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 9 deletions.
4 changes: 4 additions & 0 deletions cli/examples/custom-backend/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl Backend for JitBackend {
self.inner.empty_tree_id()
}

fn concurrency(&self) -> usize {
1
}

async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
self.inner.read_file(path, id).await
}
Expand Down
9 changes: 9 additions & 0 deletions lib/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ pub trait Backend: Send + Sync + Debug {

fn empty_tree_id(&self) -> &TreeId;

/// An estimate of how many concurrent requests this backend handles well. A
/// local backend like the Git backend (at until it supports partial clones)
/// may want to set this to 1. A cloud-backed backend may want to set it to
/// 100 or so.
///
/// It is not guaranteed that at most this number of concurrent requests are
/// sent.
fn concurrency(&self) -> usize;

async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>>;

fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult<FileId>;
Expand Down
4 changes: 4 additions & 0 deletions lib/src/git_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ impl Backend for GitBackend {
&self.empty_tree_id
}

fn concurrency(&self) -> usize {
1
}

async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
self.read_file_sync(id)
}
Expand Down
4 changes: 4 additions & 0 deletions lib/src/local_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ impl Backend for LocalBackend {
&self.empty_tree_id
}

fn concurrency(&self) -> usize {
1
}

async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
let path = self.file_path(id);
let file = File::open(path).map_err(|err| map_not_found_err(err, id))?;
Expand Down
30 changes: 22 additions & 8 deletions lib/src/merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,21 @@ impl MergedTree {
other: &MergedTree,
matcher: &'matcher dyn Matcher,
) -> TreeDiffStream<'matcher> {
Box::pin(futures::stream::iter(TreeDiffIterator::new(
self.clone(),
other.clone(),
matcher,
)))
let concurrency = self.store().concurrency();
if concurrency <= 1 {
Box::pin(futures::stream::iter(TreeDiffIterator::new(
self.clone(),
other.clone(),
matcher,
)))
} else {
Box::pin(TreeDiffStreamImpl::new(
self.clone(),
other.clone(),
matcher,
concurrency,
))
}
}

/// Collects lists of modified, added, and removed files between this tree
Expand Down Expand Up @@ -1048,15 +1058,19 @@ impl Ord for DiffStreamKey {
impl<'matcher> TreeDiffStreamImpl<'matcher> {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff_stream()` of calling this directly.
pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
pub fn new(
tree1: MergedTree,
tree2: MergedTree,
matcher: &'matcher dyn Matcher,
max_concurrent_reads: usize,
) -> Self {
let mut stream = Self {
matcher,
legacy_format_before: matches!(tree1, MergedTree::Legacy(_)),
legacy_format_after: matches!(tree2, MergedTree::Legacy(_)),
items: BTreeMap::new(),
pending_trees: VecDeque::new(),
// TODO: maybe the backends can suggest the conurrency limit?
max_concurrent_reads: 100,
max_concurrent_reads,
max_queued_items: 10000,
};
stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2)));
Expand Down
4 changes: 4 additions & 0 deletions lib/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ impl Store {
self.backend.empty_tree_id()
}

pub fn concurrency(&self) -> usize {
self.backend.concurrency()
}

pub fn empty_merged_tree_id(&self) -> MergedTreeId {
MergedTreeId::Legacy(self.backend.empty_tree_id().clone())
}
Expand Down
3 changes: 2 additions & 1 deletion lib/tests/test_merged_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn
let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect();
let max_concurrent_reads = 10;
let stream_diff: Vec<_> = block_on(
TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher)
TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher, max_concurrent_reads)
.map(|(path, diff)| (path, diff.unwrap()))
.collect(),
);
Expand Down
5 changes: 5 additions & 0 deletions lib/testutils/src/test_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl Backend for TestBackend {
&self.empty_tree_id
}

fn concurrency(&self) -> usize {
// Not optimal, just for testing the async code more
10
}

async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
match self
.locked_data()
Expand Down

0 comments on commit d989d40

Please sign in to comment.