diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 52919c2fe..a233474f4 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -437,25 +437,27 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> { pub struct FilesystemStore { shared_context: Arc, - evicting_map: EvictingMap, SystemTime>, + evicting_map: Arc, SystemTime>>, read_buffer_size: usize, sleep_fn: fn(Duration) -> Sleep, + rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>, } impl FilesystemStore { pub async fn new(config: &nativelink_config::stores::FilesystemStore) -> Result { - Self::new_with_timeout(config, sleep).await + Self::new_with_timeout_and_rename_fn(config, sleep, |from, to| std::fs::rename(from, to)).await } - pub async fn new_with_timeout( + pub async fn new_with_timeout_and_rename_fn( config: &nativelink_config::stores::FilesystemStore, sleep_fn: fn(Duration) -> Sleep, + rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>, ) -> Result { let now = SystemTime::now(); let empty_policy = nativelink_config::stores::EvictionPolicy::default(); let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy); - let evicting_map = EvictingMap::new(eviction_policy, now); + let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now)); fs::create_dir_all(&config.temp_path) .await @@ -469,7 +471,7 @@ impl FilesystemStore { temp_path: config.temp_path.clone(), content_path: config.content_path.clone(), }); - add_files_to_cache(&evicting_map, &now, &shared_context).await?; + add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?; prune_temp_path(&shared_context.temp_path).await?; let read_buffer_size = if config.read_buffer_size == 0 { @@ -482,6 +484,7 @@ impl FilesystemStore { evicting_map, read_buffer_size, sleep_fn, + rename_fn, }; Ok(store) } @@ -561,7 +564,12 @@ impl FilesystemStore { // 5. Move the file into place. 
Since we hold a write lock still anyone that gets our new // FileEntry (which has not yet been placed on disk) will not be able to read the file's // contents until we relese the lock. - { + let evicting_map = self.evicting_map.clone(); + let rename_fn = self.rename_fn; + + // We need to guarantee that this will get to the end even if the parent future is dropped. + // See: https://github.com/TraceMachina/nativelink/issues/495 + tokio::spawn(async move { let mut encoded_file_path = entry.get_encoded_file_path().write().await; let final_path = get_file_path_raw( &PathType::Content, @@ -569,24 +577,29 @@ impl FilesystemStore { &final_digest, ); - self.evicting_map.insert(final_digest, entry.clone()).await; + evicting_map.insert(final_digest, entry.clone()).await; - let result = fs::rename(encoded_file_path.get_file_path(), &final_path) - .await - .err_tip(|| format!("Failed to rename temp file to final path {:?}", final_path)); + let from_path = encoded_file_path.get_file_path(); + // Internally tokio spawns fs commands onto a blocking thread anyways. + // NOTE(review): this closure is invoked inline by `fs::call_with_permit` from inside a `tokio::spawn` task, not from a blocking-pool thread (confirm this is acceptable); we just need the `fs` wrapper to manage + // an open-file permit (ensure we don't open too many files at once). + let result = fs::call_with_permit(|| { + (rename_fn)(&from_path, &final_path) + .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}")) + }).await; // In the event our move from temp file to final file fails we need to ensure we remove // the entry from our map. // Remember: At this point it is possible for another thread to have a reference to // `entry`, so we can't delete the file, only drop() should ever delete files. if let Err(err) = result { - warn!("{}", err); + warn!("Error while renaming file: {err} - {from_path:?} -> {final_path:?}"); // Warning: To prevent deadlock we need to release our lock or during `remove_if()` // it will call `unref()`, which triggers a write-lock on `encoded_file_path`. 
drop(encoded_file_path); // It is possible that the item in our map is no longer the item we inserted, // So, we need to conditionally remove it only if the pointers are the same. - self.evicting_map + evicting_map .remove_if(&final_digest, |map_entry| Arc::::ptr_eq(map_entry, &entry)) .await; return Err(err); @@ -594,7 +607,9 @@ impl FilesystemStore { encoded_file_path.path_type = PathType::Content; encoded_file_path.digest = final_digest; Ok(()) - } + }) + .await + .err_tip(|| "Failed to create spawn in filesystem store update_file")? } } @@ -756,6 +771,6 @@ impl MetricsComponent for FilesystemStore { &self.shared_context.content_path, "Path to the configured content path", ); - c.publish("evicting_map", &self.evicting_map, ""); + c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 9743827ec..fc93cd90a 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -21,7 +21,7 @@ use std::ops::DerefMut; use std::path::Path; use std::pin::Pin; use std::rc::Rc; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -31,7 +31,7 @@ use bytes::Bytes; use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; -use futures::{poll, Future}; +use futures::{poll, Future, FutureExt}; use nativelink_error::{Code, Error, ResultExt}; use nativelink_store::filesystem_store::{ digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, @@ -876,7 +876,7 @@ mod filesystem_store_tests { let temp_path = make_temp_path("temp_path"); let store = Arc::new( - FilesystemStore::::new_with_timeout( + FilesystemStore::::new_with_timeout_and_rename_fn( &nativelink_config::stores::FilesystemStore { content_path: 
content_path.clone(), temp_path: temp_path.clone(), @@ -884,6 +884,7 @@ mod filesystem_store_tests { ..Default::default() }, |_| sleep(Duration::ZERO), + |from, to| std::fs::rename(from, to), ) .await?, ); @@ -922,7 +923,7 @@ mod filesystem_store_tests { let temp_path = make_temp_path("temp_path"); let store = Arc::new( - FilesystemStore::::new_with_timeout( + FilesystemStore::::new_with_timeout_and_rename_fn( &nativelink_config::stores::FilesystemStore { content_path: content_path.clone(), temp_path: temp_path.clone(), @@ -930,6 +931,7 @@ mod filesystem_store_tests { ..Default::default() }, |_| sleep(Duration::ZERO), + |from, to| std::fs::rename(from, to), ) .await?, ); @@ -964,7 +966,7 @@ mod filesystem_store_tests { let temp_path = make_temp_path("temp_path"); let store = Arc::new( - FilesystemStore::::new_with_timeout( + FilesystemStore::::new_with_timeout_and_rename_fn( &nativelink_config::stores::FilesystemStore { content_path: content_path.clone(), temp_path: temp_path.clone(), @@ -972,6 +974,7 @@ mod filesystem_store_tests { ..Default::default() }, |_| sleep(Duration::ZERO), + |from, to| std::fs::rename(from, to), ) .await?, ); @@ -1016,4 +1019,90 @@ mod filesystem_store_tests { Ok(()) } + + /// Regression test for: https://github.com/TraceMachina/nativelink/issues/495. + #[tokio::test(flavor = "multi_thread")] + async fn update_file_future_drops_before_rename() -> Result<(), Error> { + let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; + + // Mutex can be used to signal to the rename function to pause execution. + static RENAME_REQUEST_PAUSE_MUX: Mutex<()> = Mutex::new(()); + // Boolean used to know if the rename function is currently paused. 
+ static RENAME_IS_PAUSED: AtomicBool = AtomicBool::new(false); + + let content_path = make_temp_path("content_path"); + let store = Arc::pin( + FilesystemStore::::new_with_timeout_and_rename_fn( + &nativelink_config::stores::FilesystemStore { + content_path: content_path.clone(), + temp_path: make_temp_path("temp_path"), + eviction_policy: None, + ..Default::default() + }, + |_| sleep(Duration::ZERO), + |_, _| { + // If someone locked our mutex, it means we need to pause, so we + // simply request a lock on the same mutex. + if RENAME_REQUEST_PAUSE_MUX.try_lock().is_err() { + RENAME_IS_PAUSED.store(true, Ordering::Release); + let _lock = RENAME_REQUEST_PAUSE_MUX.lock(); + RENAME_IS_PAUSED.store(false, Ordering::Release); + } + Ok(()) + }, + ) + .await?, + ); + + // Populate our first store entry. + let first_file_entry = { + store.as_ref().update_oneshot(digest, VALUE1.into()).await?; + store.get_file_entry_for_digest(&digest).await? + }; + + // 1. Request the next rename function to block. + // 2. Request to replace our data. + // 3. When we are certain that our rename function is paused, drop + // the replace/update future. + // 4. Then drop the lock. + { + let rename_pause_request_lock = RENAME_REQUEST_PAUSE_MUX.lock(); + let mut update_fut = store.as_ref().update_oneshot(digest, VALUE2.into()).boxed(); + + loop { + // Try to advance our update future. + assert_eq!(poll!(&mut update_fut), Poll::Pending); + + // Once we are sure the rename function is paused break. + if RENAME_IS_PAUSED.load(Ordering::Acquire) { + break; + } + // Give a little time for background/kernel threads to run. + sleep(Duration::from_millis(1)).await; + } + // Writing these out explicitly so users know this is what we are testing. + // Note: The order they are dropped matters. + drop(update_fut); + drop(rename_pause_request_lock); + } + // Grab the newly inserted item in our store. 
+ let new_file_entry = store.get_file_entry_for_digest(&digest).await?; + assert!( + !Arc::ptr_eq(&first_file_entry, &new_file_entry), + "Expected file entries to not be the same" + ); + + // Ensure the entry we inserted was properly flagged as moved (from temp -> content dir). + new_file_entry + .get_file_path_locked(move |file_path| async move { + assert_eq!( + file_path, + OsString::from(format!("{}/{}-{}", content_path, digest.hash_str(), digest.size_bytes)) + ); + Ok(()) + }) + .await?; + + Ok(()) + } } diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs index 9a8e8a286..dcf5850c3 100644 --- a/nativelink-util/src/fs.rs +++ b/nativelink-util/src/fs.rs @@ -241,6 +241,16 @@ const DEFAULT_OPEN_FILE_PERMITS: usize = 10; static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS); pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS); +/// Acquire a permit from the open file semaphore and call a raw function. +#[inline] +pub async fn call_with_permit(func: impl FnOnce() -> Result) -> Result { + let _permit = OPEN_FILE_SEMAPHORE + .acquire() + .await + .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?; + (func)() +} + pub fn set_open_file_limit(limit: usize) { let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); if limit < current_total {