From 057deff85ad875d361ea06a1b19c4618e328156a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Wed, 11 Dec 2024 11:46:15 +0100 Subject: [PATCH] Configure metrics exporter for loadbalancing when possible (#2242) * Configure metrics exporter for loadbalancing when available * Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Allow update of signal type during exporter lifetime * Updated loadbalancing test to verify routing_key changes * Update internal/component/otelcol/exporter/exporter.go --------- Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 2 + .../otelcol/otelcol.exporter.loadbalancing.md | 13 +- .../component/otelcol/exporter/awss3/awss3.go | 2 +- .../otelcol/exporter/datadog/datadog.go | 2 +- .../component/otelcol/exporter/debug/debug.go | 2 +- .../component/otelcol/exporter/exporter.go | 21 +- .../otelcol/exporter/exporter_test.go | 2 +- .../component/otelcol/exporter/kafka/kafka.go | 2 +- .../exporter/loadbalancing/loadbalancing.go | 41 +++- .../loadbalancing/loadbalancing_test.go | 210 ++++++++++++++++++ .../component/otelcol/exporter/otlp/otlp.go | 2 +- .../otelcol/exporter/otlphttp/otlphttp.go | 2 +- .../otelcol/exporter/splunkhec/splunkhec.go | 2 +- .../otelcol/exporter/syslog/syslog.go | 2 +- 14 files changed, 279 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d9fa0633d..56a99d612b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Main (unreleased) ### Features +- Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip) + - Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig) - Add support to `prometheus.operator.servicemonitors` to allow `endpointslice` role. 
(@yoyosir) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index 67c5f6aab0..99f003f08d 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -62,10 +62,19 @@ Name | Type | Description | Default | `routing_key` | `string` | Routing strategy for load balancing. | `"traceID"` | no The `routing_key` attribute determines how to route signals across endpoints. Its value could be one of the following: -* `"service"`: spans with the same `service.name` will be exported to the same backend. +- `"service"`: spans, logs, and metrics with the same `service.name` will be exported to the same backend. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate. -* `"traceID"`: spans belonging to the same traceID will be exported to the same backend. +- `"traceID"`: spans and logs belonging to the same `traceID` will be exported to the same backend. +- `"resource"`: metrics belonging to the same resource will be exported to the same backend. +- `"metric"`: metrics with the same name will be exported to the same backend. +- `"streamID"`: metrics with the same `streamID` will be exported to the same backend. + +The loadbalancer configures the exporter for the signal types supported by the `routing_key`. + +> **EXPERIMENTAL**: Metrics support in `otelcol.exporter.loadbalancing` is an [experimental][] feature. +> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement. +> The `stability.level` flag must be set to `experimental` to use the feature. 
## Blocks diff --git a/internal/component/otelcol/exporter/awss3/awss3.go b/internal/component/otelcol/exporter/awss3/awss3.go index b033ebb303..4276358462 100644 --- a/internal/component/otelcol/exporter/awss3/awss3.go +++ b/internal/component/otelcol/exporter/awss3/awss3.go @@ -23,7 +23,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := awss3exporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/datadog/datadog.go b/internal/component/otelcol/exporter/datadog/datadog.go index 2198e956e2..2248a47e64 100644 --- a/internal/component/otelcol/exporter/datadog/datadog.go +++ b/internal/component/otelcol/exporter/datadog/datadog.go @@ -42,7 +42,7 @@ func init() { // Since we don't have that, we disable the feature gate to allow the exporter to compute APM stats. 
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/datadogexporter for more featuregate.GlobalRegistry().Set("exporter.datadogexporter.DisableAPMStats", false) - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/debug/debug.go b/internal/component/otelcol/exporter/debug/debug.go index 5c2662501e..09d8e21a0a 100644 --- a/internal/component/otelcol/exporter/debug/debug.go +++ b/internal/component/otelcol/exporter/debug/debug.go @@ -25,7 +25,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := debugexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..63e0f1cdd0 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -77,6 +77,14 @@ func (s TypeSignal) SupportsTraces() bool { return s&TypeTraces != 0 } +type TypeSignalFunc func(component.Options, component.Arguments) TypeSignal + +func TypeSignalConstFunc(ts TypeSignal) TypeSignalFunc { + return func(component.Options, component.Arguments) TypeSignal { + return ts + } +} + // Exporter is an Alloy component shim which manages an OpenTelemetry Collector // exporter component. type Exporter struct { @@ -92,7 +100,8 @@ type Exporter struct { // Signals which the exporter is able to export. // Can be logs, metrics, traces or any combination of them. - supportedSignals TypeSignal + // This is a function because which signals are supported may depend on the component configuration. 
+ supportedSignals TypeSignalFunc } var ( @@ -106,7 +115,7 @@ var ( // // The registered component must be registered to export the // otelcol.ConsumerExports type, otherwise New will panic. -func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) { +func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignalFunc) (*Exporter, error) { ctx, cancel := context.WithCancel(context.Background()) consumer := lazyconsumer.NewPaused(ctx) @@ -212,8 +221,10 @@ func (e *Exporter) Update(args component.Arguments) error { // supported telemetry signals. var components []otelcomponent.Component + supportedSignals := e.supportedSignals(e.opts, args) + var tracesExporter otelexporter.Traces - if e.supportedSignals.SupportsTraces() { + if supportedSignals.SupportsTraces() { tracesExporter, err = e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err @@ -223,7 +234,7 @@ func (e *Exporter) Update(args component.Arguments) error { } var metricsExporter otelexporter.Metrics - if e.supportedSignals.SupportsMetrics() { + if supportedSignals.SupportsMetrics() { metricsExporter, err = e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err @@ -233,7 +244,7 @@ func (e *Exporter) Update(args component.Arguments) error { } var logsExporter otelexporter.Logs - if e.supportedSignals.SupportsLogs() { + if supportedSignals.SupportsLogs() { logsExporter, err = e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err diff --git a/internal/component/otelcol/exporter/exporter_test.go b/internal/component/otelcol/exporter/exporter_test.go index a2c8f13be2..85fe79d6a9 100644 --- a/internal/component/otelcol/exporter/exporter_test.go +++ 
b/internal/component/otelcol/exporter/exporter_test.go @@ -105,7 +105,7 @@ func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment { }, otelcomponent.StabilityLevelUndefined), ) - return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeAll) + return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, } diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go index 2b928bccfe..89b0956098 100644 --- a/internal/component/otelcol/exporter/kafka/kafka.go +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -28,7 +28,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := kafkaexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go index a6f94c43e2..4d1c886695 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go @@ -13,6 +13,7 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" otelcomponent "go.opentelemetry.io/collector/component" @@ -34,10 +35,33 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := loadbalancingexporter.NewFactory() - //TODO(ptodev): LB exporter cannot yet work 
with metrics due to a limitation in Alloy: - // https://github.com/grafana/agent/pull/5684 - // Once the limitation is removed, we may be able to remove the need for exporter.TypeSignal altogether. - return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs|exporter.TypeTraces) + + // As per https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/loadbalancingexporter/README.md + // metrics is considered "development" stability level + + typeSignalFunc := func(opts component.Options, args component.Arguments) exporter.TypeSignal { + myArgs := args.(Arguments) + var typeSignal exporter.TypeSignal + switch myArgs.RoutingKey { + case "traceID": + typeSignal = exporter.TypeLogs | exporter.TypeTraces + case "service": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + typeSignal = exporter.TypeLogs | exporter.TypeTraces + } + case "resource", "metric", "streamID": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + } + } + return typeSignal + } + return exporter.New(opts, fact, args.(Arguments), typeSignalFunc) }, }) } @@ -69,13 +93,10 @@ func (args *Arguments) SetToDefault() { // Validate implements syntax.Validator. func (args *Arguments) Validate() error { - //TODO(ptodev): Add support for "resource" and "metric" routing keys later. - // The reason we can't add them yet is that otelcol.exporter.loadbalancing - // is labeled as "beta", but those routing keys are experimental. - // We need a way to label otelcol.exporter.loadbalancing as "public-preview" - // for logs and traces, but "experimental" for metrics. + // Allow routing keys for all signal types. 
Metrics exporter will be disabled + // if stability level is above experimental switch args.RoutingKey { - case "service", "traceID": + case "service", "traceID", "resource", "metric", "streamID": // The routing key is valid. default: return fmt.Errorf("invalid routing key %q", args.RoutingKey) diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go index 83e241c202..741b59d033 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go @@ -1,13 +1,20 @@ package loadbalancing_test import ( + "context" + "fmt" + "net" "testing" "time" "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" + "github.com/grafana/dskit/backoff" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configgrpc" @@ -15,6 +22,9 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "google.golang.org/grpc" ) func getPtrToUint(v uint16) *uint16 { @@ -22,6 +32,206 @@ func getPtrToUint(v uint16) *uint16 { return res } +// Test performs a basic integration test which runs the otelcol.exporter.loadbalancing +// component and ensures that it can pass data to an OTLP gRPC server. 
+func Test(t *testing.T) { + traceCh := make(chan ptrace.Traces) + tracesServer := makeTracesServer(t, traceCh) + + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.loadbalancing") + require.NoError(t, err) + + cfgTemplate := ` + routing_key = "%s" + resolver { + static { + hostnames = ["%s"] + } + } + protocol { + otlp { + client { + compression = "none" + + tls { + insecure = true + insecure_skip_verify = true + } + } + } + } + + debug_metrics { + disable_high_cardinality_metrics = true + } + ` + + cfg := fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + var args loadbalancing.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + require.Equal(t, args.DebugMetricsConfig().DisableHighCardinalityMetrics, true) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. 
+ select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } + + // Update the config to disable traces export + cfg = fmt.Sprintf(cfgTemplate, "metric", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MaxRetries: 3, + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + require.ErrorContains(t, err, "telemetry type is not supported") + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + // no error here, as we expect to fail sending in the first place + select { + case <-traceCh: + require.FailNow(t, "no traces expected here") + case <-time.After(time.Second): + } + + // Re-run the test with reenabled traces export + cfg = fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. 
+ select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +// makeTracesServer returns a host:port which will accept traces over insecure +// gRPC. +func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv := grpc.NewServer() + ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch}) + + go func() { + err := srv.Serve(lis) + require.NoError(t, err) + }() + t.Cleanup(srv.Stop) + + return lis.Addr().String() +} + +type mockTracesReceiver struct { + ptraceotlp.UnimplementedGRPCServer + ch chan ptrace.Traces +} + +var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil) + +func (ms *mockTracesReceiver) Export(_ context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { + ms.ch <- req.Traces() + return ptraceotlp.NewExportResponse(), nil +} + +func createTestTraces() ptrace.Traces { + // Matches format from the protobuf definition: + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto + bb := `{ + "resource_spans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }` + + decoder := &ptrace.JSONUnmarshaler{} + data, err := decoder.UnmarshalTraces([]byte(bb)) + if err != nil { + panic(err) + } + return data +} + func TestConfigConversion(t *testing.T) { var ( defaultRetrySettings = configretry.NewDefaultBackOffConfig() diff --git a/internal/component/otelcol/exporter/otlp/otlp.go b/internal/component/otelcol/exporter/otlp/otlp.go index 30c82ee367..fe90afc0d5 100644 --- a/internal/component/otelcol/exporter/otlp/otlp.go +++ b/internal/component/otelcol/exporter/otlp/otlp.go @@ -25,7 +25,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := otlpexporter.NewFactory() - return 
exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/otlphttp/otlphttp.go b/internal/component/otelcol/exporter/otlphttp/otlphttp.go index 091c8361f0..f996462c2d 100644 --- a/internal/component/otelcol/exporter/otlphttp/otlphttp.go +++ b/internal/component/otelcol/exporter/otlphttp/otlphttp.go @@ -26,7 +26,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := otlphttpexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/splunkhec/splunkhec.go b/internal/component/otelcol/exporter/splunkhec/splunkhec.go index 2b4e385f85..3c51d230ce 100644 --- a/internal/component/otelcol/exporter/splunkhec/splunkhec.go +++ b/internal/component/otelcol/exporter/splunkhec/splunkhec.go @@ -24,7 +24,7 @@ func init() { Exports: otelcol.ConsumerExports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := splunkhecexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/syslog/syslog.go b/internal/component/otelcol/exporter/syslog/syslog.go index ded5330c83..2723e65e8b 100644 --- a/internal/component/otelcol/exporter/syslog/syslog.go +++ b/internal/component/otelcol/exporter/syslog/syslog.go @@ -27,7 +27,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := syslogexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs) + return 
exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeLogs)) }, }) }