diff --git a/hammer/hammer.go b/hammer/hammer.go index 4e8eb2be4..9f0649f1e 100644 --- a/hammer/hammer.go +++ b/hammer/hammer.go @@ -171,7 +171,7 @@ func (h *Hammer) Run(ctx context.Context) { go h.writeThrottle.Run(ctx) go func() { - tick := time.NewTicker(1 * time.Second) + tick := time.NewTicker(100 * time.Millisecond) for { select { case <-ctx.Done(): @@ -190,28 +190,52 @@ func (h *Hammer) Run(ctx context.Context) { if newSize > size { klog.V(1).Infof("Updated checkpoint from %d to %d", size, newSize) } - now := time.Now() - totalLatency := time.Duration(0) - queueLatency := time.Duration(0) - numLeaves := 0 - for { + } + } + }() + + go func() { + tick := time.NewTicker(100 * time.Millisecond) + size := h.tracker.LatestConsistent.Size + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + } + newSize := h.tracker.LatestConsistent.Size + if newSize <= size { + continue + } + now := time.Now() + totalLatency := time.Duration(0) + queueLatency := time.Duration(0) + numLeaves := 0 + var sample *leafTime + for { + if sample == nil { l, ok := <-h.leafSampleChan if !ok { break } - if l.queuedAt.After(now) { - // We've caught up with everything which happened in this "epoch" - break - } - queueLatency += l.assignedAt.Sub(l.queuedAt) - totalLatency += now.Sub(l.queuedAt) - - numLeaves++ + sample = &l } + if sample.idx >= newSize || sample.queuedAt.After(now) { + break + } + queueLatency += sample.assignedAt.Sub(sample.queuedAt) + totalLatency += now.Sub(sample.queuedAt) + + numLeaves++ + sample = nil + } + if numLeaves > 0 { h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves)) h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves)) } } + + klog.Exitf("BAM") }() }