diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs index 3faea764a1f4d..55a5f81d8fcfc 100644 --- a/crates/sui-core/src/execution_cache/cache_types.rs +++ b/crates/sui-core/src/execution_cache/cache_types.rs @@ -1,10 +1,11 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::cmp::Ordering; use std::collections::VecDeque; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::{cmp::Ordering, hash::DefaultHasher}; use moka::sync::Cache as MokaCache; use parking_lot::Mutex; @@ -149,8 +150,31 @@ pub trait IsNewer { pub struct MonotonicCache { cache: MokaCache>>, + // When inserting a possibly stale value, we prove that it is not stale by + // ensuring that no fresh value was inserted since we began reading the value + // we are inserting. We do this by hashing the key to an element in this array, + // reading the current value, and then passing that value to insert(). If the + // value is out of date, then there may have been an intervening write, so we + // discard the insert attempt. + key_generation: Vec, } +pub enum Ticket { + // Read tickets are used when caching the result of a read from the db. + // They are only valid if the generation number matches the current generation. + // Used to ensure that no write occurred while we were reading from the db. + Read(u64), + // Write tickets are always valid. Used when caching writes, which cannot be stale. + Write, +} + +// key_generation should be big enough to make false positives unlikely. If, on +// average, there is one millisecond between acquiring the ticket and calling insert(), +// then even at 1 million inserts per second, there will be 1000 inserts between acquiring +// the ticket and calling insert(), so about 1/16th of the entries will be invalidated, +// so valid inserts will succeed with probability 15/16. +const KEY_GENERATION_SIZE: usize = 1024 * 16; + impl MonotonicCache where K: Hash + Eq + Send + Sync + Copy + 'static, @@ -159,6 +183,9 @@ where pub fn new(cache_size: u64) -> Self { Self { cache: MokaCache::builder().max_capacity(cache_size).build(), + key_generation: (0..KEY_GENERATION_SIZE) + .map(|_| AtomicU64::new(0)) + .collect(), } } @@ -166,29 +193,111 @@ where self.cache.get(key) } + fn generation(&self, key: &K) -> &AtomicU64 { + let mut state = DefaultHasher::new(); + key.hash(&mut state); + let hash = state.finish(); + &self.key_generation[(hash % KEY_GENERATION_SIZE as u64) as usize] + } + + /// Get a ticket for caching the result of a read operation. The ticket will be + /// expired if a writer writes a new version of the value. + /// The caller must obtain the ticket BEFORE checking the dirty set and db. By + /// obeying this rule, the caller can be sure that if their ticket remains valid + /// at insert time, they either are inserting the most recent value, or a concurrent + /// writer will shortly overwrite their value. + pub fn get_ticket_for_read(&self, key: &K) -> Ticket { + let gen = self.generation(key); + Ticket::Read(gen.load(std::sync::atomic::Ordering::Acquire)) + } + // Update the cache with guaranteed monotonicity. That is, if there are N // calls to the this function from N threads, the write with the newest value will // win the race regardless of what ordering the writes occur in. // // Caller should log the insert with trace! and increment the appropriate metric. - pub fn insert(&self, key: &K, value: V) { + pub fn insert(&self, key: &K, value: V, ticket: Ticket) -> Result<(), ()> { + let gen = self.generation(key); + + // invalidate other readers as early as possible. If a reader acquires a + // new ticket after this point, then it will read the new value from + // the dirty set (or db). + if matches!(ticket, Ticket::Write) { + gen.fetch_add(1, std::sync::atomic::Ordering::Release); + } + + let check_ticket = || -> Result<(), ()> { + match ticket { + Ticket::Read(ticket) => { + if ticket != gen.load(std::sync::atomic::Ordering::Acquire) { + return Err(()); + } + Ok(()) + } + Ticket::Write => Ok(()), + } + }; + // Warning: tricky code! let entry = self .cache .entry(*key) - // only one racing insert will call the closure - .or_insert_with(|| Arc::new(Mutex::new(value.clone()))); - - // We may be racing with another thread that observed an older version of value + // Suppose there is a reader (who has an old version) and a writer (who has + // the newest version by definition) both trying to insert when the cache has + // no entry. Here are the possible outcomes: + // + // 1. Race in `or_optionally_insert_with`: + // 1. Reader wins race, ticket is valid, and reader inserts old version. + // Writer will overwrite the old version after the !is_fresh check. + // 2. Writer wins race. Reader will enter is_fresh check, lock entry, and + // find that its ticket is expired. + // + // 2. No race on `or_optionally_insert_with`: + // 1. Reader inserts first (via `or_optionally_insert_with`), writer enters !is_fresh + // check and overwrites entry. + // 1. There are two sub-cases here because the reader's entry could be evicted, + // but in either case the writer obviously overwrites it. + // 2. Writer inserts first (via `or_optionally_insert_with`), invalidates ticket. + // Then, two cases can follow: + // 1. Reader skips `or_optionally_insert_with` (because entry is present), enters + // !is_fresh check, and does not insert because its ticket is expired. + // 2. The writer's cache entry is evicted already, so reader enters + // `or_optionally_insert_with`. The ticket is expired so we do not insert. + // + // The other cases are where there is already an entry. In this case neither reader + // nor writer will enter `or_optionally_insert_with` callback. Instead they will both enter + // the !is_fresh check and lock the entry: + // 1. If the reader locks first, it will insert its old version. Then the writer + // will lock and overwrite it with the newer version. + // 2. If the writer locks first, it will have already expired the ticket, and the + // reader will not insert anything. + // + // There may also be more than one concurrent reader. However, the only way the two + // readers can have different versions is if there is concurrently a writer that wrote + // a new version. In this case all stale readers will fail the ticket check, and only + // up-to-date readers will remain. So we cannot have a bad insert caused by two readers + // racing to insert, both with valid tickets. + .or_optionally_insert_with(|| { + check_ticket().ok()?; + Some(Arc::new(Mutex::new(value.clone()))) + }) + // Note: Ticket::Write cannot expire, but an insert can still fail, in the case where + // a writer and reader are racing to call `or_optionally_insert_with`, the reader wins, + // but then fails to insert because its ticket is expired. Then no entry at all is inserted. + .ok_or(())?; + + // !is_fresh means we did not insert a new entry in or_optionally_insert_with above. if !entry.is_fresh() { - // !is_fresh means we lost the race, and entry holds the value that was - // inserted by the other thread. We need to check if we have a more recent value - // than the other reader. let mut entry = entry.value().lock(); - if value.is_newer_than(&entry) { - *entry = value; - } + check_ticket()?; + + // Ticket expiry makes this assert impossible. + // TODO: relax to debug_assert? + assert!(!entry.is_newer_than(&value), "entry is newer than value"); + *entry = value; } + + Ok(()) } pub fn invalidate(&self, key: &K) { diff --git a/crates/sui-core/src/execution_cache/metrics.rs b/crates/sui-core/src/execution_cache/metrics.rs index ef158cd378250..83b946557d964 100644 --- a/crates/sui-core/src/execution_cache/metrics.rs +++ b/crates/sui-core/src/execution_cache/metrics.rs @@ -4,8 +4,8 @@ use tracing::trace; use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, - IntGauge, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry, }; pub struct ExecutionCacheMetrics { @@ -15,6 +15,7 @@ pub struct ExecutionCacheMetrics { pub(crate) cache_negative_hits: IntCounterVec, pub(crate) cache_misses: IntCounterVec, pub(crate) cache_writes: IntCounterVec, + pub(crate) expired_tickets: IntCounter, } impl ExecutionCacheMetrics { @@ -65,6 +66,13 @@ impl ExecutionCacheMetrics { registry, ) .unwrap(), + + expired_tickets: register_int_counter_with_registry!( + "execution_cache_expired_tickets", + "Failed inserts to monotonic caches because of expired tickets", + registry, + ) + .unwrap(), } } @@ -121,4 +129,8 @@ impl ExecutionCacheMetrics { pub(crate) fn record_cache_write(&self, collection: &'static str) { self.cache_writes.with_label_values(&[collection]).inc(); } + + pub(crate) fn record_ticket_expiry(&self) { + self.expired_tickets.inc(); + } } diff --git a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index 45ca4ca4d0d71..3e74656031d8a 100644 --- a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -1217,6 +1217,7 @@ async fn test_concurrent_lockers_same_tx() { #[tokio::test] async fn latest_object_cache_race_test() { + telemetry_subscribers::init_for_testing(); let authority = TestAuthorityBuilder::new().build().await; let store = authority.database_for_testing().clone(); @@ -1258,11 +1259,19 @@ async fn latest_object_cache_race_test() { let start = Instant::now(); std::thread::spawn(move || { while start.elapsed() < Duration::from_secs(2) { - let Some(latest_version) = cache + // If you move the get_ticket_for_read to after we get the latest version, + // the test will fail! (this is good, it means the test is doing something) + let ticket = cache .cached .object_by_id_cache + .get_ticket_for_read(&object_id); + + // get the latest version, but then let it become stale + let Some(latest_version) = cache + .dirty + .objects .get(&object_id) - .and_then(|e| e.lock().version()) + .and_then(|e| e.value().get_highest().map(|v| v.0)) else { continue; }; @@ -1275,14 +1284,30 @@ async fn latest_object_cache_race_test() { let object = Object::with_id_owner_version_for_testing(object_id, latest_version, owner); + // because we obtained the ticket before reading the object, we will not write a stale + // version to the cache. cache.cache_latest_object_by_id( &object_id, LatestObjectCacheEntry::Object(latest_version, object.into()), + ticket, ); } }) }; + // a thread that just invalidates the cache as fast as it can + let invalidator = { + let cache = cache.clone(); + let start = Instant::now(); + std::thread::spawn(move || { + while start.elapsed() < Duration::from_secs(2) { + cache.cached.object_by_id_cache.invalidate(&object_id); + // sleep for 1 to 10µs + std::thread::sleep(Duration::from_micros(rand::thread_rng().gen_range(1..10))); + } + }) + }; + // a thread that does nothing but watch to see if the cache goes back in time let checker = { let cache = cache.clone(); @@ -1300,7 +1325,7 @@ async fn latest_object_cache_race_test() { continue; }; - assert!(cur >= latest); + assert!(cur >= latest, "{} >= {}", cur, latest); latest = cur; } }) @@ -1309,4 +1334,5 @@ async fn latest_object_cache_race_test() { writer.join().unwrap(); reader.join().unwrap(); checker.join().unwrap(); + invalidator.join().unwrap(); } diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index ce38017ce1d66..0f62e659d69e6 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -77,6 +77,7 @@ use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction}; use tap::TapOptional; use tracing::{debug, info, instrument, trace, warn}; +use super::cache_types::Ticket; use super::ExecutionCacheAPI; use super::{ cache_types::{CachedVersionMap, IsNewer, MonotonicCache}, @@ -487,10 +488,16 @@ impl WritebackCache { // the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes. let mut entry = self.dirty.objects.entry(*object_id).or_default(); - self.cached.object_by_id_cache.insert( - object_id, - LatestObjectCacheEntry::Object(version, object.clone()), - ); + self.cached + .object_by_id_cache + .insert( + object_id, + LatestObjectCacheEntry::Object(version, object.clone()), + Ticket::Write, + ) + // While Ticket::Write cannot expire, this insert may still fail. + // See the comment in `MonotonicCache::insert`. + .ok(); entry.insert(version, object); } @@ -731,6 +738,7 @@ impl WritebackCache { } fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option { + let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id); match self.get_object_by_id_cache_only(request_type, id) { CacheResult::Hit((_, object)) => Some(object), CacheResult::NegativeHit => None, @@ -740,9 +748,10 @@ impl WritebackCache { self.cache_latest_object_by_id( id, LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()), + ticket, ); } else { - self.cache_object_not_found(id); + self.cache_object_not_found(id, ticket); } obj } @@ -1075,14 +1084,28 @@ impl WritebackCache { } // Updates the latest object id cache with an entry that was read from the db. - fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) { + fn cache_latest_object_by_id( + &self, + object_id: &ObjectID, + object: LatestObjectCacheEntry, + ticket: Ticket, + ) { trace!("caching object by id: {:?} {:?}", object_id, object); - self.metrics.record_cache_write("object_by_id"); - self.cached.object_by_id_cache.insert(object_id, object); + if self + .cached + .object_by_id_cache + .insert(object_id, object, ticket) + .is_ok() + { + self.metrics.record_cache_write("object_by_id"); + } else { + trace!("discarded cache write due to expired ticket"); + self.metrics.record_ticket_expiry(); + } } - fn cache_object_not_found(&self, object_id: &ObjectID) { - self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent); + fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) { + self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket); } fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) { @@ -1432,6 +1455,7 @@ impl ObjectCacheRead for WritebackCache { .cloned() .tap_none(|| panic!("dirty set cannot be empty")) } else { + // TODO: we should try not to read from the db while holding the locks. self.record_db_get("object_lt_or_eq_version_latest") .get_latest_object_or_tombstone(object_id) .expect("db error") @@ -1444,9 +1468,18 @@ impl ObjectCacheRead for WritebackCache { // we can always cache the latest object (or tombstone), even if it is not within the // version_bound. This is done in order to warm the cache in the case where a sequence // of transactions all read the same child object without writing to it. + + // Note: no need to call with_object_by_id_cache_update here, because we are holding + // the lock on the dirty cache entry, and `latest` cannot become out-of-date + // while we hold that lock. self.cache_latest_object_by_id( &object_id, LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()), + // We can get a ticket at the last second, because we are holding the lock + // on dirty, so there cannot be any concurrent writes. + self.cached + .object_by_id_cache + .get_ticket_for_read(&object_id), ); if obj_version <= version_bound { @@ -1470,7 +1503,13 @@ impl ObjectCacheRead for WritebackCache { // cache let highest = cached_entry.and_then(|c| c.get_highest()); assert!(highest.is_none() || highest.unwrap().1.is_tombstone()); - self.cache_object_not_found(&object_id); + self.cache_object_not_found( + &object_id, + // okay to get ticket at last second - see above + self.cached + .object_by_id_cache + .get_ticket_for_read(&object_id), + ); None } },