Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure metrics exporter for loadbalancing when possible #2242

Merged
merged 11 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Main (unreleased)

### Features

- Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip)

- Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig)

- Add support to `prometheus.operator.servicemonitors` to allow `endpointslice` role. (@yoyosir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,19 @@ Name | Type | Description | Default |
`routing_key` | `string` | Routing strategy for load balancing. | `"traceID"` | no

The `routing_key` attribute determines how to route signals across endpoints. Its value could be one of the following:
* `"service"`: spans with the same `service.name` will be exported to the same backend.
- `"service"`: spans, logs, and metrics with the same `service.name` will be exported to the same backend.
This is useful when using processors like the span metrics, so all spans for each service are sent to consistent {{< param "PRODUCT_NAME" >}} instances
for metric collection. Otherwise, metrics for the same services would be sent to different instances, making aggregations inaccurate.
* `"traceID"`: spans belonging to the same traceID will be exported to the same backend.
- `"traceID"`: spans and logs belonging to the same `traceID` will be exported to the same backend.
- `"resource"`: metrics belonging to the same resource will be exported to the same backend.
- `"metric"`: metrics with the same name will be exported to the same backend.
- `"streamID"`: metrics with the same `streamID` will be exported to the same backend.

The loadbalancer configures the exporter for the signal types supported by the `routing_key`.

> **EXPERIMENTAL**: Metrics support in `otelcol.exporter.loadbalancing` is an [experimental][] feature.
> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement.
> The `stability.level` flag must be set to `experimental` to use the feature.

## Blocks

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/awss3/awss3.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := awss3exporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/datadog/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func init() {
// Since we don't have that, we disable the feature gate to allow the exporter to compute APM stats.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/datadogexporter for more
featuregate.GlobalRegistry().Set("exporter.datadogexporter.DisableAPMStats", false)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := debugexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
20 changes: 15 additions & 5 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (s TypeSignal) SupportsTraces() bool {
return s&TypeTraces != 0
}

type TypeSignalFunc func(component.Options, component.Arguments) TypeSignal

func TypeSignalConstFunc(ts TypeSignal) TypeSignalFunc {
return func(component.Options, component.Arguments) TypeSignal {
return ts
}
}

// Exporter is an Alloy component shim which manages an OpenTelemetry Collector
// exporter component.
type Exporter struct {
Expand All @@ -92,7 +100,7 @@ type Exporter struct {

// Signals which the exporter is able to export.
// Can be logs, metrics, traces or any combination of them.
supportedSignals TypeSignal
supportedSignals TypeSignalFunc
ptodev marked this conversation as resolved.
Show resolved Hide resolved
}

var (
Expand All @@ -106,7 +114,7 @@ var (
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignalFunc) (*Exporter, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.NewPaused(ctx)
Expand Down Expand Up @@ -212,8 +220,10 @@ func (e *Exporter) Update(args component.Arguments) error {
// supported telemetry signals.
var components []otelcomponent.Component

supportedSignals := e.supportedSignals(e.opts, args)

var tracesExporter otelexporter.Traces
if e.supportedSignals.SupportsTraces() {
if supportedSignals.SupportsTraces() {
tracesExporter, err = e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand All @@ -223,7 +233,7 @@ func (e *Exporter) Update(args component.Arguments) error {
}

var metricsExporter otelexporter.Metrics
if e.supportedSignals.SupportsMetrics() {
if supportedSignals.SupportsMetrics() {
metricsExporter, err = e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand All @@ -233,7 +243,7 @@ func (e *Exporter) Update(args component.Arguments) error {
}

var logsExporter otelexporter.Logs
if e.supportedSignals.SupportsLogs() {
if supportedSignals.SupportsLogs() {
logsExporter, err = e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
}, otelcomponent.StabilityLevelUndefined),
)

return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeAll)
return exporter.New(opts, factory, args.(exporter.Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := kafkaexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
return exporter.New(opts, fact, args.(Arguments), exporter.TypeSignalConstFunc(exporter.TypeAll))
},
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/exporter"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
otelcomponent "go.opentelemetry.io/collector/component"
Expand All @@ -34,10 +35,33 @@ func init() {

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := loadbalancingexporter.NewFactory()
//TODO(ptodev): LB exporter cannot yet work with metrics due to a limitation in Alloy:
// https://github.com/grafana/agent/pull/5684
// Once the limitation is removed, we may be able to remove the need for exporter.TypeSignal altogether.
return exporter.New(opts, fact, args.(Arguments), exporter.TypeLogs|exporter.TypeTraces)

// As per https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/loadbalancingexporter/README.md
// metrics is considered "development" stability level

typeSignalFunc := func(opts component.Options, args component.Arguments) exporter.TypeSignal {
myArgs := args.(Arguments)
var typeSignal exporter.TypeSignal
switch myArgs.RoutingKey {
case "traceID":
typeSignal = exporter.TypeLogs | exporter.TypeTraces
case "service":
if opts.MinStability.Permits(featuregate.StabilityExperimental) {
typeSignal = exporter.TypeLogs | exporter.TypeTraces | exporter.TypeMetrics
} else {
level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it")
typeSignal = exporter.TypeLogs | exporter.TypeTraces
}
case "resource", "metric", "streamID":
if opts.MinStability.Permits(featuregate.StabilityExperimental) {
typeSignal = exporter.TypeMetrics
} else {
level.Warn(opts.Logger).Log("msg", "disabling metrics exporter as stability level does not allow it")
}
}
return typeSignal
}
return exporter.New(opts, fact, args.(Arguments), typeSignalFunc)
},
})
}
Expand Down Expand Up @@ -69,13 +93,10 @@ func (args *Arguments) SetToDefault() {

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
//TODO(ptodev): Add support for "resource" and "metric" routing keys later.
// The reason we can't add them yet is that otelcol.exporter.loadbalancing
// is labeled as "beta", but those routing keys are experimental.
// We need a way to label otelcol.exporter.loadbalancing as "public-preview"
// for logs and traces, but "experimental" for metrics.
// Allow routing keys for all signal types. Metrics exporter will be disabled
// if stability level is above experimental
switch args.RoutingKey {
case "service", "traceID":
case "service", "traceID", "resource", "metric", "streamID":
// The routing key is valid.
default:
return fmt.Errorf("invalid routing key %q", args.RoutingKey)
Expand Down
Loading
Loading