-
Notifications
You must be signed in to change notification settings - Fork 486
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds
phlare.scrape
and phlare.write
component to support Profilin…
…g. (#2552) Adds pprof.scrape and phlare.write component to support Profiling. Co-authored-by: Karen Germond <[email protected]> Co-authored-by: Robert Fratto <[email protected]>
- Loading branch information
1 parent
af97242
commit 308955c
Showing
18 changed files
with
2,915 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package phlare | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/hashicorp/go-multierror" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/model/labels" | ||
) | ||
|
||
var NoopAppendable = AppendableFunc(func(_ context.Context, _ labels.Labels, _ []*RawSample) error { return nil }) | ||
|
||
type Appendable interface { | ||
Appender() Appender | ||
} | ||
|
||
type Appender interface { | ||
Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error | ||
} | ||
|
||
type RawSample struct { | ||
// raw_profile is the set of bytes of the pprof profile | ||
RawProfile []byte | ||
} | ||
|
||
var _ Appendable = (*Fanout)(nil) | ||
|
||
// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. | ||
type Fanout struct { | ||
mut sync.RWMutex | ||
// children is where to fan out. | ||
children []Appendable | ||
// ComponentID is what component this belongs to. | ||
componentID string | ||
writeLatency prometheus.Histogram | ||
} | ||
|
||
// NewFanout creates a fanout appendable. | ||
func NewFanout(children []Appendable, componentID string, register prometheus.Registerer) *Fanout { | ||
wl := prometheus.NewHistogram(prometheus.HistogramOpts{ | ||
Name: "phlare_fanout_latency", | ||
Help: "Write latency for sending to phlare profiles", | ||
}) | ||
_ = register.Register(wl) | ||
return &Fanout{ | ||
children: children, | ||
componentID: componentID, | ||
writeLatency: wl, | ||
} | ||
} | ||
|
||
// UpdateChildren allows changing of the children of the fanout. | ||
func (f *Fanout) UpdateChildren(children []Appendable) { | ||
f.mut.Lock() | ||
defer f.mut.Unlock() | ||
f.children = children | ||
} | ||
|
||
// Children returns the children of the fanout. | ||
func (f *Fanout) Children() []Appendable { | ||
f.mut.Lock() | ||
defer f.mut.Unlock() | ||
return f.children | ||
} | ||
|
||
// Appender satisfies the Appendable interface. | ||
func (f *Fanout) Appender() Appender { | ||
f.mut.RLock() | ||
defer f.mut.RUnlock() | ||
|
||
app := &appender{ | ||
children: make([]Appender, 0), | ||
componentID: f.componentID, | ||
writeLatency: f.writeLatency, | ||
} | ||
for _, x := range f.children { | ||
if x == nil { | ||
continue | ||
} | ||
app.children = append(app.children, x.Appender()) | ||
} | ||
return app | ||
} | ||
|
||
var _ Appender = (*appender)(nil) | ||
|
||
type appender struct { | ||
children []Appender | ||
componentID string | ||
writeLatency prometheus.Histogram | ||
} | ||
|
||
// Append satisfies the Appender interface. | ||
func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { | ||
now := time.Now() | ||
defer func() { | ||
a.writeLatency.Observe(time.Since(now).Seconds()) | ||
}() | ||
var multiErr error | ||
for _, x := range a.children { | ||
err := x.Append(ctx, labels, samples) | ||
if err != nil { | ||
multiErr = multierror.Append(multiErr, err) | ||
} | ||
} | ||
return multiErr | ||
} | ||
|
||
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error | ||
|
||
func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { | ||
return f(ctx, labels, samples) | ||
} | ||
|
||
func (f AppendableFunc) Appender() Appender { | ||
return f | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package phlare | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/atomic" | ||
) | ||
|
||
func Test_FanOut(t *testing.T) { | ||
totalAppend := atomic.NewInt32(0) | ||
lbls := labels.Labels{ | ||
{Name: "foo", Value: "bar"}, | ||
} | ||
f := NewFanout([]Appendable{ | ||
AppendableFunc(func(_ context.Context, labels labels.Labels, _ []*RawSample) error { | ||
require.Equal(t, lbls, labels) | ||
totalAppend.Inc() | ||
return nil | ||
}), | ||
AppendableFunc(func(_ context.Context, labels labels.Labels, _ []*RawSample) error { | ||
require.Equal(t, lbls, labels) | ||
totalAppend.Inc() | ||
return nil | ||
}), | ||
AppendableFunc(func(_ context.Context, labels labels.Labels, _ []*RawSample) error { | ||
require.Equal(t, lbls, labels) | ||
totalAppend.Inc() | ||
return nil | ||
}), | ||
}, "foo", prometheus.NewRegistry()) | ||
require.NoError(t, f.Appender().Append(context.Background(), lbls, []*RawSample{})) | ||
require.Equal(t, int32(3), totalAppend.Load()) | ||
f.UpdateChildren([]Appendable{ | ||
AppendableFunc(func(_ context.Context, labels labels.Labels, _ []*RawSample) error { | ||
require.Equal(t, lbls, labels) | ||
totalAppend.Inc() | ||
return errors.New("foo") | ||
}), | ||
AppendableFunc(func(_ context.Context, labels labels.Labels, _ []*RawSample) error { | ||
require.Equal(t, lbls, labels) | ||
totalAppend.Inc() | ||
return nil | ||
}), | ||
}) | ||
totalAppend.Store(0) | ||
require.Error(t, f.Appender().Append(context.Background(), lbls, []*RawSample{})) | ||
require.Equal(t, int32(2), totalAppend.Load()) | ||
} |
Oops, something went wrong.