Skip to content

Commit

Permalink
Refactor reader/writer worker pools (#154)
Browse files Browse the repository at this point in the history
These were previously stored in raw slices. Now they are stored within
the workerPool abstraction. This couples the ability to create a new
worker with the pool, making growing and shrinking the number of workers
easy.

Support w/W keys for increasing/decreasing workers

If this needs to be more granular then we can split this to different
keys for changing the size of each of the 3 pools separately.
  • Loading branch information
mhutchinson authored Jun 12, 2024
1 parent 0da203b commit 276c407
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 22 deletions.
50 changes: 28 additions & 22 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,15 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)

randomReaders := make([]*LeafReader, *numReadersRandom)
fullReaders := make([]*LeafReader, *numReadersFull)
writers := make([]*LogWriter, *numWriters)
for i := 0; i < *numReadersRandom; i++ {
randomReaders[i] = NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
for i := 0; i < *numReadersFull; i++ {
fullReaders[i] = NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
for i := 0; i < *numWriters; i++ {
writers[i] = NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan)
}
randomReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
})
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
Expand All @@ -181,9 +177,9 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
}

type Hammer struct {
randomReaders []*LeafReader
fullReaders []*LeafReader
writers []*LogWriter
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
Expand All @@ -192,14 +188,14 @@ type Hammer struct {

func (h *Hammer) Run(ctx context.Context) {
// Kick off readers & writers
for _, r := range h.randomReaders {
go r.Run(ctx)
for i := 0; i < *numReadersRandom; i++ {
h.randomReaders.Grow(ctx)
}
for _, r := range h.fullReaders {
go r.Run(ctx)
for i := 0; i < *numReadersFull; i++ {
h.fullReaders.Grow(ctx)
}
for _, w := range h.writers {
go w.Run(ctx)
for i := 0; i < *numWriters; i++ {
h.writers.Grow(ctx)
}

// Set up logging for any errors
Expand Down Expand Up @@ -352,7 +348,7 @@ func hostUI(ctx context.Context, hammer *Hammer) {
klog.SetOutput(logView)

helpView := tview.NewTextView()
helpView.SetText("+/- to increase/decrease read load\n>/< to increase/decrease write load")
helpView.SetText("+/- to increase/decrease read load\n>/< to increase/decrease write load\nw/W to increase/decrease workers")
grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)

app := tview.NewApplication()
Expand Down Expand Up @@ -383,6 +379,16 @@ func hostUI(ctx context.Context, hammer *Hammer) {
case '<':
klog.Info("Decreasing the write operations per second")
hammer.writeThrottle.Decrease()
case 'w':
klog.Info("Increasing the number of workers")
hammer.randomReaders.Grow(ctx)
hammer.fullReaders.Grow(ctx)
hammer.writers.Grow(ctx)
case 'W':
klog.Info("Decreasing the number of workers")
hammer.randomReaders.Shrink(ctx)
hammer.fullReaders.Shrink(ctx)
hammer.writers.Shrink(ctx)
}
return event
})
Expand Down
38 changes: 38 additions & 0 deletions hammer/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import "context"

type worker interface {
Run(ctx context.Context)
Kill()
}

func newWorkerPool(factory func() worker) workerPool {
workers := make([]worker, 0)
pool := workerPool{
workers: workers,
factory: factory,
}
return pool
}

// workerPool contains a collection of _running_ workers.
type workerPool struct {
workers []worker
factory func() worker
}

func (p *workerPool) Grow(ctx context.Context) {
w := p.factory()
p.workers = append(p.workers, w)
go w.Run(ctx)
}

func (p *workerPool) Shrink(ctx context.Context) {
if len(p.workers) == 0 {
return
}
w := p.workers[len(p.workers)-1]
p.workers = p.workers[:len(p.workers)-1]
w.Kill()
}

0 comments on commit 276c407

Please sign in to comment.