Skip to content

Commit

Permalink
[cherrypick] PR#20236 (#20280)
Browse files Browse the repository at this point in the history
## Description 

pick writeback cache fix into next mainnet release

Co-authored-by: Mark Logan <[email protected]>
  • Loading branch information
arun-koshy and mystenmark authored Nov 15, 2024
1 parent 2b23b49 commit a5d8b0d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 59 deletions.
72 changes: 72 additions & 0 deletions crates/sui-core/src/execution_cache/cache_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::hash::Hash;
use std::sync::Arc;

use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
use sui_types::base_types::SequenceNumber;

/// CachedVersionMap is a map from version to value, with the additional constraints:
Expand Down Expand Up @@ -137,6 +141,74 @@ where
}
}

/// Recency comparison used by [`MonotonicCache`] to decide which of two racing
/// writes should win.
///
/// We could just use the `Ord` trait, but overloading it for "newer-than"
/// would be confusing to conflate with a type's natural ordering.
pub trait IsNewer {
    /// Returns true if `self` is strictly newer than `other`.
    fn is_newer_than(&self, other: &Self) -> bool;
}

/// A cache whose entries can only move "forward": a racing write can never
/// replace a value with one that is older (per [`IsNewer`]). Each entry is an
/// `Arc<Mutex<V>>` so writers can compare-and-swap under the entry's lock.
pub struct MonotonicCache<K, V> {
    cache: MokaCache<K, Arc<Mutex<V>>>,
}

impl<K, V> MonotonicCache<K, V>
where
    K: Hash + Eq + Send + Sync + Copy + 'static,
    V: IsNewer + Clone + Send + Sync + 'static,
{
    /// Creates a cache bounded to `cache_size` entries.
    pub fn new(cache_size: u64) -> Self {
        Self {
            cache: MokaCache::builder().max_capacity(cache_size).build(),
        }
    }

    /// Returns the shared slot for `key`, if cached. The caller must lock the
    /// returned mutex to read the current value.
    pub fn get(&self, key: &K) -> Option<Arc<Mutex<V>>> {
        self.cache.get(key)
    }

    // Update the cache with guaranteed monotonicity. That is, if there are N
    // calls to this function from N threads, the write with the newest value will
    // win the race regardless of what ordering the writes occur in.
    //
    // Caller should log the insert with trace! and increment the appropriate metric.
    pub fn insert(&self, key: &K, value: V) {
        // Warning: tricky code!
        let entry = self
            .cache
            .entry(*key)
            // only one racing insert will call the closure
            .or_insert_with(|| Arc::new(Mutex::new(value.clone())));

        // We may be racing with another thread that observed an older version of value
        if !entry.is_fresh() {
            // !is_fresh means we lost the race, and entry holds the value that was
            // inserted by the other thread. We need to check if we have a more recent value
            // than the other reader.
            // Compare-and-replace under the entry's lock so two losers cannot
            // interleave their checks and writes.
            let mut entry = entry.value().lock();
            if value.is_newer_than(&entry) {
                *entry = value;
            }
        }
    }

    /// Removes the entry for `key`, if present.
    pub fn invalidate(&self, key: &K) {
        self.cache.invalidate(key);
    }

    /// Test-only existence check.
    #[cfg(test)]
    pub fn contains_key(&self, key: &K) -> bool {
        self.cache.contains_key(key)
    }

    /// Removes all entries.
    pub fn invalidate_all(&self) {
        self.cache.invalidate_all();
    }

    /// Returns true if the cache holds no entries.
    // NOTE(review): presumably iter() is used instead of moka's entry_count()
    // because the latter is only eventually consistent — confirm.
    pub fn is_empty(&self) -> bool {
        self.cache.iter().next().is_none()
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
78 changes: 19 additions & 59 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ use tracing::{debug, info, instrument, trace, warn};

use super::ExecutionCacheAPI;
use super::{
cache_types::CachedVersionMap, implement_passthrough_traits, object_locks::ObjectLocks,
cache_types::{CachedVersionMap, IsNewer, MonotonicCache},
implement_passthrough_traits,
object_locks::ObjectLocks,
CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
};
Expand Down Expand Up @@ -153,6 +155,16 @@ enum LatestObjectCacheEntry {
}

impl LatestObjectCacheEntry {
    /// Test helper: the cached object's version, or `None` if the entry
    /// records a non-existent object.
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        if let LatestObjectCacheEntry::Object(version, _) = self {
            Some(*version)
        } else {
            None
        }
    }
}

impl IsNewer for LatestObjectCacheEntry {
fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
match (self, other) {
(LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
Expand All @@ -162,14 +174,6 @@ impl LatestObjectCacheEntry {
_ => false,
}
}

#[cfg(test)]
fn version(&self) -> Option<SequenceNumber> {
match self {
LatestObjectCacheEntry::Object(version, _) => Some(*version),
LatestObjectCacheEntry::NonExistent => None,
}
}
}

type MarkerKey = (EpochId, ObjectID);
Expand Down Expand Up @@ -271,7 +275,7 @@ struct CachedCommittedData {
// We cannot simply insert objects that we read off the disk into `object_cache`,
// since that may violate the no-missing-versions property.
// `object_by_id_cache` is also written to on writes so that it is always coherent.
object_by_id_cache: MokaCache<ObjectID, Arc<Mutex<LatestObjectCacheEntry>>>,
object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

// See module level comment for an explanation of caching strategy.
marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
Expand All @@ -295,10 +299,6 @@ impl CachedCommittedData {
.max_capacity(MAX_CACHE_SIZE)
.max_capacity(MAX_CACHE_SIZE)
.build();
let object_by_id_cache = MokaCache::builder()
.max_capacity(MAX_CACHE_SIZE)
.max_capacity(MAX_CACHE_SIZE)
.build();
let marker_cache = MokaCache::builder()
.max_capacity(MAX_CACHE_SIZE)
.max_capacity(MAX_CACHE_SIZE)
Expand Down Expand Up @@ -326,7 +326,7 @@ impl CachedCommittedData {

Self {
object_cache,
object_by_id_cache,
object_by_id_cache: MonotonicCache::new(MAX_CACHE_SIZE),
marker_cache,
transactions,
transaction_effects,
Expand All @@ -347,7 +347,7 @@ impl CachedCommittedData {
self._transaction_objects.invalidate_all();

assert_empty(&self.object_cache);
assert_empty(&self.object_by_id_cache);
assert!(&self.object_by_id_cache.is_empty());
assert_empty(&self.marker_cache);
assert_empty(&self.transactions);
assert_empty(&self.transaction_effects);
Expand Down Expand Up @@ -486,11 +486,8 @@ impl WritebackCache {
let mut entry = self.dirty.objects.entry(*object_id).or_default();

self.cached.object_by_id_cache.insert(
*object_id,
Arc::new(Mutex::new(LatestObjectCacheEntry::Object(
version,
object.clone(),
))),
object_id,
LatestObjectCacheEntry::Object(version, object.clone()),
);

entry.insert(version, object);
Expand Down Expand Up @@ -1087,47 +1084,10 @@ impl WritebackCache {
}

// Updates the latest object id cache with an entry that was read from the db.
// Writes bypass this function, because an object write is guaranteed to be the
// most recent version (and cannot race with any other writes to that object id)
//
// If there are racing calls to this function, it is guaranteed that after a call
// has returned, reads from that thread will not observe a lower version than the
// one they inserted
fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
trace!("caching object by id: {:?} {:?}", object_id, object);
self.metrics.record_cache_write("object_by_id");
// Warning: tricky code!
let entry = self
.cached
.object_by_id_cache
.entry(*object_id)
// only one racing insert will call the closure
.or_insert_with(|| Arc::new(Mutex::new(object.clone())));

// We may be racing with another thread that observed an older version of the object
if !entry.is_fresh() {
// !is_fresh means we lost the race, and entry holds the value that was
// inserted by the other thread. We need to check if we have a more recent version
// than the other reader.
//
// This could also mean that the entry was inserted by a transaction write. This
// could occur in the following case:
//
// THREAD 1 | THREAD 2
// reads object at v1 |
// | tx writes object at v2
// tries to cache v1
//
// Thread 1 will see that v2 is already in the cache when it tries to cache it,
// and will try to update the cache with v1. But the is_newer_than check will fail,
// so v2 will remain in the cache

// Ensure only the latest version is inserted.
let mut entry = entry.value().lock();
if object.is_newer_than(&entry) {
*entry = object;
}
}
self.cached.object_by_id_cache.insert(object_id, object);
}

fn cache_object_not_found(&self, object_id: &ObjectID) {
Expand Down

0 comments on commit a5d8b0d

Please sign in to comment.