
feat(storage): wake up memory acquire request in order #15921

Merged
merged 17 commits into main from wallace/memory-limit-barrier on May 15, 2024

Conversation

Little-Wallace
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Close #15786.
See design details in the issue.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features. See Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Signed-off-by: Little-Wallace <[email protected]>
@Little-Wallace Little-Wallace marked this pull request as ready for review March 29, 2024 13:11
@wenym1 wenym1 requested review from hzxa21, Li0k and wenym1 May 6, 2024 10:06
Contributor

@wenym1 wenym1 left a comment

The current memory limiter is very similar to tokio::sync::Semaphore. Maybe we can try borrowing some implementation details from it.
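
For illustration only, here is a minimal sketch (not this PR's code) of how a FIFO-fair limiter could be built directly on tokio::sync::Semaphore, whose acquire_many_owned queues requests in arrival order; quota is expressed in u32 permits here, whereas the real limiter tracks u64 bytes and its own capacity:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct SemaphoreMemoryLimiter {
    quota: Arc<Semaphore>,
}

struct SemaphoreMemoryTracker {
    // Dropping the permit returns the quota and wakes the next waiter in FIFO order.
    _permit: OwnedSemaphorePermit,
}

impl SemaphoreMemoryLimiter {
    fn new(capacity: u32) -> Self {
        Self { quota: Arc::new(Semaphore::new(capacity as usize)) }
    }

    async fn require_memory(&self, quota: u32) -> SemaphoreMemoryTracker {
        // `acquire_many_owned` queues acquire requests fairly, so a large
        // request cannot be starved by later small ones.
        let permit = Arc::clone(&self.quota)
            .acquire_many_owned(quota)
            .await
            .expect("semaphore is never closed");
        SemaphoreMemoryTracker { _permit: permit }
    }
}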

// When this request is the first waiter but the previous `MemoryTracker` has just released a large quota, the release may skip notifying this waiter because it checked `inflight_barrier` and found it was zero. So we must set it to one and retry `try_require_memory_in_capacity` to avoid deadlock.
self.pending_request_count.store(1, AtomicOrdering::Release);
if self.try_require_memory_in_capacity(quota, self.fast_quota) {
self.pending_request_count.store(0, AtomicOrdering::Release);
Contributor

Can we move this fast path code self.try_require_memory_in_capacity(quota, self.fast_quota) to before acquiring the lock? In this way, if we successfully acquire the quota via CAS, we can avoid locking and writing the atomic variable.
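
For reference, the lock-free fast path being discussed might look roughly like this (a sketch; the `total_size` field and the memory orderings are assumptions, not necessarily this PR's exact code):

use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};

struct MemoryLimiterInner {
    total_size: AtomicU64,
}

impl MemoryLimiterInner {
    fn try_require_memory_in_capacity(&self, quota: u64, capacity: u64) -> bool {
        let mut current = self.total_size.load(AtomicOrdering::Acquire);
        loop {
            if current + quota > capacity {
                // Over capacity: the caller falls back to the slow path that
                // takes the lock and queues the request.
                return false;
            }
            match self.total_size.compare_exchange(
                current,
                current + quota,
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual,
            }
        }
    }
}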

Contributor Author

`try_require_memory` is always called before this method, which means a CAS attempt without the lock has already failed.

Contributor Author

We must retry acquiring while holding the lock, because if no tracker is held by other threads, this waiter would never be notified.

src/storage/src/hummock/utils.rs: 5 resolved review threads (outdated)
self.inner.limiter.release_quota(quota);
}
// Check `inflight_barrier` to avoid acquiring the lock every time a `MemoryTracker` is dropped.
if self
Contributor

Maybe we can optimize the case where there are concurrent drops in multiple threads.

A rough idea: each thread tries to set `pending_request_count` to 0 via CAS in a loop, and only the one that successfully sets it to 0 performs the notification.
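
A sketch of that idea (field and method names follow the snippets above; this is not the merged code):

use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};

struct LimiterState {
    pending_request_count: AtomicU64,
}

impl LimiterState {
    fn on_tracker_drop(&self) {
        let mut pending = self.pending_request_count.load(AtomicOrdering::Acquire);
        while pending > 0 {
            match self.pending_request_count.compare_exchange(
                pending,
                0,
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
            ) {
                // Only the thread whose CAS actually brings the counter to 0
                // takes the lock and wakes the queued requests.
                Ok(_) => {
                    self.may_notify_waiters();
                    return;
                }
                // Another thread changed the counter; re-read and retry.
                Err(current) => pending = current,
            }
        }
    }

    fn may_notify_waiters(&self) {
        // Elided: pop waiters from the queue and hand out quota, as in the PR.
    }
}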

src/common/src/config.rs: resolved review thread (outdated)
Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: Little-Wallace <[email protected]>
@Little-Wallace Little-Wallace changed the title from "feat(storage): allocate quota in barrier order" to "feat(storage): wake up memory acquire request in order" on May 8, 2024
@Little-Wallace Little-Wallace force-pushed the wallace/memory-limit-barrier branch from 616e2aa to ce102f4 on May 8, 2024 06:27
Contributor

@wenym1 wenym1 left a comment

Can we have some unit tests to check the correctness and performance? In a unit test we can launch a multi-threaded tokio runtime and spawn many tasks that keep calling require_memory, sleep for a while, and then drop the tracker; this way we may uncover some corner cases.
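
A rough sketch of such a stress test (hypothetical names: it assumes the limiter is constructed with `MemoryLimiter::new` and exposes a usage accessor like `get_memory_usage`; adjust to the actual API in this PR):

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_concurrent_require_memory() {
    use std::sync::Arc;
    use std::time::Duration;

    let limiter = Arc::new(MemoryLimiter::new(1024));
    let mut handles = Vec::new();
    for i in 0..64u64 {
        let limiter = limiter.clone();
        handles.push(tokio::spawn(async move {
            for _ in 0..100 {
                // Hold a quota for a short while, then drop the tracker so
                // that pending requests must be woken up in order.
                let tracker = limiter.require_memory(1 + i % 16).await;
                tokio::time::sleep(Duration::from_millis(1)).await;
                drop(tracker);
            }
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    // All quota should be returned once every tracker is dropped.
    assert_eq!(limiter.get_memory_usage(), 0);
}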

src/storage/src/hummock/utils.rs: 2 resolved review threads (outdated)
Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: Little-Wallace <[email protected]>
Signed-off-by: Little-Wallace <[email protected]>
@wenym1 wenym1 requested a review from MrCroxx May 9, 2024 08:58
Contributor

@wenym1 wenym1 left a comment

Rest LGTM

src/storage/src/hummock/utils.rs: 6 resolved review threads (outdated)
Signed-off-by: Little-Wallace <[email protected]>
let first_req = waiters.is_empty();
if first_req {
// When this request is the first waiter but the previous `MemoryTracker` has just released a large quota, the release may skip notifying this waiter because it checked `inflight_barrier` and found it was zero. So we must set it to one and retry `try_require_memory` to avoid deadlock.
self.pending_request_count.store(1, AtomicOrdering::Release);
Collaborator

I think it is simpler and easier to understand if we make this an atomic boolean and rename it to has_waiters

Contributor Author

ok

src/storage/src/hummock/utils.rs: resolved review thread (outdated)
Signed-off-by: Little-Wallace <[email protected]>
Contributor

@MrCroxx MrCroxx left a comment

LGTM

src/storage/src/hummock/utils.rs: resolved review thread (outdated)
Contributor

@wenym1 wenym1 left a comment

LGTM

src/storage/src/hummock/utils.rs: resolved review thread (outdated)
let mut waiters = self.controller.lock();
let first_req = waiters.is_empty();
if first_req {
// When this request is the first waiter but the previous `MemoryTracker` has just released a large quota, it may skip notifying this waiter because it has checked `has_waiter` and found it was false. So we must set it one and retry `try_require_memory` again to avoid deadlock.
Contributor

typo: “set it one”

break;
}
let (tx, quota) = waiters.pop_front().unwrap();
let _ = tx.send(MemoryTrackerImpl::new(self.clone(), quota));
Collaborator

@hzxa21 hzxa21 May 11, 2024

The MemoryTrackerImpl and the newly added PendingRequestCancelGuard seem to be an over-design for the corner case mentioned above. Let's analyze all the cases when quota is acquired:

  1. When the quota is acquired via MemoryRequest::Ready (L287), we need to release the quota and notify waiters.
  2. When the quota is acquired here in may_notify_waiters in L243, there are two sub-cases:
    a. tx.send here succeeds: regardless of whether the rx is dropped or not, we need to release the quota and notify waiters.
    b. tx.send here fails (because the rx was dropped before tx.send): we only need to release the quota but not notify waiters (otherwise, it will deadlock).

Therefore, the implementation can be greatly simplified without introducing MemoryTrackerImpl and PendingRequestCancelGuard.

// MemoryTrackerImpl is not needed.
struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

// MemoryTracker::drop will release the quota and notify waiters if the quota is not None (1 + 2.a)
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

// Add a new method to release quota without notifying waiters.
impl MemoryTracker {
    fn release_quota(mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
        }
    }
}

...

// MemoryRequest takes MemoryTracker
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

...

impl MemoryLimiterInner {
    ...
    fn may_notify_waiters(self: &Arc<Self>) {
        ...
        let mut waiters = self.controller.lock();
        while let Some((_, quota)) = waiters.front() {
            if !self.try_require_memory(*quota) {
                break;
            }
            ...
            if let Err(SendError(tracker)) = tx.send(MemoryTracker::new(self.clone(), quota)) {
                // 2.b
                tracker.release_quota();
            }
        }
    }
    ...
}

...

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        // PendingRequestCancelGuard is not needed
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await,
        }
    }
}

Contributor Author

We cannot send MemoryTracker while holding the controller lock, because if it is dropped inside this method, it may call may_notify_waiters again, which would acquire the same lock twice.
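
To illustrate the hazard, continuing the types from the sketch above: if the receiver has already been dropped, the failed `tx.send` hands the `MemoryTracker` back while `self.controller` is still locked, and dropping it would re-enter `may_notify_waiters` on the same non-reentrant lock. One conceivable workaround (not what this PR does, which keeps a separate `MemoryTrackerImpl`) is to defer those drops until the lock guard is released, assuming a oneshot-style sender whose failed send returns the value:

fn may_notify_waiters(self: &Arc<Self>) {
    // Trackers whose receiver was dropped before the send; they must only be
    // dropped after the lock guard goes out of scope.
    let mut rejected = Vec::new();
    {
        let mut waiters = self.controller.lock();
        while let Some((_, quota)) = waiters.front() {
            if !self.try_require_memory(*quota) {
                break;
            }
            let (tx, quota) = waiters.pop_front().unwrap();
            // tokio::sync::oneshot::Sender::send returns the value on failure.
            if let Err(tracker) = tx.send(MemoryTracker::new(self.clone(), quota)) {
                rejected.push(tracker);
            }
        }
    }
    // Dropping the rejected trackers here may call may_notify_waiters again,
    // but the controller lock is no longer held, so there is no double-lock.
    drop(rejected);
}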

Contributor Author

We met this bug in #6634

Contributor

If tokio-rs/tokio#6558 looks good from the tokio community, we can have a patch on the tokio version we are using, and then we can adopt the code in this comment.

Signed-off-by: Little-Wallace <[email protected]>
@Little-Wallace Little-Wallace force-pushed the wallace/memory-limit-barrier branch from 57d2a3f to 22188ed on May 14, 2024 03:34
@wenym1
Contributor

wenym1 commented May 15, 2024

I found loom, an interesting framework for testing concurrent utilities. It provides mocked atomic variables and mutexes, plus block_on to support future.await, which covers all the functionality we use to synchronize between parallel tasks in our memory limiter. Shall we write some tests with the loom framework? It would be really helpful for proving the correctness of our implementation. You may refer to the loom tests written for tokio::sync.
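
A minimal loom model might look like this (hypothetical; the real limiter's fields differ, and this only models the atomic/lock interaction, not futures). It captures the register-under-lock-then-retry pattern: a concurrent release must either see the newly registered waiter or the waiter must see the released quota on its retry, and loom exhaustively checks all interleavings:

#[cfg(all(test, loom))]
mod loom_tests {
    use loom::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    use loom::sync::{Arc, Mutex};
    use loom::thread;

    #[test]
    fn waiter_is_woken_or_acquires() {
        loom::model(|| {
            let held = Arc::new(AtomicU64::new(8)); // quota currently held by thread 1
            let waiters = Arc::new(Mutex::new(Vec::<u64>::new()));
            let notified = Arc::new(AtomicBool::new(false));

            let t = {
                let (held, waiters, notified) = (held.clone(), waiters.clone(), notified.clone());
                thread::spawn(move || {
                    // Thread 1: release the quota, then notify any waiter that
                    // is already registered (checked under the lock).
                    held.store(0, Ordering::Release);
                    if !waiters.lock().unwrap().is_empty() {
                        notified.store(true, Ordering::Release);
                    }
                })
            };

            // Thread 2 (the waiter): the fast path failed, so register under
            // the lock and retry the acquisition while still holding it.
            let acquired = {
                let mut guard = waiters.lock().unwrap();
                guard.push(8);
                held.load(Ordering::Acquire) == 0
            };

            t.join().unwrap();
            // In every interleaving the waiter either acquires on retry or is
            // notified; otherwise the limiter could deadlock.
            assert!(acquired || notified.load(Ordering::Acquire));
        });
    }
}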

Collaborator

@hzxa21 hzxa21 left a comment

LGTM

@Little-Wallace Little-Wallace added this pull request to the merge queue May 15, 2024
Contributor

@Li0k Li0k left a comment

Rest LGTM

true
} else {
false
}
}
}

// We must notify waiters outside `MemoryTracker` to avoid dead-lock and loop-owner.
Contributor

typo: doc

Merged via the queue into main with commit 44e711c May 15, 2024
27 of 28 checks passed
@Little-Wallace Little-Wallace deleted the wallace/memory-limit-barrier branch May 15, 2024 08:32

Successfully merging this pull request may close these issues.

feat: allocate memory quota for SharedBufferBatch in barrier order
5 participants