feat(storage): wake up memory acquire request in order #15921
Signed-off-by: Little-Wallace <[email protected]>
The current memory limiter is very similar to tokio::sync::Semaphore. Maybe we can try borrowing some implementation details from it.
src/storage/src/hummock/utils.rs
Outdated
// When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory_in_capacity` again to avoid deadlock.
self.pending_request_count.store(1, AtomicOrdering::Release);
if self.try_require_memory_in_capacity(quota, self.fast_quota) {
    self.pending_request_count.store(0, AtomicOrdering::Release);
Can we move this fast-path code, `self.try_require_memory_in_capacity(quota, self.fast_quota)`, to before acquiring the lock? That way, if we successfully acquire the quota via CAS, we avoid both locking and writing the atomic variable.
`try_require_memory` is always called before this method, which means a lock-free CAS attempt has already failed by the time we get here.
We must retry the acquisition while holding the lock: if no other thread holds a tracker, nothing will ever notify this waiter.
src/storage/src/hummock/utils.rs
Outdated
    self.inner.limiter.release_quota(quota);
}
// check `inflight_barrier` to avoid access lock every times drop `MemoryTracker`.
if self
Maybe we can optimize the case where trackers are dropped concurrently in multiple threads.
A rough idea: each thread tries to set `pending_request_count` to 0 via CAS in a loop, and only the thread that succeeds performs the notification.
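The CAS idea in this comment could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the PR: the helper name `claim_notification` is hypothetical, and the counter is passed in as a plain `AtomicU64` rather than read from the limiter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper: tries to move `pending_request_count` from any
/// non-zero value to 0. Returns true only for the caller whose CAS
/// performed that transition, so at most one of several concurrent
/// droppers goes on to take the lock and notify waiters.
fn claim_notification(pending_request_count: &AtomicU64) -> bool {
    let mut current = pending_request_count.load(Ordering::Acquire);
    while current != 0 {
        match pending_request_count.compare_exchange_weak(
            current,
            0,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // This thread reset the counter, so it is the notifier.
            Ok(_) => return true,
            // Another thread changed the value (or the weak CAS failed
            // spuriously); retry with the value that was observed.
            Err(actual) => current = actual,
        }
    }
    // The counter is already 0: another thread claimed the notification,
    // or there were no pending requests to begin with.
    false
}
```

In such a scheme the winning thread would presumably have to set the counter again if waiters remain in the queue after it has notified; that step is left out of the sketch.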
Can we have some unit tests for correctness and performance? In a unit test we can launch a multi-threaded tokio runtime and spawn many tasks that repeatedly call `require_memory`, `sleep` for a while, and then drop the tracker. This way we may uncover corner cases that are not covered yet.
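The shape of such a stress test might look like the sketch below. It deliberately swaps in OS threads from the standard library instead of a tokio runtime, and a hypothetical stand-in `CountingLimiter` (a bare CAS counter with no wait queue) instead of the real `MemoryLimiter`, so it only illustrates the acquire / sleep / release loop and the invariants worth asserting.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the limiter under test.
struct CountingLimiter {
    capacity: u64,
    used: AtomicU64,
}

impl CountingLimiter {
    /// Lock-free acquisition: succeeds only if `quota` still fits.
    fn try_require(&self, quota: u64) -> bool {
        let mut current = self.used.load(Ordering::SeqCst);
        loop {
            if current + quota > self.capacity {
                return false;
            }
            match self.used.compare_exchange_weak(
                current,
                current + quota,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }

    fn release(&self, quota: u64) {
        self.used.fetch_sub(quota, Ordering::SeqCst);
    }
}

/// Spawns `workers` threads that each acquire, hold briefly, and release
/// their quota `rounds` times. Returns the peak usage any worker observed.
fn stress(limiter: Arc<CountingLimiter>, workers: u64, rounds: u64) -> u64 {
    let peak = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let limiter = limiter.clone();
            let peak = peak.clone();
            thread::spawn(move || {
                let quota = i % 3 + 1; // mix of request sizes: 1, 2, 3
                for _ in 0..rounds {
                    while !limiter.try_require(quota) {
                        thread::yield_now();
                    }
                    peak.fetch_max(limiter.used.load(Ordering::SeqCst), Ordering::SeqCst);
                    thread::sleep(Duration::from_micros(50));
                    limiter.release(quota);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

A test built on this would assert that the observed peak never exceeds the capacity and that usage returns to zero once every worker has finished; a test against the real limiter would additionally need to check that no waiter is left unnotified.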
Rest LGTM
src/storage/src/hummock/utils.rs
Outdated
let first_req = waiters.is_empty();
if first_req {
    // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `inflight_barrier` and found it was zero. So we must set it one and retry `try_require_memory` again to avoid deadlock.
    self.pending_request_count.store(1, AtomicOrdering::Release);
I think it would be simpler and easier to understand if we made this an atomic boolean and renamed it to `has_waiters`.
ok
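To make the "set the flag, then retry under the lock" pattern from this thread concrete, here is a minimal single-file sketch with an atomic boolean. It is an illustration under assumptions, not the PR's code: the `Limiter` type and the `require_or_enqueue` and `release` methods are hypothetical, the queue stores only quotas (the real code also stores a sender), and every atomic access uses `SeqCst` to keep the reasoning simple.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the limiter discussed in this thread.
struct Limiter {
    capacity: u64,
    used: AtomicU64,
    has_waiters: AtomicBool,
    waiters: Mutex<VecDeque<u64>>,
}

impl Limiter {
    fn new(capacity: u64) -> Self {
        Limiter {
            capacity,
            used: AtomicU64::new(0),
            has_waiters: AtomicBool::new(false),
            waiters: Mutex::new(VecDeque::new()),
        }
    }

    /// Lock-free attempt: CAS loop that succeeds only if `quota` fits.
    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current = self.used.load(Ordering::SeqCst);
        loop {
            if current + quota > self.capacity {
                return false;
            }
            match self.used.compare_exchange_weak(
                current,
                current + quota,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }

    /// Slow path: returns true if the quota was acquired, false if queued.
    fn require_or_enqueue(&self, quota: u64) -> bool {
        let mut waiters = self.waiters.lock().unwrap();
        if waiters.is_empty() {
            // Publish the flag first, then retry. A concurrent `release`
            // that read `has_waiters == false` has already returned its
            // quota, so this retry observes the freed capacity.
            self.has_waiters.store(true, Ordering::SeqCst);
            if self.try_require_memory(quota) {
                self.has_waiters.store(false, Ordering::SeqCst);
                return true;
            }
        }
        // Later requests always queue behind existing waiters (FIFO).
        waiters.push_back(quota);
        false
    }

    /// Returns the quotas of the waiters granted by this release, in order.
    fn release(&self, quota: u64) -> Vec<u64> {
        self.used.fetch_sub(quota, Ordering::SeqCst);
        let mut granted = Vec::new();
        // Skip the lock entirely when nobody is waiting.
        if !self.has_waiters.load(Ordering::SeqCst) {
            return granted;
        }
        let mut waiters = self.waiters.lock().unwrap();
        while let Some(&next) = waiters.front() {
            if !self.try_require_memory(next) {
                break;
            }
            waiters.pop_front();
            granted.push(next);
        }
        if waiters.is_empty() {
            self.has_waiters.store(false, Ordering::SeqCst);
        }
        granted
    }
}
```

With a capacity of 10 and 8 already in use, a request for 5 is queued, and a later request for 1 is queued behind it even though it would fit; releasing the 8 then grants 5 and 1 in that order.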
LGTM
LGTM
let mut waiters = self.controller.lock();
let first_req = waiters.is_empty();
if first_req {
    // When this request is the first waiter but the previous `MemoryTracker` is just release a large quota, it may skip notifying this waiter because it has checked `has_waiter` and found it was false. So we must set it one and retry `try_require_memory` again to avoid deadlock.
typo: “set it one”
src/storage/src/hummock/utils.rs
Outdated
    break;
}
let (tx, quota) = waiters.pop_front().unwrap();
let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
The `MemoryTrackerImpl` and the newly added `PendingRequestCancelGuard` seem to be over-designed for the corner case mentioned above. Let's analyze all the cases in which quota is acquired:
1. When the quota is acquired via `MemoryRequest::Ready` (L287), we need to release the quota and notify waiters.
2. When the quota is acquired here in `may_notify_waiters` (L243), there are two sub-cases:
   a. `tx.send` here succeeds. Regardless of whether the rx is dropped or not, we need to release the quota and notify waiters.
   b. `tx.send` here fails (because the rx was dropped before `tx.send`). We only need to release the quota and must not notify waiters (otherwise it will deadlock).
Therefore, the implementation can be greatly simplified without introducing `MemoryTrackerImpl` and `PendingRequestCancelGuard`.
// MemoryTrackerImpl is not needed.
struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

// MemoryTracker::drop releases the quota and notifies waiters if quota is not None (cases 1 and 2.a).
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

// Add a new method to release the quota without notifying waiters.
impl MemoryTracker {
    // `mut self` is required because `take()` mutates `self.quota`.
    // The drop that follows sees `None` and therefore does not notify.
    fn release_quota(mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
        }
    }
}
...
// MemoryRequest takes MemoryTracker
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}
...
impl MemoryLimiterInner {
    ...
    fn may_notify_waiters(self: &Arc<Self>) {
        ...
        let mut waiters = self.controller.lock();
        while let Some((_, quota)) = waiters.front() {
            if !self.try_require_memory(*quota) {
                break;
            }
            ...
            if let Err(SendError(tracker)) = tx.send(MemoryTracker::new(self.clone(), quota)) {
                // 2.b: the receiver is gone, so release the quota without notifying.
                tracker.release_quota();
            }
        }
        ...
    }
    ...
}
impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        // PendingRequestCancelGuard is not needed
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await
        }
    }
}
We cannot send a `MemoryTracker` while holding the `controller` lock: if the tracker is dropped inside this method, its drop may call `may_notify_waiters` again, which would acquire the same lock twice.
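One way to avoid that double acquisition is to collect the waiters to wake while holding the lock and send the trackers only after the lock has been released. The sketch below illustrates the idea with the standard library only; it is an assumption-laden stand-in, not the PR's implementation. The `Inner`, `State`, and `Tracker` types and the `require` and `release_and_notify` methods are hypothetical, and `std::sync::mpsc` replaces whatever channel the real code uses.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical state guarded by one non-reentrant lock.
struct State {
    used: u64,
    waiters: VecDeque<(Sender<Tracker>, u64)>,
}

struct Inner {
    capacity: u64,
    state: Mutex<State>,
}

struct Tracker {
    inner: Arc<Inner>,
    quota: u64,
}

impl Drop for Tracker {
    fn drop(&mut self) {
        // Takes the lock, so a Tracker must never be dropped while
        // the same thread already holds it.
        self.inner.release_and_notify(self.quota);
    }
}

impl Inner {
    /// Ok(tracker) if acquired immediately, Err(receiver) if queued.
    fn require(self: &Arc<Self>, quota: u64) -> Result<Tracker, Receiver<Tracker>> {
        let mut state = self.state.lock().unwrap();
        if state.waiters.is_empty() && state.used + quota <= self.capacity {
            state.used += quota;
            return Ok(Tracker { inner: self.clone(), quota });
        }
        let (tx, rx) = channel();
        state.waiters.push_back((tx, quota));
        Err(rx)
    }

    fn release_and_notify(self: &Arc<Self>, quota: u64) {
        let mut ready = Vec::new();
        {
            let mut state = self.state.lock().unwrap();
            state.used -= quota;
            // Grant quota to waiters in FIFO order, but only record them here.
            while let Some(next) = state.waiters.front().map(|(_, q)| *q) {
                if state.used + next > self.capacity {
                    break;
                }
                state.used += next;
                ready.push(state.waiters.pop_front().unwrap());
            }
        } // The lock is released before any tracker is created or sent.
        for (tx, granted) in ready {
            // If the receiver is gone, `send` hands the tracker back inside
            // `Err`; it is dropped right here, outside the lock, so the
            // nested `release_and_notify` call can take the lock safely.
            let _ = tx.send(Tracker { inner: self.clone(), quota: granted });
        }
    }
}
```

In this sketch a cancelled waiter (one whose receiver was dropped) is still granted quota, but its tracker comes straight back from `send` and is dropped after the lock is gone, which releases the quota and wakes the next waiter without re-entering the lock.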
We ran into this bug in #6634.
If tokio-rs/tokio#6558 looks good from the tokio community, we can have a patch on the tokio version we are using, and then we can adopt the code in this comment.
I found loom, an interesting framework for testing concurrent utilities. It provides mocked atomic variables and mutexes, and ...
LGTM
Rest LGTM
            true
        } else {
            false
        }
    }
}

// We must notify waiters outside `MemoryTracker` to avoid dead-lock and loop-owner.
typo: doc
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
close #15786
See design details in issue
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.