diff --git a/fetcher.go b/fetcher.go index aa5878e..29c3c03 100644 --- a/fetcher.go +++ b/fetcher.go @@ -21,12 +21,14 @@ import ( var saturnReqTmpl = "/ipfs/%s?format=raw" -var ( +const ( saturnNodeIdKey = "Saturn-Node-Id" saturnTransferIdKey = "Saturn-Transfer-Id" saturnCacheHitKey = "Saturn-Cache-Status" saturnCacheHit = "HIT" saturnRetryAfterKey = "Retry-After" + resourceTypeCar = "car" + resourceTypeBlock = "block" ) // doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt. @@ -71,13 +73,12 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) // TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) { - resourceType := "car" + resourceType := resourceTypeCar if mime == "application/vnd.ipld.raw" { - resourceType = "block" + resourceType = resourceTypeBlock } - fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1) if ce := ctx.Err(); ce != nil { - fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "init").Add(1) + fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1) return ce } @@ -214,7 +215,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, defer cancel() req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil) if err != nil { - recordIfContextErr(resourceType, reqCtx, "build-request", start, requestTimeout) + recordIfContextErr(resourceType, reqCtx, "build-http-request") return err } @@ -233,7 +234,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, var resp *http.Response resp, err = 
p.config.SaturnClient.Do(req) if err != nil { - recordIfContextErr(resourceType, reqCtx, "send-request", start, requestTimeout) + recordIfContextErr(resourceType, reqCtx, "send-http-request") networkError = err.Error() return fmt.Errorf("http request failed: %w", err) @@ -298,7 +299,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, // drain body so it can be re-used. _, _ = io.Copy(io.Discard, resp.Body) if err != nil { - recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout) + recordIfContextErr(resourceType, reqCtx, "read-http-response") return } @@ -317,14 +318,12 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, return } -func recordIfContextErr(resourceType string, ctx context.Context, requestState string, start time.Time, timeout time.Duration) { +func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool { if ce := ctx.Err(); ce != nil { fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1) - - if errors.Is(ce, context.DeadlineExceeded) && time.Since(start) < (timeout-5*time.Second) { - fetchIncorrectDeadlineErrorTotalMetric.WithLabelValues(resourceType, requestState).Add(1) - } + return true } + return false } // todo: refactor for dryness diff --git a/metrics.go b/metrics.go index 4ad9a53..c59191e 100644 --- a/metrics.go +++ b/metrics.go @@ -58,8 +58,8 @@ var ( durationMsPerCarHistogram = prometheus.ExponentialBucketsRange(50, 1800000, 40) // buckets to measure latency between a caboose peer a Saturn L1, - // [50ms, 100ms, 200ms, ..., ~25 seconds] - latencyDistMsHistogram = prometheus.ExponentialBuckets(50, 2, 10) + // [25ms, 50ms, 75ms, ..., 500ms] + latencyDistMsHistogram = prometheus.LinearBuckets(25, 25, 20) ) // pool metrics @@ -84,6 +84,10 @@ var ( Name: prometheus.BuildFQName("ipfs", "caboose", "pool_new_members"), Help: "New members added
to the Caboose pool", }, []string{"weight"}) + + poolWeightBumpMetric = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName("ipfs", "caboose", "pool_weight_bump"), + }) ) var ( @@ -222,10 +226,6 @@ var ( Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_called_total"), }, []string{"resourceType"}) - fetchIncorrectDeadlineErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_incorrect_deadline_error_total"), - }, []string{"resourceType", "requestStage"}) - fetchRequestContextErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_context_error_total"), }, []string{"resourceType", "errorType", "requestStage"}) @@ -276,5 +276,5 @@ func init() { CabooseMetrics.MustRegister(fetchCalledTotalMetric) CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric) - CabooseMetrics.MustRegister(fetchIncorrectDeadlineErrorTotalMetric) + CabooseMetrics.MustRegister(poolWeightBumpMetric) } diff --git a/pool.go b/pool.go index 656c30d..abb4a75 100644 --- a/pool.go +++ b/pool.go @@ -23,6 +23,12 @@ import ( const maxPoolSize = 300 +var ( + latencyPercentile = 0.75 + maxLatency = float64(200) + minSuccessfulRetrievals = 100 +) + // loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the // Saturn Orchestrator. func (p *pool) loadPool() ([]string, error) { @@ -202,12 +208,6 @@ func (p *pool) doRefresh() { peerLatencyDistribution = latencyHist peerSpeedDistribution = speedHist - - // TODO: The orchestrator periodically prunes "bad" L1s based on a reputation system - // it owns and runs. We should probably just forget about the Saturn endpoints that were - // previously in the pool but are no longer being returned by the orchestrator. It's highly - // likely that the Orchestrator has deemed them to be non-functional/malicious. 
- // Let's just override the old pool with the new endpoints returned here. oldMap := make(map[string]bool) n := make([]*Member, 0, len(newEP)) for _, o := range p.endpoints { @@ -245,6 +245,20 @@ func (p *pool) doRefresh() { } } + // give weight bumps to low latency peers that have served > 100 successful "low latency" cache hit retrievals. + poolWeightBumpMetric.Set(0) + for _, m := range n { + m := m + if perf, ok := p.nodePerf[m.url]; ok { + // Our analysis so far shows that we do have ~10-15 peers with p75 < 200ms latency. + // It's not the best but it's a good start and we can tune as we go along. + if perf.latencyDigest.Count() > float64(minSuccessfulRetrievals) && perf.latencyDigest.Quantile(latencyPercentile) <= maxLatency { + poolWeightBumpMetric.Add(1) + m.weight = maxWeight + } + } + } + // If we have more than maxPoolSize nodes, pick the top maxPoolSize sorted by (weight * age). if len(n) > maxPoolSize { sort.Slice(n, func(i, j int) bool { @@ -262,7 +276,6 @@ func (p *pool) doRefresh() { p.c.UpdateWithWeights(p.endpoints.ToWeights()) } poolSizeMetric.Set(float64(len(n))) - poolNewMembersMetric.Reset() // periodic update of a pool health metric byWeight := make(map[int]int) @@ -329,6 +342,10 @@ func cidToKey(c cid.Cid) string { } func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) { + fetchCalledTotalMetric.WithLabelValues(resourceTypeBlock).Add(1) + if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWith") { + return nil, ctx.Err() + } // wait for pool to be initialised <-p.started @@ -352,6 +369,9 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blockFetchStart := time.Now() for i := 0; i < len(nodes); i++ { + if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWithLoop") { + return nil, ctx.Err() + } blk, err = p.fetchBlockAndUpdate(ctx, nodes[i], c, i) if err == nil { @@ -360,9 +380,6 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with
string) (blk return } - if ce := ctx.Err(); ce != nil { - return nil, ce - } } fetchDurationBlockFailureMetric.Observe(float64(time.Since(blockFetchStart).Milliseconds())) @@ -471,6 +488,11 @@ func (p *pool) getNodesToFetch(key string, with string) ([]string, error) { } func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) { + fetchCalledTotalMetric.WithLabelValues(resourceTypeCar).Add(1) + if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWith") { + return ctx.Err() + } + // wait for pool to be initialised <-p.started @@ -496,6 +518,10 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba pq := []string{path} for i := 0; i < len(nodes); i++ { + if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWithLoop") { + return ctx.Err() + } + err = p.fetchResourceAndUpdate(ctx, nodes[i], pq[0], i, cb) var epr = ErrPartialResponse{} @@ -528,9 +554,6 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba // for now: reset i on partials so we also give them a chance to retry. i = -1 } - if ce := ctx.Err(); ce != nil { - return ce - } } fetchDurationCarFailureMetric.Observe(float64(time.Since(carFetchStart).Milliseconds()))