Skip to content

Commit

Permalink
add decrease_permit
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 committed Feb 5, 2024
1 parent 131e7b4 commit e32371f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
35 changes: 35 additions & 0 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl Semaphore {
return;
}

// MEMO: ここでlockを取るのか.
// Assign permits to the wait queue
self.add_permits_locked(added, self.waiters.lock());
}
Expand Down Expand Up @@ -297,6 +298,8 @@ impl Semaphore {
Acquire::new(self, num_permits)
}

// MEMO: この関数は、単純にpermitを増やすだけじゃなくて、permitふよによってwaiterを起こすこともやっている
// この逆の関数を実行するときは、waiterを起こすことはないので、多少これよりは実装が楽になると思われる.
/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
///
Expand All @@ -307,12 +310,16 @@ impl Semaphore {
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
// TODO: これなにしてる?どうして引数のwaiterをわざわざ使ってるの?(毎回self.waiters.lock()をせずに)
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
// MEMO: wakersに空きがあるかぎり
'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
// MEMO: このwaiterにpermitsを割り当てて、queueからpopすることを試みる
if !waiter.assign_permits(&mut rem) {
// MEMO: waiterをpopできずに、permitを使い切った. この関数からreturnする.
break 'inner;
}
}
Expand All @@ -323,6 +330,7 @@ impl Semaphore {
break 'inner;
}
};
// MEMO: popできたwaiterをwakersにpushする.
let mut waiter = waiters.queue.pop_back().unwrap();
if let Some(waker) =
unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
Expand All @@ -331,6 +339,7 @@ impl Semaphore {
}
}

// MEMO: もしpermitが余ったら、Semaphoreのpermitの更新
if rem > 0 && is_empty {
let permits = rem;
assert!(
Expand Down Expand Up @@ -368,6 +377,29 @@ impl Semaphore {
assert_eq!(rem, 0);
}

/// try to decrease at most `n` permits of this semaphore.
/// If the number of "permits" is negative, returns the number of "permits" that were reduced.
/// TODO: Doesn't that conflict with other places looking at self.permits?
pub(crate) fn decrease_permit(&self, n: usize) -> usize {
let mut curr_bit;
loop {
curr_bit = self.permits.load(Acquire);
let curr = curr_bit >> Self::PERMIT_SHIFT;
let new = curr.saturating_sub(n);
match self.permits.compare_exchange(
curr_bit,
new << Self::PERMIT_SHIFT,
AcqRel,
Acquire,
) {
Ok(_) => break,
Err(_) => continue,
};
}
std::cmp::min(curr_bit >> Self::PERMIT_SHIFT, n)
}

// MEMO: async version
fn poll_acquire(
&self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -398,9 +430,11 @@ impl Semaphore {
.checked_add(acquired)
.expect("number of permits must not overflow");
let (next, acq) = if total >= needed {
// MEMO: 現在availableなpermitの方が、必要とされるpermitよりも多い
let next = curr - (needed - acquired);
(next, needed >> Self::PERMIT_SHIFT)
} else {
// MEMO: permitが不足している
remaining = (needed - acquired) - curr;
(0, curr >> Self::PERMIT_SHIFT)
};
Expand Down Expand Up @@ -519,6 +553,7 @@ impl Waiter {
}
}

// MEMO: このWaiterのpermit(実装上ではpermit)を減らして行って, 0になったらtrueを返す
/// Assign permits to the waiter.
///
/// Returns `true` if the waiter should be removed from the queue
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ impl Semaphore {
self.ll_sem.release(n);
}

/// docs
pub fn decrease_permit(&self, n: usize) -> usize {
self.ll_sem.decrease_permit(n)
}

/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
Expand Down

0 comments on commit e32371f

Please sign in to comment.