diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 924edca79..0da5d5554 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -23,11 +23,11 @@ use std::time::{Duration, SystemTime}; use async_lock::RwLock; use async_trait::async_trait; use bytes::BytesMut; -use filetime::{set_file_atime, FileTime}; use futures::stream::{StreamExt, TryStreamExt}; use futures::{Future, TryFutureExt}; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; +use nativelink_util::background_spawn; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; @@ -35,7 +35,6 @@ use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo}; -use nativelink_util::{background_spawn, spawn_blocking}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::time::{sleep, timeout, Sleep}; use tokio_stream::wrappers::ReadDirStream; @@ -320,33 +319,6 @@ impl LenEntry for FileEntryImpl { self.data_size == 0 } - #[inline] - async fn touch(&self) -> bool { - let result = self - .get_file_path_locked(move |full_content_path| async move { - let full_content_path = full_content_path.clone(); - spawn_blocking!("filesystem_touch_set_mtime", move || { - set_file_atime(&full_content_path, FileTime::now()).err_tip(|| { - format!("Failed to touch file in filesystem store {full_content_path:?}") - }) - }) - .await - .map_err(|e| { - make_err!( - Code::Internal, - "Failed to change atime of file due to spawn failing {:?}", - e - ) - })? - }) - .await; - if let Err(err) = result { - event!(Level::ERROR, ?err, "Failed to touch file",); - return false; - } - true - } - // unref() only triggers when an item is removed from the eviction_map. It is possible // that another place in code has a reference to `FileEntryImpl` and may later read the // file. To support this edge case, we first move the file to a temp file and point @@ -404,17 +376,17 @@ const SIMULTANEOUS_METADATA_READS: usize = 200; async fn add_files_to_cache( evicting_map: &EvictingMap, SystemTime>, - anchor_time: &SystemTime, + anchor_time: SystemTime, shared_context: &Arc, block_size: u64, ) -> Result<(), Error> { async fn process_entry( evicting_map: &EvictingMap, SystemTime>, file_name: &str, - atime: SystemTime, + filetime: SystemTime, data_size: u64, block_size: u64, - anchor_time: &SystemTime, + anchor_time: SystemTime, shared_context: &Arc, ) -> Result<(), Error> { let digest = digest_from_filename(file_name)?; @@ -429,7 +401,7 @@ async fn add_files_to_cache( }), ); let time_since_anchor = anchor_time - .duration_since(atime) + .duration_since(filetime) .map_err(|_| make_input_err!("File access time newer than now"))?; evicting_map .insert_with_time( @@ -456,20 +428,21 @@ async fn add_files_to_cache( .metadata() .await .err_tip(|| "Failed to get metadata in filesystem store")?; - let atime = match metadata.accessed() { + let filetime = match metadata.accessed() { Ok(atime) => atime, - Err(err) => { - panic!( - "{}{}{} : {} {:?}", - "It appears this filesystem does not support access time. ", - "Please configure this program to run on a drive that supports ", - "atime", - file_name, - err - ); + Err(_) => { + if let Ok(mtime) = metadata.modified() { + mtime + } else { + anchor_time + } } }; - Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len())) + Result::<(String, SystemTime, u64), Error>::Ok(( + file_name, + filetime, + metadata.len(), + )) }) .buffer_unordered(SIMULTANEOUS_METADATA_READS) .try_collect() @@ -477,11 +450,11 @@ async fn add_files_to_cache( }; file_infos.sort_by(|a, b| a.1.cmp(&b.1)); - for (file_name, atime, data_size) in file_infos { + for (file_name, filetime, data_size) in file_infos { let result = process_entry( evicting_map, &file_name, - atime, + filetime, data_size, block_size, anchor_time, @@ -571,7 +544,7 @@ impl FilesystemStore { } else { config.block_size }; - add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?; + add_files_to_cache(evicting_map.as_ref(), now, &shared_context, block_size).await?; prune_temp_path(&shared_context.temp_path).await?; let read_buffer_size = if config.read_buffer_size == 0 { diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 4013af3a8..63b759248 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -146,10 +146,6 @@ impl LenEntry for TestFileEntry bool { - self.inner.as_ref().unwrap().touch().await - } - async fn unref(&self) { Hooks::on_unref(self); self.inner.as_ref().unwrap().unref().await; @@ -604,53 +600,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { Ok(()) } -#[serial] -#[nativelink_test] -async fn atime_updates_on_get_part_test() -> Result<(), Error> { - let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; - - let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { - content_path: make_temp_path("content_path"), - temp_path: make_temp_path("temp_path"), - eviction_policy: None, - ..Default::default() - }) - .await?, - ); - // Insert data into store. - store.update_oneshot(digest1, VALUE1.into()).await?; - - let file_entry = store.get_file_entry_for_digest(&digest1).await?; - file_entry - .get_file_path_locked(move |path| async move { - // Set atime to along time ago. - set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?; - - // Check to ensure it was set to zero from previous command. - assert_eq!( - fs::metadata(&path).await?.accessed()?, - SystemTime::UNIX_EPOCH - ); - Ok(()) - }) - .await?; - - // Now touch digest1. - let data = store.get_part_unchunked(digest1, 0, None).await?; - assert_eq!(data, VALUE1.as_bytes()); - - file_entry - .get_file_path_locked(move |path| async move { - // Ensure it was updated. - assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH); - Ok(()) - }) - .await?; - - Ok(()) -} - #[serial] #[nativelink_test] async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> { @@ -1233,41 +1182,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> { Ok(()) } -#[serial] -#[nativelink_test] -async fn deleted_file_removed_from_store() -> Result<(), Error> { - let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; - let content_path = make_temp_path("content_path"); - let temp_path = make_temp_path("temp_path"); - - let store = Box::pin( - FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { - content_path: content_path.clone(), - temp_path: temp_path.clone(), - read_buffer_size: 1, - ..Default::default() - }, - |_| sleep(Duration::ZERO), - |from, to| std::fs::rename(from, to), - ) - .await?, - ); - - store.update_oneshot(digest, VALUE1.into()).await?; - - let stored_file_path = OsString::from(format!("{content_path}/{digest}")); - std::fs::remove_file(stored_file_path)?; - - let digest_result = store - .has(digest) - .await - .err_tip(|| "Failed to execute has")?; - assert!(digest_result.is_none()); - - Ok(()) -} - // Ensure that get_file_size() returns the correct number // ceil(content length / block_size) * block_size // assume block size 4K diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 17f5905f1..b49798d77 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -50,13 +50,6 @@ pub trait LenEntry: 'static { /// Returns `true` if `self` has zero length. fn is_empty(&self) -> bool; - /// Called when an entry is touched. On failure, will remove the entry - /// from the map. - #[inline] - fn touch(&self) -> impl Future + Send { - std::future::ready(true) - } - /// This will be called when object is removed from map. /// Note: There may still be a reference to it held somewhere else, which /// is why it can't be mutable. This is a good place to mark the item @@ -86,11 +79,6 @@ impl LenEntry for Arc { T::is_empty(self.as_ref()) } - #[inline] - async fn touch(&self) -> bool { - self.as_ref().touch().await - } - #[inline] async fn unref(&self) { self.as_ref().unref().await; @@ -344,18 +332,17 @@ where // based on the current time. In such case, we remove the item while // we are here. let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX); - if !should_evict && peek { - *result = Some(entry.data.len()); - } else if !should_evict && entry.data.touch().await { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; + if !should_evict { + if !peek { + entry.seconds_since_anchor = + self.anchor_time.elapsed().as_secs() as i32; + } *result = Some(entry.data.len()); } else { *result = None; if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) { if should_evict { event!(Level::INFO, ?key, "Item expired, evicting"); - } else { - event!(Level::INFO, ?key, "Touch failed, evicting"); } state.remove(key.borrow(), &eviction_item, false).await; } @@ -376,15 +363,8 @@ where let entry = state.lru.get_mut(key.borrow())?; - if entry.data.touch().await { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - return Some(entry.data.clone()); - } - - let (key, eviction_item) = state.lru.pop_entry(key.borrow())?; - event!(Level::INFO, ?key, "Touch failed, evicting"); - state.remove(key.borrow(), &eviction_item, false).await; - None + entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; + Some(entry.data.clone()) } /// Returns the replaced item if any. diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index abf89dcf8..227ca1516 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -354,11 +354,6 @@ async fn unref_called_on_replace() -> Result<(), Error> { unreachable!("We are not testing this functionality"); } - async fn touch(&self) -> bool { - // Do nothing. We are not testing this functionality. - true - } - async fn unref(&self) { self.unref_called.store(true, Ordering::Relaxed); }