Skip to content

Commit

Permalink
Fix bug in object_by_id_cache (#20450)
Browse files Browse the repository at this point in the history
Suppose that a reader thread is trying to cache an object that it just
read, while a writer thread is trying to cache an object that it just
wrote. The writer thread definitionally has the latest version. The
reader thread may be out of date. While we previously took some care to
not replace a new version with an old version, this did not take into
account evictions, and so the following bug was possible:

      READER                            WRITER
      read object_by_id_cache (miss)
      read dirty set (miss)
                                        write to dirty
      read db (old version)
write to cache (while holding dirty lock)
                                        cache entry is evicted
      write to cache

There is no way for the reader to tell that the value it is caching is
out of date, because the up to date entry is already gone from the
cache.

We fix this by requiring reader threads to obtain a ticket before they
read from the dirty set and/or db. Tickets are expired by writers. Then,
the above case looks like this:

      READER                            WRITER
      get ticket
      read cache (miss)
      read dirty (miss)
                                        write dirty
      read db (old version)
                                        expire ticket
write cache (while holding dirty lock)
                                        cache eviction
      no write to cache (ticket expired)

Any interleaving of the above either results in the reader seeing a
recent version, or else having an expired ticket.
  • Loading branch information
mystenmark authored Dec 9, 2024
1 parent b70066c commit 6226f54
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 29 deletions.
135 changes: 122 additions & 13 deletions crates/sui-core/src/execution_cache/cache_types.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::hash::Hash;
use std::hash::{Hash, Hasher};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{cmp::Ordering, hash::DefaultHasher};

use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
Expand Down Expand Up @@ -149,8 +150,31 @@ pub trait IsNewer {

pub struct MonotonicCache<K, V> {
cache: MokaCache<K, Arc<Mutex<V>>>,
// When inserting a possibly stale value, we prove that it is not stale by
// ensuring that no fresh value was inserted since we began reading the value
// we are inserting. We do this by hashing the key to an element in this array,
// reading the current value, and then passing that value to insert(). If the
// value is out of date, then there may have been an intervening write, so we
// discard the insert attempt.
key_generation: Vec<AtomicU64>,
}

pub enum Ticket {
// Read tickets are used when caching the result of a read from the db.
// They are only valid if the generation number matches the current generation.
// Used to ensure that no write occurred while we were reading from the db.
Read(u64),
// Write tickets are always valid. Used when caching writes, which cannot be stale.
Write,
}

// key_generation should be big enough to make false positives unlikely. If, on
// average, there is one millisecond between acquiring the ticket and calling insert(),
// then even at 1 million inserts per second, there will be 1000 inserts between acquiring
// the ticket and calling insert(), so about 1/16th of the entries will be invalidated,
// so valid inserts will succeed with probability 15/16.
const KEY_GENERATION_SIZE: usize = 1024 * 16;

impl<K, V> MonotonicCache<K, V>
where
K: Hash + Eq + Send + Sync + Copy + 'static,
Expand All @@ -159,36 +183,121 @@ where
pub fn new(cache_size: u64) -> Self {
Self {
cache: MokaCache::builder().max_capacity(cache_size).build(),
key_generation: (0..KEY_GENERATION_SIZE)
.map(|_| AtomicU64::new(0))
.collect(),
}
}

pub fn get(&self, key: &K) -> Option<Arc<Mutex<V>>> {
self.cache.get(key)
}

fn generation(&self, key: &K) -> &AtomicU64 {
let mut state = DefaultHasher::new();
key.hash(&mut state);
let hash = state.finish();
&self.key_generation[(hash % KEY_GENERATION_SIZE as u64) as usize]
}

/// Get a ticket for caching the result of a read operation. The ticket will be
/// expired if a writer writes a new version of the value.
/// The caller must obtain the ticket BEFORE checking the dirty set and db. By
/// obeying this rule, the caller can be sure that if their ticket remains valid
/// at insert time, they either are inserting the most recent value, or a concurrent
/// writer will shortly overwrite their value.
pub fn get_ticket_for_read(&self, key: &K) -> Ticket {
let gen = self.generation(key);
Ticket::Read(gen.load(std::sync::atomic::Ordering::Acquire))
}

// Update the cache with guaranteed monotonicity. That is, if there are N
// calls to the this function from N threads, the write with the newest value will
// win the race regardless of what ordering the writes occur in.
//
// Caller should log the insert with trace! and increment the appropriate metric.
pub fn insert(&self, key: &K, value: V) {
pub fn insert(&self, key: &K, value: V, ticket: Ticket) -> Result<(), ()> {
let gen = self.generation(key);

// invalidate other readers as early as possible. If a reader acquires a
// new ticket after this point, then it will read the new value from
// the dirty set (or db).
if matches!(ticket, Ticket::Write) {
gen.fetch_add(1, std::sync::atomic::Ordering::Release);
}

let check_ticket = || -> Result<(), ()> {
match ticket {
Ticket::Read(ticket) => {
if ticket != gen.load(std::sync::atomic::Ordering::Acquire) {
return Err(());
}
Ok(())
}
Ticket::Write => Ok(()),
}
};

// Warning: tricky code!
let entry = self
.cache
.entry(*key)
// only one racing insert will call the closure
.or_insert_with(|| Arc::new(Mutex::new(value.clone())));

// We may be racing with another thread that observed an older version of value
// Suppose there is a reader (who has an old version) and a writer (who has
// the newest version by definition) both trying to insert when the cache has
// no entry. Here are the possible outcomes:
//
// 1. Race in `or_optionally_insert_with`:
// 1. Reader wins race, ticket is valid, and reader inserts old version.
// Writer will overwrite the old version after the !is_fresh check.
// 2. Writer wins race. Reader will enter is_fresh check, lock entry, and
// find that its ticket is expired.
//
// 2. No race on `or_optionally_insert_with`:
// 1. Reader inserts first (via `or_optionally_insert_with`), writer enters !is_fresh
// check and overwrites entry.
// 1. There are two sub-cases here because the reader's entry could be evicted,
// but in either case the writer obviously overwrites it.
// 2. Writer inserts first (via `or_optionally_insert_with`), invalidates ticket.
// Then, two cases can follow:
// 1. Reader skips `or_optionally_insert_with` (because entry is present), enters
// !is_fresh check, and does not insert because its ticket is expired.
// 2. The writer's cache entry is evicted already, so reader enters
// `or_optionally_insert_with`. The ticket is expired so we do not insert.
//
// The other cases are where there is already an entry. In this case neither reader
// nor writer will enter `or_optionally_insert_with` callback. Instead they will both enter
// the !is_fresh check and lock the entry:
// 1. If the reader locks first, it will insert its old version. Then the writer
// will lock and overwrite it with the newer version.
// 2. If the writer locks first, it will have already expired the ticket, and the
// reader will not insert anything.
//
// There may also be more than one concurrent reader. However, the only way the two
// readers can have different versions is if there is concurrently a writer that wrote
// a new version. In this case all stale readers will fail the ticket check, and only
// up-to-date readers will remain. So we cannot have a bad insert caused by two readers
// racing to insert, both with valid tickets.
.or_optionally_insert_with(|| {
check_ticket().ok()?;
Some(Arc::new(Mutex::new(value.clone())))
})
// Note: Ticket::Write cannot expire, but an insert can still fail, in the case where
// a writer and reader are racing to call `or_optionally_insert_with`, the reader wins,
// but then fails to insert because its ticket is expired. Then no entry at all is inserted.
.ok_or(())?;

// !is_fresh means we did not insert a new entry in or_optionally_insert_with above.
if !entry.is_fresh() {
// !is_fresh means we lost the race, and entry holds the value that was
// inserted by the other thread. We need to check if we have a more recent value
// than the other reader.
let mut entry = entry.value().lock();
if value.is_newer_than(&entry) {
*entry = value;
}
check_ticket()?;

// Ticket expiry makes this assert impossible.
// TODO: relax to debug_assert?
assert!(!entry.is_newer_than(&value), "entry is newer than value");
*entry = value;
}

Ok(())
}

pub fn invalidate(&self, key: &K) {
Expand Down
16 changes: 14 additions & 2 deletions crates/sui-core/src/execution_cache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use tracing::trace;

use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
IntGauge, Registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};

pub struct ExecutionCacheMetrics {
Expand All @@ -15,6 +15,7 @@ pub struct ExecutionCacheMetrics {
pub(crate) cache_negative_hits: IntCounterVec,
pub(crate) cache_misses: IntCounterVec,
pub(crate) cache_writes: IntCounterVec,
pub(crate) expired_tickets: IntCounter,
}

impl ExecutionCacheMetrics {
Expand Down Expand Up @@ -65,6 +66,13 @@ impl ExecutionCacheMetrics {
registry,
)
.unwrap(),

expired_tickets: register_int_counter_with_registry!(
"execution_cache_expired_tickets",
"Failed inserts to monotonic caches because of expired tickets",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -121,4 +129,8 @@ impl ExecutionCacheMetrics {
pub(crate) fn record_cache_write(&self, collection: &'static str) {
self.cache_writes.with_label_values(&[collection]).inc();
}

pub(crate) fn record_ticket_expiry(&self) {
self.expired_tickets.inc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,7 @@ async fn test_concurrent_lockers_same_tx() {

#[tokio::test]
async fn latest_object_cache_race_test() {
telemetry_subscribers::init_for_testing();
let authority = TestAuthorityBuilder::new().build().await;

let store = authority.database_for_testing().clone();
Expand Down Expand Up @@ -1258,11 +1259,19 @@ async fn latest_object_cache_race_test() {
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
let Some(latest_version) = cache
// If you move the get_ticket_for_read to after we get the latest version,
// the test will fail! (this is good, it means the test is doing something)
let ticket = cache
.cached
.object_by_id_cache
.get_ticket_for_read(&object_id);

// get the latest version, but then let it become stale
let Some(latest_version) = cache
.dirty
.objects
.get(&object_id)
.and_then(|e| e.lock().version())
.and_then(|e| e.value().get_highest().map(|v| v.0))
else {
continue;
};
Expand All @@ -1275,14 +1284,30 @@ async fn latest_object_cache_race_test() {
let object =
Object::with_id_owner_version_for_testing(object_id, latest_version, owner);

// because we obtained the ticket before reading the object, we will not write a stale
// version to the cache.
cache.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(latest_version, object.into()),
ticket,
);
}
})
};

// a thread that just invalidates the cache as fast as it can
let invalidator = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
cache.cached.object_by_id_cache.invalidate(&object_id);
// sleep for 1 to 10µs
std::thread::sleep(Duration::from_micros(rand::thread_rng().gen_range(1..10)));
}
})
};

// a thread that does nothing but watch to see if the cache goes back in time
let checker = {
let cache = cache.clone();
Expand All @@ -1300,7 +1325,7 @@ async fn latest_object_cache_race_test() {
continue;
};

assert!(cur >= latest);
assert!(cur >= latest, "{} >= {}", cur, latest);
latest = cur;
}
})
Expand All @@ -1309,4 +1334,5 @@ async fn latest_object_cache_race_test() {
writer.join().unwrap();
reader.join().unwrap();
checker.join().unwrap();
invalidator.join().unwrap();
}
Loading

0 comments on commit 6226f54

Please sign in to comment.