Skip to content

Commit

Permalink
All leaves read/written have indices compared
Browse files Browse the repository at this point in the history
This supports checking what number of duplicates we have seen in the
hammer.
  • Loading branch information
mhutchinson committed Jun 26, 2024
1 parent b7a3786 commit fe73fcf
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/gdamore/tcell/v2 v2.7.4
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/rivo/tview v0.0.0-20240413115534-b0d41c484b95
github.com/transparency-dev/formats v0.0.0-20230914071414-5732692f1e50
github.com/transparency-dev/merkle v0.0.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
Expand Down
23 changes: 20 additions & 3 deletions hammer/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// NewLeafReader creates a LeafReader.
// The next function provides a strategy for which leaves will be read.
// Custom implementations can be passed, or use RandomNextLeaf or MonotonicallyIncreasingNextLeaf.
func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error) *LeafReader {
func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error, leafchan chan<- Leaf) *LeafReader {
if bundleSize <= 0 {
panic("bundleSize must be > 0")
}
Expand All @@ -47,6 +47,7 @@ func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(
bundleSize: bundleSize,
throttle: throttle,
errchan: errchan,
leafchan: leafchan,
}
}

Expand All @@ -58,6 +59,7 @@ type LeafReader struct {
bundleSize int
throttle <-chan bool
errchan chan<- error
leafchan chan<- Leaf
cancel func()
c leafBundleCache
}
Expand All @@ -83,10 +85,14 @@ func (r *LeafReader) Run(ctx context.Context) {
continue
}
klog.V(2).Infof("LeafReader getting %d", i)
_, err := r.getLeaf(ctx, i, size)
data, err := r.getLeaf(ctx, i, size)
if err != nil {
r.errchan <- fmt.Errorf("failed to get leaf %d: %v", i, err)
}
r.leafchan <- Leaf{
Index: uint64(i),
Data: data,
}
}
}

Expand Down Expand Up @@ -177,13 +183,14 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
// NewLogWriter creates a LogWriter.
// u is the URL of the write endpoint for the log.
// gen is a function that generates new leaves to add.
func NewLogWriter(hc *http.Client, u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error) *LogWriter {
func NewLogWriter(hc *http.Client, u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error, leafchan chan<- Leaf) *LogWriter {
return &LogWriter{
hc: hc,
u: u,
gen: gen,
throttle: throttle,
errchan: errchan,
leafchan: leafchan,
}
}

Expand All @@ -194,6 +201,7 @@ type LogWriter struct {
gen func() []byte
throttle <-chan bool
errchan chan<- error
leafchan chan<- Leaf
cancel func()
}

Expand Down Expand Up @@ -245,6 +253,10 @@ func (w *LogWriter) Run(ctx context.Context) {
continue
}

w.leafchan <- Leaf{
Index: uint64(index),
Data: newLeaf,
}
klog.V(2).Infof("Wrote leaf at index %d", index)
}
}
Expand All @@ -256,3 +268,8 @@ func (w *LogWriter) Kill() {
w.cancel()
}
}

type Leaf struct {
Index uint64
Data []byte
}
61 changes: 56 additions & 5 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/gdamore/tcell/v2"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/rivo/tview"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/serverless-log/client"
Expand Down Expand Up @@ -151,27 +152,76 @@ func main() {
}
}

func NewLeafConsumer() *LeafConsumer {
lookup, err := lru.New[string, uint64](1024)
if err != nil {
panic(err)
}
return &LeafConsumer{
leafchan: make(chan Leaf, 256),
lookup: lookup,
}
}

// LeafConsumer eats leaves from the channel and performs analysis
// that is somewhat global. At the moment this just checks how many
// times it sees a duplicate leaf (i.e. a leaf that appears at multiple
// indices). This could be extended to measure integration time etc.
type LeafConsumer struct {
leafchan chan Leaf
lookup *lru.Cache[string, uint64]
duplicateCount uint64
}

func (c *LeafConsumer) Run(ctx context.Context) {
defer close(c.leafchan)
for {
select {
case <-ctx.Done(): //context cancelled
return
case l := <-c.leafchan:
strData := string(l.Data)
if oIdx, found := c.lookup.Get(strData); found {
if oIdx != l.Index {
c.duplicateCount++
klog.V(2).Infof("Found two indices for data %q: (%d, %d)", strData, oIdx, l.Index)
}
} else {
c.lookup.Add(strData, l.Index)
}
}
}
}

func (c *LeafConsumer) String() string {
return fmt.Sprintf("Duplicates: %d", c.duplicateCount)
}

func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.URL) *Hammer {
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafConsumer := NewLeafConsumer()
go leafConsumer.Run(context.Background())

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize)
randomReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan, leafConsumer.leafchan)
})
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan, leafConsumer.leafchan)
})
writers := newWorkerPool(func() worker {
return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan, leafConsumer.leafchan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
leafConsumer: leafConsumer,
errChan: errChan,
}
}
Expand All @@ -183,6 +233,7 @@ type Hammer struct {
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
leafConsumer *LeafConsumer
errChan chan error
}

Expand Down Expand Up @@ -359,7 +410,7 @@ func hostUI(ctx context.Context, hammer *Hammer) {
case <-ctx.Done():
return
case <-ticker.C:
text := fmt.Sprintf("Read: %s\nWrite: %s", hammer.readThrottle.String(), hammer.writeThrottle.String())
text := fmt.Sprintf("Read: %s\nWrite: %s\nAnalysis: %s", hammer.readThrottle.String(), hammer.writeThrottle.String(), hammer.leafConsumer.String())
statusView.SetText(text)
app.Draw()
}
Expand Down

0 comments on commit fe73fcf

Please sign in to comment.