Skip to content

Commit

Permalink
Add queued time stats
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Aug 1, 2024
1 parent eac66b3 commit 3a3672d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
28 changes: 20 additions & 8 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
}
}

Expand All @@ -137,6 +138,7 @@ type Hammer struct {
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
}

Expand Down Expand Up @@ -190,19 +192,24 @@ func (h *Hammer) Run(ctx context.Context) {
}
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
for {
l, ok := <-h.leafSampleChan
if !ok {
break
}
if l.at.After(now) {
if l.queuedAt.After(now) {
// We've caught up with everything which happened in this "epoch"
break
}
totalLatency += now.Sub(l.at)
queueLatency += l.assignedAt.Sub(l.queuedAt)
totalLatency += now.Sub(l.queuedAt)

numLeaves++
}
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}
}()
Expand Down Expand Up @@ -298,9 +305,16 @@ func (t *Throttle) String() string {
return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
}

func formatMovingAverage(ma *movingaverage.MovingAverage) string {
aMin, _ := ma.Min()
aMax, _ := ma.Max()
aAvg := ma.Avg()
return fmt.Sprintf("%.0fms/%.0fms/%.0fms (min/avg/max)", aMin, aAvg, aMax)
}

func hostUI(ctx context.Context, hammer *Hammer) {
grid := tview.NewGrid()
grid.SetRows(4, 0, 10).SetColumns(0).SetBorders(true)
grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
// Status box
statusView := tview.NewTextView()
grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
Expand Down Expand Up @@ -336,17 +350,15 @@ func hostUI(ctx context.Context, hammer *Hammer) {
s := hammer.tracker.LatestConsistent.Size
growth.Add(float64(s - lastSize))
lastSize = s
ttiMin, _ := hammer.integrationTime.Min()
ttiMax, _ := hammer.integrationTime.Max()
ttiAvg := hammer.integrationTime.Avg()
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-to-integrate: %.0fms/%.0fms/%.0fms (min/avg/max)",
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nTotal-time-to-integrate: %s",
hammer.readThrottle.String(),
hammer.writeThrottle.String(),
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
ttiMin, ttiAvg, ttiMax,
formatMovingAverage(hammer.queueTime),
formatMovingAverage(hammer.integrationTime),
)
statusView.SetText(text)
app.Draw()
Expand Down
9 changes: 6 additions & 3 deletions hammer/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
// This is used when sampling leaves which are added in order to later calculate
// how long it took to for them to become integrated.
type leafTime struct {
idx uint64
at time.Time
idx uint64
queuedAt time.Time
assignedAt time.Time
}

// NewLogWriter creates a LogWriter.
Expand Down Expand Up @@ -197,14 +198,16 @@ func (w *LogWriter) Run(ctx context.Context) {
case <-w.throttle:
}
newLeaf := w.gen()
lt := leafTime{queuedAt: time.Now()}
index, err := w.writer(ctx, newLeaf)
if err != nil {
w.errChan <- fmt.Errorf("failed to create request: %v", err)
continue
}
lt.idx, lt.assignedAt = index, time.Now()
// See if we can send a leaf sample
select {
case w.leafChan <- leafTime{idx: index, at: time.Now()}:
case w.leafChan <- lt:
default:
}
klog.V(2).Infof("Wrote leaf at index %d", index)
Expand Down

0 comments on commit 3a3672d

Please sign in to comment.