diff --git a/go.mod b/go.mod index 2119ee8..c65f105 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/gdamore/tcell/v2 v2.7.4 github.com/google/go-cmp v0.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/rivo/tview v0.0.0-20240413115534-b0d41c484b95 github.com/transparency-dev/formats v0.0.0-20230914071414-5732692f1e50 github.com/transparency-dev/merkle v0.0.2 diff --git a/go.sum b/go.sum index 5fc0abb..bc1b22a 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= diff --git a/hammer/clients.go b/hammer/clients.go index 0e5aa7c..6cf3bf8 100644 --- a/hammer/clients.go +++ b/hammer/clients.go @@ -36,7 +36,7 @@ import ( // NewLeafReader creates a LeafReader. // The next function provides a strategy for which leaves will be read. // Custom implementations can be passed, or use RandomNextLeaf or MonotonicallyIncreasingNextLeaf. -func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error) *LeafReader { +func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error, leafchan chan<- Leaf) *LeafReader { if bundleSize <= 0 { panic("bundleSize must be > 0") } @@ -47,6 +47,7 @@ func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func( bundleSize: bundleSize, throttle: throttle, errchan: errchan, + leafchan: leafchan, } } @@ -58,6 +59,7 @@ type LeafReader struct { bundleSize int throttle <-chan bool errchan chan<- error + leafchan chan<- Leaf cancel func() c leafBundleCache } @@ -83,10 +85,14 @@ func (r *LeafReader) Run(ctx context.Context) { continue } klog.V(2).Infof("LeafReader getting %d", i) - _, err := r.getLeaf(ctx, i, size) + data, err := r.getLeaf(ctx, i, size) if err != nil { r.errchan <- fmt.Errorf("failed to get leaf %d: %v", i, err) } + r.leafchan <- Leaf{ + Index: uint64(i), + Data: data, + } } } @@ -177,13 +183,14 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 { // NewLogWriter creates a LogWriter. // u is the URL of the write endpoint for the log. // gen is a function that generates new leaves to add. -func NewLogWriter(hc *http.Client, u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error) *LogWriter { +func NewLogWriter(hc *http.Client, u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error, leafchan chan<- Leaf) *LogWriter { return &LogWriter{ hc: hc, u: u, gen: gen, throttle: throttle, errchan: errchan, + leafchan: leafchan, } } @@ -194,6 +201,7 @@ type LogWriter struct { gen func() []byte throttle <-chan bool errchan chan<- error + leafchan chan<- Leaf cancel func() } @@ -245,6 +253,10 @@ func (w *LogWriter) Run(ctx context.Context) { continue } + w.leafchan <- Leaf{ + Index: uint64(index), + Data: newLeaf, + } klog.V(2).Infof("Wrote leaf at index %d", index) } } @@ -256,3 +268,8 @@ func (w *LogWriter) Kill() { w.cancel() } } + +type Leaf struct { + Index uint64 + Data []byte +} diff --git a/hammer/hammer.go b/hammer/hammer.go index 6386769..013fd08 100644 --- a/hammer/hammer.go +++ b/hammer/hammer.go @@ -31,6 +31,7 @@ import ( "time" "github.com/gdamore/tcell/v2" + lru "github.com/hashicorp/golang-lru/v2" "github.com/rivo/tview" "github.com/transparency-dev/merkle/rfc6962" "github.com/transparency-dev/serverless-log/client" @@ -151,20 +152,68 @@ func main() { } } +func NewLeafConsumer() *LeafConsumer { + lookup, err := lru.New[string, uint64](1024) + if err != nil { + panic(err) + } + return &LeafConsumer{ + leafchan: make(chan Leaf, 256), + lookup: lookup, + } +} + +// LeafConsumer eats leaves from the channel and performs analysis +// that is somewhat global. At the moment this just checks how many +// times it sees a duplicate leaf (i.e. a leaf that appears at multiple +// indices). This could be extended to measure integration time etc. +type LeafConsumer struct { + leafchan chan Leaf + lookup *lru.Cache[string, uint64] + duplicateCount uint64 +} + +func (c *LeafConsumer) Run(ctx context.Context) { + defer close(c.leafchan) + for { + select { + case <-ctx.Done(): //context cancelled + return + case l := <-c.leafchan: + strData := string(l.Data) + if oIdx, found := c.lookup.Get(strData); found { + if oIdx != l.Index { + c.duplicateCount++ + klog.V(2).Infof("Found two indices for data %q: (%d, %d)", strData, oIdx, l.Index) + } + } else { + c.lookup.Add(strData, l.Index) + } + } + } +} + +func (c *LeafConsumer) String() string { + return fmt.Sprintf("Duplicates: %d", c.duplicateCount) +} + func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.URL) *Hammer { readThrottle := NewThrottle(*maxReadOpsPerSecond) writeThrottle := NewThrottle(*maxWriteOpsPerSecond) errChan := make(chan error, 20) + leafConsumer := NewLeafConsumer() + go leafConsumer.Run(context.Background()) gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize) randomReaders := newWorkerPool(func() worker { - return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) + return NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan, leafConsumer.leafchan) }) fullReaders := newWorkerPool(func() worker { - return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan) + return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan, leafConsumer.leafchan) + }) + writers := newWorkerPool(func() worker { + return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan, leafConsumer.leafchan) }) - writers := newWorkerPool(func() worker { return NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan) }) - return &Hammer{ randomReaders: randomReaders, fullReaders: fullReaders, @@ -172,6 +221,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR readThrottle: readThrottle, writeThrottle: writeThrottle, tracker: tracker, + leafConsumer: leafConsumer, errChan: errChan, } } @@ -183,6 +233,7 @@ type Hammer struct { readThrottle *Throttle writeThrottle *Throttle tracker *client.LogStateTracker + leafConsumer *LeafConsumer errChan chan error } @@ -359,7 +410,7 @@ func hostUI(ctx context.Context, hammer *Hammer) { case <-ctx.Done(): return case <-ticker.C: - text := fmt.Sprintf("Read: %s\nWrite: %s", hammer.readThrottle.String(), hammer.writeThrottle.String()) + text := fmt.Sprintf("Read: %s\nWrite: %s\nAnalysis: %s", hammer.readThrottle.String(), hammer.writeThrottle.String(), hammer.leafConsumer.String()) statusView.SetText(text) app.Draw() }