use subfolders instead of prefixes
rename
KGrewal1 committed Dec 12, 2024
1 parent 703aa32 commit 1003b6a
Showing 2 changed files with 155 additions and 108 deletions.
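
The change to filesystem_store.rs below stops encoding the key type into a filename prefix ("s-" / "d-") and instead gives each key type its own subdirectory ("s" and "d") under the content and temp paths. As a rough sketch of the layout change, using simplified stand-in types and function names for illustration only (Key, old_path, and new_path are not the actual nativelink-store API):

    // Hypothetical, self-contained illustration of the path layout before and
    // after this commit; `Key`, `old_path`, and `new_path` are stand-ins.
    enum Key {
        Str(String),
        Digest(String), // the real store uses a DigestInfo here
    }

    // Old layout: key type encoded as a filename prefix inside one flat folder.
    fn old_path(folder: &str, key: &Key) -> String {
        match key {
            Key::Str(s) => format!("{folder}/s-{s}"),
            Key::Digest(d) => format!("{folder}/d-{d}"),
        }
    }

    // New layout: one subfolder per key type, with plain filenames inside it.
    fn new_path(folder: &str, key: &Key) -> String {
        match key {
            Key::Str(s) => format!("{folder}/s/{s}"),
            Key::Digest(d) => format!("{folder}/d/{d}"),
        }
    }

    fn main() {
        let key = Key::Str("example".to_string());
        assert_eq!(old_path("content", &key), "content/s-example");
        assert_eq!(new_path("content", &key), "content/s/example");
    }

On startup the store then walks both subdirectories (see read_files in the diff) instead of stripping prefixes from a single directory listing.
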
140 changes: 97 additions & 43 deletions nativelink-store/src/filesystem_store.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::panic;
 use std::borrow::Cow;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{Debug, Formatter};
@@ -51,8 +52,14 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
 // Default block size of all major filesystems is 4KB
 const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
 
-pub const STR_PREFIX: &str = "s-";
-pub const DIGEST_PREFIX: &str = "d-";
+pub const STR_FOLDER: &str = "s";
+pub const DIGEST_FOLDER: &str = "d";
+
+#[derive(Clone, Copy, Debug)]
+pub enum FileType {
+    Digest,
+    String,
+}
 
 #[derive(Debug, MetricsComponent)]
 pub struct SharedContext {
@@ -148,8 +155,8 @@ impl Drop for EncodedFilePath {
 #[inline]
 fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
     match key {
-        StoreKey::Str(str) => format!("{folder}/{STR_PREFIX}{str}"),
-        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
+        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
+        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
     }
     .into()
 }
@@ -421,17 +428,11 @@ fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
     DigestInfo::try_new(hash, size)
 }
 
-pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'_>, Error> {
-    if let Some(file_name) = file_name.strip_prefix(STR_PREFIX) {
-        return Ok(StoreKey::new_str(file_name));
+pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
+    match file_type {
+        FileType::String => Ok(StoreKey::new_str(file_name)),
+        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
     }
-
-    // Remove the digest prefix if it exists. Permit unprefixed hashes for backwards compatibility.
-    if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
-        file_name = name;
-    }
-
-    digest_from_filename(file_name).map(StoreKey::Digest)
 }
 
 /// The number of files to read the metadata for at the same time when running
@@ -444,16 +445,18 @@ async fn add_files_to_cache<Fe: FileEntry>(
     shared_context: &Arc<SharedContext>,
     block_size: u64,
 ) -> Result<(), Error> {
+    #[expect(clippy::too_many_arguments)]
     async fn process_entry<Fe: FileEntry>(
         evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
         file_name: &str,
+        file_type: FileType,
         atime: SystemTime,
         data_size: u64,
         block_size: u64,
         anchor_time: &SystemTime,
         shared_context: &Arc<SharedContext>,
     ) -> Result<(), Error> {
-        let key = key_from_filename(file_name)?;
+        let key = key_from_file(file_name, file_type)?;
 
         let file_entry = Fe::create(
             data_size,
Expand All @@ -477,11 +480,19 @@ async fn add_files_to_cache<Fe: FileEntry>(
Ok(())
}

let mut file_infos: Vec<(String, SystemTime, u64)> = {
let (_permit, dir_handle) = fs::read_dir(format!("{}/", shared_context.content_path))
.await
.err_tip(|| "Failed opening content directory for iterating in filesystem store")?
.into_inner();
async fn read_files(
filetype: FileType,
shared_context: &SharedContext,
) -> Result<Vec<(String, FileType, SystemTime, u64)>, Error> {
let folder = match filetype {
FileType::String => STR_FOLDER,
FileType::Digest => DIGEST_FOLDER,
};
let (_permit, dir_handle) =
fs::read_dir(format!("{}/{folder}/", shared_context.content_path))
.await
.err_tip(|| "Failed opening content directory for iterating in filesystem store")?
.into_inner();

let read_dir_stream = ReadDirStream::new(dir_handle);
read_dir_stream
@@ -505,18 +516,32 @@
                         );
                     }
                 };
-                Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
+                Result::<(String, FileType, SystemTime, u64), Error>::Ok((
+                    file_name,
+                    filetype,
+                    atime,
+                    metadata.len(),
+                ))
             })
             .buffer_unordered(SIMULTANEOUS_METADATA_READS)
             .try_collect()
-            .await?
-    };
+            .await
+    }
+
+    let mut file_infos: Vec<(String, FileType, SystemTime, u64)> =
+        read_files(FileType::String, shared_context).await?;
 
-    file_infos.sort_by(|a, b| a.1.cmp(&b.1));
-    for (file_name, atime, data_size) in file_infos {
+    let digest_infos: Vec<(String, FileType, SystemTime, u64)> =
+        read_files(FileType::Digest, shared_context).await?;
+
+    file_infos.extend(digest_infos);
+    file_infos.sort_by(|a, b| a.2.cmp(&b.2));
+
+    for (file_name, file_type, atime, data_size) in file_infos {
         let result = process_entry(
             evicting_map,
             &file_name,
+            file_type,
             atime,
             data_size,
             block_size,
@@ -532,26 +557,48 @@ async fn add_files_to_cache<Fe: FileEntry>(
                 "Failed to add file to eviction cache",
             );
             // Ignore result.
-            let _ =
-                fs::remove_file(format!("{}/{}", &shared_context.content_path, &file_name)).await;
+            let _ = match file_type {
+                FileType::String => {
+                    fs::remove_file(format!(
+                        "{}/{}/{}",
+                        &shared_context.content_path, STR_FOLDER, &file_name
+                    ))
+                    .await
+                }
+                FileType::Digest => {
+                    fs::remove_file(format!(
+                        "{}/{}/{}",
+                        &shared_context.content_path, DIGEST_FOLDER, &file_name
+                    ))
+                    .await
+                }
+            };
         }
     }
     Ok(())
 }
 
 async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
-    let (_permit, dir_handle) = fs::read_dir(temp_path)
-        .await
-        .err_tip(|| "Failed opening temp directory to prune partial downloads in filesystem store")?
-        .into_inner();
-
-    let mut read_dir_stream = ReadDirStream::new(dir_handle);
-    while let Some(dir_entry) = read_dir_stream.next().await {
-        let path = dir_entry?.path();
-        if let Err(err) = fs::remove_file(&path).await {
-            event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
+        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
+            .await
+            .err_tip(|| {
+                "Failed opening temp directory to prune partial downloads in filesystem store"
+            })?
+            .into_inner();
+
+        let mut read_dir_stream = ReadDirStream::new(dir_handle);
+        while let Some(dir_entry) = read_dir_stream.next().await {
+            let path = dir_entry?.path();
+            if let Err(err) = fs::remove_file(&path).await {
+                event!(Level::WARN, ?path, ?err, "Failed to delete file",);
+            }
         }
+        Ok(())
     }
+
+    prune_temp_inner(temp_path, STR_FOLDER).await?;
+    prune_temp_inner(temp_path, DIGEST_FOLDER).await?;
     Ok(())
 }
 
@@ -581,18 +628,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         sleep_fn: fn(Duration) -> Sleep,
         rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
     ) -> Result<Arc<Self>, Error> {
+        async fn create_subdirs(path: &str) -> Result<(), Error> {
+            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{STR_FOLDER}"))?;
+            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
+                .await
+                .err_tip(|| format!("Failed to create directory {path}/{DIGEST_FOLDER}"))
+        }
+
         let now = SystemTime::now();
 
         let empty_policy = nativelink_config::stores::EvictionPolicy::default();
         let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
         let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
 
-        fs::create_dir_all(&spec.temp_path)
-            .await
-            .err_tip(|| format!("Failed to temp directory {:?}", &spec.temp_path))?;
-        fs::create_dir_all(&spec.content_path)
-            .await
-            .err_tip(|| format!("Failed to content directory {:?}", &spec.content_path))?;
+        // Create temp and content directories and the s and d subdirectories.
+
+        create_subdirs(&spec.temp_path).await?;
+        create_subdirs(&spec.content_path).await?;
 
         let shared_context = Arc::new(SharedContext {
             active_drop_spawns: AtomicU64::new(0),