From fad70bc946c91d5454540e39f9392c3075fba4cc Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Sat, 7 Dec 2024 11:47:10 +0100 Subject: [PATCH 01/11] Configure metrics exporter for loadbalancing when available --- CHANGELOG.md | 2 + .../exporter/loadbalancing/loadbalancing.go | 41 ++++++++++++++----- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d9fa0633d..56a99d612b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Main (unreleased) ### Features +- Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip) + - Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig) - Add support to `prometheus.operator.servicemonitors` to allow `endpointslice` role. (@yoyosir) diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go index a6f94c43e2..a403bcf8bb 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go @@ -13,6 +13,7 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" otelcomponent "go.opentelemetry.io/collector/component" @@ -34,10 +35,33 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := loadbalancingexporter.NewFactory() - //TODO(ptodev): LB exporter cannot yet work with metrics due to a limitation in Alloy: - // https://github.com/grafana/agent/pull/5684 - // Once the limitation is removed, we may be able to remove the need for exporter.TypeSignal altogether. - return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs|exporter.TypeTraces) + + myArgs := args.(Arguments) + + var typeSignal exporter.TypeSignal + + // As per https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/loadbalancingexporter/README.md + // metrics is considered "development" stability level + + switch myArgs.RoutingKey { + case "traceID": + typeSignal = exporter.TypeLogs | exporter.TypeTraces + case "service": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + typeSignal = exporter.TypeLogs | exporter.TypeTraces + } + case "resource", "metric", "streamID": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + } + } + + return exporter.New(opts, fact, myArgs, typeSignal) }, }) } @@ -69,13 +93,10 @@ func (args *Arguments) SetToDefault() { // Validate implements syntax.Validator. func (args *Arguments) Validate() error { - //TODO(ptodev): Add support for "resource" and "metric" routing keys later. - // The reason we can't add them yet is that otelcol.exporter.loadbalancing - // is labeled as "beta", but those routing keys are experimental. - // We need a way to label otelcol.exporter.loadbalancing as "public-preview" - // for logs and traces, but "experimental" for metrics. + // Allow routing keys for all signal types. Metrics exporter will be disabled + // if stability level is above experimental switch args.RoutingKey { - case "service", "traceID": + case "service", "traceID", "resource", "metric", "streamID": // The routing key is valid. default: return fmt.Errorf("invalid routing key %q", args.RoutingKey) From 5e5af968b38a4f17d662c89e8467024bc827669a Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Mon, 9 Dec 2024 20:03:50 +0100 Subject: [PATCH 02/11] Update docs --- .../otelcol/otelcol.exporter.loadbalancing.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index 67c5f6aab0..917c57b0d2 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -62,10 +62,18 @@ Name | Type | Description | Default | `routing_key` | `string` | Routing strategy for load balancing. | `"traceID"` | no The `routing_key` attribute determines how to route signals across endpoints. Its value could be one of the following: -* `"service"`: spans with the same `service.name` will be exported to the same backend. +- `"service"`: spans/logs/metrics with the same `service.name` will be exported to the same backend. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate. -* `"traceID"`: spans belonging to the same traceID will be exported to the same backend. +- `"traceID"`: spans/logs belonging to the same traceID will be exported to the same backend. +- `"resource"`: metrics belonging to the same resource will be exported to the same backend. +- `"metric"`: metrics with the same name will be exported to the same backend +- `"streamID"`: metrics with the same streamID will be exported to the same backend + +The loadbalancer will configure the exporter for the singal types supported by the `routing_key` + +Metrics support in `otelcol.exporter.loadbalancing` is considered experimental, so an exporter +will be configured for metrics only if Alloy is run with `--stability-level=experimental` ## Blocks From 2e5f4369eec7fb98ad1d7cae31c013ab196ffe1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:37:49 +0100 Subject: [PATCH 03/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index 917c57b0d2..d98803afe0 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -62,7 +62,7 @@ Name | Type | Description | Default | `routing_key` | `string` | Routing strategy for load balancing. | `"traceID"` | no The `routing_key` attribute determines how to route signals across endpoints. Its value could be one of the following: -- `"service"`: spans/logs/metrics with the same `service.name` will be exported to the same backend. +- `"service"`: spans, logs, and metrics with the same `service.name` will be exported to the same backend. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate. - `"traceID"`: spans/logs belonging to the same traceID will be exported to the same backend. From b750200a504fcf644ae061a3fab638f1f0a49637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:38:20 +0100 Subject: [PATCH 04/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index d98803afe0..b6253508eb 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -67,7 +67,7 @@ This is useful when using processors like the span metrics, so all spans for eac for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate. - `"traceID"`: spans/logs belonging to the same traceID will be exported to the same backend. - `"resource"`: metrics belonging to the same resource will be exported to the same backend. -- `"metric"`: metrics with the same name will be exported to the same backend +- `"metric"`: metrics with the same name will be exported to the same backend. - `"streamID"`: metrics with the same streamID will be exported to the same backend The loadbalancer will configure the exporter for the singal types supported by the `routing_key` From 8c69571ca8a7c1c803ceb62d7c1ae938667661a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:39:00 +0100 Subject: [PATCH 05/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index b6253508eb..cfe7129c17 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -65,7 +65,7 @@ The `routing_key` attribute determines how to route signals across endpoints. It - `"service"`: spans, logs, and metrics with the same `service.name` will be exported to the same backend. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate. -- `"traceID"`: spans/logs belonging to the same traceID will be exported to the same backend. +- `"traceID"`: spans and logs belonging to the same `traceID` will be exported to the same backend. - `"resource"`: metrics belonging to the same resource will be exported to the same backend. - `"metric"`: metrics with the same name will be exported to the same backend. - `"streamID"`: metrics with the same streamID will be exported to the same backend From 57fd9f98bba0e38f35211922efe2fce4f44e6e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:39:07 +0100 Subject: [PATCH 06/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index cfe7129c17..16876f6303 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -68,7 +68,7 @@ for metric collection. Otherwise, metrics for the same services would be sent to - `"traceID"`: spans and logs belonging to the same `traceID` will be exported to the same backend. - `"resource"`: metrics belonging to the same resource will be exported to the same backend. - `"metric"`: metrics with the same name will be exported to the same backend. -- `"streamID"`: metrics with the same streamID will be exported to the same backend +- `"streamID"`: metrics with the same `streamID` will be exported to the same backend. The loadbalancer will configure the exporter for the singal types supported by the `routing_key` From faf9323fa9e43768bdf59f0d7f396ae769207275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:39:14 +0100 Subject: [PATCH 07/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index 16876f6303..a7f7a5234b 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -70,7 +70,7 @@ for metric collection. Otherwise, metrics for the same services would be sent to - `"metric"`: metrics with the same name will be exported to the same backend. - `"streamID"`: metrics with the same `streamID` will be exported to the same backend. -The loadbalancer will configure the exporter for the singal types supported by the `routing_key` +The loadbalancer configures the exporter for the signal types supported by the `routing_key`. Metrics support in `otelcol.exporter.loadbalancing` is considered experimental, so an exporter will be configured for metrics only if Alloy is run with `--stability-level=experimental` From c3d4086b769c2e1a8e6f5686e7ed80dab97b591d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gergely=20Madar=C3=A1sz?= Date: Mon, 9 Dec 2024 21:39:37 +0100 Subject: [PATCH 08/11] Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../components/otelcol/otelcol.exporter.loadbalancing.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md index a7f7a5234b..99f003f08d 100644 --- a/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md +++ b/docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md @@ -72,8 +72,9 @@ for metric collection. Otherwise, metrics for the same services would be sent to The loadbalancer configures the exporter for the signal types supported by the `routing_key`. -Metrics support in `otelcol.exporter.loadbalancing` is considered experimental, so an exporter -will be configured for metrics only if Alloy is run with `--stability-level=experimental` +> **EXPERIMENTAL**: Metrics support in `otelcol.exporter.loadbalancing` is an [experimental][] feature. +> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement. +> The `stability.level` flag must be set to `experimental` to use the feature. ## Blocks From beaa50b7e60c67309b55511fb85e995ed2d71f05 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Tue, 10 Dec 2024 12:22:46 +0100 Subject: [PATCH 09/11] Allow update of signal type during exporter lifetime --- .../component/otelcol/exporter/awss3/awss3.go | 2 +- .../otelcol/exporter/datadog/datadog.go | 2 +- .../component/otelcol/exporter/debug/debug.go | 2 +- .../component/otelcol/exporter/exporter.go | 20 +++++++--- .../otelcol/exporter/exporter_test.go | 2 +- .../component/otelcol/exporter/kafka/kafka.go | 2 +- .../exporter/loadbalancing/loadbalancing.go | 40 +++++++++---------- .../component/otelcol/exporter/otlp/otlp.go | 2 +- .../otelcol/exporter/otlphttp/otlphttp.go | 2 +- .../otelcol/exporter/splunkhec/splunkhec.go | 2 +- .../otelcol/exporter/syslog/syslog.go | 2 +- 11 files changed, 44 insertions(+), 34 deletions(-) diff --git a/internal/component/otelcol/exporter/awss3/awss3.go b/internal/component/otelcol/exporter/awss3/awss3.go index b033ebb303..4276358462 100644 --- a/internal/component/otelcol/exporter/awss3/awss3.go +++ b/internal/component/otelcol/exporter/awss3/awss3.go @@ -23,7 +23,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := awss3exporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/datadog/datadog.go b/internal/component/otelcol/exporter/datadog/datadog.go index 2198e956e2..2248a47e64 100644 --- a/internal/component/otelcol/exporter/datadog/datadog.go +++ b/internal/component/otelcol/exporter/datadog/datadog.go @@ -42,7 +42,7 @@ func init() { // Since we don't have that, we disable the feature gate to allow the exporter to compute APM stats. // See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/datadogexporter for more featuregate.GlobalRegistry().Set("exporter.datadogexporter.DisableAPMStats", false) - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/debug/debug.go b/internal/component/otelcol/exporter/debug/debug.go index 5c2662501e..09d8e21a0a 100644 --- a/internal/component/otelcol/exporter/debug/debug.go +++ b/internal/component/otelcol/exporter/debug/debug.go @@ -25,7 +25,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := debugexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..2a30e2b2ca 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -77,6 +77,14 @@ func (s TypeSignal) SupportsTraces() bool { return s&TypeTraces != 0 } +type TypeSignalFunc func(component.Options, component.Arguments) TypeSignal + +func TypeSignalConstFunc(ts TypeSignal) TypeSignalFunc { + return func(component.Options, component.Arguments) TypeSignal { + return ts + } +} + // Exporter is an Alloy component shim which manages an OpenTelemetry Collector // exporter component. type Exporter struct { @@ -92,7 +100,7 @@ type Exporter struct { // Signals which the exporter is able to export. // Can be logs, metrics, traces or any combination of them. - supportedSignals TypeSignal + supportedSignals TypeSignalFunc } var ( @@ -106,7 +114,7 @@ var ( // // The registered component must be registered to export the // otelcol.ConsumerExports type, otherwise New will panic. -func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) { +func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignalFunc) (*Exporter, error) { ctx, cancel := context.WithCancel(context.Background()) consumer := lazyconsumer.NewPaused(ctx) @@ -212,8 +220,10 @@ func (e *Exporter) Update(args component.Arguments) error { // supported telemetry signals. var components []otelcomponent.Component + supportedSignals := e.supportedSignals(e.opts, args) + var tracesExporter otelexporter.Traces - if e.supportedSignals.SupportsTraces() { + if supportedSignals.SupportsTraces() { tracesExporter, err = e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err @@ -223,7 +233,7 @@ func (e *Exporter) Update(args component.Arguments) error { } var metricsExporter otelexporter.Metrics - if e.supportedSignals.SupportsMetrics() { + if supportedSignals.SupportsMetrics() { metricsExporter, err = e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err @@ -233,7 +243,7 @@ func (e *Exporter) Update(args component.Arguments) error { } var logsExporter otelexporter.Logs - if e.supportedSignals.SupportsLogs() { + if supportedSignals.SupportsLogs() { logsExporter, err = e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig) if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) { return err diff --git a/internal/component/otelcol/exporter/exporter_test.go b/internal/component/otelcol/exporter/exporter_test.go index a2c8f13be2..85fe79d6a9 100644 --- a/internal/component/otelcol/exporter/exporter_test.go +++ b/internal/component/otelcol/exporter/exporter_test.go @@ -105,7 +105,7 @@ func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment { }, otelcomponent.StabilityLevelUndefined), ) - return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeAll) + return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, } diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go index 2b928bccfe..89b0956098 100644 --- a/internal/component/otelcol/exporter/kafka/kafka.go +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -28,7 +28,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := kafkaexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go index a403bcf8bb..4d1c886695 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing.go @@ -36,32 +36,32 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := loadbalancingexporter.NewFactory() - myArgs := args.(Arguments) - - var typeSignal exporter.TypeSignal - // As per https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/loadbalancingexporter/README.md // metrics is considered "development" stability level - switch myArgs.RoutingKey { - case "traceID": - typeSignal = exporter.TypeLogs | exporter.TypeTraces - case "service": - if opts.MinStability.Permits(featuregate.StabilityExperimental) { - typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics - } else { - level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + typeSignalFunc := func(opts component.Options, args component.Arguments) exporter.TypeSignal { + myArgs := args.(Arguments) + var typeSignal exporter.TypeSignal + switch myArgs.RoutingKey { + case "traceID": typeSignal = exporter.TypeLogs | exporter.TypeTraces + case "service": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + typeSignal = exporter.TypeLogs | exporter.TypeTraces + } + case "resource", "metric", "streamID": + if opts.MinStability.Permits(featuregate.StabilityExperimental) { + typeSignal = exporter.TypeMetrics + } else { + level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") + } } - case "resource", "metric", "streamID": - if opts.MinStability.Permits(featuregate.StabilityExperimental) { - typeSignal = exporter.TypeMetrics - } else { - level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it") - } + return typeSignal } - - return exporter.New(opts, fact, myArgs, typeSignal) + return exporter.New(opts, fact, args.(Arguments), typeSignalFunc) }, }) } diff --git a/internal/component/otelcol/exporter/otlp/otlp.go b/internal/component/otelcol/exporter/otlp/otlp.go index 30c82ee367..fe90afc0d5 100644 --- a/internal/component/otelcol/exporter/otlp/otlp.go +++ b/internal/component/otelcol/exporter/otlp/otlp.go @@ -25,7 +25,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := otlpexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/otlphttp/otlphttp.go b/internal/component/otelcol/exporter/otlphttp/otlphttp.go index 091c8361f0..f996462c2d 100644 --- a/internal/component/otelcol/exporter/otlphttp/otlphttp.go +++ b/internal/component/otelcol/exporter/otlphttp/otlphttp.go @@ -26,7 +26,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := otlphttpexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/splunkhec/splunkhec.go b/internal/component/otelcol/exporter/splunkhec/splunkhec.go index 2b4e385f85..3c51d230ce 100644 --- a/internal/component/otelcol/exporter/splunkhec/splunkhec.go +++ b/internal/component/otelcol/exporter/splunkhec/splunkhec.go @@ -24,7 +24,7 @@ func init() { Exports: otelcol.ConsumerExports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := splunkhecexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll)) }, }) } diff --git a/internal/component/otelcol/exporter/syslog/syslog.go b/internal/component/otelcol/exporter/syslog/syslog.go index ded5330c83..2723e65e8b 100644 --- a/internal/component/otelcol/exporter/syslog/syslog.go +++ b/internal/component/otelcol/exporter/syslog/syslog.go @@ -27,7 +27,7 @@ func init() { Build: func(opts component.Options, args component.Arguments) (component.Component, error) { fact := syslogexporter.NewFactory() - return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs) + return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeLogs)) }, }) } From 5ad56628fb79ecf1d7083b37aba0d46daa520c8e Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Tue, 10 Dec 2024 13:40:51 +0100 Subject: [PATCH 10/11] Updated loadbalancing test to verify routing_key changes --- .../loadbalancing/loadbalancing_test.go | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go index 83e241c202..741b59d033 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go @@ -1,13 +1,20 @@ package loadbalancing_test import ( + "context" + "fmt" + "net" "testing" "time" "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" + "github.com/grafana/dskit/backoff" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configgrpc" @@ -15,6 +22,9 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "google.golang.org/grpc" ) func getPtrToUint(v uint16) *uint16 { @@ -22,6 +32,206 @@ func getPtrToUint(v uint16) *uint16 { return res } +// Test performs a basic integration test which runs the otelcol.exporter.loadbalancing +// component and ensures that it can pass data to an OTLP gRPC server. +func Test(t *testing.T) { + traceCh := make(chan ptrace.Traces) + tracesServer := makeTracesServer(t, traceCh) + + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.loadbalancing") + require.NoError(t, err) + + cfgTemplate := ` + routing_key = "%s" + resolver { + static { + hostnames = ["%s"] + } + } + protocol { + otlp { + client { + compression = "none" + + tls { + insecure = true + insecure_skip_verify = true + } + } + } + } + + debug_metrics { + disable_high_cardinality_metrics = true + } + ` + + cfg := fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + var args loadbalancing.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + require.Equal(t, args.DebugMetricsConfig().DisableHighCardinalityMetrics, true) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } + + // Update the config to disable traces export + cfg = fmt.Sprintf(cfgTemplate, "metric", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MaxRetries: 3, + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + require.ErrorContains(t, err, "telemetry type is not supported") + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + // no error here, as we we expect to fail sending in the first place + select { + case <-traceCh: + require.FailNow(t, "no traces expected here") + case <-time.After(time.Second): + } + + // Re-run the test with reenabled traces export + cfg = fmt.Sprintf(cfgTemplate, "traceID", tracesServer) + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + ctrl.Update(args) + + // Send traces in the background to our exporter. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(l).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our exporter to finish and pass data to our rpc server. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +// makeTracesServer returns a host:port which will accept traces over insecure +// gRPC. +func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv := grpc.NewServer() + ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch}) + + go func() { + err := srv.Serve(lis) + require.NoError(t, err) + }() + t.Cleanup(srv.Stop) + + return lis.Addr().String() +} + +type mockTracesReceiver struct { + ptraceotlp.UnimplementedGRPCServer + ch chan ptrace.Traces +} + +var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil) + +func (ms *mockTracesReceiver) Export(_ context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { + ms.ch <- req.Traces() + return ptraceotlp.NewExportResponse(), nil +} + +func createTestTraces() ptrace.Traces { + // Matches format from the protobuf definition: + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto + bb := `{ + "resource_spans": [{ + "scope_spans": [{ + "spans": [{ + "name": "TestSpan" + }] + }] + }] + }` + + decoder := &ptrace.JSONUnmarshaler{} + data, err := decoder.UnmarshalTraces([]byte(bb)) + if err != nil { + panic(err) + } + return data +} + func TestConfigConversion(t *testing.T) { var ( defaultRetrySettings = configretry.NewDefaultBackOffConfig() From fd63dbb222c06fff764fe43c2bc283a613b89f69 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 10 Dec 2024 17:39:54 +0000 Subject: [PATCH 11/11] Update internal/component/otelcol/exporter/exporter.go --- internal/component/otelcol/exporter/exporter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 2a30e2b2ca..63e0f1cdd0 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -100,6 +100,7 @@ type Exporter struct { // Signals which the exporter is able to export. // Can be logs, metrics, traces or any combination of them. + // This is a function because which signals are supported may depend on the component configuration. supportedSignals TypeSignalFunc }