diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ae9192b754..5388df8dfac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861) - Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861) +### Fixed + +- Global MeterProvider registration unwraps global instrument Observers, the undocumented Unwrap() methods are now private. (#5881) + diff --git a/internal/global/alternate_meter_test.go b/internal/global/alternate_meter_test.go new file mode 100644 index 00000000000..428534178a3 --- /dev/null +++ b/internal/global/alternate_meter_test.go @@ -0,0 +1,250 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package global // import "go.opentelemetry.io/otel/internal/global" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" + "go.opentelemetry.io/otel/metric/noop" +) + +// Below, an alternate meter provider is constructed specifically to +// test the asynchronous instrument path. The alternative SDK uses +// no-op implementations for its synchronous instruments, and the six +// asynchronous instrument types are created here to test that +// instruments and callbacks are unwrapped inside this library. + +type altMeterProvider struct { + t *testing.T + meters []*altMeter + embedded.MeterProvider +} + +var _ metric.MeterProvider = &altMeterProvider{} + +func (amp *altMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + am := &altMeter{ + provider: amp, + } + amp.meters = append(amp.meters, am) + return am +} + +type altMeter struct { + provider *altMeterProvider + cbs []metric.Callback + embedded.Meter +} + +var _ metric.Meter = &altMeter{} + +type testAiCounter struct { + meter *altMeter + embedded.Int64ObservableCounter + metric.Int64Observable +} + +var _ metric.Int64ObservableCounter = &testAiCounter{} + +type testAfCounter struct { + meter *altMeter + embedded.Float64ObservableCounter + metric.Float64Observable +} + +var _ metric.Float64ObservableCounter = &testAfCounter{} + +type testAiUpDownCounter struct { + meter *altMeter + embedded.Int64ObservableUpDownCounter + metric.Int64Observable +} + +var _ metric.Int64ObservableUpDownCounter = &testAiUpDownCounter{} + +type testAfUpDownCounter struct { + meter *altMeter + embedded.Float64ObservableUpDownCounter + metric.Float64Observable +} + +var _ metric.Float64ObservableUpDownCounter = &testAfUpDownCounter{} + +type testAiGauge struct { + meter *altMeter + embedded.Int64ObservableGauge + metric.Int64Observable +} + +var _ metric.Int64ObservableGauge = &testAiGauge{} + +type testAfGauge struct { + meter *altMeter + embedded.Float64ObservableGauge + metric.Float64Observable +} + +var _ metric.Float64ObservableGauge = &testAfGauge{} + +type altRegistration struct { + cb metric.Callback + embedded.Registration +} + +type altObserver struct { + t *testing.T + embedded.Observer +} + +func (*altRegistration) Unregister() error { + return nil +} + +func (am *altMeter) Int64Counter(name string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) { + return noop.NewMeterProvider().Meter("noop").Int64Counter(name) +} + +func (am *altMeter) Int64UpDownCounter(name string, _ ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { + return noop.NewMeterProvider().Meter("noop").Int64UpDownCounter(name) +} + +func (am *altMeter) Int64Histogram(name string, _ ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { + return noop.NewMeterProvider().Meter("noop").Int64Histogram(name) +} + +func (am *altMeter) Int64Gauge(name string, _ ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + return noop.NewMeterProvider().Meter("noop").Int64Gauge(name) +} + +func (am *altMeter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { + return &testAiCounter{ + meter: am, + }, nil +} + +func (am *altMeter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { + return &testAiUpDownCounter{ + meter: am, + }, nil +} + +func (am *altMeter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { + return &testAiGauge{ + meter: am, + }, nil +} + +func (am *altMeter) Float64Counter(name string, _ ...metric.Float64CounterOption) (metric.Float64Counter, error) { + return noop.NewMeterProvider().Meter("noop").Float64Counter(name) +} + +func (am *altMeter) Float64UpDownCounter(name string, _ ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { + return noop.NewMeterProvider().Meter("noop").Float64UpDownCounter(name) +} + +func (am *altMeter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { + return noop.NewMeterProvider().Meter("noop").Float64Histogram(name) +} + +func (am *altMeter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + return noop.NewMeterProvider().Meter("noop").Float64Gauge(name) +} + +func (am *altMeter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { + return &testAfCounter{ + meter: am, + }, nil +} + +func (am *altMeter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { + return &testAfUpDownCounter{ + meter: am, + }, nil +} + +func (am *altMeter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { + return &testAfGauge{ + meter: am, + }, nil +} + +func (am *altMeter) RegisterCallback(f metric.Callback, instruments ...metric.Observable) (metric.Registration, error) { + for _, inst := range instruments { + switch inst.(type) { + case *testAiCounter, *testAfCounter, + *testAiUpDownCounter, *testAfUpDownCounter, + *testAiGauge, *testAfGauge: + // OK! + default: + am.provider.t.Errorf("unexpected type %T", inst) + } + } + am.cbs = append(am.cbs, f) + return &altRegistration{cb: f}, nil +} + +func (ao *altObserver) ObserveFloat64(inst metric.Float64Observable, _ float64, _ ...metric.ObserveOption) { + ao.observe(inst) +} + +func (ao *altObserver) ObserveInt64(inst metric.Int64Observable, _ int64, _ ...metric.ObserveOption) { + ao.observe(inst) +} + +func (ao *altObserver) observe(inst any) { + switch inst.(type) { + case *testAiCounter, *testAfCounter, + *testAiUpDownCounter, *testAfUpDownCounter, + *testAiGauge, *testAfGauge: + // OK! + default: + ao.t.Errorf("unexpected type %T", inst) + } +} + +func TestMeterDelegation(t *testing.T) { + ResetForTest(t) + + amp := &altMeterProvider{t: t} + + gm := MeterProvider().Meter("test") + aic, err := gm.Int64ObservableCounter("test_counter_i") + require.NoError(t, err) + afc, err := gm.Float64ObservableCounter("test_counter_f") + require.NoError(t, err) + aiu, err := gm.Int64ObservableUpDownCounter("test_updowncounter_i") + require.NoError(t, err) + afu, err := gm.Float64ObservableUpDownCounter("test_updowncounter_f") + require.NoError(t, err) + aig, err := gm.Int64ObservableGauge("test_gauge_i") + require.NoError(t, err) + afg, err := gm.Float64ObservableGauge("test_gauge_f") + require.NoError(t, err) + + _, err = gm.RegisterCallback(func(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(aic, 10) + obs.ObserveFloat64(afc, 10) + obs.ObserveInt64(aiu, 10) + obs.ObserveFloat64(afu, 10) + obs.ObserveInt64(aig, 10) + obs.ObserveFloat64(afg, 10) + return nil + }, aic, afc, aiu, afu, aig, afg) + require.NoError(t, err) + + SetMeterProvider(amp) + + ctx := context.Background() + ao := &altObserver{t: t} + for _, meter := range amp.meters { + for _, cb := range meter.cbs { + require.NoError(t, cb(ctx, ao)) + } + } +} diff --git a/internal/global/instruments.go b/internal/global/instruments.go index 3a0cc42f6a4..ae92a425166 100644 --- a/internal/global/instruments.go +++ b/internal/global/instruments.go @@ -13,7 +13,7 @@ import ( // unwrapper unwraps to return the underlying instrument implementation. type unwrapper interface { - Unwrap() metric.Observable + unwrap() metric.Observable } type afCounter struct { @@ -40,7 +40,7 @@ func (i *afCounter) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *afCounter) Unwrap() metric.Observable { +func (i *afCounter) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Float64ObservableCounter) } @@ -71,7 +71,7 @@ func (i *afUpDownCounter) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *afUpDownCounter) Unwrap() metric.Observable { +func (i *afUpDownCounter) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Float64ObservableUpDownCounter) } @@ -102,7 +102,7 @@ func (i *afGauge) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *afGauge) Unwrap() metric.Observable { +func (i *afGauge) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Float64ObservableGauge) } @@ -133,7 +133,7 @@ func (i *aiCounter) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *aiCounter) Unwrap() metric.Observable { +func (i *aiCounter) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Int64ObservableCounter) } @@ -164,7 +164,7 @@ func (i *aiUpDownCounter) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *aiUpDownCounter) Unwrap() metric.Observable { +func (i *aiUpDownCounter) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Int64ObservableUpDownCounter) } @@ -195,7 +195,7 @@ func (i *aiGauge) setDelegate(m metric.Meter) { i.delegate.Store(ctr) } -func (i *aiGauge) Unwrap() metric.Observable { +func (i *aiGauge) unwrap() metric.Observable { if ctr := i.delegate.Load(); ctr != nil { return ctr.(metric.Int64ObservableGauge) } diff --git a/internal/global/instruments_test.go b/internal/global/instruments_test.go index 48c772e3b8a..74a89892bb8 100644 --- a/internal/global/instruments_test.go +++ b/internal/global/instruments_test.go @@ -57,19 +57,19 @@ func TestAsyncInstrumentSetDelegateConcurrentSafe(t *testing.T) { t.Run("Float64", func(t *testing.T) { t.Run("Counter", func(t *testing.T) { delegate := &afCounter{} - f := func(float64) { _ = delegate.Unwrap() } + f := func(float64) { _ = delegate.unwrap() } testFloat64ConcurrentSafe(f, delegate.setDelegate) }) t.Run("UpDownCounter", func(t *testing.T) { delegate := &afUpDownCounter{} - f := func(float64) { _ = delegate.Unwrap() } + f := func(float64) { _ = delegate.unwrap() } testFloat64ConcurrentSafe(f, delegate.setDelegate) }) t.Run("Gauge", func(t *testing.T) { delegate := &afGauge{} - f := func(float64) { _ = delegate.Unwrap() } + f := func(float64) { _ = delegate.unwrap() } testFloat64ConcurrentSafe(f, delegate.setDelegate) }) }) @@ -79,19 +79,19 @@ func TestAsyncInstrumentSetDelegateConcurrentSafe(t *testing.T) { t.Run("Int64", func(t *testing.T) { t.Run("Counter", func(t *testing.T) { delegate := &aiCounter{} - f := func(int64) { _ = delegate.Unwrap() } + f := func(int64) { _ = delegate.unwrap() } testInt64ConcurrentSafe(f, delegate.setDelegate) }) t.Run("UpDownCounter", func(t *testing.T) { delegate := &aiUpDownCounter{} - f := func(int64) { _ = delegate.Unwrap() } + f := func(int64) { _ = delegate.unwrap() } testInt64ConcurrentSafe(f, delegate.setDelegate) }) t.Run("Gauge", func(t *testing.T) { delegate := &aiGauge{} - f := func(int64) { _ = delegate.Unwrap() } + f := func(int64) { _ = delegate.unwrap() } testInt64ConcurrentSafe(f, delegate.setDelegate) }) }) diff --git a/internal/global/meter.go b/internal/global/meter.go index e3db438a09f..78520e8d6e9 100644 --- a/internal/global/meter.go +++ b/internal/global/meter.go @@ -5,6 +5,7 @@ package global // import "go.opentelemetry.io/otel/internal/global" import ( "container/list" + "context" "reflect" "sync" @@ -472,8 +473,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) defer m.mtx.Unlock() if m.delegate != nil { - insts = unwrapInstruments(insts) - return m.delegate.RegisterCallback(f, insts...) + return m.delegate.RegisterCallback(unwrapCallback(f), unwrapInstruments(insts)...) } reg := ®istration{instruments: insts, function: f} @@ -487,15 +487,11 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) return reg, nil } -type wrapped interface { - unwrap() metric.Observable -} - func unwrapInstruments(instruments []metric.Observable) []metric.Observable { out := make([]metric.Observable, 0, len(instruments)) for _, inst := range instruments { - if in, ok := inst.(wrapped); ok { + if in, ok := inst.(unwrapper); ok { out = append(out, in.unwrap()) } else { out = append(out, inst) @@ -515,9 +511,61 @@ type registration struct { unregMu sync.Mutex } -func (c *registration) setDelegate(m metric.Meter) { - insts := unwrapInstruments(c.instruments) +type unwrapObs struct { + embedded.Observer + obs metric.Observer +} + +// unwrapFloat64Observable returns an expected metric.Float64Observable after +// unwrapping the global object. +func unwrapFloat64Observable(inst metric.Float64Observable) metric.Float64Observable { + if unwrapped, ok := inst.(unwrapper); ok { + if floatObs, ok := unwrapped.unwrap().(metric.Float64Observable); ok { + // Note: if the unwrapped object does not + // unwrap as an observable for either of the + // predicates here, it means an internal bug in + // this package. We avoid logging an error in + // this case, because the SDK has to try its + // own type conversion on the object. The SDK + // will see this and be forced to respond with + // its own error. + // + // This code uses a double-nested if statement + // to avoid creating a branch that is + // impossible to cover. + inst = floatObs + } + } + return inst +} + +// unwrapInt64Observable returns an expected metric.Int64Observable after +// unwrapping the global object. +func unwrapInt64Observable(inst metric.Int64Observable) metric.Int64Observable { + if unwrapped, ok := inst.(unwrapper); ok { + if unint, ok := unwrapped.unwrap().(metric.Int64Observable); ok { + // See the comment in unwrapFloat64Observable(). + inst = unint + } + } + return inst +} + +func (uo *unwrapObs) ObserveFloat64(inst metric.Float64Observable, value float64, opts ...metric.ObserveOption) { + uo.obs.ObserveFloat64(unwrapFloat64Observable(inst), value, opts...) +} + +func (uo *unwrapObs) ObserveInt64(inst metric.Int64Observable, value int64, opts ...metric.ObserveOption) { + uo.obs.ObserveInt64(unwrapInt64Observable(inst), value, opts...) +} +func unwrapCallback(f metric.Callback) metric.Callback { + return func(ctx context.Context, obs metric.Observer) error { + return f(ctx, &unwrapObs{obs: obs}) + } +} + +func (c *registration) setDelegate(m metric.Meter) { c.unregMu.Lock() defer c.unregMu.Unlock() @@ -526,7 +574,7 @@ func (c *registration) setDelegate(m metric.Meter) { return } - reg, err := m.RegisterCallback(c.function, insts...) + reg, err := m.RegisterCallback(unwrapCallback(c.function), unwrapInstruments(c.instruments)...) if err != nil { GetErrorHandler().Handle(err) return diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index abff4650e1a..44d39b35dfe 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -444,13 +444,6 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) reg := newObserver() var errs multierror for _, inst := range insts { - // Unwrap any global. - if u, ok := inst.(interface { - Unwrap() metric.Observable - }); ok { - inst = u.Unwrap() - } - switch o := inst.(type) { case int64Observable: if err := o.registerable(m); err != nil { @@ -521,16 +514,6 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... switch conv := o.(type) { case float64Observable: oImpl = conv - case interface { - Unwrap() metric.Observable - }: - // Unwrap any global. - async := conv.Unwrap() - var ok bool - if oImpl, ok = async.(float64Observable); !ok { - global.Error(errUnknownObserver, "failed to record asynchronous") - return - } default: global.Error(errUnknownObserver, "failed to record") return @@ -556,16 +539,6 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric switch conv := o.(type) { case int64Observable: oImpl = conv - case interface { - Unwrap() metric.Observable - }: - // Unwrap any global. - async := conv.Unwrap() - var ok bool - if oImpl, ok = async.(int64Observable); !ok { - global.Error(errUnknownObserver, "failed to record asynchronous") - return - } default: global.Error(errUnknownObserver, "failed to record") return