This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Merge pull request #84 from filecoin-saturn/fix/latency-metrics
Fix latency distribution metrics
aarshkshah1992 authored Apr 6, 2023
2 parents 4078b55 + c69d0f5 commit 34e6ab0
Showing 4 changed files with 57 additions and 44 deletions.
48 changes: 32 additions & 16 deletions fetcher.go
@@ -10,6 +10,8 @@ import (
"strconv"
"time"

"github.com/influxdata/tdigest"

"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
@@ -28,10 +30,10 @@ var (
)

// doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, latencyMs, speedPerMs float64, e error) {
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, e error) {
reqUrl := fmt.Sprintf(saturnReqTmpl, c)

latencyMs, speedPerMs, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
if err != nil {
switch {
@@ -68,15 +70,15 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)
}

// TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching
func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (latencyMs, speedPerMs float64, err error) {
func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) {
resourceType := "car"
if mime == "application/vnd.ipld.raw" {
resourceType = "block"
}
fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1)
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "init").Add(1)
return 0, 0, ce
return ce
}

requestId := uuid.NewString()
@@ -153,7 +155,24 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
fetchDurationPerCarPerPeerSuccessMetric.WithLabelValues(cacheStatus).Observe(float64(response_success_end.Sub(start).Milliseconds()))
}

latencyMs, speedPerMs = updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)
// update L1 server timings
updateSuccessServerTimingMetrics(respHeader.Values(servertiming.HeaderKey), resourceType, isCacheHit, durationMs, ttfbMs, received)

// record latency & speed dist for cache hits as we know that the observed fetch latency & speed correlates
// with the network latency & speed here.
if isCacheHit {
p.lk.Lock()
if _, ok := p.nodePerf[from]; !ok {
p.nodePerf[from] = &perf{
latencyDigest: tdigest.NewWithCompression(1000),
speedDigest: tdigest.NewWithCompression(1000),
}
}
perf := p.nodePerf[from]
perf.latencyDigest.Add(float64(ttfbMs), 1)
perf.speedDigest.Add(float64(received)/float64(durationMs), 1)
p.lk.Unlock()
}
} else {
if isBlockRequest {
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
@@ -196,7 +215,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "build-request", start, requestTimeout)
return 0, 0, err
return err
}

req.Header.Add("Accept", mime)
@@ -217,7 +236,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
recordIfContextErr(resourceType, reqCtx, "send-request", start, requestTimeout)

networkError = err.Error()
return 0, 0, fmt.Errorf("http request failed: %w", err)
return fmt.Errorf("http request failed: %w", err)
}
respHeader = resp.Header
defer resp.Body.Close()
@@ -253,33 +272,33 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
retryAfter = p.config.SaturnNodeCoolOff
}

return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, &ErrSaturnTooManyRequests{retryAfter: retryAfter, Node: from})
}

// empty body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)
if resp.StatusCode == http.StatusGatewayTimeout {
return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrSaturnTimeout)
}

// This should only be 502, but L1s were not translating 404 from Lassie, so we have to support both for now.
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusBadGateway {
return 0, 0, fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
return fmt.Errorf("http error from strn: %d, err=%w", resp.StatusCode, ErrContentProviderNotFound)
}

return 0, 0, fmt.Errorf("http error from strn: %d", resp.StatusCode)
return fmt.Errorf("http error from strn: %d", resp.StatusCode)
}

wrapped := TrackingReader{resp.Body, time.Time{}, 0}
err = cb(resource, &wrapped)
recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)

fb = wrapped.firstByte
received = wrapped.len

// drain body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)
return
}

@@ -309,7 +328,7 @@ func recordIfContextErr(resourceType string, ctx context.Context, requestState s
}

// todo: refactor for dryness
func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) (latencyMs, speedPerMs float64) {
func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType string, isCacheHit bool, totalTimeMs, ttfbMs int64, recieved int) {
if len(timingHeaders) == 0 {
goLogger.Debug("no timing headers in request response.")
return
Expand All @@ -332,18 +351,15 @@ func updateSuccessServerTimingMetrics(timingHeaders []string, resourceType strin
if networkTimeMs > 0 {
s := float64(recieved) / float64(networkTimeMs)
fetchNetworkSpeedPerPeerSuccessMetric.WithLabelValues(resourceType).Observe(s)
speedPerMs = s
}
networkLatencyMs := ttfbMs - m.Duration.Milliseconds()
fetchNetworkLatencyPeerSuccessMetric.WithLabelValues(resourceType).Observe(float64(networkLatencyMs))
latencyMs = float64(networkLatencyMs)
}
default:
}
}
}
}
return
}

func getCacheStatus(isCacheHit bool) string {
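Note on the fetcher.go change: latency and speed are no longer threaded back through the doFetch/fetchResource return values. Instead, on a cache hit, TTFB and bytes-per-millisecond are folded into per-node t-digests under the pool lock. The following is a minimal, self-contained sketch of that recording path; the perf struct, nodePerf map, and lk mutex mirror the names in the diff, while the durationMs guard and the example values in main are added assumptions, not part of the original code.

```go
// Minimal sketch (not the full caboose pool) of the cache-hit recording
// added in fetchResource: TTFB goes into a latency digest and
// bytes/millisecond into a speed digest, keyed by the L1 node.
package main

import (
	"fmt"
	"sync"

	"github.com/influxdata/tdigest"
)

type perf struct {
	latencyDigest *tdigest.TDigest // TTFB samples, milliseconds
	speedDigest   *tdigest.TDigest // throughput samples, bytes per millisecond
}

type pool struct {
	lk       sync.Mutex
	nodePerf map[string]*perf
}

// recordCacheHit mirrors the cache-hit branch in fetchResource.
func (p *pool) recordCacheHit(from string, ttfbMs, durationMs int64, received int) {
	if durationMs <= 0 {
		return // assumption: skip degenerate samples to avoid dividing by zero
	}
	p.lk.Lock()
	defer p.lk.Unlock()

	if _, ok := p.nodePerf[from]; !ok {
		p.nodePerf[from] = &perf{
			latencyDigest: tdigest.NewWithCompression(1000),
			speedDigest:   tdigest.NewWithCompression(1000),
		}
	}
	pf := p.nodePerf[from]
	pf.latencyDigest.Add(float64(ttfbMs), 1)
	pf.speedDigest.Add(float64(received)/float64(durationMs), 1)
}

func main() {
	// Hypothetical node URL and sample values, for illustration only.
	p := &pool{nodePerf: make(map[string]*perf)}
	p.recordCacheHit("https://l1.example.invalid", 120, 900, 2_000_000)
	fmt.Printf("samples recorded: %.0f\n", p.nodePerf["https://l1.example.invalid"].latencyDigest.Count())
}
```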
4 changes: 4 additions & 0 deletions metrics.go
@@ -56,6 +56,10 @@ var (
// buckets to record duration in milliseconds to fetch a CAR,
// histogram buckets will be [50ms,.., 30 minutes] -> total 40 buckets +1 prometheus Inf bucket
durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40)

// buckets to measure latency between a caboose peer and a Saturn L1,
// [50ms, 100ms, 200ms, ..., ~25 seconds]
latencyDistMsHistogram = prometheus.ExponentialBuckets(50, 2, 10)
)

// pool metrics
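For reference, the new latencyDistMsHistogram doubles from a 50 ms base; a small standalone check (assuming the standard prometheus/client_golang API) shows the resulting buckets and the ~25.6 s upper bound referenced in the comment.

```go
// Standalone check of the new bucket layout: ExponentialBuckets(50, 2, 10)
// produces 50*2^i for i in [0, 9], i.e. 50 ms doubling up to 25,600 ms.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	buckets := prometheus.ExponentialBuckets(50, 2, 10)
	fmt.Println(buckets)
	// [50 100 200 400 800 1600 3200 6400 12800 25600]
}
```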
39 changes: 16 additions & 23 deletions pool.go
@@ -176,7 +176,7 @@ func (p *pool) doRefresh() {
latencyHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_peer_latency_dist"),
Help: "Fetch latency distribution for peers in millis",
Buckets: durationMsPerCarHistogram,
Buckets: latencyDistMsHistogram,
}, []string{"percentile"})

speedHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
@@ -190,8 +190,13 @@ func (p *pool) doRefresh() {
for _, perf := range p.nodePerf {
perf := perf
for _, pt := range percentiles {
latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.latencyDigest.Quantile(pt))
speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.speedDigest.Quantile(pt))
// only consider peers with more than 100 successful retrievals
if perf.latencyDigest.Count() > 100 {
latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.latencyDigest.Quantile(pt))
}
if perf.speedDigest.Count() > 100 {
speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(perf.speedDigest.Quantile(pt))
}
}
}

@@ -537,29 +542,29 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
}

func (p *pool) fetchBlockAndUpdate(ctx context.Context, node string, c cid.Cid, attempt int) (blk blocks.Block, err error) {
blk, latencyMs, speedPerMs, err := p.doFetch(ctx, node, c, attempt)
blk, err = p.doFetch(ctx, node, c, attempt)
if err != nil {
goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", c, "error", err)
}

err = p.commonUpdate(node, err, latencyMs, speedPerMs)
err = p.commonUpdate(node, err)
return
}

func (p *pool) fetchResourceAndUpdate(ctx context.Context, node string, path string, attempt int, cb DataCallback) (err error) {
latencyMs, speedPerMs, err := p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
err = p.fetchResource(ctx, node, path, "application/vnd.ipld.car", attempt, cb)
if err != nil {
goLogger.Debugw("fetch attempt failed", "from", node, "attempt", attempt, "of", path, "error", err)
}

p.commonUpdate(node, err, latencyMs, speedPerMs)
p.commonUpdate(node, err)
return
}

func (p *pool) commonUpdate(node string, err error, latencyMs, speedPerMs float64) (ferr error) {
func (p *pool) commonUpdate(node string, err error) (ferr error) {
ferr = err
if err == nil {
p.changeWeight(node, false, latencyMs, speedPerMs)
p.changeWeight(node, false)
// Saturn fetch worked, we return the block.
return
}
@@ -579,7 +584,7 @@ func (p *pool) commonUpdate(node string, err error, latencyMs, speedPerMs float6
}

// Saturn fetch failed, we downvote the failing member.
p.changeWeight(node, true, latencyMs, speedPerMs)
p.changeWeight(node, true)
return
}

@@ -603,22 +608,10 @@ func (p *pool) isCoolOffLocked(node string) bool {
}

// returns the updated weight mapping for tests
func (p *pool) changeWeight(node string, failure bool, latencyMs, speedPerMs float64) {
func (p *pool) changeWeight(node string, failure bool) {
p.lk.Lock()
defer p.lk.Unlock()

if !failure {
if _, ok := p.nodePerf[node]; !ok {
p.nodePerf[node] = &perf{
latencyDigest: tdigest.NewWithCompression(1000),
speedDigest: tdigest.NewWithCompression(1000),
}
}
perf := p.nodePerf[node]
perf.latencyDigest.Add(latencyMs, 1)
perf.speedDigest.Add(speedPerMs, 1)
}

// build new member
idx := -1
var nm *Member
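Note on the pool.go change: during doRefresh, a node's digests are exported as percentile observations only once it has accumulated more than 100 successful samples. A rough, library-style sketch is below; it reuses the perf type and tdigest import from the fetcher.go sketch above and additionally needs "fmt" and "github.com/prometheus/client_golang/prometheus". The concrete percentile values are an assumption here, since the percentiles slice is defined elsewhere in pool.go and is not shown in this diff.

```go
// Sketch of the export step in doRefresh: nodes with 100 or fewer
// successful samples are skipped so sparsely sampled peers do not skew
// the percentile histograms.
func exportPerfHistograms(nodePerf map[string]*perf, latencyHist, speedHist *prometheus.HistogramVec) {
	// Assumed percentile values; the real `percentiles` slice lives
	// elsewhere in pool.go and is not part of this diff.
	percentiles := []float64{0.25, 0.5, 0.75, 0.9, 0.95}
	for _, pf := range nodePerf {
		for _, pt := range percentiles {
			if pf.latencyDigest.Count() > 100 {
				latencyHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(pf.latencyDigest.Quantile(pt))
			}
			if pf.speedDigest.Count() > 100 {
				speedHist.WithLabelValues(fmt.Sprintf("P%f", pt)).Observe(pf.speedDigest.Quantile(pt))
			}
		}
	}
}
```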
10 changes: 5 additions & 5 deletions pool_test.go
@@ -41,7 +41,7 @@ func TestUpdateWeightWithRefresh(t *testing.T) {
break
}
}
ph.pool.changeWeight(ph.eps[0], true, 0, 0)
ph.pool.changeWeight(ph.eps[0], true)

// when node is downvoted to zero, it will be added back by a refresh with a weight of 10% max as it has been removed recently.

@@ -66,7 +66,7 @@ func TestUpdateWeightWithMembershipDebounce(t *testing.T) {
break
}
}
ph.pool.changeWeight(ph.eps[0], true, 0, 0)
ph.pool.changeWeight(ph.eps[0], true)

// node is added back but with 10% max weight.
require.Eventually(t, func() bool {
@@ -152,17 +152,17 @@ func (ph *poolHarness) assertRemoved(t *testing.T, url string) {
}

func (ph *poolHarness) downvoteAndAssertRemoved(t *testing.T, url string) {
ph.pool.changeWeight(url, true, 0, 0)
ph.pool.changeWeight(url, true)
ph.assertRemoved(t, url)
}

func (ph *poolHarness) downvoteAndAssertDownvoted(t *testing.T, url string, expected int) {
ph.pool.changeWeight(url, true, 0, 0)
ph.pool.changeWeight(url, true)
ph.assertWeight(t, url, expected)
}

func (ph *poolHarness) upvoteAndAssertUpvoted(t *testing.T, url string, expected int) {
ph.pool.changeWeight(url, false, 0, 0)
ph.pool.changeWeight(url, false)
ph.assertWeight(t, url, expected)
}

