From a5d8b0d730f517f57660db8801e00f60a0eee672 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:22:26 -0800 Subject: [PATCH] [cherrypick] PR#20236 (#20280) ## Description pick writeback cache fix into next mainnet release Co-authored-by: Mark Logan <103447440+mystenmark@users.noreply.github.com> --- .../src/execution_cache/cache_types.rs | 72 +++++++++++++++++ .../src/execution_cache/writeback_cache.rs | 78 +++++-------------- 2 files changed, 91 insertions(+), 59 deletions(-) diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs index acfd5aea5b1db..3faea764a1f4d 100644 --- a/crates/sui-core/src/execution_cache/cache_types.rs +++ b/crates/sui-core/src/execution_cache/cache_types.rs @@ -3,7 +3,11 @@ use std::cmp::Ordering; use std::collections::VecDeque; +use std::hash::Hash; +use std::sync::Arc; +use moka::sync::Cache as MokaCache; +use parking_lot::Mutex; use sui_types::base_types::SequenceNumber; /// CachedVersionMap is a map from version to value, with the additional contraints: @@ -137,6 +141,74 @@ where } } +// Could just use the Ord trait but I think it would be confusing to overload it +// in that way. +pub trait IsNewer { + fn is_newer_than(&self, other: &Self) -> bool; +} + +pub struct MonotonicCache { + cache: MokaCache>>, +} + +impl MonotonicCache +where + K: Hash + Eq + Send + Sync + Copy + 'static, + V: IsNewer + Clone + Send + Sync + 'static, +{ + pub fn new(cache_size: u64) -> Self { + Self { + cache: MokaCache::builder().max_capacity(cache_size).build(), + } + } + + pub fn get(&self, key: &K) -> Option>> { + self.cache.get(key) + } + + // Update the cache with guaranteed monotonicity. That is, if there are N + // calls to this function from N threads, the write with the newest value will + // win the race regardless of what ordering the writes occur in. + // + // Caller should log the insert with trace! 
and increment the appropriate metric. + pub fn insert(&self, key: &K, value: V) { + // Warning: tricky code! + let entry = self + .cache + .entry(*key) + // only one racing insert will call the closure + .or_insert_with(|| Arc::new(Mutex::new(value.clone()))); + + // We may be racing with another thread that observed an older version of value + if !entry.is_fresh() { + // !is_fresh means we lost the race, and entry holds the value that was + // inserted by the other thread. We need to check if we have a more recent value + // than the other reader. + let mut entry = entry.value().lock(); + if value.is_newer_than(&entry) { + *entry = value; + } + } + } + + pub fn invalidate(&self, key: &K) { + self.cache.invalidate(key); + } + + #[cfg(test)] + pub fn contains_key(&self, key: &K) -> bool { + self.cache.contains_key(key) + } + + pub fn invalidate_all(&self) { + self.cache.invalidate_all(); + } + + pub fn is_empty(&self) -> bool { + self.cache.iter().next().is_none() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index 5ff74be1c0182..1a5d77729affc 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -78,7 +78,9 @@ use tracing::{debug, info, instrument, trace, warn}; use super::ExecutionCacheAPI; use super::{ - cache_types::CachedVersionMap, implement_passthrough_traits, object_locks::ObjectLocks, + cache_types::{CachedVersionMap, IsNewer, MonotonicCache}, + implement_passthrough_traits, + object_locks::ObjectLocks, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead, }; @@ -153,6 +155,16 @@ enum LatestObjectCacheEntry { } impl LatestObjectCacheEntry { + #[cfg(test)] + fn version(&self) -> Option { + match self { + 
LatestObjectCacheEntry::Object(version, _) => Some(*version), + LatestObjectCacheEntry::NonExistent => None, + } + } +} + +impl IsNewer for LatestObjectCacheEntry { fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool { match (self, other) { (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => { @@ -162,14 +174,6 @@ impl LatestObjectCacheEntry { _ => false, } } - - #[cfg(test)] - fn version(&self) -> Option { - match self { - LatestObjectCacheEntry::Object(version, _) => Some(*version), - LatestObjectCacheEntry::NonExistent => None, - } - } } type MarkerKey = (EpochId, ObjectID); @@ -271,7 +275,7 @@ struct CachedCommittedData { // We cannot simply insert objects that we read off the disk into `object_cache`, // since that may violate the no-missing-versions property. // `object_by_id_cache` is also written to on writes so that it is always coherent. - object_by_id_cache: MokaCache>>, + object_by_id_cache: MonotonicCache, // See module level comment for an explanation of caching strategy. 
marker_cache: MokaCache>>>, @@ -295,10 +299,6 @@ impl CachedCommittedData { .max_capacity(MAX_CACHE_SIZE) .max_capacity(MAX_CACHE_SIZE) .build(); - let object_by_id_cache = MokaCache::builder() - .max_capacity(MAX_CACHE_SIZE) - .max_capacity(MAX_CACHE_SIZE) - .build(); let marker_cache = MokaCache::builder() .max_capacity(MAX_CACHE_SIZE) .max_capacity(MAX_CACHE_SIZE) @@ -326,7 +326,7 @@ impl CachedCommittedData { Self { object_cache, - object_by_id_cache, + object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE), marker_cache, transactions, transaction_effects, @@ -347,7 +347,7 @@ impl CachedCommittedData { self._transaction_objects.invalidate_all(); assert_empty(&self.object_cache); - assert_empty(&self.object_by_id_cache); + assert!(&self.object_by_id_cache.is_empty()); assert_empty(&self.marker_cache); assert_empty(&self.transactions); assert_empty(&self.transaction_effects); @@ -486,11 +486,8 @@ impl WritebackCache { let mut entry = self.dirty.objects.entry(*object_id).or_default(); self.cached.object_by_id_cache.insert( - *object_id, - Arc::new(Mutex::new(LatestObjectCacheEntry::Object( - version, - object.clone(), - ))), + object_id, + LatestObjectCacheEntry::Object(version, object.clone()), ); entry.insert(version, object); @@ -1087,47 +1084,10 @@ impl WritebackCache { } // Updates the latest object id cache with an entry that was read from the db. - // Writes bypass this function, because an object write is guaranteed to be the - // most recent version (and cannot race with any other writes to that object id) - // - // If there are racing calls to this function, it is guaranteed that after a call - // has returned, reads from that thread will not observe a lower version than the - // one they inserted fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) { trace!("caching object by id: {:?} {:?}", object_id, object); self.metrics.record_cache_write("object_by_id"); - // Warning: tricky code! 
- let entry = self - .cached - .object_by_id_cache - .entry(*object_id) - // only one racing insert will call the closure - .or_insert_with(|| Arc::new(Mutex::new(object.clone()))); - - // We may be racing with another thread that observed an older version of the object - if !entry.is_fresh() { - // !is_fresh means we lost the race, and entry holds the value that was - // inserted by the other thread. We need to check if we have a more recent version - // than the other reader. - // - // This could also mean that the entry was inserted by a transaction write. This - // could occur in the following case: - // - // THREAD 1 | THREAD 2 - // reads object at v1 | - // | tx writes object at v2 - // tries to cache v1 - // - // Thread 1 will see that v2 is already in the cache when it tries to cache it, - // and will try to update the cache with v1. But the is_newer_than check will fail, - // so v2 will remain in the cache - - // Ensure only the latest version is inserted. - let mut entry = entry.value().lock(); - if object.is_newer_than(&entry) { - *entry = object; - } - } + self.cached.object_by_id_cache.insert(object_id, object); } fn cache_object_not_found(&self, object_id: &ObjectID) {