diff --git a/CHANGELOG.md b/CHANGELOG.md index eaf39ee382..59e6cff844 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,8 +42,10 @@ Main (unreleased) - Change processlist query to support ONLY_FULL_GROUP_BY sql_mode - Add perf_schema quantile columns to collector -### Bugfixes +- For sharding targets during clustering, `loki.source.podlogs` now takes into account only the labels that uniquely identify a pod and container instance. (@ptodev) +### Bugfixes +- Fixed an issue in the `pyroscope.write` component to allow slashes in application names, matching the behavior of the Pyroscope push API (@marcsanmi) - Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa) - Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar) @@ -54,6 +56,8 @@ Main (unreleased) - Fixed a race condition that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr) +- Updated `prometheus.write.queue` to fix an issue where the TTL check compared timestamps at different time scales. (@mattdurham) + ### Other changes - Change the stability of the `livedebugging` feature from "experimental" to "generally available". (@wildum) diff --git a/docs/sources/reference/components/loki/loki.source.kubernetes.md b/docs/sources/reference/components/loki/loki.source.kubernetes.md index e0e3896a99..4cb86cc620 100644 --- a/docs/sources/reference/components/loki/loki.source.kubernetes.md +++ b/docs/sources/reference/components/loki/loki.source.kubernetes.md @@ -139,7 +139,7 @@ When {{< param "PRODUCT_NAME" >}} is [using clustering][], and `enabled` is set If {{< param "PRODUCT_NAME" >}} is _not_ running in clustered mode, then the block is a no-op and `loki.source.kubernetes` collects logs from every target it receives in its arguments. -Clustering only looks at the following labels for determining the shard key: +Clustering looks only at the following labels for determining the shard key: * `__pod_namespace__` * `__pod_name__` diff --git a/docs/sources/reference/components/loki/loki.source.podlogs.md b/docs/sources/reference/components/loki/loki.source.podlogs.md index 0ce4caac49..ae4703f675 100644 --- a/docs/sources/reference/components/loki/loki.source.podlogs.md +++ b/docs/sources/reference/components/loki/loki.source.podlogs.md @@ -231,6 +231,21 @@ cluster to distribute the load of log collection between all cluster nodes. If {{< param "PRODUCT_NAME" >}} is _not_ running in clustered mode, then the block is a no-op and `loki.source.podlogs` collects logs based on every PodLogs resource discovered.
+Clustering looks only at the following labels for determining the shard key: + +* `__pod_namespace__` +* `__pod_name__` +* `__pod_container_name__` +* `__pod_uid__` +* `__meta_kubernetes_namespace` +* `__meta_kubernetes_pod_name` +* `__meta_kubernetes_pod_container_name` +* `__meta_kubernetes_pod_uid` +* `container` +* `pod` +* `job` +* `namespace` + [using clustering]: ../../../../get-started/clustering/ ## Exported fields diff --git a/go.mod b/go.mod index 9c90e0c414..acd1199831 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,7 @@ require ( github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 github.com/grafana/vmware_exporter v0.0.5-beta - github.com/grafana/walqueue v0.0.0-20241118151117-77b057cbe5a2 + github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94 github.com/hashicorp/consul/api v1.29.5 github.com/hashicorp/go-discover v0.0.0-20230724184603-e89ebd1b2f65 github.com/hashicorp/go-multierror v1.1.1 @@ -929,18 +929,6 @@ replace ( github.com/prometheus/node_exporter => github.com/grafana/node_exporter v0.18.1-grafana-r01.0.20231004161416-702318429731 ) -// Replacing for an internal fork which allows us to observe metrics produced by the Collector. -// This is a temporary solution while a new configuration design is discussed for the collector. Related issues: -// https://github.com/open-telemetry/opentelemetry-collector/issues/7532 -// https://github.com/open-telemetry/opentelemetry-collector/pull/7644 -// https://github.com/open-telemetry/opentelemetry-collector/pull/7696 -// https://github.com/open-telemetry/opentelemetry-collector/issues/4970 -replace ( - go.opentelemetry.io/collector/otelcol => github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a - go.opentelemetry.io/collector/processor/batchprocessor => github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a - go.opentelemetry.io/collector/service => github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a -) - replace github.com/github/smimesign => github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 // Submodules. 
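The `github.com/grafana/walqueue` bump above carries the TTL fix called out in the CHANGELOG, and the queue tests further down switch from `Unix()` to `UnixMilli()` for the same reason. As a rough illustration of the class of bug, here is a minimal sketch only — `isExpired` and its signature are hypothetical and not walqueue's API — showing how mixing a seconds-scale timestamp with a milliseconds-scale cutoff makes fresh samples look expired:

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether a sample timestamp is older than ttl.
// The caller is expected to pass ts in milliseconds; feeding it a
// seconds-scale value makes every sample look ancient and get dropped.
// (Hypothetical helper for illustration only.)
func isExpired(tsMillis int64, ttl time.Duration) bool {
	cutoff := time.Now().UnixMilli() - ttl.Milliseconds()
	return tsMillis < cutoff
}

func main() {
	ttl := 2 * time.Hour

	seconds := time.Now().UTC().Unix()     // seconds since epoch
	millis := time.Now().UTC().UnixMilli() // milliseconds since epoch

	// A fresh sample expressed in seconds looks ~1970-era when compared
	// against a millisecond cutoff, so a TTL check would discard it.
	fmt.Println(isExpired(seconds, ttl)) // true  (wrongly expired)
	fmt.Println(isExpired(millis, ttl))  // false (correct)
}
```

Keeping both sides of the comparison in milliseconds, as the updated tests now do, makes the TTL check behave as intended.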
diff --git a/go.sum b/go.sum index d30874b8d4..850c4de59d 100644 --- a/go.sum +++ b/go.sum @@ -1881,12 +1881,6 @@ github.com/grafana/opentelemetry-collector-contrib/receiver/prometheusreceiver v github.com/grafana/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.0.0-20240326165551-1ae1b9218b1b/go.mod h1:XVaL+FyxEWEfHkDI/X6FJgFYDA0080A1/qoOHQ43dxM= github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20240325174506-2fd1623b2ca0 h1:i/Ne0XwoRokYj52ZcSmnvuyID3h/uA91n0Ycg/grHU8= github.com/grafana/opentelemetry-collector/featuregate v0.0.0-20240325174506-2fd1623b2ca0/go.mod h1:mm8+xyQfgDmqhyegZRNIQmoKsNnDTwWKFLsdMoXAb7A= -github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a h1:6TnkhPwEjuleHfhKSw+IyC/I2cQORuEsImZZMhoodIE= -github.com/grafana/opentelemetry-collector/otelcol v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:H/HurP0qCcwcWdDRgvTJ/FRrVLnt++agxzHvgLQn/Ew= -github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a h1:WrDY3pU4EkHsCIepUAYWt7lIfsMEPdugmYB8k6RYjMo= -github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:QLQ31rGjPuMc/nGw4rL4HzQI9F0jVAPEmC342chxoqA= -github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a h1:ZycgUSrrwtB2x1fMdLD88J2k8886/HvX1tYHlaOH/hg= -github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:VTLnax+DjHal3q7WKQO0ITjWdfPTq2txaoNRcVXYzgE= github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168 h1:I7FyVTtge/3G5YHVOMDG0l4If6W+kXbFDqtzj5gCSGs= github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168/go.mod h1:dMrETGkSetWByp2XGsm8g6pRVh/ibnrDxKsN4BqnGNg= github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6 h1:kih3d3M3dxAmrpFLvnIxFzWx8KMQyKxQwKgWP67C/Fg= @@ -1907,8 +1901,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= github.com/grafana/vmware_exporter v0.0.5-beta h1:2JCqzIWJzns8FN78wPsueC9rT3e3kZo2OUoL5kGMjdM= github.com/grafana/vmware_exporter v0.0.5-beta/go.mod h1:1CecUZII0zVsVcHtNfNeTTcxK7EksqAsAn/TCCB0Mh4= -github.com/grafana/walqueue v0.0.0-20241118151117-77b057cbe5a2 h1:FJuWxAbqu6uH6t7ymIoVEmO8Gw08x8snEWC42U0Ad4Q= -github.com/grafana/walqueue v0.0.0-20241118151117-77b057cbe5a2/go.mod h1:2B+4gxoOgzgRhstKcikROUHusMXLqd5nE/UKukaQrJI= +github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94 h1:d3Hgun3ailVbNArBIhvRIjmCBOOCO9ClKNpzqQFsMLE= +github.com/grafana/walqueue v0.0.0-20241202135041-6ec70efeec94/go.mod h1:2B+4gxoOgzgRhstKcikROUHusMXLqd5nE/UKukaQrJI= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE= github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA= @@ -3380,6 +3374,8 @@ go.opentelemetry.io/collector/filter v0.112.0 h1:xX0MIfXOkbcWzCcmNqvtpnBDnSZAJmN go.opentelemetry.io/collector/filter v0.112.0/go.mod h1:ZcPbD9CLxqcQJ5D2dV2Ma0Hm2IKMOYggTLW8dDdZQSQ= go.opentelemetry.io/collector/internal/memorylimiter v0.112.0 h1:u1hUa48x1qEONUSOtz8dx/c8oz74RpIHyWnGFJ7t0CE= go.opentelemetry.io/collector/internal/memorylimiter v0.112.0/go.mod h1:BtHruDt40QTW8klZVQCqsVfhVsOkh6hDg5w1cPvLpeU= 
+go.opentelemetry.io/collector/otelcol v0.112.0 h1:xOq7z5WK5jS1Qg5w+l99H1EiQRq9rHHDv7EIiLryldw= +go.opentelemetry.io/collector/otelcol v0.112.0/go.mod h1:H/HurP0qCcwcWdDRgvTJ/FRrVLnt++agxzHvgLQn/Ew= go.opentelemetry.io/collector/otelcol/otelcoltest v0.112.0 h1:AdjoVnYl7RxoOvhWZcJb0SWY1VvlRT1cdlCwHBpn9vs= go.opentelemetry.io/collector/otelcol/otelcoltest v0.112.0/go.mod h1:VSbEYgmiSM5K6p501XD35QuhxbDpkxrfS2Wf5OKnHPs= go.opentelemetry.io/collector/pdata v1.18.0 h1:/yg2rO2dxqDM2p6GutsMCxXN6sKlXwyIz/ZYyUPONBg= @@ -3394,6 +3390,8 @@ go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.112.0 h1:opXGNrlJAjYR go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.112.0/go.mod h1:c9yn4x+vY3G10eLCRuUu/oH7Y8YdE/BsgmLWmfHkaNY= go.opentelemetry.io/collector/processor v0.112.0 h1:nMv9DOBYR9MB78ddUgY3A3ytwAwk3t4HQMNIu+w8o0g= go.opentelemetry.io/collector/processor v0.112.0/go.mod h1:AJ8EHq8Z/ev90f4gU6G5ULUncdpWmBRATYk8ioR3pvw= +go.opentelemetry.io/collector/processor/batchprocessor v0.112.0 h1:Dq/RpdClawI8HrnSi177LziPjfHo733BWOCgRTbWrfY= +go.opentelemetry.io/collector/processor/batchprocessor v0.112.0/go.mod h1:QLQ31rGjPuMc/nGw4rL4HzQI9F0jVAPEmC342chxoqA= go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0 h1:+V+4OUcg1s3CrZpttT4dA+Uuv7VWpOIPQpOkcsrMBIo= go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0/go.mod h1:f/eEZ3JMbRNLsRzNwATtTjuulDrLvhYMvXinLrmHtTU= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 h1:Aef68SAbmBbhbsZZPuZb0ECwkV05vIcHIizGOGbWsbM= @@ -3408,6 +3406,8 @@ go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 h1:SShkZsWRsFss go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0/go.mod h1:615smszDXiz4YWwXslxlAjX7FzOVDU7Bk6xARFk+zpk= go.opentelemetry.io/collector/semconv v0.112.0 h1:JPQyvZhlNLVSuVI+FScONaiFygB7h7NTZceUEKIQUEc= go.opentelemetry.io/collector/semconv v0.112.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= +go.opentelemetry.io/collector/service v0.112.0 h1:SI5bwPrRHLRn/kR9AoSSDX/8vaKFe+NMYloAcXfWMSE= +go.opentelemetry.io/collector/service v0.112.0/go.mod h1:VTLnax+DjHal3q7WKQO0ITjWdfPTq2txaoNRcVXYzgE= go.opentelemetry.io/contrib/config v0.10.0 h1:2JknAzMaYjxrHkTnZh3eOme/Y2P5eHE2SWfhfV6Xd6c= go.opentelemetry.io/contrib/config v0.10.0/go.mod h1:aND2M6/KfNkntI5cyvHriR/zvZgPf8j9yETdSmvpfmc= go.opentelemetry.io/contrib/detectors/aws/ec2 v1.28.0 h1:d+y/wygENfwEbVpo7c3A9GfnMhoTiepQcthQSh+Mc9g= diff --git a/internal/component/loki/source/podlogs/reconciler.go b/internal/component/loki/source/podlogs/reconciler.go index 977cb57bba..65c74b49b3 100644 --- a/internal/component/loki/source/podlogs/reconciler.go +++ b/internal/component/loki/source/podlogs/reconciler.go @@ -3,6 +3,7 @@ package podlogs import ( "context" "fmt" + "slices" "sort" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/service/cluster" "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" + prom_lbls "github.com/prometheus/prometheus/model/labels" promlabels "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/util/strutil" @@ -126,6 +128,17 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { return nil } +func filterLabels(lbls prom_lbls.Labels, keysToKeep []string) prom_lbls.Labels { + var res prom_lbls.Labels + for _, k := range lbls { + if slices.Contains(keysToKeep, k.Name) { + res = append(res, prom_lbls.Label{Name: k.Name, Value: k.Value}) + } + } + 
sort.Sort(res) + return res +} + func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetail.Target { if c == nil { return targets @@ -140,7 +153,11 @@ func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetai res := make([]*kubetail.Target, 0, resCap) for _, target := range targets { - peers, err := c.Lookup(shard.StringKey(target.Labels().String()), 1, shard.OpReadWrite) + // Only take into account the labels necessary to uniquely identify a pod/container instance. + // If we take into account more labels than necessary, there may be issues due to labels changing + // over the lifetime of the pod. + clusteringLabels := filterLabels(target.DiscoveryLabels(), kubetail.ClusteringLabels) + peers, err := c.Lookup(shard.StringKey(clusteringLabels.String()), 1, shard.OpReadWrite) if err != nil { // This can only fail in case we ask for more owners than the // available peers. This will never happen, but in any case we fall diff --git a/internal/component/prometheus/write/queue/e2e_stats_test.go b/internal/component/prometheus/write/queue/e2e_stats_test.go index bb4c059d1c..b6737d5be8 100644 --- a/internal/component/prometheus/write/queue/e2e_stats_test.go +++ b/internal/component/prometheus/write/queue/e2e_stats_test.go @@ -49,6 +49,8 @@ const alloyMetadataRetried = "alloy_queue_metadata_network_retried" const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds" +const alloyDrift = "alloy_queue_series_timestamp_drift_seconds" + // TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. func TestMetadata(t *testing.T) { // Check assumes you are checking for any value that is not 0. @@ -228,6 +230,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyDrift, + valueFunc: greaterThenZero, + }, }, }, { @@ -270,6 +276,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyDrift, + valueFunc: greaterThenZero, + }, }, }, // histograms @@ -353,6 +363,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyDrift, + valueFunc: greaterThenZero, + }, }, }, { @@ -395,6 +409,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyDrift, + valueFunc: greaterThenZero, + }, }, }, // TURNING OFF EXEMPLAR TESTS until underlying issue is resolved. diff --git a/internal/component/prometheus/write/queue/e2e_test.go b/internal/component/prometheus/write/queue/e2e_test.go index dfd3963d47..71b8516ce7 100644 --- a/internal/component/prometheus/write/queue/e2e_test.go +++ b/internal/component/prometheus/write/queue/e2e_test.go @@ -234,7 +234,7 @@ func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb. 
} func makeSeries(index int) (int64, float64, labels.Labels) { - return time.Now().UTC().Unix(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) + return time.Now().UTC().UnixMilli(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) } func makeMetadata(index int) (metadata.Metadata, labels.Labels) { @@ -246,13 +246,13 @@ func makeMetadata(index int) (metadata.Metadata, labels.Labels) { } func makeHistogram(index int) (int64, labels.Labels, *histogram.Histogram) { - return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) + return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) } func makeExemplar(index int) exemplar.Exemplar { return exemplar.Exemplar{ Labels: labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), - Ts: time.Now().Unix(), + Ts: time.Now().UnixMilli(), HasTs: true, Value: float64(index), } @@ -305,7 +305,7 @@ func histSpanSame(t *testing.T, h []histogram.Span, pb []prompb.BucketSpan) { } func makeFloatHistogram(index int) (int64, labels.Labels, *histogram.FloatHistogram) { - return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) + return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) } func histFloat(i int) *histogram.FloatHistogram { diff --git a/internal/component/pyroscope/write/parser.go b/internal/component/pyroscope/write/parser.go index 385a241862..3bd97583b3 100644 --- a/internal/component/pyroscope/write/parser.go +++ b/internal/component/pyroscope/write/parser.go @@ -192,7 +192,7 @@ func validateAppName(n string) error { } func isAppNameRuneAllowed(r rune) bool { - return r == '-' || r == '.' || isTagKeyRuneAllowed(r) + return r == '-' || r == '.' 
|| r == '/' || isTagKeyRuneAllowed(r) } func isTagKeyReserved(k string) bool { diff --git a/internal/component/pyroscope/write/parser_test.go b/internal/component/pyroscope/write/parser_test.go new file mode 100644 index 0000000000..66cd6e9cd0 --- /dev/null +++ b/internal/component/pyroscope/write/parser_test.go @@ -0,0 +1,150 @@ +package write + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseKey(t *testing.T) { + tests := []struct { + name string + input string + expected *Key + wantErr bool + }{ + { + name: "basic app name", + input: "simple-app", + expected: &Key{ + labels: map[string]string{ + "__name__": "simple-app", + }, + }, + }, + { + name: "app name with slashes and tags", + input: "my/service/name{environment=prod,version=1.0}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "environment": "prod", + "version": "1.0", + }, + }, + }, + { + name: "multiple slashes and special characters", + input: "app/service/v1.0-beta/component{region=us-west}", + expected: &Key{ + labels: map[string]string{ + "__name__": "app/service/v1.0-beta/component", + "region": "us-west", + }, + }, + }, + { + name: "empty app name", + input: "{}", + wantErr: true, + }, + { + name: "invalid characters in tag key", + input: "my/service/name{invalid@key=value}", + wantErr: true, + }, + { + name: "whitespace handling", + input: "my/service/name{ tag1 = value1 , tag2 = value2 }", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "tag1": "value1", + "tag2": "value2", + }, + }, + }, + { + name: "dots in service name", + input: "my/service.name/v1.0{environment=prod}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service.name/v1.0", + "environment": "prod", + }, + }, + }, + { + name: "app name with slashes", + input: "my/service/name{}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseKey(tt.input) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} + +func TestKey_Normalized(t *testing.T) { + tests := []struct { + name string + key *Key + expected string + }{ + { + name: "simple normalization", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + }, + }, + expected: "my/service/name{}", + }, + { + name: "normalization with tags", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "environment": "prod", + "version": "1.0", + }, + }, + expected: "my/service/name{environment=prod,version=1.0}", + }, + { + name: "tags should be sorted", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "c": "3", + "b": "2", + "a": "1", + }, + }, + expected: "my/service/name{a=1,b=2,c=3}", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.key.Normalized() + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/internal/static/traces/config.go b/internal/static/traces/config.go index 93b0ff3b08..c69eabad6e 100644 --- a/internal/static/traces/config.go +++ b/internal/static/traces/config.go @@ -1,6 +1,8 @@ package traces import ( + "bytes" + "context" "encoding/base64" "errors" "fmt" @@ -24,7 +26,9 @@ import ( "github.com/prometheus/client_golang/prometheus" prom_config "github.com/prometheus/common/config" 
"go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/provider/yamlprovider" otelexporter "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/exporter/otlphttpexporter" @@ -277,25 +281,6 @@ type TLSClientSetting struct { ServerNameOverride string `yaml:"server_name_override,omitempty"` } -func (c TLSClientSetting) toOtelConfig() map[string]interface{} { - m := make(map[string]interface{}, 0) - m["ca_file"] = c.CAFile - m["ca_pem"] = c.CAPem - m["include_system_ca_certs_pool"] = c.IncludeSystemCACertsPool - m["cert_file"] = c.CertFile - m["cert_pem"] = c.CertPem - m["key_file"] = c.KeyFile - m["key_pem"] = c.KeyPem - m["min_version"] = c.MinVersion - m["max_version"] = c.MaxVersion - m["cipher_suites"] = c.CipherSuites - m["reload_interval"] = c.ReloadInterval - m["insecure"] = c.Insecure - m["insecure_skip_verify"] = c.InsecureSkipVerify - m["server_name_override"] = c.ServerNameOverride - return m -} - // OAuth2Config configures the oauth2client extension for a remote_write exporter // compatible with oauth2clientauthextension.Config type OAuth2Config struct { @@ -308,21 +293,6 @@ type OAuth2Config struct { Timeout time.Duration `yaml:"timeout,omitempty"` } -// Agent uses standard YAML unmarshalling, while the oauth2clientauthextension relies on -// mapstructure without providing YAML labels. `toOtelConfig` marshals `Oauth2Config` to configuration type expected by -// the oauth2clientauthextension Extension Factory -func (c OAuth2Config) toOtelConfig() map[string]interface{} { - m := make(map[string]interface{}, 0) - m["client_id"] = c.ClientID - m["client_secret"] = c.ClientSecret - m["endpoint_params"] = c.EndpointParams - m["token_url"] = c.TokenURL - m["scopes"] = c.Scopes - m["tls"] = c.TLS.toOtelConfig() - m["timeout"] = c.Timeout - return m -} - // RemoteWriteConfig controls the configuration of an exporter type RemoteWriteConfig struct { Endpoint string `yaml:"endpoint,omitempty"` @@ -571,8 +541,7 @@ func (c *InstanceConfig) extensions() (map[string]interface{}, error) { if err != nil { return nil, err } - oauthConfig := remoteWriteConfig.Oauth2.toOtelConfig() - extensions[getAuthExtensionName(exporterName)] = oauthConfig + extensions[getAuthExtensionName(exporterName)] = remoteWriteConfig.Oauth2 } if c.JaegerRemoteSampling != nil { if len(c.JaegerRemoteSampling) == 0 { @@ -1002,26 +971,46 @@ func orderProcessors(processors []string, splitPipelines bool) [][]string { } func otelcolConfigFromStringMap(otelMapStructure map[string]interface{}, factories *otelcol.Factories) (*otelcol.Config, error) { - configMap := confmap.NewFromStringMap(otelMapStructure) - otelCfg, err := otelcol.Unmarshal(configMap, *factories) + var b bytes.Buffer + enc := yaml.NewEncoder(&b) + + enc.SetHook(func(in interface{}) (ok bool, out interface{}, err error) { + switch v := in.(type) { + case SecretString: + return true, string(v), nil + case configopaque.String: + return true, string(v), nil + default: + return false, nil, nil + } + }) + + if err := enc.Encode(otelMapStructure); err != nil { + return nil, err + } + cp, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{"yaml:" + string(b.Bytes())}, + ProviderFactories: []confmap.ProviderFactory{ + yamlprovider.NewFactory(), + }, + }, + }) + if err != nil { - return nil, 
fmt.Errorf("failed to load OTel config: %w", err) + return nil, fmt.Errorf("failed to create config provider: %w", err) } - res := otelcol.Config{ - Receivers: otelCfg.Receivers.Configs(), - Processors: otelCfg.Processors.Configs(), - Exporters: otelCfg.Exporters.Configs(), - Connectors: otelCfg.Connectors.Configs(), - Extensions: otelCfg.Extensions.Configs(), - Service: otelCfg.Service, + otelCfg, err := cp.Get(context.Background(), *factories) + if err != nil { + return nil, fmt.Errorf("failed to load OTel config: %w", err) } - if err := res.Validate(); err != nil { + if err := otelCfg.Validate(); err != nil { return nil, err } - return &res, nil + return otelCfg, nil } // Code taken from OTel's service/configcheck.go diff --git a/internal/static/traces/config_test.go b/internal/static/traces/config_test.go index f67dbfd620..d8a24a55fc 100644 --- a/internal/static/traces/config_test.go +++ b/internal/static/traces/config_test.go @@ -1160,8 +1160,6 @@ remote_write: scopes: ["api.metrics"] timeout: 2s `, - // The tls.ciphersuites would appear that it should output nothing or nil but during the conversion otelcolConfigFromStringMap the otelcol.Unmarshal converts the nil array to a blank one []. This is specifically used - // in confmap.go that transforms nil arrays into [] on the zeroSliceHookFunc. expectedConfig: ` receivers: push_receiver: {} @@ -1175,8 +1173,6 @@ extensions: token_url: https://example.com/oauth2/default/v1/token scopes: ["api.metrics"] timeout: 2s - tls: - cipher_suites: [] exporters: otlphttp/0: endpoint: example.com:12345 @@ -1245,7 +1241,6 @@ extensions: key_file: keyfile min_version: "1.3" reload_interval: 1h - cipher_suites: [] exporters: otlphttp/0: endpoint: example.com:12345 @@ -1314,7 +1309,6 @@ extensions: key_pem: test_secret_key_pem_string max_version: "1.2" reload_interval: 1h - cipher_suites: [] exporters: otlphttp/0: endpoint: example.com:12345 @@ -1371,16 +1365,12 @@ extensions: token_url: https://example.com/oauth2/default/v1/token scopes: ["api.metrics"] timeout: 2s - tls: - cipher_suites: [] oauth2client/otlp1: client_id: anotherclientid client_secret: anotherclientsecret token_url: https://example.com/oauth2/default/v1/token scopes: ["api.metrics"] timeout: 2s - tls: - cipher_suites: [] exporters: otlphttp/0: endpoint: example.com:12345 @@ -1441,7 +1431,6 @@ extensions: timeout: 2s tls: insecure: true - cipher_suites: [] exporters: otlphttp/0: endpoint: http://example.com:12345 @@ -1545,7 +1534,7 @@ service: sortService(actualConfig) sortService(expectedConfig) - assert.Equal(t, *expectedConfig, *actualConfig) + assert.Equal(t, expectedConfig, actualConfig) }) } } diff --git a/internal/static/traces/traceutils/server.go b/internal/static/traces/traceutils/server.go deleted file mode 100644 index 548c514088..0000000000 --- a/internal/static/traces/traceutils/server.go +++ /dev/null @@ -1,247 +0,0 @@ -package traceutils - -import ( - "context" - "fmt" - "math/rand" - "strings" - "testing" - "time" - - "github.com/grafana/alloy/internal/util" - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/otelcol" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/otlpreceiver" - 
"go.opentelemetry.io/collector/service" - "go.opentelemetry.io/otel/trace/noop" - "gopkg.in/yaml.v3" -) - -// server is a Tracing testing server that invokes a function every time a span -// is received. -type server struct { - service *service.Service -} - -// NewTestServer creates a new server for testing, where received traces will -// call the callback function. The returned string is the address where traces -// can be sent using OTLP. -func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string { - t.Helper() - - srv, listenAddr, err := newServerWithRandomPort(callback) - if err != nil { - t.Fatalf("failed to create OTLP server: %s", err) - } - t.Cleanup(func() { - err := srv.stop() - assert.NoError(t, err) - }) - - return listenAddr -} - -// newServerWithRandomPort calls NewServer with a random port >49152 and -// <65535. It will try up to five times before failing. -func newServerWithRandomPort(callback func(ptrace.Traces)) (srv *server, addr string, err error) { - var lastError error - - for i := 0; i < 5; i++ { - port := rand.Intn(65535-49152) + 49152 - listenAddr := fmt.Sprintf("127.0.0.1:%d", port) - - srv, err = newServer(listenAddr, callback) - if err != nil { - lastError = err - continue - } - - return srv, listenAddr, nil - } - - return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError) -} - -// newServer creates an OTLP-accepting server that calls a function when a -// trace is received. This is primarily useful for testing. -func newServer(addr string, callback func(ptrace.Traces)) (*server, error) { - conf := util.Untab(fmt.Sprintf(` - processors: - func_processor: - receivers: - otlp: - protocols: - grpc: - endpoint: %s - exporters: - noop: - service: - pipelines: - traces: - receivers: [otlp] - processors: [func_processor] - exporters: [noop] - `, addr)) - - var cfg map[string]interface{} - if err := yaml.NewDecoder(strings.NewReader(conf)).Decode(&cfg); err != nil { - panic("could not decode config: " + err.Error()) - } - - extensionsFactory, err := extension.MakeFactoryMap() - if err != nil { - return nil, fmt.Errorf("failed to make extension factory map: %w", err) - } - - receiversFactory, err := receiver.MakeFactoryMap(otlpreceiver.NewFactory()) - if err != nil { - return nil, fmt.Errorf("failed to make receiver factory map: %w", err) - } - - exportersFactory, err := exporter.MakeFactoryMap(newNoopExporterFactory()) - if err != nil { - return nil, fmt.Errorf("failed to make exporter factory map: %w", err) - } - - processorsFactory, err := processor.MakeFactoryMap( - newFuncProcessorFactory(callback), - ) - if err != nil { - return nil, fmt.Errorf("failed to make processor factory map: %w", err) - } - - factories := otelcol.Factories{ - Extensions: extensionsFactory, - Receivers: receiversFactory, - Processors: processorsFactory, - Exporters: exportersFactory, - } - - configMap := confmap.NewFromStringMap(cfg) - otelCfgSettings, err := otelcol.Unmarshal(configMap, factories) - if err != nil { - return nil, fmt.Errorf("failed to make otel config: %w", err) - } - - otelCfg := otelcol.Config{ - Receivers: otelCfgSettings.Receivers.Configs(), - Processors: otelCfgSettings.Processors.Configs(), - Exporters: otelCfgSettings.Exporters.Configs(), - Connectors: otelCfgSettings.Connectors.Configs(), - Extensions: otelCfgSettings.Extensions.Configs(), - Service: otelCfgSettings.Service, - } - - if err := otelCfg.Validate(); err != nil { - return nil, err - } - - svc, err := service.New(context.Background(), service.Settings{ - 
ReceiversConfigs: otelCfg.Receivers, - ReceiversFactories: factories.Receivers, - ProcessorsConfigs: otelCfg.Processors, - ProcessorsFactories: factories.Processors, - ExportersConfigs: otelCfg.Exporters, - ExportersFactories: factories.Exporters, - ConnectorsConfigs: otelCfg.Connectors, - ConnectorsFactories: factories.Connectors, - ExtensionsConfigs: otelCfg.Extensions, - ExtensionsFactories: factories.Extensions, - TracerProvider: noop.NewTracerProvider(), - }, otelCfg.Service) - if err != nil { - return nil, fmt.Errorf("failed to create Otel service: %w", err) - } - - if err := svc.Start(context.Background()); err != nil { - return nil, fmt.Errorf("failed to start Otel service: %w", err) - } - - return &server{ - service: svc, - }, nil -} - -// stop stops the testing server. -func (s *server) stop() error { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - return s.service.Shutdown(shutdownCtx) -} - -func newFuncProcessorFactory(callback func(ptrace.Traces)) processor.Factory { - return processor.NewFactory( - component.MustNewType("func_processor"), - func() component.Config { - return &struct{}{} - }, - processor.WithTraces(func( - _ context.Context, - _ processor.Settings, - _ component.Config, - next consumer.Traces, - ) (processor.Traces, error) { - - return &funcProcessor{ - Callback: callback, - Next: next, - }, nil - }, component.StabilityLevelUndefined), - ) -} - -type funcProcessor struct { - Callback func(ptrace.Traces) - Next consumer.Traces -} - -func (p *funcProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - if p.Callback != nil { - p.Callback(td) - } - return p.Next.ConsumeTraces(ctx, td) -} - -func (p *funcProcessor) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -func (p *funcProcessor) Start(context.Context, component.Host) error { return nil } -func (p *funcProcessor) Shutdown(context.Context) error { return nil } - -func newNoopExporterFactory() exporter.Factory { - return exporter.NewFactory( - component.MustNewType("noop"), - func() component.Config { - return &struct{}{} - }, - exporter.WithTraces(func( - context.Context, - exporter.Settings, - component.Config) ( - exporter.Traces, - error) { - - return &noopExporter{}, nil - }, component.StabilityLevelUndefined), - ) -} - -type noopExporter struct{} - -func (n noopExporter) Start(context.Context, component.Host) error { return nil } - -func (n noopExporter) Shutdown(context.Context) error { return nil } - -func (n noopExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } - -func (n noopExporter) ConsumeTraces(context.Context, ptrace.Traces) error { return nil }
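Since the deleted `traceutils/server.go` assembled an `otelcol.Config` by hand from `otelcol.Unmarshal` output, while `internal/static/traces/config.go` now goes through a config provider, the essence of the new loading path can be condensed as below. This is a sketch for orientation rather than code from the diff: `loadOtelConfigFromYAML` is a hypothetical helper, and the real `otelcolConfigFromStringMap` additionally YAML-encodes the map structure with a hook that unwraps `SecretString` and `configopaque.String` values before handing the document to the resolver.

```go
package traces

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/confmap"
	"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
	"go.opentelemetry.io/collector/otelcol"
)

// loadOtelConfigFromYAML resolves a collector configuration from an inline
// YAML document using the provider-based path adopted in this diff.
func loadOtelConfigFromYAML(yamlDoc string, factories otelcol.Factories) (*otelcol.Config, error) {
	cp, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{
		ResolverSettings: confmap.ResolverSettings{
			// The "yaml:" scheme hands the raw document to yamlprovider.
			URIs:              []string{"yaml:" + yamlDoc},
			ProviderFactories: []confmap.ProviderFactory{yamlprovider.NewFactory()},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create config provider: %w", err)
	}

	cfg, err := cp.Get(context.Background(), factories)
	if err != nil {
		return nil, fmt.Errorf("failed to load OTel config: %w", err)
	}
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return cfg, nil
}
```

This keeps config resolution inside the collector's own provider machinery instead of rebuilding an `otelcol.Config` field by field, which is why the manual `Receivers`/`Processors`/`Exporters` assembly (and the `cipher_suites: []` artifacts in the test fixtures) could be removed.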