From 44e711ccc578671984439e1e95c64381d6fcaf47 Mon Sep 17 00:00:00 2001
From: Wallace
Date: Wed, 15 May 2024 16:01:27 +0800
Subject: [PATCH] feat(storage): wake up memory acquire request in order (#15921)

Signed-off-by: Little-Wallace
---
 src/storage/src/hummock/utils.rs | 205 ++++++++++++++++++++-----------
 1 file changed, 133 insertions(+), 72 deletions(-)

diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs
index aa1ad2ea64eb..a65f272e4fdb 100644
--- a/src/storage/src/hummock/utils.rs
+++ b/src/storage/src/hummock/utils.rs
@@ -13,15 +13,17 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
+use std::collections::VecDeque;
 use std::fmt::{Debug, Formatter};
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::ops::{Bound, RangeBounds};
-use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
 use std::sync::Arc;
 use std::time::Duration;
 
 use bytes::Bytes;
 use foyer::CacheContext;
+use parking_lot::Mutex;
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_hummock_sdk::key::{
     bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey,
@@ -29,8 +31,7 @@ use risingwave_hummock_sdk::key::{
 use risingwave_hummock_sdk::version::HummockVersion;
 use risingwave_hummock_sdk::{can_concat, HummockEpoch};
 use risingwave_pb::hummock::SstableInfo;
-use tokio::sync::watch::Sender;
-use tokio::sync::Notify;
+use tokio::sync::oneshot::{channel, Receiver, Sender};
 
 use super::{HummockError, HummockResult};
 use crate::error::StorageResult;
@@ -166,23 +167,54 @@ pub fn prune_nonoverlapping_ssts<'a>(
     ssts[start_table_idx..=end_table_idx].iter()
 }
 
-#[derive(Debug)]
+type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
+enum MemoryRequest {
+    Ready(MemoryTracker),
+    Pending(Receiver<MemoryTracker>),
+}
+
 struct MemoryLimiterInner {
     total_size: AtomicU64,
-    notify: Notify,
+    controller: Mutex<RequestQueue>,
+    has_waiter: AtomicBool,
     quota: u64,
 }
 
 impl MemoryLimiterInner {
     fn release_quota(&self, quota: u64) {
-        self.total_size.fetch_sub(quota, AtomicOrdering::Release);
-        self.notify.notify_waiters();
+        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
     }
 
     fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
     }
 
+    fn may_notify_waiters(self: &Arc<Self>) {
+        // Check `has_waiter` first so that dropping a `MemoryTracker` does not have to take
+        // the lock every time.
+        if !self.has_waiter.load(AtomicOrdering::Acquire) {
+            return;
+        }
+        let mut notify_waiters = vec![];
+        {
+            let mut waiters = self.controller.lock();
+            while let Some((_, quota)) = waiters.front() {
+                if !self.try_require_memory(*quota) {
+                    break;
+                }
+                let (tx, quota) = waiters.pop_front().unwrap();
+                notify_waiters.push((tx, quota));
+            }
+
+            if waiters.is_empty() {
+                self.has_waiter.store(false, AtomicOrdering::Release);
+            }
+        }
+
+        for (tx, quota) in notify_waiters {
+            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
+        }
+    }
+
     fn try_require_memory(&self, quota: u64) -> bool {
         let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
         while self.permit_quota(current_quota, quota) {
@@ -203,44 +235,23 @@ impl MemoryLimiterInner {
         false
     }
 
-    async fn require_memory(&self, quota: u64) {
-        let current_quota = self.total_size.load(AtomicOrdering::Acquire);
-        if self.permit_quota(current_quota, quota)
-            && self
-                .total_size
-                .compare_exchange(
-                    current_quota,
-                    current_quota + quota,
-                    AtomicOrdering::SeqCst,
-                    AtomicOrdering::SeqCst,
-                )
-                .is_ok()
-        {
-            // fast path.
-            return;
-        }
-        loop {
-            let notified = self.notify.notified();
-            let current_quota = self.total_size.load(AtomicOrdering::Acquire);
-            if self.permit_quota(current_quota, quota) {
-                match self.total_size.compare_exchange(
-                    current_quota,
-                    current_quota + quota,
-                    AtomicOrdering::SeqCst,
-                    AtomicOrdering::SeqCst,
-                ) {
-                    Ok(_) => break,
-                    Err(old_quota) => {
-                        // The quota is enough but just changed by other threads. So just try to
-                        // update again without waiting notify.
-                        if self.permit_quota(old_quota, quota) {
-                            continue;
-                        }
-                    }
-                }
-            }
-            notified.await;
+    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
+        let mut waiters = self.controller.lock();
+        let first_req = waiters.is_empty();
+        if first_req {
+            // When this request is the first waiter, a `MemoryTracker` dropped just before it
+            // may have skipped the notification because it observed `has_waiter == false`.
+            // Set the flag first and then retry `try_require_memory` below to avoid a deadlock.
+            self.has_waiter.store(true, AtomicOrdering::Release);
+        }
+        // Acquire again while holding the lock, because another `MemoryTracker` may be dropped
+        // after this thread takes the mutex but before the request enters the queue.
+        if self.try_require_memory(quota) {
+            if first_req {
+                self.has_waiter.store(false, AtomicOrdering::Release);
+            }
+            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
         }
+        let (tx, rx) = channel();
+        waiters.push_back((tx, quota));
+        MemoryRequest::Pending(rx)
     }
 
     fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
@@ -248,38 +259,51 @@ impl MemoryLimiterInner {
     }
 }
 
-#[derive(Debug)]
 pub struct MemoryLimiter {
     inner: Arc<MemoryLimiterInner>,
 }
 
+impl Debug for MemoryLimiter {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MemoryLimiter")
+            .field("quota", &self.inner.quota)
+            .field("usage", &self.inner.total_size)
+            .finish()
+    }
+}
+
 pub struct MemoryTracker {
     limiter: Arc<MemoryLimiterInner>,
-    quota: u64,
+    quota: Option<u64>,
+}
+
+impl MemoryTracker {
+    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
+        Self {
+            limiter,
+            quota: Some(quota),
+        }
+    }
 }
 
 impl Debug for MemoryTracker {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("quota").field("quota", &self.quota).finish()
+        f.debug_struct("MemoryTracker")
+            .field("quota", &self.quota)
+            .finish()
     }
 }
 
 impl MemoryLimiter {
     pub fn unlimit() -> Arc<Self> {
-        Arc::new(Self {
-            inner: Arc::new(MemoryLimiterInner {
-                total_size: AtomicU64::new(0),
-                notify: Notify::new(),
-                quota: u64::MAX - 1,
-            }),
-        })
+        Arc::new(Self::new(u64::MAX))
     }
 
     pub fn new(quota: u64) -> Self {
         Self {
             inner: Arc::new(MemoryLimiterInner {
                 total_size: AtomicU64::new(0),
-                notify: Notify::new(),
+                controller: Mutex::new(VecDeque::default()),
+                has_waiter: AtomicBool::new(false),
                 quota,
             }),
         }
@@ -287,10 +311,7 @@ impl MemoryLimiter {
 
     pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
         if self.inner.try_require_memory(quota) {
-            Some(MemoryTracker {
-                limiter: self.inner.clone(),
-                quota,
-            })
+            Some(MemoryTracker::new(self.inner.clone(), quota))
         } else {
             None
         }
@@ -309,32 +330,27 @@ impl MemoryLimiter {
             self.inner.add_memory(quota);
         }
 
-        MemoryTracker {
-            limiter: self.inner.clone(),
-            quota,
-        }
+        MemoryTracker::new(self.inner.clone(), quota)
     }
 }
 
 impl MemoryLimiter {
     pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
-        // Since the over provision limiter gets blocked only when the current usage exceeds the
-        // memory quota, it is allowed to apply for more than the memory quota.
-        self.inner.require_memory(quota).await;
-        MemoryTracker {
-            limiter: self.inner.clone(),
-            quota,
+        match self.inner.require_memory(quota) {
+            MemoryRequest::Ready(tracker) => tracker,
+            MemoryRequest::Pending(rx) => rx.await.unwrap(),
         }
     }
 }
 
 impl MemoryTracker {
     pub fn try_increase_memory(&mut self, target: u64) -> bool {
-        if self.quota >= target {
+        let quota = self.quota.unwrap();
+        if quota >= target {
             return true;
         }
 
-        if self.limiter.try_require_memory(target - self.quota) {
-            self.quota = target;
+        if self.limiter.try_require_memory(target - quota) {
+            self.quota = Some(target);
             true
         } else {
             false
@@ -342,9 +358,13 @@ impl MemoryTracker {
     }
 }
 
+// We must notify waiters outside `MemoryTracker` to avoid deadlock and cyclic ownership.
 impl Drop for MemoryTracker {
     fn drop(&mut self) {
-        self.limiter.release_quota(self.quota);
+        if let Some(quota) = self.quota.take() {
+            self.limiter.release_quota(quota);
+            self.limiter.may_notify_waiters();
+        }
     }
 }
 
@@ -576,7 +596,7 @@ pub(crate) fn filter_with_delete_range<'a>(
 }
 
 pub(crate) async fn wait_for_epoch(
-    notifier: &Sender<HummockEpoch>,
+    notifier: &tokio::sync::watch::Sender<HummockEpoch>,
     wait_epoch: u64,
 ) -> StorageResult<()> {
     let mut receiver = notifier.subscribe();
@@ -619,9 +639,12 @@ pub(crate) async fn wait_for_epoch(
 #[cfg(test)]
 mod tests {
     use std::future::{poll_fn, Future};
+    use std::sync::Arc;
     use std::task::Poll;
 
+    use futures::future::join_all;
     use futures::FutureExt;
+    use rand::random;
 
     use crate::hummock::utils::MemoryLimiter;
 
@@ -653,4 +676,42 @@ mod tests {
         drop(tracker3);
         assert_eq!(0, memory_limiter.get_memory_usage());
     }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn test_multi_thread_acquire_memory() {
+        const QUOTA: u64 = 10;
+        let memory_limiter = Arc::new(MemoryLimiter::new(200));
+        let mut handles = vec![];
+        for _ in 0..40 {
+            let limiter = memory_limiter.clone();
+            let h = tokio::spawn(async move {
+                let mut buffers = vec![];
+                let mut current_buffer_usage = (random::<usize>() % 8) + 2;
+                for _ in 0..1000 {
+                    if buffers.len() < current_buffer_usage
+                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
+                    {
+                        buffers.push(tracker);
+                    } else {
+                        buffers.clear();
+                        current_buffer_usage = (random::<usize>() % 8) + 2;
+                        let req = limiter.require_memory(QUOTA);
+                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
+                            Ok(tracker) => {
+                                buffers.push(tracker);
+                            }
+                            Err(_) => {
+                                continue;
+                            }
+                        }
+                    }
+                    let sleep_time = random::<u64>() % 3 + 1;
+                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
+                }
+            });
+            handles.push(h);
+        }
+        let h = join_all(handles);
+        let _ = h.await;
+    }
 }
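
For reviewers who want to see the caller-side contract this change introduces, below is a minimal usage sketch, not part of the patch. It assumes the `MemoryLimiter` API defined above (`new`, `try_require_memory`, `require_memory`, `get_memory_usage`); the import path is illustrative and the actual re-export from the storage crate may differ.

// Caller-side sketch of the ordered memory-acquisition behavior.
// The import path below is an assumption for illustration; inside this crate the
// type is `crate::hummock::utils::MemoryLimiter`.
use std::sync::Arc;

use risingwave_storage::hummock::utils::MemoryLimiter;

#[tokio::main]
async fn main() {
    let limiter = Arc::new(MemoryLimiter::new(1 << 20)); // 1 MiB quota

    // Non-blocking path: returns `None` instead of queueing when the quota cannot be granted.
    if let Some(tracker) = limiter.try_require_memory(4096) {
        // ... use the reserved memory; the quota is returned when `tracker` is dropped ...
        drop(tracker);
    }

    // Blocking path: with this patch, pending `require_memory` calls are queued and woken
    // in FIFO order, so a later, smaller request cannot overtake an earlier, larger one.
    let tracker = limiter.require_memory(64 * 1024).await;
    assert_eq!(limiter.get_memory_usage(), 64 * 1024);
    drop(tracker); // dropping releases the quota and wakes the next queued request, if any
}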