feat(storage): wake up memory acquire request in order #15921

Merged: 17 commits, May 15, 2024
Changes from 6 commits
7 changes: 7 additions & 0 deletions src/common/src/config.rs
@@ -721,6 +721,9 @@ pub struct StorageConfig {
#[serde(default = "default::storage::mem_table_spill_threshold")]
pub mem_table_spill_threshold: usize,

#[serde(default = "default::storage::memory_limit_for_behind_barrier_ratio")]
pub memory_limit_for_behind_barrier_ratio: u64,

#[serde(default)]
pub object_store: ObjectStoreConfig,
}
@@ -1367,6 +1370,10 @@ pub mod default {
4 << 20
}

pub fn memory_limit_for_behind_barrier_ratio() -> u64 {
70
}

pub fn compactor_fast_max_compact_delete_ratio() -> u32 {
40
}
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -119,6 +119,7 @@ This page is automatically generated by `./risedev generate-example-config`
| max_sub_compaction | Max sub compaction task numbers | 4 |
| max_version_pinning_duration_sec | | 10800 |
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
| memory_limit_for_behind_barrier_ratio | | 70 |
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
| meta_file_cache | | |
| min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -136,6 +136,7 @@ compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
compactor_iter_max_io_retry_times = 8
mem_table_spill_threshold = 4194304
memory_limit_for_behind_barrier_ratio = 70

[storage.cache.block_cache_eviction]
algorithm = "Lru"
14 changes: 12 additions & 2 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -76,24 +76,34 @@ impl BufferTracker {
) -> Self {
let capacity = config.shared_buffer_capacity_mb * (1 << 20);
let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
let blocking_threshold = config.memory_limit_for_behind_barrier_ratio;
assert!(
flush_threshold < capacity,
"flush_threshold {} should be less or equal to capacity {}",
flush_threshold,
capacity
);
Self::new(capacity, flush_threshold, global_upload_task_size)
Self::new(
capacity,
flush_threshold,
blocking_threshold,
global_upload_task_size,
)
}

pub fn new(
capacity: usize,
flush_threshold: usize,
blocking_threshold: u64,
global_upload_task_size: GenericGauge<AtomicU64>,
) -> Self {
assert!(capacity >= flush_threshold);
Self {
flush_threshold,
global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
global_buffer: Arc::new(MemoryLimiter::new_with_blocking_ratio(
capacity as u64,
blocking_threshold,
)),
global_upload_task_size,
}
}
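For intuition, here is a minimal sketch (illustrative numbers; the 1 GiB shared-buffer capacity is an assumption, not taken from the PR) of how the blocking ratio splits the limiter's capacity. It mirrors the `quota / 100 * blocking_ratio` computation in `MemoryLimiter::new_with_blocking_ratio` shown later in this diff: requests are granted immediately only while usage stays below `fast_quota`; beyond that they are queued and woken in FIFO order.

```rust
// Illustrative arithmetic only; values are assumptions, not RisingWave defaults.
fn main() {
    let shared_buffer_capacity_mb: u64 = 1024; // assumed storage.shared_buffer_capacity_mb
    let blocking_ratio: u64 = 70;              // storage.memory_limit_for_behind_barrier_ratio
    let capacity = shared_buffer_capacity_mb * (1 << 20);
    // Mirrors `new_with_blocking_ratio`: the portion of the capacity below which
    // acquire requests are still served without queueing.
    let fast_quota = capacity / 100 * blocking_ratio;
    println!("capacity = {capacity} bytes, fast_quota = {fast_quota} bytes");
}
```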
8 changes: 6 additions & 2 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -1817,8 +1817,12 @@ mod tests {
impl Fn(Vec<ImmId>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>),
) {
// flush threshold is 0. Flush anyway
let buffer_tracker =
BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap());
let buffer_tracker = BufferTracker::new(
usize::MAX,
0,
100,
GenericGauge::new("test", "test").unwrap(),
);
// (the started task sends the imm ids of the payload; the started task waits for the finish notification)
#[allow(clippy::type_complexity)]
let task_notifier_holder: Arc<
190 changes: 117 additions & 73 deletions src/storage/src/hummock/utils.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
@@ -22,15 +23,15 @@ use std::time::Duration;

use bytes::Bytes;
use foyer::memory::CacheContext;
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::key::{
bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{can_concat, HummockEpoch};
use risingwave_pb::hummock::SstableInfo;
use tokio::sync::watch::Sender;
use tokio::sync::Notify;
use tokio::sync::oneshot::{channel, Sender};

use super::{HummockError, HummockResult};
use crate::error::StorageResult;
@@ -166,26 +167,47 @@ pub fn prune_nonoverlapping_ssts<'a>(
ssts[start_table_idx..=end_table_idx].iter()
}

#[derive(Debug)]
type RequestQueue = VecDeque<(Sender<MemoryTrackerImpl>, u64)>;

struct MemoryLimiterInner {
total_size: AtomicU64,
notify: Notify,
controller: Mutex<RequestQueue>,
pending_request_count: Arc<AtomicU64>,
quota: u64,
fast_quota: u64,
}

impl MemoryLimiterInner {
fn release_quota(&self, quota: u64) {
self.total_size.fetch_sub(quota, AtomicOrdering::Release);
self.notify.notify_waiters();
}

fn add_memory(&self, quota: u64) {
self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
}

fn may_notify_waiters(self: &Arc<Self>) {
let mut waiters = self.controller.lock();
while let Some((tx, quota)) = waiters.pop_front() {
if !self.try_require_memory_in_capacity(quota, self.quota) {
waiters.push_front((tx, quota));
break;
}
let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
hzxa21 (Collaborator) commented on May 11, 2024:
The MemoryTrackerImpl and the newly added PendingRequestCancelGuard seem to be an over-design for the corner case mentioned above. Let's analyze all the cases in which quota is acquired:

  1. When the quota is acquired via MemoryRequest::Ready (L287), we need to release the quota and notify waiters.
  2. When the quota is acquired here in may_notify_waiters (L243), there are two sub-cases:
    a. tx.send here succeeds: regardless of whether the rx is dropped or not, we need to release the quota and notify waiters.
    b. tx.send here fails (because the rx has been dropped before tx.send): we only need to release the quota but not notify waiters (otherwise it will deadlock).

Therefore, the implementation can be greatly simplified without introducing MemoryTrackerImpl and PendingRequestCancelGuard.

// MemoryTrackerImpl is not needed.
struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

// MemoryTracker::drop will release quota + notify waiters if quota is not None (1 + 2.a)
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

// Add a new method to release quota without notifying waiters.
impl MemoryTracker {
  fn release_quota(mut self) {
      if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
      }
  }
}

...

// MemoryRequest takes MemoryTracker
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

...

impl MemoryLimiterInner {
    ...
    fn may_notify_waiters(self: &Arc<Self>) {
        ...
        let mut waiters = self.controller.lock();
        while let Some((_, quota)) = waiters.front() {
            if !self.try_require_memory(*quota) {
                break;
            }
            ...
            if let Err(SendError(tracker)) = tx.send(MemoryTracker::new(self.clone(), quota)) {
                // 2.b
                tracker.release_quota();
            }
    }
    ...
}

...

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        // PendingRequestCancelGuard is not needed
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}

Little-Wallace (Contributor, author) replied:
We cannot send MemoryTracker while holding the `controller` lock, because if it is dropped inside this method, it may call `may_notify_waiters` again, which would acquire the same lock twice.
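To make the deadlock concrete, here is a minimal self-contained sketch (simplified types and names, not the actual RisingWave code) of what happens if the sent value is dropped while `may_notify_waiters` still holds the lock:

```rust
use std::sync::Arc;

use parking_lot::Mutex;

struct LimiterInner {
    // Simplified stand-in for the waiter queue guarded by `controller`.
    waiters: Mutex<Vec<u64>>,
}

impl LimiterInner {
    fn may_notify_waiters(self: &Arc<Self>) {
        let _guard = self.waiters.lock();
        // Suppose a tracker is handed to a waiter here and the hand-off fails
        // because the receiver has already been dropped. The tracker is then
        // dropped right here, while `_guard` is still held...
        let tracker = Tracker {
            limiter: self.clone(),
        };
        drop(tracker); // ...and `Tracker::drop` re-enters `may_notify_waiters`,
                       // blocking on the same non-reentrant mutex: deadlock.
    }
}

struct Tracker {
    limiter: Arc<LimiterInner>,
}

impl Drop for Tracker {
    fn drop(&mut self) {
        self.limiter.may_notify_waiters();
    }
}
```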

Little-Wallace (Contributor, author) replied:
We ran into this bug in #6634.

Another contributor replied:
If tokio-rs/tokio#6558 is accepted by the tokio community, we can patch the tokio version we are using and then adopt the code in this comment.

}

self.pending_request_count
.store(waiters.len() as u64, AtomicOrdering::Release);
}

fn try_require_memory(&self, quota: u64) -> bool {
self.try_require_memory_in_capacity(quota, self.quota)
}

#[inline(always)]
fn try_require_memory_in_capacity(&self, quota: u64, capacity: u64) -> bool {
let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
while self.permit_quota(current_quota, quota) {
while self.permit_quota(current_quota, capacity) {
match self.total_size.compare_exchange(
current_quota,
current_quota + quota,
@@ -203,93 +225,99 @@ impl MemoryLimiterInner {
false
}

async fn require_memory(&self, quota: u64) {
let current_quota = self.total_size.load(AtomicOrdering::Acquire);
if self.permit_quota(current_quota, quota)
&& self
.total_size
.compare_exchange(
current_quota,
current_quota + quota,
AtomicOrdering::SeqCst,
AtomicOrdering::SeqCst,
)
.is_ok()
{
// fast path.
return;
}
loop {
let notified = self.notify.notified();
let current_quota = self.total_size.load(AtomicOrdering::Acquire);
if self.permit_quota(current_quota, quota) {
match self.total_size.compare_exchange(
current_quota,
current_quota + quota,
AtomicOrdering::SeqCst,
AtomicOrdering::SeqCst,
) {
Ok(_) => break,
Err(old_quota) => {
// The quota is enough but just changed by other threads. So just try to
// update again without waiting notify.
if self.permit_quota(old_quota, quota) {
continue;
}
}
async fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryTrackerImpl {
let waiter = {
let mut waiters = self.controller.lock();
if waiters.is_empty() {
// When this request is the first waiter but the previous `MemoryTracker` has just released a large quota, the drop may skip notifying this waiter because it checked `pending_request_count` and found it was zero. So we must set it to one and retry `try_require_memory_in_capacity` to avoid a deadlock.
self.pending_request_count.store(1, AtomicOrdering::Release);
if self.try_require_memory_in_capacity(quota, self.fast_quota) {
self.pending_request_count.store(0, AtomicOrdering::Release);
A contributor commented:
Can we move this fast-path code self.try_require_memory_in_capacity(quota, self.fast_quota) before acquiring the lock? That way, if we successfully acquire the quota via CAS, we can avoid locking and writing the atomic variable.

Little-Wallace (Contributor, author) replied:
This method is always called after try_require_memory, which means there has already been a failed CAS attempt without the lock.

Little-Wallace (Contributor, author) replied:
We must retry the acquire while holding the lock, because if no tracker is held by other threads, nothing will ever notify this waiter.
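A minimal sketch (simplified names and types; not the PR code) of the pattern defended here: the lock-free CAS has already been tried by the caller, so the remaining retry happens while the waiter queue is locked. In this simplified version the releasing side always takes the same lock before notifying, so either the retry observes the released quota or the releaser observes the newly queued waiter; there is no window in which the waiter can be missed.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};

use parking_lot::Mutex;
use tokio::sync::oneshot;

struct Limiter {
    total_size: AtomicU64,
    capacity: u64,
    waiters: Mutex<VecDeque<(oneshot::Sender<()>, u64)>>,
}

impl Limiter {
    // Stand-in for the lock-free CAS fast path on `total_size`.
    fn try_acquire(&self, quota: u64) -> bool {
        let mut current = self.total_size.load(Ordering::Acquire);
        while current + quota <= self.capacity {
            match self.total_size.compare_exchange(
                current,
                current + quota,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
        false
    }

    async fn acquire_slow(&self, quota: u64) {
        let rx = {
            let mut waiters = self.waiters.lock();
            // Retry under the lock: a releaser that takes the lock after us will
            // see this waiter in the queue, and a release that happened before us
            // is observed by this retry.
            if waiters.is_empty() && self.try_acquire(quota) {
                return;
            }
            let (tx, rx) = oneshot::channel();
            waiters.push_back((tx, quota));
            rx
        }; // the lock guard is dropped here, before awaiting
        let _ = rx.await;
    }
}
```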

return MemoryTrackerImpl::new(self.clone(), quota);
}
}
notified.await;
}
let (tx, rc) = channel();
waiters.push_back((tx, quota));
// This variable may only be updated while holding the lock.
self.pending_request_count
.store(waiters.len() as u64, AtomicOrdering::Release);
rc
};
waiter.await.unwrap()
}

fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
current_quota <= self.quota
fn permit_quota(&self, current_quota: u64, capacity: u64) -> bool {
current_quota <= capacity
}
}

#[derive(Debug)]
pub struct MemoryLimiter {
inner: Arc<MemoryLimiterInner>,
}

pub struct MemoryTracker {
impl Debug for MemoryLimiter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemoryLimiter")
.field("quota", &self.inner.quota)
.field("usage", &self.inner.total_size)
.finish()
}
}

struct MemoryTrackerImpl {
limiter: Arc<MemoryLimiterInner>,
quota: u64,
quota: Option<u64>,
}
impl MemoryTrackerImpl {
pub fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
Self {
limiter,
quota: Some(quota),
}
}
}

pub struct MemoryTracker {
inner: MemoryTrackerImpl,
}

impl Debug for MemoryTracker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("quota").field("quota", &self.quota).finish()
f.debug_struct("MemoryTracker")
.field("quota", &self.inner.quota)
.finish()
}
}

impl MemoryLimiter {
pub fn unlimit() -> Arc<Self> {
Arc::new(Self {
inner: Arc::new(MemoryLimiterInner {
total_size: AtomicU64::new(0),
notify: Notify::new(),
quota: u64::MAX - 1,
}),
})
Arc::new(Self::new_with_blocking_ratio(u64::MAX / 100, 100))
}

pub fn new(quota: u64) -> Self {
Self::new_with_blocking_ratio(quota, 100)
}

pub fn new_with_blocking_ratio(quota: u64, blocking_ratio: u64) -> Self {
let main_quota = quota / 100 * blocking_ratio;
Self {
inner: Arc::new(MemoryLimiterInner {
total_size: AtomicU64::new(0),
notify: Notify::new(),
controller: Mutex::new(VecDeque::default()),
pending_request_count: Arc::new(AtomicU64::new(0)),
fast_quota: main_quota,
quota,
}),
}
}

pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
if self.inner.try_require_memory(quota) {
if self
.inner
.try_require_memory_in_capacity(quota, self.inner.fast_quota)
{
Some(MemoryTracker {
limiter: self.inner.clone(),
quota,
inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
})
} else {
None
@@ -310,41 +338,57 @@ impl MemoryLimiter {
}

MemoryTracker {
limiter: self.inner.clone(),
quota,
inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
}
}
}

impl MemoryLimiter {
pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
// Since the over provision limiter gets blocked only when the current usage exceeds the
// memory quota, it is allowed to apply for more than the memory quota.
self.inner.require_memory(quota).await;
MemoryTracker {
limiter: self.inner.clone(),
quota,
}
let inner = self.inner.require_memory(quota).await;
MemoryTracker { inner }
}
}

impl MemoryTracker {
pub fn try_increase_memory(&mut self, target: u64) -> bool {
if self.quota >= target {
let quota = self.inner.quota.unwrap();
if quota >= target {
return true;
}
if self.limiter.try_require_memory(target - self.quota) {
self.quota = target;
if self.inner.limiter.try_require_memory(target - quota) {
self.inner.quota = Some(target);
true
} else {
false
}
}
}

impl Drop for MemoryTrackerImpl {
fn drop(&mut self) {
if let Some(quota) = self.quota.take() {
self.limiter.release_quota(quota);
}
}
}

// We must notify waiters outside `MemoryTrackerImpl` to avoid deadlock and an ownership cycle.
impl Drop for MemoryTracker {
fn drop(&mut self) {
self.limiter.release_quota(self.quota);
if let Some(quota) = self.inner.quota.take() {
self.inner.limiter.release_quota(quota);
}
// Check `pending_request_count` to avoid taking the lock every time a `MemoryTracker` is dropped.
if self
A contributor commented:
Maybe we can optimize the case where there are concurrent drops in multiple threads. A rough idea: each thread tries to set pending_request_count to 0 via CAS in a loop, and only the one that successfully sets it to 0 performs the notification. (A minimal sketch of this idea follows the hunk below.)

.inner
.limiter
.pending_request_count
.load(AtomicOrdering::Acquire)
> 0
{
self.inner.limiter.may_notify_waiters();
}
}
}
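A minimal sketch of the optimization suggested in the comment above (assumed helper name; not part of the PR): concurrently dropping threads race to reset the pending counter via CAS, and only the winner takes the lock and performs the notification.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns true iff the calling thread is the one that reset the pending
/// counter to zero and should therefore call `may_notify_waiters`.
fn should_notify(pending_request_count: &AtomicU64) -> bool {
    let mut current = pending_request_count.load(Ordering::Acquire);
    while current > 0 {
        match pending_request_count.compare_exchange(
            current,
            0,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // This thread won the race and is responsible for the notification.
            Ok(_) => return true,
            // Another dropping thread changed the counter; re-check it.
            Err(actual) => current = actual,
        }
    }
    false
}
```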

@@ -568,7 +612,7 @@ pub(crate) fn filter_with_delete_range<'a>(
}

pub(crate) async fn wait_for_epoch(
notifier: &Sender<HummockEpoch>,
notifier: &tokio::sync::watch::Sender<HummockEpoch>,
wait_epoch: u64,
) -> StorageResult<()> {
let mut receiver = notifier.subscribe();