Hammer stats #96
Changes from all commits: d5b64df, d57ee2d, 9670adb, 58d83c9, 73d5445, 4920ba4, 826f215
@@ -27,6 +27,7 @@ import (
 	"strings"
 	"time"
 
+	movingaverage "github.com/RobinUS2/golang-moving-average"
 	"github.com/gdamore/tcell/v2"
 	"github.com/rivo/tview"
 	"github.com/transparency-dev/trillian-tessera/client"
@@ -103,6 +104,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
 	readThrottle := NewThrottle(*maxReadOpsPerSecond)
 	writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
 	errChan := make(chan error, 20)
+	leafSampleChan := make(chan leafTime, 100)
 
 	gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
 	randomReaders := newWorkerPool(func() worker {
@@ -111,27 +113,33 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
 	fullReaders := newWorkerPool(func() worker {
 		return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
 	})
-	writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan) })
+	writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })
 
 	return &Hammer{
-		randomReaders: randomReaders,
-		fullReaders:   fullReaders,
-		writers:       writers,
-		readThrottle:  readThrottle,
-		writeThrottle: writeThrottle,
-		tracker:       tracker,
-		errChan:       errChan,
+		randomReaders:   randomReaders,
+		fullReaders:     fullReaders,
+		writers:         writers,
+		readThrottle:    readThrottle,
+		writeThrottle:   writeThrottle,
+		tracker:         tracker,
+		errChan:         errChan,
+		leafSampleChan:  leafSampleChan,
+		integrationTime: movingaverage.New(30),
+		queueTime:       movingaverage.New(30),
 	}
 }
 
 type Hammer struct {
-	randomReaders workerPool
-	fullReaders   workerPool
-	writers       workerPool
-	readThrottle  *Throttle
-	writeThrottle *Throttle
-	tracker       *client.LogStateTracker
-	errChan       chan error
+	randomReaders   workerPool
+	fullReaders     workerPool
+	writers         workerPool
+	readThrottle    *Throttle
+	writeThrottle   *Throttle
+	tracker         *client.LogStateTracker
+	errChan         chan error
+	leafSampleChan  chan leafTime
+	queueTime       *movingaverage.MovingAverage
+	integrationTime *movingaverage.MovingAverage
 }
 
 func (h *Hammer) Run(ctx context.Context) {
@@ -163,7 +171,7 @@ func (h *Hammer) Run(ctx context.Context) {
 	go h.writeThrottle.Run(ctx)
 
 	go func() {
-		tick := time.NewTicker(1 * time.Second)
+		tick := time.NewTicker(500 * time.Millisecond)
 		for {
 			select {
 			case <-ctx.Done():
@@ -185,6 +193,57 @@ func (h *Hammer) Run(ctx context.Context) {
 			}
 		}
 	}()
 
+	go func() {
+		tick := time.NewTicker(100 * time.Millisecond)
+		size := h.tracker.LatestConsistent.Size
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-tick.C:
+			}
+			newSize := h.tracker.LatestConsistent.Size
+			if newSize <= size {
+				continue
+			}
+			now := time.Now()
+			totalLatency := time.Duration(0)
+			queueLatency := time.Duration(0)
+			numLeaves := 0
+			var sample *leafTime
+			for {
+				if sample == nil {
+					l, ok := <-h.leafSampleChan
+					if !ok {
+						break
+					}
+					sample = &l
+				}
+				// Stop considering leaf times once we've caught up with leaves that cross
+				// either the current checkpoint or "now":
+				// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
+				// - leaves which were queued before "now", but not assigned by "now", should also be ignored as they don't fall into this epoch (and would contribute a -ve latency if they were included).
+				if sample.idx >= newSize || sample.assignedAt.After(now) {
[Review thread]
Reviewer: Can this have a comment explaining why both conditions are here? I can see the first one plainly, but the second one seems like it shouldn't be possible if the first condition holds, and would thus be redundant. But I can't completely convince myself of that.
Author: Added.
Reviewer: Right, this helps. I think the second case can't happen as implemented right now, though? It looks like we only feed it down the channel when it's got an index, right?
+					break
+				}
+				queueLatency += sample.assignedAt.Sub(sample.queuedAt)
+				// totalLatency is skewed towards being higher than perhaps it may technically be by:
+				// - the tick interval of this goroutine,
+				// - the tick interval of the goroutine which updates the LogStateTracker,
+				// - any latency in writes to the log becoming visible for reads.
+				// But it's probably good enough for now.
+				totalLatency += now.Sub(sample.queuedAt)
[Review thread]
Reviewer: More an observation than an actionable comment, but this total latency is somewhat skewed by the tick interval.
Author: Sadly, yup - the tick in the goroutine which calculates the latency and the tick in the one which clocks the LogStateTracker. I've changed the text on the UI and added a comment.
+
+				numLeaves++
+				sample = nil
+			}
+			if numLeaves > 0 {
+				h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
+				h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
[Review thread on lines +242 to +243]
Reviewer: Is there a reason to do it this way instead of posting the raw values? This is a bit blurry because it calculates a mean here, and then the moving average will calculate averages from them (if I understand the library right).
Author: Mainly just trying to keep the size of the retained data down, and have some sort of loose idea of how the window relates to time even as throughput varies. I believe that a mean of means is the same as the mean of all the things if the subsets are of equal size. Admittedly this is probably not always the case here (certainly if throughput is varying and < 100qps then it's pretty unlikely to be), but tbh with the sampling and "observed" nature of this it's probably all only really good enough for a rough guide to what's going on, but that should be enough to tell you if your log is melting and falling further and further behind, for example. I'm sure there are better/more precise ways of doing it :)
(A toy illustration of the mean-of-means point follows after this hunk.)
+			}
+		}
+	}()
 }
 
 func genLeaf(n uint64, minLeafSize int) []byte {
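To make the mean-of-means caveat in the thread above concrete, here is a toy illustration. It is not part of the change and the batch values are made up: when per-tick batches contain different numbers of leaves, the average of the per-tick means differs from the average over all leaves, which is the imprecision the author acknowledges.

package main

import "fmt"

func main() {
	// Hypothetical integration latencies (ms) for two ticks: the first tick
	// integrates three leaves, the second integrates only one.
	batches := [][]float64{{10, 20, 30}, {100}}

	sumOfBatchMeans := 0.0 // feeds the mean-of-means
	sum, n := 0.0, 0       // feeds the true, count-weighted mean
	for _, b := range batches {
		batchSum := 0.0
		for _, v := range b {
			batchSum += v
			sum += v
			n++
		}
		sumOfBatchMeans += batchSum / float64(len(b))
	}
	fmt.Printf("mean of means: %.0fms, mean over all leaves: %.0fms\n",
		sumOfBatchMeans/float64(len(batches)), sum/float64(n))
	// Prints: mean of means: 60ms, mean over all leaves: 40ms
}

A count-weighted average (tracking the number of leaves alongside each per-tick mean) would remove the skew, at the cost of retaining slightly more data per slot.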
@@ -277,9 +336,16 @@ func (t *Throttle) String() string {
 	return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
 }
 
+func formatMovingAverage(ma *movingaverage.MovingAverage) string {
+	aMin, _ := ma.Min()
+	aMax, _ := ma.Max()
+	aAvg := ma.Avg()
+	return fmt.Sprintf("%.0fms/%.0fms/%.0fms (min/avg/max)", aMin, aAvg, aMax)
+}
+
 func hostUI(ctx context.Context, hammer *Hammer) {
 	grid := tview.NewGrid()
-	grid.SetRows(3, 0, 10).SetColumns(0).SetBorders(true)
+	grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
 	// Status box
 	statusView := tview.NewTextView()
 	grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
@@ -301,14 +367,30 @@ func hostUI(ctx context.Context, hammer *Hammer) {
 	grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)
 
 	app := tview.NewApplication()
-	ticker := time.NewTicker(1 * time.Second)
+	interval := 500 * time.Millisecond
+	ticker := time.NewTicker(interval)
 	go func() {
+		lastSize := hammer.tracker.LatestConsistent.Size
+		maSlots := int((30 * time.Second) / interval)
+		growth := movingaverage.New(maSlots)
 		for {
 			select {
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				text := fmt.Sprintf("Read: %s\nWrite: %s", hammer.readThrottle.String(), hammer.writeThrottle.String())
+				s := hammer.tracker.LatestConsistent.Size
+				growth.Add(float64(s - lastSize))
+				lastSize = s
+				qps := growth.Avg() * float64(time.Second/interval)
+				text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
+					hammer.readThrottle.String(),
+					hammer.writeThrottle.String(),
+					s,
+					qps,
+					time.Duration(maSlots*int(interval))/time.Second,
+					formatMovingAverage(hammer.queueTime),
+					formatMovingAverage(hammer.integrationTime),
+				)
 				statusView.SetText(text)
 				app.Draw()
 			}
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"time"
 
 	"github.com/transparency-dev/trillian-tessera/client"
 	"k8s.io/klog/v2"
@@ -151,15 +152,26 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
 	}
 }
 
+// leafTime records the time at which a leaf was assigned the given index.
+//
+// This is used when sampling leaves which are added in order to later calculate
+// how long it took for them to become integrated.
+type leafTime struct {
+	idx        uint64
+	queuedAt   time.Time
+	assignedAt time.Time
+}
+
 // NewLogWriter creates a LogWriter.
 // u is the URL of the write endpoint for the log.
 // gen is a function that generates new leaves to add.
-func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error) *LogWriter {
+func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- leafTime) *LogWriter {
 	return &LogWriter{
 		writer:   writer,
 		gen:      gen,
 		throttle: throttle,
 		errChan:  errChan,
+		leafChan: leafSampleChan,
 	}
 }
@@ -169,6 +181,7 @@ type LogWriter struct {
 	gen      func() []byte
 	throttle <-chan bool
 	errChan  chan<- error
+	leafChan chan<- leafTime
 	cancel   func()
 }
@@ -185,11 +198,19 @@ func (w *LogWriter) Run(ctx context.Context) {
 		case <-w.throttle:
 		}
 		newLeaf := w.gen()
+		lt := leafTime{queuedAt: time.Now()}
 		index, err := w.writer(ctx, newLeaf)
 		if err != nil {
 			w.errChan <- fmt.Errorf("failed to create request: %v", err)
 			continue
 		}
+		lt.idx, lt.assignedAt = index, time.Now()
+		// See if we can send a leaf sample
+		select {
+		// TODO: we might want to count dropped samples, and/or make sampling a bit more statistical.
+		case w.leafChan <- lt:
+		default:
[Review thread]
Reviewer: Can you add a TODO in this case for incrementing a "dropped sample" counter? It would be useful to understand when this is happening, because it will deteriorate the stats a fair bit. Especially as it's not a statistical sample and will be measuring only the first into the pipe, which will bias the data one way or another.
Author: Done.
+		}
 		klog.V(2).Infof("Wrote leaf at index %d", index)
 	}
 }
[Review thread]
Reviewer: Maybe leafWriteStats instead? It isn't sampled from what I can see, and it only comes from the writer workers.
Author: Well, it's sampled in a very primitive sense in that the channel only takes 100 entries, and if it's full we just drop the sample. Happy to change that approach though.
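Following on from the dropped-sample TODO and the sampling discussion above, here is a rough sketch of what a slightly more statistical approach could look like. It is not part of this PR; sampleProbability, dropped, and maybeSample are invented names used only for illustration. Each written leaf is sampled with a fixed probability, the send never blocks the write path, and drops caused by a full channel are counted so the quality of the stats can be judged.

package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

// leafTime mirrors the struct added in this PR.
type leafTime struct {
	idx        uint64
	queuedAt   time.Time
	assignedAt time.Time
}

const sampleProbability = 0.1 // sample roughly 10% of writes

var dropped atomic.Uint64 // samples lost because the channel was full

// maybeSample forwards lt with probability sampleProbability and never blocks.
func maybeSample(ch chan<- leafTime, lt leafTime) {
	if rand.Float64() >= sampleProbability {
		return // not selected for sampling
	}
	select {
	case ch <- lt:
	default:
		dropped.Add(1) // count the drop rather than losing it silently
	}
}

func main() {
	ch := make(chan leafTime, 100)
	for i := uint64(0); i < 5000; i++ {
		now := time.Now()
		maybeSample(ch, leafTime{idx: i, queuedAt: now, assignedAt: now})
	}
	fmt.Printf("buffered: %d, dropped: %d\n", len(ch), dropped.Load())
}

Because the consumer would see a random subset rather than only the first writes to reach the channel each interval, the bias the reviewer mentions is reduced, and the dropped counter makes any remaining loss visible.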