diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c0e15ca9c..ff26cd53ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Main (unreleased) ### Features - Add the function `path_join` to the stdlib. (@wildum) +- Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi) - Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index ba37ddfa4f..16a298ba66 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -395,6 +395,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec {{< collapse title="pyroscope" >}} - [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf) - [pyroscope.java](../components/pyroscope/pyroscope.java) +- [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http) - [pyroscope.scrape](../components/pyroscope/pyroscope.scrape) {{< /collapse >}} diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md new file mode 100644 index 0000000000..3b34f393a9 --- /dev/null +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -0,0 +1,119 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.receive_http/ +description: Learn about pyroscope.receive_http +title: pyroscope.receive_http +--- + +# pyroscope.receive_http + +`pyroscope.receive_http` receives profiles over HTTP and forwards them to `pyroscope.*` components capable of receiving profiles. + +The HTTP API exposed is compatible with the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). +This allows `pyroscope.receive_http` to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. + +## Usage + +```alloy +pyroscope.receive_http "LABEL" { + http { + listen_address = "LISTEN_ADDRESS" + listen_port = PORT + } + forward_to = RECEIVER_LIST +} +``` + +The component will start an HTTP server supporting the following endpoint. + +* `POST /ingest` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope ingest API. + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +------------------|---------------|-------------------------------------------------|---------|--------- +`forward_to` | `list(ProfilesReceiver)` | List of receivers to send profiles to. | | yes + +## Blocks + +The following blocks are supported inside the definition of `pyroscope.receive_http`: + +Hierarchy | Name | Description | Required +----------|------|----------------------------------------------------|--------- +`http` | `http` | Configures the HTTP server that receives requests. | no + +### http + +The `http` block configures the HTTP server. + +You can use the following arguments to configure the `http` block. Any omitted fields take their default values. + +Name | Type | Description | Default | Required +-----------------------|------------|------------------------------------------------------------------------------------------------------------------|----------|--------- +`conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to 100. | `0` | no +`listen_address` | `string` | Network address on which the server listens for new connections. Defaults to accepting all incoming connections. | `""` | no +`listen_port` | `int` | Port number on which the server listens for new connections. | `8080` | no +`server_idle_timeout` | `duration` | Idle timeout for the HTTP server. | `"120s"` | no +`server_read_timeout` | `duration` | Read timeout for the HTTP server. | `"30s"` | no +`server_write_timeout` | `duration` | Write timeout for the HTTP server. | `"30s"` | no + +## Exported fields + +`pyroscope.receive_http` does not export any fields. + +## Component health + +`pyroscope.receive_http` is reported as unhealthy if it is given an invalid configuration. + +## Example + +This example creates a `pyroscope.receive_http` component, which starts an HTTP server listening on `0.0.0.0` and port `9999`. +The server receives profiles and forwards them to multiple `pyroscope.write` components, which write these profiles to different HTTP endpoints. +```alloy +// Receives profiles over HTTP +pyroscope.receive_http "default" { + http { + listen_address = "0.0.0.0" + listen_port = 9999 + } + forward_to = [pyroscope.write.staging.receiver, pyroscope.write.production.receiver] +} + +// Send profiles to a staging Pyroscope instance +pyroscope.write "staging" { + endpoint { + url = "http://pyroscope-staging:4040" + } +} + +// Send profiles to a production Pyroscope instance +pyroscope.write "production" { + endpoint { + url = "http://pyroscope-production:4040" + } +} +``` + +{{< admonition type="note" >}} +This example demonstrates forwarding to multiple `pyroscope.write` components. +This configuration will duplicate the received profiles and send a copy to each configured `pyroscope.write` component. +{{< /admonition >}} + +You can also create multiple `pyroscope.receive_http` components with different configurations to listen on different addresses or ports as needed. This flexibility allows you to design a setup that best fits your infrastructure and profile routing requirements. + + + +## Compatible components + +`pyroscope.receive_http` can accept arguments from the following components: + +- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters) + + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 3db04d7a34..1ff33335ce 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -140,6 +140,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/write/queue" // Import prometheus.write.queue _ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf _ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java + _ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http _ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write _ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index 41c2cca164..a856e80b0b 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -2,6 +2,9 @@ package pyroscope import ( "context" + "io" + "net/http" + "net/url" "sync" "time" @@ -22,6 +25,7 @@ type Appendable interface { type Appender interface { Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error + AppendIngest(ctx context.Context, profile *IncomingProfile) error } type RawSample struct { @@ -29,6 +33,12 @@ type RawSample struct { RawProfile []byte } +type IncomingProfile struct { + Body io.ReadCloser + Headers http.Header + URL *url.URL +} + var _ Appendable = (*Fanout)(nil) // Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. @@ -112,12 +122,33 @@ func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []* return multiErr } +// AppendIngest satisfies the AppenderIngest interface. +func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error { + now := time.Now() + defer func() { + a.writeLatency.Observe(time.Since(now).Seconds()) + }() + var multiErr error + for _, x := range a.children { + err := x.AppendIngest(ctx, profile) + if err != nil { + multiErr = multierror.Append(multiErr, err) + } + } + return multiErr +} + type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error +func (f AppendableFunc) Appender() Appender { + return f +} + func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { return f(ctx, labels, samples) } -func (f AppendableFunc) Appender() Appender { - return f +func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) error { + // This is a no-op implementation + return nil } diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go new file mode 100644 index 0000000000..35a1240273 --- /dev/null +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -0,0 +1,199 @@ +package receive_http + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "reflect" + "sync" + + "github.com/gorilla/mux" + "golang.org/x/sync/errgroup" + + "github.com/grafana/alloy/internal/component" + fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/write" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + // defaultMaxConnLimit defines the maximum number of simultaneous HTTP connections + defaultMaxConnLimit = 100 +) + +func init() { + component.Register(component.Registration{ + Name: "pyroscope.receive_http", + Stability: featuregate.StabilityPublicPreview, + Args: Arguments{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct { + Server *fnet.ServerConfig `alloy:",squash"` + ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"` +} + +// SetToDefault implements syntax.Defaulter. +func (a *Arguments) SetToDefault() { + serverConfig := fnet.DefaultServerConfig() + if serverConfig.HTTP.ConnLimit == 0 { + serverConfig.HTTP.ConnLimit = defaultMaxConnLimit + } + *a = Arguments{ + Server: serverConfig, + } +} + +type Component struct { + opts component.Options + server *fnet.TargetServer + appendables []pyroscope.Appendable + mut sync.Mutex +} + +func New(opts component.Options, args Arguments) (*Component, error) { + c := &Component{ + opts: opts, + appendables: args.ForwardTo, + } + + if err := c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Component) Run(ctx context.Context) error { + defer func() { + c.mut.Lock() + defer c.mut.Unlock() + c.shutdownServer() + }() + + <-ctx.Done() + level.Info(c.opts.Logger).Log("msg", "terminating due to context done") + return nil +} + +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + + c.mut.Lock() + defer c.mut.Unlock() + + c.appendables = newArgs.ForwardTo + + // if no server config provided, we'll use defaults + if newArgs.Server == nil { + newArgs.Server = fnet.DefaultServerConfig() + } + + // Only apply default max connections limit if using default config + if newArgs.Server.HTTP.ConnLimit == 0 { + newArgs.Server.HTTP.ConnLimit = defaultMaxConnLimit + } + + if newArgs.Server.HTTP == nil { + newArgs.Server.HTTP = &fnet.HTTPConfig{ + ListenPort: 0, + ListenAddress: "127.0.0.1", + } + } + + serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server, *newArgs.Server.HTTP) + if !serverNeedsRestarting { + return nil + } + + c.shutdownServer() + + srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", c.opts.Registerer, newArgs.Server) + if err != nil { + return fmt.Errorf("failed to create server: %w", err) + } + c.server = srv + + return c.server.MountAndRun(func(router *mux.Router) { + router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) + }) +} + +func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { + c.mut.Lock() + appendables := c.appendables + c.mut.Unlock() + + // Create a pipe for each appendable + pipeWriters := make([]io.Writer, len(appendables)) + pipeReaders := make([]io.Reader, len(appendables)) + for i := range appendables { + pr, pw := io.Pipe() + pipeReaders[i] = pr + pipeWriters[i] = pw + } + mw := io.MultiWriter(pipeWriters...) + + // Create an errgroup with the timeout context + g, ctx := errgroup.WithContext(r.Context()) + + // Start copying the request body to all pipes + g.Go(func() error { + defer func() { + for _, pw := range pipeWriters { + pw.(io.WriteCloser).Close() + } + }() + _, err := io.Copy(mw, r.Body) + return err + }) + + // Process each appendable + for i, appendable := range appendables { + g.Go(func() error { + defer pipeReaders[i].(io.ReadCloser).Close() + + profile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(pipeReaders[i]), + Headers: r.Header.Clone(), + URL: r.URL, + } + + err := appendable.Appender().AppendIngest(ctx, profile) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err) + return err + } + level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i) + return nil + }) + } + + err := g.Wait() + if err != nil { + var writeErr *write.PyroscopeWriteError + if errors.As(err, &writeErr) { + http.Error(w, http.StatusText(writeErr.StatusCode), writeErr.StatusCode) + } else { + level.Error(c.opts.Logger).Log("msg", "Failed to process request", "err", err) + http.Error(w, "Failed to process request", http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusOK) +} + +func (c *Component) shutdownServer() { + if c.server != nil { + c.server.StopAndShutdown() + c.server = nil + } +} diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go new file mode 100644 index 0000000000..71929abce6 --- /dev/null +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -0,0 +1,289 @@ +package receive_http + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io" + "net/http" + "net/url" + "testing" + "time" + + "github.com/phayes/freeport" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component" + fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/util" +) + +// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component +// under various scenarios. It tests different profile sizes, HTTP methods, paths, +// query parameters, and error conditions to ensure correct forwarding behavior +// and proper error handling. +func TestForwardsProfiles(t *testing.T) { + tests := []struct { + name string + profileSize int + method string + path string + queryParams string + headers map[string]string + appendableErrors []error + expectedStatus int + expectedForwards int + }{ + { + name: "Small profile", + profileSize: 1024, // 1KB + method: "POST", + path: "/ingest", + queryParams: "name=test_app_1&from=1234567890&until=1234567900", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusOK, + expectedForwards: 2, + }, + { + name: "Large profile with custom headers", + profileSize: 1024 * 1024, // 1MB + method: "POST", + path: "/ingest", + queryParams: "name=test_app_2&from=1234567891&until=1234567901&custom=param1", + headers: map[string]string{"X-Scope-OrgID": "1234"}, + appendableErrors: []error{nil}, + expectedStatus: http.StatusOK, + expectedForwards: 1, + }, + { + name: "Invalid method", + profileSize: 1024, + method: "GET", + path: "/ingest", + queryParams: "name=test_app_3&from=1234567892&until=1234567902", + headers: map[string]string{}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusMethodNotAllowed, + expectedForwards: 0, + }, + { + name: "Invalid query params", + profileSize: 1024, + method: "GET", + path: "/ingest", + queryParams: "test=test_app", + headers: map[string]string{}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusMethodNotAllowed, + expectedForwards: 0, + }, + { + name: "Invalid path", + profileSize: 1024, + method: "POST", + path: "/invalid", + queryParams: "name=test_app_4&from=1234567893&until=1234567903", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusNotFound, + expectedForwards: 0, + }, + { + name: "All appendables fail", + profileSize: 2048, + method: "POST", + path: "/ingest", + queryParams: "name=test_app_5&from=1234567894&until=1234567904&scenario=all_fail", + headers: map[string]string{"Content-Type": "application/octet-stream", "X-Test": "fail-all"}, + appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")}, + expectedStatus: http.StatusInternalServerError, + expectedForwards: 2, + }, + { + name: "One appendable fails, one succeeds", + profileSize: 4096, + method: "POST", + path: "/ingest", + queryParams: "name=test_app_6&from=1234567895&until=1234567905&scenario=partial_failure", + headers: map[string]string{"X-Custom-ID": "test-6"}, + appendableErrors: []error{fmt.Errorf("error"), nil}, + expectedStatus: http.StatusInternalServerError, + expectedForwards: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + appendables := createTestAppendables(tt.appendableErrors) + port := startComponent(t, appendables) + + testProfile, resp := sendCustomRequest(t, port, tt.method, tt.path, tt.queryParams, tt.headers, tt.profileSize) + require.Equal(t, tt.expectedStatus, resp.StatusCode) + + forwardedCount := countForwardedProfiles(appendables) + require.Equal(t, tt.expectedForwards, forwardedCount, "Unexpected number of forwards") + + if tt.expectedForwards > 0 { + verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams) + } + }) + } +} + +func createTestAppendables(errors []error) []pyroscope.Appendable { + var appendables []pyroscope.Appendable + for _, err := range errors { + appendables = append(appendables, testAppendable(err)) + } + return appendables +} + +func countForwardedProfiles(appendables []pyroscope.Appendable) int { + count := 0 + for _, app := range appendables { + if testApp, ok := app.(*testAppender); ok && testApp.lastProfile != nil { + count++ + } + } + return count +} + +func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) { + for i, app := range appendables { + testApp, ok := app.(*testAppender) + require.True(t, ok, "Appendable is not a testAppender") + + if testApp.lastProfile != nil { + // Verify profile body + body, err := io.ReadAll(testApp.lastProfile.Body) + require.NoError(t, err, "Failed to read profile body for appendable %d", i) + require.Equal(t, expectedProfile, body, "Profile mismatch for appendable %d", i) + + // Verify headers + for key, value := range expectedHeaders { + require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i) + } + + // Verify query parameters + expectedParams, err := url.ParseQuery(expectedQueryParams) + require.NoError(t, err, "Failed to parse expected query parameters") + actualParams := testApp.lastProfile.URL.Query() + for key, values := range expectedParams { + require.Equal(t, values, actualParams[key], "Query parameter mismatch for key %s in appendable %d", key, i) + } + } + } +} + +func startComponent(t *testing.T, appendables []pyroscope.Appendable) int { + port, err := freeport.GetFreePort() + require.NoError(t, err) + + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + }, + ForwardTo: appendables, + } + + comp, err := New(testOptions(t), args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + waitForServerReady(t, port) + return port +} + +func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) { + t.Helper() + testProfile := make([]byte, profileSize) + _, err := rand.Read(testProfile) + require.NoError(t, err) + + testURL := fmt.Sprintf("http://localhost:%d%s?%s", port, path, queryParams) + + req, err := http.NewRequest(method, testURL, bytes.NewReader(testProfile)) + require.NoError(t, err) + + for key, value := range headers { + req.Header.Set(key, value) + } + + client := &http.Client{ + Timeout: 5 * time.Second, + } + resp, err := client.Do(req) + require.NoError(t, err) + + return testProfile, resp +} + +func waitForServerReady(t *testing.T, port int) { + t.Helper() + require.Eventually(t, func() bool { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", port)) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusNotFound + }, 2*time.Second, 100*time.Millisecond, "server did not start in time") +} + +func testAppendable(appendErr error) pyroscope.Appendable { + return &testAppender{appendErr: appendErr} +} + +type testAppender struct { + appendErr error + lastProfile *pyroscope.IncomingProfile +} + +func (a *testAppender) Appender() pyroscope.Appender { + return a +} + +func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { + return fmt.Errorf("Append method not implemented for test") +} + +func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { + var buf bytes.Buffer + tee := io.TeeReader(profile.Body, &buf) + + newProfile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(&buf), + Headers: profile.Headers, + URL: profile.URL, + } + a.lastProfile = newProfile + + _, err := io.Copy(io.Discard, tee) + if err != nil { + return err + } + + return a.appendErr +} + +func testOptions(t *testing.T) component.Options { + return component.Options{ + ID: "pyroscope.receive_http.test", + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + } +} diff --git a/internal/component/pyroscope/scrape/delta_profiles.go b/internal/component/pyroscope/scrape/delta_profiles.go index 926b7179cc..c2de9154f8 100644 --- a/internal/component/pyroscope/scrape/delta_profiles.go +++ b/internal/component/pyroscope/scrape/delta_profiles.go @@ -132,6 +132,11 @@ func (d *deltaAppender) Append(ctx context.Context, lbs labels.Labels, samples [ return nil } +func (d *deltaAppender) AppendIngest(_ context.Context, _ *pyroscope.IncomingProfile) error { + // No-op: AppendIngest is not used in deltaAppender + return nil +} + // computeDelta computes the delta between the given profile and the last // data is uncompressed if it is gzip compressed. // The returned data is always gzip compressed. diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 99c53cbfd2..d268edb28c 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -3,23 +3,29 @@ package write import ( "context" "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" "strings" "time" "connectrpc.com/connect" - "github.com/grafana/alloy/internal/alloyseed" - "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/alloy/internal/useragent" "github.com/oklog/run" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" + "github.com/grafana/alloy/internal/alloyseed" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/useragent" "github.com/grafana/dskit/backoff" pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" @@ -149,34 +155,40 @@ func (c *Component) Update(newConfig component.Arguments) error { type fanOutClient struct { // The list of push clients to fan out to. - clients []pushv1connect.PusherServiceClient - - config Arguments - opts component.Options - metrics *metrics + clients []pushv1connect.PusherServiceClient + httpClient *http.Client + config Arguments + opts component.Options + metrics *metrics } // NewFanOut creates a new fan out client that will fan out to all endpoints. func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) { clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) uid := alloyseed.Get().UID + + var httpClient *http.Client for _, endpoint := range config.Endpoints { if endpoint.Headers == nil { endpoint.Headers = map[string]string{} } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid - httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err } - clients = append(clients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent))) + clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) + if httpClient == nil { + httpClient = client + } } return &fanOutClient{ - clients: clients, - config: config, - opts: opts, - metrics: metrics, + clients: clients, + httpClient: httpClient, + config: config, + opts: opts, + metrics: metrics, }, nil } @@ -271,7 +283,7 @@ func requestSize(req *connect.Request[pushv1.PushRequest]) (int64, int64) { return size, profiles } -// Append implements the pyroscope.Appendable interface. +// Appender implements the pyroscope.Appendable interface. func (f *fanOutClient) Appender() pyroscope.Appender { return f } @@ -318,6 +330,82 @@ func (f *fanOutClient) Append(ctx context.Context, lbs labels.Labels, samples [] return err } +type PyroscopeWriteError struct { + StatusCode int +} + +func (e *PyroscopeWriteError) Error() string { + return fmt.Sprintf("pyroscope write error: status %d", e.StatusCode) +} + +// AppendIngest implements the pyroscope.Appender interface. +func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { + pipeWriters := make([]io.Writer, len(f.config.Endpoints)) + pipeReaders := make([]io.Reader, len(f.config.Endpoints)) + for i := range f.config.Endpoints { + pr, pw := io.Pipe() + pipeReaders[i] = pr + pipeWriters[i] = pw + } + mw := io.MultiWriter(pipeWriters...) + + g, ctx := errgroup.WithContext(ctx) + + // Start copying the profile body to all pipes + g.Go(func() error { + defer func() { + for _, pw := range pipeWriters { + pw.(io.WriteCloser).Close() + } + }() + _, err := io.Copy(mw, profile.Body) + return err + }) + + // Send to each endpoint concurrently + for i, endpoint := range f.config.Endpoints { + g.Go(func() error { + defer pipeReaders[i].(io.ReadCloser).Close() + + u, err := url.Parse(endpoint.URL) + if err != nil { + return fmt.Errorf("parse endpoint URL: %w", err) + } + + u.Path = path.Join(u.Path, profile.URL.Path) + u.RawQuery = profile.URL.RawQuery + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i]) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + // First set profile headers as defaults + for k, v := range profile.Headers { + req.Header[k] = v + } + + // Override any profile duplicated header + for k, v := range endpoint.Headers { + req.Header.Set(k, v) + } + + resp, err := f.httpClient.Do(req) + if err != nil { + return fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return &PyroscopeWriteError{StatusCode: resp.StatusCode} + } + return nil + }) + } + + return g.Wait() +} + // WithUserAgent returns a `connect.ClientOption` that sets the User-Agent header on. func WithUserAgent(agent string) connect.ClientOption { return connect.WithInterceptors(&agentInterceptor{agent}) diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 3439435e04..74fa3d5612 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -1,10 +1,13 @@ package write import ( + "bytes" "context" "errors" + "io" "net/http" "net/http/httptest" + "net/url" "sync" "testing" "time" @@ -253,3 +256,81 @@ func TestBadAlloyConfig(t *testing.T) { err := syntax.Unmarshal([]byte(exampleAlloyConfig), &args) require.ErrorContains(t, err, "at most one of basic_auth, authorization, oauth2, bearer_token & bearer_token_file must be configured") } + +func Test_Write_AppendIngest(t *testing.T) { + var ( + export Exports + argument = DefaultArguments() + appendCount = atomic.NewInt32(0) + serverCount = int32(3) + servers = make([]*httptest.Server, serverCount) + endpoints = make([]*EndpointOptions, 0, serverCount) + ) + + testData := []byte("test-profile-data") + + handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + appendCount.Inc() + require.Equal(t, expectedPath, r.URL.Path, "Unexpected path") + require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query") + require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header")) + require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"]) + body, err := io.ReadAll(r.Body) + require.NoError(t, err, "Failed to read request body") + require.Equal(t, testData, body, "Unexpected body content") + w.WriteHeader(http.StatusOK) + } + } + + for i := int32(0); i < serverCount; i++ { + servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value")) + endpoints = append(endpoints, &EndpointOptions{ + URL: servers[i].URL, + RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, + Headers: map[string]string{ + "X-Test-Header": "endpoint-value", + }, + }) + } + defer func() { + for _, s := range servers { + s.Close() + } + }() + + argument.Endpoints = endpoints + + // Create the receiver + var wg sync.WaitGroup + wg.Add(1) + c, err := New(component.Options{ + ID: "test-write", + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) { + defer wg.Done() + export = e.(Exports) + }, + }, argument) + require.NoError(t, err, "Failed to create component") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.Run(ctx) + wg.Wait() // wait for the state change to happen + require.NotNil(t, export.Receiver, "Receiver is nil") + + incomingProfile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(bytes.NewReader(testData)), + Headers: http.Header{ + "X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint + "X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved + }, + URL: &url.URL{Path: "/ingest", RawQuery: "key=value"}, + } + + err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile) + require.NoError(t, err) + require.Equal(t, serverCount, appendCount.Load()) +}