From 0f24619d3b22d9132f00566f600c76fb4a85aed2 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 19 Mar 2024 11:05:49 +0800 Subject: [PATCH 01/14] support blocking behind barrier Signed-off-by: Little-Wallace --- src/common/src/config.rs | 7 + src/config/docs.md | 1 + src/config/example.toml | 1 + .../event_handler/hummock_event_handler.rs | 14 +- .../src/hummock/event_handler/uploader.rs | 23 +- .../hummock/store/local_hummock_storage.rs | 2 +- src/storage/src/hummock/utils.rs | 254 +++++++++++++----- src/storage/src/opts.rs | 2 + 8 files changed, 227 insertions(+), 77 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 59094c949b62c..cda0b518684cf 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -709,6 +709,9 @@ pub struct StorageConfig { #[serde(default = "default::storage::mem_table_spill_threshold")] pub mem_table_spill_threshold: usize, + #[serde(default = "default::storage::memory_limit_for_behind_barrier_ratio")] + pub memory_limit_for_behind_barrier_ratio: u64, + #[serde(default)] pub object_store: ObjectStoreConfig, } @@ -1342,6 +1345,10 @@ pub mod default { 4 << 20 } + pub fn memory_limit_for_behind_barrier_ratio() -> u64 { + 70 + } + pub fn compactor_fast_max_compact_delete_ratio() -> u32 { 40 } diff --git a/src/config/docs.md b/src/config/docs.md index f673ea0c186b4..9c72991e33412 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -117,6 +117,7 @@ This page is automatically generated by `./risedev generate-example-config` | max_sub_compaction | Max sub compaction task numbers | 4 | | max_version_pinning_duration_sec | | 10800 | | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | +| memory_limit_for_behind_barrier_ratio | | 70 | | meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | | | meta_file_cache | | | | min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. 
| 33554432 | diff --git a/src/config/example.toml b/src/config/example.toml index 3e5bd8b266e07..44e5ea28ca568 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -133,6 +133,7 @@ max_preload_io_retry_times = 3 compactor_fast_max_compact_delete_ratio = 40 compactor_fast_max_compact_task_size = 2147483648 mem_table_spill_threshold = 4194304 +memory_limit_for_behind_barrier_ratio = 70 [storage.cache.block_cache_eviction] algorithm = "Lru" diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index b18f698689c8e..fed02fe3f11ca 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -76,24 +76,34 @@ impl BufferTracker { ) -> Self { let capacity = config.shared_buffer_capacity_mb * (1 << 20); let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize; + let blocking_threshold = config.memory_limit_for_behind_barrier_ratio; assert!( flush_threshold < capacity, "flush_threshold {} should be less or equal to capacity {}", flush_threshold, capacity ); - Self::new(capacity, flush_threshold, global_upload_task_size) + Self::new( + capacity, + flush_threshold, + blocking_threshold, + global_upload_task_size, + ) } pub fn new( capacity: usize, flush_threshold: usize, + blocking_threshold: u64, global_upload_task_size: GenericGauge, ) -> Self { assert!(capacity >= flush_threshold); Self { flush_threshold, - global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)), + global_buffer: Arc::new(MemoryLimiter::new_with_blocking_ratio( + capacity as u64, + blocking_threshold, + )), global_upload_task_size, } } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index d00f64c42cee3..7289432c97cbb 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -409,6 +409,10 @@ impl UnsealedEpochData { } } + fn get_imm_data_size(&self) -> usize { + self.imms.iter().map(|imm| imm.size()).sum() + } + fn add_table_watermarks( &mut self, table_id: TableId, @@ -887,6 +891,7 @@ impl HummockUploader { } else { debug!("epoch {} to seal has no data", epoch); } + self.buffer_tracker().get_memory_limiter().seal_epoch(epoch); } pub(crate) fn start_merge_imms(&mut self, sealed_epoch: HummockEpoch) { @@ -1061,7 +1066,15 @@ impl HummockUploader { if self.context.buffer_tracker.need_more_flush() { // iterate from older epoch to newer epoch - for unsealed_data in self.unsealed_data.values_mut() { + let mut unseal_epochs = vec![]; + for (epoch, unsealed_data) in self.unsealed_data.iter() { + unseal_epochs.push((unsealed_data.get_imm_data_size(), *epoch)); + } + // flush large data at first to avoid generate small files. + unseal_epochs.sort_by_key(|item| item.0); + unseal_epochs.reverse(); + for (_, epoch) in unseal_epochs { + let unsealed_data = self.unsealed_data.get_mut(&epoch).unwrap(); unsealed_data.flush(&self.context); if !self.context.buffer_tracker.need_more_flush() { break; @@ -1803,8 +1816,12 @@ mod tests { impl Fn(Vec) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { // flush threshold is 0. 
Flush anyway - let buffer_tracker = - BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap()); + let buffer_tracker = BufferTracker::new( + usize::MAX, + 0, + 100, + GenericGauge::new("test", "test").unwrap(), + ); // (the started task send the imm ids of payload, the started task wait for finish notify) #[allow(clippy::type_complexity)] let task_notifier_holder: Arc< diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index bfd8d5ace90c9..29ca831a0fab5 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -492,7 +492,7 @@ impl LocalHummockStorage { .send(HummockEvent::BufferMayFlush) .expect("should be able to send"); let tracker = limiter - .require_memory(size as u64) + .require_memory_for_epoch(size as u64, epoch) .verbose_instrument_await("hummock_require_memory") .await; warn!( diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 82af4beadf761..1c19ad59d74f5 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::cmp::Ordering; +use std::collections::{BTreeMap, VecDeque}; use std::fmt::{Debug, Formatter}; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; @@ -22,6 +23,7 @@ use std::time::Duration; use bytes::Bytes; use foyer::memory::CacheContext; +use parking_lot::Mutex; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, @@ -29,8 +31,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use risingwave_pb::hummock::SstableInfo; -use tokio::sync::watch::Sender; -use tokio::sync::Notify; +use tokio::sync::oneshot::{channel, Sender}; use super::{HummockError, HummockResult}; use crate::error::StorageResult; @@ -166,26 +167,95 @@ pub fn prune_nonoverlapping_ssts<'a>( ssts[start_table_idx..=end_table_idx].iter() } -#[derive(Debug)] +pub struct BarrierPriorityController { + waiters: BTreeMap, u64)>>, + max_seal_epoch: u64, + min_unseal_epoch: u64, + inflight_epoch_count: Arc, +} + +impl BarrierPriorityController { + pub fn new(inflight_epoch_count: Arc) -> Self { + Self { + inflight_epoch_count, + min_unseal_epoch: 0, + waiters: BTreeMap::default(), + max_seal_epoch: 0, + } + } +} + +impl Debug for BarrierPriorityController { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BarrierPriorityController") + .field("min_unseal_epoch", &self.min_unseal_epoch) + .field("waiter", &self.waiters.keys()) + .field("inflight_epoch_count", &self.inflight_epoch_count) + .finish_non_exhaustive() + } +} + +impl BarrierPriorityController { + fn can_write(&mut self, barrier: u64) -> bool { + if barrier > self.max_seal_epoch && self.max_seal_epoch >= self.min_unseal_epoch { + self.min_unseal_epoch = barrier; + true + } else if barrier <= self.min_unseal_epoch { + self.min_unseal_epoch = barrier; + true + } else { + false + } + } + + pub fn seal_epoch(&mut self, barrier: u64) { + // Only event handler thread will update this variable. 
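// ---- editor's note: illustrative sketch, not part of the patch ----
// A worked example of the admission rule in `can_write` above, under the
// stated invariant that only the event handler thread advances
// `max_seal_epoch`. Names mirror the patch; the assertions are new:
//
//     let mut c = BarrierPriorityController::new(Arc::new(AtomicU64::new(0)));
//     assert!(c.can_write(1));   // 1 > max_seal (0) and 0 >= min_unseal (0)
//     assert!(!c.can_write(2));  // epoch 1 is still unsealed, so 2 must wait
//     assert!(c.can_write(1));   // 1 <= min_unseal (1): the admitted epoch keeps writing
//     c.seal_epoch(1);           // done by the event handler thread
//     assert!(c.can_write(2));   // now 2 > max_seal (1) and 1 >= min_unseal (1)
// ---- end editor's note ----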
+ self.max_seal_epoch = barrier; + } +} + struct MemoryLimiterInner { total_size: AtomicU64, - notify: Notify, + contoller: Mutex, + inflight_barrier: Arc, quota: u64, + fast_quota: u64, } impl MemoryLimiterInner { fn release_quota(&self, quota: u64) { self.total_size.fetch_sub(quota, AtomicOrdering::Release); - self.notify.notify_waiters(); } fn add_memory(&self, quota: u64) { self.total_size.fetch_add(quota, AtomicOrdering::SeqCst); } + fn may_notify_waiters(self: &Arc) { + let mut controller = self.contoller.lock(); + while let Some(mut entry) = controller.waiters.first_entry() { + let que = entry.get_mut(); + while let Some((tx, quota)) = que.pop_front() { + if !self.try_require_memory_in_capacity(quota, self.fast_quota) { + que.push_front((tx, quota)); + return; + } + let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota)); + } + entry.remove(); + } + self.inflight_barrier + .store(controller.waiters.len() as u64, AtomicOrdering::Release); + } + fn try_require_memory(&self, quota: u64) -> bool { + self.try_require_memory_in_capacity(quota, self.quota) + } + + #[inline(always)] + fn try_require_memory_in_capacity(&self, quota: u64, capacity: u64) -> bool { let mut current_quota = self.total_size.load(AtomicOrdering::Acquire); - while self.permit_quota(current_quota, quota) { + while self.permit_quota(current_quota, capacity) { match self.total_size.compare_exchange( current_quota, current_quota + quota, @@ -203,93 +273,108 @@ impl MemoryLimiterInner { false } - async fn require_memory(&self, quota: u64) { - let current_quota = self.total_size.load(AtomicOrdering::Acquire); - if self.permit_quota(current_quota, quota) - && self - .total_size - .compare_exchange( - current_quota, - current_quota + quota, - AtomicOrdering::SeqCst, - AtomicOrdering::SeqCst, - ) - .is_ok() - { - // fast path. - return; - } - loop { - let notified = self.notify.notified(); - let current_quota = self.total_size.load(AtomicOrdering::Acquire); - if self.permit_quota(current_quota, quota) { - match self.total_size.compare_exchange( - current_quota, - current_quota + quota, - AtomicOrdering::SeqCst, - AtomicOrdering::SeqCst, - ) { - Ok(_) => break, - Err(old_quota) => { - // The quota is enough but just changed by other threads. So just try to - // update again without waiting notify. - if self.permit_quota(old_quota, quota) { - continue; - } - } + async fn require_memory(self: &Arc, epoch: u64, quota: u64) -> MemoryTrackerImpl { + let waiter = { + let mut controller = self.contoller.lock(); + // only allow first epoch get quota. + if controller.can_write(epoch) { + if self.try_require_memory(quota) { + return MemoryTrackerImpl::new(self.clone(), quota); + } + } else if controller.waiters.is_empty() { + // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. + self.inflight_barrier.store(1, AtomicOrdering::Release); + if self.try_require_memory_in_capacity(quota, self.fast_quota) { + self.inflight_barrier.store(0, AtomicOrdering::Release); + return MemoryTrackerImpl::new(self.clone(), quota); } } - notified.await; - } + let (tx, rc) = channel(); + controller + .waiters + .entry(epoch) + .or_default() + .push_back((tx, quota)); + // This variable can only update within lock. 
+ self.inflight_barrier.store(controller.waiters.len() as u64, AtomicOrdering::Release); + rc + }; + waiter.await.unwrap() } - fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool { - current_quota <= self.quota + fn permit_quota(&self, current_quota: u64, capacity: u64) -> bool { + current_quota <= capacity } } -#[derive(Debug)] pub struct MemoryLimiter { inner: Arc, } -pub struct MemoryTracker { +impl Debug for MemoryLimiter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryLimiter") + .field("quota", &self.inner.quota) + .field("usage", &self.inner.total_size) + .finish() + } +} + +struct MemoryTrackerImpl { limiter: Arc, - quota: u64, + quota: Option, +} +impl MemoryTrackerImpl { + pub fn new(limiter: Arc, quota: u64) -> Self { + Self { + limiter, + quota: Some(quota), + } + } +} + +pub struct MemoryTracker { + inner: MemoryTrackerImpl, } impl Debug for MemoryTracker { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("quota").field("quota", &self.quota).finish() + f.debug_struct("MemoryTracker") + .field("quota", &self.inner.quota) + .finish() } } impl MemoryLimiter { pub fn unlimit() -> Arc { - Arc::new(Self { - inner: Arc::new(MemoryLimiterInner { - total_size: AtomicU64::new(0), - notify: Notify::new(), - quota: u64::MAX - 1, - }), - }) + Arc::new(Self::new_with_blocking_ratio(u64::MAX / 100, 100)) } pub fn new(quota: u64) -> Self { + Self::new_with_blocking_ratio(quota, 100) + } + + pub fn new_with_blocking_ratio(quota: u64, blocking_ratio: u64) -> Self { + let main_quota = quota * blocking_ratio / 100; + let inflight_barrier = Arc::new(AtomicU64::new(0)); Self { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), - notify: Notify::new(), + contoller: Mutex::new(BarrierPriorityController::new(inflight_barrier.clone())), + inflight_barrier, + fast_quota: main_quota, quota, }), } } pub fn try_require_memory(&self, quota: u64) -> Option { - if self.inner.try_require_memory(quota) { + if self + .inner + .try_require_memory_in_capacity(quota, self.inner.fast_quota) + { Some(MemoryTracker { - limiter: self.inner.clone(), - quota, + inner: MemoryTrackerImpl::new(self.inner.clone(), quota), }) } else { None @@ -304,37 +389,43 @@ impl MemoryLimiter { self.inner.quota } + pub fn seal_epoch(&self, epoch: u64) { + self.inner.contoller.lock().seal_epoch(epoch); + } + pub fn must_require_memory(&self, quota: u64) -> MemoryTracker { if !self.inner.try_require_memory(quota) { self.inner.add_memory(quota); } MemoryTracker { - limiter: self.inner.clone(), - quota, + inner: MemoryTrackerImpl::new(self.inner.clone(), quota), } } } impl MemoryLimiter { pub async fn require_memory(&self, quota: u64) -> MemoryTracker { + let inner = self.inner.require_memory(0, quota).await; + MemoryTracker { inner } + } + + pub async fn require_memory_for_epoch(&self, quota: u64, epoch: u64) -> MemoryTracker { // Since the over provision limiter gets blocked only when the current usage exceeds the // memory quota, it is allowed to apply for more than the memory quota. 
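// ---- editor's note: illustrative sketch, not part of the patch ----
// What "over provision" means concretely: `permit_quota` compares the usage
// recorded *before* the new request against the capacity, so one admitted
// request may push usage past the quota, and only the next request blocks.
// A self-contained model of that rule, simplified to a plain load-then-add
// (the patch uses a compare-exchange loop instead):

use std::sync::atomic::{AtomicU64, Ordering};

fn try_acquire_over_provision(total: &AtomicU64, capacity: u64, request: u64) -> bool {
    if total.load(Ordering::Acquire) <= capacity {
        // may overshoot: usage becomes `current + request`, possibly > capacity
        total.fetch_add(request, Ordering::SeqCst);
        true
    } else {
        false
    }
}

#[test]
fn over_provision_example() {
    let total = AtomicU64::new(0);
    assert!(try_acquire_over_provision(&total, 100, 150)); // admitted at usage 0
    assert_eq!(total.load(Ordering::Acquire), 150);        // now above the quota
    assert!(!try_acquire_over_provision(&total, 100, 1));  // next caller is refused
}
// ---- end editor's note ----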
- self.inner.require_memory(quota).await; - MemoryTracker { - limiter: self.inner.clone(), - quota, - } + let inner = self.inner.require_memory(epoch, quota).await; + MemoryTracker { inner } } } impl MemoryTracker { pub fn try_increase_memory(&mut self, target: u64) -> bool { - if self.quota >= target { + let quota = self.inner.quota.clone().unwrap(); + if quota >= target { return true; } - if self.limiter.try_require_memory(target - self.quota) { - self.quota = target; + if self.inner.limiter.try_require_memory(target - quota) { + self.inner.quota = Some(target); true } else { false @@ -342,9 +433,30 @@ impl MemoryTracker { } } +impl Drop for MemoryTrackerImpl { + fn drop(&mut self) { + if let Some(quota) = self.quota.take() { + self.limiter.release_quota(quota); + } + } +} + +// We must notify waiters outside `MemoryTrackerImpl` to avoid dead-lock and loop-owner. impl Drop for MemoryTracker { fn drop(&mut self) { - self.limiter.release_quota(self.quota); + if let Some(quota) = self.inner.quota.take() { + self.inner.limiter.release_quota(quota); + } + // check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`. + if self + .inner + .limiter + .inflight_barrier + .load(AtomicOrdering::Acquire) + > 0 + { + self.inner.limiter.may_notify_waiters(); + } } } @@ -568,7 +680,7 @@ pub(crate) fn filter_with_delete_range<'a>( } pub(crate) async fn wait_for_epoch( - notifier: &Sender, + notifier: &tokio::sync::watch::Sender, wait_epoch: u64, ) -> StorageResult<()> { let mut receiver = notifier.subscribe(); diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 6a39cad111268..ddeec221bf137 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -77,6 +77,7 @@ pub struct StorageOpts { pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, pub max_version_pinning_duration_sec: u64, + pub memory_limit_for_behind_barrier_ratio: u64, pub data_file_cache_dir: String, pub data_file_cache_capacity_mb: usize, @@ -269,6 +270,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .compactor_fast_max_compact_delete_ratio, compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size, + memory_limit_for_behind_barrier_ratio: c.storage.memory_limit_for_behind_barrier_ratio, } } } From cea060643f8a0921b658a2265b02e15591584d38 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 26 Mar 2024 21:31:07 +0800 Subject: [PATCH 02/14] notify and change first barrier Signed-off-by: Little-Wallace --- .../src/hummock/event_handler/uploader.rs | 14 +--- src/storage/src/hummock/utils.rs | 79 +++++++++++++------ 2 files changed, 54 insertions(+), 39 deletions(-) diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 7289432c97cbb..fc60c0fe9230a 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -409,10 +409,6 @@ impl UnsealedEpochData { } } - fn get_imm_data_size(&self) -> usize { - self.imms.iter().map(|imm| imm.size()).sum() - } - fn add_table_watermarks( &mut self, table_id: TableId, @@ -1066,15 +1062,7 @@ impl HummockUploader { if self.context.buffer_tracker.need_more_flush() { // iterate from older epoch to newer epoch - let mut unseal_epochs = vec![]; - for (epoch, unsealed_data) in self.unsealed_data.iter() { - unseal_epochs.push((unsealed_data.get_imm_data_size(), *epoch)); - } - // flush large data at first to avoid generate small files. 
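// ---- editor's note: illustrative sketch, not part of the patch ----
// The heuristic this hunk reverts ("flush larger epochs first to avoid
// generating small files"), as a standalone function. `epoch_sizes` is a
// hypothetical (epoch, imm size) listing, not an API from the patch:

fn size_descending_flush_order(epoch_sizes: &[(u64, usize)]) -> Vec<u64> {
    let mut order = epoch_sizes.to_vec();
    // largest in-memory batch first, so each spill produces a bigger SST
    order.sort_by(|a, b| b.1.cmp(&a.1));
    order.into_iter().map(|(epoch, _)| epoch).collect()
}

#[test]
fn flush_order_example() {
    // epoch 2 holds the most data, so it is flushed first
    assert_eq!(size_descending_flush_order(&[(1, 10), (2, 40), (3, 25)]), [2, 3, 1]);
}
// ---- end editor's note ----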
- unseal_epochs.sort_by_key(|item| item.0); - unseal_epochs.reverse(); - for (_, epoch) in unseal_epochs { - let unsealed_data = self.unsealed_data.get_mut(&epoch).unwrap(); + for unsealed_data in self.unsealed_data.values_mut() { unsealed_data.flush(&self.context); if !self.context.buffer_tracker.need_more_flush() { break; diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 1c19ad59d74f5..af64efdd4e030 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -183,24 +183,11 @@ impl BarrierPriorityController { max_seal_epoch: 0, } } -} -impl Debug for BarrierPriorityController { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BarrierPriorityController") - .field("min_unseal_epoch", &self.min_unseal_epoch) - .field("waiter", &self.waiters.keys()) - .field("inflight_epoch_count", &self.inflight_epoch_count) - .finish_non_exhaustive() - } -} - -impl BarrierPriorityController { fn can_write(&mut self, barrier: u64) -> bool { - if barrier > self.max_seal_epoch && self.max_seal_epoch >= self.min_unseal_epoch { - self.min_unseal_epoch = barrier; - true - } else if barrier <= self.min_unseal_epoch { + if (barrier > self.max_seal_epoch && self.max_seal_epoch >= self.min_unseal_epoch) + || barrier <= self.min_unseal_epoch + { self.min_unseal_epoch = barrier; true } else { @@ -212,11 +199,29 @@ impl BarrierPriorityController { // Only event handler thread will update this variable. self.max_seal_epoch = barrier; } + + pub fn min_inflight_epoch(&self) -> Option { + if self.max_seal_epoch < self.min_unseal_epoch { + Some(self.min_unseal_epoch) + } else { + None + } + } +} + +impl Debug for BarrierPriorityController { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BarrierPriorityController") + .field("min_unseal_epoch", &self.min_unseal_epoch) + .field("waiter", &self.waiters.keys()) + .field("inflight_epoch_count", &self.inflight_epoch_count) + .finish_non_exhaustive() + } } struct MemoryLimiterInner { total_size: AtomicU64, - contoller: Mutex, + controller: Mutex, inflight_barrier: Arc, quota: u64, fast_quota: u64, @@ -232,17 +237,38 @@ impl MemoryLimiterInner { } fn may_notify_waiters(self: &Arc) { - let mut controller = self.contoller.lock(); + let mut controller = self.controller.lock(); + let mut first_inflight_epoch = controller.min_inflight_epoch(); while let Some(mut entry) = controller.waiters.first_entry() { + let epoch = *entry.key(); let que = entry.get_mut(); + + let is_first_barrier = if let Some(first_epoch) = first_inflight_epoch.as_ref() { + epoch <= *first_epoch + } else { + true + }; + + if is_first_barrier { + first_inflight_epoch = Some(epoch); + } while let Some((tx, quota)) = que.pop_front() { - if !self.try_require_memory_in_capacity(quota, self.fast_quota) { + if (is_first_barrier && !self.try_require_memory(quota)) + || (!is_first_barrier + && !self.try_require_memory_in_capacity(quota, self.fast_quota)) + { que.push_front((tx, quota)); - return; + break; } let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota)); } - entry.remove(); + if que.is_empty() { + entry.remove(); + } + } + + if let Some(first_epoch) = first_inflight_epoch { + controller.can_write(first_epoch); } self.inflight_barrier .store(controller.waiters.len() as u64, AtomicOrdering::Release); @@ -275,7 +301,7 @@ impl MemoryLimiterInner { async fn require_memory(self: &Arc, epoch: u64, quota: u64) -> MemoryTrackerImpl { let waiter = { - let mut controller = self.contoller.lock(); + 
let mut controller = self.controller.lock(); // only allow first epoch get quota. if controller.can_write(epoch) { if self.try_require_memory(quota) { @@ -296,7 +322,8 @@ impl MemoryLimiterInner { .or_default() .push_back((tx, quota)); // This variable can only update within lock. - self.inflight_barrier.store(controller.waiters.len() as u64, AtomicOrdering::Release); + self.inflight_barrier + .store(controller.waiters.len() as u64, AtomicOrdering::Release); rc }; waiter.await.unwrap() @@ -360,7 +387,7 @@ impl MemoryLimiter { Self { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), - contoller: Mutex::new(BarrierPriorityController::new(inflight_barrier.clone())), + controller: Mutex::new(BarrierPriorityController::new(inflight_barrier.clone())), inflight_barrier, fast_quota: main_quota, quota, @@ -390,7 +417,7 @@ impl MemoryLimiter { } pub fn seal_epoch(&self, epoch: u64) { - self.inner.contoller.lock().seal_epoch(epoch); + self.inner.controller.lock().seal_epoch(epoch); } pub fn must_require_memory(&self, quota: u64) -> MemoryTracker { @@ -420,7 +447,7 @@ impl MemoryLimiter { impl MemoryTracker { pub fn try_increase_memory(&mut self, target: u64) -> bool { - let quota = self.inner.quota.clone().unwrap(); + let quota = self.inner.quota.unwrap(); if quota >= target { return true; } From c69ad89f0f8bdc4fc89c223e37a8074576f25bee Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 26 Apr 2024 12:49:05 +0800 Subject: [PATCH 03/14] refactor to single que Signed-off-by: Little-Wallace --- .../src/hummock/event_handler/uploader.rs | 1 - .../hummock/store/local_hummock_storage.rs | 2 +- src/storage/src/hummock/utils.rs | 143 +++--------------- 3 files changed, 25 insertions(+), 121 deletions(-) diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 5ad8a41d1444f..650670282cfea 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -900,7 +900,6 @@ impl HummockUploader { UnsealedEpochData::default() }; self.sealed_data.seal_new_epoch(epoch, unsealed_data); - self.buffer_tracker().get_memory_limiter().seal_epoch(epoch); } pub(crate) fn start_merge_imms(&mut self, sealed_epoch: HummockEpoch) { diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 2d86829f43cc1..5215a9eaf18db 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -492,7 +492,7 @@ impl LocalHummockStorage { .send(HummockEvent::BufferMayFlush) .expect("should be able to send"); let tracker = limiter - .require_memory_for_epoch(size as u64, epoch) + .require_memory(size as u64) .verbose_instrument_await("hummock_require_memory") .await; warn!( diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index af64efdd4e030..5fbfa5f49f65d 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -13,7 +13,7 @@ // limitations under the License. 
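// ---- editor's note: illustrative sketch, not part of the patch ----
// This commit collapses the per-epoch `BTreeMap` of waiter queues into a
// single FIFO `VecDeque`. The wakeup rule becomes: grant quota strictly in
// arrival order and stop at the first waiter that does not fit, which keeps
// the limiter fair and bounds how long any waiter can be starved. A
// standalone sketch of that loop, assuming tokio oneshot senders as in the
// patch and an `available` budget standing in for the quota check:

use std::collections::VecDeque;
use tokio::sync::oneshot::Sender;

fn grant_fifo(waiters: &mut VecDeque<(Sender<u64>, u64)>, mut available: u64) {
    while let Some((_, quota)) = waiters.front() {
        if *quota > available {
            break; // head does not fit; later (smaller) waiters must still wait
        }
        let (tx, quota) = waiters.pop_front().unwrap();
        available -= quota;
        // the receiver may already be gone (request cancelled); ignore the error
        let _ = tx.send(quota);
    }
}
// ---- end editor's note ----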
use std::cmp::Ordering; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::VecDeque; use std::fmt::{Debug, Formatter}; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; @@ -167,62 +167,12 @@ pub fn prune_nonoverlapping_ssts<'a>( ssts[start_table_idx..=end_table_idx].iter() } -pub struct BarrierPriorityController { - waiters: BTreeMap, u64)>>, - max_seal_epoch: u64, - min_unseal_epoch: u64, - inflight_epoch_count: Arc, -} - -impl BarrierPriorityController { - pub fn new(inflight_epoch_count: Arc) -> Self { - Self { - inflight_epoch_count, - min_unseal_epoch: 0, - waiters: BTreeMap::default(), - max_seal_epoch: 0, - } - } - - fn can_write(&mut self, barrier: u64) -> bool { - if (barrier > self.max_seal_epoch && self.max_seal_epoch >= self.min_unseal_epoch) - || barrier <= self.min_unseal_epoch - { - self.min_unseal_epoch = barrier; - true - } else { - false - } - } - - pub fn seal_epoch(&mut self, barrier: u64) { - // Only event handler thread will update this variable. - self.max_seal_epoch = barrier; - } - - pub fn min_inflight_epoch(&self) -> Option { - if self.max_seal_epoch < self.min_unseal_epoch { - Some(self.min_unseal_epoch) - } else { - None - } - } -} - -impl Debug for BarrierPriorityController { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BarrierPriorityController") - .field("min_unseal_epoch", &self.min_unseal_epoch) - .field("waiter", &self.waiters.keys()) - .field("inflight_epoch_count", &self.inflight_epoch_count) - .finish_non_exhaustive() - } -} +type RequestQueue = VecDeque<(Sender, u64)>; struct MemoryLimiterInner { total_size: AtomicU64, - controller: Mutex, - inflight_barrier: Arc, + controller: Mutex, + pending_request_count: Arc, quota: u64, fast_quota: u64, } @@ -237,41 +187,17 @@ impl MemoryLimiterInner { } fn may_notify_waiters(self: &Arc) { - let mut controller = self.controller.lock(); - let mut first_inflight_epoch = controller.min_inflight_epoch(); - while let Some(mut entry) = controller.waiters.first_entry() { - let epoch = *entry.key(); - let que = entry.get_mut(); - - let is_first_barrier = if let Some(first_epoch) = first_inflight_epoch.as_ref() { - epoch <= *first_epoch - } else { - true - }; - - if is_first_barrier { - first_inflight_epoch = Some(epoch); - } - while let Some((tx, quota)) = que.pop_front() { - if (is_first_barrier && !self.try_require_memory(quota)) - || (!is_first_barrier - && !self.try_require_memory_in_capacity(quota, self.fast_quota)) - { - que.push_front((tx, quota)); - break; - } - let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota)); - } - if que.is_empty() { - entry.remove(); + let mut waiters = self.controller.lock(); + while let Some((tx, quota)) = waiters.pop_front() { + if !self.try_require_memory_in_capacity(quota, self.quota) { + waiters.push_front((tx, quota)); + break; } + let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota)); } - if let Some(first_epoch) = first_inflight_epoch { - controller.can_write(first_epoch); - } - self.inflight_barrier - .store(controller.waiters.len() as u64, AtomicOrdering::Release); + self.pending_request_count + .store(waiters.len() as u64, AtomicOrdering::Release); } fn try_require_memory(&self, quota: u64) -> bool { @@ -299,31 +225,22 @@ impl MemoryLimiterInner { false } - async fn require_memory(self: &Arc, epoch: u64, quota: u64) -> MemoryTrackerImpl { + async fn require_memory(self: &Arc, quota: u64) -> MemoryTrackerImpl { let waiter = { - let mut controller = self.controller.lock(); 
- // only allow first epoch get quota. - if controller.can_write(epoch) { - if self.try_require_memory(quota) { - return MemoryTrackerImpl::new(self.clone(), quota); - } - } else if controller.waiters.is_empty() { + let mut waiters = self.controller.lock(); + if waiters.is_empty() { // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. - self.inflight_barrier.store(1, AtomicOrdering::Release); + self.pending_request_count.store(1, AtomicOrdering::Release); if self.try_require_memory_in_capacity(quota, self.fast_quota) { - self.inflight_barrier.store(0, AtomicOrdering::Release); + self.pending_request_count.store(0, AtomicOrdering::Release); return MemoryTrackerImpl::new(self.clone(), quota); } } let (tx, rc) = channel(); - controller - .waiters - .entry(epoch) - .or_default() - .push_back((tx, quota)); + waiters.push_back((tx, quota)); // This variable can only update within lock. - self.inflight_barrier - .store(controller.waiters.len() as u64, AtomicOrdering::Release); + self.pending_request_count + .store(waiters.len() as u64, AtomicOrdering::Release); rc }; waiter.await.unwrap() @@ -383,12 +300,11 @@ impl MemoryLimiter { pub fn new_with_blocking_ratio(quota: u64, blocking_ratio: u64) -> Self { let main_quota = quota * blocking_ratio / 100; - let inflight_barrier = Arc::new(AtomicU64::new(0)); Self { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), - controller: Mutex::new(BarrierPriorityController::new(inflight_barrier.clone())), - inflight_barrier, + controller: Mutex::new(VecDeque::default()), + pending_request_count: Arc::new(AtomicU64::new(0)), fast_quota: main_quota, quota, }), @@ -416,10 +332,6 @@ impl MemoryLimiter { self.inner.quota } - pub fn seal_epoch(&self, epoch: u64) { - self.inner.controller.lock().seal_epoch(epoch); - } - pub fn must_require_memory(&self, quota: u64) -> MemoryTracker { if !self.inner.try_require_memory(quota) { self.inner.add_memory(quota); @@ -433,14 +345,7 @@ impl MemoryLimiter { impl MemoryLimiter { pub async fn require_memory(&self, quota: u64) -> MemoryTracker { - let inner = self.inner.require_memory(0, quota).await; - MemoryTracker { inner } - } - - pub async fn require_memory_for_epoch(&self, quota: u64, epoch: u64) -> MemoryTracker { - // Since the over provision limiter gets blocked only when the current usage exceeds the - // memory quota, it is allowed to apply for more than the memory quota. 
- let inner = self.inner.require_memory(epoch, quota).await; + let inner = self.inner.require_memory(quota).await; MemoryTracker { inner } } } @@ -478,7 +383,7 @@ impl Drop for MemoryTracker { if self .inner .limiter - .inflight_barrier + .pending_request_count .load(AtomicOrdering::Acquire) > 0 { From 8f5120291d8fc7a906ef3ecf3362f92d659fddce Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 6 May 2024 18:05:19 +0800 Subject: [PATCH 04/14] fix memory order Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 5fbfa5f49f65d..2c91f5e0fc469 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -299,7 +299,7 @@ impl MemoryLimiter { } pub fn new_with_blocking_ratio(quota: u64, blocking_ratio: u64) -> Self { - let main_quota = quota * blocking_ratio / 100; + let main_quota = quota / 100 * blocking_ratio; Self { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), From 241a2fb1da59fcd6accaa76d0ccb5235a2b0f8a0 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 7 May 2024 15:32:39 +0800 Subject: [PATCH 05/14] fix ut Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 2c91f5e0fc469..89edcdbeba33f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -228,13 +228,16 @@ impl MemoryLimiterInner { async fn require_memory(self: &Arc, quota: u64) -> MemoryTrackerImpl { let waiter = { let mut waiters = self.controller.lock(); - if waiters.is_empty() { + let first_req = waiters.is_empty() ; + if first_req { // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. 
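// ---- editor's note: the lost wakeup described in the comment above,
// spelled out. Without publishing the counter first, the two threads can
// interleave as:
//
//   waiter thread                        releasing thread
//   -------------                        ----------------
//   try_require_memory() fails
//                                        releases a large quota
//                                        reads pending_request_count == 0
//                                        -> skips may_notify_waiters()
//   enqueues itself and awaits           (nobody will ever notify it)
//
// Storing the count *before* the final retry closes the window: either the
// retry observes the freed quota, or the releaser observes a non-zero count
// and walks the queue. This is the standard "publish intent, then re-check"
// pattern used by futex-style wakeups. ----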
self.pending_request_count.store(1, AtomicOrdering::Release); - if self.try_require_memory_in_capacity(quota, self.fast_quota) { + } + if self.try_require_memory_in_capacity(quota, self.quota) { + if first_req { self.pending_request_count.store(0, AtomicOrdering::Release); - return MemoryTrackerImpl::new(self.clone(), quota); } + return MemoryTrackerImpl::new(self.clone(), quota); } let (tx, rc) = channel(); waiters.push_back((tx, quota)); @@ -291,7 +294,7 @@ impl Debug for MemoryTracker { impl MemoryLimiter { pub fn unlimit() -> Arc { - Arc::new(Self::new_with_blocking_ratio(u64::MAX / 100, 100)) + Arc::new(Self::new_with_blocking_ratio(u64::MAX, 100)) } pub fn new(quota: u64) -> Self { From 93f608dcf18b2741601444941461700d8a9c7574 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 7 May 2024 16:27:14 +0800 Subject: [PATCH 06/14] address comment Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 93 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 89edcdbeba33f..21c66270dfb00 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use risingwave_pb::hummock::SstableInfo; -use tokio::sync::oneshot::{channel, Sender}; +use tokio::sync::oneshot::{channel, Receiver, Sender}; use super::{HummockError, HummockResult}; use crate::error::StorageResult; @@ -168,11 +168,27 @@ pub fn prune_nonoverlapping_ssts<'a>( } type RequestQueue = VecDeque<(Sender, u64)>; +enum MemoryRequest { + Ready(MemoryTrackerImpl), + Pending(Receiver), +} + +pub struct PendingRequestCancelGuard { + inner: Option>, +} + +impl Drop for PendingRequestCancelGuard { + fn drop(&mut self) { + if let Some(limiter) = self.inner.take() { + limiter.may_notify_waiters(); + } + } +} struct MemoryLimiterInner { total_size: AtomicU64, controller: Mutex, - pending_request_count: Arc, + pending_request_count: AtomicU64, quota: u64, fast_quota: u64, } @@ -225,28 +241,26 @@ impl MemoryLimiterInner { false } - async fn require_memory(self: &Arc, quota: u64) -> MemoryTrackerImpl { - let waiter = { - let mut waiters = self.controller.lock(); - let first_req = waiters.is_empty() ; + fn require_memory(self: &Arc, quota: u64) -> MemoryRequest { + let mut waiters = self.controller.lock(); + let first_req = waiters.is_empty(); + if first_req { + // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. + self.pending_request_count.store(1, AtomicOrdering::Release); + } + // We must require again with lock because some other MemoryTracker may drop just after this thread gets mutex but before it enters queue. + if self.try_require_memory_in_capacity(quota, self.quota) { if first_req { - // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. 
- self.pending_request_count.store(1, AtomicOrdering::Release); - } - if self.try_require_memory_in_capacity(quota, self.quota) { - if first_req { - self.pending_request_count.store(0, AtomicOrdering::Release); - } - return MemoryTrackerImpl::new(self.clone(), quota); + self.pending_request_count.store(0, AtomicOrdering::Release); } - let (tx, rc) = channel(); - waiters.push_back((tx, quota)); - // This variable can only update within lock. - self.pending_request_count - .store(waiters.len() as u64, AtomicOrdering::Release); - rc - }; - waiter.await.unwrap() + return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota)); + } + let (tx, rc) = channel(); + waiters.push_back((tx, quota)); + // This variable can only update within lock. + self.pending_request_count + .store(waiters.len() as u64, AtomicOrdering::Release); + MemoryRequest::Pending(rc) } fn permit_quota(&self, current_quota: u64, capacity: u64) -> bool { @@ -307,7 +321,7 @@ impl MemoryLimiter { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), controller: Mutex::new(VecDeque::default()), - pending_request_count: Arc::new(AtomicU64::new(0)), + pending_request_count: AtomicU64::new(0), fast_quota: main_quota, quota, }), @@ -348,8 +362,17 @@ impl MemoryLimiter { impl MemoryLimiter { pub async fn require_memory(&self, quota: u64) -> MemoryTracker { - let inner = self.inner.require_memory(quota).await; - MemoryTracker { inner } + match self.inner.require_memory(quota) { + MemoryRequest::Ready(inner) => MemoryTracker { inner }, + MemoryRequest::Pending(rc) => { + let mut guard = PendingRequestCancelGuard { + inner: Some(self.inner.clone()), + }; + let inner = rc.await.unwrap(); + guard.inner.take(); + MemoryTracker { inner } + } + } } } @@ -381,16 +404,16 @@ impl Drop for MemoryTracker { fn drop(&mut self) { if let Some(quota) = self.inner.quota.take() { self.inner.limiter.release_quota(quota); - } - // check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`. - if self - .inner - .limiter - .pending_request_count - .load(AtomicOrdering::Acquire) - > 0 - { - self.inner.limiter.may_notify_waiters(); + // check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`. 
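// ---- editor's note: restated, the atomic read below is a fast path so the
// common case (dropping a tracker while nobody waits) never touches the
// mutex. Shape of the pattern, with `notify_all_under_lock` standing in for
// `may_notify_waiters` (an illustrative name, not from the patch):
//
//     fn on_release(pending: &AtomicU64, notify_all_under_lock: impl FnOnce()) {
//         if pending.load(Ordering::Acquire) > 0 {
//             notify_all_under_lock(); // slow path: lock, then drain waiters
//         }
//         // else: no registered waiter, skip the lock entirely
//     }
//
// The count is only raised inside the lock, so a release that misses a
// concurrent increment is caught by the waiter's own locked retry above. ----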
+ if self + .inner + .limiter + .pending_request_count + .load(AtomicOrdering::Acquire) + > 0 + { + self.inner.limiter.may_notify_waiters(); + } } } } From b6ae16bf1ee080bd7e1f844caf9f20d1fe56a4d9 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 7 May 2024 18:11:08 +0800 Subject: [PATCH 07/14] revert fast quota Signed-off-by: Little-Wallace --- src/common/src/config.rs | 8 ----- src/config/docs.md | 1 - src/config/example.toml | 1 - .../event_handler/hummock_event_handler.rs | 14 ++------- src/storage/src/hummock/utils.rs | 31 +++++-------------- src/storage/src/opts.rs | 2 -- 6 files changed, 10 insertions(+), 47 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8cf015eb2ecd1..9b75d735f333d 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -721,9 +721,6 @@ pub struct StorageConfig { #[serde(default = "default::storage::mem_table_spill_threshold")] pub mem_table_spill_threshold: usize, - #[serde(default = "default::storage::memory_limit_for_behind_barrier_ratio")] - pub memory_limit_for_behind_barrier_ratio: u64, - #[serde(default)] pub object_store: ObjectStoreConfig, } @@ -1369,11 +1366,6 @@ pub mod default { pub fn mem_table_spill_threshold() -> usize { 4 << 20 } - - pub fn memory_limit_for_behind_barrier_ratio() -> u64 { - 70 - } - pub fn compactor_fast_max_compact_delete_ratio() -> u32 { 40 } diff --git a/src/config/docs.md b/src/config/docs.md index f03b170542e4a..d925a428fb211 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -119,7 +119,6 @@ This page is automatically generated by `./risedev generate-example-config` | max_sub_compaction | Max sub compaction task numbers | 4 | | max_version_pinning_duration_sec | | 10800 | | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | -| memory_limit_for_behind_barrier_ratio | | 70 | | meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | | | meta_file_cache | | | | min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. 
| 33554432 | diff --git a/src/config/example.toml b/src/config/example.toml index 05c5c4d7a7c04..b8a1dcc27defc 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -136,7 +136,6 @@ compactor_fast_max_compact_delete_ratio = 40 compactor_fast_max_compact_task_size = 2147483648 compactor_iter_max_io_retry_times = 8 mem_table_spill_threshold = 4194304 -memory_limit_for_behind_barrier_ratio = 70 [storage.cache.block_cache_eviction] algorithm = "Lru" diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 041bd114a8f1a..fcfcac53a478e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -76,34 +76,24 @@ impl BufferTracker { ) -> Self { let capacity = config.shared_buffer_capacity_mb * (1 << 20); let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize; - let blocking_threshold = config.memory_limit_for_behind_barrier_ratio; assert!( flush_threshold < capacity, "flush_threshold {} should be less or equal to capacity {}", flush_threshold, capacity ); - Self::new( - capacity, - flush_threshold, - blocking_threshold, - global_upload_task_size, - ) + Self::new(capacity, flush_threshold, global_upload_task_size) } pub fn new( capacity: usize, flush_threshold: usize, - blocking_threshold: u64, global_upload_task_size: GenericGauge, ) -> Self { assert!(capacity >= flush_threshold); Self { flush_threshold, - global_buffer: Arc::new(MemoryLimiter::new_with_blocking_ratio( - capacity as u64, - blocking_threshold, - )), + global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)), global_upload_task_size, } } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 21c66270dfb00..c8889a6164cb2 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -190,7 +190,6 @@ struct MemoryLimiterInner { controller: Mutex, pending_request_count: AtomicU64, quota: u64, - fast_quota: u64, } impl MemoryLimiterInner { @@ -205,7 +204,7 @@ impl MemoryLimiterInner { fn may_notify_waiters(self: &Arc) { let mut waiters = self.controller.lock(); while let Some((tx, quota)) = waiters.pop_front() { - if !self.try_require_memory_in_capacity(quota, self.quota) { + if !self.try_require_memory(quota) { waiters.push_front((tx, quota)); break; } @@ -217,13 +216,8 @@ impl MemoryLimiterInner { } fn try_require_memory(&self, quota: u64) -> bool { - self.try_require_memory_in_capacity(quota, self.quota) - } - - #[inline(always)] - fn try_require_memory_in_capacity(&self, quota: u64, capacity: u64) -> bool { let mut current_quota = self.total_size.load(AtomicOrdering::Acquire); - while self.permit_quota(current_quota, capacity) { + while self.permit_quota(current_quota, quota) { match self.total_size.compare_exchange( current_quota, current_quota + quota, @@ -245,11 +239,11 @@ impl MemoryLimiterInner { let mut waiters = self.controller.lock(); let first_req = waiters.is_empty(); if first_req { - // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock. 
+ // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory` again to avoid deadlock. self.pending_request_count.store(1, AtomicOrdering::Release); } // We must require again with lock because some other MemoryTracker may drop just after this thread gets mutex but before it enters queue. - if self.try_require_memory_in_capacity(quota, self.quota) { + if self.try_require_memory(quota) { if first_req { self.pending_request_count.store(0, AtomicOrdering::Release); } @@ -263,8 +257,8 @@ impl MemoryLimiterInner { MemoryRequest::Pending(rc) } - fn permit_quota(&self, current_quota: u64, capacity: u64) -> bool { - current_quota <= capacity + fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool { + current_quota <= self.quota } } @@ -308,31 +302,22 @@ impl Debug for MemoryTracker { impl MemoryLimiter { pub fn unlimit() -> Arc { - Arc::new(Self::new_with_blocking_ratio(u64::MAX, 100)) + Arc::new(Self::new(u64::MAX)) } pub fn new(quota: u64) -> Self { - Self::new_with_blocking_ratio(quota, 100) - } - - pub fn new_with_blocking_ratio(quota: u64, blocking_ratio: u64) -> Self { - let main_quota = quota / 100 * blocking_ratio; Self { inner: Arc::new(MemoryLimiterInner { total_size: AtomicU64::new(0), controller: Mutex::new(VecDeque::default()), pending_request_count: AtomicU64::new(0), - fast_quota: main_quota, quota, }), } } pub fn try_require_memory(&self, quota: u64) -> Option { - if self - .inner - .try_require_memory_in_capacity(quota, self.inner.fast_quota) - { + if self.inner.try_require_memory(quota) { Some(MemoryTracker { inner: MemoryTrackerImpl::new(self.inner.clone(), quota), }) diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 0576b12ea1896..7d7452f229b43 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -78,7 +78,6 @@ pub struct StorageOpts { pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, pub max_version_pinning_duration_sec: u64, - pub memory_limit_for_behind_barrier_ratio: u64, pub compactor_iter_max_io_retry_times: usize, pub data_file_cache_dir: String, @@ -262,7 +261,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .compactor_fast_max_compact_delete_ratio, compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size, - memory_limit_for_behind_barrier_ratio: c.storage.memory_limit_for_behind_barrier_ratio, compactor_iter_max_io_retry_times: c.storage.compactor_iter_max_io_retry_times, } } From ce102f470b698664d0620f31c2094de09a3bb335 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 7 May 2024 20:32:09 +0800 Subject: [PATCH 08/14] fix ci Signed-off-by: Little-Wallace --- src/common/src/config.rs | 1 + src/storage/src/hummock/event_handler/uploader.rs | 8 ++------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 9b75d735f333d..0868c785eedc4 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1366,6 +1366,7 @@ pub mod default { pub fn mem_table_spill_threshold() -> usize { 4 << 20 } + pub fn compactor_fast_max_compact_delete_ratio() -> u32 { 40 } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 650670282cfea..eafa99042a506 100644 --- 
a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1817,12 +1817,8 @@ mod tests { impl Fn(Vec) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { // flush threshold is 0. Flush anyway - let buffer_tracker = BufferTracker::new( - usize::MAX, - 0, - 100, - GenericGauge::new("test", "test").unwrap(), - ); + let buffer_tracker = + BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap()); // (the started task send the imm ids of payload, the started task wait for finish notify) #[allow(clippy::type_complexity)] let task_notifier_holder: Arc< From b46bf76480d8ce7ea2ee495cfa35750fe3e1dcf6 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 8 May 2024 17:43:23 +0800 Subject: [PATCH 09/14] fix notify order Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 49 ++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index c8889a6164cb2..2e2e1d76f7389 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -15,14 +15,17 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::fmt::{Debug, Formatter}; +use std::future::Future; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; +use std::task::Poll; use std::time::Duration; use bytes::Bytes; use foyer::memory::CacheContext; +use futures::FutureExt; use parking_lot::Mutex; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::key::{ @@ -175,12 +178,40 @@ enum MemoryRequest { pub struct PendingRequestCancelGuard { inner: Option>, + rc: Receiver, } impl Drop for PendingRequestCancelGuard { fn drop(&mut self) { if let Some(limiter) = self.inner.take() { - limiter.may_notify_waiters(); + self.rc.close(); + if let Ok(msg) = self.rc.try_recv() { + drop(msg); + if limiter.pending_request_count.load(AtomicOrdering::Acquire) > 0 { + limiter.may_notify_waiters(); + } + } + } + } +} + +impl Future for PendingRequestCancelGuard { + type Output = Option; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + match self.rc.poll_unpin(cx) { + Poll::Ready(Ok(msg)) => { + self.inner.take(); + Poll::Ready(Some(msg)) + } + Poll::Ready(Err(_)) => { + self.inner.take(); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, } } } @@ -203,11 +234,11 @@ impl MemoryLimiterInner { fn may_notify_waiters(self: &Arc) { let mut waiters = self.controller.lock(); - while let Some((tx, quota)) = waiters.pop_front() { - if !self.try_require_memory(quota) { - waiters.push_front((tx, quota)); + while let Some((_, quota)) = waiters.front() { + if !self.try_require_memory(*quota) { break; } + let (tx, quota) = waiters.pop_front().unwrap(); let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota)); } @@ -275,12 +306,12 @@ impl Debug for MemoryLimiter { } } -struct MemoryTrackerImpl { +pub struct MemoryTrackerImpl { limiter: Arc, quota: Option, } impl MemoryTrackerImpl { - pub fn new(limiter: Arc, quota: u64) -> Self { + fn new(limiter: Arc, quota: u64) -> Self { Self { limiter, quota: Some(quota), @@ -350,11 +381,11 @@ impl MemoryLimiter { match self.inner.require_memory(quota) { MemoryRequest::Ready(inner) => MemoryTracker { inner }, MemoryRequest::Pending(rc) => { - let mut guard = PendingRequestCancelGuard { + let guard = 
PendingRequestCancelGuard { inner: Some(self.inner.clone()), + rc, }; - let inner = rc.await.unwrap(); - guard.inner.take(); + let inner = guard.await.unwrap(); MemoryTracker { inner } } } From efa24f4abd97bc968a0d7e3ad8aa625342f8a4e7 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 8 May 2024 19:09:25 +0800 Subject: [PATCH 10/14] add ut Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 2e2e1d76f7389..0c42af9288e9f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -697,8 +697,10 @@ pub(crate) async fn wait_for_epoch( #[cfg(test)] mod tests { use std::future::{poll_fn, Future}; + use std::sync::Arc; use std::task::Poll; + use futures::future::join_all; use futures::FutureExt; use crate::hummock::utils::MemoryLimiter; @@ -731,4 +733,39 @@ mod tests { drop(tracker3); assert_eq!(0, memory_limiter.get_memory_usage()); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_multi_thread_acquire_memory() { + const QUOTA: u64 = 15; + let memory_limiter = Arc::new(MemoryLimiter::new(100)); + let mut handles = vec![]; + for _ in 0..10 { + let limiter = memory_limiter.clone(); + let h = tokio::spawn(async move { + let mut buffers = vec![]; + for idx in 0..1000 { + if let Some(tracker) = limiter.try_require_memory(QUOTA) { + buffers.push(tracker); + } else { + buffers.clear(); + let req = limiter.require_memory(QUOTA); + match tokio::time::timeout(std::time::Duration::from_millis(1), req).await { + Ok(tracker) => { + buffers.push(tracker); + } + Err(_) => { + continue; + } + } + } + if idx % 3 == 0 { + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } + } + }); + handles.push(h); + } + let h = join_all(handles); + let _ = h.await; + } } From df6f40d01f7b6ac538c7bbae3332fbde5662bfea Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 9 May 2024 13:41:18 +0800 Subject: [PATCH 11/14] fix test Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 0c42af9288e9f..a0339e37b4a0f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -225,7 +225,7 @@ struct MemoryLimiterInner { impl MemoryLimiterInner { fn release_quota(&self, quota: u64) { - self.total_size.fetch_sub(quota, AtomicOrdering::Release); + self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst); } fn add_memory(&self, quota: u64) { @@ -702,6 +702,7 @@ mod tests { use futures::future::join_all; use futures::FutureExt; + use rand::random; use crate::hummock::utils::MemoryLimiter; @@ -734,20 +735,24 @@ mod tests { assert_eq!(0, memory_limiter.get_memory_usage()); } - #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_multi_thread_acquire_memory() { - const QUOTA: u64 = 15; - let memory_limiter = Arc::new(MemoryLimiter::new(100)); + const QUOTA: u64 = 10; + let memory_limiter = Arc::new(MemoryLimiter::new(200)); let mut handles = vec![]; - for _ in 0..10 { + for _ in 0..40 { let limiter = memory_limiter.clone(); let h = tokio::spawn(async move { let mut buffers = vec![]; - for idx in 0..1000 { - if let Some(tracker) = limiter.try_require_memory(QUOTA) { + let mut current_buffer_usage = 
(random::() % 8) + 2; + for _ in 0..1000 { + if buffers.len() < current_buffer_usage + && let Some(tracker) = limiter.try_require_memory(QUOTA) + { buffers.push(tracker); } else { buffers.clear(); + current_buffer_usage = (random::() % 8) + 2; let req = limiter.require_memory(QUOTA); match tokio::time::timeout(std::time::Duration::from_millis(1), req).await { Ok(tracker) => { @@ -758,9 +763,8 @@ mod tests { } } } - if idx % 3 == 0 { - tokio::time::sleep(std::time::Duration::from_millis(1)).await; - } + let sleep_time = random::() % 3 + 1; + tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await; } }); handles.push(h); From 11fc843914eb36801283504545aed489650e3b5b Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 10 May 2024 00:16:18 +0800 Subject: [PATCH 12/14] address comment Signed-off-by: Little-Wallace --- src/storage/src/hummock/utils.rs | 39 +++++++++++++++----------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index a0339e37b4a0f..bd8c5a9f36aec 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -178,18 +178,19 @@ enum MemoryRequest { pub struct PendingRequestCancelGuard { inner: Option>, - rc: Receiver, + rx: Receiver, } impl Drop for PendingRequestCancelGuard { fn drop(&mut self) { if let Some(limiter) = self.inner.take() { - self.rc.close(); - if let Ok(msg) = self.rc.try_recv() { + // close rc before calling `try_recv`, it will make `MemoryTrackerImpl` which generates after dropping of `PendingRequestCancelGuard` drop in loop of `may_notify_waiters` in other thread. + // If `MemoryTrackerImpl` send before this thread calling `close`, it can still be received by this thread. Once this thread receives the message, it need drop the message and update `total_size` in `MemoryTrackerImpl`'s drop. + self.rx.close(); + if let Ok(msg) = self.rx.try_recv() { drop(msg); - if limiter.pending_request_count.load(AtomicOrdering::Acquire) > 0 { - limiter.may_notify_waiters(); - } + // every time `MemoryTrackerImpl` drop, it will update `total_size` and we need to check whether there exist waiters to be notified. + limiter.may_notify_waiters(); } } } @@ -202,7 +203,7 @@ impl Future for PendingRequestCancelGuard { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll { - match self.rc.poll_unpin(cx) { + match self.rx.poll_unpin(cx) { Poll::Ready(Ok(msg)) => { self.inner.take(); Poll::Ready(Some(msg)) @@ -233,6 +234,10 @@ impl MemoryLimiterInner { } fn may_notify_waiters(self: &Arc) { + if self.pending_request_count.load(AtomicOrdering::Acquire) == 0 { + return; + } + // check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`. let mut waiters = self.controller.lock(); while let Some((_, quota)) = waiters.front() { if !self.try_require_memory(*quota) { @@ -280,12 +285,12 @@ impl MemoryLimiterInner { } return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota)); } - let (tx, rc) = channel(); + let (tx, rx) = channel(); waiters.push_back((tx, quota)); // This variable can only update within lock. 
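// ---- editor's note: illustrative sketch, not part of the patch ----
// Why the cancel guard in this commit must close and then drain: when the
// caller's future is dropped (e.g. it lost a `tokio::time::timeout` race),
// the notifier may already have pushed a tracker into the oneshot channel.
// Closing the receiver first makes any later send fail back to the notifier,
// and `try_recv` recovers a tracker that raced in before the close so its
// quota is released rather than leaked. Standalone shape of that drop logic:

use tokio::sync::oneshot::Receiver;

fn drain_on_cancel<T>(mut rx: Receiver<T>, release: impl FnOnce(T)) {
    rx.close(); // from now on, sends fail and the sender keeps its value
    if let Ok(msg) = rx.try_recv() {
        release(msg); // a value was sent before the close: hand it back
    }
}
// ---- end editor's note ----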
From 11fc843914eb36801283504545aed489650e3b5b Mon Sep 17 00:00:00 2001
From: Little-Wallace
Date: Fri, 10 May 2024 00:16:18 +0800
Subject: [PATCH 12/14] address comment

Signed-off-by: Little-Wallace
---
 src/storage/src/hummock/utils.rs | 39 ++++++++++++++++++---------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs
index a0339e37b4a0f..bd8c5a9f36aec 100644
--- a/src/storage/src/hummock/utils.rs
+++ b/src/storage/src/hummock/utils.rs
@@ -178,18 +178,19 @@ enum MemoryRequest {
 
 pub struct PendingRequestCancelGuard {
     inner: Option<Arc<MemoryLimiterInner>>,
-    rc: Receiver<MemoryTrackerImpl>,
+    rx: Receiver<MemoryTrackerImpl>,
 }
 
 impl Drop for PendingRequestCancelGuard {
     fn drop(&mut self) {
         if let Some(limiter) = self.inner.take() {
-            self.rc.close();
-            if let Ok(msg) = self.rc.try_recv() {
+            // Close the receiver before `try_recv`: a tracker sent after the close is dropped by
+            // the notifying thread, while one sent just before it is drained and dropped here.
+            self.rx.close();
+            if let Ok(msg) = self.rx.try_recv() {
                 drop(msg);
-                if limiter.pending_request_count.load(AtomicOrdering::Acquire) > 0 {
-                    limiter.may_notify_waiters();
-                }
+                // Dropping the tracker has returned its quota, so check for waiters to notify.
+                limiter.may_notify_waiters();
             }
         }
     }
@@ -202,7 +203,7 @@
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Self::Output> {
-        match self.rc.poll_unpin(cx) {
+        match self.rx.poll_unpin(cx) {
             Poll::Ready(Ok(msg)) => {
                 self.inner.take();
                 Poll::Ready(Some(msg))
@@ -233,6 +234,10 @@ impl MemoryLimiterInner {
     }
 
     fn may_notify_waiters(self: &Arc<Self>) {
+        if self.pending_request_count.load(AtomicOrdering::Acquire) == 0 {
+            return;
+        }
+        // Check `pending_request_count` first to avoid taking the lock on every `MemoryTracker` drop.
         let mut waiters = self.controller.lock();
         while let Some((_, quota)) = waiters.front() {
             if !self.try_require_memory(*quota) {
@@ -280,12 +285,12 @@ impl MemoryLimiterInner {
             }
             return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota));
         }
-        let (tx, rc) = channel();
+        let (tx, rx) = channel();
         waiters.push_back((tx, quota));
         // This variable can only update within lock.
         self.pending_request_count
             .store(waiters.len() as u64, AtomicOrdering::Release);
-        MemoryRequest::Pending(rc)
+        MemoryRequest::Pending(rx)
     }
 
     fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
@@ -380,11 +385,12 @@ impl MemoryLimiter {
     pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
         match self.inner.require_memory(quota) {
             MemoryRequest::Ready(inner) => MemoryTracker { inner },
-            MemoryRequest::Pending(rc) => {
+            MemoryRequest::Pending(rx) => {
                 let guard = PendingRequestCancelGuard {
                     inner: Some(self.inner.clone()),
-                    rc,
+                    rx,
                 };
+                // A pending `require_memory` request is never discarded: it is fulfilled eventually unless canceled, so it is safe to await and unwrap here.
                 let inner = guard.await.unwrap();
                 MemoryTracker { inner }
             }
@@ -420,16 +426,7 @@ impl Drop for MemoryTracker {
     fn drop(&mut self) {
         if let Some(quota) = self.inner.quota.take() {
            self.inner.limiter.release_quota(quota);
-            // check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`.
-            if self
-                .inner
-                .limiter
-                .pending_request_count
-                .load(AtomicOrdering::Acquire)
-                > 0
-            {
-                self.inner.limiter.may_notify_waiters();
-            }
+            self.inner.limiter.may_notify_waiters();
         }
     }
 }
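The following patch replaces the waiter counter with a boolean flag. The essence of the handshake, reduced to a toy model of our own (the names and the capacity bookkeeping are ours, not the patch's):

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    use parking_lot::Mutex;

    // Toy model of the `has_waiter` handshake: the releasing side checks the
    // flag before taking the lock, so the first enqueuing thread must publish
    // the flag and then re-check capacity while holding the lock; otherwise a
    // release landing in between could be missed forever.
    struct ToyLimiter {
        used: AtomicU64,
        capacity: u64,
        has_waiter: AtomicBool,
        waiters: Mutex<VecDeque<u64>>,
    }

    impl ToyLimiter {
        fn try_acquire(&self, quota: u64) -> bool {
            self.used
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                    (used + quota <= self.capacity).then_some(used + quota)
                })
                .is_ok()
        }

        // Returns true if the quota was acquired; false means the caller is queued.
        fn acquire_or_enqueue(&self, quota: u64) -> bool {
            let mut waiters = self.waiters.lock();
            if waiters.is_empty() {
                // Publish the flag before the final attempt, mirroring the patch below.
                self.has_waiter.store(true, Ordering::Release);
            }
            if self.try_acquire(quota) {
                if waiters.is_empty() {
                    self.has_waiter.store(false, Ordering::Release);
                }
                return true;
            }
            waiters.push_back(quota);
            false
        }
    }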
From e33824fa76db434c902577542fef0165b242815f Mon Sep 17 00:00:00 2001
From: Little-Wallace
Date: Fri, 10 May 2024 19:28:51 +0800
Subject: [PATCH 13/14] address comment

Signed-off-by: Little-Wallace
---
 src/storage/src/hummock/utils.rs | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs
index bd8c5a9f36aec..e2bf6de55700c 100644
--- a/src/storage/src/hummock/utils.rs
+++ b/src/storage/src/hummock/utils.rs
@@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
 use std::future::Future;
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::ops::{Bound, RangeBounds};
-use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
 use std::sync::Arc;
 use std::task::Poll;
 use std::time::Duration;
@@ -220,7 +220,7 @@
 struct MemoryLimiterInner {
     total_size: AtomicU64,
     controller: Mutex<RequestQueue>,
-    pending_request_count: AtomicU64,
+    has_waiter: AtomicBool,
     quota: u64,
 }
 
@@ -234,10 +234,10 @@ impl MemoryLimiterInner {
     }
 
     fn may_notify_waiters(self: &Arc<Self>) {
-        if self.pending_request_count.load(AtomicOrdering::Acquire) == 0 {
+        // Check `has_waiter` first to avoid taking the lock on every `MemoryTracker` drop.
+        if !self.has_waiter.load(AtomicOrdering::Acquire) {
             return;
         }
-        // Check `pending_request_count` first to avoid taking the lock on every `MemoryTracker` drop.
         let mut waiters = self.controller.lock();
         while let Some((_, quota)) = waiters.front() {
             if !self.try_require_memory(*quota) {
@@ -247,8 +247,9 @@ impl MemoryLimiterInner {
             let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
         }
 
-        self.pending_request_count
-            .store(waiters.len() as u64, AtomicOrdering::Release);
+        if waiters.is_empty() {
+            self.has_waiter.store(false, AtomicOrdering::Release);
+        }
     }
 
     fn try_require_memory(&self, quota: u64) -> bool {
@@ -275,21 +276,18 @@ impl MemoryLimiterInner {
         let mut waiters = self.controller.lock();
         let first_req = waiters.is_empty();
         if first_req {
-            // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory` again to avoid deadlock.
-            self.pending_request_count.store(1, AtomicOrdering::Release);
+            // When this request is the first waiter while a previous `MemoryTracker` has just released a large quota, the release may have skipped notifying because it saw `has_waiter` as false, so set the flag and retry `try_require_memory` to avoid a deadlock.
+            self.has_waiter.store(true, AtomicOrdering::Release);
         }
         // We must require again with lock because some other MemoryTracker may drop just after this thread gets mutex but before it enters queue.
         if self.try_require_memory(quota) {
             if first_req {
-                self.pending_request_count.store(0, AtomicOrdering::Release);
+                self.has_waiter.store(false, AtomicOrdering::Release);
             }
             return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota));
         }
         let (tx, rx) = channel();
         waiters.push_back((tx, quota));
-        // This variable can only update within lock.
-        self.pending_request_count
-            .store(waiters.len() as u64, AtomicOrdering::Release);
         MemoryRequest::Pending(rx)
     }
 
@@ -346,7 +344,7 @@ impl MemoryLimiter {
             inner: Arc::new(MemoryLimiterInner {
                 total_size: AtomicU64::new(0),
                 controller: Mutex::new(VecDeque::default()),
-                pending_request_count: AtomicU64::new(0),
+                has_waiter: AtomicBool::new(false),
                 quota,
             }),
         }
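The final refactor moves waiter notification outside the queue lock. The shape of that pattern, as a self-contained toy of ours (simplified to drain every waiter, using tokio's oneshot sender for concreteness):

    use std::collections::VecDeque;

    use parking_lot::Mutex;
    use tokio::sync::oneshot::Sender;

    // Toy version of "collect under the lock, send after unlocking": if a
    // waiter was canceled, `send` fails and the grant is dropped here, outside
    // the lock, so a `Drop` impl that re-enters notification cannot deadlock.
    fn drain_and_notify<T>(queue: &Mutex<VecDeque<(Sender<T>, T)>>) {
        let ready: Vec<(Sender<T>, T)> = {
            let mut waiters = queue.lock();
            waiters.drain(..).collect()
        }; // lock released before any send
        for (tx, grant) in ready {
            let _ = tx.send(grant);
        }
    }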
From 22188ed82e57b8d17a5373889d85c58da1feccc3 Mon Sep 17 00:00:00 2001
From: Little-Wallace
Date: Tue, 14 May 2024 11:31:10 +0800
Subject: [PATCH 14/14] refactor

Signed-off-by: Little-Wallace
---
 src/storage/src/hummock/utils.rs | 125 ++++++++-----------------------
 1 file changed, 32 insertions(+), 93 deletions(-)

diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs
index e2bf6de55700c..ba689e7cac711 100644
--- a/src/storage/src/hummock/utils.rs
+++ b/src/storage/src/hummock/utils.rs
@@ -15,17 +15,14 @@
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::fmt::{Debug, Formatter};
-use std::future::Future;
 use std::ops::Bound::{Excluded, Included, Unbounded};
 use std::ops::{Bound, RangeBounds};
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
 use std::sync::Arc;
-use std::task::Poll;
 use std::time::Duration;
 
 use bytes::Bytes;
 use foyer::memory::CacheContext;
-use futures::FutureExt;
 use parking_lot::Mutex;
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_hummock_sdk::key::{
@@ -170,51 +167,10 @@ pub fn prune_nonoverlapping_ssts<'a>(
     ssts[start_table_idx..=end_table_idx].iter()
 }
 
-type RequestQueue = VecDeque<(Sender<MemoryTrackerImpl>, u64)>;
+type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
 enum MemoryRequest {
-    Ready(MemoryTrackerImpl),
-    Pending(Receiver<MemoryTrackerImpl>),
-}
-
-pub struct PendingRequestCancelGuard {
-    inner: Option<Arc<MemoryLimiterInner>>,
-    rx: Receiver<MemoryTrackerImpl>,
-}
-
-impl Drop for PendingRequestCancelGuard {
-    fn drop(&mut self) {
-        if let Some(limiter) = self.inner.take() {
-            // Close the receiver before `try_recv`: a tracker sent after the close is dropped by
-            // the notifying thread, while one sent just before it is drained and dropped here.
-            self.rx.close();
-            if let Ok(msg) = self.rx.try_recv() {
-                drop(msg);
-                // Dropping the tracker has returned its quota, so check for waiters to notify.
-                limiter.may_notify_waiters();
-            }
-        }
-    }
-}
-
-impl Future for PendingRequestCancelGuard {
-    type Output = Option<MemoryTrackerImpl>;
-
-    fn poll(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Self::Output> {
-        match self.rx.poll_unpin(cx) {
-            Poll::Ready(Ok(msg)) => {
-                self.inner.take();
-                Poll::Ready(Some(msg))
-            }
-            Poll::Ready(Err(_)) => {
-                self.inner.take();
-                Poll::Ready(None)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
+    Ready(MemoryTracker),
+    Pending(Receiver<MemoryTracker>),
 }
 
 struct MemoryLimiterInner {
     total_size: AtomicU64,
     controller: Mutex<RequestQueue>,
     has_waiter: AtomicBool,
     quota: u64,
 }
 
@@ -238,17 +194,24 @@ impl MemoryLimiterInner {
         if !self.has_waiter.load(AtomicOrdering::Acquire) {
             return;
         }
-        let mut waiters = self.controller.lock();
-        while let Some((_, quota)) = waiters.front() {
-            if !self.try_require_memory(*quota) {
-                break;
+        let mut notify_waiters = vec![];
+        {
+            let mut waiters = self.controller.lock();
+            while let Some((_, quota)) = waiters.front() {
+                if !self.try_require_memory(*quota) {
+                    break;
+                }
+                let (tx, quota) = waiters.pop_front().unwrap();
+                notify_waiters.push((tx, quota));
+            }
+
+            if waiters.is_empty() {
+                self.has_waiter.store(false, AtomicOrdering::Release);
             }
-            let (tx, quota) = waiters.pop_front().unwrap();
-            let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
         }
 
-        if waiters.is_empty() {
-            self.has_waiter.store(false, AtomicOrdering::Release);
+        for (tx, quota) in notify_waiters {
+            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
         }
     }
 
     fn try_require_memory(&self, quota: u64) -> bool {
         if self.try_require_memory(quota) {
             if first_req {
                 self.has_waiter.store(false, AtomicOrdering::Release);
             }
-            return MemoryRequest::Ready(MemoryTrackerImpl::new(self.clone(), quota));
+            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
         }
         let (tx, rx) = channel();
         waiters.push_back((tx, quota));
         MemoryRequest::Pending(rx)
     }
 
@@ -309,11 +272,11 @@ impl Debug for MemoryLimiter {
     }
 }
 
-pub struct MemoryTrackerImpl {
+pub struct MemoryTracker {
     limiter: Arc<MemoryLimiterInner>,
     quota: Option<u64>,
 }
-impl MemoryTrackerImpl {
+impl MemoryTracker {
     fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
         Self {
             limiter,
@@ -322,14 +285,10 @@
     }
 }
 
-pub struct MemoryTracker {
-    inner: MemoryTrackerImpl,
-}
-
 impl Debug for MemoryTracker {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("MemoryTracker")
-            .field("quota", &self.inner.quota)
+            .field("quota", &self.quota)
             .finish()
     }
 }
 
     pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
         if self.inner.try_require_memory(quota) {
-            Some(MemoryTracker {
-                inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
-            })
+            Some(MemoryTracker::new(self.inner.clone(), quota))
         } else {
             None
         }
@@ -373,37 +330,27 @@ impl MemoryLimiter {
             self.inner.add_memory(quota);
         }
 
-        MemoryTracker {
-            inner: MemoryTrackerImpl::new(self.inner.clone(), quota),
-        }
+        MemoryTracker::new(self.inner.clone(), quota)
     }
 }
 
 impl MemoryLimiter {
     pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
         match self.inner.require_memory(quota) {
-            MemoryRequest::Ready(inner) => MemoryTracker { inner },
-            MemoryRequest::Pending(rx) => {
-                let guard = PendingRequestCancelGuard {
-                    inner: Some(self.inner.clone()),
-                    rx,
-                };
-                // A pending `require_memory` request is never discarded: it is fulfilled eventually unless canceled, so it is safe to await and unwrap here.
-                let inner = guard.await.unwrap();
-                MemoryTracker { inner }
-            }
+            MemoryRequest::Ready(tracker) => tracker,
+            MemoryRequest::Pending(rx) => rx.await.unwrap(),
         }
     }
 }
 
 impl MemoryTracker {
     pub fn try_increase_memory(&mut self, target: u64) -> bool {
-        let quota = self.inner.quota.unwrap();
+        let quota = self.quota.unwrap();
         if quota >= target {
             return true;
         }
-        if self.inner.limiter.try_require_memory(target - quota) {
-            self.inner.quota = Some(target);
+        if self.limiter.try_require_memory(target - quota) {
+            self.quota = Some(target);
             true
         } else {
             false
         }
     }
 }
 
-impl Drop for MemoryTrackerImpl {
+// We must notify waiters outside `MemoryTracker` to avoid deadlock and an ownership loop.
+impl Drop for MemoryTracker {
     fn drop(&mut self) {
         if let Some(quota) = self.quota.take() {
             self.limiter.release_quota(quota);
-        }
-    }
-}
-
-// We must notify waiters outside `MemoryTrackerImpl` to avoid dead-lock and loop-owner.
-impl Drop for MemoryTracker {
-    fn drop(&mut self) {
-        if let Some(quota) = self.inner.quota.take() {
-            self.inner.limiter.release_quota(quota);
-            self.inner.limiter.may_notify_waiters();
+            self.limiter.may_notify_waiters();
         }
     }
 }
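Taken together, the series leaves `MemoryLimiter` handing out a plain `MemoryTracker` guard. A usage sketch against the refactored API (our example, not part of the series):

    // The tracker returned by `require_memory` releases its quota on drop, and
    // the drop now funnels directly into `may_notify_waiters` to wake queued
    // callers.
    async fn copy_with_budget(limiter: &MemoryLimiter, payload: &[u8]) -> Vec<u8> {
        let _tracker = limiter.require_memory(payload.len() as u64).await;
        payload.to_vec()
        // `_tracker` drops here: the quota is released and waiters are notified.
    }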