diff --git a/go.mod b/go.mod index 904598f24e..59b7c2f99c 100644 --- a/go.mod +++ b/go.mod @@ -786,7 +786,7 @@ require ( go.opentelemetry.io/contrib/detectors/azure/azurevm v0.0.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.28.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 go.opentelemetry.io/contrib/propagators/b3 v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 // indirect @@ -831,7 +831,6 @@ require ( github.com/ebitengine/purego v0.8.0 // indirect github.com/elastic/lunes v0.1.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.112.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect @@ -850,7 +849,11 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) -require github.com/mackerelio/go-osstat v0.2.5 +require ( + github.com/mackerelio/go-osstat v0.2.5 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.112.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 +) // NOTE: replace directives below must always be *temporary*. // diff --git a/go.sum b/go.sum index fef3703fb6..4fc85f01c7 100644 --- a/go.sum +++ b/go.sum @@ -2732,6 +2732,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.45 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.45.0/go.mod h1:LOjFy00/ZMyMYfKFPta6kZe2cDUc1sNo/qtv1pSORWA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0 h1:4BZHA+B1wXEQoGNHxW8mURaLhcdGwvRnmhGbm+odRbc= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.56.0/go.mod h1:3qi2EEwMgB4xnKgPLqsDP3j9qxnHDZeHsnAxfjQqTko= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/contrib/propagators/b3 v1.31.0 h1:PQPXYscmwbCp76QDvO4hMngF2j8Bx/OTV86laEl8uqo= diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 35a1240273..4de81140c2 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" ) const ( @@ -120,9 +121,14 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return fmt.Errorf("failed to create server: %w", err) } + c.server = srv return c.server.MountAndRun(func(router *mux.Router) { + router.Use(otelmux.Middleware( + "alloy", + otelmux.WithTracerProvider(c.opts.Tracer), + )) router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) }) } diff --git a/internal/component/pyroscope/scrape/manager.go b/internal/component/pyroscope/scrape/manager.go index f89a6bb5e6..cdf01a1bf8 100644 --- a/internal/component/pyroscope/scrape/manager.go +++ b/internal/component/pyroscope/scrape/manager.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "go.opentelemetry.io/otel/trace" ) var reloadInterval = 5 * time.Second @@ -21,6 +22,7 @@ type Options struct { type Manager struct { logger log.Logger + tracer trace.TracerProvider options Options @@ -35,13 +37,14 @@ type Manager struct { triggerReload chan struct{} } -func NewManager(o Options, appendable pyroscope.Appendable, logger log.Logger) *Manager { +func NewManager(o Options, appendable pyroscope.Appendable, logger log.Logger, tracer trace.TracerProvider) *Manager { if logger == nil { logger = log.NewNopLogger() } return &Manager{ options: o, logger: logger, + tracer: tracer, appendable: appendable, graceShut: make(chan struct{}), triggerReload: make(chan struct{}, 1), @@ -97,7 +100,7 @@ func (m *Manager) reload() { var wg sync.WaitGroup for setName, groups := range m.targetSets { if _, ok := m.targetsGroups[setName]; !ok { - sp, err := newScrapePool(m.options.HTTPClientOptions, m.config, m.appendable, log.With(m.logger, "scrape_pool", setName)) + sp, err := newScrapePool(m.options.HTTPClientOptions, m.config, m.appendable, log.With(m.logger, "scrape_pool", setName), m.tracer) if err != nil { level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) continue diff --git a/internal/component/pyroscope/scrape/manager_test.go b/internal/component/pyroscope/scrape/manager_test.go index ef87515a83..a3197a1008 100644 --- a/internal/component/pyroscope/scrape/manager_test.go +++ b/internal/component/pyroscope/scrape/manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/goleak" ) @@ -21,7 +22,7 @@ func TestManager(t *testing.T) { m := NewManager(Options{}, pyroscope.AppendableFunc(func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil - }), util.TestLogger(t)) + }), util.TestLogger(t), noop.NewTracerProvider()) defer m.Stop() targetSetsChan := make(chan map[string][]*targetgroup.Group) diff --git a/internal/component/pyroscope/scrape/scrape.go b/internal/component/pyroscope/scrape/scrape.go index 301ac86053..da0e32c1e6 100644 --- a/internal/component/pyroscope/scrape/scrape.go +++ b/internal/component/pyroscope/scrape/scrape.go @@ -275,7 +275,7 @@ func New(o component.Options, args Arguments) (*Component, error) { config_util.WithDialContextFunc(httpData.DialFunc), }, } - scraper := NewManager(scrapeHttpOptions, alloyAppendable, o.Logger) + scraper := NewManager(scrapeHttpOptions, alloyAppendable, o.Logger, o.Tracer) c := &Component{ opts: o, cluster: clusterData, diff --git a/internal/component/pyroscope/scrape/scrape_loop.go b/internal/component/pyroscope/scrape/scrape_loop.go index c68cde74e9..db90fd4798 100644 --- a/internal/component/pyroscope/scrape/scrape_loop.go +++ b/internal/component/pyroscope/scrape/scrape_loop.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "reflect" "sync" "time" @@ -17,6 +18,9 @@ import ( commonconfig "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/pool" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/trace" "golang.org/x/net/context/ctxhttp" ) @@ -29,6 +33,7 @@ type scrapePool struct { config Arguments logger log.Logger + tracer trace.TracerProvider scrapeClient *http.Client appendable pyroscope.Appendable @@ -37,15 +42,24 @@ type scrapePool struct { droppedTargets []*Target } -func newScrapePool(hco []commonconfig.HTTPClientOption, cfg Arguments, appendable pyroscope.Appendable, logger log.Logger) (*scrapePool, error) { +func newScrapePool(hco []commonconfig.HTTPClientOption, cfg Arguments, appendable pyroscope.Appendable, logger log.Logger, tracer trace.TracerProvider) (*scrapePool, error) { scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName, hco...) if err != nil { return nil, err } + scrapeClient.Transport = otelhttp.NewTransport( + scrapeClient.Transport, + otelhttp.WithTracerProvider(tracer), + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) + return &scrapePool{ config: cfg, logger: logger, + tracer: tracer, scrapeClient: scrapeClient, appendable: appendable, activeTargets: map[uint64]*scrapeLoop{}, @@ -75,7 +89,7 @@ func (tg *scrapePool) sync(groups []*targetgroup.Group) { for _, t := range actives { if _, ok := tg.activeTargets[t.Hash()]; !ok { - loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger) + loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger, tg.tracer) tg.activeTargets[t.Hash()] = loop loop.start() } else { @@ -113,11 +127,20 @@ func (tg *scrapePool) reload(cfg Arguments) error { if err != nil { return err } + + scrapeClient.Transport = otelhttp.NewTransport( + scrapeClient.Transport, + otelhttp.WithTracerProvider(tg.tracer), + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) + tg.scrapeClient = scrapeClient for hash, t := range tg.activeTargets { // restart the loop with the new configuration t.stop(false) - loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger) + loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger, tg.tracer) tg.activeTargets[hash] = loop loop.start() } @@ -167,13 +190,14 @@ type scrapeLoop struct { req *http.Request logger log.Logger + tracer trace.TracerProvider interval, timeout time.Duration graceShut chan struct{} once sync.Once wg sync.WaitGroup } -func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Appendable, interval, timeout time.Duration, logger log.Logger) *scrapeLoop { +func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Appendable, interval, timeout time.Duration, logger log.Logger, tracer trace.TracerProvider) *scrapeLoop { // if the URL parameter have a seconds parameter, then the collection will // take at least scrape_duration - 1 second, as the HTTP request will block // until the profile is collected. @@ -184,6 +208,7 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap return &scrapeLoop{ Target: t, logger: logger, + tracer: tracer, scrapeClient: scrapeClient, appender: NewDeltaAppender(appendable.Appender(), t.allLabels), interval: interval, @@ -228,6 +253,9 @@ func (t *scrapeLoop) scrape() { ) defer cancel() + scrapeCtx, span := t.tracer.Tracer("").Start(scrapeCtx, "scrape") + defer span.End() + for _, l := range t.allLabels { if l.Name == ProfileName { profileType = l.Value @@ -244,7 +272,7 @@ func (t *scrapeLoop) scrape() { if len(b) > 0 { t.lastScrapeSize = len(b) } - if err := t.appender.Append(context.Background(), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { + if err := t.appender.Append(context.WithoutCancel(scrapeCtx), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err) t.updateTargetStatus(start, err) return diff --git a/internal/component/pyroscope/scrape/scrape_loop_test.go b/internal/component/pyroscope/scrape/scrape_loop_test.go index 3fdf7969aa..b9764486d2 100644 --- a/internal/component/pyroscope/scrape/scrape_loop_test.go +++ b/internal/component/pyroscope/scrape/scrape_loop_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/atomic" "go.uber.org/goleak" ) @@ -38,7 +39,8 @@ func TestScrapePool(t *testing.T) { func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil }), - util.TestLogger(t)) + util.TestLogger(t), + noop.NewTracerProvider()) require.NoError(t, err) defer p.stop() @@ -192,7 +194,7 @@ func TestScrapeLoop(t *testing.T) { require.Equal(t, []byte("ok"), samples[0].RawProfile) return nil }), - 200*time.Millisecond, 30*time.Second, util.TestLogger(t)) + 200*time.Millisecond, 30*time.Second, util.TestLogger(t), noop.NewTracerProvider()) defer loop.stop(true) require.Equal(t, HealthUnknown, loop.Health()) @@ -218,7 +220,8 @@ func BenchmarkSync(b *testing.B) { func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { return nil }), - log.NewNopLogger()) + log.NewNopLogger(), + noop.NewTracerProvider()) require.NoError(b, err) groups1 := []*targetgroup.Group{ { diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 157287f539..665e7064bc 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "net/url" "path" "strings" @@ -16,6 +17,8 @@ import ( commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -174,10 +177,20 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid + client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err } + + client.Transport = otelhttp.NewTransport( + client.Transport, + otelhttp.WithTracerProvider(opts.Tracer), + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans()) + }), + ) + clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) if httpClient == nil { httpClient = client