feat(storage): wake up memory acquire request in order (#15921)
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace authored May 15, 2024
1 parent 113ede9 commit 44e711c
Showing 1 changed file with 133 additions and 72 deletions.
205 changes: 133 additions & 72 deletions src/storage/src/hummock/utils.rs
@@ -13,24 +13,25 @@
// limitations under the License.

use std::cmp::Ordering;
+use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
-use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use foyer::CacheContext;
+use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::key::{
bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{can_concat, HummockEpoch};
use risingwave_pb::hummock::SstableInfo;
-use tokio::sync::watch::Sender;
-use tokio::sync::Notify;
+use tokio::sync::oneshot::{channel, Receiver, Sender};

use super::{HummockError, HummockResult};
use crate::error::StorageResult;
@@ -166,23 +167,54 @@ pub fn prune_nonoverlapping_ssts<'a>(
ssts[start_table_idx..=end_table_idx].iter()
}

-#[derive(Debug)]
+type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
+enum MemoryRequest {
+Ready(MemoryTracker),
+Pending(Receiver<MemoryTracker>),
+}

struct MemoryLimiterInner {
total_size: AtomicU64,
-notify: Notify,
+controller: Mutex<RequestQueue>,
+has_waiter: AtomicBool,
quota: u64,
}

impl MemoryLimiterInner {
fn release_quota(&self, quota: u64) {
-self.total_size.fetch_sub(quota, AtomicOrdering::Release);
-self.notify.notify_waiters();
+self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
}

fn add_memory(&self, quota: u64) {
self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
}

+fn may_notify_waiters(self: &Arc<Self>) {
+// Check `has_waiter` first so that dropping a `MemoryTracker` does not take the lock every time.
+if !self.has_waiter.load(AtomicOrdering::Acquire) {
+return;
+}
+let mut notify_waiters = vec![];
+{
+let mut waiters = self.controller.lock();
+while let Some((_, quota)) = waiters.front() {
+if !self.try_require_memory(*quota) {
+break;
+}
+let (tx, quota) = waiters.pop_front().unwrap();
+notify_waiters.push((tx, quota));
+}
+
+if waiters.is_empty() {
+self.has_waiter.store(false, AtomicOrdering::Release);
+}
+}
+
+for (tx, quota) in notify_waiters {
+let _ = tx.send(MemoryTracker::new(self.clone(), quota));
+}
+}
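Aside, not part of this commit: `may_notify_waiters` deliberately collects the granted trackers and sends them only after `controller` is unlocked. A minimal sketch of the hazard this avoids, with hypothetical stand-in types: if a waiter was already cancelled (for example by `tokio::time::timeout`, as in the test at the end of this diff), the failed `send` hands the tracker back and drops it, and the tracker's `Drop` re-enters the notifier, which takes the same non-reentrant lock.

use parking_lot::Mutex;
use tokio::sync::oneshot;

struct Grant; // stand-in for `MemoryTracker`; imagine its `Drop` re-enters `notify`

fn notify(queue: &Mutex<Vec<oneshot::Sender<Grant>>>) {
    // Drain the queue while holding the lock, but keep the grants aside...
    let granted: Vec<_> = queue.lock().drain(..).collect();
    // ...and send them only here, after the guard is released. A failed send
    // drops the `Grant` immediately; if that `Drop` re-locked `queue` inside
    // the critical section above, parking_lot's non-reentrant `Mutex` would
    // self-deadlock.
    for tx in granted {
        let _ = tx.send(Grant);
    }
}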

fn try_require_memory(&self, quota: u64) -> bool {
let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
while self.permit_quota(current_quota, quota) {
@@ -203,94 +235,83 @@ impl MemoryLimiterInner {
false
}

-async fn require_memory(&self, quota: u64) {
-let current_quota = self.total_size.load(AtomicOrdering::Acquire);
-if self.permit_quota(current_quota, quota)
-&& self
-.total_size
-.compare_exchange(
-current_quota,
-current_quota + quota,
-AtomicOrdering::SeqCst,
-AtomicOrdering::SeqCst,
-)
-.is_ok()
-{
-// fast path.
-return;
+fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
+let mut waiters = self.controller.lock();
+let first_req = waiters.is_empty();
+if first_req {
+// When this request is the first waiter, a `MemoryTracker` dropped concurrently may just have released a large quota and skipped notifying us, because it checked `has_waiter` and found it false. So set the flag first and retry `try_require_memory` to avoid a deadlock.
+self.has_waiter.store(true, AtomicOrdering::Release);
+}
-loop {
-let notified = self.notify.notified();
-let current_quota = self.total_size.load(AtomicOrdering::Acquire);
-if self.permit_quota(current_quota, quota) {
-match self.total_size.compare_exchange(
-current_quota,
-current_quota + quota,
-AtomicOrdering::SeqCst,
-AtomicOrdering::SeqCst,
-) {
-Ok(_) => break,
-Err(old_quota) => {
-// The quota is enough but just changed by other threads. So just try to
-// update again without waiting notify.
-if self.permit_quota(old_quota, quota) {
-continue;
-}
-}
-}
+// We must try to acquire again while holding the lock, because another `MemoryTracker` may be dropped after this thread takes the mutex but before the request enters the queue.
+if self.try_require_memory(quota) {
+if first_req {
+self.has_waiter.store(false, AtomicOrdering::Release);
+}
-notified.await;
+return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
}
+let (tx, rx) = channel();
+waiters.push_back((tx, quota));
+MemoryRequest::Pending(rx)
}

fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
current_quota <= self.quota
}
}
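Note that `permit_quota` deliberately ignores its `_request_quota` argument: a request of any size is admitted as long as current usage has not yet exceeded the quota, so usage can overshoot the quota by at most one in-flight request. A standalone illustration (not part of this commit):

fn permit_quota(current: u64, limit: u64) -> bool {
    current <= limit
}

fn main() {
    let limit = 10;
    // Usage 10 has not exceeded the limit of 10, so even a 100-unit request
    // is admitted, driving usage to 110.
    assert!(permit_quota(10, limit));
    // Once usage exceeds the limit, every request blocks, whatever its size.
    assert!(!permit_quota(110, limit));
}

This is also what makes strict FIFO wakeup safe here: whenever the front waiter cannot be admitted, usage already exceeds the quota, so no later waiter could be admitted either.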

-#[derive(Debug)]
pub struct MemoryLimiter {
inner: Arc<MemoryLimiterInner>,
}

+impl Debug for MemoryLimiter {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+f.debug_struct("MemoryLimiter")
+.field("quota", &self.inner.quota)
+.field("usage", &self.inner.total_size)
+.finish()
+}
+}

pub struct MemoryTracker {
limiter: Arc<MemoryLimiterInner>,
-quota: u64,
+quota: Option<u64>,
}
+impl MemoryTracker {
+fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
+Self {
+limiter,
+quota: Some(quota),
+}
+}
+}

impl Debug for MemoryTracker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("quota").field("quota", &self.quota).finish()
f.debug_struct("MemoryTracker")
.field("quota", &self.quota)
.finish()
}
}

impl MemoryLimiter {
pub fn unlimit() -> Arc<Self> {
-Arc::new(Self {
-inner: Arc::new(MemoryLimiterInner {
-total_size: AtomicU64::new(0),
-notify: Notify::new(),
-quota: u64::MAX - 1,
-}),
-})
+Arc::new(Self::new(u64::MAX))
}

pub fn new(quota: u64) -> Self {
Self {
inner: Arc::new(MemoryLimiterInner {
total_size: AtomicU64::new(0),
-notify: Notify::new(),
+controller: Mutex::new(VecDeque::default()),
+has_waiter: AtomicBool::new(false),
quota,
}),
}
}

pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
if self.inner.try_require_memory(quota) {
-Some(MemoryTracker {
-limiter: self.inner.clone(),
-quota,
-})
+Some(MemoryTracker::new(self.inner.clone(), quota))
} else {
None
}
Expand All @@ -309,42 +330,41 @@ impl MemoryLimiter {
self.inner.add_memory(quota);
}

-MemoryTracker {
-limiter: self.inner.clone(),
-quota,
-}
+MemoryTracker::new(self.inner.clone(), quota)
}
}

impl MemoryLimiter {
pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
// Since the over-provision limiter blocks only when current usage exceeds the
// memory quota, a single request is allowed to ask for more than the quota.
-self.inner.require_memory(quota).await;
-MemoryTracker {
-limiter: self.inner.clone(),
-quota,
+match self.inner.require_memory(quota) {
+MemoryRequest::Ready(tracker) => tracker,
+MemoryRequest::Pending(rx) => rx.await.unwrap(),
}
}
}
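A hedged usage sketch of the ordering guarantee this commit introduces (the wrapper function, sleeps, and quota values below are illustrative, not from this commit): a small request that arrives later does not jump ahead of a large request that is already queued.

use std::sync::Arc;
use std::time::Duration;

async fn fifo_demo(limiter: Arc<MemoryLimiter>) {
    // Over-provisioning lets one request exceed the quota of 10; afterwards
    // every later request must queue.
    let blocker = limiter.require_memory(11).await;

    let l = limiter.clone();
    let big = tokio::spawn(async move { l.require_memory(20).await });
    tokio::time::sleep(Duration::from_millis(10)).await; // let `big` enqueue first

    let l = limiter.clone();
    let small = tokio::spawn(async move { l.require_memory(1).await });
    tokio::time::sleep(Duration::from_millis(10)).await;

    drop(blocker); // usage falls to 0: `big` is woken first (arrival order)
    let big_tracker = big.await.unwrap();
    // `small` stays queued until `big` releases, even though 1 unit would fit.
    drop(big_tracker);
    let _small_tracker = small.await.unwrap();
}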

impl MemoryTracker {
pub fn try_increase_memory(&mut self, target: u64) -> bool {
-if self.quota >= target {
+let quota = self.quota.unwrap();
+if quota >= target {
return true;
}
-if self.limiter.try_require_memory(target - quota) {
-self.quota = target;
+if self.limiter.try_require_memory(target - quota) {
+self.quota = Some(target);
true
} else {
false
}
}
}

+// We must notify waiters outside `MemoryTracker` itself to avoid deadlock and cyclic ownership.
impl Drop for MemoryTracker {
fn drop(&mut self) {
-self.limiter.release_quota(self.quota);
+if let Some(quota) = self.quota.take() {
+self.limiter.release_quota(quota);
+self.limiter.may_notify_waiters();
+}
}
}
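The `Drop` impl is what makes the tracker an RAII guard: returning the quota and re-checking the wait queue happen automatically when the tracker goes out of scope. A minimal sketch, assuming the public API in this file:

fn raii_demo(limiter: &MemoryLimiter) {
    if let Some(tracker) = limiter.try_require_memory(64) {
        // ... use the 64 units of budget ...
        // `tracker` is dropped here: the quota is returned and queued
        // waiters are re-examined in FIFO order.
    }
}

Storing the quota as an `Option` lets `drop` move it out exactly once, which also covers the tracker that `may_notify_waiters` fails to send to a cancelled waiter: that tracker is dropped on the spot and immediately releases what it was just granted.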

@@ -576,7 +596,7 @@ pub(crate) fn filter_with_delete_range<'a>(
}

pub(crate) async fn wait_for_epoch(
-notifier: &Sender<HummockEpoch>,
+notifier: &tokio::sync::watch::Sender<HummockEpoch>,
wait_epoch: u64,
) -> StorageResult<()> {
let mut receiver = notifier.subscribe();
@@ -619,9 +639,12 @@ pub(crate) async fn wait_for_epoch(
#[cfg(test)]
mod tests {
use std::future::{poll_fn, Future};
+use std::sync::Arc;
use std::task::Poll;

+use futures::future::join_all;
use futures::FutureExt;
+use rand::random;

use crate::hummock::utils::MemoryLimiter;

@@ -653,4 +676,42 @@ mod tests {
drop(tracker3);
assert_eq!(0, memory_limiter.get_memory_usage());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_multi_thread_acquire_memory() {
const QUOTA: u64 = 10;
let memory_limiter = Arc::new(MemoryLimiter::new(200));
let mut handles = vec![];
for _ in 0..40 {
let limiter = memory_limiter.clone();
let h = tokio::spawn(async move {
let mut buffers = vec![];
let mut current_buffer_usage = (random::<usize>() % 8) + 2;
for _ in 0..1000 {
if buffers.len() < current_buffer_usage
&& let Some(tracker) = limiter.try_require_memory(QUOTA)
{
buffers.push(tracker);
} else {
buffers.clear();
current_buffer_usage = (random::<usize>() % 8) + 2;
let req = limiter.require_memory(QUOTA);
match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
Ok(tracker) => {
buffers.push(tracker);
}
Err(_) => {
continue;
}
}
}
let sleep_time = random::<u64>() % 3 + 1;
tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
}
});
handles.push(h);
}
let h = join_all(handles);
let _ = h.await;
}
}
