Skip to content

Commit

Permalink
Implementing tree traversal for pruning stale actions
Browse files Browse the repository at this point in the history
  • Loading branch information
blakehatch committed Oct 26, 2023
1 parent 6c937da commit b875024
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 8 deletions.
7 changes: 7 additions & 0 deletions cas/store/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,13 @@ impl StoreTrait for CompressionStore {
write_result.merge(update_result)
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ impl StoreTrait for DedupStore {
Ok(())
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ impl StoreTrait for FastSlowStore {
Ok(())
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,13 @@ impl<Fe: FileEntry> StoreTrait for FilesystemStore<Fe> {
.err_tip(|| format!("While processing with temp file {:?}", temp_full_path))
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
31 changes: 30 additions & 1 deletion cas/store/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,28 @@ impl GrpcStore {
.await
.map(|_| ())
}

pub async fn prune_tree(&self, stale_items: Vec<DigestInfo>, grpc_request: Request<GetTreeRequest>) -> Result<(), Error> {
let root_node = self.get_tree(grpc_request).await?;

// Perform a Depth-First Search (DFS) on the tree using recursion
self.dfs_prune(root_node, stale_items).await
}

// TODO(BlakeHatch) figure out how to convert between an Action and the Digest on the CAS tree

async fn dfs_prune(&self, node: Response<Streaming<GetTreeResponse>>, stale_items: Vec<DigestInfo>) -> Result<(), Error> {
//TODO(BlakeHatch): Figure out correct way to write this
for child in node.children {
// Perform comparison of the stale items
if stale_items.contains(&node.digest) {
// If the node is in the stale items list, remove it
self.(node).await?;
}
}

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -567,6 +589,13 @@ impl StoreTrait for GrpcStore {
Ok(())
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down Expand Up @@ -632,5 +661,5 @@ impl StoreTrait for GrpcStore {

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}
}
7 changes: 7 additions & 0 deletions cas/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ impl StoreTrait for MemoryStore {
Ok(())
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/ref_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ impl StoreTrait for RefStore {
Pin::new(store.as_ref()).update(digest, reader, size_info).await
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
20 changes: 19 additions & 1 deletion cas/store/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use rusoto_core::request::{DispatchSignedRequest, DispatchSignedRequestFuture};
use rusoto_core::{region::Region, ByteStream, HttpClient, HttpDispatchError, RusotoError};
use rusoto_s3::{
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectRequest,
CreateMultipartUploadRequest, GetObjectError, GetObjectRequest, DeleteObjectRequest, HeadObjectError, HeadObjectRequest,
PutObjectRequest, S3Client, UploadPartRequest, S3,
};
use rusoto_signature::signature::SignedRequest;
Expand Down Expand Up @@ -413,6 +413,24 @@ impl StoreTrait for S3Store {
complete_result
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
let s3_path = &self.make_s3_path(&_digest);
let delete_request = DeleteObjectRequest {
bucket: self.bucket.to_owned(),
key: s3_path.to_owned(),
..Default::default()
};
self.s3_client
.delete_object(delete_request)
.await
.map_or_else(|e| Err(make_err!(Code::Unknown, "{:?}", e)), |_| Ok(()))
.err_tip(|| "Failed to delete object from s3")?;
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/shard_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ impl StoreTrait for ShardStore {
.err_tip(|| "In ShardStore::update()")
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/size_partitioning_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ impl StoreTrait for SizePartitioningStore {
.await
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
7 changes: 7 additions & 0 deletions cas/store/store_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub trait StoreTrait: Sync + Send + Unpin {
upload_size: UploadSizeInfo,
) -> Result<(), Error>;

/// Delete a digest from the store.
/// If the digest does not exist in the store, return Ok(()).
async fn delete(
self: Pin<&Self>,
digest: DigestInfo,
) -> Result<(), Error>;

// Utility to send all the data to the store when you have all the bytes.
async fn update_oneshot(self: Pin<&Self>, digest: DigestInfo, data: Bytes) -> Result<(), Error> {
// TODO(blaise.bruer) This is extremely inefficient, since we have exactly
Expand Down
31 changes: 31 additions & 0 deletions cas/store/tests/s3_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,37 @@ mod s3_store_tests {
Ok(())
}

#[tokio::test]
async fn simple_del_ac() -> Result<(), Error> {
const AC_ENTRY_SIZE: u64 = 1000; // Any size that is not VALUE.len().

let s3_client = S3Client::new_with(
MockRequestDispatcher::with_status(StatusCode::OK.into()),
MockCredentialsProvider,
Region::UsEast1,
);
let store = S3Store::new_with_client_and_jitter(
&config::stores::S3Store {
bucket: BUCKET_NAME.to_string(),
..Default::default()
},
s3_client,
Box::new(move |_delay| Duration::from_secs(0)),
)?;
let store_pin = Pin::new(&store);

let delete_result = store_pin
.delete(DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?)
.await?;
assert_eq!(
delete_result,
(),
"Hash for key: {} did not delete.",
VALID_HASH1,
);
Ok(())
}

#[tokio::test]
async fn smoke_test_get_part() -> Result<(), Error> {
const AC_ENTRY_SIZE: u64 = 1000; // Any size that is not raw_send_data.len().
Expand Down
7 changes: 7 additions & 0 deletions cas/store/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ impl StoreTrait for VerifyStore {
update_res.merge(check_res)
}

async fn delete(
self: Pin<&Self>,
_digest: DigestInfo,
) -> Result<(), Error> {
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
47 changes: 43 additions & 4 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The Turbo Cache Authors. All rights reserved.
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -482,6 +482,30 @@ impl RunningActionImpl {
did_cleanup: AtomicBool::new(false),
}
}

async fn find_and_del_stale_actions(self: Arc<Self>, ac_store: Arc<dyn Store + Send + Sync>, stale_treshold: Arc<i64>, digests: Arc<Vec<DigestInfo>>) -> Result<Vec<DigestInfo>, Box<dyn std::error::Error + Send + Sync + 'static>> {

let mut stale_digests = Vec::new();
let now = SystemTime::now();

for digest in &*digests {
//Decode the digest into an ActionResult using get_and_decode_digest::
let action_result = get_and_decode_digest::<ActionResult>(ac_store.clone(), &digest).await?;

//Get timestamp of completion from Action metadata and compare to current time
let completion_time = action_result.execution_metadata.completion_timestamp;
if now.duration_since(completion_time)? > stale_treshold {
//If it happened too long ago add the digest to a list of stale digests
stale_digests.push(digest);

// Also delete the digest from the Action Cache
ac_store.delete(digest).await?;
}
}

//return list of stale digests
Ok(stale_digests)
}

fn metrics(&self) -> &Arc<Metrics> {
&self.running_actions_manager.metrics
Expand Down Expand Up @@ -1080,6 +1104,8 @@ pub trait RunningActionsManager: Sync + Send + Sized + Unpin + 'static {

async fn kill_all(&self);

async fn find_stale_build_actions(root_action: Arc<RunningActionImpl>, ac_store: Arc<dyn Store + Send + Sync>) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

fn metrics(&self) -> &Arc<Metrics>;
}

Expand Down Expand Up @@ -1128,7 +1154,8 @@ pub struct RunningActionsManagerImpl {
// Note: We don't use Notify because we need to support a .wait_for()-like function, which
// Notify does not support.
action_done_tx: watch::Sender<()>,
callbacks: Callbacks,
callbacks: Callbacks,
find_stale_build_actions: Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>,
metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -1162,6 +1189,7 @@ impl RunningActionsManagerImpl {
running_actions: Mutex::new(HashMap::new()),
action_done_tx,
callbacks,
find_stale_build_actions: Ok(()),
metrics: Arc::new(Metrics::default()),
})
}
Expand Down Expand Up @@ -1250,7 +1278,7 @@ impl RunningActionsManagerImpl {
log::error!("Error sending kill to running action {}", hex::encode(action.action_id));
}
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -1393,12 +1421,23 @@ impl RunningActionsManager for RunningActionsManagerImpl {
.subscribe()
.wait_for(|_| self.running_actions.lock().is_empty())
.await;
}
}

#[inline]
fn metrics(&self) -> &Arc<Metrics> {
&self.metrics
}

async fn find_stale_build_actions(root_action: Arc<RunningActionImpl>, ac_store: Arc<dyn Store + Send + Sync>) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let mut stale_digests: Vec<DigestInfo> = Vec::new();
// Your code to populate digest_infos goes here

// Store these in a Vec
stale_digests = root_action.find_and_del_stale_actions(ac_store, stale_treshold, stale_digests).await?;

// Remove stale actions
Ok(stale_digests)
}
}

#[derive(Default)]
Expand Down
Loading

0 comments on commit b875024

Please sign in to comment.