Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #85 from filecoin-saturn/feat/more-context-cancellation
Browse files Browse the repository at this point in the history

More instrumentation of context cancellations and reward low latency nodes
  • Loading branch information
aarshkshah1992 authored Apr 7, 2023
2 parents 34e6ab0 + b99404c commit 65275d0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
25 changes: 12 additions & 13 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (

var saturnReqTmpl = "/ipfs/%s?format=raw"

var (
const (
// Response headers set by Saturn L1 nodes; used to attribute a retrieval
// to a node and to record cache behaviour.
saturnNodeIdKey = "Saturn-Node-Id"
saturnTransferIdKey = "Saturn-Transfer-Id"
saturnCacheHitKey = "Saturn-Cache-Status"
// Value of saturnCacheHitKey indicating the L1 served the content from cache.
saturnCacheHit = "HIT"
saturnRetryAfterKey = "Retry-After"
// Metric label values distinguishing CAR fetches from raw-block fetches.
resourceTypeCar = "car"
resourceTypeBlock = "block"
)

// doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
Expand Down Expand Up @@ -71,13 +73,12 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)

// TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching
func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) {
resourceType := "car"
resourceType := resourceTypeCar
if mime == "application/vnd.ipld.raw" {
resourceType = "block"
resourceType = resourceTypeBlock
}
fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1)
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "init").Add(1)
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1)
return ce
}

Expand Down Expand Up @@ -214,7 +215,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "build-request", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "build-http-request")
return err
}

Expand All @@ -233,7 +234,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
var resp *http.Response
resp, err = p.config.SaturnClient.Do(req)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "send-request", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "send-http-request")

networkError = err.Error()
return fmt.Errorf("http request failed: %w", err)
Expand Down Expand Up @@ -298,7 +299,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
// drain body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "read-http-response")
return
}

Expand All @@ -317,14 +318,12 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
return
}

func recordIfContextErr(resourceType string, ctx context.Context, requestState string, start time.Time, timeout time.Duration) {
func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool {
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1)

if errors.Is(ce, context.DeadlineExceeded) && time.Since(start) < (timeout-5*time.Second) {
fetchIncorrectDeadlineErrorTotalMetric.WithLabelValues(resourceType, requestState).Add(1)
}
return true
}
return false
}

// todo: refactor for dryness
Expand Down
14 changes: 7 additions & 7 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ var (
durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40)

// buckets to measure latency between a caboose peer and a Saturn L1,
// [50ms, 100ms, 200ms, ..., ~25 seconds]
latencyDistMsHistogram = prometheus.ExponentialBuckets(50, 2, 10)
// [25ms, 50ms, 75ms, ..., 500ms]
latencyDistMsHistogram = prometheus.LinearBuckets(25, 25, 20)
)

// pool metrics
Expand All @@ -84,6 +84,10 @@ var (
Name: prometheus.BuildFQName("ipfs", "caboose", "pool_new_members"),
Help: "New members added to the Caboose pool",
}, []string{"weight"})

poolWeightBumpMetric = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "pool_weight_bump"),
})
)

var (
Expand Down Expand Up @@ -222,10 +226,6 @@ var (
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_called_total"),
}, []string{"resourceType"})

fetchIncorrectDeadlineErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_incorrect_deadline_error_total"),
}, []string{"resourceType", "requestStage"})

fetchRequestContextErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_context_error_total"),
}, []string{"resourceType", "errorType", "requestStage"})
Expand Down Expand Up @@ -276,5 +276,5 @@ func init() {
CabooseMetrics.MustRegister(fetchCalledTotalMetric)
CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric)

CabooseMetrics.MustRegister(fetchIncorrectDeadlineErrorTotalMetric)
CabooseMetrics.MustRegister(poolWeightBumpMetric)
}
49 changes: 36 additions & 13 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ import (

const maxPoolSize = 300

var (
// latencyPercentile is the quantile of a node's latency digest inspected
// when deciding whether to bump its weight (see doRefresh in this file).
latencyPercentile = 0.75
// maxLatency is the largest acceptable latency (in milliseconds — TODO
// confirm unit against the digest's observations) at that quantile for a
// node to receive a weight bump.
maxLatency = float64(200)
// minSuccessfulRetrievals is the minimum number of samples a node's
// latency digest must hold before it is considered for a weight bump.
minSuccessfulRetrievals = 100
)

// loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the
// Saturn Orchestrator.
func (p *pool) loadPool() ([]string, error) {
Expand Down Expand Up @@ -202,12 +208,6 @@ func (p *pool) doRefresh() {

peerLatencyDistribution = latencyHist
peerSpeedDistribution = speedHist

// TODO: The orchestrator periodically prunes "bad" L1s based on a reputation system
// it owns and runs. We should probably just forget about the Saturn endpoints that were
// previously in the pool but are no longer being returned by the orchestrator. It's highly
// likely that the Orchestrator has deemed them to be non-functional/malicious.
// Let's just override the old pool with the new endpoints returned here.
oldMap := make(map[string]bool)
n := make([]*Member, 0, len(newEP))
for _, o := range p.endpoints {
Expand Down Expand Up @@ -245,6 +245,20 @@ func (p *pool) doRefresh() {
}
}

// give weight bumps to low latency peers that have served > 100 successful "low latency" cache hit retrievals.
poolWeightBumpMetric.Set(0)
for _, m := range n {
m := m
if perf, ok := p.nodePerf[m.url]; ok {
// Our analysis so far shows that we do have ~10-15 peers with p75 latency < 200ms.
// It's not the best but it's a good start and we can tune as we go along.
if perf.latencyDigest.Count() > float64(minSuccessfulRetrievals) && perf.latencyDigest.Quantile(latencyPercentile) <= maxLatency {
poolWeightBumpMetric.Add(1)
m.weight = maxWeight
}
}
}

// If we have more than maxPoolSize nodes, pick the top maxPoolSize sorted by (weight * age).
if len(n) > maxPoolSize {
sort.Slice(n, func(i, j int) bool {
Expand All @@ -262,7 +276,6 @@ func (p *pool) doRefresh() {
p.c.UpdateWithWeights(p.endpoints.ToWeights())
}
poolSizeMetric.Set(float64(len(n)))

poolNewMembersMetric.Reset()
// periodic update of a pool health metric
byWeight := make(map[int]int)
Expand Down Expand Up @@ -329,6 +342,10 @@ func cidToKey(c cid.Cid) string {
}

func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) {
fetchCalledTotalMetric.WithLabelValues(resourceTypeBlock).Add(1)
if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWith") {
return nil, ctx.Err()
}
// wait for pool to be initialised
<-p.started

Expand All @@ -352,6 +369,9 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk

blockFetchStart := time.Now()
for i := 0; i < len(nodes); i++ {
if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWithLoop") {
return nil, ctx.Err()
}
blk, err = p.fetchBlockAndUpdate(ctx, nodes[i], c, i)

if err == nil {
Expand All @@ -360,9 +380,6 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk

return
}
if ce := ctx.Err(); ce != nil {
return nil, ce
}
}

fetchDurationBlockFailureMetric.Observe(float64(time.Since(blockFetchStart).Milliseconds()))
Expand Down Expand Up @@ -471,6 +488,11 @@ func (p *pool) getNodesToFetch(key string, with string) ([]string, error) {
}

func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) {
fetchCalledTotalMetric.WithLabelValues(resourceTypeCar).Add(1)
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWith") {
return ctx.Err()
}

// wait for pool to be initialised
<-p.started

Expand All @@ -496,6 +518,10 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba

pq := []string{path}
for i := 0; i < len(nodes); i++ {
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWithLoop") {
return ctx.Err()
}

err = p.fetchResourceAndUpdate(ctx, nodes[i], pq[0], i, cb)

var epr = ErrPartialResponse{}
Expand Down Expand Up @@ -528,9 +554,6 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
// for now: reset i on partials so we also give them a chance to retry.
i = -1
}
if ce := ctx.Err(); ce != nil {
return ce
}
}

fetchDurationCarFailureMetric.Observe(float64(time.Since(carFetchStart).Milliseconds()))
Expand Down

0 comments on commit 65275d0

Please sign in to comment.