Refactor the readers to have a single reader class #121

Merged · 4 commits · Apr 23, 2024
151 changes: 55 additions & 96 deletions hammer/clients.go
@@ -27,41 +27,44 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/transparency-dev/serverless-log/api/layout"
"github.com/transparency-dev/serverless-log/client"
"k8s.io/klog/v2"
)

// NewRandomLeafReader creates a RandomLeafReader.
func NewRandomLeafReader(tracker *client.LogStateTracker, f client.Fetcher, bundleSize int, throttle <-chan bool, errchan chan<- error) *RandomLeafReader {
// NewLeafReader creates a LeafReader.
// The next function provides a strategy for which leaves will be read.
// Custom implementations can be passed, or use RandomNextLeaf or MonotonicallyIncreasingNextLeaf.
func NewLeafReader(tracker *client.LogStateTracker, f client.Fetcher, next func(uint64) uint64, bundleSize int, throttle <-chan bool, errchan chan<- error) *LeafReader {
if bundleSize <= 0 {
panic("bundleSize must be > 0")
}
return &RandomLeafReader{
return &LeafReader{
tracker: tracker,
f: f,
next: next,
bundleSize: bundleSize,
throttle: throttle,
errchan: errchan,
}
}

// RandomLeafReader reads random leaves across the tree.
type RandomLeafReader struct {
// LeafReader reads leaves from the tree.
type LeafReader struct {
tracker *client.LogStateTracker
f client.Fetcher
next func(uint64) uint64
bundleSize int
throttle <-chan bool
errchan chan<- error
cancel func()
}

// Run runs the log reader. This should be called in a goroutine.
func (r *RandomLeafReader) Run(ctx context.Context) {
func (r *LeafReader) Run(ctx context.Context) {
if r.cancel != nil {
panic("RandomLeafReader was ran multiple times")
panic("LeafReader was ran multiple times")
}
ctx, r.cancel = context.WithCancel(ctx)
for {
@@ -74,107 +74,34 @@ func (r *RandomLeafReader) Run(ctx context.Context) {
if size == 0 {
continue
}
i := uint64(rand.Int63n(int64(size)))
klog.V(2).Infof("RandomLeafReader getting %d", i)
_, err := getLeaf(ctx, r.f, i, r.tracker.LatestConsistent.Size, r.bundleSize)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get random leaf: %v", err)
}
}
}

// Kills this leaf reader at the next opportune moment.
// This function may return before the reader is dead.
func (r *RandomLeafReader) Kill() {
if r.cancel != nil {
r.cancel()
}
}

// NewFullLogReader creates a FullLogReader.
func NewFullLogReader(tracker *client.LogStateTracker, f client.Fetcher, bundleSize int, throttle <-chan bool, errchan chan<- error) *FullLogReader {
if bundleSize <= 0 {
panic("bundleSize must be > 0")
}
return &FullLogReader{
tracker: tracker,
f: f,
bundleSize: bundleSize,
throttle: throttle,
errchan: errchan,

current: 0,
}
}

// FullLogReader reads the whole log from the start until the end.
type FullLogReader struct {
tracker *client.LogStateTracker
f client.Fetcher
bundleSize int
throttle <-chan bool
errchan chan<- error
cancel func()

current uint64
}

// Run runs the log reader. This should be called in a goroutine.
func (r *FullLogReader) Run(ctx context.Context) {
if r.cancel != nil {
panic("FullLogReader was ran multiple times")
}
ctx, r.cancel = context.WithCancel(ctx)
for {
if r.current >= r.tracker.LatestConsistent.Size {
klog.V(2).Infof("FullLogReader has consumed whole log of size %d. Sleeping.", r.tracker.LatestConsistent.Size)
// Sleep a bit and then try again
select {
case <-ctx.Done(): //context cancelled
return
case <-time.After(2 * time.Second): //timeout
}
i := r.next(size)
if i >= size {
continue
}
select {
case <-ctx.Done():
return
case <-r.throttle:
}
klog.V(2).Infof("FullLogReader getting %d", r.current)
_, err := getLeaf(ctx, r.f, r.current, r.tracker.LatestConsistent.Size, r.bundleSize)
klog.V(2).Infof("LeafReader getting %d", i)
_, err := r.getLeaf(ctx, i, size)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get next leaf: %v", err)
continue
r.errchan <- fmt.Errorf("failed to get leaf: %v", err)
}
r.current++
}
}

// Kills this leaf reader at the next opportune moment.
// This function may return before the reader is dead.
func (r *FullLogReader) Kill() {
if r.cancel != nil {
r.cancel()
}
}

// getLeaf fetches the raw contents committed to at a given leaf index.
func getLeaf(ctx context.Context, f client.Fetcher, i uint64, logSize uint64, bundleSize int) ([]byte, error) {
func (r *LeafReader) getLeaf(ctx context.Context, i uint64, logSize uint64) ([]byte, error) {
if i >= logSize {
return nil, fmt.Errorf("requested leaf %d >= log size %d", i, logSize)
}
bi := i / uint64(bundleSize)
bi := i / uint64(r.bundleSize)
br := uint64(0)
// Check for partial leaf bundle
if bi == logSize/uint64(bundleSize) {
br = logSize % uint64(bundleSize)
if bi == logSize/uint64(r.bundleSize) {
br = logSize % uint64(r.bundleSize)
}
p := filepath.Join(layout.SeqPath("", bi))
if br > 0 {
p += fmt.Sprintf(".%d", br)
}
bRaw, err := f(ctx, p)
bRaw, err := r.f(ctx, p)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("leaf index %d not found: %w", i, err)
@@ -189,6 +119,35 @@ func getLeaf(ctx context.Context, f client.Fetcher, i uint64, logSize uint64, bu
return base64.StdEncoding.DecodeString(string(bs[br]))
}
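// Worked example of the bundle math above (an illustration added for this
// write-up, not part of the diff): with r.bundleSize = 4 and logSize = 10,
// leaf i = 9 gives bi = 9/4 = 2. Since logSize/4 is also 2, that bundle is
// the partial final one holding br = 10%4 = 2 leaves, so ".2" is appended
// to the fetched path.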

// Kill stops this leaf reader at the next opportune moment.
// This function may return before the reader is dead.
func (r *LeafReader) Kill() {
if r.cancel != nil {
r.cancel()
}
}

// RandomNextLeaf returns a function that fetches a random leaf available in the tree.
func RandomNextLeaf() func(uint64) uint64 {
return func(size uint64) uint64 {
return uint64(rand.Int63n(int64(size)))
}
}

// MonotonicallyIncreasingNextLeaf returns a function that always wants the next available
// leaf after the one it previously fetched. It starts at leaf 0.
func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
var i uint64
return func(size uint64) uint64 {
if i < size {
r := i
i++
return r
}
return size
}
}
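// Illustration (not part of the diff): each call to
// MonotonicallyIncreasingNextLeaf() returns an independent closure, so every
// reader tracks its own position. Given next := MonotonicallyIncreasingNextLeaf(),
// next(3) yields 0, then 1, then 2; once the counter catches up, next returns
// size itself, which Run skips via its i >= size check until the tree grows.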

// NewLogWriter creates a LogWriter.
// u is the URL of the write endpoint for the log.
// gen is a function that generates new leaves to add.
@@ -225,26 +184,26 @@ func (r *LogWriter) Run(ctx context.Context) {
newLeaf := r.gen()
resp, err := http.Post(r.u.String(), "application/octet-stream", bytes.NewReader(newLeaf))
if err != nil {
r.errchan <- fmt.Errorf("Failed to write leaf: %v", err)
r.errchan <- fmt.Errorf("failed to write leaf: %v", err)
continue
}
body, err := io.ReadAll(resp.Body)
if err != nil {
r.errchan <- fmt.Errorf("Failed to read body: %v", err)
r.errchan <- fmt.Errorf("failed to read body: %v", err)
continue
}
if resp.StatusCode != http.StatusOK {
r.errchan <- fmt.Errorf("Write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body)
r.errchan <- fmt.Errorf("write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body)
continue
}
if resp.Request.Method != http.MethodPost {
r.errchan <- fmt.Errorf("Write leaf was redirected to %s", resp.Request.URL)
r.errchan <- fmt.Errorf("write leaf was redirected to %s", resp.Request.URL)
continue
}
parts := bytes.Split(body, []byte("\n"))
index, err := strconv.Atoi(string(parts[0]))
if err != nil {
r.errchan <- fmt.Errorf("Write leaf failed to parse response: %v", body)
r.errchan <- fmt.Errorf("write leaf failed to parse response: %v", body)
continue
}

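Taken together, the clients.go changes collapse two near-duplicate reader types into a single LeafReader parameterized by a next-leaf strategy. A minimal usage sketch, assuming tracker, f, bundleSize, and ctx are an already-initialized *client.LogStateTracker, client.Fetcher, bundle size, and context (mirroring how hammer.go wires things up below):

```go
throttle := make(chan bool, 10) // channel capacities here are illustrative
errChan := make(chan error, 20)

// The two former reader types now differ only in the strategy argument.
randomReader := NewLeafReader(tracker, f, RandomNextLeaf(), bundleSize, throttle, errChan)
fullReader := NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), bundleSize, throttle, errChan)

go randomReader.Run(ctx) // Run blocks, so each reader gets its own goroutine
go fullReader.Run(ctx)

// ... later: request shutdown. Kill may return before the goroutine exits.
randomReader.Kill()
fullReader.Kill()
```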
12 changes: 6 additions & 6 deletions hammer/hammer.go
@@ -117,14 +117,14 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)

randomReaders := make([]*RandomLeafReader, *numReadersRandom)
fullReaders := make([]*FullLogReader, *numReadersFull)
randomReaders := make([]*LeafReader, *numReadersRandom)
fullReaders := make([]*LeafReader, *numReadersFull)
writers := make([]*LogWriter, *numWriters)
for i := 0; i < *numReadersRandom; i++ {
randomReaders[i] = NewRandomLeafReader(tracker, f, *leafBundleSize, readThrottle.tokenChan, errChan)
randomReaders[i] = NewLeafReader(tracker, f, RandomNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
for i := 0; i < *numReadersFull; i++ {
fullReaders[i] = NewFullLogReader(tracker, f, *leafBundleSize, readThrottle.tokenChan, errChan)
fullReaders[i] = NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan)
}
for i := 0; i < *numWriters; i++ {
writers[i] = NewLogWriter(addURL, gen, writeThrottle.tokenChan, errChan)
@@ -141,8 +141,8 @@ }
}

type Hammer struct {
randomReaders []*RandomLeafReader
fullReaders []*FullLogReader
randomReaders []*LeafReader
fullReaders []*LeafReader
writers []*LogWriter
readThrottle *Throttle
writeThrottle *Throttle
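Because the strategy is just a func(uint64) uint64, the same constructor also accepts readers this PR doesn't ship. As a hypothetical sketch (TailNextLeaf is an invented name, not part of this change), a strategy that always re-reads the newest leaf:

```go
// TailNextLeaf returns a strategy that always asks for the most recent
// leaf in the tree. Run only calls next once it has checked size > 0,
// so size-1 cannot underflow.
func TailNextLeaf() func(uint64) uint64 {
	return func(size uint64) uint64 {
		return size - 1
	}
}
```

It would plug in exactly like the built-ins: NewLeafReader(tracker, f, TailNextLeaf(), *leafBundleSize, readThrottle.tokenChan, errChan).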