From 1003b6a1b6b28c7b397ee15626e009557441fcf4 Mon Sep 17 00:00:00 2001
From: Kirpal Grewal
Date: Thu, 12 Dec 2024 13:01:19 +0000
Subject: [PATCH] use subfolders instead of prefixes rename

---
 nativelink-store/src/filesystem_store.rs | 140 ++++++++++++------
 .../tests/filesystem_store_test.rs       | 123 ++++++++-------
 2 files changed, 155 insertions(+), 108 deletions(-)

diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index 3bc5098c6..8ed54f847 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use core::panic;
 use std::borrow::Cow;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{Debug, Formatter};
@@ -51,8 +52,14 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
 // Default block size of all major filesystems is 4KB
 const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

-pub const STR_PREFIX: &str = "s-";
-pub const DIGEST_PREFIX: &str = "d-";
+pub const STR_FOLDER: &str = "s";
+pub const DIGEST_FOLDER: &str = "d";
+
+#[derive(Clone, Copy, Debug)]
+pub enum FileType {
+    Digest,
+    String,
+}

 #[derive(Debug, MetricsComponent)]
 pub struct SharedContext {
@@ -148,8 +155,8 @@ impl Drop for EncodedFilePath {
 #[inline]
 fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
     match key {
-        StoreKey::Str(str) => format!("{folder}/{STR_PREFIX}{str}"),
-        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
+        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
+        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
     }
     .into()
 }
@@ -421,17 +428,11 @@ fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
     DigestInfo::try_new(hash, size)
 }

-pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'_>, Error> {
-    if let Some(file_name) = file_name.strip_prefix(STR_PREFIX) {
-        return Ok(StoreKey::new_str(file_name));
+pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
+    match file_type {
+        FileType::String => Ok(StoreKey::new_str(file_name)),
+        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
     }
-
-    // Remove the digest prefix if it exists. Permit unprefixed hashes for backwards compatibility.
-    if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
-        file_name = name;
-    }
-
-    digest_from_filename(file_name).map(StoreKey::Digest)
 }

 /// The number of files to read the metadata for at the same time when running
@@ -444,16 +445,18 @@ async fn add_files_to_cache<Fe: FileEntry>(
     shared_context: &Arc<SharedContext>,
     block_size: u64,
 ) -> Result<(), Error> {
+    #[expect(clippy::too_many_arguments)]
     async fn process_entry<Fe: FileEntry>(
         evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
         file_name: &str,
+        file_type: FileType,
         atime: SystemTime,
         data_size: u64,
         block_size: u64,
         anchor_time: &SystemTime,
         shared_context: &Arc<SharedContext>,
     ) -> Result<(), Error> {
-        let key = key_from_filename(file_name)?;
+        let key = key_from_file(file_name, file_type)?;

         let file_entry = Fe::create(
             data_size,
@@ -477,11 +480,19 @@ async fn add_files_to_cache<Fe: FileEntry>(
         Ok(())
     }

-    let mut file_infos: Vec<(String, SystemTime, u64)> = {
-        let (_permit, dir_handle) = fs::read_dir(format!("{}/", shared_context.content_path))
-            .await
-            .err_tip(|| "Failed opening content directory for iterating in filesystem store")?
-            .into_inner();
+    async fn read_files(
+        filetype: FileType,
+        shared_context: &SharedContext,
+    ) -> Result<Vec<(String, FileType, SystemTime, u64)>, Error> {
+        let folder = match filetype {
+            FileType::String => STR_FOLDER,
+            FileType::Digest => DIGEST_FOLDER,
+        };
+        let (_permit, dir_handle) =
+            fs::read_dir(format!("{}/{folder}/", shared_context.content_path))
+                .await
+                .err_tip(|| "Failed opening content directory for iterating in filesystem store")?
+                .into_inner();

         let read_dir_stream = ReadDirStream::new(dir_handle);
         read_dir_stream
@@ -505,18 +516,32 @@ async fn add_files_to_cache<Fe: FileEntry>(
                     );
                 }
             };
-            Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
+            Result::<(String, FileType, SystemTime, u64), Error>::Ok((
+                file_name,
+                filetype,
+                atime,
+                metadata.len(),
+            ))
             })
             .buffer_unordered(SIMULTANEOUS_METADATA_READS)
             .try_collect()
-            .await?
-    };
+            .await
+    }
+
+    let mut file_infos: Vec<(String, FileType, SystemTime, u64)> =
+        read_files(FileType::String, shared_context).await?;

-    file_infos.sort_by(|a, b| a.1.cmp(&b.1));
-    for (file_name, atime, data_size) in file_infos {
+    let digest_infos: Vec<(String, FileType, SystemTime, u64)> =
+        read_files(FileType::Digest, shared_context).await?;
+
+    file_infos.extend(digest_infos);
+    file_infos.sort_by(|a, b| a.2.cmp(&b.2));
+
+    for (file_name, file_type, atime, data_size) in file_infos {
         let result = process_entry(
             evicting_map,
             &file_name,
+            file_type,
             atime,
             data_size,
             block_size,
@@ -532,26 +557,48 @@ async fn add_files_to_cache<Fe: FileEntry>(
                 "Failed to add file to eviction cache",
             );
             // Ignore result.
-            let _ =
-                fs::remove_file(format!("{}/{}", &shared_context.content_path, &file_name)).await;
+            let _ = match file_type {
+                FileType::String => {
+                    fs::remove_file(format!(
+                        "{}/{}/{}",
+                        &shared_context.content_path, STR_FOLDER, &file_name
+                    ))
+                    .await
+                }
+                FileType::Digest => {
+                    fs::remove_file(format!(
+                        "{}/{}/{}",
+                        &shared_context.content_path, DIGEST_FOLDER, &file_name
+                    ))
+                    .await
+                }
+            };
         }
     }
     Ok(())
 }

 async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
-    let (_permit, dir_handle) = fs::read_dir(temp_path)
-        .await
-        .err_tip(|| "Failed opening temp directory to prune partial downloads in filesystem store")?
-        .into_inner();
-
-    let mut read_dir_stream = ReadDirStream::new(dir_handle);
-    while let Some(dir_entry) = read_dir_stream.next().await {
-        let path = dir_entry?.path();
-        if let Err(err) = fs::remove_file(&path).await {
-            event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
+        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
+            .await
+            .err_tip(|| {
+                "Failed opening temp directory to prune partial downloads in filesystem store"
+            })?
+            .into_inner();
+
+        let mut read_dir_stream = ReadDirStream::new(dir_handle);
+        while let Some(dir_entry) = read_dir_stream.next().await {
+            let path = dir_entry?.path();
+            if let Err(err) = fs::remove_file(&path).await {
+                event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+            }
         }
+        Ok(())
     }
+
+    prune_temp_inner(temp_path, STR_FOLDER).await?;
+    prune_temp_inner(temp_path, DIGEST_FOLDER).await?;
     Ok(())
 }

@@ -581,18 +628,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         sleep_fn: fn(Duration) -> Sleep,
         rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
     ) -> Result<Arc<Self>, Error> {
+        async fn create_subdirs(path: &str) -> Result<(), Error> {
+            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{STR_FOLDER}"))?;
+            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{DIGEST_FOLDER}"))
+        }
+
         let now = SystemTime::now();

         let empty_policy = nativelink_config::stores::EvictionPolicy::default();
         let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
         let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));

-        fs::create_dir_all(&spec.temp_path)
-            .await
-            .err_tip(|| format!("Failed to temp directory {:?}", &spec.temp_path))?;
-        fs::create_dir_all(&spec.content_path)
-            .await
-            .err_tip(|| format!("Failed to content directory {:?}", &spec.content_path))?;
+        // Create temp and content directories and the s and d subdirectories.
+
+        create_subdirs(&spec.temp_path).await?;
+        create_subdirs(&spec.content_path).await?;

         let shared_context = Arc::new(SharedContext {
             active_drop_spawns: AtomicU64::new(0),
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index 5fbde7662..f0f2ad555 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -32,8 +32,8 @@ use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{
-    key_from_filename, EncodedFilePath, FileEntry, FileEntryImpl, FilesystemStore, DIGEST_PREFIX,
-    STR_PREFIX,
+    key_from_file, EncodedFilePath, FileEntry, FileEntryImpl, FileType, FilesystemStore,
+    DIGEST_FOLDER, STR_FOLDER,
 };
 use nativelink_util::buf_channel::make_buf_channel_pair;
 use nativelink_util::common::{fs, DigestInfo};
@@ -244,6 +244,34 @@ async fn wait_for_no_open_files() -> Result<(), Error> {
     Ok(())
 }

+/// Helper function to ensure there are no temporary files left.
+async fn check_temp_empty(temp_path: &str) -> Result<(), Error> {
+    let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
+        .await
+        .err_tip(|| "Failed opening temp directory")?
+        .into_inner();
+
+    let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);
+
+    if let Some(temp_dir_entry) = read_dir_stream.next().await {
+        let path = temp_dir_entry?.path();
+        panic!("No files should exist in temp directory, found: {path:?}");
+    }
+
+    let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{STR_FOLDER}"))
+        .await
+        .err_tip(|| "Failed opening temp directory")?
+        .into_inner();
+
+    let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);
+
+    if let Some(temp_dir_entry) = read_dir_stream.next().await {
+        let path = temp_dir_entry?.path();
+        panic!("No files should exist in temp directory, found: {path:?}");
+    }
+    Ok(())
+}
+
 const HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
 const HASH2: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef";
 const VALUE1: &str = "0123456789";
@@ -329,7 +357,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {

     store.update_oneshot(digest1, VALUE1.into()).await?;

-    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest1}"));
     {
         // Check to ensure our file exists where it should and content matches.
         let data = read_file_contents(&expected_file_name).await?;
@@ -360,18 +388,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
         tokio::task::yield_now().await;
     }

-    let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
-        .await
-        .err_tip(|| "Failed opening temp directory")?
-        .into_inner();
-    let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);
-
-    if let Some(temp_dir_entry) = read_dir_stream.next().await {
-        let path = temp_dir_entry?.path();
-        panic!("No files should exist in temp directory, found: {path:?}");
-    }
-
-    Ok(())
+    check_temp_empty(&temp_path).await
 }

 // This test ensures that if a file is overridden and an open stream to the file already
@@ -437,8 +454,8 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> {
     tokio::task::yield_now().await;

     {
-        // Now ensure we only have 1 file in our temp path.
-        let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
+        // Now ensure we only have 1 file in our temp path - we know it is a digest.
+        let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
            .await
            .err_tip(|| "Failed opening temp directory")?
            .into_inner();
@@ -478,20 +495,8 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> {
         tokio::task::yield_now().await;
     }

-    {
-        // Now ensure our temp file was cleaned up.
-        let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
-            .await
-            .err_tip(|| "Failed opening temp directory")?
-            .into_inner();
-        let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);
-        if let Some(temp_dir_entry) = read_dir_stream.next().await {
-            let path = temp_dir_entry?.path();
-            panic!("No files should exist in temp directory, found: {path:?}");
-        }
-    }
-
-    Ok(())
+    // Now ensure our temp file was cleaned up.
+    check_temp_empty(&temp_path).await
 }

 // Eviction has a different code path than a file replacement, so we check that if a
@@ -550,8 +555,8 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
     tokio::task::yield_now().await;

     {
-        // Now ensure we only have 1 file in our temp path.
-        let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
+        // Now ensure we only have 1 file in our temp path - we know it is a digest.
+        let (_permit, temp_dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
            .await
            .err_tip(|| "Failed opening temp directory")?
            .into_inner();
@@ -587,20 +592,8 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
         tokio::task::yield_now().await;
     }

-    {
-        // Now ensure our temp file was cleaned up.
-        let (_permit, temp_dir_handle) = fs::read_dir(temp_path.clone())
-            .await
-            .err_tip(|| "Failed opening temp directory")?
-            .into_inner();
-        let mut read_dir_stream = ReadDirStream::new(temp_dir_handle);
-        if let Some(temp_dir_entry) = read_dir_stream.next().await {
-            let path = temp_dir_entry?.path();
-            panic!("No files should exist in temp directory, found: {path:?}");
-        }
-    }
-
-    Ok(())
+    // Now ensure our temp file was cleaned up.
+    check_temp_empty(&temp_path).await
 }

 #[serial]
@@ -658,11 +651,11 @@ async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), Error> {
     let digest2 = DigestInfo::try_new(HASH1, VALUE1.len())?;

     let content_path = make_temp_path("content_path");
-    fs::create_dir_all(&content_path).await?;
+    fs::create_dir_all(format!("{content_path}/{DIGEST_FOLDER}")).await?;

     // Make the two files on disk before loading the store.
-    let file1 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest1}"));
-    let file2 = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest2}"));
+    let file1 = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest1}"));
+    let file2 = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest2}"));
     write_file(&file1, VALUE1.as_bytes()).await?;
     write_file(&file2, VALUE2.as_bytes()).await?;
     set_file_atime(&file1, FileTime::from_unix_time(0, 0))?;
@@ -805,8 +798,11 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
         fn on_unref<Fe: FileEntry>(file_entry: &Fe) {
             block_on(file_entry.get_file_path_locked(move |path_str| async move {
                 let path = Path::new(&path_str);
-                let digest =
-                    key_from_filename(path.file_name().unwrap().to_str().unwrap()).unwrap();
+                let digest = key_from_file(
+                    path.file_name().unwrap().to_str().unwrap(),
+                    FileType::Digest,
+                )
+                .unwrap();
                 UNREFED_DIGESTS.lock().push(digest.borrow().into_owned());
                 Ok(())
             }))
@@ -863,7 +859,10 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() -> Result<(), Error> {
    ) -> Result<fs::DirEntry, Error> {
        loop {
            yield_fn().await?;
-            let (_permit, dir_handle) = fs::read_dir(&temp_path).await?.into_inner();
+            // Now ensure we only have 1 file in our temp path - we know it is a digest.
+            let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{DIGEST_FOLDER}"))
+                .await?
+                .into_inner();
            let mut read_dir_stream = ReadDirStream::new(dir_handle);
            if let Some(dir_entry) = read_dir_stream.next().await {
                assert!(
@@ -971,13 +970,7 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() -> Result<(), Error> {

     // Now it should have cleaned up its temp files.
     {
-        // Ensure `temp_path` is empty.
-        let (_permit, dir_handle) = fs::read_dir(&temp_path).await?.into_inner();
-        let mut read_dir_stream = ReadDirStream::new(dir_handle);
-        assert!(
-            read_dir_stream.next().await.is_none(),
-            "File found in temp_path after update() rename failure"
-        );
+        check_temp_empty(&temp_path).await?;
     }

     // Finally ensure that our entry is not in the store.
@@ -1094,7 +1087,7 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
             yield_fn().await?;

             let empty_digest_file_name =
-                OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
+                OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));

             let file_metadata = fs::metadata(empty_digest_file_name)
                 .await
@@ -1223,7 +1216,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
         .get_file_path_locked(move |file_path| async move {
             assert_eq!(
                 file_path,
-                OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"))
+                OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"))
             );
             Ok(())
         })
@@ -1255,7 +1248,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {

     store.update_oneshot(digest, VALUE1.into()).await?;

-    let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
+    let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
     std::fs::remove_file(stored_file_path)?;

     let digest_result = store
@@ -1272,7 +1265,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
         .update_oneshot(string_key.borrow(), VALUE2.into())
         .await?;

-    let stored_file_path = OsString::from(format!("{content_path}/{STR_PREFIX}{STRING_NAME}"));
+    let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
     std::fs::remove_file(stored_file_path)?;

     let string_result = store
@@ -1482,7 +1475,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
         "Expected filesystem store to consume the file"
     );

-    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_PREFIX}{digest}"));
+    let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));

     let new_inode = fs::create_file(expected_file_name)
         .await?
         .as_reader()
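
For readers skimming the patch, here is a small standalone sketch (not from the nativelink codebase; `root` and the simplified StoreKey are illustrative only) of the key-to-path mapping this change makes: string and digest keys now live in separate "s/" and "d/" subfolders instead of sharing one directory with "s-"/"d-" file-name prefixes, mirroring to_full_path_from_key and key_from_file above.

    // Illustrative sketch only - simplified stand-ins for the real nativelink types.
    const STR_FOLDER: &str = "s";
    const DIGEST_FOLDER: &str = "d";

    enum StoreKey {
        Str(String),
        // The real DigestInfo displays as "<hash>-<size>"; modeled here with plain fields.
        Digest { hash: String, size: u64 },
    }

    // Old layout: one directory, key type encoded as a file-name prefix.
    fn old_path(root: &str, key: &StoreKey) -> String {
        match key {
            StoreKey::Str(s) => format!("{root}/s-{s}"),
            StoreKey::Digest { hash, size } => format!("{root}/d-{hash}-{size}"),
        }
    }

    // New layout: key type encoded by the subfolder, file name left bare.
    fn new_path(root: &str, key: &StoreKey) -> String {
        match key {
            StoreKey::Str(s) => format!("{root}/{STR_FOLDER}/{s}"),
            StoreKey::Digest { hash, size } => format!("{root}/{DIGEST_FOLDER}/{hash}-{size}"),
        }
    }

    fn main() {
        let key = StoreKey::Str("some-key".to_string());
        assert_eq!(old_path("content_path", &key), "content_path/s-some-key");
        assert_eq!(new_path("content_path", &key), "content_path/s/some-key");
    }

The practical effect, visible in read_files and prune_temp_path above, is that the startup scan and temp-directory pruning can derive the key type from the folder they are iterating instead of parsing a prefix off every file name.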