feat(storage): wake up memory acquire request in order #15921

Merged · 17 commits · May 15, 2024
src/storage/src/hummock/utils.rs: 260 changes (191 additions, 69 deletions)
@@ -13,24 +13,28 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use bytes::Bytes;
use foyer::memory::CacheContext;
use futures::FutureExt;
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::key::{
bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{can_concat, HummockEpoch};
use risingwave_pb::hummock::SstableInfo;
use tokio::sync::watch::Sender;
use tokio::sync::Notify;
use tokio::sync::oneshot::{channel, Receiver, Sender};

use super::{HummockError, HummockResult};
use crate::error::StorageResult;
@@ -166,23 +170,88 @@ pub fn prune_nonoverlapping_ssts<'a>(
ssts[start_table_idx..=end_table_idx].iter()
}

#[derive(Debug)]
type RequestQueue = VecDeque<(Sender<MemoryTrackerImpl>, u64)>;
enum MemoryRequest {
Ready(MemoryTrackerImpl),
Pending(Receiver<MemoryTrackerImpl>),
}

pub struct PendingRequestCancelGuard {
inner: Option<Arc<MemoryLimiterInner>>,
rx: Receiver<MemoryTrackerImpl>,
}

impl Drop for PendingRequestCancelGuard {
fn drop(&mut self) {
if let Some(limiter) = self.inner.take() {
// Close the rx before calling `try_recv`: any `MemoryTrackerImpl` sent after this `PendingRequestCancelGuard` is dropped will then be dropped inside the `may_notify_waiters` loop on the other thread.
// If a `MemoryTrackerImpl` was sent before this thread called `close`, it can still be received here. In that case this thread must drop the message so that `MemoryTrackerImpl`'s drop updates `total_size`.
self.rx.close();
if let Ok(msg) = self.rx.try_recv() {
drop(msg);
// Every time a `MemoryTrackerImpl` is dropped it updates `total_size`, so we need to check whether there are waiters to notify.
limiter.may_notify_waiters();
}
}
}
}

impl Future for PendingRequestCancelGuard {
type Output = Option<MemoryTrackerImpl>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.rx.poll_unpin(cx) {
Poll::Ready(Ok(msg)) => {
self.inner.take();
Poll::Ready(Some(msg))
}
Poll::Ready(Err(_)) => {
self.inner.take();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}

struct MemoryLimiterInner {
total_size: AtomicU64,
notify: Notify,
controller: Mutex<RequestQueue>,
has_waiter: AtomicBool,
quota: u64,
}

impl MemoryLimiterInner {
fn release_quota(&self, quota: u64) {
self.total_size.fetch_sub(quota, AtomicOrdering::Release);
self.notify.notify_waiters();
self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
}

fn add_memory(&self, quota: u64) {
self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
}

fn may_notify_waiters(self: &Arc<Self>) {
// Check `has_waiter` to avoid taking the lock every time a `MemoryTracker` is dropped.
if !self.has_waiter.load(AtomicOrdering::Acquire) {
return;
}
let mut waiters = self.controller.lock();
while let Some((_, quota)) = waiters.front() {
if !self.try_require_memory(*quota) {
break;
}
let (tx, quota) = waiters.pop_front().unwrap();
let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
@hzxa21 (Collaborator) commented on May 11, 2024:

The MemoryTrackerImpl and the newly added PendingRequestCancelGuard seem over-designed for the corner case mentioned above. Let's analyze all the cases in which quota is acquired:

  1. When the quota is acquired via MemoryRequest::Ready (L287), we need to release the quota and notify waiters.
  2. When the quota is acquired here in may_notify_waiters in L243, there are two sub-cases:
    a. tx.send here succeeds: regardless of whether the rx is dropped or not, we need to release the quota and notify waiters.
    b. tx.send here fails (because the rx was dropped before tx.send): we only need to release the quota but not notify waiters (otherwise it will deadlock).

Therefore, the implementation can be greatly simplified without introducing MemoryTrackerImpl and PendingRequestCancelGuard.

// MemoryTrackerImpl is not needed.
struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

// MemoryTracker::drop will release quota + notify waiters if quota is not None (1 + 2.a)
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

// Add a new method to release quota without notifying waiters.
impl MemoryTracker {
fn release_quota(mut self) {
      if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
      }
  }
}

...

// MemoryRequest takes MemoryTracker
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

...

impl MemoryLimiterInner {
    ...
    fn may_notify_waiters(self: &Arc<Self>) {
        ...
        let mut waiters = self.controller.lock();
        while let Some((_, quota)) = waiters.front() {
            if !self.try_require_memory(*quota) {
                break;
            }
            ...
if let Err(tracker) = tx.send(MemoryTracker::new(self.clone(), quota)) {
// 2.b: the rx has been dropped, so release the quota without notifying waiters.
tracker.release_quota();
            }
}
}
    ...
}

...

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        // PendingRequestCancelGuard is not needed
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
MemoryRequest::Pending(rx) => rx.await.unwrap(),
}
}
}

Contributor (author) replied:

We cannot send the MemoryTracker while holding the lock on controller, because if it is dropped inside this method it may call may_notify_waiters again, which would acquire the same lock twice.
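
A minimal standalone sketch of this hazard, assuming the simplified design proposed above in which dropping the tracker re-enters `may_notify_waiters`; the `Limiter`/`Tracker` names are hypothetical stand-ins, not code from this PR:

use std::sync::Arc;

use parking_lot::Mutex;

// Hypothetical stand-ins for MemoryLimiterInner / MemoryTracker.
struct Limiter {
    waiters: Mutex<Vec<u64>>,
}

struct Tracker {
    limiter: Arc<Limiter>,
}

impl Drop for Tracker {
    fn drop(&mut self) {
        // Mirrors a MemoryTracker whose Drop calls may_notify_waiters:
        // it takes the controller lock.
        let _guard = self.limiter.waiters.lock();
    }
}

fn may_notify_waiters(limiter: &Arc<Limiter>) {
    let _guard = limiter.waiters.lock();
    // Suppose tx.send(tracker) failed because the receiver was dropped;
    // the returned tracker is then dropped while the lock is still held.
    let tracker = Tracker { limiter: limiter.clone() };
    // Tracker::drop re-locks `waiters`; parking_lot::Mutex is not reentrant,
    // so this thread deadlocks.
    drop(tracker);
}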

Contributor (author) replied:

We met this bug in #6634

Contributor replied:

If tokio-rs/tokio#6558 looks good to the tokio community, we can patch the tokio version we are using and then adopt the code in this comment.

}

if waiters.is_empty() {
self.has_waiter.store(false, AtomicOrdering::Release);
}
}

fn try_require_memory(&self, quota: u64) -> bool {
let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
while self.permit_quota(current_quota, quota) {
@@ -203,83 +272,79 @@ impl MemoryLimiterInner {
false
}

async fn require_memory(&self, quota: u64) {
let current_quota = self.total_size.load(AtomicOrdering::Acquire);
if self.permit_quota(current_quota, quota)
&& self
.total_size
.compare_exchange(
current_quota,
current_quota + quota,
AtomicOrdering::SeqCst,
AtomicOrdering::SeqCst,
)
.is_ok()
{
// fast path.
return;
fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
let mut waiters = self.controller.lock();
let first_req = waiters.is_empty();
if first_req {
// When this request is the first waiter but the previous `MemoryTracker` has just released a large quota, that release may have skipped notifying this waiter because it checked `has_waiter` and found it false. So we must set the flag first and retry `try_require_memory` to avoid deadlock.
Contributor commented:

typo: “set it one”

self.has_waiter.store(true, AtomicOrdering::Release);
}
loop {
let notified = self.notify.notified();
let current_quota = self.total_size.load(AtomicOrdering::Acquire);
if self.permit_quota(current_quota, quota) {
match self.total_size.compare_exchange(
current_quota,
current_quota + quota,
AtomicOrdering::SeqCst,
AtomicOrdering::SeqCst,
) {
Ok(_) => break,
Err(old_quota) => {
// The quota is enough but just changed by other threads. So just try to
// update again without waiting notify.
if self.permit_quota(old_quota, quota) {
continue;
}
}
}
// We must try to acquire again while holding the lock, because another MemoryTracker may be dropped just after this thread takes the mutex but before this request enters the queue.
if self.try_require_memory(quota) {
if first_req {
self.has_waiter.store(false, AtomicOrdering::Release);
}
notified.await;
return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota));
}
let (tx, rx) = channel();
waiters.push_back((tx, quota));
MemoryRequest::Pending(rx)
}

fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
current_quota <= self.quota
}
}

#[derive(Debug)]
pub struct MemoryLimiter {
inner: Arc<MemoryLimiterInner>,
}

pub struct MemoryTracker {
impl Debug for MemoryLimiter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemoryLimiter")
.field("quota", &self.inner.quota)
.field("usage", &self.inner.total_size)
.finish()
}
}

pub struct MemoryTrackerImpl {
limiter: Arc<MemoryLimiterInner>,
quota: u64,
quota: Option<u64>,
}
impl MemoryTrackerImpl {
fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
Self {
limiter,
quota: Some(quota),
}
}
}

pub struct MemoryTracker {
inner: MemoryTrackerImpl,
}

impl Debug for MemoryTracker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("quota").field("quota", &self.quota).finish()
f.debug_struct("MemoryTracker")
.field("quota", &self.inner.quota)
.finish()
}
}

impl MemoryLimiter {
pub fn unlimit() -> Arc<Self> {
Arc::new(Self {
inner: Arc::new(MemoryLimiterInner {
total_size: AtomicU64::new(0),
notify: Notify::new(),
quota: u64::MAX - 1,
}),
})
Arc::new(Self::new(u64::MAX))
}

pub fn new(quota: u64) -> Self {
Self {
inner: Arc::new(MemoryLimiterInner {
total_size: AtomicU64::new(0),
notify: Notify::new(),
controller: Mutex::new(VecDeque::default()),
has_waiter: AtomicBool::new(false),
quota,
}),
}
@@ -288,8 +353,7 @@ impl MemoryLimiter {
pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
if self.inner.try_require_memory(quota) {
Some(MemoryTracker {
limiter: self.inner.clone(),
quota,
inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
})
} else {
None
@@ -310,41 +374,58 @@ impl MemoryLimiter {
}

MemoryTracker {
limiter: self.inner.clone(),
quota,
inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
}
}
}

impl MemoryLimiter {
pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
// Since the over provision limiter gets blocked only when the current usage exceeds the
// memory quota, it is allowed to apply for more than the memory quota.
self.inner.require_memory(quota).await;
MemoryTracker {
limiter: self.inner.clone(),
quota,
match self.inner.require_memory(quota) {
MemoryRequest::Ready(inner) => MemoryTracker { inner },
MemoryRequest::Pending(rx) => {
let guard = PendingRequestCancelGuard {
inner: Some(self.inner.clone()),
rx,
};
// We never clear an existing `require_memory` request. Every request is fulfilled eventually unless it is canceled, so it is safe to await and unwrap here.
let inner = guard.await.unwrap();
MemoryTracker { inner }
}
}
}
}

impl MemoryTracker {
pub fn try_increase_memory(&mut self, target: u64) -> bool {
if self.quota >= target {
let quota = self.inner.quota.unwrap();
if quota >= target {
return true;
}
if self.limiter.try_require_memory(target - self.quota) {
self.quota = target;
if self.inner.limiter.try_require_memory(target - quota) {
self.inner.quota = Some(target);
true
} else {
false
}
}
}

impl Drop for MemoryTrackerImpl {
fn drop(&mut self) {
if let Some(quota) = self.quota.take() {
self.limiter.release_quota(quota);
}
}
}

// We must notify waiters outside `MemoryTrackerImpl` to avoid deadlock and an ownership loop.
impl Drop for MemoryTracker {
fn drop(&mut self) {
self.limiter.release_quota(self.quota);
if let Some(quota) = self.inner.quota.take() {
self.inner.limiter.release_quota(quota);
self.inner.limiter.may_notify_waiters();
}
}
}

@@ -568,7 +649,7 @@ pub(crate) fn filter_with_delete_range<'a>(
}

pub(crate) async fn wait_for_epoch(
notifier: &Sender<HummockEpoch>,
notifier: &tokio::sync::watch::Sender<HummockEpoch>,
wait_epoch: u64,
) -> StorageResult<()> {
let mut receiver = notifier.subscribe();
@@ -611,9 +692,12 @@ pub(crate) async fn wait_for_epoch(
#[cfg(test)]
mod tests {
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::Poll;

use futures::future::join_all;
use futures::FutureExt;
use rand::random;

use crate::hummock::utils::MemoryLimiter;

@@ -645,4 +729,42 @@ mod tests {
drop(tracker3);
assert_eq!(0, memory_limiter.get_memory_usage());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_multi_thread_acquire_memory() {
const QUOTA: u64 = 10;
let memory_limiter = Arc::new(MemoryLimiter::new(200));
let mut handles = vec![];
for _ in 0..40 {
let limiter = memory_limiter.clone();
let h = tokio::spawn(async move {
let mut buffers = vec![];
let mut current_buffer_usage = (random::<usize>() % 8) + 2;
for _ in 0..1000 {
if buffers.len() < current_buffer_usage
&& let Some(tracker) = limiter.try_require_memory(QUOTA)
{
buffers.push(tracker);
} else {
buffers.clear();
current_buffer_usage = (random::<usize>() % 8) + 2;
let req = limiter.require_memory(QUOTA);
match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
Ok(tracker) => {
buffers.push(tracker);
}
Err(_) => {
continue;
}
}
}
let sleep_time = random::<u64>() % 3 + 1;
tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
}
});
handles.push(h);
}
let h = join_all(handles);
let _ = h.await;
}
}
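
For context, a minimal usage sketch (not part of the diff) of the FIFO wakeup behaviour this PR introduces. It only uses the `MemoryLimiter` API shown above (`new`, `try_require_memory`, `require_memory`) and assumes `MemoryLimiter` is imported as in the test module; the quotas and sleeps are illustrative:

#[tokio::test]
async fn wakes_waiters_in_request_order() {
    use std::sync::Arc;
    use std::time::Duration;

    let limiter = Arc::new(MemoryLimiter::new(100));
    // Over-provisioning is allowed: a request only blocks while current usage
    // already exceeds the quota, so this succeeds and pushes usage to 120.
    let big = limiter.try_require_memory(120).unwrap();

    let l1 = limiter.clone();
    let first = tokio::spawn(async move { l1.require_memory(150).await });
    tokio::time::sleep(Duration::from_millis(10)).await; // let `first` enqueue
    let l2 = limiter.clone();
    let second = tokio::spawn(async move { l2.require_memory(10).await });
    tokio::time::sleep(Duration::from_millis(10)).await; // let `second` enqueue

    // Dropping `big` wakes `first`, the head of the queue. `second` stays
    // pending even though it asks for less, because waiters are served in
    // FIFO order and granting `first` pushes usage back above the quota.
    drop(big);
    let t1 = first.await.unwrap();

    // Only after `first`'s tracker is released can `second` be granted.
    drop(t1);
    let _t2 = second.await.unwrap();
}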