Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes touch() API from EvictingMap #1428

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 20 additions & 47 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ use std::time::{Duration, SystemTime};
use async_lock::RwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, TryFutureExt};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::background_spawn;
use nativelink_util::buf_channel::{
make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::time::{sleep, timeout, Sleep};
use tokio_stream::wrappers::ReadDirStream;
Expand Down Expand Up @@ -320,33 +319,6 @@ impl LenEntry for FileEntryImpl {
self.data_size == 0
}

// Refreshes the access time of this entry's content file on disk.
//
// Returns `true` when the atime was updated. Any failure (either from the
// blocking task itself or from setting the atime) is logged and reported as
// `false`.
//
// NOTE(review): the blocking task is labelled "..._set_mtime" although it
// sets the atime; the label is left untouched here.
#[inline]
async fn touch(&self) -> bool {
    let touch_outcome = self
        .get_file_path_locked(move |full_content_path| async move {
            let path_for_task = full_content_path.clone();
            let join_outcome = spawn_blocking!("filesystem_touch_set_mtime", move || {
                set_file_atime(&path_for_task, FileTime::now()).err_tip(|| {
                    format!("Failed to touch file in filesystem store {path_for_task:?}")
                })
            })
            .await;
            match join_outcome {
                // The task ran; forward whatever the atime update produced.
                Ok(atime_outcome) => atime_outcome,
                // The task itself could not be joined.
                Err(e) => Err(make_err!(
                    Code::Internal,
                    "Failed to change atime of file due to spawn failing {:?}",
                    e
                )),
            }
        })
        .await;
    match touch_outcome {
        Ok(_) => true,
        Err(err) => {
            event!(Level::ERROR, ?err, "Failed to touch file",);
            false
        }
    }
}

// unref() only triggers when an item is removed from the eviction_map. It is possible
// that another place in code has a reference to `FileEntryImpl` and may later read the
// file. To support this edge case, we first move the file to a temp file and point
Expand Down Expand Up @@ -404,17 +376,17 @@ const SIMULTANEOUS_METADATA_READS: usize = 200;

async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
anchor_time: SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
filetime: SystemTime,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
anchor_time: SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;
Expand All @@ -429,7 +401,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
}),
);
let time_since_anchor = anchor_time
.duration_since(atime)
.duration_since(filetime)
.map_err(|_| make_input_err!("File access time newer than now"))?;
evicting_map
.insert_with_time(
Expand All @@ -456,32 +428,33 @@ async fn add_files_to_cache<Fe: FileEntry>(
.metadata()
.await
.err_tip(|| "Failed to get metadata in filesystem store")?;
let atime = match metadata.accessed() {
let filetime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
Err(_) => {
if let Ok(mtime) = metadata.modified() {
mtime
} else {
anchor_time
}
}
};
Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
Result::<(String, SystemTime, u64), Error>::Ok((
file_name,
filetime,
metadata.len(),
))
})
.buffer_unordered(SIMULTANEOUS_METADATA_READS)
.try_collect()
.await?
};

file_infos.sort_by(|a, b| a.1.cmp(&b.1));
for (file_name, atime, data_size) in file_infos {
for (file_name, filetime, data_size) in file_infos {
let result = process_entry(
evicting_map,
&file_name,
atime,
filetime,
data_size,
block_size,
anchor_time,
Expand Down Expand Up @@ -571,7 +544,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
} else {
config.block_size
};
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
add_files_to_cache(evicting_map.as_ref(), now, &shared_context, block_size).await?;
prune_temp_path(&shared_context.temp_path).await?;

let read_buffer_size = if config.read_buffer_size == 0 {
Expand Down
86 changes: 0 additions & 86 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
self.inner.as_ref().unwrap().is_empty()
}

// Forwards the touch to the wrapped entry and returns its result.
// Panics (via `unwrap`) when `inner` is `None`.
async fn touch(&self) -> bool {
    let wrapped = self.inner.as_ref().unwrap();
    wrapped.touch().await
}

async fn unref(&self) {
Hooks::on_unref(self);
self.inner.as_ref().unwrap().unref().await;
Expand Down Expand Up @@ -604,53 +600,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
Ok(())
}

// Verifies that reading an entry through the store moves the file's access
// time forward: the atime is first forced back to the Unix epoch, the entry
// is read, and the atime must then be newer than the epoch.
#[serial]
#[nativelink_test]
async fn atime_updates_on_get_part_test() -> Result<(), Error> {
    let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

    let store_config = nativelink_config::stores::FilesystemStore {
        content_path: make_temp_path("content_path"),
        temp_path: make_temp_path("temp_path"),
        eviction_policy: None,
        ..Default::default()
    };
    let store = Box::pin(FilesystemStore::<FileEntryImpl>::new(&store_config).await?);

    // Populate the store with a single entry.
    store.update_oneshot(digest1, VALUE1.into()).await?;

    let entry = store.get_file_entry_for_digest(&digest1).await?;
    entry
        .get_file_path_locked(move |file_path| async move {
            // Set atime to a long time ago.
            let epoch = FileTime::from_system_time(SystemTime::UNIX_EPOCH);
            set_file_atime(&file_path, epoch)?;

            // Confirm the previous call really reset the atime.
            let accessed = fs::metadata(&file_path).await?.accessed()?;
            assert_eq!(accessed, SystemTime::UNIX_EPOCH);
            Ok(())
        })
        .await?;

    // Reading the entry is what should touch it.
    let contents = store.get_part_unchunked(digest1, 0, None).await?;
    assert_eq!(contents, VALUE1.as_bytes());

    entry
        .get_file_path_locked(move |file_path| async move {
            // The atime must have moved past the epoch.
            let accessed = fs::metadata(&file_path).await?.accessed()?;
            assert!(accessed > SystemTime::UNIX_EPOCH);
            Ok(())
        })
        .await?;

    Ok(())
}

#[serial]
#[nativelink_test]
async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> {
Expand Down Expand Up @@ -1233,41 +1182,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
Ok(())
}

// Verifies that an entry whose backing file was deleted behind the store's
// back is no longer reported by `has()`.
#[serial]
#[nativelink_test]
async fn deleted_file_removed_from_store() -> Result<(), Error> {
    let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
    let content_path = make_temp_path("content_path");
    let temp_path = make_temp_path("temp_path");

    let store_config = nativelink_config::stores::FilesystemStore {
        content_path: content_path.clone(),
        temp_path: temp_path.clone(),
        read_buffer_size: 1,
        ..Default::default()
    };
    let store = Box::pin(
        FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
            &store_config,
            |_| sleep(Duration::ZERO),
            |from, to| std::fs::rename(from, to),
        )
        .await?,
    );

    store.update_oneshot(digest, VALUE1.into()).await?;

    // Remove the content file directly, bypassing the store.
    let on_disk_path = OsString::from(format!("{content_path}/{digest}"));
    std::fs::remove_file(on_disk_path)?;

    let lookup = store
        .has(digest)
        .await
        .err_tip(|| "Failed to execute has")?;
    assert!(lookup.is_none());

    Ok(())
}

// Ensure that get_file_size() returns the correct number
// ceil(content length / block_size) * block_size
// assume block size 4K
Expand Down
34 changes: 7 additions & 27 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ pub trait LenEntry: 'static {
/// Returns `true` if `self` has zero length.
fn is_empty(&self) -> bool;

/// Hook invoked whenever an entry is touched.
///
/// Resolving to `false` signals a failure, in which case the entry is
/// removed from the map. The default implementation always succeeds.
#[inline]
fn touch(&self) -> impl Future<Output = bool> + Send {
    async { true }
}

/// This will be called when object is removed from map.
/// Note: There may still be a reference to it held somewhere else, which
/// is why it can't be mutable. This is a good place to mark the item
Expand Down Expand Up @@ -86,11 +79,6 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
T::is_empty(self.as_ref())
}

#[inline]
async fn touch(&self) -> bool {
self.as_ref().touch().await
}

#[inline]
async fn unref(&self) {
self.as_ref().unref().await;
Expand Down Expand Up @@ -344,18 +332,17 @@ where
// based on the current time. In such case, we remove the item while
// we are here.
let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX);
if !should_evict && peek {
*result = Some(entry.data.len());
} else if !should_evict && entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
if !should_evict {
if !peek {
entry.seconds_since_anchor =
self.anchor_time.elapsed().as_secs() as i32;
}
*result = Some(entry.data.len());
} else {
*result = None;
if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
if should_evict {
event!(Level::INFO, ?key, "Item expired, evicting");
} else {
event!(Level::INFO, ?key, "Touch failed, evicting");
}
state.remove(key.borrow(), &eviction_item, false).await;
}
Expand All @@ -376,15 +363,8 @@ where

let entry = state.lru.get_mut(key.borrow())?;

if entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
return Some(entry.data.clone());
}

let (key, eviction_item) = state.lru.pop_entry(key.borrow())?;
event!(Level::INFO, ?key, "Touch failed, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
None
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
Some(entry.data.clone())
}

/// Returns the replaced item if any.
Expand Down
5 changes: 0 additions & 5 deletions nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,6 @@ async fn unref_called_on_replace() -> Result<(), Error> {
unreachable!("We are not testing this functionality");
}

// Test stub: always reports a successful touch.
async fn touch(&self) -> bool {
// Do nothing. We are not testing this functionality.
true
}

// Records that `unref` was invoked by setting the `unref_called` flag.
async fn unref(&self) {
self.unref_called.store(true, Ordering::Relaxed);
}
Expand Down
Loading