Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split the analysis out of the hammer #125

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 73 additions & 57 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,57 +89,54 @@ func main() {
klog.Exitf("Failed to get initial state of the log: %v", err)
}

ha := newHammerAnalyser(&tracker, 100)
go ha.updateStatsLoop(ctx)
go ha.errorLoop(ctx)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize, *dupChance)
hammer := NewHammer(&tracker, f.Fetch, w.Write, gen)
hammer := NewHammer(&tracker, f.Fetch, w.Write, gen, ha.seqLeafChan, ha.errChan)
hammer.Run(ctx)

if *showUI {
c := newController(hammer)
c := newController(hammer, ha)
c.Run(ctx)
} else {
<-ctx.Done()
}
}

func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter, gen func() []byte) *Hammer {
func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter, gen func() []byte, seqLeafChan chan<- leafTime, errChan chan<- error) *Hammer {
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafSampleChan := make(chan leafTime, 100)

randomReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, RandomNextLeaf(), readThrottle.tokenChan, errChan)
})
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
}
}

// Hammer is responsible for coordinating the operations against the log in the form
// of write and read operations. The work of analysing the results of hammering should
// live outside of this class.
type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
}

func (h *Hammer) Run(ctx context.Context) {
Expand All @@ -154,35 +151,10 @@ func (h *Hammer) Run(ctx context.Context) {
h.writers.Grow(ctx)
}

go h.errorLoop(ctx)

go h.readThrottle.Run(ctx)
go h.writeThrottle.Run(ctx)

go h.updateCheckpointLoop(ctx)
go h.updateStatsLoop(ctx)
}

func (h *Hammer) errorLoop(ctx context.Context) {
tick := time.NewTicker(time.Second)
pbCount := 0
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-tick.C:
if pbCount > 0 {
klog.Warningf("%d requests received pushback from log", pbCount)
pbCount = 0
}
case err := <-h.errChan:
if errors.Is(err, ErrRetry) {
pbCount++
continue
}
klog.Warning(err)
}
}
}

func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
Expand All @@ -209,16 +181,38 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
}
}

func (h *Hammer) updateStatsLoop(ctx context.Context) {
func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAnalyser {
leafSampleChan := make(chan leafTime, chanSize)
errChan := make(chan error, 20)
return &HammerAnalyser{
tracker: tracker,
seqLeafChan: leafSampleChan,
errChan: errChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
}
}

// HammerAnalyser is responsible for measuring and interpreting the result of hammering.
type HammerAnalyser struct {
tracker *client.LogStateTracker
seqLeafChan chan leafTime
errChan chan error

queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
}

func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
tick := time.NewTicker(100 * time.Millisecond)
size := h.tracker.LatestConsistent.Size
size := a.tracker.LatestConsistent.Size
for {
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := h.tracker.LatestConsistent.Size
newSize := a.tracker.LatestConsistent.Size
if newSize <= size {
continue
}
Expand All @@ -229,7 +223,7 @@ func (h *Hammer) updateStatsLoop(ctx context.Context) {
var sample *leafTime
for {
if sample == nil {
l, ok := <-h.leafSampleChan
l, ok := <-a.seqLeafChan
if !ok {
break
}
Expand All @@ -254,8 +248,30 @@ func (h *Hammer) updateStatsLoop(ctx context.Context) {
sample = nil
}
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
a.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
a.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}
}

func (a *HammerAnalyser) errorLoop(ctx context.Context) {
tick := time.NewTicker(time.Second)
pbCount := 0
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-tick.C:
if pbCount > 0 {
klog.Warningf("%d requests received pushback from log", pbCount)
pbCount = 0
}
case err := <-a.errChan:
if errors.Is(err, ErrRetry) {
pbCount++
continue
}
klog.Warning(err)
}
}
}
Expand Down
27 changes: 17 additions & 10 deletions hammer/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"strings"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
Expand All @@ -28,16 +29,18 @@ import (

type tuiController struct {
hammer *Hammer
analyser *HammerAnalyser
app *tview.Application
statusView *tview.TextView
logView *tview.TextView
helpView *tview.TextView
}

func newController(h *Hammer) *tuiController {
func newController(h *Hammer, a *HammerAnalyser) *tuiController {
c := tuiController{
hammer: h,
app: tview.NewApplication(),
hammer: h,
analyser: a,
app: tview.NewApplication(),
}
grid := tview.NewGrid()
grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
Expand Down Expand Up @@ -129,17 +132,21 @@ func (c *tuiController) updateStatsLoop(ctx context.Context, interval time.Durat
growth.Add(float64(s - lastSize))
lastSize = s
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read (%d workers): %s\nWrite (%d workers): %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
readWorkersLine := fmt.Sprintf("Read (%d workers): %s",
c.hammer.fullReaders.Size()+c.hammer.randomReaders.Size(),
c.hammer.readThrottle.String(),
c.hammer.readThrottle.String())
writeWorkersLine := fmt.Sprintf("Write (%d workers): %s",
c.hammer.writers.Size(),
c.hammer.writeThrottle.String(),
c.hammer.writeThrottle.String())
treeSizeLine := fmt.Sprintf("TreeSize: %d (Δ %.0fqps over %ds)",
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
formatMovingAverage(c.hammer.queueTime),
formatMovingAverage(c.hammer.integrationTime),
)
time.Duration(maSlots*int(interval))/time.Second)
queueLine := fmt.Sprintf("Time-in-queue: %s",
formatMovingAverage(c.analyser.queueTime))
integrateLine := fmt.Sprintf("Observed-time-to-integrate: %s",
formatMovingAverage(c.analyser.integrationTime))
text := strings.Join([]string{readWorkersLine, writeWorkersLine, treeSizeLine, queueLine, integrateLine}, "\n")
mhutchinson marked this conversation as resolved.
Show resolved Hide resolved
c.statusView.SetText(text)
c.app.Draw()
}
Expand Down
Loading