From 6226f54ad8c3cabbcaa81c1c7cc4d98b81115ea2 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Mon, 9 Dec 2024 12:00:21 -1000 Subject: [PATCH] Fix bug in object_by_id_cache (#20450) Suppose that a reader thread is trying to cache an object that it just read, while a writer thread is trying to cache an object that it just wrote. The writer thread definitionally has the latest version. The reader thread may be out of date. While we previously took some care to not replace a new version with an old version, this did not take into account evictions, and so the following bug was possible: READER WRITER read object_by_id_cache (miss) read dirty set (miss) write to dirty read db (old version) write to cache (while holding dirty lock) cache entry is evicted write to cache There is no way for the reader to tell that the value it is caching is out of date, because the up to date entry is already gone from the cache. We fix this by requiring reader threads to obtain a ticket before they read from the dirty set and/or db. Tickets are expired by writers. Then, the above case looks like this: READER WRITER get ticket read cache (miss) read dirty (miss) write dirty read db (old version) expire ticket write cache (while holding dirty lock) cache eviction no write to cache (ticket expired) Any interleaving of the above either results in the reader seeing a recent version, or else having an expired ticket. --- .../src/execution_cache/cache_types.rs | 135 ++++++++++++++++-- .../sui-core/src/execution_cache/metrics.rs | 16 ++- .../unit_tests/writeback_cache_tests.rs | 32 ++++- .../src/execution_cache/writeback_cache.rs | 61 ++++++-- 4 files changed, 215 insertions(+), 29 deletions(-) diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs index 3faea764a1f4d..55a5f81d8fcfc 100644 --- a/crates/sui-core/src/execution_cache/cache_types.rs +++ b/crates/sui-core/src/execution_cache/cache_types.rs @@ -1,10 +1,11 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::cmp::Ordering; use std::collections::VecDeque; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::{cmp::Ordering, hash::DefaultHasher}; use moka::sync::Cache as MokaCache; use parking_lot::Mutex; @@ -149,8 +150,31 @@ pub trait IsNewer { pub struct MonotonicCache { cache: MokaCache>>, + // When inserting a possibly stale value, we prove that it is not stale by + // ensuring that no fresh value was inserted since we began reading the value + // we are inserting. We do this by hashing the key to an element in this array, + // reading the current value, and then passing that value to insert(). If the + // value is out of date, then there may have been an intervening write, so we + // discard the insert attempt. + key_generation: Vec, } +pub enum Ticket { + // Read tickets are used when caching the result of a read from the db. + // They are only valid if the generation number matches the current generation. + // Used to ensure that no write occurred while we were reading from the db. + Read(u64), + // Write tickets are always valid. Used when caching writes, which cannot be stale. + Write, +} + +// key_generation should be big enough to make false positives unlikely. If, on +// average, there is one millisecond between acquiring the ticket and calling insert(), +// then even at 1 million inserts per second, there will be 1000 inserts between acquiring +// the ticket and calling insert(), so about 1/16th of the entries will be invalidated, +// so valid inserts will succeed with probability 15/16. +const KEY_GENERATION_SIZE: usize = 1024 * 16; + impl MonotonicCache where K: Hash + Eq + Send + Sync + Copy + 'static, @@ -159,6 +183,9 @@ where pub fn new(cache_size: u64) -> Self { Self { cache: MokaCache::builder().max_capacity(cache_size).build(), + key_generation: (0..KEY_GENERATION_SIZE) + .map(|_| AtomicU64::new(0)) + .collect(), } } @@ -166,29 +193,111 @@ where self.cache.get(key) } + fn generation(&self, key: &K) -> &AtomicU64 { + let mut state = DefaultHasher::new(); + key.hash(&mut state); + let hash = state.finish(); + &self.key_generation[(hash % KEY_GENERATION_SIZE as u64) as usize] + } + + /// Get a ticket for caching the result of a read operation. The ticket will be + /// expired if a writer writes a new version of the value. + /// The caller must obtain the ticket BEFORE checking the dirty set and db. By + /// obeying this rule, the caller can be sure that if their ticket remains valid + /// at insert time, they either are inserting the most recent value, or a concurrent + /// writer will shortly overwrite their value. + pub fn get_ticket_for_read(&self, key: &K) -> Ticket { + let gen = self.generation(key); + Ticket::Read(gen.load(std::sync::atomic::Ordering::Acquire)) + } + // Update the cache with guaranteed monotonicity. That is, if there are N // calls to the this function from N threads, the write with the newest value will // win the race regardless of what ordering the writes occur in. // // Caller should log the insert with trace! and increment the appropriate metric. - pub fn insert(&self, key: &K, value: V) { + pub fn insert(&self, key: &K, value: V, ticket: Ticket) -> Result<(), ()> { + let gen = self.generation(key); + + // invalidate other readers as early as possible. If a reader acquires a + // new ticket after this point, then it will read the new value from + // the dirty set (or db). + if matches!(ticket, Ticket::Write) { + gen.fetch_add(1, std::sync::atomic::Ordering::Release); + } + + let check_ticket = || -> Result<(), ()> { + match ticket { + Ticket::Read(ticket) => { + if ticket != gen.load(std::sync::atomic::Ordering::Acquire) { + return Err(()); + } + Ok(()) + } + Ticket::Write => Ok(()), + } + }; + // Warning: tricky code! let entry = self .cache .entry(*key) - // only one racing insert will call the closure - .or_insert_with(|| Arc::new(Mutex::new(value.clone()))); - - // We may be racing with another thread that observed an older version of value + // Suppose there is a reader (who has an old version) and a writer (who has + // the newest version by definition) both trying to insert when the cache has + // no entry. Here are the possible outcomes: + // + // 1. Race in `or_optionally_insert_with`: + // 1. Reader wins race, ticket is valid, and reader inserts old version. + // Writer will overwrite the old version after the !is_fresh check. + // 2. Writer wins race. Reader will enter is_fresh check, lock entry, and + // find that its ticket is expired. + // + // 2. No race on `or_optionally_insert_with`: + // 1. Reader inserts first (via `or_optionally_insert_with`), writer enters !is_fresh + // check and overwrites entry. + // 1. There are two sub-cases here because the reader's entry could be evicted, + // but in either case the writer obviously overwrites it. + // 2. Writer inserts first (via `or_optionally_insert_with`), invalidates ticket. + // Then, two cases can follow: + // 1. Reader skips `or_optionally_insert_with` (because entry is present), enters + // !is_fresh check, and does not insert because its ticket is expired. + // 2. The writer's cache entry is evicted already, so reader enters + // `or_optionally_insert_with`. The ticket is expired so we do not insert. + // + // The other cases are where there is already an entry. In this case neither reader + // nor writer will enter `or_optionally_insert_with` callback. Instead they will both enter + // the !is_fresh check and lock the entry: + // 1. If the reader locks first, it will insert its old version. Then the writer + // will lock and overwrite it with the newer version. + // 2. If the writer locks first, it will have already expired the ticket, and the + // reader will not insert anything. + // + // There may also be more than one concurrent reader. However, the only way the two + // readers can have different versions is if there is concurrently a writer that wrote + // a new version. In this case all stale readers will fail the ticket check, and only + // up-to-date readers will remain. So we cannot have a bad insert caused by two readers + // racing to insert, both with valid tickets. + .or_optionally_insert_with(|| { + check_ticket().ok()?; + Some(Arc::new(Mutex::new(value.clone()))) + }) + // Note: Ticket::Write cannot expire, but an insert can still fail, in the case where + // a writer and reader are racing to call `or_optionally_insert_with`, the reader wins, + // but then fails to insert because its ticket is expired. Then no entry at all is inserted. + .ok_or(())?; + + // !is_fresh means we did not insert a new entry in or_optionally_insert_with above. if !entry.is_fresh() { - // !is_fresh means we lost the race, and entry holds the value that was - // inserted by the other thread. We need to check if we have a more recent value - // than the other reader. let mut entry = entry.value().lock(); - if value.is_newer_than(&entry) { - *entry = value; - } + check_ticket()?; + + // Ticket expiry makes this assert impossible. + // TODO: relax to debug_assert? + assert!(!entry.is_newer_than(&value), "entry is newer than value"); + *entry = value; } + + Ok(()) } pub fn invalidate(&self, key: &K) { diff --git a/crates/sui-core/src/execution_cache/metrics.rs b/crates/sui-core/src/execution_cache/metrics.rs index ef158cd378250..83b946557d964 100644 --- a/crates/sui-core/src/execution_cache/metrics.rs +++ b/crates/sui-core/src/execution_cache/metrics.rs @@ -4,8 +4,8 @@ use tracing::trace; use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, - IntGauge, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry, }; pub struct ExecutionCacheMetrics { @@ -15,6 +15,7 @@ pub struct ExecutionCacheMetrics { pub(crate) cache_negative_hits: IntCounterVec, pub(crate) cache_misses: IntCounterVec, pub(crate) cache_writes: IntCounterVec, + pub(crate) expired_tickets: IntCounter, } impl ExecutionCacheMetrics { @@ -65,6 +66,13 @@ impl ExecutionCacheMetrics { registry, ) .unwrap(), + + expired_tickets: register_int_counter_with_registry!( + "execution_cache_expired_tickets", + "Failed inserts to monotonic caches because of expired tickets", + registry, + ) + .unwrap(), } } @@ -121,4 +129,8 @@ impl ExecutionCacheMetrics { pub(crate) fn record_cache_write(&self, collection: &'static str) { self.cache_writes.with_label_values(&[collection]).inc(); } + + pub(crate) fn record_ticket_expiry(&self) { + self.expired_tickets.inc(); + } } diff --git a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index 45ca4ca4d0d71..3e74656031d8a 100644 --- a/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/sui-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -1217,6 +1217,7 @@ async fn test_concurrent_lockers_same_tx() { #[tokio::test] async fn latest_object_cache_race_test() { + telemetry_subscribers::init_for_testing(); let authority = TestAuthorityBuilder::new().build().await; let store = authority.database_for_testing().clone(); @@ -1258,11 +1259,19 @@ async fn latest_object_cache_race_test() { let start = Instant::now(); std::thread::spawn(move || { while start.elapsed() < Duration::from_secs(2) { - let Some(latest_version) = cache + // If you move the get_ticket_for_read to after we get the latest version, + // the test will fail! (this is good, it means the test is doing something) + let ticket = cache .cached .object_by_id_cache + .get_ticket_for_read(&object_id); + + // get the latest version, but then let it become stale + let Some(latest_version) = cache + .dirty + .objects .get(&object_id) - .and_then(|e| e.lock().version()) + .and_then(|e| e.value().get_highest().map(|v| v.0)) else { continue; }; @@ -1275,14 +1284,30 @@ async fn latest_object_cache_race_test() { let object = Object::with_id_owner_version_for_testing(object_id, latest_version, owner); + // because we obtained the ticket before reading the object, we will not write a stale + // version to the cache. cache.cache_latest_object_by_id( &object_id, LatestObjectCacheEntry::Object(latest_version, object.into()), + ticket, ); } }) }; + // a thread that just invalidates the cache as fast as it can + let invalidator = { + let cache = cache.clone(); + let start = Instant::now(); + std::thread::spawn(move || { + while start.elapsed() < Duration::from_secs(2) { + cache.cached.object_by_id_cache.invalidate(&object_id); + // sleep for 1 to 10µs + std::thread::sleep(Duration::from_micros(rand::thread_rng().gen_range(1..10))); + } + }) + }; + // a thread that does nothing but watch to see if the cache goes back in time let checker = { let cache = cache.clone(); @@ -1300,7 +1325,7 @@ async fn latest_object_cache_race_test() { continue; }; - assert!(cur >= latest); + assert!(cur >= latest, "{} >= {}", cur, latest); latest = cur; } }) @@ -1309,4 +1334,5 @@ async fn latest_object_cache_race_test() { writer.join().unwrap(); reader.join().unwrap(); checker.join().unwrap(); + invalidator.join().unwrap(); } diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index ce38017ce1d66..0f62e659d69e6 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -77,6 +77,7 @@ use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction}; use tap::TapOptional; use tracing::{debug, info, instrument, trace, warn}; +use super::cache_types::Ticket; use super::ExecutionCacheAPI; use super::{ cache_types::{CachedVersionMap, IsNewer, MonotonicCache}, @@ -487,10 +488,16 @@ impl WritebackCache { // the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes. let mut entry = self.dirty.objects.entry(*object_id).or_default(); - self.cached.object_by_id_cache.insert( - object_id, - LatestObjectCacheEntry::Object(version, object.clone()), - ); + self.cached + .object_by_id_cache + .insert( + object_id, + LatestObjectCacheEntry::Object(version, object.clone()), + Ticket::Write, + ) + // While Ticket::Write cannot expire, this insert may still fail. + // See the comment in `MonotonicCache::insert`. + .ok(); entry.insert(version, object); } @@ -731,6 +738,7 @@ impl WritebackCache { } fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option { + let ticket = self.cached.object_by_id_cache.get_ticket_for_read(id); match self.get_object_by_id_cache_only(request_type, id) { CacheResult::Hit((_, object)) => Some(object), CacheResult::NegativeHit => None, @@ -740,9 +748,10 @@ impl WritebackCache { self.cache_latest_object_by_id( id, LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()), + ticket, ); } else { - self.cache_object_not_found(id); + self.cache_object_not_found(id, ticket); } obj } @@ -1075,14 +1084,28 @@ impl WritebackCache { } // Updates the latest object id cache with an entry that was read from the db. - fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) { + fn cache_latest_object_by_id( + &self, + object_id: &ObjectID, + object: LatestObjectCacheEntry, + ticket: Ticket, + ) { trace!("caching object by id: {:?} {:?}", object_id, object); - self.metrics.record_cache_write("object_by_id"); - self.cached.object_by_id_cache.insert(object_id, object); + if self + .cached + .object_by_id_cache + .insert(object_id, object, ticket) + .is_ok() + { + self.metrics.record_cache_write("object_by_id"); + } else { + trace!("discarded cache write due to expired ticket"); + self.metrics.record_ticket_expiry(); + } } - fn cache_object_not_found(&self, object_id: &ObjectID) { - self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent); + fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) { + self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket); } fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) { @@ -1432,6 +1455,7 @@ impl ObjectCacheRead for WritebackCache { .cloned() .tap_none(|| panic!("dirty set cannot be empty")) } else { + // TODO: we should try not to read from the db while holding the locks. self.record_db_get("object_lt_or_eq_version_latest") .get_latest_object_or_tombstone(object_id) .expect("db error") @@ -1444,9 +1468,18 @@ impl ObjectCacheRead for WritebackCache { // we can always cache the latest object (or tombstone), even if it is not within the // version_bound. This is done in order to warm the cache in the case where a sequence // of transactions all read the same child object without writing to it. + + // Note: no need to call with_object_by_id_cache_update here, because we are holding + // the lock on the dirty cache entry, and `latest` cannot become out-of-date + // while we hold that lock. self.cache_latest_object_by_id( &object_id, LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()), + // We can get a ticket at the last second, because we are holding the lock + // on dirty, so there cannot be any concurrent writes. + self.cached + .object_by_id_cache + .get_ticket_for_read(&object_id), ); if obj_version <= version_bound { @@ -1470,7 +1503,13 @@ impl ObjectCacheRead for WritebackCache { // cache let highest = cached_entry.and_then(|c| c.get_highest()); assert!(highest.is_none() || highest.unwrap().1.is_tombstone()); - self.cache_object_not_found(&object_id); + self.cache_object_not_found( + &object_id, + // okay to get ticket at last second - see above + self.cached + .object_by_id_cache + .get_ticket_for_read(&object_id), + ); None } },