Skip to content

Commit

Permalink
Hammer: refactor for clearer separation (#114)
Browse files Browse the repository at this point in the history
This is just moving code around for clearer separation and documentation
via method naming.
  • Loading branch information
mhutchinson authored Aug 6, 2024
1 parent f20cc14 commit 247cea4
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 183 deletions.
272 changes: 89 additions & 183 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
"github.com/transparency-dev/trillian-tessera/client"
"golang.org/x/mod/sumdb/note"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -154,107 +152,110 @@ func (h *Hammer) Run(ctx context.Context) {
h.writers.Grow(ctx)
}

// Set up logging for any errors
go func() {
tick := time.NewTicker(time.Second)
pbCount := 0
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-tick.C:
if pbCount > 0 {
klog.Warningf("%d requests received pushback from log", pbCount)
pbCount = 0
}
case err := <-h.errChan:
if errors.Is(err, ErrRetry) {
pbCount++
continue
}
klog.Warning(err)
}
}
}()
go h.errorLoop(ctx)

// Start the throttles
go h.readThrottle.Run(ctx)
go h.writeThrottle.Run(ctx)

go func() {
tick := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-tick.C:
size := h.tracker.LatestConsistent.Size
_, _, _, err := h.tracker.Update(ctx)
if err != nil {
klog.Warning(err)
inconsistentErr := client.ErrInconsistency{}
if errors.As(err, &inconsistentErr) {
klog.Fatalf("Last Good Checkpoint:\n%s\n\nFirst Bad Checkpoint:\n%s\n\n%v", string(inconsistentErr.SmallerRaw), string(inconsistentErr.LargerRaw), inconsistentErr)
}
}
newSize := h.tracker.LatestConsistent.Size
if newSize > size {
klog.V(1).Infof("Updated checkpoint from %d to %d", size, newSize)
}
go h.updateCheckpointLoop(ctx)
go h.updateStatsLoop(ctx)
}

// errorLoop consumes errors from h.errChan until ctx is cancelled.
//
// Errors matching ErrRetry are only counted; the accumulated count is logged
// as a single warning on the next one-second tick and then reset. Every other
// error is logged immediately via klog.Warning.
func (h *Hammer) errorLoop(ctx context.Context) {
tick := time.NewTicker(time.Second)
// pbCount is the number of ErrRetry errors seen since the last report.
pbCount := 0
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-tick.C:
// Report pushback in aggregate, at most once per tick, rather than
// emitting one log line per affected request.
if pbCount > 0 {
klog.Warningf("%d requests received pushback from log", pbCount)
pbCount = 0
}
case err := <-h.errChan:
if errors.Is(err, ErrRetry) {
pbCount++
continue
}
klog.Warning(err)
}
}()
}
}

go func() {
tick := time.NewTicker(100 * time.Millisecond)
size := h.tracker.LatestConsistent.Size
for {
select {
case <-ctx.Done():
return
case <-tick.C:
// updateCheckpointLoop calls h.tracker.Update every 500ms until ctx is
// cancelled.
//
// An error from Update is logged as a warning. If that error can be matched
// as a client.ErrInconsistency, the raw smaller and larger checkpoints are
// reported via klog.Fatalf. When h.tracker.LatestConsistent.Size is larger
// after the update than before it, the change is logged at verbosity 1.
func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
tick := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-tick.C:
// Remember the size before updating so that growth can be detected below.
size := h.tracker.LatestConsistent.Size
_, _, _, err := h.tracker.Update(ctx)
if err != nil {
klog.Warning(err)
inconsistentErr := client.ErrInconsistency{}
if errors.As(err, &inconsistentErr) {
klog.Fatalf("Last Good Checkpoint:\n%s\n\nFirst Bad Checkpoint:\n%s\n\n%v", string(inconsistentErr.SmallerRaw), string(inconsistentErr.LargerRaw), inconsistentErr)
}
}
newSize := h.tracker.LatestConsistent.Size
if newSize <= size {
continue
if newSize > size {
klog.V(1).Infof("Updated checkpoint from %d to %d", size, newSize)
}
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
var sample *leafTime
for {
if sample == nil {
l, ok := <-h.leafSampleChan
if !ok {
break
}
sample = &l
}
// Stop considering leaf times once we reach a sample that crosses
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now", but not assigned by "now" should also be ignored as they don't fall into this epoch (and would contribute a negative latency if they were included).
if sample.idx >= newSize || sample.assignedAt.After(now) {
}
}
}

// updateStatsLoop wakes every 100ms until ctx is cancelled and, whenever
// h.tracker.LatestConsistent.Size exceeds the size captured when the loop
// started, reads leaf samples from h.leafSampleChan and records two means
// (in milliseconds) over the samples consumed on that tick:
//   - h.queueTime receives the mean of assignedAt - queuedAt,
//   - h.integrationTime receives the mean of now - queuedAt.
//
// Nothing is recorded on a tick in which no sample was consumed.
func (h *Hammer) updateStatsLoop(ctx context.Context) {
tick := time.NewTicker(100 * time.Millisecond)
// NOTE(review): size is not reassigned anywhere in the loop below, so the
// "newSize <= size" check always compares against this start-up value.
// Confirm that is intended.
size := h.tracker.LatestConsistent.Size
for {
// Wait for the next tick (or cancellation); the work happens after the select.
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := h.tracker.LatestConsistent.Size
if newSize <= size {
continue
}
// "now" is fixed once per tick so every sample is measured against the same instant.
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
// NOTE(review): sample is scoped to this tick. A sample that triggers the
// break below has already been received from the channel and is not carried
// over to the next tick. Confirm dropping it is acceptable.
var sample *leafTime
for {
if sample == nil {
// Blocks until a sample is available or the channel is closed.
// NOTE(review): this receive does not observe ctx.
l, ok := <-h.leafSampleChan
if !ok {
break
}
queueLatency += sample.assignedAt.Sub(sample.queuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.queuedAt)

numLeaves++
sample = nil
sample = &l
}
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
// Stop considering leaf times once we reach a sample that crosses
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now", but not assigned by "now" should also be ignored as they don't fall into this epoch (and would contribute a negative latency if they were included).
if sample.idx >= newSize || sample.assignedAt.After(now) {
break
}
queueLatency += sample.assignedAt.Sub(sample.queuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.queuedAt)

numLeaves++
// Clearing sample makes the next iteration pull a fresh one from the channel.
sample = nil
}
// Each total is truncated to whole milliseconds before the mean is taken.
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}()
}
}

func genLeaf(n uint64, minLeafSize int) []byte {
Expand Down Expand Up @@ -347,101 +348,6 @@ func (t *Throttle) String() string {
return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
}

// formatMovingAverage renders the minimum, average and maximum of ma, each
// rounded to zero decimal places and suffixed with "ms", in the form
// "<min>ms/<avg>ms/<max>ms (min/avg/max)".
func formatMovingAverage(ma *movingaverage.MovingAverage) string {
	// The errors returned by Min and Max are intentionally discarded; whatever
	// value accompanies them is formatted as-is.
	lowest, _ := ma.Min()
	highest, _ := ma.Max()
	return fmt.Sprintf("%.0fms/%.0fms/%.0fms (min/avg/max)", lowest, ma.Avg(), highest)
}

// hostUI builds and runs a terminal UI for hammer, returning only when the
// tview application's Run call returns.
//
// The screen is a three-row grid:
//   - a status box, refreshed every 500ms with the throttle states, the current
//     tree size, the growth rate averaged over a 30 second window, and the
//     time-in-queue / time-to-integrate moving averages;
//   - a log view (capped at 10000 lines) that receives klog output;
//   - a static help box listing the key bindings.
//
// Key bindings: +/- adjust the read throttle, >/< adjust the write throttle,
// and w/W grow/shrink the reader and writer worker pools.
//
// Side effects: the "logtostderr" and "alsologtostderr" flags are set to
// "false" (exiting via klog.Exitf if that fails) and klog output is redirected
// to the log view. The function panics if the application's Run returns an
// error.
func hostUI(ctx context.Context, hammer *Hammer) {
	grid := tview.NewGrid()
	grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
	// Status box
	statusView := tview.NewTextView()
	grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
	// Log view box
	logView := tview.NewTextView()
	logView.ScrollToEnd()
	logView.SetMaxLines(10000)
	grid.AddItem(logView, 1, 0, 1, 1, 0, 0, false)
	// Stop klog writing to stderr so that log lines only appear in the log view.
	if err := flag.Set("logtostderr", "false"); err != nil {
		klog.Exitf("Failed to set flag: %v", err)
	}
	if err := flag.Set("alsologtostderr", "false"); err != nil {
		klog.Exitf("Failed to set flag: %v", err)
	}
	klog.SetOutput(logView)

	helpView := tview.NewTextView()
	helpView.SetText("+/- to increase/decrease read load\n>/< to increase/decrease write load\nw/W to increase/decrease workers")
	grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)

	app := tview.NewApplication()
	interval := 500 * time.Millisecond
	ticker := time.NewTicker(interval)
	go func() {
		// Release the ticker once the refresh loop exits.
		defer ticker.Stop()

		lastSize := hammer.tracker.LatestConsistent.Size
		// One moving-average slot per refresh, covering a 30 second window.
		maSlots := int((30 * time.Second) / interval)
		growth := movingaverage.New(maSlots)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				s := hammer.tracker.LatestConsistent.Size
				growth.Add(float64(s - lastSize))
				lastSize = s
				// growth is measured per refresh interval; scale it to a per-second rate.
				qps := growth.Avg() * float64(time.Second/interval)
				text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
					hammer.readThrottle.String(),
					hammer.writeThrottle.String(),
					s,
					qps,
					time.Duration(maSlots*int(interval))/time.Second,
					formatMovingAverage(hammer.queueTime),
					formatMovingAverage(hammer.integrationTime),
				)
				statusView.SetText(text)
				app.Draw()
			}
		}
	}()
	app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey {
		switch event.Rune() {
		case '+':
			klog.Info("Increasing the read operations per second")
			hammer.readThrottle.Increase()
		case '-':
			klog.Info("Decreasing the read operations per second")
			hammer.readThrottle.Decrease()
		case '>':
			klog.Info("Increasing the write operations per second")
			hammer.writeThrottle.Increase()
		case '<':
			klog.Info("Decreasing the write operations per second")
			hammer.writeThrottle.Decrease()
		case 'w':
			klog.Info("Increasing the number of workers")
			hammer.randomReaders.Grow(ctx)
			hammer.fullReaders.Grow(ctx)
			hammer.writers.Grow(ctx)
		case 'W':
			klog.Info("Decreasing the number of workers")
			hammer.randomReaders.Shrink(ctx)
			hammer.fullReaders.Shrink(ctx)
			hammer.writers.Shrink(ctx)
		}
		// Always hand the event back so it is not swallowed by this handler.
		return event
	})
	// logView.SetChangedFunc(func() {
	// 	app.Draw()
	// })
	if err := app.SetRoot(grid, true).Run(); err != nil {
		panic(err)
	}
}

// multiStringFlag allows a flag to be specified multiple times on the command
// line, and stores all of these values.
type multiStringFlag []string
Expand Down
Loading

0 comments on commit 247cea4

Please sign in to comment.