diff --git a/cmd/otelbench/record.go b/cmd/otelbench/record.go index bcf98640..0f176ce0 100644 --- a/cmd/otelbench/record.go +++ b/cmd/otelbench/record.go @@ -14,13 +14,13 @@ import ( "github.com/go-faster/errors" "github.com/klauspost/compress/snappy" "github.com/klauspost/compress/zstd" - "github.com/prometheus/prometheus/prompb" "github.com/spf13/cobra" "go.uber.org/atomic" "go.uber.org/multierr" "golang.org/x/sync/errgroup" "github.com/go-faster/oteldb/internal/otelbench" + "github.com/go-faster/oteldb/internal/prompb" ) type Record struct { @@ -81,7 +81,7 @@ func (r *Record) read(req *http.Request) ([]byte, error) { } func fmtInt(v int) string { - s := humanize.SIWithDigits(float64(v), 0, "") + s := humanize.SIWithDigits(float64(v), 2, "") s = strings.ReplaceAll(s, " ", "") return s } diff --git a/cmd/otelbench/replay.go b/cmd/otelbench/replay.go index 91315abf..c5e9cbf2 100644 --- a/cmd/otelbench/replay.go +++ b/cmd/otelbench/replay.go @@ -12,18 +12,65 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" + "github.com/go-faster/yaml" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zstd" "github.com/schollz/progressbar/v3" "github.com/spf13/cobra" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/go-faster/oteldb/internal/otelbench" + "github.com/go-faster/oteldb/internal/prompb" ) type Replay struct { - Target string - Source string - Encoding string - Workers int + Target string + Source string + Encoding string + ReportPath string + Workers int + + points atomic.Uint64 +} + +func (r *Replay) report(compressedData []byte) error { + if r.ReportPath == "" { + return nil + } + var ( + data []byte + err error + ) + switch r.Encoding { + case "zstd": + reader, err := zstd.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return errors.Wrap(err, "create decoder") + } + defer reader.Close() + if data, err = io.ReadAll(reader); err != nil { + return errors.Wrap(err, "read data") + } + case "snappy": + if data, err = snappy.Decode(data, compressedData); err != nil { + return errors.Wrap(err, "decode data") + } + default: + return errors.Errorf("unsupported encoding %q", r.Encoding) + } + var writeRequest prompb.WriteRequest + if err := writeRequest.Unmarshal(data); err != nil { + return errors.Wrap(err, "unmarshal request") + } + r.points.Add(uint64(len(writeRequest.Timeseries))) + return nil +} + +type ReplayReport struct { + Points int `yaml:"points" json:"points"` + DurationNanos int64 `yaml:"duration_nanos" json:"durationNanos"` + PointsPerSecond int `yaml:"points_per_second" json:"pointsPerSecond"` } func (r *Replay) Run(ctx context.Context) error { @@ -78,6 +125,7 @@ func (r *Replay) Run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) inputs := make(chan []byte) + start := time.Now() for i := 0; i < r.Workers; i++ { g.Go(func() error { for { @@ -94,6 +142,9 @@ func (r *Replay) Run(ctx context.Context) error { if err := backoff.Retry(do, backoff.WithContext(bo, ctx)); err != nil { return errors.Wrap(err, "retry") } + if err := r.report(data); err != nil { + return errors.Wrap(err, "report") + } } } }) @@ -121,6 +172,26 @@ func (r *Replay) Run(ctx context.Context) error { } _ = pb.Finish() fmt.Println("Done") + duration := time.Since(start).Round(time.Millisecond) + fmt.Println("Duration:", duration) + if r.ReportPath != "" { + fmt.Println("Points:", fmtInt(int(r.points.Load()))) + pointsPerSecond := float64(r.points.Load()) / duration.Seconds() + fmt.Println("Per second:", fmtInt(int(pointsPerSecond))) + rep := ReplayReport{ + Points: int(r.points.Load()), + DurationNanos: duration.Nanoseconds(), + PointsPerSecond: int(pointsPerSecond), + } + data, err := yaml.Marshal(rep) + if err != nil { + return errors.Wrap(err, "marshal report") + } + // #nosec G306 + if err := os.WriteFile(r.ReportPath, data, 0644); err != nil { + return errors.Wrap(err, "write report") + } + } return nil } @@ -135,9 +206,11 @@ func newReplayCommand() *cobra.Command { return replay.Run(cmd.Context()) }, } - cmd.Flags().StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server") - cmd.Flags().StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file") - cmd.Flags().IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers") - cmd.Flags().StringVar(&replay.Encoding, "encoding", "zstd", "Encoding") + f := cmd.Flags() + f.StringVar(&replay.Target, "target", "http://127.0.0.1:19291", "Target server") + f.StringVarP(&replay.Source, "input", "i", "requests.rwq", "Source file") + f.IntVarP(&replay.Workers, "workers", "j", 8, "Number of workers") + f.StringVar(&replay.Encoding, "encoding", "zstd", "Encoding") + f.StringVar(&replay.ReportPath, "report", "", "Report path") return cmd }