Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/custom_buckets #4313

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (ng *engine) buildChainWithNativeMiddlewares(fr featuredRoutes, route Route
chn = chn.Append(ng.getLogHandler())
}
if ng.conf.Middlewares.Prometheus {
chn = chn.Append(handler.PrometheusHandler(route.Path, route.Method))
chn = chn.Append(handler.PrometheusHandler(route.Path, route.Method, fr.metricReqDurBuckets))
}
if ng.conf.Middlewares.MaxConns {
chn = chn.Append(handler.MaxConnsHandler(ng.conf.MaxConns))
Expand Down
31 changes: 20 additions & 11 deletions rest/handler/prometheushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,7 @@ import (
const serverNamespace = "http_server"

var (
metricServerReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{
Namespace: serverNamespace,
Subsystem: "requests",
Name: "duration_ms",
Help: "http server requests duration(ms).",
Labels: []string{"path", "method", "code"},
Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 750, 1000},
})

defaultDurationBuckets = []float64{1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2000, 5000}
metricServerReqCodeTotal = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: serverNamespace,
Subsystem: "requests",
Expand All @@ -31,18 +23,35 @@ var (
)

// PrometheusHandler returns a middleware that reports stats to prometheus.
func PrometheusHandler(path, method string) func(http.Handler) http.Handler {
func PrometheusHandler(path, method string, buckets []float64) func(http.Handler) http.Handler {
if len(buckets) == 0 {
buckets = defaultDurationBuckets
}

metricDurationHistogram := initMetricServerReqDur(buckets)

return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
startTime := timex.Now()
cw := response.NewWithCodeResponseWriter(w)
defer func() {
code := strconv.Itoa(cw.Code)
metricServerReqDur.Observe(timex.Since(startTime).Milliseconds(), path, method, code)
metricDurationHistogram.Observe(timex.Since(startTime).Milliseconds(), path, method, code)
metricServerReqCodeTotal.Inc(path, method, code)
}()

next.ServeHTTP(cw, r)
})
}
}

func initMetricServerReqDur(buckets []float64) metric.HistogramVec {
return metric.NewHistogramVec(&metric.HistogramVecOpts{
Namespace: serverNamespace,
Subsystem: "requests",
Name: "duration_ms",
Help: "http server requests duration(ms).",
Labels: []string{"path", "method", "code"},
Buckets: buckets,
})
}
5 changes: 3 additions & 2 deletions rest/handler/prometheushandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/zeromicro/go-zero/core/prometheus"
)

func TestPromMetricHandler_Disabled(t *testing.T) {
promMetricHandler := PrometheusHandler("/user/login", http.MethodGet)
promMetricHandler := PrometheusHandler("/user/login", http.MethodGet, nil)
handler := promMetricHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
Expand All @@ -26,7 +27,7 @@ func TestPromMetricHandler_Enabled(t *testing.T) {
Host: "localhost",
Path: "/",
})
promMetricHandler := PrometheusHandler("/user/login", http.MethodGet)
promMetricHandler := PrometheusHandler("/user/login", http.MethodGet, nil)
handler := promMetricHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
Expand Down
7 changes: 7 additions & 0 deletions rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ func WithMaxBytes(maxBytes int64) RouteOption {
}
}

// WithMetricsReqDurBuckets returns a RouteOption to set metrics request duration buckets.
func WithMetricsReqDurBuckets(buckets []float64) RouteOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define buckets for each route?

return func(r *featuredRoutes) {
r.metricReqDurBuckets = buckets
}
}

// WithMiddlewares adds given middlewares to given routes.
func WithMiddlewares(ms []Middleware, rs ...Route) []Route {
for i := len(ms) - 1; i >= 0; i-- {
Expand Down
11 changes: 11 additions & 0 deletions rest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx/logtest"
"github.com/zeromicro/go-zero/rest/chain"
Expand Down Expand Up @@ -134,6 +135,16 @@ func TestWithMaxBytes(t *testing.T) {
assert.Equal(t, int64(maxBytes), fr.maxBytes)
}

func TestWithMetricReqDurBuckets(t *testing.T) {
const (
bucket1 = 0.1
bucket2 = 0.5
)
var fr featuredRoutes
WithMetricsReqDurBuckets([]float64{bucket1, bucket2})(&fr)
assert.Equal(t, []float64{bucket1, bucket2}, fr.metricReqDurBuckets)
}

func TestWithMiddleware(t *testing.T) {
m := make(map[string]string)
rt := router.NewRouter()
Expand Down
13 changes: 7 additions & 6 deletions rest/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ type (
}

featuredRoutes struct {
timeout time.Duration
priority bool
jwt jwtSetting
signature signatureSetting
routes []Route
maxBytes int64
timeout time.Duration
priority bool
jwt jwtSetting
signature signatureSetting
routes []Route
maxBytes int64
metricReqDurBuckets []float64
}
)
7 changes: 5 additions & 2 deletions zrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package zrpc
import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc/internal"
"github.com/zeromicro/go-zero/zrpc/internal/auth"
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

var (
Expand All @@ -25,6 +26,8 @@ var (
WithTransportCredentials = internal.WithTransportCredentials
// WithUnaryClientInterceptor is an alias of internal.WithUnaryClientInterceptor.
WithUnaryClientInterceptor = internal.WithUnaryClientInterceptor
// WithClientMetricReqDurBuckets is an alias of internal.WithMetricReqDurBuckets.
WithClientMetricReqDurBuckets = internal.WithMetricReqDurBuckets
)

type (
Expand Down
31 changes: 20 additions & 11 deletions zrpc/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"strings"
"time"

"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
"github.com/zeromicro/go-zero/zrpc/resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/zeromicro/go-zero/zrpc/internal/balancer/p2c"
"github.com/zeromicro/go-zero/zrpc/internal/clientinterceptors"
"github.com/zeromicro/go-zero/zrpc/resolver"
)

const (
Expand All @@ -32,10 +33,11 @@ type (

// A ClientOptions is a client options.
ClientOptions struct {
NonBlock bool
Timeout time.Duration
Secure bool
DialOptions []grpc.DialOption
NonBlock bool
Timeout time.Duration
Secure bool
DialOptions []grpc.DialOption
MetricReqDurBuckets []float64
}

// ClientOption defines the method to customize a ClientOptions.
Expand Down Expand Up @@ -84,7 +86,7 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
}

options = append(options,
grpc.WithChainUnaryInterceptor(c.buildUnaryInterceptors(cliOpts.Timeout)...),
grpc.WithChainUnaryInterceptor(c.buildUnaryInterceptors(cliOpts)...),
grpc.WithChainStreamInterceptor(c.buildStreamInterceptors()...),
)

Expand All @@ -101,7 +103,7 @@ func (c *client) buildStreamInterceptors() []grpc.StreamClientInterceptor {
return interceptors
}

func (c *client) buildUnaryInterceptors(timeout time.Duration) []grpc.UnaryClientInterceptor {
func (c *client) buildUnaryInterceptors(cliOpts ClientOptions) []grpc.UnaryClientInterceptor {
var interceptors []grpc.UnaryClientInterceptor

if c.middlewares.Trace {
Expand All @@ -111,13 +113,13 @@ func (c *client) buildUnaryInterceptors(timeout time.Duration) []grpc.UnaryClien
interceptors = append(interceptors, clientinterceptors.DurationInterceptor)
}
if c.middlewares.Prometheus {
interceptors = append(interceptors, clientinterceptors.PrometheusInterceptor)
interceptors = append(interceptors, clientinterceptors.PrometheusInterceptor(cliOpts.MetricReqDurBuckets))
}
if c.middlewares.Breaker {
interceptors = append(interceptors, clientinterceptors.BreakerInterceptor)
}
if c.middlewares.Timeout {
interceptors = append(interceptors, clientinterceptors.TimeoutInterceptor(timeout))
interceptors = append(interceptors, clientinterceptors.TimeoutInterceptor(cliOpts.Timeout))
}

return interceptors
Expand Down Expand Up @@ -152,6 +154,13 @@ func WithDialOption(opt grpc.DialOption) ClientOption {
}
}

// WithMetricReqDurBuckets returns a func to customize a ClientOptions with given buckets.
func WithMetricReqDurBuckets(buckets []float64) ClientOption {
return func(options *ClientOptions) {
options.MetricReqDurBuckets = buckets
}
}

// WithNonBlock sets the dialing to be nonblock.
func WithNonBlock() ClientOption {
return func(options *ClientOptions) {
Expand Down
7 changes: 7 additions & 0 deletions zrpc/internal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func TestWithStreamClientInterceptor(t *testing.T) {
assert.Equal(t, 1, len(options.DialOptions))
}

func TestWithMetricReqDurBuckets(t *testing.T) {
var options ClientOptions
opt := WithMetricReqDurBuckets([]float64{0.1, 0.2})
opt(&options)
assert.Equal(t, []float64{0.1, 0.2}, options.MetricReqDurBuckets)
}

func TestWithTransportCredentials(t *testing.T) {
var options ClientOptions
opt := WithTransportCredentials(nil)
Expand Down
48 changes: 30 additions & 18 deletions zrpc/internal/clientinterceptors/prometheusinterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,17 @@ import (
"context"
"strconv"

"github.com/zeromicro/go-zero/core/metric"
"github.com/zeromicro/go-zero/core/timex"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

"github.com/zeromicro/go-zero/core/metric"
"github.com/zeromicro/go-zero/core/timex"
)

const clientNamespace = "rpc_client"

var (
metricClientReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{
Namespace: clientNamespace,
Subsystem: "requests",
Name: "duration_ms",
Help: "rpc client requests duration(ms).",
Labels: []string{"method"},
Buckets: []float64{1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2000, 5000},
})

defaultDurationBuckets = []float64{1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2000, 5000}
metricClientReqCodeTotal = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: clientNamespace,
Subsystem: "requests",
Expand All @@ -32,11 +25,30 @@ var (
)

// PrometheusInterceptor is an interceptor that reports to prometheus server.
func PrometheusInterceptor(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
startTime := timex.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
metricClientReqDur.Observe(timex.Since(startTime).Milliseconds(), method)
metricClientReqCodeTotal.Inc(method, strconv.Itoa(int(status.Code(err))))
return err
func PrometheusInterceptor(buckets []float64) grpc.UnaryClientInterceptor {
if len(buckets) == 0 {
buckets = defaultDurationBuckets
}

metricDurationHistogram := initMetricClientReqDur(buckets)

return func(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
startTime := timex.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
metricDurationHistogram.Observe(timex.Since(startTime).Milliseconds(), method)
metricClientReqCodeTotal.Inc(method, strconv.Itoa(int(status.Code(err))))
return err
}
}

func initMetricClientReqDur(buckets []float64) metric.HistogramVec {
return metric.NewHistogramVec(&metric.HistogramVecOpts{
Namespace: clientNamespace,
Subsystem: "requests",
Name: "duration_ms",
Help: "rpc client requests duration(ms).",
Labels: []string{"method"},
Buckets: buckets,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/prometheus"
"google.golang.org/grpc"

"github.com/zeromicro/go-zero/core/prometheus"
)

func TestPromMetricInterceptor(t *testing.T) {
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestPromMetricInterceptor(t *testing.T) {
})
}
cc := new(grpc.ClientConn)
err := PrometheusInterceptor(context.Background(), "/foo", nil, nil, cc,
err := PrometheusInterceptor(nil)(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return test.err
Expand Down
Loading
Loading