diff --git a/go.mod b/go.mod index 5ab95d477b..55446a07a0 100644 --- a/go.mod +++ b/go.mod @@ -214,6 +214,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.112.0 go.opentelemetry.io/collector/exporter v0.112.0 go.opentelemetry.io/collector/exporter/debugexporter v0.112.0 + go.opentelemetry.io/collector/exporter/exportertest v0.112.0 go.opentelemetry.io/collector/exporter/otlpexporter v0.112.0 go.opentelemetry.io/collector/exporter/otlphttpexporter v0.112.0 go.opentelemetry.io/collector/extension v0.112.0 @@ -227,6 +228,7 @@ require ( go.opentelemetry.io/collector/processor v0.112.0 go.opentelemetry.io/collector/processor/batchprocessor v0.112.0 go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0 + go.opentelemetry.io/collector/processor/processortest v0.112.0 go.opentelemetry.io/collector/receiver v0.112.0 go.opentelemetry.io/collector/receiver/otlpreceiver v0.112.0 go.opentelemetry.io/collector/semconv v0.112.0 @@ -837,13 +839,11 @@ require ( go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.112.0 // indirect go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles v0.112.0 // indirect go.opentelemetry.io/collector/exporter/exporterprofiles v0.112.0 // indirect - go.opentelemetry.io/collector/exporter/exportertest v0.112.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.112.0 // indirect go.opentelemetry.io/collector/extension/extensioncapabilities v0.112.0 // indirect go.opentelemetry.io/collector/internal/memorylimiter v0.112.0 // indirect go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.112.0 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 // indirect - go.opentelemetry.io/collector/processor/processortest v0.112.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) @@ -915,18 +915,6 @@ replace ( github.com/prometheus/node_exporter => github.com/grafana/node_exporter v0.18.1-grafana-r01.0.20231004161416-702318429731 ) -// Replacing for an internal fork which allows us to observe metrics produced by the Collector. -// This is a temporary solution while a new configuration design is discussed for the collector. Related issues: -// https://github.com/open-telemetry/opentelemetry-collector/issues/7532 -// https://github.com/open-telemetry/opentelemetry-collector/pull/7644 -// https://github.com/open-telemetry/opentelemetry-collector/pull/7696 -// https://github.com/open-telemetry/opentelemetry-collector/issues/4970 -replace ( - go.opentelemetry.io/collector/otelcol => github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a - go.opentelemetry.io/collector/processor/batchprocessor => github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a - go.opentelemetry.io/collector/service => github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a -) - replace github.com/github/smimesign => github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 // Submodules. diff --git a/go.sum b/go.sum index 409391ecb1..1e428dfcf6 100644 --- a/go.sum +++ b/go.sum @@ -1252,12 +1252,6 @@ github.com/grafana/opentelemetry-collector-contrib/receiver/prometheusreceiver v github.com/grafana/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20240326165551-1ae1b9218b1b/go.mod h1:XVaL+FyxEWEfHkDI/X6FJgFYDA0080A1/qoOHQ43dxM= github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20240325174506-2fd1623b2ca0 h1:i/Ne0XwoRokYj52ZcSmnvuyID3h/uA91n0Ycg/grHU8= github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20240325174506-2fd1623b2ca0/go.mod h1:mm8+xyQfgDmqhyegZRNIQmoKsNnDTwWKFLsdMoXAb7A= -github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a h1:6TnkhPwEjuleHfhKSw+IyC/I2cQORuEsImZZMhoodIE= -github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:H/HurP0qCcwcWdDRgvTJ/FRrVLnt++agxzHvgLQn/Ew= -github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a h1:WrDY3pU4EkHsCIepUAYWt7lIfsMEPdugmYB8k6RYjMo= -github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:QLQ31rGjPuMc/nGw4rL4HzQI9F0jVAPEmC342chxoqA= -github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a h1:ZycgUSrrwtB2x1fMdLD88J2k8886/HvX1tYHlaOH/hg= -github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:VTLnax+DjHal3q7WKQO0ITjWdfPTq2txaoNRcVXYzgE= github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168 h1:I7FyVTtge/3G5YHVOMDG0l4If6W+kXbFDqtzj5gCSGs= github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168/go.mod h1:dMrETGkSetWByp2XGsm8g6pRVh/ibnrDxKsN4BqnGNg= github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6 h1:kih3d3M3dxAmrpFLvnIxFzWx8KMQyKxQwKgWP67C/Fg= @@ -2686,6 +2680,8 @@ go.opentelemetry.io/collector/filter v0.112.0 h1:xX0MIfXOkbcWzCcmNqvtpnBDnSZAJmN go.opentelemetry.io/collector/filter v0.112.0/go.mod h1:ZcPbD9CLxqcQJ5D2dV2Ma0Hm2IKMOYggTLW8dDdZQSQ= go.opentelemetry.io/collector/internal/memorylimiter v0.112.0 h1:u1hUa48x1qEONUSOtz8dx/c8oz74RpIHyWnGFJ7t0CE= go.opentelemetry.io/collector/internal/memorylimiter v0.112.0/go.mod h1:BtHruDt40QTW8klZVQCqsVfhVsOkh6hDg5w1cPvLpeU= +go.opentelemetry.io/collector/otelcol v0.112.0 h1:xOq7z5WK5jS1Qg5w+l99H1EiQRq9rHHDv7EIiLryldw= +go.opentelemetry.io/collector/otelcol v0.112.0/go.mod h1:H/HurP0qCcwcWdDRgvTJ/FRrVLnt++agxzHvgLQn/Ew= go.opentelemetry.io/collector/otelcol/otelcoltest v0.112.0 h1:AdjoVnYl7RxoOvhWZcJb0SWY1VvlRT1cdlCwHBpn9vs= go.opentelemetry.io/collector/otelcol/otelcoltest v0.112.0/go.mod h1:VSbEYgmiSM5K6p501XD35QuhxbDpkxrfS2Wf5OKnHPs= go.opentelemetry.io/collector/pdata v1.18.0 h1:/yg2rO2dxqDM2p6GutsMCxXN6sKlXwyIz/ZYyUPONBg= @@ -2700,6 +2696,8 @@ go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.112.0 h1:opXGNrlJAjYR go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.112.0/go.mod h1:c9yn4x+vY3G10eLCRuUu/oH7Y8YdE/BsgmLWmfHkaNY= go.opentelemetry.io/collector/processor v0.112.0 h1:nMv9DOBYR9MB78ddUgY3A3ytwAwk3t4HQMNIu+w8o0g= go.opentelemetry.io/collector/processor v0.112.0/go.mod h1:AJ8EHq8Z/ev90f4gU6G5ULUncdpWmBRATYk8ioR3pvw= +go.opentelemetry.io/collector/processor/batchprocessor v0.112.0 h1:Dq/RpdClawI8HrnSi177LziPjfHo733BWOCgRTbWrfY= +go.opentelemetry.io/collector/processor/batchprocessor v0.112.0/go.mod h1:QLQ31rGjPuMc/nGw4rL4HzQI9F0jVAPEmC342chxoqA= go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0 h1:+V+4OUcg1s3CrZpttT4dA+Uuv7VWpOIPQpOkcsrMBIo= go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0/go.mod h1:f/eEZ3JMbRNLsRzNwATtTjuulDrLvhYMvXinLrmHtTU= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 h1:Aef68SAbmBbhbsZZPuZb0ECwkV05vIcHIizGOGbWsbM= @@ -2714,6 +2712,8 @@ go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 h1:SShkZsWRsFss go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0/go.mod h1:615smszDXiz4YWwXslxlAjX7FzOVDU7Bk6xARFk+zpk= go.opentelemetry.io/collector/semconv v0.112.0 h1:JPQyvZhlNLVSuVI+FScONaiFygB7h7NTZceUEKIQUEc= go.opentelemetry.io/collector/semconv v0.112.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= +go.opentelemetry.io/collector/service v0.112.0 h1:SI5bwPrRHLRn/kR9AoSSDX/8vaKFe+NMYloAcXfWMSE= +go.opentelemetry.io/collector/service v0.112.0/go.mod h1:VTLnax+DjHal3q7WKQO0ITjWdfPTq2txaoNRcVXYzgE= go.opentelemetry.io/contrib/config v0.10.0 h1:2JknAzMaYjxrHkTnZh3eOme/Y2P5eHE2SWfhfV6Xd6c= go.opentelemetry.io/contrib/config v0.10.0/go.mod h1:aND2M6/KfNkntI5cyvHriR/zvZgPf8j9yETdSmvpfmc= go.opentelemetry.io/contrib/detectors/aws/ec2 v1.28.0 h1:d+y/wygENfwEbVpo7c3A9GfnMhoTiepQcthQSh+Mc9g= diff --git a/internal/static/traces/config.go b/internal/static/traces/config.go index 93b0ff3b08..ad9fb7655a 100644 --- a/internal/static/traces/config.go +++ b/internal/static/traces/config.go @@ -41,6 +41,7 @@ import ( "github.com/grafana/alloy/internal/static/logs" "github.com/grafana/alloy/internal/static/traces/automaticloggingprocessor" + "github.com/grafana/alloy/internal/static/traces/forked_otelcol" "github.com/grafana/alloy/internal/static/traces/noopreceiver" "github.com/grafana/alloy/internal/static/traces/promsdprocessor" "github.com/grafana/alloy/internal/static/traces/pushreceiver" @@ -1003,7 +1004,7 @@ func orderProcessors(processors []string, splitPipelines bool) [][]string { func otelcolConfigFromStringMap(otelMapStructure map[string]interface{}, factories *otelcol.Factories) (*otelcol.Config, error) { configMap := confmap.NewFromStringMap(otelMapStructure) - otelCfg, err := otelcol.Unmarshal(configMap, *factories) + otelCfg, err := forked_otelcol.Unmarshal(configMap, *factories) if err != nil { return nil, fmt.Errorf("failed to load OTel config: %w", err) } diff --git a/internal/static/traces/forked_configunmarshaler/README.md b/internal/static/traces/forked_configunmarshaler/README.md new file mode 100644 index 0000000000..a727e21468 --- /dev/null +++ b/internal/static/traces/forked_configunmarshaler/README.md @@ -0,0 +1,11 @@ +The code was copied from: +https://github.com/open-telemetry/opentelemetry-collector/tree/v0.114.0/otelcol/internal/configunmarshaler + +This forked package exists because the "forked_otelcol" package depends on it. + +There is no need to update it every time Alloy's OTel version is updated: +* This is very fundamental code which doesn't change often. +* As long as Static mode's code still behaves as expected and the converter tests pass, that's fine. +* The version of OTel which Agent's repo uses is different from the one which Alloy uses anyway. + +The only time when this forked package may need updating is when it fails to build due to refactoring in the OTel repo. \ No newline at end of file diff --git a/internal/static/traces/forked_configunmarshaler/configs.go b/internal/static/traces/forked_configunmarshaler/configs.go new file mode 100644 index 0000000000..36636574e7 --- /dev/null +++ b/internal/static/traces/forked_configunmarshaler/configs.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package forked_configunmarshaler // import "go.opentelemetry.io/collector/otelcol/internal/configunmarshaler" + +import ( + "errors" + "fmt" + + "golang.org/x/exp/maps" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" +) + +type Configs[F component.Factory] struct { + cfgs map[component.ID]component.Config + + factories map[component.Type]F +} + +func NewConfigs[F component.Factory](factories map[component.Type]F) *Configs[F] { + return &Configs[F]{factories: factories} +} + +func (c *Configs[F]) Unmarshal(conf *confmap.Conf) error { + rawCfgs := make(map[component.ID]map[string]any) + if err := conf.Unmarshal(&rawCfgs); err != nil { + return err + } + + // Prepare resulting map. + c.cfgs = make(map[component.ID]component.Config) + // Iterate over raw configs and create a config for each. + for id := range rawCfgs { + // Find factory based on component kind and type that we read from config source. + factory, ok := c.factories[id.Type()] + if !ok { + return errorUnknownType(id, maps.Keys(c.factories)) + } + + // Get the configuration from the confmap.Conf to preserve internal representation. + sub, err := conf.Sub(id.String()) + if err != nil { + return errorUnmarshalError(id, err) + } + + // Create the default config for this component. + cfg := factory.CreateDefaultConfig() + + // Now that the default config struct is created we can Unmarshal into it, + // and it will apply user-defined config on top of the default. + if err := sub.Unmarshal(&cfg); err != nil { + return errorUnmarshalError(id, err) + } + + c.cfgs[id] = cfg + } + + return nil +} + +func (c *Configs[F]) Configs() map[component.ID]component.Config { + return c.cfgs +} + +func errorUnknownType(id component.ID, factories []component.Type) error { + if id.Type().String() == "logging" { + return errors.New("the logging exporter has been deprecated, use the debug exporter instead") + } + return fmt.Errorf("unknown type: %q for id: %q (valid values: %v)", id.Type(), id, factories) +} + +func errorUnmarshalError(id component.ID, err error) error { + return fmt.Errorf("error reading configuration for %q: %w", id, err) +} diff --git a/internal/static/traces/forked_configunmarshaler/configs_test.go b/internal/static/traces/forked_configunmarshaler/configs_test.go new file mode 100644 index 0000000000..e599278d97 --- /dev/null +++ b/internal/static/traces/forked_configunmarshaler/configs_test.go @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package forked_configunmarshaler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +var nopType = component.MustNewType("nop") + +var testKinds = []struct { + kind string + factories map[component.Type]component.Factory +}{ + { + kind: "receiver", + factories: map[component.Type]component.Factory{ + nopType: receivertest.NewNopFactory(), + }, + }, + { + kind: "processor", + factories: map[component.Type]component.Factory{ + nopType: processortest.NewNopFactory(), + }, + }, + { + kind: "exporter", + factories: map[component.Type]component.Factory{ + nopType: exportertest.NewNopFactory(), + }, + }, + { + kind: "connector", + factories: map[component.Type]component.Factory{ + nopType: connectortest.NewNopFactory(), + }, + }, + { + kind: "extension", + factories: map[component.Type]component.Factory{ + nopType: extensiontest.NewNopFactory(), + }, + }, +} + +func TestUnmarshal(t *testing.T) { + for _, tk := range testKinds { + t.Run(tk.kind, func(t *testing.T) { + cfgs := NewConfigs(tk.factories) + conf := confmap.NewFromStringMap(map[string]any{ + "nop": nil, + "nop/my" + tk.kind: nil, + }) + require.NoError(t, cfgs.Unmarshal(conf)) + + assert.Equal(t, map[component.ID]component.Config{ + component.NewID(nopType): tk.factories[nopType].CreateDefaultConfig(), + component.NewIDWithName(nopType, "my"+tk.kind): tk.factories[nopType].CreateDefaultConfig(), + }, cfgs.Configs()) + }) + } +} + +func TestUnmarshalError(t *testing.T) { + for _, tk := range testKinds { + t.Run(tk.kind, func(t *testing.T) { + var testCases = []struct { + name string + conf *confmap.Conf + // string that the error must contain + expectedError string + }{ + { + name: "invalid-type", + conf: confmap.NewFromStringMap(map[string]any{ + "nop": nil, + "/custom": nil, + }), + expectedError: "the part before / should not be empty", + }, + { + name: "invalid-name-after-slash", + conf: confmap.NewFromStringMap(map[string]any{ + "nop": nil, + "nop/": nil, + }), + expectedError: "the part after / should not be empty", + }, + { + name: "unknown-type", + conf: confmap.NewFromStringMap(map[string]any{ + "nosuch" + tk.kind: nil, + }), + expectedError: "unknown type: \"nosuch" + tk.kind + "\" for id: \"nosuch" + tk.kind + "\" (valid values: [nop])", + }, + { + name: "duplicate", + conf: confmap.NewFromStringMap(map[string]any{ + "nop /my" + tk.kind + " ": nil, + " nop/ my" + tk.kind: nil, + }), + expectedError: "duplicate name", + }, + { + name: "invalid-section", + conf: confmap.NewFromStringMap(map[string]any{ + "nop": map[string]any{ + "unknown_section": tk.kind, + }, + }), + expectedError: "error reading configuration for \"nop\"", + }, + { + name: "invalid-sub-config", + conf: confmap.NewFromStringMap(map[string]any{ + "nop": "tests", + }), + expectedError: "'[nop]' expected a map, got 'string'", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + cfgs := NewConfigs(tk.factories) + err := cfgs.Unmarshal(tt.conf) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + }) + } + }) + } +} diff --git a/internal/static/traces/forked_otelcol/README.md b/internal/static/traces/forked_otelcol/README.md new file mode 100644 index 0000000000..3d53d2825f --- /dev/null +++ b/internal/static/traces/forked_otelcol/README.md @@ -0,0 +1,14 @@ +The code was copied from: +https://github.com/open-telemetry/opentelemetry-collector/blob/v0.114.0/otelcol/unmarshaler.go +https://github.com/open-telemetry/opentelemetry-collector/blob/v0.114.0/otelcol/unmarshaler_test.go +https://github.com/open-telemetry/opentelemetry-collector/blob/v0.114.0/otelcol/factories_test.go + +The purpose of this forked package is to allow Static mode to use the "Unmarshal(...)" function. +In the original OTel repo it is internal (it's lowercase - "unmarshal(...)"). + +There is no need to update this forked package every time Alloy's OTel version is updated: +* This is very fundamental code which doesn't change often. +* As long as Static mode's code still behaves as expected and the converter tests pass, that's fine. +* The version of OTel which Agent's repo uses is different from the one which Alloy uses anyway. + +The only time when this forked package may need updating is when it fails to build due to refactoring in the OTel repo. \ No newline at end of file diff --git a/internal/static/traces/forked_otelcol/factories_test.go b/internal/static/traces/forked_otelcol/factories_test.go new file mode 100644 index 0000000000..4eac485b4e --- /dev/null +++ b/internal/static/traces/forked_otelcol/factories_test.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package forked_otelcol + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func nopFactories() (otelcol.Factories, error) { + var factories otelcol.Factories + var err error + + if factories.Connectors, err = connector.MakeFactoryMap(connectortest.NewNopFactory()); err != nil { + return otelcol.Factories{}, err + } + factories.ConnectorModules = make(map[component.Type]string, len(factories.Connectors)) + for _, con := range factories.Connectors { + factories.ConnectorModules[con.Type()] = "go.opentelemetry.io/collector/connector/connectortest v1.2.3" + } + + if factories.Extensions, err = extension.MakeFactoryMap(extensiontest.NewNopFactory()); err != nil { + return otelcol.Factories{}, err + } + factories.ExtensionModules = make(map[component.Type]string, len(factories.Extensions)) + for _, ext := range factories.Extensions { + factories.ExtensionModules[ext.Type()] = "go.opentelemetry.io/collector/extension/extensiontest v1.2.3" + } + + if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory(), receivertest.NewNopFactoryForType(pipeline.SignalLogs)); err != nil { + return otelcol.Factories{}, err + } + factories.ReceiverModules = make(map[component.Type]string, len(factories.Receivers)) + for _, rec := range factories.Receivers { + factories.ReceiverModules[rec.Type()] = "go.opentelemetry.io/collector/receiver/receivertest v1.2.3" + } + + if factories.Exporters, err = exporter.MakeFactoryMap(exportertest.NewNopFactory()); err != nil { + return otelcol.Factories{}, err + } + factories.ExporterModules = make(map[component.Type]string, len(factories.Exporters)) + for _, exp := range factories.Exporters { + factories.ExporterModules[exp.Type()] = "go.opentelemetry.io/collector/exporter/exportertest v1.2.3" + } + + if factories.Processors, err = processor.MakeFactoryMap(processortest.NewNopFactory()); err != nil { + return otelcol.Factories{}, err + } + factories.ProcessorModules = make(map[component.Type]string, len(factories.Processors)) + for _, proc := range factories.Processors { + factories.ProcessorModules[proc.Type()] = "go.opentelemetry.io/collector/processor/processortest v1.2.3" + } + + return factories, err +} diff --git a/internal/static/traces/forked_otelcol/unmarshaler.go b/internal/static/traces/forked_otelcol/unmarshaler.go new file mode 100644 index 0000000000..db1bd0bbc3 --- /dev/null +++ b/internal/static/traces/forked_otelcol/unmarshaler.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package forked_otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "github.com/grafana/alloy/internal/static/traces/forked_configunmarshaler" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service" + "go.opentelemetry.io/collector/service/telemetry" +) + +type ConfigSettings struct { + Receivers *forked_configunmarshaler.Configs[receiver.Factory] `mapstructure:"receivers"` + Processors *forked_configunmarshaler.Configs[processor.Factory] `mapstructure:"processors"` + Exporters *forked_configunmarshaler.Configs[exporter.Factory] `mapstructure:"exporters"` + Connectors *forked_configunmarshaler.Configs[connector.Factory] `mapstructure:"connectors"` + Extensions *forked_configunmarshaler.Configs[extension.Factory] `mapstructure:"extensions"` + Service service.Config `mapstructure:"service"` +} + +// unmarshal the configSettings from a confmap.Conf. +// After the config is unmarshalled, `Validate()` must be called to validate. +func Unmarshal(v *confmap.Conf, factories otelcol.Factories) (*ConfigSettings, error) { + + telFactory := telemetry.NewFactory() + defaultTelConfig := *telFactory.CreateDefaultConfig().(*telemetry.Config) + + // Unmarshal top level sections and validate. + cfg := &ConfigSettings{ + Receivers: forked_configunmarshaler.NewConfigs(factories.Receivers), + Processors: forked_configunmarshaler.NewConfigs(factories.Processors), + Exporters: forked_configunmarshaler.NewConfigs(factories.Exporters), + Connectors: forked_configunmarshaler.NewConfigs(factories.Connectors), + Extensions: forked_configunmarshaler.NewConfigs(factories.Extensions), + // TODO: Add a component.ServiceFactory to allow this to be defined by the Service. + Service: service.Config{ + Telemetry: defaultTelConfig, + }, + } + + return cfg, v.Unmarshal(&cfg) +} diff --git a/internal/static/traces/forked_otelcol/unmarshaler_test.go b/internal/static/traces/forked_otelcol/unmarshaler_test.go new file mode 100644 index 0000000000..6cbf4a68ac --- /dev/null +++ b/internal/static/traces/forked_otelcol/unmarshaler_test.go @@ -0,0 +1,208 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package forked_otelcol + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/service" + "go.opentelemetry.io/collector/service/pipelines" + "go.opentelemetry.io/collector/service/telemetry" +) + +func TestUnmarshalEmpty(t *testing.T) { + factories, err := nopFactories() + require.NoError(t, err) + + _, err = Unmarshal(confmap.New(), factories) + assert.NoError(t, err) +} + +func TestUnmarshalEmptyAllSections(t *testing.T) { + factories, err := nopFactories() + require.NoError(t, err) + + conf := confmap.NewFromStringMap(map[string]any{ + "receivers": nil, + "processors": nil, + "exporters": nil, + "connectors": nil, + "extensions": nil, + "service": nil, + }) + cfg, err := Unmarshal(conf, factories) + require.NoError(t, err) + + zapProdCfg := zap.NewProductionConfig() + assert.Equal(t, telemetry.LogsConfig{ + Level: zapProdCfg.Level.Level(), + Development: zapProdCfg.Development, + Encoding: "console", + Sampling: &telemetry.LogsSamplingConfig{ + Enabled: true, + Tick: 10 * time.Second, + Initial: 10, + Thereafter: 100, + }, + DisableCaller: zapProdCfg.DisableCaller, + DisableStacktrace: zapProdCfg.DisableStacktrace, + OutputPaths: zapProdCfg.OutputPaths, + ErrorOutputPaths: zapProdCfg.ErrorOutputPaths, + InitialFields: zapProdCfg.InitialFields, + }, cfg.Service.Telemetry.Logs) +} + +func TestUnmarshalUnknownTopLevel(t *testing.T) { + factories, err := nopFactories() + require.NoError(t, err) + + conf := confmap.NewFromStringMap(map[string]any{ + "unknown_section": nil, + }) + _, err = Unmarshal(conf, factories) + assert.ErrorContains(t, err, "'' has invalid keys: unknown_section") +} + +func TestPipelineConfigUnmarshalError(t *testing.T) { + var testCases = []struct { + // test case name (also file name containing config yaml) + name string + conf *confmap.Conf + // string that the error must contain + expectError string + }{ + { + name: "duplicate-pipeline", + conf: confmap.NewFromStringMap(map[string]any{ + "traces/ pipe": nil, + "traces /pipe": nil, + }), + expectError: "duplicate name", + }, + { + name: "invalid-pipeline-name-after-slash", + conf: confmap.NewFromStringMap(map[string]any{ + "metrics/": nil, + }), + expectError: "in \"metrics/\" id: the part after / should not be empty", + }, + { + name: "invalid-pipeline-section", + conf: confmap.NewFromStringMap(map[string]any{ + "traces": map[string]any{ + "unknown_section": nil, + }, + }), + expectError: "'[traces]' has invalid keys: unknown_section", + }, + { + name: "invalid-pipeline-sub-config", + conf: confmap.NewFromStringMap(map[string]any{ + "traces": "string", + }), + expectError: "'[traces]' expected a map, got 'string'", + }, + { + name: "invalid-pipeline-type", + conf: confmap.NewFromStringMap(map[string]any{ + "/metrics": nil, + }), + expectError: "in \"/metrics\" id: the part before / should not be empty", + }, + { + name: "invalid-sequence-value", + conf: confmap.NewFromStringMap(map[string]any{ + "traces": map[string]any{ + "receivers": map[string]any{ + "nop": map[string]any{ + "some": "config", + }, + }, + }, + }), + expectError: "'[traces].receivers': source data must be an array or slice, got map", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + pips := new(pipelines.Config) + err := tt.conf.Unmarshal(&pips) + assert.ErrorContains(t, err, tt.expectError) + }) + } +} + +func TestServiceUnmarshalError(t *testing.T) { + var testCases = []struct { + // test case name (also file name containing config yaml) + name string + conf *confmap.Conf + // string that the error must contain + expectError string + }{ + { + name: "invalid-logs-level", + conf: confmap.NewFromStringMap(map[string]any{ + "telemetry": map[string]any{ + "logs": map[string]any{ + "level": "UNKNOWN", + }, + }, + }), + expectError: "error decoding 'telemetry': decoding failed due to the following error(s):\n\nerror decoding 'logs.level': unrecognized level: \"UNKNOWN\"", + }, + { + name: "invalid-metrics-level", + conf: confmap.NewFromStringMap(map[string]any{ + "telemetry": map[string]any{ + "metrics": map[string]any{ + "level": "unknown", + }, + }, + }), + expectError: "error decoding 'telemetry': decoding failed due to the following error(s):\n\nerror decoding 'metrics.level': unknown metrics level \"unknown\"", + }, + { + name: "invalid-service-extensions-section", + conf: confmap.NewFromStringMap(map[string]any{ + "extensions": []any{ + map[string]any{ + "nop": map[string]any{ + "some": "config", + }, + }, + }, + }), + expectError: "'extensions[0]' has invalid keys: nop", + }, + { + name: "invalid-service-section", + conf: confmap.NewFromStringMap(map[string]any{ + "unknown_section": "string", + }), + expectError: "'' has invalid keys: unknown_section", + }, + { + name: "invalid-pipelines-config", + conf: confmap.NewFromStringMap(map[string]any{ + "pipelines": "string", + }), + expectError: "'pipelines' expected a map, got 'string'", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := tt.conf.Unmarshal(&service.Config{}) + require.ErrorContains(t, err, tt.expectError) + }) + } +} diff --git a/internal/static/traces/traceutils/server.go b/internal/static/traces/traceutils/server.go deleted file mode 100644 index 548c514088..0000000000 --- a/internal/static/traces/traceutils/server.go +++ /dev/null @@ -1,247 +0,0 @@ -package traceutils - -import ( - "context" - "fmt" - "math/rand" - "strings" - "testing" - "time" - - "github.com/grafana/alloy/internal/util" - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/otelcol" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/otlpreceiver" - "go.opentelemetry.io/collector/service" - "go.opentelemetry.io/otel/trace/noop" - "gopkg.in/yaml.v3" -) - -// server is a Tracing testing server that invokes a function every time a span -// is received. -type server struct { - service *service.Service -} - -// NewTestServer creates a new server for testing, where received traces will -// call the callback function. The returned string is the address where traces -// can be sent using OTLP. -func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string { - t.Helper() - - srv, listenAddr, err := newServerWithRandomPort(callback) - if err != nil { - t.Fatalf("failed to create OTLP server: %s", err) - } - t.Cleanup(func() { - err := srv.stop() - assert.NoError(t, err) - }) - - return listenAddr -} - -// newServerWithRandomPort calls NewServer with a random port >49152 and -// <65535. It will try up to five times before failing. -func newServerWithRandomPort(callback func(ptrace.Traces)) (srv *server, addr string, err error) { - var lastError error - - for i := 0; i < 5; i++ { - port := rand.Intn(65535-49152) + 49152 - listenAddr := fmt.Sprintf("127.0.0.1:%d", port) - - srv, err = newServer(listenAddr, callback) - if err != nil { - lastError = err - continue - } - - return srv, listenAddr, nil - } - - return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError) -} - -// newServer creates an OTLP-accepting server that calls a function when a -// trace is received. This is primarily useful for testing. -func newServer(addr string, callback func(ptrace.Traces)) (*server, error) { - conf := util.Untab(fmt.Sprintf(` - processors: - func_processor: - receivers: - otlp: - protocols: - grpc: - endpoint: %s - exporters: - noop: - service: - pipelines: - traces: - receivers: [otlp] - processors: [func_processor] - exporters: [noop] - `, addr)) - - var cfg map[string]interface{} - if err := yaml.NewDecoder(strings.NewReader(conf)).Decode(&cfg); err != nil { - panic("could not decode config: " + err.Error()) - } - - extensionsFactory, err := extension.MakeFactoryMap() - if err != nil { - return nil, fmt.Errorf("failed to make extension factory map: %w", err) - } - - receiversFactory, err := receiver.MakeFactoryMap(otlpreceiver.NewFactory()) - if err != nil { - return nil, fmt.Errorf("failed to make receiver factory map: %w", err) - } - - exportersFactory, err := exporter.MakeFactoryMap(newNoopExporterFactory()) - if err != nil { - return nil, fmt.Errorf("failed to make exporter factory map: %w", err) - } - - processorsFactory, err := processor.MakeFactoryMap( - newFuncProcessorFactory(callback), - ) - if err != nil { - return nil, fmt.Errorf("failed to make processor factory map: %w", err) - } - - factories := otelcol.Factories{ - Extensions: extensionsFactory, - Receivers: receiversFactory, - Processors: processorsFactory, - Exporters: exportersFactory, - } - - configMap := confmap.NewFromStringMap(cfg) - otelCfgSettings, err := otelcol.Unmarshal(configMap, factories) - if err != nil { - return nil, fmt.Errorf("failed to make otel config: %w", err) - } - - otelCfg := otelcol.Config{ - Receivers: otelCfgSettings.Receivers.Configs(), - Processors: otelCfgSettings.Processors.Configs(), - Exporters: otelCfgSettings.Exporters.Configs(), - Connectors: otelCfgSettings.Connectors.Configs(), - Extensions: otelCfgSettings.Extensions.Configs(), - Service: otelCfgSettings.Service, - } - - if err := otelCfg.Validate(); err != nil { - return nil, err - } - - svc, err := service.New(context.Background(), service.Settings{ - ReceiversConfigs: otelCfg.Receivers, - ReceiversFactories: factories.Receivers, - ProcessorsConfigs: otelCfg.Processors, - ProcessorsFactories: factories.Processors, - ExportersConfigs: otelCfg.Exporters, - ExportersFactories: factories.Exporters, - ConnectorsConfigs: otelCfg.Connectors, - ConnectorsFactories: factories.Connectors, - ExtensionsConfigs: otelCfg.Extensions, - ExtensionsFactories: factories.Extensions, - TracerProvider: noop.NewTracerProvider(), - }, otelCfg.Service) - if err != nil { - return nil, fmt.Errorf("failed to create Otel service: %w", err) - } - - if err := svc.Start(context.Background()); err != nil { - return nil, fmt.Errorf("failed to start Otel service: %w", err) - } - - return &server{ - service: svc, - }, nil -} - -// stop stops the testing server. -func (s *server) stop() error { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - return s.service.Shutdown(shutdownCtx) -} - -func newFuncProcessorFactory(callback func(ptrace.Traces)) processor.Factory { - return processor.NewFactory( - component.MustNewType("func_processor"), - func() component.Config { - return &struct{}{} - }, - processor.WithTraces(func( - _ context.Context, - _ processor.Settings, - _ component.Config, - next consumer.Traces, - ) (processor.Traces, error) { - - return &funcProcessor{ - Callback: callback, - Next: next, - }, nil - }, component.StabilityLevelUndefined), - ) -} - -type funcProcessor struct { - Callback func(ptrace.Traces) - Next consumer.Traces -} - -func (p *funcProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - if p.Callback != nil { - p.Callback(td) - } - return p.Next.ConsumeTraces(ctx, td) -} - -func (p *funcProcessor) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -func (p *funcProcessor) Start(context.Context, component.Host) error { return nil } -func (p *funcProcessor) Shutdown(context.Context) error { return nil } - -func newNoopExporterFactory() exporter.Factory { - return exporter.NewFactory( - component.MustNewType("noop"), - func() component.Config { - return &struct{}{} - }, - exporter.WithTraces(func( - context.Context, - exporter.Settings, - component.Config) ( - exporter.Traces, - error) { - - return &noopExporter{}, nil - }, component.StabilityLevelUndefined), - ) -} - -type noopExporter struct{} - -func (n noopExporter) Start(context.Context, component.Host) error { return nil } - -func (n noopExporter) Shutdown(context.Context) error { return nil } - -func (n noopExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } - -func (n noopExporter) ConsumeTraces(context.Context, ptrace.Traces) error { return nil }