Skip to content

Commit

Permalink
Fix pyroscope.write issues with pyroscope.receive_http
Browse files Browse the repository at this point in the history
The nodejs Pyroscope SDK sends profiles with a `Connection: close` header.
This header was copied to the upstream request, causing connection churn
towards Pyroscope, which can be quite bad on the CPU when using TLS. Do not
copy the `Connection` header from the incoming request to fix this issue.

Additionally, `pyroscope.write` had a single `http.Client` used for
forwarding data from `pyroscope.receive_http`, which may not work if
multiple endpoints are configured with different options. To fix this,
store a `http.Client` for each endpoint.
  • Loading branch information
madaraszg-tulip committed Dec 2, 2024
1 parent 2b16ff0 commit f803ffc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ Main (unreleased)
- Add perf_schema quantile columns to collector

### Bugfixes
- Fixed an issue in the `pyroscope.write` component to prevent TLS connection churn to Pyroscope when the `pyroscope.receive_http` clients don't request keepalive (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component with multiple endpoints not working correctly for forwarding profiles from `pyroscope.receive_http` (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component to allow slashes in application names in the same way it is done in the Pyroscope push API (@marcsanmi)

- Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa)

- Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar)
Expand Down
46 changes: 27 additions & 19 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,40 +155,38 @@ func (c *Component) Update(newConfig component.Arguments) error {

type fanOutClient struct {
// The list of push clients to fan out to.
clients []pushv1connect.PusherServiceClient
httpClient *http.Client
config Arguments
opts component.Options
metrics *metrics
pushClients []pushv1connect.PusherServiceClient
ingestClients map[*EndpointOptions]*http.Client
config Arguments
opts component.Options
metrics *metrics
}

// NewFanOut creates a new fan out client that will fan out to all endpoints.
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
ingestClients := make(map[*EndpointOptions]*http.Client)
uid := alloyseed.Get().UID

var httpClient *http.Client
for _, endpoint := range config.Endpoints {
if endpoint.Headers == nil {
endpoint.Headers = map[string]string{}
}
endpoint.Headers[alloyseed.LegacyHeaderName] = uid
endpoint.Headers[alloyseed.HeaderName] = uid
client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
if err != nil {
return nil, err
}
clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent)))
if httpClient == nil {
httpClient = client
}
pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)))
ingestClients[endpoint] = httpClient
}
return &fanOutClient{
clients: clients,
httpClient: httpClient,
config: config,
opts: opts,
metrics: metrics,
pushClients: pushClients,
ingestClients: ingestClients,
config: config,
opts: opts,
metrics: metrics,
}, nil
}

Expand All @@ -202,7 +200,7 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus
reqSize, profileCount = requestSize(req)
)

for i, client := range f.clients {
for i, client := range f.pushClients {
var (
client = client
i = i
Expand Down Expand Up @@ -395,6 +393,11 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco

// First set profile headers as defaults
for k, v := range profile.Headers {
// Ignore this header as it may interfere with keepalives in the connection to pyroscope
// which may cause huge load due to tls renegotiation
if k == "Connection" {
continue
}
req.Header[k] = v
}

Expand All @@ -403,12 +406,17 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
req.Header.Set(k, v)
}

resp, err := f.httpClient.Do(req)
resp, err := f.ingestClients[endpoint].Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()

_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return fmt.Errorf("read response body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return &PyroscopeWriteError{StatusCode: resp.StatusCode}
}
Expand Down

0 comments on commit f803ffc

Please sign in to comment.