From 391315978a3a694d18fea08c6556b86c737bc343 Mon Sep 17 00:00:00 2001 From: aidanleuck Date: Thu, 2 Jan 2025 19:41:46 -0700 Subject: [PATCH] MAIL for documentation and merge --- .github/workflows/helm-test.yml | 2 +- CHANGELOG.md | 6 +- .../components/otelcol/otelcol.auth.basic.md | 2 +- .../components/otelcol/otelcol.auth.bearer.md | 2 +- .../otelcol/otelcol.auth.headers.md | 2 +- .../components/otelcol/otelcol.auth.oauth2.md | 2 +- .../components/otelcol/otelcol.auth.sigv4.md | 2 +- ...telcol.extension.jaeger_remote_sampling.md | 11 +- .../otelcol/otelcol.receiver.datadog.md | 9 +- .../otelcol/otelcol.receiver.influxdb.md | 9 +- .../otelcol/otelcol.receiver.jaeger.md | 13 +- .../otelcol/otelcol.receiver.opencensus.md | 9 +- .../otelcol/otelcol.receiver.otlp.md | 9 +- .../otelcol/otelcol.receiver.zipkin.md | 9 +- .../prometheus/prometheus.remote_write.md | 2 +- docs/sources/troubleshoot/debug.md | 1 + go.mod | 5 +- go.sum | 5 +- .../cmd/integration-tests/docker-compose.yaml | 2 +- internal/component/component_provider.go | 43 +++--- .../component/discovery/relabel/relabel.go | 24 ++- internal/component/otelcol/auth/auth.go | 2 +- .../component/otelcol/connector/connector.go | 7 +- .../component/otelcol/exporter/exporter.go | 7 +- .../component/otelcol/extension/extension.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 137 ++++++++++-------- .../internal/scheduler/scheduler_test.go | 37 +++-- .../component/otelcol/processor/processor.go | 8 +- .../component/otelcol/receiver/receiver.go | 2 +- .../pyroscope/receive_http/receive_http.go | 28 +++- .../receive_http/receive_http_test.go | 42 ++++++ internal/runtime/alloy_components.go | 4 + .../src/features/component/ComponentView.tsx | 29 ++-- .../web/ui/src/features/component/types.ts | 5 + 34 files changed, 322 insertions(+), 157 deletions(-) diff --git a/.github/workflows/helm-test.yml b/.github/workflows/helm-test.yml index 43fc01757e..aec51de3f0 100644 --- a/.github/workflows/helm-test.yml +++ b/.github/workflows/helm-test.yml @@ -71,7 +71,7 @@ jobs: run: ct lint --config ./operations/helm/ct.yaml - name: Create kind cluster - uses: helm/kind-action@v1.10.0 + uses: helm/kind-action@v1.11.0 if: steps.list-changed.outputs.changed == 'true' - name: Add dependency chart repos diff --git a/CHANGELOG.md b/CHANGELOG.md index 38b5dbd68a..14770f3fac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,12 +50,14 @@ Main (unreleased) - Change processlist query to support ONLY_FULL_GROUP_BY sql_mode - Add perf_schema quantile columns to collector +- Live Debugging button should appear in UI only for supported components (@ravishankar15) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) - Add `ignore_older_than` option for local.file_match (@ravishankar15) +- Add livedebugging support for `discover.relabel` (@ravishankar15) -- Use a forked `github.com/goccy/go-json` module which reduces the memory consumption of an Alloy instance by 20MB. +- Upgrade `github.com/goccy/go-json` to v0.10.4, which reduces the memory consumption of an Alloy instance by 20MB. If Alloy is running certain otelcol components, this reduction will not apply. (@ptodev) - + - Update `prometheus.write.queue` library for performance increases in cpu. 
(@mattdurham) ### Bugfixes diff --git a/docs/sources/reference/components/otelcol/otelcol.auth.basic.md b/docs/sources/reference/components/otelcol/otelcol.auth.basic.md index 814773ba6e..a046bc036d 100644 --- a/docs/sources/reference/components/otelcol/otelcol.auth.basic.md +++ b/docs/sources/reference/components/otelcol/otelcol.auth.basic.md @@ -11,7 +11,7 @@ title: otelcol.auth.basic `otelcol.auth.basic` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using basic authentication. -This extension supports both server and client authentication. +This `component` supports both server and client authentication. > **NOTE**: `otelcol.auth.basic` is a wrapper over the upstream OpenTelemetry > Collector `basicauth` extension. Bug reports or feature requests will be diff --git a/docs/sources/reference/components/otelcol/otelcol.auth.bearer.md b/docs/sources/reference/components/otelcol/otelcol.auth.bearer.md index e7725159e5..56745d4586 100644 --- a/docs/sources/reference/components/otelcol/otelcol.auth.bearer.md +++ b/docs/sources/reference/components/otelcol/otelcol.auth.bearer.md @@ -11,7 +11,7 @@ title: otelcol.auth.bearer `otelcol.auth.bearer` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using bearer token authentication. -This extension supports both server and client authentication. +This `component` supports both server and client authentication. {{< admonition type="note" >}} `otelcol.auth.bearer` is a wrapper over the upstream OpenTelemetry Collector `bearertokenauth` extension. diff --git a/docs/sources/reference/components/otelcol/otelcol.auth.headers.md b/docs/sources/reference/components/otelcol/otelcol.auth.headers.md index 2ebb557f4f..55353a1945 100644 --- a/docs/sources/reference/components/otelcol/otelcol.auth.headers.md +++ b/docs/sources/reference/components/otelcol/otelcol.auth.headers.md @@ -11,7 +11,7 @@ title: otelcol.auth.headers `otelcol.auth.headers` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using custom headers. -This extension only supports client authentication. +This `component` only supports client authentication. {{< admonition type="note" >}} `otelcol.auth.headers` is a wrapper over the upstream OpenTelemetry Collector `headerssetter` extension. diff --git a/docs/sources/reference/components/otelcol/otelcol.auth.oauth2.md b/docs/sources/reference/components/otelcol/otelcol.auth.oauth2.md index 83c1a30dfb..52cd9dbca5 100644 --- a/docs/sources/reference/components/otelcol/otelcol.auth.oauth2.md +++ b/docs/sources/reference/components/otelcol/otelcol.auth.oauth2.md @@ -10,7 +10,7 @@ title: otelcol.auth.oauth2 `otelcol.auth.oauth2` exposes a `handler` that can be used by other `otelcol` components to authenticate requests using OAuth 2.0. -This extension only supports client authentication. +This `component` only supports client authentication. The authorization tokens can be used by HTTP and gRPC based OpenTelemetry exporters. This component can fetch and refresh expired tokens automatically. 
diff --git a/docs/sources/reference/components/otelcol/otelcol.auth.sigv4.md b/docs/sources/reference/components/otelcol/otelcol.auth.sigv4.md
index 300d9db837..a4d55c16d8 100644
--- a/docs/sources/reference/components/otelcol/otelcol.auth.sigv4.md
+++ b/docs/sources/reference/components/otelcol/otelcol.auth.sigv4.md
@@ -12,7 +12,7 @@ title: otelcol.auth.sigv4
components to authenticate requests to AWS services using the AWS Signature Version 4 (SigV4) protocol. For more information about SigV4 see the AWS documentation about [Signing AWS API requests][].
-This extension only supports client authentication.
+This `component` only supports client authentication.
[Signing AWS API requests]: https://docs.aws.amazon.com/general/latest/gr/signing-aws-api-requests.html
diff --git a/docs/sources/reference/components/otelcol/otelcol.extension.jaeger_remote_sampling.md b/docs/sources/reference/components/otelcol/otelcol.extension.jaeger_remote_sampling.md
index 9796fbcea0..58eda23e13 100644
--- a/docs/sources/reference/components/otelcol/otelcol.extension.jaeger_remote_sampling.md
+++ b/docs/sources/reference/components/otelcol/otelcol.extension.jaeger_remote_sampling.md
@@ -296,9 +296,14 @@ otelcol.extension.jaeger_remote_sampling "example" {
}
```
-## Enabling Authentication
-
-You can create a `jaeger_remote_sampling` extensions that requires authentication for requests. This is useful for limiting access to the sampling document. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+## Enable authentication
+
+You can configure `jaeger_remote_sampling` to require authentication for requests.
+This allows you to limit access to the sampling document.
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.extension.jaeger_remote_sampling "default" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.datadog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.datadog.md
index ae5c6eeae5..d805201d0f 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.datadog.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.datadog.md
@@ -136,9 +136,14 @@ otelcol.exporter.otlp "default" {
}
```
-## Enabling Authentication
+## Enable authentication
-You can create a `datadog` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+You can create an `otelcol.receiver.datadog` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.datadog "default" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md b/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md
index bfac9e933b..c0e3883273 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md
@@ -151,9 +151,14 @@ prometheus.remote_write "mimir" {
}
```
-## Enabling Authentication
+## Enable authentication
-You can create a `influxdb` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+You can create an `otelcol.receiver.influxdb` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.influxdb "influxdb_metrics" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.jaeger.md b/docs/sources/reference/components/otelcol/otelcol.receiver.jaeger.md
index 4349cf56f1..afacf77ba4 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.jaeger.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.jaeger.md
@@ -264,9 +264,18 @@ otelcol.exporter.otlp "default" {
`otelcol.receiver.jaeger` supports [Gzip](https://en.wikipedia.org/wiki/Gzip) for compression.
-## Enabling Authentication
+## Enable authentication
-You can create a `jaeger` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility. This functionality is currently limited to the GRPC/HTTP blocks.
+You can create an `otelcol.receiver.jaeger` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+This functionality is currently limited to the gRPC/HTTP blocks.
+{{< /admonition >}}
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.jaeger "default" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.opencensus.md b/docs/sources/reference/components/otelcol/otelcol.receiver.opencensus.md
index fe6767fce7..dd13657b2e 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.opencensus.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.opencensus.md
@@ -210,9 +210,14 @@ otelcol.exporter.otlp "default" {
}
```
-## Enabling Authentication
+## Enable authentication
-You can create a `opencensus` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+You can create an `otelcol.receiver.opencensus` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.opencensus "default" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.otlp.md b/docs/sources/reference/components/otelcol/otelcol.receiver.otlp.md
index c7239e7bbb..862e2bb4c1 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.otlp.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.otlp.md
@@ -243,9 +243,14 @@ otelcol.exporter.otlp "default" {
}
```
`otelcol.receiver.otlp` supports [gzip](https://en.wikipedia.org/wiki/Gzip) for compression.
-## Enabling Authentication
+## Enable authentication
-You can create a `otlp` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+You can create an `otelcol.receiver.otlp` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.otlp "default" {
diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.zipkin.md b/docs/sources/reference/components/otelcol/otelcol.receiver.zipkin.md
index de6d8558cd..7e9ce32459 100644
--- a/docs/sources/reference/components/otelcol/otelcol.receiver.zipkin.md
+++ b/docs/sources/reference/components/otelcol/otelcol.receiver.zipkin.md
@@ -143,9 +143,14 @@ otelcol.exporter.otlp "default" {
}
```
-## Enabling Authentication
+## Enable authentication
-You can create a `zipkin` receiver that requires authentication for requests. This is useful for limiting who can push data to the server. Note that not all OpenTelemetry Collector (otelcol) authentication plugins support receiver authentication. Please refer to the documentation for each `otelcol.auth.*` plugin to determine its compatibility.
+You can create an `otelcol.receiver.zipkin` component that requires authentication for requests. This is useful for limiting who can push data to the server.
+
+{{< admonition type="note" >}}
+Not all OpenTelemetry Collector authentication plugins support receiver authentication.
+Refer to the [documentation](https://grafana.com/docs/alloy//reference/components/otelcol/) for each `otelcol.auth.*` component to determine its compatibility.
+{{< /admonition >}}
```alloy
otelcol.receiver.zipkin "default" {
diff --git a/docs/sources/reference/components/prometheus/prometheus.remote_write.md b/docs/sources/reference/components/prometheus/prometheus.remote_write.md
index d2614a2e52..a919d750f0 100644
--- a/docs/sources/reference/components/prometheus/prometheus.remote_write.md
+++ b/docs/sources/reference/components/prometheus/prometheus.remote_write.md
@@ -427,7 +427,7 @@ To troubleshoot, take the following steps in order:
You can use [Promtool][promtool] to inspect it and find out which metric series were sent by this {{< param "PRODUCT_NAME" >}} instance since the last WAL truncation event. For example:
```
- ./promtool tsdb dump --match='{__name__=\"otelcol_connector_spanmetrics_duration_seconds_bucket\", http_method=\"GET\", job=\"ExampleJobName\"' /path/to/wal/
+ ./promtool tsdb dump --match='{__name__="otelcol_connector_spanmetrics_duration_seconds_bucket", http_method="GET", job="ExampleJobName"}' /path/to/wal/
```
[clustering]: ../../configure/clustering
diff --git a/docs/sources/troubleshoot/debug.md b/docs/sources/troubleshoot/debug.md
index 0e674aa1a1..5b8518515d 100644
--- a/docs/sources/troubleshoot/debug.md
+++ b/docs/sources/troubleshoot/debug.md
@@ -111,6 +111,7 @@ Supported components:
* `otelcol.receiver.*`
* `prometheus.relabel`
+* `discovery.relabel`
{{< /admonition >}}
## Debug using the UI
diff --git a/go.mod b/go.mod
index 0c662da968..4dbf6351b1 100644
--- a/go.mod
+++ b/go.mod
@@ -518,7 +518,7 @@ require (
	github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
	github.com/go-zookeeper/zk v1.0.3 // indirect
	github.com/gobwas/glob v0.2.3 // indirect
-	github.com/goccy/go-json v0.10.3 // indirect
+	github.com/goccy/go-json v0.10.4 // indirect
	github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
	github.com/godbus/dbus/v5 v5.1.0 // indirect
	github.com/gogo/googleapis v1.4.1 // indirect
@@ -945,6 +945,3 @@ exclude (
)
replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.12.0
-
-// TODO(ptodev): Remove when this PR has been merged: https://github.com/goccy/go-json/pull/490
-replace github.com/goccy/go-json => github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c
diff --git a/go.sum b/go.sum
index 77ad0b47c0..6076b1a759 100644
--- a/go.sum
+++ b/go.sum
@@ -1619,6 +1619,9 @@ github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBT
github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
+github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
+github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
+github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -1853,8 +1856,6 @@ github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRW
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb/go.mod h1:kkWM4WUV230bNG3urVRWPBnSJHs64y/0RmWjftnnn0c=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= -github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c h1:yKBKEC347YZpgii1KazRCfxHsTaxMqWZzoivM1OTT50= -github.com/grafana/go-json v0.0.0-20241106155216-71a03f133f5c/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4= github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= diff --git a/internal/cmd/integration-tests/docker-compose.yaml b/internal/cmd/integration-tests/docker-compose.yaml index 1632286bab..451fd1b779 100644 --- a/internal/cmd/integration-tests/docker-compose.yaml +++ b/internal/cmd/integration-tests/docker-compose.yaml @@ -12,7 +12,7 @@ services: - "9009:9009" tempo: - image: grafana/tempo:latest + image: grafana/tempo:2.6.1 command: [ "-config.file=/etc/tempo.yaml" ] volumes: - ./configs/tempo/tempo.yaml:/etc/tempo.yaml diff --git a/internal/component/component_provider.go b/internal/component/component_provider.go index 1f2c981447..bc685ae597 100644 --- a/internal/component/component_provider.go +++ b/internal/component/component_provider.go @@ -123,9 +123,10 @@ type Info struct { ComponentName string // Name of the component. Health Health // Current component health. - Arguments Arguments // Current arguments value of the component. - Exports Exports // Current exports value of the component. - DebugInfo interface{} // Current debug info of the component. + Arguments Arguments // Current arguments value of the component. + Exports Exports // Current exports value of the component. + DebugInfo interface{} // Current debug info of the component. + LiveDebuggingEnabled bool } // MarshalJSON returns a JSON representation of cd. 
The format of the @@ -139,19 +140,20 @@ func (info *Info) MarshalJSON() ([]byte, error) { } componentDetailJSON struct { - Name string `json:"name"` - Type string `json:"type,omitempty"` - LocalID string `json:"localID"` - ModuleID string `json:"moduleID"` - Label string `json:"label,omitempty"` - References []string `json:"referencesTo"` - ReferencedBy []string `json:"referencedBy"` - Health *componentHealthJSON `json:"health"` - Original string `json:"original"` - Arguments json.RawMessage `json:"arguments,omitempty"` - Exports json.RawMessage `json:"exports,omitempty"` - DebugInfo json.RawMessage `json:"debugInfo,omitempty"` - CreatedModuleIDs []string `json:"createdModuleIDs,omitempty"` + Name string `json:"name"` + Type string `json:"type,omitempty"` + LocalID string `json:"localID"` + ModuleID string `json:"moduleID"` + Label string `json:"label,omitempty"` + References []string `json:"referencesTo"` + ReferencedBy []string `json:"referencedBy"` + Health *componentHealthJSON `json:"health"` + Original string `json:"original"` + Arguments json.RawMessage `json:"arguments,omitempty"` + Exports json.RawMessage `json:"exports,omitempty"` + DebugInfo json.RawMessage `json:"debugInfo,omitempty"` + CreatedModuleIDs []string `json:"createdModuleIDs,omitempty"` + LiveDebuggingEnabled bool `json:"liveDebuggingEnabled"` } ) @@ -196,10 +198,11 @@ func (info *Info) MarshalJSON() ([]byte, error) { Message: info.Health.Message, UpdatedTime: info.Health.UpdateTime, }, - Arguments: arguments, - Exports: exports, - DebugInfo: debugInfo, - CreatedModuleIDs: info.ModuleIDs, + Arguments: arguments, + Exports: exports, + DebugInfo: debugInfo, + CreatedModuleIDs: info.ModuleIDs, + LiveDebuggingEnabled: info.LiveDebuggingEnabled, }) } diff --git a/internal/component/discovery/relabel/relabel.go b/internal/component/discovery/relabel/relabel.go index 551f361d76..6613c6c801 100644 --- a/internal/component/discovery/relabel/relabel.go +++ b/internal/component/discovery/relabel/relabel.go @@ -2,12 +2,14 @@ package relabel import ( "context" + "fmt" "sync" "github.com/grafana/alloy/internal/component" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" ) @@ -46,13 +48,23 @@ type Component struct { mut sync.RWMutex rcs []*relabel.Config + + debugDataPublisher livedebugging.DebugDataPublisher } var _ component.Component = (*Component)(nil) +var _ component.LiveDebugging = (*Component)(nil) // New creates a new discovery.relabel component. func New(o component.Options, args Arguments) (*Component, error) { - c := &Component{opts: o} + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + c := &Component{ + opts: o, + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), + } // Call to Update() to set the output once at the start if err := c.Update(args); err != nil { @@ -81,9 +93,13 @@ func (c *Component) Update(args component.Arguments) error { for _, t := range newArgs.Targets { lset := componentMapToPromLabels(t) - lset, keep := relabel.Process(lset, relabelConfigs...) + relabelled, keep := relabel.Process(lset, relabelConfigs...) 
if keep { - targets = append(targets, promLabelsToComponent(lset)) + targets = append(targets, promLabelsToComponent(relabelled)) + } + componentID := livedebugging.ComponentID(c.opts.ID) + if c.debugDataPublisher.IsActive(componentID) { + c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lset.String(), relabelled.String())) } } @@ -95,6 +111,8 @@ func (c *Component) Update(args component.Arguments) error { return nil } +func (c *Component) LiveDebugging(_ int) {} + func componentMapToPromLabels(ls discovery.Target) labels.Labels { res := make([]labels.Label, 0, len(ls)) for k, v := range ls { diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index 6f2ba6e585..a1898ab993 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -304,7 +304,7 @@ func (a *Auth) Update(args component.Arguments) error { ) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,9 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 63e0f1cdd0..4f19b5a625 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -253,9 +253,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) 
return nil
}
diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go
index 2c731616a5..fbf70e22c0 100644
--- a/internal/component/otelcol/internal/scheduler/scheduler.go
+++ b/internal/component/otelcol/internal/scheduler/scheduler.go
@@ -37,9 +37,7 @@ type Scheduler struct {
	schedMut        sync.Mutex
	schedComponents []otelcomponent.Component // Most recently created components
	host            otelcomponent.Host
-
-	// newComponentsCh is written to when schedComponents gets updated.
-	newComponentsCh chan struct{}
+	running bool
	// onPause is called when scheduler is making changes to running components.
	onPause func()
@@ -51,89 +49,102 @@ type Scheduler struct {
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
	return &Scheduler{
-		log:             l,
-		newComponentsCh: make(chan struct{}, 1),
-		onPause:         func() {},
-		onResume:        func() {},
+		log:      l,
+		onPause:  func() {},
+		onResume: func() {},
	}
}
-// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to
-// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running
-// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it
-// will call onResume as a last step.
+// NewWithPauseCallbacks is like New, but allows specifying onPause() and onResume() callbacks.
+// The callbacks are a useful way of pausing and resuming the ingestion of data by the components:
+// * onPause() is called before the scheduler stops the components.
+// * onResume() is called after the scheduler starts the components.
+// The callbacks are used by the Schedule() and Run() functions.
+// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() has never been called.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
	return &Scheduler{
-		log:             l,
-		newComponentsCh: make(chan struct{}, 1),
-		onPause:         onPause,
-		onResume:        onResume,
+		log:      l,
+		onPause:  onPause,
+		onResume: onResume,
	}
}
-// Schedule schedules a new set of OpenTelemetry Components to run. Components
-// will only be scheduled when the Scheduler is running.
+// Schedule a new set of OpenTelemetry Components to run.
+// Components will only be started when the Scheduler's Run() function has been called.
+//
+// Schedule() completely overrides the set of previously running components.
+// Components which have been removed since the last call to Schedule will be stopped.
//
-// Schedule completely overrides the set of previously running components;
-// components which have been removed since the last call to Schedule will be
-// stopped.
-func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
+// updateConsumers is called after the old components are stopped and before the new components are started.
+// It is expected that this function sets the new consumers on the wrapping consumer that's assigned to the Alloy component.
+func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) {
	cs.schedMut.Lock()
	defer cs.schedMut.Unlock()
-	cs.schedComponents = cc
+	// If the scheduler isn't running yet, just update the state.
+	// That way the Run function is ready to go.
+	if !cs.running {
+		cs.schedComponents = cc
+		cs.host = h
+		updateConsumers()
+		return
+	}
+
+	// The new components must be set up in this order:
+	//
+	// 1. Pause consumers
+	// 2. Stop the old components
+	// 3. Change the consumers
+	// 4. Start the new components
+	// 5. Resume the consumers
+	//
+	// There could be race conditions if the order above is not followed.
+
+	// 1. Pause consumers
+	// This prevents them from accepting new data while we're shutting the old components down.
+	cs.onPause()
+
+	// 2. Stop the old components
+	cs.stopComponents(ctx, cs.schedComponents...)
+
+	// 3. Change the consumers
+	// This can only be done after stopping the previous components and before starting the new ones.
+	updateConsumers()
+
+	// 4. Start the new components
+	level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cc))
+	cs.schedComponents = cs.startComponents(ctx, h, cc...)
	cs.host = h
+	// TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers?
-	select {
-	case cs.newComponentsCh <- struct{}{}:
-		// Queued new message.
-	default:
-		// A message is already queued for refreshing running components so we
-		// don't have to do anything here.
-	}
+	// 5. Resume the consumers
+	// The new components will now start accepting telemetry data.
+	cs.onResume()
}
-// Run starts the Scheduler. Run will watch for schedule components to appear
-// and run them, terminating previously running components if they exist.
+// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
-	firstRun := true
-	var components []otelcomponent.Component
+	cs.schedMut.Lock()
+	cs.running = true
+
+	cs.onPause()
+	cs.startComponents(ctx, cs.host, cs.schedComponents...)
+	cs.onResume()
+
+	cs.schedMut.Unlock()
	// Make sure we terminate all of our running components on shutdown.
	defer func() {
-		if !firstRun { // always handle the callbacks correctly
-			cs.onPause()
-		}
-		cs.stopComponents(context.Background(), components...)
+		cs.schedMut.Lock()
+		defer cs.schedMut.Unlock()
+		cs.stopComponents(context.Background(), cs.schedComponents...)
+		// This Resume call should not be needed, but is added for robustness to ensure that
+		// the scheduler does not ever exit in a "paused" state.
		cs.onResume()
	}()
-	// Wait for a write to cs.newComponentsCh. The initial list of components is
-	// always empty so there's nothing to do until cs.newComponentsCh is written
-	// to.
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		case <-cs.newComponentsCh:
-			if !firstRun {
-				cs.onPause() // do not pause on first run
-			} else {
-				firstRun = false
-			}
-			// Stop the old components before running new scheduled ones.
-			cs.stopComponents(ctx, components...)
-
-			cs.schedMut.Lock()
-			components = cs.schedComponents
-			host := cs.host
-			cs.schedMut.Unlock()
-
-			level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
-			components = cs.startComponents(ctx, host, components...)
- cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..c034e262f8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +33,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,12 +53,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -81,26 +82,32 @@ func TestScheduler(t *testing.T) { require.NoError(t, err) }() + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } + cs.Schedule(ctx, func() {}, h, component) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. 
require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(ctx, func() {}, h) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") @@ -109,8 +116,8 @@ func TestScheduler(t *testing.T) { cancel() require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") }) @@ -133,7 +140,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -237,9 +237,13 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) 
return nil
}
diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go
index 35a1240273..c17868b427 100644
--- a/internal/component/pyroscope/receive_http/receive_http.go
+++ b/internal/component/pyroscope/receive_http/receive_http.go
@@ -10,6 +10,7 @@ import (
	"sync"
	"github.com/gorilla/mux"
+	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/sync/errgroup"
	"github.com/grafana/alloy/internal/component"
@@ -18,6 +19,7 @@ import (
	"github.com/grafana/alloy/internal/component/pyroscope/write"
	"github.com/grafana/alloy/internal/featuregate"
	"github.com/grafana/alloy/internal/runtime/logging/level"
+	"github.com/grafana/alloy/internal/util"
)
const (
@@ -53,16 +55,21 @@ func (a *Arguments) SetToDefault() {
}
type Component struct {
-	opts        component.Options
-	server      *fnet.TargetServer
-	appendables []pyroscope.Appendable
-	mut         sync.Mutex
+	opts               component.Options
+	server             *fnet.TargetServer
+	uncheckedCollector *util.UncheckedCollector
+	appendables        []pyroscope.Appendable
+	mut                sync.Mutex
}
func New(opts component.Options, args Arguments) (*Component, error) {
+	uncheckedCollector := util.NewUncheckedCollector(nil)
+	opts.Registerer.MustRegister(uncheckedCollector)
+
	c := &Component{
-		opts:        opts,
-		appendables: args.ForwardTo,
+		opts:               opts,
+		uncheckedCollector: uncheckedCollector,
+		appendables:        args.ForwardTo,
	}
	if err := c.Update(args); err != nil {
@@ -116,7 +123,14 @@ func (c *Component) Update(args component.Arguments) error {
	c.shutdownServer()
-	srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", c.opts.Registerer, newArgs.Server)
+	// [server.Server] registers new metrics every time it is created. To
+	// avoid issues with re-registering metrics with the same name, we create
+	// a new registry each time we create a server, and pass it to an
+	// unchecked collector to bypass uniqueness checking.
+	serverRegistry := prometheus.NewRegistry()
+	c.uncheckedCollector.SetCollector(serverRegistry)
+
+	srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", serverRegistry, newArgs.Server)
	if err != nil {
		return fmt.Errorf("failed to create server: %w", err)
	}
diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go
index 71929abce6..5c507016cb 100644
--- a/internal/component/pyroscope/receive_http/receive_http_test.go
+++ b/internal/component/pyroscope/receive_http/receive_http_test.go
@@ -287,3 +287,45 @@ func testOptions(t *testing.T) component.Options {
		Registerer: prometheus.NewRegistry(),
	}
}
+
+// TestUpdateArgs verifies that the component can be updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes and that there are no metric registration conflicts.
+func TestUpdateArgs(t *testing.T) { + ports, err := freeport.GetFreePorts(2) + require.NoError(t, err) + + forwardTo := []pyroscope.Appendable{testAppendable(nil)} + + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: ports[0], + }, + }, + ForwardTo: forwardTo, + } + + comp, err := New(testOptions(t), args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + waitForServerReady(t, ports[0]) + + comp.Update(Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: ports[1], + }, + }, + ForwardTo: forwardTo, + }) + + waitForServerReady(t, ports[1]) +} diff --git a/internal/runtime/alloy_components.go b/internal/runtime/alloy_components.go index fef0061a83..439d96949d 100644 --- a/internal/runtime/alloy_components.go +++ b/internal/runtime/alloy_components.go @@ -128,6 +128,10 @@ func (f *Runtime) getComponentDetail(cn controller.ComponentNode, graph *dag.Gra componentInfo.DebugInfo = builtinComponent.DebugInfo() } } + + _, liveDebuggingEnabled := componentInfo.Component.(component.LiveDebugging) + componentInfo.LiveDebuggingEnabled = liveDebuggingEnabled + return componentInfo } diff --git a/internal/web/ui/src/features/component/ComponentView.tsx b/internal/web/ui/src/features/component/ComponentView.tsx index 81b81c1d6f..5aa29c1375 100644 --- a/internal/web/ui/src/features/component/ComponentView.tsx +++ b/internal/web/ui/src/features/component/ComponentView.tsx @@ -23,6 +23,7 @@ export const ComponentView: FC = (props) => { const referencedBy = props.component.referencedBy.filter((id) => props.info[id] !== undefined).map((id) => props.info[id]); const referencesTo = props.component.referencesTo.filter((id) => props.info[id] !== undefined).map((id) => props.info[id]); + const liveDebuggingEnabled = props.component.liveDebuggingEnabled; const argsPartition = partitionBody(props.component.arguments, 'Arguments'); const exportsPartition = props.component.exports && partitionBody(props.component.exports, 'Exports'); @@ -47,6 +48,24 @@ export const ComponentView: FC = (props) => { ); } + function liveDebuggingButton(): ReactElement | string { + if (useRemotecfg) { + return 'Live debugging is not yet available for remote components'; + } + + if (!liveDebuggingEnabled) { + return 'Live debugging is not yet available for this component'; + } + + return ( + + ); + } + return (
- {useRemotecfg ? ( - 'Live debugging is not yet available for remote components' - ) : ( - - )} + {liveDebuggingButton()} {props.component.health.message && (
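The `liveDebuggingEnabled` flag consumed by the UI above is set on the backend in `getComponentDetail` with a plain interface type assertion against `component.LiveDebugging`, which `discovery.relabel` now implements. The following is a minimal, self-contained Go sketch of that detection pattern. The `LiveDebugging` interface below is an illustrative stand-in whose shape is inferred from the `LiveDebugging(_ int)` method in this patch, not the actual definition in Alloy's internal `component` package.

```go
package main

import "fmt"

// LiveDebugging is an illustrative stand-in for Alloy's internal
// component.LiveDebugging marker interface (assumed shape, see note above).
type LiveDebugging interface {
	LiveDebugging(limit int)
}

// relabelComponent opts into live debugging by implementing the marker
// interface, mirroring what discovery.relabel does in this patch.
type relabelComponent struct{}

func (relabelComponent) LiveDebugging(_ int) {}

// plainComponent does not implement the marker interface.
type plainComponent struct{}

func main() {
	for _, c := range []any{relabelComponent{}, plainComponent{}} {
		// Same pattern as getComponentDetail: a type assertion decides the
		// liveDebuggingEnabled flag that the UI later reads.
		_, enabled := c.(LiveDebugging)
		fmt.Printf("%T -> liveDebuggingEnabled=%v\n", c, enabled)
	}
}
```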
diff --git a/internal/web/ui/src/features/component/types.ts b/internal/web/ui/src/features/component/types.ts index 4678b4ca17..b8e8162ec8 100644 --- a/internal/web/ui/src/features/component/types.ts +++ b/internal/web/ui/src/features/component/types.ts @@ -42,6 +42,11 @@ export interface ComponentInfo { * IDs of components which this component is referencing. */ referencesTo: string[]; + + /** + * Used to indicate if live debugging is available for the component + */ + liveDebuggingEnabled: boolean; } /**
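A note on the `pyroscope.receive_http` change earlier in this patch: `fnet.NewTargetServer` registers server metrics on the registerer it's given, so reusing `c.opts.Registerer` across server restarts would trigger duplicate-registration conflicts. The patch instead gives each new server a fresh registry and exposes it through `util.UncheckedCollector`. The sketch below is a hedged, self-contained illustration of that pattern using only `prometheus/client_golang`; `swappableCollector` is a stand-in, not Alloy's `util.UncheckedCollector`. It relies on the documented client_golang behaviour that a collector whose `Describe` sends no descriptors is treated as unchecked, so the parent registry skips uniqueness checks when it's registered.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// swappableCollector delegates Collect to a replaceable inner collector.
// Because Describe sends no descriptors, the parent registry treats it as an
// "unchecked" collector and performs no duplicate checks at registration time.
type swappableCollector struct {
	mut   sync.RWMutex
	inner prometheus.Collector
}

func (c *swappableCollector) Describe(chan<- *prometheus.Desc) {}

func (c *swappableCollector) Collect(ch chan<- prometheus.Metric) {
	c.mut.RLock()
	defer c.mut.RUnlock()
	if c.inner != nil {
		c.inner.Collect(ch)
	}
}

// SetCollector swaps the inner collector, for example when a server is recreated.
func (c *swappableCollector) SetCollector(inner prometheus.Collector) {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.inner = inner
}

func main() {
	parent := prometheus.NewRegistry()
	sc := &swappableCollector{}
	parent.MustRegister(sc)

	// Simulate restarting the server twice: each restart gets a fresh inner
	// registry, so metrics with the same name never collide in the parent.
	for i := 0; i < 2; i++ {
		inner := prometheus.NewRegistry() // *Registry also satisfies prometheus.Collector
		promauto.With(inner).NewCounter(prometheus.CounterOpts{
			Name: "server_requests_total",
			Help: "Example metric registered by each server instance.",
		}).Inc()
		sc.SetCollector(inner)
	}

	// Only the current inner registry's metrics are exposed through the parent.
	mfs, err := parent.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}
```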