From 61b50105c0d7ca5fdd78baa1fa564ca4cd719b7c Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Thu, 28 Nov 2024 15:04:47 +0100 Subject: [PATCH 1/4] Implement tracing for pyroscope components --- go.mod | 9 ++++--- go.sum | 2 ++ .../pyroscope/receive_http/receive_http.go | 6 +++++ .../component/pyroscope/scrape/manager.go | 7 +++-- .../pyroscope/scrape/manager_test.go | 3 ++- internal/component/pyroscope/scrape/scrape.go | 2 +- .../component/pyroscope/scrape/scrape_loop.go | 26 +++++++++++++++---- .../pyroscope/scrape/scrape_loop_test.go | 9 ++++--- internal/component/pyroscope/write/write.go | 7 +++++ 9 files changed, 56 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 904598f24e..59b7c2f99c 100644 --- a/go.mod +++ b/go.mod @@ -786,7 +786,7 @@ require ( go.opentelemetry.io/contrib/detectors/azure/azurevm v0.0.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.28.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 go.opentelemetry.io/contrib/propagators/b3 v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 // indirect @@ -831,7 +831,6 @@ require ( github.com/ebitengine/purego v0.8.0 // indirect github.com/elastic/lunes v0.1.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.112.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect @@ -850,7 +849,11 @@ require ( 
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) -require github.com/mackerelio/go-osstat v0.2.5 +require ( + github.com/mackerelio/go-osstat v0.2.5 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.112.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 +) // NOTE: replace directives below must always be *temporary*. // diff --git a/go.sum b/go.sum index fef3703fb6..4fc85f01c7 100644 --- a/go.sum +++ b/go.sum @@ -2732,6 +2732,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.45 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.45.0/go.mod h1:LOjFy00/ZMyMYfKFPta6kZe2cDUc1sNo/qtv1pSORWA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 h1:4BZHA+B1wXEQoGNHxW8mURaLhcdGwvRnmhGbm+odRbc= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0/go.mod h1:3qi2EEwMgB4xnKgPLqsDP3j9qxnHDZeHsnAxfjQqTko= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/contrib/propagators/b3 v1.31.0 h1:PQPXYscmwbCp76QDvO4hMngF2j8Bx/OTV86laEl8uqo= diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 35a1240273..4de81140c2 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -18,6 +18,7 @@ import ( 
"github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" ) const ( @@ -120,9 +121,14 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return fmt.Errorf("failed to create server: %w", err) } + c.server = srv return c.server.MountAndRun(func(router *mux.Router) { + router.Use(otelmux.Middleware( + "alloy", + otelmux.WithTracerProvider(c.opts.Tracer), + )) router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) }) } diff --git a/internal/component/pyroscope/scrape/manager.go b/internal/component/pyroscope/scrape/manager.go index f89a6bb5e6..cdf01a1bf8 100644 --- a/internal/component/pyroscope/scrape/manager.go +++ b/internal/component/pyroscope/scrape/manager.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "go.opentelemetry.io/otel/trace" ) var reloadInterval = 5 * time.Second @@ -21,6 +22,7 @@ type Options struct { type Manager struct { logger log.Logger + tracer trace.TracerProvider options Options @@ -35,13 +37,14 @@ type Manager struct { triggerReload chan struct{} } -func NewManager(o Options, appendable pyroscope.Appendable, logger log.Logger) *Manager { +func NewManager(o Options, appendable pyroscope.Appendable, logger log.Logger, tracer trace.TracerProvider) *Manager { if logger == nil { logger = log.NewNopLogger() } return &Manager{ options: o, logger: logger, + tracer: tracer, appendable: appendable, graceShut: make(chan struct{}), triggerReload: make(chan struct{}, 1), @@ -97,7 +100,7 @@ func (m *Manager) reload() { var wg sync.WaitGroup for setName, groups := range m.targetSets { if _, ok := m.targetsGroups[setName]; !ok { - sp, err := newScrapePool(m.options.HTTPClientOptions, 
m.config, m.appendable, log.With(m.logger, "scrape_pool", setName)) + sp, err := newScrapePool(m.options.HTTPClientOptions, m.config, m.appendable, log.With(m.logger, "scrape_pool", setName), m.tracer) if err != nil { level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) continue diff --git a/internal/component/pyroscope/scrape/manager_test.go b/internal/component/pyroscope/scrape/manager_test.go index ef87515a83..a3197a1008 100644 --- a/internal/component/pyroscope/scrape/manager_test.go +++ b/internal/component/pyroscope/scrape/manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/goleak" ) @@ -21,7 +22,7 @@ func TestManager(t *testing.T) { m := NewManager(Options{}, pyroscope.AppendableFunc(func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil - }), util.TestLogger(t)) + }), util.TestLogger(t), noop.NewTracerProvider()) defer m.Stop() targetSetsChan := make(chan map[string][]*targetgroup.Group) diff --git a/internal/component/pyroscope/scrape/scrape.go b/internal/component/pyroscope/scrape/scrape.go index 301ac86053..da0e32c1e6 100644 --- a/internal/component/pyroscope/scrape/scrape.go +++ b/internal/component/pyroscope/scrape/scrape.go @@ -275,7 +275,7 @@ func New(o component.Options, args Arguments) (*Component, error) { config_util.WithDialContextFunc(httpData.DialFunc), }, } - scraper := NewManager(scrapeHttpOptions, alloyAppendable, o.Logger) + scraper := NewManager(scrapeHttpOptions, alloyAppendable, o.Logger, o.Tracer) c := &Component{ opts: o, cluster: clusterData, diff --git a/internal/component/pyroscope/scrape/scrape_loop.go b/internal/component/pyroscope/scrape/scrape_loop.go index c68cde74e9..938ab7b4a8 100644 --- a/internal/component/pyroscope/scrape/scrape_loop.go +++ 
b/internal/component/pyroscope/scrape/scrape_loop.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "reflect" "sync" "time" @@ -17,6 +18,9 @@ import ( commonconfig "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/pool" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/trace" "golang.org/x/net/context/ctxhttp" ) @@ -29,6 +33,7 @@ type scrapePool struct { config Arguments logger log.Logger + tracer trace.TracerProvider scrapeClient *http.Client appendable pyroscope.Appendable @@ -37,15 +42,21 @@ type scrapePool struct { droppedTargets []*Target } -func newScrapePool(hco []commonconfig.HTTPClientOption, cfg Arguments, appendable pyroscope.Appendable, logger log.Logger) (*scrapePool, error) { +func newScrapePool(hco []commonconfig.HTTPClientOption, cfg Arguments, appendable pyroscope.Appendable, logger log.Logger, tracer trace.TracerProvider) (*scrapePool, error) { scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName, hco...) 
if err != nil { return nil, err } + scrapeClient.Transport = otelhttp.NewTransport(scrapeClient.Transport, otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) + return &scrapePool{ config: cfg, logger: logger, + tracer: tracer, scrapeClient: scrapeClient, appendable: appendable, activeTargets: map[uint64]*scrapeLoop{}, @@ -75,7 +86,7 @@ func (tg *scrapePool) sync(groups []*targetgroup.Group) { for _, t := range actives { if _, ok := tg.activeTargets[t.Hash()]; !ok { - loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger) + loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger, tg.tracer) tg.activeTargets[t.Hash()] = loop loop.start() } else { @@ -117,7 +128,7 @@ func (tg *scrapePool) reload(cfg Arguments) error { for hash, t := range tg.activeTargets { // restart the loop with the new configuration t.stop(false) - loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger) + loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger, tg.tracer) tg.activeTargets[hash] = loop loop.start() } @@ -167,13 +178,14 @@ type scrapeLoop struct { req *http.Request logger log.Logger + tracer trace.TracerProvider interval, timeout time.Duration graceShut chan struct{} once sync.Once wg sync.WaitGroup } -func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Appendable, interval, timeout time.Duration, logger log.Logger) *scrapeLoop { +func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Appendable, interval, timeout time.Duration, logger log.Logger, tracer trace.TracerProvider) *scrapeLoop { // if the URL parameter have a seconds parameter, then the collection will // 
take at least scrape_duration - 1 second, as the HTTP request will block // until the profile is collected. @@ -184,6 +196,7 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap return &scrapeLoop{ Target: t, logger: logger, + tracer: tracer, scrapeClient: scrapeClient, appender: NewDeltaAppender(appendable.Appender(), t.allLabels), interval: interval, @@ -228,6 +241,9 @@ func (t *scrapeLoop) scrape() { ) defer cancel() + scrapeCtx, span := t.tracer.Tracer("").Start(scrapeCtx, "scrape") + defer span.End() + for _, l := range t.allLabels { if l.Name == ProfileName { profileType = l.Value @@ -244,7 +260,7 @@ func (t *scrapeLoop) scrape() { if len(b) > 0 { t.lastScrapeSize = len(b) } - if err := t.appender.Append(context.Background(), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { + if err := t.appender.Append(context.WithoutCancel(scrapeCtx), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err) t.updateTargetStatus(start, err) return diff --git a/internal/component/pyroscope/scrape/scrape_loop_test.go b/internal/component/pyroscope/scrape/scrape_loop_test.go index 3fdf7969aa..b9764486d2 100644 --- a/internal/component/pyroscope/scrape/scrape_loop_test.go +++ b/internal/component/pyroscope/scrape/scrape_loop_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/atomic" "go.uber.org/goleak" ) @@ -38,7 +39,8 @@ func TestScrapePool(t *testing.T) { func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil }), - util.TestLogger(t)) + util.TestLogger(t), + noop.NewTracerProvider()) require.NoError(t, err) defer p.stop() @@ -192,7 +194,7 @@ func TestScrapeLoop(t *testing.T) { require.Equal(t, []byte("ok"), 
samples[0].RawProfile) return nil }), - 200*time.Millisecond, 30*time.Second, util.TestLogger(t)) + 200*time.Millisecond, 30*time.Second, util.TestLogger(t), noop.NewTracerProvider()) defer loop.stop(true) require.Equal(t, HealthUnknown, loop.Health()) @@ -218,7 +220,8 @@ func BenchmarkSync(b *testing.B) { func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil }), - log.NewNopLogger()) + log.NewNopLogger(), + noop.NewTracerProvider()) require.NoError(b, err) groups1 := []*targetgroup.Group{ { diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 157287f539..6f26578bd2 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "net/url" "path" "strings" @@ -16,6 +17,8 @@ import ( commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -175,6 +178,10 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + client.Transport = otelhttp.NewTransport(client.Transport, otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) if err != nil { return nil, err } From 4681d64d7f115b7964512d29321fa777daa952a2 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Thu, 28 Nov 2024 15:19:27 +0100 Subject: [PATCH 2/4] Use configured tracerprovider for otelhttp --- 
internal/component/pyroscope/scrape/scrape_loop.go | 9 ++++++--- internal/component/pyroscope/write/write.go | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/component/pyroscope/scrape/scrape_loop.go b/internal/component/pyroscope/scrape/scrape_loop.go index 938ab7b4a8..a61abfac7a 100644 --- a/internal/component/pyroscope/scrape/scrape_loop.go +++ b/internal/component/pyroscope/scrape/scrape_loop.go @@ -48,9 +48,12 @@ func newScrapePool(hco []commonconfig.HTTPClientOption, cfg Arguments, appendabl return nil, err } - scrapeClient.Transport = otelhttp.NewTransport(scrapeClient.Transport, otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { - return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) - }), + scrapeClient.Transport = otelhttp.NewTransport( + scrapeClient.Transport, + otelhttp.WithTracerProvider(tracer), + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), ) return &scrapePool{ diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 6f26578bd2..5e57c94663 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -178,9 +178,12 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) - client.Transport = otelhttp.NewTransport(client.Transport, otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { - return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) - }), + client.Transport = otelhttp.NewTransport( + client.Transport, + otelhttp.WithTracerProvider(opts.Tracer), + otelhttp.WithClientTrace(func(ctx 
context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), ) if err != nil { return nil, err From 3d0a7397df03e0f7287edd7fb6d8fa1b20a0e013 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Thu, 28 Nov 2024 16:34:33 +0100 Subject: [PATCH 3/4] Set up tracing after reload of the scrape component --- internal/component/pyroscope/scrape/scrape_loop.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/component/pyroscope/scrape/scrape_loop.go b/internal/component/pyroscope/scrape/scrape_loop.go index a61abfac7a..db90fd4798 100644 --- a/internal/component/pyroscope/scrape/scrape_loop.go +++ b/internal/component/pyroscope/scrape/scrape_loop.go @@ -127,6 +127,15 @@ func (tg *scrapePool) reload(cfg Arguments) error { if err != nil { return err } + + scrapeClient.Transport = otelhttp.NewTransport( + scrapeClient.Transport, + otelhttp.WithTracerProvider(tg.tracer), + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) + tg.scrapeClient = scrapeClient for hash, t := range tg.activeTargets { // restart the loop with the new configuration From f9125ab5cac4b955a7f532351ed9b01558854c88 Mon Sep 17 00:00:00 2001 From: Gergely Madarasz Date: Fri, 29 Nov 2024 17:07:08 +0100 Subject: [PATCH 4/4] Fix error handling --- internal/component/pyroscope/write/write.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 5e57c94663..665e7064bc 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -177,7 +177,12 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid + client, err := 
commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + if err != nil { + return nil, err + } + client.Transport = otelhttp.NewTransport( client.Transport, otelhttp.WithTracerProvider(opts.Tracer), @@ -185,9 +190,7 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) }), ) - if err != nil { - return nil, err - } + clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) if httpClient == nil { httpClient = client