Skip to content

Commit

Permalink
Add sampled integration latency
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Aug 1, 2024
1 parent d5b64df commit d57ee2d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 24 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/gdamore/tcell/v2 v2.7.4
github.com/globocom/go-buffer v1.2.2
github.com/google/go-cmp v0.6.0
github.com/mxmCherry/movavg v1.1.0
github.com/rivo/tview v0.0.0-20240625185742-b0a7293b8130
github.com/transparency-dev/formats v0.0.0-20240715203801-9ff9b9e3905f
github.com/transparency-dev/merkle v0.0.2
Expand All @@ -19,6 +18,8 @@ require (
k8s.io/klog/v2 v2.130.1
)

require github.com/RobinUS2/golang-moving-average v1.0.0 // indirect

require (
cel.dev/expr v0.15.0 // indirect
cloud.google.com/go v0.115.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RobinUS2/golang-moving-average v1.0.0 h1:PD7DDZNt+UFb9XlsBbTIu/DtXqqaD/MD86DYnk3mwvA=
github.com/RobinUS2/golang-moving-average v1.0.0/go.mod h1:MdzhY+KoEvi+OBygTPH0OSaKrOJzvILWN2SPQzaKVsY=
github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY=
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
Expand Down Expand Up @@ -869,8 +871,6 @@ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/mxmCherry/movavg v1.1.0 h1:92Ye8RKXcIaELZJH1pKSdDdh3k7+6tD6Yr+Azxwy3v8=
github.com/mxmCherry/movavg v1.1.0/go.mod h1:8Jn4ovhDwtfTnwqxAEKKHTbKXLgnozsIRg+/Ge6EoJI=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
71 changes: 51 additions & 20 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"strings"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/gdamore/tcell/v2"
"github.com/mxmCherry/movavg"
"github.com/rivo/tview"
"github.com/transparency-dev/trillian-tessera/client"
"golang.org/x/mod/sumdb/note"
Expand Down Expand Up @@ -104,6 +104,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafSampleChan := make(chan leafTime, 100)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
randomReaders := newWorkerPool(func() worker {
Expand All @@ -112,27 +113,31 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter)
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan) })
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(10),
}
}

type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
integrationTime *movingaverage.MovingAverage
}

func (h *Hammer) Run(ctx context.Context) {
Expand Down Expand Up @@ -183,6 +188,21 @@ func (h *Hammer) Run(ctx context.Context) {
if newSize > size {
klog.V(1).Infof("Updated checkpoint from %d to %d", size, newSize)
}
now := time.Now()
totalLatency := time.Duration(0)
numLeaves := 0
for {
l, ok := <-h.leafSampleChan
if !ok {
break
}
if l.at.After(now) {
break
}
totalLatency += now.Sub(l.at)
numLeaves++
}
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
}
}
}()
Expand Down Expand Up @@ -280,7 +300,7 @@ func (t *Throttle) String() string {

func hostUI(ctx context.Context, hammer *Hammer) {
grid := tview.NewGrid()
grid.SetRows(3, 0, 10).SetColumns(0).SetBorders(true)
grid.SetRows(4, 0, 10).SetColumns(0).SetBorders(true)
// Status box
statusView := tview.NewTextView()
grid.AddItem(statusView, 0, 0, 1, 1, 0, 0, false)
Expand All @@ -302,12 +322,12 @@ func hostUI(ctx context.Context, hammer *Hammer) {
grid.AddItem(helpView, 2, 0, 1, 1, 0, 0, false)

app := tview.NewApplication()
interval := time.Second
interval := 500 * time.Millisecond
ticker := time.NewTicker(interval)
go func() {
lastSize := hammer.tracker.LatestConsistent.Size
maSlots := 10
growth := movavg.NewSMA(maSlots)
growth := movingaverage.New(maSlots)
for {
select {
case <-ctx.Done():
Expand All @@ -316,7 +336,18 @@ func hostUI(ctx context.Context, hammer *Hammer) {
s := hammer.tracker.LatestConsistent.Size
growth.Add(float64(s - lastSize))
lastSize = s
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)", hammer.readThrottle.String(), hammer.writeThrottle.String(), s, growth.Avg(), maSlots*int(interval/time.Second))
ttiMin, _ := hammer.integrationTime.Min()
ttiMax, _ := hammer.integrationTime.Max()
ttiAvg := hammer.integrationTime.Avg()
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read: %s\nWrite: %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-to-integrate: %.0fms/%.0fms/%.0fms (min/avg/max)",
hammer.readThrottle.String(),
hammer.writeThrottle.String(),
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
ttiMin, ttiAvg, ttiMax,
)
statusView.SetText(text)
app.Draw()
}
Expand Down
19 changes: 18 additions & 1 deletion hammer/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/transparency-dev/trillian-tessera/client"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -151,15 +152,25 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
}
}

// leafTime records the time at which a leaf was assigned the given index.
//
// This is used when sampling leaves which are added in order to later calculate
// how long it took to for them to become integrated.
type leafTime struct {
idx uint64
at time.Time
}

// NewLogWriter creates a LogWriter.
// u is the URL of the write endpoint for the log.
// gen is a function that generates new leaves to add.
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error) *LogWriter {
func NewLogWriter(writer LeafWriter, gen func() []byte, throttle <-chan bool, errChan chan<- error, leafSampleChan chan<- leafTime) *LogWriter {
return &LogWriter{
writer: writer,
gen: gen,
throttle: throttle,
errChan: errChan,
leafChan: leafSampleChan,
}
}

Expand All @@ -169,6 +180,7 @@ type LogWriter struct {
gen func() []byte
throttle <-chan bool
errChan chan<- error
leafChan chan<- leafTime
cancel func()
}

Expand All @@ -190,6 +202,11 @@ func (w *LogWriter) Run(ctx context.Context) {
w.errChan <- fmt.Errorf("failed to create request: %v", err)
continue
}
// See if we can send a leaf sample
select {
case w.leafChan <- leafTime{idx: index, at: time.Now()}:
default:
}
klog.V(2).Infof("Wrote leaf at index %d", index)
}
}
Expand Down

0 comments on commit d57ee2d

Please sign in to comment.