Skip to content

Commit

Permalink
EvictingMap should evict keys on all public access.
Browse files Browse the repository at this point in the history
* Implementation duplication on size[s]_for_key[s].
* Update EvictingMap where all public methods should attempt at evicting keys that have expired.
  • Loading branch information
Adam Singer committed Jan 17, 2024
1 parent 8c433cd commit d8c37e2
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 24 deletions.
73 changes: 53 additions & 20 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_lock::Mutex;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{future, StreamExt};
use futures::{future, stream, FutureExt, StreamExt};
use lru::LruCache;
use nativelink_config::stores::EvictionPolicy;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -172,7 +172,9 @@ where
}

pub async fn build_lru_index(&self) -> SerializedLRU {
let state = self.state.lock().await;
let mut state = self.state.lock().await;
self.evict_items(state.deref_mut()).await;

let mut serialized_lru = SerializedLRU {
data: Vec::with_capacity(state.lru.len()),
anchor_time: self.anchor_time.unix_timestamp(),
Expand Down Expand Up @@ -247,43 +249,73 @@ where
}
}

/// Return the size of a `DigestInfo`, if not found `None` is returned.
pub async fn size_for_key(&self, digest: &DigestInfo) -> Option<usize> {
let mut state = self.state.lock().await;
let entry = state.lru.get_mut(digest)?;
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
let data = entry.data.clone();
drop(state);
data.touch().await;
Some(data.len())
let mut results = [None];
self.sizes_for_keys(&[*digest], &mut results[..]).await;
results[0]
}

/// Return the sizes of a collection of `DigestInfo`. Expects `results` collection
/// to be provided for storing the resulting `DigestInfo` size. Each index value in
/// `digests` maps directly to the size value of the `DigestInfo` in `results`.
/// If no digest is found in the internal map, `None` is filled in its place.
pub async fn sizes_for_keys(&self, digests: &[DigestInfo], results: &mut [Option<usize>]) {
let mut state = self.state.lock().await;
let seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
self.evict_items(state.deref_mut()).await;
let mut remove_digests: Vec<&DigestInfo> = Vec::new();

let to_touch: Vec<T> = digests
.iter()
.zip(results.iter_mut())
.filter_map(|(digest, result)| {
let entry = state.lru.get_mut(digest)?;
entry.seconds_since_anchor = seconds_since_anchor;
let data = entry.data.clone();
*result = Some(data.len());
Some(data)
.flat_map(|(digest, result)| {
let lru_len = state.lru.len();
let sum_store_size = state.sum_store_size;
// Determine if a digest should be evicted or data should be touched.
// Digests to be eviected are collected in separate vector and chained
// in a single future.
if let Some(entry) = state.lru.get(digest) {
if self.should_evict(lru_len, entry, sum_store_size, self.max_bytes) {
// Digest should be evicted.
remove_digests.push(digest);
None
} else {
// Extract data entry to be touched and slot length into results.
let data = entry.data.clone();
*result = Some(data.len());
Some(data)
}
} else {
// Digest will be evicted if not in lru map, this is a pedantic case.
remove_digests.push(digest);
None
}
})
.collect();
drop(state);

let remove_stream = stream::iter(vec![
async move {
for digest in remove_digests {
self.inner_remove(state.deref_mut(), digest).await;
}
}
.await,
]);

to_touch
.iter()
.map(|data| data.touch())
.map(|data| data.touch().map(|_| ()))
.collect::<FuturesUnordered<_>>()
.chain(remove_stream)
.for_each(|_| future::ready(()))
.await;
}

pub async fn get(&self, digest: &DigestInfo) -> Option<T> {
let mut state = self.state.lock().await;
self.evict_items(state.deref_mut()).await;

if let Some(entry) = state.lru.get_mut(digest) {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
let data = entry.data.clone();
drop(state);
data.touch().await;
Expand Down Expand Up @@ -354,7 +386,8 @@ where
self.inner_remove(&mut state, digest).await
}

async fn inner_remove(&self, state: &mut State<T>, digest: &DigestInfo) -> bool {
async fn inner_remove(&self, mut state: &mut State<T>, digest: &DigestInfo) -> bool {
self.evict_items(state.deref_mut()).await;
if let Some(entry) = state.lru.pop(digest) {
let data_len = entry.data.len() as u64;
state.sum_store_size -= data_len;
Expand Down
65 changes: 61 additions & 4 deletions nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ mod evicting_map_tests {

assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await,
Some(DATA.len()),
"Expected map to have item 1"
None,
"Expected map to not have item 1"
);
assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH2, 0)?).await,
Expand Down Expand Up @@ -415,8 +415,8 @@ mod evicting_map_tests {

assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await,
Some(8),
"Expected map to have item 1"
None,
"Expected map to not have item 1"
);
assert_eq!(
evicting_map.size_for_key(&DigestInfo::try_new(HASH2, 0)?).await,
Expand Down Expand Up @@ -541,4 +541,61 @@ mod evicting_map_tests {

Ok(())
}

#[tokio::test]
async fn get_evicts_on_time() -> Result<(), Error> {
let evicting_map = EvictingMap::<BytesWrapper, MockInstantWrapped>::new(
&EvictionPolicy {
max_count: 0,
max_seconds: 5,
max_bytes: 0,
evict_bytes: 0,
},
MockInstantWrapped(MockInstant::now()),
);

const DATA: &str = "12345678";
let digest_info1: DigestInfo = DigestInfo::try_new(HASH1, 0)?;
evicting_map.insert(digest_info1, Bytes::from(DATA).into()).await;

// Getting from map before time has expired should return the value.
assert_eq!(evicting_map.get(&digest_info1).await, Some(Bytes::from(DATA).into()));

MockClock::advance(Duration::from_secs(10));

// Getting from map after time has expired should return None.
assert_eq!(evicting_map.get(&digest_info1).await, None);

Ok(())
}

#[tokio::test]
async fn remove_evicts_on_time() -> Result<(), Error> {
let evicting_map = EvictingMap::<BytesWrapper, MockInstantWrapped>::new(
&EvictionPolicy {
max_count: 0,
max_seconds: 5,
max_bytes: 0,
evict_bytes: 0,
},
MockInstantWrapped(MockInstant::now()),
);

const DATA: &str = "12345678";
let digest_info1: DigestInfo = DigestInfo::try_new(HASH1, 0)?;
evicting_map.insert(digest_info1, Bytes::from(DATA).into()).await;

let digest_info2: DigestInfo = DigestInfo::try_new(HASH2, 0)?;
evicting_map.insert(digest_info2, Bytes::from(DATA).into()).await;

// Removing digest before time has expired should return true.
assert!(evicting_map.remove(&digest_info2).await);

MockClock::advance(Duration::from_secs(10));

// Removing digest after time has expired should return false.
assert!(!evicting_map.remove(&digest_info1).await);

Ok(())
}
}

0 comments on commit d8c37e2

Please sign in to comment.