From b3a1ae1a2b0b1b2292d77dd8233a072bfb0495c9 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Fri, 18 Aug 2023 14:38:42 +0200 Subject: [PATCH] Add `Metrics()` to HTTP and GRPC proxies (#274) * agent/protocol: add Metrics API to proxy * agent/http: add support for metrics * agent/grpc: add metrics to grpc handler * agent/grpc: slightly refactor handler to reduce nesting --- pkg/agent/protocol/grpc/handler.go | 39 +++++--- pkg/agent/protocol/grpc/proxy.go | 10 +- pkg/agent/protocol/grpc/proxy_test.go | 127 +++++++++++++++++++++++++- pkg/agent/protocol/http/proxy.go | 12 +++ pkg/agent/protocol/http/proxy_test.go | 70 ++++++++++++++ pkg/agent/protocol/metricmap.go | 34 +++++++ pkg/agent/protocol/metricmap_test.go | 27 ++++++ pkg/agent/protocol/protocol.go | 12 +++ 8 files changed, 315 insertions(+), 16 deletions(-) create mode 100644 pkg/agent/protocol/metricmap.go create mode 100644 pkg/agent/protocol/metricmap_test.go diff --git a/pkg/agent/protocol/grpc/handler.go b/pkg/agent/protocol/grpc/handler.go index 4ed0cd8a..b0e1f4ec 100644 --- a/pkg/agent/protocol/grpc/handler.go +++ b/pkg/agent/protocol/grpc/handler.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/grafana/xk6-disruptor/pkg/agent/protocol" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -24,10 +25,11 @@ func clientStreamDescForProxy() *grpc.StreamDesc { } // NewHandler returns a StreamHandler that attempts to proxy all requests that are not registered in the server. 
-func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn) grpc.StreamHandler { +func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn, metrics *protocol.MetricMap) grpc.StreamHandler { handler := &handler{ disruption: disruption, forwardConn: forwardConn, + metrics: metrics, } // return the handler function @@ -37,6 +39,7 @@ func NewHandler(disruption Disruption, forwardConn *grpc.ClientConn) grpc.Stream type handler struct { disruption Disruption forwardConn *grpc.ClientConn + metrics *protocol.MetricMap } // contains verifies if a list of strings contains the given string @@ -52,27 +55,35 @@ func contains(list []string, target string) bool { // handles requests from the client. If selected for error injection, returns an error, // otherwise, forwards to the server transparently func (h *handler) streamHandler(_ interface{}, serverStream grpc.ServerStream) error { + h.metrics.Inc(protocol.MetricRequests) + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) if !ok { return status.Errorf(codes.Internal, "ServerTransportStream not exists in context") } + // full method name has the form /service/method, we want the service serviceName := strings.Split(fullMethodName, "/")[1] - excluded := contains(h.disruption.Excluded, serviceName) - if !excluded { - if h.disruption.ErrorRate > 0 && rand.Float32() <= h.disruption.ErrorRate { - return h.injectError(serverStream) - } + if contains(h.disruption.Excluded, serviceName) { + h.metrics.Inc(protocol.MetricRequestsExcluded) + return h.transparentForward(serverStream) + } - // add delay - if h.disruption.AverageDelay > 0 { - delay := int64(h.disruption.AverageDelay) - if h.disruption.DelayVariation > 0 { - variation := int64(h.disruption.DelayVariation) - delay = delay + variation - 2*rand.Int63n(variation) - } - time.Sleep(time.Duration(delay)) + if rand.Float32() < h.disruption.ErrorRate { + h.metrics.Inc(protocol.MetricRequestsDisrupted) + return h.injectError(serverStream) + } + + // 
add delay + if h.disruption.AverageDelay > 0 { + h.metrics.Inc(protocol.MetricRequestsDisrupted) + + delay := int64(h.disruption.AverageDelay) + if h.disruption.DelayVariation > 0 { + variation := int64(h.disruption.DelayVariation) + delay = delay + variation - 2*rand.Int63n(variation) } + time.Sleep(time.Duration(delay)) } return h.transparentForward(serverStream) diff --git a/pkg/agent/protocol/grpc/proxy.go b/pkg/agent/protocol/grpc/proxy.go index 7de30073..3074de49 100644 --- a/pkg/agent/protocol/grpc/proxy.go +++ b/pkg/agent/protocol/grpc/proxy.go @@ -44,6 +44,7 @@ type proxy struct { config ProxyConfig disruption Disruption srv *grpc.Server + metrics *protocol.MetricMap } // NewProxy return a new Proxy @@ -79,6 +80,7 @@ func NewProxy(c ProxyConfig, d Disruption) (protocol.Proxy, error) { return &proxy{ disruption: d, config: c, + metrics: &protocol.MetricMap{}, }, nil } @@ -93,7 +95,7 @@ func (p *proxy) Start() error { if err != nil { return fmt.Errorf("error dialing %s: %w", p.config.UpstreamAddress, err) } - handler := NewHandler(p.disruption, conn) + handler := NewHandler(p.disruption, conn, p.metrics) p.srv = grpc.NewServer( grpc.UnknownServiceHandler(handler), @@ -120,6 +122,12 @@ func (p *proxy) Stop() error { return nil } +// Metrics returns runtime metrics for the proxy. +// TODO: Add metrics. 
+func (p *proxy) Metrics() map[string]uint { + return p.metrics.Map() +} + // Force stops the proxy without waiting for connections to drain // In grpc this action is a nop func (p *proxy) Force() error { diff --git a/pkg/agent/protocol/grpc/proxy_test.go b/pkg/agent/protocol/grpc/proxy_test.go index d32eadfd..8a4bb463 100644 --- a/pkg/agent/protocol/grpc/proxy_test.go +++ b/pkg/agent/protocol/grpc/proxy_test.go @@ -8,7 +8,9 @@ import ( "path/filepath" "testing" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" + "github.com/grafana/xk6-disruptor/pkg/agent/protocol" "github.com/grafana/xk6-disruptor/pkg/testutils/grpc/ping" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -187,6 +189,7 @@ func Test_ProxyHandler(t *testing.T) { expectStatus codes.Code } + // TODO: Add test for excluded endpoints testCases := []TestCase{ { title: "default proxy", @@ -207,7 +210,7 @@ func Test_ProxyHandler(t *testing.T) { expectStatus: codes.OK, }, { - title: "error injection ", + title: "error injection", disruption: Disruption{ AverageDelay: 0, DelayVariation: 0, @@ -331,3 +334,125 @@ func Test_ProxyHandler(t *testing.T) { }) } } + +func Test_ProxyMetrics(t *testing.T) { + t.Parallel() + + type TestCase struct { + title string + disruption Disruption + expectedMetrics map[string]uint + } + + // TODO: Add test for excluded endpoints + testCases := []TestCase{ + { + title: "passthrough", + disruption: Disruption{ + AverageDelay: 0, + DelayVariation: 0, + ErrorRate: 0.0, + StatusCode: 0, + StatusMessage: "", + }, + expectedMetrics: map[string]uint{ + protocol.MetricRequests: 1, + }, + }, + { + title: "error injection", + disruption: Disruption{ + AverageDelay: 0, + DelayVariation: 0, + ErrorRate: 1.0, + StatusCode: int32(codes.Internal), + StatusMessage: "Internal server error", + }, + expectedMetrics: map[string]uint{ + protocol.MetricRequests: 1, + protocol.MetricRequestsDisrupted: 1, + }, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.title, 
func(t *testing.T) { + t.Parallel() + + // start test server in a random unix socket + serverSocket := filepath.Join(os.TempDir(), uuid.New().String()) + l, err := net.Listen("unix", serverSocket) + if err != nil { + t.Errorf("error starting test server in unix:%s: %v", serverSocket, err) + return + } + + srv := grpc.NewServer() + ping.RegisterPingServiceServer(srv, ping.NewPingServer()) + go func() { + if serr := srv.Serve(l); err != nil { + t.Logf("error in the server: %v", serr) + } + }() + + // start proxy in a random unix socket + proxySocket := filepath.Join(os.TempDir(), uuid.New().String()) + config := ProxyConfig{ + Network: "unix", + ListenAddress: proxySocket, + UpstreamAddress: fmt.Sprintf("unix:%s", serverSocket), + } + + proxy, err := NewProxy(config, tc.disruption) + if err != nil { + t.Errorf("error creating proxy: %v", err) + return + } + + defer func() { + _ = proxy.Stop() + }() + + go func() { + if perr := proxy.Start(); perr != nil { + t.Logf("error starting proxy: %v", perr) + } + }() + + // connect client to proxy + conn, err := grpc.DialContext( + context.TODO(), + fmt.Sprintf("unix:%s", proxySocket), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = conn.Close() + }() + + client := ping.NewPingServiceClient(conn) + + var headers metadata.MD + _, _ = client.Ping( + context.TODO(), + &ping.PingRequest{ + Error: 0, + Message: "ping", + }, + grpc.Header(&headers), + grpc.WaitForReady(true), + ) + + metrics := proxy.Metrics() + + if diff := cmp.Diff(tc.expectedMetrics, metrics); diff != "" { + t.Fatalf("expected metrics do not match returned:\n%s", diff) + } + }) + } +} diff --git a/pkg/agent/protocol/http/proxy.go b/pkg/agent/protocol/http/proxy.go index 46520c8f..e6a4c290 100644 --- a/pkg/agent/protocol/http/proxy.go +++ b/pkg/agent/protocol/http/proxy.go @@ -44,6 +44,7 @@ type proxy struct { config ProxyConfig disruption Disruption srv *http.Server + metrics protocol.MetricMap } // NewProxy return a new 
Proxy for HTTP requests @@ -85,6 +86,7 @@ type httpHandler struct { upstreamURL url.URL disruption Disruption client httpClient + metrics *protocol.MetricMap } // isExcluded checks whether a request should be proxied through without any kind of modification whatsoever. @@ -146,7 +148,10 @@ func (h *httpHandler) injectError(rw http.ResponseWriter, delay time.Duration) { } func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + h.metrics.Inc(protocol.MetricRequests) + if h.isExcluded(req) { + h.metrics.Inc(protocol.MetricRequestsExcluded) //nolint:contextcheck // Unclear which context the linter requires us to propagate here. h.forward(rw, req, 0) return @@ -159,6 +164,7 @@ func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } if h.disruption.ErrorRate > 0 && rand.Float32() <= h.disruption.ErrorRate { + h.metrics.Inc(protocol.MetricRequestsDisrupted) h.injectError(rw, delay) return } @@ -178,6 +184,7 @@ func (p *proxy) Start() error { upstreamURL: *upstreamURL, disruption: p.disruption, client: http.DefaultClient, + metrics: &p.metrics, } p.srv = &http.Server{ @@ -200,6 +207,11 @@ func (p *proxy) Stop() error { return nil } +// Metrics returns runtime metrics for the proxy. 
+func (p *proxy) Metrics() map[string]uint { + return p.metrics.Map() +} + // Force stops the proxy without waiting for connections to drain func (p *proxy) Force() error { if p.srv != nil { diff --git a/pkg/agent/protocol/http/proxy_test.go b/pkg/agent/protocol/http/proxy_test.go index fa466927..c3f40018 100644 --- a/pkg/agent/protocol/http/proxy_test.go +++ b/pkg/agent/protocol/http/proxy_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/grafana/xk6-disruptor/pkg/agent/protocol" ) // fakeHTTPClient mocks the execution of a request returning the predefines @@ -385,6 +386,7 @@ func Test_ProxyHandler(t *testing.T) { upstreamURL: *upstreamURL, client: client, disruption: tc.disruption, + metrics: &protocol.MetricMap{}, } reqURL := fmt.Sprintf("http://%s%s", tc.config.ListenAddress, tc.path) @@ -415,3 +417,71 @@ func Test_ProxyHandler(t *testing.T) { }) } } + +// TODO: This test covers metrics generated by the handler, but not the proxy. The reason for this is that the proxy is +// currently not easily testable, as it is coupled with `http.ListenAndServe`. 
+func Test_Metrics(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + config Disruption + endpoints []string + expectedMetrics map[string]uint + }{ + { + name: "no requests", + expectedMetrics: map[string]uint{}, + }, + { + name: "requests", + config: Disruption{ + Excluded: []string{"/excluded"}, + ErrorRate: 1.0, + ErrorCode: http.StatusTeapot, + }, + endpoints: []string{"/included", "/excluded"}, + expectedMetrics: map[string]uint{ + protocol.MetricRequests: 2, + protocol.MetricRequestsExcluded: 1, + protocol.MetricRequestsDisrupted: 1, + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + upstreamServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.WriteHeader(http.StatusOK) + })) + + upstreamURL, err := url.Parse(upstreamServer.URL) + if err != nil { + t.Fatalf("error parsing httptest url") + } + + metrics := protocol.MetricMap{} + + handler := &httpHandler{ + upstreamURL: *upstreamURL, + disruption: tc.config, + client: http.DefaultClient, + metrics: &metrics, + } + + proxyServer := httptest.NewServer(handler) + + for _, endpoint := range tc.endpoints { + _, err = http.Get(proxyServer.URL + endpoint) + if err != nil { + t.Fatalf("requesting %s: %v", endpoint, err) + } + } + + if diff := cmp.Diff(tc.expectedMetrics, metrics.Map()); diff != "" { + t.Fatalf("expected metrics do not match output:\n%s", diff) + } + }) + } +} diff --git a/pkg/agent/protocol/metricmap.go b/pkg/agent/protocol/metricmap.go new file mode 100644 index 00000000..68c7a0fc --- /dev/null +++ b/pkg/agent/protocol/metricmap.go @@ -0,0 +1,34 @@ +package protocol + +import "sync" + +// MetricMap is a simple storage for name-indexed counter metrics. +type MetricMap struct { + metrics map[string]uint + mutex sync.RWMutex +} + +// Inc increases the value of the specified counter by one. 
+func (m *MetricMap) Inc(name string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.metrics == nil { + m.metrics = make(map[string]uint) + } + + m.metrics[name]++ +} + +// Map returns a map of the counters indexed by name. The returned map is a copy of the internal storage. +func (m *MetricMap) Map() map[string]uint { + m.mutex.RLock() + defer m.mutex.RUnlock() + + out := make(map[string]uint, len(m.metrics)) + for k, v := range m.metrics { + out[k] = v + } + + return out +} diff --git a/pkg/agent/protocol/metricmap_test.go b/pkg/agent/protocol/metricmap_test.go new file mode 100644 index 00000000..0e024557 --- /dev/null +++ b/pkg/agent/protocol/metricmap_test.go @@ -0,0 +1,27 @@ +package protocol_test + +import ( + "testing" + + "github.com/grafana/xk6-disruptor/pkg/agent/protocol" +) + +func TestMetricMap(t *testing.T) { + t.Parallel() + + t.Run("increases counters", func(t *testing.T) { + t.Parallel() + + const name = "foo_metric" + + mm := protocol.MetricMap{} + if current := mm.Map(); current[name] != 0 { + t.Fatalf("map should start containing zero") + } + + mm.Inc(name) + if updated := mm.Map(); updated[name] != 1 { + t.Fatalf("metric was not incremented") + } + }) +} diff --git a/pkg/agent/protocol/protocol.go b/pkg/agent/protocol/protocol.go index c87fc948..1048d954 100644 --- a/pkg/agent/protocol/protocol.go +++ b/pkg/agent/protocol/protocol.go @@ -29,9 +29,21 @@ type Disruptor interface { type Proxy interface { Start() error Stop() error + // Metrics returns a map of counter-type metrics. Implementations may return zero or more of the metrics defined + // below, as well as any number of implementation-defined metrics. + Metrics() map[string]uint Force() error } +const ( + // MetricRequests is the total number of requests received by the proxy. + MetricRequests = "requests_total" + // MetricRequestsExcluded is the total number of requests passed through due to exclusion rules. 
+ MetricRequestsExcluded = "requests_excluded" + // MetricRequestsDisrupted is the total number of requests that the proxy altered in any way. + MetricRequestsDisrupted = "requests_disrupted" +) + // disruptor is an instance of a Disruptor that applies a disruption // to a target type disruptor struct {