Skip to content

Commit

Permalink
fix notify order
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed May 8, 2024
1 parent ce102f4 commit b46bf76
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use bytes::Bytes;
use foyer::memory::CacheContext;
use futures::FutureExt;
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::key::{
Expand Down Expand Up @@ -175,12 +178,40 @@ enum MemoryRequest {

pub struct PendingRequestCancelGuard {
inner: Option<Arc<MemoryLimiterInner>>,
rc: Receiver<MemoryTrackerImpl>,
}

impl Drop for PendingRequestCancelGuard {
fn drop(&mut self) {
if let Some(limiter) = self.inner.take() {
limiter.may_notify_waiters();
self.rc.close();
if let Ok(msg) = self.rc.try_recv() {
drop(msg);
if limiter.pending_request_count.load(AtomicOrdering::Acquire) > 0 {
limiter.may_notify_waiters();
}
}
}
}
}

impl Future for PendingRequestCancelGuard {
type Output = Option<MemoryTrackerImpl>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.rc.poll_unpin(cx) {
Poll::Ready(Ok(msg)) => {
self.inner.take();
Poll::Ready(Some(msg))
}
Poll::Ready(Err(_)) => {
self.inner.take();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
Expand All @@ -203,11 +234,11 @@ impl MemoryLimiterInner {

fn may_notify_waiters(self: &Arc<Self>) {
let mut waiters = self.controller.lock();
while let Some((tx, quota)) = waiters.pop_front() {
if !self.try_require_memory(quota) {
waiters.push_front((tx, quota));
while let Some((_, quota)) = waiters.front() {
if !self.try_require_memory(*quota) {
break;
}
let (tx, quota) = waiters.pop_front().unwrap();
let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
}

Expand Down Expand Up @@ -275,12 +306,12 @@ impl Debug for MemoryLimiter {
}
}

struct MemoryTrackerImpl {
pub struct MemoryTrackerImpl {
limiter: Arc<MemoryLimiterInner>,
quota: Option<u64>,
}
impl MemoryTrackerImpl {
pub fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
Self {
limiter,
quota: Some(quota),
Expand Down Expand Up @@ -350,11 +381,11 @@ impl MemoryLimiter {
match self.inner.require_memory(quota) {
MemoryRequest::Ready(inner) => MemoryTracker { inner },
MemoryRequest::Pending(rc) => {
let mut guard = PendingRequestCancelGuard {
let guard = PendingRequestCancelGuard {
inner: Some(self.inner.clone()),
rc,
};
let inner = rc.await.unwrap();
guard.inner.take();
let inner = guard.await.unwrap();
MemoryTracker { inner }
}
}
Expand Down

0 comments on commit b46bf76

Please sign in to comment.