From d4fff887b899e811b21a07b9fd93f73b9c0373b5 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 23 May 2024 18:50:39 +0200 Subject: [PATCH] Align config options with OTEL, HTTP exporter --- README.md | 33 ++++++- go.mod | 3 +- go.sum | 2 + pkg/opentelemetry/config.go | 158 +++++++++++++++++++++++++------ pkg/opentelemetry/config_test.go | 36 +++++-- pkg/opentelemetry/exporter.go | 53 +++++++++++ pkg/opentelemetry/output.go | 21 ++-- 7 files changed, 248 insertions(+), 58 deletions(-) create mode 100644 pkg/opentelemetry/exporter.go diff --git a/README.md b/README.md index 9aa9740..0067014 100644 --- a/README.md +++ b/README.md @@ -5,13 +5,36 @@ A work in progress k6 extension to output real-time test metrics in [OpenTelemet > [!WARNING] > It's work in progress implementation and not ready for production use. -Configuration options (currently environment variables only): +## Configuration options + +Currently, environment variables only. It's worth to mention that the extension is using the [OpenTelemetry Go SDK](https://opentelemetry.io/docs/languages/go/getting-started/) that's why it's possible to use the configuration environment variables from the SDK. However, if the `K6_OTEL_*` environment variables are set, they will take precedence over the SDK configuration. + +### k6-specific configuration -* `K6_OTEL_RECEIVER_TYPE` - OpenTelemetry receiver type, currently only `grpc` is supported. Default is `grpc`. -* `K6_OTEL_RECEIVER_ENDPOINT` - OpenTelemetry receiver endpoint. Default is `localhost:4317`. * `K6_OTEL_METRIC_PREFIX` - Metric prefix. Default is empty. -* `K6_OTEL_FLUSH_INTERVAL` - How frequently to flush metrics to the receiver from k6. Default is `1s`. -* `K6_OTEL_PUSH_INTERVAL` - How frequently to push metrics to the receiver from k6. Default is `1s`. +* `K6_OTEL_FLUSH_INTERVAL` - How frequently to flush metrics from k6 metrics engine. Default is `1s`. + +### OpenTelemetry-specific configuration + +* `K6_OTEL_EXPORT_INTERVAL` - configures the intervening time between metrics exports. Default is `1s`. +* `K6_OTEL_EXPORTER_TYPE` - metric exporter type. Default is `grpc`. + +#### GRPC exporter + +* `K6_OTEL_GRPC_EXPORTER_INSECURE` - disables client transport security for the gRPC exporter. +* `K6_OTEL_GRPC_EXPORTER_ENDPOINT` - configures the gRPC exporter endpoint. Default is `localhost:4317`. + +> [!TIP] +> Also, you can use [OpenTelemetry SDK configuration environment variables](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc@v1.26.0). + +#### HTTP exporter + +* `K6_OTEL_HTTP_EXPORTER_INSECURE` - disables client transport security for the HTTP exporter. +* `K6_OTEL_HTTP_EXPORTER_ENDPOINT` - configures the HTTP exporter endpoint. Default is `localhost:4318`. +* `K6_OTEL_HTTP_EXPORTER_URL_PATH` - configures the HTTP exporter path. Default is `/v1/metrics`. + +> [!TIP] +> Also, you can use [OpenTelemetry SDK configuration environment variables](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp@v1.26.0). ## Build diff --git a/go.mod b/go.mod index de64dd1..435eaa3 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,11 @@ require ( go.k6.io/k6 v0.51.0 go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 go.opentelemetry.io/otel/metric v1.26.0 go.opentelemetry.io/otel/sdk v1.26.0 go.opentelemetry.io/otel/sdk/metric v1.26.0 + gopkg.in/guregu/null.v3 v3.5.0 ) require ( @@ -40,6 +42,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect - gopkg.in/guregu/null.v3 v3.5.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 08bcae4..3d8311f 100644 --- a/go.sum +++ b/go.sum @@ -195,6 +195,8 @@ go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 h1:+hm+I+KigBy3M24/h1p/NHkUx/evbLH0PNcjpMyCHc4= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0/go.mod h1:NjC8142mLvvNT6biDpaMjyz78kyEHIwAJlSX0N9P5KI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 h1:HGZWGmCVRCVyAs2GQaiHQPbDHo+ObFWeUEOd+zDnp64= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0/go.mod h1:SaH+v38LSCHddyk7RGlU9uZyQoRrKao6IBnJw6Kbn+c= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= diff --git a/pkg/opentelemetry/config.go b/pkg/opentelemetry/config.go index 33b16dc..ca8d4b0 100644 --- a/pkg/opentelemetry/config.go +++ b/pkg/opentelemetry/config.go @@ -5,76 +5,162 @@ import ( "fmt" "time" + k6Const "go.k6.io/k6/lib/consts" "go.k6.io/k6/output" + "gopkg.in/guregu/null.v3" ) const ( - grpcReceiverType = "grpc" + // grpcExporterType GRPC exporter type + grpcExporterType = "grpc" + // httpExporterType HTTP exporter type + httpExporterType = "http" ) // Config is the config for the template collector type Config struct { + // ServiceName is the name of the service to use for the metrics + // export, if not set it will use "k6" + ServiceName string + // ServiceVersion is the version of the service to use for the metrics + // export, if not set it will use k6's library version + ServiceVersion string // MetricPrefix is the prefix to use for the metrics MetricPrefix string - // ReceiverType is the type of the receiver to use - ReceiverType string - // GRPCReceiverEndpoint is the endpoint of the gRPC receiver - GRPCReceiverEndpoint string - // PushInterval is the interval at which to push metrics to the receiver - PushInterval time.Duration // FlushInterval is the interval at which to flush metrics from the k6 FlushInterval time.Duration + + // ExporterType sets the type of OpenTelemetry Exporter to use + // Currently only "grpc" is supported + ExporterType string + // ExportInterval configures the intervening time between metrics exports + ExportInterval time.Duration + + // HTTPExporterInsecure disables client transport security for the Exporter's HTTP + // connection. + HTTPExporterInsecure null.Bool + // HTTPExporterEndpoint sets the target endpoint the OpenTelemetry Exporter + // will connect to. + HTTPExporterEndpoint string + // HTTPExporterURLPath sets the target URL path the OpenTelemetry Exporter + HTTPExporterURLPath string + + // GRPCExporterEndpoint sets the target endpoint the OpenTelemetry Exporter + // will connect to. + GRPCExporterEndpoint string + // GRPCExporterInsecure disables client transport security for the Exporter's gRPC + // connection. + GRPCExporterInsecure null.Bool } // NewConfig creates and validates a new config func NewConfig(p output.Params) (Config, error) { cfg := Config{ - MetricPrefix: "", - ReceiverType: grpcReceiverType, - GRPCReceiverEndpoint: "localhost:4317", - PushInterval: 1 * time.Second, - FlushInterval: 1 * time.Second, + ServiceName: "k6", + ServiceVersion: k6Const.Version, + MetricPrefix: "", + ExporterType: grpcExporterType, + + HTTPExporterInsecure: null.BoolFrom(false), + HTTPExporterEndpoint: "localhost:4318", + HTTPExporterURLPath: "/v1/metrics", + + GRPCExporterInsecure: null.BoolFrom(false), + GRPCExporterEndpoint: "localhost:4317", + + ExportInterval: 1 * time.Second, + FlushInterval: 1 * time.Second, } var err error for k, v := range p.Environment { switch k { - case "K6_OTEL_PUSH_INTERVAL": - cfg.PushInterval, err = time.ParseDuration(v) - if err != nil { - return cfg, fmt.Errorf("error parsing environment variable 'K6_OTEL_PUSH_INTERVAL': %w", err) - } + case "K6_OTEL_SERVICE_NAME": + cfg.ServiceName = v + case "K6_OTEL_SERVICE_VERSION": + cfg.ServiceVersion = v case "K6_OTEL_METRIC_PREFIX": cfg.MetricPrefix = v + case "K6_OTEL_EXPORT_INTERVAL": + cfg.ExportInterval, err = time.ParseDuration(v) + if err != nil { + return cfg, fmt.Errorf("error parsing environment variable 'K6_OTEL_EXPORT_INTERVAL': %w", err) + } case "K6_OTEL_FLUSH_INTERVAL": cfg.FlushInterval, err = time.ParseDuration(v) if err != nil { return cfg, fmt.Errorf("error parsing environment variable 'K6_OTEL_FLUSH_INTERVAL': %w", err) } - case "K6_OTEL_RECEIVER_TYPE": - cfg.ReceiverType = v - case "K6_OTEL_GRPC_RECEIVER_ENDPOINT": - cfg.GRPCReceiverEndpoint = v + case "K6_OTEL_EXPORTER_TYPE": + cfg.ExporterType = v + case "K6_OTEL_GRPC_EXPORTER_ENDPOINT": + cfg.GRPCExporterEndpoint = v + case "K6_OTEL_HTTP_EXPORTER_ENDPOINT": + cfg.HTTPExporterEndpoint = v + case "K6_OTEL_HTTP_EXPORTER_URL_PATH": + cfg.HTTPExporterURLPath = v + case "K6_OTEL_HTTP_EXPORTER_INSECURE": + cfg.HTTPExporterInsecure, err = parseBool(k, v) + if err != nil { + return cfg, err + } + case "K6_OTEL_GRPC_EXPORTER_INSECURE": + cfg.GRPCExporterInsecure, err = parseBool(k, v) + if err != nil { + return cfg, err + } } } // TDOO: consolidated config if err = cfg.Validate(); err != nil { - return cfg, fmt.Errorf("error validating config: %w", err) + return cfg, fmt.Errorf("error validating OpenTelemetry output config: %w", err) } return cfg, nil } +func parseBool(k, v string) (null.Bool, error) { + bv := null.NewBool(false, false) + + err := bv.UnmarshalText([]byte(v)) + if err != nil { + return bv, fmt.Errorf("error parsing %q environment variable: %w", k, err) + } + + return bv, nil +} + // Validate validates the config func (c Config) Validate() error { - if c.ReceiverType != grpcReceiverType { - return fmt.Errorf("unsupported receiver type %q, currently only %q supported", c.ReceiverType, grpcReceiverType) + if c.ServiceName == "" { + return errors.New("providing service name is required") } - if c.GRPCReceiverEndpoint == "" { - return errors.New("gRPC receiver endpoint is required") + if c.ServiceVersion == "" { + return errors.New("providing service version is required") + } + + if c.ExporterType != grpcExporterType && c.ExporterType != httpExporterType { + return fmt.Errorf( + "unsupported exporter type %q, currently only %q and %q supported", + c.ExporterType, + grpcExporterType, + httpExporterType, + ) + } + + if c.ExporterType == grpcExporterType { + if c.GRPCExporterEndpoint == "" { + return errors.New("gRPC exporter endpoint is required") + } + } + + if c.ExporterType == httpExporterType { + if c.HTTPExporterEndpoint == "" { + return errors.New("HTTP exporter endpoint is required") + } } return nil @@ -82,5 +168,23 @@ func (c Config) Validate() error { // String returns a string representation of the config func (c Config) String() string { - return fmt.Sprintf("%s, %s", c.ReceiverType, c.GRPCReceiverEndpoint) + var endpoint string + exporter := c.ExporterType + + if c.ExporterType == httpExporterType { + endpoint = "http" + if !c.HTTPExporterInsecure.Bool { + endpoint += "s" + } + + endpoint += "://" + c.HTTPExporterEndpoint + c.HTTPExporterURLPath + } else { + endpoint = c.GRPCExporterEndpoint + + if c.GRPCExporterInsecure.Bool { + exporter += " (insecure)" + } + } + + return fmt.Sprintf("%s, %s", exporter, endpoint) } diff --git a/pkg/opentelemetry/config_test.go b/pkg/opentelemetry/config_test.go index 7cd2c4b..cf3b45f 100644 --- a/pkg/opentelemetry/config_test.go +++ b/pkg/opentelemetry/config_test.go @@ -6,7 +6,11 @@ import ( "time" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" + "go.k6.io/k6/output" + + k6Const "go.k6.io/k6/lib/consts" ) func TestConfig(t *testing.T) { @@ -21,31 +25,43 @@ func TestConfig(t *testing.T) { }{ "default": { expectedConfig: Config{ - ReceiverType: grpcReceiverType, - GRPCReceiverEndpoint: "localhost:4317", - PushInterval: 1 * time.Second, + ServiceName: "k6", + ServiceVersion: k6Const.Version, + ExporterType: grpcExporterType, + HTTPExporterInsecure: null.NewBool(false, true), + HTTPExporterEndpoint: "localhost:4318", + HTTPExporterURLPath: "/v1/metrics", + GRPCExporterInsecure: null.NewBool(false, true), + GRPCExporterEndpoint: "localhost:4317", + ExportInterval: 1 * time.Second, FlushInterval: 1 * time.Second, }, }, "overwrite": { - env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4ms"}, + env: map[string]string{"K6_OTEL_GRPC_EXPORTER_ENDPOINT": "else", "K6_OTEL_EXPORT_INTERVAL": "4ms"}, expectedConfig: Config{ - ReceiverType: grpcReceiverType, - GRPCReceiverEndpoint: "else", - PushInterval: 4 * time.Millisecond, + ServiceName: "k6", + ServiceVersion: k6Const.Version, + ExporterType: grpcExporterType, + HTTPExporterInsecure: null.NewBool(false, true), + HTTPExporterEndpoint: "localhost:4318", + HTTPExporterURLPath: "/v1/metrics", + GRPCExporterInsecure: null.NewBool(false, true), + GRPCExporterEndpoint: "else", + ExportInterval: 4 * time.Millisecond, FlushInterval: 1 * time.Second, }, }, "early error": { - env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4something"}, + env: map[string]string{"K6_OTEL_GRPC_EXPORTER_ENDPOINT": "else", "K6_OTEL_EXPORT_INTERVAL": "4something"}, err: `time: unknown unit "something" in duration "4something"`, }, "unsupported receiver type": { - env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4m", "K6_OTEL_RECEIVER_TYPE": "http"}, - err: `error validating config: unsupported receiver type "http", currently only "grpc" supported`, + env: map[string]string{"K6_OTEL_GRPC_EXPORTER_ENDPOINT": "else", "K6_OTEL_EXPORT_INTERVAL": "4m", "K6_OTEL_EXPORTER_TYPE": "socket"}, + err: `error validating OpenTelemetry output config: unsupported exporter type "socket", currently only "grpc" and "http" supported`, }, } diff --git a/pkg/opentelemetry/exporter.go b/pkg/opentelemetry/exporter.go new file mode 100644 index 0000000..3ad1f83 --- /dev/null +++ b/pkg/opentelemetry/exporter.go @@ -0,0 +1,53 @@ +package opentelemetry + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/sdk/metric" +) + +func getExporter(cfg Config) (metric.Exporter, error) { + // at the point of writing this code + // ctx isn't used at any point in the exporter + // later on, it could be used for the connection timeout + ctx := context.Background() + + if cfg.ExporterType == grpcExporterType { + return buildGRPCExporter(ctx, cfg) + } + + if cfg.ExporterType == httpExporterType { + return buildHTTPExporter(ctx, cfg) + } + + return nil, errors.New("unsupported exporter type " + cfg.ExporterType + " specified") +} + +func buildHTTPExporter(ctx context.Context, cfg Config) (metric.Exporter, error) { + opts := []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpoint(cfg.HTTPExporterEndpoint), + otlpmetrichttp.WithURLPath(cfg.HTTPExporterURLPath), + } + + if cfg.HTTPExporterInsecure.Bool { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } + + return otlpmetrichttp.New(ctx, opts...) +} + +func buildGRPCExporter(ctx context.Context, cfg Config) (metric.Exporter, error) { + opt := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint(cfg.GRPCExporterEndpoint), + } + + // TODO: give priority to the TLS + if cfg.GRPCExporterInsecure.Bool { + opt = append(opt, otlpmetricgrpc.WithInsecure()) + } + + return otlpmetricgrpc.New(ctx, opt...) +} diff --git a/pkg/opentelemetry/output.go b/pkg/opentelemetry/output.go index 0d26844..8383caf 100644 --- a/pkg/opentelemetry/output.go +++ b/pkg/opentelemetry/output.go @@ -8,13 +8,11 @@ import ( "time" "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" otelMetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" - k6Const "go.k6.io/k6/lib/consts" "go.k6.io/k6/metrics" "go.k6.io/k6/output" ) @@ -57,8 +55,8 @@ func (o *Output) Description() string { // StopWithTestError flushes all remaining metrics and finalizes the test run func (o *Output) StopWithTestError(_ error) error { - o.logger.Info("Stopping...") - defer o.logger.Info("Stopped!") + o.logger.Debug("Stopping...") + defer o.logger.Debug("Stopped!") if err := o.meterProvider.Shutdown(context.Background()); err != nil { o.logger.WithError(err).Error("can't shutdown OpenTelemetry metric provider") @@ -78,22 +76,15 @@ func (o *Output) Stop() error { func (o *Output) Start() error { o.logger.Debug("Starting output...") - ctx := context.Background() - - // TODO: support different exporters (e.g. OTLP/HTTP), authentication, etc. - exp, err := otlpmetricgrpc.New( - ctx, - otlpmetricgrpc.WithInsecure(), - otlpmetricgrpc.WithEndpoint(o.config.GRPCReceiverEndpoint), - ) + exp, err := getExporter(o.config) if err != nil { return fmt.Errorf("failed to create OpenTelemetry exporter: %w", err) } res, err := resource.Merge(resource.Default(), resource.NewWithAttributes(semconv.SchemaURL, - semconv.ServiceName("k6"), - semconv.ServiceVersion(k6Const.Version), + semconv.ServiceName(o.config.ServiceName), + semconv.ServiceVersion(o.config.ServiceVersion), )) if err != nil { return fmt.Errorf("failed to create OpenTelemetry resource: %w", err) @@ -104,7 +95,7 @@ func (o *Output) Start() error { metric.WithReader( metric.NewPeriodicReader( exp, - metric.WithInterval(o.config.PushInterval), + metric.WithInterval(o.config.ExportInterval), ), ), )