Skip to content

Commit

Permalink
otelcol.processor.deltatocumulative: new component (#1044)
Browse files Browse the repository at this point in the history
Introduce a new `otelcol` component which tracks metric streams with the
delta temporality and converts them into the cumulative temporality.

This component is initially introduced at the experimental level.

Co-authored-by: Carrie Edwards <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
3 people authored Jun 14, 2024
1 parent 3f18ccc commit c22b4a4
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 47 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Main (unreleased)

- Component `otelcol.receiver.vcenter` removed `vcenter.host.network.packet.errors`, `vcenter.host.network.packet.count`, and
`vcenter.vm.network.packet.count`.
- `vcenter.host.network.packet.errors` replaced by `vcenter.host.network.packet.error.rate`.
- `vcenter.host.network.packet.errors` replaced by `vcenter.host.network.packet.error.rate`.
- `vcenter.host.network.packet.count` replaced by `vcenter.host.network.packet.rate`.
- `vcenter.vm.network.packet.count` replaced by `vcenter.vm.network.packet.rate`.

Expand All @@ -36,6 +36,9 @@ Main (unreleased)

- Added live debugging support for `prometheus.relabel`. (@wildum)

- (_Experimental_) Add a `otelcol.processor.deltatocumulative` component to convert metrics from
delta temporality to cumulative by accumulating samples in memory. (@rfratto)

### Enhancements

- (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum)
Expand Down Expand Up @@ -105,6 +108,8 @@ Main (unreleased)

- Fixed an issue with `loki.source.docker` where collecting logs from targets configured with multiple networks would result in errors. (@wildum)

- Fixed an issue where converting OpenTelemetry Collector configs with unused telemetry types resulted in those types being explicitly configured with an empty array in `output` blocks, rather than them being omitted entirely. (@rfratto)

### Other changes

- `pyroscope.ebpf`, `pyroscope.java`, `pyroscope.scrape`, `pyroscope.write` and `discovery.process` components are now GA. (@korniltsev)
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/reference/compatibility/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol.
- [otelcol.exporter.prometheus](../components/otelcol.exporter.prometheus)
- [otelcol.processor.attributes](../components/otelcol.processor.attributes)
- [otelcol.processor.batch](../components/otelcol.processor.batch)
- [otelcol.processor.deltatocumulative](../components/otelcol.processor.deltatocumulative)
- [otelcol.processor.discovery](../components/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol.processor.filter)
- [otelcol.processor.k8sattributes](../components/otelcol.processor.k8sattributes)
Expand Down Expand Up @@ -330,6 +331,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol
- [otelcol.connector.spanmetrics](../components/otelcol.connector.spanmetrics)
- [otelcol.processor.attributes](../components/otelcol.processor.attributes)
- [otelcol.processor.batch](../components/otelcol.processor.batch)
- [otelcol.processor.deltatocumulative](../components/otelcol.processor.deltatocumulative)
- [otelcol.processor.discovery](../components/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol.processor.filter)
- [otelcol.processor.k8sattributes](../components/otelcol.processor.k8sattributes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol.processor.deltatocumulative/
description: Learn about otelcol.processor.deltatocumulative
title: otelcol.processor.deltatocumulative
---

<span class="badge docs-labels__stage docs-labels__item">Experimental</span>

# otelcol.processor.deltatocumulative

{{< docs/shared lookup="stability/experimental.md" source="alloy" version="<ALLOY_VERSION>" >}}

`otelcol.processor.deltatocumulative` accepts metrics from other `otelcol` components and converts metrics with the delta temporality to cumulative.

{{< admonition type="note" >}}
`otelcol.processor.deltatocumulative` is a wrapper over the upstream OpenTelemetry Collector `deltatocumulative` processor.
Bug reports or feature requests will be redirected to the upstream repository, if necessary.
{{< /admonition >}}

You can specify multiple `otelcol.processor.deltatocumulative` components by giving them different labels.

## Usage

```alloy
otelcol.processor.deltatocumulative "LABEL" {
output {
metrics = [...]
}
}
```

## Arguments

`otelcol.processor.deltatocumulative` supports the following arguments:

Name | Type | Description | Default | Required
------------- | ---------- | ------------------------------------------------------------------- | ------- | --------
`max_stale` | `duration` | How long to wait for a new sample before marking a stream as stale. | `"5m"` | no
`max_streams` | `number` | Upper limit of streams to track. Set to `0` to disable. | `0` | no

`otelcol.processor.deltatocumulative` tracks incoming metric streams.
Sum and exponential histogram metrics with delta temporality are tracked and converted into cumulative temporality.

If a new sample hasn't been received since the duration specified by `max_stale`, tracked streams are considered stale and dropped. `max_stale` must be set to a duration greater than `"0s"`.

The `max_streams` attribute configures the upper limit of streams to track.
If the limit of tracked streams is reached, new incoming streams are dropped.
You can disable this behavior by setting `max_streams` to `0`.

## Blocks

The following blocks are supported inside the definition of `otelcol.processor.deltatocumulative`:

Hierarchy | Block | Description | Required
------------- | ----------------- | -------------------------------------------------------------------------- | --------
output | [output][] | Configures where to send received telemetry data. | yes
debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no

[output]: #output-block
[debug_metrics]: #debug_metrics-block

### output block

{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
--------|--------------------|-----------------------------------------------------------------
`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for metrics.

## Component health

`otelcol.processor.deltatocumulative` is only reported as unhealthy if given an invalid configuration.

## Debug information

`otelcol.processor.deltatocumulative` does not expose any component-specific debug information.

## Debug metrics

* `processor_deltatocumulative_streams_tracked` (gauge): Number of streams currently tracked by the aggregation state.
* `processor_deltatocumulative_streams_limit` (gauge): Upper limit of tracked streams.
* `processor_deltatocumulative_streams_evicted` (counter): Total number of streams removed from tracking to ingest newer streams.
* `processor_deltatocumulative_streams_max_stale` (gauge): Duration without new samples after which streams are dropped.
* `processor_deltatocumulative_datapoints_processed` (counter): Total number of datapoints processed (successfully or unsuccessfully).
* `processor_deltatocumulative_datapoints_dropped` (counter): Faulty datapoints that were dropped due to the reason given in the `reason` label.
* `processor_deltatocumulative_gaps_length` (counter): Total length of all gaps in the streams, such as being due to lost in transit.

## Examples

### Basic usage

This example converts delta temporality metrics to cumulative before sending it to [otelcol.exporter.otlp][] for further processing:

```alloy
otelcol.processor.deltatocumulative "default" {
output {
metrics = [otelcol.exporter.otlp.production.input]
}
}
otelcol.exporter.otlp "production" {
client {
endpoint = env("OTLP_SERVER_ENDPOINT")
}
}
```

[otelcol.exporter.otlp]: ../otelcol.exporter.otlp/

### Exporting Prometheus data

This example converts delta temporality metrics to cumulative metrics before it is converted to Prometheus data, which requires cumulative temporality:

```alloy
otelcol.processor.deltatocumulative "default" {
output {
metrics = [otelcol.exporter.prometheus.default.input]
}
}
otelcol.exporter.prometheus "default" {
forward_to = [prometheus.remote_write.default.receiver]
}
prometheus.remote_write "default" {
endpoint {
url = env("PROMETHEUS_SERVER_URL")
}
}
```

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`otelcol.processor.deltatocumulative` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-exporters)

`otelcol.processor.deltatocumulative` has exports that can be consumed by the following components:

- Components that consume [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ require (
github.com/oliver006/redis_exporter v1.54.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.102.0
Expand All @@ -118,6 +119,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.102.0
Expand Down Expand Up @@ -249,8 +251,6 @@ require (
sigs.k8s.io/yaml v1.4.0
)

require github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter v0.102.0

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.4.1 // indirect
Expand Down Expand Up @@ -557,6 +557,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0/go.mod h1:cBbjwd8m4rBVgCQksUbAVQX1EoM5IuCyNQw2mzvibEM=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 h1:qsM5HhWpAfIMg8LdO4u+CHofu4UuCuJwg/M+ySO9uZA=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0/go.mod h1:wBJlGy9Wx6s7AxIMcSne2sGw73e5ZUy1AQ/duYwpFf8=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0 h1:D3v6GrN/7C70PGa+xpI5kV5U+oZTHtcgThhuZxV4Zz0=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0/go.mod h1:AbsUQcUIMTCAmvXbOdjuejqWftMi+pfYEK9YQGqRS7s=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 h1:TN+wdhgwDn4zSr39fFOG0e7XJNCDwUSJb8HiBZ5orWk=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0/go.mod h1:RNe02aDLdqqEsJ+nemN+TDJf016wKf87eZYuAEfhZyU=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.102.0 h1:CS9t6i//34KdqCw/kOmSydkmBtpOB7+1fLv1QN3kKyE=
Expand Down Expand Up @@ -1746,6 +1748,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.102.0/go.mod h1:BEQy0zEel5uIOTEFBBmvQJ4A32R6nKLtSMtC6ylLI8k=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0 h1:7CHzBkwrwfKBAYid7ii7CKO7kxSVVruMJKEnXFfO8ig=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0/go.mod h1:OSi85ea3BWIrFqrB6q1QN1F5sCfTzJS6ECGD2Bk30JQ=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0 h1:YkTuvT7ODIMVIrc/XDxiHOlZLbo+uEQSCnZ2+42SF4A=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0/go.mod h1:4mjsDJoPFf7MDE6bQpDEr25D/U2HTaD4OZKwo7gt8t8=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0 h1:DaEYlVCn58GtkyYVK0IT/ZMjRFJ+BfmR0p9I0Eq42aQ=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0/go.mod h1:u9x08rUCWdgI8Nle5XOMTCmxd0K26KTZvMMA5H8Xjyg=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.102.0 h1:mkRDKVWXfG1gTxwg69ttJoGmXOKNHAGsGms06DrwTlQ=
Expand Down
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/extension/jaeger_remote_sampling" // Import otelcol.extension.jaeger_remote_sampling
_ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Package deltatocumulative provides an otelcol.processor.deltatocumulative
// component.
package deltatocumulative

import (
"fmt"
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/processor"
"github.com/grafana/alloy/internal/featuregate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.deltatocumulative",
Stability: featuregate.StabilityExperimental,
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := deltatocumulativeprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.processor.deltatocumulative component.
type Arguments struct {
MaxStale time.Duration `alloy:"max_stale,attr,optional"`
MaxStreams int `alloy:"max_streams,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `alloy:"output,block"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
MaxStale: 5 * time.Minute,

// NOTE(rfratto): 0 means disable, and there is an ongoing effort upstream to
// identify a good non-zero default:
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603
MaxStreams: 0,
}

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
args.DebugMetrics.SetToDefault()
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
if args.MaxStale <= 0 {
return fmt.Errorf("max_stale must be a positive duration (got %s)", args.MaxStale)
}
if args.MaxStreams < 0 {
return fmt.Errorf("max_streams must be a positive number or zero (got %d)", args.MaxStreams)
}
return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &deltatocumulativeprocessor.Config{
MaxStale: args.MaxStale,
MaxStreams: args.MaxStreams,
}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// DebugMetricsConfig implements processor.Arguments.
func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments {
return args.DebugMetrics
}
3 changes: 3 additions & 0 deletions internal/converter/internal/otelcolconvert/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (state *State) Next(c component.InstanceID, dataType component.DataType) []
})
}

if len(ids) == 0 {
return nil
}
return ids
}

Expand Down
Loading

0 comments on commit c22b4a4

Please sign in to comment.