Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelgrpc: Fix NewClientHandler to emit proper request/response metrics #6250

Merged
merged 5 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otellogrus`. (#6237)
- Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otelzap`. (#6237)
- Transform nil attribute values to `log.Value` zero value instead of `log.StringValue("<nil>")` in `go.opentelemetry.io/contrib/bridges/otelslog`. (#6246)
- Fix request/response metrics in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`, as in StatsHandler `in` means response for Client and request for Server. (#6250)
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
52 changes: 35 additions & 17 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type config struct {
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
rpcDuration metric.Float64Histogram
rpcInBytes metric.Int64Histogram
rpcOutBytes metric.Int64Histogram
rpcInMessages metric.Int64Histogram
rpcOutMessages metric.Int64Histogram
}

// Option applies an option value for a config.
Expand Down Expand Up @@ -96,46 +96,64 @@ func newConfig(opts []Option, role string) *config {
}
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcRequestSize == nil {
c.rpcRequestSize = noop.Int64Histogram{}
if rpcRequestSize == nil {
rpcRequestSize = noop.Int64Histogram{}
}
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcResponseSize == nil {
c.rpcResponseSize = noop.Int64Histogram{}
if rpcResponseSize == nil {
rpcResponseSize = noop.Int64Histogram{}
}
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcRequestsPerRPC == nil {
c.rpcRequestsPerRPC = noop.Int64Histogram{}
if rpcRequestsPerRPC == nil {
rpcRequestsPerRPC = noop.Int64Histogram{}
}
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcResponsesPerRPC == nil {
c.rpcResponsesPerRPC = noop.Int64Histogram{}
if rpcResponsesPerRPC == nil {
rpcResponsesPerRPC = noop.Int64Histogram{}
}
}

switch role {
case "client":
c.rpcInBytes = rpcResponseSize
c.rpcInMessages = rpcResponsesPerRPC
c.rpcOutBytes = rpcRequestSize
c.rpcOutMessages = rpcRequestsPerRPC
case "server":
c.rpcInBytes = rpcRequestSize
c.rpcInMessages = rpcRequestsPerRPC
c.rpcOutBytes = rpcResponseSize
c.rpcOutMessages = rpcResponsesPerRPC
default:
c.rpcInBytes = noop.Int64Histogram{}
c.rpcInMessages = noop.Int64Histogram{}
c.rpcOutBytes = noop.Int64Histogram{}
c.rpcOutMessages = noop.Int64Histogram{}
}

return c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestNilInstruments(t *testing.T) {

ctx := context.Background()
assert.NotPanics(t, func() { c.rpcDuration.Record(ctx, 0) }, "rpcDuration")
assert.NotPanics(t, func() { c.rpcRequestSize.Record(ctx, 0) }, "rpcRequestSize")
assert.NotPanics(t, func() { c.rpcResponseSize.Record(ctx, 0) }, "rpcResponseSize")
assert.NotPanics(t, func() { c.rpcRequestsPerRPC.Record(ctx, 0) }, "rpcRequestsPerRPC")
assert.NotPanics(t, func() { c.rpcResponsesPerRPC.Record(ctx, 0) }, "rpcResponsesPerRPC")
assert.NotPanics(t, func() { c.rpcInBytes.Record(ctx, 0) }, "rpcInBytes")
assert.NotPanics(t, func() { c.rpcOutBytes.Record(ctx, 0) }, "rpcOutBytes")
assert.NotPanics(t, func() { c.rpcInMessages.Record(ctx, 0) }, "rpcInMessages")
assert.NotPanics(t, func() { c.rpcOutMessages.Record(ctx, 0) }, "rpcOutMessages")
}

type meterProvider struct {
Expand Down
23 changes: 12 additions & 11 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
Expand Down Expand Up @@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.inMessages, 1)
c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.ReceivedEvent {
Expand All @@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.outMessages, 1)
c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.SentEvent {
Expand Down Expand Up @@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool

c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...)
c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...)
}
default:
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

testpb "google.golang.org/grpc/interop/grpc_testing"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"
)

var (
Expand Down Expand Up @@ -803,10 +802,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Count: 1,
Sum: 314167,
Sum: 271840,
},
{
Attributes: attribute.NewSet(
Expand All @@ -815,11 +814,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
},
{
Attributes: attribute.NewSet(
Expand All @@ -828,11 +827,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
},
{
Attributes: attribute.NewSet(
Expand All @@ -841,11 +840,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
Count: 4,
Sum: 93082,
Sum: 74969,
},
},
},
Expand Down Expand Up @@ -878,10 +877,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Count: 1,
Sum: 271840,
Sum: 314167,
},
{
Attributes: attribute.NewSet(
Expand All @@ -890,11 +889,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -903,11 +902,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
},
{
Attributes: attribute.NewSet(
Expand All @@ -916,11 +915,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 74969,
Sum: 93082,
},
},
},
Expand Down Expand Up @@ -969,10 +968,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -983,10 +982,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand Down Expand Up @@ -1049,10 +1048,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand All @@ -1063,10 +1062,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand Down
Loading