Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pyroscope.write issues with pyroscope.receive_http #2201

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ Main (unreleased)
- Add perf_schema quantile columns to collector

### Bugfixes
- Fixed an issue in the `pyroscope.write` component to prevent TLS connection churn to Pyroscope when the `pyroscope.receive_http` clients don't request keepalive (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component with multiple endpoints not working correctly for forwarding profiles from `pyroscope.receive_http` (@madaraszg-tulip)

- Fixed an issue in the `pyroscope.write` component to allow slashes in application names in the same way it is done in the Pyroscope push API (@marcsanmi)

- Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa)

- Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar)
Expand Down
57 changes: 38 additions & 19 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ var (
return Arguments{}
}
_ component.Component = (*Component)(nil)

// List of headers to ignore when copying headers from client to server connection
// https://datatracker.ietf.org/doc/html/rfc9113#name-connection-specific-header-
ignoreProxyHeaders = map[string]bool{
"Connection": true,
"Proxy-Connection": true,
"Keep-Alive": true,
"Transfer-Encoding": true,
"Upgrade": true,
"TE": true,
}
)

func init() {
Expand Down Expand Up @@ -155,40 +166,38 @@ func (c *Component) Update(newConfig component.Arguments) error {

type fanOutClient struct {
// The list of push clients to fan out to.
clients []pushv1connect.PusherServiceClient
httpClient *http.Client
config Arguments
opts component.Options
metrics *metrics
pushClients []pushv1connect.PusherServiceClient
ingestClients map[*EndpointOptions]*http.Client
config Arguments
opts component.Options
metrics *metrics
}

// NewFanOut creates a new fan out client that will fan out to all endpoints.
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
ingestClients := make(map[*EndpointOptions]*http.Client)
uid := alloyseed.Get().UID

var httpClient *http.Client
for _, endpoint := range config.Endpoints {
if endpoint.Headers == nil {
endpoint.Headers = map[string]string{}
}
endpoint.Headers[alloyseed.LegacyHeaderName] = uid
endpoint.Headers[alloyseed.HeaderName] = uid
client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
if err != nil {
return nil, err
}
clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent)))
if httpClient == nil {
httpClient = client
}
pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)))
ingestClients[endpoint] = httpClient
}
return &fanOutClient{
clients: clients,
httpClient: httpClient,
config: config,
opts: opts,
metrics: metrics,
pushClients: pushClients,
ingestClients: ingestClients,
config: config,
opts: opts,
metrics: metrics,
}, nil
}

Expand All @@ -202,7 +211,7 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus
reqSize, profileCount = requestSize(req)
)

for i, client := range f.clients {
for i, client := range f.pushClients {
var (
client = client
i = i
Expand Down Expand Up @@ -395,6 +404,11 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco

// First set profile headers as defaults
for k, v := range profile.Headers {
// Ignore this header as it may interfere with keepalives in the connection to pyroscope
// which may cause huge load due to tls renegotiation
if _, exists := ignoreProxyHeaders[k]; exists {
continue
}
req.Header[k] = v
}

Expand All @@ -403,12 +417,17 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
req.Header.Set(k, v)
}

resp, err := f.httpClient.Do(req)
resp, err := f.ingestClients[endpoint].Do(req)
if err != nil {
return fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()

_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return fmt.Errorf("read response body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return &PyroscopeWriteError{StatusCode: resp.StatusCode}
}
Expand Down
Loading