Skip to content

Commit

Permalink
Fix request/response metrics. In StatsHandler In means response for C…
Browse files Browse the repository at this point in the history
…lient

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Oct 15, 2024
1 parent 9cf5701 commit 759e042
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Fixed

- Fix request/response metrics. In StatsHandler In means response for Client. (#6250)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
52 changes: 35 additions & 17 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type config struct {
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
rpcDuration metric.Float64Histogram
rpcInBytes metric.Int64Histogram
rpcOutBytes metric.Int64Histogram
rpcInMessages metric.Int64Histogram
rpcOutMessages metric.Int64Histogram
}

// Option applies an option value for a config.
Expand Down Expand Up @@ -96,46 +96,64 @@ func newConfig(opts []Option, role string) *config {
}
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcRequestSize == nil {
c.rpcRequestSize = noop.Int64Histogram{}
if rpcRequestSize == nil {
rpcRequestSize = noop.Int64Histogram{}
}
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcResponseSize == nil {
c.rpcResponseSize = noop.Int64Histogram{}
if rpcResponseSize == nil {
rpcResponseSize = noop.Int64Histogram{}
}
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcRequestsPerRPC == nil {
c.rpcRequestsPerRPC = noop.Int64Histogram{}
if rpcRequestsPerRPC == nil {
rpcRequestsPerRPC = noop.Int64Histogram{}
}
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcResponsesPerRPC == nil {
c.rpcResponsesPerRPC = noop.Int64Histogram{}
if rpcResponsesPerRPC == nil {
rpcResponsesPerRPC = noop.Int64Histogram{}
}
}

switch role {
case "client":
c.rpcInBytes = rpcResponseSize
c.rpcInMessages = rpcResponsesPerRPC
c.rpcOutBytes = rpcRequestSize
c.rpcOutMessages = rpcRequestsPerRPC
case "server":
c.rpcInBytes = rpcRequestSize
c.rpcInMessages = rpcRequestsPerRPC
c.rpcOutBytes = rpcResponseSize
c.rpcOutMessages = rpcResponsesPerRPC
default:
c.rpcInBytes = noop.Int64Histogram{}
c.rpcInMessages = noop.Int64Histogram{}
c.rpcOutBytes = noop.Int64Histogram{}
c.rpcOutMessages = noop.Int64Histogram{}
}

return c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestNilInstruments(t *testing.T) {

ctx := context.Background()
assert.NotPanics(t, func() { c.rpcDuration.Record(ctx, 0) }, "rpcDuration")
assert.NotPanics(t, func() { c.rpcRequestSize.Record(ctx, 0) }, "rpcRequestSize")
assert.NotPanics(t, func() { c.rpcResponseSize.Record(ctx, 0) }, "rpcResponseSize")
assert.NotPanics(t, func() { c.rpcRequestsPerRPC.Record(ctx, 0) }, "rpcRequestsPerRPC")
assert.NotPanics(t, func() { c.rpcResponsesPerRPC.Record(ctx, 0) }, "rpcResponsesPerRPC")
assert.NotPanics(t, func() { c.rpcInBytes.Record(ctx, 0) }, "rpcInBytes")
assert.NotPanics(t, func() { c.rpcOutBytes.Record(ctx, 0) }, "rpcOutBytes")
assert.NotPanics(t, func() { c.rpcInMessages.Record(ctx, 0) }, "rpcInMessages")
assert.NotPanics(t, func() { c.rpcOutMessages.Record(ctx, 0) }, "rpcOutMessages")
}

type meterProvider struct {
Expand Down
23 changes: 12 additions & 11 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
Expand Down Expand Up @@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.inMessages, 1)
c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.ReceivedEvent {
Expand All @@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.outMessages, 1)
c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.SentEvent {
Expand Down Expand Up @@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool

c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...)
c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...)
}
default:
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"

"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand Down Expand Up @@ -803,10 +804,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Count: 1,
Sum: 314167,
Sum: 271840,
},
{
Attributes: attribute.NewSet(
Expand All @@ -815,11 +816,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
},
{
Attributes: attribute.NewSet(
Expand All @@ -828,11 +829,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
},
{
Attributes: attribute.NewSet(
Expand All @@ -841,11 +842,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
Count: 4,
Sum: 93082,
Sum: 74969,
},
},
},
Expand Down Expand Up @@ -878,10 +879,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Count: 1,
Sum: 271840,
Sum: 314167,
},
{
Attributes: attribute.NewSet(
Expand All @@ -890,11 +891,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -903,11 +904,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
},
{
Attributes: attribute.NewSet(
Expand All @@ -916,11 +917,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 74969,
Sum: 93082,
},
},
},
Expand Down Expand Up @@ -969,10 +970,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -983,10 +984,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand Down Expand Up @@ -1049,10 +1050,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand All @@ -1063,10 +1064,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand Down

0 comments on commit 759e042

Please sign in to comment.