Skip to content

Commit

Permalink
Hammer stats (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter authored Aug 1, 2024
1 parent 4a6d2c3 commit b14d898
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.5
require (
cloud.google.com/go/spanner v1.65.0
cloud.google.com/go/storage v1.43.0
github.com/RobinUS2/golang-moving-average v1.0.0
github.com/gdamore/tcell/v2 v2.7.4
github.com/globocom/go-buffer v1.2.2
github.com/google/go-cmp v0.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RobinUS2/golang-moving-average v1.0.0 h1:PD7DDZNt+UFb9XlsBbTIu/DtXqqaD/MD86DYnk3mwvA=
github.com/RobinUS2/golang-moving-average v1.0.0/go.mod h1:MdzhY+KoEvi+OBygTPH0OSaKrOJzvILWN2SPQzaKVsY=
github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
Expand Down
120 changes: 101 additions & 19 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
"github.com/transparency-dev/trillian-tessera/client"
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafSampleChan := make(chan leafTime, 100)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
randomReaders := newWorkerPool(func() worker {
Expand All @@ -111,27 +113,33 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan) })
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
}
}

type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
}

func (h *Hammer) Run(ctx context.Context) {
Expand Down Expand Up @@ -163,7 +171,7 @@ func (h *Hammer) Run(ctx context.Context) {
go h.writeThrottle.Run(ctx)

go func() {
tick := time.NewTicker(1 * time.Second)
tick := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
Expand All @@ -185,6 +193,57 @@ func (h *Hammer) Run(ctx context.Context) {
}
}
}()

go func() {
tick := time.NewTicker(100 * time.Millisecond)
size := h.tracker.LatestConsistent.Size
for {
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := h.tracker.LatestConsistent.Size
if newSize <= size {
continue
}
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
var sample *leafTime
for {
if sample == nil {
l, ok := <-h.leafSampleChan
if !ok {
break
}
sample = &l
}
// Stop considering leaf times once we reach a sample which crosses
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now", but not assigned by "now" should also be ignored as they don't fall into this epoch (and would contribute a -ve latency if they were included).
if sample.idx >= newSize || sample.assignedAt.After(now) {
break
}
queueLatency += sample.assignedAt.Sub(sample.queuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.queuedAt)

numLeaves++
sample = nil
}
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}
}()
}

func genLeaf(n uint64, minLeafSize int) []byte {
Expand Down Expand Up @@ -277,9 +336,16 @@ func (t *Throttle) String() string {
return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
}

// formatMovingAverage renders the minimum, average and maximum of the values
// held by ma as a single human-readable line, each rounded to whole
// milliseconds.
func formatMovingAverage(ma *movingaverage.MovingAverage) string {
	// The errors from Min and Max are deliberately discarded.
	// NOTE(review): presumably they only signal an empty window, in which case
	// the returned value is still fine to display - confirm against the library.
	lowest, _ := ma.Min()
	highest, _ := ma.Max()
	return fmt.Sprintf("%.0fms/%.0fms/%.0fms (min/avg/max)", lowest, ma.Avg(), highest)
}

func hostUI(ctx context.Context, hammer *Hammer) {
grid := tview.NewGrid()
grid.SetRows(3, 0, 10).SetColumns(0).SetBorders(true)
grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
// Status box
statusView := tview.NewTextView()
grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
Expand All @@ -301,14 +367,30 @@ func hostUI(ctx context.Context, hammer *Hammer) {
grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)

app := tview.NewApplication()
ticker := time.NewTicker(1 * time.Second)
interval := 500 * time.Millisecond
ticker := time.NewTicker(interval)
go func() {
lastSize := hammer.tracker.LatestConsistent.Size
maSlots := int((30 * time.Second) / interval)
growth := movingaverage.New(maSlots)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
text := fmt.Sprintf("Read: %s\nWrite: %s", hammer.readThrottle.String(), hammer.writeThrottle.String())
s := hammer.tracker.LatestConsistent.Size
growth.Add(float64(s - lastSize))
lastSize = s
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
hammer.readThrottle.String(),
hammer.writeThrottle.String(),
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
formatMovingAverage(hammer.queueTime),
formatMovingAverage(hammer.integrationTime),
)
statusView.SetText(text)
app.Draw()
}
Expand Down
23 changes: 22 additions & 1 deletion hammer/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/transparency-dev/trillian-tessera/client"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -151,15 +152,26 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
}
}

// leafTime is a timing sample for a single leaf written to the log.
//
// Writers emit these samples so that the time a leaf spent waiting to be
// assigned an index, and the time it took to become integrated, can be
// calculated later.
type leafTime struct {
	// idx is the index the log assigned to the leaf.
	idx uint64
	// queuedAt is captured immediately before the leaf is handed to the writer;
	// assignedAt is captured once the writer has returned the leaf's index.
	queuedAt, assignedAt time.Time
}

// NewLogWriter creates a LogWriter.
// writer is the function used to submit each generated leaf to the log.
// gen is a function that generates new leaves to add.
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error) *LogWriter {
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- leafTime) *LogWriter {
return &LogWriter{
writer: writer,
gen: gen,
throttle: throttle,
errChan: errChan,
leafChan: leafSampleChan,
}
}

Expand All @@ -169,6 +181,7 @@ type LogWriter struct {
gen func() []byte
throttle <-chan bool
errChan chan<- error
leafChan chan<- leafTime
cancel func()
}

Expand All @@ -185,11 +198,19 @@ func (w *LogWriter) Run(ctx context.Context) {
case <-w.throttle:
}
newLeaf := w.gen()
lt := leafTime{queuedAt: time.Now()}
index, err := w.writer(ctx, newLeaf)
if err != nil {
w.errChan <- fmt.Errorf("failed to create request: %v", err)
continue
}
lt.idx, lt.assignedAt = index, time.Now()
// See if we can send a leaf sample
select {
// TODO: we might want to count dropped samples, and/or make sampling a bit more statistical.
case w.leafChan <- lt:
default:
}
klog.V(2).Infof("Wrote leaf at index %d", index)
}
}
Expand Down

0 comments on commit b14d898

Please sign in to comment.