From 78c1b33476db04458ac2f66c8eef509a8152ef01 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Thu, 6 Apr 2023 15:58:15 +0400
Subject: [PATCH 1/3] fix latency distribution metrics

---
 fetcher.go   | 47 ++++++++++++++++++++++++++++++++---------------
 metrics.go   |  4 ++++
 pool.go      | 39 ++++++++++++++++-----------------------
 pool_test.go | 10 +++++-----
 4 files changed, 57 insertions(+), 43 deletions(-)

diff --git a/fetcher.go b/fetcher.go
index 9a4c657..3f2573f 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -10,6 +10,8 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/influxdata/tdigest"
+
 	"github.com/google/uuid"
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
@@ -28,10 +30,10 @@ var (
 )
 
 // doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
-func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, latencyMs, speedPerMs float64, e error) {
+func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, e error) {
 	reqUrl := fmt.Sprintf(saturnReqTmpl, c)
 
-	latencyMs, speedPerMs, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
+	e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
 		block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
 		if err != nil {
 			switch {
@@ -68,7 +70,7 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
 }
 
 // TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching
-func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (latencyMs, speedPerMs float64, err error) {
+func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) {
 	resourceType := "car"
 	if mime == "application/vnd.ipld.raw" {
 		resourceType = "block"
@@ -76,7 +78,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1)
 	if ctx.Err() != nil {
 		fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, ctx.Err().Error(), "init").Add(1)
-		return 0, 0, ctx.Err()
+		return ctx.Err()
 	}
 
 	requestId := uuid.NewString()
@@ -153,7 +155,24 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 			fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
 		}
 
-		latencyMs, speedPerMs = updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
+		// update L1 server timings
+		updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
+
+		// record latency & speed dist for cache hits as we know that the observed fetch latency & speed correlates
+		// with the network latency & speed here.
+		if isCacheHit {
+			p.lk.Lock()
+			if _, ok := p.nodePerf[from]; !ok {
+				p.nodePerf[from] = &perf{
+					latencyDigest: tdigest.NewWithCompression(1000),
+					speedDigest:   tdigest.NewWithCompression(1000),
+				}
+			}
+			perf := p.nodePerf[from]
+			perf.latencyDigest.Add(float64(ttfbMs), 1)
+			perf.speedDigest.Add(float64(received)/float64(durationMs), 1)
+			p.lk.Unlock()
+		}
 	} else {
 		if isBlockRequest {
 			fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
@@ -196,7 +215,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
 	if err != nil {
 		recordIfContextErr(resourceType, reqCtx, "build-request", start, requestTimeout)
-		return 0, 0, err
+		return err
 	}
 
 	req.Header.Add("Accept", mime)
@@ -217,7 +236,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 		recordIfContextErr(resourceType, reqCtx, "send-request", start, requestTimeout)
 
 		networkError = err.Error()
-		return 0, 0, fmt.Errorf("http request failed: %w", err)
+		return fmt.Errorf("http request failed: %w", err)
 	}
 	respHeader = resp.Header
 	defer resp.Body.Close()
@@ -253,26 +272,25 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 				retryAfter = p.config.SaturnNodeCoolOff
 			}
 
-			return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
+			return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
 		}
 
 		// empty body so it can be re-used.
 		_, _ = io.Copy(io.Discard, resp.Body)
 		if resp.StatusCode == http.StatusGatewayTimeout {
-			return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
+			return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
 		}
 
 		// This should only be 502, but L1s were not translating 404 from Lassie, so we have to support both for now.
 		if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusBadGateway {
-			return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
+			return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
 		}
 
-		return 0, 0, fmt.Errorf("http error from strn: %d", resp.StatusCode)
+		return fmt.Errorf("http error from strn: %d", resp.StatusCode)
 	}
 
 	wrapped := TrackingReader{resp.Body, time.Time{}, 0}
 	err = cb(resource, &wrapped)
-	recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)
 
 	fb = wrapped.firstByte
 	received = wrapped.len
@@ -280,6 +298,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 	// drain body so it can be re-used.
 	_, _ = io.Copy(io.Discard, resp.Body)
 	if err != nil {
+		recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)
 		return
 	}
 
@@ -309,7 +328,7 @@ func recordIfContextErr(resourceType string, ctx context.Context, requestState s
 }
 
 // todo: refactor for dryness
-func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) (latencyMs, speedPerMs float64) {
+func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) {
 	if len(timingHeaders) == 0 {
 		goLogger.Debug("no timing headers in request response.")
 		return
@@ -332,11 +351,9 @@ func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType strin
 					if networkTimeMs > 0 {
 						s := float64(recieved) / float64(networkTimeMs)
 						fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s)
-						speedPerMs = s
 					}
 					networkLatencyMs := ttfbMs - m.Duration.Milliseconds()
 					fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs))
-					latencyMs = float64(networkLatencyMs)
 				}
 			default:
 			}
diff --git a/metrics.go b/metrics.go
index 575634f..4ad9a53 100644
--- a/metrics.go
+++ b/metrics.go
@@ -56,6 +56,10 @@
 	// buckets to record duration in milliseconds to fetch a CAR,
 	// histogram buckets will be [50ms,.., 30 minutes] -> total 40 buckets +1 prometheus Inf bucket
 	durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40)
+
+	// buckets to measure latency between a caboose peer and a Saturn L1,
+	// [50ms, 100ms, 200ms, ..., ~25 seconds]
+	latencyDistMsHistogram = prometheus.ExponentialBuckets(50, 2, 10)
 )
 
 // pool metrics
diff --git a/pool.go b/pool.go
index 4bacb6c..d96350f 100644
--- a/pool.go
+++ b/pool.go
@@ -176,7 +176,7 @@ func (p *pool) doRefresh() {
 	latencyHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name:    prometheus.BuildFQName("ipfs", "caboose", "fetch_peer_latency_dist"),
 		Help:    "Fetch latency distribution for peers in millis",
-		Buckets: durationMsPerCarHistogram,
+		Buckets: latencyDistMsHistogram,
 	}, []string{"percentile"})
 
 	speedHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
@@ -190,8 +190,13 @@ func (p *pool) doRefresh() {
 	for _, perf := range p.nodePerf {
 		perf := perf
 		for _, pt := range percentiles {
-			latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.latencyDigest.Quantile(pt))
-			speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.speedDigest.Quantile(pt))
+			// only consider peers with more than 100 successful retrievals
+			if perf.latencyDigest.Count() > 100 {
+				latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.latencyDigest.Quantile(pt))
+			}
+			if perf.speedDigest.Count() > 100 {
+				speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.speedDigest.Quantile(pt))
+			}
 		}
 	}
 
@@ -531,29 +536,29 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
 }
 
 func (p *pool) fetchBlockAndUpdate(ctx context.Context, node string, c cid.Cid, attempt int) (blk blocks.Block, err error) {
-	blk, latencyMs, speedPerMs, err := p.doFetch(ctx, node, c, attempt)
+	blk, err = p.doFetch(ctx, node, c, attempt)
 	if err != nil {
 		goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", c, "error", err)
 	}
 
-	err = p.commonUpdate(node, err, latencyMs, speedPerMs)
+	err = p.commonUpdate(node, err)
 	return
 }
 
 func (p *pool) fetchResourceAndUpdate(ctx context.Context, node string, path string, attempt int, cb DataCallback) (err error) {
-	latencyMs, speedPerMs, err := p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
+	err = p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
 	if err != nil {
 		goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", path, "error", err)
 	}
 
-	p.commonUpdate(node, err, latencyMs, speedPerMs)
+	p.commonUpdate(node, err)
 	return
 }
 
-func (p *pool) commonUpdate(node string, err error, latencyMs, speedPerMs float64) (ferr error) {
+func (p *pool) commonUpdate(node string, err error) (ferr error) {
 	ferr = err
 	if err == nil {
-		p.changeWeight(node, false, latencyMs, speedPerMs)
+		p.changeWeight(node, false)
 		// Saturn fetch worked, we return the block.
 		return
 	}
@@ -573,7 +578,7 @@ func (p *pool) commonUpdate(node string, err error, latencyMs, speedPerMs float6
 	}
 
 	// Saturn fetch failed, we downvote the failing member.
-	p.changeWeight(node, true, latencyMs, speedPerMs)
+	p.changeWeight(node, true)
 	return
 }
 
@@ -597,22 +602,10 @@ func (p *pool) isCoolOffLocked(node string) bool {
 }
 
 // returns the updated weight mapping for tests
-func (p *pool) changeWeight(node string, failure bool, latencyMs, speedPerMs float64) {
+func (p *pool) changeWeight(node string, failure bool) {
 	p.lk.Lock()
 	defer p.lk.Unlock()
 
-	if !failure {
-		if _, ok := p.nodePerf[node]; !ok {
-			p.nodePerf[node] = &perf{
-				latencyDigest: tdigest.NewWithCompression(1000),
-				speedDigest:   tdigest.NewWithCompression(1000),
-			}
-		}
-		perf := p.nodePerf[node]
-		perf.latencyDigest.Add(latencyMs, 1)
-		perf.speedDigest.Add(speedPerMs, 1)
-	}
-
 	// build new member
 	idx := -1
 	var nm *Member
diff --git a/pool_test.go b/pool_test.go
index 6077f0d..d404440 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -41,7 +41,7 @@ func TestUpdateWeightWithRefresh(t *testing.T) {
 			break
 		}
 	}
-	ph.pool.changeWeight(ph.eps[0], true, 0, 0)
+	ph.pool.changeWeight(ph.eps[0], true)
 
 	// when node is downvoted to zero, it will be added back by a refresh with a weight of 10% max as it has been removed recently.
 
@@ -66,7 +66,7 @@ func TestUpdateWeightWithMembershipDebounce(t *testing.T) {
 			break
 		}
 	}
-	ph.pool.changeWeight(ph.eps[0], true, 0, 0)
+	ph.pool.changeWeight(ph.eps[0], true)
 
 	// node is added back but with 10% max weight.
 	require.Eventually(t, func() bool {
@@ -152,17 +152,17 @@ func (ph *poolHarness) assertRemoved(t *testing.T, url string) {
 }
 
 func (ph *poolHarness) downvoteAndAssertRemoved(t *testing.T, url string) {
-	ph.pool.changeWeight(url, true, 0, 0)
+	ph.pool.changeWeight(url, true)
 	ph.assertRemoved(t, url)
 }
 
 func (ph *poolHarness) downvoteAndAssertDownvoted(t *testing.T, url string, expected int) {
-	ph.pool.changeWeight(url, true, 0, 0)
+	ph.pool.changeWeight(url, true)
 	ph.assertWeight(t, url, expected)
 }
 
 func (ph *poolHarness) upvoteAndAssertUpvoted(t *testing.T, url string, expected int) {
-	ph.pool.changeWeight(url, false, 0, 0)
+	ph.pool.changeWeight(url, false)
 	ph.assertWeight(t, url, expected)
 }
 

From 2b4de3f2b7789d09bb955d4b06661c524257d470 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Thu, 6 Apr 2023 16:07:20 +0400
Subject: [PATCH 2/3] fix context error

---
 fetcher.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/fetcher.go b/fetcher.go
index 3f2573f..16bc7f4 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -76,9 +76,9 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 		resourceType = "block"
 	}
 	fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1)
-	if ctx.Err() != nil {
-		fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, ctx.Err().Error(), "init").Add(1)
-		return ctx.Err()
+	if ce := ctx.Err(); ce != nil {
+		fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "init").Add(1)
+		return ce
 	}
 
 	requestId := uuid.NewString()
@@ -318,10 +318,10 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
 }
 
 func recordIfContextErr(resourceType string, ctx context.Context, requestState string, start time.Time, timeout time.Duration) {
-	if ctx.Err() != nil {
-		fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, ctx.Err().Error(), requestState).Add(1)
+	if ce := ctx.Err(); ce != nil {
+		fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1)
 
-		if errors.Is(ctx.Err(), context.DeadlineExceeded) && time.Since(start) < (timeout-5*time.Second) {
+		if errors.Is(ce, context.DeadlineExceeded) && time.Since(start) < (timeout-5*time.Second) {
 			fetchIncorrectDeadlineErrorTotalMetric.WithLabelValues(resourceType, requestState).Add(1)
 		}
 	}

From c69d0f5b46ab344e5e58cef8427d326ac424703f Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Thu, 6 Apr 2023 16:13:13 +0400
Subject: [PATCH 3/3] fix static check

---
 fetcher.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fetcher.go b/fetcher.go
index 16bc7f4..aa5878e 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -360,7 +360,6 @@ func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType strin
 			}
 		}
 	}
-	return
 }
 
 func getCacheStatus(isCacheHit bool) string {