
Hammer stats #96

Merged (7 commits, Aug 1, 2024)
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.22.5
require (
cloud.google.com/go/spanner v1.65.0
cloud.google.com/go/storage v1.43.0
github.com/RobinUS2/golang-moving-average v1.0.0
github.com/gdamore/tcell/v2 v2.7.4
github.com/globocom/go-buffer v1.2.2
github.com/google/go-cmp v0.6.0
2 changes: 2 additions & 0 deletions go.sum
@@ -621,6 +621,8 @@ github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RobinUS2/golang-moving-average v1.0.0 h1:PD7DDZNt+UFb9XlsBbTIu/DtXqqaD/MD86DYnk3mwvA=
github.com/RobinUS2/golang-moving-average v1.0.0/go.mod h1:MdzhY+KoEvi+OBygTPH0OSaKrOJzvILWN2SPQzaKVsY=
github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
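The new dependency, github.com/RobinUS2/golang-moving-average, provides the fixed-size averaging windows used in hammer.go below. A minimal usage sketch, based only on the calls this PR makes (New, Add, Avg, Min, Max); treating the ignored second return value of Min/Max as a "window has data" indicator is an assumption:

```go
package main

import (
	"fmt"

	movingaverage "github.com/RobinUS2/golang-moving-average"
)

func main() {
	// Fixed-size window of 5 slots: only the 5 most recent samples are retained.
	ma := movingaverage.New(5)
	for _, v := range []float64{10, 20, 30, 40, 50, 60} {
		ma.Add(v) // once the 6th value arrives, the first (10) falls out of the window
	}

	// Min/Max also return a second value, ignored here just as formatMovingAverage does below.
	lo, _ := ma.Min()
	hi, _ := ma.Max()
	fmt.Printf("avg=%.1f min=%.0f max=%.0f\n", ma.Avg(), lo, hi) // avg=40.0 min=20 max=60
}
```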
120 changes: 101 additions & 19 deletions hammer/hammer.go
@@ -27,6 +27,7 @@ import (
"strings"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
"github.com/transparency-dev/trillian-tessera/client"
@@ -103,6 +104,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafSampleChan := make(chan leafTime, 100)
Contributor:

Maybe leafWriteStats instead? It isn't sampled from what I can see, and it only comes from the writer workers.

Collaborator (author):

Well, it's sampled in a very primitive sense, in that the channel only takes 100 entries and we just drop the sample if it's full.

Happy to change that approach, though.
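For illustration, the "drop it if the buffer is full" behaviour described above is just a bounded channel plus a non-blocking send, the same pattern the workers.go change below uses. This standalone sketch is illustrative only, not the PR's code:

```go
package main

import "fmt"

func main() {
	// Bounded buffer: at most 3 samples may be pending at once.
	samples := make(chan int, 3)

	for i := 0; i < 10; i++ {
		select {
		case samples <- i: // room in the buffer: keep the sample
		default: // buffer full: drop the sample on the floor
		}
	}
	close(samples)

	for s := range samples {
		fmt.Println("kept sample", s) // only 0, 1 and 2 survive
	}
}
```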


gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
randomReaders := newWorkerPool(func() worker {
@@ -111,27 +113,33 @@
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan) })
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
}
}

type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
}

func (h *Hammer) Run(ctx context.Context) {
@@ -163,7 +171,7 @@ func (h *Hammer) Run(ctx context.Context) {
go h.writeThrottle.Run(ctx)

go func() {
tick := time.NewTicker(1 * time.Second)
tick := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
@@ -185,6 +193,57 @@ }
}
}
}()

go func() {
tick := time.NewTicker(100 * time.Millisecond)
size := h.tracker.LatestConsistent.Size
for {
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := h.tracker.LatestConsistent.Size
if newSize <= size {
continue
}
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
var sample *leafTime
for {
if sample == nil {
l, ok := <-h.leafSampleChan
if !ok {
break
}
sample = &l
}
// Stop considering leaf times once we've caught up with those that cross
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now", but not assigned by "now", should also be ignored as they don't fall into this epoch (and would contribute a negative latency if they were included).
if sample.idx >= newSize || sample.assignedAt.After(now) {
Contributor:

Can this have a comment explaining why both conditions are here? I can see the first one plainly, but the second one seems like it shouldn't be possible if the first condition holds, and would thus be redundant. But I can't completely convince myself of that.

Collaborator (author):

Added.

Contributor:

Right, this helps. I think the second case can't happen as implemented right now though? It looks like we only feed it down the channel when it's got an index, right?

break
}
queueLatency += sample.assignedAt.Sub(sample.queuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.queuedAt)
Contributor:

More an observation than an actionable comment, but this total latency is somewhat skewed by the tick time above right? So it's more like total observed latency. Could be worth a comment. Or not... :-)

Collaborator (author):

Sadly, yup - the tick in the goroutine which calculates the latency and the tick in the one which clocks the LogStateTracker forwards.

I've changed the text on the UI and added a comment.


numLeaves++
sample = nil
}
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
Comment on lines +242 to +243
Contributor:

Is there a reason to do it this way instead of posting the raw values? This is a bit blurry because it calculates a mean here, and then the moving average will calculate averages from them (if I understand the library right).

Collaborator (author):

Mainly just trying to keep the size of the retained data down, and have some sort of loose idea of how the window relates to time even as throughput varies.

I believe that a mean of means is the same as mean of all the things if the subsets are of equal size. Admittedly this is probably not always the case here (certainly if throughput is varying and < 100qps then it's pretty unlikely to be), but tbh with the sampling and "observed" nature of this it's probably all only really good enough for a rough guide to what's going on, but that should be enough to tell you if your log is melting and falling further and further behind, for example.

I'm sure there are better/more precise ways of doing it :)
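A quick illustration of the trade-off being discussed, with invented numbers: when per-tick batches have different sizes, the average of per-tick means is not the same as the mean over all sampled leaves.

```go
package main

import "fmt"

func mean(xs []float64) float64 {
	sum := 0.0
	for _, x := range xs {
		sum += x
	}
	return sum / float64(len(xs))
}

func main() {
	tickA := []float64{100}                // 1 leaf observed in this tick, 100ms latency
	tickB := []float64{10, 10, 10, 10, 10} // 5 leaves observed, 10ms each

	meanOfMeans := (mean(tickA) + mean(tickB)) / 2                   // (100 + 10) / 2 = 55ms
	overall := mean(append(append([]float64{}, tickA...), tickB...)) // 150 / 6 = 25ms

	fmt.Printf("mean of per-tick means: %.0fms, mean over all leaves: %.0fms\n", meanOfMeans, overall)
}
```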

}
}
}()
}

func genLeaf(n uint64, minLeafSize int) []byte {
@@ -277,9 +336,16 @@ func (t *Throttle) String() string {
return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
}

func formatMovingAverage(ma *movingaverage.MovingAverage) string {
aMin, _ := ma.Min()
aMax, _ := ma.Max()
aAvg := ma.Avg()
return fmt.Sprintf("%.0fms/%.0fms/%.0fms (min/avg/max)", aMin, aAvg, aMax)
}

func hostUI(ctx context.Context, hammer *Hammer) {
grid := tview.NewGrid()
grid.SetRows(3, 0, 10).SetColumns(0).SetBorders(true)
grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
// Status box
statusView := tview.NewTextView()
grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
@@ -301,14 +367,30 @@ func hostUI(ctx context.Context, hammer *Hammer) {
grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)

app := tview.NewApplication()
ticker := time.NewTicker(1 * time.Second)
interval := 500 * time.Millisecond
ticker := time.NewTicker(interval)
go func() {
lastSize := hammer.tracker.LatestConsistent.Size
maSlots := int((30 * time.Second) / interval)
growth := movingaverage.New(maSlots)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
text := fmt.Sprintf("Read: %s\nWrite: %s", hammer.readThrottle.String(), hammer.writeThrottle.String())
s := hammer.tracker.LatestConsistent.Size
growth.Add(float64(s - lastSize))
lastSize = s
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
hammer.readThrottle.String(),
hammer.writeThrottle.String(),
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
formatMovingAverage(hammer.queueTime),
formatMovingAverage(hammer.integrationTime),
)
statusView.SetText(text)
app.Draw()
}
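Put together, the status box now renders along these lines; the layout follows the format strings above, but every value here is invented for illustration:

```
Read: Current max: 20/s. Oversupply in last second: 0
Write: Current max: 200/s. Oversupply in last second: 0
TreeSize: 131072 (Δ 180qps over 30s)
Time-in-queue: 12ms/45ms/130ms (min/avg/max)
Observed-time-to-integrate: 850ms/1400ms/2600ms (min/avg/max)
```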
23 changes: 22 additions & 1 deletion hammer/workers.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/transparency-dev/trillian-tessera/client"
"k8s.io/klog/v2"
@@ -151,15 +152,26 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
}
}

// leafTime records the time at which a leaf was assigned the given index.
//
// This is used when sampling leaves which are added in order to later calculate
// how long it took for them to become integrated.
type leafTime struct {
idx uint64
queuedAt time.Time
assignedAt time.Time
}

// NewLogWriter creates a LogWriter.
// u is the URL of the write endpoint for the log.
// gen is a function that generates new leaves to add.
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error) *LogWriter {
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- leafTime) *LogWriter {
return &LogWriter{
writer: writer,
gen: gen,
throttle: throttle,
errChan: errChan,
leafChan: leafSampleChan,
}
}

@@ -169,6 +181,7 @@ type LogWriter struct {
gen func() []byte
throttle <-chan bool
errChan chan<- error
leafChan chan<- leafTime
cancel func()
}

@@ -185,11 +198,19 @@ func (w *LogWriter) Run(ctx context.Context) {
case <-w.throttle:
}
newLeaf := w.gen()
lt := leafTime{queuedAt: time.Now()}
index, err := w.writer(ctx, newLeaf)
if err != nil {
w.errChan <- fmt.Errorf("failed to create request: %v", err)
continue
}
lt.idx, lt.assignedAt = index, time.Now()
// See if we can send a leaf sample
select {
// TODO: we might want to count dropped samples, and/or make sampling a bit more statistical.
case w.leafChan <- lt:
default:
Contributor:

Can you add a TODO in this case for incrementing a "dropped sample" counter. It would be useful to understand when this is happening, because it will deteriorate the stats a fair bit. Especially as it's not a statistical sample and will be measuring only the first into the pipe, which will bias the data one way or another.

Collaborator (author):

Done.
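A hypothetical sketch of such a dropped-sample counter; the sampler type, field names, and wiring are invented for illustration and are not part of this PR:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sampler is a made-up stand-in for the writer's sampling channel, with a
// counter recording how many samples were dropped because the buffer was full.
type sampler struct {
	samples chan int
	dropped atomic.Uint64
}

func (s *sampler) offer(v int) {
	select {
	case s.samples <- v:
	default:
		s.dropped.Add(1) // buffer full: note the drop instead of silently losing it
	}
}

func main() {
	s := &sampler{samples: make(chan int, 2)}
	for i := 0; i < 5; i++ {
		s.offer(i)
	}
	fmt.Printf("kept=%d dropped=%d\n", len(s.samples), s.dropped.Load()) // kept=2 dropped=3
}
```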

}
klog.V(2).Infof("Wrote leaf at index %d", index)
}
}