From c22b4a493ad987bce2583c4ab9023558d883a640 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 14 Jun 2024 14:00:41 -0400 Subject: [PATCH] otelcol.processor.deltatocumulative: new component (#1044) Introduce a new `otelcol` component which tracks metric streams with the delta temporality and converts them into the cumulative temporality. This component is initially introduced at the experimental level. Co-authored-by: Carrie Edwards Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 7 +- .../sources/reference/compatibility/_index.md | 2 + .../otelcol.processor.deltatocumulative.md | 159 ++++++++++++++++++ go.mod | 5 +- go.sum | 4 + internal/component/all/all.go | 1 + .../deltatocumulative/deltatocumulative.go | 103 ++++++++++++ .../internal/otelcolconvert/converter.go | 3 + .../converter_deltatocumulativeprocessor.go | 62 +++++++ .../otelcolconvert/converter_helpers.go | 4 + .../testdata/deltatocumulative.alloy | 21 +++ .../testdata/deltatocumulative.yaml | 19 +++ .../testdata/inconsistent_processor.alloy | 4 +- .../testdata/servicegraph.alloy | 4 +- .../otelcolconvert/testdata/spanmetrics.alloy | 5 +- .../testdata/spanmetrics_full.alloy | 4 +- .../testdata/vcenterreceiver.alloy | 3 +- .../staticconvert/testdata/traces.alloy | 20 +-- .../staticconvert/testdata/traces_multi.alloy | 16 +- .../staticconvert/testdata/unsupported.alloy | 4 +- 20 files changed, 403 insertions(+), 47 deletions(-) create mode 100644 docs/sources/reference/components/otelcol.processor.deltatocumulative.md create mode 100644 internal/component/otelcol/processor/deltatocumulative/deltatocumulative.go create mode 100644 internal/converter/internal/otelcolconvert/converter_deltatocumulativeprocessor.go create mode 100644 internal/converter/internal/otelcolconvert/testdata/deltatocumulative.alloy create mode 100644 internal/converter/internal/otelcolconvert/testdata/deltatocumulative.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cedec99fa..7d491a7383 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ Main (unreleased) - Component `otelcol.receiver.vcenter` removed `vcenter.host.network.packet.errors`, `vcenter.host.network.packet.count`, and `vcenter.vm.network.packet.count`. - - `vcenter.host.network.packet.errors` replaced by `vcenter.host.network.packet.error.rate`. + - `vcenter.host.network.packet.errors` replaced by `vcenter.host.network.packet.error.rate`. - `vcenter.host.network.packet.count` replaced by `vcenter.host.network.packet.rate`. - `vcenter.vm.network.packet.count` replaced by `vcenter.vm.network.packet.rate`. @@ -36,6 +36,9 @@ Main (unreleased) - Added live debugging support for `prometheus.relabel`. (@wildum) +- (_Experimental_) Add a `otelcol.processor.deltatocumulative` component to convert metrics from + delta temporality to cumulative by accumulating samples in memory. (@rfratto) + ### Enhancements - (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum) @@ -105,6 +108,8 @@ Main (unreleased) - Fixed an issue with `loki.source.docker` where collecting logs from targets configured with multiple networks would result in errors. (@wildum) +- Fixed an issue where converting OpenTelemetry Collector configs with unused telemetry types resulted in those types being explicitly configured with an empty array in `output` blocks, rather than them being omitted entirely. (@rfratto) + ### Other changes - `pyroscope.ebpf`, `pyroscope.java`, `pyroscope.scrape`, `pyroscope.write` and `discovery.process` components are now GA. (@korniltsev) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index a1f9609424..fa599c6234 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -295,6 +295,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol. - [otelcol.exporter.prometheus](../components/otelcol.exporter.prometheus) - [otelcol.processor.attributes](../components/otelcol.processor.attributes) - [otelcol.processor.batch](../components/otelcol.processor.batch) +- [otelcol.processor.deltatocumulative](../components/otelcol.processor.deltatocumulative) - [otelcol.processor.discovery](../components/otelcol.processor.discovery) - [otelcol.processor.filter](../components/otelcol.processor.filter) - [otelcol.processor.k8sattributes](../components/otelcol.processor.k8sattributes) @@ -330,6 +331,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.connector.spanmetrics](../components/otelcol.connector.spanmetrics) - [otelcol.processor.attributes](../components/otelcol.processor.attributes) - [otelcol.processor.batch](../components/otelcol.processor.batch) +- [otelcol.processor.deltatocumulative](../components/otelcol.processor.deltatocumulative) - [otelcol.processor.discovery](../components/otelcol.processor.discovery) - [otelcol.processor.filter](../components/otelcol.processor.filter) - [otelcol.processor.k8sattributes](../components/otelcol.processor.k8sattributes) diff --git a/docs/sources/reference/components/otelcol.processor.deltatocumulative.md b/docs/sources/reference/components/otelcol.processor.deltatocumulative.md new file mode 100644 index 0000000000..7e5f8bde40 --- /dev/null +++ b/docs/sources/reference/components/otelcol.processor.deltatocumulative.md @@ -0,0 +1,159 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol.processor.deltatocumulative/ +description: Learn about otelcol.processor.deltatocumulative +title: otelcol.processor.deltatocumulative +--- + +Experimental + +# otelcol.processor.deltatocumulative + +{{< docs/shared lookup="stability/experimental.md" source="alloy" version="" >}} + +`otelcol.processor.deltatocumulative` accepts metrics from other `otelcol` components and converts metrics with the delta temporality to cumulative. + +{{< admonition type="note" >}} +`otelcol.processor.deltatocumulative` is a wrapper over the upstream OpenTelemetry Collector `deltatocumulative` processor. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +You can specify multiple `otelcol.processor.deltatocumulative` components by giving them different labels. + +## Usage + +```alloy +otelcol.processor.deltatocumulative "LABEL" { + output { + metrics = [...] + } +} +``` + +## Arguments + +`otelcol.processor.deltatocumulative` supports the following arguments: + +Name | Type | Description | Default | Required +------------- | ---------- | ------------------------------------------------------------------- | ------- | -------- +`max_stale` | `duration` | How long to wait for a new sample before marking a stream as stale. | `"5m"` | no +`max_streams` | `number` | Upper limit of streams to track. Set to `0` to disable. | `0` | no + +`otelcol.processor.deltatocumulative` tracks incoming metric streams. +Sum and exponential histogram metrics with delta temporality are tracked and converted into cumulative temporality. + +If a new sample hasn't been received since the duration specified by `max_stale`, tracked streams are considered stale and dropped. `max_stale` must be set to a duration greater than `"0s"`. + +The `max_streams` attribute configures the upper limit of streams to track. +If the limit of tracked streams is reached, new incoming streams are dropped. +You can disable this behavior by setting `max_streams` to `0`. + +## Blocks + +The following blocks are supported inside the definition of `otelcol.processor.deltatocumulative`: + +Hierarchy | Block | Description | Required +------------- | ----------------- | -------------------------------------------------------------------------- | -------- +output | [output][] | Configures where to send received telemetry data. | yes +debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no + +[output]: #output-block +[debug_metrics]: #debug_metrics-block + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +--------|--------------------|----------------------------------------------------------------- +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for metrics. + +## Component health + +`otelcol.processor.deltatocumulative` is only reported as unhealthy if given an invalid configuration. + +## Debug information + +`otelcol.processor.deltatocumulative` does not expose any component-specific debug information. + +## Debug metrics + +* `processor_deltatocumulative_streams_tracked` (gauge): Number of streams currently tracked by the aggregation state. +* `processor_deltatocumulative_streams_limit` (gauge): Upper limit of tracked streams. +* `processor_deltatocumulative_streams_evicted` (counter): Total number of streams removed from tracking to ingest newer streams. +* `processor_deltatocumulative_streams_max_stale` (gauge): Duration without new samples after which streams are dropped. +* `processor_deltatocumulative_datapoints_processed` (counter): Total number of datapoints processed (successfully or unsuccessfully). +* `processor_deltatocumulative_datapoints_dropped` (counter): Faulty datapoints that were dropped due to the reason given in the `reason` label. +* `processor_deltatocumulative_gaps_length` (counter): Total length of all gaps in the streams, such as being due to lost in transit. + +## Examples + +### Basic usage + +This example converts delta temporality metrics to cumulative before sending it to [otelcol.exporter.otlp][] for further processing: + +```alloy +otelcol.processor.deltatocumulative "default" { + output { + metrics = [otelcol.exporter.otlp.production.input] + } +} + +otelcol.exporter.otlp "production" { + client { + endpoint = env("OTLP_SERVER_ENDPOINT") + } +} +``` + +[otelcol.exporter.otlp]: ../otelcol.exporter.otlp/ + +### Exporting Prometheus data + +This example converts delta temporality metrics to cumulative metrics before it is converted to Prometheus data, which requires cumulative temporality: + +```alloy +otelcol.processor.deltatocumulative "default" { + output { + metrics = [otelcol.exporter.prometheus.default.input] + } +} + +otelcol.exporter.prometheus "default" { + forward_to = [prometheus.remote_write.default.receiver] +} + +prometheus.remote_write "default" { + endpoint { + url = env("PROMETHEUS_SERVER_URL") + } +} +``` + + + +## Compatible components + +`otelcol.processor.deltatocumulative` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-exporters) + +`otelcol.processor.deltatocumulative` has exports that can be consumed by the following components: + +- Components that consume [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 6221d96acb..74d4eabf12 100644 --- a/go.mod +++ b/go.mod @@ -103,6 +103,7 @@ require ( github.com/oliver006/redis_exporter v1.54.0 github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.102.0 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.102.0 @@ -118,6 +119,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.102.0 @@ -249,8 +251,6 @@ require ( sigs.k8s.io/yaml v1.4.0 ) -require github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter v0.102.0 - require ( cloud.google.com/go v0.112.2 // indirect cloud.google.com/go/auth v0.4.1 // indirect @@ -557,6 +557,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect diff --git a/go.sum b/go.sum index 75c5a59d02..7e68de3f25 100644 --- a/go.sum +++ b/go.sum @@ -1706,6 +1706,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0/go.mod h1:cBbjwd8m4rBVgCQksUbAVQX1EoM5IuCyNQw2mzvibEM= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 h1:qsM5HhWpAfIMg8LdO4u+CHofu4UuCuJwg/M+ySO9uZA= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0/go.mod h1:wBJlGy9Wx6s7AxIMcSne2sGw73e5ZUy1AQ/duYwpFf8= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0 h1:D3v6GrN/7C70PGa+xpI5kV5U+oZTHtcgThhuZxV4Zz0= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.102.0/go.mod h1:AbsUQcUIMTCAmvXbOdjuejqWftMi+pfYEK9YQGqRS7s= github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 h1:TN+wdhgwDn4zSr39fFOG0e7XJNCDwUSJb8HiBZ5orWk= github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0/go.mod h1:RNe02aDLdqqEsJ+nemN+TDJf016wKf87eZYuAEfhZyU= github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.102.0 h1:CS9t6i//34KdqCw/kOmSydkmBtpOB7+1fLv1QN3kKyE= @@ -1746,6 +1748,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.102.0/go.mod h1:BEQy0zEel5uIOTEFBBmvQJ4A32R6nKLtSMtC6ylLI8k= github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0 h1:7CHzBkwrwfKBAYid7ii7CKO7kxSVVruMJKEnXFfO8ig= github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.102.0/go.mod h1:OSi85ea3BWIrFqrB6q1QN1F5sCfTzJS6ECGD2Bk30JQ= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0 h1:YkTuvT7ODIMVIrc/XDxiHOlZLbo+uEQSCnZ2+42SF4A= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.102.0/go.mod h1:4mjsDJoPFf7MDE6bQpDEr25D/U2HTaD4OZKwo7gt8t8= github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0 h1:DaEYlVCn58GtkyYVK0IT/ZMjRFJ+BfmR0p9I0Eq42aQ= github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.102.0/go.mod h1:u9x08rUCWdgI8Nle5XOMTCmxd0K26KTZvMMA5H8Xjyg= github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.102.0 h1:mkRDKVWXfG1gTxwg69ttJoGmXOKNHAGsGms06DrwTlQ= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 5af2516dfb..981896ef8c 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -78,6 +78,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/extension/jaeger_remote_sampling" // Import otelcol.extension.jaeger_remote_sampling _ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch + _ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative _ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery _ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter _ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes diff --git a/internal/component/otelcol/processor/deltatocumulative/deltatocumulative.go b/internal/component/otelcol/processor/deltatocumulative/deltatocumulative.go new file mode 100644 index 0000000000..024e62363a --- /dev/null +++ b/internal/component/otelcol/processor/deltatocumulative/deltatocumulative.go @@ -0,0 +1,103 @@ +// Package deltatocumulative provides an otelcol.processor.deltatocumulative +// component. +package deltatocumulative + +import ( + "fmt" + "time" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/processor" + "github.com/grafana/alloy/internal/featuregate" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.deltatocumulative", + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := deltatocumulativeprocessor.NewFactory() + return processor.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.processor.deltatocumulative component. +type Arguments struct { + MaxStale time.Duration `alloy:"max_stale,attr,optional"` + MaxStreams int `alloy:"max_streams,attr,optional"` + + // Output configures where to send processed data. Required. + Output *otelcol.ConsumerArguments `alloy:"output,block"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` +} + +var ( + _ processor.Arguments = Arguments{} +) + +// DefaultArguments holds default settings for Arguments. +var DefaultArguments = Arguments{ + MaxStale: 5 * time.Minute, + + // NOTE(rfratto): 0 means disable, and there is an ongoing effort upstream to + // identify a good non-zero default: + // + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603 + MaxStreams: 0, +} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments + args.DebugMetrics.SetToDefault() +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + if args.MaxStale <= 0 { + return fmt.Errorf("max_stale must be a positive duration (got %s)", args.MaxStale) + } + if args.MaxStreams < 0 { + return fmt.Errorf("max_streams must be a positive number or zero (got %d)", args.MaxStreams) + } + return nil +} + +// Convert implements processor.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return &deltatocumulativeprocessor.Config{ + MaxStale: args.MaxStale, + MaxStreams: args.MaxStreams, + }, nil +} + +// Extensions implements processor.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements processor.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements processor.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// DebugMetricsConfig implements processor.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/converter/internal/otelcolconvert/converter.go b/internal/converter/internal/otelcolconvert/converter.go index cf06004589..fc03ab0839 100644 --- a/internal/converter/internal/otelcolconvert/converter.go +++ b/internal/converter/internal/otelcolconvert/converter.go @@ -174,6 +174,9 @@ func (state *State) Next(c component.InstanceID, dataType component.DataType) [] }) } + if len(ids) == 0 { + return nil + } return ids } diff --git a/internal/converter/internal/otelcolconvert/converter_deltatocumulativeprocessor.go b/internal/converter/internal/otelcolconvert/converter_deltatocumulativeprocessor.go new file mode 100644 index 0000000000..4020a031df --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_deltatocumulativeprocessor.go @@ -0,0 +1,62 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, deltatocumulativeProcessorConverter{}) +} + +type deltatocumulativeProcessorConverter struct{} + +func (deltatocumulativeProcessorConverter) Factory() component.Factory { + return deltatocumulativeprocessor.NewFactory() +} + +func (deltatocumulativeProcessorConverter) InputComponentName() string { + return "otelcol.processor.deltatocumulative" +} + +func (deltatocumulativeProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toDeltatocumulativeProcessor(state, id, cfg.(*deltatocumulativeprocessor.Config)) + block := common.NewBlockWithOverride([]string{"otelcol", "processor", "deltatocumulative"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toDeltatocumulativeProcessor(state *State, id component.InstanceID, cfg *deltatocumulativeprocessor.Config) *deltatocumulative.Arguments { + var ( + nextMetrics = state.Next(id, component.DataTypeMetrics) + nextLogs = state.Next(id, component.DataTypeLogs) + nextTraces = state.Next(id, component.DataTypeTraces) + ) + + return &deltatocumulative.Arguments{ + MaxStale: cfg.MaxStale, + MaxStreams: cfg.MaxStreams, + Output: &otelcol.ConsumerArguments{ + Metrics: ToTokenizedConsumers(nextMetrics), + Logs: ToTokenizedConsumers(nextLogs), + Traces: ToTokenizedConsumers(nextTraces), + }, + DebugMetrics: common.DefaultValue[deltatocumulative.Arguments]().DebugMetrics, + } +} diff --git a/internal/converter/internal/otelcolconvert/converter_helpers.go b/internal/converter/internal/otelcolconvert/converter_helpers.go index 8c774dac06..fb1765c2c1 100644 --- a/internal/converter/internal/otelcolconvert/converter_helpers.go +++ b/internal/converter/internal/otelcolconvert/converter_helpers.go @@ -31,6 +31,10 @@ func (tc tokenizedConsumer) AlloyTokenize() []builder.Token { } func ToTokenizedConsumers(components []componentID) []otelcol.Consumer { + if len(components) == 0 { + return nil + } + res := make([]otelcol.Consumer, 0, len(components)) for _, component := range components { diff --git a/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.alloy b/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.alloy new file mode 100644 index 0000000000..a0885f0bde --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.alloy @@ -0,0 +1,21 @@ +otelcol.receiver.otlp "default" { + grpc { } + + http { } + + output { + metrics = [otelcol.processor.deltatocumulative.default.input] + } +} + +otelcol.processor.deltatocumulative "default" { + output { + metrics = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.yaml b/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.yaml new file mode 100644 index 0000000000..16080020ee --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/deltatocumulative.yaml @@ -0,0 +1,19 @@ +receivers: + otlp: + protocols: + grpc: + http: + +processors: + deltatocumulative: + +exporters: + otlp: + endpoint: database:4317 + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [deltatocumulative] + exporters: [otlp] diff --git a/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.alloy b/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.alloy index 199156c6ac..3c65f4d552 100644 --- a/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.alloy @@ -12,9 +12,7 @@ otelcol.receiver.otlp "default" { otelcol.processor.batch "default" { output { - metrics = [] - logs = [otelcol.exporter.otlp.default.input] - traces = [] + logs = [otelcol.exporter.otlp.default.input] } } diff --git a/internal/converter/internal/otelcolconvert/testdata/servicegraph.alloy b/internal/converter/internal/otelcolconvert/testdata/servicegraph.alloy index 33f3aef91f..d9369075ef 100644 --- a/internal/converter/internal/otelcolconvert/testdata/servicegraph.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/servicegraph.alloy @@ -4,9 +4,7 @@ otelcol.receiver.otlp "default" { http { } output { - metrics = [] - logs = [] - traces = [otelcol.connector.servicegraph.default.input] + traces = [otelcol.connector.servicegraph.default.input] } } diff --git a/internal/converter/internal/otelcolconvert/testdata/spanmetrics.alloy b/internal/converter/internal/otelcolconvert/testdata/spanmetrics.alloy index 7cb422b5b0..4ee7ce61f0 100644 --- a/internal/converter/internal/otelcolconvert/testdata/spanmetrics.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/spanmetrics.alloy @@ -4,9 +4,8 @@ otelcol.receiver.otlp "default" { http { } output { - metrics = [] - logs = [otelcol.exporter.otlp.default.input] - traces = [otelcol.connector.spanmetrics.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.connector.spanmetrics.default.input] } } diff --git a/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.alloy b/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.alloy index f2806e3d12..df295b13e9 100644 --- a/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.alloy @@ -4,9 +4,7 @@ otelcol.receiver.otlp "default_traces" { http { } output { - metrics = [] - logs = [] - traces = [otelcol.exporter.otlp.default_traces_backend.input, otelcol.connector.spanmetrics.default.input] + traces = [otelcol.exporter.otlp.default_traces_backend.input, otelcol.connector.spanmetrics.default.input] } } diff --git a/internal/converter/internal/otelcolconvert/testdata/vcenterreceiver.alloy b/internal/converter/internal/otelcolconvert/testdata/vcenterreceiver.alloy index 905459d7c6..fe6518e67b 100644 --- a/internal/converter/internal/otelcolconvert/testdata/vcenterreceiver.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/vcenterreceiver.alloy @@ -16,8 +16,7 @@ otelcol.receiver.vcenter "default" { } output { - metrics = [] - traces = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] } } diff --git a/internal/converter/internal/staticconvert/testdata/traces.alloy b/internal/converter/internal/staticconvert/testdata/traces.alloy index 0e1684d924..ee555aa7aa 100644 --- a/internal/converter/internal/staticconvert/testdata/traces.alloy +++ b/internal/converter/internal/staticconvert/testdata/traces.alloy @@ -34,9 +34,7 @@ otelcol.receiver.otlp "_0_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.processor.discovery._0_default.input] + traces = [otelcol.processor.discovery._0_default.input] } } @@ -84,9 +82,7 @@ otelcol.processor.discovery "_0_default" { pod_associations = ["ip", "net.host.ip"] output { - metrics = [] - logs = [] - traces = [otelcol.processor.attributes._0_default.input] + traces = [otelcol.processor.attributes._0_default.input] } } @@ -97,9 +93,7 @@ otelcol.processor.attributes "_0_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.exporter.loadbalancing._0_default.input, otelcol.exporter.logging._0_default.input, otelcol.connector.spanmetrics._0_default.input] + traces = [otelcol.exporter.loadbalancing._0_default.input, otelcol.exporter.logging._0_default.input, otelcol.connector.spanmetrics._0_default.input] } } @@ -161,9 +155,7 @@ otelcol.receiver.otlp "_1_lb" { } output { - metrics = [] - logs = [] - traces = [otelcol.processor.tail_sampling._1_default.input] + traces = [otelcol.processor.tail_sampling._1_default.input] } } @@ -185,9 +177,7 @@ otelcol.processor.batch "_1_default" { send_batch_max_size = 4096 output { - metrics = [] - logs = [] - traces = [otelcol.exporter.otlp._1_0.input, otelcol.exporter.logging._1_default.input, otelcol.connector.spanmetrics._1_default.input] + traces = [otelcol.exporter.otlp._1_0.input, otelcol.exporter.logging._1_default.input, otelcol.connector.spanmetrics._1_default.input] } } diff --git a/internal/converter/internal/staticconvert/testdata/traces_multi.alloy b/internal/converter/internal/staticconvert/testdata/traces_multi.alloy index 4116720eac..af546d0b81 100644 --- a/internal/converter/internal/staticconvert/testdata/traces_multi.alloy +++ b/internal/converter/internal/staticconvert/testdata/traces_multi.alloy @@ -8,9 +8,7 @@ otelcol.receiver.otlp "trace_config_1_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.processor.attributes.trace_config_1_default.input] + traces = [otelcol.processor.attributes.trace_config_1_default.input] } } @@ -21,9 +19,7 @@ otelcol.processor.attributes "trace_config_1_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.exporter.otlp.trace_config_1_default_0.input] + traces = [otelcol.exporter.otlp.trace_config_1_default_0.input] } } @@ -47,9 +43,7 @@ otelcol.receiver.otlp "trace_config_2_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.processor.attributes.trace_config_2_default.input] + traces = [otelcol.processor.attributes.trace_config_2_default.input] } } @@ -61,9 +55,7 @@ otelcol.processor.attributes "trace_config_2_default" { } output { - metrics = [] - logs = [] - traces = [otelcol.exporter.otlp.trace_config_2_default_0.input] + traces = [otelcol.exporter.otlp.trace_config_2_default_0.input] } } diff --git a/internal/converter/internal/staticconvert/testdata/unsupported.alloy b/internal/converter/internal/staticconvert/testdata/unsupported.alloy index e7c3021fd0..a60621afb3 100644 --- a/internal/converter/internal/staticconvert/testdata/unsupported.alloy +++ b/internal/converter/internal/staticconvert/testdata/unsupported.alloy @@ -60,9 +60,7 @@ otelcol.receiver.otlp "default" { } output { - metrics = [] - logs = [] - traces = [otelcol.exporter.otlp.default_0.input, otelcol.exporter.logging.default.input] + traces = [otelcol.exporter.otlp.default_0.input, otelcol.exporter.logging.default.input] } }