Skip to content

Commit

Permalink
feat(otelbench.promrw.replay): add --report flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 14, 2024
1 parent 6b70a4c commit 7ec5a62
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cmd/otelbench/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"github.com/go-faster/errors"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/prometheus/prompb"
"github.com/spf13/cobra"
"go.uber.org/atomic"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/otelbench"
"github.com/go-faster/oteldb/internal/prompb"
)

type Record struct {
Expand Down Expand Up @@ -81,7 +81,7 @@ func (r *Record) read(req *http.Request) ([]byte, error) {
}

func fmtInt(v int) string {
s := humanize.SIWithDigits(float64(v), 0, "")
s := humanize.SIWithDigits(float64(v), 2, "")
s = strings.ReplaceAll(s, " ", "")
return s
}
Expand Down
89 changes: 81 additions & 8 deletions cmd/otelbench/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,65 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/go-faster/yaml"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/otelbench"
"github.com/go-faster/oteldb/internal/prompb"
)

type Replay struct {
Target string
Source string
Encoding string
Workers int
Target string
Source string
Encoding string
ReportPath string
Workers int

points atomic.Uint64
}

func (r *Replay) report(compressedData []byte) error {
if r.ReportPath == "" {
return nil
}
var (
data []byte
err error
)
switch r.Encoding {
case "zstd":
reader, err := zstd.NewReader(bytes.NewReader(compressedData))
if err != nil {
return errors.Wrap(err, "create decoder")
}
defer reader.Close()
if data, err = io.ReadAll(reader); err != nil {
return errors.Wrap(err, "read data")
}
case "snappy":
if data, err = snappy.Decode(data, compressedData); err != nil {
return errors.Wrap(err, "decode data")
}
default:
return errors.Errorf("unsupported encoding %q", r.Encoding)
}
var writeRequest prompb.WriteRequest
if err := writeRequest.Unmarshal(data); err != nil {
return errors.Wrap(err, "unmarshal request")
}
r.points.Add(uint64(len(writeRequest.Timeseries)))
return nil
}

type ReplayReport struct {
Points int `yaml:"points" json:"points"`
DurationNanos int64 `yaml:"duration_nanos" json:"durationNanos"`
PointsPerSecond int `yaml:"points_per_second" json:"pointsPerSecond"`
}

func (r *Replay) Run(ctx context.Context) error {
Expand Down Expand Up @@ -78,6 +125,7 @@ func (r *Replay) Run(ctx context.Context) error {

g, ctx := errgroup.WithContext(ctx)
inputs := make(chan []byte)
start := time.Now()
for i := 0; i < r.Workers; i++ {
g.Go(func() error {
for {
Expand All @@ -94,6 +142,9 @@ func (r *Replay) Run(ctx context.Context) error {
if err := backoff.Retry(do, backoff.WithContext(bo, ctx)); err != nil {
return errors.Wrap(err, "retry")
}
if err := r.report(data); err != nil {
return errors.Wrap(err, "report")
}
}
}
})
Expand Down Expand Up @@ -121,6 +172,26 @@ func (r *Replay) Run(ctx context.Context) error {
}
_ = pb.Finish()
fmt.Println("Done")
duration := time.Since(start).Round(time.Millisecond)
fmt.Println("Duration:", duration)
if r.ReportPath != "" {
fmt.Println("Points:", fmtInt(int(r.points.Load())))
pointsPerSecond := float64(r.points.Load()) / duration.Seconds()
fmt.Println("Per second:", fmtInt(int(pointsPerSecond)))
rep := ReplayReport{
Points: int(r.points.Load()),
DurationNanos: duration.Nanoseconds(),
PointsPerSecond: int(pointsPerSecond),
}
data, err := yaml.Marshal(rep)
if err != nil {
return errors.Wrap(err, "marshal report")
}
// #nosec G306
if err := os.WriteFile(r.ReportPath, data, 0644); err != nil {
return errors.Wrap(err, "write report")
}
}
return nil
}

Expand All @@ -135,9 +206,11 @@ func newReplayCommand() *cobra.Command {
return replay.Run(cmd.Context())
},
}
cmd.Flags().StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server")
cmd.Flags().StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file")
cmd.Flags().IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers")
cmd.Flags().StringVar(&replay.Encoding, "encoding", "zstd", "Encoding")
f := cmd.Flags()
f.StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server")
f.StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file")
f.IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers")
f.StringVar(&replay.Encoding, "encoding", "zstd", "Encoding")
f.StringVar(&replay.ReportPath, "report", "", "Report path")
return cmd
}

0 comments on commit 7ec5a62

Please sign in to comment.