refactor!: FileStore to use StoreKey
varshith257 authored Dec 8, 2024
1 parent d75d20d commit 743f4d1
Showing 2 changed files with 118 additions and 78 deletions.
163 changes: 100 additions & 63 deletions nativelink-store/src/filesystem_store.rs
@@ -49,6 +49,9 @@ const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

pub const STRING_PREFIX: &str = "s-";
pub const DIGEST_PREFIX: &str = "d-";

#[derive(Debug, MetricsComponent)]
pub struct SharedContext {
// Used in testing to know how many active drop() spawns are running.
@@ -75,32 +78,31 @@ enum PathType {
// lot of small files, so to prevent storing duplicate data, we store an Arc
// to the path of the directory where the file is stored and the packed digest.
// Resulting in usize + sizeof(DigestInfo).
type FileNameDigest = DigestInfo;
pub struct EncodedFilePath {
shared_context: Arc<SharedContext>,
path_type: PathType,
digest: FileNameDigest,
key: StoreKey<'static>,
}

impl EncodedFilePath {
#[inline]
fn get_file_path(&self) -> Cow<'_, OsStr> {
get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
}
}

#[inline]
fn get_file_path_raw<'a>(
path_type: &'a PathType,
shared_context: &SharedContext,
digest: &DigestInfo,
key: &StoreKey<'a>,
) -> Cow<'a, OsStr> {
let folder = match path_type {
PathType::Content => &shared_context.content_path,
PathType::Temp => &shared_context.temp_path,
PathType::Custom(path) => return Cow::Borrowed(path),
};
Cow::Owned(to_full_path_from_digest(folder, digest))
Cow::Owned(to_full_path_from_key(folder, key))
}

impl Drop for EncodedFilePath {
@@ -132,8 +134,12 @@ impl Drop for EncodedFilePath {
}

#[inline]
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
format!("{folder}/{digest}").into()
fn to_full_path_from_key(folder: &str, key: &StoreKey) -> OsString {
match key {
StoreKey::Str(str) => format!("{folder}/{STRING_PREFIX}{str}"),
StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_PREFIX}{digest_info}"),
}
.into()
}
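
The new naming scheme prefixes every on-disk file with its key variant, so digest-keyed CAS entries and string-keyed entries can share a directory without colliding. Below is a minimal standalone sketch of the mapping; `StoreKey` and `DigestInfo` here are simplified stand-ins for the nativelink types, and the `<hex-hash>-<size>` rendering of a digest is an assumption inferred from the legacy parser later in this diff.

```rust
use std::borrow::Cow;

const STRING_PREFIX: &str = "s-";
const DIGEST_PREFIX: &str = "d-";

// Simplified stand-ins for nativelink's StoreKey/DigestInfo.
enum StoreKey<'a> {
    Str(Cow<'a, str>),
    // Rendered as "<hex-hash>-<size>", matching the legacy filename format.
    Digest { hash: String, size_bytes: u64 },
}

fn to_full_path_from_key(folder: &str, key: &StoreKey) -> String {
    match key {
        StoreKey::Str(s) => format!("{folder}/{STRING_PREFIX}{s}"),
        StoreKey::Digest { hash, size_bytes } => {
            format!("{folder}/{DIGEST_PREFIX}{hash}-{size_bytes}")
        }
    }
}

fn main() {
    let named = StoreKey::Str(Cow::Borrowed("my-action-result"));
    let cas = StoreKey::Digest {
        hash: "deadbeef".repeat(8), // 64 hex chars, like a SHA-256
        size_bytes: 142,
    };
    println!("{}", to_full_path_from_key("content", &named)); // content/s-my-action-result
    println!("{}", to_full_path_from_key("content", &cas)); // content/d-deadbeef...-142
}
```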

pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -300,15 +306,31 @@ impl Debug for FileEntryImpl {
}
}

fn make_temp_digest(digest: &mut DigestInfo) {
static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
let mut hash = *digest.packed_hash();
hash[24..].clone_from_slice(
&DELETE_FILE_COUNTER
.fetch_add(1, Ordering::Relaxed)
.to_le_bytes(),
);
digest.set_packed_hash(*hash);
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);

match key {
// For digest-based keys, generate a unique suffix using the counter
StoreKey::Digest(digest) => {
let mut temp_digest: [u8; 32] = digest
.packed_hash()
.as_ref()
.try_into()
.expect("PackedHash should fit into [u8; 32]");
let counter = TEMP_FILE_COUNTER
.fetch_add(1, Ordering::Relaxed)
.to_le_bytes();
temp_digest[24..].clone_from_slice(&counter);

StoreKey::Digest(DigestInfo::new(temp_digest, digest.size_bytes()))
}
// For string-based keys, append a counter-based suffix for uniqueness
StoreKey::Str(key) => {
let suffix = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
let temp_key = format!("{key}_temp{suffix}");
StoreKey::Str(Cow::Owned(temp_key))
}
}
}
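
For digest keys, uniqueness comes from overwriting the last 8 of the 32 hash bytes with a process-wide atomic counter, so the temp name still parses as a digest yet never collides with a concurrent rename; string keys just gain a `_temp{n}` suffix. A sketch of the byte-stamping step in isolation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Stamp a unique little-endian counter into the tail of a 32-byte hash.
fn stamp_counter(hash: &mut [u8; 32]) {
    let counter = TEMP_FILE_COUNTER
        .fetch_add(1, Ordering::Relaxed)
        .to_le_bytes(); // 8 bytes
    hash[24..].copy_from_slice(&counter); // bytes 24..32 become the counter
}

fn main() {
    let mut a = [0u8; 32];
    let mut b = [0u8; 32];
    stamp_counter(&mut a);
    stamp_counter(&mut b);
    assert_ne!(a[24..], b[24..]); // two temp names can never collide
}
```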

impl LenEntry for FileEntryImpl {
@@ -362,16 +384,15 @@ impl LenEntry for FileEntryImpl {
return;
}
let from_path = encoded_file_path.get_file_path();
let mut new_digest = encoded_file_path.digest;
make_temp_digest(&mut new_digest);
let new_key = make_temp_key(&encoded_file_path.key);

let to_path =
to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
let temp_path = &encoded_file_path.shared_context.temp_path;
let to_path = to_full_path_from_key(temp_path, &new_key);

if let Err(err) = fs::rename(&from_path, &to_path).await {
event!(
Level::WARN,
digest = ?encoded_file_path.digest,
key = ?encoded_file_path.key,
?from_path,
?to_path,
?err,
@@ -380,61 +401,73 @@
} else {
event!(
Level::INFO,
digest = ?encoded_file_path.digest,
key = ?encoded_file_path.key,
?from_path,
?to_path,
"Renamed file",
);
encoded_file_path.path_type = PathType::Temp;
encoded_file_path.digest = new_digest;
encoded_file_path.key = new_key;
}
}
}
}

#[inline]
pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
let (hash, size) = file_name.split_once('-').err_tip(|| "")?;
pub fn key_from_filename(mut file_name: &str) -> Result<StoreKey<'static>, Error> {
if let Some(file_name) = file_name.strip_prefix(STRING_PREFIX) {
return Ok(StoreKey::Str(Cow::Owned(file_name.to_owned())));
}

if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
file_name = name;
}

// Fallback: legacy digest handling for backward compatibility
let (hash, size) = file_name
.split_once('-')
.err_tip(|| "Invalid filename format")?;
let size = size.parse::<i64>()?;
DigestInfo::try_new(hash, size)
let digest = DigestInfo::try_new(hash, size)?;
Ok(StoreKey::Digest(digest))
}
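
Parsing is the inverse of the prefix scheme above, with a deliberate fallback: a name with neither prefix is still parsed as a legacy `<hash>-<size>` digest, so stores written before this commit keep loading. A self-contained sketch of the dispatch (`ParsedKey` is a stand-in for `StoreKey`):

```rust
const STRING_PREFIX: &str = "s-";
const DIGEST_PREFIX: &str = "d-";

#[derive(Debug, PartialEq)]
enum ParsedKey {
    Str(String),
    Digest { hash: String, size_bytes: i64 },
}

fn key_from_filename(mut file_name: &str) -> Result<ParsedKey, String> {
    if let Some(name) = file_name.strip_prefix(STRING_PREFIX) {
        return Ok(ParsedKey::Str(name.to_owned()));
    }
    if let Some(name) = file_name.strip_prefix(DIGEST_PREFIX) {
        file_name = name;
    }
    // Prefixed or legacy unprefixed digest: "<hash>-<size>".
    let (hash, size) = file_name
        .split_once('-')
        .ok_or_else(|| "Invalid filename format".to_owned())?;
    let size_bytes = size.parse::<i64>().map_err(|e| e.to_string())?;
    Ok(ParsedKey::Digest { hash: hash.to_owned(), size_bytes })
}

fn main() {
    assert_eq!(key_from_filename("s-foo").unwrap(), ParsedKey::Str("foo".into()));
    assert!(matches!(key_from_filename("d-abc123-42").unwrap(), ParsedKey::Digest { .. }));
    // Files written before this refactor have no prefix but still parse.
    assert!(matches!(key_from_filename("abc123-42").unwrap(), ParsedKey::Digest { .. }));
}
```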

/// The number of files to read the metadata for at the same time when running
/// `add_files_to_cache`.
const SIMULTANEOUS_METADATA_READS: usize = 200;

async fn add_files_to_cache<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
block_size: u64,
) -> Result<(), Error> {
async fn process_entry<Fe: FileEntry>(
evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
evicting_map: &EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>,
file_name: &str,
atime: SystemTime,
data_size: u64,
block_size: u64,
anchor_time: &SystemTime,
shared_context: &Arc<SharedContext>,
) -> Result<(), Error> {
let digest = digest_from_filename(file_name)?;
let key = key_from_filename(file_name)?;

let file_entry = Fe::create(
data_size,
block_size,
RwLock::new(EncodedFilePath {
shared_context: shared_context.clone(),
path_type: PathType::Content,
digest,
key: key.clone(),
}),
);
let time_since_anchor = anchor_time
.duration_since(atime)
.map_err(|_| make_input_err!("File access time newer than now"))?;
evicting_map
.insert_with_time(
digest,
key,
Arc::new(file_entry),
time_since_anchor.as_secs() as i32,
)
@@ -525,7 +558,7 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
#[metric]
shared_context: Arc<SharedContext>,
#[metric(group = "evicting_map")]
evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
evicting_map: Arc<EvictingMap<StoreKey<'static>, Arc<Fe>, SystemTime>>,
#[metric(help = "Block size of the configured filesystem")]
block_size: u64,
#[metric(help = "Size of the configured read buffer size")]
@@ -594,8 +627,9 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
}

pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
let key = <StoreKey<'static>>::Digest(*digest);
self.evicting_map
.get(digest)
.get(&key)
.await
.ok_or_else(|| make_err!(Code::NotFound, "{digest} not found in filesystem store"))
}
@@ -604,7 +638,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
self: Pin<&'a Self>,
mut entry: Fe,
mut resumeable_temp_file: fs::ResumeableFileSlot,
final_digest: DigestInfo,
final_key: StoreKey<'static>,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut data_size = 0;
@@ -649,10 +683,10 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
drop(resumeable_temp_file);

*entry.data_size_mut() = data_size;
self.emplace_file(final_digest, Arc::new(entry)).await
self.emplace_file(final_key, Arc::new(entry)).await
}

async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
// This sequence of events is quite tricky to understand due to the number of triggers that
// happen, async'ness of it and the locking. So here is a breakdown of what happens:
// 1. Here will hold a write lock on any file operations of this FileEntry.
@@ -680,10 +714,11 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
let final_path = get_file_path_raw(
&PathType::Content,
encoded_file_path.shared_context.as_ref(),
&digest,
&key,
);

evicting_map.insert(digest, entry.clone()).await;
// Cloning is necessary because `evicting_map` requires ownership of both the key and the entry:
evicting_map.insert(key.clone(), entry.clone()).await;

let from_path = encoded_file_path.get_file_path();
// Internally tokio spawns fs commands onto a blocking thread anyways.
@@ -710,12 +745,12 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// It is possible that the item in our map is no longer the item we inserted,
// So, we need to conditionally remove it only if the pointers are the same.
evicting_map
.remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
.remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
.await;
return Err(err);
}
encoded_file_path.path_type = PathType::Content;
encoded_file_path.digest = digest;
encoded_file_path.key = key;
Ok(())
})
.await
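
When the rename into the content path fails, cleanup removes the map entry only if it is still the exact `Arc` this call inserted, so a concurrent re-insert under the same key is never clobbered. A standalone sketch of that identity-guarded removal, using a plain `HashMap` in place of the async `EvictingMap::remove_if`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Remove `key` only if the stored Arc is the very allocation we inserted.
fn remove_if_same<K: Eq + std::hash::Hash, V>(
    map: &mut HashMap<K, Arc<V>>,
    key: &K,
    ours: &Arc<V>,
) -> bool {
    if map.get(key).is_some_and(|cur| Arc::ptr_eq(cur, ours)) {
        map.remove(key);
        return true;
    }
    false
}

fn main() {
    let mut map = HashMap::new();
    let ours = Arc::new(1);
    map.insert("k", ours.clone());
    // A concurrent writer replaces the entry under the same key...
    map.insert("k", Arc::new(2));
    // ...so our failure cleanup must leave it alone.
    assert!(!remove_if_same(&mut map, &"k", &ours));
    assert!(map.contains_key("k"));
}
```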
Expand All @@ -730,26 +765,23 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
keys: &[StoreKey<'_>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
// TODO(allada) This is a bit of a hack to get around the lifetime issues with the
// existence_cache. We need to convert the digests to owned values to be able to
// insert them into the cache. In theory it should be able to elide this conversion
// but it seems to be a bit tricky to get right.
let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_owned()).collect();

self.evicting_map
.sizes_for_keys(&keys, results, false /* peek */)
.await;
// We need to do a special pass to ensure our zero files exist.
// If our results failed and the result was a zero file, we need to
// create the file by spec.
for (digest, result) in keys.iter().zip(results.iter_mut()) {
if result.is_some() || !is_zero_digest(digest) {
for (key, result) in keys.iter().zip(results.iter_mut()) {
if result.is_some() || !is_zero_digest(key.borrow()) {
continue;
}
let (mut tx, rx) = make_buf_channel_pair();
let send_eof_result = tx.send_eof();
self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
.await
.err_tip(|| format!("Failed to create zero file for key {digest}"))
.err_tip(|| format!("Failed to create zero file for key {key:?}"))
.merge(
send_eof_result
.err_tip(|| "Failed to send zero file EOF in filesystem store has"),
@@ -762,25 +794,24 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {

async fn update(
self: Pin<&Self>,
//breaking?
key: StoreKey<'_>,
reader: DropCloserReadHalf,
_upload_size: UploadSizeInfo,
) -> Result<(), Error> {
let digest = key.into_digest();
let mut temp_digest = digest;
make_temp_digest(&mut temp_digest);
let temp_key = make_temp_key(&key);

let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
self.block_size,
EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Temp,
digest: temp_digest,
key: temp_key,
},
)
.await?;

self.update_file(entry, temp_file, digest, reader)
self.update_file(entry, temp_file, key.borrow().into_owned(), reader)
.await
.err_tip(|| format!("While processing with temp file {temp_full_path:?}"))
}
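
`update` is a two-phase write: bytes stream into a file under the unique temp key, and only after EOF does `emplace_file` rename it to its content path, so readers never observe a partially written entry. A minimal sketch of that pattern with blocking std I/O — the store itself does this with async `tokio::fs`, the evicting map, and fsync behavior I have not verified:

```rust
use std::fs::{self, File};
use std::io::{self, Read};
use std::path::Path;

/// Stream `reader` into `<dir>/<temp_name>`, then publish it as `<dir>/<final_name>`.
fn two_phase_write(
    dir: &Path,
    temp_name: &str,
    final_name: &str,
    mut reader: impl Read,
) -> io::Result<()> {
    let temp_path = dir.join(temp_name);
    let mut temp_file = File::create(&temp_path)?;
    io::copy(&mut reader, &mut temp_file)?;
    temp_file.sync_all()?; // make bytes durable before publishing
    drop(temp_file); // release the handle before the rename, as the store does
    fs::rename(&temp_path, dir.join(final_name)) // atomic within one filesystem
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir();
    two_phase_write(&dir, "s-example_temp0", "s-example", "hello".as_bytes())
}
```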
@@ -795,7 +826,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
mut file: fs::ResumeableFileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let digest = key.into_digest();
let key_owned = key.into_owned();
let path = file.get_path().as_os_str().to_os_string();
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size,
@@ -818,13 +849,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
RwLock::new(EncodedFilePath {
shared_context: self.shared_context.clone(),
path_type: PathType::Custom(path),
digest,
key: key_owned.clone(),
}),
);
// We are done with the file, if we hold a reference to the file here, it could
// result in a deadlock if `emplace_file()` also needs file descriptors.
drop(file);
self.emplace_file(digest, Arc::new(entry))
self.emplace_file(key_owned, Arc::new(entry))
.await
.err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
return Ok(None);
@@ -837,9 +868,8 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
let digest = key.into_digest();
if is_zero_digest(digest) {
self.has(digest.into())
if is_zero_digest(key.borrow()) {
self.has(key.borrow())
.await
.err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
writer
Expand All @@ -848,9 +878,16 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
return Ok(());
}

let entry =
self.evicting_map.get(&digest).await.ok_or_else(|| {
make_err!(Code::NotFound, "{digest} not found in filesystem store")
let entry = self
.evicting_map
.get(&key.borrow().into_owned())
.await
.ok_or_else(|| {
make_err!(
Code::NotFound,
"{:?} not found in filesystem store",
key.as_str()
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
