Skip to content

Commit

Permalink
Removes touch() API from EvictingMap
Browse files Browse the repository at this point in the history
The only reason this existed was for FilesystemStore, to make it more
deterministic when starting up from a cold start. However, the
overhead of changing the atime on every has() or get() request is
a bit excessive, so we remove it to reduce the number of syscalls.
  • Loading branch information
allada committed Oct 23, 2024
1 parent 6b21200 commit b6cf345
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 165 deletions.
67 changes: 20 additions & 47 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ use std::time::{Duration, SystemTime};
use async_lock::RwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, TryFutureExt};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::background_spawn;
use nativelink_util::buf_channel::{
make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::time::{sleep, timeout, Sleep};
use tokio_stream::wrappers::ReadDirStream;
Expand Down Expand Up @@ -320,33 +319,6 @@ impl LenEntry for FileEntryImpl {
self.data_size == 0
}

/// Sets the access time (atime) of this entry's content file to the current time.
///
/// Returns `true` when the atime was updated. Any failure is logged at ERROR
/// level and reported as `false` instead of being propagated to the caller.
#[inline]
async fn touch(&self) -> bool {
let result = self
.get_file_path_locked(move |full_content_path| async move {
let full_content_path = full_content_path.clone();
// NOTE(review): the task label says "mtime", but the call below changes the atime.
spawn_blocking!("filesystem_touch_set_mtime", move || {
set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
format!("Failed to touch file in filesystem store {full_content_path:?}")
})
})
.await
// This outer error covers the spawned task itself failing; the `?` below
// strips it so the closure yields the result of the atime call.
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to change atime of file due to spawn failing {:?}",
e
)
})?
})
.await;
// Both failure kinds end up here: log and report `false`.
if let Err(err) = result {
event!(Level::ERROR, ?err, "Failed to touch file",);
return false;
}
true
}

// unref() only triggers when an item is removed from the eviction_map. It is possible
// that another place in code has a reference to `FileEntryImpl` and may later read the
// file. To support this edge case, we first move the file to a temp file and point
Expand Down Expand Up @@ -404,17 +376,17 @@ const SIMULTANEOUS_METADATA_READS: usize = 200;

async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
anchor_time: SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
filetime: SystemTime,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
anchor_time: SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;
Expand All @@ -429,7 +401,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
}),
);
let time_since_anchor = anchor_time
.duration_since(atime)
.duration_since(filetime)
.map_err(|_| make_input_err!("File access time newer than now"))?;
evicting_map
.insert_with_time(
Expand All @@ -456,32 +428,33 @@ async fn add_files_to_cache<Fe: FileEntry>(
.metadata()
.await
.err_tip(|| "Failed to get metadata in filesystem store")?;
let atime = match metadata.accessed() {
let filetime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
Err(_) => {
if let Ok(mtime) = metadata.modified() {
mtime
} else {
anchor_time
}
}
};
Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
Result::<(String, SystemTime, u64), Error>::Ok((
file_name,
filetime,
metadata.len(),
))
})
.buffer_unordered(SIMULTANEOUS_METADATA_READS)
.try_collect()
.await?
};

file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, data_size) in file_infos {
for (file_name, filetime, data_size) in file_infos {
let result = process_entry(
evicting_map,
&file_name,
atime,
filetime,
data_size,
block_size,
anchor_time,
Expand Down Expand Up @@ -571,7 +544,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
} else {
config.block_size
};
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
add_files_to_cache(evicting_map.as_ref(), now, &shared_context, block_size).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
Expand Down
86 changes: 0 additions & 86 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
self.inner.as_ref().unwrap().is_empty()
}

// Forwards to the wrapped entry's `touch()`; panics if `inner` is `None`.
async fn touch(&self) -> bool {
self.inner.as_ref().unwrap().touch().await
}

async fn unref(&self) {
Hooks::on_unref(self);
self.inner.as_ref().unwrap().unref().await;
Expand Down Expand Up @@ -604,53 +600,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
Ok(())
}

// Verifies that reading a digest with `get_part_unchunked()` moves the file's atime
// forward from a value that was deliberately set to the Unix epoch.
#[serial]
#[nativelink_test]
async fn atime_updates_on_get_part_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new(&nativelink_config::stores::FilesystemStore {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
})
.await?,
);
// Insert data into store.
store.update_oneshot(digest1, VALUE1.into()).await?;

let file_entry = store.get_file_entry_for_digest(&digest1).await?;
file_entry
.get_file_path_locked(move |path| async move {
// Set atime to a long time ago.
set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?;

// Check to ensure the previous call really set it to the Unix epoch.
assert_eq!(
fs::metadata(&path).await?.accessed()?,
SystemTime::UNIX_EPOCH
);
Ok(())
})
.await?;

// Now touch digest1 by reading it back through the store.
let data = store.get_part_unchunked(digest1, 0, None).await?;
assert_eq!(data, VALUE1.as_bytes());

file_entry
.get_file_path_locked(move |path| async move {
// Ensure the atime moved forward from the epoch value set above.
assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH);
Ok(())
})
.await?;

Ok(())
}

#[serial]
#[nativelink_test]
async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> {
Expand Down Expand Up @@ -1233,41 +1182,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
Ok(())
}

// Verifies that `has()` returns `None` for a digest whose backing file was
// deleted from `content_path` behind the store's back.
#[serial]
#[nativelink_test]
async fn deleted_file_removed_from_store() -> Result<(), Error> {
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
&nativelink_config::stores::FilesystemStore {
content_path: content_path.clone(),
temp_path: temp_path.clone(),
read_buffer_size: 1,
..Default::default()
},
// Zero-length sleep and a plain `std::fs::rename` are supplied as the two hooks.
|_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);

// Store the value so a file for `digest` exists on disk.
store.update_oneshot(digest, VALUE1.into()).await?;

// Remove the file directly from the filesystem, bypassing the store.
let stored_file_path = OsString::from(format!("{content_path}/{digest}"));
std::fs::remove_file(stored_file_path)?;

// The store must now report the digest as missing.
let digest_result = store
.has(digest)
.await
.err_tip(|| "Failed to execute has")?;
assert!(digest_result.is_none());

Ok(())
}

// Ensure that get_file_size() returns the correct number
// ceil(content length / block_size) * block_size
// assume block size 4K
Expand Down
34 changes: 7 additions & 27 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ pub trait LenEntry: 'static {
/// Returns `true` if `self` has zero length.
fn is_empty(&self) -> bool;

/// Called when an entry is touched. On failure, will remove the entry
/// from the map.
///
/// Returns `true` on success. The default implementation does no work and
/// resolves immediately to `true`.
#[inline]
fn touch(&self) -> impl Future<Output = bool> + Send {
std::future::ready(true)
}

/// This will be called when object is removed from map.
/// Note: There may still be a reference to it held somewhere else, which
/// is why it can't be mutable. This is a good place to mark the item
Expand Down Expand Up @@ -86,11 +79,6 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
T::is_empty(self.as_ref())
}

/// Delegates to the `touch()` of the value inside the `Arc`.
#[inline]
async fn touch(&self) -> bool {
self.as_ref().touch().await
}

#[inline]
async fn unref(&self) {
self.as_ref().unref().await;
Expand Down Expand Up @@ -344,18 +332,17 @@ where
// based on the current time. In such case, we remove the item while
// we are here.
let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX);
if !should_evict && peek {
*result = Some(entry.data.len());
} else if !should_evict && entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
if !should_evict {
if !peek {
entry.seconds_since_anchor =
self.anchor_time.elapsed().as_secs() as i32;
}
*result = Some(entry.data.len());
} else {
*result = None;
if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
if should_evict {
event!(Level::INFO, ?key, "Item expired, evicting");
} else {
event!(Level::INFO, ?key, "Touch failed, evicting");
}
state.remove(key.borrow(), &eviction_item, false).await;
}
Expand All @@ -376,15 +363,8 @@ where

let entry = state.lru.get_mut(key.borrow())?;

if entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
return Some(entry.data.clone());
}

let (key, eviction_item) = state.lru.pop_entry(key.borrow())?;
event!(Level::INFO, ?key, "Touch failed, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
None
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
return Some(entry.data.clone());
}

/// Returns the replaced item if any.
Expand Down
5 changes: 0 additions & 5 deletions nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,6 @@ async fn unref_called_on_replace() -> Result<(), Error> {
unreachable!("We are not testing this functionality");
}

// Stub for the `touch()` trait method; always reports success.
async fn touch(&self) -> bool {
// Do nothing. We are not testing this functionality.
true
}

// Records that `unref()` was invoked by setting the `unref_called` flag.
async fn unref(&self) {
self.unref_called.store(true, Ordering::Relaxed);
}
Expand Down

0 comments on commit b6cf345

Please sign in to comment.