Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic hammer for testing read scaling #112

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions hammer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Hammer: A load testing tool for serverless logs

## Usage

As an example for testing the serving capabilities of the Armored Witness CI log:

```bash
SERVERLESS_LOG_PUBLIC_KEY=transparency.dev-aw-ftlog-ci-2+f77c6276+AZXqiaARpwF4MoNOxx46kuiIRjrML0PDTm+c7BLaAMt6 go run ./hammer -v=2 \
--log_url=https://api.transparency.dev/armored-witness-firmware/ci/log/2/ \
--origin="transparency.dev/armored-witness/firmware_transparency/ci/2"
```

The process can be killed with <Ctrl-C>.
102 changes: 102 additions & 0 deletions hammer/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/transparency-dev/serverless-log/client"
"k8s.io/klog/v2"
)

// NewRandomLeafReader creates a RandomLeafReader.
func NewRandomLeafReader(tracker *client.LogStateTracker, f client.Fetcher, throttle <-chan bool, errchan chan<- error) *RandomLeafReader {
return &RandomLeafReader{
tracker: tracker,
f: f,
throttle: throttle,
errchan: errchan,
}
}

// RandomLeafReader reads random leaves across the tree.
type RandomLeafReader struct {
tracker *client.LogStateTracker
f client.Fetcher
throttle <-chan bool
errchan chan<- error
}

// Run runs the log reader. This should be called in a goroutine.
func (r *RandomLeafReader) Run(ctx context.Context) {
for {
<-r.throttle
i := uint64(rand.Int63n(int64(r.tracker.LatestConsistent.Size)))
klog.V(2).Infof("RandomLeafReader getting %d", i)
_, err := client.GetLeaf(ctx, r.f, i)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get random leaf: %v", err)
}
}
}

// NewFullLogReader creates a FullLogReader.
func NewFullLogReader(tracker *client.LogStateTracker, f client.Fetcher, throttle <-chan bool, errchan chan<- error) *FullLogReader {
return &FullLogReader{
tracker: tracker,
f: f,
throttle: throttle,
errchan: errchan,

current: 0,
}
}

// FullLogReader reads the whole log from the start until the end.
type FullLogReader struct {
tracker *client.LogStateTracker
f client.Fetcher
throttle <-chan bool
errchan chan<- error

current uint64
}

// Run runs the log reader. This should be called in a goroutine.
func (r *FullLogReader) Run(ctx context.Context) {
for {
if r.current >= r.tracker.LatestConsistent.Size {
klog.V(2).Infof("FullLogReader has consumed whole log of size %d. Sleeping.", r.tracker.LatestConsistent.Size)
// Sleep a bit and then try again
select {
case <-ctx.Done(): //context cancelled
return
case <-time.After(2 * time.Second): //timeout
}
continue
}
<-r.throttle
klog.V(2).Infof("FullLeafReader getting %d", r.current)
_, err := client.GetLeaf(ctx, r.f, r.current)
if err != nil {
r.errchan <- fmt.Errorf("Failed to get next leaf: %v", err)
continue
}
r.current++
}
}
200 changes: 200 additions & 0 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2024 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// hammer is a tool to load test a serverless log.
package main

import (
"context"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/serverless-log/client"
"golang.org/x/mod/sumdb/note"
"k8s.io/klog/v2"
)

var (
logURL = flag.String("log_url", "", "Log storage root URL, e.g. https://log.server/and/path/")
logPubKeyFile = flag.String("log_public_key", "", "Location of log public key file. If unset, uses the contents of the SERVERLESS_LOG_PUBLIC_KEY environment variable")
origin = flag.String("origin", "", "Expected first line of checkpoints from log")

maxReadOpsPerSecond = flag.Int("max_read_ops", 20, "The maximum number of read operations per second")
numReadersRandom = flag.Int("num_readers_random", 4, "The number of readers looking for random leaves")
numReadersFull = flag.Int("num_readers_full", 4, "The number of readers downloading the whole log")
)

func main() {
klog.InitFlags(nil)
flag.Parse()

ctx := context.Background()

logSigV, _, err := logSigVerifier(*logPubKeyFile)
if err != nil {
klog.Exitf("failed to read log public key: %v", err)
}

u := *logURL
if len(u) == 0 {
klog.Exitf("--log_url must be provided")
}
// url must reference a directory, by definition
if !strings.HasSuffix(u, "/") {
u += "/"
}

rootURL, err := url.Parse(u)
if err != nil {
klog.Exitf("Invalid log URL: %v", err)
}

var cpRaw []byte
f := newFetcher(rootURL)
cons := client.UnilateralConsensus(f)
hasher := rfc6962.DefaultHasher
tracker, err := client.NewLogStateTracker(ctx, f, hasher, cpRaw, logSigV, *origin, cons)
if err != nil {
klog.Exitf("Failed to create LogStateTracker: %v", err)
}
// Fetch initial state of log
_, _, _, err = tracker.Update(ctx)
if err != nil {
klog.Exitf("Failed to get initial state of the log: %v", err)
}

// Kick off readers
errChan := make(chan error, 20)
throttle := make(chan bool, *maxReadOpsPerSecond)
for i := 0; i < *numReadersRandom; i++ {
go NewRandomLeafReader(&tracker, f, throttle, errChan).Run(ctx)
}
for i := 0; i < *numReadersFull; i++ {
go NewFullLogReader(&tracker, f, throttle, errChan).Run(ctx)
}

// Set up log throttle token generator
go func() {
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-time.After(1 * time.Second): //timeout
}
for i := 0; i < *maxReadOpsPerSecond; i++ {
throttle <- true
}
}
}()

// Set up logging for any errors
go func() {
for {
select {
case <-ctx.Done(): //context cancelled
return
case err := <-errChan:
klog.Warning(err)
}
}
}()

// TODO(mhutchinson): Set up writing

klog.Info("It's hammer time")
<-ctx.Done()
}

// Returns a log signature verifier and the public key bytes it uses.
// Attempts to read key material from f, or uses the SERVERLESS_LOG_PUBLIC_KEY
// env var if f is unset.
func logSigVerifier(f string) (note.Verifier, []byte, error) {
var pubKey []byte
var err error
if len(f) > 0 {
pubKey, err = os.ReadFile(f)
if err != nil {
return nil, nil, fmt.Errorf("failed to read public key from file %q: %v", f, err)
}
} else {
pubKey = []byte(os.Getenv("SERVERLESS_LOG_PUBLIC_KEY"))
if len(pubKey) == 0 {
return nil, nil, fmt.Errorf("supply public key file path using --log_public_key or set SERVERLESS_LOG_PUBLIC_KEY environment variable")
}
}

v, err := note.NewVerifier(string(pubKey))
if err != nil {
return nil, nil, fmt.Errorf("failed to create verifier: %v", err)
}

return v, pubKey, nil
}

// newFetcher creates a Fetcher for the log at the given root location.
func newFetcher(root *url.URL) client.Fetcher {
get := getByScheme[root.Scheme]
if get == nil {
panic(fmt.Errorf("unsupported URL scheme %s", root.Scheme))
}

return func(ctx context.Context, p string) ([]byte, error) {
u, err := root.Parse(p)
if err != nil {
return nil, err
}
return get(ctx, u)
}
}

var getByScheme = map[string]func(context.Context, *url.URL) ([]byte, error){
"http": readHTTP,
"https": readHTTP,
"file": func(_ context.Context, u *url.URL) ([]byte, error) {
return os.ReadFile(u.Path)
},
}

func readHTTP(ctx context.Context, u *url.URL) ([]byte, error) {
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
switch resp.StatusCode {
case 404:
klog.Infof("Not found: %q", u.String())
return nil, os.ErrNotExist
case 200:
break
default:
return nil, fmt.Errorf("unexpected http status %q", resp.Status)
}
defer func() {
if err := resp.Body.Close(); err != nil {
klog.Errorf("resp.Body.Close(): %v", err)
}
}()
return io.ReadAll(resp.Body)
}
Loading