From 15e26b30454a728615f765c459520e450a9c6d37 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Thu, 5 Oct 2023 21:39:14 -0700 Subject: [PATCH 1/4] git_backend: rename some `store` variables `backend` in tests This is to avoid confusion with instances of the `Store` type. --- lib/src/git_backend.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/src/git_backend.rs b/lib/src/git_backend.rs index 1218f6c64d..364f397b24 100644 --- a/lib/src/git_backend.rs +++ b/lib/src/git_backend.rs @@ -990,14 +990,14 @@ mod tests { .unwrap(); let commit_id2 = CommitId::from_bytes(git_commit_id2.as_bytes()); - let store = GitBackend::init_external(store_path, &git_repo_path).unwrap(); + let backend = GitBackend::init_external(store_path, &git_repo_path).unwrap(); // Import the head commit and its ancestors - store + backend .import_head_commits([&commit_id2], uses_tree_conflict_format) .unwrap(); // Ref should be created only for the head commit - let git_refs = store + let git_refs = backend .git_repo() .references_glob("refs/jj/keep/*") .unwrap() @@ -1005,7 +1005,7 @@ mod tests { .collect_vec(); assert_eq!(git_refs, vec![git_commit_id2]); - let commit = store.read_commit(&commit_id).unwrap(); + let commit = backend.read_commit(&commit_id).unwrap(); assert_eq!(&commit.change_id, &change_id); assert_eq!(commit.parents, vec![CommitId::from_bytes(&[0; 20])]); assert_eq!(commit.predecessors, vec![]); @@ -1034,7 +1034,7 @@ mod tests { ); assert_eq!(commit.committer.timestamp.tz_offset, -480); - let root_tree = store + let root_tree = backend .read_tree( &RepoPath::root(), &TreeId::from_bytes(root_tree_id.as_bytes()), @@ -1049,7 +1049,7 @@ mod tests { &TreeValue::Tree(TreeId::from_bytes(dir_tree_id.as_bytes())) ); - let dir_tree = store + let dir_tree = backend .read_tree( &RepoPath::from_internal_string("dir"), &TreeId::from_bytes(dir_tree_id.as_bytes()), @@ -1073,7 +1073,7 @@ mod tests { &TreeValue::Symlink(SymlinkId::from_bytes(blob2.as_bytes())) ); - let commit2 = store.read_commit(&commit_id2).unwrap(); + let commit2 = backend.read_commit(&commit_id2).unwrap(); assert_eq!(commit2.parents, vec![commit_id.clone()]); assert_eq!(commit.predecessors, vec![]); assert_eq!( @@ -1314,7 +1314,7 @@ mod tests { #[test] fn commit_has_ref() { let temp_dir = testutils::new_temp_dir(); - let store = GitBackend::init_internal(temp_dir.path()).unwrap(); + let backend = GitBackend::init_internal(temp_dir.path()).unwrap(); let signature = Signature { name: "Someone".to_string(), email: "someone@example.com".to_string(), @@ -1324,16 +1324,16 @@ mod tests { }, }; let commit = Commit { - parents: vec![store.root_commit_id().clone()], + parents: vec![backend.root_commit_id().clone()], predecessors: vec![], - root_tree: MergedTreeId::Legacy(store.empty_tree_id().clone()), + root_tree: MergedTreeId::Legacy(backend.empty_tree_id().clone()), change_id: ChangeId::new(vec![]), description: "initial".to_string(), author: signature.clone(), committer: signature, }; - let commit_id = store.write_commit(commit).unwrap().0; - let git_refs = store + let commit_id = backend.write_commit(commit).unwrap().0; + let git_refs = backend .git_repo() .references_glob("refs/jj/keep/*") .unwrap() @@ -1345,11 +1345,11 @@ mod tests { #[test] fn overlapping_git_commit_id() { let temp_dir = testutils::new_temp_dir(); - let store = GitBackend::init_internal(temp_dir.path()).unwrap(); + let backend = GitBackend::init_internal(temp_dir.path()).unwrap(); let mut commit1 = Commit { - parents: vec![store.root_commit_id().clone()], + parents: vec![backend.root_commit_id().clone()], predecessors: vec![], - root_tree: MergedTreeId::Legacy(store.empty_tree_id().clone()), + root_tree: MergedTreeId::Legacy(backend.empty_tree_id().clone()), change_id: ChangeId::new(vec![]), description: "initial".to_string(), author: create_signature(), @@ -1359,13 +1359,13 @@ mod tests { // second after the epoch, so the timestamp adjustment can remove 1 // second and it will still be nonnegative commit1.committer.timestamp.timestamp = MillisSinceEpoch(1000); - let (commit_id1, mut commit2) = store.write_commit(commit1).unwrap(); + let (commit_id1, mut commit2) = backend.write_commit(commit1).unwrap(); commit2.predecessors.push(commit_id1.clone()); // `write_commit` should prevent the ids from being the same by changing the // committer timestamp of the commit it actually writes. - let (commit_id2, mut actual_commit2) = store.write_commit(commit2.clone()).unwrap(); + let (commit_id2, mut actual_commit2) = backend.write_commit(commit2.clone()).unwrap(); // The returned matches the ID - assert_eq!(store.read_commit(&commit_id2).unwrap(), actual_commit2); + assert_eq!(backend.read_commit(&commit_id2).unwrap(), actual_commit2); assert_ne!(commit_id2, commit_id1); // The committer timestamp should differ assert_ne!( From 9181b44d564fdc0bdf1084acf7355faf874b747f Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Wed, 6 Sep 2023 12:59:17 -0700 Subject: [PATCH 2/4] backend: make read functions async The commit backend at Google is cloud-based (and so are the other backends); it reads and writes commits from/to a server, which stores them in a database. That makes latency much higher than for disk-based backends. To reduce the latency, we have a local daemon process that caches and prefetches objects. There are still many cases where latency is high, such as when diffing two uncached commits. We can improve that by changing some of our (jj's) algorithms to read many objects concurrently from the backend. In the case of tree-diffing, we can fetch one level (depth) of the tree at a time. There are several ways of doing that: * Make the backend methods `async` * Use many threads for reading from the backend * Add backend methods for batch reading I don't think we typically need CPU parallelism, so it's wasteful to have hundreds of threads running in order to fetch hundreds of objects in parallel (especially when using a synchronous backend like the Git backend). Batching would work well for the tree-diffing case, but it's not as composable as `async`. For example, if we wanted to fetch some commits at the same time as we were doing a diff, it's hard to see how to do that with batching. Using async seems like our best bet. I didn't make the backend interface's write functions async because writes are already async with the daemon we have at Google. That daemon will hash the object and immediately return, and then send the object to the server in the background. I think any cloud-based solution will need a similar daemon process. However, we may need to reconsider this if/when jj gets used on a server with a custom backend that writes directly to a database (i.e. no async daemon in between). I've tried to measure the performance impact. That's the largest difference I've been able to measure was on `jj diff --ignore-working-copy -s --from v5.0 --to v6.0` in the Linux repo, which increases from 749 ms to 773 ms (3.3%). In most cases I've tested, there's no measurable difference. I've tried diffing from the root commit, as well as `jj --ignore-working-copy log --no-graph -r '::v3.0 & author(torvalds)' -T 'commit_id ++ "\n"'` (to test a commit-heavy load). --- Cargo.lock | 4 ++ Cargo.toml | 2 + cli/Cargo.toml | 1 + cli/examples/custom-backend/main.rs | 22 +++++---- lib/Cargo.toml | 3 ++ lib/src/backend.rs | 12 +++-- lib/src/git_backend.rs | 72 ++++++++++++++++------------- lib/src/local_backend.rs | 21 +++++---- lib/src/store.rs | 51 ++++++++++++++++---- lib/testutils/Cargo.toml | 1 + lib/testutils/src/test_backend.rs | 12 +++-- 11 files changed, 131 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9658e1c3ee..2dd3aa1107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -990,6 +990,7 @@ dependencies = [ "anyhow", "assert_cmd", "assert_matches", + "async-trait", "cargo_metadata", "chrono", "clap", @@ -1036,6 +1037,7 @@ name = "jj-lib" version = "0.10.0" dependencies = [ "assert_matches", + "async-trait", "backoff", "blake2", "byteorder", @@ -1046,6 +1048,7 @@ dependencies = [ "digest", "either", "esl01-renderdag", + "futures 0.3.28", "git2", "hex", "insta", @@ -2083,6 +2086,7 @@ dependencies = [ name = "testutils" version = "0.10.0" dependencies = [ + "async-trait", "config", "git2", "itertools 0.11.0", diff --git a/Cargo.toml b/Cargo.toml index 98e046d5bb..a485b72aab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ keywords = ["VCS", "DVCS", "SCM", "Git", "Mercurial"] anyhow = "1.0.75" assert_cmd = "2.0.8" assert_matches = "1.5.0" +async-trait = "0.1.73" backoff = "0.4.0" blake2 = "0.10.6" byteorder = "1.5.0" @@ -39,6 +40,7 @@ digest = "0.10.7" dirs = "5.0.1" either = "1.9.0" esl01-renderdag = "0.3.0" +futures = "0.3.28" glob = "0.3.1" git2 = "0.17.2" hex = "0.4.3" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 261fef02dd..abdfd47685 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -69,6 +69,7 @@ libc = { workspace = true } [dev-dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } assert_cmd = { workspace = true } assert_matches = { workspace = true } insta = { workspace = true } diff --git a/cli/examples/custom-backend/main.rs b/cli/examples/custom-backend/main.rs index 5a39f39ab7..f834d0398c 100644 --- a/cli/examples/custom-backend/main.rs +++ b/cli/examples/custom-backend/main.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::io::Read; use std::path::Path; +use async_trait::async_trait; use jj_cli::cli_util::{CliRunner, CommandError, CommandHelper}; use jj_cli::ui::Ui; use jj_lib::backend::{ @@ -86,6 +87,7 @@ impl JitBackend { } } +#[async_trait] impl Backend for JitBackend { fn as_any(&self) -> &dyn Any { self @@ -115,40 +117,40 @@ impl Backend for JitBackend { self.inner.empty_tree_id() } - fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { - self.inner.read_file(path, id) + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { + self.inner.read_file(path, id).await } fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult { self.inner.write_file(path, contents) } - fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult { - self.inner.read_symlink(path, id) + async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult { + self.inner.read_symlink(path, id).await } fn write_symlink(&self, path: &RepoPath, target: &str) -> BackendResult { self.inner.write_symlink(path, target) } - fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult { - self.inner.read_tree(path, id) + async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult { + self.inner.read_tree(path, id).await } fn write_tree(&self, path: &RepoPath, contents: &Tree) -> BackendResult { self.inner.write_tree(path, contents) } - fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult { - self.inner.read_conflict(path, id) + async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult { + self.inner.read_conflict(path, id).await } fn write_conflict(&self, path: &RepoPath, contents: &Conflict) -> BackendResult { self.inner.write_conflict(path, contents) } - fn read_commit(&self, id: &CommitId) -> BackendResult { - self.inner.read_commit(id) + async fn read_commit(&self, id: &CommitId) -> BackendResult { + self.inner.read_commit(id).await } fn write_commit(&self, contents: Commit) -> BackendResult<(CommitId, Commit)> { diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 530f681625..cce1d098f6 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -19,6 +19,7 @@ harness = false version_check = { workspace = true } [dependencies] +async-trait = { workspace = true} backoff = { workspace = true } blake2 = { workspace = true } byteorder = { workspace = true } @@ -26,6 +27,7 @@ bytes = { workspace = true } chrono = { workspace = true } config = { workspace = true } digest = { workspace = true } +futures = { workspace = true } either = { workspace = true } git2 = { workspace = true } hex = { workspace = true } @@ -63,6 +65,7 @@ num_cpus = { workspace = true } pretty_assertions = { workspace = true } test-case = { workspace = true } testutils = { workspace = true } +tokio = { workspace = true, features = ["full"] } [features] default = [] diff --git a/lib/src/backend.rs b/lib/src/backend.rs index bfb702a816..e059546e6d 100644 --- a/lib/src/backend.rs +++ b/lib/src/backend.rs @@ -21,6 +21,7 @@ use std::io::Read; use std::result::Result; use std::vec::Vec; +use async_trait::async_trait; use thiserror::Error; use crate::content_hash::ContentHash; @@ -465,6 +466,7 @@ pub fn make_root_commit(root_change_id: ChangeId, empty_tree_id: TreeId) -> Comm } } +#[async_trait] pub trait Backend: Send + Sync + Debug { fn as_any(&self) -> &dyn Any; @@ -484,23 +486,23 @@ pub trait Backend: Send + Sync + Debug { fn empty_tree_id(&self) -> &TreeId; - fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult>; + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult>; fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult; - fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult; + async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult; fn write_symlink(&self, path: &RepoPath, target: &str) -> BackendResult; - fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult; + async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult; fn write_tree(&self, path: &RepoPath, contents: &Tree) -> BackendResult; - fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult; + async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult; fn write_conflict(&self, path: &RepoPath, contents: &Conflict) -> BackendResult; - fn read_commit(&self, id: &CommitId) -> BackendResult; + async fn read_commit(&self, id: &CommitId) -> BackendResult; /// Writes a commit and returns its ID and the commit itself. The commit /// should contain the data that was actually written, which may differ diff --git a/lib/src/git_backend.rs b/lib/src/git_backend.rs index 364f397b24..5058c09d9a 100644 --- a/lib/src/git_backend.rs +++ b/lib/src/git_backend.rs @@ -22,6 +22,7 @@ use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, MutexGuard}; +use async_trait::async_trait; use git2::Oid; use itertools::Itertools; use prost::Message; @@ -479,6 +480,7 @@ impl Debug for GitBackend { } } +#[async_trait] impl Backend for GitBackend { fn as_any(&self) -> &dyn Any { self @@ -508,7 +510,7 @@ impl Backend for GitBackend { &self.empty_tree_id } - fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { let git_blob_id = validate_git_object_id(id)?; let locked_repo = self.repo.lock().unwrap(); let blob = locked_repo @@ -531,7 +533,7 @@ impl Backend for GitBackend { Ok(FileId::new(oid.as_bytes().to_vec())) } - fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result { + async fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result { let git_blob_id = validate_git_object_id(id)?; let locked_repo = self.repo.lock().unwrap(); let blob = locked_repo @@ -558,7 +560,7 @@ impl Backend for GitBackend { Ok(SymlinkId::new(oid.as_bytes().to_vec())) } - fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult { + async fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult { if id == &self.empty_tree_id { return Ok(Tree::default()); } @@ -653,11 +655,13 @@ impl Backend for GitBackend { Ok(TreeId::from_bytes(oid.as_bytes())) } - fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult { - let mut file = self.read_file( - &RepoPath::from_internal_string("unused"), - &FileId::new(id.to_bytes()), - )?; + async fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult { + let mut file = self + .read_file( + &RepoPath::from_internal_string("unused"), + &FileId::new(id.to_bytes()), + ) + .await?; let mut data = String::new(); file.read_to_string(&mut data) .map_err(|err| BackendError::ReadObject { @@ -690,7 +694,7 @@ impl Backend for GitBackend { } #[tracing::instrument(skip(self))] - fn read_commit(&self, id: &CommitId) -> BackendResult { + async fn read_commit(&self, id: &CommitId) -> BackendResult { if *id == self.root_commit_id { return Ok(make_root_commit( self.root_change_id().clone(), @@ -1005,7 +1009,7 @@ mod tests { .collect_vec(); assert_eq!(git_refs, vec![git_commit_id2]); - let commit = backend.read_commit(&commit_id).unwrap(); + let commit = futures::executor::block_on(backend.read_commit(&commit_id)).unwrap(); assert_eq!(&commit.change_id, &change_id); assert_eq!(commit.parents, vec![CommitId::from_bytes(&[0; 20])]); assert_eq!(commit.predecessors, vec![]); @@ -1034,12 +1038,11 @@ mod tests { ); assert_eq!(commit.committer.timestamp.tz_offset, -480); - let root_tree = backend - .read_tree( - &RepoPath::root(), - &TreeId::from_bytes(root_tree_id.as_bytes()), - ) - .unwrap(); + let root_tree = futures::executor::block_on(backend.read_tree( + &RepoPath::root(), + &TreeId::from_bytes(root_tree_id.as_bytes()), + )) + .unwrap(); let mut root_entries = root_tree.entries(); let dir = root_entries.next().unwrap(); assert_eq!(root_entries.next(), None); @@ -1049,12 +1052,11 @@ mod tests { &TreeValue::Tree(TreeId::from_bytes(dir_tree_id.as_bytes())) ); - let dir_tree = backend - .read_tree( - &RepoPath::from_internal_string("dir"), - &TreeId::from_bytes(dir_tree_id.as_bytes()), - ) - .unwrap(); + let dir_tree = futures::executor::block_on(backend.read_tree( + &RepoPath::from_internal_string("dir"), + &TreeId::from_bytes(dir_tree_id.as_bytes()), + )) + .unwrap(); let mut entries = dir_tree.entries(); let file = entries.next().unwrap(); let symlink = entries.next().unwrap(); @@ -1073,7 +1075,7 @@ mod tests { &TreeValue::Symlink(SymlinkId::from_bytes(blob2.as_bytes())) ); - let commit2 = backend.read_commit(&commit_id2).unwrap(); + let commit2 = futures::executor::block_on(backend.read_commit(&commit_id2)).unwrap(); assert_eq!(commit2.parents, vec![commit_id.clone()]); assert_eq!(commit.predecessors, vec![]); assert_eq!( @@ -1112,9 +1114,10 @@ mod tests { // read_commit() without import_head_commits() works as of now. This might be // changed later. - assert!(backend - .read_commit(&CommitId::from_bytes(git_commit_id.as_bytes())) - .is_ok()); + assert!(futures::executor::block_on( + backend.read_commit(&CommitId::from_bytes(git_commit_id.as_bytes())) + ) + .is_ok()); assert!( backend .cached_extra_metadata_table() @@ -1202,7 +1205,7 @@ mod tests { // Only root commit as parent commit.parents = vec![backend.root_commit_id().clone()]; let first_id = backend.write_commit(commit.clone()).unwrap().0; - let first_commit = backend.read_commit(&first_id).unwrap(); + let first_commit = futures::executor::block_on(backend.read_commit(&first_id)).unwrap(); assert_eq!(first_commit, commit); let first_git_commit = git_repo.find_commit(git_id(&first_id)).unwrap(); assert_eq!(first_git_commit.parent_ids().collect_vec(), vec![]); @@ -1210,7 +1213,7 @@ mod tests { // Only non-root commit as parent commit.parents = vec![first_id.clone()]; let second_id = backend.write_commit(commit.clone()).unwrap().0; - let second_commit = backend.read_commit(&second_id).unwrap(); + let second_commit = futures::executor::block_on(backend.read_commit(&second_id)).unwrap(); assert_eq!(second_commit, commit); let second_git_commit = git_repo.find_commit(git_id(&second_id)).unwrap(); assert_eq!( @@ -1221,7 +1224,7 @@ mod tests { // Merge commit commit.parents = vec![first_id.clone(), second_id.clone()]; let merge_id = backend.write_commit(commit.clone()).unwrap().0; - let merge_commit = backend.read_commit(&merge_id).unwrap(); + let merge_commit = futures::executor::block_on(backend.read_commit(&merge_id)).unwrap(); assert_eq!(merge_commit, commit); let merge_git_commit = git_repo.find_commit(git_id(&merge_id)).unwrap(); assert_eq!( @@ -1271,7 +1274,8 @@ mod tests { // When writing a tree-level conflict, the root tree on the git side has the // individual trees as subtrees. let read_commit_id = backend.write_commit(commit.clone()).unwrap().0; - let read_commit = backend.read_commit(&read_commit_id).unwrap(); + let read_commit = + futures::executor::block_on(backend.read_commit(&read_commit_id)).unwrap(); assert_eq!(read_commit, commit); let git_commit = git_repo .find_commit(Oid::from_bytes(read_commit_id.as_bytes()).unwrap()) @@ -1300,7 +1304,8 @@ mod tests { // regular git tree. commit.root_tree = MergedTreeId::resolved(create_tree(5)); let read_commit_id = backend.write_commit(commit.clone()).unwrap().0; - let read_commit = backend.read_commit(&read_commit_id).unwrap(); + let read_commit = + futures::executor::block_on(backend.read_commit(&read_commit_id)).unwrap(); assert_eq!(read_commit, commit); let git_commit = git_repo .find_commit(Oid::from_bytes(read_commit_id.as_bytes()).unwrap()) @@ -1365,7 +1370,10 @@ mod tests { // committer timestamp of the commit it actually writes. let (commit_id2, mut actual_commit2) = backend.write_commit(commit2.clone()).unwrap(); // The returned matches the ID - assert_eq!(backend.read_commit(&commit_id2).unwrap(), actual_commit2); + assert_eq!( + futures::executor::block_on(backend.read_commit(&commit_id2)).unwrap(), + actual_commit2 + ); assert_ne!(commit_id2, commit_id1); // The committer timestamp should differ assert_ne!( diff --git a/lib/src/local_backend.rs b/lib/src/local_backend.rs index 77a13da1ed..e9a13cc7d5 100644 --- a/lib/src/local_backend.rs +++ b/lib/src/local_backend.rs @@ -21,6 +21,7 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use async_trait::async_trait; use blake2::{Blake2b512, Digest}; use prost::Message; use tempfile::NamedTempFile; @@ -114,6 +115,7 @@ impl LocalBackend { } } +#[async_trait] impl Backend for LocalBackend { fn as_any(&self) -> &dyn Any { self @@ -143,7 +145,7 @@ impl Backend for LocalBackend { &self.empty_tree_id } - fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { + async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult> { let path = self.file_path(id); let file = File::open(path).map_err(|err| map_not_found_err(err, id))?; Ok(Box::new(zstd::Decoder::new(file).map_err(to_other_err)?)) @@ -171,7 +173,7 @@ impl Backend for LocalBackend { Ok(id) } - fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result { + async fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result { let path = self.symlink_path(id); let target = fs::read_to_string(path).map_err(|err| map_not_found_err(err, id))?; Ok(target) @@ -191,7 +193,7 @@ impl Backend for LocalBackend { Ok(id) } - fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult { + async fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult { let path = self.tree_path(id); let buf = fs::read(path).map_err(|err| map_not_found_err(err, id))?; @@ -215,7 +217,7 @@ impl Backend for LocalBackend { Ok(id) } - fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult { + async fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult { let path = self.conflict_path(id); let buf = fs::read(path).map_err(|err| map_not_found_err(err, id))?; @@ -239,7 +241,7 @@ impl Backend for LocalBackend { Ok(id) } - fn read_commit(&self, id: &CommitId) -> BackendResult { + async fn read_commit(&self, id: &CommitId) -> BackendResult { if *id == self.root_commit_id { return Ok(make_root_commit( self.root_change_id().clone(), @@ -486,25 +488,26 @@ mod tests { // Only root commit as parent commit.parents = vec![backend.root_commit_id().clone()]; let first_id = backend.write_commit(commit.clone()).unwrap().0; - let first_commit = backend.read_commit(&first_id).unwrap(); + let first_commit = futures::executor::block_on(backend.read_commit(&first_id)).unwrap(); assert_eq!(first_commit, commit); // Only non-root commit as parent commit.parents = vec![first_id.clone()]; let second_id = backend.write_commit(commit.clone()).unwrap().0; - let second_commit = backend.read_commit(&second_id).unwrap(); + let second_commit = futures::executor::block_on(backend.read_commit(&second_id)).unwrap(); assert_eq!(second_commit, commit); // Merge commit commit.parents = vec![first_id.clone(), second_id.clone()]; let merge_id = backend.write_commit(commit.clone()).unwrap().0; - let merge_commit = backend.read_commit(&merge_id).unwrap(); + let merge_commit = futures::executor::block_on(backend.read_commit(&merge_id)).unwrap(); assert_eq!(merge_commit, commit); // Merge commit with root as one parent commit.parents = vec![first_id, backend.root_commit_id().clone()]; let root_merge_id = backend.write_commit(commit.clone()).unwrap().0; - let root_merge_commit = backend.read_commit(&root_merge_id).unwrap(); + let root_merge_commit = + futures::executor::block_on(backend.read_commit(&root_merge_id)).unwrap(); assert_eq!(root_merge_commit, commit); } diff --git a/lib/src/store.rs b/lib/src/store.rs index a82be9f3db..ed743ac84e 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -97,18 +97,22 @@ impl Store { } pub fn get_commit(self: &Arc, id: &CommitId) -> BackendResult { - let data = self.get_backend_commit(id)?; + futures::executor::block_on(self.get_commit_async(id)) + } + + pub async fn get_commit_async(self: &Arc, id: &CommitId) -> BackendResult { + let data = self.get_backend_commit(id).await?; Ok(Commit::new(self.clone(), id.clone(), data)) } - fn get_backend_commit(&self, id: &CommitId) -> BackendResult> { + async fn get_backend_commit(&self, id: &CommitId) -> BackendResult> { { let read_locked_cached = self.commit_cache.read().unwrap(); if let Some(data) = read_locked_cached.get(id).cloned() { return Ok(data); } } - let commit = self.backend.read_commit(id)?; + let commit = self.backend.read_commit(id).await?; let data = Arc::new(commit); let mut write_locked_cache = self.commit_cache.write().unwrap(); write_locked_cache.insert(id.clone(), data.clone()); @@ -128,11 +132,23 @@ impl Store { } pub fn get_tree(self: &Arc, dir: &RepoPath, id: &TreeId) -> BackendResult { - let data = self.get_backend_tree(dir, id)?; + futures::executor::block_on(self.get_tree_async(dir, id)) + } + + pub async fn get_tree_async( + self: &Arc, + dir: &RepoPath, + id: &TreeId, + ) -> BackendResult { + let data = self.get_backend_tree(dir, id).await?; Ok(Tree::new(self.clone(), dir.clone(), id.clone(), data)) } - fn get_backend_tree(&self, dir: &RepoPath, id: &TreeId) -> BackendResult> { + async fn get_backend_tree( + &self, + dir: &RepoPath, + id: &TreeId, + ) -> BackendResult> { let key = (dir.clone(), id.clone()); { let read_locked_cache = self.tree_cache.read().unwrap(); @@ -140,7 +156,8 @@ impl Store { return Ok(data); } } - let data = Arc::new(self.backend.read_tree(dir, id)?); + let data = self.backend.read_tree(dir, id).await?; + let data = Arc::new(data); let mut write_locked_cache = self.tree_cache.write().unwrap(); write_locked_cache.insert(key, data.clone()); Ok(data) @@ -175,7 +192,15 @@ impl Store { } pub fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { - self.backend.read_file(path, id) + futures::executor::block_on(self.read_file_async(path, id)) + } + + pub async fn read_file_async( + &self, + path: &RepoPath, + id: &FileId, + ) -> BackendResult> { + self.backend.read_file(path, id).await } pub fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult { @@ -183,7 +208,15 @@ impl Store { } pub fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult { - self.backend.read_symlink(path, id) + futures::executor::block_on(self.read_symlink_async(path, id)) + } + + pub async fn read_symlink_async( + &self, + path: &RepoPath, + id: &SymlinkId, + ) -> BackendResult { + self.backend.read_symlink(path, id).await } pub fn write_symlink(&self, path: &RepoPath, contents: &str) -> BackendResult { @@ -195,7 +228,7 @@ impl Store { path: &RepoPath, id: &ConflictId, ) -> BackendResult>> { - let backend_conflict = self.backend.read_conflict(path, id)?; + let backend_conflict = futures::executor::block_on(self.backend.read_conflict(path, id))?; Ok(Merge::from_backend_conflict(backend_conflict)) } diff --git a/lib/testutils/Cargo.toml b/lib/testutils/Cargo.toml index f5f98680f6..baefbe85e5 100644 --- a/lib/testutils/Cargo.toml +++ b/lib/testutils/Cargo.toml @@ -15,6 +15,7 @@ readme = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { workspace = true } config = { workspace = true } git2 = { workspace = true } itertools = { workspace = true } diff --git a/lib/testutils/src/test_backend.rs b/lib/testutils/src/test_backend.rs index 5cc2146835..9bd7be0bdf 100644 --- a/lib/testutils/src/test_backend.rs +++ b/lib/testutils/src/test_backend.rs @@ -19,6 +19,7 @@ use std::io::{Cursor, Read}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, MutexGuard, OnceLock}; +use async_trait::async_trait; use jj_lib::backend::{ make_root_commit, Backend, BackendError, BackendResult, ChangeId, Commit, CommitId, Conflict, ConflictId, FileId, ObjectId, SymlinkId, Tree, TreeId, @@ -107,6 +108,7 @@ impl Debug for TestBackend { } } +#[async_trait] impl Backend for TestBackend { fn as_any(&self) -> &dyn Any { self @@ -136,7 +138,7 @@ impl Backend for TestBackend { &self.empty_tree_id } - fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { + async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult> { match self .locked_data() .files @@ -165,7 +167,7 @@ impl Backend for TestBackend { Ok(id) } - fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> Result { + async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> Result { match self .locked_data() .symlinks @@ -192,7 +194,7 @@ impl Backend for TestBackend { Ok(id) } - fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult { + async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult { if id == &self.empty_tree_id { return Ok(Tree::default()); } @@ -222,7 +224,7 @@ impl Backend for TestBackend { Ok(id) } - fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult { + async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult { match self .locked_data() .conflicts @@ -249,7 +251,7 @@ impl Backend for TestBackend { Ok(id) } - fn read_commit(&self, id: &CommitId) -> BackendResult { + async fn read_commit(&self, id: &CommitId) -> BackendResult { if id == &self.root_commit_id { return Ok(make_root_commit( self.root_change_id.clone(), From 8f46affcdf44d4a28ef775f19290f9f58561d370 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Mon, 25 Sep 2023 11:07:22 -0700 Subject: [PATCH 3/4] merge: implement Default and Extend on MergeBuilder `futures::stream::Stream::collect()` requires a collection that implements `Default` and `Extend`, and I would like to to be able to collect a stream of trees. --- lib/src/merge.rs | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/lib/src/merge.rs b/lib/src/merge.rs index 102fa4b4ba..334427cf47 100644 --- a/lib/src/merge.rs +++ b/lib/src/merge.rs @@ -277,6 +277,15 @@ pub struct MergeBuilder { adds: Vec, } +impl Default for MergeBuilder { + fn default() -> Self { + Self { + removes: Default::default(), + adds: Default::default(), + } + } +} + impl MergeBuilder { /// Requires that exactly one more "adds" than "removes" have been added to /// this builder. @@ -296,15 +305,23 @@ impl IntoIterator for Merge { impl FromIterator for MergeBuilder { fn from_iter>(iter: I) -> Self { - let mut removes = vec![]; - let mut adds = vec![]; - let mut curr = &mut adds; - let mut next = &mut removes; + let mut builder = MergeBuilder::default(); + builder.extend(iter); + builder + } +} + +impl Extend for MergeBuilder { + fn extend>(&mut self, iter: I) { + let (mut curr, mut next) = if self.adds.len() != self.removes.len() { + (&mut self.removes, &mut self.adds) + } else { + (&mut self.adds, &mut self.removes) + }; for item in iter { curr.push(item); std::mem::swap(&mut curr, &mut next); } - MergeBuilder { removes, adds } } } @@ -789,6 +806,19 @@ mod tests { MergeBuilder::from_iter([1, 2]).build(); } + #[test] + fn test_extend() { + // 1-way merge + let mut builder: MergeBuilder = Default::default(); + builder.extend([1]); + assert_eq!(builder.build(), c(&[], &[1])); + // 5-way merge + let mut builder: MergeBuilder = Default::default(); + builder.extend([1, 2]); + builder.extend([3, 4, 5]); + assert_eq!(builder.build(), c(&[2, 4], &[1, 3, 5])); + } + #[test] fn test_map() { fn increment(i: &i32) -> i32 { From a7844f1f6c8d2cf2e4ae5983596aed19da4d64ed Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Mon, 25 Sep 2023 11:07:22 -0700 Subject: [PATCH 4/4] merged_tree: read before/after trees concurrently I'm going to rewrite `TreeDiffIterator` to fetch one level (depth) of the tree at a time and concurrently. One step towards that is to convert the iterator to a `Stream`. I'd like to do that by making the current `Iterator` implementation call the new `Stream` implementation. However, we can't call `futures::executor::block_on()` on a future that itself calls `futures::executor::block_on()` (as `Store::read_tree()` does), so the first step is to bubble up the async-ness a bit. This patch does that by fetching both sides of the diff concurrently. That should give close to a 2x speedup on high-latency backends. (It doesn't help with our backend at Google, however, because we have a daemon process that does some speculative prefetching that usually downloads the child trees anyway.) --- lib/src/merged_tree.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 38b144e172..c650e98697 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -20,6 +20,7 @@ use std::iter::zip; use std::sync::Arc; use std::{iter, vec}; +use futures::stream::StreamExt; use itertools::Itertools; use crate::backend::{BackendError, BackendResult, ConflictId, MergedTreeId, TreeId, TreeValue}; @@ -816,17 +817,25 @@ impl<'matcher> TreeDiffIterator<'matcher> { Self { stack, matcher } } - fn single_tree(store: &Arc, dir: &RepoPath, value: Option<&TreeValue>) -> Tree { + async fn single_tree(store: &Arc, dir: &RepoPath, value: Option<&TreeValue>) -> Tree { match value { - Some(TreeValue::Tree(tree_id)) => store.get_tree(dir, tree_id).unwrap(), + Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await.unwrap(), _ => Tree::null(store.clone(), dir.clone()), } } /// Gets the given tree if `value` is a tree, otherwise an empty tree. - fn tree(tree: &MergedTree, dir: &RepoPath, values: &Merge>) -> MergedTree { + async fn tree( + tree: &MergedTree, + dir: &RepoPath, + values: &Merge>, + ) -> MergedTree { let trees = if values.is_tree() { - values.map(|value| Self::single_tree(tree.store(), dir, value.as_ref())) + let builder: MergeBuilder = futures::stream::iter(values.iter()) + .then(|value| Self::single_tree(tree.store(), dir, value.as_ref())) + .collect() + .await; + builder.build() } else { Merge::resolved(Tree::null(tree.store().clone(), dir.clone())) }; @@ -882,8 +891,11 @@ impl Iterator for TreeDiffIterator<'_> { let tree_after = after.is_tree(); let post_subdir = if (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing() { - let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before); - let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after); + let (before_tree, after_tree) = futures::executor::block_on(async { + let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before); + let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after); + futures::join!(before_tree, after_tree) + }); let subdir = TreeDiffDirItem::new(path.clone(), before_tree, after_tree); self.stack.push(TreeDiffItem::Dir(subdir)); self.stack.len() - 1