diff --git a/lib/src/local_working_copy.rs b/lib/src/local_working_copy.rs index 0628df7eb4..ec1cb5e96d 100644 --- a/lib/src/local_working_copy.rs +++ b/lib/src/local_working_copy.rs @@ -37,6 +37,7 @@ use std::slice; use std::sync::mpsc::channel; use std::sync::mpsc::Sender; use std::sync::Arc; +use std::sync::OnceLock; use std::time::UNIX_EPOCH; use futures::StreamExt; @@ -46,6 +47,7 @@ use once_cell::unsync::OnceCell; use pollster::FutureExt; use prost::Message; use rayon::iter::IntoParallelIterator; +use rayon::prelude::IndexedParallelIterator; use rayon::prelude::ParallelIterator; use tempfile::NamedTempFile; use thiserror::Error; @@ -900,6 +902,7 @@ impl TreeState { tree_entries_tx, file_states_tx, present_files_tx, + error: OnceLock::new(), progress, max_new_file_size, conflict_marker_style, @@ -910,7 +913,13 @@ impl TreeState { git_ignore: base_ignores.clone(), file_states: self.file_states.all(), }; - snapshotter.visit_directory(directory_to_visit) + // Here we use scope as a queue of per-directory jobs. + rayon::scope(|scope| { + snapshotter.spawn_ok(scope, |scope| { + snapshotter.visit_directory(directory_to_visit, scope) + }); + }); + snapshotter.into_result() })?; let mut tree_builder = MergedTreeBuilder::new(self.tree_id.clone()); @@ -1033,13 +1042,43 @@ struct FileSnapshotter<'a> { tree_entries_tx: Sender<(RepoPathBuf, MergedTreeValue)>, file_states_tx: Sender<(RepoPathBuf, FileState)>, present_files_tx: Sender, + error: OnceLock, progress: Option<&'a SnapshotProgress<'a>>, max_new_file_size: u64, conflict_marker_style: ConflictMarkerStyle, } impl FileSnapshotter<'_> { - fn visit_directory(&self, directory_to_visit: DirectoryToVisit) -> Result<(), SnapshotError> { + fn spawn_ok<'scope, F>(&'scope self, scope: &rayon::Scope<'scope>, body: F) + where + F: FnOnce(&rayon::Scope<'scope>) -> Result<(), SnapshotError> + Send + 'scope, + { + scope.spawn(|scope| { + if self.error.get().is_some() { + return; + } + match body(scope) { + Ok(()) => {} + Err(err) => self.error.set(err).unwrap_or(()), + }; + }); + } + + /// Extracts the result of the snapshot. + fn into_result(self) -> Result<(), SnapshotError> { + match self.error.into_inner() { + Some(err) => Err(err), + None => Ok(()), + } + } + + /// Visits the directory entries, spawns jobs to recurse into sub + /// directories. + fn visit_directory<'scope>( + &'scope self, + directory_to_visit: DirectoryToVisit<'scope>, + scope: &rayon::Scope<'scope>, + ) -> Result<(), SnapshotError> { let DirectoryToVisit { dir, disk_dir, @@ -1047,10 +1086,6 @@ impl FileSnapshotter<'_> { file_states, } = directory_to_visit; - if self.matcher.visit(&dir).is_nothing() { - return Ok(()); - } - let git_ignore = git_ignore .chain_with_file(&dir.to_internal_dir_string(), disk_dir.join(".gitignore"))?; let dir_entries: Vec<_> = disk_dir @@ -1062,16 +1097,22 @@ impl FileSnapshotter<'_> { })?; dir_entries .into_par_iter() - .try_for_each(|entry| self.process_dir_entry(&dir, &git_ignore, file_states, &entry))?; + // Don't split into too many small jobs. For a small directory, + // sequential scan should be fast enough. + .with_min_len(100) + .try_for_each(|entry| { + self.process_dir_entry(&dir, &git_ignore, file_states, &entry, scope) + })?; Ok(()) } - fn process_dir_entry( - &self, + fn process_dir_entry<'scope>( + &'scope self, dir: &RepoPath, git_ignore: &Arc, - file_states: FileStates<'_>, + file_states: FileStates<'scope>, entry: &DirEntry, + scope: &rayon::Scope<'scope>, ) -> Result<(), SnapshotError> { let file_type = entry.file_type().unwrap(); let file_name = entry.file_name(); @@ -1102,15 +1143,17 @@ impl FileSnapshotter<'_> { // If the whole directory is ignored, visit only paths we're already // tracking. - self.visit_tracked_files(file_states)?; - } else { + self.spawn_ok(scope, move |_| self.visit_tracked_files(file_states)); + } else if !self.matcher.visit(&path).is_nothing() { let directory_to_visit = DirectoryToVisit { dir: path, disk_dir: entry.path(), git_ignore: git_ignore.clone(), file_states, }; - self.visit_directory(directory_to_visit)?; + self.spawn_ok(scope, |scope| { + self.visit_directory(directory_to_visit, scope) + }); } } else if self.matcher.matches(&path) { if let Some(progress) = self.progress { diff --git a/lib/tests/test_local_working_copy.rs b/lib/tests/test_local_working_copy.rs index 5bb5d21a57..3f739923b4 100644 --- a/lib/tests/test_local_working_copy.rs +++ b/lib/tests/test_local_working_copy.rs @@ -2111,4 +2111,21 @@ fn test_snapshot_max_new_file_size() { matches!(err, SnapshotError::NewFileTooLarge { .. }), "the failure should be attributed to new file size" ); + + // A file in sub directory should also be caught + let sub_large_path = RepoPath::from_internal_string("sub/large"); + std::fs::create_dir( + sub_large_path + .parent() + .unwrap() + .to_fs_path_unchecked(&workspace_root), + ) + .unwrap(); + std::fs::rename( + large_path.to_fs_path_unchecked(&workspace_root), + sub_large_path.to_fs_path_unchecked(&workspace_root), + ) + .unwrap(); + let result = test_workspace.snapshot_with_options(&options); + assert_matches!(result, Err(SnapshotError::NewFileTooLarge { .. })); }