diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f8b9fd34d..b94ee462a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,7 +45,12 @@ Main (unreleased) - Add perf_schema quantile columns to collector ### Bugfixes +- Fixed an issue in the `pyroscope.write` component to prevent TLS connection churn to Pyroscope when the `pyroscope.receive_http` clients don't request keepalive (@madaraszg-tulip) + +- Fixed an issue in the `pyroscope.write` component with multiple endpoints not working correctly for forwarding profiles from `pyroscope.receive_http` (@madaraszg-tulip) + - Fixed an issue in the `pyroscope.write` component to allow slashes in application names in the same way it is done in the Pyroscope push API (@marcsanmi) + - Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa) - Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar) diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 157287f539..63fed96c60 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -155,40 +155,38 @@ func (c *Component) Update(newConfig component.Arguments) error { type fanOutClient struct { // The list of push clients to fan out to. - clients []pushv1connect.PusherServiceClient - httpClient *http.Client - config Arguments - opts component.Options - metrics *metrics + pushClients []pushv1connect.PusherServiceClient + ingestClients map[*EndpointOptions]*http.Client + config Arguments + opts component.Options + metrics *metrics } // NewFanOut creates a new fan out client that will fan out to all endpoints. func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) { - clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) + pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) + ingestClients := make(map[*EndpointOptions]*http.Client) uid := alloyseed.Get().UID - var httpClient *http.Client for _, endpoint := range config.Endpoints { if endpoint.Headers == nil { endpoint.Headers = map[string]string{} } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid - client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err } - clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) - if httpClient == nil { - httpClient = client - } + pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent))) + ingestClients[endpoint] = httpClient } return &fanOutClient{ - clients: clients, - httpClient: httpClient, - config: config, - opts: opts, - metrics: metrics, + pushClients: pushClients, + ingestClients: ingestClients, + config: config, + opts: opts, + metrics: metrics, }, nil } @@ -202,7 +200,7 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus reqSize, profileCount = requestSize(req) ) - for i, client := range f.clients { + for i, client := range f.pushClients { var ( client = client i = i @@ -395,6 +393,11 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco // First set profile headers as defaults for k, v := range profile.Headers { + // Ignore this header as it may interfere with keepalives in the connection to pyroscope + // which may cause huge load due to tls renegotiation + if k == "Connection" { + continue + } req.Header[k] = v } @@ -403,10 +406,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco req.Header.Set(k, v) } - resp, err := f.httpClient.Do(req) + resp, err := f.ingestClients[endpoint].Do(req) if err != nil { return fmt.Errorf("do request: %w", err) } + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + return fmt.Errorf("read response body: %w", err) + } defer resp.Body.Close() if resp.StatusCode != http.StatusOK {