Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(memory): use thread-local sequence-based memory eviction policy #16087

Merged
merged 31 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9eb816a
perf(memory): use thread-local squence-based memory eviction policy
MrCroxx Apr 2, 2024
d998a13
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 2, 2024
b87e911
test(bench): add sequencer benchmark
MrCroxx Apr 2, 2024
f9678c3
fix: fix license header
MrCroxx Apr 2, 2024
3959a16
fix: do not init sequence when insert lru
MrCroxx Apr 2, 2024
1ba5fd6
perf: add lru bench
MrCroxx Apr 2, 2024
6483b58
fix: clear lru cache after drop
MrCroxx Apr 2, 2024
7a9a7c8
refactor: simplify clear
MrCroxx Apr 2, 2024
f76da99
fix: drop inited field when clear
MrCroxx Apr 3, 2024
f12c141
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 3, 2024
8eccfee
refactor: update metrics in rw
MrCroxx Apr 3, 2024
594111d
chore: update grafana
MrCroxx Apr 3, 2024
dc0737a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 7, 2024
5d98779
refactor: make sequencer args configurable
MrCroxx Apr 7, 2024
f77d980
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 8, 2024
dce4999
chore: tiny refactors
MrCroxx Apr 8, 2024
7ebc5d3
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 16, 2024
2339eee
chore: make clippy happier
MrCroxx Apr 16, 2024
ebc27aa
fix: enable unstabl feature
MrCroxx Apr 16, 2024
d19b1ae
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 22, 2024
8bc7ee1
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 10, 2024
8e6140c
Merge branch 'main' into xx/thread-local-sequence
MrCroxx May 13, 2024
dc43b6a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 23, 2024
6ae11d5
chore: fill rust docs for Sequencer
MrCroxx May 23, 2024
c934b8e
chore: refine docs for controller
MrCroxx May 23, 2024
3dfd2d6
fix: fix bench build
MrCroxx May 23, 2024
207df01
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 23, 2024
7b68659
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 24, 2024
93858bb
fix: resolve grafana build
MrCroxx May 27, 2024
3d5917c
refactor: remove `update_epoch`
MrCroxx May 27, 2024
a46061b
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3698,21 +3698,25 @@ def section_memory_manager(outer_panels):
],
),
panels.timeseries_count(
MrCroxx marked this conversation as resolved.
Show resolved Hide resolved
"LRU manager watermark steps",
"LRU manager eviction policy",
"",
[
panels.target(
f"{metric('lru_watermark_step')}",
f"{metric('lru_eviction_policy')}",
"",
),
],
),
panels.timeseries_ms(
"LRU manager diff between watermark_time and now (ms)",
"watermark_time is the current lower watermark of cached data. physical_now is the current time of the machine. The diff (physical_now - watermark_time) shows how much data is cached.",
panels.timeseries_count(
MrCroxx marked this conversation as resolved.
Show resolved Hide resolved
"LRU manager sequence",
"",
[
panels.target(
f"{metric('lru_physical_now_ms')} - {metric('lru_current_watermark_time_ms')}",
f"{metric('lru_latest_sequence')}",
"",
),
panels.target(
f"{metric('lru_watermark_sequence')}",
"",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions src/common/Cargo.toml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heard that some stateless queries in NexMark were negatively affected by this PR for some "unknown" cause. Have we found the reason now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not. But the regression hasn't appear these weeks.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
ahash = "0.8"
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
Expand Down Expand Up @@ -48,6 +49,7 @@ fixedbitset = "0.5"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
humantime = "2.1"
Expand Down Expand Up @@ -135,6 +137,7 @@ libc = "0.2"
mach2 = "0.4"

[dev-dependencies]
coarsetime = "0.1"
criterion = { workspace = true }
expect-test = "1"
more-asserts = "0.3"
Expand Down Expand Up @@ -167,5 +170,13 @@ harness = false
name = "bench_array"
harness = false

[[bench]]
name = "bench_sequencer"
harness = false

[[bench]]
name = "bench_lru"
harness = false

[lints]
workspace = true
88 changes: 88 additions & 0 deletions src/common/benches/bench_lru.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hint::black_box;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

use itertools::Itertools;
use lru::LruCache;
use risingwave_common::lru::LruCache as RwLruCache;
use risingwave_common::sequence::SEQUENCE_GLOBAL;

fn lru(loops: usize, evict_ratio: u64) -> (usize, Duration) {
let mut lru = LruCache::unbounded();
let mut evicted = 0;
let now = Instant::now();
for i in 0..loops as u64 {
if i % evict_ratio == 0 && i != 0 {
lru.update_epoch(i);
while lru.pop_lru_by_epoch(i).is_some() {
evicted += 1;
}
}
lru.put(i, i);
}

(evicted, now.elapsed())
}

fn rw_lru(loops: usize, evict_ratio: u64) -> (usize, Duration) {
let mut lru = RwLruCache::unbounded();
let mut evicted = 0;
let now = Instant::now();
for i in 0..loops as u64 {
if i % evict_ratio == 0 {
let sequence = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
while lru.pop_with_sequence(sequence).is_some() {
evicted += 1;
}
}
lru.put(i, i);
}

(evicted, now.elapsed())
}

fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F)
where
F: Fn() -> (usize, Duration) + Clone + Send + 'static,
{
let handles = (0..threads)
.map(|_| std::thread::spawn(black_box(f.clone())))
.collect_vec();
let mut dur = Duration::from_nanos(0);
let mut evicted = 0;
for handle in handles {
let (e, d) = handle.join().unwrap();
evicted += e;
dur += d;
}
println!(
"{:20} {} threads {} loops: {:?} per iter, total evicted: {}",
name,
threads,
loops,
Duration::from_nanos((dur.as_nanos() / threads as u128 / loops as u128) as u64),
evicted,
);
}

fn main() {
for threads in [1, 4, 8, 16, 32, 64] {
println!();
benchmark("lru - 1024", threads, 1000000, || lru(1000000, 1024));
benchmark("rw - 1024", threads, 1000000, || rw_lru(1000000, 1024));
}
}
170 changes: 170 additions & 0 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

use std::cell::RefCell;
MrCroxx marked this conversation as resolved.
Show resolved Hide resolved
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use itertools::Itertools;
use risingwave_common::sequence::*;

thread_local! {
pub static SEQUENCER_64_8: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 8)) };
pub static SEQUENCER_64_16: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 16)) };
pub static SEQUENCER_64_32: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(64, 64 * 32)) };
pub static SEQUENCER_128_8: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 8)) };
pub static SEQUENCER_128_16: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 16)) };
pub static SEQUENCER_128_32: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(128, 128 * 32)) };
}

fn coarse(loops: usize) -> Duration {
let now = Instant::now();
for _ in 0..loops {
let _ = coarsetime::Instant::now();
}
now.elapsed()
}

#[expect(clippy::explicit_counter_loop)]
fn primitive(loops: usize) -> Duration {
let mut cnt = 0usize;
let now = Instant::now();
for _ in 0..loops {
cnt += 1;
let _ = cnt;
}
now.elapsed()
}

fn atomic(loops: usize, atomic: Arc<AtomicUsize>) -> Duration {
let now = Instant::now();
for _ in 0..loops {
let _ = atomic.fetch_add(1, Ordering::Relaxed);
}
now.elapsed()
}

fn atomic_skip(loops: usize, atomic: Arc<AtomicUsize>, skip: usize) -> Duration {
let mut cnt = 0usize;
let now = Instant::now();
for _ in 0..loops {
cnt += 1;
let _ = cnt;
if cnt % skip == 0 {
let _ = atomic.fetch_add(skip, Ordering::Relaxed);
} else {
let _ = atomic.load(Ordering::Relaxed);
}
}
now.elapsed()
}

fn sequencer(loops: usize, step: Sequence, lag_amp: Sequence) -> Duration {
let sequencer = match (step, lag_amp) {
(64, 8) => &SEQUENCER_64_8,
(64, 16) => &SEQUENCER_64_16,
(64, 32) => &SEQUENCER_64_32,
(128, 8) => &SEQUENCER_128_8,
(128, 16) => &SEQUENCER_128_16,
(128, 32) => &SEQUENCER_128_32,
_ => unimplemented!(),
};
let now = Instant::now();
for _ in 0..loops {
let _ = sequencer.with(|s| s.borrow_mut().alloc());
}
now.elapsed()
}

fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F)
where
F: Fn() -> Duration + Clone + Send + 'static,
{
let handles = (0..threads)
.map(|_| std::thread::spawn(black_box(f.clone())))
.collect_vec();
let mut dur = Duration::from_nanos(0);
for handle in handles {
dur += handle.join().unwrap();
}
println!(
"{:20} {} threads {} loops: {:?} per iter",
name,
threads,
loops,
Duration::from_nanos((dur.as_nanos() / threads as u128 / loops as u128) as u64)
);
}

fn main() {
for (threads, loops) in [
(1, 10_000_000),
(4, 10_000_000),
(8, 10_000_000),
(16, 10_000_000),
(32, 10_000_000),
] {
println!();

benchmark("primitive", threads, loops, move || primitive(loops));

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic", threads, loops, move || atomic(loops, a.clone()));

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 8", threads, loops, move || {
atomic_skip(loops, a.clone(), 8)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 16", threads, loops, move || {
atomic_skip(loops, a.clone(), 16)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 32", threads, loops, move || {
atomic_skip(loops, a.clone(), 32)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 64", threads, loops, move || {
atomic_skip(loops, a.clone(), 64)
});

benchmark("sequencer(64,8)", threads, loops, move || {
sequencer(loops, 64, 8)
});
benchmark("sequencer(64,16)", threads, loops, move || {
sequencer(loops, 64, 16)
});
benchmark("sequencer(64,32)", threads, loops, move || {
sequencer(loops, 64, 32)
});
benchmark("sequencer(128,8)", threads, loops, move || {
sequencer(loops, 128, 8)
});
benchmark("sequencer(128,16)", threads, loops, move || {
sequencer(loops, 128, 16)
});
benchmark("sequencer(128,32)", threads, loops, move || {
sequencer(loops, 128, 32)
});

benchmark("coarse", threads, loops, move || coarse(loops));
}
}
Loading
Loading