diff --git a/hammer/hammer.go b/hammer/hammer.go index 64c5a32..6386769 100644 --- a/hammer/hammer.go +++ b/hammer/hammer.go @@ -156,19 +156,15 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR writeThrottle := NewThrottle(*maxWriteOpsPerSecond) errChan := make(chan error, 20) - randomReaders := make([]*LeafReader, *numReadersRandom) - fullReaders := make([]*LeafReader, *numReadersFull) - writers := make([]*LogWriter, *numWriters) - for i := 0; i < *numReadersRandom; i++ { - randomReaders[i] = NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) - } - for i := 0; i < *numReadersFull; i++ { - fullReaders[i] = NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) - } gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize) - for i := 0; i < *numWriters; i++ { - writers[i] = NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan) - } + randomReaders := newWorkerPool(func() worker { + return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) + }) + fullReaders := newWorkerPool(func() worker { + return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) + }) + writers := newWorkerPool(func() worker { return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan) }) + return &Hammer{ randomReaders: randomReaders, fullReaders: fullReaders, @@ -181,9 +177,9 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR } type Hammer struct { - randomReaders []*LeafReader - fullReaders []*LeafReader - writers []*LogWriter + randomReaders workerPool + fullReaders workerPool + writers workerPool readThrottle *Throttle writeThrottle *Throttle tracker *client.LogStateTracker @@ -192,14 +188,14 @@ type Hammer struct { func (h *Hammer) Run(ctx context.Context) { // Kick off readers & writers - for _, r := range h.randomReaders { - go r.Run(ctx) + for i := 0; i < *numReadersRandom; i++ { + h.randomReaders.Grow(ctx) } - for _, r := range h.fullReaders { - go r.Run(ctx) + for i := 0; i < *numReadersFull; i++ { + h.fullReaders.Grow(ctx) } - for _, w := range h.writers { - go w.Run(ctx) + for i := 0; i < *numWriters; i++ { + h.writers.Grow(ctx) } // Set up logging for any errors @@ -352,7 +348,7 @@ func hostUI(ctx context.Context, hammer *Hammer) { klog.SetOutput(logView) helpView := tview.NewTextView() - helpView.SetText("+/- to increase/decrease read load\n>/< to increase/decrease write load") + helpView.SetText("+/- to increase/decrease read load\n>/< to increase/decrease write load\nw/W to increase/decrease workers") grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false) app := tview.NewApplication() @@ -383,6 +379,16 @@ func hostUI(ctx context.Context, hammer *Hammer) { case '<': klog.Info("Decreasing the write operations per second") hammer.writeThrottle.Decrease() + case 'w': + klog.Info("Increasing the number of workers") + hammer.randomReaders.Grow(ctx) + hammer.fullReaders.Grow(ctx) + hammer.writers.Grow(ctx) + case 'W': + klog.Info("Decreasing the number of workers") + hammer.randomReaders.Shrink(ctx) + hammer.fullReaders.Shrink(ctx) + hammer.writers.Shrink(ctx) } return event }) diff --git a/hammer/workerpool.go b/hammer/workerpool.go new file mode 100644 index 0000000..9b0beeb --- /dev/null +++ b/hammer/workerpool.go @@ -0,0 +1,38 @@ +package main + +import "context" + +type worker interface { + Run(ctx context.Context) + Kill() +} + +func newWorkerPool(factory func() worker) workerPool { + workers := make([]worker, 0) + pool := workerPool{ + workers: workers, + factory: factory, + } + return pool +} + +// workerPool contains a collection of _running_ workers. +type workerPool struct { + workers []worker + factory func() worker +} + +func (p *workerPool) Grow(ctx context.Context) { + w := p.factory() + p.workers = append(p.workers, w) + go w.Run(ctx) +} + +func (p *workerPool) Shrink(ctx context.Context) { + if len(p.workers) == 0 { + return + } + w := p.workers[len(p.workers)-1] + p.workers = p.workers[:len(p.workers)-1] + w.Kill() +}