From 703aa3233059c5d6fac1526ce2cd31ca853da8de Mon Sep 17 00:00:00 2001
From: Kirpal Grewal
Date: Thu, 21 Nov 2024 17:38:42 +0000
Subject: [PATCH] Support native StoreKey in FilesystemStore

---
 nativelink-store/src/filesystem_store.rs | 157 +++++++++++-------
 nativelink-store/src/memory_store.rs     |  22 +--
 .../tests/filesystem_store_test.rs       |  52 ++++--
 nativelink-util/src/store_trait.rs       |  37 ++++-
 4 files changed, 181 insertions(+), 87 deletions(-)

diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index 7e8c24cba..3bc5098c6 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -35,7 +35,9 @@ use nativelink_util::buf_channel::{
 use nativelink_util::common::{fs, DigestInfo};
 use nativelink_util::evicting_map::{EvictingMap, LenEntry};
 use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
-use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo};
+use nativelink_util::store_trait::{
+    StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
+};
 use nativelink_util::{background_spawn, spawn_blocking};
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
 use tokio::time::{sleep, timeout, Sleep};
@@ -49,6 +51,9 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
 // Default block size of all major filesystems is 4KB
 const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
 
+pub const STR_PREFIX: &str = "s-";
+pub const DIGEST_PREFIX: &str = "d-";
+
 #[derive(Debug, MetricsComponent)]
 pub struct SharedContext {
     // Used in testing to know how many active drop() spawns are running.
@@ -70,22 +75,21 @@ enum PathType {
     Custom(OsString),
 }
 
-// Note: We don't store the full path of the file because it would cause
-// a lot of needless memeory bloat. There's a high chance we'll end up with a
-// lot of small files, so to prevent storing duplicate data, we store an Arc
-// to the path of the directory where the file is stored and the packed digest.
-// Resulting in usize + sizeof(DigestInfo).
-type FileNameDigest = DigestInfo;
+/// [`EncodedFilePath`] stores the path to the file,
+/// including the shared context, path type and key of the file.
+/// The whole [`StoreKey`] is stored, as opposed to solely
+/// the [`DigestInfo`], so that it is more usable for things
+/// such as BEP - see Issue #1108.
 pub struct EncodedFilePath {
     shared_context: Arc<SharedContext>,
     path_type: PathType,
-    digest: FileNameDigest,
+    key: StoreKey<'static>,
 }
 
 impl EncodedFilePath {
     #[inline]
     fn get_file_path(&self) -> Cow<'_, OsStr> {
-        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
+        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
     }
 }
 
@@ -93,14 +97,14 @@
 fn get_file_path_raw<'a>(
     path_type: &'a PathType,
     shared_context: &SharedContext,
-    digest: &DigestInfo,
+    key: &StoreKey<'a>,
 ) -> Cow<'a, OsStr> {
     let folder = match path_type {
         PathType::Content => &shared_context.content_path,
         PathType::Temp => &shared_context.temp_path,
         PathType::Custom(path) => return Cow::Borrowed(path),
     };
-    Cow::Owned(to_full_path_from_digest(folder, digest))
+    Cow::Owned(to_full_path_from_key(folder, key))
 }
 
 impl Drop for EncodedFilePath {
@@ -131,9 +135,23 @@ impl Drop for EncodedFilePath {
     }
 }
 
+/// This creates the file path from the [`StoreKey`]. If
+/// it is a string, the string is stored prefixed with
+/// [`STR_PREFIX`] for backwards compatibility.
+///
+/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`],
+/// followed by the string representation of the digest - the hash in
+/// hex, a hyphen, then the size in bytes.
+///
+/// Previously, only the string representation of the [`DigestInfo`]
+/// was used, with no prefix.
 #[inline]
-fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
-    format!("{folder}/{digest}").into()
+fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
+    match key {
+        StoreKey::Str(str) => format!("{folder}/{STR_PREFIX}{str}"),
+        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
+    }
+    .into()
 }
 
 pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
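
Taken together with its inverse, `key_from_filename()` (added further down in this file's diff), the encoding round-trips. A minimal sketch, assuming a `/content` folder and a digest built from the `HASH1` constant used in the tests below:

    // Digest keys encode as "{folder}/d-{hash}-{size}", string keys as
    // "{folder}/s-{string}" (sketch only; to_full_path_from_key is module-private).
    let digest = DigestInfo::try_new(HASH1, 10)?;
    assert_eq!(
        to_full_path_from_key("/content", &StoreKey::Digest(digest)),
        OsString::from(format!("/content/d-{digest}"))
    );
    assert_eq!(
        to_full_path_from_key("/content", &StoreKey::new_str("my_key")),
        OsString::from("/content/s-my_key")
    );
    // key_from_filename() inverts the encoding and still accepts legacy,
    // unprefixed "{hash}-{size}" names as digests.
    assert_eq!(key_from_filename("s-my_key")?, StoreKey::new_str("my_key"));
    assert_eq!(key_from_filename(&format!("{digest}"))?, StoreKey::Digest(digest));
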
@@ -300,7 +318,7 @@ impl Debug for FileEntryImpl {
     }
 }
 
-fn make_temp_digest(digest: &mut DigestInfo) {
+fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
     static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
     let mut hash = *digest.packed_hash();
     hash[24..].clone_from_slice(
         &DELETE_FILE_COUNTER
             .fetch_add(1, Ordering::Relaxed)
             .to_le_bytes(),
     );
     digest.set_packed_hash(*hash);
+    digest
+}
+
+fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
+    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
 }
 
 impl LenEntry for FileEntryImpl {
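
Because `make_temp_digest()` now takes and returns an owned digest, `make_temp_key()` can mint a temporary key for any `StoreKey`; each call stamps a fresh `DELETE_FILE_COUNTER` value into the last eight hash bytes, so successive temp keys for the same source key never collide. A small sketch, assuming the `HASH1` constant from the tests:

    // Hypothetical check: two temp keys derived from one key differ,
    // because bytes 24..32 of the hash hold a fresh counter value.
    let key = StoreKey::Digest(DigestInfo::try_new(HASH1, 10)?);
    let first = make_temp_key(&key);
    let second = make_temp_key(&key);
    assert_ne!(first, second);
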
@@ -362,16 +385,15 @@ impl LenEntry for FileEntryImpl {
             return;
         }
         let from_path = encoded_file_path.get_file_path();
-        let mut new_digest = encoded_file_path.digest;
-        make_temp_digest(&mut new_digest);
+        let new_key = make_temp_key(&encoded_file_path.key);
 
         let to_path =
-            to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
+            to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
 
         if let Err(err) = fs::rename(&from_path, &to_path).await {
             event!(
                 Level::WARN,
-                digest = ?encoded_file_path.digest,
+                key = ?encoded_file_path.key,
                 ?from_path,
                 ?to_path,
                 ?err,
                 "Failed to rename file",
             );
@@ -380,37 +402,50 @@
         } else {
             event!(
                 Level::INFO,
-                digest = ?encoded_file_path.digest,
+                key = ?encoded_file_path.key,
                 ?from_path,
                 ?to_path,
                 "Renamed file",
             );
             encoded_file_path.path_type = PathType::Temp;
-            encoded_file_path.digest = new_digest;
+            encoded_file_path.key = new_key;
         }
     }
 }
 
 #[inline]
-pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
+fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
     let (hash, size) = file_name.split_once('-').err_tip(|| "")?;
     let size = size.parse::<i64>()?;
     DigestInfo::try_new(hash, size)
 }
 
+pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'static>, Error> {
+    if let Some(file_name) = file_name.strip_prefix(STR_PREFIX) {
+        return Ok(StoreKey::new_str(file_name));
+    }
+
+    // Remove the digest prefix if it exists. Permit unprefixed hashes for backwards compatibility.
+    if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
+        file_name = name;
+    }
+
+    digest_from_filename(file_name).map(StoreKey::Digest)
+}
+
 /// The number of files to read the metadata for at the same time when running
 /// `add_files_to_cache`.
 const SIMULTANEOUS_METADATA_READS: usize = 200;
 
 async fn add_files_to_cache<Fe: FileEntry>(
-    evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+    evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
     anchor_time: &SystemTime,
     shared_context: &Arc<SharedContext>,
     block_size: u64,
 ) -> Result<(), Error> {
     async fn process_entry<Fe: FileEntry>(
-        evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
+        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
         file_name: &str,
         atime: SystemTime,
         data_size: u64,
@@ -418,7 +453,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
         anchor_time: &SystemTime,
         shared_context: &Arc<SharedContext>,
     ) -> Result<(), Error> {
-        let digest = digest_from_filename(file_name)?;
+        let key = key_from_filename(file_name)?;
 
         let file_entry = Fe::create(
             data_size,
@@ -426,7 +461,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
             RwLock::new(EncodedFilePath {
                 shared_context: shared_context.clone(),
                 path_type: PathType::Content,
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
         let time_since_anchor = anchor_time
@@ -434,7 +469,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
            .duration_since(atime)
            .map_err(|_| make_input_err!("File access time newer than now"))?;
         evicting_map
             .insert_with_time(
-                digest,
+                key.into_owned().into(),
                 Arc::new(file_entry),
                 time_since_anchor.as_secs() as i32,
             )
             .await;
@@ -525,7 +560,7 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
     #[metric]
     shared_context: Arc<SharedContext>,
     #[metric(group = "evicting_map")]
-    evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
+    evicting_map: Arc<EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>>,
     #[metric(help = "Block size of the configured filesystem")]
     block_size: u64,
     #[metric(help = "Size of the configured read buffer size")]
@@ -595,7 +630,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
         self.evicting_map
-            .get(digest)
+            .get::<StoreKey<'_>>(&digest.into())
             .await
             .ok_or_else(|| make_err!(Code::NotFound, "{digest} not found in filesystem store"))
     }
@@ -604,7 +639,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     async fn update_file<'a>(
         self: Pin<&'a Self>,
         mut entry: Fe,
         mut resumeable_temp_file: fs::ResumeableFileSlot,
-        final_digest: DigestInfo,
+        final_key: StoreKey<'static>,
         mut reader: DropCloserReadHalf,
     ) -> Result<(), Error> {
         let mut data_size = 0;
@@ -649,10 +684,10 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         drop(resumeable_temp_file);
 
         *entry.data_size_mut() = data_size;
-        self.emplace_file(final_digest, Arc::new(entry)).await
+        self.emplace_file(final_key, Arc::new(entry)).await
     }
 
-    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
+    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
         // This sequence of events is quite ticky to understand due to the amount of triggers that
         // happen, async'ness of it and the locking. So here is a breakdown of what happens:
         // 1. Here will hold a write lock on any file operations of this FileEntry.
@@ -680,10 +715,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
             let final_path = get_file_path_raw(
                 &PathType::Content,
                 encoded_file_path.shared_context.as_ref(),
-                &digest,
+                &key,
             );
 
-            evicting_map.insert(digest, entry.clone()).await;
+            evicting_map
+                .insert(key.borrow().into_owned().into(), entry.clone())
+                .await;
 
             let from_path = encoded_file_path.get_file_path();
             // Internally tokio spawns fs commands onto a blocking thread anyways.
@@ -709,13 +746,14 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
             drop(encoded_file_path);
             // It is possible that the item in our map is no longer the item we inserted,
             // So, we need to conditionally remove it only if the pointers are the same.
+
             evicting_map
-                .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
+                .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                 .await;
             return Err(err);
         }
         encoded_file_path.path_type = PathType::Content;
-        encoded_file_path.digest = digest;
+        encoded_file_path.key = key;
         Ok(())
         })
         .await
     }
 }
 
@@ -730,26 +768,25 @@
 #[async_trait]
 impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
     async fn has_with_results(
         self: Pin<&Self>,
         keys: &[StoreKey<'_>],
         results: &mut [Option<u64>],
     ) -> Result<(), Error> {
-        // TODO(allada) This is a bit of a hack to get around the lifetime issues with the
-        // existence_cache. We need to convert the digests to owned values to be able to
-        // insert them into the cache. In theory it should be able to elide this conversion
-        // but it seems to be a bit tricky to get right.
-        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
         self.evicting_map
-            .sizes_for_keys(&keys, results, false /* peek */)
+            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
+                keys.iter(),
+                results,
+                false, /* peek */
+            )
             .await;
         // We need to do a special pass to ensure our zero files exist.
         // If our results failed and the result was a zero file, we need to
        // create the file by spec.
-        for (digest, result) in keys.iter().zip(results.iter_mut()) {
-            if result.is_some() || !is_zero_digest(digest) {
+        for (key, result) in keys.iter().zip(results.iter_mut()) {
+            if result.is_some() || !is_zero_digest(key.borrow()) {
                 continue;
             }
             let (mut tx, rx) = make_buf_channel_pair();
             let send_eof_result = tx.send_eof();
-            self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
+            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
                 .await
-                .err_tip(|| format!("Failed to create zero file for key {digest}"))
+                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
                 .merge(
                     send_eof_result
                         .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
                 )?;
         }
         Ok(())
     }
 
@@ -766,21 +803,18 @@
     async fn update(
         self: Pin<&Self>,
         key: StoreKey<'_>,
         reader: DropCloserReadHalf,
         _upload_size: UploadSizeInfo,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        let mut temp_digest = digest;
-        make_temp_digest(&mut temp_digest);
-
+        let temp_key = make_temp_key(&key);
         let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
             self.block_size,
             EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Temp,
-                digest: temp_digest,
+                key: temp_key,
             },
         )
         .await?;
 
-        self.update_file(entry, temp_file, digest, reader)
+        self.update_file(entry, temp_file, key.into_owned(), reader)
             .await
             .err_tip(|| format!("While processing with temp file {temp_full_path:?}"))
     }
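
With `update()` and `get_part()` now keyed natively, a string-keyed blob round-trips without a digest ever being minted. A short usage sketch, assuming a `store` built the way the tests below construct their `FilesystemStore`:

    // The blob lands at "{content_path}/s-my_blob" on disk.
    let key = StoreKey::new_str("my_blob");
    store.update_oneshot(key.borrow(), "hello".into()).await?;

    // Existence checks and reads take the same string key.
    assert!(store.has(key.borrow()).await?.is_some());
    let data = store.get_part_unchunked(key, 0, None).await?;
    assert_eq!(data, "hello".as_bytes());
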
@@ -795,7 +829,6 @@
         mut file: fs::ResumeableFileSlot,
         upload_size: UploadSizeInfo,
     ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
-        let digest = key.into_digest();
         let path = file.get_path().as_os_str().to_os_string();
         let file_size = match upload_size {
             UploadSizeInfo::ExactSize(size) => size,
@@ -818,13 +851,13 @@
             RwLock::new(EncodedFilePath {
                 shared_context: self.shared_context.clone(),
                 path_type: PathType::Custom(path),
-                digest,
+                key: key.borrow().into_owned(),
             }),
         );
         // We are done with the file, if we hold a reference to the file here, it could
         // result in a deadlock if `emplace_file()` also needs file descriptors.
         drop(file);
-        self.emplace_file(digest, Arc::new(entry))
+        self.emplace_file(key.into_owned(), Arc::new(entry))
             .await
             .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
         return Ok(None);
     }
 
@@ -837,9 +870,8 @@
     async fn get_part(
         &self,
         key: StoreKey<'_>,
         writer: &mut DropCloserWriteHalf,
         offset: u64,
         length: Option<u64>,
     ) -> Result<(), Error> {
-        let digest = key.into_digest();
-        if is_zero_digest(digest) {
-            self.has(digest.into())
+        if is_zero_digest(key.borrow()) {
+            self.has(key.borrow())
                 .await
                 .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
             writer
@@ -848,10 +880,13 @@
                 .send_eof()
                 .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
             return Ok(());
         }
 
-        let entry =
-            self.evicting_map.get(&digest).await.ok_or_else(|| {
-                make_err!(Code::NotFound, "{digest} not found in filesystem store")
-            })?;
+        let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
+            make_err!(
+                Code::NotFound,
+                "{} not found in filesystem store",
+                key.as_str()
+            )
+        })?;
 
         let read_limit = length.unwrap_or(u64::MAX);
         let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
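
The memory store below mirrors the filesystem changes above: both now key their `EvictingMap` by `StoreKeyBorrow` (added in `store_trait.rs` at the end of this patch), so the old step of collecting owned key copies into a `Vec` before every lookup disappears. The two call shapes, sketched with an `evicting_map`, an `entry`, and a request `key: StoreKey<'_>` assumed in scope:

    // Insert path: the key is promoted to 'static exactly once and
    // wrapped for storage in the map.
    evicting_map
        .insert(key.borrow().into_owned().into(), entry)
        .await;

    // Lookup path: the borrowed key hashes and compares directly
    // against the stored StoreKeyBorrow; no owned copies are made.
    let found = evicting_map.get(&key).await;
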
diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs
index 27cf2cda0..edc5e2dd5 100644
--- a/nativelink-store/src/memory_store.rs
+++ b/nativelink-store/src/memory_store.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::borrow::Borrow;
 use std::fmt::Debug;
 use std::ops::Bound;
 use std::pin::Pin;
@@ -26,7 +27,7 @@
 use nativelink_metric::MetricsComponent;
 use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
 use nativelink_util::evicting_map::{EvictingMap, LenEntry};
 use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
-use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreKeyBorrow, UploadSizeInfo};
 
 use crate::cas_utils::is_zero_digest;
 
@@ -54,7 +55,7 @@ impl LenEntry for BytesWrapper {
 #[derive(MetricsComponent)]
 pub struct MemoryStore {
     #[metric(group = "evicting_map")]
-    evicting_map: EvictingMap<StoreKey<'static>, BytesWrapper, SystemTime>,
+    evicting_map: EvictingMap<StoreKeyBorrow, BytesWrapper, SystemTime>,
 }
 
 impl MemoryStore {
@@ -73,7 +74,7 @@ impl MemoryStore {
     }
 
     pub async fn remove_entry(&self, key: StoreKey<'_>) -> bool {
-        self.evicting_map.remove(&key.into_owned()).await
+        self.evicting_map.remove(&key).await
     }
 }
 
@@ -84,11 +85,12 @@ impl StoreDriver for MemoryStore {
         keys: &[StoreKey<'_>],
         results: &mut [Option<u64>],
     ) -> Result<(), Error> {
-        // TODO(allada): This is a dirty hack to get around the lifetime issues with the
-        // evicting map.
-        let digests: Vec<_> = keys.iter().map(|key| key.borrow().into_owned()).collect();
         self.evicting_map
-            .sizes_for_keys(digests, results, false /* peek */)
+            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
+                keys.iter(),
+                results,
+                false, /* peek */
+            )
             .await;
         // We need to do a special pass to ensure our zero digest exist.
         keys.iter()
@@ -112,7 +114,7 @@ impl StoreDriver for MemoryStore {
         );
         let iterations = self
             .evicting_map
-            .range(range, move |key, _value| handler(key))
+            .range(range, move |key, _value| handler(key.borrow()))
             .await;
         Ok(iterations)
     }
@@ -136,7 +138,7 @@ impl StoreDriver for MemoryStore {
         };
 
         self.evicting_map
-            .insert(key.borrow().into_owned(), BytesWrapper(final_buffer))
+            .insert(key.into_owned().into(), BytesWrapper(final_buffer))
             .await;
         Ok(())
     }
@@ -162,7 +164,7 @@ impl StoreDriver for MemoryStore {
 
         let value = self
             .evicting_map
-            .get(&key.borrow().into_owned())
+            .get(&key)
             .await
             .err_tip_with_code(|_| (Code::NotFound, format!("Key {key:?} not found")))?;
         let default_len = usize::try_from(value.len())
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index 99b3333c8..5fbde7662 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -32,13 +32,14 @@
 use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{
-    digest_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore,
+    key_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, DIGEST_PREFIX,
+    STR_PREFIX,
 };
 use nativelink_util::buf_channel::make_buf_channel_pair;
 use nativelink_util::common::{fs, DigestInfo};
 use nativelink_util::evicting_map::LenEntry;
 use nativelink_util::origin_context::ContextAwareFuture;
-use nativelink_util::store_trait::{Store, StoreLike, UploadSizeInfo};
+use nativelink_util::store_trait::{Store, StoreKey, StoreLike, UploadSizeInfo};
 use nativelink_util::{background_spawn, spawn};
@@ -247,6 +248,7 @@
 const HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
 const HASH2: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef";
 const VALUE1: &str = "0123456789";
 const VALUE2: &str = "9876543210";
+const STRING_NAME: &str = "String_Filename";
 
 #[serial]
 #[nativelink_test]
@@ -288,7 +290,9 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
             .await?,
     );
 
-    let content = store.get_part_unchunked(digest, 0, None).await?;
+    let key = StoreKey::Digest(digest);
+
+    let content = store.get_part_unchunked(key, 0, None).await?;
     assert_eq!(content, VALUE1.as_bytes());
 }
@@ -325,7 +329,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
 
     store.update_oneshot(digest1, VALUE1.into()).await?;
 
-    let expected_file_name = OsString::from(format!("{content_path}/{digest1}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
     {
         // Check to ensure our file exists where it should and content matches.
         let data = read_file_contents(&expected_file_name).await?;
@@ -657,8 +661,8 @@ async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> {
     fs::create_dir_all(&content_path).await?;
 
     // Make the two files on disk before loading the store.
-    let file1 = OsString::from(format!("{content_path}/{digest1}"));
-    let file2 = OsString::from(format!("{content_path}/{digest2}"));
+    let file1 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
+    let file2 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest2}"));
     write_file(&file1, VALUE1.as_bytes()).await?;
     write_file(&file2, VALUE2.as_bytes()).await?;
     set_file_atime(&file1, FileTime::from_unix_time(0, 0))?;
@@ -794,7 +798,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     const SMALL_VALUE: &str = "01";
     const BIG_VALUE: &str = "0123";
 
-    static UNREFED_DIGESTS: LazyLock<Mutex<Vec<DigestInfo>>> =
+    static UNREFED_DIGESTS: LazyLock<Mutex<Vec<StoreKey<'static>>>> =
         LazyLock::new(|| Mutex::new(Vec::new()));
     struct LocalHooks {}
     impl FileEntryHooks for LocalHooks {
         fn on_unref<Fe: FileEntry>(file_entry: &Fe) {
             block_on(file_entry.get_file_path_locked(move |path_str| async move {
                 let path = Path::new(&path_str);
                 let digest =
-                    digest_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap();
-                UNREFED_DIGESTS.lock().push(digest);
+                    key_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap();
+                UNREFED_DIGESTS.lock().push(digest.borrow().into_owned());
                 Ok(())
             }))
             .unwrap();
         }
     }
 
-    let small_digest = DigestInfo::try_new(HASH1, SMALL_VALUE.len())?;
+    let small_digest = StoreKey::Digest(DigestInfo::try_new(HASH1, SMALL_VALUE.len())?);
     let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?;
 
     let store = Box::pin(
@@ -828,7 +832,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     );
     // Insert data into store.
     store
-        .update_oneshot(small_digest, SMALL_VALUE.into())
+        .update_oneshot(small_digest.borrow(), SMALL_VALUE.into())
         .await?;
     store.update_oneshot(big_digest, BIG_VALUE.into()).await?;
@@ -1089,7 +1093,8 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
     loop {
         yield_fn().await?;
 
-        let empty_digest_file_name = OsString::from(format!("{content_path}/{digest}"));
+        let empty_digest_file_name =
+            OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
 
         let file_metadata = fs::metadata(empty_digest_file_name)
             .await
@@ -1218,7 +1223,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
         .get_file_path_locked(move |file_path| async move {
             assert_eq!(
                 file_path,
-                OsString::from(format!("{content_path}/{digest}"))
+                OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"))
             );
             Ok(())
         })
@@ -1250,7 +1255,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
 
     store.update_oneshot(digest, VALUE1.into()).await?;
 
-    let stored_file_path = OsString::from(format!("{content_path}/{digest}"));
+    let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
     std::fs::remove_file(stored_file_path)?;
 
     let digest_result = store
@@ -1259,6 +1264,23 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
         .err_tip(|| "Failed to execute has")?;
     assert!(digest_result.is_none());
 
+    // Repeat with a string-typed key.
+
+    let string_key = StoreKey::new_str(STRING_NAME);
+
+    store
+        .update_oneshot(string_key.borrow(), VALUE2.into())
+        .await?;
+
+    let stored_file_path = OsString::from(format!("{content_path}/{STR_PREFIX}{STRING_NAME}"));
+    std::fs::remove_file(stored_file_path)?;
+
+    let string_result = store
+        .has(string_key)
+        .await
+        .err_tip(|| "Failed to execute has")?;
+    assert!(string_result.is_none());
+
     Ok(())
 }
@@ -1460,7 +1482,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
         "Expected filesystem store to consume the file"
     );
 
-    let expected_file_name = OsString::from(format!("{content_path}/{digest}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
     let new_inode = fs::create_file(expected_file_name)
         .await?
        .as_reader()
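
The `store_trait.rs` diff below adds `StoreKeyBorrow` itself. Its `Borrow` impls are what make the borrowed lookups above compile; a minimal self-contained sketch of the same trick with a plain `HashMap`:

    use std::collections::HashMap;

    // The map owns 'static keys, but lookups may borrow from a
    // short-lived request buffer. This compiles because
    // StoreKeyBorrow: Borrow<StoreKey<'a>> and the derived Hash/Eq
    // delegate to the wrapped StoreKey.
    fn lookup(map: &HashMap<StoreKeyBorrow, u64>, key: &StoreKey<'_>) -> Option<u64> {
        map.get(key).copied()
    }

    let mut map = HashMap::new();
    map.insert(StoreKey::new_str("foo").into_owned().into(), 3_u64);
    assert_eq!(lookup(&map, &StoreKey::new_str("foo")), Some(3));
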
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index f04470772..4f01194b3 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::borrow::{BorrowMut, Cow};
+use std::borrow::{Borrow, BorrowMut, Cow};
 use std::collections::hash_map::DefaultHasher as StdHasher;
 use std::convert::Into;
 use std::hash::{Hash, Hasher};
@@ -144,6 +144,41 @@ pub enum StoreOptimizations {
     NoopDownloads,
 }
 
+/// A wrapper struct for [`StoreKey`] to work around
+/// lifetime limitations in `HashMap::get()`.
+///
+/// As such this is a wrapper type that is stored in the
+/// maps, using the `Borrow`-based workaround of looking up
+/// owned keys by their borrowed form.
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(transparent)]
+pub struct StoreKeyBorrow(StoreKey<'static>);
+
+impl From<StoreKey<'static>> for StoreKeyBorrow {
+    fn from(key: StoreKey<'static>) -> Self {
+        Self(key)
+    }
+}
+
+impl From<StoreKeyBorrow> for StoreKey<'static> {
+    fn from(key_borrow: StoreKeyBorrow) -> Self {
+        key_borrow.0
+    }
+}
+
+impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
+    fn borrow(&self) -> &StoreKey<'a> {
+        &self.0
+    }
+}
+
+impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
+    fn borrow(&self) -> &StoreKey<'a> {
+        &self.0
+    }
+}
+
 /// Holds something that can be converted into a key the
 /// store API can understand. Generally this is a digest
 /// but it can also be a string if the caller wishes to