From 99d8703d3b4b4cca3c31fecc92e66a0ba4d9852d Mon Sep 17 00:00:00 2001 From: Yuya Nishihara Date: Mon, 2 Dec 2024 17:23:37 +0900 Subject: [PATCH] local_working_copy: spawn snapshot job per directory with file count threshold MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change basically means two things: a. a directory scan isn't split into too many small jobs, and b. a directory scan isn't blocked by recursive visit_directory() calls. Before, small jobs were created at each recursion depth, so there were silent time slice before these jobs started producing work. I don't know if this mitigates the issue #4508, but it's slightly faster on my Linux machine. matcher.visit(dir) is moved to caller because it's silly to spawn an empty job. TreeState::snapshot() already checks that for the root path. Benchmark: 1. original 2. per-directory spawn (this patch) 3. per-directory deleted files (omitted) 4. shorter path comparison (omitted) gecko-dev (~357k files, ~25k dirs) ``` % JJ_CONFIG=/dev/null hyperfine --sort command --warmup 3 --runs 30 .. Benchmark 1: target/release-with-debug/jj-1 -R ~/mirrors/gecko-dev debug snapshot Time (mean ± σ): 764.9 ms ± 16.7 ms [User: 3274.7 ms, System: 2183.3 ms] Range (min … max): 731.9 ms … 814.2 ms 30 runs Benchmark 2: target/release-with-debug/jj-2 -R ~/mirrors/gecko-dev debug snapshot Time (mean ± σ): 710.7 ms ± 9.1 ms [User: 3070.7 ms, System: 2142.6 ms] Range (min … max): 695.9 ms … 740.1 ms 30 runs Relative speed comparison 1.89 ± 0.05 target/release-with-debug/jj-1 -R ~/mirrors/gecko-dev debug snapshot 1.76 ± 0.03 target/release-with-debug/jj-2 -R ~/mirrors/gecko-dev debug snapshot ``` linux (~87k files, ~6k dirs) ``` % JJ_CONFIG=/dev/null hyperfine --sort command --warmup 3 --runs 30 .. Benchmark 1: target/release-with-debug/jj-1 -R ~/mirrors/linux debug snapshot Time (mean ± σ): 268.2 ms ± 11.3 ms [User: 636.6 ms, System: 518.5 ms] Range (min … max): 247.5 ms … 295.2 ms 30 runs Benchmark 2: target/release-with-debug/jj-2 -R ~/mirrors/linux debug snapshot Time (mean ± σ): 242.3 ms ± 3.3 ms [User: 656.8 ms, System: 538.0 ms] Range (min … max): 236.9 ms … 252.3 ms 30 runs Relative speed comparison 1.40 ± 0.06 target/release-with-debug/jj-1 -R ~/mirrors/linux debug snapshot 1.27 ± 0.03 target/release-with-debug/jj-2 -R ~/mirrors/linux debug snapshot ``` nixpkgs (~45k files, ~31k dirs) ``` % JJ_CONFIG=/dev/null hyperfine --sort command --warmup 3 --runs 30 .. Benchmark 1: target/release-with-debug/jj-1 -R ~/mirrors/nixpkgs debug snapshot Time (mean ± σ): 201.0 ms ± 8.5 ms [User: 929.3 ms, System: 917.6 ms] Range (min … max): 170.3 ms … 218.5 ms 30 runs Benchmark 2: target/release-with-debug/jj-2 -R ~/mirrors/nixpkgs debug snapshot Time (mean ± σ): 190.7 ms ± 4.1 ms [User: 859.3 ms, System: 881.1 ms] Range (min … max): 184.6 ms … 202.4 ms 30 runs Relative speed comparison 1.24 ± 0.06 target/release-with-debug/jj-1 -R ~/mirrors/nixpkgs debug snapshot 1.18 ± 0.03 target/release-with-debug/jj-2 -R ~/mirrors/nixpkgs debug snapshot ``` git (~4.5k files, 0.2k dirs) ``` % JJ_CONFIG=/dev/null hyperfine --sort command --warmup 30 --runs 50 .. Benchmark 1: target/release-with-debug/jj-1 -R ~/mirrors/git debug snapshot Time (mean ± σ): 30.3 ms ± 1.1 ms [User: 40.5 ms, System: 39.4 ms] Range (min … max): 28.3 ms … 35.7 ms 50 runs Benchmark 2: target/release-with-debug/jj-2 -R ~/mirrors/git debug snapshot Time (mean ± σ): 30.6 ms ± 1.1 ms [User: 33.8 ms, System: 39.0 ms] Range (min … max): 29.0 ms … 35.0 ms 50 runs Relative speed comparison 1.05 ± 0.05 target/release-with-debug/jj-1 -R ~/mirrors/git debug snapshot 1.06 ± 0.05 target/release-with-debug/jj-2 -R ~/mirrors/git debug snapshot ``` - CPU: 8-core AMD Ryzen 7 PRO 4750U with Radeon Graphics (-MT MCP-) - speed/min/max: 1600/1400/1700 MHz Kernel: 6.11.10-amd64 x86_64 - Filesystem: ext4 --- lib/src/local_working_copy.rs | 69 ++++++++++++++++++++++------ lib/tests/test_local_working_copy.rs | 17 +++++++ 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/lib/src/local_working_copy.rs b/lib/src/local_working_copy.rs index 0628df7eb4..ec1cb5e96d 100644 --- a/lib/src/local_working_copy.rs +++ b/lib/src/local_working_copy.rs @@ -37,6 +37,7 @@ use std::slice; use std::sync::mpsc::channel; use std::sync::mpsc::Sender; use std::sync::Arc; +use std::sync::OnceLock; use std::time::UNIX_EPOCH; use futures::StreamExt; @@ -46,6 +47,7 @@ use once_cell::unsync::OnceCell; use pollster::FutureExt; use prost::Message; use rayon::iter::IntoParallelIterator; +use rayon::prelude::IndexedParallelIterator; use rayon::prelude::ParallelIterator; use tempfile::NamedTempFile; use thiserror::Error; @@ -900,6 +902,7 @@ impl TreeState { tree_entries_tx, file_states_tx, present_files_tx, + error: OnceLock::new(), progress, max_new_file_size, conflict_marker_style, @@ -910,7 +913,13 @@ impl TreeState { git_ignore: base_ignores.clone(), file_states: self.file_states.all(), }; - snapshotter.visit_directory(directory_to_visit) + // Here we use scope as a queue of per-directory jobs. + rayon::scope(|scope| { + snapshotter.spawn_ok(scope, |scope| { + snapshotter.visit_directory(directory_to_visit, scope) + }); + }); + snapshotter.into_result() })?; let mut tree_builder = MergedTreeBuilder::new(self.tree_id.clone()); @@ -1033,13 +1042,43 @@ struct FileSnapshotter<'a> { tree_entries_tx: Sender<(RepoPathBuf, MergedTreeValue)>, file_states_tx: Sender<(RepoPathBuf, FileState)>, present_files_tx: Sender, + error: OnceLock, progress: Option<&'a SnapshotProgress<'a>>, max_new_file_size: u64, conflict_marker_style: ConflictMarkerStyle, } impl FileSnapshotter<'_> { - fn visit_directory(&self, directory_to_visit: DirectoryToVisit) -> Result<(), SnapshotError> { + fn spawn_ok<'scope, F>(&'scope self, scope: &rayon::Scope<'scope>, body: F) + where + F: FnOnce(&rayon::Scope<'scope>) -> Result<(), SnapshotError> + Send + 'scope, + { + scope.spawn(|scope| { + if self.error.get().is_some() { + return; + } + match body(scope) { + Ok(()) => {} + Err(err) => self.error.set(err).unwrap_or(()), + }; + }); + } + + /// Extracts the result of the snapshot. + fn into_result(self) -> Result<(), SnapshotError> { + match self.error.into_inner() { + Some(err) => Err(err), + None => Ok(()), + } + } + + /// Visits the directory entries, spawns jobs to recurse into sub + /// directories. + fn visit_directory<'scope>( + &'scope self, + directory_to_visit: DirectoryToVisit<'scope>, + scope: &rayon::Scope<'scope>, + ) -> Result<(), SnapshotError> { let DirectoryToVisit { dir, disk_dir, @@ -1047,10 +1086,6 @@ impl FileSnapshotter<'_> { file_states, } = directory_to_visit; - if self.matcher.visit(&dir).is_nothing() { - return Ok(()); - } - let git_ignore = git_ignore .chain_with_file(&dir.to_internal_dir_string(), disk_dir.join(".gitignore"))?; let dir_entries: Vec<_> = disk_dir @@ -1062,16 +1097,22 @@ impl FileSnapshotter<'_> { })?; dir_entries .into_par_iter() - .try_for_each(|entry| self.process_dir_entry(&dir, &git_ignore, file_states, &entry))?; + // Don't split into too many small jobs. For a small directory, + // sequential scan should be fast enough. + .with_min_len(100) + .try_for_each(|entry| { + self.process_dir_entry(&dir, &git_ignore, file_states, &entry, scope) + })?; Ok(()) } - fn process_dir_entry( - &self, + fn process_dir_entry<'scope>( + &'scope self, dir: &RepoPath, git_ignore: &Arc, - file_states: FileStates<'_>, + file_states: FileStates<'scope>, entry: &DirEntry, + scope: &rayon::Scope<'scope>, ) -> Result<(), SnapshotError> { let file_type = entry.file_type().unwrap(); let file_name = entry.file_name(); @@ -1102,15 +1143,17 @@ impl FileSnapshotter<'_> { // If the whole directory is ignored, visit only paths we're already // tracking. - self.visit_tracked_files(file_states)?; - } else { + self.spawn_ok(scope, move |_| self.visit_tracked_files(file_states)); + } else if !self.matcher.visit(&path).is_nothing() { let directory_to_visit = DirectoryToVisit { dir: path, disk_dir: entry.path(), git_ignore: git_ignore.clone(), file_states, }; - self.visit_directory(directory_to_visit)?; + self.spawn_ok(scope, |scope| { + self.visit_directory(directory_to_visit, scope) + }); } } else if self.matcher.matches(&path) { if let Some(progress) = self.progress { diff --git a/lib/tests/test_local_working_copy.rs b/lib/tests/test_local_working_copy.rs index 5bb5d21a57..3f739923b4 100644 --- a/lib/tests/test_local_working_copy.rs +++ b/lib/tests/test_local_working_copy.rs @@ -2111,4 +2111,21 @@ fn test_snapshot_max_new_file_size() { matches!(err, SnapshotError::NewFileTooLarge { .. }), "the failure should be attributed to new file size" ); + + // A file in sub directory should also be caught + let sub_large_path = RepoPath::from_internal_string("sub/large"); + std::fs::create_dir( + sub_large_path + .parent() + .unwrap() + .to_fs_path_unchecked(&workspace_root), + ) + .unwrap(); + std::fs::rename( + large_path.to_fs_path_unchecked(&workspace_root), + sub_large_path.to_fs_path_unchecked(&workspace_root), + ) + .unwrap(); + let result = test_workspace.snapshot_with_options(&options); + assert_matches!(result, Err(SnapshotError::NewFileTooLarge { .. })); }