From 819896802a50038ccd88b36b7f3d25ef6d454e57 Mon Sep 17 00:00:00 2001 From: Kirpal Grewal Date: Thu, 21 Nov 2024 17:38:42 +0000 Subject: [PATCH] Support native StoreKey in FilesystemStore remove sort precondition --- nativelink-store/src/filesystem_store.rs | 362 +++++++++++++----- nativelink-store/src/memory_store.rs | 22 +- .../tests/filesystem_store_test.rs | 205 ++++------ nativelink-util/src/store_trait.rs | 37 +- 4 files changed, 386 insertions(+), 240 deletions(-) diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 7e8c24cba..b76160ec7 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::panic; use std::borrow::Cow; use std::ffi::{OsStr, OsString}; use std::fmt::{Debug, Formatter}; @@ -35,7 +36,9 @@ use nativelink_util::buf_channel::{ use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; -use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo}; +use nativelink_util::store_trait::{ + StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo, +}; use nativelink_util::{background_spawn, spawn_blocking}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::time::{sleep, timeout, Sleep}; @@ -49,6 +52,15 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024; // Default block size of all major filesystems is 4KB const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024; +pub const STR_FOLDER: &str = "s"; +pub const DIGEST_FOLDER: &str = "d"; + +#[derive(Clone, Copy, Debug)] +pub enum FileType { + Digest, + String, +} + #[derive(Debug, MetricsComponent)] pub struct SharedContext { // Used in testing to know how many active drop() spawns are running. @@ -70,22 +82,21 @@ enum PathType { Custom(OsString), } -// Note: We don't store the full path of the file because it would cause -// a lot of needless memeory bloat. There's a high chance we'll end up with a -// lot of small files, so to prevent storing duplicate data, we store an Arc -// to the path of the directory where the file is stored and the packed digest. -// Resulting in usize + sizeof(DigestInfo). -type FileNameDigest = DigestInfo; +/// [`EncodedFilePath`] stores the path to the file +/// including the context, path type and key to the file. 
+/// The whole [`StoreKey`] is stored as opposed to solely
+/// the [`DigestInfo`] so that it is more usable for things
+/// such as BEP; see Issue #1108.
 pub struct EncodedFilePath {
     shared_context: Arc<SharedContext>,
     path_type: PathType,
-    digest: FileNameDigest,
+    key: StoreKey<'static>,
 }
 
 impl EncodedFilePath {
     #[inline]
     fn get_file_path(&self) -> Cow<'_, OsStr> {
-        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
+        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
     }
 }
 
@@ -93,14 +104,14 @@ impl EncodedFilePath {
 fn get_file_path_raw<'a>(
     path_type: &'a PathType,
     shared_context: &SharedContext,
-    digest: &DigestInfo,
+    key: &StoreKey<'a>,
 ) -> Cow<'a, OsStr> {
     let folder = match path_type {
         PathType::Content => &shared_context.content_path,
         PathType::Temp => &shared_context.temp_path,
         PathType::Custom(path) => return Cow::Borrowed(path),
     };
-    Cow::Owned(to_full_path_from_digest(folder, digest))
+    Cow::Owned(to_full_path_from_key(folder, key))
 }
 
 impl Drop for EncodedFilePath {
@@ -131,9 +142,23 @@ impl Drop for EncodedFilePath {
     }
 }
 
+/// This creates the file path from the [`StoreKey`]. If
+/// it is a string key, the string is stored under the
+/// [`STR_FOLDER`] subdirectory.
+///
+/// If it is a [`DigestInfo`], it is stored under the
+/// [`DIGEST_FOLDER`] subdirectory as the string representation
+/// of the digest: the hash in hex, a hyphen, then the size in bytes.
+///
+/// Previously, only the string representation of the [`DigestInfo`] was
+/// used, with no subdirectory.
 #[inline]
-fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
-    format!("{folder}/{digest}").into()
+fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
+    match key {
+        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
+        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
+    }
+    .into()
 }
 
 pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -300,7 +325,7 @@ impl Debug for FileEntryImpl {
     }
 }
 
-fn make_temp_digest(digest: &mut DigestInfo) {
+fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
     static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
     let mut hash = *digest.packed_hash();
     hash[24..].clone_from_slice(
@@ -309,6 +334,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
             .to_le_bytes(),
     );
     digest.set_packed_hash(*hash);
+    digest
+}
+
+fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
+    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
 }
 
 impl LenEntry for FileEntryImpl {
@@ -362,16 +392,15 @@ impl LenEntry for FileEntryImpl {
             return;
         }
         let from_path = encoded_file_path.get_file_path();
-        let mut new_digest = encoded_file_path.digest;
-        make_temp_digest(&mut new_digest);
+        let new_key = make_temp_key(&encoded_file_path.key);
 
         let to_path =
-            to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
+            to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
 
         if let Err(err) = fs::rename(&from_path, &to_path).await {
             event!(
                 Level::WARN,
-                digest = ?encoded_file_path.digest,
+                key = ?encoded_file_path.key,
                 ?from_path,
                 ?to_path,
                 ?err,
@@ -380,45 +409,55 @@ impl LenEntry for FileEntryImpl {
         } else {
             event!(
                 Level::INFO,
-                digest = ?encoded_file_path.digest,
+                key = ?encoded_file_path.key,
                 ?from_path,
                 ?to_path,
                 "Renamed file",
             );
             encoded_file_path.path_type = PathType::Temp;
-            encoded_file_path.digest = new_digest;
+            encoded_file_path.key = new_key;
         }
     }
 }
 
 #[inline]
-pub fn 
digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
+fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
     let (hash, size) = file_name.split_once('-').err_tip(|| "")?;
     let size = size.parse::<i64>()?;
     DigestInfo::try_new(hash, size)
 }
 
+pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'static>, Error> {
+    match file_type {
+        FileType::String => Ok(StoreKey::new_str(file_name)),
+        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
+    }
+}
+
 /// The number of files to read the metadata for at the same time when running
 /// `add_files_to_cache`.
 const SIMULTANEOUS_METADATA_READS: usize = 200;
 
 async fn add_files_to_cache<Fe: FileEntry>(
-    evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+    evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
     anchor_time: &SystemTime,
     shared_context: &Arc<SharedContext>,
     block_size: u64,
+    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
 ) -> Result<(), Error> {
+    #[expect(clippy::too_many_arguments)]
     async fn process_entry<Fe: FileEntry>(
-        evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
         file_name: &str,
+        file_type: FileType,
         atime: SystemTime,
         data_size: u64,
         block_size: u64,
        anchor_time: &SystemTime,
        shared_context: &Arc<SharedContext>,
     ) -> Result<(), Error> {
-        let digest = digest_from_filename(file_name)?;
+        let key = key_from_file(file_name, file_type)?;
 
         let file_entry = Fe::create(
             data_size,
@@ -426,7 +465,7 @@ async fn add_files_to_cache(
             RwLock::new(EncodedFilePath {
                 shared_context: shared_context.clone(),
                 path_type: PathType::Content,
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
         let time_since_anchor = anchor_time
@@ -434,7 +473,7 @@ async fn add_files_to_cache(
             .map_err(|_| make_input_err!("File access time newer than now"))?;
         evicting_map
             .insert_with_time(
-                digest,
+                key.into_owned().into(),
                 Arc::new(file_entry),
                 time_since_anchor.as_secs() as i32,
             )
@@ -442,8 +481,21 @@ async fn add_files_to_cache(
             .await;
         Ok(())
     }
 
-    let mut file_infos: Vec<(String, SystemTime, u64)> = {
-        let (_permit, dir_handle) = fs::read_dir(format!("{}/", shared_context.content_path))
+    async fn read_files(
+        folder: Option<&str>,
+        shared_context: &SharedContext,
+    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
+        // Note: In Dec 2024 this is for backwards compatibility with the old
+        // way files were stored on disk. Previously all files were in a single
+        // folder regardless of the StoreKey type. This allows old versions of
+        // the nativelink file layout to be upgraded at startup time.
+        // This logic can be removed once more time has passed.
+        let read_dir = if let Some(folder) = folder {
+            format!("{}/{folder}/", shared_context.content_path)
+        } else {
+            format!("{}/", shared_context.content_path)
+        };
+        let (_permit, dir_handle) = fs::read_dir(read_dir)
             .await
             .err_tip(|| "Failed opening content directory for iterating in filesystem store")?
             .into_inner();
@@ -457,6 +509,9 @@ async fn add_files_to_cache(
                     .metadata()
                     .await
                     .err_tip(|| "Failed to get metadata in filesystem store")?;
+                // We need to filter out folders; we do not want to try to cache the s and d folders.
+                let is_file =
+                    metadata.is_file() || !(file_name == STR_FOLDER || file_name == DIGEST_FOLDER);
                 let atime = match metadata.accessed() {
                     Ok(atime) => atime,
                     Err(err) => {
@@ -470,53 +525,136 @@ async fn add_files_to_cache(
                        );
                    }
                };
-                Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
+                Result::<(String, SystemTime, u64, bool), Error>::Ok((
+                    file_name,
+                    atime,
+                    metadata.len(),
+                    is_file,
+                ))
            })
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
            .try_collect()
-            .await?
-    };
+            .await
+    }
 
-    file_infos.sort_by(|a, b| a.1.cmp(&b.1));
-    for (file_name, atime, data_size) in file_infos {
-        let result = process_entry(
-            evicting_map,
-            &file_name,
-            atime,
-            data_size,
-            block_size,
-            anchor_time,
-            shared_context,
-        )
-        .await;
-        if let Err(err) = result {
-            event!(
-                Level::WARN,
-                ?file_name,
-                ?err,
-                "Failed to add file to eviction cache",
-            );
-            // Ignore result.
-            let _ =
-                fs::remove_file(format!("{}/{}", &shared_context.content_path, &file_name)).await;
+    /// Note: In Dec 2024 this is for backwards compatibility with the old
+    /// way files were stored on disk. Previously all files were in a single
+    /// folder regardless of the StoreKey type. This moves files from the old cache
+    /// location to the new cache location, under [`DIGEST_FOLDER`].
+    async fn move_old_cache(
+        shared_context: &Arc<SharedContext>,
+        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
+    ) -> Result<(), Error> {
+        let file_infos = read_files(None, shared_context).await?;
+
+        let from_path = shared_context.content_path.to_string();
+
+        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
+
+        for (file_name, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
+            let from_file: OsString = format!("{from_path}/{file_name}").into();
+            let to_file: OsString = format!("{to_path}/{file_name}").into();
+
+            if let Err(err) = rename_fn(&from_file, &to_file) {
+                event!(
+                    Level::WARN,
+                    ?from_file,
+                    ?to_file,
+                    ?err,
+                    "Failed to rename file",
+                );
+            } else {
+                event!(Level::INFO, ?from_path, ?to_path, "Renamed file",);
+            }
         }
+        Ok(())
     }
+
+    async fn add_files_to_cache<Fe: FileEntry>(
+        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
+        anchor_time: &SystemTime,
+        shared_context: &Arc<SharedContext>,
+        block_size: u64,
+        folder: &str,
+    ) -> Result<(), Error> {
+        let file_infos = read_files(Some(folder), shared_context).await?;
+        let file_type = match folder {
+            STR_FOLDER => FileType::String,
+            DIGEST_FOLDER => FileType::Digest,
+            _ => panic!("Invalid folder type"),
+        };
+
+        let path_root = format!("{}/{folder}", shared_context.content_path);
+
+        for (file_name, atime, data_size, _) in file_infos.into_iter().filter(|x| x.3) {
+            let result = process_entry(
+                evicting_map,
+                &file_name,
+                file_type,
+                atime,
+                data_size,
+                block_size,
+                anchor_time,
+                shared_context,
+            )
+            .await;
+            if let Err(err) = result {
+                event!(
+                    Level::WARN,
+                    ?file_name,
+                    ?err,
+                    "Failed to add file to eviction cache",
+                );
+                // Ignore result.
+                let _ = fs::remove_file(format!("{path_root}/{file_name}")).await;
+            }
+        }
+        Ok(())
+    }
+
+    move_old_cache(shared_context, rename_fn).await?;
+
+    add_files_to_cache(
+        evicting_map,
+        anchor_time,
+        shared_context,
+        block_size,
+        DIGEST_FOLDER,
+    )
+    .await?;
+
+    add_files_to_cache(
+        evicting_map,
+        anchor_time,
+        shared_context,
+        block_size,
+        STR_FOLDER,
+    )
+    .await?;
     Ok(())
 }
 
 async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
-    let (_permit, dir_handle) = fs::read_dir(temp_path)
-        .await
-        .err_tip(|| "Failed opening temp directory to prune partial downloads in filesystem store")?
-        .into_inner();
-
-    let mut read_dir_stream = ReadDirStream::new(dir_handle);
-    while let Some(dir_entry) = read_dir_stream.next().await {
-        let path = dir_entry?.path();
-        if let Err(err) = fs::remove_file(&path).await {
-            event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
+        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
+            .await
+            .err_tip(|| {
+                "Failed opening temp directory to prune partial downloads in filesystem store"
+            })?
+            .into_inner();
+
+        let mut read_dir_stream = ReadDirStream::new(dir_handle);
+        while let Some(dir_entry) = read_dir_stream.next().await {
+            let path = dir_entry?.path();
+            if let Err(err) = fs::remove_file(&path).await {
+                event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+            }
         }
+        Ok(())
     }
+
+    prune_temp_inner(temp_path, STR_FOLDER).await?;
+    prune_temp_inner(temp_path, DIGEST_FOLDER).await?;
     Ok(())
 }
 
@@ -525,7 +663,7 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
     #[metric]
     shared_context: Arc<SharedContext>,
     #[metric(group = "evicting_map")]
-    evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
+    evicting_map: Arc<EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>>,
     #[metric(help = "Block size of the configured filesystem")]
     block_size: u64,
     #[metric(help = "Size of the configured read buffer size")]
@@ -546,18 +684,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         sleep_fn: fn(Duration) -> Sleep,
         rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
     ) -> Result<Arc<Self>, Error> {
+        async fn create_subdirs(path: &str) -> Result<(), Error> {
+            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{STR_FOLDER}"))?;
+            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{DIGEST_FOLDER}"))
+        }
+
         let now = SystemTime::now();
 
         let empty_policy = nativelink_config::stores::EvictionPolicy::default();
         let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
         let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
 
-        fs::create_dir_all(&spec.temp_path)
-            .await
-            .err_tip(|| format!("Failed to temp directory {:?}", &spec.temp_path))?;
-        fs::create_dir_all(&spec.content_path)
-            .await
-            .err_tip(|| format!("Failed to content directory {:?}", &spec.content_path))?;
+        // Create temp and content directories and the s and d subdirectories.
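+        // E.g. a content_path of "/data/content" gains "/data/content/s" for
+        // string keys and "/data/content/d" for digest keys (illustrative path).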
+
+        create_subdirs(&spec.temp_path).await?;
+        create_subdirs(&spec.content_path).await?;
 
         let shared_context = Arc::new(SharedContext {
             active_drop_spawns: AtomicU64::new(0),
@@ -570,7 +715,14 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         } else {
             spec.block_size
         };
-        add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?;
+        add_files_to_cache(
+            evicting_map.as_ref(),
+            &now,
+            &shared_context,
+            block_size,
+            rename_fn,
+        )
+        .await?;
         prune_temp_path(&shared_context.temp_path).await?;
 
         let read_buffer_size = if spec.read_buffer_size == 0 {
@@ -595,7 +747,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
 
     pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
         self.evicting_map
-            .get(digest)
+            .get::<StoreKey<'static>>(&digest.into())
             .await
             .ok_or_else(|| make_err!(Code::NotFound, "{digest} not found in filesystem store"))
     }
 
@@ -604,7 +756,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     async fn update_file<'a>(
         self: Pin<&'a Self>,
         mut entry: Fe,
         mut resumeable_temp_file: fs::ResumeableFileSlot,
-        final_digest: DigestInfo,
+        final_key: StoreKey<'static>,
         mut reader: DropCloserReadHalf,
     ) -> Result<(), Error> {
         let mut data_size = 0;
@@ -649,10 +801,10 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
 
         drop(resumeable_temp_file);
         *entry.data_size_mut() = data_size;
-        self.emplace_file(final_digest, Arc::new(entry)).await
+        self.emplace_file(final_key, Arc::new(entry)).await
     }
 
-    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
+    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
         // This sequence of events is quite tricky to understand due to the amount of triggers that
         // happen, async'ness of it and the locking. So here is a breakdown of what happens:
         // 1. Here will hold a write lock on any file operations of this FileEntry.
@@ -680,10 +832,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
             let final_path = get_file_path_raw(
                 &PathType::Content,
                 encoded_file_path.shared_context.as_ref(),
-                &digest,
+                &key,
             );
 
-            evicting_map.insert(digest, entry.clone()).await;
+            evicting_map
+                .insert(key.borrow().into_owned().into(), entry.clone())
+                .await;
 
             let from_path = encoded_file_path.get_file_path();
             // Internally tokio spawns fs commands onto a blocking thread anyways.
@@ -709,13 +863,14 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
                 drop(encoded_file_path);
                 // It is possible that the item in our map is no longer the item we inserted,
                 // so we need to conditionally remove it only if the pointers are the same.
+
                 evicting_map
-                    .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
+                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                    .await;
                return Err(err);
            }
            encoded_file_path.path_type = PathType::Content;
-            encoded_file_path.digest = digest;
+            encoded_file_path.key = key;
            Ok(())
        })
        .await
@@ -730,26 +885,25 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         keys: &[StoreKey<'_>],
         results: &mut [Option<u64>],
     ) -> Result<(), Error> {
-        // TODO(allada) This is a bit of a hack to get around the lifetime issues with the
-        // existence_cache. We need to convert the digests to owned values to be able to
-        // insert them into the cache. In theory it should be able to elide this conversion
-        // but it seems to be a bit tricky to get right.
-        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
         self.evicting_map
-            .sizes_for_keys(&keys, results, false /* peek */)
+            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
+                keys.iter(),
+                results,
+                false, /* peek */
+            )
            .await;
        // We need to do a special pass to ensure our zero files exist.
        // If our results failed and the result was a zero file, we need to
        // create the file by spec.
-        for (digest, result) in keys.iter().zip(results.iter_mut()) {
-            if result.is_some() || !is_zero_digest(digest) {
+        for (key, result) in keys.iter().zip(results.iter_mut()) {
+            if result.is_some() || !is_zero_digest(key.borrow()) {
                 continue;
             }
             let (mut tx, rx) = make_buf_channel_pair();
             let send_eof_result = tx.send_eof();
-            self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
+            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
                 .await
-                .err_tip(|| format!("Failed to create zero file for key {digest}"))
+                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
                 .merge(
                     send_eof_result
                         .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
                )?;
        }
        Ok(())
    }
 
@@ -766,21 +920,18 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
     async fn update(
         &self,
         key: StoreKey<'_>,
         reader: DropCloserReadHalf,
         _upload_size: UploadSizeInfo,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        let mut temp_digest = digest;
-        make_temp_digest(&mut temp_digest);
-
+        let temp_key = make_temp_key(&key);
         let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
             self.block_size,
             EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Temp,
-                digest: temp_digest,
+                key: temp_key,
             },
         )
         .await?;
 
-        self.update_file(entry, temp_file, digest, reader)
+        self.update_file(entry, temp_file, key.into_owned(), reader)
             .await
             .err_tip(|| format!("While processing with temp file {temp_full_path:?}"))
     }
@@ -795,7 +946,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         mut file: fs::ResumeableFileSlot,
         upload_size: UploadSizeInfo,
     ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
-        let digest = key.into_digest();
         let path = file.get_path().as_os_str().to_os_string();
         let file_size = match upload_size {
             UploadSizeInfo::ExactSize(size) => size,
@@ -818,13 +968,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
             RwLock::new(EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Custom(path),
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
        // We are done with the file; if we hold a reference to the file here, it could
        // result in a deadlock if `emplace_file()` also needs file descriptors.
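+        // Dropping the slot releases its file descriptor first, so `emplace_file()`
+        // can safely acquire the descriptors it needs for the rename below.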
drop(file);
-        self.emplace_file(digest, Arc::new(entry))
+        self.emplace_file(key.into_owned(), Arc::new(entry))
             .await
             .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
         return Ok(None);
     }
 
@@ -837,9 +987,8 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         offset: u64,
         length: Option<u64>,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        if is_zero_digest(digest) {
-            self.has(digest.into())
+        if is_zero_digest(key.borrow()) {
+            self.has(key.borrow())
                 .await
                 .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
             writer
@@ -848,10 +997,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
             return Ok(());
         }
 
-        let entry =
-            self.evicting_map.get(&digest).await.ok_or_else(|| {
-                make_err!(Code::NotFound, "{digest} not found in filesystem store")
-            })?;
+        let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
+            make_err!(
+                Code::NotFound,
+                "{} not found in filesystem store",
+                key.as_str()
+            )
+        })?;
 
         let read_limit = length.unwrap_or(u64::MAX);
         let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs
index 27cf2cda0..edc5e2dd5 100644
--- a/nativelink-store/src/memory_store.rs
+++ b/nativelink-store/src/memory_store.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Borrow;
 use std::fmt::Debug;
 use std::ops::Bound;
 use std::pin::Pin;
@@ -26,7 +27,7 @@ use nativelink_metric::MetricsComponent;
 use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
 use nativelink_util::evicting_map::{EvictingMap, LenEntry};
 use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
-use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreKeyBorrow, UploadSizeInfo};
 
 use crate::cas_utils::is_zero_digest;
 
@@ -54,7 +55,7 @@ impl LenEntry for BytesWrapper {
 #[derive(MetricsComponent)]
 pub struct MemoryStore {
     #[metric(group = "evicting_map")]
-    evicting_map: EvictingMap<DigestInfo, BytesWrapper, SystemTime>,
+    evicting_map: EvictingMap<StoreKeyBorrow, BytesWrapper, SystemTime>,
 }
 
 impl MemoryStore {
@@ -73,7 +74,7 @@ impl MemoryStore {
     }
 
     pub async fn remove_entry(&self, key: StoreKey<'_>) -> bool {
-        self.evicting_map.remove(&key.into_owned()).await
+        self.evicting_map.remove(&key).await
     }
 }
 
@@ -84,11 +85,12 @@ impl StoreDriver for MemoryStore {
         keys: &[StoreKey<'_>],
         results: &mut [Option<u64>],
     ) -> Result<(), Error> {
-        // TODO(allada): This is a dirty hack to get around the lifetime issues with the
-        // evicting map.
-        let digests: Vec<_> = keys.iter().map(|key| key.borrow().into_owned()).collect();
         self.evicting_map
-            .sizes_for_keys(digests, results, false /* peek */)
+            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
+                keys.iter(),
+                results,
+                false, /* peek */
+            )
            .await;
        // We need to do a special pass to ensure our zero digests exist.
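+        // A zero digest denotes the empty blob, so it is treated as always present
+        // (size 0) even if nothing was ever inserted for it, as the pass below shows.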
keys.iter() @@ -112,7 +114,7 @@ impl StoreDriver for MemoryStore { ); let iterations = self .evicting_map - .range(range, move |key, _value| handler(key)) + .range(range, move |key, _value| handler(key.borrow())) .await; Ok(iterations) } @@ -136,7 +138,7 @@ impl StoreDriver for MemoryStore { }; self.evicting_map - .insert(key.borrow().into_owned(), BytesWrapper(final_buffer)) + .insert(key.into_owned().into(), BytesWrapper(final_buffer)) .await; Ok(()) } @@ -162,7 +164,7 @@ impl StoreDriver for MemoryStore { let value = self .evicting_map - .get(&key.borrow().into_owned()) + .get(&key) .await .err_tip_with_code(|_| (Code::NotFound, format!("Key {key:?} not found")))?; let default_len = usize::try_from(value.len()) diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 99b3333c8..f02b32e82 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -32,13 +32,14 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::{ - digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, + key_from_file, EncodedFilePath, FileEntry, FileEntryImpl, FileType, FilesystemStore, + DIGEST_FOLDER, STR_FOLDER, }; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::LenEntry; use nativelink_util::origin_context::ContextAwareFuture; -use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo}; +use nativelink_util::store_trait::{Store, StoreKey, StoreLike, UploadSizeInfo}; use nativelink_util::{background_spawn, spawn}; use parking_lot::Mutex; use pretty_assertions::assert_eq; @@ -215,19 +216,6 @@ async fn read_file_contents(file_name: &OsStr) -> Result, Error> { Ok(data) } -async fn write_file(file_name: &OsStr, data: &[u8]) -> Result<(), Error> { - let mut file = fs::create_file(file_name) - .await - .err_tip(|| format!("Failed to create file: {file_name:?}"))?; - file.as_writer().await?.write_all(data).await?; - file.as_writer() - .await? - .as_mut() - .sync_all() - .await - .err_tip(|| "Could not sync file") -} - async fn wait_for_no_open_files() -> Result<(), Error> { let mut counter = 0; while fs::get_open_files_for_test() != 0 { @@ -243,10 +231,39 @@ async fn wait_for_no_open_files() -> Result<(), Error> { Ok(()) } +/// Helper function to ensure there are no temporary files left. +async fn check_temp_empty(temp_path: &str) -> Result<(), Error> { + let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}")) + .await + .err_tip(|| "Failed opening temp directory")? + .into_inner(); + + let mut read_dir_stream = ReadDirStream::new(temp_dir_handle); + + if let Some(temp_dir_entry) = read_dir_stream.next().await { + let path = temp_dir_entry?.path(); + panic!("No files should exist in temp directory, found: {path:?}"); + } + + let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{STR_FOLDER}")) + .await + .err_tip(|| "Failed opening temp directory")? 
+ .into_inner(); + + let mut read_dir_stream = ReadDirStream::new(temp_dir_handle); + + if let Some(temp_dir_entry) = read_dir_stream.next().await { + let path = temp_dir_entry?.path(); + panic!("No files should exist in temp directory, found: {path:?}"); + } + Ok(()) +} + const HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; const HASH2: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef"; const VALUE1: &str = "0123456789"; const VALUE2: &str = "9876543210"; +const STRING_NAME: &str = "String_Filename"; #[serial] #[nativelink_test] @@ -288,7 +305,9 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> { .await?, ); - let content = store.get_part_unchunked(digest, 0, None).await?; + let key = StoreKey::Digest(digest); + + let content = store.get_part_unchunked(key, 0, None).await?; assert_eq!(content, VALUE1.as_bytes()); } @@ -325,7 +344,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { store.update_oneshot(digest1, VALUE1.into()).await?; - let expected_file_name = OsString::from(format!("{content_path}/{digest1}")); + let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest1}")); { // Check to ensure our file exists where it should and content matches. let data = read_file_contents(&expected_file_name).await?; @@ -356,18 +375,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { tokio::task::yield_now().await; } - let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone()) - .await - .err_tip(|| "Failed opening temp directory")? - .into_inner(); - let mut read_dir_stream = ReadDirStream::new(temp_dir_handle); - - if let Some(temp_dir_entry) = read_dir_stream.next().await { - let path = temp_dir_entry?.path(); - panic!("No files should exist in temp directory, found: {path:?}"); - } - - Ok(()) + check_temp_empty(&temp_path).await } // This test ensures that if a file is overridden and an open stream to the file already @@ -433,8 +441,8 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> tokio::task::yield_now().await; { - // Now ensure we only have 1 file in our temp path. - let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone()) + // Now ensure we only have 1 file in our temp path - we know it is a digest. + let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}")) .await .err_tip(|| "Failed opening temp directory")? .into_inner(); @@ -474,20 +482,8 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> tokio::task::yield_now().await; } - { - // Now ensure our temp file was cleaned up. - let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone()) - .await - .err_tip(|| "Failed opening temp directory")? - .into_inner(); - let mut read_dir_stream = ReadDirStream::new(temp_dir_handle); - if let Some(temp_dir_entry) = read_dir_stream.next().await { - let path = temp_dir_entry?.path(); - panic!("No files should exist in temp directory, found: {path:?}"); - } - } - - Ok(()) + // Now ensure our temp file was cleaned up. + check_temp_empty(&temp_path).await } // Eviction has a different code path than a file replacement, so we check that if a @@ -546,8 +542,8 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { tokio::task::yield_now().await; { - // Now ensure we only have 1 file in our temp path. 
- let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone()) + // Now ensure we only have 1 file in our temp path - we know it is a digest. + let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}")) .await .err_tip(|| "Failed opening temp directory")? .into_inner(); @@ -583,20 +579,8 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { tokio::task::yield_now().await; } - { - // Now ensure our temp file was cleaned up. - let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone()) - .await - .err_tip(|| "Failed opening temp directory")? - .into_inner(); - let mut read_dir_stream = ReadDirStream::new(temp_dir_handle); - if let Some(temp_dir_entry) = read_dir_stream.next().await { - let path = temp_dir_entry?.path(); - panic!("No files should exist in temp directory, found: {path:?}"); - } - } - - Ok(()) + // Now ensure our temp file was cleaned up. + check_temp_empty(&temp_path).await } #[serial] @@ -646,51 +630,6 @@ async fn atime_updates_on_get_part_test() -> Result<(), Error> { Ok(()) } -#[serial] -#[nativelink_test] -async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> { - // Note these are swapped to ensure they aren't in numerical order. - let digest1 = DigestInfo::try_new(HASH2, VALUE2.len())?; - let digest2 = DigestInfo::try_new(HASH1, VALUE1.len())?; - - let content_path = make_temp_path("content_path"); - fs::create_dir_all(&content_path).await?; - - // Make the two files on disk before loading the store. - let file1 = OsString::from(format!("{content_path}/{digest1}")); - let file2 = OsString::from(format!("{content_path}/{digest2}")); - write_file(&file1, VALUE1.as_bytes()).await?; - write_file(&file2, VALUE2.as_bytes()).await?; - set_file_atime(&file1, FileTime::from_unix_time(0, 0))?; - set_file_atime(&file2, FileTime::from_unix_time(1, 0))?; - - // Load the existing store from disk. - let store = Box::pin( - FilesystemStore::::new(&FilesystemSpec { - content_path, - temp_path: make_temp_path("temp_path"), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { - max_bytes: 0, - max_seconds: 0, - max_count: 1, - evict_bytes: 0, - }), - ..Default::default() - }) - .await?, - ); - - // This should exist and not have been evicted. - store.get_file_entry_for_digest(&digest2).await?; - // This should have been evicted. 
- match store.get_file_entry_for_digest(&digest1).await { - Ok(_) => panic!("Oldest file should have been evicted."), - Err(error) => assert_eq!(error.code, Code::NotFound), - } - - Ok(()) -} - #[serial] #[nativelink_test] async fn eviction_drops_file_test() -> Result<(), Error> { @@ -794,23 +733,26 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> { const SMALL_VALUE: &str = "01"; const BIG_VALUE: &str = "0123"; - static UNREFED_DIGESTS: LazyLock>> = + static UNREFED_DIGESTS: LazyLock>>> = LazyLock::new(|| Mutex::new(Vec::new())); struct LocalHooks {} impl FileEntryHooks for LocalHooks { fn on_unref(file_entry: &Fe) { block_on(file_entry.get_file_path_locked(move |path_str| async move { let path = Path::new(&path_str); - let digest = - digest_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap(); - UNREFED_DIGESTS.lock().push(digest); + let digest = key_from_file( + path.file_name().unwrap().to_str().unwrap(), + FileType::Digest, + ) + .unwrap(); + UNREFED_DIGESTS.lock().push(digest.borrow().into_owned()); Ok(()) })) .unwrap(); } } - let small_digest = DigestInfo::try_new(HASH1, SMALL_VALUE.len())?; + let small_digest = StoreKey::Digest(DigestInfo::try_new(HASH1, SMALL_VALUE.len())?); let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?; let store = Box::pin( @@ -828,7 +770,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> { ); // Insert data into store. store - .update_oneshot(small_digest, SMALL_VALUE.into()) + .update_oneshot(small_digest.borrow(), SMALL_VALUE.into()) .await?; store.update_oneshot(big_digest, BIG_VALUE.into()).await?; @@ -859,7 +801,10 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() ) -> Result { loop { yield_fn().await?; - let (_permit, dir_handle) = fs::read_dir(&temp_path).await?.into_inner(); + // Now ensure we only have 1 file in our temp path - we know it is a digest. + let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}")) + .await? + .into_inner(); let mut read_dir_stream = ReadDirStream::new(dir_handle); if let Some(dir_entry) = read_dir_stream.next().await { assert!( @@ -967,13 +912,7 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() // Now it should have cleaned up its temp files. { - // Ensure `temp_path` is empty. - let (_permit, dir_handle) = fs::read_dir(&temp_path).await?.into_inner(); - let mut read_dir_stream = ReadDirStream::new(dir_handle); - assert!( - read_dir_stream.next().await.is_none(), - "File found in temp_path after update() rename failure" - ); + check_temp_empty(&temp_path).await?; } // Finally ensure that our entry is not in the store. 
@@ -1089,7 +1028,8 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
         loop {
             yield_fn().await?;
 
-            let empty_digest_file_name = OsString::from(format!("{content_path}/{digest}"));
+            let empty_digest_file_name =
+                OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
 
             let file_metadata = fs::metadata(empty_digest_file_name)
                 .await
@@ -1218,7 +1158,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
         .get_file_path_locked(move |file_path| async move {
             assert_eq!(
                 file_path,
-                OsString::from(format!("{content_path}/{digest}"))
+                OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"))
             );
             Ok(())
         })
@@ -1250,7 +1190,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
 
     store.update_oneshot(digest, VALUE1.into()).await?;
 
-    let stored_file_path = OsString::from(format!("{content_path}/{digest}"));
+    let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
     std::fs::remove_file(stored_file_path)?;
 
     let digest_result = store
@@ -1259,6 +1199,23 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
         .err_tip(|| "Failed to execute has")?;
     assert!(digest_result.is_none());
 
+    // Repeat with a string-typed key.
+
+    let string_key = StoreKey::new_str(STRING_NAME);
+
+    store
+        .update_oneshot(string_key.borrow(), VALUE2.into())
+        .await?;
+
+    let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
+    std::fs::remove_file(stored_file_path)?;
+
+    let string_result = store
+        .has(string_key)
+        .await
+        .err_tip(|| "Failed to execute has")?;
+    assert!(string_result.is_none());
+
     Ok(())
 }
 
@@ -1460,7 +1417,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
         "Expected filesystem store to consume the file"
     );
 
-    let expected_file_name = OsString::from(format!("{content_path}/{digest}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
     let new_inode = fs::create_file(expected_file_name)
         .await?
         .as_reader()
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index f04470772..4f01194b3 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::borrow::{BorrowMut, Cow};
+use std::borrow::{Borrow, BorrowMut, Cow};
 use std::collections::hash_map::DefaultHasher as StdHasher;
 use std::convert::Into;
 use std::hash::{Hash, Hasher};
@@ -144,6 +144,41 @@ pub enum StoreOptimizations {
     NoopDownloads,
 }
 
+/// A wrapper struct for [`StoreKey`] to work around
+/// lifetime limitations in `HashMap::get()` as described in
+///
+
+/// As such this is a wrapper type that is stored in the
+/// maps using the workaround as described in
+///
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(transparent)]
+pub struct StoreKeyBorrow(StoreKey<'static>);
+
+impl From<StoreKey<'static>> for StoreKeyBorrow {
+    fn from(key: StoreKey<'static>) -> Self {
+        Self(key)
+    }
+}
+
+impl From<StoreKeyBorrow> for StoreKey<'static> {
+    fn from(key_borrow: StoreKeyBorrow) -> Self {
+        key_borrow.0
+    }
+}
+
+impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
+    fn borrow(&self) -> &StoreKey<'a> {
+        &self.0
+    }
+}
+
+impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
+    fn borrow(&self) -> &StoreKey<'a> {
+        &self.0
+    }
+}
+
 /// Holds something that can be converted into a key the
 /// store API can understand.
Generally this is a digest /// but it can also be a string if the caller wishes to