From 4ebc432903f5921379c8785d3a129e744a0a1253 Mon Sep 17 00:00:00 2001
From: Kirpal Grewal
Date: Thu, 21 Nov 2024 17:38:42 +0000
Subject: [PATCH] Support native StoreKey in FilesystemStore

---
 nativelink-store/src/filesystem_store.rs   | 134 +++++++++++-------
 .../tests/filesystem_store_test.rs         |  33 +++--
 2 files changed, 98 insertions(+), 69 deletions(-)

diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index 22d79cec8..196321b40 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -49,6 +49,9 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
 // Default block size of all major filesystems is 4KB
 const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
 
+pub const STR_PREFIX: &str = "s-";
+pub const DIGEST_PREFIX: &str = "d-";
+
 #[derive(Debug, MetricsComponent)]
 pub struct SharedContext {
     // Used in testing to know how many active drop() spawns are running.
@@ -70,22 +73,16 @@ enum PathType {
     Custom(OsString),
 }
 
-// Note: We don't store the full path of the file because it would cause
-// a lot of needless memeory bloat. There's a high chance we'll end up with a
-// lot of small files, so to prevent storing duplicate data, we store an Arc
-// to the path of the directory where the file is stored and the packed digest.
-// Resulting in usize + sizeof(DigestInfo).
-type FileNameDigest = DigestInfo;
 pub struct EncodedFilePath {
     shared_context: Arc<SharedContext>,
     path_type: PathType,
-    digest: FileNameDigest,
+    key: StoreKey<'static>,
 }
 
 impl EncodedFilePath {
     #[inline]
     fn get_file_path(&self) -> Cow<'_, OsStr> {
-        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
+        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
     }
 }
 
@@ -93,14 +90,14 @@
 fn get_file_path_raw<'a>(
     path_type: &'a PathType,
     shared_context: &SharedContext,
-    digest: &DigestInfo,
+    key: &StoreKey<'a>,
 ) -> Cow<'a, OsStr> {
     let folder = match path_type {
         PathType::Content => &shared_context.content_path,
         PathType::Temp => &shared_context.temp_path,
        PathType::Custom(path) => return Cow::Borrowed(path),
     };
-    Cow::Owned(to_full_path_from_digest(folder, digest))
+    Cow::Owned(to_full_path_from_key(folder, key))
 }
 
 impl Drop for EncodedFilePath {
@@ -132,8 +129,13 @@ impl Drop for EncodedFilePath {
 }
 
 #[inline]
-fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
-    format!("{folder}/{digest}").into()
+fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
+    // Prefix the file name with the key type so string and digest keys cannot collide.
+    match key {
+        StoreKey::Str(str) => format!("{folder}/{STR_PREFIX}{str}"),
+        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
+    }
+    .into()
 }
 
 pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -300,7 +302,7 @@ impl Debug for FileEntryImpl {
     }
 }
 
-fn make_temp_digest(digest: &mut DigestInfo) {
+fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
     static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
     let mut hash = *digest.packed_hash();
     hash[24..].clone_from_slice(
@@ -309,6 +311,11 @@ fn make_temp_digest(digest: &mut DigestInfo) {
             .to_le_bytes(),
     );
     digest.set_packed_hash(*hash);
+    digest
+}
+
+fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
+    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
 }
 
 impl LenEntry for FileEntryImpl {
@@ -362,16 +369,15 @@ impl LenEntry for FileEntryImpl {
                 return;
             }
             let from_path = encoded_file_path.get_file_path();
-            let mut new_digest = encoded_file_path.digest;
-            make_temp_digest(&mut new_digest);
+            let new_key = make_temp_key(&encoded_file_path.key);
 
             let to_path =
-                to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
+                to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
 
             if let Err(err) = fs::rename(&from_path, &to_path).await {
                 event!(
                     Level::WARN,
-                    digest = ?encoded_file_path.digest,
+                    key = ?encoded_file_path.key,
                     ?from_path,
                     ?to_path,
                     ?err,
@@ -380,37 +386,50 @@ impl LenEntry for FileEntryImpl {
                     "Failed to rename file",
                 );
             } else {
                 event!(
                     Level::INFO,
-                    digest = ?encoded_file_path.digest,
+                    key = ?encoded_file_path.key,
                     ?from_path,
                     ?to_path,
                     "Renamed file",
                 );
                 encoded_file_path.path_type = PathType::Temp;
-                encoded_file_path.digest = new_digest;
+                encoded_file_path.key = new_key;
             }
         }
     }
 }
 
 #[inline]
-pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
+fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
     let (hash, size) = file_name.split_once('-').err_tip(|| "")?;
     let size = size.parse::<i64>()?;
     DigestInfo::try_new(hash, size)
 }
 
+pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'static>, Error> {
+    if let Some(file_name) = file_name.strip_prefix(STR_PREFIX) {
+        return Ok(StoreKey::new_str(file_name));
+    }
+
+    // Remove the digest prefix if it exists. Permit unprefixed hashes for backwards compatibility.
+    if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
+        file_name = name;
+    }
+
+    digest_from_filename(file_name).map(StoreKey::Digest)
+}
+
 /// The number of files to read the metadata for at the same time when running
 /// add_files_to_cache.
 const SIMULTANEOUS_METADATA_READS: usize = 200;
 
 async fn add_files_to_cache<Fe: FileEntry>(
-    evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+    evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
     anchor_time: &SystemTime,
     shared_context: &Arc<SharedContext>,
     block_size: u64,
 ) -> Result<(), Error> {
     async fn process_entry<Fe: FileEntry>(
-        evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+        evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
         file_name: &str,
         atime: SystemTime,
         data_size: u64,
@@ -418,7 +437,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
         anchor_time: &SystemTime,
         shared_context: &Arc<SharedContext>,
     ) -> Result<(), Error> {
-        let digest = digest_from_filename(file_name)?;
+        let key = key_from_filename(file_name)?;
 
         let file_entry = Fe::create(
             data_size,
            block_size,
             RwLock::new(EncodedFilePath {
                 shared_context: shared_context.clone(),
                 path_type: PathType::Content,
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
         let time_since_anchor = anchor_time
@@ -434,7 +453,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
             .map_err(|_| make_input_err!("File access time newer than now"))?;
         evicting_map
             .insert_with_time(
-                digest,
+                key.into_owned(),
                 Arc::new(file_entry),
                 time_since_anchor.as_secs() as i32,
             )
             .await;
@@ -525,7 +544,7 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
     #[metric]
     shared_context: Arc<SharedContext>,
     #[metric(group = "evicting_map")]
-    evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
+    evicting_map: Arc<EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>>,
     #[metric(help = "Block size of the configured filesystem")]
     block_size: u64,
     #[metric(help = "Size of the configured read buffer size")]
@@ -595,16 +614,16 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
         self.evicting_map
-            .get(digest)
+            .get(&digest.into())
             .await
-            .ok_or_else(|| make_err!(Code::NotFound, "{digest} not found in filesystem store"))
+            .ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest))
     }
 
     async fn update_file<'a>(
         self: Pin<&'a Self>,
         mut entry: Fe,
         mut resumeable_temp_file: fs::ResumeableFileSlot,
-        final_digest: DigestInfo,
+        final_digest: StoreKey<'static>,
         mut reader: DropCloserReadHalf,
     ) -> Result<(), Error> {
         let mut data_size = 0;
@@ -652,7 +671,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         self.emplace_file(final_digest, Arc::new(entry)).await
     }
 
-    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
+    async fn emplace_file(&self, key: StoreKey<'_>, entry: Arc<Fe>) -> Result<(), Error> {
        // This sequence of events is quite tricky to understand due to the amount of triggers that
         // happen, async'ness of it and the locking. So here is a breakdown of what happens:
         // 1. Here will hold a write lock on any file operations of this FileEntry.
@@ -673,6 +692,9 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         let evicting_map = self.evicting_map.clone();
         let rename_fn = self.rename_fn;
 
+        // We need to extend the lifetime to 'static for the background spawn.
+        let key = key.borrow().into_owned();
+
         // We need to guarantee that this will get to the end even if the parent future is dropped.
         // See: https://github.com/TraceMachina/nativelink/issues/495
         background_spawn!("filesystem_store_emplace_file", async move {
@@ -680,10 +702,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
             let final_path = get_file_path_raw(
                 &PathType::Content,
                 encoded_file_path.shared_context.as_ref(),
-                &digest,
+                &key,
             );
 
-            evicting_map.insert(digest, entry.clone()).await;
+            evicting_map
+                .insert(key.borrow().into_owned(), entry.clone())
+                .await;
 
             let from_path = encoded_file_path.get_file_path();
             // Internally tokio spawns fs commands onto a blocking thread anyways.
@@ -710,12 +734,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
                 // It is possible that the item in our map is no longer the item we inserted,
                 // So, we need to conditionally remove it only if the pointers are the same.
                 evicting_map
-                    .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
+                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                     .await;
                 return Err(err);
             }
             encoded_file_path.path_type = PathType::Content;
-            encoded_file_path.digest = digest;
+            encoded_file_path.key = key.borrow().into_owned();
             Ok(())
         })
         .await
@@ -734,22 +758,22 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         // existence_cache. We need to convert the digests to owned values to be able to
         // insert them into the cache. In theory it should be able to elide this conversion
         // but it seems to be a bit tricky to get right.
-        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
+        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_owned()).collect();
         self.evicting_map
             .sizes_for_keys(&keys, results, false /* peek */)
             .await;
         // We need to do a special pass to ensure our zero files exist.
         // If our results failed and the result was a zero file, we need to
         // create the file by spec.
-        for (digest, result) in keys.iter().zip(results.iter_mut()) {
-            if result.is_some() || !is_zero_digest(digest) {
+        for (key, result) in keys.iter().zip(results.iter_mut()) {
+            if result.is_some() || !is_zero_digest(key.borrow()) {
                 continue;
             }
             let (mut tx, rx) = make_buf_channel_pair();
             let send_eof_result = tx.send_eof();
-            self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
+            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
                 .await
-                .err_tip(|| format!("Failed to create zero file for key {digest}"))
+                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
                 .merge(
                     send_eof_result
                         .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
                 )?;
         }
         Ok(())
     }
 
@@ -766,21 +790,18 @@
         reader: DropCloserReadHalf,
         _upload_size: UploadSizeInfo,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        let mut temp_digest = digest;
-        make_temp_digest(&mut temp_digest);
-
+        let temp_key = make_temp_key(&key);
         let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
             self.block_size,
             EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Temp,
-                digest: temp_digest,
+                key: temp_key,
             },
         )
         .await?;
 
-        self.update_file(entry, temp_file, digest, reader)
+        self.update_file(entry, temp_file, key.borrow().into_owned(), reader)
             .await
             .err_tip(|| format!("While processing with temp file {temp_full_path:?}"))
     }
@@ -795,7 +816,6 @@
         mut file: fs::ResumeableFileSlot,
         upload_size: UploadSizeInfo,
     ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
-        let digest = key.into_digest();
         let path = file.get_path().as_os_str().to_os_string();
         let file_size = match upload_size {
             UploadSizeInfo::ExactSize(size) => size,
@@ -818,13 +838,13 @@
             RwLock::new(EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Custom(path),
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
         // We are done with the file, if we hold a reference to the file here, it could
         // result in a deadlock if `emplace_file()` also needs file descriptors.
         drop(file);
-        self.emplace_file(digest, Arc::new(entry))
+        self.emplace_file(key, Arc::new(entry))
             .await
             .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
         return Ok(None);
     }
 
@@ -837,9 +857,8 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
         offset: u64,
         length: Option<u64>,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        if is_zero_digest(digest) {
-            self.has(digest.into())
+        if is_zero_digest(key.borrow()) {
+            self.has(key.borrow())
                 .await
                 .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
             writer
@@ -848,9 +867,16 @@
             return Ok(());
         }
 
-        let entry =
-            self.evicting_map.get(&digest).await.ok_or_else(|| {
-                make_err!(Code::NotFound, "{digest} not found in filesystem store")
+        let entry = self
+            .evicting_map
+            .get(&key.borrow().into_owned())
+            .await
+            .ok_or_else(|| {
+                make_err!(
+                    Code::NotFound,
+                    "{} not found in filesystem store",
+                    key.as_str()
+                )
             })?;
         let read_limit = length.unwrap_or(u64::MAX);
         let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index 99b3333c8..73cfef643 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -32,13 +32,13 @@ use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{
-    digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore,
+    key_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, DIGEST_PREFIX,
 };
 use nativelink_util::buf_channel::make_buf_channel_pair;
 use nativelink_util::common::{fs, DigestInfo};
 use nativelink_util::evicting_map::LenEntry;
 use nativelink_util::origin_context::ContextAwareFuture;
-use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
+use nativelink_util::store_trait::{Store, StoreKey, StoreLike, UploadSizeInfo};
 use nativelink_util::{background_spawn, spawn};
 use parking_lot::Mutex;
 use pretty_assertions::assert_eq;
@@ -288,7 +288,9 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
         .await?,
     );
 
-    let content = store.get_part_unchunked(digest, 0, None).await?;
+    let key = StoreKey::Digest(digest);
+
+    let content = store.get_part_unchunked(key, 0, None).await?;
 
     assert_eq!(content, VALUE1.as_bytes());
 }
@@ -325,7 +327,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
 
     store.update_oneshot(digest1, VALUE1.into()).await?;
 
-    let expected_file_name = OsString::from(format!("{content_path}/{digest1}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
     {
         // Check to ensure our file exists where it should and content matches.
         let data = read_file_contents(&expected_file_name).await?;
@@ -657,8 +659,8 @@ async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> {
     fs::create_dir_all(&content_path).await?;
 
     // Make the two files on disk before loading the store.
-    let file1 = OsString::from(format!("{content_path}/{digest1}"));
-    let file2 = OsString::from(format!("{content_path}/{digest2}"));
+    let file1 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
+    let file2 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest2}"));
     write_file(&file1, VALUE1.as_bytes()).await?;
     write_file(&file2, VALUE2.as_bytes()).await?;
     set_file_atime(&file1, FileTime::from_unix_time(0, 0))?;
@@ -794,7 +796,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     const SMALL_VALUE: &str = "01";
     const BIG_VALUE: &str = "0123";
 
-    static UNREFED_DIGESTS: LazyLock<Mutex<Vec<DigestInfo>>> =
+    static UNREFED_DIGESTS: LazyLock<Mutex<Vec<StoreKey<'static>>>> =
         LazyLock::new(|| Mutex::new(Vec::new()));
     struct LocalHooks {}
     impl FileEntryHooks for LocalHooks {
@@ -802,15 +804,15 @@ fn on_unref<Fe: FileEntry>(file_entry: &Fe) {
             block_on(file_entry.get_file_path_locked(move |path_str| async move {
                 let path = Path::new(&path_str);
                 let digest =
-                    digest_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap();
-                UNREFED_DIGESTS.lock().push(digest);
+                    key_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap();
+                UNREFED_DIGESTS.lock().push(digest.borrow().into_owned());
                 Ok(())
             }))
             .unwrap();
         }
     }
 
-    let small_digest = DigestInfo::try_new(HASH1, SMALL_VALUE.len())?;
+    let small_digest = StoreKey::Digest(DigestInfo::try_new(HASH1, SMALL_VALUE.len())?);
     let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?;
 
     let store = Box::pin(
@@ -828,7 +830,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     );
     // Insert data into store.
     store
-        .update_oneshot(small_digest, SMALL_VALUE.into())
+        .update_oneshot(small_digest.borrow(), SMALL_VALUE.into())
         .await?;
     store.update_oneshot(big_digest, BIG_VALUE.into()).await?;
 
@@ -1089,7 +1091,8 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
         loop {
             yield_fn().await?;
 
-            let empty_digest_file_name = OsString::from(format!("{content_path}/{digest}"));
+            let empty_digest_file_name =
+                OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
 
             let file_metadata = fs::metadata(empty_digest_file_name)
                 .await
@@ -1218,7 +1221,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
             .get_file_path_locked(move |file_path| async move {
                 assert_eq!(
                     file_path,
-                    OsString::from(format!("{content_path}/{digest}"))
+                    OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"))
                 );
                 Ok(())
             })
@@ -1250,7 +1253,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
 
     store.update_oneshot(digest, VALUE1.into()).await?;
 
-    let stored_file_path = OsString::from(format!("{content_path}/{digest}"));
+    let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
     std::fs::remove_file(stored_file_path)?;
 
     let digest_result = store
@@ -1460,7 +1463,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
         "Expected filesystem store to consume the file"
    );
 
-    let expected_file_name = OsString::from(format!("{content_path}/{digest}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
     let new_inode = fs::create_file(expected_file_name)
         .await?
         .as_reader()
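
Illustration (not part of the patch): a minimal sketch of the file-name scheme this change introduces, assuming the `key_from_filename`, `STR_PREFIX`, and `DIGEST_PREFIX` exports shown above and the `StoreKey`/`DigestInfo` types from nativelink-util; the `round_trip_examples` function name is hypothetical.

use nativelink_error::Error;
use nativelink_store::filesystem_store::{key_from_filename, DIGEST_PREFIX, STR_PREFIX};
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreKey;

fn round_trip_examples() -> Result<(), Error> {
    // A string key is stored as "s-<key>" and parses back to StoreKey::Str.
    let str_file = format!("{STR_PREFIX}my_action_key");
    assert_eq!(key_from_filename(&str_file)?, StoreKey::new_str("my_action_key"));

    // A digest key is stored as "d-<hex_hash>-<size>" and parses back to StoreKey::Digest.
    let digest = DigestInfo::try_new(
        "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
        4,
    )?;
    let digest_file = format!("{DIGEST_PREFIX}{digest}");
    assert_eq!(key_from_filename(&digest_file)?, StoreKey::Digest(digest));

    // A bare "<hex_hash>-<size>" name (the pre-patch layout) is still accepted,
    // so existing stores load without a migration.
    assert_eq!(key_from_filename(&format!("{digest}"))?, StoreKey::Digest(digest));
    Ok(())
}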