Protect against CPU leaser leaks (#8190)
Protect against leaser leaks by warning when they happen and
automatically dropping leases if there are more than 1000 active at a
time.

Also restructure the internal lease implementation to track each lease
separately from the per-CPU load. This makes it easier to warn about the
location of a leaked lease.
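
For context, here is a minimal usage sketch of the leaser API touched by this
change. It is illustrative only: the import path, task ID, and milliCPU value
are assumptions, and only NewLeaser, Acquire, Format, and the returned release
function come from this diff. The key point is that every Acquire must be
paired with a call to the release function it returns; a forgotten release is
the leak that the new warning and MaxNumLeases safety valve guard against.

package main

import (
	"fmt"
	"log"

	"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/cpuset" // assumed import path
)

func main() {
	leaser, err := cpuset.NewLeaser()
	if err != nil {
		log.Fatalf("failed to create CPU leaser: %s", err)
	}

	// Lease roughly 2 CPUs (2000 milliCPU) for an illustrative task ID.
	numaNode, cpus, release := leaser.Acquire(2000, "example-task-id")
	// Forgetting to call release() is exactly the leak this change warns about;
	// the lease would stay open until the MaxNumLeases safety valve drops it.
	defer release()

	fmt.Printf("leased NUMA node %d, CPUs %s\n", numaNode, cpuset.Format(cpus))
}

Note that, per the diff, leases are only tracked when executor.cpu_leaser.enable
is set; with it off, Acquire returns all CPUs and records nothing.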
tylerwilliams authored Jan 15, 2025
1 parent 1a6f978 commit 7020e60
Showing 5 changed files with 145 additions and 22 deletions.
@@ -261,6 +261,10 @@ func getTestEnv(ctx context.Context, t *testing.T, opts envOpts) *testenv.TestEn
require.NoError(t, err)
env.SetCPULeaser(leaser)
flags.Set(t, "executor.cpu_leaser.enable", true)
t.Cleanup(func() {
orphanedLeases := leaser.TestOnlyGetOpenLeases()
require.Equal(t, 0, len(orphanedLeases))
})

return env
}
@@ -121,6 +121,11 @@ func installLeaserInEnv(t testing.TB, env *real_environment.RealEnv) {
require.NoError(t, err)
env.SetCPULeaser(leaser)
flags.Set(t, "executor.cpu_leaser.enable", true)

t.Cleanup(func() {
orphanedLeases := leaser.TestOnlyGetOpenLeases()
require.Equal(t, 0, len(orphanedLeases))
})
}

func TestRun(t *testing.T) {
@@ -628,6 +633,11 @@ func TestCreateFailureHasStderr(t *testing.T) {
ContainerImage: image,
},
})
t.Cleanup(func() {
err := c.Remove(ctx)
require.NoError(t, err)
})

require.NoError(t, err)
err = c.Create(ctx, wd+"nonexistent")
require.ErrorContains(t, err, "nonexistent")
@@ -658,6 +668,11 @@ func TestDevices(t *testing.T) {
ContainerImage: image,
},
})

t.Cleanup(func() {
err := c.Remove(ctx)
require.NoError(t, err)
})
require.NoError(t, err)
res := c.Run(ctx, &repb.Command{
Arguments: []string{"sh", "-e", "-c", `
1 change: 1 addition & 0 deletions enterprise/server/util/cpuset/BUILD
@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//server/interfaces",
"//server/util/alert",
"//server/util/flag",
"//server/util/log",
"//server/util/priority_queue",
115 changes: 93 additions & 22 deletions enterprise/server/util/cpuset/cpuset.go
@@ -3,11 +3,13 @@ package cpuset
import (
"fmt"
"math"
"runtime"
"strconv"
"strings"
"sync"

"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
"github.com/buildbuddy-io/buildbuddy/server/util/flag"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/priority_queue"
@@ -20,14 +22,23 @@ var (
cpuLeaserOverhead = flag.Float64("executor.cpu_leaser.overhead", .20, "The amount of extra CPU *above the task size* to include in a lease")
cpuLeaserMinOverhead = flag.Int("executor.cpu_leaser.min_overhead", 2, "Always ensure at least this many extra cpus are included in a lease")
cpuLeaserCPUSet = flag.String("executor.cpu_leaser.cpuset", "", "Manual override for the set of CPUs that may be leased. Ignored if empty. Ex. '0-1,3'")
warnAboutLeaks = flag.Bool("executor.cpu_leaser.warn_about_leaks", true, "If set, warn about leaked leases")
)

const (
// MaxNumLeases configures a safety valve to prevent a memory leak if
// leasers forget to close their leases.
MaxNumLeases = 1000
)

// Compile-time check that cpuLeaser implements the interface.
var _ interfaces.CPULeaser = (*cpuLeaser)(nil)
var _ interfaces.CPULeaser = (*CPULeaser)(nil)

type cpuLeaser struct {
type CPULeaser struct {
mu sync.Mutex
leases map[cpuInfo][]string
cpus []cpuInfo
leases []lease
load map[int]int
physicalProcessors int
}

@@ -36,6 +47,12 @@ type cpuInfo struct {
physicalID int // numa node
}

type lease struct {
taskID string
cpus []int
location string // only set if *warnAboutLeaks is enabled
}

func toCPUInfos(processors []int, physicalID int) []cpuInfo {
infos := make([]cpuInfo, len(processors))
for i, p := range processors {
@@ -114,9 +131,10 @@ func Parse(s string) ([]int, error) {
return processors, nil
}

func NewLeaser() (interfaces.CPULeaser, error) {
cl := &cpuLeaser{
leases: make(map[cpuInfo][]string),
func NewLeaser() (*CPULeaser, error) {
cl := &CPULeaser{
leases: make([]lease, 0, MaxNumLeases),
load: make(map[int]int, 0),
}

var cpus []cpuInfo
@@ -130,13 +148,16 @@ func NewLeaser() (interfaces.CPULeaser, error) {
cpus = GetCPUs()
}

cl.cpus = make([]cpuInfo, len(cpus))

processors := make(map[int]struct{}, 0)
for _, cpu := range cpus {
cl.leases[cpu] = make([]string, 0)
for i, cpu := range cpus {
cl.cpus[i] = cpu
cl.load[cpu.processor] = 0
processors[cpu.physicalID] = struct{}{}
}
cl.physicalProcessors = len(processors)
log.Debugf("NewLeaser with %d processors and %d cores", cl.physicalProcessors, len(cl.leases))
log.Debugf("NewLeaser with %d processors and %d cores", cl.physicalProcessors, len(cl.cpus))
return cl, nil
}

@@ -167,7 +188,7 @@ func WithNoOverhead() Option {

// Acquire leases a set of CPUs (identified by index) for a task. The returned
// function should be called to free the CPUs when they are no longer used.
func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
func (l *CPULeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []int, func()) {
l.mu.Lock()
defer l.mu.Unlock()

@@ -181,15 +202,16 @@ func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []
numCPUs := computeNumCPUs(milliCPU, !options.disableOverhead)
// If the CPU leaser is disabled, return all CPUs.
if !*cpuLeaserEnable {
numCPUs = len(l.leases)
numCPUs = len(l.cpus)
}

// Put all CPUs in a priority queue.
pq := priority_queue.New[cpuInfo]()
for cpuid, tasks := range l.leases {
for _, cpuInfo := range l.cpus {
numTasks := l.load[cpuInfo.processor]
// we want the least loaded cpus first, so give the
// cpus with more tasks a more negative score.
pq.Push(cpuid, -1*len(tasks))
pq.Push(cpuInfo, -1*numTasks)
}

// Get the set of CPUs, in order of load (incr).
@@ -217,30 +239,79 @@ func (l *cpuLeaser) Acquire(milliCPU int64, taskID string, opts ...any) (int, []
if c.physicalID != selectedNode {
continue
}
// If the CPULeaser is enabled, actually track the lease.
if *cpuLeaserEnable {
l.leases[c] = append(l.leases[c], taskID)
}
leaseSet = append(leaseSet, c.processor)
l.load[c.processor] += 1
if len(leaseSet) == numCPUs {
break
}
}

// If the CPULeaser is enabled, actually track the lease.
if *cpuLeaserEnable {
if len(l.leases) >= MaxNumLeases {
droppedLease := l.leases[0]
l.leases = l.leases[1:]
for _, processor := range droppedLease.cpus {
l.load[processor] -= 1
}
if *warnAboutLeaks {
alert.UnexpectedEvent("cpu_leaser_leak", "Acquire() handle leak at %s!", droppedLease.location)
}
}

lease := lease{
taskID: taskID,
cpus: leaseSet,
}
if *warnAboutLeaks {
if _, file, no, ok := runtime.Caller(1); ok {
lease.location = fmt.Sprintf("%s:%d", file, no)
}
}

l.leases = append(l.leases, lease)
}

log.Debugf("Leased %s to task: %q (%d milliCPU)", Format(leaseSet), taskID, milliCPU)
return selectedNode, leaseSet, func() {
l.release(taskID)
}
}

func (l *cpuLeaser) release(taskID string) {
func (l *CPULeaser) release(taskID string) {
l.mu.Lock()
defer l.mu.Unlock()

for cpuid, tasks := range l.leases {
l.leases[cpuid] = slices.DeleteFunc(tasks, func(s string) bool {
return s == taskID
})
var cpus []int
l.leases = slices.DeleteFunc(l.leases, func(l lease) bool {
if l.taskID == taskID {
cpus = l.cpus
return true
}
return false
})

for _, cpu := range cpus {
l.load[cpu] -= 1
}

log.Debugf("Task: %q released CPUs", taskID)
}

func (l *CPULeaser) TestOnlyGetOpenLeases() map[string]int {
l.mu.Lock()
defer l.mu.Unlock()

taskCounts := make(map[string]int)
for _, l := range l.leases {
taskCounts[l.taskID] = len(l.cpus)
}
return taskCounts
}

func (l *CPULeaser) TestOnlyGetLoads() map[int]int {
l.mu.Lock()
defer l.mu.Unlock()

return l.load
}
32 changes: 32 additions & 0 deletions enterprise/server/util/cpuset/cpuset_test.go
@@ -193,3 +193,35 @@ func TestNonContiguousNumaNodes(t *testing.T) {
assert.NotElementsMatch(t, cpus1, cpus2)
assert.NotEqual(t, numa1, numa2)
}

func TestMaxNumberOfLeases(t *testing.T) {
flags.Set(t, "executor.cpu_leaser.enable", true)
flags.Set(t, "executor.cpu_leaser.overhead", 0)
flags.Set(t, "executor.cpu_leaser.min_overhead", 0)
flags.Set(t, "executor.cpu_leaser.cpuset", "0")

cs, err := cpuset.NewLeaser()
require.NoError(t, err)

numLeases := cpuset.MaxNumLeases * 3
taskIDs := make([]string, numLeases)
cancels := make([]func(), numLeases)
for i := 0; i < numLeases; i++ {
task := uuid.New()
_, _, cancel := cs.Acquire(1000, task)
taskIDs[i] = task
cancels[i] = cancel
}

require.Equal(t, cpuset.MaxNumLeases, len(cs.TestOnlyGetOpenLeases()))
for _, cancel := range cancels {
cancel()
}

// There should be no open leases, and no active load after
// cancelling all open leases.
require.Equal(t, 0, len(cs.TestOnlyGetOpenLeases()))
for _, load := range cs.TestOnlyGetLoads() {
require.Equal(t, 0, load)
}
}
