Skip to content

Commit

Permalink
Refactor the readers to have a single reader class
Browse files Browse the repository at this point in the history
The strategy for choosing the next leaf to read is now dictated by a
function. This allows much of the logic to be simplified and the getLeaf
function can now become a method of the reader, which avoids needing to
pass in as many parameters.
  • Loading branch information
mhutchinson committed Apr 23, 2024
1 parent 28d6c98 commit 2eba0ae
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 97 deletions.
139 changes: 48 additions & 91 deletions hammer/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,42 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/transparency-dev/serverless-log/api/layout"
"github.com/transparency-dev/serverless-log/client"
"k8s.io/klog/v2"
)

// NewRandomLeafReader creates a RandomLeafReader.
func NewRandomLeafReader(tracker *client.LogStateTracker, f client.Fetcher, bundleSize int, throttle <-chan bool, errchan chan<- error) *RandomLeafReader {
// NewLeafReader creates a LeafReader.
func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error) *LeafReader {
if bundleSize <= 0 {
panic("bundleSize must be > 0")
}
return &RandomLeafReader{
return &LeafReader{
tracker: tracker,
f: f,
next: next,
bundleSize: bundleSize,
throttle: throttle,
errchan: errchan,
}
}

// RandomLeafReader reads random leaves across the tree.
type RandomLeafReader struct {
// LeafReader reads leaves from the tree. The strategy for picking the
// next leaf to read is supplied by the next function, which allows the
// same reader to be used for random or sequential reads.
type LeafReader struct {
	tracker    *client.LogStateTracker // tracks the latest consistent view of the log
	f          client.Fetcher          // fetches tiles/bundles from the log
	next       func(uint64) uint64     // given the log size, returns the next leaf index to read; returning size means "nothing to read"
	bundleSize int                     // number of leaves per bundle; must be > 0
	throttle   <-chan bool             // one token is consumed per leaf read to rate-limit
	errchan    chan<- error            // read failures are reported here
	cancel     func()                  // set by Run; used by Kill to stop the reader
}

// Run runs the log reader. This should be called in a goroutine.
func (r *RandomLeafReader) Run(ctx context.Context) {
func (r *LeafReader) Run(ctx context.Context) {
if r.cancel != nil {
panic("RandomLeafReader was ran multiple times")
panic("LeafReader was ran multiple times")
}
ctx, r.cancel = context.WithCancel(ctx)
for {
Expand All @@ -74,107 +75,34 @@ func (r *RandomLeafReader) Run(ctx context.Context) {
if size == 0 {
continue
}
i := uint64(rand.Int63n(int64(size)))
klog.V(2).Infof("RandomLeafReader getting %d", i)
_, err := getLeaf(ctx, r.f, i, r.tracker.LatestConsistent.Size, r.bundleSize)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get random leaf: %v", err)
}
}
}

// Kills this leaf reader at the next opportune moment.
// This function may return before the reader is dead.
func (r *RandomLeafReader) Kill() {
if r.cancel != nil {
r.cancel()
}
}

// NewFullLogReader creates a FullLogReader.
func NewFullLogReader(tracker *client.LogStateTracker, f client.Fetcher, bundleSize int, throttle <-chan bool, errchan chan<- error) *FullLogReader {
if bundleSize <= 0 {
panic("bundleSize must be > 0")
}
return &FullLogReader{
tracker: tracker,
f: f,
bundleSize: bundleSize,
throttle: throttle,
errchan: errchan,

current: 0,
}
}

// FullLogReader reads the whole log from the start until the end.
type FullLogReader struct {
tracker *client.LogStateTracker
f client.Fetcher
bundleSize int
throttle <-chan bool
errchan chan<- error
cancel func()

current uint64
}

// Run runs the log reader. This should be called in a goroutine.
func (r *FullLogReader) Run(ctx context.Context) {
if r.cancel != nil {
panic("FullLogReader was ran multiple times")
}
ctx, r.cancel = context.WithCancel(ctx)
for {
if r.current >= r.tracker.LatestConsistent.Size {
klog.V(2).Infof("FullLogReader has consumed whole log of size %d. Sleeping.", r.tracker.LatestConsistent.Size)
// Sleep a bit and then try again
select {
case <-ctx.Done(): //context cancelled
return
case <-time.After(2 * time.Second): //timeout
}
i := r.next(size)
if i == size {
continue
}
select {
case <-ctx.Done():
return
case <-r.throttle:
}
klog.V(2).Infof("FullLogReader getting %d", r.current)
_, err := getLeaf(ctx, r.f, r.current, r.tracker.LatestConsistent.Size, r.bundleSize)
klog.V(2).Infof("LeafReader getting %d", i)
_, err := r.getLeaf(ctx, i, size)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get next leaf: %v", err)
continue
r.errchan <- fmt.Errorf("Failed to get random leaf: %v", err)
}
r.current++
}
}

// Kills this leaf reader at the next opportune moment.
// This function may return before the reader is dead.
func (r *FullLogReader) Kill() {
if r.cancel != nil {
r.cancel()
}
}

// getLeaf fetches the raw contents committed to at a given leaf index.
func getLeaf(ctx context.Context, f client.Fetcher, i uint64, logSize uint64, bundleSize int) ([]byte, error) {
func (r *LeafReader) getLeaf(ctx context.Context, i uint64, logSize uint64) ([]byte, error) {
if i >= logSize {
return nil, fmt.Errorf("requested leaf %d >= log size %d", i, logSize)
}
bi := i / uint64(bundleSize)
bi := i / uint64(r.bundleSize)
br := uint64(0)
// Check for partial leaf bundle
if bi == logSize/uint64(bundleSize) {
br = logSize % uint64(bundleSize)
if bi == logSize/uint64(r.bundleSize) {
br = logSize % uint64(r.bundleSize)
}
p := filepath.Join(layout.SeqPath("", bi))
if br > 0 {
p += fmt.Sprintf(".%d", br)
}
bRaw, err := f(ctx, p)
bRaw, err := r.f(ctx, p)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("leaf index %d not found: %w", i, err)
Expand All @@ -189,6 +117,35 @@ func getLeaf(ctx context.Context, f client.Fetcher, i uint64, logSize uint64, bu
return base64.StdEncoding.DecodeString(string(bs[br]))
}

// Kill signals this leaf reader to stop at the next opportune moment.
// It may return before the reader has fully terminated.
func (r *LeafReader) Kill() {
	if cancel := r.cancel; cancel != nil {
		cancel()
	}
}

// RandomNextLeaf returns a function that picks a leaf index uniformly at
// random from the leaves currently available in the tree.
// If the tree is empty it returns size (i.e. 0), which callers treat as
// "nothing to read"; this avoids the panic rand.Int63n raises on a
// non-positive argument.
func RandomNextLeaf() func(uint64) uint64 {
	return func(size uint64) uint64 {
		if size == 0 {
			// Nothing available yet; returning size signals the caller to skip.
			return size
		}
		return uint64(rand.Int63n(int64(size)))
	}
}

// MonotonicallyIncreasingNextLeaf returns a function that always wants the
// next available leaf after the one it previously fetched, starting at
// leaf 0. Once the log is exhausted it returns size, signalling that
// there is currently nothing new to read.
func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
	next := uint64(0)
	return func(size uint64) uint64 {
		if next >= size {
			// Caught up with the log; nothing to hand out yet.
			return size
		}
		idx := next
		next++
		return idx
	}
}

// NewLogWriter creates a LogWriter.
// u is the URL of the write endpoint for the log.
// gen is a function that generates new leaves to add.
Expand Down
12 changes: 6 additions & 6 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)

randomReaders := make([]*RandomLeafReader, *numReadersRandom)
fullReaders := make([]*FullLogReader, *numReadersFull)
randomReaders := make([]*LeafReader, *numReadersRandom)
fullReaders := make([]*LeafReader, *numReadersFull)
writers := make([]*LogWriter, *numWriters)
for i := 0; i < *numReadersRandom; i++ {
randomReaders[i] = NewRandomLeafReader(tracker, f, *leafBundleSize, readThrottle.tokenChan, errChan)
randomReaders[i] = NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
for i := 0; i < *numReadersFull; i++ {
fullReaders[i] = NewFullLogReader(tracker, f, *leafBundleSize, readThrottle.tokenChan, errChan)
fullReaders[i] = NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
for i := 0; i < *numWriters; i++ {
writers[i] = NewLogWriter(addURL, gen, writeThrottle.tokenChan, errChan)
Expand All @@ -141,8 +141,8 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
}

type Hammer struct {
randomReaders []*RandomLeafReader
fullReaders []*FullLogReader
randomReaders []*LeafReader
fullReaders []*LeafReader
writers []*LogWriter
readThrottle *Throttle
writeThrottle *Throttle
Expand Down

0 comments on commit 2eba0ae

Please sign in to comment.