Skip to content

Commit

Permalink
add decrease_permit (#7)
Browse files Browse the repository at this point in the history
add decrease_permit
  • Loading branch information
mox692 authored Feb 6, 2024
1 parent 47a5fe3 commit cd4c507
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 0 deletions.
33 changes: 33 additions & 0 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl Semaphore {
return;
}

// MEMO: ここでlockを取るのか.
// Assign permits to the wait queue
self.add_permits_locked(added, self.waiters.lock());
}
Expand Down Expand Up @@ -297,6 +298,8 @@ impl Semaphore {
Acquire::new(self, num_permits)
}

// MEMO: この関数は、単純にpermitを増やすだけじゃなくて、permitふよによってwaiterを起こすこともやっている
// この逆の関数を実行するときは、waiterを起こすことはないので、多少これよりは実装が楽になると思われる.
/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
///
Expand All @@ -307,12 +310,16 @@ impl Semaphore {
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
// TODO: これなにしてる?どうして引数のwaiterをわざわざ使ってるの?(毎回self.waiters.lock()をせずに)
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
// MEMO: wakersに空きがあるかぎり
'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
// MEMO: このwaiterにpermitsを割り当てて、queueからpopすることを試みる
if !waiter.assign_permits(&mut rem) {
// MEMO: waiterをpopできずに、permitを使い切った. この関数からreturnする.
break 'inner;
}
}
Expand All @@ -323,6 +330,7 @@ impl Semaphore {
break 'inner;
}
};
// MEMO: popできたwaiterをwakersにpushする.
let mut waiter = waiters.queue.pop_back().unwrap();
if let Some(waker) =
unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
Expand All @@ -331,6 +339,7 @@ impl Semaphore {
}
}

// MEMO: もしpermitが余ったら、Semaphoreのpermitの更新
if rem > 0 && is_empty {
let permits = rem;
assert!(
Expand Down Expand Up @@ -368,6 +377,27 @@ impl Semaphore {
assert_eq!(rem, 0);
}

/// try to decrease at most `n` permits of this semaphore.
/// If the number of "permits" is negative, returns the number of "permits" that were reduced.
pub(crate) fn decrease_permits(&self, n: usize) -> usize {
let mut curr_bit = self.permits.load(Acquire);
loop {
let curr = curr_bit >> Self::PERMIT_SHIFT;
let new = curr.saturating_sub(n);
match self.permits.compare_exchange_weak(
curr_bit,
new << Self::PERMIT_SHIFT,
AcqRel,
Acquire,
) {
Ok(_) => break,
Err(actual) => curr_bit = actual,
};
}
std::cmp::min(curr_bit >> Self::PERMIT_SHIFT, n)
}

// MEMO: async version
fn poll_acquire(
&self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -398,9 +428,11 @@ impl Semaphore {
.checked_add(acquired)
.expect("number of permits must not overflow");
let (next, acq) = if total >= needed {
// MEMO: 現在availableなpermitの方が、必要とされるpermitよりも多い
let next = curr - (needed - acquired);
(next, needed >> Self::PERMIT_SHIFT)
} else {
// MEMO: permitが不足している
remaining = (needed - acquired) - curr;
(0, curr >> Self::PERMIT_SHIFT)
};
Expand Down Expand Up @@ -519,6 +551,7 @@ impl Waiter {
}
}

// MEMO: このWaiterのpermit(実装上ではpermit)を減らして行って, 0になったらtrueを返す
/// Assign permits to the waiter.
///
/// Returns `true` if the waiter should be removed from the queue
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ impl Semaphore {
self.ll_sem.release(n);
}

/// Decreases `n` permits from the semaphore.
pub fn decrease_permits(&self, n: usize) -> usize {
self.ll_sem.decrease_permits(n)
}

/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/sync/tests/loom_semaphore_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,22 @@ fn release_during_acquire() {
assert_eq!(10, semaphore.available_permits());
})
}

#[test]
fn concurrent_permit_updates() {
loom::model(move || {
let semaphore = Arc::new(Semaphore::new(50));
let t1 = {
let semaphore = semaphore.clone();
thread::spawn(move || semaphore.release(30))
};
let t2 = {
let semaphore = semaphore.clone();
thread::spawn(move || semaphore.decrease_permits(20))
};

t1.join().unwrap();
t2.join().unwrap();
assert_eq!(semaphore.available_permits(), 60);
})
}
27 changes: 27 additions & 0 deletions tokio/src/sync/tests/semaphore_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,33 @@ fn cancel_acquire_releases_permits() {
assert_ok!(s.try_acquire(6));
}

#[test]
fn decrease_permits_basic() {
let s = Semaphore::new(10);
s.decrease_permits(5);
assert_eq!(5, s.available_permits());

let decreased = s.decrease_permits(10);
assert_eq!(5, decreased);
assert_eq!(0, s.available_permits());
}

#[test]
fn acquire_after_decrease_permits() {
let s = Semaphore::new(10);
s.try_acquire(4).expect("uncontended try_acquire succeeds");

s.decrease_permits(4);

let mut acquire = task::spawn(s.acquire(4));
assert_pending!(acquire.poll());
assert_eq!(0, s.available_permits());

drop(acquire);

assert_eq!(2, s.available_permits());
}

#[test]
fn release_permits_at_drop() {
use crate::sync::semaphore::*;
Expand Down

0 comments on commit cd4c507

Please sign in to comment.