Skip to content

Commit

Permalink
Fix pyroscope.write issues with pyroscope.receive_http (#2201)
Browse files Browse the repository at this point in the history
* Fix pyroscope.write issues with pyroscope.receive_http

The nodejs Pyroscope SDK sends profiles with a `Connection: close` header.
This header was copied to the upstream request, causing connection churn
towards Pyroscope, which can be quite bad on the CPU when using TLS. Do not
copy the `Connection` header from the incoming request to fix this issue.

Additionally, `pyroscope.write` had a single `http.Client` used for
forwarding data from `pyroscope.receive_http`, which may not work if
multiple endpoints are configured with different options. To fix this,
store a `http.Client` for each endpoint.
  • Loading branch information
madaraszg-tulip authored Dec 3, 2024
1 parent 9177f33 commit 94dd6f2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 19 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ Main (unreleased)
- For sharding targets during clustering, `loki.source.podlogs` now only takes into account some labels. (@ptodev)

### Bugfixes
- Fixed an issue in the `pyroscope.write` component to prevent TLS connection churn to Pyroscope when the `pyroscope.receive_http` clients don't request keepalive (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component with multiple endpoints not working correctly for forwarding profiles from `pyroscope.receive_http` (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component to allow slashes in application names in the same way it is done in the Pyroscope push API (@marcsanmi)

- Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa)

- Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar)
Expand Down
57 changes: 38 additions & 19 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ var (
return Arguments{}
}
_ component.Component = (*Component)(nil)

// List of headers to ignore when copying headers from client to server connection
// https://datatracker.ietf.org/doc/html/rfc9113#name-connection-specific-header-
ignoreProxyHeaders = map[string]bool{
"Connection": true,
"Proxy-Connection": true,
"Keep-Alive": true,
"Transfer-Encoding": true,
"Upgrade": true,
"TE": true,
}
)

func init() {
Expand Down Expand Up @@ -155,40 +166,38 @@ func (c *Component) Update(newConfig component.Arguments) error {

type fanOutClient struct {
// The list of push clients to fan out to.
clients []pushv1connect.PusherServiceClient
httpClient *http.Client
config Arguments
opts component.Options
metrics *metrics
pushClients []pushv1connect.PusherServiceClient
ingestClients map[*EndpointOptions]*http.Client
config Arguments
opts component.Options
metrics *metrics
}

// NewFanOut creates a new fan out client that will fan out to all endpoints.
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
ingestClients := make(map[*EndpointOptions]*http.Client)
uid := alloyseed.Get().UID

var httpClient *http.Client
for _, endpoint := range config.Endpoints {
if endpoint.Headers == nil {
endpoint.Headers = map[string]string{}
}
endpoint.Headers[alloyseed.LegacyHeaderName] = uid
endpoint.Headers[alloyseed.HeaderName] = uid
client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
if err != nil {
return nil, err
}
clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent)))
if httpClient == nil {
httpClient = client
}
pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)))
ingestClients[endpoint] = httpClient
}
return &fanOutClient{
clients: clients,
httpClient: httpClient,
config: config,
opts: opts,
metrics: metrics,
pushClients: pushClients,
ingestClients: ingestClients,
config: config,
opts: opts,
metrics: metrics,
}, nil
}

Expand All @@ -202,7 +211,7 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus
reqSize, profileCount = requestSize(req)
)

for i, client := range f.clients {
for i, client := range f.pushClients {
var (
client = client
i = i
Expand Down Expand Up @@ -395,6 +404,11 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco

// First set profile headers as defaults
for k, v := range profile.Headers {
// Ignore this header as it may interfere with keepalives in the connection to pyroscope
// which may cause huge load due to tls renegotiation
if _, exists := ignoreProxyHeaders[k]; exists {
continue
}
req.Header[k] = v
}

Expand All @@ -403,12 +417,17 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
req.Header.Set(k, v)
}

resp, err := f.httpClient.Do(req)
resp, err := f.ingestClients[endpoint].Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()

_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return fmt.Errorf("read response body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return &PyroscopeWriteError{StatusCode: resp.StatusCode}
}
Expand Down

0 comments on commit 94dd6f2

Please sign in to comment.