Skip to content

Commit

Permalink
Fix case where filesystem store future dropping causes issues
Browse files Browse the repository at this point in the history
In a rare case where a file is uploaded, but while the rename()
function is called the future is dropped the filesystem store can
get into a bad state.

fixes: #495
  • Loading branch information
allada committed Jan 17, 2024
1 parent 6a379b3 commit 4fd99b6
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 19 deletions.
43 changes: 29 additions & 14 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,25 +437,27 @@ async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {

pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
shared_context: Arc<SharedContext>,
evicting_map: EvictingMap<Arc<Fe>, SystemTime>,
evicting_map: Arc<EvictingMap<Arc<Fe>, SystemTime>>,
read_buffer_size: usize,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
}

impl<Fe: FileEntry> FilesystemStore<Fe> {
pub async fn new(config: &nativelink_config::stores::FilesystemStore) -> Result<Self, Error> {
Self::new_with_timeout(config, sleep).await
Self::new_with_timeout_and_rename_fn(config, sleep, |from, to| std::fs::rename(from, to)).await
}

pub async fn new_with_timeout(
pub async fn new_with_timeout_and_rename_fn(
config: &nativelink_config::stores::FilesystemStore,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
) -> Result<Self, Error> {
let now = SystemTime::now();

let empty_policy = nativelink_config::stores::EvictionPolicy::default();
let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
let evicting_map = EvictingMap::new(eviction_policy, now);
let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));

fs::create_dir_all(&config.temp_path)
.await
Expand All @@ -469,7 +471,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
temp_path: config.temp_path.clone(),
content_path: config.content_path.clone(),
});
add_files_to_cache(&evicting_map, &now, &shared_context).await?;
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
Expand All @@ -482,6 +484,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
evicting_map,
read_buffer_size,
sleep_fn,
rename_fn,
};
Ok(store)
}
Expand Down Expand Up @@ -561,40 +564,52 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// 5. Move the file into place. Since we hold a write lock still anyone that gets our new
// FileEntry (which has not yet been placed on disk) will not be able to read the file's
// contents until we relese the lock.
{
let evicting_map = self.evicting_map.clone();
let rename_fn = self.rename_fn;

// We need to guarantee that this will get to the end even if the parent future is dropped.
// See: https://github.com/TraceMachina/nativelink/issues/495
tokio::spawn(async move {
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
let final_path = get_file_path_raw(
&PathType::Content,
encoded_file_path.shared_context.as_ref(),
&final_digest,
);

self.evicting_map.insert(final_digest, entry.clone()).await;
evicting_map.insert(final_digest, entry.clone()).await;

let result = fs::rename(encoded_file_path.get_file_path(), &final_path)
.await
.err_tip(|| format!("Failed to rename temp file to final path {:?}", final_path));
let from_path = encoded_file_path.get_file_path();
// Internally tokio spawns fs commands onto a blocking thread anyways.
// Since we are already on a blocking thread, we just need the `fs` wrapper to manage
// an open-file permit (ensure we don't open too many files at once).
let result = fs::call_with_permit(|| {
(rename_fn)(&from_path, &final_path)
.err_tip(|| format!("Failed to rename temp file to final path {final_path:?}"))
}).await;

// In the event our move from temp file to final file fails we need to ensure we remove
// the entry from our map.
// Remember: At this point it is possible for another thread to have a reference to
// `entry`, so we can't delete the file, only drop() should ever delete files.
if let Err(err) = result {
warn!("{}", err);
warn!("Error while renaming file: {err} - {from_path:?} -> {final_path:?}");
// Warning: To prevent deadlock we need to release our lock or during `remove_if()`
// it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
drop(encoded_file_path);
// It is possible that the item in our map is no longer the item we inserted,
// So, we need to conditionally remove it only if the pointers are the same.
self.evicting_map
evicting_map
.remove_if(&final_digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
.await;
return Err(err);
}
encoded_file_path.path_type = PathType::Content;
encoded_file_path.digest = final_digest;
Ok(())
}
})
.await
.err_tip(|| "Failed to create spawn in filesystem store update_file")?
}
}

Expand Down Expand Up @@ -756,6 +771,6 @@ impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
&self.shared_context.content_path,
"Path to the configured content path",
);
c.publish("evicting_map", &self.evicting_map, "");
c.publish("evicting_map", self.evicting_map.as_ref(), "");
}
}
99 changes: 94 additions & 5 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::ops::DerefMut;
use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

Expand All @@ -31,7 +31,7 @@ use bytes::Bytes;
use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future};
use futures::{poll, Future, FutureExt};
use nativelink_error::{Code, Error, ResultExt};
use nativelink_store::filesystem_store::{
digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore,
Expand Down Expand Up @@ -876,14 +876,15 @@ mod filesystem_store_tests {
let temp_path = make_temp_path("temp_path");

let store = Arc::new(
FilesystemStore::<FileEntryImpl>::new_with_timeout(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);
Expand Down Expand Up @@ -922,14 +923,15 @@ mod filesystem_store_tests {
let temp_path = make_temp_path("temp_path");

let store = Arc::new(
FilesystemStore::<FileEntryImpl>::new_with_timeout(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);
Expand Down Expand Up @@ -964,14 +966,15 @@ mod filesystem_store_tests {
let temp_path = make_temp_path("temp_path");

let store = Arc::new(
FilesystemStore::<FileEntryImpl>::new_with_timeout(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);
Expand Down Expand Up @@ -1016,4 +1019,90 @@ mod filesystem_store_tests {

Ok(())
}

/// Regression test for: https://github.com/TraceMachina/nativelink/issues/495.
#[tokio::test(flavor = "multi_thread")]
async fn update_file_future_drops_before_rename() -> Result<(), Error> {
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;

// Mutex can be used to signal to the rename function to pause execution.
static RENAME_REQUEST_PAUSE_MUX: Mutex<()> = Mutex::new(());
// Boolean used to know if the rename function is currently paused.
static RENAME_IS_PAUSED: AtomicBool = AtomicBool::new(false);

let content_path = make_temp_path("content_path");
let store = Arc::pin(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
},
|_| sleep(Duration::ZERO),
|_, _| {
// If someone locked our mutex, it means we need to pause, so we
// simply request a lock on the same mutex.
if RENAME_REQUEST_PAUSE_MUX.try_lock().is_err() {
RENAME_IS_PAUSED.store(true, Ordering::Release);
let _lock = RENAME_REQUEST_PAUSE_MUX.lock();
RENAME_IS_PAUSED.store(false, Ordering::Release);
}
Ok(())
},
)
.await?,
);

// Populate our first store entry.
let first_file_entry = {
store.as_ref().update_oneshot(digest, VALUE1.into()).await?;
store.get_file_entry_for_digest(&digest).await?
};

// 1. Request the next rename function to block.
// 2. Request to replace our data.
// 3. When we are certain that our rename function is paused, drop
// the replace/update future.
// 4. Then drop the lock.
{
let rename_pause_request_lock = RENAME_REQUEST_PAUSE_MUX.lock();
let mut update_fut = store.as_ref().update_oneshot(digest, VALUE2.into()).boxed();

loop {
// Try to advance our update future.
assert_eq!(poll!(&mut update_fut), Poll::Pending);

// Once we are sure the rename fuction is paused break.
if RENAME_IS_PAUSED.load(Ordering::Acquire) {
break;
}
// Give a little time for background/kernel threads to run.
sleep(Duration::from_millis(1)).await;
}
// Writing these out explicitly so users know this is what we are testing.
// Note: The order they are dropped matters.
drop(update_fut);
drop(rename_pause_request_lock);
}
// Grab the newly inserted item in our store.
let new_file_entry = store.get_file_entry_for_digest(&digest).await?;
assert!(
!Arc::ptr_eq(&first_file_entry, &new_file_entry),
"Expected file entries to not be the same"
);

// Ensure the entry we inserted was properly flagged as moved (from temp -> content dir).
new_file_entry
.get_file_path_locked(move |file_path| async move {
assert_eq!(
file_path,
OsString::from(format!("{}/{}-{}", content_path, digest.hash_str(), digest.size_bytes))
);
Ok(())
})
.await?;

Ok(())
}
}
10 changes: 10 additions & 0 deletions nativelink-util/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ const DEFAULT_OPEN_FILE_PERMITS: usize = 10;
static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS);
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS);

/// Acquire a permit from the open file semaphore and call a raw function.
#[inline]
pub async fn call_with_permit<R>(func: impl FnOnce() -> Result<R, Error>) -> Result<R, Error> {
let _permit = OPEN_FILE_SEMAPHORE
.acquire()
.await
.map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?;
(func)()
}

pub fn set_open_file_limit(limit: usize) {
let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire);
if limit < current_total {
Expand Down

0 comments on commit 4fd99b6

Please sign in to comment.