Add selector of exemplar reservoir providers to metric.Stream configuration (#5861)

Resolve #5249

### Spec

> exemplar_reservoir: A functional type that generates an exemplar
reservoir a MeterProvider will use when storing exemplars. This
functional type needs to be a factory or callback similar to aggregation
selection functionality which allows different reservoirs to be chosen
by the aggregation.

> Users can provide an exemplar_reservoir, but it is up to their
discretion. Therefore, the stream configuration parameter needs to be
structured to accept an exemplar_reservoir, but MUST NOT obligate a user
to provide one. If the user does not provide an exemplar_reservoir
value, the MeterProvider MUST apply a [default exemplar
reservoir](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar-defaults).

Also,

> the reservoir MUST be given the Attributes associated with its
timeseries point either at construction so that additional sampling
performed by the reservoir has access to all attributes from a
measurement in the "offer" method.
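
To make the attribute requirement concrete, here is a minimal sketch of a provider that receives the kept attributes at construction and combines them with the dropped attributes seen in `Offer`. It uses simplified stand-in types (`AttrSet`, `Reservoir`, `ReservoirProvider`, `MatchProvider` are all hypothetical and only mirror the shape of the SDK types), so it is an illustration of the idea rather than the SDK implementation.

```go
package main

import "fmt"

// AttrSet is a hypothetical stand-in for attribute.Set.
type AttrSet map[string]string

// Reservoir is a simplified stand-in for exemplar.Reservoir.
type Reservoir interface {
	// Offer receives a measurement plus the attributes the aggregation dropped.
	Offer(value float64, dropped AttrSet)
	// Collect returns the values sampled so far.
	Collect() []float64
}

// ReservoirProvider mirrors the shape of exemplar.ReservoirProvider: it
// receives the attributes kept by the aggregation when a reservoir is built.
type ReservoirProvider func(kept AttrSet) Reservoir

// matchReservoir samples a measurement only when the complete attribute set
// (kept plus dropped) contains key=want.
type matchReservoir struct {
	kept      AttrSet
	key, want string
	values    []float64
}

func (r *matchReservoir) Offer(value float64, dropped AttrSet) {
	// Kept and dropped attributes are disjoint, so check both sets.
	if r.kept[r.key] == r.want || dropped[r.key] == r.want {
		r.values = append(r.values, value)
	}
}

func (r *matchReservoir) Collect() []float64 { return r.values }

// MatchProvider is a hypothetical provider that closes over the match rule
// and hands each new reservoir the attributes of its timeseries.
func MatchProvider(key, want string) ReservoirProvider {
	return func(kept AttrSet) Reservoir {
		return &matchReservoir{kept: kept, key: key, want: want}
	}
}

func main() {
	provider := MatchProvider("admin", "true")

	// The timeseries keeps "admin"; "user" was filtered out by a view.
	r := provider(AttrSet{"admin": "true"})
	r.Offer(1.5, AttrSet{"user": "alice"})
	fmt.Println(len(r.Collect()))
}
```

Because the reservoir holds the kept attributes from construction, its sampling decision in `Offer` can take the full attribute set of a measurement into account even when a view has filtered some attributes out of the timeseries.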

### Changes

In sdk/metric/exemplar, add:
* `exemplar.ReservoirProvider`
* `exemplar.HistogramReservoirProvider`
* `exemplar.FixedSizeReservoirProvider`

In sdk/metric, add:
* `metric.ExemplarReservoirProviderSelector` (func Aggregation ->
ReservoirProvider)
* `metric.DefaultExemplarReservoirProviderSelector` (our default
implementation)
* `ExemplarReservoirProviderSelector` added to `metric.Stream`

Note: the only required public types are
`metric.ExemplarReservoirProviderSelector` and
`ExemplarReservoirProviderSelector` in `metric.Stream`. Others are for
convenience and readability.
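
The relationship between the selector, the default selector, and the `metric.Stream` field can be sketched as follows. This is a simplified model with stand-in types: `ReservoirSpec`, `Selector`, `DefaultSelector`, and `ReservoirFor` are hypothetical names, and the real provider returns a reservoir constructor rather than a description. The default rules follow the doc comment on `DefaultExemplarReservoirProviderSelector`.

```go
package main

import (
	"fmt"
	"runtime"
)

// Aggregation is a stand-in for metric.Aggregation.
type Aggregation interface{}

// Stand-ins for the SDK aggregation types.
type ExplicitBucketHistogram struct{ Boundaries []float64 }
type ExponentialHistogram struct{ MaxSize int }
type Sum struct{}

// ReservoirSpec describes which reservoir would be built (hypothetical; the
// real provider is a function that constructs the reservoir).
type ReservoirSpec struct {
	Kind string
	Size int
}

// Selector mirrors the shape of metric.ExemplarReservoirProviderSelector.
type Selector func(Aggregation) ReservoirSpec

// DefaultSelector models the documented defaults.
func DefaultSelector(agg Aggregation) ReservoirSpec {
	switch a := agg.(type) {
	case ExplicitBucketHistogram:
		if len(a.Boundaries) > 0 {
			// One exemplar slot per bucket: len(bounds)+1 buckets.
			return ReservoirSpec{Kind: "histogram", Size: len(a.Boundaries) + 1}
		}
	case ExponentialHistogram:
		// min(20, max_buckets)
		if a.MaxSize < 20 {
			return ReservoirSpec{Kind: "fixed", Size: a.MaxSize}
		}
		return ReservoirSpec{Kind: "fixed", Size: 20}
	}
	return ReservoirSpec{Kind: "fixed", Size: runtime.NumCPU()}
}

// Stream is a stand-in for metric.Stream with only the new field.
type Stream struct {
	Selector Selector
}

// ReservoirFor applies the stream's selector, or the default if unset.
func (s Stream) ReservoirFor(agg Aggregation) ReservoirSpec {
	if s.Selector == nil {
		return DefaultSelector(agg)
	}
	return s.Selector(agg)
}

func main() {
	custom := Stream{Selector: func(agg Aggregation) ReservoirSpec {
		if _, ok := agg.(ExplicitBucketHistogram); ok {
			return ReservoirSpec{Kind: "fixed", Size: 10}
		}
		return DefaultSelector(agg)
	}}
	hist := ExplicitBucketHistogram{Boundaries: []float64{0, 10}}
	fmt.Println(Stream{}.ReservoirFor(hist))
	fmt.Println(custom.ReservoirFor(hist))
}
```

Leaving the field unset keeps the default behavior, which is how the stream configuration accepts a reservoir choice without obligating the user to provide one.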

### Alternatives considered

#### Add ExemplarReservoirProvider directly to metric.Stream, instead of ExemplarReservoirProviderSelector

This would mean users can configure a `func() exemplar.Reservoir`
instead of a `func(Aggregation) func() exemplar.Reservoir`.

I don't think this complies with the statement: `This functional type
needs to be a factory or callback similar to aggregation selection
functionality which allows different reservoirs to be chosen by the
aggregation.`. I'm interpreting "allows different reservoirs to be
chosen by the aggregation" as meaning "allows different reservoirs to be
chosen **based on the** aggregation", rather than meaning that the
aggregation is somehow choosing the reservoir.
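
The difference between the two shapes can be illustrated with a small sketch. The types below are simplified stand-ins (`Provider`, `Selector`, `Fixed`, `Constant`, and `ByAggregation` are hypothetical names), using the `func() Reservoir` and `func(Aggregation) func() Reservoir` shapes described above. It shows that any plain provider can be lifted into a selector, while a plain provider alone cannot vary by aggregation.

```go
package main

import "fmt"

// Aggregation is a stand-in for metric.Aggregation; a string keeps it short.
type Aggregation string

// Reservoir is a stand-in for exemplar.Reservoir.
type Reservoir struct{ Capacity int }

// Provider is the alternative shape: func() exemplar.Reservoir.
type Provider func() Reservoir

// Selector is the chosen shape: func(Aggregation) func() exemplar.Reservoir.
type Selector func(Aggregation) Provider

// Fixed returns a provider of reservoirs with capacity k.
func Fixed(k int) Provider {
	return func() Reservoir { return Reservoir{Capacity: k} }
}

// Constant lifts a plain provider into a selector that ignores the
// aggregation, which is all the alternative design could express.
func Constant(p Provider) Selector {
	return func(Aggregation) Provider { return p }
}

// ByAggregation picks a different reservoir depending on the aggregation,
// which only the selector shape allows.
func ByAggregation(agg Aggregation) Provider {
	if agg == "histogram" {
		return Fixed(10)
	}
	return Fixed(1)
}

func main() {
	same := Constant(Fixed(4))
	fmt.Println(same("sum")().Capacity, same("histogram")().Capacity)

	var varying Selector = ByAggregation
	fmt.Println(varying("sum")().Capacity, varying("histogram")().Capacity)
}
```

Under this reading, the selector is the more general of the two options: users who want one reservoir everywhere can still return the same provider for every aggregation.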

### Future work

There is some refactoring I plan to do after this to simplify the
interaction between the internal/aggregate and exemplar packages. I've
omitted that from this PR to keep the diff smaller.

---------

Co-authored-by: Tyler Yahn <[email protected]>
Co-authored-by: Robert Pająk <[email protected]>
3 people authored Oct 18, 2024
1 parent cd754a6 commit 81b2a33
Showing 21 changed files with 170 additions and 54 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861)
- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861)
- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
25 changes: 25 additions & 0 deletions sdk/metric/example_test.go
@@ -242,6 +242,31 @@ func ExampleNewView_exponentialHistogram() {
)
}

func ExampleNewView_exemplarreservoirproviderselector() {
// Create a view that makes all metrics use a different exemplar reservoir.
view := metric.NewView(
metric.Instrument{Name: "*"},
metric.Stream{
ExemplarReservoirProviderSelector: func(agg metric.Aggregation) exemplar.ReservoirProvider {
// This example uses a fixed-size reservoir with a size of 10
// for explicit bucket histograms instead of the default
// bucket-aligned reservoir.
if _, ok := agg.(metric.AggregationExplicitBucketHistogram); ok {
return exemplar.FixedSizeReservoirProvider(10)
}
// Fall back to the default reservoir otherwise.
return metric.DefaultExemplarReservoirProviderSelector(agg)
},
},
)

// The created view can then be registered with the OpenTelemetry metric
// SDK using the WithView option.
_ = metric.NewMeterProvider(
metric.WithView(view),
)
}

func ExampleWithExemplarFilter_disabled() {
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
_ = metric.NewMeterProvider(
41 changes: 31 additions & 10 deletions sdk/metric/exemplar.go
@@ -5,25 +5,48 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"runtime"
"slices"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [exemplar.ReservoirProvider] for the
// provided [Aggregation].
//
// For explicit bucket histograms with more than 1 bucket, it uses the
// [exemplar.HistogramReservoirProvider].
// For exponential histograms, it uses the
// [exemplar.FixedSizeReservoirProvider]
// with a size of min(20, max_buckets).
// For all other aggregations, it uses the
// [exemplar.FixedSizeReservoirProvider]
// with a size equal to the number of CPUs.
//
// Exemplar default reservoirs MAY change in a minor version bump. No
// guarantees are made on the shape or statistical properties of returned
// exemplars.
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
return exemplar.HistogramReservoirProvider(a.Boundaries)
}

var n int
@@ -50,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f
}
}

return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
return exemplar.FixedSizeReservoirProvider(n)
}
7 changes: 7 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
@@ -12,6 +12,13 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func(_ attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}

// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or fewer measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
@@ -15,12 +15,12 @@ import (
)

func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
}

12 changes: 10 additions & 2 deletions sdk/metric/exemplar/histogram_reservoir.go
@@ -12,13 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func(_ attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are defined by bounds.
//
// The passed bounds will be sorted by this function.
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/histogram_reservoir_test.go
@@ -7,11 +7,11 @@ import "testing"

func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))

t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}
8 changes: 8 additions & 0 deletions sdk/metric/exemplar/reservoir.go
@@ -30,3 +30,11 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

// ReservoirProvider creates new [Reservoir]s.
//
// The attributes provided are attributes which are kept by the aggregation, and
// are exclusive with attributes passed to Offer. The combination of these
// attributes and the attributes passed to Offer is the complete set of
// attributes a measurement was made with.
type ReservoirProvider func(attr attribute.Set) Reservoir
17 changes: 11 additions & 6 deletions sdk/metric/exemplar/reservoir_test.go
@@ -18,7 +18,7 @@ import (
// Sat Jan 01 2000 00:00:00 GMT+0000.
var staticTime = time.Unix(946684800, 0)

type factory func(requestedCap int) (r Reservoir, actualCap int)
type factory func(requestedCap int) (r ReservoirProvider, actualCap int)

func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
return func(t *testing.T) {
@@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CaptureSpanContext", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01}
sc := trace.NewSpanContext(trace.SpanContextConfig{
@@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("FilterAttributes", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

adminTrue := attribute.Bool("admin", true)
r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue})
@@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper()

r, n := f(2)
rp, n := f(2)
if n < 2 {
t.Skip("skipping, reservoir capacity less than 2:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(ctx, staticTime, NewValue(N(10)), nil)

@@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("MultipleOffers", func(t *testing.T) {
t.Helper()

r, n := f(3)
rp, n := f(3)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

for i := 0; i < n+1; i++ {
v := NewValue(N(i))
@@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("DropAll", func(t *testing.T) {
t.Helper()

r, n := f(0)
rp, n := f(0)
if n > 0 {
t.Skip("skipping, reservoir capacity greater than 0:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(context.Background(), staticTime, NewValue(N(10)), nil)

6 changes: 6 additions & 0 deletions sdk/metric/instrument.go
@@ -144,6 +144,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
// ExemplarReservoirProviderSelector selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
// on the [Aggregation].
//
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}

// instID are the identifying properties of an instrument.
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
@@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided, a default factory function that returns a
// dropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
@@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return dropReservoir[N]()
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
}

func TestBuilderFilter(t *testing.T) {
4 changes: 3 additions & 1 deletion sdk/metric/internal/aggregate/drop.go
@@ -11,7 +11,9 @@ import (
)

// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}

type dropRes[N int64 | float64] struct{}

3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/drop_test.go
@@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

@@ -17,7 +18,7 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N]()
r := dropReservoir[N](*attribute.EmptySet())

var dest []exemplar.Exemplar
r.Collect(&dest)
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
@@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes()
v.res = e.newRes(attr)

e.values[attr.Equivalent()] = v
}
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes()
b.res = s.newRes(attr)

// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// a histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,