Fix BatchSemaphore bug causing deadlocks (#167)
---------

Co-authored-by: Rajeev Joshi <{ID}+{username}@users.noreply.github.com>
Co-authored-by: Sarek Høverstad Skotåm <[email protected]>
3 people authored Dec 5, 2024
1 parent 4915ff6 commit b77c209
Showing 3 changed files with 156 additions and 106 deletions.
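
For context, the deadlock this commit fixes can be sketched as a small Shuttle test: under Fairness::StrictlyFair, a front waiter that asks for more permits than are available drops its `Acquire`, and the waiter queued behind it must still be granted the semaphore (the scenario the new `dropped_acquire_must_release_deadlock` test below generalizes). The sketch is illustrative only and is not part of this commit: the import path for `BatchSemaphore`/`Fairness` and the use of `futures::future::poll_fn` to poll the `Acquire` exactly once are assumptions, while `new`, `acquire`, `release`, `check_dfs`, `future::block_on`, `future::spawn`, and `shuttle::lazy_static!` are the APIs used in the diff itself.

use shuttle::{check_dfs, future};
// Assumed import path, for illustration; the test file's real imports are not shown in this diff.
use shuttle::future::batch_semaphore::{BatchSemaphore, Fairness};
use std::future::Future;
use std::task::Poll;

// W1 asks for 2 permits, W2 asks for 1, and only 1 permit is ever released.
// When W1 gives up and drops its Acquire, W2 must still be granted the permit;
// before this fix, removing the front waiter did not re-check the queue, so W2
// could block forever.
fn front_waiter_dropout_must_unblock_queue() {
    shuttle::lazy_static! {
        static ref SEM: BatchSemaphore = BatchSemaphore::new(0, Fairness::StrictlyFair);
    }
    check_dfs(
        || {
            future::block_on(async {
                // W1: enqueue a request for 2 permits by polling once, then drop it.
                let w1 = future::spawn(async {
                    let mut acq = Box::pin(SEM.acquire(2));
                    let granted = futures::future::poll_fn(|cx| {
                        Poll::Ready(acq.as_mut().poll(cx).is_ready())
                    })
                    .await;
                    drop(acq);
                    if granted {
                        // Unreachable with a single released permit, but keeps the
                        // sketch deadlock-free if the numbers are changed.
                        SEM.release(2);
                    }
                });
                // W2: must eventually acquire the single permit.
                let w2 = future::spawn(async {
                    let _ = SEM.acquire(1).await;
                });
                // Main task: release the one permit both waiters compete for.
                SEM.release(1);
                let _ = w1.await;
                let _ = w2.await;
            });
        },
        None,
    );
}
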
1 change: 1 addition & 0 deletions Cargo.toml
@@ -31,6 +31,7 @@ serde_json = { version = "1.0", optional = true }
criterion = { version = "0.4.0", features = ["html_reports"] }
futures = "0.3.15"
proptest = "1.0.0"
proptest-derive = "0.5.0"
regex = "1.5.5"
tempfile = "3.2.0"
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
102 changes: 60 additions & 42 deletions src/future/batch_semaphore.rs
@@ -238,6 +238,49 @@ impl BatchSemaphoreState {
Err(TryAcquireError::NoPermits)
}
}

fn unblock_waiters_from_front(&mut self) {
while let Some(front) = self.waiters.front() {
if front.num_permits <= self.permits_available.available() {
let waiter = self.waiters.pop_front().unwrap();

crate::annotations::record_semaphore_acquire_unblocked(
self.id.unwrap(),
waiter.task_id,
waiter.num_permits,
);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = self
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquisition is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
return;
}
}
}
}

/// Counting semaphore
Expand Down Expand Up @@ -447,6 +490,20 @@ impl BatchSemaphore {

state.waiters.remove(index).unwrap();
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));

match self.fairness {
Fairness::StrictlyFair => {
if index == 0 {
// If the semaphore is strictly fair, and we removed the first waiter, check if its
// removal unblocks remaining waiters. This can happen in the following situation:
// - the semaphore has 1 permit available
// - there are 2 waiters W1 and W2 where W1 wants 2 permits, and W2 wants 1 permit
// - if W1 gives up and drops out, we want to ensure W2 is granted the semaphore
state.unblock_waiters_from_front();
}
}
Fairness::Unfair => {}
}
}

/// Acquire the specified number of permits (async API)
Expand Down Expand Up @@ -501,48 +558,9 @@ impl BatchSemaphore {

match self.fairness {
Fairness::StrictlyFair => {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();

crate::annotations::record_semaphore_acquire_unblocked(
state.id.unwrap(),
waiter.task_id,
waiter.num_permits,
);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquisition is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
break;
}
}
// in a strictly fair mode we will grant permits to waiters from the front
// of the queue, as long as there are enough permits available
state.unblock_waiters_from_front();
}
Fairness::Unfair => {
// in an unfair mode, we will unblock all the waiters for which
159 changes: 95 additions & 64 deletions tests/future/batch_semaphore.rs
@@ -512,52 +512,63 @@ fn bugged_cleanup_would_cause_deadlock() {
)
}

// This test exercises the following scenario where SEM is a BatchSemaphore with N permits.
// 1. Initially the semaphore has 0 permits
// 2. Task T1 tries to acquire 1 permit, gets added as the first Waiter
// 3. Task T2 tries to acquire 1 permit, gets added as the second Waiter
// 4. Task T0 releases 1 permit
// 5. Task T1 drops its Acquire handle without calling poll()
// This test exercises scenarios to ensure that the BatchSemaphore behaves correctly in the presence
// of tasks that drop an `Acquire` guard without waiting for the semaphore to become available.
//
// At this point, T2 should be woken up and should get the permit.
// Unfortunately, in an earlier version of BatchSemaphore, this was not happening because
// the Drop handler for Acquire was only returning permits to the semaphore, but not
// waking up the next waiter in the queue.
// The general idea is that there are 3 types of tasks: `EarlyDrop`, `Hold` and `Release` tasks
// (determined by the `Behavior` enum).
// 1. The semaphore initially has 0 permits.
// 2. The main task spawns a set of tasks, specifying the behavior and the number of permits each task should request.
// 3. Each task polls the semaphore once when it is created, in order to get added as a Waiter.
// 4. The main task releases N permits, which are sufficient to ensure that all tasks complete (see below).
// 5. Each task then proceeds according to its defined `behavior`:
// `EarlyDrop` tasks drop their Acquire guards (and are removed from the waiters queue)
// `Hold` tasks wait to acquire their permits, and then terminate (without releasing any permits)
// `Release` tasks wait to acquire their permits, and then release their permits and terminate
//
// The tests below exercise both the specific scenario above (with 1 permit and two tasks), and more
// general scenarios involving multiple tasks, of which a random subset drop the Acquire guards early.
mod early_acquire_drop_test {
// The value of N is computed as
// (sum of permits requested by the Hold tasks) + (max over the permits requested by the Release and EarlyDrop tasks)
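// (Illustrative arithmetic, not taken from the tests below: with Hold tasks requesting 1 and 2
// permits and Release/EarlyDrop tasks requesting 3 and 1, N = (1 + 2) + max(3, 1) = 6.
// Hold tasks never return their permits, so their requests must be covered in full; the other
// tasks either return their permits or never consume any, so only the largest such request
// needs to be available at any one time.)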
mod early_acquire_drop_tests {
use super::*;
use futures::{
future::join_all,
task::{Context, Poll, Waker},
Future,
};
use pin_project::pin_project;
use proptest::proptest;
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use shuttle::{
check_random,
sync::mpsc::{channel, Sender},
};
use std::pin::Pin;
use test_log::test;

#[derive(Arbitrary, Clone, Copy, Debug)]
enum Behavior {
EarlyDrop, // Task drops before future completes
Release, // Task releases permits it acquires
Hold, // Task holds permits it acquires
}

#[pin_project]
struct Task {
poll_count: usize, // how many times the Future has been polled
early_drop: bool, // whether to drop the Acquire handle after polling it once
tx: Sender<Waker>, // channel for informing the main task
poll_count: usize, // how many times the Future has been polled
behavior: Behavior, // how this task should behave
requested_permits: usize, // how many permits this Task requests
tx: Sender<Waker>, // channel for informing the main task that this task is added as a Waiter
#[pin]
acquire: Acquire<'static>,
}

impl Task {
fn new(early_drop: bool, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
fn new(behavior: Behavior, requested_permits: usize, tx: Sender<Waker>, sem: &'static BatchSemaphore) -> Self {
Self {
poll_count: 0,
early_drop,
behavior,
requested_permits,
tx,
acquire: sem.acquire(1),
acquire: sem.acquire(requested_permits),
}
}
}
@@ -575,52 +586,43 @@ mod early_acquire_drop_test {
this.tx.send(cx.waker().clone()).unwrap(); // Notify main task
*this.poll_count += 1;
Poll::Pending
} else if *this.early_drop {
} else if matches!(*this.behavior, Behavior::EarlyDrop) {
// Since this is an early drop, we got 0 permits
Poll::Ready(0)
} else {
// If not early dropping, wait until the inner Acquire handle successfully gets
// a permit. When successful, return 1 permit.
this.acquire.as_mut().poll(cx).map(|_| 1)
// a permit. When successful, return the number of permits acquired.
this.acquire.as_mut().poll(cx).map(|_| *this.requested_permits)
}
}
}

// High-level sketch of the test:
// S1. Initialize the semaphore with no permits
// S2. Spawn a set of tasks, each randomly decides whether or not to drop early
// Each task creates an Acquire handle and polls it once (to get into the waiter queue)
// The task then notifies the main task (by sending a message on an mpsc channel)
// S3. The main task waits for messages from all the spawned tasks (so it knows each is a waiter)
// S4. The main task releases N permits on the BatchSemaphore, and wakes up all the tasks
// S5. At this point, each task either drops its Acquire handle, or tries to acquire the BatchSemaphore
// by polling it until it acquires a permit.
fn dropped_acquire_must_release(num_permits: usize, early_drop: Vec<bool>) {
shuttle::lazy_static! {
// S1. Initialize the semaphore with no permits
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, Fairness::StrictlyFair);
}

fn dropped_acquire_must_release(sem: &'static BatchSemaphore, task_config: Vec<(Behavior, usize)>) {
future::block_on(async move {
let mut wakers = vec![];
let mut handles = vec![];

// S2. Main task spawns a set of tasks; the `early_drop` vector of booleans determines
// which tasks will drop the `Acquire` after polling it exactly once
for early_drop in early_drop {
let mut total_held = 0usize;
let mut max_requested = 0usize;

for (behavior, requested_permits) in task_config {
let (tx, rx) = channel();
let task: Task = Task::new(early_drop, tx, &SEM);
match behavior {
Behavior::Hold => total_held += requested_permits,
_ => max_requested = std::cmp::max(max_requested, requested_permits),
}
handles.push(future::spawn(async move {
let task: Task = Task::new(behavior, requested_permits, tx, sem);
let p = task.await;
// Note: tasks doing an early drop will return p=0, and release(0) is a no-op
SEM.release(p);
if matches!(behavior, Behavior::Release) {
sem.release(p);
}
}));
// S3. Main task waits for message from spawned task indicating it has polled once
wakers.push(rx.recv().unwrap());
}

// S4. Main task releases N permits and wakes up all tasks
SEM.release(num_permits);
sem.release(total_held + max_requested);
for w in wakers.into_iter() {
w.wake();
}
@@ -629,26 +631,55 @@ mod early_acquire_drop_test {
});
}

// The minimal test case (generated by the proptest below) is with 1 permit and 2 tasks, where Task1 does
// an early drop, and Task2 does not early drop. This test checks that scenario exhaustively using check_dfs.
#[test]
fn dropped_acquire_must_release_exhaustive() {
check_dfs(|| dropped_acquire_must_release(1, vec![true, false]), None);
}
macro_rules! sem_tests {
($mod_name:ident, $fairness:expr) => {
mod $mod_name {
use super::*;

// This test checks scenarios where the main task releases multiple permits and there are several tasks, any
// subset of which may do an early drop of their Acquire handle.
#[test_log::test]
fn dropped_acquire_must_release_exhaustive() {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
check_dfs(
|| dropped_acquire_must_release(&SEM, vec![(Behavior::EarlyDrop, 1), (Behavior::Release, 1)]),
None,
);
}

const MAX_PERMITS: usize = 8;
const MAX_TASKS: usize = 7;
#[test_log::test]
fn dropped_acquire_must_release_deadlock() {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
check_dfs(
|| dropped_acquire_must_release(&SEM, vec![(Behavior::Hold, 1), (Behavior::EarlyDrop, 2), (Behavior::Release, 1)]),
None,
);
}

proptest! {
#[test]
fn dropped_acquire_must_release_random(num_permits in 1..MAX_PERMITS, early_drop in proptest::collection::vec(proptest::arbitrary::any::<bool>(), 1..MAX_TASKS)) {
check_random(
move || dropped_acquire_must_release(num_permits, early_drop.clone()),
10_000,
);
const MAX_REQUESTED_PERMITS: usize = 3;
const MAX_TASKS: usize = 7;

proptest! {
#[test_log::test]
fn dropped_acquire_must_release_random(behavior in proptest::collection::vec((proptest::arbitrary::any::<Behavior>(), 1..=MAX_REQUESTED_PERMITS), 1..=MAX_TASKS)) {
check_random(
move || {
shuttle::lazy_static! {
static ref SEM: BatchSemaphore = BatchSemaphore::new(0, $fairness);
}
dropped_acquire_must_release(&SEM, behavior.clone())
},
10_000,
);
}
}
}
}
}

sem_tests!(unfair, Fairness::Unfair);

sem_tests!(fair, Fairness::StrictlyFair);
}
