diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index aa23dea7d3c..000ca5042b0 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -232,6 +232,7 @@ impl Semaphore { return; } + // MEMO: ここでlockを取るのか. // Assign permits to the wait queue self.add_permits_locked(added, self.waiters.lock()); } @@ -297,6 +298,8 @@ impl Semaphore { Acquire::new(self, num_permits) } + // MEMO: この関数は、単純にpermitを増やすだけじゃなくて、permitふよによってwaiterを起こすこともやっている + // この逆の関数を実行するときは、waiterを起こすことはないので、多少これよりは実装が楽になると思われる. /// Release `rem` permits to the semaphore's wait list, starting from the /// end of the queue. /// @@ -307,12 +310,16 @@ impl Semaphore { let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { + // TODO: これなにしてる?どうして引数のwaiterをわざわざ使ってるの?(毎回self.waiters.lock()をせずに) let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); + // MEMO: wakersに空きがあるかぎり 'inner: while wakers.can_push() { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { Some(waiter) => { + // MEMO: このwaiterにpermitsを割り当てて、queueからpopすることを試みる if !waiter.assign_permits(&mut rem) { + // MEMO: waiterをpopできずに、permitを使い切った. この関数からreturnする. break 'inner; } } @@ -323,6 +330,7 @@ impl Semaphore { break 'inner; } }; + // MEMO: popできたwaiterをwakersにpushする. let mut waiter = waiters.queue.pop_back().unwrap(); if let Some(waker) = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } @@ -331,6 +339,7 @@ impl Semaphore { } } + // MEMO: もしpermitが余ったら、Semaphoreのpermitの更新 if rem > 0 && is_empty { let permits = rem; assert!( @@ -368,6 +377,29 @@ impl Semaphore { assert_eq!(rem, 0); } + /// try to decrease at most `n` permits of this semaphore. + /// If the number of "permits" is negative, returns the number of "permits" that were reduced. + /// TODO: Doesn't that conflict with other places looking at self.permits? + pub(crate) fn decrease_permit(&self, n: usize) -> usize { + let mut curr_bit; + loop { + curr_bit = self.permits.load(Acquire); + let curr = curr_bit >> Self::PERMIT_SHIFT; + let new = curr.saturating_sub(n); + match self.permits.compare_exchange( + curr_bit, + new << Self::PERMIT_SHIFT, + AcqRel, + Acquire, + ) { + Ok(_) => break, + Err(_) => continue, + }; + } + std::cmp::min(curr_bit >> Self::PERMIT_SHIFT, n) + } + + // MEMO: async version fn poll_acquire( &self, cx: &mut Context<'_>, @@ -398,9 +430,11 @@ impl Semaphore { .checked_add(acquired) .expect("number of permits must not overflow"); let (next, acq) = if total >= needed { + // MEMO: 現在availableなpermitの方が、必要とされるpermitよりも多い let next = curr - (needed - acquired); (next, needed >> Self::PERMIT_SHIFT) } else { + // MEMO: permitが不足している remaining = (needed - acquired) - curr; (0, curr >> Self::PERMIT_SHIFT) }; @@ -519,6 +553,7 @@ impl Waiter { } } + // MEMO: このWaiterのpermit(実装上ではpermit)を減らして行って, 0になったらtrueを返す /// Assign permits to the waiter. /// /// Returns `true` if the waiter should be removed from the queue diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 25e4134373c..61917695296 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -481,6 +481,11 @@ impl Semaphore { self.ll_sem.release(n); } + /// docs + pub fn decrease_permit(&self, n: usize) -> usize { + self.ll_sem.decrease_permit(n) + } + /// Acquires a permit from the semaphore. /// /// If the semaphore has been closed, this returns an [`AcquireError`].