Skip to content

Commit

Permalink
Configure metrics exporter for loadbalancing when possible (#2242)
Browse files Browse the repository at this point in the history
* Configure metrics exporter for loadbalancing when available

* Update docs/sources/reference/components/otelcol/otelcol.exporter.loadbalancing.md

Co-authored-by: Clayton Cornell <[email protected]>

* Allow update of signal type during exporter lifetime

* Updated loadbalancing test to verify routing_key changes

* Update internal/component/otelcol/exporter/exporter.go

---------

Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
madaraszg-tulip and clayton-cornell authored Dec 11, 2024
1 parent 14c3bd2 commit 057deff
Show file tree
Hide file tree
Showing 14 changed files with 279 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Main (unreleased)

### Features

- Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip)

- Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig)

- Add support to `prometheus.operator.servicemonitors` to allow `endpointslice` role. (@yoyosir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,19 @@ Name | Type | Description | Default |
`routing_key` | `string` | Routing strategy for load balancing. | `"traceID"` | no

The `routing_key` attribute determines how to route signals across endpoints. Its value could be one of the following:
* `"service"`: spans with the same `service.name` will be exported to the same backend.
- `"service"`: spans, logs, and metrics with the same `service.name` will be exported to the same backend.
This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances
for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate.
* `"traceID"`: spans belonging to the same traceID will be exported to the same backend.
- `"traceID"`: spans and logs belonging to the same `traceID` will be exported to the same backend.
- `"resource"`: metrics belonging to the same resource will be exported to the same backend.
- `"metric"`: metrics with the same name will be exported to the same backend.
- `"streamID"`: metrics with the same `streamID` will be exported to the same backend.

The loadbalancer configures the exporter for the signal types supported by the `routing_key`.

> **EXPERIMENTAL**: Metrics support in `otelcol.exporter.loadbalancing` is an [experimental][] feature.
> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement.
> The `stability.level` flag must be set to `experimental` to use the feature.
## Blocks

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/awss3/awss3.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := awss3exporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/datadog/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func init() {
// Since we don't have that, we disable the feature gate to allow the exporter to compute APM stats.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/datadogexporter for more
featuregate.GlobalRegistry().Set("exporter.datadogexporter.DisableAPMStats", false)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := debugexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
21 changes: 16 additions & 5 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (s TypeSignal) SupportsTraces() bool {
return s&TypeTraces != 0
}

type TypeSignalFunc func(component.Options, component.Arguments) TypeSignal

func TypeSignalConstFunc(ts TypeSignal) TypeSignalFunc {
return func(component.Options, component.Arguments) TypeSignal {
return ts
}
}

// Exporter is an Alloy component shim which manages an OpenTelemetry Collector
// exporter component.
type Exporter struct {
Expand All @@ -92,7 +100,8 @@ type Exporter struct {

// Signals which the exporter is able to export.
// Can be logs, metrics, traces or any combination of them.
supportedSignals TypeSignal
// This is a function because which signals are supported may depend on the component configuration.
supportedSignals TypeSignalFunc
}

var (
Expand All @@ -106,7 +115,7 @@ var (
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignalFunc) (*Exporter, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.NewPaused(ctx)
Expand Down Expand Up @@ -212,8 +221,10 @@ func (e *Exporter) Update(args component.Arguments) error {
// supported telemetry signals.
var components []otelcomponent.Component

supportedSignals := e.supportedSignals(e.opts, args)

var tracesExporter otelexporter.Traces
if e.supportedSignals.SupportsTraces() {
if supportedSignals.SupportsTraces() {
tracesExporter, err = e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand All @@ -223,7 +234,7 @@ func (e *Exporter) Update(args component.Arguments) error {
}

var metricsExporter otelexporter.Metrics
if e.supportedSignals.SupportsMetrics() {
if supportedSignals.SupportsMetrics() {
metricsExporter, err = e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand All @@ -233,7 +244,7 @@ func (e *Exporter) Update(args component.Arguments) error {
}

var logsExporter otelexporter.Logs
if e.supportedSignals.SupportsLogs() {
if supportedSignals.SupportsLogs() {
logsExporter, err = e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
}, otelcomponent.StabilityLevelUndefined),
)

return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeAll)
return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := kafkaexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
41 changes: 31 additions & 10 deletions internal/component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/exporter"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
otelcomponent "go.opentelemetry.io/collector/component"
Expand All @@ -34,10 +35,33 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := loadbalancingexporter.NewFactory()
//TODO(ptodev): LB exporter cannot yet work with metrics due to a limitation in Alloy:
// https://github.com/grafana/agent/pull/5684
// Once the limitation is removed, we may be able to remove the need for exporter.TypeSignal altogether.
return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs|exporter.TypeTraces)

// As per https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/loadbalancingexporter/README.md
// metrics is considered "development" stability level

typeSignalFunc := func(opts component.Options, args component.Arguments) exporter.TypeSignal {
myArgs := args.(Arguments)
var typeSignal exporter.TypeSignal
switch myArgs.RoutingKey {
case "traceID":
typeSignal = exporter.TypeLogs | exporter.TypeTraces
case "service":
if opts.MinStability.Permits(featuregate.StabilityExperimental) {
typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics
} else {
level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it")
typeSignal = exporter.TypeLogs | exporter.TypeTraces
}
case "resource", "metric", "streamID":
if opts.MinStability.Permits(featuregate.StabilityExperimental) {
typeSignal = exporter.TypeMetrics
} else {
level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it")
}
}
return typeSignal
}
return exporter.New(opts, fact, args.(Arguments), typeSignalFunc)
},
})
}
Expand Down Expand Up @@ -69,13 +93,10 @@ func (args *Arguments) SetToDefault() {

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
//TODO(ptodev): Add support for "resource" and "metric" routing keys later.
// The reason we can't add them yet is that otelcol.exporter.loadbalancing
// is labeled as "beta", but those routing keys are experimental.
// We need a way to label otelcol.exporter.loadbalancing as "public-preview"
// for logs and traces, but "experimental" for metrics.
// Allow routing keys for all signal types. Metrics exporter will be disabled
// if stability level is above experimental
switch args.RoutingKey {
case "service", "traceID":
case "service", "traceID", "resource", "metric", "streamID":
// The routing key is valid.
default:
return fmt.Errorf("invalid routing key %q", args.RoutingKey)
Expand Down
Loading

0 comments on commit 057deff

Please sign in to comment.